templatecache - save cache angularjs
Caché reactivo del servicio HTTP (3)
Casi cualquier lógica complicada se sale de control rápidamente si utiliza rxjs
simples. Preferiría implementar el operador de cache
personalizado desde cero, puede usar esta gist como ejemplo.
Estoy utilizando RsJS 5 (5.0.1) para almacenar en caché en Angular 2. Funciona bien.
La carne de la función de almacenamiento en caché es:
const observable = Observable.defer(
() => actualFn().do(() => this.console.log(''CACHE MISS'', cacheKey))
)
.publishReplay(1, this.RECACHE_INTERVAL)
.refCount().take(1)
.do(() => this.console.log(''CACHE HIT'', cacheKey));
El actualFn
es el this.http.get(''/some/resource'')
.
Como digo, esto está funcionando perfectamente para mí. La memoria caché se devuelve del observable durante la duración de RECACHE_INTERVAL
. Si se realiza una solicitud después de ese intervalo, se actualFn()
al actualFn()
.
Lo que estoy tratando de averiguar es cuándo caduca el RECACHE_INTERVAL
y se actualFn()
al actualFn()
- cómo devolver el último valor. Hay un espacio de tiempo entre la RECACHE_INTERVAL
caducidad de RECACHE_INTERVAL
y la actualFn()
la función actualFn()
que el observable no devuelve un valor. Me gustaría deshacerme de esa brecha en el tiempo y siempre devolver el último valor.
Podría usar un efecto secundario y almacenar la última llamada de valor .next(lastValue)
mientras espero que la respuesta HTTP regrese, pero esto parece ingenuo. Me gustaría usar una forma "RxJS", una solución de función pura, si es posible.
Vea esta demostración: https://jsbin.com/todude/10/edit?js,console
Tenga en cuenta que estoy intentando obtener resultados en caché a 1200ms
cuando el caso se invalida y luego a 1300ms
cuando la solicitud anterior aún está pendiente (tarda 200ms
). Ambos resultados se reciben como deberían.
Esto sucede porque cuando te suscribes y publishReplay()
no contiene ningún valor válido, no emitirá nada y no se completará de inmediato (gracias a take(1)
), por lo que debe suscribirse a su fuente, lo que hace que el HTTP peticiones (esto de hecho ocurre en refCount()
).
Luego, el segundo suscriptor no recibirá nada y se agregará a la serie de observadores en publishReplay()
. No realizará otra suscripción porque ya está suscrita a su fuente ( refCount()
) y está esperando la respuesta.
Así que la situación que estás describiendo no debería suceder, creo. Eventualmente haga una demostración que demuestre su problema.
EDITAR :
Emisión de elementos invalidados y artículos nuevos
El siguiente ejemplo muestra una funcionalidad un poco diferente a la del ejemplo vinculado. Si la respuesta en caché se invalida, se emitirá de todos modos y, a continuación, recibirá también el nuevo valor. Esto significa que el suscriptor recibe uno o dos valores:
- 1 valor: el valor en caché
- 2 valores: el valor en caché invalidado y luego un nuevo valor que se almacenará en caché a partir de ahora.
El código podría parecerse a lo siguiente:
let counter = 1;
const RECACHE_INTERVAL = 1000;
function mockDataFetch() {
return Observable.of(counter++)
.delay(200);
}
let source = Observable.defer(() => {
const now = (new Date()).getTime();
return mockDataFetch()
.map(response => {
return {
''timestamp'': now,
''response'': response,
};
});
});
let updateRequest = source
.publishReplay(1)
.refCount()
.concatMap(value => {
if (value.timestamp + RECACHE_INTERVAL > (new Date()).getTime()) {
return Observable.from([value.response, null]);
} else {
return Observable.of(value.response);
}
})
.takeWhile(value => value);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 0:", val)), 0);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 50:", val)), 50);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 200:", val)), 200);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1200:", val)), 1200);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1300:", val)), 1300);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1500:", val)), 1500);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 3500:", val)), 3500);
Vea una demostración en vivo: https://jsbin.com/ketemi/2/edit?js,console
Esto imprime para consolar la siguiente salida:
Response 0: 1
Response 50: 1
Response 200: 1
Response 1200: 1
Response 1300: 1
Response 1200: 2
Response 1300: 2
Response 1500: 2
Response 3500: 2
Response 3500: 3
Los avisos 1200
y 1300
recibieron primero el antiguo valor en caché 1
inmediatamente y luego otro valor con el nuevo valor 2
.
Por otro lado, 1500
recibió solo el nuevo valor porque 2
ya está en caché y es válido.
Lo más confuso es probablemente por qué estoy usando concatMap().takeWhile()
. Esto se debe a que debo asegurarme de que la respuesta nueva (no la invalidada) sea el último valor antes de enviar una notificación completa y probablemente no haya un operador para eso (ni first()
ni takeWhile()
son aplicables para este caso de uso).
Emitiendo solo el elemento actual sin esperar a que se actualice
Otro caso de uso podría ser cuando queremos emitir solo el valor almacenado en caché mientras no esperamos una respuesta nueva de la solicitud HTTP.
let counter = 1;
const RECACHE_INTERVAL = 1000;
function mockDataFetch() {
return Observable.of(counter++)
.delay(200);
}
let source = Observable.defer(() => {
const now = (new Date()).getTime();
return mockDataFetch()
.map(response => {
return {
''timestamp'': now,
''response'': response,
};
});
});
let updateRequest = source
.publishReplay(1)
.refCount()
.concatMap((value, i) => {
if (i === 0) {
if (value.timestamp + RECACHE_INTERVAL > (new Date()).getTime()) { // is cached item valid?
return Observable.from([value.response, null]);
} else {
return Observable.of(value.response);
}
}
return Observable.of(null);
})
.takeWhile(value => value);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 0:", val)), 0);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 50:", val)), 50);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 200:", val)), 200);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1200:", val)), 1200);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1300:", val)), 1300);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1500:", val)), 1500);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 3500:", val)), 3500);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 3800:", val)), 3800);
Vea una demostración en vivo: https://jsbin.com/kebapu/2/edit?js,console
Este ejemplo se imprime en la consola:
Response 0: 1
Response 50: 1
Response 200: 1
Response 1200: 1
Response 1300: 1
Response 1500: 2
Response 3500: 2
Response 3800: 3
Observe que tanto 1200
como 1300
reciben el valor 1
porque ese es el valor almacenado en caché aunque ahora no es válido. La primera llamada a 1200
simplemente genera una nueva solicitud HTTP sin esperar su respuesta y solo emite el valor almacenado en caché. Luego, a las 1500
el valor nuevo se almacena en caché, por lo que se vuelve a emitir. Lo mismo se aplica en 3500
y 3800
.
Tenga en cuenta que el suscriptor a las 1200
recibirá la next
notificación inmediatamente, pero la notificación complete
se enviará solo después de que la solicitud HTTP haya finalizado. Tenemos que esperar porque si enviamos el mensaje complete
justo después de la next
, haríamos que la cadena eliminara sus elementos desechables, lo que también debería cancelar la solicitud HTTP (que es lo que definitivamente no queremos hacer).
Respuesta actualizada:
Si siempre desea utilizar el valor anterior mientras se realiza una nueva solicitud, puede poner otro tema en la cadena que mantenga el valor más reciente.
Luego puede repetir el valor para que sea posible saber si proviene de la memoria caché o no. El suscriptor puede entonces filtrar los valores almacenados en caché si no están interesados en ellos.
// Take values while they pass the predicate, then return one more
// i.e also return the first value which returned false
const takeWhileInclusive = predicate => src =>
src
.flatMap(v => Observable.from([v, v]))
.takeWhile((v, index) =>
index % 2 === 0 ? true : predicate(v, index)
)
.filter((v, index) => index % 2 !== 1);
// Source observable will still push its values into the subject
// even after the subscriber unsubscribes
const keepHot = subject => src =>
Observable.create(subscriber => {
src.subscribe(subject);
return subject.subscribe(subscriber);
});
const cachedRequest = request
// Subjects below only store the most recent value
// so make sure most recent is marked as ''fromCache''
.flatMap(v => Observable.from([
{fromCache: false, value: v},
{fromCache: true, value: v}
]))
// Never complete subject
.concat(Observable.never())
// backup cache while new request is in progress
.let(keepHot(new ReplaySubject(1)))
// main cache with expiry time
.let(keepHot(new ReplaySubject(1, this.RECACHE_INTERVAL)))
.publish()
.refCount()
.let(takeWhileInclusive(v => v.fromCache));
// Cache will be re-filled by request when there is another subscription after RECACHE_INTERVAL
// Subscribers will get the most recent cached value first then an updated value
https://acutmore.jsbin.com/kekevib/8/edit?js,console
Respuesta original:
En lugar de establecer un tamaño de ventana en el objeto de reproducción, puede cambiar la fuente observable para que se repita después de un retraso.
const observable = Observable.defer(
() => actualFn().do(() => this.console.log(''CACHE MISS'', cacheKey))
)
.repeatWhen(_ => _.delay(this.RECACHE_INTERVAL))
.publishReplay(1)
.refCount()
.take(1)
.do(() => this.console.log(''CACHE HIT'', cacheKey));
El operador de repeatWhen
requiere RxJs-beta12 o superior https://github.com/ReactiveX/rxjs/blob/master/CHANGELOG.md#500-beta12-2016-09-09