method - task c# example
SecuenciaciĆ³n de tareas y reentrada (1)
Tengo el siguiente escenario, que creo que podría ser bastante común:
Hay una tarea (un controlador de comandos de UI) que puede completarse de forma sincrónica o asíncrona.
Los comandos pueden llegar más rápido de lo que se procesan.
Si ya hay una tarea pendiente para un comando, la nueva tarea del controlador de comandos debe ponerse en cola y procesarse secuencialmente.
El resultado de cada tarea nueva puede depender del resultado de la tarea previa.
La cancelación debe ser observada, pero me gustaría dejarla fuera del alcance de esta pregunta por simplicidad. Además, la seguridad de subprocesos (concurrencia) no es un requisito, pero debe admitirse la reentrada.
Aquí hay un ejemplo básico de lo que estoy tratando de lograr (como una aplicación de consola, por simplicidad):
using System;
using System.Threading.Tasks;
namespace ConsoleApp
{
class Program
{
static void Main(string[] args)
{
var asyncOp = new AsyncOp<int>();
Func<int, Task<int>> handleAsync = async (arg) =>
{
Console.WriteLine("this task arg: " + arg);
//await Task.Delay(arg); // make it async
return await Task.FromResult(arg); // sync
};
Console.WriteLine("Test #1...");
asyncOp.RunAsync(() => handleAsync(1000));
asyncOp.RunAsync(() => handleAsync(900));
asyncOp.RunAsync(() => handleAsync(800));
asyncOp.CurrentTask.Wait();
Console.WriteLine("/nPress any key to continue to test #2...");
Console.ReadLine();
asyncOp.RunAsync(() =>
{
asyncOp.RunAsync(() => handleAsync(200));
return handleAsync(100);
});
asyncOp.CurrentTask.Wait();
Console.WriteLine("/nPress any key to exit...");
Console.ReadLine();
}
// AsyncOp
class AsyncOp<T>
{
Task<T> _pending = Task.FromResult(default(T));
public Task<T> CurrentTask { get { return _pending; } }
public Task<T> RunAsync(Func<Task<T>> handler)
{
var pending = _pending;
Func<Task<T>> wrapper = async () =>
{
// await the prev task
var prevResult = await pending;
Console.WriteLine("/nprev task result: " + prevResult);
// start and await the handler
return await handler();
};
_pending = wrapper();
return _pending;
}
}
}
}
La salida:
Test #1... prev task result: 0 this task arg: 1000 prev task result: 1000 this task arg: 900 prev task result: 900 this task arg: 800 Press any key to continue to test #2... prev task result: 800 prev task result: 800 this task arg: 200 this task arg: 100 Press any key to exit...
Funciona de acuerdo con los requisitos, hasta que se vuelva a introducir en la prueba n. ° 2:
asyncOp.RunAsync(() =>
{
asyncOp.RunAsync(() => handleAsync(200));
return handleAsync(100);
});
El resultado deseado debe ser 100
, 200
, en lugar de 200
, 100
, porque ya hay una tarea externa pendiente para 100
. Eso es obviamente porque la tarea interna se ejecuta de forma síncrona, rompiendo la lógica var pending = _pending; /* ... */ _pending = wrapper()
var pending = _pending; /* ... */ _pending = wrapper()
para la tarea externa.
¿Cómo hacer que funcione para la prueba n. ° 2 también?
Una solución sería aplicar asincronía para cada tarea, con Task.Factory.StartNew(..., TaskScheduler.FromCurrentSynchronizationContext()
. Sin embargo, no quiero imponer la ejecución asincrónica en los controladores de comandos que podrían ser sincrónicos internamente. , No quiero depender del comportamiento de un contexto de sincronización en particular (es decir, confiar en que Task.Factory.StartNew
debería regresar antes de que la tarea creada realmente se haya iniciado).
En el proyecto de la vida real, soy responsable de lo que AsyncOp
está arriba, pero no tengo control sobre los manejadores de comandos (es decir, lo que sea que esté dentro de handleAsync
).
Casi me olvido de que es posible construir una Task
manualmente, sin iniciarla ni programarla. Luego, "Task.Factory.StartNew" frente a "new Task (...). Start" me puso nuevamente en la pista. Creo que este es uno de esos pocos casos en los que Task<TResult>
constructor Task<TResult>
puede ser realmente útil, junto con las tareas anidadas ( Task<Task<T>>
) y Task.Unwrap()
:
// AsyncOp
class AsyncOp<T>
{
Task<T> _pending = Task.FromResult(default(T));
public Task<T> CurrentTask { get { return _pending; } }
public Task<T> RunAsync(Func<Task<T>> handler, bool useSynchronizationContext = false)
{
var pending = _pending;
Func<Task<T>> wrapper = async () =>
{
// await the prev task
var prevResult = await pending;
Console.WriteLine("/nprev task result: " + prevResult);
// start and await the handler
return await handler();
};
var task = new Task<Task<T>>(wrapper);
var inner = task.Unwrap();
_pending = inner;
task.RunSynchronously(useSynchronizationContext ?
TaskScheduler.FromCurrentSynchronizationContext() :
TaskScheduler.Current);
return inner;
}
}
La salida:
Test #1... prev task result: 0 this task arg: 1000 prev task result: 1000 this task arg: 900 prev task result: 900 this task arg: 800 Press any key to continue to test #2... prev task result: 800 this task arg: 100 prev task result: 100 this task arg: 200
Ahora también es muy fácil hacer que AsyncOp
sea AsyncOp
subprocesos agregando un lock
para proteger el _pending
, si es necesario.
Actualizado , a continuación se muestra la versión más reciente de este patrón, que utiliza TaskCompletionSource
y es seguro para subprocesos:
/// <summary>
/// AsyncOperation
/// By Noseratio - http://.com/a/21427264
/// </summary>
/// <typeparam name="T">Task result type</typeparam>
class AsyncOperation<T>
{
readonly object _lock = new Object();
Task<T> _currentTask = null;
CancellationTokenSource _currentCts = null;
// a client of this class (e.g. a ViewModel) has an option
// to handle TaskSucceeded or TaskFailed, if needed
public event EventHandler<TaskEventArgs> TaskSucceeded = null;
public event EventHandler<TaskEventArgs> TaskFailing = null;
public Task<T> CurrentTask
{
get
{
lock (_lock)
return _currentTask;
}
}
public bool IsCurrent(Task task)
{
lock (_lock)
return task == _currentTask;
}
public bool IsPending
{
get
{
lock (_lock)
return _currentTask != null && !_currentTask.IsCompleted;
}
}
public bool IsCancellationRequested
{
get
{
lock (_lock)
return _currentCts != null && _currentCts.IsCancellationRequested;
}
}
public void Cancel()
{
lock (_lock)
{
if (_currentTask != null && !_currentTask.IsCompleted)
_currentCts.Cancel();
}
}
/// <summary>
/// Start the task routine and observe the result of the previous task routine
/// </summary>
/// <param name="routine"></param>
/// <param name="token"></param>
/// <param name="cancelPrevious"></param>
/// <param name="throwImmediately"></param>
public Task<T> StartAsync(
Func<CancellationToken, Task<T>> routine,
CancellationToken token,
bool cancelPrevious = true,
bool throwImmediately = true)
{
Task<T> previousTask = null; // pending instance
CancellationTokenSource previousCts = null; // pending instance CTS
CancellationTokenSource thisCts = CancellationTokenSource.CreateLinkedTokenSource(token);
TaskCompletionSource<T> thisTcs = new TaskCompletionSource<T>(); // this task
CancellationToken thisToken; // this task''s cancellation Token
Task<T> routineTask = null; // as returned by routine
lock (_lock)
{
// remember the _currentTask as previousTask
previousTask = _currentTask;
previousCts = _currentCts;
thisToken = thisCts.Token;
// set the new _currentTask
_currentTask = thisTcs.Task;
_currentCts = thisCts;
}
Action startAsync = async () =>
{
// because startAsync is "async void" method,
// any exception not handled inside it
// will be immediately thrown on the current synchronization context,
// more details: http://.com/a/22395161/1768303
// run and await this task
try
{
// await the previous task instance
if (previousTask != null)
{
if (cancelPrevious)
previousCts.Cancel();
try
{
await previousTask;
}
catch (OperationCanceledException)
{
// ignore previous cancellations
}
}
thisToken.ThrowIfCancellationRequested();
routineTask = routine(thisToken);
await routineTask;
}
catch (Exception ex)
{
// ignore cancellation
if (ex is OperationCanceledException)
{
System.Diagnostics.Debug.Print("Task cancelled, id={0}", thisTcs.Task.Id);
thisTcs.SetCanceled();
return;
}
// fire TaskFailing
System.Diagnostics.Debug.Print("Task failing, id={0}", thisTcs.Task.Id);
if (this.TaskFailing != null)
{
var args = new TaskEventArgs(thisTcs.Task, ex);
this.TaskFailing(this, args);
if (args.Handled)
{
// exception handled
// make thisTcs cancelled rather than faulted
thisTcs.SetCanceled();
return;
}
}
// exception unhandled
thisTcs.SetException(ex);
if (throwImmediately)
throw; // rethrow on the current synchronization context
// exception should be observed via CurrentTask.Exception
return;
}
// success, fire TaskSucceeded
System.Diagnostics.Debug.Print("Task succeded, id={0}", thisTcs.Task.Id);
thisTcs.SetResult(routineTask.Result);
if (this.TaskSucceeded != null)
this.TaskSucceeded(this, new TaskEventArgs(thisTcs.Task));
};
startAsync();
return thisTcs.Task;
}
// StartAsync with CancellationToken.None
public Task<T> StartAsync(
Func<CancellationToken, Task<T>> routine,
bool cancelPrevious = true,
bool throwImmediately = true)
{
return StartAsync(routine, CancellationToken.None, cancelPrevious: true, throwImmediately: true);
}
/// <summary>
/// TaskEventArgs
/// </summary>
public class TaskEventArgs : EventArgs
{
public Task<T> Task { get; private set; }
public Exception Exception { get; private set; }
public bool Handled { get; set; }
public TaskEventArgs(Task<T> task, Exception exception = null)
{
this.Task = task;
this.Exception = exception;
}
}
}