RxPY - Guía rápida

Este capítulo explica qué es la programación reactiva, qué es RxPY, sus operadores, características, ventajas y desventajas.

¿Qué es la programación reactiva?

La programación reactiva es un paradigma de programación que se ocupa del flujo de datos y la propagación del cambio. Significa que, cuando un componente emite un flujo de datos, el cambio se propagará a otros componentes mediante una biblioteca de programación reactiva. La propagación del cambio continuará hasta que llegue al receptor final.

Al usar RxPY, tiene un buen control sobre los flujos de datos asincrónicos, por ejemplo, una solicitud realizada a la URL se puede rastrear mediante el uso de observable y el observador para escuchar cuando la solicitud está completa para una respuesta o error.

RxPY le ofrece manejar flujos de datos asincrónicos usando Observables, consulta los flujos de datos usando Operators es decir, filtrar, sumar, concatizar, asignar y también hacer uso de la concurrencia para los flujos de datos utilizando Schedulers. Crear un Observable, le da a un objeto observador con los métodos on_next (v), on_error (e) y on_completed (), que necesita sersubscribed para que recibamos una notificación cuando ocurra un evento.

El Observable se puede consultar usando múltiples operadores en un formato de cadena usando el operador de tubería.

RxPY ofrece operadores en varias categorías como: -

  • Operadores matemáticos

  • Operadores de transformación

  • Operadores de filtrado

  • Operadores de manejo de errores

  • Operadores de servicios públicos

  • Operadores condicionales

  • Operadores de creación

  • Operadores conectables

Estos operadores se explican en detalle en este tutorial.

¿Qué es RxPy?

RxPY se define como a library for composing asynchronous and event-based programs using observable collections and pipable query operators in Python según el sitio web oficial de RxPy, que es https://rxpy.readthedocs.io/en/latest/.

RxPY es una biblioteca de Python para admitir la programación reactiva. RxPy significaReactive Extensions for Python. Es una biblioteca que usa observables para trabajar con programación reactiva que se ocupa de llamadas de datos asíncronos, devoluciones de llamada y programas basados ​​en eventos.

Características de RxPy

En RxPy, los siguientes conceptos se encargan de manejar la tarea asincrónica:

Observable

Un observable es una función que crea un observador y lo adjunta a la fuente que tiene flujos de datos que se esperan de, por ejemplo, tweets, eventos relacionados con la computadora, etc.

Observador

Es un objeto con los métodos on_next (), on_error () y on_completed (), que se llamará cuando haya interacción con el observable, es decir, la fuente interactúa para un ejemplo de Tweets entrantes, etc.

Suscripción

Cuando se crea el observable, para ejecutar el observable debemos suscribirnos a él.

Operadores

Un operador es una función pura que toma un observable como entrada y la salida también es un observable. Puede utilizar varios operadores en datos observables mediante el operador de tubería.

Tema

Un sujeto es una secuencia observable, así como un observador que puede realizar multidifusión, es decir, hablar con muchos observadores que se han suscrito. El sujeto es un observable frío, es decir, los valores serán compartidos entre los observadores que se hayan suscrito.

Programadores

Una característica importante de RxPy es la concurrencia, es decir, permitir que la tarea se ejecute en paralelo. Para que eso suceda, RxPy tiene dos operadores subscribe_on () y observe_on () que trabajan con planificadores y decidirán la ejecución de la tarea suscrita.

Ventajas de usar RxPY

Las siguientes son las ventajas de RxPy:

  • RxPY es una biblioteca impresionante cuando se trata del manejo de eventos y flujos de datos asíncronos. RxPY utiliza observables para trabajar con programación reactiva que se ocupa de llamadas de datos asíncronos, devoluciones de llamada y programas basados ​​en eventos.

  • RxPY ofrece una gran colección de operadores en categorías matemáticas, de transformación, de filtrado, de utilidad, condicional, de manejo de errores y de unión que facilita la vida cuando se usa con programación reactiva.

  • La simultaneidad, es decir, el trabajo de múltiples tareas juntas se logra usando programadores en RxPY.

  • El rendimiento se mejora con RxPY ya que el manejo de tareas asíncronas y el procesamiento en paralelo se simplifica.

Desventaja de usar RxPY

  • Depurar el código con observables es un poco difícil.

