go concurrency channel goroutine

go - Cómo transmitir mensajes usando el canal



concurrency channel (4)

Soy nuevo y estoy tratando de crear un servidor de chat simple donde los clientes puedan transmitir mensajes a todos los clientes conectados.

En mi servidor, tengo un goroutine (infinito para bucle) que acepta la conexión y todas las conexiones son recibidas por un canal.

go func() { for { conn, _ := listener.Accept() ch <- conn } }()

Luego, inicio un controlador (goroutine) para cada cliente conectado. Dentro del controlador, trato de transmitir a todas las conexiones iterando a través del canal.

for c := range ch { conn.Write(msg) }

Sin embargo, no puedo transmitir porque (creo que al leer los documentos) el canal debe cerrarse antes de iterar. No estoy seguro de cuándo debo cerrar el canal porque quiero aceptar continuamente nuevas conexiones y cerrar el canal no me permite hacerlo. Si alguien me puede ayudar, o proporcionar una mejor manera de transmitir mensajes a todos los clientes conectados, sería apreciado.


Debido a que los canales Go siguen el patrón de Procesos secuenciales de comunicación (CSP), los canales son una entidad de comunicación punto a punto. Siempre hay un escritor y un lector involucrados en cada intercambio.

Sin embargo, cada extremo del canal se puede compartir entre múltiples goroutines. Esto es seguro, no hay condiciones de carrera peligrosas.

Por lo tanto, puede haber múltiples escritores compartiendo el final de la escritura. Y / o puede haber múltiples lectores compartiendo el final de la lectura. Escribí más sobre esto en una respuesta diferente , que incluye ejemplos.

Si realmente necesita una transmisión, no puede hacerlo directamente, pero no es difícil implementar una rutina rutinaria que copie un valor en cada uno de los grupos de canales de salida.


Lo que está haciendo es un patrón de abanico, es decir, múltiples puntos finales están escuchando una sola fuente de entrada. El resultado de este patrón es que solo uno de estos oyentes podrá recibir el mensaje siempre que haya un mensaje en la fuente de entrada. La única excepción es un close de canal. Este close será reconocido por todos los oyentes, y por lo tanto una "emisión".

Pero lo que quiere hacer es transmitir un mensaje leído desde la conexión, para que podamos hacer algo como esto:

Cuando se conoce el número de oyentes

Deje que cada trabajador escuche un canal de transmisión dedicado y envíe el mensaje desde el canal principal a cada canal de transmisión dedicado.

type worker struct { source chan interface{} quit chan struct{} } func (w *worker) Start() { w.source = make(chan interface{}, 10) // some buffer size to avoid blocking go func() { for { select { case msg := <-w.source // do something with msg case <-quit: // will explain this in the last section return } } }() }

Y luego podríamos tener un grupo de trabajadores:

workers := []*worker{&worker{}, &worker{}} for _, worker := range workers { worker.Start() }

Entonces comience nuestro oyente:

go func() { for { conn, _ := listener.Accept() ch <- conn } }()

Y un despachador:

go func() { for { msg := <- ch for _, worker := workers { worker.source <- msg } } }()

Cuando no se conoce el número de oyentes

En este caso, la solución dada anteriormente todavía funciona. La única diferencia es que siempre que necesite un nuevo trabajador, debe crear un nuevo trabajador, ponerlo en marcha y luego empujarlo a la división de workers . Pero este método requiere un segmento seguro para subprocesos, que necesita un bloqueo a su alrededor. Una de las implementaciones puede verse así:

type threadSafeSlice struct { sync.Mutex workers []*worker } func (slice *threadSafeSlice) Push(w *worker) { slice.Lock() defer slice.Unlock() workers = append(workers, w) } func (slice *threadSafeSlice) Iter(routine func(*worker)) { slice.Lock() defer slice.Unlock() for _, worker := range workers { routine(worker) } }

Siempre que quiera comenzar un trabajador:

w := &worker{} w.Start() threadSafeSlice.Push(w)

Y su despachador se cambiará a:

go func() { for { msg := <- ch threadSafeSlice.Iter(func(w *worker) { w.source <- msg }) } }()

Últimas palabras: nunca dejes una gorutina colgando

Una de las buenas prácticas es: nunca deje una gorutina colgando. Entonces, cuando termine de escuchar, debe cerrar todas las gorutinas que disparó. Esto se realizará a través del canal de quit en worker :

Primero necesitamos crear un canal global de señalización para quit :

globalQuit := make(chan struct{})

Y cada vez que creamos un trabajador, le asignamos el canal globalQuit como señal de salida:

worker.quit = globalQuit

Luego, cuando queremos cerrar a todos los trabajadores, simplemente hacemos:

close(globalQuit)

Como el close será reconocido por todas las goroutinas que escuchan (este es el punto que entendió), todas las goroutinas serán devueltas. Recuerde cerrar también su rutina de despachador, pero se lo dejaré a usted :)


Transmita a un segmento de canal y use sync. Mutex para administrar la adición y eliminación de canales puede ser la forma más fácil en su caso.

