c# multithreading asynchronous async-await tpl-dataflow

c# - ¿Cómo puedo asegurarme de que un bloque de flujo de datos solo crea subprocesos a pedido?



multithreading asynchronous (1)

Antes de implementar su solución en el entorno ASP.NET, le sugiero que cambie su arquitectura: IIS puede suspender los subprocesos en ASP.NET para su propio uso después de que se maneje la solicitud, por lo que su tarea podría quedar sin terminar. Un mejor enfoque es crear un demonio de servicio de Windows separado, que maneje su flujo de datos.

Ahora volvamos al flujo de datos TPL.

Me encanta la biblioteca de flujo de datos TPL, pero su documentación es un verdadero desastre.
El único documento útil que he encontrado es Introducción al flujo de datos TPL .

Hay algunas pistas que pueden ser útiles, especialmente las relacionadas con los ajustes de configuración (le sugiero que investigue la implementación de su propio TaskScheduler con su propia implementación de MaxMessagesPerTask y la opción MaxMessagesPerTask ) si necesita:

Los bloques de flujo de datos integrados son configurables, con un gran control proporcionado sobre cómo y dónde los bloques realizan su trabajo. Aquí hay algunos mandos clave disponibles para el desarrollador, todos los cuales están expuestos a través de la clase DataflowBlockOptions y sus tipos derivados ( ExecutionDataflowBlockOptions y GroupingDataflowBlockOptions ), cuyas instancias pueden proporcionarse a los bloques en el momento de la construcción.

  • Personalización de TaskScheduler, como @ i3arnon mencionó:

    De forma predeterminada, los bloques de flujo de datos programan el trabajo en TaskScheduler.Default , que apunta al funcionamiento interno de .NET ThreadPool .

  • MaxDegreeOfParallelism

    El valor predeterminado es 1 , lo que significa que solo una cosa puede suceder en un bloque a la vez. Si se establece en un valor superior a 1 , el bloque puede procesar ese número de mensajes al mismo tiempo. Si se establece en DataflowBlockOptions.Unbounded (-1) , se puede procesar cualquier número de mensajes al mismo tiempo, con el máximo administrado automáticamente por el planificador subyacente dirigido por el bloque de flujo de datos. Tenga en cuenta que MaxDegreeOfParallelism es un máximo, no un requisito.

  • MaxMessagesPerTask

    TPL Dataflow se centra tanto en la eficiencia como en el control. Cuando hay concesiones necesarias entre los dos, el sistema se esfuerza por proporcionar una calidad predeterminada, pero también permite al desarrollador personalizar el comportamiento de acuerdo con una situación particular. Un ejemplo es el intercambio entre desempeño y equidad. De forma predeterminada, los bloques de flujo de datos intentan minimizar el número de objetos de tarea que son necesarios para procesar todos sus datos. Esto proporciona una ejecución muy eficiente; mientras un bloque tenga datos disponibles para ser procesados, las tareas de ese bloque permanecerán para procesar los datos disponibles, solo se retirarán cuando no haya más datos disponibles (hasta que los datos estén nuevamente disponibles, momento en el cual se ejecutarán más tareas). Sin embargo, esto puede llevar a problemas de equidad. Si el sistema está actualmente saturado de datos de procesamiento de un conjunto dado de bloques, y luego los datos llegan a otros bloques, esos últimos bloques tendrán que esperar a que los primeros bloques terminen de procesarse antes de que puedan comenzar o, o bien, arriesgarse a una suscripción excesiva el sistema. Este puede o no ser el comportamiento correcto para una situación dada. Para solucionar esto, existe la opción MaxMessagesPerTask . El valor predeterminado es DataflowBlockOptions.Unbounded (-1) , lo que significa que no hay un máximo. Sin embargo, si se establece en un número positivo, ese número representará el número máximo de mensajes que un bloque dado puede usar una sola tarea para procesar. Una vez que se alcanza ese límite, el bloque debe retirar la tarea y reemplazarlo con una réplica para continuar con el procesamiento. Estas réplicas se tratan de manera justa con respecto a todas las demás tareas programadas para el programador, lo que permite que los bloques logren un mínimo de equidad entre ellos. En el extremo, si MaxMessagesPerTask se establece en 1, se utilizará una sola tarea por mensaje, logrando la máxima equidad a expensas de más tareas que las que podrían haber sido necesarias.

  • MaxNumberOfGroups

    Los bloques de agrupación son capaces de rastrear la cantidad de grupos que han producido y se completan automáticamente (rechazando más mensajes ofrecidos) después de que se haya generado esa cantidad de grupos. De forma predeterminada, el número de grupos es DataflowBlockOptions.Unbounded (-1) , pero puede establecerse explícitamente en un valor mayor que uno.

  • CancelaciónToken

    Este token se monitorea durante la vida útil del bloque de flujo de datos. Si llega una solicitud de cancelación antes de que se complete el bloqueo, el bloqueo dejará de funcionar de la manera más educada y rápida posible.

  • Codicioso

    Por defecto, los bloques de destino son codiciosos y quieren que se les ofrezcan todos los datos.

  • BoundedCapacity

    Este es el límite en la cantidad de elementos que el bloque puede almacenar y tener en vuelo en cualquier momento.

