example ejemplo code c# multithreading queue

code - threadpool c# ejemplo



¿Hay una mejor manera de esperar para los hilos en cola? (8)

¿Qué tal un Fork y Join que usa solo Monitor ;-p

Forker p = new Forker(); foreach (var obj in collection) { var tmp = obj; p.Fork(delegate { DoSomeWork(tmp); }); } p.Join();

El código completo se muestra en esta respuesta anterior .

O para una cola de productor / consumidor de tamaño limitado (seguro para subprocesos, etc.), here .

¿Hay una mejor manera de esperar los hilos en cola antes de ejecutar otro proceso?

Actualmente estoy haciendo:

this.workerLocker = new object(); // Global variable this.RunningWorkers = arrayStrings.Length; // Global variable // Initiate process foreach (string someString in arrayStrings) { ThreadPool.QueueUserWorkItem(this.DoSomething, someString); Thread.Sleep(100); } // Waiting execution for all queued threads lock (this.workerLocker) // Global variable (object) { while (this.RunningWorkers > 0) { Monitor.Wait(this.workerLocker); } } // Do anything else Console.WriteLine("END");

// Method DoSomething() definition public void DoSomething(object data) { // Do a slow process... . . . lock (this.workerLocker) { this.RunningWorkers--; Monitor.Pulse(this.workerLocker); } }


.NET 4.0 tiene una nueva clase para eso, Barrier .

Aparte de eso, su método no es tan malo, y podría optimizarlo un poco pulsando solo si RunningWorkers es 0 después de la disminución. Eso se vería así:

this.workerLocker = new object(); // Global variable this.RunningWorkers = arrayStrings.Length; // Global variable foreach (string someString in arrayStrings) { ThreadPool.QueueUserWorkItem(this.DoSomething, someString); //Thread.Sleep(100); } // Waiting execution for all queued threads Monitor.Wait(this.workerLocker);

// Method DoSomething() definition public void DoSomething(object data) { // Do a slow process... // ... lock (this.workerLocker) { this.RunningWorkers--; if (this.RunningWorkers == 0) Monitor.Pulse(this.workerLocker); } }

Podría usar un EventWaithandle o AutoResetEvent pero todos están envueltos en la API de Win32. Dado que la clase Monitor es código administrado puro, preferiría un Monitor para esta situación.


Además de la barrera, señalado por Henk Holterman (BTW es un uso muy malo de la barrera, vea mi comentario a su respuesta), .NET 4.0 ofrece muchas otras opciones (para usarlas en .NET 3.5 es necesario descargar un DLL extra de Microsoft ). Blogué un post que los enumera todos , pero mi favorito es definitivamente Parallel.ForEach:

Parallel.ForEach<string>(arrayStrings, someString => { DoSomething(someString); });

Detrás de las escenas, Parallel.ForEach hace cola para el nuevo y mejorado grupo de subprocesos y espera hasta que todos los subprocesos estén listos.


Es probable que desee echar un vistazo a AutoResetEvent y ManualResetEvent.

Estos están diseñados exactamente para esta situación (esperando que termine un subproceso de ThreadPool, antes de hacer "algo").

Harías algo como esto:

static void Main(string[] args) { List<ManualResetEvent> resetEvents = new List<ManualResetEvent>(); foreach (var x in Enumerable.Range(1, WORKER_COUNT)) { ManualResetEvent resetEvent = new ManualResetEvent(); ThreadPool.QueueUserWorkItem(DoSomething, resetEvent); resetEvents.Add(resetEvent); } // wait for all ManualResetEvents WaitHandle.WaitAll(resetEvents.ToArray()); // You probably want to use an array instead of a List, a list was just easier for the example :-) } public static void DoSomething(object data) { ManualResetEvent resetEvent = data as ManualResetEvent; // Do something resetEvent.Set(); }

Edición: Olvidó mencionar que puede esperar un solo hilo, cualquier hilo y así también. También dependiendo de su situación, AutoResetEvent puede simplificar un poco las cosas, ya que (como su nombre lo indica) puede señalar eventos automáticamente :-)


