seguidores para monitorizar hashtracking hashtags hashtagify estadisticas conseguir buscador concurrency go

concurrency - monitorizar - ¿La mejor manera de implementar contadores globales para aplicaciones altamente concurrentes?



hashtracking (7)

¿Cuál es la mejor manera de implementar contadores globales para una aplicación altamente concurrente? En mi caso, es posible que tenga rutinas de 10K-20K go que hagan "trabajo", y quiero contar el número y los tipos de elementos en los que están trabajando colectivamente las rutinas ...

El estilo de codificación síncrona "clásico" se vería así:

var work_counter int func GoWorkerRoutine() { for { // do work atomic.AddInt32(&work_counter,1) } }

Ahora esto se complica más porque quiero rastrear el "tipo" de trabajo que se está realizando, así que realmente necesito algo como esto:

var work_counter map[string]int var work_mux sync.Mutex func GoWorkerRoutine() { for { // do work work_mux.Lock() work_counter["type1"]++ work_mux.Unlock() } }

Parece que debería haber una forma optimizada de "ir" usando canales o algo similar a esto:

var work_counter int var work_chan chan int // make() called somewhere else (buffered) // started somewher else func GoCounterRoutine() { for { select { case c := <- work_chan: work_counter += c break } } } func GoWorkerRoutine() { for { // do work work_chan <- 1 } }

A este último ejemplo todavía le falta el mapa, pero eso es bastante fácil de agregar. ¿Proporcionará este estilo un mejor rendimiento que un simple incremento atómico? No puedo decir si esto es más o menos complicado cuando hablamos de acceso simultáneo a un valor global en comparación con algo que puede bloquearse en la E / S para completar ...

Los pensamientos son apreciados.

Actualización 5/28/2013:

Probé un par de implementaciones, y los resultados no fueron los que esperaba, aquí está mi código fuente del contador:

package helpers import ( ) type CounterIncrementStruct struct { bucket string value int } type CounterQueryStruct struct { bucket string channel chan int } var counter map[string]int var counterIncrementChan chan CounterIncrementStruct var counterQueryChan chan CounterQueryStruct var counterListChan chan chan map[string]int func CounterInitialize() { counter = make(map[string]int) counterIncrementChan = make(chan CounterIncrementStruct,0) counterQueryChan = make(chan CounterQueryStruct,100) counterListChan = make(chan chan map[string]int,100) go goCounterWriter() } func goCounterWriter() { for { select { case ci := <- counterIncrementChan: if len(ci.bucket)==0 { return } counter[ci.bucket]+=ci.value break case cq := <- counterQueryChan: val,found:=counter[cq.bucket] if found { cq.channel <- val } else { cq.channel <- -1 } break case cl := <- counterListChan: nm := make(map[string]int) for k, v := range counter { nm[k] = v } cl <- nm break } } } func CounterIncrement(bucket string, counter int) { if len(bucket)==0 || counter==0 { return } counterIncrementChan <- CounterIncrementStruct{bucket,counter} } func CounterQuery(bucket string) int { if len(bucket)==0 { return -1 } reply := make(chan int) counterQueryChan <- CounterQueryStruct{bucket,reply} return <- reply } func CounterList() map[string]int { reply := make(chan map[string]int) counterListChan <- reply return <- reply }

Utiliza canales tanto para escrituras como para lecturas, lo que parece lógico.

Aquí están mis casos de prueba:

func bcRoutine(b *testing.B,e chan bool) { for i := 0; i < b.N; i++ { CounterIncrement("abc123",5) CounterIncrement("def456",5) CounterIncrement("ghi789",5) CounterIncrement("abc123",5) CounterIncrement("def456",5) CounterIncrement("ghi789",5) } e<-true } func BenchmarkChannels(b *testing.B) { b.StopTimer() CounterInitialize() e:=make(chan bool) b.StartTimer() go bcRoutine(b,e) go bcRoutine(b,e) go bcRoutine(b,e) go bcRoutine(b,e) go bcRoutine(b,e) <-e <-e <-e <-e <-e } var mux sync.Mutex var m map[string]int func bmIncrement(bucket string, value int) { mux.Lock() m[bucket]+=value mux.Unlock() } func bmRoutine(b *testing.B,e chan bool) { for i := 0; i < b.N; i++ { bmIncrement("abc123",5) bmIncrement("def456",5) bmIncrement("ghi789",5) bmIncrement("abc123",5) bmIncrement("def456",5) bmIncrement("ghi789",5) } e<-true } func BenchmarkMutex(b *testing.B) { b.StopTimer() m=make(map[string]int) e:=make(chan bool) b.StartTimer() for i := 0; i < b.N; i++ { bmIncrement("abc123",5) bmIncrement("def456",5) bmIncrement("ghi789",5) bmIncrement("abc123",5) bmIncrement("def456",5) bmIncrement("ghi789",5) } go bmRoutine(b,e) go bmRoutine(b,e) go bmRoutine(b,e) go bmRoutine(b,e) go bmRoutine(b,e) <-e <-e <-e <-e <-e }

Implementé un punto de referencia simple con solo un mutex alrededor del mapa (solo pruebas de escritura), y comparé ambos con 5 goroutines que se ejecutan en paralelo. Aquí están los resultados:

$ go test --bench=. helpers PASS BenchmarkChannels 100000 15560 ns/op BenchmarkMutex 1000000 2669 ns/op ok helpers 4.452s

No hubiera esperado que el mutex fuera mucho más rápido ...

¿Más pensamientos?


El último estuvo cerca:

package main import "fmt" func main() { ch := make(chan int, 3) go GoCounterRoutine(ch) go GoWorkerRoutine(1, ch) // not run as goroutine because mein() would just end GoWorkerRoutine(2, ch) } // started somewhere else func GoCounterRoutine(ch chan int) { counter := 0 for { ch <- counter counter += 1 } } func GoWorkerRoutine(n int, ch chan int) { var seq int for seq := range ch { // do work: fmt.Println(n, seq) } }

Esto introduce un único punto de falla: si el contador goroutine muere, todo se pierde. Esto puede no ser un problema si todas las goroutinas se ejecutan en una computadora, pero puede convertirse en un problema si se encuentran dispersas en la red. Para hacer que el contador sea inmune a las fallas de nodos individuales en el clúster, se deben usar algoritmos especiales .


Implementé esto con un simple mapa + exclusión mutua que parece ser la mejor manera de manejar esto, ya que es la "forma más simple" (que es lo que Go dice usar para elegir bloqueos frente a canales).

requestID := client.msgID.Get() form.Set("id", requestID)

Puede ejecutar el código en https://play.golang.org/p/9bDMDLFBAY . Hice una versión empaquetada simple en gist.github.com


La otra respuesta que usa sync / atomic es adecuada para cosas como contadores de páginas, pero no para enviar identificadores únicos a una API externa. Para hacer eso, necesita una operación de "incremento y devolución", que solo se puede implementar como un bucle CAS.

Aquí hay un bucle CAS alrededor de un int32 para generar identificadores de mensaje únicos:

package main import ( "fmt" "sync" ) type single struct { mu sync.Mutex values map[string]int64 } var counters = single{ values: make(map[string]int64), } func (s *single) Get(key string) int64 { s.mu.Lock() defer s.mu.Unlock() return s.values[key] } func (s *single) Incr(key string) int64 { s.mu.Lock() defer s.mu.Unlock() s.values[key]++ return s.values[key] } func main() { fmt.Println(counters.Incr("bar")) fmt.Println(counters.Incr("bar")) fmt.Println(counters.Incr("bar")) fmt.Println(counters.Get("foo")) fmt.Println(counters.Get("bar")) }

Para usarlo, simplemente haz:

import "sync/atomic" type UniqueID struct { counter int32 } func (c *UniqueID) Get() int32 { for { val := atomic.LoadInt32(&c.counter) if atomic.CompareAndSwapInt32(&c.counter, val, val+1) { return val } } }

Esto tiene una ventaja sobre los canales en que no requiere tantos recursos extra inactivos: los goroutines existentes se usan cuando piden ID en lugar de usar un goroutine para cada contador que su programa necesita.

TODO: Benchmark contra canales. Voy a adivinar que los canales son peores en el caso de no contienda y mejor en el caso de alta contienda, ya que tienen cola mientras este código simplemente gira intentando ganar la carrera.


No tengas miedo de usar las exclusiones mutuas y las cerraduras solo porque pienses que "no son correctas". En su segundo ejemplo, está absolutamente claro lo que está pasando, y eso cuenta mucho. Tendrá que probarlo usted mismo para ver qué tan contento está el mutex y si agregar complicaciones complicará el rendimiento.

Si necesitas un mayor rendimiento, quizás Shards sea la mejor manera de hacerlo: http://play.golang.org/p/uLirjskGeN

El inconveniente es que sus cuentas solo estarán tan actualizadas como decida su fragmentación. También puede haber éxitos de rendimiento desde el momento de la llamada. time.Since() tanto, pero, como siempre, time.Since() primero :)


No use sync/atomic - desde la página enlazada

El paquete atómico proporciona primitivas de memoria atómica de bajo nivel útiles para implementar algoritmos de sincronización. Estas funciones requieren un gran cuidado para ser utilizadas correctamente. A excepción de las aplicaciones especiales de bajo nivel, la sincronización se realiza mejor con los canales o las instalaciones del paquete de sincronización.

La última vez que tuve que hacer esto , comparé algo que se parecía a su segundo ejemplo con un mutex y algo que se parecía a su tercer ejemplo con un canal. El código de los canales se ganó cuando las cosas se pusieron realmente ocupadas, pero asegúrate de que el buffer del canal sea grande.


Pregunta antigua, pero me topé con esto y puede ayudar: https://github.com/uber-go/atomic

Básicamente, los ingenieros de Uber han desarrollado algunas funciones útiles en la parte superior del paquete sync/atomic

Todavía no he probado esto en producción, pero el código base es muy pequeño y la implementación de la mayoría de las funciones es bastante estándar.

Definitivamente preferido sobre el uso de canales o mutex básicos.


Si está intentando sincronizar un grupo de trabajadores (p. Ej., Permitir que n goroutines eliminen cierta cantidad de trabajo), los canales son una muy buena forma de hacerlo, pero si todo lo que necesita es un contador (p. Ej., Vistas de página ) entonces son una exageración. Los paquetes de sync y sync/atomic están ahí para ayudar.

import "sync/atomic" type count32 int32 func (c *count32) increment() int32 { return atomic.AddInt32((*int32)(c), 1) } func (c *count32) get() int32 { return atomic.LoadInt32((*int32)(c)) }

Ejemplo de Go Playground