programming parallel net example async c# multithreading scheduled-tasks task-parallel-library pfx

c# - parallel - patrón de consumidor productor clásico mediante el bloqueo de la colección y las tareas.net 4 TPL



task factory c# (3)

Antes he usado un patrón que crea una especie de consumidor de colas "bajo demanda" (basado en el consumo de un ConcurrentQueue):

private void FireAndForget(Action fire) { _firedEvents.Enqueue(fire); lock (_taskLock) { if (_launcherTask == null) { _launcherTask = new Task(LaunchEvents); _launcherTask.ContinueWith(EventsComplete); _launcherTask.Start(); } } } private void LaunchEvents() { Action nextEvent; while (_firedEvents.TryDequeue(out nextEvent)) { if (_synchronized) { var syncEvent = nextEvent; _mediator._syncContext.Send(state => syncEvent(), null); } else { nextEvent(); } lock (_taskLock) { if (_firedEvents.Count == 0) { _launcherTask = null; break; } } } } private void EventsComplete(Task task) { if (task.IsFaulted && task.Exception != null) { // Do something with task Exception here } }

Por favor vea abajo el pseudo codigo

//Single or multiple Producers produce using below method void Produce(object itemToQueue) { concurrentQueue.enqueue(itemToQueue); consumerSignal.set; } //somewhere else we have started a consumer like this //we have only one consumer void StartConsumer() { while (!concurrentQueue.IsEmpty()) { if (concurrentQueue.TrydeQueue(out item)) { //long running processing of item } } consumerSignal.WaitOne(); }

¿Cómo puedo trasladar este patrón que he usado desde tiempos inmemoriales para usar las tareas creadas en la tarea y las nuevas funciones de señalización de la red 4. En otras palabras, si alguien escribiera este patrón utilizando la red 4, cómo se vería? El pseudo código está bien. Ya estoy usando .net 4 concurrentQueue como puedes ver. ¿Cómo puedo usar una tarea y posiblemente usar algún mecanismo de señalización más nuevo si es posible? Gracias

Solución a mi problema a continuación gracias a Jon / Dan. Dulce. Sin señalización manual ni tipo de bucle (true) o while (itemstoProcess) como los viejos tiempos

//Single or multiple Producers produce using below method void Produce(object itemToQueue) { blockingCollection.add(item); } //somewhere else we have started a consumer like this //this supports multiple consumers ! task(StartConsuming()).Start; void StartConsuming() { foreach (object item in blockingCollection.GetConsumingEnumerable()) { //long running processing of item } } cancellations are handled using cancel tokens


Deberías usar BlockingCollection<T> . Hay un ejemplo en la documentación.

Esa clase está diseñada específicamente para hacer esto trivial.


Tu segundo bloque de código se ve mejor. Pero, comenzar una Task y luego esperar inmediatamente no tiene sentido. Simplemente llame a Take y luego procese el artículo que se devuelve directamente en el hilo consumidor. Así es como debe hacerse el patrón productor-consumidor. Si cree que el procesamiento de los elementos de trabajo es lo suficientemente intensivo como para garantizar a más consumidores, entonces, con toda seguridad, inicie más consumidores. BlockingCollection es seguro para múltiples productores y múltiples consumidores.

public class YourCode { private BlockingCollection<object> queue = new BlockingCollection<object>(); public YourCode() { var thread = new Thread(StartConsuming); thread.IsBackground = true; thread.Start(); } public void Produce(object item) { queue.Add(item); } private void StartConsuming() { while (true) { object item = queue.Take(); // Add your code to process the item here. // Do not start another task or thread. } } }