En este capítulo trabajaremos en la instalación de RxPy. Para comenzar a trabajar con RxPY, primero debemos instalar Python. Entonces, vamos a trabajar en lo siguiente:

  • Instalar Python
  • Instalar RxPy

Instalación de Python

Vaya al sitio oficial de Python: https://www.python.org/downloads/.como se muestra a continuación, y haga clic en la última versión disponible para Windows, Linux / Unix y mac os. Descargue Python según su sistema operativo de 64 o 32 bits disponible con usted.

Una vez que haya descargado, haga clic en el .exe file y siga los pasos para instalar Python en su sistema.

El administrador de paquetes de Python, es decir, pip, también se instalará por defecto con la instalación anterior. Para que funcione globalmente en su sistema, agregue directamente la ubicación de python a la variable PATH, lo mismo se muestra al inicio de la instalación, para recordar marcar la casilla de verificación, que dice ADD to PATH. En caso de que olvide verificarlo, siga los pasos que se indican a continuación para agregarlo a PATH.

Para agregar a PATH, siga los pasos a continuación:

Haga clic con el botón derecho en el icono de su computadora y haga clic en propiedades → Configuración avanzada del sistema.

Mostrará la pantalla como se muestra a continuación:

Haga clic en Variables de entorno como se muestra arriba. Mostrará la pantalla como se muestra a continuación:

Seleccione Ruta y haga clic en el botón Editar, agregue la ruta de ubicación de su python al final. Ahora, revisemos la versión de Python.

Comprobando la versión de Python

E:\pyrx>python --version
Python 3.7.3

Instalar RxPY

Ahora que tenemos Python instalado, vamos a instalar RxPy.

Una vez que se instala Python, también se instalará el administrador de paquetes de Python, es decir, pip. A continuación se muestra el comando para verificar la versión de pip:

E:\pyrx>pip --version
pip 19.1.1 from c:\users\xxxx\appdata\local\programs\python\python37\lib\site-
packages\pip (python 3.7)

Tenemos pip instalado y la versión es 19.1.1. Ahora, usaremos pip para instalar RxPy

El comando es el siguiente:

pip install rx

En este tutorial, usamos RxPY versión 3 y python versión 3.7.3. El funcionamiento de RxPY versión 3 difiere un poco con la versión anterior, es decir, RxPY versión 1.

En este capítulo, vamos a discutir las diferencias entre las 2 versiones y los cambios que deben realizarse en caso de que esté actualizando las versiones de Python y RxPY.

Observable en RxPY

En la versión 1 de RxPy, Observable era una clase separada:

from rx import Observable

Para usar el Observable, debe usarlo de la siguiente manera:

Observable.of(1,2,3,4,5,6,7,8,9,10)

En RxPy versión 3, Observable es directamente parte del paquete rx.

Example

import rx
rx.of(1,2,3,4,5,6,7,8,9,10)

Operadores en RxPy

En la versión 1, el operador era métodos en la clase Observable. Por ejemplo, para hacer uso de operadores tenemos que importar Observable como se muestra a continuación:

from rx import Observable

Los operadores se utilizan como Observable.operator, por ejemplo, como se muestra a continuación:

Observable.of(1,2,3,4,5,6,7,8,9,10)\
   .filter(lambda i: i %2 == 0) \
   .sum() \
   .subscribe(lambda x: print("Value is {0}".format(x)))

En el caso de la versión 3 de RxPY, los operadores funcionan y se importan y usan de la siguiente manera:

import rx
from rx import operators as ops
rx.of(1,2,3,4,5,6,7,8,9,10).pipe(
   ops.filter(lambda i: i %2 == 0),
   ops.sum()
).subscribe(lambda x: print("Value is {0}".format(x)))

Encadenamiento de operadores mediante el método Pipe ()

En la versión 1 de RxPy, en caso de que tuviera que usar varios operadores en un observable, tenía que hacer lo siguiente:

Example

from rx import Observable
Observable.of(1,2,3,4,5,6,7,8,9,10)\
   .filter(lambda i: i %2 == 0) \
   .sum() \
   .subscribe(lambda x: print("Value is {0}".format(x)))

Pero, en el caso de la versión 3 de RxPY, puede usar el método pipe () y múltiples operadores como se muestra a continuación:

Example

import rx
from rx import operators as ops
rx.of(1,2,3,4,5,6,7,8,9,10).pipe(
   ops.filter(lambda i: i %2 == 0),
   ops.sum()
).subscribe(lambda x: print("Value is {0}".format(x)))

