c# task-parallel-library async-await dataflow tpl-dataflow

c# - Aparente raza/error BufferBlock.Post/Receive/ReceiveAsync



task-parallel-library async-await (1)

Stephen parece pensar que la siguiente es la solución

var m = await messageQueue.ReceiveAsync ();

en lugar de:

var m = await messageQueue.ReceiveAsync (TimeSpan.FromSeconds (30));

¿Puedes confirmar o negar esto?

publicado en forma cruzada en http://social.msdn.microsoft.com/Forums/en-US/tpldataflow/thread/89b3f71d-3777-4fad-9c11-50d8dc81a4a9

Lo sé ... Realmente no estoy usando TplDataflow a su máximo potencial. ATM Estoy simplemente usando BufferBlock como una cola segura para la transmisión de mensajes, donde el productor y el consumidor se ejecutan a diferentes velocidades. Estoy viendo un comportamiento extraño que me deja perplejo en cuanto a cómo proceder.

private BufferBlock<object> messageQueue = new BufferBlock<object>(); public void Send(object message) { var accepted=messageQueue.Post(message); logger.Info("Send message was called qlen = {0} accepted={1}", messageQueue.Count,accepted); } public async Task<object> GetMessageAsync() { try { var m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30)); //despite messageQueue.Count>0 next line //occasionally does not execute logger.Info("message received"); //....... } catch(TimeoutException) { //do something } }

En el código anterior (que es parte de una solución distribuida de 2000 líneas), Send se está llamando periódicamente cada 100 ms aproximadamente. Esto significa que un elemento se messageQueue en messageQueue alrededor de 10 veces por segundo. Esto es verificado Sin embargo, ocasionalmente parece que ReceiveAsync no se completa dentro del tiempo de espera (es decir, el Post no está causando que ReceiveAsync complete) y TimeoutException se TimeoutException después de 30 TimeoutException . En este punto, messageQueue.Count está en los cientos. Esto es inesperado. Este problema también se ha observado a velocidades de publicación más lentas (1 publicación / segundo) y generalmente ocurre antes de que 1000 elementos pasen por BufferBlock .

Por lo tanto, para evitar este problema, estoy usando el siguiente código, que funciona, pero ocasionalmente causa 1 latencia al recibir (debido al error que se produce arriba)

public async Task<object> GetMessageAsync() { try { object m; var attempts = 0; for (; ; ) { try { m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(1)); } catch (TimeoutException) { attempts++; if (attempts >= 30) throw; continue; } break; } logger.Info("message received"); //....... } catch(TimeoutException) { //do something } }

Esto parece una condición de carrera en TDF para mí, pero no puedo llegar al fondo de por qué esto no ocurre en los otros lugares donde utilizo BufferBlock de manera similar. Cambiar experimentalmente de ReceiveAsync a Receive no ayuda. No lo he comprobado, pero imagino que de manera aislada, el código anterior funciona perfectamente. Es un patrón que he visto documentado en "Introducción a TPL Dataflow" tpldataflow.docx .

¿Qué puedo hacer para llegar al fondo de esto? ¿Hay alguna métrica que pueda ayudar a inferir lo que está sucediendo? Si no puedo crear un caso de prueba confiable, ¿qué más información puedo ofrecer?

¡Ayuda!