c# - ObserveOn y SubscribeOn: donde se realiza el trabajo
.net system.reactive (3)
A menudo .SubcribeOn
que .SubcribeOn
se usa para establecer el hilo donde se está ejecutando el código dentro de .Subscribe
. Pero para recordar, solo piense que publicar y suscribirse debe ser un par como yin-yang. Para establecer dónde Subscribe''s code
está ejecutando Subscribe''s code
use ObserveOn
. Para establecer dónde Observable''s code
ejecutó el Observable''s code
utilizó SubscribeOn
. O en resumen, mantra: where-what
, Subscribe-Observe
, Observe-Subscribe
.
Basado en leer esta pregunta: ¿Cuál es la diferencia entre SubscribeOn y ObserveOn?
ObserveOn
establece donde se ObserveOn
el código en el controlador de Subscribe
se ejecuta:
stream.Subscribe(_ => { // this code here });
El método SubscribeOn
establece en qué subproceso se realiza la configuración de la transmisión.
Me hacen comprender que si no se establecen explícitamente, entonces se usa el TaskPool.
Ahora mi pregunta es, digamos que hago algo como esto:
Observable.Interval(new Timespan(0, 0, 1)).Where(t => predicate(t)).SelectMany(t => lots_of(t)).ObserveOnDispatcher().Subscribe(t => some_action(t));
¿Dónde se SelectMany
predicate
Where
y SelectMany
lots_of
dado que some_action
se está ejecutando en el despachador?
Encontré la respuesta de James muy clara y completa. Sin embargo, a pesar de esto todavía me encuentro teniendo que explicar las diferencias.
Por lo tanto, creé un ejemplo muy simple / estúpido que me permite demostrar gráficamente a qué programadores se están llamando las cosas. MyScheduler
un MyScheduler
clase que ejecuta acciones inmediatamente, pero cambiará el color de la consola.
La salida de texto del programador SubscribeOn
muestra en rojo y la del programador ObserveOn
se imprime en azul.
using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
namespace SchedulerExample
{
class Program
{
static void Main(string[] args)
{
var mydata = new[] {"A", "B", "C", "D", "E"};
var observable = Observable.Create<string>(observer =>
{
Console.WriteLine("Observable.Create");
return mydata.ToObservable().
Subscribe(observer);
});
observable.
SubscribeOn(new MyScheduler(ConsoleColor.Red)).
ObserveOn(new MyScheduler(ConsoleColor.Blue)).
Subscribe(s => Console.WriteLine("OnNext {0}", s));
Console.ReadKey();
}
}
}
Esto produce:
Y para referencia MyScheduler (no adecuado para uso real):
using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
namespace SchedulerExample
{
class MyScheduler : IScheduler
{
private readonly ConsoleColor _colour;
public MyScheduler(ConsoleColor colour)
{
_colour = colour;
}
public IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
{
return Execute(state, action);
}
private IDisposable Execute<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
{
var tmp = Console.ForegroundColor;
Console.ForegroundColor = _colour;
action(this, state);
Console.ForegroundColor = tmp;
return Disposable.Empty;
}
public IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
{
throw new NotImplementedException();
}
public IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action)
{
throw new NotImplementedException();
}
public DateTimeOffset Now
{
get { return DateTime.UtcNow; }
}
}
}
Hay mucha información engañosa acerca de SubscribeOn
y ObserveOn
.
Resumen
-
SubscribeOn
intercepta llamadas al único método deIObservable<T>
, que esSubscribe
, y llama aIDisposable
en el identificadorIDisposable
devuelveSubscribe
. -
ObserveOn
intercepta las llamadas a los métodos deIObserver<T>
, que sonOnNext
,OnCompleted
yOnError
. - Ambos métodos hacen que las llamadas respectivas se realicen en el planificador especificado.
Análisis y Demostraciones
La declaración
ObserveOn establece dónde se ejecuta el código en el controlador Suscribir:
es más confuso que útil. A lo que te refieres como el "controlador de suscripciones" es realmente un controlador OnNext
. Recuerde, el método Subscribe
de IObservable
acepta un IObserver
que tiene los OnNext
, OnCompleted
y OnError
, pero son los métodos de extensión los que proporcionan las sobrecargas de conveniencia que aceptan lambdas y crean una implementación de IObserver
para usted.
Sin embargo, déjame apropiarme del término; Creo que el "controlador de suscripción" es el código en el observable que se invoca cuando se llama a Subscribe
. De esta forma, la descripción anterior se asemeja más al propósito de SubscribeOn
.
SuscribirEn
SubscribeOn
hace que el método Subscribe
de un observable se ejecute de forma asincrónica en el programador o contexto especificado. Lo utiliza cuando no desea llamar al método Subscribe
en un observable del hilo en el que se está ejecutando, generalmente porque puede ser de larga ejecución y no desea bloquear el hilo de llamada.
Cuando llamas a Subscribe
, llamas a un observable que puede ser parte de una larga cadena de observables. Es solo lo observable que se aplica el SubscribeOn
a los efectos. Ahora bien, puede darse el caso de que todos los observables de la cadena se suscriban inmediatamente y en el mismo hilo, pero no tiene por qué ser así. Por ejemplo, piense en Concat
, que solo se suscribe a cada flujo sucesivo una vez que el flujo precedente ha finalizado, y normalmente esto tendrá lugar en cualquier tema del que el flujo anterior haya llamado OnCompleted
.
Entonces, SubscribeOn
encuentra entre su llamada a Subscribe
y la observable a la que se está suscribiendo, interceptando la llamada y haciéndola asíncrona.
También afecta la eliminación de suscripciones. Subscribe
devuelve un identificador IDisposable
que se utiliza para darse de baja. SubscribeOn
garantiza que las llamadas a Dispose
se programen en el planificador suministrado.
Un punto común de confusión al tratar de entender lo que hace SubscribeOn
es que el manejador de Subscribe
de un observable bien puede llamar a OnNext
, OnCompleted
u OnError
en este mismo hilo. Sin embargo, su propósito no es afectar estas llamadas. No es raro que una transmisión se complete antes de que regrese el método de Subscribe
. Observable.Return
hace esto, por ejemplo. Vamos a ver.
Si usa el método Spy que escribí, y ejecute el siguiente código:
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Return(1).Spy("Return");
source.Subscribe();
Console.WriteLine("Subscribe returned");
Obtienes esta salida (la identificación del hilo puede variar por supuesto):
Calling from Thread: 1
Return: Observable obtained on Thread: 1
Return: Subscribed to on Thread: 1
Return: OnNext(1) on Thread: 1
Return: OnCompleted() on Thread: 1
Return: Subscription completed.
Subscribe returned
Puede ver que todo el controlador de suscripción ejecutó en el mismo subproceso y finalizó antes de volver.
Usemos SubscribeOn
para ejecutar esto de forma asincrónica. Espiaremos tanto el Return
observable como el SubscribeOn
observable:
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Return(1).Spy("Return");
source.SubscribeOn(Scheduler.Default).Spy("SubscribeOn").Subscribe();
Console.WriteLine("Subscribe returned");
Estas salidas (números de línea agregados por mí):
01 Calling from Thread: 1
02 Return: Observable obtained on Thread: 1
03 SubscribeOn: Observable obtained on Thread: 1
04 SubscribeOn: Subscribed to on Thread: 1
05 SubscribeOn: Subscription completed.
06 Subscribe returned
07 Return: Subscribed to on Thread: 2
08 Return: OnNext(1) on Thread: 2
09 SubscribeOn: OnNext(1) on Thread: 2
10 Return: OnCompleted() on Thread: 2
11 SubscribeOn: OnCompleted() on Thread: 2
12 Return: Subscription completed.
01 - El método principal se está ejecutando en el subproceso 1.
02 - el Return
observable se evalúa en el hilo de llamada. Estamos recibiendo el IObservable
aquí, nada se está suscribiendo todavía.
03 - el SubscribeOn
observable se evalúa en el hilo de llamada.
04 - Ahora, finalmente llamamos al método de SubscribeOn
de SubscribeOn
.
05 - El método de Subscribe
se completa de forma asíncrona ...
06 - ... y el hilo 1 vuelve al método principal. ¡Este es el efecto de SubscribeOn en acción!
07 - Mientras tanto, SubscribeOn
programó una llamada en el programador predeterminado para Return
. Aquí se recibe en el hilo 2.
08 - Y como lo hace Return
, llama a OnNext
en la OnNext
Subscribe
...
09 - y SubscribeOn
es solo un pase ahora.
10,11 - Lo mismo para OnCompleted
12 - Y por último, el manejador de suscripción de Return
está hecho.
¡Afortunadamente eso aclara el propósito y el efecto de SubscribeOn
!
ObserveOn
Si piensa en SubscribeOn
como un interceptor para el método de Subscribe
que pasa la llamada a un hilo diferente, ObserveOn
realiza el mismo trabajo, pero para las OnNext
, OnCompleted
y OnError
.
Recuerda nuestro ejemplo original:
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Return(1).Spy("Return");
source.Subscribe();
Console.WriteLine("Subscribe returned");
Lo cual dio esta salida:
Calling from Thread: 1
Return: Observable obtained on Thread: 1
Return: Subscribed to on Thread: 1
Return: OnNext(1) on Thread: 1
Return: OnCompleted() on Thread: 1
Return: Subscription completed.
Subscribe returned
Ahora modifiquemos esto para usar ObserveOn
:
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Return(1).Spy("Return");
source.ObserveOn(Scheduler.Default).Spy("ObserveOn").Subscribe();
Console.WriteLine("Subscribe returned");
Obtenemos el siguiente resultado:
01 Calling from Thread: 1
02 Return: Observable obtained on Thread: 1
03 ObserveOn: Observable obtained on Thread: 1
04 ObserveOn: Subscribed to on Thread: 1
05 Return: Subscribed to on Thread: 1
06 Return: OnNext(1) on Thread: 1
07 ObserveOn: OnNext(1) on Thread: 2
08 Return: OnCompleted() on Thread: 1
09 Return: Subscription completed.
10 ObserveOn: Subscription completed.
11 Subscribe returned
12 ObserveOn: OnCompleted() on Thread: 2
01 - El método principal se está ejecutando en el subproceso 1.
02 - Como antes, el Return
observable se evalúa en el hilo de llamada. Estamos recibiendo el IObservable
aquí, nada se está suscribiendo todavía.
03 - ObserveOn
observable también se evalúa en el hilo de llamada.
04 - Ahora suscribimos, una vez más en el hilo de llamada, primero en ObserveOn
observable ...
05 - ... que luego pasa la llamada al Return
observable.
06 - Ahora Return
llamadas OnNext
en su controlador de Subscribe
.
07 - Aquí está el efecto de ObserveOn
. Podemos ver que OnNext
está programado de forma asíncrona en el Tema 2.
08 - Mientras tanto, las llamadas de Return
se OnCompleted
en el subproceso 1 ...
09 - Y el controlador de suscripción de Return
completa ...
10 - y luego también lo hace el controlador de suscripción de ObserveOn
...
11 - por lo que el control se devuelve al método principal
12 - Mientras tanto, ObserveOn
ha OnCompleted
llamada de Return
de Return
al Subproceso 2. Esto podría haber sucedido en cualquier momento durante el 09-11 porque se está ejecutando de forma asíncrona. Simplemente sucede que finalmente se llama ahora.
¿Cuáles son los casos de uso típicos?
Lo más frecuente es que uses SubscribeOn
en una GUI cuando necesites Subscribe
a un observable de larga ejecución y desees desconectarlo lo antes posible, tal vez porque sabes que es uno de esos observables que hace todo el trabajo en la suscripción. entrenador de animales. Aplíquelo al final de la cadena observable, porque este es el primer observable que se invoca cuando se suscribe.
Con mayor frecuencia, verá ObserveOn
utilizado en una GUI cuando desee asegurarse de que las OnNext
, OnCompleted
y OnError
se OnError
nuevo a la OnError
distribución. Aplíquelo al final de la cadena observable para hacer la transición lo más tarde posible.
Espero que pueda ver que la respuesta a su pregunta es que ObserveOnDispatcher
no hará ninguna diferencia en los subprocesos en los que Where
y SelectMany
se ejecutan, ¡todo depende de qué flujo de hilos los llama! El controlador de suscripción de stream se invocará en el hilo de llamada, pero es imposible decir Where
y SelectMany
se ejecutará SelectMany
sin saber cómo se implementa la stream
.
Observables con tiempos de vida que duran más que la llamada Suscribir
Hasta ahora, hemos estado buscando exclusivamente en Observable.Return
. Return
completa su flujo dentro del controlador Subscribe
. Eso no es atípico, pero es igualmente común que las transmisiones sobrevivan al manejador de Subscribe
. Mira Observable.Timer
por ejemplo:
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");
source.Subscribe();
Console.WriteLine("Subscribe returned");
Esto devuelve lo siguiente:
Calling from Thread: 1
Timer: Observable obtained on Thread: 1
Timer: Subscribed to on Thread: 1
Timer: Subscription completed.
Subscribe returned
Timer: OnNext(0) on Thread: 2
Timer: OnCompleted() on Thread: 2
Puede ver claramente que la suscripción se completa y luego se OnNext
y OnCompleted
más adelante en un hilo diferente.
Tenga en cuenta que ninguna combinación de SubscribeOn
u ObserveOn
tendrá ningún efecto sobre qué thread o scheduler Timer
OnNext
invocar OnNext
y OnCompleted
.
Claro, puedes usar SubscribeOn
para determinar el hilo de Subscribe
:
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");
source.SubscribeOn(NewThreadScheduler.Default).Spy("SubscribeOn").Subscribe();
Console.WriteLine("Subscribe returned");
(Estoy cambiando deliberadamente a NewThreadScheduler
aquí para evitar confusiones en el caso de que Timer
ocurra para obtener el mismo hilo del grupo de subprocesos que SubscribeOn
)
Dando:
Calling from Thread: 1
Timer: Observable obtained on Thread: 1
SubscribeOn: Observable obtained on Thread: 1
SubscribeOn: Subscribed to on Thread: 1
SubscribeOn: Subscription completed.
Subscribe returned
Timer: Subscribed to on Thread: 2
Timer: Subscription completed.
Timer: OnNext(0) on Thread: 3
SubscribeOn: OnNext(0) on Thread: 3
Timer: OnCompleted() on Thread: 3
SubscribeOn: OnCompleted() on Thread: 3
Aquí puede ver claramente el hilo principal en el hilo (1) que regresa después de sus llamadas Subscribe
, pero la suscripción del Timer
obtiene su propio hilo (2), pero las llamadas OnNext
y OnCompleted
se ejecutan en el hilo (3).
Ahora, para ObserveOn
, cambiemos el código a (para quienes sigan el código, use el paquete nuget rx-wpf):
var dispatcher = Dispatcher.CurrentDispatcher;
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");
source.ObserveOnDispatcher().Spy("ObserveOn").Subscribe();
Console.WriteLine("Subscribe returned");
Este código es un poco diferente. La primera línea asegura que tenemos un despachador, y también traemos ObserveOnDispatcher
, esto es como ObserveOn
, excepto que especifica que debemos usar el DispatcherScheduler
del hilo en el que se evalúa ObserveOnDispatcher
.
Este código da el siguiente resultado:
Calling from Thread: 1
Timer: Observable obtained on Thread: 1
ObserveOn: Observable obtained on Thread: 1
ObserveOn: Subscribed to on Thread: 1
Timer: Subscribed to on Thread: 1
Timer: Subscription completed.
ObserveOn: Subscription completed.
Subscribe returned
Timer: OnNext(0) on Thread: 2
ObserveOn: OnNext(0) on Thread: 1
Timer: OnCompleted() on Thread: 2
ObserveOn: OnCompleted() on Thread: 1
Tenga en cuenta que el despachador (y el hilo principal) son el subproceso 1. El Timer
sigue llamando a OnNext
y OnCompleted
en el subproceso de su elección (2), pero ObserveOnDispatcher
está ObserveOnDispatcher
las llamadas en el subproceso del repartidor, subproceso (1).
También tenga en cuenta que si tuviéramos que bloquear el hilo del ObserveOnDispatcher
(por ejemplo, por un Thread.Sleep
), veríamos que ObserveOnDispatcher
bloquearía (este código funciona mejor dentro de un método principal de LINQPad):
var dispatcher = Dispatcher.CurrentDispatcher;
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");
source.ObserveOnDispatcher().Spy("ObserveOn").Subscribe();
Console.WriteLine("Subscribe returned");
Console.WriteLine("Blocking the dispatcher");
Thread.Sleep(2000);
Console.WriteLine("Unblocked");
Y verás resultados como este:
Calling from Thread: 1
Timer: Observable obtained on Thread: 1
ObserveOn: Observable obtained on Thread: 1
ObserveOn: Subscribed to on Thread: 1
Timer: Subscribed to on Thread: 1
Timer: Subscription completed.
ObserveOn: Subscription completed.
Subscribe returned
Blocking the dispatcher
Timer: OnNext(0) on Thread: 2
Timer: OnCompleted() on Thread: 2
Unblocked
ObserveOn: OnNext(0) on Thread: 1
ObserveOn: OnCompleted() on Thread: 1
Con las llamadas a través de ObserveOnDispatcher
solo se puede salir una vez que se ha ejecutado Sleep
.
Puntos clave
Es útil tener en cuenta que Reactive Extensions es esencialmente una biblioteca de subprocesos libres, y trata de ser lo más vago posible sobre el hilo en el que se ejecuta: debe interferir deliberadamente con ObserveOn
, SubscribeOn
y pasar programadores específicos a los operadores que los aceptan. para cambiar esto
No hay nada que un consumidor de un observable pueda hacer para controlar lo que está haciendo internamente: ObserveOn
y SubscribeOn
son decorators que envuelven el área de la superficie de los observadores y observables para ordenar las llamadas entre los hilos. Espero que estos ejemplos lo hayan dejado claro.