c# buffering system.reactive

c# - Rx IObservable buffering para suavizar ráfagas de eventos



system.reactive (1)

Tengo una secuencia observable que produce eventos en ráfagas rápidas (es decir, cinco eventos uno tras otro, luego un retraso prolongado, luego otro ráfaga rápida de eventos, etc.). Quiero suavizar estas ráfagas insertando un breve retraso entre eventos. Imagina el siguiente diagrama como ejemplo:

Raw: --oooo--------------ooooo-----oo----------------ooo| Buffered: --o--o--o--o--------o--o--o--o--o--o--o---------o--o--o|

Mi enfoque actual es generar un temporizador similar a un metrónomo a través de Observable.Interval() que señala cuándo está bien extraer otro evento de la secuencia en bruto. El problema es que no puedo averiguar cómo combinar ese temporizador con mi secuencia observable sin búfer sin formato.

IObservable.Zip() está cerca de hacer lo que quiero, pero solo funciona mientras la secuencia en bruto esté produciendo eventos más rápido que el temporizador. Tan pronto como hay una pausa significativa en el flujo sin procesar, el temporizador acumula una serie de eventos no deseados que se emparejan inmediatamente con la siguiente ráfaga de eventos del flujo sin procesar.

Idealmente, quiero un método de extensión IObservable con la siguiente firma de función que produce el mejor valor que he descrito anteriormente. Ahora, ven a mi rescate StackOverflow :)

public static IObservable<T> Buffered(this IObservable<T> src, TimeSpan minDelay)

PD. Soy nuevo en Rx, así que mis disculpas si esta es una pregunta trivialmente simple ...

1. Enfoque simple pero defectuoso

Aquí está mi primera solución ingenua y simplista que tiene algunos problemas:

public static IObservable<T> Buffered<T>(this IObservable<T> source, TimeSpan minDelay) { Queue<T> q = new Queue<T>(); source.Subscribe(x => q.Enqueue(x)); return Observable.Interval(minDelay).Where(_ => q.Count > 0).Select(_ => q.Dequeue()); }

El primer problema obvio con esto es que la IDisponible devuelta por la suscripción interna a la fuente sin procesar se pierde y, por lo tanto, la suscripción no se puede terminar. Al llamar a Dispose en el IDisposible devuelto por este método, se elimina el temporizador, pero no la fuente de eventos en bruto subyacente que ahora está llenando innecesariamente la cola sin que quede nadie para extraer eventos de la cola.

El segundo problema es que no hay manera de que se promulguen excepciones o notificaciones de fin de flujo de flujo de eventos en bruto a un flujo en búfer: simplemente se ignoran cuando se suscriben a la fuente en bruto.

Y por último, pero no menos importante, ahora tengo un código que se despierta periódicamente, independientemente de si realmente hay trabajo que hacer, que preferiría evitar en este maravilloso nuevo mundo reactivo.

2. Forma demasiado compleja de nombrar

Para resolver los problemas encontrados en mi enfoque simplista inicial, escribí una función mucho más complicada que se comporta como IObservable.Delay() (usé .NET Reflector para leer ese código y lo usé como la base de mi función). Desafortunadamente, gran parte de la lógica de repetición, como AnonymousObservable no está disponible públicamente fuera del código del sistema, por lo que tuve que copiar y pegar un montón de código. Esta solución parece funcionar, pero dada su complejidad, estoy menos seguro de que está libre de errores.

Simplemente no puedo creer que no haya una manera de lograr esto usando alguna combinación de las extensiones reactivas estándar. Odio sentir que estoy reinventando innecesariamente la rueda, y el patrón que estoy tratando de construir parece ser bastante estándar.


Esto es en realidad un duplicado de una forma de enviar eventos en búfer en intervalos regulares , pero incluiré un resumen aquí (el original parece bastante confuso porque mira algunas alternativas).

public static IObservable<T> Buffered<T>(this IObservable<T> source, TimeSpan minDelay) { return source.Drain(x => Observable.Empty<int>() .Delay(minDelay) .StartWith(x) ); }

Mi implementación de Drain funciona como SelectMany , excepto que espera que la salida anterior termine primero (usted podría pensar que es ConactMany , mientras que SelectMany se SelectMany más a MergeMany ). El Drain integrado no funciona de esta manera, por lo que deberá incluir la implementación a continuación:

public static class ObservableDrainExtensions { public static IObservable<TOut> Drain<TSource, TOut>( this IObservable<TSource> source, Func<TSource, IObservable<TOut>> selector) { return Observable.Defer(() => { BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit()); return source .Zip(queue, (v, q) => v) .SelectMany(v => selector(v) .Do(_ => { }, () => queue.OnNext(new Unit())) ); }); } }