c# .net system.reactive

c# - ObserveOn y SubscribeOn: donde se realiza el trabajo



.net system.reactive (3)

A menudo .SubcribeOn que .SubcribeOn se usa para establecer el hilo donde se está ejecutando el código dentro de .Subscribe . Pero para recordar, solo piense que publicar y suscribirse debe ser un par como yin-yang. Para establecer dónde Subscribe''s code está ejecutando Subscribe''s code use ObserveOn . Para establecer dónde Observable''s code ejecutó el Observable''s code utilizó SubscribeOn . O en resumen, mantra: where-what , Subscribe-Observe , Observe-Subscribe .

Basado en leer esta pregunta: ¿Cuál es la diferencia entre SubscribeOn y ObserveOn?

ObserveOn establece donde se ObserveOn el código en el controlador de Subscribe se ejecuta:

stream.Subscribe(_ => { // this code here });

El método SubscribeOn establece en qué subproceso se realiza la configuración de la transmisión.

Me hacen comprender que si no se establecen explícitamente, entonces se usa el TaskPool.

Ahora mi pregunta es, digamos que hago algo como esto:

Observable.Interval(new Timespan(0, 0, 1)).Where(t => predicate(t)).SelectMany(t => lots_of(t)).ObserveOnDispatcher().Subscribe(t => some_action(t));

¿Dónde se SelectMany predicate Where y SelectMany lots_of dado que some_action se está ejecutando en el despachador?


Encontré la respuesta de James muy clara y completa. Sin embargo, a pesar de esto todavía me encuentro teniendo que explicar las diferencias.

Por lo tanto, creé un ejemplo muy simple / estúpido que me permite demostrar gráficamente a qué programadores se están llamando las cosas. MyScheduler un MyScheduler clase que ejecuta acciones inmediatamente, pero cambiará el color de la consola.

La salida de texto del programador SubscribeOn muestra en rojo y la del programador ObserveOn se imprime en azul.

using System; using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Reactive.Linq; namespace SchedulerExample { class Program { static void Main(string[] args) { var mydata = new[] {"A", "B", "C", "D", "E"}; var observable = Observable.Create<string>(observer => { Console.WriteLine("Observable.Create"); return mydata.ToObservable(). Subscribe(observer); }); observable. SubscribeOn(new MyScheduler(ConsoleColor.Red)). ObserveOn(new MyScheduler(ConsoleColor.Blue)). Subscribe(s => Console.WriteLine("OnNext {0}", s)); Console.ReadKey(); } } }

Esto produce:

Y para referencia MyScheduler (no adecuado para uso real):

using System; using System.Reactive.Concurrency; using System.Reactive.Disposables; namespace SchedulerExample { class MyScheduler : IScheduler { private readonly ConsoleColor _colour; public MyScheduler(ConsoleColor colour) { _colour = colour; } public IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action) { return Execute(state, action); } private IDisposable Execute<TState>(TState state, Func<IScheduler, TState, IDisposable> action) { var tmp = Console.ForegroundColor; Console.ForegroundColor = _colour; action(this, state); Console.ForegroundColor = tmp; return Disposable.Empty; } public IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action) { throw new NotImplementedException(); } public IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action) { throw new NotImplementedException(); } public DateTimeOffset Now { get { return DateTime.UtcNow; } } } }


Hay mucha información engañosa acerca de SubscribeOn y ObserveOn .

Resumen

  • SubscribeOn intercepta llamadas al único método de IObservable<T> , que es Subscribe , y llama a IDisposable en el identificador IDisposable devuelve Subscribe .
  • ObserveOn intercepta las llamadas a los métodos de IObserver<T> , que son OnNext , OnCompleted y OnError .
  • Ambos métodos hacen que las llamadas respectivas se realicen en el planificador especificado.

Análisis y Demostraciones

La declaración

ObserveOn establece dónde se ejecuta el código en el controlador Suscribir:

es más confuso que útil. A lo que te refieres como el "controlador de suscripciones" es realmente un controlador OnNext . Recuerde, el método Subscribe de IObservable acepta un IObserver que tiene los OnNext , OnCompleted y OnError , pero son los métodos de extensión los que proporcionan las sobrecargas de conveniencia que aceptan lambdas y crean una implementación de IObserver para usted.

Sin embargo, déjame apropiarme del término; Creo que el "controlador de suscripción" es el código en el observable que se invoca cuando se llama a Subscribe . De esta forma, la descripción anterior se asemeja más al propósito de SubscribeOn .

SuscribirEn

