concurrency latency actor disruptor-pattern

concurrency - ¿Cómo funciona el patrón de disruptor de LMAX?



latency actor (5)

De este artículo :

El patrón de disruptor es una cola de procesamiento por lotes respaldada por una matriz circular (es decir, el búfer en anillo) llena de objetos de transferencia asignados previamente que utiliza barreras de memoria para sincronizar productores y consumidores a través de secuencias.

Las barreras de memoria son un poco difíciles de explicar y el blog de Trisha ha hecho el mejor intento en mi opinión con este post: http://mechanitis.blogspot.com/2011/08/dissecting-disruptor-why-its-so-fast.html

Pero si no desea sumergirse en los detalles de bajo nivel, simplemente puede saber que las barreras de memoria en Java se implementan a través de la palabra clave volatile o a través de java.util.concurrent.AtomicLong . Las secuencias de patrones de interruptores son AtomicLong y se comunican entre productores y consumidores a través de barreras de memoria en lugar de bloqueos.

Me resulta más fácil entender un concepto a través del código, por lo que el código a continuación es un CoralQueue simple de CoralQueue , que es una implementación del patrón de interruptores realizada por CoralBlocks con la que estoy afiliado. En el código a continuación, puede ver cómo el patrón del disruptor implementa el procesamiento por lotes y cómo el buffer de anillo (es decir, una matriz circular) permite una comunicación sin basura entre dos subprocesos:

