c# system.reactive async-await .net-4.5 c#-5.0

c# - esperando en un observable



system.reactive async-await (2)

Así que en los días tristes de C # 4.0, creé la siguiente clase "WorkflowExecutor" que permitía flujos de trabajo asíncronos en el hilo de la GUI al piratear las continuaciones de "rendimiento de rendimiento" de IEnumerable para esperar observables. Por lo tanto, el siguiente código, en button1Click, simplemente inicia un flujo de trabajo simple que actualiza el texto, espera a que haga clic en button2 y se repite después de 1 segundo.

public sealed partial class Form1 : Form { readonly Subject<Unit> _button2Subject = new Subject<Unit>(); readonly WorkflowExecutor _workflowExecutor = new WorkflowExecutor(); public Form1() { InitializeComponent(); } IEnumerable<IObservable<Unit>> CreateAsyncHandler() { Text = "Initializing"; var scheduler = new ControlScheduler(this); while (true) { yield return scheduler.WaitTimer(1000); Text = "Waiting for Click"; yield return _button2Subject; Text = "Click Detected!"; yield return scheduler.WaitTimer(1000); Text = "Restarting"; } } void button1_Click(object sender, EventArgs e) { _workflowExecutor.Run(CreateAsyncHandler()); } void button2_Click(object sender, EventArgs e) { _button2Subject.OnNext(Unit.Default); } void button3_Click(object sender, EventArgs e) { _workflowExecutor.Stop(); } } public static class TimerHelper { public static IObservable<Unit> WaitTimer(this IScheduler scheduler, double ms) { return Observable.Timer(TimeSpan.FromMilliseconds(ms), scheduler).Select(_ => Unit.Default); } } public sealed class WorkflowExecutor { IEnumerator<IObservable<Unit>> _observables; IDisposable _subscription; public void Run(IEnumerable<IObservable<Unit>> actions) { _observables = (actions ?? new IObservable<Unit>[0]).GetEnumerator(); Continue(); } void Continue() { if (_subscription != null) { _subscription.Dispose(); } if (_observables.MoveNext()) { _subscription = _observables.Current.Subscribe(_ => Continue()); } } public void Stop() { Run(null); } }

La parte inteligente de la idea, que utiliza las continuaciones de "rendimiento" para realizar el trabajo asíncrono, se tomó de la idea AsyncIOPipe de Daniel Earwicker: http://smellegantcode.wordpress.com/2008/12/05/asynchronous-sockets-with-yield-return-of-lambdas/ , luego agregué el marco reactivo encima de él.

Ahora tengo problemas para reescribir esto usando la función asíncrona en C # 5.0, pero parece que debería ser algo sencillo de hacer. Cuando convierto los observables en tareas, solo se ejecutan una vez y el bucle while se bloquea la segunda vez. Cualquier ayuda para arreglar eso sería genial.

Todo lo que dijo / preguntó, ¿qué me da el mecanismo de async / await que no funciona el WorkflowExecutor? ¿Hay algo que pueda hacer con async / await que no pueda hacer (dada una cantidad de código similar) con WorkflowExecutor?


Como mencionó James, puede esperar una secuencia <T> de IObservable que comience con Rx v2.0 Beta. El comportamiento es devolver el último elemento (antes de OnCompleted), o lanzar el OnError que se observó. Si la secuencia no contiene elementos, obtendrás una InvalidOperationException.

Note que usando esto, puede obtener todos los otros comportamientos deseados:

  • Obtenga el primer elemento esperando xs.FirstAsync ()
  • Asegúrese de que solo haya un valor único esperando xs.SingleAsync ()
  • Cuando esté bien con una secuencia vacía, espere xs.DefaultIfEmpty ()
  • Para obtener todos los elementos, espere xs.ToArray () o espere xs.ToList ()

Puedes hacer cosas aún más sofisticadas, como calcular el resultado de una agregación pero observar valores intermedios utilizando Do y Scan:

var xs = Observable.Range(0, 10, Scheduler.Default); var res = xs.Scan((x, y) => x + y) .Do(x => { Console.WriteLine("Busy. Current sum is {0}", x); }); Console.WriteLine("Done! The sum is {0}", await res);


Como se dio cuenta, Task es una cosa que solo se usa una sola vez, a diferencia del "flujo de eventos" de Observable. Una buena manera de pensar en esto (IMHO) es el gráfico 2x2 en la publicación del equipo Rx sobre 2.0 Beta :

Dependiendo de las circunstancias (una sola vez en comparación con la ''secuencia'' de eventos), mantener Observable podría tener más sentido.

Si puedes saltar a la Beta reactiva 2.0, entonces puedes "esperar" a los observables con eso. Por ejemplo, mi propio intento de obtener una versión ''aproximada / asincrónica'' (aproximada) de su código sería:

public sealed partial class Form1 : Form { readonly Subject<Unit> _button2Subject = new Subject<Unit>(); private bool shouldRun = false; public Form1() { InitializeComponent(); } async Task CreateAsyncHandler() { Text = "Initializing"; while (shouldRun) { await Task.Delay(1000); Text = "Waiting for Click"; await _button2Subject.FirstAsync(); Text = "Click Detected!"; await Task.Delay(1000); Text = "Restarting"; } } async void button1_Click(object sender, EventArgs e) { shouldRun = true; await CreateAsyncHandler(); } void button2_Click(object sender, EventArgs e) { _button2Subject.OnNext(Unit.Default); } void button3_Click(object sender, EventArgs e) { shouldRun = false; } }