Un observable es una función que crea un observador y lo adjunta a la fuente donde se esperan valores, por ejemplo, clics, eventos de mouse de un elemento dom, etc.

Los temas que se mencionan a continuación se estudiarán en detalle en este capítulo.

  • Crear observables

  • Suscribir y ejecutar un Observable

Crea observables

Para crear un observable usaremos create() y pasarle la función que tiene los siguientes elementos.

  • on_next() - Esta función se llama cuando el Observable emite un elemento.

  • on_completed() - Esta función se llama cuando se completa el Observable.

  • on_error() - Esta función se llama cuando ocurre un error en el Observable.

Para trabajar con el método create (), primero importe el método como se muestra a continuación:

from rx import create

Aquí hay un ejemplo práctico, para crear un observable:

testrx.py

from rx import create
deftest_observable(observer, scheduler):
   observer.on_next("Hello")
   observer.on_error("Error")
   observer.on_completed()
source = create(test_observable).

Suscribir y ejecutar un Observable

Para suscribirnos a un observable, necesitamos usar la función subscribe () y pasar la función de devolución de llamada on_next, on_error y on_completed.

Aquí hay un ejemplo de trabajo:

testrx.py

from rx import create
deftest_observable(observer, scheduler):
   observer.on_next("Hello")
   observer.on_completed()
