java multithreading asynchronous java-8 rx-java

Diferencia entre CompletableFuture, Future y RxJava''s Observable



multithreading asynchronous (2)

Me gustaría saber la diferencia entre CompletableFuture , Future y Observable RxJava .

Lo que sé es que todos son asincrónicos pero

Future.get() bloquea el hilo

CompletableFuture proporciona los métodos de devolución de llamada

RxJava Observable --- similar a CompletableFuture con otros beneficios (no estoy seguro)

Por ejemplo: si el cliente necesita hacer múltiples llamadas de servicio y cuando usamos Futures (Java) Future.get() se ejecutará secuencialmente ... me gustaría saber cómo es mejor en RxJava.

Y la documentación http://reactivex.io/intro.html dice

Es difícil usar Futures para componer de manera óptima flujos de ejecución asincrónicos condicionales (o imposible, ya que las latencias de cada solicitud varían en tiempo de ejecución). Esto se puede hacer, por supuesto, pero rápidamente se vuelve complicado (y por lo tanto propenso a errores) o se bloquea prematuramente en Future.get (), lo que elimina el beneficio de la ejecución asincrónica.

Realmente interesado en saber cómo RxJava resuelve este problema. Me resultó difícil de entender por la documentación.


He estado trabajando con Rx Java desde 0.9, ahora en 1.3.2 y pronto migrando a 2.x Lo uso en un proyecto privado en el que ya trabajo durante 8 años.

Ya no programaría sin esta biblioteca. Al principio era escéptico, pero es un estado mental completamente diferente que debes crear. Tranquilo difícil al principio. A veces miraba las canicas por horas .. jajaja

Es solo una cuestión de práctica y realmente conocer el flujo (también conocido como contrato de observables y observadores), una vez que llegue allí, odiará hacerlo de otra manera.

Para mí no hay realmente un inconveniente en esa biblioteca.

Caso de uso: tengo una vista de monitor que contiene 9 indicadores (CPU, MEM, red, etc.). Al iniciar la vista, la vista se suscribe a una clase de monitor del sistema que devuelve un observable (intervalo) que contiene todos los datos de los 9 metros. Llevará cada segundo un nuevo resultado a la vista (¡así que no sondeará!). Ese observable usa un mapa plano para obtener simultáneamente (¡asíncrono!) Recuperar datos de 9 fuentes diferentes y comprime el resultado en un nuevo modelo que su vista obtendrá en onNext ().

¿Cómo diablos vas a hacer eso con futuros, completables, etc.? ¡Buena suerte! :)

Rx Java resuelve muchos problemas en la programación para mí y hace que sea mucho más fácil ...

Ventajas:

  • Statelss !!! (Algo importante que mencionar, lo más importante quizás)
  • Gestión de subprocesos fuera de la caja
  • Cree secuencias que tengan su propio ciclo de vida.
  • Todo es observable, así que encadenar es fácil
  • Menos código para escribir
  • Jarra individual en classpath (muy ligera)
  • Altamente concurrente
  • Ya no hay infierno de llamadas
  • Basado en suscriptor (contrato estricto entre consumidor y productor)
  • Estrategias de contrapresión (disyuntor similar)
  • Espléndido manejo de errores y recuperación
  • Muy buena documentación (canicas <3)
  • Control completo
  • Mucho mas ...

Desventajas: - Difícil de probar


Futuros

Futures se introdujeron en Java 5 (2004). Básicamente son marcadores de posición para el resultado de una operación que aún no ha terminado. Una vez que finalice la operación, el Future contendrá ese resultado. Por ejemplo, una operación puede ser una instancia Runnable o Callable que se envía a un ExecutorService . El remitente de la operación puede usar el objeto Future para verificar si la operación isDone() , o esperar a que termine de usar el método get() bloqueo.

Ejemplo:

/** * A task that sleeps for a second, then returns 1 **/ public static class MyCallable implements Callable<Integer> { @Override public Integer call() throws Exception { Thread.sleep(1000); return 1; } } public static void main(String[] args) throws Exception{ ExecutorService exec = Executors.newSingleThreadExecutor(); Future<Integer> f = exec.submit(new MyCallable()); System.out.println(f.isDone()); //False System.out.println(f.get()); //Waits until the task is done, then prints 1 }

Futuros Completables

CompletableFutures se introdujeron en Java 8 (2014). De hecho, son una evolución de los futuros regulares, inspirados en los futuros escuchables de Google, parte de la biblioteca Guava . Son futuros que también le permiten unir tareas en una cadena. Puede usarlos para decirle a algún subproceso de trabajo que "vaya a hacer una tarea X, y cuando haya terminado, haga otra cosa usando el resultado de X". Con CompletableFutures, puede hacer algo con el resultado de la operación sin bloquear realmente un subproceso para esperar el resultado. Aquí hay un ejemplo simple:

/** * A supplier that sleeps for a second, and then returns one **/ public static class MySupplier implements Supplier<Integer> { @Override public Integer get() { try { Thread.sleep(1000); } catch (InterruptedException e) { //Do nothing } return 1; } } /** * A (pure) function that adds one to a given Integer **/ public static class PlusOne implements Function<Integer, Integer> { @Override public Integer apply(Integer x) { return x + 1; } } public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newSingleThreadExecutor(); CompletableFuture<Integer> f = CompletableFuture.supplyAsync(new MySupplier(), exec); System.out.println(f.isDone()); // False CompletableFuture<Integer> f2 = f.thenApply(new PlusOne()); System.out.println(f2.get()); // Waits until the "calculation" is done, then prints 2 }

RxJava

RxJava es una biblioteca completa para programación reactiva creada en Netflix. De un vistazo, parecerá ser similar a las transmisiones de Java 8 . Lo es, excepto que es mucho más poderoso.

De manera similar a Futures, RxJava se puede usar para unir un conjunto de acciones síncronas o asíncronas para crear una tubería de procesamiento. A diferencia de los futuros, que son de un solo uso, RxJava funciona en flujos de cero o más elementos. Incluyendo transmisiones interminables con un número infinito de elementos. También es mucho más flexible y potente gracias a un conjunto increíblemente rico de operadores .

A diferencia de las transmisiones de Java 8, RxJava también tiene un mecanismo de backpressure , que le permite manejar casos en los que diferentes partes de su tubería de procesamiento operan en diferentes hilos, a diferentes velocidades .

La desventaja de RxJava es que a pesar de la documentación sólida, es una biblioteca difícil de aprender debido al cambio de paradigma involucrado. El código Rx también puede ser una pesadilla para depurar, especialmente si hay varios hilos involucrados, y aún peor, si se necesita contrapresión.

Si desea entrar en él, hay una page completa de varios tutoriales en el sitio web oficial, además de la documentation oficial y Javadoc . También puede echar un vistazo a algunos de los videos como este, que ofrece una breve introducción a Rx y también habla sobre las diferencias entre Rx y Futures.

Bonus: secuencias reactivas de Java 9

Los flujos reactivos de Java 9, también conocidos como Flow API, son un conjunto de interfaces implementadas por varias bibliotecas de flujos reactivos como RxJava 2 , Akka Streams y Vertx . Permiten que estas bibliotecas reactivas se interconecten, al tiempo que conservan la importante contrapresión.