Me gusta mucho el patrón de inicio-finalización-asíncrono cuando tengo que esperar a que finalicen las tareas.

Te aconsejaría que envuelvas el BeginEnd en una clase de trabajadores:

public class StringWorker { private string m_someString; private IAsyncResult m_result; private Action DoSomethingDelegate; public StringWorker(string someString) { DoSomethingDelegate = DoSomething; } private void DoSomething() { throw new NotImplementedException(); } public IAsyncResult BeginDoSomething() { if (m_result != null) { throw new InvalidOperationException(); } m_result = DoSomethingDelegate.BeginInvoke(null, null); return m_result; } public void EndDoSomething() { DoSomethingDelegate.EndInvoke(m_result); } }

Para hacer su inicio y trabajo use este fragmento de código:

List<StringWorker> workers = new List<StringWorker>(); foreach (var someString in arrayStrings) { StringWorker worker = new StringWorker(someString); worker.BeginDoSomething(); workers.Add(worker); } foreach (var worker in workers) { worker.EndDoSomething(); } Console.WriteLine("END");

Y eso es.

Nota: Si desea obtener un resultado de nuevo desde el BeginEnd, cambie la "Acción" a Func y cambie el EndDoSomething para devolver un tipo.

public class StringWorker { private string m_someString; private IAsyncResult m_result; private Func<string> DoSomethingDelegate; public StringWorker(string someString) { DoSomethingDelegate = DoSomething; } private string DoSomething() { throw new NotImplementedException(); } public IAsyncResult BeginDoSomething() { if (m_result != null) { throw new InvalidOperationException(); } m_result = DoSomethingDelegate.BeginInvoke(null, null); return m_result; } public string EndDoSomething() { return DoSomethingDelegate.EndInvoke(m_result); } }


No estoy seguro de que realmente haya, recientemente he hecho algo similar para escanear cada IP de una subred para aceptar un puerto en particular.

Un par de cosas que puedo sugerir que pueden mejorar el rendimiento:

  • Utilice el método SetMaxThreads de ThreadPool para ajustar el rendimiento (es decir, el balanceo tiene muchos subprocesos ejecutándose a la vez, contra el tiempo de bloqueo en un número tan grande de subprocesos).

  • No duerma al configurar todos los elementos de trabajo, no hay necesidad real (de lo que estoy al tanto de inmediato. Sin embargo, duerma dentro del método DoSomething, tal vez solo por un milisegundo, para permitir que otros subprocesos salten allí si es necesario. .

Estoy seguro de que podrías implementar un método más personalizado, pero dudo que sea más eficiente que usar ThreadPool.

PD: No tengo una idea clara de la razón para usar el monitor, ¿ya que está bloqueando? Tenga en cuenta que la pregunta solo se hace porque no he usado la clase Monitor anteriormente, no porque realmente esté dudando de su uso.


Sí hay.

Enfoque sugerido

1) un contador y un mango de espera

int ActiveCount = 1; // 1 (!) is important EventWaitHandle ewhAllDone = new EventWaitHandle(false, ResetMode.Manual);

2) añadiendo bucle

foreach (string someString in arrayStrings) { Interlocked.Increment(ref ActiveCount); ThreadPool.QueueUserWorkItem(this.DoSomething, someString); // Thread.Sleep(100); // you really need this sleep ? } PostActionCheck(); ewhAllDone.Wait();

3) DoSomething debería verse como

{ try { // some long executing code } finally { // .... PostActionCheck(); } }

4) donde PostActionCheck es

void PostActionCheck() { if (Interlocked.Decrement(ref ActiveCount) == 0) ewhAllDone.Set(); }

Idea

ActiveCount se inicializa con 1 , y luego se incrementa n veces.

PostActionCheck se llama n + 1 veces. El último activará el evento.

El beneficio de esta solución es que utiliza un solo objeto del kernel (que es un evento) y 2 * n + 1 llamadas de API ligeras. (¿Puede haber menos?)

PD

Escribí el código aquí, podría haber escrito mal algunos nombres de clase.