source = create(test_observable)
source.subscribe(
   on_next = lambda i: print("Got - {0}".format(i)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!"),
)

El método subscribe () se encarga de ejecutar lo observable. La función de devolución de llamadaon_next, on_error y on_completeddebe pasarse al método de suscripción. La llamada al método de suscripción, a su vez, ejecuta la función test_observable ().

No es obligatorio pasar las tres funciones de devolución de llamada al método subscribe (). Puede pasar según sus requisitos on_next (), on_error () y on_completed ().

La función lambda se usa para on_next, on_error y on_completed. Tomará los argumentos y ejecutará la expresión dada.

Aquí está la salida, del observable creado:

E:\pyrx>python testrx.py
Got - Hello
Job Done!

Este capítulo explica en detalle los operadores en RxPY. Estos operadores incluyen:

  • Trabajar con operadores
  • Operadores matemáticos
  • Operadores de transformación
  • Operadores de filtrado
  • Operadores de manejo de errores
  • Operadores de servicios públicos
  • Operadores condicionales
  • Operadores de creación
  • Operadores conectables
  • Combinando operadores

Python reactivo (Rx) tiene casi muchos operadores, que facilitan la vida con la codificación de Python. Puede usar estos múltiples operadores juntos, por ejemplo, mientras trabaja con cadenas, puede usar operadores de mapa, filtro y combinación.

Trabajar con operadores

Puede trabajar con varios operadores juntos usando el método pipe (). Este método permite encadenar varios operadores juntos.

Aquí hay un ejemplo práctico del uso de operadores:

test = of(1,2,3) // an observable
subscriber = test.pipe(
   op1(),
   op2(),
   op3()
)

En el ejemplo anterior, hemos creado un método observable usando of () que toma los valores 1, 2 y 3. Ahora, en este observable, puede realizar una operación diferente, usando cualquier número de operadores usando el método pipe () como se muestra encima. La ejecución de los operadores continuará secuencialmente sobre el dato observable.

Para trabajar con operadores, primero impórtelo como se muestra a continuación:

from rx import of, operators as op

Aquí hay un ejemplo práctico:

testrx.py

from rx import of, operators as op
test = of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
sub1 = test.pipe(
   op.filter(lambda s: s%2==0),
   op.reduce(lambda acc, x: acc + x)
)
sub1.subscribe(lambda x: print("Sum of Even numbers is {0}".format(x)))

En el ejemplo anterior, hay una lista de números, de la cual estamos filtrando números pares usando un operador de filtro y luego agregándolos usando un operador de reducción.

Output

E:\pyrx>python testrx.py
Sum of Even numbers is 30

Aquí hay una lista de operadores, que vamos a discutir:

  • Creando Observables
  • Operadores matemáticos
  • Operadores de transformación
  • Operadores de filtrado
  • Operadores de manejo de errores
  • Operadores de servicios públicos
  • Conditional
  • Connectable
  • Combinando operadores

Creando Observables

A continuación se muestran los observables que vamos a discutir en la categoría Creación.

Mostrar ejemplos

Observable Descripción
crear Este método se utiliza para crear un observable.
vacío Este observable no generará nada y emitirá directamente el estado completo.
Nunca Este método crea un observable que nunca alcanzará el estado completo.
lanzar Este método creará un observable que arrojará un error.
desde_ Este método convertirá la matriz u objeto dado en un observable.
intervalo Este método dará una serie de valores producidos después de un tiempo de espera.
sólo Este método convertirá el valor dado en un observable.
rango Este método dará un rango de números enteros según la entrada dada.
repetir_valor Este método creará un observable que repetirá el valor dado según se dé el recuento.
comienzo Este método toma una función como entrada y devuelve un observable que devolverá el valor de la función de entrada.
Temporizador Este método emitirá los valores en secuencia después de que finalice el tiempo de espera.

Operadores matemáticos

Los operadores que vamos a discutir en la categoría de operador matemático son los siguientes: -

Mostrar ejemplos

Operador Descripción
promedio Este operador calculará el promedio de la fuente observable dada y generará un observable que tendrá el valor promedio.
concat Este operador tomará dos o más observables y se le dará un solo observable con todos los valores en la secuencia.
contar

Este operador toma un Observable con valores y lo convierte en un Observable que tendrá un solo valor. La función de conteo toma la función de predicado como un argumento opcional.

La función es de tipo booleano y agregará valor a la salida solo si cumple la condición.

max Este operador dará un observable con el valor máximo de la fuente observable.
min Este operador dará un observable con un valor mínimo de la fuente observable.
reducir Este operador toma una función llamada función acumuladora que se usa en los valores provenientes de la fuente observable y devuelve los valores acumulados en forma de observable, con un valor semilla opcional pasado a la función acumuladora.
suma Este operador devolverá un observable con la suma de todos los valores de los observables de origen.

Operadores de transformación

Los operadores que vamos a discutir en la categoría de operador de transformación se mencionan a continuación:

Mostrar ejemplos

Operador Categoría
buffer Este operador recopilará todos los valores de la fuente observable y los emitirá a intervalos regulares una vez que se cumpla la condición de límite dada.
suelo_por Este operador agrupará los valores provenientes de la fuente observable en función de la función key_mapper dada.
mapa Este operador cambiará cada valor de la fuente observable a un nuevo valor basado en la salida del mapper_func dado.
escanear Este operador aplicará una función de acumulador a los valores provenientes de la fuente observable y devolverá un observable con nuevos valores.

Operadores de filtrado

Los operadores que vamos a discutir en la categoría de operadores de filtrado se dan a continuación:

Mostrar ejemplos

Operador Categoría
rebote Este operador dará los valores de la fuente observables, hasta que pase el intervalo de tiempo dado e ignorará el resto del tiempo.
distinto Este operador dará todos los valores que son distintos de la fuente observable.
element_at Este operador dará un elemento de la fuente observable para el índice dado.
filtrar Este operador filtrará los valores de la fuente observable según la función de predicado dada.
primero Este operador dará el primer elemento de la fuente observable.
ignore_elements Este operador ignorará todos los valores de la fuente observable y solo ejecutará llamadas a funciones de devolución de llamada completas o de error.
último Este operador dará el último elemento observable de la fuente.
omitir Este operador devolverá un observable que omitirá la primera aparición de elementos de recuento tomados como entrada.
skip_last Este operador devolverá un observable que omitirá la última aparición de elementos de recuento tomados como entrada.
tomar Este operador dará una lista de valores fuente en orden continuo según el recuento dado.
take_last Este operador dará una lista de valores fuente en orden continuo desde el último en función del recuento proporcionado.

Operadores de manejo de errores

Los operadores que vamos a discutir en la categoría de operador de manejo de errores son: -

Mostrar ejemplos

Operador Descripción
captura Este operador terminará la fuente observable cuando haya una excepción.
rever Este operador volverá a intentar en la fuente observable cuando haya un error y una vez que se complete el recuento de reintentos, terminará.

Operadores de servicios públicos

Los siguientes son los operadores que vamos a discutir en la categoría de operador de servicios públicos.

Mostrar ejemplos

Operador Descripción
retrasar Este operador retrasará la emisión observable de la fuente según la hora o fecha indicada.
materializar Este operador convertirá los valores de la fuente observable con los valores emitidos en forma de valores de notificación explícitos.
intervalo de tiempo Este operador dará el tiempo transcurrido entre los valores de la fuente observable.
se acabó el tiempo Este operador dará todos los valores de la fuente observables después del tiempo transcurrido o de lo contrario provocará un error.
marca de tiempo Este operador adjuntará una marca de tiempo a todos los valores de la fuente observable.

Operadores condicionales y booleanos

Los operadores que vamos a discutir en la categoría de operador condicional y booleano son los que se indican a continuación:

Mostrar ejemplos

Operador Descripción
todas Este operador comprobará si todos los valores de la fuente observable satisfacen la condición dada.
contiene Este operador devolverá un observable con el valor verdadero o falso si el valor dado está presente y si es el valor de la fuente observable.
default_if_empty Este operador devolverá un valor predeterminado si la fuente observable está vacía.
sequence_equal Este operador comparará dos secuencias de observables o una matriz de valores y devolverá un observable con el valor verdadero o falso.
saltar_hasta Este operador descartará valores de la fuente observable hasta que el segundo observable emita un valor.
saltar_mientras Este operador devolverá un observable con valores de la fuente observable que satisfaga la condición pasada.
tomar_hasta Este operador descartará los valores de la fuente observable después de que el segundo observable emita un valor o finalice.
tomar_mientras Este operador descartará los valores de la fuente observables cuando la condición falla.

Operadores conectables

Los operadores que vamos a discutir en la categoría de Operador conectable son:

Mostrar ejemplos

Operador Descripción
publicar Este método convertirá lo observable en un observable conectable.
ref_count Este operador hará que lo observable sea un observable normal.
repetición Este método funciona de manera similar a replaySubject. Este método devolverá los mismos valores, incluso si el observable ya ha emitido y algunos de los suscriptores se retrasan en la suscripción.

Operadores de combinación

Los siguientes son los operadores que vamos a discutir en la categoría Operador de combinación.

Mostrar ejemplos

Operador Descripción
combine_latest Este operador creará una tupla para el observable dado como entrada.
unir Este operador fusionará los observables dados.
Empezar con Este operador tomará los valores dados y agregará al comienzo de la fuente observable y devolverá la secuencia completa.
Código Postal Este operador devuelve un observable con valores en forma de tupla que se forma tomando el primer valor del observable dado y así sucesivamente.

Un sujeto es una secuencia observable, así como un observador que puede realizar multidifusión, es decir, hablar con muchos observadores que se han suscrito.

Vamos a discutir los siguientes temas sobre el tema:

  • Crea un tema
  • Suscribirse a un tema
  • Pasando datos al sujeto
  • BehaviorSubject
  • ReplaySubject
  • AsyncSubject

Crea un tema

Para trabajar con un asunto, necesitamos importar el asunto como se muestra a continuación:

from rx.subject import Subject

Puede crear un sujeto-objeto de la siguiente manera:

subject_test = Subject()

El objeto es un observador que tiene tres métodos:

  • on_next(value)
  • on_error (error) y
  • on_completed()

Suscribirse a un tema

Puede crear múltiples suscripciones sobre el tema como se muestra a continuación:

subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)

