programming parallel net examples example asp c# .net parallel-processing task-parallel-library

c# - parallel - No ConcurrentList<T> en.Net 4.0?



task c# example (10)

Estaba encantado de ver el nuevo espacio de nombres de System.Collections.Concurrent en .Net 4.0, ¡bastante bueno! He visto ConcurrentDictionary , ConcurrentQueue , ConcurrentStack , ConcurrentBag y BlockingCollection .

Una cosa que parece estar misteriosamente ausente es una ConcurrentList<T> . ¿Debo escribir eso yo mismo (o sacarlo de la web :))?

¿Me estoy perdiendo algo obvio aquí?


¿Para qué usarías una ConcurrentList?

El concepto de contenedor de acceso aleatorio en un mundo enhebrado no es tan útil como puede parecer. La declaración

if (i < MyConcurrentList.Count) x = MyConcurrentList[i];

como un todo, aún no estaría seguro para subprocesos.

En lugar de crear una Lista concurrente, intente construir soluciones con lo que hay allí. Las clases más comunes son el ConcurrentBag y especialmente el BlockingCollection.


Al ejecutar el código secuencialmente, las estructuras de datos utilizadas son diferentes (bien escritas) al ejecutar código simultáneamente. La razón es que el código secuencial implica un orden implícito. Sin embargo, el código concurrente no implica ningún orden; ¡mejor aún, implica la falta de un orden definido!

Debido a esto, las estructuras de datos con un orden implícito (como la Lista) no son muy útiles para resolver problemas concurrentes. Una lista implica orden, pero no define claramente qué es ese orden. Debido a esto, el orden de ejecución del código que manipula la lista determinará (hasta cierto punto) el orden implícito de la lista, que está en conflicto directo con una solución concurrente eficiente.

Recuerde que la concurrencia es un problema de datos, no un problema de código. No puede Implementar el código primero (o reescribir el código secuencial existente) y obtener una solución simultánea bien diseñada. Primero debe diseñar las estructuras de datos, teniendo en cuenta que el ordenamiento implícito no existe en un sistema concurrente.


Algunas personas destacaron algunos puntos de los bienes (y algunos de mis pensamientos):

  • Podría parecer una locura al acceso aleatorio no permitido (indizador) pero a mí me parece bien. Solo debe pensar que hay muchos métodos en las colecciones de subprocesos múltiples que pueden fallar como Indizador y Eliminar. También podría definir la acción de falla (retroceso) para el acceso de escritura como "error" o simplemente "agregar al final".
  • No es porque sea una colección multiproceso que siempre se usará en un contexto multiproceso. O también podría ser utilizado por un solo escritor y un lector.
  • Otra forma de poder utilizar el indexador de manera segura podría ser envolver acciones en un bloqueo de la colección utilizando su raíz (si se hace pública).
  • Para muchas personas, hacer un rootLock visible va contra la "buena práctica". No estoy 100% seguro de este punto porque si está oculto, se elimina mucha flexibilidad para el usuario. Siempre debemos recordar que programar multiproceso no es para nadie. No podemos evitar todo tipo de uso incorrecto.
  • Microsoft tendrá que trabajar y definir un nuevo estándar para introducir el uso adecuado de la colección de subprocesos múltiples. Primero, el IEnumerator no debería tener un moveNext, pero debería tener un GetNext que devuelva verdadero o falso y obtener un parámetro de tipo T (de esta manera la iteración ya no estaría bloqueando). Además, Microsoft ya usa "usar" internamente en el foreach pero a veces usa el IEnumerator directamente sin envolverlo con "usar" (un error en la vista de colección y probablemente en más lugares) - El uso de IEnumerator como envoltorio es una práctica recomendada por Microsoft. Este error elimina el buen potencial para el iterador seguro ... Iterator que bloquea la recopilación en el constructor y desbloquea su método Dispose - para un método foreach de bloqueo.

Esa no es una respuesta. Esto solo son comentarios que realmente no se ajustan a un lugar específico.

... Mi conclusión, Microsoft tiene que hacer algunos cambios profundos al "foreach" para hacer que la colección MultiThreaded sea más fácil de usar. También debe seguir sus propias reglas de uso de IEnumerator. Hasta eso, podemos escribir un MultiThreadList fácilmente que usaría un iterador de bloqueo pero que no seguirá "IList". En su lugar, deberá definir su propia interfaz "IListPersonnal" que podría fallar en "insertar", "eliminar" y acceso aleatorio (indexador) sin excepción. ¿Pero quién querrá usarlo si no es estándar?


