c# - example - ¿Cómo limitar la cantidad de operaciones concurrentes de E/S asincrónicas?
task run async c# (11)
// let''s say there is a list of 1000+ URLs
string[] urls = { "http://google.com", "http://yahoo.com", ... };
// now let''s send HTTP requests to each of these URLs in parallel
urls.AsParallel().ForAll(async (url) => {
var client = new HttpClient();
var html = await client.GetStringAsync(url);
});
Aquí está el problema, comienza más de 1000 solicitudes web simultáneas. ¿Existe alguna manera fácil de limitar la cantidad simultánea de estas solicitudes http asincrónicas? Para que no se descarguen más de 20 páginas web en un momento dado. ¿Cómo hacerlo de la manera más eficiente?
Aunque 1000 tareas se pueden poner en cola muy rápidamente, la biblioteca de Tareas paralelas solo puede manejar tareas simultáneas equivalentes a la cantidad de núcleos de CPU en la máquina. Eso significa que si tiene una máquina de cuatro núcleos, solo se ejecutarán 4 tareas en un momento determinado (a menos que baje el parámetro MaxDegreeOfParallelism).
Básicamente, va a querer crear una Acción o Tarea para cada URL a la que quiera acceder, colocarlas en una Lista y luego procesar esa lista, limitando el número que se puede procesar en paralelo.
La publicación de mi blog muestra cómo hacer esto con Tareas y con Acciones, y proporciona un ejemplo de proyecto que puede descargar y ejecutar para ver ambos en acción.
Con acciones
Si usa Actions, puede usar la función incorporada .Net Parallel.Invoke. Aquí lo limitamos a ejecutar como máximo 20 hilos en paralelo.
var listOfActions = new List<Action>();
foreach (var url in urls)
{
var localUrl = url;
// Note that we create the Task here, but do not start it.
listOfTasks.Add(new Task(() => CallUrl(localUrl)));
}
var options = new ParallelOptions {MaxDegreeOfParallelism = 20};
Parallel.Invoke(options, listOfActions.ToArray());
Con tareas
Con Tasks no hay función incorporada. Sin embargo, puede usar el que brindo en mi blog.
/// <summary>
/// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel.
/// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
/// </summary>
/// <param name="tasksToRun">The tasks to run.</param>
/// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, CancellationToken cancellationToken = new CancellationToken())
{
await StartAndWaitAllThrottledAsync(tasksToRun, maxTasksToRunInParallel, -1, cancellationToken);
}
/// <summary>
/// Starts the given tasks and waits for them to complete. This will run the specified number of tasks in parallel.
/// <para>NOTE: If a timeout is reached before the Task completes, another Task may be started, potentially running more than the specified maximum allowed.</para>
/// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
/// </summary>
/// <param name="tasksToRun">The tasks to run.</param>
/// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
/// <param name="timeoutInMilliseconds">The maximum milliseconds we should allow the max tasks to run in parallel before allowing another task to start. Specify -1 to wait indefinitely.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, int timeoutInMilliseconds, CancellationToken cancellationToken = new CancellationToken())
{
// Convert to a list of tasks so that we don''t enumerate over it multiple times needlessly.
var tasks = tasksToRun.ToList();
using (var throttler = new SemaphoreSlim(maxTasksToRunInParallel))
{
var postTaskTasks = new List<Task>();
// Have each task notify the throttler when it completes so that it decrements the number of tasks currently running.
tasks.ForEach(t => postTaskTasks.Add(t.ContinueWith(tsk => throttler.Release())));
// Start running each task.
foreach (var task in tasks)
{
// Increment the number of tasks currently running and wait if too many are running.
await throttler.WaitAsync(timeoutInMilliseconds, cancellationToken);
cancellationToken.ThrowIfCancellationRequested();
task.Start();
}
// Wait for all of the provided tasks to complete.
// We wait on the list of "post" tasks instead of the original tasks, otherwise there is a potential race condition where the throttler''s using block is exited before some Tasks have had their "post" action completed, which references the throttler, resulting in an exception due to accessing a disposed object.
await Task.WhenAll(postTaskTasks.ToArray());
}
}
Y luego, creando su lista de tareas y llamando a la función para que se ejecuten, digamos que un máximo de 20 simultáneas a la vez, puede hacer esto:
var listOfTasks = new List<Task>();
foreach (var url in urls)
{
var localUrl = url;
// Note that we create the Task here, but do not start it.
listOfTasks.Add(new Task(async () => await CallUrl(localUrl)));
}
await Tasks.StartAndWaitAllThrottledAsync(listOfTasks, 20);
Definitivamente puede hacer esto en las últimas versiones de async para .NET, usando .NET 4.5 Beta. La publicación anterior de ''usr'' apunta a un buen artículo escrito por Stephen Toub, pero la noticia menos anunciada es que el semáforo asíncrono en realidad llegó al lanzamiento Beta de .NET 4.5.
Si miras a nuestra querida clase SemaphoreSlim
(que deberías usar porque es más WaitAsync(...)
que el Semaphore
original), ahora cuenta con la serie de sobrecargas WaitAsync(...)
, con todos los argumentos esperados: intervalos de tiempo de espera, tokens de cancelación , todos tus amigos habituales de programación :)
Stephen también escribió una publicación más reciente en el blog sobre los nuevos productos .NET 4.5 que salieron con beta. Vea Novedades para el Paralelismo en .NET 4.5 Beta .
Por último, aquí hay un código de ejemplo sobre cómo usar SemaphoreSlim para la aceleración del método asíncrono:
public async Task MyOuterMethod()
{
// let''s say there is a list of 1000+ URLs
var urls = { "http://google.com", "http://yahoo.com", ... };
// now let''s send HTTP requests to each of these URLs in parallel
var allTasks = new List<Task>();
var throttler = new SemaphoreSlim(initialCount: 20);
foreach (var url in urls)
{
// do an async wait until we can schedule again
await throttler.WaitAsync();
// using Task.Run(...) to run the lambda in its own parallel
// flow on the threadpool
allTasks.Add(
Task.Run(async () =>
{
try
{
var client = new HttpClient();
var html = await client.GetStringAsync(url);
}
finally
{
throttler.Release();
}
}));
}
// won''t get here until all urls have been put into tasks
await Task.WhenAll(allTasks);
// won''t get here until all tasks have completed in some way
// (either success or exception)
}
Por último, pero probablemente una digna mención es una solución que utiliza la programación basada en TPL. Puede crear tareas delegadas en el TPL que aún no se han iniciado y permitir que un planificador de tareas personalizado limite la concurrencia. De hecho, aquí hay una muestra de MSDN:
Ver también TaskScheduler .
Desafortunadamente, al .NET Framework le faltan los combinadores más importantes para orquestar tareas asincrónicas paralelas. No hay tal cosa incorporada.
Mira la clase AsyncSemaphore construida por el más respetable Stephen Toub. Lo que quiere se llama semáforo y necesita una versión asincrónica.
El ejemplo de Theo Yaung es agradable, pero hay una variante sin lista de tareas de espera.
class SomeChecker
{
private const int ThreadCount=20;
private CountdownEvent _countdownEvent;
private SemaphoreSlim _throttler;
public Task Check(IList<string> urls)
{
_countdownEvent = new CountdownEvent(urls.Count);
_throttler = new SemaphoreSlim(ThreadCount);
return Task.Run( // prevent UI thread lock
async () =>{
foreach (var url in urls)
{
// do an async wait until we can schedule again
await _throttler.WaitAsync();
ProccessUrl(url); // NOT await
}
//instead of await Task.WhenAll(allTasks);
_countdownEvent.Wait();
});
}
private async Task ProccessUrl(string url)
{
try
{
var page = await new WebClient()
.DownloadStringTaskAsync(new Uri(url));
ProccessResult(page);
}
finally
{
_throttler.Release();
_countdownEvent.Signal();
}
}
private void ProccessResult(string page){/*....*/}
}
Hay muchas trampas y el uso directo de un semáforo puede ser complicado en casos de error, por lo que sugeriría utilizar el paquete AsyncEnumerator NuGet en lugar de reinventar la rueda:
// let''s say there is a list of 1000+ URLs
string[] urls = { "http://google.com", "http://yahoo.com", ... };
// now let''s send HTTP requests to each of these URLs in parallel
await urls.ParallelForEachAsync(async (url) => {
var client = new HttpClient();
var html = await client.GetStringAsync(url);
}, maxDegreeOfParallelism: 20);
Los cálculos paralelos se deben usar para acelerar las operaciones vinculadas a la CPU. Aquí estamos hablando de operaciones vinculadas de E / S. Su implementación debe ser puramente asincrónica , a menos que esté abrumando el núcleo único ocupado en su CPU multi-core.
EDITAR Me gusta la sugerencia hecha por usr para usar un "semáforo asíncrono" aquí.
Pregunta anterior, nueva respuesta. @vitidev tenía un bloque de código que se reutilizó casi intacto en un proyecto que revisé. Después de discutir con algunos colegas uno preguntó "¿Por qué no utilizas los métodos TPL incorporados?" ActionBlock luce como el ganador allí. https://msdn.microsoft.com/en-us/library/hh194773(v=vs.110).aspx . Probablemente no termine cambiando ningún código existente, pero definitivamente buscará adoptar este nuget y reutilizar las mejores prácticas del Sr. Softy para el paralelismo acelerado.
SemaphoreSlim puede ser muy útil aquí. Este es el método de extensión que he creado.
/// <summary>
/// Concurrently Executes async actions for each item of <see cref="IEnumerable<typeparamref name="T"/>
/// </summary>
/// <typeparam name="T">Type of IEnumerable</typeparam>
/// <param name="enumerable">instance of <see cref="IEnumerable<typeparamref name="T"/>"/></param>
/// <param name="action">an async <see cref="Action" /> to execute</param>
/// <param name="maxActionsToRunInParallel">Optional, max numbers of the actions to run in parallel,
/// Must be grater than 0</param>
/// <returns>A Task representing an async operation</returns>
/// <exception cref="ArgumentOutOfRangeException">If the maxActionsToRunInParallel is less than 1</exception>
public static async Task ForEachAsyncConcurrent<T>(
this IEnumerable<T> enumerable,
Func<T, Task> action,
int? maxActionsToRunInParallel = null)
{
if (maxActionsToRunInParallel.HasValue)
{
using (var semaphoreSlim = new SemaphoreSlim(
maxActionsToRunInParallel.Value, maxActionsToRunInParallel.Value))
{
var tasksWithThrottler = new List<Task>();
foreach (var item in enumerable)
{
// Increment the number of currently running tasks and wait if they are more than limit.
await semaphoreSlim.WaitAsync();
tasksWithThrottler.Add(Task.Run(async () =>
{
await action(item);
// action is completed, so decrement the number of currently running tasks
semaphoreSlim.Release();
}));
}
// Wait for all of the provided tasks to complete.
await Task.WhenAll(tasksWithThrottler.ToArray());
}
}
else
{
await Task.WhenAll(enumerable.Select(item => action(item)));
}
}
Uso de la muestra:
await enumerable.ForEachAsyncConcurrent(
async item =>
{
await SomeAsyncMethod(item);
},
5);
Si tiene un IEnumerable (es decir, cadenas de URL s) y desea realizar una operación de E / S encuadernada con cada una de estas (es decir, realizar una solicitud http asincrónica) simultáneamente Y, opcionalmente, también desea establecer el número máximo de concurrentes Solicitudes de E / S en tiempo real, he aquí cómo puedes hacerlo. De esta forma no se utiliza el grupo de subprocesos y otros, el método usa semáforo para controlar las solicitudes de E / S simultáneas máximas, similar a un patrón de ventana deslizante que una solicitud completa, abandona el semáforo y el siguiente entra.
uso: aguardar ForEachAsync (urlStrings, YourAsyncFunc, optionalMaxDegreeOfConcurrency);
public static Task ForEachAsync<TIn>(
IEnumerable<TIn> inputEnumerable,
Func<TIn, Task> asyncProcessor,
int? maxDegreeOfParallelism = null)
{
int maxAsyncThreadCount = maxDegreeOfParallelism ?? DefaultMaxDegreeOfParallelism;
SemaphoreSlim throttler = new SemaphoreSlim(maxAsyncThreadCount, maxAsyncThreadCount);
IEnumerable<Task> tasks = inputEnumerable.Select(async input =>
{
await throttler.WaitAsync().ConfigureAwait(false);
try
{
await asyncProcessor(input).ConfigureAwait(false);
}
finally
{
throttler.Release();
}
});
return Task.WhenAll(tasks);
}
Use MaxDegreeOfParallelism
, que es una opción que puede especificar en Parallel.ForEach()
:
var options = new ParallelOptions { MaxDegreeOfParallelism = 20 };
Parallel.ForEach(urls, options,
url =>
{
var client = new HttpClient();
var html = client.GetStringAsync(url);
// do stuff with html
});