Pasando datos al sujeto

Puede pasar datos al sujeto creado utilizando el método on_next (valor) como se muestra a continuación:

subject_test.on_next("A")
subject_test.on_next("B")

Los datos se pasarán a toda la suscripción, agregados sobre el tema.

Aquí hay un ejemplo práctico del tema.

Ejemplo

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_next("B")

El objeto subject_test se crea llamando a un Subject (). El objeto subject_test hace referencia a los métodos on_next (valor), on_error (error) y on_completed (). La salida del ejemplo anterior se muestra a continuación:

Salida

E:\pyrx>python testrx.py
The value is A
The value is A
The value is B
The value is B

Podemos usar el método on_completed () para detener la ejecución del sujeto como se muestra a continuación.

Ejemplo

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_completed()
subject_test.on_next("B")

Una vez que llamamos a complete, no se invoca el siguiente método llamado más tarde.

Salida

E:\pyrx>python testrx.py
The value is A
The value is A

Veamos ahora cómo llamar al método on_error (error).

Ejemplo

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.subscribe(
   on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.on_error(Exception('There is an Error!'))

Salida

E:\pyrx>python testrx.py
Error: There is an Error!
Error: There is an Error!

Comportamiento Sujeto

BehaviorSubject le dará el último valor cuando se le llame. Puede crear un sujeto de comportamiento como se muestra a continuación:

from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject"); // initialized the behaviour subject with value:Testing Behaviour Subject

Aquí, hay un ejemplo práctico para usar Behavior Subject

Ejemplo

from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject");
behavior_subject.subscribe(
   lambda x: print("Observer A : {0}".format(x))
)
behavior_subject.on_next("Hello")
behavior_subject.subscribe(
   lambda x: print("Observer B : {0}".format(x))
)
behavior_subject.on_next("Last call to Behaviour Subject")

Salida

E:\pyrx>python testrx.py
Observer A : Testing Behaviour Subject
Observer A : Hello
Observer B : Hello
Observer A : Last call to Behaviour Subject
Observer B : Last call to Behaviour Subject

Asunto de repetición

Un tema de reproducción es similar al sujeto de comportamiento, en el que puede almacenar los valores en búfer y reproducir los mismos para los nuevos suscriptores. A continuación, se muestra un ejemplo práctico de tema de repetición.

Ejemplo

from rx.subject import ReplaySubject
replay_subject = ReplaySubject(2)
replay_subject.subscribe(lambda x: print("Testing Replay Subject A: {0}".format(x)))
replay_subject.on_next(1)
replay_subject.on_next(2)
replay_subject.on_next(3)
replay_subject.subscribe(lambda x: print("Testing Replay Subject B: {0}".format(x)));
replay_subject.on_next(5)

El valor de búfer utilizado es 2 en el sujeto de la repetición. Por lo tanto, los dos últimos valores se almacenarán en búfer y se utilizarán para los nuevos suscriptores llamados.

Salida

E:\pyrx>python testrx.py
Testing Replay Subject A: 1
Testing Replay Subject A: 2
Testing Replay Subject A: 3
Testing Replay Subject B: 2
Testing Replay Subject B: 3
Testing Replay Subject A: 5
Testing Replay Subject B: 5

AsyncSubject

En el caso de AsyncSubject, el último valor llamado se pasa al suscriptor, y se hará solo después de que se llame al método complete ().

Ejemplo

from rx.subject import AsyncSubject
async_subject = AsyncSubject()
async_subject.subscribe(lambda x: print("Testing Async Subject A: {0}".format(x)))
async_subject.on_next(1)
async_subject.on_next(2)
async_subject.on_completed()
async_subject.subscribe(lambda x: print("Testing Async Subject B: {0}".format(x)))
Here, before complete is called, the last value passed to the subject is 2, and the same is given to the subscribers.

Salida

E:\pyrx>python testrx.py
Testing Async Subject A: 2
Testing Async Subject B: 2

Una característica importante de RxPy es la concurrencia, es decir, permitir que la tarea se ejecute en paralelo. Para que eso suceda, tenemos dos operadores subscribe_on () y observe_on () que trabajarán con un planificador, que decidirá la ejecución de la tarea suscrita.

Aquí hay un ejemplo de trabajo, que muestra la necesidad de subscibe_on (), observe_on () y planificador.

Ejemplo

import random
import time
import rx
from rx import operators as ops
def adding_delay(value):
   time.sleep(random.randint(5, 20) * 0.1)
   return value
# Task 1
rx.of(1,2,3,4,5).pipe(
   ops.map(lambda a: adding_delay(a))
).subscribe(
   lambda s: print("From Task 1: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a))
).subscribe(
   lambda s: print("From Task 2: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 2 complete")
) 
input("Press any key to exit\n")

En el ejemplo anterior, tengo 2 tareas: Tarea 1 y Tarea 2. La ejecución de la tarea es en secuencia. La segunda tarea comienza solo cuando se realiza la primera.

Salida

E:\pyrx>python testrx.py
From Task 1: 1
From Task 1: 2
From Task 1: 3
From Task 1: 4
From Task 1: 5
Task 1 complete
From Task 2: 1
From Task 2: 2
From Task 2: 3
From Task 2: 4
Task 2 complete

RxPy admite muchos Scheduler, y aquí vamos a hacer uso de ThreadPoolScheduler. ThreadPoolScheduler principalmente intentará administrar con los subprocesos de CPU disponibles.

En el ejemplo que hemos visto antes, vamos a hacer uso de un módulo de multiprocesamiento que nos dará el cpu_count. El recuento se le dará al ThreadPoolScheduler que logrará que la tarea funcione en paralelo en función de los subprocesos disponibles.

Aquí hay un ejemplo práctico:

import multiprocessing
import random
import time
from threading import current_thread
import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as ops
# calculate cpu count, using which will create a ThreadPoolScheduler
thread_count = multiprocessing.cpu_count()
thread_pool_scheduler = ThreadPoolScheduler(thread_count)
print("Cpu count is : {0}".format(thread_count))
def adding_delay(value):
   time.sleep(random.randint(5, 20) * 0.1)
   return value
# Task 1
rx.of(1,2,3,4,5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 1: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 2: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 2 complete")
)
input("Press any key to exit\n")

En el ejemplo anterior, tengo 2 tareas y cpu_count es 4. Dado que la tarea es 2 y los hilos disponibles con nosotros son 4, ambas tareas pueden comenzar en paralelo.

Salida

E:\pyrx>python testrx.py
Cpu count is : 4
Press any key to exit
From Task 1: 1
From Task 2: 1
From Task 1: 2
From Task 2: 2
From Task 2: 3
From Task 1: 3
From Task 2: 4
Task 2 complete
From Task 1: 4
From Task 1: 5
Task 1 complete

Si ve el resultado, ambas tareas se iniciaron en paralelo.

Ahora, considere un escenario, donde la tarea es mayor que el recuento de CPU, es decir, el recuento de CPU es 4 y las tareas son 5. En este caso, tendríamos que verificar si algún subproceso se ha liberado después de completar la tarea, para que pueda ser asignado a la nueva tarea disponible en la cola.

Para este propósito, podemos usar el operador observe_on () que observará el planificador si hay hilos libres. Aquí, hay un ejemplo de trabajo usando observe_on ()

Ejemplo

import multiprocessing
import random
import time
from threading import current_thread
import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as ops
# calculate cpu count, using which will create a ThreadPoolScheduler
thread_count = multiprocessing.cpu_count()
thread_pool_scheduler = ThreadPoolScheduler(thread_count)
print("Cpu count is : {0}".format(thread_count))
def adding_delay(value):
   time.sleep(random.randint(5, 20) * 0.1)
   return value
# Task 1
rx.of(1,2,3,4,5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 1: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 2: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 2 complete")
)
#Task 3
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 3: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 3 complete")
)
#Task 4
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 4: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 4 complete")
)
#Task 5
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.observe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 5: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 5 complete")
)
input("Press any key to exit\n")

