rxjava2 - rxjava tutorial
RxJava Obteniendo observables en paralelo (2)
El operador paralelo demostró ser un problema para casi todos los casos de uso y no hace lo que la mayoría espera de él, por lo que se eliminó en la versión 1.0.0.rc.4: https://github.com/ReactiveX/RxJava/pull/1716
here se puede ver un buen ejemplo de cómo hacer este tipo de comportamiento y obtener una ejecución paralela.
En su código de ejemplo no está claro si
searchServiceClient
es síncrono o asíncrono.
Afecta ligeramente la forma de resolver el problema, ya que si ya es asíncrono, no se necesita una programación adicional.
Si se necesita una programación adicional síncrona.
Primero, aquí hay algunos ejemplos simples que muestran un comportamiento síncrono y asíncrono:
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
public class ParallelExecution {
public static void main(String[] args) {
System.out.println("------------ mergingAsync");
mergingAsync();
System.out.println("------------ mergingSync");
mergingSync();
System.out.println("------------ mergingSyncMadeAsync");
mergingSyncMadeAsync();
System.out.println("------------ flatMapExampleSync");
flatMapExampleSync();
System.out.println("------------ flatMapExampleAsync");
flatMapExampleAsync();
System.out.println("------------");
}
private static void mergingAsync() {
Observable.merge(getDataAsync(1), getDataAsync(2)).toBlocking().forEach(System.out::println);
}
private static void mergingSync() {
// here you''ll see the delay as each is executed synchronously
Observable.merge(getDataSync(1), getDataSync(2)).toBlocking().forEach(System.out::println);
}
private static void mergingSyncMadeAsync() {
// if you have something synchronous and want to make it async, you can schedule it like this
// so here we see both executed concurrently
Observable.merge(getDataSync(1).subscribeOn(Schedulers.io()), getDataSync(2).subscribeOn(Schedulers.io())).toBlocking().forEach(System.out::println);
}
private static void flatMapExampleAsync() {
Observable.range(0, 5).flatMap(i -> {
return getDataAsync(i);
}).toBlocking().forEach(System.out::println);
}
private static void flatMapExampleSync() {
Observable.range(0, 5).flatMap(i -> {
return getDataSync(i);
}).toBlocking().forEach(System.out::println);
}
// artificial representations of IO work
static Observable<Integer> getDataAsync(int i) {
return getDataSync(i).subscribeOn(Schedulers.io());
}
static Observable<Integer> getDataSync(int i) {
return Observable.create((Subscriber<? super Integer> s) -> {
// simulate latency
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
s.onNext(i);
s.onCompleted();
});
}
}
El siguiente es un intento de proporcionar un ejemplo que coincida más estrechamente con su código:
import java.util.List;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
public class ParallelExecutionExample {
public static void main(String[] args) {
final long startTime = System.currentTimeMillis();
Observable<Tile> searchTile = getSearchResults("search term")
.doOnSubscribe(() -> logTime("Search started ", startTime))
.doOnCompleted(() -> logTime("Search completed ", startTime));
Observable<TileResponse> populatedTiles = searchTile.flatMap(t -> {
Observable<Reviews> reviews = getSellerReviews(t.getSellerId())
.doOnCompleted(() -> logTime("getSellerReviews[" + t.id + "] completed ", startTime));
Observable<String> imageUrl = getProductImage(t.getProductId())
.doOnCompleted(() -> logTime("getProductImage[" + t.id + "] completed ", startTime));
return Observable.zip(reviews, imageUrl, (r, u) -> {
return new TileResponse(t, r, u);
}).doOnCompleted(() -> logTime("zip[" + t.id + "] completed ", startTime));
});
List<TileResponse> allTiles = populatedTiles.toList()
.doOnCompleted(() -> logTime("All Tiles Completed ", startTime))
.toBlocking().single();
}
private static Observable<Tile> getSearchResults(String string) {
return mockClient(new Tile(1), new Tile(2), new Tile(3));
}
private static Observable<Reviews> getSellerReviews(int id) {
return mockClient(new Reviews());
}
private static Observable<String> getProductImage(int id) {
return mockClient("image_" + id);
}
private static void logTime(String message, long startTime) {
System.out.println(message + " => " + (System.currentTimeMillis() - startTime) + "ms");
}
private static <T> Observable<T> mockClient(T... ts) {
return Observable.create((Subscriber<? super T> s) -> {
// simulate latency
try {
Thread.sleep(1000);
} catch (Exception e) {
}
for (T t : ts) {
s.onNext(t);
}
s.onCompleted();
}).subscribeOn(Schedulers.io());
// note the use of subscribeOn to make an otherwise synchronous Observable async
}
public static class TileResponse {
public TileResponse(Tile t, Reviews r, String u) {
// store the values
}
}
public static class Tile {
private final int id;
public Tile(int i) {
this.id = i;
}
public int getSellerId() {
return id;
}
public int getProductId() {
return id;
}
}
public static class Reviews {
}
}
Esto produce:
Search started => 65ms
Search completed => 1094ms
getProductImage[1] completed => 2095ms
getSellerReviews[2] completed => 2095ms
getProductImage[3] completed => 2095ms
zip[1] completed => 2096ms
zip[2] completed => 2096ms
getProductImage[2] completed => 2096ms
getSellerReviews[1] completed => 2096ms
zip[3] completed => 2096ms
All Tiles Completed => 2097ms
getSellerReviews[3] completed => 2097ms
He simulado que cada llamada de IO tome 1000 ms, por lo que es obvio dónde está la latencia y que está sucediendo en paralelo. Imprime el progreso realizado en milisegundos transcurridos.
El truco aquí es que flatMap combina llamadas asíncronas, por lo que siempre que los Observables que se fusionen sean asíncronos, todos se ejecutarán simultáneamente.
Si una llamada como
getProductImage(t.getProductId())
fue síncrona, se puede hacer asíncrona de esta manera: getProductImage (t.getProductId ()). SubscribeOn (Schedulers.io).
Aquí está la parte importante del ejemplo anterior sin todos los tipos de registro y repetitivo:
Observable<Tile> searchTile = getSearchResults("search term");;
Observable<TileResponse> populatedTiles = searchTile.flatMap(t -> {
Observable<Reviews> reviews = getSellerReviews(t.getSellerId());
Observable<String> imageUrl = getProductImage(t.getProductId());
return Observable.zip(reviews, imageUrl, (r, u) -> {
return new TileResponse(t, r, u);
});
});
List<TileResponse> allTiles = populatedTiles.toList()
.toBlocking().single();
Espero que esto ayude.
Necesito ayuda para implementar llamadas asíncronas paralelas en RxJava. Elegí un caso de uso simple en el que la primera llamada obtiene (en lugar de buscar) una lista de productos (mosaico) para mostrar. Las llamadas posteriores salen y se obtienen (A) REVISIONES y (B) IMÁGENES DEL PRODUCTO
Después de varios intentos llegué a este lugar.
1 Observable<Tile> searchTile = searchServiceClient.getSearchResults(searchTerm);
2 List<Tile> allTiles = new ArrayList<Tile>();
3 ClientResponse response = new ClientResponse();
4 searchTile.parallel(oTile -> {
5 return oTile.flatMap(t -> {
6 Observable<Reviews> reviews = reviewsServiceClient.getSellerReviews(t.getSellerId());
7 Observable<String> imageUrl = reviewsServiceClient.getProductImage(t.getProductId());
8 return Observable.zip(reviews, imageUrl, (r, u) -> {
9 t.setReviews(r);
10 t.setImageUrl(u);
11 return t;
12 });
13 });
14 }).subscribe(e -> {
15 allTiles.add((Tile) e);
16 });
Línea 1: se apaga y busca el producto (mosaico) que se mostrará
Línea 4: Tomamos la lista de Observables y la SHARD para obtener reseñas e imágenes.
Mentira 6,7: Fetch the Observable review and Observable url
Línea 8: Finalmente, los 2 observables se comprimen para devolver un Observable actualizado
Línea 15: finalmente, la línea 15 recopila todos los productos individuales que se mostrarán en una colección que puede devolverse a la capa de llamada
Mientras que el Observable se ha fragmentado y en nuestras pruebas se ejecuta en 4 subprocesos diferentes; la búsqueda de reseñas e imágenes parece ser una tras otra. Sospecho que el paso zip en la línea 8 está causando básicamente la invocación secuencial de los 2 observables (revisiones y url).
¿Tiene este grupo alguna sugerencia para buscar paralelamente reiews y URL de imágenes? En esencia, el gráfico de cascada adjunto arriba debería verse más apilado verticalmente. Las llamadas a revisiones e imágenes deben ser paralelas
gracias y raman
Las personas que todavía son @ JDK 7, cuyo IDE aún no detecta automáticamente la fuente JDK 8 y qué probar la brillante respuesta (y explicación) anterior de @benjchristensen pueden usar este código JDK 7 descaradamente refractado. ¡Felicitaciones a @benjchristensen por una increíble explicación y ejemplo!
import java.util.List;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.schedulers.Schedulers;
public class ParallelExecutionExample
{
public static void main(String[] args)
{
final long startTime = System.currentTimeMillis();
Observable<Tile> searchTile = getSearchResults("search term")
.doOnSubscribe(new Action0()
{
@Override
public void call()
{
logTime("Search started ", startTime);
}
})
.doOnCompleted(new Action0()
{
@Override
public void call()
{
logTime("Search completed ", startTime);
}
});
Observable<TileResponse> populatedTiles = searchTile.flatMap(new Func1<Tile, Observable<TileResponse>>()
{
@Override
public Observable<TileResponse> call(final Tile t)
{
Observable<Reviews> reviews = getSellerReviews(t.getSellerId())
.doOnCompleted(new Action0()
{
@Override
public void call()
{
logTime("getSellerReviews[" + t.id + "] completed ", startTime);
}
});
Observable<String> imageUrl = getProductImage(t.getProductId())
.doOnCompleted(new Action0()
{
@Override
public void call()
{
logTime("getProductImage[" + t.id + "] completed ", startTime);
}
});
return Observable.zip(reviews, imageUrl, new Func2<Reviews, String, TileResponse>()
{
@Override
public TileResponse call(Reviews r, String u)
{
return new TileResponse(t, r, u);
}
})
.doOnCompleted(new Action0()
{
@Override
public void call()
{
logTime("zip[" + t.id + "] completed ", startTime);
}
});
}
});
List<TileResponse> allTiles = populatedTiles
.toList()
.doOnCompleted(new Action0()
{
@Override
public void call()
{
logTime("All Tiles Completed ", startTime);
}
})
.toBlocking()
.single();
}
private static Observable<Tile> getSearchResults(String string)
{
return mockClient(new Tile(1), new Tile(2), new Tile(3));
}
private static Observable<Reviews> getSellerReviews(int id)
{
return mockClient(new Reviews());
}
private static Observable<String> getProductImage(int id)
{
return mockClient("image_" + id);
}
private static void logTime(String message, long startTime)
{
System.out.println(message + " => " + (System.currentTimeMillis() - startTime) + "ms");
}
private static <T> Observable<T> mockClient(final T... ts)
{
return Observable.create(new Observable.OnSubscribe<T>()
{
@Override
public void call(Subscriber<? super T> s)
{
try
{
Thread.sleep(1000);
}
catch (Exception e)
{
}
for (T t : ts)
{
s.onNext(t);
}
s.onCompleted();
}
})
.subscribeOn(Schedulers.io());
// note the use of subscribeOn to make an otherwise synchronous Observable async
}
public static class TileResponse
{
public TileResponse(Tile t, Reviews r, String u)
{
// store the values
}
}
public static class Tile
{
private final int id;
public Tile(int i)
{
this.id = i;
}
public int getSellerId()
{
return id;
}
public int getProductId()
{
return id;
}
}
public static class Reviews
{
}
}