multithreading - threads - ¿Cómo definirías un grupo de goroutines para ser ejecutados a la vez en Golang?
go threads (3)
TL; TR: Por favor, ve a la última parte y dime cómo resolverías este problema.
Empecé a usar Golang esta mañana viniendo de Python. Quiero llamar a un ejecutable de código cerrado desde Go varias veces, con un poco de concurrencia, con diferentes argumentos de línea de comandos. Mi código resultante está funcionando muy bien, pero me gustaría obtener su opinión para mejorarlo. Como estoy en una etapa de aprendizaje temprano, también explicaré mi flujo de trabajo.
En aras de la simplicidad, suponga aquí que este "programa externo de código cerrado" es zenity
, una herramienta de línea de comandos de Linux que puede mostrar cuadros de mensaje gráficos desde la línea de comandos.
Llamar a un archivo ejecutable desde Go
Entonces, en Go, iría así:
package main
import "os/exec"
func main() {
cmd := exec.Command("zenity", "--info", "--text=''Hello World''")
cmd.Run()
}
Esto debería estar funcionando bien. Tenga en cuenta que .Run()
es un equivalente funcional a .Start()
seguido de .Wait()
. Esto es genial, pero si quisiera ejecutar este programa solo una vez, toda la programación no valdría la pena. Así que hagámoslo muchas veces.
Llamar a un ejecutable varias veces
Ahora que lo he hecho funcionar, me gustaría llamar a mi programa varias veces, con argumentos de línea de comandos personalizados (aquí solo en aras de la simplicidad).
package main
import (
"os/exec"
"strconv"
)
func main() {
NumEl := 8 // Number of times the external program is called
for i:=0; i<NumEl; i++ {
cmd := exec.Command("zenity", "--info", "--text=''Hello from iteration n." + strconv.Itoa(i) + "''")
cmd.Run()
}
}
Ok, lo hicimos! Pero todavía no puedo ver la ventaja de Go over Python ... Este fragmento de código se ejecuta en realidad en serie. Tengo una CPU de núcleo múltiple y me gustaría aprovecharla. Así que agreguemos algo de concurrencia con goroutines.
Goroutines, o una forma de hacer que mi programa sea paralelo
a) Primer intento: solo agregue "ir" a todos lados
Reescribamos nuestro código para que sea más fácil llamar y reutilizar y agregar la famosa palabra clave go
:
package main
import (
"os/exec"
"strconv"
)
func main() {
NumEl := 8
for i:=0; i<NumEl; i++ {
go callProg(i) // <--- There!
}
}
func callProg(i int) {
cmd := exec.Command("zenity", "--info", "--text=''Hello from iteration n." + strconv.Itoa(i) + "''")
cmd.Run()
}
¡Nada! ¿Cuál es el problema? Todos los goroutines se ejecutan a la vez. Realmente no sé por qué zenity no se ejecuta pero AFAIK, el programa Go salió antes de que el programa zenity external pudiera incluso inicializarse. Esto fue confirmado por el uso del time.Sleep
. time.Sleep
: esperar un par de segundos fue suficiente para permitir que la instancia 8 de zenity se lance. No sé si esto puede considerarse un error.
Para empeorar las cosas, el programa real que realmente me gustaría llamar toma un tiempo para ejecutarse. Si ejecuto 8 instancias de este programa en paralelo en mi CPU de 4 núcleos, va a perder un poco de tiempo haciendo un montón de cambio de contexto ... No sé cómo se comportan los simples goroutines Go, pero exec.Command
lanzará zenity 8 veces en 8 hilos diferentes Para empeorar las cosas, quiero ejecutar este programa más de 100.000 veces. Hacer todo eso a la vez en goroutines no será eficiente en absoluto. Aún así, me gustaría aprovechar mi CPU de 4 núcleos.
b) Segundo intento: usar grupos de gorutines
Los recursos en línea tienden a recomendar el uso de sync.WaitGroup
para este tipo de trabajo. El problema con ese enfoque es que básicamente estás trabajando con lotes de rutinas: si creo de WaitGroup de 4 miembros, el programa Go esperará a que los 4 programas externos finalicen antes de llamar a un nuevo lote de 4 programas. Esto no es eficiente: la CPU se desperdicia, una vez más.
Algunos otros recursos recomendaron el uso de un canal almacenado para hacer el trabajo:
package main
import (
"os/exec"
"strconv"
)
func main() {
NumEl := 8 // Number of times the external program is called
NumCore := 4 // Number of available cores
c := make(chan bool, NumCore - 1)
for i:=0; i<NumEl; i++ {
go callProg(i, c)
c <- true // At the NumCoreth iteration, c is blocking
}
}
func callProg(i int, c chan bool) {
defer func () {<- c}()
cmd := exec.Command("zenity", "--info", "--text=''Hello from iteration n." + strconv.Itoa(i) + "''")
cmd.Run()
}
Esto parece feo Los canales no fueron diseñados para este propósito: estoy explotando un efecto secundario. Me encanta el concepto de defer
pero odio tener que declarar una función (incluso una lambda) para extraer un valor del canal ficticio que creé. Ah, y por supuesto, usar un canal ficticio es, por sí mismo, feo.
c) Tercer intento: muere cuando todos los niños están muertos
Ahora estamos casi terminados. Solo tengo que tener en cuenta otro efecto secundario: el programa Go se cierra antes de que se cierren todas las ventanas emergentes de zenity. Esto se debe a que cuando el ciclo está finalizado (en la octava iteración), nada impide que el programa finalice. Esta vez, sync.WaitGroup
será útil.
package main
import (
"os/exec"
"strconv"
"sync"
)
func main() {
NumEl := 8 // Number of times the external program is called
NumCore := 4 // Number of available cores
c := make(chan bool, NumCore - 1)
wg := new(sync.WaitGroup)
wg.Add(NumEl) // Set the number of goroutines to (0 + NumEl)
for i:=0; i<NumEl; i++ {
go callProg(i, c, wg)
c <- true // At the NumCoreth iteration, c is blocking
}
wg.Wait() // Wait for all the children to die
close(c)
}
func callProg(i int, c chan bool, wg *sync.WaitGroup) {
defer func () {
<- c
wg.Done() // Decrease the number of alive goroutines
}()
cmd := exec.Command("zenity", "--info", "--text=''Hello from iteration n." + strconv.Itoa(i) + "''")
cmd.Run()
}
Hecho.
Mis preguntas
- ¿Conoces alguna otra forma adecuada de limitar el número de goroutines ejecutados a la vez?
No me refiero a los hilos; cómo Go administra los goroutines internamente no es relevante. Me refiero a limitar la cantidad de goroutines lanzados al mismo tiempo: exec.Command
crea un nuevo hilo cada vez que se llama, por lo que debo controlar el número de veces que se llama.
- ¿Te parece bien ese código?
- ¿Sabes cómo evitar el uso de un canal ficticio en ese caso?
No puedo convencerme a mí mismo de que esos canales ficticios son el camino a seguir.
Generaría 4 rutinas de trabajador que leen las tareas desde un canal común. Los goroutines que son más rápidos que otros (porque están programados de manera diferente o tienen tareas simples) recibirán más tareas de este canal que otros. Además de eso, usaría un sync.WaitGroup para esperar a que finalicen todos los trabajadores. La parte restante es solo la creación de las tareas. Aquí puede ver una implementación de ejemplo de ese enfoque:
package main
import (
"os/exec"
"strconv"
"sync"
)
func main() {
tasks := make(chan *exec.Cmd, 64)
// spawn four worker goroutines
var wg sync.WaitGroup
for i := 0; i < 4; i++ {
wg.Add(1)
go func() {
for cmd := range tasks {
cmd.Run()
}
wg.Done()
}()
}
// generate some tasks
for i := 0; i < 10; i++ {
tasks <- exec.Command("zenity", "--info", "--text=''Hello from iteration n."+strconv.Itoa(i)+"''")
}
close(tasks)
// wait for the workers to finish
wg.Wait()
}
Probablemente haya otros enfoques posibles, pero creo que esta es una solución muy clara que es fácil de entender.
Un enfoque simple para la regulación (ejecute f()
N veces pero la máxima concurrencia máxima), solo un esquema:
package main
import (
"sync"
)
const maxConcurrency = 4 // for example
var throttle = make(chan int, maxConcurrency)
func main() {
const N = 100 // for example
var wg sync.WaitGroup
for i := 0; i < N; i++ {
throttle <- 1 // whatever number
wg.Add(1)
go f(i, &wg, throttle)
}
wg.Wait()
}
func f(i int, wg *sync.WaitGroup, throttle chan int) {
defer wg.Done()
// whatever processing
println(i)
<-throttle
}
Probablemente no llamaría "dummy" al canal del throttle
. En mi humilde opinión, es una forma elegante (no es mi invención, por supuesto), cómo limitar la concurrencia.
Por cierto: tenga en cuenta que está ignorando el error devuelto por cmd.Run()
.
intenta esto: https://github.com/korovkin/limiter
limiter := NewConcurrencyLimiter(10)
limiter.Execute(func() {
zenity(...)
})
limiter.Wait()