callback - qué - Rx Java Android: Cómo convertir este bloque de devolución de llamada a Observer
rxjava android retrofit (2)
En general, este es el enfoque correcto para establecer un puente entre el workd asíncrono / de devolución de llamada y el reactivo, pero ahora se desaconseja el uso de Observable.create()
, ya que requiere conocimiento avanzado para hacerlo bien.
Debería usar el método de creación más reciente Observable.fromEmitter()
, que se verá bastante igual:
return Observable.fromEmitter(new Action1<Emitter<Integer>>() {
@Override
public void call(Emitter<Integer> emitter) {
transObs.setTransferListener(new TransferListener() {
@Override
public void onStateChanged(int id, TransferState state) {
if (state == TransferState.COMPLETED)
emitter.onCompleted();
}
@Override
public void onProgressChanged(int id, long bytesCurrent, long bytesTotal) {
}
@Override
public void onError(int id, Exception ex) {
emitter.onError(ex);
}
});
emitter.setCancellation(new Cancellable() {
@Override
public void cancel() throws Exception {
// Deal with unsubscription:
// 1. unregister the listener to avoid memory leak
// 2. cancel the upload
}
});
}
}, Emitter.BackpressureMode.DROP);
Lo que se agregó aquí es: tratar con la anulación de la suscripción: cancelar la carga y anular el registro para evitar fugas de memoria y especificar la estrategia de contrapresión.
puedes leer más aquí .
Notas adicionales:
- si está interesado en el progreso, puede llamar a Siguiente () con progreso en
onProgressChanged()
y convertir el Observable aObservable<Integer>
. - de lo contrario, es posible que desee considerar el uso de
Completable
que es observable sin emisiones enonNext()
, pero solo enonCompleted()
esto puedeonCompleted()
a su caso si no está interesado con las indicaciones de progreso.
Estoy intentando subir un archivo a través del S3 Android SDK de Amazon. He usado RX Java un poco, pero no estoy seguro de cómo convertir este método a un método que devuelve un Observable porque quiero encadenar el resultado de este método a otra llamada observable. Me confunde, supongo, por el hecho de que esto no vuelve de inmediato y no puede volver hasta que OnError o OnState cambien. ¿Cómo manejo estas situaciones de forma RX?
public void uploadFile(TransferObserver transferObserver){
transferObserver.setTransferListener(new TransferListener() {
@Override
public void onStateChanged(int id, TransferState state) {
}
@Override
public void onProgressChanged(int id, long bytesCurrent, long bytesTotal) {
}
@Override
public void onError(int id, Exception ex) {
}
});
}
Si alguien pudiera responder con RX Java 2 y lambdas eso sería genial porque sigo quedándome corto en esta
@Yosriz No pude obtener tu código para compilar, pero me ayudaste bastante, de modo que basándote en tu respuesta, aquí está lo que ahora tengo:
return Observable.fromEmitter(new Action1<AsyncEmitter<Integer>>() {
@Override
public void call(AsyncEmitter<Integer> emitter) {
transObs.setTransferListener(new TransferListener() {
@Override
public void onStateChanged(int id, TransferState state) {
if (state == TransferState.COMPLETED)
emitter.onCompleted();
}
@Override
public void onProgressChanged(int id, long bytesCurrent, long bytesTotal) {
}
@Override
public void onError(int id, Exception ex) {
emitter.onError(ex);
}
});
emitter.setCancellation(new AsyncEmitter.Cancellable() {
@Override
public void cancel() throws Exception {
transObs.cleanTransferListener();
}
});
}
}, AsyncEmitter.BackpressureMode.DROP);