single rxjava2 rxjava just examples example create java android rx-java

just - Observable vs Flowable rxJava2



rxjava2 observable create (3)

El hecho de que su Flowable se Flowable después de emitir 128 valores sin manejo de contrapresión no significa que siempre se bloqueará después de exactamente 128 valores: a veces se bloqueará después de 10, y a veces no se bloqueará en absoluto. Creo que esto es lo que sucedió cuando probaste el ejemplo con Observable : no hubo contrapresión, por lo que tu código funcionó normalmente, la próxima vez que no. La diferencia en RxJava 2 es que ya no existe un concepto de contrapresión en Observable s, y no hay forma de manejarlo. Si está diseñando una secuencia reactiva que probablemente requiera un manejo explícito de la contrapresión, entonces Flowable es su mejor opción.

He estado buscando el nuevo rx java 2 y ya no estoy seguro de entender la idea de la backpressure ...

Soy consciente de que tenemos Observable que no tiene soporte de backpressure y Flowable que lo tiene.

Entonces, según el ejemplo, digamos que tengo flowable con interval :

Flowable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { // do smth } });

Esto se bloqueará después de alrededor de 128 valores, y eso es bastante obvio que estoy consumiendo más lento que obtener elementos.

Pero luego tenemos lo mismo con Observable

Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { // do smth } });

Esto no se bloqueará en absoluto, incluso cuando me demore un poco en consumirlo, aún funciona. Para hacer que Flowable funcione, digamos que puse el operador onBackpressureDrop , el bloqueo desapareció pero tampoco se emitieron todos los valores.

Entonces, la pregunta básica que no puedo encontrar actualmente en mi cabeza es ¿por qué debería preocuparme la backpressure cuando puedo usar Observable simple y recibir todos los valores sin administrar el buffer ? O tal vez desde el otro lado, ¿qué ventajas me brinda la backpressure a favor de administrar y manejar el consumo?


La contrapresión es cuando su observable (editor) está creando más eventos de los que su suscriptor puede manejar. Por lo tanto, puede hacer que los suscriptores pierdan eventos, o puede obtener una gran cola de eventos que eventualmente acaba sin memoria. Flowable tiene en cuenta la contrapresión. Observable no. Eso es.

me recuerda a un embudo que cuando tiene demasiado líquido se desborda. Flowable puede ayudar a no hacer que eso suceda:

con tremenda contrapresión:

pero con el uso de fluidos, hay mucha menos contrapresión:

Rxjava2 tiene algunas estrategias de contrapresión que puede usar según su caso de uso. por estrategia quiero decir que Rxjava2 proporciona una forma de manejar los objetos que no pueden procesarse debido al desbordamiento (contrapresión).

Aquí están las estrategias. No los revisaré todos, pero, por ejemplo, si no desea preocuparse por los elementos que se desbordan, puede usar una estrategia de caída como esta:

observable.toFlowable (BackpressureStrategy.DROP)

Hasta donde sé, debería haber un límite de 128 elementos en la cola, después de eso puede haber un desbordamiento (contrapresión). Incluso si no es 128, está cerca de ese número. Espero que esto ayude a alguien.

si necesita cambiar el tamaño del búfer de 128, parece que se puede hacer así (pero observe las restricciones de memoria:

myObservable.toFlowable(BackpressureStrategy.MISSING).buffer(256); //but using MISSING might be slower.

en el desarrollo de software, generalmente la estrategia de contrapresión significa que usted le dice al emisor que disminuya la velocidad un poco ya que el consumidor no puede manejar la velocidad de sus eventos de emisión.


Lo que la contrapresión se manifiesta en la práctica son los búferes limitados, Flowable.observeOn tiene un búfer de 128 elementos que se drena tan rápido como el flujo descendente puede soportarlo. Puede aumentar este tamaño de búfer individualmente para manejar la fuente de ráfaga y todas las prácticas de gestión de contrapresión aún se aplican desde 1.x. Observable.observeOn tiene un búfer ilimitado que sigue recopilando los elementos y su aplicación puede quedarse sin memoria.

Puede usar Observable por ejemplo:

  • manejo de eventos GUI
  • trabajando con secuencias cortas (menos de 1000 elementos en total)

Puede usar Flowable por ejemplo:

  • fuentes frías y no cronometradas
  • generador como fuentes
  • accesores de red y bases de datos