Con todo el respeto debido a las excelentes respuestas que ya se proporcionaron, hay momentos en los que simplemente quiero un IList seguro para subprocesos. Nada avanzado o elegante. El rendimiento es importante en muchos casos, pero a veces eso no es una preocupación. Sí, siempre habrá desafíos sin métodos como "TryGetValue", etc., pero en la mayoría de los casos solo quiero algo que pueda enumerar sin tener que preocuparme por poner bloqueos alrededor de todo. Y sí, alguien probablemente encuentre algún "error" en mi implementación que podría llevar a un punto muerto o algo así (supongo) pero seamos honestos: cuando se trata de multi-threading, si no escribe su código correctamente, se va a estancamiento de todos modos. Con esto en mente, decidí hacer una implementación simple de ConcurrentList que proporciona estas necesidades básicas.

Y por lo que vale: hice una prueba básica de agregar 10,000,000 de artículos a la Lista regular y a la Lista concurrente y los resultados fueron:

Lista terminada en: 7793 milisegundos. Concurrente terminado en: 8064 milisegundos.

public class ConcurrentList<T> : IList<T>, IDisposable { #region Fields private readonly List<T> _list; private readonly ReaderWriterLockSlim _lock; #endregion #region Constructors public ConcurrentList() { this._lock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion); this._list = new List<T>(); } public ConcurrentList(int capacity) { this._lock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion); this._list = new List<T>(capacity); } public ConcurrentList(IEnumerable<T> items) { this._lock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion); this._list = new List<T>(items); } #endregion #region Methods public void Add(T item) { try { this._lock.EnterWriteLock(); this._list.Add(item); } finally { this._lock.ExitWriteLock(); } } public void Insert(int index, T item) { try { this._lock.EnterWriteLock(); this._list.Insert(index, item); } finally { this._lock.ExitWriteLock(); } } public bool Remove(T item) { try { this._lock.EnterWriteLock(); return this._list.Remove(item); } finally { this._lock.ExitWriteLock(); } } public void RemoveAt(int index) { try { this._lock.EnterWriteLock(); this._list.RemoveAt(index); } finally { this._lock.ExitWriteLock(); } } public int IndexOf(T item) { try { this._lock.EnterReadLock(); return this._list.IndexOf(item); } finally { this._lock.ExitReadLock(); } } public void Clear() { try { this._lock.EnterWriteLock(); this._list.Clear(); } finally { this._lock.ExitWriteLock(); } } public bool Contains(T item) { try { this._lock.EnterReadLock(); return this._list.Contains(item); } finally { this._lock.ExitReadLock(); } } public void CopyTo(T[] array, int arrayIndex) { try { this._lock.EnterReadLock(); this._list.CopyTo(array, arrayIndex); } finally { this._lock.ExitReadLock(); } } public IEnumerator<T> GetEnumerator() { return new ConcurrentEnumerator<T>(this._list, this._lock); } IEnumerator IEnumerable.GetEnumerator() { return new ConcurrentEnumerator<T>(this._list, this._lock); } ~ConcurrentList() { this.Dispose(false); } public void Dispose() { this.Dispose(true); } private void Dispose(bool disposing) { if (disposing) GC.SuppressFinalize(this); this._lock.Dispose(); } #endregion #region Properties public T this[int index] { get { try { this._lock.EnterReadLock(); return this._list[index]; } finally { this._lock.ExitReadLock(); } } set { try { this._lock.EnterWriteLock(); this._list[index] = value; } finally { this._lock.ExitWriteLock(); } } } public int Count { get { try { this._lock.EnterReadLock(); return this._list.Count; } finally { this._lock.ExitReadLock(); } } } public bool IsReadOnly { get { return false; } } #endregion } public class ConcurrentEnumerator<T> : IEnumerator<T> { #region Fields private readonly IEnumerator<T> _inner; private readonly ReaderWriterLockSlim _lock; #endregion #region Constructor public ConcurrentEnumerator(IEnumerable<T> inner, ReaderWriterLockSlim @lock) { this._lock = @lock; this._lock.EnterReadLock(); this._inner = inner.GetEnumerator(); } #endregion #region Methods public bool MoveNext() { return _inner.MoveNext(); } public void Reset() { _inner.Reset(); } public void Dispose() { this._lock.ExitReadLock(); } #endregion #region Properties public T Current { get { return _inner.Current; } } object IEnumerator.Current { get { return _inner.Current; } } #endregion }


En los casos en que las lecturas exceden en gran medida a las escrituras, o las escrituras (por frecuentes que sean) no coincidentes , puede ser apropiado un enfoque de copiado en escritura .

La implementación que se muestra a continuación es

