RxPY - Connectable Operators

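publish

This method converts an observable into a connectable observable. A connectable observable does not start emitting when it is subscribed to; it starts only when connect() is called, and all subscribers share that single run of the source.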

Syntax

publish(mapper=None)

Parameters

mapper: optional. A function that receives the multicast source sequence and can use it as many times as needed without causing multiple subscriptions to the source.

Example

from rx import create, operators as op
import random
def test_observable(observer, scheduler):
   # Emit one random number, then complete.
   observer.on_next(random.random())
   observer.on_completed()
# publish() returns a connectable observable; nothing is emitted until connect().
source = create(test_observable).pipe(op.publish())
test1 = source.subscribe(on_next = lambda i: print("From subscriber 1 - {0}".format(i)))
test2 = source.subscribe(on_next = lambda i: print("From subscriber 2 - {0}".format(i)))
# Start the source; both subscribers receive the same value.
source.connect()

Output

E:\pyrx>python testrx.py
From subscriber 1 - 0.1475160727331849
From subscriber 2 - 0.1475160727331849
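Both subscribers print the same number because the source runs once and its value is fanned out to everyone who subscribed before connect(). The following pure-Python sketch illustrates that idea only; TinyConnectable is a hypothetical stand-in written for this page, not RxPY's actual implementation.

```python
import random

class TinyConnectable:
    """Hypothetical stand-in for a connectable observable (not RxPY code)."""

    def __init__(self, producer):
        self._producer = producer   # callable that pushes values into a callback
        self._callbacks = []        # subscribers waiting for values

    def subscribe(self, on_next):
        # Subscribing only registers the callback; nothing is produced yet.
        self._callbacks.append(on_next)

    def connect(self):
        # The producer runs once; each value is delivered to every subscriber.
        def fan_out(value):
            for callback in list(self._callbacks):
                callback(value)
        self._producer(fan_out)

first, second = [], []
source = TinyConnectable(lambda emit: emit(random.random()))
source.subscribe(first.append)
source.subscribe(second.append)

received_before_connect = first + second  # still empty: connect() not called yet
source.connect()

print(received_before_connect)  # []
print(first == second)          # True
```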

ref_count

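This operator makes a connectable observable behave like a normal observable: it connects automatically when the first subscriber subscribes and disconnects when the last subscriber unsubscribes.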

Syntax

ref_count()

Example

from rx import create, operators as op
import random
def test_observable(observer, scheduler):
   # Emit one random number; the sequence is never completed.
   observer.on_next(random.random())
# ref_count() connects on the first subscription, so no connect() call is needed.
source = create(test_observable).pipe(op.publish(),op.ref_count())
test1 = source.subscribe(on_next = lambda i: print("From subscriber 1 - {0}".format(i)))
test2 = source.subscribe(on_next = lambda i: print("From subscriber 2 - {0}".format(i)))

Output

E:\pyrx>python testrx.py
From subscriber 1 - 0.8230640432381131

replay

This method works similarly to ReplaySubject. It replays the values that have already been emitted to subscribers that subscribe late, so every subscriber receives the same values even if the observable emitted them before that subscriber arrived.

Syntax

replay()

Example

from rx import create, operators as op
import random
from threading import Timer
def test_observable(observer, scheduler):
   # Emit one random number, then complete.
   observer.on_next(random.random())
   observer.on_completed()
# replay() returns a connectable observable that remembers emitted values.
source = create(test_observable).pipe(op.replay())
test1 = source.subscribe(on_next = lambda i: print("From subscriber 1 - {0}".format(i)))
test2 = source.subscribe(on_next = lambda i: print("From subscriber 2 - {0}".format(i)))
source.connect()
print("subscriber called after delay ")
def last_subscriber():
   # Subscribes after the value was emitted and still receives it.
   test3 = source.subscribe(on_next = lambda i: print("From subscriber 3 - {0}".format(i)))
# Subscribe the third subscriber 5 seconds later.
t = Timer(5.0, last_subscriber)
t.start()

Output

E:\pyrx>python testrx.py
From subscriber 1 - 0.8340998157725388
From subscriber 2 - 0.8340998157725388
subscriber called after delay
From subscriber 3 - 0.8340998157725388