performance - Reducción del tiempo de pausa de recolección de basura en un programa Haskell
garbage-collection ghc (4)
Estamos desarrollando un programa que recibe y reenvía "mensajes", mientras mantiene un historial temporal de esos mensajes, para que pueda informarle el historial de mensajes si así lo solicita. Los mensajes se identifican numéricamente, generalmente tienen un tamaño de alrededor de 1 kilobyte, y necesitamos mantener cientos de miles de estos mensajes.
Deseamos optimizar este programa para la latencia: el tiempo entre enviar y recibir un mensaje debe ser inferior a 10 milisegundos.
El programa está escrito en Haskell y compilado con GHC. Sin embargo, hemos descubierto que las pausas de recolección de basura son demasiado largas para nuestros requisitos de latencia: más de 100 milisegundos en nuestro programa del mundo real.
El siguiente programa es una versión simplificada de nuestra aplicación.
Utiliza un
Data.Map.Strict
para almacenar mensajes.
Los mensajes son
ByteString
s identificados por un
Int
.
Se insertan 1,000,000 de mensajes en orden numérico creciente, y los mensajes más antiguos se eliminan continuamente para mantener el historial en un máximo de 200,000 mensajes.
module Main (main) where
import qualified Control.Exception as Exception
import qualified Control.Monad as Monad
import qualified Data.ByteString as ByteString
import qualified Data.Map.Strict as Map
data Msg = Msg !Int !ByteString.ByteString
type Chan = Map.Map Int ByteString.ByteString
message :: Int -> Msg
message n = Msg n (ByteString.replicate 1024 (fromIntegral n))
pushMsg :: Chan -> Msg -> IO Chan
pushMsg chan (Msg msgId msgContent) =
Exception.evaluate $
let
inserted = Map.insert msgId msgContent chan
in
if 200000 < Map.size inserted
then Map.deleteMin inserted
else inserted
main :: IO ()
main = Monad.foldM_ pushMsg Map.empty (map message [1..1000000])
Compilamos y ejecutamos este programa usando:
$ ghc --version
The Glorious Glasgow Haskell Compilation System, version 7.10.3
$ ghc -O2 -optc-O3 Main.hs
$ ./Main +RTS -s
3,116,460,096 bytes allocated in the heap
385,101,600 bytes copied during GC
235,234,800 bytes maximum residency (14 sample(s))
124,137,808 bytes maximum slop
600 MB total memory in use (0 MB lost due to fragmentation)
Tot time (elapsed) Avg pause Max pause
Gen 0 6558 colls, 0 par 0.238s 0.280s 0.0000s 0.0012s
Gen 1 14 colls, 0 par 0.179s 0.250s 0.0179s 0.0515s
INIT time 0.000s ( 0.000s elapsed)
MUT time 0.652s ( 0.745s elapsed)
GC time 0.417s ( 0.530s elapsed)
EXIT time 0.010s ( 0.052s elapsed)
Total time 1.079s ( 1.326s elapsed)
%GC time 38.6% (40.0% elapsed)
Alloc rate 4,780,213,353 bytes per MUT second
Productivity 61.4% of total user, 49.9% of total elapsed
La métrica importante aquí es la "pausa máxima" de 0.0515s, o 51 milisegundos. Deseamos reducir esto en al menos un orden de magnitud.
La experimentación muestra que la duración de una pausa de GC está determinada por el número de mensajes en el historial. La relación es aproximadamente lineal, o tal vez superlineal. La siguiente tabla muestra esta relación. ( Puede ver nuestras pruebas de evaluación comparativa aquí , y algunas tablas aquí ).
msgs history length max GC pause (ms)
=================== =================
12500 3
25000 6
50000 13
100000 30
200000 56
400000 104
800000 199
1600000 487
3200000 1957
6400000 5378
Hemos experimentado con varias otras variables para encontrar si pueden reducir esta latencia, ninguna de las cuales hace una gran diferencia.
Entre estas variables sin importancia están: optimización (
-O
,
-O2
);
Opciones de GC RTS (
-G
,
-H
,
-A
,
-c
), número de núcleos (
-N
), diferentes estructuras de datos (
Data.Sequence
datos), el tamaño de los mensajes y la cantidad de basura de corta duración generada.
El factor determinante abrumador es la cantidad de mensajes en el historial.
Nuestra teoría de trabajo es que las pausas son lineales en la cantidad de mensajes porque cada ciclo de GC tiene que recorrer toda la memoria accesible de trabajo y copiarla, que son operaciones claramente lineales.
Preguntas:
- ¿Es correcta esta teoría del tiempo lineal? ¿Se puede expresar la duración de las pausas GC de esta manera simple, o la realidad es más compleja?
- Si la pausa de GC es lineal en la memoria de trabajo, ¿hay alguna forma de reducir los factores constantes involucrados?
- ¿Hay alguna opción para GC incremental, o algo así? Solo podemos ver trabajos de investigación. Estamos muy dispuestos a cambiar el rendimiento por una menor latencia.
- ¿Hay alguna forma de "particionar" la memoria para ciclos GC más pequeños, aparte de dividirla en múltiples procesos?
Bueno, usted encontró la limitación de los idiomas con GC: no son aptos para sistemas hardcore en tiempo real.
Tienes 2 opciones:
Primero, aumente el tamaño del almacenamiento dinámico y use un sistema de almacenamiento en caché de 2 niveles, los mensajes más antiguos se envían al disco y mantiene los mensajes más nuevos en la memoria, puede hacerlo mediante la paginación del sistema operativo. Sin embargo, el problema con esta solución es que la paginación puede ser costosa dependiendo de las capacidades de lectura de la unidad de memoria secundaria utilizada.
2º Programe esa solución usando ''C'' e interconecte con FFI para haskell. De esa manera puede hacer su propia gestión de memoria. Esta sería la mejor opción, ya que puede controlar la memoria que necesita usted mismo.
En realidad, está haciendo bastante bien tener un tiempo de pausa de 51 ms con más de 200 Mb de datos en vivo. El sistema en el que trabajo tiene un tiempo de pausa máximo mayor con la mitad de esa cantidad de datos en vivo.
Su suposición es correcta, el tiempo de pausa principal de GC es directamente proporcional a la cantidad de datos en vivo, y desafortunadamente no hay forma de evitarlo con GHC tal como está. Experimentamos con GC incremental en el pasado, pero fue un proyecto de investigación y no alcanzó el nivel de madurez necesario para incorporarlo al GHC liberado.
Una cosa que esperamos que ayude con esto en el futuro son las regiones compactas: https://phabricator.haskell.org/D1264 . Es un tipo de gestión de memoria manual donde compacta una estructura en el montón, y el GC no tiene que atravesarla. Funciona mejor para datos de larga duración, pero tal vez sea lo suficientemente bueno como para usar para mensajes individuales en su configuración. Nuestro objetivo es tenerlo en GHC 8.2.0.
Si está en una configuración distribuida y tiene un balanceador de carga de algún tipo, hay trucos que puede jugar para evitar el golpe de pausa, básicamente se asegura de que el balanceador de carga no envíe solicitudes a las máquinas que están a punto de haga un GC importante y, por supuesto, asegúrese de que la máquina aún complete el GC aunque no reciba solicitudes.
He intentado su fragmento de código con un enfoque de
IOVector
utilizando
IOVector
como estructura de datos subyacente.
En mi sistema (GHC 7.10.3, las mismas opciones de compilación) esto resultó en una reducción del tiempo máximo (la métrica que mencionó en su OP) en ~ 22%.
NÓTESE BIEN. Hice dos suposiciones aquí:
- Una estructura de datos mutable es adecuada para el problema (supongo que pasar mensajes implica IO de todos modos)
- Tus mensajes son continuos
Con algunos parámetros
Int
y aritméticos adicionales (como cuando los ID de mensaje se restablecen de nuevo a 0 o
minBound
), debería ser sencillo determinar si un determinado mensaje todavía está en el historial y recuperarlo del índice correspondiente en el ringbuffer.
Para su placer de prueba:
import qualified Control.Exception as Exception
import qualified Control.Monad as Monad
import qualified Data.ByteString as ByteString
import qualified Data.Map.Strict as Map
import qualified Data.Vector.Mutable as Vector
data Msg = Msg !Int !ByteString.ByteString
type Chan = Map.Map Int ByteString.ByteString
data Chan2 = Chan2
{ next :: !Int
, maxId :: !Int
, ringBuffer :: !(Vector.IOVector ByteString.ByteString)
}
chanSize :: Int
chanSize = 200000
message :: Int -> Msg
message n = Msg n (ByteString.replicate 1024 (fromIntegral n))
newChan2 :: IO Chan2
newChan2 = Chan2 0 0 <$> Vector.unsafeNew chanSize
pushMsg2 :: Chan2 -> Msg -> IO Chan2
pushMsg2 (Chan2 ix _ store) (Msg msgId msgContent) =
let ix'' = if ix == chanSize then 0 else ix + 1
in Vector.unsafeWrite store ix'' msgContent >> return (Chan2 ix'' msgId store)
pushMsg :: Chan -> Msg -> IO Chan
pushMsg chan (Msg msgId msgContent) =
Exception.evaluate $
let
inserted = Map.insert msgId msgContent chan
in
if chanSize < Map.size inserted
then Map.deleteMin inserted
else inserted
main, main1, main2 :: IO ()
main = main2
main1 = Monad.foldM_ pushMsg Map.empty (map message [1..1000000])
main2 = newChan2 >>= /c -> Monad.foldM_ pushMsg2 c (map message [1..1000000])
Tengo que estar de acuerdo con los demás: si tiene restricciones difíciles en tiempo real, entonces usar un lenguaje GC no es lo ideal.
Sin embargo, puede considerar experimentar con otras estructuras de datos disponibles en lugar de solo Data.Map.
Lo reescribí usando Data.Sequence y obtuve algunas mejoras prometedoras:
msgs history length max GC pause (ms)
=================== =================
12500 0.7
25000 1.4
50000 2.8
100000 5.4
200000 10.9
400000 21.8
800000 46
1600000 87
3200000 175
6400000 350
Aunque está optimizando la latencia, noté que otras métricas también mejoraron. En el caso 200000, el tiempo de ejecución cae de 1.5s a 0.2s, y el uso total de memoria cae de 600MB a 27MB.
Debo señalar que hice trampa ajustando el diseño:
-
Quité el
Int
delMsg
, por lo que no está en dos lugares. -
En lugar de usar un Mapa de
Int
s aByteString
s, utilicé unaSequence
deByteString
s, y en lugar de unaInt
por mensaje, creo que se puede hacer con unInt
para toda laSequence
. Suponiendo que los mensajes no se pueden reordenar, puede usar un solo desplazamiento para traducir el mensaje que desea a su ubicación en la cola.
(
getMsg
una función adicional
getMsg
para demostrar eso).
{-# LANGUAGE BangPatterns #-}
import qualified Control.Exception as Exception
import qualified Control.Monad as Monad
import qualified Data.ByteString as ByteString
import Data.Sequence as S
newtype Msg = Msg ByteString.ByteString
data Chan = Chan Int (Seq ByteString.ByteString)
message :: Int -> Msg
message n = Msg (ByteString.replicate 1024 (fromIntegral n))
maxSize :: Int
maxSize = 200000
pushMsg :: Chan -> Msg -> IO Chan
pushMsg (Chan !offset sq) (Msg msgContent) =
Exception.evaluate $
let newSize = 1 + S.length sq
newSq = sq |> msgContent
in
if newSize <= maxSize
then Chan offset newSq
else
case S.viewl newSq of
(_ :< newSq'') -> Chan (offset+1) newSq''
S.EmptyL -> error "Can''t happen"
getMsg :: Chan -> Int -> Maybe Msg
getMsg (Chan offset sq) i_ = getMsg'' (i_ - offset)
where
getMsg'' i
| i < 0 = Nothing
| i >= S.length sq = Nothing
| otherwise = Just (Msg (S.index sq i))
main :: IO ()
main = Monad.foldM_ pushMsg (Chan 0 S.empty) (map message [1..5 * maxSize])