just - Observable vs Flowable rxJava2
rxjava2 observable create (3)
El hecho de que su
Flowable
se
Flowable
después de emitir 128 valores sin manejo de contrapresión no significa que siempre se bloqueará después de exactamente 128 valores: a veces se bloqueará después de 10, y a veces no se bloqueará en absoluto.
Creo que esto es lo que sucedió cuando probaste el ejemplo con
Observable
: no hubo contrapresión, por lo que tu código funcionó normalmente, la próxima vez que no.
La diferencia en RxJava 2 es que ya no existe un concepto de contrapresión en
Observable
s, y no hay forma de manejarlo.
Si está diseñando una secuencia reactiva que probablemente requiera un manejo explícito de la contrapresión, entonces
Flowable
es su mejor opción.
He estado buscando el nuevo rx java 2 y ya no estoy seguro de entender la idea de la
backpressure
...
Soy consciente de que tenemos
Observable
que no tiene soporte de
backpressure
y
Flowable
que lo tiene.
Entonces, según el ejemplo, digamos que tengo
flowable
con
interval
:
Flowable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
// do smth
}
});
Esto se bloqueará después de alrededor de 128 valores, y eso es bastante obvio que estoy consumiendo más lento que obtener elementos.
Pero luego tenemos lo mismo con
Observable
Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
// do smth
}
});
Esto no se bloqueará en absoluto, incluso cuando me demore un poco en consumirlo, aún funciona.
Para hacer que
Flowable
funcione, digamos que puse el operador
onBackpressureDrop
, el bloqueo desapareció pero tampoco se emitieron todos los valores.
Entonces, la pregunta básica que no puedo encontrar actualmente en mi cabeza es ¿por qué debería preocuparme la
backpressure
cuando puedo usar
Observable
simple y recibir todos los valores sin administrar el
buffer
?
O tal vez desde el otro lado, ¿qué ventajas me brinda la
backpressure
a favor de administrar y manejar el consumo?
La contrapresión es cuando su observable (editor) está creando más eventos de los que su suscriptor puede manejar.
Por lo tanto, puede hacer que los suscriptores pierdan eventos, o puede obtener una gran cola de eventos que eventualmente acaba sin memoria.
Flowable
tiene en cuenta la contrapresión.
Observable
no.
Eso es.
me recuerda a un embudo que cuando tiene demasiado líquido se desborda. Flowable puede ayudar a no hacer que eso suceda:
con tremenda contrapresión:
pero con el uso de fluidos, hay mucha menos contrapresión:
Rxjava2 tiene algunas estrategias de contrapresión que puede usar según su caso de uso. por estrategia quiero decir que Rxjava2 proporciona una forma de manejar los objetos que no pueden procesarse debido al desbordamiento (contrapresión).
Aquí están las estrategias. No los revisaré todos, pero, por ejemplo, si no desea preocuparse por los elementos que se desbordan, puede usar una estrategia de caída como esta:
observable.toFlowable (BackpressureStrategy.DROP)
Hasta donde sé, debería haber un límite de 128 elementos en la cola, después de eso puede haber un desbordamiento (contrapresión). Incluso si no es 128, está cerca de ese número. Espero que esto ayude a alguien.
si necesita cambiar el tamaño del búfer de 128, parece que se puede hacer así (pero observe las restricciones de memoria:
myObservable.toFlowable(BackpressureStrategy.MISSING).buffer(256); //but using MISSING might be slower.
en el desarrollo de software, generalmente la estrategia de contrapresión significa que usted le dice al emisor que disminuya la velocidad un poco ya que el consumidor no puede manejar la velocidad de sus eventos de emisión.
Lo que la contrapresión se manifiesta en la práctica son los búferes limitados,
Flowable.observeOn
tiene un búfer de 128 elementos que se drena tan rápido como el flujo descendente puede soportarlo.
Puede aumentar este tamaño de búfer individualmente para manejar la fuente de ráfaga y todas las prácticas de gestión de contrapresión aún se aplican desde 1.x.
Observable.observeOn
tiene un búfer ilimitado que sigue recopilando los elementos y su aplicación puede quedarse sin memoria.
Puede usar
Observable
por ejemplo:
- manejo de eventos GUI
- trabajando con secuencias cortas (menos de 1000 elementos en total)
Puede usar
Flowable
por ejemplo:
- fuentes frías y no cronometradas
- generador como fuentes
- accesores de red y bases de datos