Salida

E:\pyrx>python testrx.py
Cpu count is : 4
From Task 4: 1
From Task 4: 2
From Task 1: 1
From Task 2: 1
From Task 3: 1
From Task 1: 2
From Task 3: 2
From Task 4: 3
From Task 3: 3
From Task 2: 2
From Task 1: 3
From Task 4: 4
Task 4 complete
From Task 5: 1
From Task 5: 2
From Task 5: 3
From Task 3: 4
Task 3 complete
From Task 2: 3
Press any key to exit
From Task 5: 4
Task 5 complete
From Task 1: 4
From Task 2: 4
Task 2 complete
From Task 1: 5
Task 1 complete

Si ve la salida, en el momento en que se completa la tarea 4, el hilo pasa a la siguiente tarea, es decir, la tarea 5 y la misma comienza a ejecutarse.

En este capítulo, discutiremos los siguientes temas en detalle:

  • Ejemplo básico que muestra el funcionamiento de operadores observables y suscripciones al observador.
  • Diferencia entre observable y sujeto.
  • Comprender los observables fríos y calientes.

A continuación se muestra un ejemplo básico que muestra el funcionamiento de los operadores observables y la suscripción del observador.

Ejemplo

test.py

import requests
import rx
import json
from rx import operators as ops
def filternames(x):
   if (x["name"].startswith("C")):
      return x["name"]
   else :
      return ""
