haskell concurrency profiling stm haskell-pipes

Haskell rápida cola concurrente



concurrency profiling (2)

El problema

¡Hola! Estoy escribiendo una biblioteca de registro y me encantaría crear un registrador, que se ejecutaría en un subproceso separado, mientras que todos los subprocesos de aplicaciones solo le enviarían mensajes. Quiero encontrar la solución más eficaz para este problema. Necesito una cola de desbloqueo simple aquí.

Enfoques

He creado algunas pruebas para ver cómo funcionan las soluciones disponibles y obtengo resultados muy extraños aquí. Probé 4 implementaciones (código fuente proporcionado a continuación) basado en:

  1. pipes-concurrency
  2. Control.Concurrent.Chan
  3. Control.Concurrent.Chan.Unagi
  4. Basado en MVar como se describe en el libro "Programación paralela y concurrente en Haskell" Tenga en cuenta que esta técnica nos da colas limitadas de capacidad 1: se usa solo para pruebas

Pruebas

Aquí está el código fuente utilizado para la prueba:

{-# LANGUAGE NoMonomorphismRestriction #-} import Control.Concurrent (threadDelay) import Control.Monad (forever) import Pipes import qualified Pipes.Concurrent as Pipes import Control.Applicative import Control.Monad (replicateM_) import System.Environment (getArgs) import Control.Concurrent.Chan import Control.Concurrent (forkIO) import qualified Control.Concurrent.Chan.Unagi as U import Control.Concurrent.MVar import Criterion.Main data Event = Msg String | Status | Quit deriving (Show) ---------------------------------------------------------------------- -- Pipes ---------------------------------------------------------------------- pipesLogMsg = yield (Msg "hello") pipesManyLogs num = replicateM_ num pipesLogMsg pipesAddProducer num o = Pipes.forkIO $ do runEffect $ (pipesManyLogs num) >-> Pipes.toOutput o Pipes.performGC pipesHandler max = loop 0 where loop mnum = do if mnum == max then lift $ pure () else do event <- await case event of Msg _ -> loop (mnum + 1) Status -> (lift $ putStrLn (show mnum)) *> loop mnum Quit -> return () ---------------------------------------------------------------------- -- Chan ---------------------------------------------------------------------- chanAddProducer num ch = forkIO $ chanManyLogs num ch chanManyLogs num ch = replicateM_ num (writeChan ch (Msg "hello")) chanHandler ch max = handlerIO (readChan ch) max ---------------------------------------------------------------------- -- Unagi-Chan ---------------------------------------------------------------------- uchanAddProducer num ch = forkIO $ uchanManyLogs num ch uchanManyLogs num ch = replicateM_ num (U.writeChan ch (Msg "hello")) uchanHandler ch max = handlerIO (U.readChan ch) max ---------------------------------------------------------------------- -- MVars ---------------------------------------------------------------------- mvarAddProducer num m = forkIO $ mvarManyLogs num m mvarManyLogs num m = replicateM_ num (putMVar m (Msg "hello")) mvarHandler m max = handlerIO (takeMVar m) max ---------------------------------------------------------------------- -- Utils ---------------------------------------------------------------------- handlerIO f max = loop 0 where loop mnum = do if mnum == max then pure () else do event <- f case event of Msg _ -> loop (mnum + 1) Status -> putStrLn (show mnum) *> loop mnum Quit -> return () ---------------------------------------------------------------------- -- Main ---------------------------------------------------------------------- main = defaultMain [ bench "pipes" $ nfIO $ do (output, input) <- Pipes.spawn Pipes.Unbounded replicateM_ prodNum (pipesAddProducer msgNum output) runEffect $ Pipes.fromInput input >-> pipesHandler totalMsg , bench "Chan" $ nfIO $ do ch <- newChan replicateM_ prodNum (chanAddProducer msgNum ch) chanHandler ch totalMsg , bench "Unagi-Chan" $ nfIO $ do (inCh, outCh) <- U.newChan replicateM_ prodNum (uchanAddProducer msgNum inCh) uchanHandler outCh totalMsg , bench "MVar" $ nfIO $ do m <- newEmptyMVar replicateM_ prodNum (mvarAddProducer msgNum m) mvarHandler m totalMsg ] where prodNum = 20 msgNum = 1000 totalMsg = msgNum * prodNum

Puede compilarlo con ghc -O2 Main.hs y simplemente ejecutarlo. Las pruebas crean 20 productores de mensajes, cada uno produce 1000000 mensajes.

Resultados

benchmarking pipes time 46.68 ms (46.19 ms .. 47.31 ms) 0.999 R² (0.999 R² .. 1.000 R²) mean 47.59 ms (47.20 ms .. 47.95 ms) std dev 708.3 μs (558.4 μs .. 906.1 μs) benchmarking Chan time 4.252 ms (4.171 ms .. 4.351 ms) 0.995 R² (0.991 R² .. 0.998 R²) mean 4.233 ms (4.154 ms .. 4.314 ms) std dev 244.8 μs (186.3 μs .. 333.5 μs) variance introduced by outliers: 35% (moderately inflated) benchmarking Unagi-Chan time 1.209 ms (1.198 ms .. 1.224 ms) 0.996 R² (0.993 R² .. 0.999 R²) mean 1.267 ms (1.244 ms .. 1.308 ms) std dev 102.4 μs (61.70 μs .. 169.3 μs) variance introduced by outliers: 62% (severely inflated) benchmarking MVar time 1.746 ms (1.714 ms .. 1.774 ms) 0.997 R² (0.995 R² .. 0.998 R²) mean 1.716 ms (1.694 ms .. 1.739 ms) std dev 73.99 μs (65.32 μs .. 85.48 μs) variance introduced by outliers: 29% (moderately inflated)

Pregunta

Me encantaría preguntarte por qué la versión simultánea de tuberías funciona tan lentamente y por qué es mucho más lenta que incluso la basada en chan. Estoy muy sorprendido de que el MVar es el más rápido de todas las versiones. ¿Alguien podría decir más, por qué obtenemos estos resultados y si podemos mejorar en cualquier caso?


Por lo tanto, puedo ofrecerles una breve descripción de algunos de los análisis de Chan y TQueue (que las pipes-concurrency están usando internamente aquí) que motivaron algunas decisiones de diseño que se unagi-chan en unagi-chan . No estoy seguro de si va a responder a su pregunta. Recomiendo forjar diferentes colas y jugar con variaciones mientras se realiza una evaluación comparativa para tener una idea real de lo que está sucediendo.

Chan

Chan parece a

data Chan a = Chan (MVar (Stream a)) -- pointer to "head", where we read from (MVar (Stream a)) -- pointer to "tail", where values written to type Stream a = MVar (ChItem a) data ChItem a = ChItem a (Stream a)

Es una lista MVar de MVar s. Los dos MVar en el tipo Chan actúan como punteros a la cabecera y cola actuales de la lista, respectivamente. Así es como se ve una escritura:

writeChan :: Chan a -> a -> IO () writeChan (Chan _ writeVar) val = do new_hole <- newEmptyMVar mask_ $ do old_hole <- takeMVar writeVar -- [1] putMVar old_hole (ChItem val new_hole) -- [2] putMVar writeVar new_hole -- [3]

En 1, el escritor tiene un bloqueo en el extremo de escritura, en 2 nuestro artículo a está disponible para el lector, y en 3 el final de escritura está desbloqueado para otros escritores.

Esto realmente funciona bastante bien en un escenario de un solo consumidor / productor único (vea el gráfico aquí ) porque las lecturas y escrituras no compiten. Pero una vez que tenga varios escritores simultáneos, puede comenzar a tener problemas:

  • un escritor que golpea 1 mientras que otro está en 2 bloqueará y se desecará (el más rápido que he podido medir un cambio de contexto es ~ 150ns (bastante rápido); probablemente hay situaciones en las que es mucho más lento). Por lo tanto, cuando hay muchos escritores que sostienen que básicamente están haciendo un gran viaje redondo a través del programador, en una cola de espera para el MVar y, finalmente, la escritura puede completarse.

  • Cuando un escritor se desincroniza (porque se agotó el tiempo) mientras que en 2, se mantiene en un bloqueo y no se permite que se complete ninguna escritura hasta que se pueda reprogramar nuevamente; esto se convierte más en un problema cuando estamos sobre-suscritos , es decir, cuando nuestra proporción de hilos / núcleo es alta.

Por último, el uso de un elemento de MVar requiere cierta sobrecarga en términos de asignación y, lo que es más importante, cuando acumulamos muchos objetos mutables, podemos causar mucha presión en el GC.

TQueue

TQueue es genial porque STM hace que sea muy simple razonar acerca de su corrección. Es una cola funcional al estilo de la cola de salida, y una write consiste simplemente en leer la pila de escritores, analizar nuestro elemento y escribirlo:

data TQueue a = TQueue (TVar [a]) (TVar [a]) writeTQueue :: TQueue a -> a -> STM () writeTQueue (TQueue _ write) a = do listend <- readTVar write -- a transaction with a consistent writeTVar write (a:listend) -- view of memory

Si después de que writeTQueue escribir su nueva pila, otra escritura intercalada hace lo mismo, se reintentará una de las escrituras. A writeTQueue se intercalan más writeTQueue s, el efecto de la contención empeora. Sin embargo, el rendimiento se degrada mucho más lentamente que en Chan porque solo hay una única operación de writeTVar que puede anular los writeTQueue competencia, y la transacción es muy pequeña (solo una lectura y una (:) ).

Una lectura funciona "eliminando en la cola" la pila del lado de escritura, invirtiéndola y almacenando la pila invertida en su propia variable para "hacer estallar" fácilmente (en conjunto, esto nos da una amortización O (1) push and pop)

readTQueue :: TQueue a -> STM a readTQueue (TQueue read write) = do xs <- readTVar read case xs of (x:xs'') -> do writeTVar read xs'' return x [] -> do ys <- readTVar write case ys of [] -> retry _ -> case reverse ys of [] -> error "readTQueue" (z:zs) -> do writeTVar write [] writeTVar read zs return z

Los lectores tienen un problema de contención moderada y simétrica para los escritores. En el caso general, los lectores y escritores no compiten, sin embargo, cuando la pila de lectores está agotada, los lectores compiten con otros lectores y escritores. Sospecho que si ha cargado TQueue una TQueue con suficientes valores y luego ha lanzado 4 lectores y 4 escritores, es posible que pueda generar Livelock, ya que el revés no se completó antes de la siguiente escritura. También es interesante observar que a diferencia de MVar , una escritura a una TVar en la que muchos lectores están esperando los despierta a todos simultáneamente (esto podría ser más o menos eficiente, según el escenario).

Sospecho que no ves muchas de las debilidades de TQueue en tu prueba; sobre todo, se ven los efectos moderados de la contención de escritura y la sobrecarga de una gran cantidad de asignaciones y GC de muchos objetos mutables.

unagi-chan

unagi-chan fue diseñado en primer lugar para manejar bien la contención. Es conceptualmente muy simple, pero la implementación tiene algunas complejidades.

data ChanEnd a = ChanEnd AtomicCounter (IORef (Int , Stream a)) data Stream a = Stream (Array (Cell a)) (IORef (Maybe (Stream a))) data Cell a = Empty | Written a | Blocking (MVar a)

Los lados de lectura y escritura de la cola comparten la Stream en la que coordinan los valores pasados ​​(de escritor a lector) y las indicaciones de bloqueo (de lector a escritor), y los lados de lectura y escritura tienen un contador atómico independiente. Una escritura funciona como:

  1. un escritor llama al incrCounter atómico en el contador de escritura para recibir su índice único para coordinar con su (único) lector

  2. el escritor encuentra su celda y realiza un CAS de Written a

  3. si tiene éxito, sale, de lo contrario ve que un lector lo ha superado y está bloqueando (o está procediendo a bloquear), por lo que hace un (/Blocking v)-> putMVar va) y sale.

