c# - method - ¿Por qué este código TAP async/await es más lento que la versión TPL?
return task c# (3)
Creo que estoy viendo el problema aquí, o al menos una parte de él. Mira de cerca los dos bits de código a continuación; no son equivalentes.
// Prime our thread pump with max threads.
for (var i = 0; i < parameters.MaxThreads; i++)
{
Task.Run((Action) StartCrmRequest, cancellationToken);
}
Y:
// Prime our thread pump with max threads.
for (var i = 0; i < parameters.MaxThreads; i++)
{
await StartCrmRequest();
}
En el código original (lo tomo como un hecho que es funcionalmente correcto) hay una sola llamada a ContinueWith
. Esa es exactamente la cantidad de declaraciones de await
que esperaría ver en una reescritura trivial si se pretende preservar el comportamiento original.
No es una regla dura y rápida y solo es aplicable en casos simples, pero, sin embargo, es algo bueno para estar atento.
Tuve que escribir una aplicación de consola llamada servicio web de Microsoft Dynamics CRM para realizar una acción en más de ocho mil objetos de CRM. Los detalles de la llamada al servicio web son irrelevantes y no se muestran aquí, pero necesitaba un cliente de múltiples subprocesos para poder hacer llamadas en paralelo. Quería poder controlar el número de subprocesos utilizados desde una configuración y también para que la aplicación cancele toda la operación si el número de errores de servicio alcanzó un umbral definido por la configuración.
Lo escribí utilizando Task Parallel Library Task.Run y ContinueWith, haciendo un seguimiento de cuántas llamadas (subprocesos) estaban en curso, cuántos errores recibimos y si el usuario había cancelado desde el teclado. Todo funcionó bien y tuve un registro extenso para asegurarme de que los hilos estaban terminando limpiamente y que todo estaba ordenado al final de la carrera. Pude ver que el programa estaba utilizando el número máximo de subprocesos en paralelo y, si se alcanzó nuestro límite máximo, esperar hasta que se completara una tarea en ejecución antes de iniciar otra.
Durante la revisión del código, mi colega sugirió que sería mejor hacerlo con async / await en lugar de tareas y continuaciones, así que creé una rama y la reescribí de esa manera. Los resultados fueron interesantes: la versión async / await fue casi el doble de lenta, y nunca alcanzó el número máximo de operaciones / subprocesos paralelos permitidos. El TPL uno siempre llegó a 10 subprocesos en paralelo, mientras que la versión async / await nunca superó los 5.
Mi pregunta es: ¿Me he equivocado al escribir el código async / await (o incluso el código TPL)? Si no lo he codificado mal, ¿puede explicar por qué el async / await es menos eficiente, y eso significa que es mejor seguir utilizando TPL para el código de múltiples subprocesos?
Tenga en cuenta que el código con el que probé en realidad no se llama CRM: la clase CrmClient simplemente duerme durante un tiempo especificado en la configuración (cinco segundos) y luego lanza una excepción. Esto significaba que no había variables externas que pudieran afectar el rendimiento.
Para los propósitos de esta pregunta, creé un programa simplificado que combina ambas versiones; cuál se llama está determinado por un ajuste de configuración. Cada uno de ellos comienza con un corredor de arranque que configura el entorno, crea la clase de cola y luego utiliza un TaskCompletionSource para esperar a que se complete. Se utiliza un CancelTokenSource para indicar una cancelación del usuario. La lista de identificadores para procesar se lee desde un archivo incrustado y se inserta en una ConcurrentQueue. Ambos comienzan a llamar a StartCrmRequest tantas veces como max-threads; posteriormente, cada vez que se procesa un resultado, el método ProcessResult vuelve a llamar a StartCrmRequest y continúa hasta que se procesan todos nuestros identificadores.
Puede clonar / descargar el programa completo desde aquí: https://bitbucket.org/kentrob/pmgfixso/
Aquí está la configuración relevante:
<appSettings>
<add key="TellUserAfterNCalls" value="5"/>
<add key="CrmErrorsBeforeQuitting" value="20"/>
<add key="MaxThreads" value="10"/>
<add key="CallIntervalMsecs" value="5000"/>
<add key="UseAsyncAwait" value="True" />
</appSettings>
Comenzando con la versión TPL, aquí está el corredor de arranque que inicia el administrador de colas:
public static class TplRunner
{
private static readonly CancellationTokenSource CancellationTokenSource = new CancellationTokenSource();
public static void StartQueue(RuntimeParameters parameters, IEnumerable<string> idList)
{
Console.CancelKeyPress += (s, args) =>
{
CancelCrmClient();
args.Cancel = true;
};
var start = DateTime.Now;
Program.TellUser("Start: " + start);
var taskCompletionSource = new TplQueue(parameters)
.Start(CancellationTokenSource.Token, idList);
while (!taskCompletionSource.Task.IsCompleted)
{
if (Console.KeyAvailable)
{
if (Console.ReadKey().Key != ConsoleKey.Q) continue;
Console.WriteLine("When all threads are complete, press any key to continue.");
CancelCrmClient();
}
}
var end = DateTime.Now;
Program.TellUser("End: {0}. Elapsed = {1} secs.", end, (end - start).TotalSeconds);
}
private static void CancelCrmClient()
{
CancellationTokenSource.Cancel();
Console.WriteLine("Cancelling Crm client. Web service calls in operation will have to run to completion.");
}
}
Aquí está el propio gestor de colas TPL:
public class TplQueue
{
private readonly RuntimeParameters parameters;
private readonly object locker = new object();
private ConcurrentQueue<string> idQueue = new ConcurrentQueue<string>();
private readonly CrmClient crmClient;
private readonly TaskCompletionSource<bool> taskCompletionSource = new TaskCompletionSource<bool>();
private int threadCount;
private int crmErrorCount;
private int processedCount;
private CancellationToken cancelToken;
public TplQueue(RuntimeParameters parameters)
{
this.parameters = parameters;
crmClient = new CrmClient();
}
public TaskCompletionSource<bool> Start(CancellationToken cancellationToken, IEnumerable<string> ids)
{
cancelToken = cancellationToken;
foreach (var id in ids)
{
idQueue.Enqueue(id);
}
threadCount = 0;
// Prime our thread pump with max threads.
for (var i = 0; i < parameters.MaxThreads; i++)
{
Task.Run((Action) StartCrmRequest, cancellationToken);
}
return taskCompletionSource;
}
private void StartCrmRequest()
{
if (taskCompletionSource.Task.IsCompleted)
{
return;
}
if (cancelToken.IsCancellationRequested)
{
Program.TellUser("Crm client cancelling...");
ClearQueue();
return;
}
var count = GetThreadCount();
if (count >= parameters.MaxThreads)
{
return;
}
string id;
if (!idQueue.TryDequeue(out id)) return;
IncrementThreadCount();
crmClient.CompleteActivityAsync(new Guid(id), parameters.CallIntervalMsecs).ContinueWith(ProcessResult);
processedCount += 1;
if (parameters.TellUserAfterNCalls > 0 && processedCount%parameters.TellUserAfterNCalls == 0)
{
ShowProgress(processedCount);
}
}
private void ProcessResult(Task<CrmResultMessage> response)
{
if (response.Result.CrmResult == CrmResult.Failed && ++crmErrorCount == parameters.CrmErrorsBeforeQuitting)
{
Program.TellUser(
"Quitting because CRM error count is equal to {0}. Already queued web service calls will have to run to completion.",
crmErrorCount);
ClearQueue();
}
var count = DecrementThreadCount();
if (idQueue.Count == 0 && count == 0)
{
taskCompletionSource.SetResult(true);
}
else
{
StartCrmRequest();
}
}
private int GetThreadCount()
{
lock (locker)
{
return threadCount;
}
}
private void IncrementThreadCount()
{
lock (locker)
{
threadCount = threadCount + 1;
}
}
private int DecrementThreadCount()
{
lock (locker)
{
threadCount = threadCount - 1;
return threadCount;
}
}
private void ClearQueue()
{
idQueue = new ConcurrentQueue<string>();
}
private static void ShowProgress(int processedCount)
{
Program.TellUser("{0} activities processed.", processedCount);
}
}
Tenga en cuenta que soy consciente de que un par de contadores no son seguros para subprocesos pero no son críticos; La variable threadCount es la única crítica.
Aquí está el método del cliente de CRM ficticio:
public Task<CrmResultMessage> CompleteActivityAsync(Guid activityId, int callIntervalMsecs)
{
// Here we would normally call a CRM web service.
return Task.Run(() =>
{
try
{
if (callIntervalMsecs > 0)
{
Thread.Sleep(callIntervalMsecs);
}
throw new ApplicationException("Crm web service not available at the moment.");
}
catch
{
return new CrmResultMessage(activityId, CrmResult.Failed);
}
});
}
Y aquí están las mismas clases async / await (con métodos comunes eliminados por razones de brevedad):
public static class AsyncRunner
{
private static readonly CancellationTokenSource CancellationTokenSource = new CancellationTokenSource();
public static void StartQueue(RuntimeParameters parameters, IEnumerable<string> idList)
{
var start = DateTime.Now;
Program.TellUser("Start: " + start);
var taskCompletionSource = new AsyncQueue(parameters)
.StartAsync(CancellationTokenSource.Token, idList).Result;
while (!taskCompletionSource.Task.IsCompleted)
{
...
}
var end = DateTime.Now;
Program.TellUser("End: {0}. Elapsed = {1} secs.", end, (end - start).TotalSeconds);
}
}
El gestor de colas async / await:
public class AsyncQueue
{
private readonly RuntimeParameters parameters;
private readonly object locker = new object();
private readonly CrmClient crmClient;
private readonly TaskCompletionSource<bool> taskCompletionSource = new TaskCompletionSource<bool>();
private CancellationToken cancelToken;
private ConcurrentQueue<string> idQueue = new ConcurrentQueue<string>();
private int threadCount;
private int crmErrorCount;
private int processedCount;
public AsyncQueue(RuntimeParameters parameters)
{
this.parameters = parameters;
crmClient = new CrmClient();
}
public async Task<TaskCompletionSource<bool>> StartAsync(CancellationToken cancellationToken,
IEnumerable<string> ids)
{
cancelToken = cancellationToken;
foreach (var id in ids)
{
idQueue.Enqueue(id);
}
threadCount = 0;
// Prime our thread pump with max threads.
for (var i = 0; i < parameters.MaxThreads; i++)
{
await StartCrmRequest();
}
return taskCompletionSource;
}
private async Task StartCrmRequest()
{
if (taskCompletionSource.Task.IsCompleted)
{
return;
}
if (cancelToken.IsCancellationRequested)
{
...
return;
}
var count = GetThreadCount();
if (count >= parameters.MaxThreads)
{
return;
}
string id;
if (!idQueue.TryDequeue(out id)) return;
IncrementThreadCount();
var crmMessage = await crmClient.CompleteActivityAsync(new Guid(id), parameters.CallIntervalMsecs);
ProcessResult(crmMessage);
processedCount += 1;
if (parameters.TellUserAfterNCalls > 0 && processedCount%parameters.TellUserAfterNCalls == 0)
{
ShowProgress(processedCount);
}
}
private async void ProcessResult(CrmResultMessage response)
{
if (response.CrmResult == CrmResult.Failed && ++crmErrorCount == parameters.CrmErrorsBeforeQuitting)
{
Program.TellUser(
"Quitting because CRM error count is equal to {0}. Already queued web service calls will have to run to completion.",
crmErrorCount);
ClearQueue();
}
var count = DecrementThreadCount();
if (idQueue.Count == 0 && count == 0)
{
taskCompletionSource.SetResult(true);
}
else
{
await StartCrmRequest();
}
}
}
Entonces, al establecer MaxThreads en 10 y CrmErrorsBeforeQuitting en 20, la versión TPL de mi máquina se completa en 19 segundos y la versión async / await tarda 35 segundos. Dado que tengo más de 8000 llamadas para hacer esto, es una diferencia significativa. ¿Algunas ideas?
Creo que sobre complicaste tu solución y terminaste no llegando a donde querías en ninguna de las dos implementaciones.
En primer lugar, las conexiones a cualquier host HTTP están limitadas por el administrador de puntos de servicio . El límite predeterminado para los entornos de cliente es 2, pero puede aumentarlo usted mismo.
No importa la cantidad de subprocesos que genere, no habrá más solicitudes activas que las de allwed.
Entonces, como alguien señaló, await
lógicamente bloquear el flujo de ejecución.
Y, finalmente, pasó su tiempo creando un AsyncQueue
cuando debería haber usado flujos de datos TPL .
Cuando se implementa con async / await, esperaría que el algoritmo de enlace de E / S se ejecute en un solo hilo. A diferencia de @KirillShlenskiy, creo que la parte responsable de "regresar" al contexto de la persona que llama no es responsable de la desaceleración. Creo que superas el conjunto de subprocesos intentando usarlo para operaciones de E / S enlazadas. Está diseñado principalmente para operaciones de cómputo.
Echa un vistazo a ForEachAsync. Siento que eso es lo que estás buscando (la discusión de Stephen Toub, encontrarás los videos de Wischik también significativos):
http://blogs.msdn.com/b/pfxteam/archive/2012/03/05/10278165.aspx
(Utilice el grado de concurrencia para reducir la huella de memoria)