c# multithreading blockingcollection

c# - Usando BlockingCollection<>: OperationCanceledException, ¿hay una mejor manera?



multithreading (4)

¿Qué tal el BlockingQueue que hice hace un tiempo?

http://apichange.codeplex.com/SourceControl/changeset/view/76c98b8c7311#ApiChange.Api%2fsrc%2fInfrastructure%2fBlockingQueue.cs

Debería hacerlo bien sin ninguna excepción. La cola actual simplemente cierra el evento cuando se desecha, lo que podría no ser lo que usted desea. Es posible que desee hacer un nulo y esperar hasta que se hayan procesado todos los elementos. Aparte de esto debe adaptarse a sus necesidades.

using System.Collections.Generic; using System.Collections; using System.Threading; using System; namespace ApiChange.Infrastructure { /// <summary> /// A blocking queue which supports end markers to signal that no more work is left by inserting /// a null reference. This constrains the queue to reference types only. /// </summary> /// <typeparam name="T"></typeparam> public class BlockingQueue<T> : IEnumerable<T>, IEnumerable, IDisposable where T : class { /// <summary> /// The queue used to store the elements /// </summary> private Queue<T> myQueue = new Queue<T>(); bool myAllItemsProcessed = false; ManualResetEvent myEmptyEvent = new ManualResetEvent(false); /// <summary> /// Deques an element from the queue and returns it. /// If the queue is empty the thread will block. If the queue is stopped it will immedieately /// return with null. /// </summary> /// <returns>An object of type T</returns> public T Dequeue() { if (myAllItemsProcessed) return null; lock (myQueue) { while (myQueue.Count == 0) { if(!Monitor.Wait(myQueue, 45)) { // dispatch any work which is not done yet if( myQueue.Count > 0 ) continue; } // finito if (myAllItemsProcessed) { return null; } } T result = myQueue.Dequeue(); if (result == null) { myAllItemsProcessed = true; myEmptyEvent.Set(); } return result; } } /// <summary> /// Releases the waiters by enqueuing a null reference which causes all waiters to be released. /// The will then get a null reference as queued element to signal that they should terminate. /// </summary> public void ReleaseWaiters() { Enqueue(null); } /// <summary> /// Waits the until empty. This does not mean that all items are already process. Only that /// the queue contains no more pending work. /// </summary> public void WaitUntilEmpty() { myEmptyEvent.WaitOne(); } /// <summary> /// Adds an element of type T to the queue. /// The consumer thread is notified (if waiting) /// </summary> /// <param name="data_in">An object of type T</param> public void Enqueue(T data_in) { lock (myQueue) { myQueue.Enqueue(data_in); Monitor.PulseAll(myQueue); } } /// <summary> /// Returns an IEnumerator of Type T for this queue /// </summary> /// <returns></returns> IEnumerator<T> IEnumerable<T>.GetEnumerator() { while (true) { T item = Dequeue(); if (item == null) break; else yield return item; } } /// <summary> /// Returns a untyped IEnumerator for this queue /// </summary> /// <returns></returns> IEnumerator IEnumerable.GetEnumerator() { return ((IEnumerable<T>)this).GetEnumerator(); } #region IDisposable Members /// <summary> /// Closes the EmptyEvent WaitHandle. /// </summary> public void Dispose() { myEmptyEvent.Close(); } #endregion } }

Estoy haciendo uso del tipo BlockingCollection<> (francamente excelente) para una aplicación de alto rendimiento y multiproceso.

Hay una gran cantidad de rendimiento a través de la colección y en el nivel micro es muy eficaz. Sin embargo, para cada ''lote'' siempre se terminará marcando el token de cancelación. Esto da como resultado que se lance una excepción en cualquier llamada Take espera. Eso está bien, pero me habría conformado con un valor de retorno o un parámetro de salida para señalarlo, porque a) las excepciones tienen una sobrecarga obvia yb) al depurar, no quiero desactivar manualmente la interrupción en la excepción para ese específico excepción.

La implementación parece intensa, y en teoría supongo que podría desmontar y recrear mi propia versión que no usó excepciones, pero ¿tal vez hay una forma menos compleja?

Podría agregar un objeto null (o, si no, un marcador de posición) a la colección para indicar que el proceso debería finalizar, sin embargo, también debe haber un medio para abortar bien, es decir, despertar a los hilos de espera y decirles de alguna manera que algo está sucediendo.

Entonces, ¿tipos alternativos de colección? Recrear mi propio? ¿Alguna forma de abusar de esta?

(Algún contexto: fui con BlockingCollection<> porque tiene una ventaja sobre el bloqueo manual alrededor de una Queue . Lo mejor que puedo decir es que el uso de primitivas de subprocesos es excelente y, en mi caso, unos pocos milisegundos aquí y allá y un núcleo óptimo es de uso crucial.)

Edición: Acabo de abrir una recompensa por esta. No creo que la respuesta de Anastasiosyal cubra la consulta que planteo en mi comentario al respecto . Sé que este es un problema difícil. ¿Alguien puede ayudar?


Como supongo que ya lo has hecho tú mismo, mirando la fuente reflejada de BlockingCollection parece desafortunadamente que cuando se pasa un CancelToken a BlockingCollection y se cancela, obtendrás la OperationCancelledException como se puede ver en la imagen de abajo (con un par de soluciones tras la imagen)

