c# - Aparente raza/error BufferBlock.Post/Receive/ReceiveAsync
task-parallel-library async-await (1)
Stephen parece pensar que la siguiente es la solución
var m = await messageQueue.ReceiveAsync ();
en lugar de:
var m = await messageQueue.ReceiveAsync (TimeSpan.FromSeconds (30));
¿Puedes confirmar o negar esto?
publicado en forma cruzada en http://social.msdn.microsoft.com/Forums/en-US/tpldataflow/thread/89b3f71d-3777-4fad-9c11-50d8dc81a4a9
Lo sé ... Realmente no estoy usando TplDataflow a su máximo potencial. ATM Estoy simplemente usando BufferBlock
como una cola segura para la transmisión de mensajes, donde el productor y el consumidor se ejecutan a diferentes velocidades. Estoy viendo un comportamiento extraño que me deja perplejo en cuanto a cómo proceder.
private BufferBlock<object> messageQueue = new BufferBlock<object>();
public void Send(object message)
{
var accepted=messageQueue.Post(message);
logger.Info("Send message was called qlen = {0} accepted={1}",
messageQueue.Count,accepted);
}
public async Task<object> GetMessageAsync()
{
try
{
var m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30));
//despite messageQueue.Count>0 next line
//occasionally does not execute
logger.Info("message received");
//.......
}
catch(TimeoutException)
{
//do something
}
}
En el código anterior (que es parte de una solución distribuida de 2000 líneas), Send
se está llamando periódicamente cada 100 ms aproximadamente. Esto significa que un elemento se messageQueue
en messageQueue
alrededor de 10 veces por segundo. Esto es verificado Sin embargo, ocasionalmente parece que ReceiveAsync
no se completa dentro del tiempo de espera (es decir, el Post
no está causando que ReceiveAsync
complete) y TimeoutException
se TimeoutException
después de 30 TimeoutException
. En este punto, messageQueue.Count
está en los cientos. Esto es inesperado. Este problema también se ha observado a velocidades de publicación más lentas (1 publicación / segundo) y generalmente ocurre antes de que 1000 elementos pasen por BufferBlock
.
Por lo tanto, para evitar este problema, estoy usando el siguiente código, que funciona, pero ocasionalmente causa 1 latencia al recibir (debido al error que se produce arriba)
public async Task<object> GetMessageAsync()
{
try
{
object m;
var attempts = 0;
for (; ; )
{
try
{
m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(1));
}
catch (TimeoutException)
{
attempts++;
if (attempts >= 30) throw;
continue;
}
break;
}
logger.Info("message received");
//.......
}
catch(TimeoutException)
{
//do something
}
}
Esto parece una condición de carrera en TDF para mí, pero no puedo llegar al fondo de por qué esto no ocurre en los otros lugares donde utilizo BufferBlock
de manera similar. Cambiar experimentalmente de ReceiveAsync
a Receive
no ayuda. No lo he comprobado, pero imagino que de manera aislada, el código anterior funciona perfectamente. Es un patrón que he visto documentado en "Introducción a TPL Dataflow" tpldataflow.docx .
¿Qué puedo hacer para llegar al fondo de esto? ¿Hay alguna métrica que pueda ayudar a inferir lo que está sucediendo? Si no puedo crear un caso de prueba confiable, ¿qué más información puedo ofrecer?
¡Ayuda!