concurrency - monitorizar - ¿La mejor manera de implementar contadores globales para aplicaciones altamente concurrentes?
hashtracking (7)
¿Cuál es la mejor manera de implementar contadores globales para una aplicación altamente concurrente? En mi caso, es posible que tenga rutinas de 10K-20K go que hagan "trabajo", y quiero contar el número y los tipos de elementos en los que están trabajando colectivamente las rutinas ...
El estilo de codificación síncrona "clásico" se vería así:
var work_counter int
func GoWorkerRoutine() {
for {
// do work
atomic.AddInt32(&work_counter,1)
}
}
Ahora esto se complica más porque quiero rastrear el "tipo" de trabajo que se está realizando, así que realmente necesito algo como esto:
var work_counter map[string]int
var work_mux sync.Mutex
func GoWorkerRoutine() {
for {
// do work
work_mux.Lock()
work_counter["type1"]++
work_mux.Unlock()
}
}
Parece que debería haber una forma optimizada de "ir" usando canales o algo similar a esto:
var work_counter int
var work_chan chan int // make() called somewhere else (buffered)
// started somewher else
func GoCounterRoutine() {
for {
select {
case c := <- work_chan:
work_counter += c
break
}
}
}
func GoWorkerRoutine() {
for {
// do work
work_chan <- 1
}
}
A este último ejemplo todavía le falta el mapa, pero eso es bastante fácil de agregar. ¿Proporcionará este estilo un mejor rendimiento que un simple incremento atómico? No puedo decir si esto es más o menos complicado cuando hablamos de acceso simultáneo a un valor global en comparación con algo que puede bloquearse en la E / S para completar ...
Los pensamientos son apreciados.
Actualización 5/28/2013:
Probé un par de implementaciones, y los resultados no fueron los que esperaba, aquí está mi código fuente del contador:
package helpers
import (
)
type CounterIncrementStruct struct {
bucket string
value int
}
type CounterQueryStruct struct {
bucket string
channel chan int
}
var counter map[string]int
var counterIncrementChan chan CounterIncrementStruct
var counterQueryChan chan CounterQueryStruct
var counterListChan chan chan map[string]int
func CounterInitialize() {
counter = make(map[string]int)
counterIncrementChan = make(chan CounterIncrementStruct,0)
counterQueryChan = make(chan CounterQueryStruct,100)
counterListChan = make(chan chan map[string]int,100)
go goCounterWriter()
}
func goCounterWriter() {
for {
select {
case ci := <- counterIncrementChan:
if len(ci.bucket)==0 { return }
counter[ci.bucket]+=ci.value
break
case cq := <- counterQueryChan:
val,found:=counter[cq.bucket]
if found {
cq.channel <- val
} else {
cq.channel <- -1
}
break
case cl := <- counterListChan:
nm := make(map[string]int)
for k, v := range counter {
nm[k] = v
}
cl <- nm
break
}
}
}
func CounterIncrement(bucket string, counter int) {
if len(bucket)==0 || counter==0 { return }
counterIncrementChan <- CounterIncrementStruct{bucket,counter}
}
func CounterQuery(bucket string) int {
if len(bucket)==0 { return -1 }
reply := make(chan int)
counterQueryChan <- CounterQueryStruct{bucket,reply}
return <- reply
}
func CounterList() map[string]int {
reply := make(chan map[string]int)
counterListChan <- reply
return <- reply
}
Utiliza canales tanto para escrituras como para lecturas, lo que parece lógico.
Aquí están mis casos de prueba:
func bcRoutine(b *testing.B,e chan bool) {
for i := 0; i < b.N; i++ {
CounterIncrement("abc123",5)
CounterIncrement("def456",5)
CounterIncrement("ghi789",5)
CounterIncrement("abc123",5)
CounterIncrement("def456",5)
CounterIncrement("ghi789",5)
}
e<-true
}
func BenchmarkChannels(b *testing.B) {
b.StopTimer()
CounterInitialize()
e:=make(chan bool)
b.StartTimer()
go bcRoutine(b,e)
go bcRoutine(b,e)
go bcRoutine(b,e)
go bcRoutine(b,e)
go bcRoutine(b,e)
<-e
<-e
<-e
<-e
<-e
}
var mux sync.Mutex
var m map[string]int
func bmIncrement(bucket string, value int) {
mux.Lock()
m[bucket]+=value
mux.Unlock()
}
func bmRoutine(b *testing.B,e chan bool) {
for i := 0; i < b.N; i++ {
bmIncrement("abc123",5)
bmIncrement("def456",5)
bmIncrement("ghi789",5)
bmIncrement("abc123",5)
bmIncrement("def456",5)
bmIncrement("ghi789",5)
}
e<-true
}
func BenchmarkMutex(b *testing.B) {
b.StopTimer()
m=make(map[string]int)
e:=make(chan bool)
b.StartTimer()
for i := 0; i < b.N; i++ {
bmIncrement("abc123",5)
bmIncrement("def456",5)
bmIncrement("ghi789",5)
bmIncrement("abc123",5)
bmIncrement("def456",5)
bmIncrement("ghi789",5)
}
go bmRoutine(b,e)
go bmRoutine(b,e)
go bmRoutine(b,e)
go bmRoutine(b,e)
go bmRoutine(b,e)
<-e
<-e
<-e
<-e
<-e
}
Implementé un punto de referencia simple con solo un mutex alrededor del mapa (solo pruebas de escritura), y comparé ambos con 5 goroutines que se ejecutan en paralelo. Aquí están los resultados:
$ go test --bench=. helpers
PASS
BenchmarkChannels 100000 15560 ns/op
BenchmarkMutex 1000000 2669 ns/op
ok helpers 4.452s
No hubiera esperado que el mutex fuera mucho más rápido ...
¿Más pensamientos?
El último estuvo cerca:
package main
import "fmt"
func main() {
ch := make(chan int, 3)
go GoCounterRoutine(ch)
go GoWorkerRoutine(1, ch)
// not run as goroutine because mein() would just end
GoWorkerRoutine(2, ch)
}
// started somewhere else
func GoCounterRoutine(ch chan int) {
counter := 0
for {
ch <- counter
counter += 1
}
}
func GoWorkerRoutine(n int, ch chan int) {
var seq int
for seq := range ch {
// do work:
fmt.Println(n, seq)
}
}
Esto introduce un único punto de falla: si el contador goroutine muere, todo se pierde. Esto puede no ser un problema si todas las goroutinas se ejecutan en una computadora, pero puede convertirse en un problema si se encuentran dispersas en la red. Para hacer que el contador sea inmune a las fallas de nodos individuales en el clúster, se deben usar algoritmos especiales .
Implementé esto con un simple mapa + exclusión mutua que parece ser la mejor manera de manejar esto, ya que es la "forma más simple" (que es lo que Go dice usar para elegir bloqueos frente a canales).
requestID := client.msgID.Get()
form.Set("id", requestID)
Puede ejecutar el código en https://play.golang.org/p/9bDMDLFBAY . Hice una versión empaquetada simple en gist.github.com
La otra respuesta que usa sync / atomic es adecuada para cosas como contadores de páginas, pero no para enviar identificadores únicos a una API externa. Para hacer eso, necesita una operación de "incremento y devolución", que solo se puede implementar como un bucle CAS.
Aquí hay un bucle CAS alrededor de un int32 para generar identificadores de mensaje únicos:
package main
import (
"fmt"
"sync"
)
type single struct {
mu sync.Mutex
values map[string]int64
}
var counters = single{
values: make(map[string]int64),
}
func (s *single) Get(key string) int64 {
s.mu.Lock()
defer s.mu.Unlock()
return s.values[key]
}
func (s *single) Incr(key string) int64 {
s.mu.Lock()
defer s.mu.Unlock()
s.values[key]++
return s.values[key]
}
func main() {
fmt.Println(counters.Incr("bar"))
fmt.Println(counters.Incr("bar"))
fmt.Println(counters.Incr("bar"))
fmt.Println(counters.Get("foo"))
fmt.Println(counters.Get("bar"))
}
Para usarlo, simplemente haz:
import "sync/atomic"
type UniqueID struct {
counter int32
}
func (c *UniqueID) Get() int32 {
for {
val := atomic.LoadInt32(&c.counter)
if atomic.CompareAndSwapInt32(&c.counter, val, val+1) {
return val
}
}
}
Esto tiene una ventaja sobre los canales en que no requiere tantos recursos extra inactivos: los goroutines existentes se usan cuando piden ID en lugar de usar un goroutine para cada contador que su programa necesita.
TODO: Benchmark contra canales. Voy a adivinar que los canales son peores en el caso de no contienda y mejor en el caso de alta contienda, ya que tienen cola mientras este código simplemente gira intentando ganar la carrera.
No tengas miedo de usar las exclusiones mutuas y las cerraduras solo porque pienses que "no son correctas". En su segundo ejemplo, está absolutamente claro lo que está pasando, y eso cuenta mucho. Tendrá que probarlo usted mismo para ver qué tan contento está el mutex y si agregar complicaciones complicará el rendimiento.
Si necesitas un mayor rendimiento, quizás Shards sea la mejor manera de hacerlo: http://play.golang.org/p/uLirjskGeN
El inconveniente es que sus cuentas solo estarán tan actualizadas como decida su fragmentación. También puede haber éxitos de rendimiento desde el momento de la llamada. time.Since()
tanto, pero, como siempre, time.Since()
primero :)
No use sync/atomic - desde la página enlazada
El paquete atómico proporciona primitivas de memoria atómica de bajo nivel útiles para implementar algoritmos de sincronización. Estas funciones requieren un gran cuidado para ser utilizadas correctamente. A excepción de las aplicaciones especiales de bajo nivel, la sincronización se realiza mejor con los canales o las instalaciones del paquete de sincronización.
La última vez que tuve que hacer esto , comparé algo que se parecía a su segundo ejemplo con un mutex y algo que se parecía a su tercer ejemplo con un canal. El código de los canales se ganó cuando las cosas se pusieron realmente ocupadas, pero asegúrate de que el buffer del canal sea grande.
Pregunta antigua, pero me topé con esto y puede ayudar: https://github.com/uber-go/atomic
Básicamente, los ingenieros de Uber han desarrollado algunas funciones útiles en la parte superior del paquete sync/atomic
Todavía no he probado esto en producción, pero el código base es muy pequeño y la implementación de la mayoría de las funciones es bastante estándar.
Definitivamente preferido sobre el uso de canales o mutex básicos.
Si está intentando sincronizar un grupo de trabajadores (p. Ej., Permitir que n goroutines eliminen cierta cantidad de trabajo), los canales son una muy buena forma de hacerlo, pero si todo lo que necesita es un contador (p. Ej., Vistas de página ) entonces son una exageración. Los paquetes de sync y sync/atomic están ahí para ayudar.
import "sync/atomic"
type count32 int32
func (c *count32) increment() int32 {
return atomic.AddInt32((*int32)(c), 1)
}
func (c *count32) get() int32 {
return atomic.LoadInt32((*int32)(c))
}