SubscribeOn hace que el método Subscribe de un observable se ejecute de forma asincrónica en el programador o contexto especificado. Lo utiliza cuando no desea llamar al método Subscribe en un observable del hilo en el que se está ejecutando, generalmente porque puede ser de larga ejecución y no desea bloquear el hilo de llamada.

Cuando llamas a Subscribe , llamas a un observable que puede ser parte de una larga cadena de observables. Es solo lo observable que se aplica el SubscribeOn a los efectos. Ahora bien, puede darse el caso de que todos los observables de la cadena se suscriban inmediatamente y en el mismo hilo, pero no tiene por qué ser así. Por ejemplo, piense en Concat , que solo se suscribe a cada flujo sucesivo una vez que el flujo precedente ha finalizado, y normalmente esto tendrá lugar en cualquier tema del que el flujo anterior haya llamado OnCompleted .

Entonces, SubscribeOn encuentra entre su llamada a Subscribe y la observable a la que se está suscribiendo, interceptando la llamada y haciéndola asíncrona.

También afecta la eliminación de suscripciones. Subscribe devuelve un identificador IDisposable que se utiliza para darse de baja. SubscribeOn garantiza que las llamadas a Dispose se programen en el planificador suministrado.

Un punto común de confusión al tratar de entender lo que hace SubscribeOn es que el manejador de Subscribe de un observable bien puede llamar a OnNext , OnCompleted u OnError en este mismo hilo. Sin embargo, su propósito no es afectar estas llamadas. No es raro que una transmisión se complete antes de que regrese el método de Subscribe . Observable.Return hace esto, por ejemplo. Vamos a ver.

Si usa el método Spy que escribí, y ejecute el siguiente código:

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Return(1).Spy("Return"); source.Subscribe(); Console.WriteLine("Subscribe returned");

Obtienes esta salida (la identificación del hilo puede variar por supuesto):

Calling from Thread: 1 Return: Observable obtained on Thread: 1 Return: Subscribed to on Thread: 1 Return: OnNext(1) on Thread: 1 Return: OnCompleted() on Thread: 1 Return: Subscription completed. Subscribe returned

Puede ver que todo el controlador de suscripción ejecutó en el mismo subproceso y finalizó antes de volver.

Usemos SubscribeOn para ejecutar esto de forma asincrónica. Espiaremos tanto el Return observable como el SubscribeOn observable:

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Return(1).Spy("Return"); source.SubscribeOn(Scheduler.Default).Spy("SubscribeOn").Subscribe(); Console.WriteLine("Subscribe returned");

Estas salidas (números de línea agregados por mí):

01 Calling from Thread: 1 02 Return: Observable obtained on Thread: 1 03 SubscribeOn: Observable obtained on Thread: 1 04 SubscribeOn: Subscribed to on Thread: 1 05 SubscribeOn: Subscription completed. 06 Subscribe returned 07 Return: Subscribed to on Thread: 2 08 Return: OnNext(1) on Thread: 2 09 SubscribeOn: OnNext(1) on Thread: 2 10 Return: OnCompleted() on Thread: 2 11 SubscribeOn: OnCompleted() on Thread: 2 12 Return: Subscription completed.

01 - El método principal se está ejecutando en el subproceso 1.

02 - el Return observable se evalúa en el hilo de llamada. Estamos recibiendo el IObservable aquí, nada se está suscribiendo todavía.

03 - el SubscribeOn observable se evalúa en el hilo de llamada.

04 - Ahora, finalmente llamamos al método de SubscribeOn de SubscribeOn .

05 - El método de Subscribe se completa de forma asíncrona ...

06 - ... y el hilo 1 vuelve al método principal. ¡Este es el efecto de SubscribeOn en acción!

07 - Mientras tanto, SubscribeOn programó una llamada en el programador predeterminado para Return . Aquí se recibe en el hilo 2.

08 - Y como lo hace Return , llama a OnNext en la OnNext Subscribe ...

09 - y SubscribeOn es solo un pase ahora.

10,11 - Lo mismo para OnCompleted

12 - Y por último, el manejador de suscripción de Return está hecho.

¡Afortunadamente eso aclara el propósito y el efecto de SubscribeOn !

ObserveOn

Si piensa en SubscribeOn como un interceptor para el método de Subscribe que pasa la llamada a un hilo diferente, ObserveOn realiza el mismo trabajo, pero para las OnNext , OnCompleted y OnError .

Recuerda nuestro ejemplo original:

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Return(1).Spy("Return"); source.Subscribe(); Console.WriteLine("Subscribe returned");

