thread safe example concurrentbag concurrent .net collections thread-safety blocking objectpool

.net - example - thread safe list c#



Rendimiento de BlockingCollection(T) (2)

Durante un tiempo en mi empresa, hemos utilizado una implementación de ObjectPool<T> local que proporciona acceso bloqueado a sus contenidos. Es bastante sencillo: una Queue<T> , un object para bloquear, y un AutoResetEvent para señalar a un hilo de "préstamo" cuando se agrega un elemento.

La carne de la clase son realmente estos dos métodos:

public T Borrow() { lock (_queueLock) { if (_queue.Count > 0) return _queue.Dequeue(); } _objectAvailableEvent.WaitOne(); return Borrow(); } public void Return(T obj) { lock (_queueLock) { _queue.Enqueue(obj); } _objectAvailableEvent.Set(); }

Hemos estado usando esta y algunas otras clases de colección en lugar de las proporcionadas por System.Collections.Concurrent porque estamos usando .NET 3.5, no 4.0. Pero recientemente descubrimos que, dado que estamos usando Extensiones reactivas , en realidad tenemos el espacio de nombres Concurrent disponible para nosotros (en System.Threading.dll).

Naturalmente, pensé que dado que BlockingCollection<T> es una de las clases principales en el espacio de nombres Concurrent , probablemente ofrecería un mejor rendimiento que cualquier cosa que yo o mis compañeros de equipo hayan escrito.

Así que intenté escribir una nueva implementación que funciona de manera muy simple:

public T Borrow() { return _blockingCollection.Take(); } public void Return(T obj) { _blockingCollection.Add(obj); }

Para mi sorpresa, de acuerdo con algunas pruebas simples (pedir prestado / regresar al grupo unas cuantas veces desde múltiples subprocesos), nuestra implementación original supera significativamente a BlockingCollection<T> en términos de rendimiento . Ambos parecen funcionar correctamente ; es solo que nuestra implementación original parece ser mucho más rápida.

Mi pregunta:

  1. ¿Por qué sería esto? ¿Es quizás porque BlockingCollection<T> ofrece una mayor flexibilidad (entiendo que funciona envolviendo un IProducerConsumerCollection<T> ), que necesariamente introduce la sobrecarga de rendimiento?
  2. ¿Es esto solo un uso erróneo de la clase BlockingCollection<T> ?
  3. Si este es un uso apropiado de BlockingCollection<T> , ¿simplemente no lo uso correctamente? Por ejemplo, ¿es el enfoque de Take / Add demasiado simplista, y hay una manera mucho mejor de obtener la misma funcionalidad?

A menos que alguien tenga alguna idea que ofrecer en respuesta a esa tercera pregunta, parece que nos quedaremos con nuestra implementación original por ahora.


Hay un par de posibilidades potenciales, aquí.

En primer lugar, BlockingCollection<T> en las extensiones reactivas es un backport, y no es exactamente lo mismo que la versión final de .NET 4. No me sorprendería si el rendimiento de este backport difiera de .NET 4 RTM (aunque no he perfilado esta colección, específicamente). Gran parte de la TPL funciona mejor en .NET 4 que en el backport .NET 3.5.

Dicho esto, sospecho que su implementación superará a BlockingCollection<T> si tiene un solo subproceso productor y un único subproceso de consumidor. Con un productor y un consumidor, su bloqueo tendrá un impacto menor en el rendimiento total, y el evento de reinicio es un medio muy eficaz de esperar del lado del consumidor.

Sin embargo, BlockingCollection<T> está diseñado para permitir que muchos subprocesos de productores "pongan en cola" muy bien los datos. Esto no funcionará bien con su implementación, ya que la contención de bloqueo comenzará a ser problemática con bastante rapidez.

Dicho esto, también me gustaría señalar una idea errónea aquí:

... probablemente ofrecería un mejor rendimiento que cualquier cosa que yo o mis compañeros escribiéramos.

Esto a menudo no es cierto. Las clases de recopilación de marcos generalmente funcionan muy bien , pero a menudo no son la opción más eficaz para un escenario determinado. Dicho esto, tienden a tener un buen desempeño, a la vez que son muy flexibles y muy robustos. A menudo tienden a escalar muy bien. Las clases de colección "escritas por el hogar" a menudo superan las colecciones de marcos en escenarios específicos, pero tienden a ser problemáticas cuando se usan en escenarios fuera del escenario para el cual fueron diseñadas específicamente. Sospecho que esta es una de esas situaciones.


ConurrentQueue/AutoResetEvent BlockingCollection contra un combo ConurrentQueue/AutoResetEvent (similar a la solución de OP, pero sin bloqueo) en .Net 4, y el último combo fue mucho más rápido para mi caso de uso, que abandoné BlockingCollection. Desafortunadamente, esto fue hace casi un año y no pude encontrar los resultados de referencia.

Usar un AutoResetEvent separado no hace que las cosas sean mucho más complicadas. De hecho, uno podría incluso abstraerlo, de una vez por todas, en un BlockingCollectionSlim ...

BlockingCollection también se basa internamente en un ConcurrentQueue, pero does algunos malabares adicionales con semáforos delgados y tokens de cancelación , lo que proporciona características adicionales, pero a un costo, incluso cuando no se usa. También se debe tener en cuenta que BlockingCollection no está casado con ConcurrentQueue, sino que también se puede usar con otros implementadores de IProducerConsumerCollection .

Una implementación de BlockingCollectionSlim ilimitada, bastante básica:

class BlockingCollectionSlim<T> { private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>(); private readonly AutoResetEvent _autoResetEvent = new AutoResetEvent(false); public void Add(T item) { _queue.Enqueue(item); _autoResetEvent.Set(); } public bool TryPeek(out T result) { return _queue.TryPeek(out result); } public T Take() { T item; while (!_queue.TryDequeue(out item)) _autoResetEvent.WaitOne(); return item; } public bool TryTake(out T item, TimeSpan patience) { if (_queue.TryDequeue(out item)) return true; var stopwatch = Stopwatch.StartNew(); while (stopwatch.Elapsed < patience) { if (_queue.TryDequeue(out item)) return true; var patienceLeft = (patience - stopwatch.Elapsed); if (patienceLeft <= TimeSpan.Zero) break; else if (patienceLeft < MinWait) // otherwise the while loop will degenerate into a busy loop, // for the last millisecond before patience runs out patienceLeft = MinWait; _autoResetEvent.WaitOne(patienceLeft); } return false; } private static readonly TimeSpan MinWait = TimeSpan.FromMilliseconds(1);