content = requests.get('https://jsonplaceholder.typicode.com/users')
y = json.loads(content.text)
source = rx.from_(y)
case1 = source.pipe(
   ops.filter(lambda c: filternames(c)),
   ops.map(lambda a:a["name"])
)
case1.subscribe(
   on_next = lambda i: print("Got - {0}".format(i)), 8. RxPy — Examples
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!"),
)

Aquí hay un ejemplo muy simple, en el que obtengo datos de usuario de esta URL:

https://jsonplaceholder.typicode.com/users.

Filtrar los datos, para dar los nombres que comienzan con "C", y luego usar el mapa para devolver solo los nombres. Aquí está la salida para lo mismo:

E:\pyrx\examples>python test.py
Got - Clementine Bauch
Got - Chelsey Dietrich
Got - Clementina DuBuque
Job Done!

Diferencia entre observable y sujeto

En este ejemplo, veremos la diferencia entre un observable y un sujeto.

from rx import of, operators as op
import random
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
print("From first subscriber")
subscriber1 = sub1.subscribe(lambda i: print("From sub1 {0}".format(i)))
print("From second subscriber")
subscriber2 = sub1.subscribe(lambda i: print("From sub2 {0}".format(i)))

Salida

E:\pyrx>python testrx.py
From first subscriber
From sub1 1.610450821095726
From sub1 2.9567564032037335
From sub1 3.933217537811936
From sub1 4.82444905626622
From sub1 5.929414892567188
From second subscriber
From sub2 1.8573813517529874
From sub2 2.902433239469483
From sub2 3.2289868093016825
From sub2 4.050413890694411
From sub2 5.226515068012821

