android - RxJava+Retrofit polling largo
long-polling rx-java (1)
Aunque no es lo ideal, creo que podría usar los efectos secundarios de RX para lograr el resultado deseado (operaciones ''DoOn'').
Observable<CredentialsWithTimestamp> credentialsProvider = Observable.just(new CredentialsWithTimestamp("credentials", 1434873025320L)); // replace with your implementation
Observable<ServerResponse> o = credentialsProvider.flatMap(credentialsWithTimestamp -> {
// side effect variable
AtomicLong timestamp = new AtomicLong(credentialsWithTimestamp.timestamp); // computational steering (inc. initial value)
return Observable.just(credentialsWithTimestamp.credentials) // same credentials are reused for each request - if invalid / onError, the later retry() will be called for new credentials
.flatMap(credentials -> api.query("request", credentials, timestamp.get())) // this will use the value from previous doOnNext
.doOnNext(serverResponse -> timestamp.set(serverResponse.getTimestamp()))
.repeat();
})
.retry()
.share();
private static class CredentialsWithTimestamp {
public final String credentials;
public final long timestamp; // I assume this is necessary for you from the first request
public CredentialsWithTimestamp(String credentials, long timestamp) {
this.credentials = credentials;
this.timestamp = timestamp;
}
}
Al suscribirse a ''o'' se repetirá el observable interno. Si se produce un error, ''o'' volverá a intentarlo y volverá a solicitarlo desde el flujo de credenciales.
En su ejemplo, la dirección computacional se logra al actualizar la variable de marca de tiempo, que es necesaria para la próxima solicitud.
Mi problema es que no puedo obtener flujo infinito con Retrofit
. Después de obtener las credenciales para la solicitud inicial de sondeo (), hago la solicitud inicial de sondeo (). Cada solicitud de sondeo () responde en 25 segundos si no hay ningún cambio, o antes si hay algún cambio, devolviendo a change_data []. Cada respuesta contiene los datos de timestamp
necesarios para la próxima solicitud de sondeo. Debería hacer una nueva solicitud de sondeo () después de cada respuesta de sondeo (). Aquí está mi código:
getServerApi().getLongPollServer()
.flatMap(longPollServer -> getLongPollServerApi(longPollServer.getServer()).poll("a_check", Config.LONG_POLLING_SERVER_TIMEOUT, 2, longPollServer.getKey(), longPollServer.getTs(), "")
.take(1)
.flatMap(longPollEnvelope -> getLongPollServerApi(longPollServer.getServer()).poll("a_check", Config.LONG_POLLING_SERVER_TIMEOUT, 2, longPollServer.getKey(), longPollEnvelope.getTs(), "")))
.retry()
.subscribe(longPollEnvelope1 -> {
processUpdates(longPollEnvelope1.getUpdates());
});
Soy nuevo en RxJava, tal vez no entiendo algo, pero no puedo obtener un flujo infinito. Recibo 3 llamadas, luego onNext y onComplete.
PS ¿Tal vez hay una mejor solución para implementar encuestas largas en Android?