name method documentacion comentarios c# task-parallel-library tpl-dataflow

c# - method - Implementando la finalización correcta de un bloque recuperable



param name c# (2)

Teaser : chicos, esta pregunta no se trata de cómo implementar la política de reintento. Se trata de completar correctamente un bloque TPL Dataflow.

Esta pregunta es principalmente una continuación de mi pregunta anterior Reintentar política dentro de ITargetBlock . La respuesta a esta pregunta fue la solución inteligente de @ svick que utiliza TransformBlock (fuente) y TransformManyBlock (objetivo). El único problema que queda es completar este bloque de una manera correcta : espere a que todos los intentos se completen primero y luego complete el bloque de destino. Aquí es a lo que terminé (es solo un fragmento, no le preste demasiada atención a un conjunto de retries no son seguros para la rosca):

var retries = new HashSet<RetryingMessage<TInput>>(); TransformManyBlock<RetryableMessage<TInput>, TOutput> target = null; target = new TransformManyBlock<RetryableMessage<TInput>, TOutput>( async message => { try { var result = new[] { await transform(message.Data) }; retries.Remove(message); return result; } catch (Exception ex) { message.Exceptions.Add(ex); if (message.RetriesRemaining == 0) { if (failureHandler != null) failureHandler(message.Exceptions); retries.Remove(message); } else { retries.Add(message); message.RetriesRemaining--; Task.Delay(retryDelay) .ContinueWith(_ => target.Post(message)); } return null; } }, dataflowBlockOptions); source.LinkTo(target); source.Completion.ContinueWith(async _ => { while (target.InputCount > 0 || retries.Any()) await Task.Delay(100); target.Complete(); });

La idea es realizar algún tipo de sondeo y verificar si todavía hay mensajes que esperan ser procesados ​​y no hay mensajes que requieran reintentar. Pero en esta solución no me gusta la idea de votar.

Sí, puedo encapsular la lógica de agregar / eliminar reintentos en una clase separada, e incluso, por ejemplo, realizar alguna acción cuando el conjunto de reintentos queda vacío, pero ¿cómo tratar la condición target.InputCount > 0 ? No hay una devolución de llamada que se llame cuando no hay mensajes pendientes para el bloque, por lo que parece que verificar target.ItemCount en un bucle con un pequeño retraso es una opción única.

¿Alguien sabe una forma más inteligente de lograr esto?


La combinación de la respuesta de hwcverwe y el comentario de JamieSee podría ser la solución ideal.

Primero, necesitas crear más de un evento:

var signal = new ManualResetEvent(false); var completedEvent = new ManualResetEvent(false);

Luego, debe crear un observador y suscribirse a TransformManyBlock , de modo que reciba una notificación cuando ocurra un evento relevante:

var observer = new RetryingBlockObserver<TOutput>(completedEvent); var observable = target.AsObservable(); observable.Subscribe(observer);

Lo observable puede ser bastante fácil:

private class RetryingBlockObserver<T> : IObserver<T> { private ManualResetEvent completedEvent; public RetryingBlockObserver(ManualResetEvent completedEvent) { this.completedEvent = completedEvent; } public void OnCompleted() { completedEvent.Set(); } public void OnError(Exception error) { //TODO } public void OnNext(T value) { //TODO } }

Y puede esperar la señal o la finalización (agotamiento de todos los elementos de origen), o ambos

source.Completion.ContinueWith(async _ => { WaitHandle.WaitAll(completedEvent, signal); // Or WaitHandle.WaitAny, depending on your needs! target.Complete(); });

Puede inspeccionar el valor del resultado de WaitAll para comprender qué evento se configuró y reaccionar en consecuencia. También puede agregar otros eventos al código, pasándolos al observador, para que pueda configurarlos cuando sea necesario. Puede diferenciar su comportamiento y responder de manera diferente cuando se produce un error, por ejemplo


Tal vez un ManualResetEvent puede hacer el truco para usted.

Agregar una propiedad pública a TransformManyBlock

private ManualResetEvent _signal = new ManualResetEvent(false); public ManualResetEvent Signal { get { return _signal; } }

Y aquí tienes:

var retries = new HashSet<RetryingMessage<TInput>>(); TransformManyBlock<RetryableMessage<TInput>, TOutput> target = null; target = new TransformManyBlock<RetryableMessage<TInput>, TOutput>( async message => { try { var result = new[] { await transform(message.Data) }; retries.Remove(message); // Sets the state of the event to signaled, allowing one or more waiting threads to proceed if(!retries.Any()) Signal.Set(); return result; } catch (Exception ex) { message.Exceptions.Add(ex); if (message.RetriesRemaining == 0) { if (failureHandler != null) failureHandler(message.Exceptions); retries.Remove(message); // Sets the state of the event to signaled, allowing one or more waiting threads to proceed if(!retries.Any()) Signal.Set(); } else { retries.Add(message); message.RetriesRemaining--; Task.Delay(retryDelay) .ContinueWith(_ => target.Post(message)); } return null; } }, dataflowBlockOptions); source.LinkTo(target); source.Completion.ContinueWith(async _ => { //Blocks the current thread until the current WaitHandle receives a signal. target.Signal.WaitOne(); target.Complete(); });

No estoy seguro de dónde está configurado su target.InputCount . Entonces, en el lugar donde cambias target.InputCount puedes agregar el siguiente código:

if(InputCount == 0) Signal.Set();