Una lectura funciona de una manera similar y obvia.

La primera innovación es hacer del punto de contención una operación atómica que no se degrade bajo la contención (como lo haría un bucle CAS / reintento o un bloqueo tipo Chan). Basándose en pruebas comparativas y experimentos simples, el primop de búsqueda y adición, expuesto por la biblioteca atomic-primops , funciona mejor.

Luego, en 2, tanto el lector como el escritor deben realizar solo una comparación e intercambio (la ruta rápida para el lector es una simple lectura no atómica) para finalizar la coordinación.

Así que para tratar de hacer unagi-chan bueno, nosotros

  • use fetch-and-add para manejar el punto de contención

  • use técnicas de bloqueo libre, de manera que cuando estamos demasiado suscritos a un hilo que se está programando en momentos inoportunos no bloquea el progreso de otros hilos (un escritor bloqueado puede bloquear a lo más el lector "asignado" por el contador; lea las advertencias con respecto a las excepciones asíncronas en documentos unagi-chan , y note que Chan tiene una semántica más agradable aquí)

  • use una matriz para almacenar nuestros elementos, que tiene una mejor ubicación (pero vea más abajo) una sobrecarga por elemento y pone menos presión en el GC

Una nota final re. usar una matriz: las escrituras concurrentes en una matriz generalmente son una mala idea para la escala porque causa mucho tráfico de coherencia de caché ya que las solteras se invalidan de un lado a otro en sus hebras de escritor. El término general es "falso compartir". Pero también hay aspectos positivos en cuanto a la caché y los inconvenientes de diseños alternativos que se me ocurren que podrían incluir escrituras en bandas o algo así; He estado experimentando con esto un poco, pero no tengo nada concluyente en este momento.

Un lugar en el que legítimamente nos preocupa el intercambio falso está en nuestro contador, que alineamos y rellenamos a 64 bytes; esto sí se mostró en los puntos de referencia, y el único inconveniente es el mayor uso de memoria.


Si tuviera que adivinar por qué las pipes-concurrency tienen un rendimiento más deficiente, es porque cada lectura y escritura están envueltas en una transacción STM , mientras que las otras bibliotecas usan primitivas de concurrencia de bajo nivel más eficientes.