programming parallel new net library for examples example .net-4.0 parallel-processing msmq task-parallel-library

.net-4.0 - new - parallel programming vb



Cómo procesar mensajes MSMQ en paralelo (7)

Estoy escribiendo un servicio de Windows para consumir mensajes MSMQ. El servicio tendrá períodos de alta actividad (80k mensajes entrando muy rápidamente) y largos períodos de inactividad (podrían ser varios días sin un nuevo mensaje).

Procesar los mensajes está muy vinculado a la red, por lo que obtengo un gran beneficio del paralelismo. Pero durante los períodos de inactividad, no quiero atar un montón de hilos esperando mensajes que no lleguen pronto.

La interfaz de MSMQ parece estar muy centrada en un flujo de trabajo sincrónico: obtener un mensaje, procesarlo, obtener otro, etc. ¿Cómo debería estructurar mi código para poder aprovechar el paralelismo durante períodos de alta actividad pero sin atar un montón? de hilos durante los períodos de ninguna actividad? Puntos de bonificación por usar el TPL. Pseudocódigo sería apreciado.


Aquí hay una versión simplificada de lo que terminé haciendo:

while(true) { int msgCount = 0; Parallel.ForEach(Enumerable.Range(0,20), (i) => { MessageQueue queue = new MessageQueue(_queuePath); try { msg = queue.Receive(TimeSpan.Zero); // do work Interlocked.Increment(ref msgCount); catch(MessageQueueException mqex) { if (mqex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout) { return; // nothing in queue } else throw; } } if (msgCount < 20) Thread.Sleep(1000); // nothing more to do, take a break }

Así que trato de obtener mensajes de 20 a la vez, contando los que recibo. Para esos 20, dejo que el TPL vaya a la ciudad. Al final, si he procesado menos de 20 mensajes, la cola está vacía y duermo el hilo por un segundo antes de volver a intentarlo.


Así es como lo hago (cambiando el código sobre la marcha para que haya errores ortográficos):

for (int i = 0; i < numberOfSimultaneousRequests; i++) priorityQueue.BeginReceive(TimeSpan.FromDays(30), state, callback);

y la devolución de llamada se ve así:

private void ProcessMessage(IAsyncResult asyncResult) { try { Message msg = priorityQueue.EndReceive(asyncResult); //Do something with the message } finally { priorityQueue.BeginDequeue(null, ProcessMessage);//start processing another one }



He hecho un montón de MSMQ (incluidas las implementaciones móviles) a lo largo de los años y tiene razón en la caracterización del "flujo de trabajo sincrónico". No es que no pueda tomar los diversos envolventes de mensaje y procesarlos en los diferentes núcleos a través de TPL ... el factor limitante es leer / escribir en la cola ... inherentemente una operación en serie. No puede enviar 8 mensajes a la vez (una computadora con 8 núcleos) por ejemplo.

Tenía una necesidad similar (sin usar el espacio de nombres System.Messaging) y la resolví con la ayuda de un libro que leí "Camply and Johnson", "Programación paralela con Microsoft.NET".

Consulte su capítulo "Tareas paralelas" y específicamente la parte del uso de una cola global que coopera con colas locales por subproceso para el procesamiento de trabajo (es decir, el TPL) que utiliza un algoritmo de "robo de trabajo" para realizar el equilibrio de carga. Modelé mi solución, en parte, después de su ejemplo. La versión final de mi sistema tuvo una gran diferencia en su rendimiento (de 23 mensajes por segundo a más de 200).

Dependiendo de cuánto le tome a su sistema pasar de 0 a 80,000, querrá tomar el mismo diseño y extenderlo a través de varios servidores (cada uno con múltiples procesadores y múltiples núcleos). En teoría, mi configuración requeriría un poco menos de 7 minutos para pulir todos los 80K, por lo que al agregar una segunda computadora se reduciría a aproximadamente ~ 3 minutos y 20 segundos, etc., etc., etc. El truco es la trabajo robando lógica.

Comida para el pensamiento …

Una edición rápida: por cierto, la computadora es una estación de trabajo Dell T7500 con doble núcleo cuádruple Xeons @ 3GHz, 24 GB de RAM, Windows 7 Ultimate edición de 64 bits.


La solución también depende en parte de cómo se manejan los mensajes.

Usé un WorkflowService alojado en Windows Server AppFabric con enlace Net.Msmq y una cola transaccional. El enlace transaccional net.msmq era necesario para gestionar el procesamiento de mensajes fuera de servicio. El flujo de trabajo es una máquina de estado .Net 4.0.1 y los mensajes llegan a la misma cola desde diferentes sistemas. Es posible, por ejemplo, hacer que un sistema envíe una actualización a una instancia de máquina de estado antes de que otro sistema haya enviado un mensaje para instanciarlo. Para habilitar el procesamiento de mensajes fuera de servicio, el host del servicio de flujo de trabajo usa BufferedReceive para bloquear los mensajes, y vuelve a intentar obtenerlos de la subcola de bloqueo. BufferedReceive tiene un máximo de mensajes pendientes configurados para la longitud de lote máxima probable, ya que los mensajes en la cola bloqueada se devuelven a la cola de reintento en el frente .

WF también tiene una serie de ajustes de aceleración. Mi longitud de lote máxima probable es de aproximadamente 20000. Establecí MaxConcurrentCalls en 64, y MaxConcurrentInstances en 20000. Esto da como resultado que IIS / WAS maneje 64 llamadas concurrentes.

Sin embargo, y esto es lo que ocurre, dado que las recepciones en el flujo de trabajo son unidireccionales, esto no significa que los procesos WF engendrados finalicen tan pronto como finalice la recepción. Lo que sucede a continuación en mi escenario es que después de que el mensaje es dequeado y se invoca una instancia de WF, que es una de las 64 llamadas, el motor de flujo de trabajo programa una serie de pasos a seguir y uno de ellos es una operación de base de datos.

El problema es que 64 llamadas pueden ser el máximo, pero si la tasa de descarga de mensajes es más alta que la tasa de finalización asíncrona del proceso, a medida que se procesa el lote entrante de mensajes habrá un mayor número de subprocesos de ejecución (en mi caso WF instancias). Esto puede provocar que ocurran cosas inesperadas, por ejemplo, el grupo de conexiones ADO.NET tiene un valor predeterminado de 100 como número máximo de conexiones. Esto provocará que los procesos excedan el tiempo de espera de las conexiones de un grupo agotado. Para este problema en particular, puede aumentar el valor de MaxPoolSize o utilizar Service Broker para tratar las operaciones de db de forma asincrónica también (lo que significa más complejidad en el flujo de trabajo).

Espero que esto ayude a alguien.


NServiceBus tiene un gran concepto para este mismo problema. Se llama Distribuidor . La idea es que el distribuidor pueda reenviar el trabajo a realizar y distribuirlo a través de cualquier número de nodos hijo en ejecución. Dependiendo del tipo de trabajo que se realice, por ejemplo, cálculos pesados ​​frente a grabaciones en disco, puede distribuirlo en múltiples procesos o incluso en varias máquinas.


Solo tratando de alguna manera similar, tpl parece ser capaz de lanzar algún tipo de excepción de seguridad de subprocesos donde se encuentra con problemas físicos, por ejemplo, intente crear una sqlconnection fuera del tpl foreach y usarla dentro del cuerpo del bucle, arrojó una excepción para mí. Inicié una cola antes de ingresar al cuerpo, enumerando una lista de cadenas y me pareció que estaba bien, mi código procesaba 10000 elementos constantemente por debajo de 500 ms usando mensajes de 1 vía en un i7 2500 8gb y msmq local