  • sin cerradura
  • increíblemente rápido para lecturas simultáneas , incluso mientras las modificaciones concurrentes están en curso, sin importar cuánto tiempo tomen
  • porque las "instantáneas" son inmutables, la atomicidad sin cerradura es posible, es decir var snap = _list; snap[snap.Count - 1]; var snap = _list; snap[snap.Count - 1]; nunca (bueno, excepto por una lista vacía del curso) throw, y también obtendrá una enumeración segura de subprocesos con semántica de instantáneas de forma gratuita ... ¡cómo ME ENCANTÓ la inmutabilidad!
  • implementado genéricamente , aplicable a cualquier estructura de datos y cualquier tipo de modificación
  • muerto simple , es decir, fácil de probar, depurar, verificar leyendo el código
  • utilizable en .Net 3.5

Para que el copy-on-write funcione, debe mantener sus estructuras de datos efectivamente inmutables , es decir, nadie puede cambiarlas una vez que las haya puesto a disposición de otros hilos. Cuando quiera modificar, usted

  1. clonar la estructura
  2. hacer modificaciones en el clon
  3. intercambiar atómicamente en la referencia al clon modificado

Código

static class CopyOnWriteSwapper { public static void Swap<T>(ref T obj, Func<T, T> cloner, Action<T> op) where T : class { while (true) { var objBefore = Volatile.Read(ref obj); var newObj = cloner(objBefore); op(newObj); if (Interlocked.CompareExchange(ref obj, newObj, objBefore) == objBefore) return; } } }

Uso

CopyOnWriteSwapper.Swap(ref _myList, orig => new List<string>(orig), clone => clone.Add("asdf"));

Si necesita más rendimiento, ayudará a desgenerizar el método, por ejemplo, crear un método para cada tipo de modificación (Agregar, Eliminar, ...) que desee, y codificar los punteros de la función cloner y op .

NB # 1 Es su responsabilidad asegurarse de que nadie modifique la estructura de datos (supuestamente) inmutable. No hay nada que podamos hacer en una implementación genérica para evitar eso, pero cuando se especializa en List<T> , puede evitar modificaciones utilizando List.AsReadOnly()

NB # 2 Tenga cuidado con los valores en la lista. El enfoque de copiar al escribir protege la membresía de su lista solamente, pero si no coloca cadenas, sino otros objetos mutables allí, debe cuidar la seguridad del hilo (por ejemplo, bloquear). Pero eso es ortogonal a esta solución y, por ejemplo, el bloqueo de los valores mutables se puede usar fácilmente sin problemas. Solo debes ser consciente de ello.

NB # 3 Si su estructura de datos es enorme y la modifica con frecuencia, el enfoque de copiar-todo-escribir podría ser prohibitivo tanto en términos de consumo de memoria como del costo de copia de la CPU. En ese caso, es posible que desee utilizar Colecciones Inmutables de MS en su lugar.


Implementé uno similar al de Brian''s . El mío es diferente:

  • Administro la matriz directamente.
  • No entro en los bloqueos dentro del bloque try.
  • Yo uso el yield return para producir un enumerador.
  • Apoyo la recursion de bloqueo Esto permite lecturas de la lista durante la iteración.
  • Uso bloqueos de lectura actualizables siempre que sea posible.
  • DoSync métodos DoSync y GetSync permiten interacciones secuenciales que requieren acceso exclusivo a la lista.

El código :

public class ConcurrentList<T> : IList<T>, IDisposable { private ReaderWriterLockSlim _lock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion); private int _count = 0; public int Count { get { _lock.EnterReadLock(); try { return _count; } finally { _lock.ExitReadLock(); } } } public int InternalArrayLength { get { _lock.EnterReadLock(); try { return _arr.Length; } finally { _lock.ExitReadLock(); } } } private T[] _arr; public ConcurrentList(int initialCapacity) { _arr = new T[initialCapacity]; } public ConcurrentList():this(4) { } public ConcurrentList(IEnumerable<T> items) { _arr = items.ToArray(); _count = _arr.Length; } public void Add(T item) { _lock.EnterWriteLock(); try { var newCount = _count + 1; EnsureCapacity(newCount); _arr[_count] = item; _count = newCount; } finally { _lock.ExitWriteLock(); } } public void AddRange(IEnumerable<T> items) { if (items == null) throw new ArgumentNullException("items"); _lock.EnterWriteLock(); try { var arr = items as T[] ?? items.ToArray(); var newCount = _count + arr.Length; EnsureCapacity(newCount); Array.Copy(arr, 0, _arr, _count, arr.Length); _count = newCount; } finally { _lock.ExitWriteLock(); } } private void EnsureCapacity(int capacity) { if (_arr.Length >= capacity) return; int doubled; checked { try { doubled = _arr.Length * 2; } catch (OverflowException) { doubled = int.MaxValue; } } var newLength = Math.Max(doubled, capacity); Array.Resize(ref _arr, newLength); } public bool Remove(T item) { _lock.EnterUpgradeableReadLock(); try { var i = IndexOfInternal(item); if (i == -1) return false; _lock.EnterWriteLock(); try { RemoveAtInternal(i); return true; } finally { _lock.ExitWriteLock(); } } finally { _lock.ExitUpgradeableReadLock(); } } public IEnumerator<T> GetEnumerator() { _lock.EnterReadLock(); try { for (int i = 0; i < _count; i++) // deadlocking potential mitigated by lock recursion enforcement yield return _arr[i]; } finally { _lock.ExitReadLock(); } } IEnumerator IEnumerable.GetEnumerator() { return this.GetEnumerator(); } public int IndexOf(T item) { _lock.EnterReadLock(); try { return IndexOfInternal(item); } finally { _lock.ExitReadLock(); } } private int IndexOfInternal(T item) { return Array.FindIndex(_arr, 0, _count, x => x.Equals(item)); } public void Insert(int index, T item) { _lock.EnterUpgradeableReadLock(); try { if (index > _count) throw new ArgumentOutOfRangeException("index"); _lock.EnterWriteLock(); try { var newCount = _count + 1; EnsureCapacity(newCount); // shift everything right by one, starting at index Array.Copy(_arr, index, _arr, index + 1, _count - index); // insert _arr[index] = item; _count = newCount; } finally { _lock.ExitWriteLock(); } } finally { _lock.ExitUpgradeableReadLock(); } } public void RemoveAt(int index) { _lock.EnterUpgradeableReadLock(); try { if (index >= _count) throw new ArgumentOutOfRangeException("index"); _lock.EnterWriteLock(); try { RemoveAtInternal(index); } finally { _lock.ExitWriteLock(); } } finally { _lock.ExitUpgradeableReadLock(); } } private void RemoveAtInternal(int index) { Array.Copy(_arr, index + 1, _arr, index, _count - index-1); _count--; // release last element Array.Clear(_arr, _count, 1); } public void Clear() { _lock.EnterWriteLock(); try { Array.Clear(_arr, 0, _count); _count = 0; } finally { _lock.ExitWriteLock(); } } public bool Contains(T item) { _lock.EnterReadLock(); try { return IndexOfInternal(item) != -1; } finally { _lock.ExitReadLock(); } } public void CopyTo(T[] array, int arrayIndex) { _lock.EnterReadLock(); try { if(_count > array.Length - arrayIndex) throw new ArgumentException("Destination array was not long enough."); Array.Copy(_arr, 0, array, arrayIndex, _count); } finally { _lock.ExitReadLock(); } } public bool IsReadOnly { get { return false; } } public T this[int index] { get { _lock.EnterReadLock(); try { if (index >= _count) throw new ArgumentOutOfRangeException("index"); return _arr[index]; } finally { _lock.ExitReadLock(); } } set { _lock.EnterUpgradeableReadLock(); try { if (index >= _count) throw new ArgumentOutOfRangeException("index"); _lock.EnterWriteLock(); try { _arr[index] = value; } finally { _lock.ExitWriteLock(); } } finally { _lock.ExitUpgradeableReadLock(); } } } public void DoSync(Action<ConcurrentList<T>> action) { GetSync(l => { action(l); return 0; }); } public TResult GetSync<TResult>(Func<ConcurrentList<T>,TResult> func) { _lock.EnterWriteLock(); try { return func(this); } finally { _lock.ExitWriteLock(); } } public void Dispose() { _lock.Dispose(); } }


La razón por la cual no existe una Lista concurrente es porque fundamentalmente no se puede escribir. La razón es que varias operaciones importantes en IList dependen de índices, y eso simplemente no funcionará. Por ejemplo:

int catIndex = list.IndexOf("cat"); list.Insert(catIndex, "dog");

El efecto que persigue el autor es insertar "perro" antes que "gato", pero en un entorno multiproceso, cualquier cosa puede pasar a la lista entre esas dos líneas de código. Por ejemplo, otro hilo podría hacer una list.RemoveAt(0) , desplazando toda la lista hacia la izquierda, pero lo más importante es que catIndex no cambiará. El impacto aquí es que la operación de Insert pondrá al "perro" detrás del gato, no antes.

Las diversas implementaciones que ve como "respuestas" a esta pregunta son buenas, pero como se muestra arriba, no ofrecen resultados confiables. Si realmente quieres una semántica similar a una lista en un entorno multiproceso, no puedes lograrlo colocando bloqueos dentro de los métodos de implementación de la lista. Debe asegurarse de que cualquier índice que utilice viva por completo dentro del contexto del bloqueo. El resultado es que puede usar una Lista en un entorno multiproceso con el bloqueo correcto, pero no se puede hacer que la lista exista en ese mundo.

Si cree que necesita una lista concurrente, en realidad solo hay dos posibilidades:

