java-8 java-stream rx-java observable

Diferencia entre las secuencias de Java 8 y los observables de RxJava



java-8 java-stream (7)

¿Las secuencias de Java 8 son similares a las observables de RxJava?

Definición de flujo de Java 8:

Las clases en el nuevo paquete java.util.stream proporcionan una API Stream para admitir operaciones de estilo funcional en secuencias de elementos.


Existen algunas diferencias técnicas y conceptuales, por ejemplo, las secuencias de Java 8 son secuencias de valores síncronas de valores de un solo uso, mientras que los observables RxJava son secuencias de valores potencialmente asincrónicas re-observables, basadas en push-pull adaptativamente. RxJava está dirigido a Java 6+ y también funciona en Android.


Java 8 Stream y RxJava se ve bastante similar. Tienen operadores parecidos (filtro, mapa, flatMap ...) pero no están diseñados para el mismo uso.

Puede realizar tareas asíncronas utilizando RxJava.

Con la transmisión Java 8, atravesará elementos de su colección.

Puede hacer más o menos lo mismo en RxJava (elementos transversales de una colección) pero, dado que RxJava se enfoca en tareas concurrentes, ..., usa sincronización, bloqueo, ... Entonces, la misma tarea usando RxJava puede ser más lenta que con Java 8 stream.

RxJava se puede comparar con CompletableFuture , pero eso puede ser capaz de calcular más de un solo valor.


Java 8 Streams permite el procesamiento de colecciones realmente grandes de manera eficiente, al tiempo que aprovecha las arquitecturas multinúcleo. Por el contrario, RxJava tiene un solo subproceso de forma predeterminada (sin programadores). Por lo tanto, RxJava no aprovechará las máquinas multinúcleo a menos que codifique esa lógica usted mismo.


Java 8 Streams se basan en extracción. Se itera sobre una secuencia de Java 8 que consume cada elemento. Y podría ser una corriente interminable.

