recorrer for datos coleccion array c# .net wpf task-parallel-library blockingcollection

datos - foreach object c#



¿Por qué la iteración sobre GetConsumingEnumerable() no vacía completamente la colección de bloqueo subyacente? (4)

Tengo un problema cuantificable y repetible que utiliza la biblioteca paralela de tareas, BlockingCollection<T> , ConcurrentQueue<T> y GetConsumingEnumerable al intentar crear una canalización simple.

En pocas palabras, agregar entradas a un BlockingCollection<T> predeterminado (que bajo el capó se basa en un ConcurrentQueue<T> ) de un subproceso, no garantiza que se salgan del BlockingCollection<T> de otro subproceso que llama al GetConsumingEnumerable() Método.

He creado una aplicación Winforms muy simple para reproducir / simular esto que solo imprime números enteros en la pantalla.

  • Timer1 es responsable de poner en cola los elementos de trabajo ... Utiliza un diccionario concurrente llamado _tracker para que sepa lo que ya ha agregado a la colección de bloqueo.
  • Timer2 solo está registrando el estado de recuento de BlockingCollection y del _tracker
  • El botón de INICIO inicia un Paralell.ForEach que simplemente itera sobre las colecciones de bloqueo GetConsumingEnumerable() y comienza a imprimirlas en el segundo cuadro de lista.
  • El botón STOP detiene el Timer1 evita que se Timer1 más entradas a la colección de bloqueo.

public partial class Form1 : Form { private int Counter = 0; private BlockingCollection<int> _entries; private ConcurrentDictionary<int, int> _tracker; private CancellationTokenSource _tokenSource; private TaskFactory _factory; public Form1() { _entries = new BlockingCollection<int>(); _tracker = new ConcurrentDictionary<int, int>(); _tokenSource = new CancellationTokenSource(); _factory = new TaskFactory(); InitializeComponent(); } private void timer1_Tick(object sender, EventArgs e) { //ADDING TIMER -> LISTBOX 1 for(var i = 0; i < 3; i++,Counter++) { if (_tracker.TryAdd(Counter, Counter)) _entries.Add(Counter); listBox1.Items.Add(string.Format("Adding {0}", Counter)); } } private void timer2_Tick_1(object sender, EventArgs e) { //LOGGING TIMER -> LIST BOX 3 listBox3.Items.Add(string.Format("Tracker Count : {0} / Entries Count : {1}", _tracker.Count, _entries.Count)); } private void button1_Click(object sender, EventArgs e) { //START BUTTON -> LOGS TO LIST BOX 2 var options = new ParallelOptions { CancellationToken = _tokenSource.Token, MaxDegreeOfParallelism = 1 }; _factory.StartNew(() => { Parallel.ForEach(_entries.GetConsumingEnumerable(), options, DoWork); }); timer1.Enabled = timer2.Enabled = true; timer1.Start(); timer2.Start(); } private void DoWork(int entry) { Thread.Sleep(1000); //Sleep for 1 second to simulate work being done. Invoke((MethodInvoker)(() => listBox2.Items.Add(string.Format("Processed {0}", entry)))); int oldEntry; _tracker.TryRemove(entry, out oldEntry); } private void button2_Click(object sender, EventArgs e) { //STOP BUTTON timer1.Stop(); timer1.Enabled = false; }

Aquí está la secuencia de eventos:

