thread safe lists concurrentbag concurrent c# collections properties thread-safety

c# - lists - Propiedad Thread-safe List<T>



concurrentbag c# (10)

Aquí está la clase que solicitó:

namespace AI.Collections { using System; using System.Collections; using System.Collections.Generic; using System.Linq; using System.Runtime.Serialization; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow; /// <summary> /// Just a simple thread safe collection. /// </summary> /// <typeparam name="T"></typeparam> /// <value>Version 1.5</value> /// <remarks>TODO replace locks with AsyncLocks</remarks> [DataContract( IsReference = true )] public class ThreadSafeList<T> : IList<T> { /// <summary> /// TODO replace the locks with a ReaderWriterLockSlim /// </summary> [DataMember] private readonly List<T> _items = new List<T>(); public ThreadSafeList( IEnumerable<T> items = null ) { this.Add( items ); } public long LongCount { get { lock ( this._items ) { return this._items.LongCount(); } } } public IEnumerator<T> GetEnumerator() { return this.Clone().GetEnumerator(); } IEnumerator IEnumerable.GetEnumerator() { return this.GetEnumerator(); } public void Add( T item ) { if ( Equals( default( T ), item ) ) { return; } lock ( this._items ) { this._items.Add( item ); } } public Boolean TryAdd( T item ) { try { if ( Equals( default( T ), item ) ) { return false; } lock ( this._items ) { this._items.Add( item ); return true; } } catch ( NullReferenceException ) { } catch ( ObjectDisposedException ) { } catch ( ArgumentNullException ) { } catch ( ArgumentOutOfRangeException ) { } catch ( ArgumentException ) { } return false; } public void Clear() { lock ( this._items ) { this._items.Clear(); } } public bool Contains( T item ) { lock ( this._items ) { return this._items.Contains( item ); } } public void CopyTo( T[] array, int arrayIndex ) { lock ( this._items ) { this._items.CopyTo( array, arrayIndex ); } } public bool Remove( T item ) { lock ( this._items ) { return this._items.Remove( item ); } } public int Count { get { lock ( this._items ) { return this._items.Count; } } } public bool IsReadOnly { get { return false; } } public int IndexOf( T item ) { lock ( this._items ) { return this._items.IndexOf( item ); } } public void Insert( int index, T item ) { lock ( this._items ) { this._items.Insert( index, item ); } } public void RemoveAt( int index ) { lock ( this._items ) { this._items.RemoveAt( index ); } } public T this[ int index ] { get { lock ( this._items ) { return this._items[ index ]; } } set { lock ( this._items ) { this._items[ index ] = value; } } } /// <summary> /// Add in an enumerable of items. /// </summary> /// <param name="collection"></param> /// <param name="asParallel"></param> public void Add( IEnumerable<T> collection, Boolean asParallel = true ) { if ( collection == null ) { return; } lock ( this._items ) { this._items.AddRange( asParallel ? collection.AsParallel().Where( arg => !Equals( default( T ), arg ) ) : collection.Where( arg => !Equals( default( T ), arg ) ) ); } } public Task AddAsync( T item ) { return Task.Factory.StartNew( () => { this.TryAdd( item ); } ); } /// <summary> /// Add in an enumerable of items. /// </summary> /// <param name="collection"></param> public Task AddAsync( IEnumerable<T> collection ) { if ( collection == null ) { throw new ArgumentNullException( "collection" ); } var produce = new TransformBlock<T, T>( item => item, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = Environment.ProcessorCount } ); var consume = new ActionBlock<T>( action: async obj => await this.AddAsync( obj ), dataflowBlockOptions: new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = Environment.ProcessorCount } ); produce.LinkTo( consume ); return Task.Factory.StartNew( async () => { collection.AsParallel().ForAll( item => produce.SendAsync( item ) ); produce.Complete(); await consume.Completion; } ); } /// <summary> /// Returns a new copy of all items in the <see cref="List{T}" />. /// </summary> /// <returns></returns> public List<T> Clone( Boolean asParallel = true ) { lock ( this._items ) { return asParallel ? new List<T>( this._items.AsParallel() ) : new List<T>( this._items ); } } /// <summary> /// Perform the <paramref name="action" /> on each item in the list. /// </summary> /// <param name="action"> /// <paramref name="action" /> to perform on each item. /// </param> /// <param name="performActionOnClones"> /// If true, the <paramref name="action" /> will be performed on a <see cref="Clone" /> of the items. /// </param> /// <param name="asParallel"> /// Use the <see cref="ParallelQuery{TSource}" /> method. /// </param> /// <param name="inParallel"> /// Use the /// <see /// cref="Parallel.ForEach{TSource}(System.Collections.Generic.IEnumerable{TSource},System.Action{TSource})" /> /// method. /// </param> public void ForEach( Action<T> action, Boolean performActionOnClones = true, Boolean asParallel = true, Boolean inParallel = false ) { if ( action == null ) { throw new ArgumentNullException( "action" ); } var wrapper = new Action<T>( obj => { try { action( obj ); } catch ( ArgumentNullException ) { //if a null gets into the list then swallow an ArgumentNullException so we can continue adding } } ); if ( performActionOnClones ) { var clones = this.Clone( asParallel: asParallel ); if ( asParallel ) { clones.AsParallel().ForAll( wrapper ); } else if ( inParallel ) { Parallel.ForEach( clones, wrapper ); } else { clones.ForEach( wrapper ); } } else { lock ( this._items ) { if ( asParallel ) { this._items.AsParallel().ForAll( wrapper ); } else if ( inParallel ) { Parallel.ForEach( this._items, wrapper ); } else { this._items.ForEach( wrapper ); } } } } /// <summary> /// Perform the <paramref name="action" /> on each item in the list. /// </summary> /// <param name="action"> /// <paramref name="action" /> to perform on each item. /// </param> /// <param name="performActionOnClones"> /// If true, the <paramref name="action" /> will be performed on a <see cref="Clone" /> of the items. /// </param> /// <param name="asParallel"> /// Use the <see cref="ParallelQuery{TSource}" /> method. /// </param> /// <param name="inParallel"> /// Use the /// <see /// cref="Parallel.ForEach{TSource}(System.Collections.Generic.IEnumerable{TSource},System.Action{TSource})" /> /// method. /// </param> public void ForAll( Action<T> action, Boolean performActionOnClones = true, Boolean asParallel = true, Boolean inParallel = false ) { if ( action == null ) { throw new ArgumentNullException( "action" ); } var wrapper = new Action<T>( obj => { try { action( obj ); } catch ( ArgumentNullException ) { //if a null gets into the list then swallow an ArgumentNullException so we can continue adding } } ); if ( performActionOnClones ) { var clones = this.Clone( asParallel: asParallel ); if ( asParallel ) { clones.AsParallel().ForAll( wrapper ); } else if ( inParallel ) { Parallel.ForEach( clones, wrapper ); } else { clones.ForEach( wrapper ); } } else { lock ( this._items ) { if ( asParallel ) { this._items.AsParallel().ForAll( wrapper ); } else if ( inParallel ) { Parallel.ForEach( this._items, wrapper ); } else { this._items.ForEach( wrapper ); } } } } } }

Quiero una implementación de List<T> como una propiedad que se puede utilizar de forma segura sin ninguna duda.

Algo como esto:

private List<T> _list; private List<T> MyT { get { // return a copy of _list; } set { _list = value; } }

Parece que todavía necesito devolver una copia (clonada) de la colección, de modo que si en algún lugar estamos iterando la colección y, al mismo tiempo, se establece la colección, no se genera ninguna excepción.

¿Cómo implementar una propiedad de recolección segura para subprocesos?


Básicamente, si desea enumerar de manera segura, necesita usar el bloqueo.

Por favor refiérase a MSDN en esto. http://msdn.microsoft.com/en-us/library/6sh2ey19.aspx

Aquí hay una parte de MSDN que podría interesarle:

Los miembros públicos estáticos (Shared en Visual Basic) de este tipo son seguros para subprocesos. No se garantiza que ningún miembro de instancia sea seguro para subprocesos.

Una lista puede admitir varios lectores al mismo tiempo, siempre que la colección no se modifique. Enumerar a través de una colección no es intrínsecamente un procedimiento seguro para subprocesos. En el raro caso en que una enumeración contenga uno o más accesos de escritura, la única forma de garantizar la seguridad de la secuencia es bloquear la colección durante toda la enumeración. Para permitir que la colección sea accedida por múltiples hilos para lectura y escritura, debe implementar su propia sincronización.


Creo que _list.ToList() te hará una copia. También puede consultarlo si lo necesita, como por ejemplo:

_list.Select("query here").ToList();

De todos modos, msdn dice que esto es de hecho una copia y no simplemente una referencia. Ah, y sí, tendrás que bloquear el método set como los otros lo han señalado.


Incluso cuando obtuvo la mayor cantidad de votos, uno usualmente no puede tomar System.Collections.Concurrent.ConcurrentBag<T> como un reemplazo seguro de subprocesos para System.Collections.Generic.List<T> tal como está (Radek Stromský ya lo señaló) fuera) no ordenado.

Pero hay una clase llamada System.Collections.Generic.SynchronizedCollection<T> que ya está en .NET 3.0 como parte del framework, pero está así bien escondida en una ubicación donde no se espera que sea poco conocida y probablemente nunca te has tropezado con eso (al menos nunca lo hice).

SynchronizedCollection<T> se compila en el ensamblado System.ServiceModel.dll (que es parte del perfil del cliente pero no de la biblioteca de clases portátil).

Espero que ayude.


Incluso la respuesta aceptada es ConcurrentBag, no creo que sea un reemplazo real de la lista en todos los casos, como dice Radek en la respuesta: "ConcurrentBag es una colección desordenada, por lo que a diferencia de List no garantiza el orden. Tampoco se puede acceder a los elementos por índice ".

Por lo tanto, si usa .NET 4.0 o una versión posterior, una solución alternativa podría ser utilizar ConcurrentDictionary con TKey entero como índice de matriz y TValue como valor de matriz. Esta es una forma recomendada de reemplazar la lista en el curso C # Concurrent Collections de Pluralsight. ConcurrentDictionary resuelve ambos problemas mencionados anteriormente: acceso y ordenación de índices (no podemos confiar en los pedidos ya que su tabla hash está bajo el capó, pero la implementación actual de .NET ahorra el orden de adición de elementos).


Puedes usar:

var threadSafeArrayList = ArrayList.Synchronized(new ArrayList());

crear Thread Safe ArrayLsit


Si está orientando .Net 4, hay algunas opciones en System.Collections.Concurrent Namespace

Puede usar ConcurrentBag<T> en este caso en lugar de List<T>


También puedes usar el más primitivo

Monitor.Enter(lock); Monitor.Exit(lock);

qué bloqueo usa (ver esta publicación C # Bloqueo de un objeto que se reasigna en el bloque de bloqueo ).

Si espera excepciones en el código, esto no es seguro, pero le permite hacer algo como lo siguiente:

using System; using System.Collections.Generic; using System.Threading; using System.Linq; public class Something { private readonly object _lock; private readonly List<string> _contents; public Something() { _lock = new object(); _contents = new List<string>(); } public Modifier StartModifying() { return new Modifier(this); } public class Modifier : IDisposable { private readonly Something _thing; public Modifier(Something thing) { _thing = thing; Monitor.Enter(Lock); } public void OneOfLotsOfDifferentOperations(string input) { DoSomethingWith(input); } private void DoSomethingWith(string input) { Contents.Add(input); } private List<string> Contents { get { return _thing._contents; } } private object Lock { get { return _thing._lock; } } public void Dispose() { Monitor.Exit(Lock); } } } public class Caller { public void Use(Something thing) { using (var modifier = thing.StartModifying()) { modifier.OneOfLotsOfDifferentOperations("A"); modifier.OneOfLotsOfDifferentOperations("B"); modifier.OneOfLotsOfDifferentOperations("A"); modifier.OneOfLotsOfDifferentOperations("A"); modifier.OneOfLotsOfDifferentOperations("A"); } } }

Una de las cosas buenas de esto es que obtendrá el bloqueo durante la serie de operaciones (en lugar de bloquear cada operación). Lo que significa que la salida debería salir en los fragmentos correctos (mi uso de esto fue obtener algún resultado en la pantalla de un proceso externo)

Realmente me gusta la simplicidad + transparencia de ThreadSafeList + que hace lo importante para detener los bloqueos


Use la instrucción de lock para hacer esto. ( Lea aquí para más información ) .

private List<T> _list; private List<T> MyT { get { return _list; } set { //Lock so only one thread can change the value at any given time. lock (_list) { _list = value; } } }

Para tu información, probablemente no es exactamente lo que estás preguntando: es probable que quieras bloquear tu código, pero no puedo asumirlo. Eche un vistazo a la palabra clave de lock y adapte su uso a su situación específica.

Si es necesario, puede lock tanto el bloque get como el set usando la variable _list , lo que lo haría para que una lectura / escritura no pueda ocurrir al mismo tiempo.


Yo pensaría que hacer una clase ThreadSafeList de muestra sería fácil:

public class ThreadSafeList<T> : IList<T> { protected List<T> _interalList = new List<T>(); // Other Elements of IList implementation public IEnumerator<T> GetEnumerator() { return Clone().GetEnumerator(); } System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() { return Clone().GetEnumerator(); } protected static object _lock = new object(); public List<T> Clone() { List<T> newList = new List<T>(); lock (_lock) { _interalList.ForEach(x => newList.Add(x)); } return newList; } }

Simplemente clone la lista antes de solicitar un enumerador y, por lo tanto, cualquier enumeración funciona en una copia que no se puede modificar mientras se ejecuta.