remarks cref c# asynchronous system.reactive reactive-programming

cref - remarks c#



¿Cómo recuperar la función asíncrona desde rx subscribe? (3)

Cambiar esto a:

Observable.Timer(TimeSpan.FromMilliseconds(100)) .SelectMany(async _ => await RunAsync()) .Subscribe();

Suscribirse no mantiene la operación asincrónica dentro de Observable.

Me gustaría recuperar una función asíncrona dentro de una suscripción Rx.

Por ejemplo, así:

public class Consumer { private readonly Service _service = new Service(); public ReplaySubject<string> Results = new ReplaySubject<string>(); public void Trigger() { Observable.Timer(TimeSpan.FromMilliseconds(100)).Subscribe(async _ => await RunAsync()); } public Task RunAsync() { return _service.DoAsync(); } } public class Service { public async Task<string> DoAsync() { return await Task.Run(() => Do()); } private static string Do() { Thread.Sleep(TimeSpan.FromMilliseconds(200)); throw new ArgumentException("invalid!"); return "foobar"; } } [Test] public async Task Test() { var sut = new Consumer(); sut.Trigger(); var result = await sut.Results.FirstAsync(); }

¿Qué se necesita hacer para detectar la excepción correctamente?


No desea pasar un método async a Subscribe , porque eso creará un método de async void . Haz tu mejor async void para evitar el async void .

En su caso, creo que lo que quiere es llamar al método async para cada elemento de la secuencia y luego almacenar en caché todos los resultados. En ese caso, use SelectMany para llamar al método async para cada elemento y Replay en caché (más un Connect para poner en marcha la bola):

public class Consumer { private readonly Service _service = new Service(); public IObservable<string> Trigger() { var connectible = Observable.Timer(TimeSpan.FromMilliseconds(100)) .SelectMany(_ => RunAsync()) .Replay(); connectible.Connect(); return connectible; } public Task<string> RunAsync() { return _service.DoAsync(); } }

Cambié la propiedad Results que se devolverá del método Trigger lugar, que creo que tiene más sentido, por lo que ahora la prueba se ve así:

[Test] public async Task Test() { var sut = new Consumer(); var results = sut.Trigger(); var result = await results.FirstAsync(); }


La respuesta de Paul Betts funciona en la mayoría de los escenarios, pero si quieres bloquear la transmisión mientras esperas que termine la función asíncrona, necesitas algo como esto:

Observable.Interval(TimeSpan.FromSeconds(1)) .Select(l => Observable.FromAsync(asyncMethod)) .Concat() .Subscribe();

O:

Observable.Interval(TimeSpan.FromSeconds(1)) .Select(_ => Observable.Defer(() => asyncMethod().ToObservable())) .Concat() .Subscribe();