Lo cual dio esta salida:

Calling from Thread: 1 Return: Observable obtained on Thread: 1 Return: Subscribed to on Thread: 1 Return: OnNext(1) on Thread: 1 Return: OnCompleted() on Thread: 1 Return: Subscription completed. Subscribe returned

Ahora modifiquemos esto para usar ObserveOn :

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Return(1).Spy("Return"); source.ObserveOn(Scheduler.Default).Spy("ObserveOn").Subscribe(); Console.WriteLine("Subscribe returned");

Obtenemos el siguiente resultado:

01 Calling from Thread: 1 02 Return: Observable obtained on Thread: 1 03 ObserveOn: Observable obtained on Thread: 1 04 ObserveOn: Subscribed to on Thread: 1 05 Return: Subscribed to on Thread: 1 06 Return: OnNext(1) on Thread: 1 07 ObserveOn: OnNext(1) on Thread: 2 08 Return: OnCompleted() on Thread: 1 09 Return: Subscription completed. 10 ObserveOn: Subscription completed. 11 Subscribe returned 12 ObserveOn: OnCompleted() on Thread: 2

01 - El método principal se está ejecutando en el subproceso 1.

02 - Como antes, el Return observable se evalúa en el hilo de llamada. Estamos recibiendo el IObservable aquí, nada se está suscribiendo todavía.

03 - ObserveOn observable también se evalúa en el hilo de llamada.

04 - Ahora suscribimos, una vez más en el hilo de llamada, primero en ObserveOn observable ...

05 - ... que luego pasa la llamada al Return observable.

06 - Ahora Return llamadas OnNext en su controlador de Subscribe .

07 - Aquí está el efecto de ObserveOn . Podemos ver que OnNext está programado de forma asíncrona en el Tema 2.

08 - Mientras tanto, las llamadas de Return se OnCompleted en el subproceso 1 ...

09 - Y el controlador de suscripción de Return completa ...

10 - y luego también lo hace el controlador de suscripción de ObserveOn ...

11 - por lo que el control se devuelve al método principal

12 - Mientras tanto, ObserveOn ha OnCompleted llamada de Return de Return al Subproceso 2. Esto podría haber sucedido en cualquier momento durante el 09-11 porque se está ejecutando de forma asíncrona. Simplemente sucede que finalmente se llama ahora.

¿Cuáles son los casos de uso típicos?

Lo más frecuente es que uses SubscribeOn en una GUI cuando necesites Subscribe a un observable de larga ejecución y desees desconectarlo lo antes posible, tal vez porque sabes que es uno de esos observables que hace todo el trabajo en la suscripción. entrenador de animales. Aplíquelo al final de la cadena observable, porque este es el primer observable que se invoca cuando se suscribe.

Con mayor frecuencia, verá ObserveOn utilizado en una GUI cuando desee asegurarse de que las OnNext , OnCompleted y OnError se OnError nuevo a la OnError distribución. Aplíquelo al final de la cadena observable para hacer la transición lo más tarde posible.

Espero que pueda ver que la respuesta a su pregunta es que ObserveOnDispatcher no hará ninguna diferencia en los subprocesos en los que Where y SelectMany se ejecutan, ¡todo depende de qué flujo de hilos los llama! El controlador de suscripción de stream se invocará en el hilo de llamada, pero es imposible decir Where y SelectMany se ejecutará SelectMany sin saber cómo se implementa la stream .

Observables con tiempos de vida que duran más que la llamada Suscribir

Hasta ahora, hemos estado buscando exclusivamente en Observable.Return . Return completa su flujo dentro del controlador Subscribe . Eso no es atípico, pero es igualmente común que las transmisiones sobrevivan al manejador de Subscribe . Mira Observable.Timer por ejemplo:

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer"); source.Subscribe(); Console.WriteLine("Subscribe returned");

Esto devuelve lo siguiente:

Calling from Thread: 1 Timer: Observable obtained on Thread: 1 Timer: Subscribed to on Thread: 1 Timer: Subscription completed. Subscribe returned Timer: OnNext(0) on Thread: 2 Timer: OnCompleted() on Thread: 2

Puede ver claramente que la suscripción se completa y luego se OnNext y OnCompleted más adelante en un hilo diferente.

Tenga en cuenta que ninguna combinación de SubscribeOn u ObserveOn tendrá ningún efecto sobre qué thread o scheduler Timer OnNext invocar OnNext y OnCompleted .

