callback - qué - Rx Java Android: Cómo convertir este bloque de devolución de llamada a Observer
rxjava android retrofit (2)
En general, este es el enfoque correcto para establecer un puente entre el workd asíncrono / de devolución de llamada y el reactivo, pero ahora se desaconseja el uso de Observable.create() , ya que requiere conocimiento avanzado para hacerlo bien.
Debería usar el método de creación más reciente Observable.fromEmitter() , que se verá bastante igual:
return Observable.fromEmitter(new Action1<Emitter<Integer>>() {
@Override
public void call(Emitter<Integer> emitter) {
transObs.setTransferListener(new TransferListener() {
@Override
public void onStateChanged(int id, TransferState state) {
if (state == TransferState.COMPLETED)
emitter.onCompleted();
}
@Override
public void onProgressChanged(int id, long bytesCurrent, long bytesTotal) {
}
@Override
public void onError(int id, Exception ex) {
emitter.onError(ex);
}
});
emitter.setCancellation(new Cancellable() {
@Override
public void cancel() throws Exception {
// Deal with unsubscription:
// 1. unregister the listener to avoid memory leak
// 2. cancel the upload
}
});
}
}, Emitter.BackpressureMode.DROP);
Lo que se agregó aquí es: tratar con la anulación de la suscripción: cancelar la carga y anular el registro para evitar fugas de memoria y especificar la estrategia de contrapresión.
puedes leer más aquí .
Notas adicionales:
- si está interesado en el progreso, puede llamar a Siguiente () con progreso en
onProgressChanged()y convertir el Observable aObservable<Integer>. - de lo contrario, es posible que desee considerar el uso de
Completableque es observable sin emisiones enonNext(), pero solo enonCompleted()esto puedeonCompleted()a su caso si no está interesado con las indicaciones de progreso.
Estoy intentando subir un archivo a través del S3 Android SDK de Amazon. He usado RX Java un poco, pero no estoy seguro de cómo convertir este método a un método que devuelve un Observable porque quiero encadenar el resultado de este método a otra llamada observable. Me confunde, supongo, por el hecho de que esto no vuelve de inmediato y no puede volver hasta que OnError o OnState cambien. ¿Cómo manejo estas situaciones de forma RX?
public void uploadFile(TransferObserver transferObserver){
transferObserver.setTransferListener(new TransferListener() {
@Override
public void onStateChanged(int id, TransferState state) {
}
@Override
public void onProgressChanged(int id, long bytesCurrent, long bytesTotal) {
}
@Override
public void onError(int id, Exception ex) {
}
});
}
Si alguien pudiera responder con RX Java 2 y lambdas eso sería genial porque sigo quedándome corto en esta
@Yosriz No pude obtener tu código para compilar, pero me ayudaste bastante, de modo que basándote en tu respuesta, aquí está lo que ahora tengo:
return Observable.fromEmitter(new Action1<AsyncEmitter<Integer>>() {
@Override
public void call(AsyncEmitter<Integer> emitter) {
transObs.setTransferListener(new TransferListener() {
@Override
public void onStateChanged(int id, TransferState state) {
if (state == TransferState.COMPLETED)
emitter.onCompleted();
}
@Override
public void onProgressChanged(int id, long bytesCurrent, long bytesTotal) {
}
@Override
public void onError(int id, Exception ex) {
emitter.onError(ex);
}
});
emitter.setCancellation(new AsyncEmitter.Cancellable() {
@Override
public void cancel() throws Exception {
transObs.cleanTransferListener();
}
});
}
}, AsyncEmitter.BackpressureMode.DROP);