c# - Observable de tareas encadenadas
async-await system.reactive (3)
Después de hacer una buena prueba, creo que hace bien el trabajo usando los operadores Rx integrados.
public static IObservable<TResult> Generate<TResult>(
Func<Task<TResult>> initialState,
Func<TResult, bool> condition,
Func<TResult, Task<TResult>> iterate,
Func<TResult, TResult> resultSelector,
IScheduler scheduler = null)
{
return Observable.Create<TResult>(o =>
{
var current = default(TResult);
return
Observable
.FromAsync(initialState)
.Select(y => resultSelector(y))
.Do(c => current = c)
.Select(x =>
Observable
.While(
() => condition(current),
Observable
.FromAsync(() => iterate(current))
.Select(y => resultSelector(y))
.Do(c => current = c))
.StartWith(x))
.Switch()
.Where(x => condition(x))
.ObserveOn(scheduler ?? Scheduler.Default)
.Subscribe(o);
});
}
He probado este código con lo siguiente:
bool Continue(IEnumerable<BrokeredMessage> prev)
{
return prev.Any();
}
Task<IEnumerable<BrokeredMessage>> ProduceFirst()
{
return
Task.FromResult(
EnumerableEx.Return(
new BrokeredMessage()
{
SequenceNumber = 1
}));
}
Task<IEnumerable<BrokeredMessage>> ProduceNext(IEnumerable<BrokeredMessage> prev)
{
return Task.FromResult(
prev.Last().SequenceNumber < 3
? EnumerableEx.Return(
new BrokeredMessage()
{
SequenceNumber = prev.Last().SequenceNumber + 1
})
: Enumerable.Empty<BrokeredMessage>());
}
public class BrokeredMessage
{
public int SequenceNumber;
}
Y ejecutando esta secuencia:
var ob = Generate(
async () => await ProduceFirst(),
prev => Continue(prev),
async prev => await ProduceNext(prev),
item => item);
Obtuve este resultado:
Mi código de prueba también usó Extensiones interactivas del equipo de Extensión reactiva - NuGet "Ix-Main".
Intento crear un Observable donde cada elemento se produce mediante una tarea asincrónica. El siguiente elemento debe producirse a través de una llamada asincrónica sobre el resultado del elemento anterior (co-recursión). En el lenguaje de "Generar", se vería algo así, excepto que Generate no es compatible con Async (ni admite al delegado en el estado inicial.
var ob = Observable.Generate(
async () => await ProduceFirst(), // Task<T> ProduceFirst()
prev => Continue(prev) // bool Continue(T);
async prev => await ProduceNext(prev) // Task<T> ProduceNext(T)
item => item
);
Como ejemplo más concreto, para ver todos los mensajes de una cola de ServiceBus obteniendo 100 mensajes a la vez, implemente ProduceFirst, Continue y ProduceNext de la siguiente manera:
Task<IEnumerable<BrokeredMessage>> ProduceFirst()
{
const int batchSize = 100;
return _serviceBusReceiver.PeekBatchAsync(batchSize);
}
bool Continue(IEnumerable<BrokeredMessage> prev)
{
return prev.Any();
}
async Task<IEnumerable<BrokeredMessage>> ProduceNext(IEnumerable<BrokeredMessage> prev)
{
const int batchSize = 100;
return (await _serviceBusReceiver.PeekBatchAsync(prev.Last().SequenceNumber, batchSize + 1)).Skip(1)
}
Y luego llame .SelectMany(i => i)
en IObservable<IEnumerable<BrokeredMessage>>
para convertirlo en IObservable<BrokeredMessage>
Donde _serviceBusReceiver es una instancia de una interfaz de la siguiente manera:
public interface IServiceBusReceiver
{
Task<IEnumerable<BrokeredMessage>> PeekBatchAsync(int batchSize);
Task<IEnumerable<BrokeredMessage>> PeekBatchAsync(long fromSequenceNumber, int batchSize);
}
Y BrokeredMessage es de https://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.brokeredmessage.aspx
Si va a rodar su propia función de generación asíncrona, le recomendaría el uso de la programación recursiva en lugar de envolver un ciclo while.
public static IObservable<TResult> Generate<TResult>(
Func<Task<TResult>> initialState,
Func<TResult, bool> condition,
Func<TResult, Task<TResult>> iterate,
Func<TResult, TResult> resultSelector,
IScheduler scheduler = null)
{
var s = scheduler ?? Scheduler.Default;
return Observable.Create<TResult>(async obs => {
return s.Schedule(await initialState(), async (state, self) =>
{
if (!condition(state))
{
obs.OnCompleted();
return;
}
obs.OnNext(resultSelector(state));
self(await iterate(state));
});
});
}
Esto tiene un par de ventajas. Primero, puede cancelar esto, con un ciclo while simple no hay forma de cancelarlo directamente, de hecho ni siquiera regresa para la función de suscripción hasta que el observable se haya completado. En segundo lugar, esto le permite controlar la programación / asincronía de cada elemento (lo que hace que las pruebas sean fáciles), esto también lo hace un mejor ajuste general para la biblioteca
Creo que esta podría ser la respuesta correcta:
Esta no es una buena respuesta. No utilice.
Generate
un Generate
propio que admite async / await en las funciones de estado inicial + iterar:
public static IObservable<TResult> Generate<TResult>(
Func<Task<TResult>> initialState,
Func<TResult, bool> condition,
Func<TResult, Task<TResult>> iterate,
Func<TResult, TResult> resultSelector
)
{
return Observable.Create<TResult>(async obs =>
{
var state = await initialState();
while (condition(state))
{
var result = resultSelector(state);
obs.OnNext(result);
state = await iterate(state);
}
obs.OnCompleted();
return System.Reactive.Disposables.Disposable.Empty;
});
}
Desafortunadamente, esto parece tener el efecto secundario de que la producción de mensajes va mucho más allá del consumo. Si el observador procesa los mensajes lentamente, obtendrá millones de mensajes antes de procesarlos. No es exactamente lo que queremos de un autobús de servicio.
Voy a trabajar en lo anterior, tal vez leer un poco más, y publicaré una pregunta más específica si es necesario.