RxPY - Trabajar con el sujeto

Un sujeto es una secuencia observable, así como un observador que puede realizar multidifusión, es decir, hablar con muchos observadores que se han suscrito.

Vamos a discutir los siguientes temas sobre el tema:

  • Crea un tema
  • Suscribirse a un tema
  • Pasando datos al sujeto
  • BehaviorSubject
  • ReplaySubject
  • AsyncSubject

Crea un tema

Para trabajar con un asunto, necesitamos importar el asunto como se muestra a continuación:

from rx.subject import Subject

Puede crear un sujeto-objeto de la siguiente manera:

subject_test = Subject()

El objeto es un observador que tiene tres métodos:

  • on_next(value)
  • on_error (error) y
  • on_completed()

Suscribirse a un tema

Puede crear múltiples suscripciones sobre el tema como se muestra a continuación:

subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)

Pasando datos al sujeto

Puede pasar datos al sujeto creado utilizando el método on_next (valor) como se muestra a continuación:

subject_test.on_next("A")
subject_test.on_next("B")

Los datos se pasarán a toda la suscripción, agregados sobre el tema.

Aquí hay un ejemplo práctico del tema.

Ejemplo

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_next("B")

El objeto subject_test se crea llamando a un Subject (). El objeto subject_test hace referencia a los métodos on_next (valor), on_error (error) y on_completed (). La salida del ejemplo anterior se muestra a continuación:

Salida

E:\pyrx>python testrx.py
The value is A
The value is A
The value is B
The value is B

Podemos usar el método on_completed () para detener la ejecución del sujeto como se muestra a continuación.

Ejemplo

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_completed()
subject_test.on_next("B")

Una vez que llamamos a complete, no se invoca el siguiente método llamado más tarde.

Salida

E:\pyrx>python testrx.py
The value is A
The value is A

Veamos ahora cómo llamar al método on_error (error).

Ejemplo

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.subscribe(
   on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.on_error(Exception('There is an Error!'))

Salida

E:\pyrx>python testrx.py
Error: There is an Error!
Error: There is an Error!

Comportamiento Sujeto

BehaviorSubject le dará el último valor cuando se le llame. Puede crear un sujeto de comportamiento como se muestra a continuación:

from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject"); // initialized the behaviour subject with value:Testing Behaviour Subject

Aquí, hay un ejemplo práctico para usar Behavior Subject

Ejemplo

from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject");
behavior_subject.subscribe(
   lambda x: print("Observer A : {0}".format(x))
)
behavior_subject.on_next("Hello")
behavior_subject.subscribe(
   lambda x: print("Observer B : {0}".format(x))
)
behavior_subject.on_next("Last call to Behaviour Subject")

Salida

E:\pyrx>python testrx.py
Observer A : Testing Behaviour Subject
Observer A : Hello
Observer B : Hello
Observer A : Last call to Behaviour Subject
Observer B : Last call to Behaviour Subject

Asunto de repetición

Un tema de reproducción es similar al sujeto de comportamiento, en el que puede almacenar los valores en búfer y reproducir los mismos para los nuevos suscriptores. A continuación, se muestra un ejemplo práctico de tema de repetición.

Ejemplo

from rx.subject import ReplaySubject
replay_subject = ReplaySubject(2)
replay_subject.subscribe(lambda x: print("Testing Replay Subject A: {0}".format(x)))
replay_subject.on_next(1)
replay_subject.on_next(2)
replay_subject.on_next(3)
replay_subject.subscribe(lambda x: print("Testing Replay Subject B: {0}".format(x)));
replay_subject.on_next(5)

El valor de búfer utilizado es 2 en el sujeto de la repetición. Por lo tanto, los dos últimos valores se almacenarán en búfer y se utilizarán para los nuevos suscriptores llamados.

Salida

E:\pyrx>python testrx.py
Testing Replay Subject A: 1
Testing Replay Subject A: 2
Testing Replay Subject A: 3
Testing Replay Subject B: 2
Testing Replay Subject B: 3
Testing Replay Subject A: 5
Testing Replay Subject B: 5

AsyncSubject

En el caso de AsyncSubject, el último valor llamado se pasa al suscriptor, y se hará solo después de que se llame al método complete ().

Ejemplo

from rx.subject import AsyncSubject
async_subject = AsyncSubject()
async_subject.subscribe(lambda x: print("Testing Async Subject A: {0}".format(x)))
async_subject.on_next(1)
async_subject.on_next(2)
async_subject.on_completed()
async_subject.subscribe(lambda x: print("Testing Async Subject B: {0}".format(x)))
Here, before complete is called, the last value passed to the subject is 2, and the same is given to the subscribers.

Salida

E:\pyrx>python testrx.py
Testing Async Subject A: 2
Testing Async Subject B: 2