c# - quitar - La mejor manera de limitar el número de Tareas activas que se ejecutan a través de la Biblioteca de tareas paralelas
desagrupar barra de tareas windows 10 (6)
Acabo de dar una answer que es muy aplicable a esta pregunta.
Básicamente, la clase Tarea TPL se hace para programar el trabajo vinculado a la CPU. No está hecho para bloquear el trabajo.
Está trabajando con un recurso que no es CPU: esperando las respuestas del servicio. Esto significa que el TPL manipulará mal su recurso porque asume la vinculación de la CPU en cierto grado.
Administre los recursos usted mismo: Comience un número fijo de subprocesos o tareas de Ejecución larga (que es básicamente el mismo). Decida el número de hilos empíricamente.
No puede poner sistemas no confiables en producción. Por ese motivo, recomiendo el n. ° 1 pero acelerado . No cree tantos hilos como elementos de trabajo. Cree tantos hilos que sean necesarios para saturar el servicio remoto. Escriba usted mismo una función de ayuda que engendra N hilos y los utiliza para procesar elementos de trabajo M. Obtiene resultados totalmente predecibles y confiables de esa manera.
Considere una cola con muchos trabajos que necesitan procesamiento. La limitación de la cola solo puede obtener 1 trabajo a la vez y no hay forma de saber cuántos trabajos hay. Los trabajos tardan 10 segundos en completarse e implican una gran cantidad de espera de las respuestas de los servicios web, por lo que no están vinculados a la CPU.
Si utilizo algo como esto
while (true)
{
var job = Queue.PopJob();
if (job == null)
break;
Task.Factory.StartNew(job.Execute);
}
Luego, abrirá furiosamente los trabajos de la cola mucho más rápido de lo que puede completarlos, se quedará sin memoria y caerá de culo. >. <
No puedo usar (no creo) ParallelOptions.MaxDegreeOfParallelism porque no puedo usar Parallel.Invoke o Parallel.ForEach
3 alternativas que he encontrado
Reemplace Task.Factory.StartNew con
Task task = new Task(job.Execute,TaskCreationOptions.LongRunning) task.Start();
Lo cual parece resolver el problema de alguna manera, pero no estoy claro exactamente qué está haciendo y si este es el mejor método.
Cree un planificador de tareas personalizado que limite el grado de concurrencia
Use algo como BlockingCollection para agregar trabajos a la colección cuando se inicie y elimine cuando termine para limitar el número que se puede ejecutar.
Con el # 1 tengo que confiar en que la decisión correcta se toma automáticamente, # 2 / # 3 Tengo que calcular el número máximo de tareas que puedo ejecutar yo mismo.
¿Lo he entendido bien? ¿Cuál es la mejor manera, o hay otra manera?
EDITAR - Esto es lo que he encontrado a partir de las siguientes respuestas, patrón productor-consumidor.
Además, el objetivo del rendimiento global no era decalar trabajos más rápido de lo que se podía procesar y no tener múltiples hilos de sondeo (no se muestra aquí, pero eso es una operación no bloqueante y generará enormes costos de transacción si se sondea a alta frecuencia desde múltiples lugares) .
// BlockingCollection<>(1) will block if try to add more than 1 job to queue (no
// point in being greedy!), or is empty on take.
var BlockingCollection<Job> jobs = new BlockingCollection<Job>(1);
// Setup a number of consumer threads.
// Determine MAX_CONSUMER_THREADS empirically, if 4 core CPU and 50% of time
// in job is blocked waiting IO then likely be 8.
for(int numConsumers = 0; numConsumers < MAX_CONSUMER_THREADS; numConsumers++)
{
Thread consumer = new Thread(() =>
{
while (!jobs.IsCompleted)
{
var job = jobs.Take();
job.Execute();
}
}
consumer.Start();
}
// Producer to take items of queue and put in blocking collection ready for processing
while (true)
{
var job = Queue.PopJob();
if (job != null)
jobs.Add(job);
else
{
jobs.CompletedAdding()
// May need to wait for running jobs to finish
break;
}
}
El problema aquí no parece ser que haya demasiadas Task
ejecución , sino demasiadas Task
programadas . Su código intentará programar tantas Task
como sea posible, sin importar qué tan rápido se ejecuten. Y si tiene demasiados trabajos, esto significa que obtendrá OOM.
Debido a esto, ninguna de las soluciones propuestas en realidad resolverá su problema. Si parece que la simple especificación de LongRunning
resuelve su problema, lo más probable es que la creación de un nuevo Thread
(que es lo que hace LongRunning
) lleva algo de tiempo, lo que LongRunning
manera efectiva la obtención de nuevos trabajos. Por lo tanto, esta solución solo funciona por accidente, y lo más probable es que conduzca a otros problemas más adelante.
En cuanto a la solución, estoy de acuerdo con usr: la solución más simple que funciona razonablemente bien es crear un número fijo de tareas LongRunning
y tener un ciclo que llame a Queue.PopJob()
(protegido por un lock
si ese método no es seguro para subprocesos) ) y Execute()
s el trabajo.
ACTUALIZACIÓN: Después de pensar un poco más, me di cuenta de que el siguiente intento probablemente se comportará terriblemente. Úselo solo si está realmente seguro de que funcionará bien para usted.
Pero el TPL intenta descubrir el mejor grado de paralelismo, incluso para Task
vinculadas a IO. Por lo tanto, puede intentar usar eso para su ventaja. Long Task
s no funcionará aquí, porque desde el punto de vista de TPL, parece que no se realiza ningún trabajo y comenzará una nueva Task
una y otra vez. Lo que puede hacer en su lugar es comenzar una nueva Task
al final de cada Task
. De esta forma, TPL sabrá lo que está sucediendo y su algoritmo puede funcionar bien. Además, para que el TPL decida el grado de paralelismo, al comienzo de una Task
que está primero en su línea, comience otra línea de Task
.
Este algoritmo puede funcionar bien. Pero también es posible que el TPL tome una mala decisión con respecto al grado de paralelismo, en realidad no he intentado algo como esto.
En el código, se vería así:
void ProcessJobs(bool isFirst)
{
var job = Queue.PopJob(); // assumes PopJob() is thread-safe
if (job == null)
return;
if (isFirst)
Task.Factory.StartNew(() => ProcessJobs(true));
job.Execute();
Task.Factory.StartNew(() => ProcessJob(false));
}
Y comienza con
Task.Factory.StartNew(() => ProcessJobs(true));
Las posibles divisiones de flujo y las continuas causadas por la await
, más adelante en su código o en una biblioteca de terceros, no funcionarán bien con las tareas de larga ejecución (o subprocesos), por lo que no se moleste en utilizar tareas de ejecución prolongada. En el mundo async/await
, son inútiles. Más detalles here .
Puede llamar a ThreadPool.SetMaxThreads
pero antes de realizar esta llamada, asegúrese de establecer el número mínimo de hilos con ThreadPool.SetMinThreads
, usando valores inferiores o iguales a los máximos. Y, por cierto, la documentación de MSDN es incorrecta. PUEDE ir por debajo de la cantidad de núcleos en su máquina con esas llamadas a métodos, al menos en .NET 4.5 y 4.6 donde utilicé esta técnica para reducir la potencia de procesamiento de un servicio de memoria de 32 bits limitado.
Sin embargo, si no desea restringir toda la aplicación, sino solo la parte de procesamiento, un programador de tareas personalizado hará el trabajo. Hace mucho tiempo, MS lanzó samples con varios programadores personalizados de tareas, incluido un LimitedConcurrencyLevelTaskScheduler
. Task.Factory.StartNew
tarea principal de procesamiento manualmente con Task.Factory.StartNew
, proporcionando el programador de tareas personalizado, y cada otra tarea generada por él lo usará, incluyendo async/await
Task.Yield
e incluso Task.Yield
, utilizado para lograr asincronías desde el principio en un método async
.
Pero para su caso particular, ambas soluciones no dejarán de agotar su cola de trabajos antes de completarlos. Eso puede no ser deseable, dependiendo de la implementación y el propósito de esa cola suya. Son más como "disparar un montón de tareas y dejar que el planificador encuentre el tiempo para ejecutarlas" tipo de soluciones. Entonces, quizás algo un poco más apropiado aquí podría ser un método más estricto de control sobre la ejecución de los trabajos a través de semaphores
. El código se vería así:
semaphore = new SemaphoreSlim(max_concurrent_jobs);
while(...){
job = Queue.PopJob();
semaphore.Wait();
ProcessJobAsync(job);
}
async Task ProcessJobAsync(Job job){
await Task.Yield();
... Process the job here...
semaphore.Release();
}
Hay más de una manera de despellejar a un gato. Usa lo que creas apropiado.
Microsoft tiene una biblioteca genial llamada DataFlow que hace exactamente lo que quiere (y mucho más). Detalles here .
Debe usar la clase ActionBlock y establecer el parámetro MaxDegreeOfParallelism del objeto ExecutionDataflowBlockOptions. ActionBlock funciona muy bien con async / await, por lo que incluso cuando se esperan sus llamadas externas, no se comenzarán a procesar nuevos trabajos.
ExecutionDataflowBlockOptions actionBlockOptions = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 10
};
this.sendToAzureActionBlock = new ActionBlock<List<Item>>(async items => await ProcessItems(items),
actionBlockOptions);
...
this.sendToAzureActionBlock.Post(itemsToProcess)
Utilizo un mecanismo de cola de mensajes / buzón para lograr esto. Es similar al modelo de actor. Tengo una clase que tiene un MailBox. Yo llamo a esta clase mi "trabajador". Puede recibir mensajes. Esos mensajes están en cola y, esencialmente, definen las tareas que quiero que el trabajador ejecute. El trabajador usará Task.Wait () para que su tarea finalice antes de quitar el siguiente mensaje y comenzar la siguiente tarea.
Al limitar el número de trabajadores que tengo, puedo limitar el número de subprocesos / tareas simultáneas que se están ejecutando.
Esto se describe, con el código fuente, en la publicación de mi blog en un motor de cálculo distribuido. Si miras el código para IActor y WorkerNode, espero que tenga sentido.
TaskCreationOptions.LongRunning
es útil para bloquear tareas y su uso aquí es legítimo. Lo que hace es sugerirle al programador que dedique un hilo a la tarea. El programador intenta mantener el número de subprocesos en el mismo nivel que la cantidad de núcleos de CPU para evitar un cambio de contexto excesivo.
Está bien descrito en Threading en C # por Joseph Albahari