Claro, puedes usar SubscribeOn para determinar el hilo de Subscribe :

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer"); source.SubscribeOn(NewThreadScheduler.Default).Spy("SubscribeOn").Subscribe(); Console.WriteLine("Subscribe returned");

(Estoy cambiando deliberadamente a NewThreadScheduler aquí para evitar confusiones en el caso de que Timer ocurra para obtener el mismo hilo del grupo de subprocesos que SubscribeOn )

Dando:

Calling from Thread: 1 Timer: Observable obtained on Thread: 1 SubscribeOn: Observable obtained on Thread: 1 SubscribeOn: Subscribed to on Thread: 1 SubscribeOn: Subscription completed. Subscribe returned Timer: Subscribed to on Thread: 2 Timer: Subscription completed. Timer: OnNext(0) on Thread: 3 SubscribeOn: OnNext(0) on Thread: 3 Timer: OnCompleted() on Thread: 3 SubscribeOn: OnCompleted() on Thread: 3

Aquí puede ver claramente el hilo principal en el hilo (1) que regresa después de sus llamadas Subscribe , pero la suscripción del Timer obtiene su propio hilo (2), pero las llamadas OnNext y OnCompleted se ejecutan en el hilo (3).

Ahora, para ObserveOn , cambiemos el código a (para quienes sigan el código, use el paquete nuget rx-wpf):

var dispatcher = Dispatcher.CurrentDispatcher; Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer"); source.ObserveOnDispatcher().Spy("ObserveOn").Subscribe(); Console.WriteLine("Subscribe returned");

Este código es un poco diferente. La primera línea asegura que tenemos un despachador, y también traemos ObserveOnDispatcher , esto es como ObserveOn , excepto que especifica que debemos usar el DispatcherScheduler del hilo en el que se evalúa ObserveOnDispatcher .

Este código da el siguiente resultado:

Calling from Thread: 1 Timer: Observable obtained on Thread: 1 ObserveOn: Observable obtained on Thread: 1 ObserveOn: Subscribed to on Thread: 1 Timer: Subscribed to on Thread: 1 Timer: Subscription completed. ObserveOn: Subscription completed. Subscribe returned Timer: OnNext(0) on Thread: 2 ObserveOn: OnNext(0) on Thread: 1 Timer: OnCompleted() on Thread: 2 ObserveOn: OnCompleted() on Thread: 1

Tenga en cuenta que el despachador (y el hilo principal) son el subproceso 1. El Timer sigue llamando a OnNext y OnCompleted en el subproceso de su elección (2), pero ObserveOnDispatcher está ObserveOnDispatcher las llamadas en el subproceso del repartidor, subproceso (1).

También tenga en cuenta que si tuviéramos que bloquear el hilo del ObserveOnDispatcher (por ejemplo, por un Thread.Sleep ), veríamos que ObserveOnDispatcher bloquearía (este código funciona mejor dentro de un método principal de LINQPad):

var dispatcher = Dispatcher.CurrentDispatcher; Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer"); source.ObserveOnDispatcher().Spy("ObserveOn").Subscribe(); Console.WriteLine("Subscribe returned"); Console.WriteLine("Blocking the dispatcher"); Thread.Sleep(2000); Console.WriteLine("Unblocked");

Y verás resultados como este:

Calling from Thread: 1 Timer: Observable obtained on Thread: 1 ObserveOn: Observable obtained on Thread: 1 ObserveOn: Subscribed to on Thread: 1 Timer: Subscribed to on Thread: 1 Timer: Subscription completed. ObserveOn: Subscription completed. Subscribe returned Blocking the dispatcher Timer: OnNext(0) on Thread: 2 Timer: OnCompleted() on Thread: 2 Unblocked ObserveOn: OnNext(0) on Thread: 1 ObserveOn: OnCompleted() on Thread: 1

Con las llamadas a través de ObserveOnDispatcher solo se puede salir una vez que se ha ejecutado Sleep .

Puntos clave

Es útil tener en cuenta que Reactive Extensions es esencialmente una biblioteca de subprocesos libres, y trata de ser lo más vago posible sobre el hilo en el que se ejecuta: debe interferir deliberadamente con ObserveOn , SubscribeOn y pasar programadores específicos a los operadores que los aceptan. para cambiar esto

No hay nada que un consumidor de un observable pueda hacer para controlar lo que está haciendo internamente: ObserveOn y SubscribeOn son decorators que envuelven el área de la superficie de los observadores y observables para ordenar las llamadas entre los hilos. Espero que estos ejemplos lo hayan dejado claro.