starter actuator java android rx-java okhttp

actuator - Usando RxJava y Okhttp



spring boot actuator security (4)

Es más fácil y seguro usar Observable.defer() lugar de Observable.create() :

final OkHttpClient client = new OkHttpClient(); Observable.defer(new Func0<Observable<Response>>() { @Override public Observable<Response> call() { try { Response response = client.newCall(new Request.Builder().url("your url").build()).execute(); return Observable.just(response); } catch (IOException e) { return Observable.error(e); } } });

De esa manera se maneja la baja y la contrapresión. Aquí hay una gran publicación de Dan Lew sobre create() y defer() .

Si desea ir a la ruta de Observable.create() , entonces debería verse más como en esta biblioteca con las llamadas de isUnsubscribed() esparcidas por todas partes. Y creo que esto todavía no maneja la contrapresión.

Quiero solicitar a una url que use okhttp en otro hilo (como el hilo IO) y obtener Response en el hilo principal de Android, pero no sé cómo crear un Observable .


Llegué tarde a la discusión pero, si por alguna razón el código necesita transmitir el cuerpo de la respuesta, entonces el fromCallable o la fromCallable no lo hará. En su lugar, se puede emplear el operador de using .

Single.using(() -> okHttpClient.newCall(okRequest).execute(), // 1 response -> { // 2 ... return Single.just((Consumer<OutputStream>) fileOutput -> { try (InputStream upstreamResponseStream = response.body().byteStream(); OutputStream fileOutput = responseBodyOutput) { ByteStreams.copy(upstreamResponseStream, output); } }); }, Response::close, // 3 false) // 4 .subscribeOn(Schedulers.io()) // 5 .subscribe(copier -> copier.accept(...), // 6 throwable -> ...); // 7

  1. La primera lambda ejecuta la respuesta después de la suscripción .
  2. La segunda lambda crea el tipo observable, aquí con Single.just(...)
  3. La tercera lambda dispone la respuesta. Con el defer se podría haber utilizado el estilo try-with-resources.
  4. Establezca el interruptor de activación en false para que el eliminador se llame después del evento de terminal, es decir, después de que se haya ejecutado el consumidor de suscripción.
  5. Por supuesto hacer que la cosa suceda en otro grupo de hilos.
  6. Aquí está la lambda que consumirá el cuerpo de respuesta. Sin establecer eager en false , el código generará una excepción IOException con el motivo ''cerrado'' porque la respuesta ya estará cerrada antes de ingresar esta lambda.
  7. onError lambda debe manejar las excepciones, especialmente la IOException que ya no se puede capturar con el operador que using , ya que fue posible con un try / catch with defer .


Primero agregue RxAndroid a sus dependencias, luego cree su Observable así:

Subscription subscription = Observable.create(new Observable.OnSubscribe<Response>() { OkHttpClient client = new OkHttpClient(); @Override public void call(Subscriber<? super Response> subscriber) { try { Response response = client.newCall(new Request.Builder().url("your url").build()).execute(); if (response.isSuccessful()) { if(!subscriber.isUnsubscribed()){ subscriber.onNext(response); } subscriber.onCompleted(); } else if (!response.isSuccessful() && !subscriber.isUnsubscribed()) { subscriber.onError(new Exception("error")); } } catch (IOException e) { if (!subscriber.isUnsubscribed()) { subscriber.onError(e); } } } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<Response>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Response response) { } });

Solicitará su url en otro hilo (hilo io) y lo observará en el hilo principal de Android.

Y finalmente, cuando salga de la pantalla, use subsribtion.unsubscribe() para evitar la pérdida de memoria.

Cuando use Observable.create , debe escribir una gran cantidad de código repetitivo, también debe manejar la suscripción por su cuenta. Una mejor alternativa es usar defer . Forma el documento:

no cree el Observable hasta que el observador se suscriba, y cree un Observable nuevo para cada observador

El operador de Aplazamiento espera hasta que un observador se suscriba, y luego genera un Observable, generalmente con una función de fábrica Observable. Hace esto de nuevo para cada suscriptor, por lo que aunque cada suscriptor puede pensar que se está suscribiendo al mismo Observable, de hecho, cada suscriptor obtiene su propia secuencia individual.

Entonces, como mencionó Marcin KoziƄski , solo necesitas hacer esto:

final OkHttpClient client = new OkHttpClient(); Observable.defer(new Func0<Observable<Response>>() { @Override public Observable<Response> call() { try { Response response = client.newCall(new Request.Builder().url("your url").build()).execute(); return Observable.just(response); } catch (IOException e) { return Observable.error(e); } } });