cref - remarks c#
¿Cómo recuperar la función asíncrona desde rx subscribe? (3)
Cambiar esto a:
Observable.Timer(TimeSpan.FromMilliseconds(100))
.SelectMany(async _ => await RunAsync())
.Subscribe();
Suscribirse no mantiene la operación asincrónica dentro de Observable.
Me gustaría recuperar una función asíncrona dentro de una suscripción Rx.
Por ejemplo, así:
public class Consumer
{
private readonly Service _service = new Service();
public ReplaySubject<string> Results = new ReplaySubject<string>();
public void Trigger()
{
Observable.Timer(TimeSpan.FromMilliseconds(100)).Subscribe(async _ => await RunAsync());
}
public Task RunAsync()
{
return _service.DoAsync();
}
}
public class Service
{
public async Task<string> DoAsync()
{
return await Task.Run(() => Do());
}
private static string Do()
{
Thread.Sleep(TimeSpan.FromMilliseconds(200));
throw new ArgumentException("invalid!");
return "foobar";
}
}
[Test]
public async Task Test()
{
var sut = new Consumer();
sut.Trigger();
var result = await sut.Results.FirstAsync();
}
¿Qué se necesita hacer para detectar la excepción correctamente?
No desea pasar un método async
a Subscribe
, porque eso creará un método de async void
. Haz tu mejor async void
para evitar el async void
.
En su caso, creo que lo que quiere es llamar al método async
para cada elemento de la secuencia y luego almacenar en caché todos los resultados. En ese caso, use SelectMany
para llamar al método async
para cada elemento y Replay
en caché (más un Connect
para poner en marcha la bola):
public class Consumer
{
private readonly Service _service = new Service();
public IObservable<string> Trigger()
{
var connectible = Observable.Timer(TimeSpan.FromMilliseconds(100))
.SelectMany(_ => RunAsync())
.Replay();
connectible.Connect();
return connectible;
}
public Task<string> RunAsync()
{
return _service.DoAsync();
}
}
Cambié la propiedad Results
que se devolverá del método Trigger
lugar, que creo que tiene más sentido, por lo que ahora la prueba se ve así:
[Test]
public async Task Test()
{
var sut = new Consumer();
var results = sut.Trigger();
var result = await results.FirstAsync();
}
La respuesta de Paul Betts funciona en la mayoría de los escenarios, pero si quieres bloquear la transmisión mientras esperas que termine la función asíncrona, necesitas algo como esto:
Observable.Interval(TimeSpan.FromSeconds(1))
.Select(l => Observable.FromAsync(asyncMethod))
.Concat()
.Subscribe();
O:
Observable.Interval(TimeSpan.FromSeconds(1))
.Select(_ => Observable.Defer(() => asyncMethod().ToObservable()))
.Concat()
.Subscribe();