c# - programming - BufferBlock deadlock con OutputAvailableAsync después de TryReceiveAll
task factory c# (2)
Este es un error en SourceCore
utilizado internamente por BufferBlock
. Su método TryReceiveAll
no _enableOffering
el miembro de datos booleanos TryReceive
mientras que TryReceive
sí TryReceive
hace. Eso da como resultado que la tarea devuelta por OutputAvailableAsync
nunca se complete.
Aquí hay una reproducción mínima:
var buffer = new BufferBlock<object>();
buffer.Post(null);
IList<object> items;
buffer.TryReceiveAll(out items);
var outputAvailableAsync = buffer.OutputAvailableAsync();
buffer.Post(null);
await outputAvailableAsync; // Never completes
Acabo de solucionarlo en el repositorio de .Net con esta solicitud de extracción . Esperemos que la solución se encuentre en el paquete nuget pronto.
Mientras trabajaba en una respuesta a esta pregunta , escribí este fragmento:
var buffer = new BufferBlock<object>();
var producer = Task.Run(async () =>
{
while (true)
{
await Task.Delay(TimeSpan.FromMilliseconds(100));
buffer.Post(null);
Console.WriteLine("Post " + buffer.Count);
}
});
var consumer = Task.Run(async () =>
{
while (await buffer.OutputAvailableAsync())
{
IList<object> items;
buffer.TryReceiveAll(out items);
Console.WriteLine("TryReceiveAll " + buffer.Count);
}
});
await Task.WhenAll(consumer, producer);
El productor debe publicar elementos en el búfer cada 100 ms y el consumidor debe eliminar todos los elementos del búfer y esperar asincrónicamente a que aparezcan más elementos.
Lo que realmente sucede es que el productor borra todos los elementos una vez, y luego nunca más se mueve más allá de OutputAvailableAsync
. Si cambio al consumidor para eliminar elementos uno por uno, funciona como excepción:
while (await buffer.OutputAvailableAsync())
{
object item;
while (buffer.TryReceive(out item)) ;
}
¿Estoy malinterpretando algo? Si no, ¿cuál es el problema?
Por desgracia, es a finales de septiembre de 2015 y, aunque solucionó el error, no se solucionó en la versión que se publicó dos días después de que se solucionó el error: Microsoft TPL Dataflow versión 4.5.24.
Sin embargo, IReceivableSourceBlock.TryReceive (...) funciona correctamente. Un método de extensión resolverá el problema. Después de una nueva versión de TPL Dataflow, será fácil cambiar el método de extensión.
/// <summary>
/// This extension method returns all available items in the IReceivableSourceBlock
/// or an empty sequence if nothing is available. The functin does not wait.
/// </summary>
/// <typeparam name="T">The type of items stored in the IReceivableSourceBlock</typeparam>
/// <param name="buffer">the source where the items should be extracted from </param>
/// <returns>The IList with the received items. Empty if no items were available</returns>
public static IList<T> TryReceiveAllEx<T>(this IReceivableSourceBlock<T> buffer)
{
/* Microsoft TPL Dataflow version 4.5.24 contains a bug in TryReceiveAll
* Hence this function uses TryReceive until nothing is available anymore
* */
IList<T> receivedItems = new List<T>();
T receivedItem = default(T);
while (buffer.TryReceive<T>(out receivedItem))
{
receivedItems.Add(receivedItem);
}
return receivedItems;
}
uso:
while (await this.bufferBlock.OutputAvailableAsync())
{
// some data available
var receivedItems = this.bufferBlock.TryReceiveAllEx();
if (receivedItems.Any())
{
ProcessReceivedItems(bufferBlock);
}
}