RxPY - Creando observables
crear
Este método se utiliza para crear un observable. Tendrá el método del observador, es decir
on_next() - Esta función se llama cuando el Observable emite un elemento.
on_completed() - Se llama a esta función cuando el Observable está completo.
on_error() - Se llama a esta función cuando ocurre un error en el Observable.
Aquí hay un ejemplo práctico:
testrx.py
from rx import create
def test_observable(observer, scheduler):
observer.on_next("Hello")
observer.on_error("Error occured")
observer.on_completed()
source = create(test_observable)
source.subscribe(
on_next = lambda i: print("Got - {0}".format(i)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!"),
)
Aquí está el output de lo observable creado -
E:\pyrx>python testrx.py
Got - Hello
Job Done!
vacío
Este observable no generará nada y emitirá directamente el estado completo.
Sintaxis
empty()
Valor devuelto
Devolverá un observable sin elementos.
Ejemplo
from rx import empty
test = empty()
test.subscribe(
lambda x: print("The value is {0}".format(x)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!")
)
Salida
E:\pyrx>python testrx.py
Job Done!
Nunca
Este método crea un observable que nunca alcanzará el estado completo.
Sintaxis
never()
Valor devuelto
Devolverá un observable que nunca se completará.
Ejemplo
from rx import never
test = never()
test.subscribe(
lambda x: print("The value is {0}".format(x)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!")
)
Salida
It does not show any output.
lanzar
Este método creará un observable que arrojará un error.
Sintaxis
throw(exception)
Parámetros
excepción: un objeto que tiene detalles de error.
Valor devuelto
Se devuelve un observable con detalles de error.
Ejemplo
from rx import throw
test = throw(Exception('There is an Error!'))
test.subscribe(
lambda x: print("The value is {0}".format(x)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!")
)
Salida
E:\pyrx>python testrx.py
Error: There is an Error!
desde_
Este método convertirá la matriz u objeto dado en un observable.
Sintaxis
from_(iterator)
Parámetros
iterador: este es un objeto o matriz.
Valor devuelto
Esto devolverá un observable para el iterador dado.
Ejemplo
from rx import from_
test = from_([1,2,3,4,5,6,7,8,9,10])
test.subscribe(
lambda x: print("The value is {0}".format(x)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!")
)
Salida
E:\pyrx>python testrx.py
The value is 1
The value is 2
The value is 3
The value is 4
The value is 5
The value is 6
The value is 7
The value is 8
The value is 9
The value is 10
Job Done!
intervalo
Este método dará una serie de valores producidos después de un tiempo de espera.
Sintaxis
interval(period)
Parámetros
período: para iniciar la secuencia entera.
Valor devuelto
Devuelve un observable con todos los valores en orden secuencial.
Ejemplo
import rx
from rx import operators as ops
rx.interval(1).pipe(
ops.map(lambda i: i * i)
).subscribe(lambda x: print("The value is {0}".format(x)))
input("Press any key to exit\n")
Salida
E:\pyrx>python testrx.py
Press any key to exit
The value is 0
The value is 1
The value is 4
The value is 9
The value is 16
The value is 25
The value is 36
The value is 49
The value is 64
The value is 81
The value is 100
The value is 121
The value is 144
The value is 169
The value is 196
The value is 225
The value is 256
The value is 289
The value is 324
The value is 361
The value is 400
sólo
Este método convertirá el valor dado en un observable.
Sintaxis
just(value)
Parámetros
valor: para convertirlo en un observable.
Valor devuelto
Devolverá un observable con los valores dados.
Ejemplo
from rx import just
test = just([15, 25,50, 55])
test.subscribe(
lambda x: print("The value is {0}".format(x)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!")
)
Salida
E:\pyrx>python testrx.py
The value is [15, 25, 50, 55]
Job Done!
rango
Este método dará un rango de números enteros según la entrada dada.
Sintaxis
range(start, stop=None)
Parámetros
inicio: primer valor a partir del cual se iniciará el rango.
stop: opcional, el último valor para que el rango se detenga.
Valor devuelto
Esto devolverá un observable con un valor entero basado en la entrada dada.
Ejemplo
from rx import range
test = range(0,10)
test.subscribe(
lambda x: print("The value is {0}".format(x)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!")
)
Salida
E:\pyrx>python testrx.py
The value is 0
The value is 1
The value is 2
The value is 3
The value is 4
The value is 5
The value is 6
The value is 7
The value is 8
The value is 9
Job Done!
repetir_valor
Este método creará un observable que repetirá el valor dado según se dé el recuento.
Sintaxis
repeat_value(value=None, repeat_count=None)
Parámetros
valor: opcional. El valor que se va a repetir.
repeat_count: opcional. El número de veces que se repetirá el valor dado.
Valor devuelto
Devolverá un observable que repetirá el valor dado según se dé el recuento.
Ejemplo
from rx import repeat_value
test = repeat_value(44,10)
test.subscribe(
lambda x: print("The value is {0}".format(x)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!")
)
Salida
E:\pyrx>python testrx.py
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
Job Done!
comienzo
Este método toma una función como entrada y devuelve un observable que devolverá el valor de la función de entrada.
Sintaxis
start(func)
Parámetros
func: una función que se llamará.
Valor devuelto
Devuelve un observable que tendrá un valor de retorno de la función de entrada.
Ejemplo
from rx import start
test = start(lambda : "Hello World")
test.subscribe(
lambda x: print("The value is {0}".format(x)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!")
)
Salida
E:\pyrx>python testrx.py
The value is Hello World
Job Done!
Temporizador
Este método emitirá los valores en secuencia después de que finalice el tiempo de espera.
Sintaxis
timer(duetime)
Parámetros
duetime: tiempo después del cual debe emitir el primer valor.
Valor devuelto
Devolverá un observable con valores emitidos después del tiempo de vencimiento.
Ejemplo
import rx
from rx import operators as ops
rx.timer(5.0, 10).pipe(
ops.map(lambda i: i * i)
).subscribe(lambda x: print("The value is {0}".format(x)))
input("Press any key to exit\n")
Salida
E:\pyrx>python testrx.py
Press any key to exit
The value is 0
The value is 1
The value is 4
The value is 9
The value is 16
The value is 25
The value is 36
The value is 49
The value is 64