c# - para - ¿Las extensiones reactivas son compatibles con los amortiguadores rodantes?
extensores para amortiguadores de moto (4)
Con Rx Extensions 2.0, puede responder a ambos requisitos con una nueva sobrecarga de búfer que acepta un tiempo de espera y un tamaño:
this.subscription = this.dataService
.Where(x => !string.Equals("FOO", x.Key.Source))
.Buffer(TimeSpan.FromMilliseconds(100), 1)
.ObserveOn(this.dispatcherService)
.Where(x => x.Count != 0)
.Subscribe(this.OnBufferReceived);
Consulte https://msdn.microsoft.com/en-us/library/hh229200(v=vs.103).aspx para obtener la documentación.
Estoy usando extensiones reactivas para recopilar datos en buffers de 100 ms:
this.subscription = this.dataService
.Where(x => !string.Equals("FOO", x.Key.Source))
.Buffer(TimeSpan.FromMilliseconds(100))
.ObserveOn(this.dispatcherService)
.Where(x => x.Count != 0)
.Subscribe(this.OnBufferReceived);
Esto funciona bien. Sin embargo, quiero un comportamiento ligeramente diferente al proporcionado por la operación de Buffer
. Esencialmente, quiero restablecer el temporizador si se recibe otro elemento de datos. Solo cuando no se han recibido datos para los 100 ms completos, quiero manejarlos. Esto abre la posibilidad de nunca manejar los datos, por lo que también debería poder especificar un recuento máximo. Me imagino algo como:
.SlidingBuffer(TimeSpan.FromMilliseconds(100), 10000)
He echado un vistazo y no he podido encontrar nada como esto en Rx? ¿Alguien puede confirmar / negar esto?
Escribí una extensión para hacer la mayor parte de lo que BufferWithInactivity
: BufferWithInactivity
.
Aquí está:
public static IObservable<IEnumerable<T>> BufferWithInactivity<T>(
this IObservable<T> source,
TimeSpan inactivity,
int maximumBufferSize)
{
return Observable.Create<IEnumerable<T>>(o =>
{
var gate = new object();
var buffer = new List<T>();
var mutable = new SerialDisposable();
var subscription = (IDisposable)null;
var scheduler = Scheduler.ThreadPool;
Action dump = () =>
{
var bts = buffer.ToArray();
buffer = new List<T>();
if (o != null)
{
o.OnNext(bts);
}
};
Action dispose = () =>
{
if (subscription != null)
{
subscription.Dispose();
}
mutable.Dispose();
};
Action<Action<IObserver<IEnumerable<T>>>> onErrorOrCompleted =
onAction =>
{
lock (gate)
{
dispose();
dump();
if (o != null)
{
onAction(o);
}
}
};
Action<Exception> onError = ex =>
onErrorOrCompleted(x => x.OnError(ex));
Action onCompleted = () => onErrorOrCompleted(x => x.OnCompleted());
Action<T> onNext = t =>
{
lock (gate)
{
buffer.Add(t);
if (buffer.Count == maximumBufferSize)
{
dump();
mutable.Disposable = Disposable.Empty;
}
else
{
mutable.Disposable = scheduler.Schedule(inactivity, () =>
{
lock (gate)
{
dump();
}
});
}
}
};
subscription =
source
.ObserveOn(scheduler)
.Subscribe(onNext, onError, onCompleted);
return () =>
{
lock (gate)
{
o = null;
dispose();
}
};
});
}
Esto es posible combinando los métodos incorporados de Window
y Throttle
de Observable
. Primero, resolvamos el problema más simple donde ignoramos la condición de conteo máximo:
public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay)
{
var closes = stream.Throttle(delay);
return stream.Window(() => closes).SelectMany(window => window.ToList());
}
El poderoso método de la Window
hizo el trabajo pesado. Ahora es bastante fácil ver cómo agregar un conteo máximo:
public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay, Int32? max=null)
{
var closes = stream.Throttle(delay);
if (max != null)
{
var overflows = stream.Where((x,index) => index+1>=max);
closes = closes.Merge(overflows);
}
return stream.Window(() => closes).SelectMany(window => window.ToList());
}
Escribiré un post explicando esto en mi blog. https://gist.github.com/2244036
Documentación para el método de la ventana:
Supongo que esto se puede implementar sobre el método Buffer como se muestra a continuación:
public static IObservable<IList<T>> SlidingBuffer<T>(this IObservable<T> obs, TimeSpan span, int max)
{
return Observable.CreateWithDisposable<IList<T>>(cl =>
{
var acc = new List<T>();
return obs.Buffer(span)
.Subscribe(next =>
{
if (next.Count == 0) //no activity in time span
{
cl.OnNext(acc);
acc.Clear();
}
else
{
acc.AddRange(next);
if (acc.Count >= max) //max items collected
{
cl.OnNext(acc);
acc.Clear();
}
}
}, err => cl.OnError(err), () => { cl.OnNext(acc); cl.OnCompleted(); });
});
}
NOTA: No lo he probado, pero espero que te dé la idea.