  1. Lo que realmente necesitas es un ConcurrentBag
  2. Necesita crear su propia colección, quizás implementada con una Lista y su propio control de concurrencia.

Si tiene un ConcurrentBag y está en una posición donde necesita pasarlo como un IList, entonces tiene un problema, porque el método que está llamando ha especificado que podrían intentar hacer algo como lo hice anteriormente con el gato y perro. En la mayoría de los mundos, lo que eso significa es que el método que está llamando simplemente no está diseñado para funcionar en un entorno de subprocesos múltiples. Eso significa que o lo refactoriza para que lo sea o, si no puede, tendrá que manejarlo con mucho cuidado. Es casi seguro que se le pedirá que cree su propia colección con sus propios bloqueos y llame al método ofensivo dentro de un candado.


Lo intenté hace un tiempo (también: en GitHub ). Mi implementación tuvo algunos problemas, que no entraré aquí. Déjame decirte, más importante, lo que aprendí.

En primer lugar, no hay manera de que obtenga una implementación completa de IList<T> que no tiene cerrojo y es seguro para subprocesos. En particular, las inserciones y eliminaciones aleatorias no van a funcionar, a menos que también se olvide de O (1) acceso aleatorio (es decir, a menos que "haga trampa" y simplemente use algún tipo de lista vinculada y deje que la indexación se chupe).

Lo que pensé que podría valer la pena era un subconjunto limitado de subprocesos de IList<T> : en particular, uno que permitiera Add y proporcionar acceso aleatorio de solo lectura por índice (pero sin Insert , RemoveAt , etc., y también sin acceso de escritura aleatorio).

Este fue el objetivo de mi implementación de ConcurrentList<T> . Pero cuando probé su rendimiento en escenarios multiproceso, descubrí que simplemente sincronizar las adiciones a una List<T> era más rápido . Básicamente, agregar una List<T> es muy rápido; la complejidad de los pasos computacionales involucrados es minúscula (incrementar un índice y asignarlo a un elemento en una matriz; eso es realmente ). Necesitaría una tonelada de escrituras concurrentes para ver cualquier tipo de contención de bloqueo en esto; e incluso entonces, el rendimiento promedio de cada escritura aún superaría la implementación más costosa aunque sin bloqueos en ConcurrentList<T> .

En el caso relativamente raro de que la matriz interna de la lista necesite redimensionarse, paga un pequeño costo. Finalmente, llegué a la conclusión de que este era el escenario de nicho en el que tendría sentido un tipo de colección ConcurrentList<T> solo agregado: cuando desea una baja sobrecarga garantizada para agregar un elemento en cada llamada individual (a diferencia de un objetivo de rendimiento amortizado) )

Simplemente no es una clase tan útil como se podría pensar.


ConcurrentList (como una matriz redimensionable, no una lista vinculada) no es fácil de escribir con operaciones sin bloqueo. Su API no se traduce bien en una versión "concurrente".


System.Collections.Generic.List<t> ya es seguro para múltiples lectores. Tratar de que sea seguro para múltiples escritores no tendría sentido. (Por las razones que Henk y Stephen ya mencionaron)