javascript - Rusible RxJS stream
angular (5)
Tengo un componente simple con un solo botón que inicia y detiene el flujo de números generados por el temporizador RxJS.
import { Component, OnInit } from ''@angular/core'';
import { BehaviorSubject, Observable, timer, merge } from ''rxjs'';
import { filter, bufferToggle, windowToggle, mergeMap, mergeAll, share } from ''rxjs/operators'';
@Component({
selector: ''my-app'',
template: `<button (click)="toggle()">{{ (active$ | async) ? ''Pause'' : ''Play'' }}</button>`,
styleUrls: [ ''./app.component.css'' ]
})
export class AppComponent implements OnInit {
active$ = new BehaviorSubject<boolean>(true);
ngOnInit(): void {
const on$ = this.active$.pipe(filter(v => v));
const off$ = this.active$.pipe(filter(v => !v));
const stream$ = timer(500, 500).pipe(share());
const out$ = merge(
stream$.pipe(
bufferToggle(off$, () => on$),
mergeAll(),
),
stream$.pipe(
windowToggle(on$, () => off$),
mergeAll(),
),
);
out$.subscribe(v => console.log(v));
}
toggle(): void {
this.active$.next(!this.active$.value);
}
}
Necesito agregar una característica más.
Necesito pausar la transmisión (
this.active$.next(false)
) automáticamente usando la condición basada en el valor de la transmisión.
Por ejemplo, pausa la secuencia si el último valor es el múltiplo de 5.
¿Tienes alguna idea sobre cómo hacer esto?
Aquí hay un ejemplo ejecutable en stackblitz https://stackblitz.com/edit/angular-6hjznn
Aquí hay un operador de pausa personalizado que solo acumulará valores en un búfer cuando la señal de pausa sea
true
, y los emitirá uno por uno cuando sea
false
.
Combínelo con un operador de
tap
simple para alternar la señal de pausa del sujeto de comportamiento cuando el valor alcanza una condición específica, y tiene algo que hará una pausa en el clic del botón y también se detendrá cuando el valor cumpla con una condición (múltiplo de 12 en este caso):
Aquí está el operador de
pause
:
function pause<T>(pauseSignal: Observable<boolean>) {
return (source: Observable<T>) => Observable.create(observer => {
const buffer = [];
let paused = false;
let error;
let isComplete = false;
function notify() {
while (!paused && buffer.length) {
const value = buffer.shift();
observer.next(value);
}
if (!buffer.length && error) {
observer.error(error);
}
if (!buffer.length && isComplete) {
observer.complete();
}
}
const subscription = pauseSignal.subscribe(
p => {
paused = !p;
notify();
},
e => {
error = e;
notify();
},
() => {});
subscription.add(source.subscribe(
v => {
buffer.push(v);
notify();
},
e => {
error = e;
notify();
},
() => {
isComplete = true;
notify();
}
));
return subscription;
});
}
Aquí está el uso de él:
const CONDITION = x => (x > 0) && ((x % 12) === 0); // is multiple
this.active$ = new BehaviorSubject<boolean>(true);
const stream$ = timer(500, 500);
const out$ = stream$.pipe(
pause(this.active$),
tap(value => {
if (CONDITION(value)) {
this.active$.next(false);
}
}));
this.d = out$.subscribe(v => console.log(v));
Y un ejemplo de trabajo: https://stackblitz.com/edit/angular-bvxnbf
Aquí hay una forma sencilla de hacerlo.
Use el
timer()
solo como un emisor, e incremente una cuenta por separado.
Esto te da un poco más de control directo.
export class AppComponent implements OnInit {
active = true;
out$: Observable<number>;
count = 0;
ngOnInit(): void {
const stream$ = timer(500, 500);
this.out$ = stream$.pipe(
filter(v => this.active),
map(v => {
this.count += 1;
return this.count;
}),
tap(v => {
if (this.count % 5 === 0) {
this.active = false;
}
})
)
}
}
Podría implementar un búfer personalizado utilizando una matriz.
Cuando
bufferToggle
emite, agregue esos valores a su búfer personalizado.
Luego tome los valores del búfer personalizado hasta que un determinado elemento en el búfer coincida con una condición de detención.
Emitir esos valores y pausar su flujo.
ngOnInit(): void {
this.active$ = new BehaviorSubject<boolean>(true);
const on$ = this.active$.pipe(filter(v => v));
const off$ = this.active$.pipe(filter(v => !v));
timer(500, 500).pipe(
share(),
pausable(on$, off$, v => this.active$.value && this.pauseOn(v), () => this.active$.next(false), false)
).subscribe(console.log);
}
pauseOn(value: number): boolean { return value % 10 === 0; }
export function pausable<T, O>(
on$: Observable<any>,
off$: Observable<O>,
haltCondition: (value: T) => boolean,
pause: () => void,
spread: boolean) {
return (source: Observable<T>) => defer(() => { // defer is used so that each subscription gets its own buffer
let buffer: T[] = [];
return merge(
source.pipe(
bufferToggle(off$, () => on$),
tap(values => buffer = buffer.concat(values)), // append values to your custom buffer
map(_ => buffer.findIndex(haltCondition)), // find the index of the first element that matches the halt condition
tap(haltIndex => haltIndex >= 0 ? pause() : null), // pause the stream when a value matching the halt condition was found
map(haltIndex => buffer.splice(0, haltIndex === -1 ? customBuffer.length : haltIndex + 1)), // get all values from your custom buffer until a haltCondition is met
mergeMap(toEmit => spread ? from(toEmit) : toEmit.length > 0 ? of(toEmit) : EMPTY) // optional value spread (what your mergeAll did)
),
source.pipe(
windowToggle(on$, () => off$),
mergeMap(x => x),
tap(value => haltCondition(value) ? pause() : null), // pause the stream when an unbuffered value matches the halt condition
),
);
});
}
El operador en pausa emitirá actualmente valores que coincidan con la condición de detención y luego detendrá el flujo directamente después.
Podría ajustarse a sus necesidades específicas, por ejemplo, simplificado con menos parámetros de entrada o se podría incorporar el
pausable
share
en
pausable
.
Aquí hay un stackblitz: https://stackblitz.com/edit/angular-rbv9by
No distribuí la matriz del búfer en este ejemplo para que pueda ver fácilmente qué valores provienen del búfer y cuáles no.
Original
Use
tap
en algún lugar de su flujo antes del búfer para verificar el valor y pausar el flujo según alguna condición.
this.stream$.pipe(
tap(value => this.active$.value && value % 5 === 0 ? this.active$.next(false) : null),
bufferToggle(off$, () => on$),
mergeAll(),
)
Supongo que el comportamiento deseado no está relacionado con obtener los valores que el temporizador emite per se, y que en lugar de pausar las notificaciones a un flujo continuo (en su ejemplo, el temporizador continúa aunque no veamos los valores impreso), está bien dejar de emitir cuando está en pausa.
Mi solución está inspirada en la receta del cronómetro.
La solución a continuación utiliza dos botones separados para reproducir y pausar, pero puede ajustar esto al gusto. Pasamos los botones (ViewChild) al servicio en el gancho ngAfterViewInit del componente, luego nos suscribimos a la transmisión.
// pausable.component.ts
ngAfterViewInit() {
this.pausableService.initPausableStream(this.start.nativeElement, this.pause.nativeElement);
this.pausableService.counter$
.pipe(takeUntil(this.unsubscribe$)) // don''t forget to unsubscribe :)
.subscribe((state: State) => {
console.log(state.value); // whatever you need
});
}
// pausable.service.ts
import { Injectable } from ''@angular/core'';
import { merge, fromEvent, Subject, interval, NEVER } from ''rxjs'';
import { mapTo, startWith, scan, switchMap, tap, map } from ''rxjs/operators'';
export interface State {
active: boolean;
value: number;
}
@Injectable({
providedIn: ''root''
})
export class PausableService {
public counter$;
constructor() { }
initPausableStream(start: HTMLElement, pause: HTMLElement) {
// convenience functions to map an element click to a result
const fromClick = (el: HTMLElement) => fromEvent(el, ''click'');
const clickMapTo = (el: HTMLElement, obj: {}) => fromClick(el).pipe(mapTo(obj));
const pauseByCondition$ = new Subject();
const pauseCondition = (state: State): boolean => state.value % 5 === 0 && state.value !== 0;
// define the events that may trigger a change
const events$ = merge(
clickMapTo(start, { active: true }),
clickMapTo(pause, { active: false }),
pauseByCondition$.pipe(mapTo({ active: false }))
);
// switch the counter stream based on events
this.counter$ = events$.pipe(
startWith({ active: true, value: 0 }),
scan((state: State, curr) => ({ ...state, ...curr }), {}),
switchMap((state: State) => state.active
? interval(500).pipe(
tap(_ => ++state.value),
map(_ => state))
: NEVER),
tap((state: State) => {
if (pauseCondition(state)) {
pauseByCondition$.next(); // trigger pause
}
})
);
}
}
Tan simple como se puede obtener con un
windowToggle
y usar el ejemplo de trabajo active.next (falso):
https://stackblitz.com/edit/angular-pdw7kw
defer(() => {
let count = 0;
return stream$.pipe(
windowToggle(on$, () => off$),
exhaustMap(obs => obs),
mergeMap(_ => {
if ((++count) % 5 === 0) {
this.active$.next(false)
return never()
}
return of(count)
}),
)
}).subscribe(console.log)