go - tag - title seo length
Cómo utilizar una piscina goroutine (3)
Este ejemplo utiliza dos canales, uno para las entradas y otro para la salida. Los trabajadores pueden escalar a cualquier tamaño y cada goroutine trabaja en la cola de entrada y guarda toda la salida en el canal de salida. Comentarios sobre métodos más fáciles son muy bienvenidos.
package main
import (
"fmt"
"sync"
)
var wg sync.WaitGroup
func worker(input chan string, output chan string) {
defer wg.Done()
// Consumer: Process items from the input channel and send results to output channel
for value := range input {
output <- value + " processed"
}
}
func main() {
var jobs = []string{"one", "two", "three", "four", "two", "three", "four", "two", "three", "four", "two", "three", "four", "two", "three", "four", "two"}
input := make(chan string, len(jobs))
output := make(chan string, len(jobs))
workers := 250
// Increment waitgroup counter and create go routines
for i := 0; i < workers; i++ {
wg.Add(1)
go worker(input, output)
}
// Producer: load up input channel with jobs
for _, job := range jobs {
input <- job
}
// Close input channel since no more jobs are being sent to input channel
close(input)
// Wait for all goroutines to finish processing
wg.Wait()
// Close output channel since all workers have finished processing
close(output)
// Read from output channel
for result := range output {
fmt.Println(result)
}
}
Quiero usar Go para descargar hojas de cálculo de precios de acciones de Yahoo Finance. Haré una solicitud http para cada acción en su propia goroutina. Tengo una lista de alrededor de 2500 símbolos, pero en lugar de hacer 2500 solicitudes en paralelo, prefiero hacer 250 a la vez. En Java crearía un grupo de subprocesos y reutilizaría los subprocesos cuando se liberen. Estaba tratando de encontrar algo similar, una piscina de goroutine, por así decirlo, pero no pude encontrar ningún recurso. Apreciaría si alguien me puede decir cómo realizar la tarea en cuestión o indicarme los recursos para la misma. ¡Gracias!
Puede usar la biblioteca de implementación del grupo de subprocesos en Go
desde este repositorio de git
Here está el bonito blog sobre cómo usar los canales como grupo de subprocesos
Fragmento del blog.
var (
MaxWorker = os.Getenv("MAX_WORKERS")
MaxQueue = os.Getenv("MAX_QUEUE")
)
//Job represents the job to be run
type Job struct {
Payload Payload
}
// A buffered channel that we can send work requests on.
var JobQueue chan Job
// Worker represents the worker that executes the job
type Worker struct {
WorkerPool chan chan Job
JobChannel chan Job
quit chan bool
}
func NewWorker(workerPool chan chan Job) Worker {
return Worker{
WorkerPool: workerPool,
JobChannel: make(chan Job),
quit: make(chan bool)}
}
// Start method starts the run loop for the worker, listening for a quit channel in
// case we need to stop it
func (w Worker) Start() {
go func() {
for {
// register the current worker into the worker queue.
w.WorkerPool <- w.JobChannel
select {
case job := <-w.JobChannel:
// we have received a work request.
if err := job.Payload.UploadToS3(); err != nil {
log.Errorf("Error uploading to S3: %s", err.Error())
}
case <-w.quit:
// we have received a signal to stop
return
}
}
}()
}
// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
go func() {
w.quit <- true
}()
}
Supongo que la forma más sencilla es crear 250 goroutines y pasarles un canal que pueda usar para pasar enlaces del goroutine principal a los infantiles, escuchando ese canal.
Cuando todos los enlaces pasan a goroutines, cierras un canal y todos los goroutines terminan sus trabajos.
Para asegurarse de que Goroutine principal se complete antes de que los niños procesen los datos, puede usar sync.WaitGroup
.
Aquí hay un código para ilustrar (no es una versión final de trabajo pero muestra el punto) que dije anteriormente:
func worker(linkChan chan string, wg *sync.WaitGroup) {
// Decreasing internal counter for wait-group as soon as goroutine finishes
defer wg.Done()
for url := range linkChan {
// Analyze value and do the job here
}
}
func main() {
lCh := make(chan string)
wg := new(sync.WaitGroup)
// Adding routines to workgroup and running then
for i := 0; i < 250; i++ {
wg.Add(1)
go worker(lCh, wg)
}
// Processing all links by spreading them to `free` goroutines
for _, link := range yourLinksSlice {
lCh <- link
}
// Closing channel (waiting in goroutines won''t continue any more)
close(lCh)
// Waiting for all goroutines to finish (otherwise they die as main routine dies)
wg.Wait()
}