RxPY - Operadores de transformación

buffer

Este operador recopilará todos los valores de la fuente observable y los emitirá a intervalos regulares una vez que se cumpla la condición de límite dada.

Sintaxis

buffer(boundaries)

Parámetros

fronteras: La entrada es observable que decidirá cuándo detenerse para que se emitan los valores recolectados.

Valor devuelto

El valor de retorno es observable, que tendrá todos los valores recolectados de la fuente en base a la observación y que la duración del tiempo se decide por la entrada observable tomada.

Ejemplo

from rx import of, interval, operators as op
from datetime import date
test = of(1, 2,3,4,5,6,7,8,9,10)
sub1 = test.pipe(
   op.buffer(interval(1.0))
)
sub1.subscribe(lambda x: print("The element is {0}".format(x)))

Salida

E:\pyrx>python test1.py
The elements are [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

suelo_por

Este operador agrupará los valores provenientes de la fuente observable en función de la función key_mapper dada.

Sintaxis

group_by(key_mapper)

Parámetros

key_mapper: Esta función se encargará de extraer claves de la fuente observable.

Valor devuelto

Devuelve un observable con valores agrupados según la función key_mapper.

Ejemplo

from rx import from_, interval, operators as op
test = from_(["A", "B", "C", "D"])
sub1 = test.pipe(
   op.group_by(lambda v: v[0])
)
sub1.subscribe(lambda x: print("The element is {0}".format(x)))

Salida

E:\pyrx>python testrx.py
The element is <rx.core.observable.groupedobservable.GroupedObservable object
at
 0x000000C99A2E6550>
The element is <rx.core.observable.groupedobservable.GroupedObservable object at
 0x000000C99A2E65C0>
The element is <rx.core.observable.groupedobservable.GroupedObservable object at
 0x000000C99A2E6588>
The element is <rx.core.observable.groupedobservable.GroupedObservable object at
 0x000000C99A2E6550>

mapa

Este operador cambiará cada valor de la fuente observable a un nuevo valor basado en la salida del mapper_func dado.

Sintaxis

map(mapper_func:None)

Parámetros

mapper_func: (opcional) Cambiará los valores de la fuente observable en función de la salida proveniente de esta función.

Ejemplo

from rx import of, interval, operators as op
test = of(1, 2,3,4,5,6,7,8,9,10)
sub1 = test.pipe(
   op.map(lambda x :x*x)
)
sub1.subscribe(lambda x: print("The element is {0}".format(x)))

Salida

E:\pyrx>python testrx.py
The element is 1
The element is 4
The element is 9
The element is 16
The element is 25
The element is 36
The element is 49
The element is 64
The element is 81
The element is 100

escanear

Este operador aplicará una función de acumulador a los valores provenientes de la fuente observable y devolverá un observable con nuevos valores.

Sintaxis

scan(accumulator_func, seed=NotSet)

Parámetros

acumulador_func: Esta función se aplica a todos los valores de la fuente observable.

semilla: (opcional) El valor inicial que se utilizará dentro de la función acumulada.

Valor devuelto

Este operador devolverá un observable que tendrá nuevos valores basados ​​en la función de acumulador aplicada a cada valor de la fuente observable.

Ejemplo

from rx import of, interval, operators as op
test = of(1, 2,3,4,5,6,7,8,9,10)
sub1 = test.pipe(
   op.scan(lambda acc, a: acc + a, 0))
sub1.subscribe(lambda x: print("The element is {0}".format(x)))

Salida

E:\pyrx>python testrx.py
The element is 1
The element is 3
The element is 6
The element is 10
The element is 15
The element is 21
The element is 28
The element is 36
The element is 45
The element is 55