RXJava Observable está por defecto basado en push. Usted se suscribe a un Observable y se le notificará cuando llegue el siguiente elemento (al onNext ), o cuando se complete la onCompleted (al completar) o cuando se onError un error (al onError ). Porque con Observable recibe los onNext , onCompleted , onError , puede realizar algunas funciones potentes como combinar diferentes Observable s con uno nuevo ( zip , merge , concat ). Otras cosas que podría hacer es el almacenamiento en caché, la aceleración, ... Y utiliza más o menos la misma API en diferentes idiomas (RxJava, RX en C #, RxJS, ...)

Por defecto, RxJava es de un solo subproceso. A menos que comience a usar Programadores, todo sucederá en el mismo hilo.


Las respuestas existentes son completas y correctas, pero falta un ejemplo claro para principiantes. Permítanme poner algunos términos concretos detrás de "push / pull-based" y "re-observable". Nota : Odio el término Observable (es una transmisión por amor de Dios), así que simplemente me referiré a las transmisiones J8 vs RX.

Considere una lista de enteros,

digits = [1,2,3,4,5]

Un J8 Stream es una utilidad para modificar la colección. Por ejemplo, incluso los dígitos se pueden extraer como,

evens = digits.stream().filter(x -> x%2).collect(Collectors.toList())

Esto es básicamente el mapa de Python , filtro, reducción , una adición muy agradable (y muy atrasado) a Java. Pero, ¿qué pasaría si los dígitos no se recopilaran con anticipación, y si los dígitos se transmitieran mientras la aplicación se estaba ejecutando, podríamos filtrar los pares en tiempo real?

Imagine que un proceso de subproceso separado genera números enteros en momentos aleatorios mientras la aplicación se está ejecutando ( --- indica el tiempo)

digits = 12345---6------7--8--9-10--------11--12

En RX, even puede reaccionar a cada nuevo dígito y aplicar el filtro en tiempo real

even = -2-4-----6---------8----10------------12

No es necesario almacenar listas de entrada y salida. Si desea una lista de salida, no hay problema que sea transferible también. De hecho, todo es una corriente.

evens_stored = even.collect()

Es por eso que términos como "sin estado" y "funcional" están más asociados con RX


RxJava también está estrechamente relacionado con la ReactiveStreams y se considera una implementación simple de la API de flujos reactivos (por ejemplo, en comparación con la implementación de flujos Akka ). La principal diferencia es que las corrientes reactivas están diseñadas para poder manejar la contrapresión, pero si echa un vistazo a la página de corrientes reactivas, obtendrá la idea. Describen sus objetivos bastante bien y las corrientes también están estrechamente relacionadas con el manifiesto reactivo .

Las secuencias de Java 8 son más o menos la implementación de una colección ilimitada, bastante similar a Scala Stream o Clojure lazy seq .


TL; DR : todas las bibliotecas de procesamiento de secuencia / secuencia están ofreciendo API muy similar para la construcción de tuberías. Las diferencias están en la API para el manejo de subprocesos múltiples y la composición de tuberías.

RxJava es bastante diferente de Stream. De todas las cosas de JDK, la más cercana a rx.Observable es quizás java.util.stream.Collector Stream + CompletableFuture combo (que tiene un costo de tratar con una capa adicional de mónada, es decir, tener que manejar la conversión entre Stream<CompletableFuture<T>> y CompletableFuture<Stream<T>> ).

Existen diferencias significativas entre Observable y Stream:

  • Los flujos están basados ​​en pull, los observables están basados ​​en push. Esto puede sonar demasiado abstracto, pero tiene consecuencias significativas que son muy concretas.
  • Stream solo se puede usar una vez, Observable se puede suscribir muchas veces
  • Stream#parallel() divide la secuencia en particiones, Observable#subscribeOn() y Observable#observeOn() no; es complicado emular Stream#parallel() comportamiento Stream#parallel() con Observable, una vez tuvo el método .parallel() pero este método causó tanta confusión que el .parallel() se movió a un repositorio separado en github, RxJavaParallel. Más detalles están en otra respuesta .
  • Stream#parallel() no permite especificar un grupo de subprocesos para usar, a diferencia de la mayoría de los métodos de RxJava que aceptan el Programador opcional. Dado que todas las instancias de transmisión en una JVM usan el mismo grupo de bifurcación, agregar .parallel() puede afectar accidentalmente el comportamiento en otro módulo de su programa
  • Las transmisiones carecen de operaciones relacionadas con el tiempo como Observable#interval() , Observable#window() y muchas otras; Esto se debe principalmente a que las transmisiones se basan en pull
  • Las transmisiones ofrecen un conjunto restringido de operaciones en comparación con RxJava. Por ejemplo, las takeUntil() carecen de operaciones de corte ( takeWhile() , takeUntil() ); La solución para usar Stream#anyMatch() es limitada: es una operación terminal, por lo que no puede usarla más de una vez por transmisión
  • A partir de JDK 8, no existe la operación Stream # zip, que a veces es bastante útil
  • Los flujos son difíciles de construir por usted mismo, Observable se puede construir de muchas maneras EDITAR: Como se señaló en los comentarios, hay formas de construir un flujo. Sin embargo, dado que no hay un cortocircuito no terminal, no puede, por ejemplo, generar fácilmente un flujo de líneas en el archivo (JDK proporciona archivos # líneas y líneas BufferedReader # fuera de la caja, y otros escenarios similares se pueden gestionar construyendo flujo de Iterator).
  • Observable ofrece la facilidad de gestión de recursos ( Observable#using() ); puede envolver el flujo de E / S o mutex con él y asegurarse de que el usuario no se olvide de liberar el recurso: se eliminará automáticamente al finalizar la suscripción; Stream tiene el método onClose(Runnable) , pero debe llamarlo manualmente o mediante try-with-resources. P.ej. debe tener en cuenta que Files # lines () debe estar encerrado en el bloque try-with-resources.
  • Los observables están sincronizados hasta el final (en realidad no verifiqué si lo mismo es cierto para Streams). Esto le evita pensar si las operaciones básicas son seguras para subprocesos (la respuesta siempre es ''sí'', a menos que haya un error), pero la sobrecarga relacionada con la concurrencia estará allí, sin importar si su código lo necesita o no.

Redondeo: RxJava difiere significativamente de Streams. Las alternativas reales de RxJava son otras implementaciones de ReactiveStreams , por ejemplo, parte relevante de Akka.

Actualización Hay un truco para usar el grupo de unión de bifurcación no predeterminado para el Stream#parallel , consulte Grupo de subprocesos personalizado en flujo paralelo de Java 8

Actualización Todo lo anterior se basa en la experiencia con RxJava 1.x. Ahora que RxJava 2.x está aquí , esta respuesta puede estar desactualizada.