javascript - programacion - ¿Cómo obtener el valor actual de RxJS Subject u Observable?
rxjs en 30 dias (8)
Tengo un servicio Angular 2:
import {Storage} from ''./storage'';
import {Injectable} from ''angular2/core'';
import {Subject} from ''rxjs/Subject'';
@Injectable()
export class SessionStorage extends Storage {
private _isLoggedInSource = new Subject<boolean>();
isLoggedIn = this._isLoggedInSource.asObservable();
constructor() {
super(''session'');
}
setIsLoggedIn(value: boolean) {
this.setItem(''_isLoggedIn'', value, () => {
this._isLoggedInSource.next(value);
});
}
}
Todo funciona muy bien. Pero tengo otro componente que no necesita suscribirse, solo necesita obtener el valor actual de isLoggedIn en un momento determinado. ¿Cómo puedo hacer esto?
¡La única forma de obtener valores "de" un Observable / Asunto es con suscribirse!
Si está usando
getValue()
, está haciendo algo imprescindible en el paradigma declarativo.
Está allí como una escotilla de escape, pero el
99.9% del tiempo NO debe usar
getValue()
.
Hay algunas cosas interesantes que
getValue()
hará: arrojará un error si el sujeto se ha dado de baja, evitará que obtenga un valor si el sujeto está muerto porque está errado, etc. Pero, de nuevo, está ahí como una escotilla de escape para raras circunstancias.
Hay varias formas de obtener el último valor de un sujeto u observable de una manera "Rx-y":
-
Usando
BehaviorSubject
: Pero en realidad suscribiéndome a él . Cuando se suscribe por primera vez aBehaviorSubject
, enviará sincrónicamente el valor anterior que recibió o con el que se inicializó. -
Uso de un
ReplaySubject(N)
: esto almacenará en caché los valores deN
y los reproducirá a los nuevos suscriptores. -
A.withLatestFrom(B)
: utilice este operador para obtener el valor más reciente deB
observable cuando se emiteA
observable. Le dará ambos valores en una matriz[a, b]
. -
A.combineLatest(B)
: utilice este operador para obtener los valores más recientes deA
yB
cada vez que se emitenA
oB
Le dará ambos valores en una matriz. -
shareReplay()
: realiza una multidifusión observable a través de unReplaySubject
, pero le permite volver a intentar el observable en caso de error. (Básicamente te da ese comportamiento prometedor de almacenamiento en caché). -
publishReplay()
,publishBehavior(initialValue)
,multicast(subject: BehaviorSubject | ReplaySubject)
, etc.: otros operadores que aprovechanBehaviorSubject
yReplaySubject
. Diferentes sabores de la misma cosa, básicamente multidifunden la fuente observable al canalizar todas las notificaciones a través de un tema. Debe llamar aconnect()
para suscribirse a la fuente con el asunto.
Encontré el mismo problema en componentes secundarios donde inicialmente tendría que tener el valor actual del Asunto, luego suscribirse al Asunto para escuchar los cambios. Solo mantengo el valor actual en el Servicio para que esté disponible para que los componentes accedan, por ejemplo:
import {Storage} from ''./storage'';
import {Injectable} from ''angular2/core'';
import {Subject} from ''rxjs/Subject'';
@Injectable()
export class SessionStorage extends Storage {
isLoggedIn: boolean;
private _isLoggedInSource = new Subject<boolean>();
isLoggedIn = this._isLoggedInSource.asObservable();
constructor() {
super(''session'');
this.currIsLoggedIn = false;
}
setIsLoggedIn(value: boolean) {
this.setItem(''_isLoggedIn'', value, () => {
this._isLoggedInSource.next(value);
});
this.isLoggedIn = value;
}
}
Un componente que necesita el valor actual podría acceder desde el servicio, es decir:
sessionStorage.isLoggedIn
No estoy seguro si esta es la práctica correcta :)
Puede almacenar el último valor emitido por separado del Observable. Luego léelo cuando sea necesario.
// global.d.ts
declare module ''rxjs'' {
interface Observable<T> {
/**
* _Extension Method_ - Returns current value of an Observable.
* Value is retrieved using _first()_ operator to avoid the need to unsubscribe.
*/
value: Observable<T>;
}
}
// observable.extension.ts
Object.defineProperty(Observable.prototype, ''value'', {
get <T>(this: Observable<T>): Observable<T> {
return this.pipe(
filter(value => value !== null && value !== undefined),
first());
},
});
// using the extension getter example
this.myObservable$.value
.subscribe(value => {
// whatever code you need...
});
Tuve una situación similar en la que los suscriptores tardíos se suscriben al Asunto después de que llegó su valor.
Encontré ReplaySubject que es similar a BehaviorSubject, funciona de ReplaySubject en este caso. Y aquí hay un enlace para una mejor explicación: http://reactivex.io/rxjs/manual/overview.html#replaysubject
Un
Subject
u
Observable
no tiene un valor actual.
Cuando se emite un valor, se pasa a los suscriptores y el
Observable
con él.
Si desea tener un valor actual, use
BehaviorSubject
que está diseñado exactamente para ese propósito.
BehaviorSubject
mantiene el último valor emitido y lo emite inmediatamente a los nuevos suscriptores.
También tiene un método
getValue()
para obtener el valor actual.
Una respuesta similar fue rechazada. Pero creo que puedo justificar lo que estoy sugiriendo aquí para casos limitados.
Si bien es cierto que un observable no tiene un valor actual , a menudo tendrá un valor disponible de inmediato . Por ejemplo, con las tiendas redux / flux / akita puede solicitar datos de una tienda central, en función de una cantidad de observables y ese valor generalmente estará disponible de inmediato.
Si este es el caso, cuando se
subscribe
, el valor volverá inmediatamente.
Entonces, digamos que recibió una llamada a un servicio y, al finalizar, desea obtener el último valor de algo de su tienda, que posiblemente no emita :
Puede intentar hacer esto (y debe mantener todo lo posible "dentro de las tuberías"):
serviceCallResponse$.pipe(withLatestFrom(store$.select(x => x.customer)))
.subscribe(([ serviceCallResponse, customer] => {
// we have serviceCallResponse and customer
});
El problema con esto es que se bloqueará hasta que el observable secundario emita un valor, que potencialmente nunca podría ser.
Recientemente me encontré necesitando evaluar un observable solo si un valor estaba disponible de inmediato , y lo que es más importante, necesitaba poder detectar si no lo estaba. Terminé haciendo esto:
serviceCallResponse$.pipe()
.subscribe(serviceCallResponse => {
// immediately try to subscribe to get the ''available'' value
// note: immediately unsubscribe afterward to ''cancel'' if needed
let customer = undefined;
// whatever the secondary observable is
const secondary$ = store$.select(x => x.customer);
// subscribe to it, and assign to closure scope
sub = secondary$.pipe(take(1)).subscribe(_customer => customer = _customer);
sub.unsubscribe();
// if there''s a delay or customer isn''t available the value won''t have been set before we get here
if (customer === undefined)
{
// handle, or ignore as needed
return throwError(''Customer was not immediately available'');
}
});
Tenga en cuenta que para todo lo anterior, estoy usando la
subscribe
para obtener el valor (como comenta @Ben).
No uso una propiedad
.value
, incluso si tuviera un
BehaviorSubject
.
Aunque puede sonar exagerado, esta es solo otra solución "posible" para mantener el tipo Observable y reducir las repeticiones ...
Siempre puede crear un getter de extensión para obtener el valor actual de un Observable.
Para hacer esto, necesitaría extender la interfaz
Observable<T>
en un archivo de declaración de
global.d.ts
global.d.ts.
Luego implemente el
getter de extensión
en un archivo
observable.extension.ts
y finalmente incluya ambos tipos y archivo de extensión en su aplicación.
Puede consultar esta Respuesta de para saber cómo incluir las extensiones en su aplicación Angular.
let lastValue: number;
const subscription = new Service().start();
subscription
.subscribe((data) => {
lastValue = data;
}
);
const observable = of(''response'')
function hasValue(value: any) {
return value !== null && value !== undefined;
}
function getValue<T>(observable: Observable<T>): Promise<T> {
return observable
.pipe(
filter(hasValue),
first()
)
.toPromise();
}
const result = await getValue(observable)
// Do the logic with the result
// .................
// .................
// .................
Puede consultar el artículo completo sobre cómo implementarlo desde aquí. https://www.imkrish.com/how-to-get-current-value-of-observable-in-a-clean-way/