He escrito una pequeña canalización utilizando la API de flujo de datos TPL que recibe datos de varios subprocesos y realiza el manejo de ellos.

Configuración 1

Cuando lo configuro para usar MaxDegreeOfParallelism = Environment.ProcessorCount (llega a 8 en mi caso) para cada bloque, me doy cuenta de que llena búferes en varios subprocesos y el procesamiento del segundo bloque no comienza hasta que se han recibido +1700 elementos. todos los temas. Puedes ver esto en acción here .

Configuración 2

Cuando configuro MaxDegreeOfParallelism = 1 , me doy cuenta de que todos los elementos se reciben en un solo hilo y el procesamiento del envío ya comienza después de que se reciben + - 40 elementos. Datos aquí .

Configuración 3

Cuando configuro MaxDegreeOfParallelism = 1 e introduzco un retraso de 1000ms antes de enviar cada entrada, observo que los elementos se envían tan pronto como se reciben y cada elemento recibido se coloca en un hilo separado. Datos here .

Hasta aquí la configuración. Mis preguntas son las siguientes:

  1. Cuando comparo las configuraciones 1 y 2, me doy cuenta de que los elementos de procesamiento se inician mucho más rápido cuando se realiza en serie en comparación con el paralelo (incluso después de tener en cuenta el hecho de que el paralelo tiene 8 veces más hilos). ¿Qué causa esta diferencia?

  2. Dado que esto se ejecutará en un entorno ASP.NET, no quiero generar subprocesos innecesarios ya que todos provienen de un solo conjunto de subprocesos. Como se muestra en la configuración 3, aún se propagará a través de múltiples subprocesos, incluso cuando solo hay un puñado de datos. Esto también es sorprendente porque de la configuración 1 supongo que los datos se distribuyen secuencialmente en subprocesos (observe cómo los primeros 50 elementos van al subproceso 16). ¿Puedo asegurarme de que solo cree nuevos subprocesos a pedido?

  3. Hay otro concepto llamado el BufferBlock<T> . Si el TransformBlock<T> ya está en la cola, ¿cuál sería la diferencia práctica de cambiar el primer paso en mi canalización ( ReceiveElement ) por un BufferBlock ?

class Program { static void Main(string[] args) { var dataflowProcessor = new DataflowProcessor<string>(); var amountOfTasks = 5; var tasks = new Task[amountOfTasks]; for (var i = 0; i < amountOfTasks; i++) { tasks[i] = SpawnThread(dataflowProcessor, $"Task {i + 1}"); } foreach (var task in tasks) { task.Start(); } Task.WaitAll(tasks); Console.WriteLine("Finished feeding threads"); // Needs to use async main Console.Read(); } private static Task SpawnThread(DataflowProcessor<string> dataflowProcessor, string taskName) { return new Task(async () => { await FeedData(dataflowProcessor, taskName); }); } private static async Task FeedData(DataflowProcessor<string> dataflowProcessor, string threadName) { foreach (var i in Enumerable.Range(0, short.MaxValue)) { await Task.Delay(1000); // Only used for the delayedSerialProcessing test dataflowProcessor.Process($"Thread name: {threadName}/t Thread ID:{Thread.CurrentThread.ManagedThreadId}/t Value:{i}"); } } } public class DataflowProcessor<T> { private static readonly ExecutionDataflowBlockOptions ExecutionOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = Environment.ProcessorCount }; private static readonly TransformBlock<T, T> ReceiveElement = new TransformBlock<T, T>(element => { Console.WriteLine($"Processing received element in thread {Thread.CurrentThread.ManagedThreadId}"); return element; }, ExecutionOptions); private static readonly ActionBlock<T> SendElement = new ActionBlock<T>(element => { Console.WriteLine($"Processing sent element in thread {Thread.CurrentThread.ManagedThreadId}"); Console.WriteLine(element); }, ExecutionOptions); static DataflowProcessor() { ReceiveElement.LinkTo(SendElement); ReceiveElement.Completion.ContinueWith(x => { if (x.IsFaulted) { ((IDataflowBlock) ReceiveElement).Fault(x.Exception); } else { ReceiveElement.Complete(); } }); } public void Process(T newElement) { ReceiveElement.Post(newElement); } }