  • Presiona inicio
  • Timer1 ticks & ListBox1 se actualiza inmediatamente con 3 mensajes (Añadiendo 0, 1, 2)
  • ListBox2 se actualiza posteriormente con 3 mensajes, 1 segundo de diferencia
    • Procesando 0
    • Procesando 1
    • Procesamiento 2
  • Timer1 ticks & ListBox1 se actualiza inmediatamente con 3 mensajes (Agregando 3, 4, 5)
  • ListBox2 se actualiza posteriormente con 2 mensajes, con 1 segundo de diferencia
    • Procesando 3
    • Procesamiento 4
    • El proceso 5 no se imprime ... parece que se ha "perdido"
  • Presione STOP para evitar que el temporizador 1 agregue más mensajes
  • Espera ... "Procesando 5" todavía no aparece

Puede ver que el diccionario concurrente sigue rastreando que 1 elemento aún no se ha procesado y posteriormente se eliminó de _tracker

Si vuelvo a presionar Iniciar, el temporizador 1 comienza a agregar más 3 entradas más y el bucle Paralelo vuelve a la vida útil de impresión 5, 6, 7 y 8.

Estoy completamente perdido en cuanto a por qué ocurre esto. Llamar a empezar de nuevo obviamente llama a una nueva tarea, que llama a Paralell foreach, y vuelve a ejecutar GetConsumingEnumerable (), que encuentra mágicamente la entrada que falta ... I

¿Por qué BlockingCollection.GetConsumingEnumerable() no garantiza la iteración sobre cada elemento que se agrega a la colección?

¿Por qué la adición de más entradas posteriormente provoca que se "despegue" y continúa con su procesamiento?


A partir de .net 4.5, puede crear un particionador que tomará solo 1 elemento a la vez:

var partitioner = Partitioner.Create(jobsBatchesQ.queue.GetConsumingEnumerable(), EnumerablePartitionerOptions.NoBuffering); Parallel.ForEach(partitioner, new ParallelOptions { MaxDegreeOfParallelism = (currentTask.ParallelLevel > 0 ? currentTask.ParallelLevel : 1) }, (batch, state) => {//do stuff}

https://msdn.microsoft.com/en-us/library/system.collections.concurrent.enumerablepartitioneroptions(v=vs.110).aspx


No pude replicar su comportamiento con una aplicación de consola simple que hace básicamente lo mismo (ejecutándose en .Net 4.5 beta, lo que podría marcar la diferencia). Pero creo que la razón por la que esto sucede es que Parallel.ForEach() intenta optimizar la ejecución dividiendo la colección de entrada en fragmentos. Y con su enumerable, no se puede crear un fragmento hasta que agregue más elementos a la colección. Para obtener más información, consulte Particiones personalizadas para PLINQ y TPL en MSDN .

Para solucionar este problema, no utilice Parallel.ForEach() . Si aún desea procesar los elementos en paralelo, puede iniciar una Task en cada iteración.


No puede usar GetConsumingEnumerable() en Parallel.ForEach() .

Utilice el GetConsumingPartitioner de los blogs.msdn.com/b/pfxteam/archive/2010/04/06/9990420.aspx

En la publicación del blog también obtendrá una explicación de por qué no se puede usar GetConsumingEnumerable()

El algoritmo de partición empleado de manera predeterminada tanto por Parallel.ForEach como por PLINQ usa el fragmentado para minimizar los costos de sincronización: en lugar de tomar el bloqueo una vez por elemento, tomará el bloqueo, tomará un grupo de elementos (un fragmento) y luego suelte la cerradura.

es decir, Parallel.ForEach espere hasta que reciba un grupo de elementos de trabajo antes de continuar. Exactamente lo que muestra tu experimento.


Siento que debo tener en cuenta que en los casos en que pueda llamar al método .CompleteAdding () de BlockingCollection antes de ejecutar Parallel.foreach, el problema que describió anteriormente no será un problema. He usado estos dos objetos juntos muchas veces con excelentes resultados.

Además, siempre puedes restablecer tu BlockingCollection después de llamar a CompleteAdding () para agregar más elementos cuando sea necesario (_entries = new BlockingCollection ();)

Cambiar el código de evento de clic arriba como sigue, resolvería su problema con la entrada faltante y lo haría funcionar como se esperaba, si hace clic en los botones de inicio y parada varias veces:

private void button2_Click(object sender, EventArgs e) { //STOP BUTTON timer1.Stop(); timer1.Enabled = false; >>>>_entries.CompleteAdding(); >>>>_entries = new BlockingCollection<int>(); }