tareas syllabus ingles basado aprendizaje abt c# asynchronous queue async-await .net-4.5

c# - syllabus - Cola agilizada basada en tareas



aprendizaje basado en tareas en ingles (5)

Aquí está la implementación que estoy usando actualmente.

public class MessageQueue<T> { ConcurrentQueue<T> queue = new ConcurrentQueue<T>(); ConcurrentQueue<TaskCompletionSource<T>> waitingQueue = new ConcurrentQueue<TaskCompletionSource<T>>(); object queueSyncLock = new object(); public void Enqueue(T item) { queue.Enqueue(item); ProcessQueues(); } public async Task<T> DequeueAsync(CancellationToken ct) { TaskCompletionSource<T> tcs = new TaskCompletionSource<T>(); ct.Register(() => { lock (queueSyncLock) { tcs.TrySetCanceled(); } }); waitingQueue.Enqueue(tcs); ProcessQueues(); return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task; } private void ProcessQueues() { TaskCompletionSource<T> tcs = null; T firstItem = default(T); lock (queueSyncLock) { while (true) { if (waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem)) { waitingQueue.TryDequeue(out tcs); if (tcs.Task.IsCanceled) { continue; } queue.TryDequeue(out firstItem); } else { break; } tcs.SetResult(firstItem); } } } }

Funciona bastante bien, pero hay mucha controversia en queueSyncLock , ya que estoy haciendo un gran uso del CancellationToken para cancelar algunas de las tareas de espera. Por supuesto, esto conduce a un bloqueo considerablemente menor que vería con un BlockingCollection pero ...

Me pregunto si existe un medio más suave y sin bloqueos para lograr el mismo fin

Me pregunto si existe una implementación / envoltura para ConcurrentQueue , similar a BlockingCollection donde tomar de la colección no bloquea, sino que es asíncrono y provocará una asincronía hasta que se coloque un elemento en la cola.

He creado mi propia implementación, pero parece que no está funcionando como se esperaba. Me pregunto si estoy reinventando algo que ya existe.

Aquí está mi implementación:

public class MessageQueue<T> { ConcurrentQueue<T> queue = new ConcurrentQueue<T>(); ConcurrentQueue<TaskCompletionSource<T>> waitingQueue = new ConcurrentQueue<TaskCompletionSource<T>>(); object queueSyncLock = new object(); public void Enqueue(T item) { queue.Enqueue(item); ProcessQueues(); } public async Task<T> Dequeue() { TaskCompletionSource<T> tcs = new TaskCompletionSource<T>(); waitingQueue.Enqueue(tcs); ProcessQueues(); return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task; } private void ProcessQueues() { TaskCompletionSource<T> tcs=null; T firstItem=default(T); while (true) { bool ok; lock (queueSyncLock) { ok = waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem); if (ok) { waitingQueue.TryDequeue(out tcs); queue.TryDequeue(out firstItem); } } if (!ok) break; tcs.SetResult(firstItem); } } }


Mi atempt (tiene un evento levantado cuando se crea una "promesa", y puede ser utilizado por un productor externo para saber cuándo producir más elementos):

public class AsyncQueue<T> { private ConcurrentQueue<T> _bufferQueue; private ConcurrentQueue<TaskCompletionSource<T>> _promisesQueue; private object _syncRoot = new object(); public AsyncQueue() { _bufferQueue = new ConcurrentQueue<T>(); _promisesQueue = new ConcurrentQueue<TaskCompletionSource<T>>(); } /// <summary> /// Enqueues the specified item. /// </summary> /// <param name="item">The item.</param> public void Enqueue(T item) { TaskCompletionSource<T> promise; do { if (_promisesQueue.TryDequeue(out promise) && !promise.Task.IsCanceled && promise.TrySetResult(item)) { return; } } while (promise != null); lock (_syncRoot) { if (_promisesQueue.TryDequeue(out promise) && !promise.Task.IsCanceled && promise.TrySetResult(item)) { return; } _bufferQueue.Enqueue(item); } } /// <summary> /// Dequeues the asynchronous. /// </summary> /// <param name="cancellationToken">The cancellation token.</param> /// <returns></returns> public Task<T> DequeueAsync(CancellationToken cancellationToken) { T item; if (!_bufferQueue.TryDequeue(out item)) { lock (_syncRoot) { if (!_bufferQueue.TryDequeue(out item)) { var promise = new TaskCompletionSource<T>(); cancellationToken.Register(() => promise.TrySetCanceled()); _promisesQueue.Enqueue(promise); this.PromiseAdded.RaiseEvent(this, EventArgs.Empty); return promise.Task; } } } return Task.FromResult(item); } /// <summary> /// Gets a value indicating whether this instance has promises. /// </summary> /// <value> /// <c>true</c> if this instance has promises; otherwise, <c>false</c>. /// </value> public bool HasPromises { get { return _promisesQueue.Where(p => !p.Task.IsCanceled).Count() > 0; } } /// <summary> /// Occurs when a new promise /// is generated by the queue /// </summary> public event EventHandler PromiseAdded; }


No sé de una solución sin bloqueo, pero puede echar un vistazo a la nueva biblioteca de Dataflow , que forma parte de Async CTP . Un simple BufferBlock<T> debería ser suficiente, por ejemplo:

BufferBlock<int> buffer = new BufferBlock<int>();

La producción y el consumo se realizan más fácilmente a través de métodos de extensión en los tipos de bloques de flujo de datos.

La producción es tan simple como:

buffer.Post(13);

y el consumo está listo para sincronizar:

int item = await buffer.ReceiveAsync();

Te recomiendo que uses Dataflow si es posible; hacer que ese buffer sea eficiente y correcto es más difícil de lo que parece.


Puede ser exagerado para su caso de uso (dada la curva de aprendizaje), pero las Extensiones reactivas proporcionan todo el pegamento que podría desear para la composición asincrónica.

En esencia, se suscribe a los cambios y se le envían a medida que están disponibles, y puede hacer que el sistema envíe los cambios a una secuencia separada.


Solo podría usar un BlockingCollection (usando el ConcurrentQueue predeterminado) y ajustar la llamada a Take in a Task para que pueda await :

var bc = new BlockingCollection<T>(); T element = await Task.Run( () => bc.Take() );