method - task parallel library c#
Llamar a TaskCompletionSource.SetResult de manera no bloqueante (3)
He descubierto que TaskCompletionSource.SetResult();
Invoca el código en espera de la tarea antes de volver. En mi caso eso resulta en un punto muerto.
Esta es una versión simplificada que se inicia en un Thread
ordinario
void ReceiverRun()
while (true)
{
var msg = ReadNextMessage();
TaskCompletionSource<Response> task = requests[msg.RequestID];
if(msg.Error == null)
task.SetResult(msg);
else
task.SetException(new Exception(msg.Error));
}
}
La parte "asíncrona" del código se ve así.
await SendAwaitResponse("first message");
SendAwaitResponse("second message").Wait();
La espera es en realidad anidada dentro de llamadas no asíncronas.
El SendAwaitResponse (simplificado)
public static Task<Response> SendAwaitResponse(string msg)
{
var t = new TaskCompletionSource<Response>();
requests.Add(GetID(msg), t);
stream.Write(msg);
return t.Task;
}
Mi suposición era que el segundo SendAwaitResponse se ejecutaría en un subproceso ThreadPool pero continúa en el subproceso creado para ReceiverRun.
¿Hay alguna forma de establecer el resultado de una tarea sin continuar con el código esperado?
La aplicación es una aplicación de consola .
He descubierto que TaskCompletionSource.SetResult (); Invoca el código en espera de la tarea antes de volver. En mi caso eso resulta en un punto muerto.
Sí, tengo una publicación de blog que documenta esto (AFAIK no está documentada en MSDN). El punto muerto se produce por dos cosas:
- Hay una mezcla de código
async
y de bloqueo (es decir, un métodoasync
está llamandoWait
). - Las continuaciones de la tarea se programan utilizando
TaskContinuationOptions.ExecuteSynchronously
.
Recomiendo comenzar con la solución más simple posible: eliminar lo primero (1). Es decir, no mezclar llamadas async
y de Wait
:
await SendAwaitResponse("first message");
SendAwaitResponse("second message").Wait();
En su lugar, el uso await
constantemente:
await SendAwaitResponse("first message");
await SendAwaitResponse("second message");
Si lo necesita, puede Wait
en un punto alternativo más arriba en la pila de llamadas ( no en un método async
).
Esa es mi solución más recomendada. Sin embargo, si desea intentar eliminar la segunda cosa (2), puede hacer un par de trucos: envuelva el SetResult
en un Task.Run
para forzarlo en un hilo separado (mi biblioteca AsyncEx tiene métodos de extensión *WithBackgroundContinuations
que sí lo hacen exactamente esto), o AsyncContext
su hilo un contexto real (como mi tipo AsyncContext
) y especifique ConfigureAwait(false)
, lo que hará que la continuación ignore el indicador ExecuteSynchronously
.
Pero esas soluciones son mucho más complejas que simplemente separar el código async
y de bloqueo.
Como nota al margen, eche un vistazo a TPL Dataflow ; Suena como si pudiera encontrarlo útil.
"Mi suposición era que la segunda SendAwaitResponse se ejecutaría en un subproceso ThreadPool pero continúa en el subproceso creado para ReceiverRun".
Depende completamente de lo que haga dentro de SendAwaitResponse. La asincronía y la concurrencia no son lo mismo .
Echa un vistazo a: C # 5 Async / Await - ¿es * concurrente *?
Como su aplicación es una aplicación de consola, se ejecuta en el contexto de sincronización predeterminado, donde la llamada de continuación de await
se llamará en el mismo hilo en el que se completó la tarea en espera. Si desea cambiar los hilos después de await SendAwaitResponse
, puede hacerlo con await Task.Yield()
:
await SendAwaitResponse("first message");
await Task.Yield();
// will be continued on a pool thread
// ...
SendAwaitResponse("second message").Wait(); // so no deadlock
Puede mejorar aún más esto almacenando Thread.CurrentThread.ManagedThreadId
dentro de Task.Result
y comparándolo con la identificación del subproceso actual después de la await
. Si todavía está en el mismo hilo, await Task.Yield()
.
Aunque entiendo que SendAwaitResponse
es una versión simplificada de su código real, aún es completamente sincrónico en su interior (la forma en que lo mostró en su pregunta). ¿Por qué esperarías un cambio de hilo allí?
De todos modos, probablemente debas rediseñar tu lógica de manera que no haga suposiciones sobre en qué hilo estás actualmente. Evite mezclar Task.Wait()
y Task.Wait()
y haga que todo su código sea asíncrono. Por lo general, es posible mantener solo una Wait()
algún lugar en el nivel superior (por ejemplo, dentro de Main
).
[EDITADO] Calling task.SetResult(msg)
de ReceiverRun
realidad transfiere el flujo de control al punto donde await
en la task
, sin un cambio de hilo, debido al comportamiento del contexto de sincronización predeterminado. Por lo tanto, el código que realiza el procesamiento del mensaje real está tomando control del hilo ReceiverRun
. Eventualmente, SendAwaitResponse("second message").Wait()
Se llama a SendAwaitResponse("second message").Wait()
en el mismo hilo, lo que provoca el interbloqueo.
A continuación se muestra un código de aplicación de consola, modelado después de su muestra. Utiliza await Task.Yield()
dentro de ProcessAsync
para programar la continuación en un subproceso separado, por lo que el flujo de control regresa a ReceiverRun
y no hay ningún interbloqueo.
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApplication
{
class Program
{
class Worker
{
public struct Response
{
public string message;
public int threadId;
}
CancellationToken _token;
readonly ConcurrentQueue<string> _messages = new ConcurrentQueue<string>();
readonly ConcurrentDictionary<string, TaskCompletionSource<Response>> _requests = new ConcurrentDictionary<string, TaskCompletionSource<Response>>();
public Worker(CancellationToken token)
{
_token = token;
}
string ReadNextMessage()
{
// using Thread.Sleep(100) for test purposes here,
// should be using ManualResetEvent (or similar synchronization primitive),
// depending on how messages arrive
string message;
while (!_messages.TryDequeue(out message))
{
Thread.Sleep(100);
_token.ThrowIfCancellationRequested();
}
return message;
}
public void ReceiverRun()
{
LogThread("Enter ReceiverRun");
while (true)
{
var msg = ReadNextMessage();
LogThread("ReadNextMessage: " + msg);
var tcs = _requests[msg];
tcs.SetResult(new Response { message = msg, threadId = Thread.CurrentThread.ManagedThreadId });
_token.ThrowIfCancellationRequested(); // this is how we terminate the loop
}
}
Task<Response> SendAwaitResponse(string msg)
{
LogThread("SendAwaitResponse: " + msg);
var tcs = new TaskCompletionSource<Response>();
_requests.TryAdd(msg, tcs);
_messages.Enqueue(msg);
return tcs.Task;
}
public async Task ProcessAsync()
{
LogThread("Enter Worker.ProcessAsync");
var task1 = SendAwaitResponse("first message");
await task1;
LogThread("result1: " + task1.Result.message);
// avoid deadlock for task2.Wait() with Task.Yield()
// comment this out and task2.Wait() will dead-lock
if (task1.Result.threadId == Thread.CurrentThread.ManagedThreadId)
await Task.Yield();
var task2 = SendAwaitResponse("second message");
task2.Wait();
LogThread("result2: " + task2.Result.message);
var task3 = SendAwaitResponse("third message");
// still on the same thread as with result 2, no deadlock for task3.Wait()
task3.Wait();
LogThread("result3: " + task3.Result.message);
var task4 = SendAwaitResponse("fourth message");
await task4;
LogThread("result4: " + task4.Result.message);
// avoid deadlock for task5.Wait() with Task.Yield()
// comment this out and task5.Wait() will dead-lock
if (task4.Result.threadId == Thread.CurrentThread.ManagedThreadId)
await Task.Yield();
var task5 = SendAwaitResponse("fifth message");
task5.Wait();
LogThread("result5: " + task5.Result.message);
LogThread("Leave Worker.ProcessAsync");
}
public static void LogThread(string message)
{
Console.WriteLine("{0}, thread: {1}", message, Thread.CurrentThread.ManagedThreadId);
}
}
static void Main(string[] args)
{
Worker.LogThread("Enter Main");
var cts = new CancellationTokenSource(5000); // cancel after 5s
var worker = new Worker(cts.Token);
Task receiver = Task.Run(() => worker.ReceiverRun());
Task main = worker.ProcessAsync();
try
{
Task.WaitAll(main, receiver);
}
catch (Exception e)
{
Console.WriteLine("Exception: " + e.Message);
}
Worker.LogThread("Leave Main");
Console.ReadLine();
}
}
}
Esto no es muy diferente de hacer Task.Run(() => task.SetResult(msg))
dentro de ReceiverRun
. La única ventaja que se me ocurre es que usted tiene un control explícito sobre cuándo cambiar los hilos. De esta manera, puede permanecer en el mismo hilo el mayor tiempo posible (por ejemplo, para task2
, task3
, task4
, pero aún necesita otro cambio de hilo después de task4
para evitar un interbloqueo en task5.Wait()
).
Ambas soluciones eventualmente harían que el grupo de hilos crezca, lo que es malo en términos de rendimiento y escalabilidad.
Ahora, si reemplazamos task.Wait()
con task.Wait()
await task
todas partes dentro de ProcessAsync
en el código anterior, no tendremos que usar a la await Task.Yield
y aún no habrá interbloqueos. Sin embargo, toda la cadena de llamadas de await
después de la primera await task1
dentro de ProcessAsync
se ejecutará en el subproceso ReceiverRun
. Siempre y cuando no bloqueemos este hilo con otras llamadas al estilo Wait()
y no hagamos un montón de trabajo vinculado a la CPU mientras procesamos los mensajes, este enfoque podría funcionar bien (estilo asincrónico de await
IO) las llamadas aún deberían estar bien, y en realidad pueden desencadenar un cambio de hilo implícito).
Dicho esto, creo que necesitarías un hilo separado con un contexto de sincronización de serialización instalado en él para procesar los mensajes (similar a WindowsFormsSynchronizationContext
). Ahí es donde debe ejecutarse el código asíncrono que contiene las awaits
. Aún Task.Wait
evitar el uso de Task.Wait
en ese hilo. Y si un procesamiento de mensaje individual requiere mucho trabajo vinculado a la CPU, debe usar Task.Run
para dicho trabajo. Para llamadas asíncronas enlazadas a IO, puede permanecer en el mismo hilo.
Es posible que desee consultar ActionDispatcher
/ ActionDispatcherSynchronizationContext
de la asíncrona de Nito de para su lógica de procesamiento asíncrono de mensajes. Con suerte, Stephen interviene y proporciona una mejor respuesta.