tiene son sirven saber que procesador para nucleos los hilos cuantos como multithreading go concurrency goroutine

multithreading - son - para que sirven los nucleos de un procesador



¿Es esto un grupo de hilos de trabajo idiomático en Go? (2)

Puede implementar un semáforo de conteo para limitar la concurrencia de goroutine.

var tokens = make(chan struct{}, 20) func worker(id string, work string, o chan string, wg *sync.WaitGroup) { defer wg.Done() tokens <- struct{}{} // acquire a token before performing work sleepMs := rand.Intn(1000) fmt.Printf("worker ''%s'' received: ''%s'', sleep %dms/n", id, work, sleepMs) time.Sleep(time.Duration(sleepMs) * time.Millisecond) <-tokens // release the token o <- work + fmt.Sprintf("-%dms", sleepMs) }

Este es el diseño general utilizado para limitar el número de trabajadores. Por supuesto, puede cambiar la ubicación de liberación / adquisición de tokens para adaptarse a su código.

Estoy intentando escribir un grupo de trabajadores simple con goroutines.

  • Es el código que escribí idiomático? Si no, ¿qué debería cambiar?
  • Quiero poder establecer el número máximo de hilos de trabajo en 5 y bloquear hasta que un trabajador esté disponible si los 5 están ocupados. ¿Cómo ampliaría esto para tener solo un grupo de 5 trabajadores como máximo? ¿Engendro las 5 rutinas estáticas, y le doy a cada work_channel ?

código:

package main import ( "fmt" "math/rand" "sync" "time" ) func worker(id string, work string, o chan string, wg *sync.WaitGroup) { defer wg.Done() sleepMs := rand.Intn(1000) fmt.Printf("worker ''%s'' received: ''%s'', sleep %dms/n", id, work, sleepMs) time.Sleep(time.Duration(sleepMs) * time.Millisecond) o <- work + fmt.Sprintf("-%dms", sleepMs) } func main() { var work_channel = make(chan string) var results_channel = make(chan string) // create goroutine per item in work_channel go func() { var c = 0 var wg sync.WaitGroup for work := range work_channel { wg.Add(1) go worker(fmt.Sprintf("%d", c), work, results_channel, &wg) c++ } wg.Wait() fmt.Println("closing results channel") close(results_channel) }() // add work to the work_channel go func() { for c := ''a''; c < ''z''; c++ { work_channel <- fmt.Sprintf("%c", c) } close(work_channel) fmt.Println("sent work to work_channel") }() for x := range results_channel { fmt.Printf("result: %s/n", x) } }


Su solución no es un conjunto de herramientas de trabajo en ningún sentido: su código no limita las rutinas concurrentes, y no "reutiliza" las rutinas (siempre comienza una nueva cuando se recibe un nuevo trabajo).

Patrón productor-consumidor

Cuando publiqué en Bruteforce MD5 Password cracker , puede hacer uso del patrón productor-consumidor . Podrías tener un goroutine de productor designado que genere los trabajos (cosas que hacer / calcular) y enviarlos a un canal de trabajos . Podría tener un conjunto fijo de goroutinas de consumo (por ejemplo, 5 de ellas) que recorrerían el canal en el que se entregan los trabajos, y cada uno ejecutaría / completaría los trabajos recibidos.

El administrador de producción podría simplemente cerrar el canal de jobs cuando se generaron y enviaron todos los trabajos, indicando adecuadamente a los consumidores que no se generarán más empleos. La construcción de for ... range en un canal maneja el evento "cerrar" y finaliza correctamente. Tenga en cuenta que todos los trabajos enviados antes de cerrar el canal aún se entregarán.

Esto daría como resultado un diseño limpio, daría como resultado un número fijo (pero arbitrario) de goroutines, y siempre utilizaría 100% de CPU (si el número de goroutines es mayor que el número de núcleos de CPU). También tiene la ventaja de que puede "acelerarse" con la selección adecuada de la capacidad del canal (canal en búfer) y la cantidad de goroutinas de consumo .

Tenga en cuenta que este modelo para tener un productor designado goroutine no es obligatorio. También podría tener múltiples goroutines para producir trabajos, pero también debe sincronizarlos para cerrar el canal de jobs cuando todas las goroutinas de productores terminen de producir trabajos; de lo contrario, intentar enviar otro trabajo al canal de jobs cuando ya se ha cerrado genera un pánico de tiempo de ejecución. Por lo general, la producción de trabajos es barata y se puede producir a un ritmo mucho más rápido de lo que se puede ejecutar, por lo que este modelo para producirlos en 1 rutina mientras muchos los consumen / ejecutan es bueno en la práctica.

Manejo de resultados:

Si los trabajos tienen resultados, puede optar por tener un canal de resultados designado en el que los resultados puedan ser entregados ("enviados de regreso"), o puede optar por manejar los resultados en el consumidor cuando el trabajo se completa / finaliza. Este último incluso puede implementarse teniendo una función de "devolución de llamada" que maneja los resultados. Lo importante es si los resultados se pueden procesar de forma independiente o si deben fusionarse (por ejemplo, map-reduce framework) o agregados.

Si va con un canal de results , también necesita una rutina que reciba los valores de la misma, evitando que los consumidores se bloqueen (podría ocurrir si se llena el buffer de results ).

Con canal de results

En lugar de enviar valores de string simples como trabajos y resultados, crearía un tipo de contenedor que puede contener cualquier información adicional y, por lo tanto, es mucho más flexible:

type Job struct { Id int Work string Result string }

Tenga en cuenta que la estructura de Job también envuelve el resultado, por lo que cuando enviamos el resultado, también contiene el Job original como contexto, a menudo muy útil . También tenga en cuenta que es rentable enviar punteros ( *Job ) en los canales en lugar de valores de Job , por lo que no es necesario hacer "innumerables" copias de los Job , y también el tamaño del valor de la estructura del Job vuelve irrelevante.

Así es como se vería este productor-consumidor:

Utilizaría 2 valores sync.WaitGroup , su función será la siguiente:

var wg, wg2 sync.WaitGroup

El productor es responsable de generar trabajos para ser ejecutados:

func produce(jobs chan<- *Job) { defer wg.Done() // Generate jobs: id := 0 for c := ''a''; c <= ''z''; c++ { id++ jobs <- &Job{Id: id, Work: fmt.Sprintf("%c", c)} } close(jobs) }

Cuando finaliza (no hay más trabajos), se cierra el canal de jobs que indica a los consumidores que no van a llegar más trabajos.

Tenga en cuenta que produce() ve el canal de jobs solo como enviado , porque eso es lo que el productor necesita hacer solo con eso: enviar trabajos sobre él (además de cerrarlo , pero eso también está permitido en un canal de solo envío ). Una recepción accidental en el productor sería un error de tiempo de compilación (detectado temprano, en tiempo de compilación).

La responsabilidad del consumidor es recibir trabajos siempre que se puedan recibir trabajos y ejecutarlos:

func consume(id int, jobs <-chan *Job, results chan<- *Job) { defer wg.Done() for job := range jobs { sleepMs := rand.Intn(1000) fmt.Printf("worker #%d received: ''%s'', sleep %dms/n", id, job.Work, sleepMs) time.Sleep(time.Duration(sleepMs) * time.Millisecond) job.Result = job.Work + fmt.Sprintf("-%dms", sleepMs) results <- job } }

Tenga en cuenta que consume() ve el canal de jobs como solo recepción ; el consumidor solo necesita recibir de él. Del mismo modo, el canal de results se envía solo para el consumidor.

También tenga en cuenta que el canal de results no se puede cerrar aquí, ya que hay varios goroutines de consumo, y solo el primer intento de cerrarlo tendría éxito y otros provocarían pánico en el tiempo de ejecución. results canal de results puede (debe) cerrarse después de que terminaron todas las rutinas de consumo, porque entonces podemos estar seguros de que no se enviarán más valores (resultados) en el canal de results .

Tenemos resultados que deben analizarse:

func analyze(results <-chan *Job) { defer wg2.Done() for job := range results { fmt.Printf("result: %s/n", job.Result) } }

Como puede ver, esto también recibe resultados siempre que puedan aparecer (hasta que se cierre el canal de results ). El canal de results para el analizador es solo de recepción .

Tenga en cuenta el uso de tipos de canales: siempre que sea suficiente, use solo un tipo de canal unidireccional para detectar y prevenir errores de manera anticipada, en tiempo de compilación. Solo use el tipo de canal bidireccional si necesita ambas direcciones.

Y así es como todos estos están pegados:

func main() { jobs := make(chan *Job, 100) // Buffered channel results := make(chan *Job, 100) // Buffered channel // Start consumers: for i := 0; i < 5; i++ { // 5 consumers wg.Add(1) go consume(i, jobs, results) } // Start producing go produce(jobs) // Start analyzing: wg2.Add(1) go analyze(results) wg.Wait() // Wait all consumers to finish processing jobs // All jobs are processed, no more values will be sent on results: close(results) wg2.Wait() // Wait analyzer to analyze all results }

Ejemplo de salida:

Aquí hay un ejemplo de salida:

Como puede ver, los resultados vienen y se analizan antes de que todos los trabajos se pongan en cola:

worker #1 received: ''e'', sleep 81ms worker #2 received: ''b'', sleep 887ms worker #3 received: ''c'', sleep 847ms worker #4 received: ''d'', sleep 59ms worker #0 received: ''a'', sleep 81ms worker #4 received: ''f'', sleep 318ms result: d-59ms worker #0 received: ''h'', sleep 540ms worker #1 received: ''g'', sleep 425ms result: e-81ms result: a-81ms worker #4 received: ''i'', sleep 456ms result: f-318ms worker #1 received: ''j'', sleep 300ms result: g-425ms worker #0 received: ''k'', sleep 694ms result: h-540ms worker #1 received: ''l'', sleep 511ms result: j-300ms worker #4 received: ''m'', sleep 162ms result: i-456ms worker #3 received: ''n'', sleep 89ms result: c-847ms worker #2 received: ''o'', sleep 728ms result: b-887ms worker #3 received: ''p'', sleep 274ms result: n-89ms worker #4 received: ''q'', sleep 211ms result: m-162ms worker #4 received: ''r'', sleep 445ms result: q-211ms result: p-274ms worker #3 received: ''s'', sleep 237ms worker #0 received: ''t'', sleep 106ms result: k-694ms worker #1 received: ''u'', sleep 495ms result: l-511ms result: t-106ms worker #0 received: ''v'', sleep 466ms worker #3 received: ''w'', sleep 528ms result: s-237ms worker #2 received: ''x'', sleep 258ms result: o-728ms result: r-445ms worker #4 received: ''y'', sleep 47ms worker #4 received: ''z'', sleep 947ms result: y-47ms result: u-495ms result: x-258ms result: v-466ms result: w-528ms

Pruebe la aplicación completa en el patio de juegos Go .

Sin un canal de results

El código se simplifica significativamente si no usamos un canal de results , pero los goroutines del consumidor manejan el resultado de inmediato (imprímalo en nuestro caso). En este caso, no necesitamos 2 valores de sync.WaitGroup (el segundo solo fue necesario para esperar a que se complete el analizador).

Sin un canal de results , la solución completa es la siguiente:

var wg sync.WaitGroup type Job struct { Id int Work string } func produce(jobs chan<- *Job) { defer wg.Done() // Generate jobs: id := 0 for c := ''a''; c <= ''z''; c++ { id++ jobs <- &Job{Id: id, Work: fmt.Sprintf("%c", c)} } close(jobs) } func consume(id int, jobs <-chan *Job) { defer wg.Done() for job := range jobs { sleepMs := rand.Intn(1000) fmt.Printf("worker #%d received: ''%s'', sleep %dms/n", id, job.Work, sleepMs) time.Sleep(time.Duration(sleepMs) * time.Millisecond) fmt.Printf("result: %s/n", job.Work+fmt.Sprintf("-%dms", sleepMs)) } } func main() { jobs := make(chan *Job, 100) // Buffered channel // Start consumers: for i := 0; i < 5; i++ { // 5 consumers wg.Add(1) go consume(i, jobs) } // Start producing go produce(jobs) wg.Wait() // Wait all consumers to finish processing jobs }

La salida es "similar" a la del canal de results (pero, por supuesto, el orden de ejecución / finalización es aleatorio).

Pruebe esta variante en el patio de juegos Go .