multithreading - son - para que sirven los nucleos de un procesador
¿Es esto un grupo de hilos de trabajo idiomático en Go? (2)
Puede implementar un semáforo de conteo para limitar la concurrencia de goroutine.
var tokens = make(chan struct{}, 20)
func worker(id string, work string, o chan string, wg *sync.WaitGroup) {
defer wg.Done()
tokens <- struct{}{} // acquire a token before performing work
sleepMs := rand.Intn(1000)
fmt.Printf("worker ''%s'' received: ''%s'', sleep %dms/n", id, work, sleepMs)
time.Sleep(time.Duration(sleepMs) * time.Millisecond)
<-tokens // release the token
o <- work + fmt.Sprintf("-%dms", sleepMs)
}
Este es el diseño general utilizado para limitar el número de trabajadores. Por supuesto, puede cambiar la ubicación de liberación / adquisición de tokens para adaptarse a su código.
Estoy intentando escribir un grupo de trabajadores simple con goroutines.
- Es el código que escribí idiomático? Si no, ¿qué debería cambiar?
- Quiero poder establecer el número máximo de hilos de trabajo en 5 y bloquear hasta que un trabajador esté disponible si los 5 están ocupados. ¿Cómo ampliaría esto para tener solo un grupo de 5 trabajadores como máximo? ¿Engendro las 5 rutinas estáticas, y le doy a cada
work_channel
?
código:
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func worker(id string, work string, o chan string, wg *sync.WaitGroup) {
defer wg.Done()
sleepMs := rand.Intn(1000)
fmt.Printf("worker ''%s'' received: ''%s'', sleep %dms/n", id, work, sleepMs)
time.Sleep(time.Duration(sleepMs) * time.Millisecond)
o <- work + fmt.Sprintf("-%dms", sleepMs)
}
func main() {
var work_channel = make(chan string)
var results_channel = make(chan string)
// create goroutine per item in work_channel
go func() {
var c = 0
var wg sync.WaitGroup
for work := range work_channel {
wg.Add(1)
go worker(fmt.Sprintf("%d", c), work, results_channel, &wg)
c++
}
wg.Wait()
fmt.Println("closing results channel")
close(results_channel)
}()
// add work to the work_channel
go func() {
for c := ''a''; c < ''z''; c++ {
work_channel <- fmt.Sprintf("%c", c)
}
close(work_channel)
fmt.Println("sent work to work_channel")
}()
for x := range results_channel {
fmt.Printf("result: %s/n", x)
}
}
Su solución no es un conjunto de herramientas de trabajo en ningún sentido: su código no limita las rutinas concurrentes, y no "reutiliza" las rutinas (siempre comienza una nueva cuando se recibe un nuevo trabajo).
Patrón productor-consumidor
Cuando publiqué en Bruteforce MD5 Password cracker , puede hacer uso del patrón productor-consumidor . Podrías tener un goroutine de productor designado que genere los trabajos (cosas que hacer / calcular) y enviarlos a un canal de trabajos . Podría tener un conjunto fijo de goroutinas de consumo (por ejemplo, 5 de ellas) que recorrerían el canal en el que se entregan los trabajos, y cada uno ejecutaría / completaría los trabajos recibidos.
El administrador de producción podría simplemente cerrar el canal de jobs
cuando se generaron y enviaron todos los trabajos, indicando adecuadamente a los consumidores que no se generarán más empleos. La construcción de for ... range
en un canal maneja el evento "cerrar" y finaliza correctamente. Tenga en cuenta que todos los trabajos enviados antes de cerrar el canal aún se entregarán.
Esto daría como resultado un diseño limpio, daría como resultado un número fijo (pero arbitrario) de goroutines, y siempre utilizaría 100% de CPU (si el número de goroutines es mayor que el número de núcleos de CPU). También tiene la ventaja de que puede "acelerarse" con la selección adecuada de la capacidad del canal (canal en búfer) y la cantidad de goroutinas de consumo .
Tenga en cuenta que este modelo para tener un productor designado goroutine no es obligatorio. También podría tener múltiples goroutines para producir trabajos, pero también debe sincronizarlos para cerrar el canal de jobs
cuando todas las goroutinas de productores terminen de producir trabajos; de lo contrario, intentar enviar otro trabajo al canal de jobs
cuando ya se ha cerrado genera un pánico de tiempo de ejecución. Por lo general, la producción de trabajos es barata y se puede producir a un ritmo mucho más rápido de lo que se puede ejecutar, por lo que este modelo para producirlos en 1 rutina mientras muchos los consumen / ejecutan es bueno en la práctica.
Manejo de resultados:
Si los trabajos tienen resultados, puede optar por tener un canal de resultados designado en el que los resultados puedan ser entregados ("enviados de regreso"), o puede optar por manejar los resultados en el consumidor cuando el trabajo se completa / finaliza. Este último incluso puede implementarse teniendo una función de "devolución de llamada" que maneja los resultados. Lo importante es si los resultados se pueden procesar de forma independiente o si deben fusionarse (por ejemplo, map-reduce framework) o agregados.
Si va con un canal de results
, también necesita una rutina que reciba los valores de la misma, evitando que los consumidores se bloqueen (podría ocurrir si se llena el buffer de results
).
Con canal de results
En lugar de enviar valores de string
simples como trabajos y resultados, crearía un tipo de contenedor que puede contener cualquier información adicional y, por lo tanto, es mucho más flexible:
type Job struct {
Id int
Work string
Result string
}
Tenga en cuenta que la estructura de Job
también envuelve el resultado, por lo que cuando enviamos el resultado, también contiene el Job
original como contexto, a menudo muy útil . También tenga en cuenta que es rentable enviar punteros ( *Job
) en los canales en lugar de valores de Job
, por lo que no es necesario hacer "innumerables" copias de los Job
, y también el tamaño del valor de la estructura del Job
vuelve irrelevante.
Así es como se vería este productor-consumidor:
Utilizaría 2 valores sync.WaitGroup
, su función será la siguiente:
var wg, wg2 sync.WaitGroup
El productor es responsable de generar trabajos para ser ejecutados:
func produce(jobs chan<- *Job) {
defer wg.Done()
// Generate jobs:
id := 0
for c := ''a''; c <= ''z''; c++ {
id++
jobs <- &Job{Id: id, Work: fmt.Sprintf("%c", c)}
}
close(jobs)
}
Cuando finaliza (no hay más trabajos), se cierra el canal de jobs
que indica a los consumidores que no van a llegar más trabajos.
Tenga en cuenta que produce()
ve el canal de jobs
solo como enviado , porque eso es lo que el productor necesita hacer solo con eso: enviar trabajos sobre él (además de cerrarlo , pero eso también está permitido en un canal de solo envío ). Una recepción accidental en el productor sería un error de tiempo de compilación (detectado temprano, en tiempo de compilación).
La responsabilidad del consumidor es recibir trabajos siempre que se puedan recibir trabajos y ejecutarlos:
func consume(id int, jobs <-chan *Job, results chan<- *Job) {
defer wg.Done()
for job := range jobs {
sleepMs := rand.Intn(1000)
fmt.Printf("worker #%d received: ''%s'', sleep %dms/n", id, job.Work, sleepMs)
time.Sleep(time.Duration(sleepMs) * time.Millisecond)
job.Result = job.Work + fmt.Sprintf("-%dms", sleepMs)
results <- job
}
}
Tenga en cuenta que consume()
ve el canal de jobs
como solo recepción ; el consumidor solo necesita recibir de él. Del mismo modo, el canal de results
se envía solo para el consumidor.
También tenga en cuenta que el canal de results
no se puede cerrar aquí, ya que hay varios goroutines de consumo, y solo el primer intento de cerrarlo tendría éxito y otros provocarían pánico en el tiempo de ejecución. results
canal de results
puede (debe) cerrarse después de que terminaron todas las rutinas de consumo, porque entonces podemos estar seguros de que no se enviarán más valores (resultados) en el canal de results
.
Tenemos resultados que deben analizarse:
func analyze(results <-chan *Job) {
defer wg2.Done()
for job := range results {
fmt.Printf("result: %s/n", job.Result)
}
}
Como puede ver, esto también recibe resultados siempre que puedan aparecer (hasta que se cierre el canal de results
). El canal de results
para el analizador es solo de recepción .
Tenga en cuenta el uso de tipos de canales: siempre que sea suficiente, use solo un tipo de canal unidireccional para detectar y prevenir errores de manera anticipada, en tiempo de compilación. Solo use el tipo de canal bidireccional si necesita ambas direcciones.
Y así es como todos estos están pegados:
func main() {
jobs := make(chan *Job, 100) // Buffered channel
results := make(chan *Job, 100) // Buffered channel
// Start consumers:
for i := 0; i < 5; i++ { // 5 consumers
wg.Add(1)
go consume(i, jobs, results)
}
// Start producing
go produce(jobs)
// Start analyzing:
wg2.Add(1)
go analyze(results)
wg.Wait() // Wait all consumers to finish processing jobs
// All jobs are processed, no more values will be sent on results:
close(results)
wg2.Wait() // Wait analyzer to analyze all results
}
Ejemplo de salida:
Aquí hay un ejemplo de salida:
Como puede ver, los resultados vienen y se analizan antes de que todos los trabajos se pongan en cola:
worker #1 received: ''e'', sleep 81ms
worker #2 received: ''b'', sleep 887ms
worker #3 received: ''c'', sleep 847ms
worker #4 received: ''d'', sleep 59ms
worker #0 received: ''a'', sleep 81ms
worker #4 received: ''f'', sleep 318ms
result: d-59ms
worker #0 received: ''h'', sleep 540ms
worker #1 received: ''g'', sleep 425ms
result: e-81ms
result: a-81ms
worker #4 received: ''i'', sleep 456ms
result: f-318ms
worker #1 received: ''j'', sleep 300ms
result: g-425ms
worker #0 received: ''k'', sleep 694ms
result: h-540ms
worker #1 received: ''l'', sleep 511ms
result: j-300ms
worker #4 received: ''m'', sleep 162ms
result: i-456ms
worker #3 received: ''n'', sleep 89ms
result: c-847ms
worker #2 received: ''o'', sleep 728ms
result: b-887ms
worker #3 received: ''p'', sleep 274ms
result: n-89ms
worker #4 received: ''q'', sleep 211ms
result: m-162ms
worker #4 received: ''r'', sleep 445ms
result: q-211ms
result: p-274ms
worker #3 received: ''s'', sleep 237ms
worker #0 received: ''t'', sleep 106ms
result: k-694ms
worker #1 received: ''u'', sleep 495ms
result: l-511ms
result: t-106ms
worker #0 received: ''v'', sleep 466ms
worker #3 received: ''w'', sleep 528ms
result: s-237ms
worker #2 received: ''x'', sleep 258ms
result: o-728ms
result: r-445ms
worker #4 received: ''y'', sleep 47ms
worker #4 received: ''z'', sleep 947ms
result: y-47ms
result: u-495ms
result: x-258ms
result: v-466ms
result: w-528ms
Pruebe la aplicación completa en el patio de juegos Go .
Sin un canal de results
El código se simplifica significativamente si no usamos un canal de results
, pero los goroutines del consumidor manejan el resultado de inmediato (imprímalo en nuestro caso). En este caso, no necesitamos 2 valores de sync.WaitGroup
(el segundo solo fue necesario para esperar a que se complete el analizador).
Sin un canal de results
, la solución completa es la siguiente:
var wg sync.WaitGroup
type Job struct {
Id int
Work string
}
func produce(jobs chan<- *Job) {
defer wg.Done()
// Generate jobs:
id := 0
for c := ''a''; c <= ''z''; c++ {
id++
jobs <- &Job{Id: id, Work: fmt.Sprintf("%c", c)}
}
close(jobs)
}
func consume(id int, jobs <-chan *Job) {
defer wg.Done()
for job := range jobs {
sleepMs := rand.Intn(1000)
fmt.Printf("worker #%d received: ''%s'', sleep %dms/n", id, job.Work, sleepMs)
time.Sleep(time.Duration(sleepMs) * time.Millisecond)
fmt.Printf("result: %s/n", job.Work+fmt.Sprintf("-%dms", sleepMs))
}
}
func main() {
jobs := make(chan *Job, 100) // Buffered channel
// Start consumers:
for i := 0; i < 5; i++ { // 5 consumers
wg.Add(1)
go consume(i, jobs)
}
// Start producing
go produce(jobs)
wg.Wait() // Wait all consumers to finish processing jobs
}
La salida es "similar" a la del canal de results
(pero, por supuesto, el orden de ejecución / finalización es aleatorio).
Pruebe esta variante en el patio de juegos Go .