tpl - tasks c# example
Foreach paralelo con lambda asincrónico (3)
Creé un método de extensión para esto que hace uso de SemaphoreSlim y también permite establecer el máximo grado de paralelismo
/// <summary>
/// Concurrently Executes async actions for each item of <see cref="IEnumerable<typeparamref name="T"/>
/// </summary>
/// <typeparam name="T">Type of IEnumerable</typeparam>
/// <param name="enumerable">instance of <see cref="IEnumerable<typeparamref name="T"/>"/></param>
/// <param name="action">an async <see cref="Action" /> to execute</param>
/// <param name="maxDegreeOfParallelism">Optional, An integer that represents the maximum degree of parallelism,
/// Must be grater than 0</param>
/// <returns>A Task representing an async operation</returns>
/// <exception cref="ArgumentOutOfRangeException">If the maxActionsToRunInParallel is less than 1</exception>
public static async Task ForEachAsyncConcurrent<T>(
this IEnumerable<T> enumerable,
Func<T, Task> action,
int? maxDegreeOfParallelism = null)
{
if (maxDegreeOfParallelism.HasValue)
{
using (var semaphoreSlim = new SemaphoreSlim(
maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value))
{
var tasksWithThrottler = new List<Task>();
foreach (var item in enumerable)
{
// Increment the number of currently running tasks and wait if they are more than limit.
await semaphoreSlim.WaitAsync();
tasksWithThrottler.Add(Task.Run(async () =>
{
await action(item);
// action is completed, so decrement the number of currently running tasks
semaphoreSlim.Release();
}));
}
// Wait for all tasks to complete.
await Task.WhenAll(tasksWithThrottler.ToArray());
}
}
else
{
await Task.WhenAll(enumerable.Select(item => action(item)));
}
}
Uso de la muestra:
await enumerable.ForEachAsyncConcurrent(
async item =>
{
await SomeAsyncMethod(item);
},
5);
Me gustaría manejar una colección en paralelo, pero estoy teniendo problemas para implementarla y, por lo tanto, espero algo de ayuda.
El problema surge si quiero llamar a un método marcado como sincronización en C #, dentro de la lambda del bucle paralelo. Por ejemplo:
var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, async item =>
{
// some pre stuff
var response = await GetData(item);
bag.Add(response);
// some post stuff
}
var count = bag.Count;
El problema ocurre cuando el recuento es 0, porque todos los subprocesos creados son efectivamente solo subprocesos de fondo y la llamada Parallel.ForEach
no espera a completarse. Si elimino la palabra clave async, el método se ve así:
var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, item =>
{
// some pre stuff
var responseTask = await GetData(item);
responseTask.Wait();
var response = responseTask.Result;
bag.Add(response);
// some post stuff
}
var count = bag.Count;
Funciona, pero desactiva por completo la astucia de espera y tengo que hacer un manejo de excepción manual .. (Se quitó por brevedad).
¿Cómo puedo implementar un bucle Parallel.ForEach
, que utiliza la palabra clave await dentro de la lambda? ¿Es posible?
El prototipo del método Parallel.ForEach tiene un parámetro Action<T>
como parámetro, pero quiero que espere mi lambda asíncrona.
Puede usar el método de extensión ParallelForEachAsync
del paquete AsyncEnumerator NuGet :
using System.Collections.Async;
var bag = new ConcurrentBag<object>();
await myCollection.ParallelForEachAsync(async item =>
{
// some pre stuff
var response = await GetData(item);
bag.Add(response);
// some post stuff
}, maxDegreeOfParallelism: 10);
var count = bag.Count;
Si solo quieres el paralelismo simple, puedes hacer esto:
var bag = new ConcurrentBag<object>();
var tasks = myCollection.Select(async item =>
{
// some pre stuff
var response = await GetData(item);
bag.Add(response);
// some post stuff
});
await Task.WhenAll(tasks);
var count = bag.Count;
Si necesita algo más complejo, consulte la publicación ForEachAsync
Stephen Toub .