package com.coralblocks.coralqueue.sample.queue; import com.coralblocks.coralqueue.AtomicQueue; import com.coralblocks.coralqueue.Queue; import com.coralblocks.coralqueue.util.MutableLong; public class Sample { public static void main(String[] args) throws InterruptedException { final Queue<MutableLong> queue = new AtomicQueue<MutableLong>(1024, MutableLong.class); Thread consumer = new Thread() { @Override public void run() { boolean running = true; while(running) { long avail; while((avail = queue.availableToPoll()) == 0); // busy spin for(int i = 0; i < avail; i++) { MutableLong ml = queue.poll(); if (ml.get() == -1) { running = false; } else { System.out.println(ml.get()); } } queue.donePolling(); } } }; consumer.start(); MutableLong ml; for(int i = 0; i < 10; i++) { while((ml = queue.nextToDispatch()) == null); // busy spin ml.set(System.nanoTime()); queue.flush(); } // send a message to stop consumer... while((ml = queue.nextToDispatch()) == null); // busy spin ml.set(-1); queue.flush(); consumer.join(); // wait for the consumer thread to die... } }

Estoy tratando de entender el patrón del disruptor . He visto el video de InfoQ y he intentado leer su artículo. Entiendo que hay un búfer de anillo involucrado, que se inicializa como una matriz extremadamente grande para aprovechar la localidad de caché, eliminar la asignación de nueva memoria.

Parece que hay uno o más enteros atómicos que hacen un seguimiento de las posiciones. Cada ''evento'' parece obtener una identificación única y su posición en el anillo se encuentra al encontrar su módulo con respecto al tamaño del anillo, etc., etc.

Desafortunadamente, no tengo un sentido intuitivo de cómo funciona. He realizado muchas aplicaciones comerciales y estudié el modelo de actor , miré SEDA, etc.

En su presentación mencionaron que este patrón es básicamente cómo funcionan los enrutadores; Sin embargo, tampoco he encontrado ninguna buena descripción de cómo funcionan los enrutadores.

¿Hay algunos buenos consejos para una mejor explicación?


De hecho, me tomé el tiempo para estudiar la fuente real, por pura curiosidad, y la idea detrás de esto es bastante simple. La versión más reciente en el momento de escribir este post es 3.2.1.

Hay un búfer que almacena eventos asignados previamente que contendrán los datos para que los consumidores los lean.

El búfer está respaldado por una matriz de indicadores (matriz de enteros) de su longitud que describe la disponibilidad de las ranuras de búfer (consulte más detalles para obtener más información). Se accede a la matriz como un java # AtomicIntegerArray, por lo que para el propósito de esta explicación también puede asumir que es uno.

Puede haber cualquier número de productores. Cuando el productor desea escribir en el búfer, se genera un número largo (como al llamar a AtomicLong # getAndIncrement, el Disruptor en realidad usa su propia implementación, pero funciona de la misma manera). Llamemos a esto generado desde hace mucho tiempo. De manera similar, un consumerCallId se genera cuando un consumidor ENDS lee una ranura de un búfer. Se accede a la ConsumerCallId más reciente.

(Si hay muchos consumidores, se elige la llamada con el ID más bajo).

Estos identificadores se comparan, y si la diferencia entre los dos es menor que en el lado del búfer, el productor puede escribir.

(Si producerCallId es mayor que el reciente consumerCallId + bufferSize, significa que el búfer está lleno, y el productor está obligado a esperar en el bus hasta que haya un punto disponible).

Luego, al productor se le asigna la ranura en el búfer según su callId (que es prducerCallId modulo bufferSize, pero como bufferSize es siempre una potencia de 2 (límite impuesto en la creación del búfer), la operación actuall utilizada es producerCallId & (bufferSize - 1 )). Entonces es libre de modificar el evento en esa ranura.

(El algoritmo real es un poco más complicado, e involucra el almacenamiento en caché del último ID de consumidor en una referencia atómica por separado, para propósitos de optimización).

Cuando el evento fue modificado, el cambio es "publicado". Cuando se publica la ranura correspondiente en la matriz de marcas, se rellena con la marca actualizada. El valor del indicador es el número del bucle (producerCallId dividido por bufferSize (nuevamente, dado que bufferSize es potencia de 2, la operación real es un cambio a la derecha).

De manera similar puede haber cualquier número de consumidores. Cada vez que un consumidor desea acceder al búfer, se genera un consumerCallId (dependiendo de cómo se agregaron los consumidores al disruptor, el atómico utilizado en la generación de id puede ser compartido o separado para cada uno de ellos). Este consumerCallId se compara con el productentCallId más reciente, y si es el menor de los dos, el lector puede progresar.

(Similarmente, si producerCallId es parejo a consumerCallId, significa que el búfer está vacío y el consumidor está obligado a esperar. La estrategia de espera se define mediante una WaitStrategy durante la creación del disruptor).

Para los consumidores individuales (los que tienen su propio generador de identificación), lo siguiente que se verifica es la posibilidad de consumir por lotes. Las ranuras en el búfer se examinan en orden desde la respectiva a la ConsumerCallId (el índice se determina de la misma manera que para los productores), a la respectiva a la productora reciente CallId.

Se examinan en un bucle comparando el valor del indicador escrito en la matriz del indicador, con un valor del indicador generado para el ConsumerCallId. Si las banderas coinciden, significa que los productores que llenan los espacios han cometido sus cambios. Si no, el bucle se rompe, y se devuelve el changeId comprometido más alto. Las ranuras desde ConsumerCallId hasta que se recibieron en changeId se pueden consumir por lotes.

Si un grupo de consumidores lee juntos (los que tienen un generador de ID compartido), cada uno solo toma un solo Id. De llamada, y solo se comprueba y devuelve la ranura para ese único Id. De llamada.


El proyecto Google Code hace referencia a un documento técnico sobre la implementación del búfer de anillo, sin embargo, es un poco seco, académico y difícil para alguien que quiera aprender cómo funciona. Sin embargo, hay algunas publicaciones de blog que han comenzado a explicar los aspectos internos de una manera más legible. Hay una explicación de la memoria intermedia de anillo que es el núcleo del patrón del disruptor, una descripción de las barreras del consumidor (la parte relacionada con la lectura del disruptor) y cierta información sobre el manejo de múltiples productores disponibles.

La descripción más simple del Disruptor es: es una forma de enviar mensajes entre subprocesos de la manera más eficiente posible. Puede usarse como alternativa a una cola, pero también comparte una serie de características con SEDA y Actores.

Comparado a las colas:

El Disruptor proporciona la capacidad de pasar un mensaje a otros subprocesos, activándolo si es necesario (similar a un BlockingQueue). Sin embargo, hay 3 diferencias distintas.

  1. El usuario del Disruptor define cómo se almacenan los mensajes al extender la clase de Entrada y proporcionar una fábrica para realizar la preasignación. Esto permite la reutilización (copia) de la memoria o la entrada podría contener una referencia a otro objeto.
  2. Poner mensajes en el Disruptor es un proceso de 2 fases, primero se reclama una ranura en el búfer de anillo, que proporciona al usuario la Entrada que puede rellenarse con los datos apropiados. Luego se debe confirmar la entrada, este enfoque de 2 fases es necesario para permitir el uso flexible de la memoria mencionada anteriormente. Es la confirmación la que hace que el mensaje sea visible para las hebras del consumidor.
  3. Es responsabilidad del consumidor realizar un seguimiento de los mensajes que se han consumido desde el búfer de anillo. Alejar esta responsabilidad del búfer de anillo ayudó a reducir la cantidad de contención de escritura, ya que cada hilo mantiene su propio contador.

Comparado con los actores

El modelo Actor está más cerca del Disruptor que la mayoría de los otros modelos de programación, especialmente si utiliza las clases BatchConsumer / BatchHandler que se proporcionan. Estas clases ocultan todas las complejidades de mantener los números de secuencia consumidos y proporcionan un conjunto de devoluciones de llamada simples cuando ocurren eventos importantes. Sin embargo, hay un par de diferencias sutiles.

  1. El Disruptor usa un modelo de consumidor de 1 subproceso - 1, donde los Actores usan un modelo N: M, es decir, puede tener tantos actores como desee y se distribuirán en un número fijo de subprocesos (generalmente 1 por núcleo).
  2. La interfaz BatchHandler proporciona una devolución de llamada adicional (y muy importante) onEndOfBatch() . Esto permite a los consumidores lentos, por ejemplo, aquellos que realizan E / S para agrupar eventos para mejorar el rendimiento. Es posible realizar lotes en otros marcos de Actor, sin embargo, como casi todos los demás marcos no proporcionan una devolución de llamada al final del lote, debe usar un tiempo de espera para determinar el final del lote, lo que resulta en una latencia deficiente.

Comparado con SEDA

LMAX creó el patrón Disruptor para reemplazar un enfoque basado en SEDA.

  1. La principal mejora que proporcionó sobre SEDA fue la capacidad de realizar trabajos en paralelo. Para hacer esto, el Disruptor admite la difusión múltiple de los mismos mensajes (en el mismo orden) a varios consumidores. Esto evita la necesidad de etapas de horquilla en la tubería.
  2. También permitimos que los consumidores esperen los resultados de otros consumidores sin tener que poner otra fase de cola entre ellos. Un consumidor puede simplemente observar el número de secuencia de un consumidor del que depende. Esto evita la necesidad de unir etapas en la tubería.

Comparado con las barreras de memoria

Otra forma de pensarlo es como una barrera de memoria ordenada y estructurada. Donde la barrera del productor forma la barrera de escritura y la barrera del consumidor es la barrera de lectura.


Martin Fowler ha escrito un artículo sobre LMAX y el patrón de disruptor, The LMAX Architecture , que puede aclararlo más.


Primero nos gustaría entender el modelo de programación que ofrece.

Hay uno o más escritores. Hay uno o más lectores. Hay una línea de entradas, totalmente ordenadas de antiguas a nuevas (ilustradas de izquierda a derecha). Los escritores pueden agregar nuevas entradas en el extremo derecho. Cada lector lee entradas secuencialmente de izquierda a derecha. Los lectores no pueden leer escritores pasados, obviamente.

No hay concepto de eliminación de entrada. Utilizo "lector" en lugar de "consumidor" para evitar que se consuma la imagen de las entradas. Sin embargo, entendemos que las entradas a la izquierda del último lector se vuelven inútiles.

Generalmente los lectores pueden leer concurrentemente e independientemente. Sin embargo podemos declarar dependencias entre los lectores. Las dependencias del lector pueden ser arbitrarias acíclicas. Si el lector B depende del lector A, el lector B no puede leer más allá del lector A.

La dependencia del lector surge porque el lector A puede anotar una entrada, y el lector B depende de esa anotación. Por ejemplo, A realiza algún cálculo en una entrada y almacena el resultado en el campo a en la entrada. A luego continúa, y ahora B puede leer la entrada y el valor de A almacenado. Si el lector C no depende de A, C no debe intentar leer a .

Este es de hecho un modelo de programación interesante. Independientemente del rendimiento, el modelo solo puede beneficiar a muchas aplicaciones.

Por supuesto, el principal objetivo de LMAX es el rendimiento. Utiliza un anillo de entradas pre-asignado. El anillo es lo suficientemente grande, pero está limitado para que el sistema no se cargue más allá de la capacidad de diseño. Si el anillo está lleno, el / los escritor (es) esperarán hasta que los lectores más lentos avanzen y hagan espacio.

Los objetos de entrada están asignados previamente y viven para siempre, para reducir el costo de la recolección de basura. No insertamos nuevos objetos de entrada ni eliminamos objetos de entrada antiguos, en cambio, un escritor solicita una entrada preexistente, llena sus campos y notifica a los lectores. Esta acción aparente de 2 fases es en realidad simplemente una acción atómica.

setNewEntry(EntryPopulator); interface EntryPopulator{ void populate(Entry existingEntry); }

La asignación previa de entradas también significa que las entradas adyacentes (muy probablemente) se ubican en celdas de memoria adyacentes, y debido a que los lectores leen las entradas de manera secuencial, esto es importante para utilizar cachés de CPU.

Y muchos esfuerzos para evitar el bloqueo, CAS, incluso la barrera de la memoria (por ejemplo, use una variable de secuencia no volátil si solo hay un escritor)

Para los desarrolladores de lectores: los diferentes lectores de anotaciones deben escribir en diferentes campos, para evitar la contención de escritura. (En realidad, deberían escribir en diferentes líneas de caché). Un lector de anotaciones no debe tocar nada que puedan leer otros lectores no dependientes. Es por esto que digo que estos lectores anotan entradas, en lugar de modificar entradas.