En el ejemplo anterior, cada vez que se suscribe al observable, le dará nuevos valores.

Ejemplo de tema

from rx import of, operators as op
import random
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("From sub1 {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("From sub2 {0}".format(x))
)
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
subscriber = sub1.subscribe(subject_test)

Salida

E:\pyrx>python testrx.py
From sub1 1.1789422863284509
From sub2 1.1789422863284509
From sub1 2.5525627903260153
From sub2 2.5525627903260153
From sub1 3.4191549324778325
From sub2 3.4191549324778325
From sub1 4.644042420199624
From sub2 4.644042420199624
From sub1 5.079896897489065
From sub2 5.079896897489065

Si ve que los valores son compartidos, entre ambos suscriptores que usan el asunto.

Comprensión de los observables fríos y calientes

Un observable se clasifica como

  • Observables fríos
  • Observables calientes

La diferencia en los observables se notará cuando se suscriban varios suscriptores.

Observables fríos

Los observables fríos, son observables que se ejecutan y generan datos cada vez que se suscriben. Cuando se suscribe, se ejecuta el observable y se dan los valores nuevos.

El siguiente ejemplo da la comprensión del frío observable.

from rx import of, operators as op
import random
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
print("From first subscriber")
subscriber1 = sub1.subscribe(lambda i: print("From sub1 {0}".format(i)))
print("From second subscriber")
subscriber2 = sub1.subscribe(lambda i: print("From sub2 {0}".format(i)))

Salida

E:\pyrx>python testrx.py
From first subscriber
From sub1 1.610450821095726
From sub1 2.9567564032037335
From sub1 3.933217537811936
From sub1 4.82444905626622
From sub1 5.929414892567188
From second subscriber
From sub2 1.8573813517529874
From sub2 2.902433239469483
From sub2 3.2289868093016825
From sub2 4.050413890694411
From sub2 5.226515068012821

En el ejemplo anterior, cada vez que se suscribe al observable, ejecutará los valores observables y emitidos. Los valores también pueden diferir de un suscriptor a otro, como se muestra en el ejemplo anterior.

Observables calientes

En el caso de los observables en caliente, emitirán los valores cuando estén listos y no siempre esperarán una suscripción. Cuando se emiten los valores, todos los suscriptores obtendrán el mismo valor.

Puede utilizar el observable en caliente cuando desee que los valores se emitan cuando el observable esté listo, o si desea compartir los mismos valores con todos sus suscriptores.

Un ejemplo de observable en caliente es el sujeto y los operadores conectables.

from rx import of, operators as op
import random
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("From sub1 {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("From sub2 {0}".format(x))
)
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
subscriber = sub1.subscribe(subject_test)

Salida

E:\pyrx>python testrx.py
From sub1 1.1789422863284509
From sub2 1.1789422863284509
From sub1 2.5525627903260153
From sub2 2.5525627903260153
From sub1 3.4191549324778325
From sub2 3.4191549324778325
From sub1 4.644042420199624
From sub2 4.644042420199624
From sub1 5.079896897489065
From sub2 5.079896897489065

Si ve, el mismo valor se comparte entre los suscriptores. Puede lograr lo mismo utilizando el operador observable conectable publish ().