c# - puede - Estrangulamiento de tareas asíncronas
programación asíncrona con c# pdf (3)
Digamos que tiene 1000 URL, y solo quiere tener 50 solicitudes abiertas a la vez; pero tan pronto como se completa una solicitud, se abre una conexión a la siguiente URL en la lista. De esta forma, siempre hay exactamente 50 conexiones abiertas a la vez, hasta que se agote la lista de URL.
La siguiente solución simple ha surgido muchas veces aquí en SO. No usa código de bloqueo y no crea hilos explícitamente, por lo que se escala muy bien:
const int MAX_DOWNLOADS = 50;
static async Task DownloadAsync(string[] urls)
{
using (var semaphore = new SemaphoreSlim(MAX_DOWNLOADS))
using (var httpClient = new HttpClient())
{
var tasks = urls.Select(async url =>
{
await semaphore.WaitAsync();
try
{
var data = await httpClient.GetStringAsync(url);
Console.WriteLine(data);
}
finally
{
semaphore.Release();
}
});
await Task.WhenAll(tasks);
}
}
El hecho es que el procesamiento de los datos descargados debe hacerse en una tubería diferente , con un nivel de paralelismo diferente , especialmente si se trata de un procesamiento vinculado a la CPU.
Por ejemplo, probablemente desee tener 4 subprocesos haciendo simultáneamente el procesamiento de datos (la cantidad de núcleos de CPU) y hasta 50 solicitudes pendientes para más datos (que no usan subprocesos en absoluto). AFAICT, esto no es lo que su código está haciendo actualmente.
Ahí es donde TPL Dataflow o Rx pueden ser útiles como una solución preferida. Sin embargo, es ciertamente posible implementar algo como esto con TPL simple. Tenga en cuenta que el único código de bloqueo aquí es el que está procesando los datos dentro de Task.Run
:
const int MAX_DOWNLOADS = 50;
const int MAX_PROCESSORS = 4;
// process data
class Processing
{
SemaphoreSlim _semaphore = new SemaphoreSlim(MAX_PROCESSORS);
HashSet<Task> _pending = new HashSet<Task>();
object _lock = new Object();
async Task ProcessAsync(string data)
{
await _semaphore.WaitAsync();
try
{
await Task.Run(() =>
{
// simuate work
Thread.Sleep(1000);
Console.WriteLine(data);
});
}
finally
{
_semaphore.Release();
}
}
public async void QueueItemAsync(string data)
{
var task = ProcessAsync(data);
lock (_lock)
_pending.Add(task);
try
{
await task;
}
catch
{
if (!task.IsCanceled && !task.IsFaulted)
throw; // not the task''s exception, rethrow
// don''t remove faulted/cancelled tasks from the list
return;
}
// remove successfully completed tasks from the list
lock (_lock)
_pending.Remove(task);
}
public async Task WaitForCompleteAsync()
{
Task[] tasks;
lock (_lock)
tasks = _pending.ToArray();
await Task.WhenAll(tasks);
}
}
// download data
static async Task DownloadAsync(string[] urls)
{
var processing = new Processing();
using (var semaphore = new SemaphoreSlim(MAX_DOWNLOADS))
using (var httpClient = new HttpClient())
{
var tasks = urls.Select(async (url) =>
{
await semaphore.WaitAsync();
try
{
var data = await httpClient.GetStringAsync(url);
// put the result on the processing pipeline
processing.QueueItemAsync(data);
}
finally
{
semaphore.Release();
}
});
await Task.WhenAll(tasks.ToArray());
await processing.WaitForCompleteAsync();
}
}
Me gustaría ejecutar un montón de tareas asíncronas, con un límite de cuántas tareas pueden estar pendientes de finalizar en un momento dado.
Digamos que tiene 1000 URL, y solo quiere tener 50 solicitudes abiertas a la vez; pero tan pronto como se completa una solicitud, se abre una conexión a la siguiente URL en la lista. De esta forma, siempre hay exactamente 50 conexiones abiertas a la vez, hasta que se agote la lista de URL.
También quiero utilizar un número dado de hilos si es posible.
Se me ocurrió un método de extensión, ThrottleTasksAsync
, que hace lo que quiero. ¿Ya hay una solución más simple? Yo asumiría que este es un escenario común.
Uso:
class Program
{
static void Main(string[] args)
{
Enumerable.Range(1, 10).ThrottleTasksAsync(5, 2, async i => { Console.WriteLine(i); return i; }).Wait();
Console.WriteLine("Press a key to exit...");
Console.ReadKey(true);
}
}
Aquí está el código:
static class IEnumerableExtensions
{
public static async Task<Result_T[]> ThrottleTasksAsync<Enumerable_T, Result_T>(this IEnumerable<Enumerable_T> enumerable, int maxConcurrentTasks, int maxDegreeOfParallelism, Func<Enumerable_T, Task<Result_T>> taskToRun)
{
var blockingQueue = new BlockingCollection<Enumerable_T>(new ConcurrentBag<Enumerable_T>());
var semaphore = new SemaphoreSlim(maxConcurrentTasks);
// Run the throttler on a separate thread.
var t = Task.Run(() =>
{
foreach (var item in enumerable)
{
// Wait for the semaphore
semaphore.Wait();
blockingQueue.Add(item);
}
blockingQueue.CompleteAdding();
});
var taskList = new List<Task<Result_T>>();
Parallel.ForEach(IterateUntilTrue(() => blockingQueue.IsCompleted), new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism },
_ =>
{
Enumerable_T item;
if (blockingQueue.TryTake(out item, 100))
{
taskList.Add(
// Run the task
taskToRun(item)
.ContinueWith(tsk =>
{
// For effect
Thread.Sleep(2000);
// Release the semaphore
semaphore.Release();
return tsk.Result;
}
)
);
}
});
// Await all the tasks.
return await Task.WhenAll(taskList);
}
static IEnumerable<bool> IterateUntilTrue(Func<bool> condition)
{
while (!condition()) yield return true;
}
}
El método utiliza BlockingCollection
y SemaphoreSlim
para que funcione. El regulador se ejecuta en un hilo y todas las tareas asíncronas se ejecutan en el otro hilo. Para lograr el paralelismo, agregué un parámetro maxDegreeOfParallelism que se pasó a un bucle Parallel.ForEach
reasignado como un ciclo while.
La versión anterior era:
foreach (var master = ...)
{
var details = ...;
Parallel.ForEach(details, detail => {
// Process each detail record here
}, new ParallelOptions { MaxDegreeOfParallelism = 15 });
// Perform the final batch updates here
}
Pero, el grupo de subprocesos se agota rápidamente, y no puede hacer async
/ await
.
Bonificación: para evitar el problema en BlockingCollection
donde se lanza una excepción en Take()
cuando se llama a CompleteAdding()
, estoy usando la sobrecarga de TryTake
con un tiempo de espera TryTake
. Si no utilizo el tiempo de espera en TryTake
, se evitaría el uso de BlockingCollection
porque TryTake
no se bloqueará. ¿Hay una mejor manera? Idealmente, habría un método TakeAsync
.
Como se sugirió, use TPL Dataflow.
Un TransformBlock<TInput, TOutput>
puede ser lo que está buscando.
Define un MaxDegreeOfParallelism
para limitar cuántas cadenas se pueden transformar (es decir, cuántas URL se pueden descargar) en paralelo. A continuación, publica las URL en el bloque y, cuando finaliza, le dice al bloque que ha terminado de agregar elementos y obtiene las respuestas.
var downloader = new TransformBlock<string, HttpResponse>(
url => Download(url),
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 50 }
);
var buffer = new BufferBlock<HttpResponse>();
downloader.LinkTo(buffer);
foreach(var url in urls)
downloader.Post(url);
//or await downloader.SendAsync(url);
downloader.Complete();
await downloader.Completion;
IList<HttpResponse> responses;
if (buffer.TryReceiveAll(out responses))
{
//process responses
}
Nota: TransformBlock
almacena su entrada y salida. ¿Por qué, entonces, tenemos que vincularlo a un BufferBlock
?
Debido a que TransformBlock
no se completará hasta que se hayan consumido todos los elementos ( HttpResponse
), y await downloader.Completion
. La terminación se suspenderá. En cambio, dejamos que el programa de downloader
reenvíe toda su salida a un bloque de almacenamiento intermedio dedicado, luego esperamos a que finalice el downloader
e inspeccionamos el bloque de almacenamiento intermedio.
Según lo solicitado, aquí está el código con el que terminé.
El trabajo se configura en una configuración de detalle maestra y cada maestro se procesa como un lote. Cada unidad de trabajo está en cola de esta manera:
var success = true;
// Start processing all the master records.
Master master;
while (null != (master = await StoredProcedures.ClaimRecordsAsync(...)))
{
await masterBuffer.SendAsync(master);
}
// Finished sending master records
masterBuffer.Complete();
// Now, wait for all the batches to complete.
await batchAction.Completion;
return success;
Los maestros se almacenan en búfer uno a la vez para ahorrar trabajo para otros procesos externos. Los detalles de cada maestro se envían para trabajar a través de masterTransform
TransformManyBlock
. Un BatchedJoinBlock
también se crea para recopilar los detalles en un lote.
El trabajo real se realiza en detailTransform
TransformBlock
, de forma asincrónica, 150 a la vez. BoundedCapacity
se establece en 300 para garantizar que demasiados Masters no se almacenan en el búfer al principio de la cadena, al tiempo que deja espacio para que se pongan en cola suficientes registros de detalles para permitir que se procesen 150 registros a la vez. El bloque emite un object
a sus destinos, porque se filtra a través de los enlaces dependiendo de si se trata de un Detail
o una Exception
.
batchAction
ActionBlock
recopila la salida de todos los lotes y realiza actualizaciones masivas de la base de datos, registro de errores, etc. para cada lote.
Habrá varios BatchedJoinBlock
s, uno para cada maestro. Como cada ISourceBlock
se ISourceBlock
secuencialmente y cada lote solo acepta la cantidad de registros de detalles asociados con un maestro, los lotes se procesarán en orden. Cada bloque solo emite un grupo y se desvincula al finalizar. Solo el último bloque de lote propaga su finalización al ActionBlock
final.
La red de flujo de datos:
// The dataflow network
BufferBlock<Master> masterBuffer = null;
TransformManyBlock<Master, Detail> masterTransform = null;
TransformBlock<Detail, object> detailTransform = null;
ActionBlock<Tuple<IList<object>, IList<object>>> batchAction = null;
// Buffer master records to enable efficient throttling.
masterBuffer = new BufferBlock<Master>(new DataflowBlockOptions { BoundedCapacity = 1 });
// Sequentially transform master records into a stream of detail records.
masterTransform = new TransformManyBlock<Master, Detail>(async masterRecord =>
{
var records = await StoredProcedures.GetObjectsAsync(masterRecord);
// Filter the master records based on some criteria here
var filteredRecords = records;
// Only propagate completion to the last batch
var propagateCompletion = masterBuffer.Completion.IsCompleted && masterTransform.InputCount == 0;
// Create a batch join block to encapsulate the results of the master record.
var batchjoinblock = new BatchedJoinBlock<object, object>(records.Count(), new GroupingDataflowBlockOptions { MaxNumberOfGroups = 1 });
// Add the batch block to the detail transform pipeline''s link queue, and link the batch block to the the batch action block.
var detailLink1 = detailTransform.LinkTo(batchjoinblock.Target1, detailResult => detailResult is Detail);
var detailLink2 = detailTransform.LinkTo(batchjoinblock.Target2, detailResult => detailResult is Exception);
var batchLink = batchjoinblock.LinkTo(batchAction, new DataflowLinkOptions { PropagateCompletion = propagateCompletion });
// Unlink batchjoinblock upon completion.
// (the returned task does not need to be awaited, despite the warning.)
batchjoinblock.Completion.ContinueWith(task =>
{
detailLink1.Dispose();
detailLink2.Dispose();
batchLink.Dispose();
});
return filteredRecords;
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
// Process each detail record asynchronously, 150 at a time.
detailTransform = new TransformBlock<Detail, object>(async detail => {
try
{
// Perform the action for each detail here asynchronously
await DoSomethingAsync();
return detail;
}
catch (Exception e)
{
success = false;
return e;
}
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 150, BoundedCapacity = 300 });
// Perform the proper action for each batch
batchAction = new ActionBlock<Tuple<IList<object>, IList<object>>>(async batch =>
{
var details = batch.Item1.Cast<Detail>();
var errors = batch.Item2.Cast<Exception>();
// Do something with the batch here
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
masterBuffer.LinkTo(masterTransform, new DataflowLinkOptions { PropagateCompletion = true });
masterTransform.LinkTo(detailTransform, new DataflowLinkOptions { PropagateCompletion = true });