golang coroutines close go goroutine

coroutines - Go: Siempre tenga x número de goroutines corriendo en cualquier momento



goroutine pool (6)

  1. Crear canal para pasar datos a los goroutines.
  2. Inicia 20 goroutines que procesan los datos del canal en un bucle.
  3. Envíe los datos al canal en lugar de comenzar una nueva goroutina.

Veo muchos tutoriales y ejemplos sobre cómo hacer que Go espere a que termine x número de goroutines, pero lo que estoy tratando de hacer es asegurar que siempre haya x número en ejecución, por lo que se lanzará un nuevo goroutine tan pronto como termine uno. .

Específicamente tengo unos cientos de miles de "cosas que hacer" que están procesando algunas cosas que están saliendo de MySQL. Así funciona así:

db, err := sql.Open("mysql", connection_string) checkErr(err) defer db.Close() rows,err := db.Query(`SELECT id FROM table`) checkErr(err) defer rows.Close() var id uint for rows.Next() { err := rows.Scan(&id) checkErr(err) go processTheThing(id) } checkErr(err) rows.Close()

Actualmente eso lanzará varios cientos de miles de hilos de processTheThing() . Lo que necesito es que se lance un máximo de x números (lo llamaremos 20) goroutines. Así que comienza con el lanzamiento de 20 para las primeras 20 filas, y de ahí en adelante lanzará una nueva goroutina para la siguiente identificación en el momento en que una de las goroutines actuales haya terminado. Así que en cualquier momento siempre hay 20 corriendo.

Estoy seguro de que esto es bastante simple / estándar, pero parece que no puedo encontrar una buena explicación en ninguno de los tutoriales o ejemplos o cómo se hace esto.


Aquí creo que algo simple como esto funcionará:

package main import "fmt" const MAX = 20 func main() { sem := make(int, MAX) for { sem <- 1 // will block if there is MAX ints in sem go func() { fmt.Println("hello again, world") <-sem // removes an int from sem, allowing another to proceed }() } }


Este es un problema simple de producer-consumer , que en Go puede resolverse fácilmente utilizando canales para amortiguar los paquetes.

Para ponerlo simple: cree un canal que acepte sus ID. Ejecute una serie de rutinas que leerán del canal en un bucle y luego procesarán la ID. A continuación, ejecute su bucle que alimentará las ID al canal.

Ejemplo:

func producer() { var buffer = make(chan uint) for i := 0; i < 20; i++ { go consumer(buffer) } for _, id := range IDs { buffer <- id } } func consumer(buffer chan uint) { for { id := <- buffer // Do your things here } }

Cosas que saber:

  • Los canales no almacenados están bloqueando: si el elemento escrito en el canal no se acepta, la rutina que alimenta el elemento se bloqueará hasta que esté
  • Mi ejemplo carece de un mecanismo de cierre: debe encontrar una manera de hacer que el productor espere a que todos los consumidores terminen su ciclo antes de regresar. La forma más sencilla de hacerlo es con otro canal. Te dejo que lo pienses.

Gracias a todos por ayudarme con esto. Sin embargo, no creo que nadie realmente proporcionara algo que funcionara y que fuera simple / comprensible, aunque todos ustedes me ayudaron a entender la técnica.

Lo que he hecho al final es que creo que es mucho más comprensible y práctico como respuesta a mi pregunta específica, así que lo publicaré aquí en caso de que alguien más tenga la misma pregunta.

De alguna manera esto terminó pareciéndose mucho a lo que publicó OneOfOne, lo cual es genial porque ahora lo entiendo. Pero el código de OneOfOne que encontré muy difícil de entender al principio debido a que las funciones de pasar a funciones hacían que fuera bastante confuso entender qué bit era para qué. Creo que esta manera tiene mucho más sentido:

package main import ( "fmt" "sync" ) const xthreads = 5 // Total number of threads to use, excluding the main() thread func doSomething(a int) { fmt.Println("My job is",a) return } func main() { var ch = make(chan int, 50) // This number 50 can be anything as long as it''s larger than xthreads var wg sync.WaitGroup // This starts xthreads number of goroutines that wait for something to do wg.Add(xthreads) for i:=0; i<xthreads; i++ { go func() { for { a, ok := <-ch if !ok { // if there is nothing to do and the channel has been closed then end the goroutine wg.Done() return } doSomething(a) // do the thing } }() } // Now the jobs can be added to the channel, which is used as a queue for i:=0; i<50; i++ { ch <- i // add i to the queue } close(ch) // This tells the goroutines there''s nothing else to do wg.Wait() // Wait for the threads to finish }


La answer Grzegorz isur es la forma más eficiente de hacerlo, pero para un recién llegado podría ser difícil de implementar sin leer el código, así que aquí hay una implementación muy simple:

type idProcessor func(id uint) func SpawnStuff(limit uint, proc idProcessor) chan<- uint { ch := make(chan uint) for i := uint(0); i < limit; i++ { go func() { for { id, ok := <-ch if !ok { return } proc(id) } }() } return ch } func main() { runtime.GOMAXPROCS(4) var wg sync.WaitGroup //this is just for the demo, otherwise main will return fn := func(id uint) { fmt.Println(id) wg.Done() } wg.Add(1000) ch := SpawnStuff(10, fn) for i := uint(0); i < 1000; i++ { ch <- i } close(ch) //should do this to make all the goroutines exit gracefully wg.Wait() }

playground


Puede que encuentre interesante el artículo sobre patrones de concurrencia de Go , especialmente en la sección de paralelismo acotado , que explica el patrón exacto que necesita.

Puede usar el canal de estructuras vacías como protección limitadora para controlar el número de goroutines de trabajadores concurrentes :

package main import "fmt" func main() { maxGoroutines := 10 guard := make(chan struct{}, maxGoroutines) for i := 0; i < 30; i++ { guard <- struct{}{} // would block if guard channel is already filled go func(n int) { worker(n) <-guard }(i) } } func worker(i int) { fmt.Println("doing work on", i) }