Esto es lo que puede hacer para broadcast en golang:

  • Puede transmitir un cambio de estado compartido con sync.Cond. De esta manera, no tiene ninguna asignación de asignación una vez, pero no puede agregar tiempo de espera funcional o trabajar con otro canal.
  • Puede transmitir un cambio de estado compartido con un canal antiguo cercano y crear un nuevo canal y sincronizar. De esta manera, tiene una asignación por cambio de estado, pero puede agregar tiempo de espera funcional y trabajar con otro canal.
  • Puede transmitir a una porción de devolución de llamada de función y usar sync.Mutex para administrarlos. La persona que llama puede hacer cosas de canal. De esta forma, tiene más de una asignación por persona que llama y trabaja con otro canal.
  • Puede transmitir a un segmento de canal y usar sync.Mutex para administrarlos. De esta forma, tiene más de una asignación por persona que llama y trabaja con otro canal.
  • Puede transmitir a una porción de sync.WaitGroup y usar sync.Mutex para administrarlos.

Una solución más elegante es un "corredor", donde los clientes pueden suscribirse y cancelar la suscripción a los mensajes.

Para manejar también la suscripción y la cancelación de la suscripción de manera elegante, podemos utilizar canales para esto, de modo que el bucle principal del intermediario que recibe y distribuye los mensajes puede incorporar todo esto utilizando una sola declaración de select , y la sincronización se da por la naturaleza de la solución.

Otro truco es almacenar a los suscriptores en un mapa, mapeando desde el canal que usamos para distribuirles mensajes. Por lo tanto, use el canal como la clave en el mapa, y luego agregar y eliminar clientes es simple. Esto es posible porque los valores de los canales son comparable , y su comparación es muy eficiente ya que los valores de los canales son simples punteros a los descriptores de los canales.

Sin más preámbulos, aquí hay una implementación simple de intermediario:

type Broker struct { stopCh chan struct{} publishCh chan interface{} subCh chan chan interface{} unsubCh chan chan interface{} } func NewBroker() *Broker { return &Broker{ stopCh: make(chan struct{}), publishCh: make(chan interface{}, 1), subCh: make(chan chan interface{}, 1), unsubCh: make(chan chan interface{}, 1), } } func (b *Broker) Start() { subs := map[chan interface{}]struct{}{} for { select { case <-b.stopCh: return case msgCh := <-b.subCh: subs[msgCh] = struct{}{} case msgCh := <-b.unsubCh: delete(subs, msgCh) case msg := <-b.publishCh: for msgCh := range subs { // msgCh is buffered, use non-blocking send to protect the broker: select { case msgCh <- msg: default: } } } } } func (b *Broker) Stop() { close(b.stopCh) } func (b *Broker) Subscribe() chan interface{} { msgCh := make(chan interface{}, 5) b.subCh <- msgCh return msgCh } func (b *Broker) Unsubscribe(msgCh chan interface{}) { b.unsubCh <- msgCh } func (b *Broker) Publish(msg interface{}) { b.publishCh <- msg }

Ejemplo usándolo:

func main() { // Create and start a broker: b := NewBroker() go b.Start() // Create and subscribe 3 clients: clientFunc := func(id int) { msgCh := b.Subscribe() for { fmt.Printf("Client %d got message: %v/n", id, <-msgCh) } } for i := 0; i < 3; i++ { go clientFunc(i) } // Start publishing messages: go func() { for msgId := 0; ; msgId++ { b.Publish(fmt.Sprintf("msg#%d", msgId)) time.Sleep(300 * time.Millisecond) } }() time.Sleep(time.Second) }

La salida de lo anterior será (pruébalo en Go Playground ):

Client 2 got message: msg#0 Client 0 got message: msg#0 Client 1 got message: msg#0 Client 2 got message: msg#1 Client 0 got message: msg#1 Client 1 got message: msg#1 Client 1 got message: msg#2 Client 2 got message: msg#2 Client 0 got message: msg#2 Client 2 got message: msg#3 Client 0 got message: msg#3 Client 1 got message: msg#3

Mejoras

Puede considerar las siguientes mejoras. Estos pueden o no ser útiles dependiendo de cómo / para qué utilice el corredor.

Broker.Unsubscribe() puede cerrar el canal de mensajes, lo que indica que no se enviarán más mensajes en él:

func (b *Broker) Unsubscribe(msgCh chan interface{}) { b.unsubCh <- msgCh close(msgCh) }

Esto permitiría a los clientes range el canal de mensajes, de esta manera:

msgCh := b.Subscribe() for msg := range msgCh { fmt.Printf("Client %d got message: %v/n", id, msg) }

Entonces, si alguien msgCh suscripción de este msgCh esta manera:

b.Unsubscribe(msgCh)

El bucle de rango anterior finalizará después de procesar todos los mensajes que se enviaron antes de la llamada a Unsubscribe() .

Si desea que sus clientes confíen en que se cierre el canal de mensajes, y la vida útil del agente es más estrecha que la vida útil de su aplicación, también puede cerrar todos los clientes suscritos cuando el agente se detiene, en el método Start() esta manera:

case <-b.stopCh: for msgCh := range subs { close(msgCh) } return