Haskell rápida cola concurrente
concurrency profiling (2)
El problema
¡Hola! Estoy escribiendo una biblioteca de registro y me encantaría crear un registrador, que se ejecutaría en un subproceso separado, mientras que todos los subprocesos de aplicaciones solo le enviarían mensajes. Quiero encontrar la solución más eficaz para este problema. Necesito una cola de desbloqueo simple aquí.
Enfoques
He creado algunas pruebas para ver cómo funcionan las soluciones disponibles y obtengo resultados muy extraños aquí. Probé 4 implementaciones (código fuente proporcionado a continuación) basado en:
- pipes-concurrency
- Control.Concurrent.Chan
- Control.Concurrent.Chan.Unagi
- Basado en MVar como se describe en el libro "Programación paralela y concurrente en Haskell" Tenga en cuenta que esta técnica nos da colas limitadas de capacidad 1: se usa solo para pruebas
Pruebas
Aquí está el código fuente utilizado para la prueba:
{-# LANGUAGE NoMonomorphismRestriction #-}
import Control.Concurrent (threadDelay)
import Control.Monad (forever)
import Pipes
import qualified Pipes.Concurrent as Pipes
import Control.Applicative
import Control.Monad (replicateM_)
import System.Environment (getArgs)
import Control.Concurrent.Chan
import Control.Concurrent (forkIO)
import qualified Control.Concurrent.Chan.Unagi as U
import Control.Concurrent.MVar
import Criterion.Main
data Event = Msg String | Status | Quit deriving (Show)
----------------------------------------------------------------------
-- Pipes
----------------------------------------------------------------------
pipesLogMsg = yield (Msg "hello")
pipesManyLogs num = replicateM_ num pipesLogMsg
pipesAddProducer num o = Pipes.forkIO $ do runEffect $ (pipesManyLogs num) >-> Pipes.toOutput o
Pipes.performGC
pipesHandler max = loop 0
where
loop mnum = do
if mnum == max
then lift $ pure ()
else do event <- await
case event of
Msg _ -> loop (mnum + 1)
Status -> (lift $ putStrLn (show mnum)) *> loop mnum
Quit -> return ()
----------------------------------------------------------------------
-- Chan
----------------------------------------------------------------------
chanAddProducer num ch = forkIO $ chanManyLogs num ch
chanManyLogs num ch = replicateM_ num (writeChan ch (Msg "hello"))
chanHandler ch max = handlerIO (readChan ch) max
----------------------------------------------------------------------
-- Unagi-Chan
----------------------------------------------------------------------
uchanAddProducer num ch = forkIO $ uchanManyLogs num ch
uchanManyLogs num ch = replicateM_ num (U.writeChan ch (Msg "hello"))
uchanHandler ch max = handlerIO (U.readChan ch) max
----------------------------------------------------------------------
-- MVars
----------------------------------------------------------------------
mvarAddProducer num m = forkIO $ mvarManyLogs num m
mvarManyLogs num m = replicateM_ num (putMVar m (Msg "hello"))
mvarHandler m max = handlerIO (takeMVar m) max
----------------------------------------------------------------------
-- Utils
----------------------------------------------------------------------
handlerIO f max = loop 0 where
loop mnum = do
if mnum == max
then pure ()
else do event <- f
case event of
Msg _ -> loop (mnum + 1)
Status -> putStrLn (show mnum) *> loop mnum
Quit -> return ()
----------------------------------------------------------------------
-- Main
----------------------------------------------------------------------
main = defaultMain [
bench "pipes" $ nfIO $ do
(output, input) <- Pipes.spawn Pipes.Unbounded
replicateM_ prodNum (pipesAddProducer msgNum output)
runEffect $ Pipes.fromInput input >-> pipesHandler totalMsg
, bench "Chan" $ nfIO $ do
ch <- newChan
replicateM_ prodNum (chanAddProducer msgNum ch)
chanHandler ch totalMsg
, bench "Unagi-Chan" $ nfIO $ do
(inCh, outCh) <- U.newChan
replicateM_ prodNum (uchanAddProducer msgNum inCh)
uchanHandler outCh totalMsg
, bench "MVar" $ nfIO $ do
m <- newEmptyMVar
replicateM_ prodNum (mvarAddProducer msgNum m)
mvarHandler m totalMsg
]
where
prodNum = 20
msgNum = 1000
totalMsg = msgNum * prodNum
Puede compilarlo con ghc -O2 Main.hs
y simplemente ejecutarlo. Las pruebas crean 20 productores de mensajes, cada uno produce 1000000 mensajes.
Resultados
benchmarking pipes
time 46.68 ms (46.19 ms .. 47.31 ms)
0.999 R² (0.999 R² .. 1.000 R²)
mean 47.59 ms (47.20 ms .. 47.95 ms)
std dev 708.3 μs (558.4 μs .. 906.1 μs)
benchmarking Chan
time 4.252 ms (4.171 ms .. 4.351 ms)
0.995 R² (0.991 R² .. 0.998 R²)
mean 4.233 ms (4.154 ms .. 4.314 ms)
std dev 244.8 μs (186.3 μs .. 333.5 μs)
variance introduced by outliers: 35% (moderately inflated)
benchmarking Unagi-Chan
time 1.209 ms (1.198 ms .. 1.224 ms)
0.996 R² (0.993 R² .. 0.999 R²)
mean 1.267 ms (1.244 ms .. 1.308 ms)
std dev 102.4 μs (61.70 μs .. 169.3 μs)
variance introduced by outliers: 62% (severely inflated)
benchmarking MVar
time 1.746 ms (1.714 ms .. 1.774 ms)
0.997 R² (0.995 R² .. 0.998 R²)
mean 1.716 ms (1.694 ms .. 1.739 ms)
std dev 73.99 μs (65.32 μs .. 85.48 μs)
variance introduced by outliers: 29% (moderately inflated)
Pregunta
Me encantaría preguntarte por qué la versión simultánea de tuberías funciona tan lentamente y por qué es mucho más lenta que incluso la basada en chan. Estoy muy sorprendido de que el MVar es el más rápido de todas las versiones. ¿Alguien podría decir más, por qué obtenemos estos resultados y si podemos mejorar en cualquier caso?
Por lo tanto, puedo ofrecerles una breve descripción de algunos de los análisis de Chan
y TQueue
(que las pipes-concurrency
están usando internamente aquí) que motivaron algunas decisiones de diseño que se unagi-chan
en unagi-chan
. No estoy seguro de si va a responder a su pregunta. Recomiendo forjar diferentes colas y jugar con variaciones mientras se realiza una evaluación comparativa para tener una idea real de lo que está sucediendo.
Chan
Chan
parece a
data Chan a
= Chan (MVar (Stream a)) -- pointer to "head", where we read from
(MVar (Stream a)) -- pointer to "tail", where values written to
type Stream a = MVar (ChItem a)
data ChItem a = ChItem a (Stream a)
Es una lista MVar
de MVar
s. Los dos MVar
en el tipo Chan
actúan como punteros a la cabecera y cola actuales de la lista, respectivamente. Así es como se ve una escritura:
writeChan :: Chan a -> a -> IO ()
writeChan (Chan _ writeVar) val = do
new_hole <- newEmptyMVar mask_ $ do
old_hole <- takeMVar writeVar -- [1]
putMVar old_hole (ChItem val new_hole) -- [2]
putMVar writeVar new_hole -- [3]
En 1, el escritor tiene un bloqueo en el extremo de escritura, en 2 nuestro artículo a
está disponible para el lector, y en 3 el final de escritura está desbloqueado para otros escritores.
Esto realmente funciona bastante bien en un escenario de un solo consumidor / productor único (vea el gráfico aquí ) porque las lecturas y escrituras no compiten. Pero una vez que tenga varios escritores simultáneos, puede comenzar a tener problemas:
un escritor que golpea 1 mientras que otro está en 2 bloqueará y se desecará (el más rápido que he podido medir un cambio de contexto es ~ 150ns (bastante rápido); probablemente hay situaciones en las que es mucho más lento). Por lo tanto, cuando hay muchos escritores que sostienen que básicamente están haciendo un gran viaje redondo a través del programador, en una cola de espera para el
MVar
y, finalmente, la escritura puede completarse.Cuando un escritor se desincroniza (porque se agotó el tiempo) mientras que en 2, se mantiene en un bloqueo y no se permite que se complete ninguna escritura hasta que se pueda reprogramar nuevamente; esto se convierte más en un problema cuando estamos sobre-suscritos , es decir, cuando nuestra proporción de hilos / núcleo es alta.
Por último, el uso de un elemento de MVar
requiere cierta sobrecarga en términos de asignación y, lo que es más importante, cuando acumulamos muchos objetos mutables, podemos causar mucha presión en el GC.
TQueue
TQueue
es genial porque STM
hace que sea muy simple razonar acerca de su corrección. Es una cola funcional al estilo de la cola de salida, y una write
consiste simplemente en leer la pila de escritores, analizar nuestro elemento y escribirlo:
data TQueue a = TQueue (TVar [a])
(TVar [a])
writeTQueue :: TQueue a -> a -> STM ()
writeTQueue (TQueue _ write) a = do
listend <- readTVar write -- a transaction with a consistent
writeTVar write (a:listend) -- view of memory
Si después de que writeTQueue
escribir su nueva pila, otra escritura intercalada hace lo mismo, se reintentará una de las escrituras. A writeTQueue
se intercalan más writeTQueue
s, el efecto de la contención empeora. Sin embargo, el rendimiento se degrada mucho más lentamente que en Chan
porque solo hay una única operación de writeTVar
que puede anular los writeTQueue
competencia, y la transacción es muy pequeña (solo una lectura y una (:)
).
Una lectura funciona "eliminando en la cola" la pila del lado de escritura, invirtiéndola y almacenando la pila invertida en su propia variable para "hacer estallar" fácilmente (en conjunto, esto nos da una amortización O (1) push and pop)
readTQueue :: TQueue a -> STM a
readTQueue (TQueue read write) = do
xs <- readTVar read
case xs of
(x:xs'') -> do writeTVar read xs''
return x
[] -> do ys <- readTVar write
case ys of
[] -> retry
_ -> case reverse ys of
[] -> error "readTQueue"
(z:zs) -> do writeTVar write []
writeTVar read zs
return z
Los lectores tienen un problema de contención moderada y simétrica para los escritores. En el caso general, los lectores y escritores no compiten, sin embargo, cuando la pila de lectores está agotada, los lectores compiten con otros lectores y escritores. Sospecho que si ha cargado TQueue
una TQueue
con suficientes valores y luego ha lanzado 4 lectores y 4 escritores, es posible que pueda generar Livelock, ya que el revés no se completó antes de la siguiente escritura. También es interesante observar que a diferencia de MVar
, una escritura a una TVar
en la que muchos lectores están esperando los despierta a todos simultáneamente (esto podría ser más o menos eficiente, según el escenario).
Sospecho que no ves muchas de las debilidades de TQueue
en tu prueba; sobre todo, se ven los efectos moderados de la contención de escritura y la sobrecarga de una gran cantidad de asignaciones y GC de muchos objetos mutables.
unagi-chan
unagi-chan
fue diseñado en primer lugar para manejar bien la contención. Es conceptualmente muy simple, pero la implementación tiene algunas complejidades.
data ChanEnd a = ChanEnd AtomicCounter (IORef (Int , Stream a))
data Stream a = Stream (Array (Cell a)) (IORef (Maybe (Stream a)))
data Cell a = Empty | Written a | Blocking (MVar a)
Los lados de lectura y escritura de la cola comparten la Stream
en la que coordinan los valores pasados (de escritor a lector) y las indicaciones de bloqueo (de lector a escritor), y los lados de lectura y escritura tienen un contador atómico independiente. Una escritura funciona como:
un escritor llama al
incrCounter
atómico en el contador de escritura para recibir su índice único para coordinar con su (único) lectorel escritor encuentra su celda y realiza un CAS de
Written a
si tiene éxito, sale, de lo contrario ve que un lector lo ha superado y está bloqueando (o está procediendo a bloquear), por lo que hace un
(/Blocking v)-> putMVar va)
y sale.
Una lectura funciona de una manera similar y obvia.
La primera innovación es hacer del punto de contención una operación atómica que no se degrade bajo la contención (como lo haría un bucle CAS / reintento o un bloqueo tipo Chan). Basándose en pruebas comparativas y experimentos simples, el primop de búsqueda y adición, expuesto por la biblioteca atomic-primops
, funciona mejor.
Luego, en 2, tanto el lector como el escritor deben realizar solo una comparación e intercambio (la ruta rápida para el lector es una simple lectura no atómica) para finalizar la coordinación.
Así que para tratar de hacer unagi-chan
bueno, nosotros
use fetch-and-add para manejar el punto de contención
use técnicas de bloqueo libre, de manera que cuando estamos demasiado suscritos a un hilo que se está programando en momentos inoportunos no bloquea el progreso de otros hilos (un escritor bloqueado puede bloquear a lo más el lector "asignado" por el contador; lea las advertencias con respecto a las excepciones asíncronas en documentos
unagi-chan
, y note queChan
tiene una semántica más agradable aquí)use una matriz para almacenar nuestros elementos, que tiene una mejor ubicación (pero vea más abajo) una sobrecarga por elemento y pone menos presión en el GC
Una nota final re. usar una matriz: las escrituras concurrentes en una matriz generalmente son una mala idea para la escala porque causa mucho tráfico de coherencia de caché ya que las solteras se invalidan de un lado a otro en sus hebras de escritor. El término general es "falso compartir". Pero también hay aspectos positivos en cuanto a la caché y los inconvenientes de diseños alternativos que se me ocurren que podrían incluir escrituras en bandas o algo así; He estado experimentando con esto un poco, pero no tengo nada concluyente en este momento.
Un lugar en el que legítimamente nos preocupa el intercambio falso está en nuestro contador, que alineamos y rellenamos a 64 bytes; esto sí se mostró en los puntos de referencia, y el único inconveniente es el mayor uso de memoria.
Si tuviera que adivinar por qué las pipes-concurrency
tienen un rendimiento más deficiente, es porque cada lectura y escritura están envueltas en una transacción STM
, mientras que las otras bibliotecas usan primitivas de concurrencia de bajo nivel más eficientes.