threading thread net for book c# multithreading waithandle

c# - thread - ¿Solución para el límite de manejo WaitHandle.WaitAll 64?



for thread c# (8)

Mi aplicación genera una gran cantidad de diferentes subprocesos de trabajo pequeños a través de ThreadPool.QueueUserWorkItem que sigo a través de múltiples instancias de ManualResetEvent . Uso el método WaitHandle.WaitAll para bloquear el cierre de mi aplicación hasta que estos hilos se hayan completado.

Nunca he tenido ningún problema antes, sin embargo, como mi aplicación está recibiendo más carga, es decir, se están creando más subprocesos, ahora estoy empezando a obtener esta excepción:

WaitHandles must be less than or equal to 64 - missing documentation

¿Cuál es la mejor solución alternativa a esto?

Fragmento de código

List<AutoResetEvent> events = new List<AutoResetEvent>(); // multiple instances of... var evt = new AutoResetEvent(false); events.Add(evt); ThreadPool.QueueUserWorkItem(delegate { // do work evt.Set(); }); ... WaitHandle.WaitAll(events.ToArray());

Solución

int threadCount = 0; ManualResetEvent finished = new ManualResetEvent(false); ... Interlocked.Increment(ref threadCount); ThreadPool.QueueUserWorkItem(delegate { try { // do work } finally { if (Interlocked.Decrement(ref threadCount) == 0) { finished.Set(); } } }); ... finished.WaitOne();


Añadiendo a la respuesta de dtb cuando queremos tener devoluciones de llamadas.

using System; using System.Runtime.Remoting.Messaging; using System.Threading; class Program { static void Main(string[] args) { Main m = new Main(); m.TestMRE(); Console.ReadKey(); } } class Main { CalHandler handler = new CalHandler(); int numberofTasks =0; public void TestMRE() { for (int j = 0; j <= 3; j++) { Console.WriteLine("Outer Loop is :" + j.ToString()); ManualResetEvent signal = new ManualResetEvent(false); numberofTasks = 4; for (int i = 0; i <= 3; i++) { CalHandler.count caller = new CalHandler.count(handler.messageHandler); caller.BeginInvoke(i, new AsyncCallback(NumberCallback),signal); } signal.WaitOne(); } } private void NumberCallback(IAsyncResult result) { AsyncResult asyncResult = (AsyncResult)result; CalHandler.count caller = (CalHandler.count)asyncResult.AsyncDelegate; int num = caller.EndInvoke(asyncResult); Console.WriteLine("Number is :"+ num.ToString()); ManualResetEvent mre = (ManualResetEvent)asyncResult.AsyncState; if (Interlocked.Decrement(ref numberofTasks) == 0) { mre.Set(); } } } public class CalHandler { public delegate int count(int number); public int messageHandler ( int number ) { return number; } }


Añadiendo a la respuesta de dtb puedes envolver esto en una clase simple y agradable.

public class Countdown : IDisposable { private readonly ManualResetEvent done; private readonly int total; private long current; public Countdown(int total) { this.total = total; current = total; done = new ManualResetEvent(false); } public void Signal() { if (Interlocked.Decrement(ref current) == 0) { done.Set(); } } public void Wait() { done.WaitOne(); } public void Dispose() { ((IDisposable)done).Dispose(); } }


Comenzando con .NET 4.0, tiene dos opciones más (e IMO, más limpio) disponibles para usted.

El primero es usar la CountdownEvent . Evita la necesidad de tener que manejar el incremento y la disminución por su cuenta:

int tasks = <however many tasks you''re performing>; // Dispose when done. using (var e = new CountdownEvent(tasks)) { // Queue work. ThreadPool.QueueUserWorkItem(() => { // Do work ... // Signal when done. e.Signal(); }); // Wait till the countdown reaches zero. e.Wait(); }

Sin embargo, hay una solución aún más robusta, y es usar la clase Task , así:

// The source of your work items, create a sequence of Task instances. Task[] tasks = Enumerable.Range(0, 100).Select(i => // Create task here. Task.Factory.StartNew(() => { // Do work. } // No signalling, no anything. ).ToArray(); // Wait on all the tasks. Task.WaitAll(tasks);

El uso de la clase Task y la llamada a WaitAll es mucho más limpio, IMO, ya que está tejiendo menos primitivas de subprocesamiento en todo el código (aviso, no hay identificadores de espera); no tiene que configurar un contador, manejar incrementos / decrementos, simplemente configurar sus tareas y luego esperarlas. Esto permite que el código sea más expresivo en lo que se refiere a lo que se quiere hacer y no a las primitivas de cómo (al menos, en términos de gestionar la paralelización).

.NET 4.5 ofrece incluso más opciones, puede simplificar la generación de la secuencia de instancias de Task llamando al método de Run estático en la clase de Task :

// The source of your work items, create a sequence of Task instances. Task[] tasks = Enumerable.Range(0, 100).Select(i => // Create task here. Task.Run(() => { // Do work. }) // No signalling, no anything. ).ToArray(); // Wait on all the tasks. Tasks.WaitAll(tasks);

O bien, puede aprovechar la biblioteca TPL DataFlow (está en el espacio de nombres del System , por lo que es oficial, a pesar de que es una descarga de NuGet, como Entity Framework) y usar un ActionBlock<TInput> , así:

// Create the action block. Since there''s not a non-generic // version, make it object, and pass null to signal, or // make T the type that takes the input to the action // and pass that. var actionBlock = new ActionBlock<object>(o => { // Do work. }); // Post 100 times. foreach (int i in Enumerable.Range(0, 100)) actionBlock.Post(null); // Signal complete, this doesn''t actually stop // the block, but says that everything is done when the currently // posted items are completed. actionBlock.Complete(); // Wait for everything to complete, the Completion property // exposes a Task which can be waited on. actionBlock.Completion.Wait();

Tenga en cuenta que ActionBlock<TInput> procesa de manera predeterminada un elemento a la vez, por lo que si desea que procese varias acciones a la vez, debe establecer el número de elementos simultáneos que desea procesar en el constructor pasando un ExecutionDataflowBlockOptions instancia y configuración de la propiedad MaxDegreeOfParallelism :

var actionBlock = new ActionBlock<object>(o => { // Do work. }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

Si su acción es realmente segura para subprocesos, puede establecer la propiedad DataFlowBlockOptions.Unbounded en DataFlowBlockOptions.Unbounded :

var actionBlock = new ActionBlock<object>(o => { // Do work. }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataFlowBlockOptions.Unbounded });

El punto es que tienes un control preciso sobre cuán paralelas quieres que sean tus opciones.

Por supuesto, si tiene una secuencia de elementos que desea pasar a su ActionBlock<TInput> , entonces puede vincular una ISourceBlock<TOutput> para alimentar el ActionBlock<TInput> , así:

// The buffer block. var buffer = new BufferBlock<int>(); // Create the action block. Since there''s not a non-generic // version, make it object, and pass null to signal, or // make T the type that takes the input to the action // and pass that. var actionBlock = new ActionBlock<int>(o => { // Do work. }); // Link the action block to the buffer block. // NOTE: An IDisposable is returned here, you might want to dispose // of it, although not totally necessary if everything works, but // still, good housekeeping. using (link = buffer.LinkTo(actionBlock, // Want to propagate completion state to the action block. new DataflowLinkOptions { PropagateCompletion = true, }, // Can filter on items flowing through if you want. i => true) { // Post 100 times to the *buffer* foreach (int i in Enumerable.Range(0, 100)) buffer.Post(i); // Signal complete, this doesn''t actually stop // the block, but says that everything is done when the currently // posted items are completed. actionBlock.Complete(); // Wait for everything to complete, the Completion property // exposes a Task which can be waited on. actionBlock.Completion.Wait(); }

Dependiendo de lo que necesite hacer, la biblioteca de TPL Dataflow se convierte en una opción mucho más atractiva, ya que maneja la concurrencia en todas las tareas vinculadas entre sí, y le permite ser muy específico sobre qué tan paralela quiere que sea cada pieza. , manteniendo la separación adecuada de las preocupaciones para cada bloque.


Cree una variable que realice un seguimiento del número de tareas en ejecución:

int numberOfTasks = 100;

Crea una señal:

ManualResetEvent signal = new ManualResetEvent(false);

Disminuya el número de tareas cada vez que finalice una tarea:

if (Interlocked.Decrement(ref numberOftasks) == 0) {

Si no queda tarea, configure la señal:

signal.Set(); }

Mientras tanto, en otro lugar, espera a que se establezca la señal:

signal.WaitOne();


Lo resolví simplemente paginando el número de eventos para esperar sin perder mucho rendimiento, y está funcionando perfectamente en el entorno de producción. Sigue el código:

var events = new List<ManualResetEvent>(); // code omited var newEvent = new ManualResetEvent(false); events.Add(newEvent); ThreadPool.QueueUserWorkItem(c => { //thread code newEvent.Set(); }); // code omited var wait = true; while (wait) { WaitHandle.WaitAll(events.Take(60).ToArray()); events.RemoveRange(0, events.Count > 59 ? 60 : events.Count); wait = events.Any(); }


Su solución alternativa no es correcta. La razón es que Set y WaitOne podrían competir si el último elemento de trabajo provoca que threadCount llegue a cero antes de que el hilo de cola haya tenido la oportunidad de poner en cola todos los elementos de trabajo. La solución es simple. Trate su hilo de espera como si fuera un elemento de trabajo en sí mismo. Inicialice threadCount en 1 y realice un decremento y una señal cuando la cola esté completa.

int threadCount = 1; ManualResetEvent finished = new ManualResetEvent(false); ... Interlocked.Increment(ref threadCount); ThreadPool.QueueUserWorkItem(delegate { try { // do work } finally { if (Interlocked.Decrement(ref threadCount) == 0) { finished.Set(); } } }); ... if (Interlocked.Decrement(ref threadCount) == 0) { finished.Set(); } finished.WaitOne();

Como preferencia personal, me gusta usar la clase CountdownEvent para hacer el conteo por mí.

var finished = new CountdownEvent(1); ... finished.AddCount(); ThreadPool.QueueUserWorkItem(delegate { try { // do work } finally { finished.Signal(); } }); ... finished.Signal(); finished.Wait();


Windows XP SP3 admite un máximo de dos WaitHandles. Para casos más de 2 aplicaciones WaitHandles termina prematuramente.


protected void WaitAllExt(WaitHandle[] waitHandles) { //workaround for limitation of WaitHandle.WaitAll by <=64 wait handles const int waitAllArrayLimit = 64; var prevEndInd = -1; while (prevEndInd < waitHandles.Length - 1) { var stInd = prevEndInd + 1; var eInd = stInd + waitAllArrayLimit - 1; if (eInd > waitHandles.Length - 1) { eInd = waitHandles.Length - 1; } prevEndInd = eInd; //do wait var whSubarray = waitHandles.Skip(stInd).Take(eInd - stInd + 1).ToArray(); WaitHandle.WaitAll(whSubarray); } }