GetConsumingEnumerable invoca TryTakeWithNoTimeValidation en BlockingCollection, que a su vez genera esta excepción.

Solución # 1

Una posible estrategia sería, suponiendo que usted tenga más control sobre sus productores y sus consumidores, en lugar de pasar el token de cancelación a BlockingCollection, (que generará esta excepción), usted pasa el token de cancelación a sus productores y a sus consumidores.

Si sus productores no producen y sus consumidores no consumen, entonces ha cancelado la operación de manera efectiva sin presentar esta excepción y al pasar CancelaciónToken. Ninguno en su Bloqueo de recolección.

Casos especiales Cancelación cuando la BlockingCollection está en BoundedCapacity o en Empty

Productores bloqueados : los subprocesos productores se bloquearán cuando se alcance BoundedCapacity en BlockingCollection. Por lo tanto, cuando se intenta cancelar y BlockingCollection está en BoundedCapacity (lo que significa que sus consumidores no están bloqueados, pero los productores están bloqueados porque no pueden agregar ningún elemento adicional a la cola), entonces deberá permitir que se consuman elementos adicionales (uno para cada subproceso productor) que desbloqueará a los productores (porque están bloqueados al agregar al bloqueo de recolección) y, a su vez, permitirá que su lógica de cancelación se active en el lado del productor.

Consumidores bloqueados : cuando sus consumidores están bloqueados porque la cola está vacía, puede insertar una unidad de trabajo vacía (una para cada subproceso de consumidor) en la colección de Bloqueo para desbloquear los subprocesos del consumidor y permitir que su lógica de cancelación se active. El lado del consumidor.

Cuando hay elementos en la cola y no se ha alcanzado ningún límite, como BoundedCapacity o Empty, los subprocesos de productores y consumidores no deben bloquearse.

Solución # 2

Usando una unidad de cancelación de trabajo.

Cuando su solicitud necesita cancelarse, entonces sus productores (tal vez solo 1 productor será suficiente mientras que los otros simplemente cancelen la producción) producirán una unidad de trabajo de cancelación (podrían ser nulas como usted también menciona o alguna clase que implementa una interfaz de marcador). Cuando los consumidores consumen esta unidad de trabajo y detectan que en realidad es una unidad de cancelación de trabajo, se activa su lógica de cancelación. El número de unidades de cancelación de trabajo a producir debe ser igual al número de hilos de consumo.

Nuevamente, se necesita precaución cuando estamos cerca de BoundedCapacity, ya que podría ser una señal de que algunos de los productores están bloqueados. Dependiendo de la cantidad de productores / consumidores, podría tener un consumidor consumiendo hasta que todos los productores (excepto 1) hayan cerrado. Esto asegura que no haya productores persistentes alrededor. Cuando solo queda 1 productor, su último consumidor puede cerrar y el productor puede dejar de producir unidades de cancelación de trabajo.


Kieren,

Desde mi inspección, personalmente no conozco ningún tipo de hilo seguro para el patrón ProducerConsumer que haga exactamente lo que usted quería. No reclamo esto como una solución competitiva, pero le propongo que decore BlockingCollection<T> con pocos extension method que le dará la libertad de suministrar cualquier tipo integrado o personalizado en lugar de CancellationToken predeterminado.

Nivel 1:

A continuación se muestra la lista de métodos predeterminados que utilizan el método TryAddWithNoTimeValidation subyacente para agregar a la cola.

public void Add(T item){ this.TryAddWithNoTimeValidation(item, -1, new CancellationToken()); } public void Add(T item, CancellationToken cancellationToken){ this.TryAddWithNoTimeValidation(item, -1, cancellationToken); } public bool TryAdd(T item){ return this.TryAddWithNoTimeValidation(item, 0, new CancellationToken()); } public bool TryAdd(T item, TimeSpan timeout){ BlockingCollection<T>.ValidateTimeout(timeout); return this.TryAddWithNoTimeValidation(item, (int) timeout.TotalMilliseconds, new CancellationToken()); } public bool TryAdd(T item, int millisecondsTimeout){ BlockingCollection<T>.ValidateMillisecondsTimeout(millisecondsTimeout); return this.TryAddWithNoTimeValidation(item, millisecondsTimeout, new CancellationToken()); } public bool TryAdd(T item, int millisecondsTimeout, CancellationToken cancellationToken){ BlockingCollection<T>.ValidateMillisecondsTimeout(millisecondsTimeout); return this.TryAddWithNoTimeValidation(item, millisecondsTimeout, cancellationToken); }

Ahora puede proporcionar una extensión para cualquiera / todos los métodos que le interesen.

Etapa 2:

Ahora remite su implementación de TryAddWithNoTimeValidation lugar de la predeterminada.

Puedo darle una versión alternativa de TryAddWithNoTimeValidation que continúa de forma segura sin lanzar OperationCancellation excepción OperationCancellation .


Puede indicar el final de un lote estableciendo una marca en el último elemento (agregue una propiedad bool IsLastItem o envuélvala). O puede enviar un nulo como último elemento (aunque no estoy seguro si un nulo atraviesa la selección de bloqueo correctamente).

Si puede eliminar la necesidad del concepto de ''lote'', puede crear un hilo adicional para Tomar () y Procesar continuamente los datos de su colección de bloqueo y no hacer nada más.