tutorial rxjava2 rxjava react example java asynchronous reactive-programming observable rx-java

rxjava2 - rxjava tutorial



RxJava Obteniendo observables en paralelo (2)

El operador paralelo demostró ser un problema para casi todos los casos de uso y no hace lo que la mayoría espera de él, por lo que se eliminó en la versión 1.0.0.rc.4: https://github.com/ReactiveX/RxJava/pull/1716

here se puede ver un buen ejemplo de cómo hacer este tipo de comportamiento y obtener una ejecución paralela.

En su código de ejemplo no está claro si searchServiceClient es síncrono o asíncrono. Afecta ligeramente la forma de resolver el problema, ya que si ya es asíncrono, no se necesita una programación adicional. Si se necesita una programación adicional síncrona.

Primero, aquí hay algunos ejemplos simples que muestran un comportamiento síncrono y asíncrono:

import rx.Observable; import rx.Subscriber; import rx.schedulers.Schedulers; public class ParallelExecution { public static void main(String[] args) { System.out.println("------------ mergingAsync"); mergingAsync(); System.out.println("------------ mergingSync"); mergingSync(); System.out.println("------------ mergingSyncMadeAsync"); mergingSyncMadeAsync(); System.out.println("------------ flatMapExampleSync"); flatMapExampleSync(); System.out.println("------------ flatMapExampleAsync"); flatMapExampleAsync(); System.out.println("------------"); } private static void mergingAsync() { Observable.merge(getDataAsync(1), getDataAsync(2)).toBlocking().forEach(System.out::println); } private static void mergingSync() { // here you''ll see the delay as each is executed synchronously Observable.merge(getDataSync(1), getDataSync(2)).toBlocking().forEach(System.out::println); } private static void mergingSyncMadeAsync() { // if you have something synchronous and want to make it async, you can schedule it like this // so here we see both executed concurrently Observable.merge(getDataSync(1).subscribeOn(Schedulers.io()), getDataSync(2).subscribeOn(Schedulers.io())).toBlocking().forEach(System.out::println); } private static void flatMapExampleAsync() { Observable.range(0, 5).flatMap(i -> { return getDataAsync(i); }).toBlocking().forEach(System.out::println); } private static void flatMapExampleSync() { Observable.range(0, 5).flatMap(i -> { return getDataSync(i); }).toBlocking().forEach(System.out::println); } // artificial representations of IO work static Observable<Integer> getDataAsync(int i) { return getDataSync(i).subscribeOn(Schedulers.io()); } static Observable<Integer> getDataSync(int i) { return Observable.create((Subscriber<? super Integer> s) -> { // simulate latency try { Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); } s.onNext(i); s.onCompleted(); }); } }

El siguiente es un intento de proporcionar un ejemplo que coincida más estrechamente con su código:

import java.util.List; import rx.Observable; import rx.Subscriber; import rx.schedulers.Schedulers; public class ParallelExecutionExample { public static void main(String[] args) { final long startTime = System.currentTimeMillis(); Observable<Tile> searchTile = getSearchResults("search term") .doOnSubscribe(() -> logTime("Search started ", startTime)) .doOnCompleted(() -> logTime("Search completed ", startTime)); Observable<TileResponse> populatedTiles = searchTile.flatMap(t -> { Observable<Reviews> reviews = getSellerReviews(t.getSellerId()) .doOnCompleted(() -> logTime("getSellerReviews[" + t.id + "] completed ", startTime)); Observable<String> imageUrl = getProductImage(t.getProductId()) .doOnCompleted(() -> logTime("getProductImage[" + t.id + "] completed ", startTime)); return Observable.zip(reviews, imageUrl, (r, u) -> { return new TileResponse(t, r, u); }).doOnCompleted(() -> logTime("zip[" + t.id + "] completed ", startTime)); }); List<TileResponse> allTiles = populatedTiles.toList() .doOnCompleted(() -> logTime("All Tiles Completed ", startTime)) .toBlocking().single(); } private static Observable<Tile> getSearchResults(String string) { return mockClient(new Tile(1), new Tile(2), new Tile(3)); } private static Observable<Reviews> getSellerReviews(int id) { return mockClient(new Reviews()); } private static Observable<String> getProductImage(int id) { return mockClient("image_" + id); } private static void logTime(String message, long startTime) { System.out.println(message + " => " + (System.currentTimeMillis() - startTime) + "ms"); } private static <T> Observable<T> mockClient(T... ts) { return Observable.create((Subscriber<? super T> s) -> { // simulate latency try { Thread.sleep(1000); } catch (Exception e) { } for (T t : ts) { s.onNext(t); } s.onCompleted(); }).subscribeOn(Schedulers.io()); // note the use of subscribeOn to make an otherwise synchronous Observable async } public static class TileResponse { public TileResponse(Tile t, Reviews r, String u) { // store the values } } public static class Tile { private final int id; public Tile(int i) { this.id = i; } public int getSellerId() { return id; } public int getProductId() { return id; } } public static class Reviews { } }

Esto produce:

Search started => 65ms Search completed => 1094ms getProductImage[1] completed => 2095ms getSellerReviews[2] completed => 2095ms getProductImage[3] completed => 2095ms zip[1] completed => 2096ms zip[2] completed => 2096ms getProductImage[2] completed => 2096ms getSellerReviews[1] completed => 2096ms zip[3] completed => 2096ms All Tiles Completed => 2097ms getSellerReviews[3] completed => 2097ms

He simulado que cada llamada de IO tome 1000 ms, por lo que es obvio dónde está la latencia y que está sucediendo en paralelo. Imprime el progreso realizado en milisegundos transcurridos.

El truco aquí es que flatMap combina llamadas asíncronas, por lo que siempre que los Observables que se fusionen sean asíncronos, todos se ejecutarán simultáneamente.

Si una llamada como getProductImage(t.getProductId()) fue síncrona, se puede hacer asíncrona de esta manera: getProductImage (t.getProductId ()). SubscribeOn (Schedulers.io).

Aquí está la parte importante del ejemplo anterior sin todos los tipos de registro y repetitivo:

Observable<Tile> searchTile = getSearchResults("search term");; Observable<TileResponse> populatedTiles = searchTile.flatMap(t -> { Observable<Reviews> reviews = getSellerReviews(t.getSellerId()); Observable<String> imageUrl = getProductImage(t.getProductId()); return Observable.zip(reviews, imageUrl, (r, u) -> { return new TileResponse(t, r, u); }); }); List<TileResponse> allTiles = populatedTiles.toList() .toBlocking().single();

Espero que esto ayude.

Necesito ayuda para implementar llamadas asíncronas paralelas en RxJava. Elegí un caso de uso simple en el que la primera llamada obtiene (en lugar de buscar) una lista de productos (mosaico) para mostrar. Las llamadas posteriores salen y se obtienen (A) REVISIONES y (B) IMÁGENES DEL PRODUCTO

Después de varios intentos llegué a este lugar.

1 Observable<Tile> searchTile = searchServiceClient.getSearchResults(searchTerm); 2 List<Tile> allTiles = new ArrayList<Tile>(); 3 ClientResponse response = new ClientResponse(); 4 searchTile.parallel(oTile -> { 5 return oTile.flatMap(t -> { 6 Observable<Reviews> reviews = reviewsServiceClient.getSellerReviews(t.getSellerId()); 7 Observable<String> imageUrl = reviewsServiceClient.getProductImage(t.getProductId()); 8 return Observable.zip(reviews, imageUrl, (r, u) -> { 9 t.setReviews(r); 10 t.setImageUrl(u); 11 return t; 12 }); 13 }); 14 }).subscribe(e -> { 15 allTiles.add((Tile) e); 16 });

Línea 1: se apaga y busca el producto (mosaico) que se mostrará

Línea 4: Tomamos la lista de Observables y la SHARD para obtener reseñas e imágenes.

Mentira 6,7: Fetch the Observable review and Observable url

Línea 8: Finalmente, los 2 observables se comprimen para devolver un Observable actualizado

Línea 15: finalmente, la línea 15 recopila todos los productos individuales que se mostrarán en una colección que puede devolverse a la capa de llamada

Mientras que el Observable se ha fragmentado y en nuestras pruebas se ejecuta en 4 subprocesos diferentes; la búsqueda de reseñas e imágenes parece ser una tras otra. Sospecho que el paso zip en la línea 8 está causando básicamente la invocación secuencial de los 2 observables (revisiones y url).

¿Tiene este grupo alguna sugerencia para buscar paralelamente reiews y URL de imágenes? En esencia, el gráfico de cascada adjunto arriba debería verse más apilado verticalmente. Las llamadas a revisiones e imágenes deben ser paralelas

gracias y raman


Las personas que todavía son @ JDK 7, cuyo IDE aún no detecta automáticamente la fuente JDK 8 y qué probar la brillante respuesta (y explicación) anterior de @benjchristensen pueden usar este código JDK 7 descaradamente refractado. ¡Felicitaciones a @benjchristensen por una increíble explicación y ejemplo!

import java.util.List; import rx.Observable; import rx.Subscriber; import rx.functions.Action0; import rx.functions.Func1; import rx.functions.Func2; import rx.schedulers.Schedulers; public class ParallelExecutionExample { public static void main(String[] args) { final long startTime = System.currentTimeMillis(); Observable<Tile> searchTile = getSearchResults("search term") .doOnSubscribe(new Action0() { @Override public void call() { logTime("Search started ", startTime); } }) .doOnCompleted(new Action0() { @Override public void call() { logTime("Search completed ", startTime); } }); Observable<TileResponse> populatedTiles = searchTile.flatMap(new Func1<Tile, Observable<TileResponse>>() { @Override public Observable<TileResponse> call(final Tile t) { Observable<Reviews> reviews = getSellerReviews(t.getSellerId()) .doOnCompleted(new Action0() { @Override public void call() { logTime("getSellerReviews[" + t.id + "] completed ", startTime); } }); Observable<String> imageUrl = getProductImage(t.getProductId()) .doOnCompleted(new Action0() { @Override public void call() { logTime("getProductImage[" + t.id + "] completed ", startTime); } }); return Observable.zip(reviews, imageUrl, new Func2<Reviews, String, TileResponse>() { @Override public TileResponse call(Reviews r, String u) { return new TileResponse(t, r, u); } }) .doOnCompleted(new Action0() { @Override public void call() { logTime("zip[" + t.id + "] completed ", startTime); } }); } }); List<TileResponse> allTiles = populatedTiles .toList() .doOnCompleted(new Action0() { @Override public void call() { logTime("All Tiles Completed ", startTime); } }) .toBlocking() .single(); } private static Observable<Tile> getSearchResults(String string) { return mockClient(new Tile(1), new Tile(2), new Tile(3)); } private static Observable<Reviews> getSellerReviews(int id) { return mockClient(new Reviews()); } private static Observable<String> getProductImage(int id) { return mockClient("image_" + id); } private static void logTime(String message, long startTime) { System.out.println(message + " => " + (System.currentTimeMillis() - startTime) + "ms"); } private static <T> Observable<T> mockClient(final T... ts) { return Observable.create(new Observable.OnSubscribe<T>() { @Override public void call(Subscriber<? super T> s) { try { Thread.sleep(1000); } catch (Exception e) { } for (T t : ts) { s.onNext(t); } s.onCompleted(); } }) .subscribeOn(Schedulers.io()); // note the use of subscribeOn to make an otherwise synchronous Observable async } public static class TileResponse { public TileResponse(Tile t, Reviews r, String u) { // store the values } } public static class Tile { private final int id; public Tile(int i) { this.id = i; } public int getSellerId() { return id; } public int getProductId() { return id; } } public static class Reviews { } }