c# - remarks - Avance histórico avanzado y transmisión en vivo en Rx
remarks c# (1)
Lo que le ofrece el HistoricalScheduler
es la capacidad de controlar el movimiento de avance del tiempo virtual del programador.
Lo que no se obtiene es acceso aleatorio en el tiempo. A medida que avanza el tiempo virtual, las acciones programadas se ejecutan, por lo que deben programarse con anticipación. Cualquier acción programada en el pasado, es decir, en un momento absoluto que se encuentra detrás del valor HistoricalScheduler.Now
, se ejecuta inmediatamente.
Para reproducir los eventos, debe registrarlos de alguna manera, luego programarlos usando una instancia de un Programador HistoricalScheduler
y luego avanzar.
Cuando se avanza en el tiempo, las acciones programadas se ejecutan en sus tiempos de vencimiento, y cuando los observables envían OnXXX()
a sus suscriptores, la propiedad Now
del programador tendrá el tiempo virtual actual.
Cada suscriptor necesitará acceder a su propio programador para controlar el tiempo independientemente de otros suscriptores. Esto significa efectivamente crear un observable por suscriptor.
Este es un ejemplo rápido que he encontrado (que se ejecutaría en LINQPad si hiciera referencia al paquete nuget rx-main).
Primero grabo una transmisión en vivo (¡de una forma totalmente no productiva!) Grabando eventos en una lista. Como sugiere, el uso de TimeStamp()
funciona bien para capturar el tiempo:
/* record a live stream */
var source = Observable.Interval(TimeSpan.FromSeconds(1));
var log = source.Take(5).Timestamp().ToList().Wait();
Console.WriteLine("Time now is " + DateTime.Now);
Ahora podemos usar HistoricalScheduler combinado con el uso astuto de Generar para programar eventos. Tenga en cuenta que este enfoque evita que una tonelada de eventos programados se pongan en cola de antemano, en lugar de eso solo programamos uno a la vez:
var scheduler = new HistoricalScheduler();
/* set up the scheduling of the recording events */
var replay = Observable.Generate(
log.GetEnumerator(),
events => events.MoveNext(),
events => events,
events => events.Current.Value,
events => events.Current.Timestamp,
scheduler);
Ahora, cuando nos suscribimos, puede ver que la propiedad HistoricalScheduler
''s Now
tiene la hora virtual del evento:
replay.Subscribe(
i => Console.WriteLine("Event: {0} happened at {1}", i,
scheduler.Now));
Finalmente, podemos comenzar la programación (usando Start () solo intentamos reproducir todos los eventos, en lugar de usar AdvanceTo
para moverse a una hora específica) es como hacer AdvanceTo(DateTime.MaxValue);
scheduler.Start();
La salida para mí fue:
Time now is 07/01/2014 15:17:27
Event: 0 happened at 07/01/2014 15:17:23 +00:00
Event: 1 happened at 07/01/2014 15:17:24 +00:00
Event: 2 happened at 07/01/2014 15:17:25 +00:00
Event: 3 happened at 07/01/2014 15:17:26 +00:00
Event: 4 happened at 07/01/2014 15:17:27 +00:00
El resultado es que probablemente terminará teniendo que crear su propia API sobre esta herramienta para obtener algo que se adapte a sus propósitos particulares. Te deja bastante trabajo, pero aún así es algo bastante poderoso.
Lo bueno es que el observable en vivo y el observable reproducido realmente no se ven diferentes entre sí, siempre que recuerde que siempre debe parametrizar su programador (!), Y así pueden hacer que las mismas consultas se ejecuten fácilmente sobre ellos, con consultas temporales, todas trabajando con el Tiempo virtual del planificador.
He usado esto para probar nuevas consultas sobre datos antiguos con gran efecto en escenarios comerciales.
Lo que no intenta ser es un control de transporte , como por ejemplo, el servicio de desplazamiento hacia adelante y hacia atrás en una GUI. Por lo general, ejecuta el historial en grandes porciones, almacenando la salida de nuevas consultas y luego utiliza estos datos para su posterior visualización en una GUI para que los usuarios puedan moverse de un lado a otro a través de algún otro mecanismo que proporcione.
Finalmente, no necesita ReplaySubject
para almacenar en caché la transmisión en vivo; pero necesita algunos medios para grabar eventos para su reproducción, esto podría ser un observador que escribe en un registro.
Tengo un observable que normalmente implemento usando un Subject
normal debajo, para que los interesados puedan suscribirse a una transmisión de notificaciones en vivo.
Ahora me gustaría mantener esa transmisión en vivo, pero también exponer una transmisión histórica de todos los eventos que han sido Y se han adjuntado tiempos absolutos a esas notificaciones para saber cuándo sucedieron ASÍ ASÍ QUE los suscriptores puedan avanzar la transmisión histórica a cualquier Punto en el tiempo antes de reproducir la cronología.
- Creo que la mayor parte de esto podría lograrse con un Programador HistoricalScheduler y su método AdvanceTo, pero no estoy seguro de cómo exactamente.
- ¿Y es el uso de Timestamped para guardar los tiempos de los eventos necesarios?
- ¿Y es necesario un ReplaySubject para almacenar en caché la transmisión en vivo en registros históricos que luego podrían reproducirse utilizando el HistoricScheduler?
¿Cómo se pueden implementar esas dos secuencias exactamente para la misma fuente, o en otras palabras, cómo puede asignarse lo siguiente a los requisitos actuales?
[ver el título "Repetir el pasado" ]