thread safe current concurrentbag concurrent collection c# multithreading queue concurrent-collections

c# - safe - Cómo trabajar en subprocesos con ConcurrentQueue<T>



thread safe dictionary c# (3)

Estoy tratando de averiguar cuál será la mejor manera de trabajar con una cola. Tengo un proceso que devuelve un DataTable. Cada DataTable, a su vez, se combina con el DataTable anterior. Hay un problema, demasiados registros para mantener hasta la última edición de BulkCopy (OutOfMemory).

Por lo tanto, he determinado que debo procesar cada DataTable entrante inmediatamente. Pensando en el ConcurrentQueue<T> ... pero no veo cómo el método WriteQueuedData() sabría quitar la cola de una tabla y escribirla en la base de datos.

Por ejemplo:

public class TableTransporter { private ConcurrentQueue<DataTable> tableQueue = new ConcurrentQueue<DataTable>(); public TableTransporter() { tableQueue.OnItemQueued += new EventHandler(WriteQueuedData); // no events available } public void ExtractData() { DataTable table; // perform data extraction tableQueue.Enqueue(table); } private void WriteQueuedData(object sender, EventArgs e) { BulkCopy(e.Table); } }

Mi primera pregunta es que, aparte del hecho de que no tengo ningún evento al que suscribirme, si llamo ExtractData() forma asíncrona, ¿será esto todo lo que necesito? En segundo lugar, ¿hay algo que me esté perdiendo de la forma en que funciona ConcurrentQueue<T> y necesito algún tipo de activador para trabajar de forma asíncrona con los objetos en cola?

Actualización Acabo de derivar una clase de ConcurrentQueue<T> que tiene un controlador de eventos OnItemQueued. Entonces:

new public void Enqueue (DataTable Table) { base.Enqueue(Table); OnTableQueued(new TableQueuedEventArgs(Table)); } public void OnTableQueued(TableQueuedEventArgs table) { EventHandler<TableQueuedEventArgs> handler = TableQueued; if (handler != null) { handler(this, table); } }

¿Alguna preocupación sobre esta implementación?


Creo que ConcurrentQueue es útil solo en muy pocos casos. Su principal ventaja es que es libre de bloqueo. Sin embargo, normalmente los subprocesos de producción tienen que informar a los subprocesos de los consumidores de alguna manera que hay datos disponibles para procesar. Esta señalización entre hilos necesita bloqueos y niega el beneficio de usar ConcurrentQueue . La forma más rápida de sincronizar hilos es usando Monitor.Pulse() , que funciona solo dentro de un bloqueo. Todas las demás herramientas de sincronización son incluso más lentas.

Por supuesto, el consumidor solo puede verificar continuamente si hay algo en la cola, que funciona sin bloqueos, pero es una enorme pérdida de recursos del procesador. Un poco mejor es si el consumidor espera entre la comprobación.

Subir un hilo al escribir en la cola es una muy mala idea. El uso de ConcurrentQueue para guardar, tal vez 1 microsegundo, se perderá completamente al ejecutar el eventhandler , lo que puede llevar 1000 veces más.

Si todo el procesamiento se realiza en un controlador de eventos o una llamada asíncrona, la pregunta es ¿por qué todavía se necesita una cola? Mejor pase los datos directamente al manejador y no use una cola en absoluto.

Tenga en cuenta que la implementación de ConcurrentQueue es bastante complicada para permitir la concurrencia. En la mayoría de los casos, es mejor usar una Queue<> normal Queue<> y bloquear todos los accesos a la cola. Dado que el acceso a la cola solo necesita microsegundos, es extremadamente improbable que 2 subprocesos accedan a la cola en el mismo microsegundo y casi nunca habrá demora debido al bloqueo. El uso de una Queue<> normal Queue<> con bloqueo a menudo resultará en una ejecución de código más rápida que ConcurrentQueue .


Desde mi comprensión del problema, te faltan algunas cosas.

La cola concurrente es una estructura de datos diseñada para aceptar múltiples subprocesos que se leen y escriben en la cola sin que tenga que bloquear explícitamente la estructura de datos. (Todo lo que el jazz se ocupa detrás de escena o la colección se implementa de tal manera que no es necesario bloquearla).

Con eso en mente, parece que el patrón que está tratando de usar es el "Producir / Consumidor". Primero, tiene algunas tareas que producen trabajo (y que agrega elementos a la cola). Y segundo, tienes una segunda tarea. Consumir cosas de la cola (y elementos de la cola).

Así que realmente quieres dos hilos: uno que agrega elementos y otro que elimina elementos. Debido a que está utilizando una colección concurrente, puede tener varios subprocesos que agreguen elementos y múltiples subprocesos que los eliminen. Pero obviamente, mientras más contención tenga en la cola concurrente, más rápido se convertirá en el cuello de botella.


Esta es la solución completa para lo que se me ocurrió:

public class TableTransporter { private static int _indexer; private CustomQueue tableQueue = new CustomQueue(); private Func<DataTable, String> RunPostProcess; private string filename; public TableTransporter() { RunPostProcess = new Func<DataTable, String>(SerializeTable); tableQueue.TableQueued += new EventHandler<TableQueuedEventArgs>(tableQueue_TableQueued); } void tableQueue_TableQueued(object sender, TableQueuedEventArgs e) { // do something with table // I can''t figure out is how to pass custom object in 3rd parameter RunPostProcess.BeginInvoke(e.Table,new AsyncCallback(PostComplete), filename); } public void ExtractData() { // perform data extraction tableQueue.Enqueue(MakeTable()); Console.WriteLine("Table count [{0}]", tableQueue.Count); } private DataTable MakeTable() { return new DataTable(String.Format("Table{0}", _indexer++)); } private string SerializeTable(DataTable Table) { string file = Table.TableName + ".xml"; DataSet dataSet = new DataSet(Table.TableName); dataSet.Tables.Add(Table); Console.WriteLine("[{0}]Writing {1}", Thread.CurrentThread.ManagedThreadId, file); string xmlstream = String.Empty; using (MemoryStream memstream = new MemoryStream()) { XmlSerializer xmlSerializer = new XmlSerializer(typeof(DataSet)); XmlTextWriter xmlWriter = new XmlTextWriter(memstream, Encoding.UTF8); xmlSerializer.Serialize(xmlWriter, dataSet); xmlstream = UTF8ByteArrayToString(((MemoryStream)xmlWriter.BaseStream).ToArray()); using (var fileStream = new FileStream(file, FileMode.Create)) fileStream.Write(StringToUTF8ByteArray(xmlstream), 0, xmlstream.Length + 2); } filename = file; return file; } private void PostComplete(IAsyncResult iasResult) { string file = (string)iasResult.AsyncState; Console.WriteLine("[{0}]Completed: {1}", Thread.CurrentThread.ManagedThreadId, file); RunPostProcess.EndInvoke(iasResult); } public static String UTF8ByteArrayToString(Byte[] ArrBytes) { return new UTF8Encoding().GetString(ArrBytes); } public static Byte[] StringToUTF8ByteArray(String XmlString) { return new UTF8Encoding().GetBytes(XmlString); } } public sealed class CustomQueue : ConcurrentQueue<DataTable> { public event EventHandler<TableQueuedEventArgs> TableQueued; public CustomQueue() { } public CustomQueue(IEnumerable<DataTable> TableCollection) : base(TableCollection) { } new public void Enqueue (DataTable Table) { base.Enqueue(Table); OnTableQueued(new TableQueuedEventArgs(Table)); } public void OnTableQueued(TableQueuedEventArgs table) { EventHandler<TableQueuedEventArgs> handler = TableQueued; if (handler != null) { handler(this, table); } } } public class TableQueuedEventArgs : EventArgs { #region Fields #endregion #region Init public TableQueuedEventArgs(DataTable Table) {this.Table = Table;} #endregion #region Functions #endregion #region Properties public DataTable Table {get;set;} #endregion }

Como prueba de concepto, parece funcionar bastante bien. A lo sumo vi 4 hilos de trabajador.