c# - TPL Dataflow, garantiza la finalización solo cuando TODOS los bloques de datos de origen se completan
concurrency task-parallel-library (4)
¿Cómo puedo volver a escribir el código que completa el código cuando se completan AMBOS transformblocks? ¿Pensé que la terminación significa que está marcado como completado Y que la "cola de salida" está vacía?
public Test()
{
broadCastBlock = new BroadcastBlock<int>(i =>
{
return i;
});
transformBlock1 = new TransformBlock<int, string>(i =>
{
Console.WriteLine("1 input count: " + transformBlock1.InputCount);
Thread.Sleep(50);
return ("1_" + i);
});
transformBlock2 = new TransformBlock<int, string>(i =>
{
Console.WriteLine("2 input count: " + transformBlock1.InputCount);
Thread.Sleep(20);
return ("2_" + i);
});
processorBlock = new ActionBlock<string>(i =>
{
Console.WriteLine(i);
});
//Linking
broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
transformBlock1.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
transformBlock2.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
}
public void Start()
{
const int numElements = 100;
for (int i = 1; i <= numElements; i++)
{
broadCastBlock.SendAsync(i);
}
//mark completion
broadCastBlock.Complete();
processorBlock.Completion.Wait();
Console.WriteLine("Finished");
Console.ReadLine();
}
}
Edité el código, agregando un conteo de búfer de entrada para cada bloque de transformación. Claramente, los 100 elementos se transmiten a cada uno de los bloques de transformación. Pero tan pronto como uno de los bloques de transformación termina, el bloque de procesador no acepta más elementos y, en cambio, el búfer de entrada del bloque de transformación incompleto simplemente vacía el búfer de entrada.
El problema aquí es que está configurando la propiedad PropagateCompletion
cada vez que llama al método LinkTo
para vincular los bloques y los diferentes tiempos de espera en sus bloques de transformación.
De la documentación para el método Complete
en la interfaz IDataflowBlock
(el énfasis es mío):
Señala al IDataflowBlock que no debe aceptar ni producir más mensajes ni consumir más mensajes pospuestos .
Debido a que escalona los tiempos de espera en cada una de las instancias de TransformBlock<TInput, TOutput>
, transformBlock2
(en espera de 20 ms) finaliza antes de transformBlock1
(en espera de 50 ms). transformBlock2
completa primero, y luego envía la señal a processorBlock
que luego dice "No acepto nada más" (y transformBlock1
aún no ha producido todos sus mensajes).
Tenga en cuenta que el procesamiento de transformBlock1
antes de transformBlock1
no está absolutamente garantizado; es factible que el grupo de subprocesos (suponiendo que está usando el programador predeterminado) procesará las tareas en un orden diferente (pero probablemente no lo hará, ya que robará el trabajo de las colas una vez que se realicen los elementos de 20 ms).
Su tubería se ve así:
broadcastBlock
/ /
transformBlock1 transformBlock2
/ /
processorBlock
Para solucionar esto, debes tener un canal que se vea así:
broadcastBlock
/ /
transformBlock1 transformBlock2
| |
processorBlock1 processorBlock2
Lo que se logra simplemente creando dos ActionBlock<TInput>
separadas de ActionBlock<TInput>
, así:
// The action, can be a method, makes it easier to share.
Action<string> a = i => Console.WriteLine(i);
// Create the processor blocks.
processorBlock1 = new ActionBlock<string>(a);
processorBlock2 = new ActionBlock<string>(a);
// Linking
broadCastBlock.LinkTo(transformBlock1,
new DataflowLinkOptions { PropagateCompletion = true });
broadCastBlock.LinkTo(transformBlock2,
new DataflowLinkOptions { PropagateCompletion = true });
transformBlock1.LinkTo(processorBlock1,
new DataflowLinkOptions { PropagateCompletion = true });
transformBlock2.LinkTo(processorBlock2,
new DataflowLinkOptions { PropagateCompletion = true });
Entonces necesitas esperar en ambos bloques de procesador en lugar de solo uno:
Task.WhenAll(processorBlock1.Completion, processorBlock2.Completion).Wait();
Una nota muy importante aquí; cuando se crea un ActionBlock<TInput>
, el valor predeterminado es tener la propiedad MaxDegreeOfParallelism
en la instancia ExecutionDataflowBlockOptions
MaxDegreeOfParallelism
establecida en uno.
Esto significa que las llamadas al delegado de Action<T>
que pasa al ActionBlock<TInput>
son seguras para subprocesos, solo una se ejecutará a la vez.
Como ahora tiene dos ActionBlock<TInput>
apuntan al mismo delegado de Action<T>
, no se le garantiza la seguridad de subprocesos.
Si su método es seguro para subprocesos, entonces no tiene que hacer nada (lo que le permitiría establecer la propiedad MaxDegreeOfParallelism
en DataflowBlockOptions.Unbounded
, ya que no hay razón para bloquear).
Si no es seguro para subprocesos y necesita garantizarlo, debe recurrir a primitivas de sincronización tradicionales, como la declaración de lock
.
En este caso, lo haría así (aunque claramente no es necesario, ya que el método WriteLine
en la clase Console
es seguro para subprocesos):
// The lock.
var l = new object();
// The action, can be a method, makes it easier to share.
Action<string> a = i => {
// Ensure one call at a time.
lock (l) Console.WriteLine(i);
};
// And so on...
El problema es exactamente lo que dijo CasperOne en su respuesta. Una vez que se completa el primer bloque de transformación, el bloque del procesador pasa al "modo de finalización": procesará los elementos restantes en su cola de entrada, pero no aceptará ningún elemento nuevo.
Sin embargo, hay una solución más simple que dividir el bloque del procesador en dos: no establezca PropagateCompletion
, sino que establezca la finalización del bloque del procesador manualmente cuando ambos bloques de transformación se completen:
Task.WhenAll(transformBlock1.Completion, transformBlock2.Completion)
.ContinueWith(_ => processorBlock.Complete());
Otras respuestas son bastante claras acerca de por qué PropagateCompletion = true desordena las cosas cuando un bloque tiene más de dos fuentes.
Para proporcionar una solución simple al problema, es posible que desee ver una biblioteca de código abierto DataflowEx que resuelve este tipo de problema con reglas de finalización más inteligentes integradas. (Utiliza el enlace de flujo de datos TPL internamente, pero admite una propagación de finalización compleja. La implementación parece similar a WhenAll, pero también controla la adición del enlace dinámico. Consulte Dataflow.RegisterDependency() y TaskEx.AwaitableWhenAll() para obtener información detallada).
Cambié ligeramente tu código para que todo funcione con DataflowEx:
public CompletionDemo1()
{
broadCaster = new BroadcastBlock<int>(
i =>
{
return i;
}).ToDataflow();
transformBlock1 = new TransformBlock<int, string>(
i =>
{
Console.WriteLine("1 input count: " + transformBlock1.InputCount);
Thread.Sleep(50);
return ("1_" + i);
});
transformBlock2 = new TransformBlock<int, string>(
i =>
{
Console.WriteLine("2 input count: " + transformBlock2.InputCount);
Thread.Sleep(20);
return ("2_" + i);
});
processor = new ActionBlock<string>(
i =>
{
Console.WriteLine(i);
}).ToDataflow();
/** rather than TPL linking
broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
transformBlock1.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
transformBlock2.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
**/
//Use DataflowEx linking
var transform1 = transformBlock1.ToDataflow();
var transform2 = transformBlock2.ToDataflow();
broadCaster.LinkTo(transform1);
broadCaster.LinkTo(transform2);
transform1.LinkTo(processor);
transform2.LinkTo(processor);
}
El código completo está here .
Descargo de responsabilidad: Soy el autor de DataflowEx, que se publica bajo la licencia MIT.
Una adición a la respuesta de svick: para ser consistente con el comportamiento que obtiene con la opción PropagateCompletion, también debe enviar excepciones en caso de que un bloque anterior haya fallado. Un método de extensión como el siguiente también se encarga de eso:
public static void CompleteWhenAll(this IDataflowBlock target, params IDataflowBlock[] sources) {
if (target == null) return;
if (sources.Length == 0) { target.Complete(); return; }
Task.Factory.ContinueWhenAll(
sources.Select(b => b.Completion).ToArray(),
tasks => {
var exceptions = (from t in tasks where t.IsFaulted select t.Exception).ToList();
if (exceptions.Count != 0) {
target.Fault(new AggregateException(exceptions));
} else {
target.Complete();
}
}
);
}