javascript angular rxjs

javascript - Rusible RxJS stream



angular (5)

Tengo un componente simple con un solo botón que inicia y detiene el flujo de números generados por el temporizador RxJS.

import { Component, OnInit } from ''@angular/core''; import { BehaviorSubject, Observable, timer, merge } from ''rxjs''; import { filter, bufferToggle, windowToggle, mergeMap, mergeAll, share } from ''rxjs/operators''; @Component({ selector: ''my-app'', template: `<button (click)="toggle()">{{ (active$ | async) ? ''Pause'' : ''Play'' }}</button>`, styleUrls: [ ''./app.component.css'' ] }) export class AppComponent implements OnInit { active$ = new BehaviorSubject<boolean>(true); ngOnInit(): void { const on$ = this.active$.pipe(filter(v => v)); const off$ = this.active$.pipe(filter(v => !v)); const stream$ = timer(500, 500).pipe(share()); const out$ = merge( stream$.pipe( bufferToggle(off$, () => on$), mergeAll(), ), stream$.pipe( windowToggle(on$, () => off$), mergeAll(), ), ); out$.subscribe(v => console.log(v)); } toggle(): void { this.active$.next(!this.active$.value); } }

Esto funciona perfectamente.

Necesito agregar una característica más. Necesito pausar la transmisión ( this.active$.next(false) ) automáticamente usando la condición basada en el valor de la transmisión. Por ejemplo, pausa la secuencia si el último valor es el múltiplo de 5.

¿Tienes alguna idea sobre cómo hacer esto?

Aquí hay un ejemplo ejecutable en stackblitz https://stackblitz.com/edit/angular-6hjznn


Aquí hay un operador de pausa personalizado que solo acumulará valores en un búfer cuando la señal de pausa sea true , y los emitirá uno por uno cuando sea false .

Combínelo con un operador de tap simple para alternar la señal de pausa del sujeto de comportamiento cuando el valor alcanza una condición específica, y tiene algo que hará una pausa en el clic del botón y también se detendrá cuando el valor cumpla con una condición (múltiplo de 12 en este caso):

Aquí está el operador de pause :

function pause<T>(pauseSignal: Observable<boolean>) { return (source: Observable<T>) => Observable.create(observer => { const buffer = []; let paused = false; let error; let isComplete = false; function notify() { while (!paused && buffer.length) { const value = buffer.shift(); observer.next(value); } if (!buffer.length && error) { observer.error(error); } if (!buffer.length && isComplete) { observer.complete(); } } const subscription = pauseSignal.subscribe( p => { paused = !p; notify(); }, e => { error = e; notify(); }, () => {}); subscription.add(source.subscribe( v => { buffer.push(v); notify(); }, e => { error = e; notify(); }, () => { isComplete = true; notify(); } )); return subscription; }); }

Aquí está el uso de él:

const CONDITION = x => (x > 0) && ((x % 12) === 0); // is multiple this.active$ = new BehaviorSubject<boolean>(true); const stream$ = timer(500, 500); const out$ = stream$.pipe( pause(this.active$), tap(value => { if (CONDITION(value)) { this.active$.next(false); } })); this.d = out$.subscribe(v => console.log(v));

Y un ejemplo de trabajo: https://stackblitz.com/edit/angular-bvxnbf


Aquí hay una forma sencilla de hacerlo. Use el timer() solo como un emisor, e incremente una cuenta por separado. Esto te da un poco más de control directo.

export class AppComponent implements OnInit { active = true; out$: Observable<number>; count = 0; ngOnInit(): void { const stream$ = timer(500, 500); this.out$ = stream$.pipe( filter(v => this.active), map(v => { this.count += 1; return this.count; }), tap(v => { if (this.count % 5 === 0) { this.active = false; } }) ) } }

https://stackblitz.com/edit/angular-nzs7zh


Podría implementar un búfer personalizado utilizando una matriz. Cuando bufferToggle emite, agregue esos valores a su búfer personalizado. Luego tome los valores del búfer personalizado hasta que un determinado elemento en el búfer coincida con una condición de detención. Emitir esos valores y pausar su flujo.

ngOnInit(): void { this.active$ = new BehaviorSubject<boolean>(true); const on$ = this.active$.pipe(filter(v => v)); const off$ = this.active$.pipe(filter(v => !v)); timer(500, 500).pipe( share(), pausable(on$, off$, v => this.active$.value && this.pauseOn(v), () => this.active$.next(false), false) ).subscribe(console.log); } pauseOn(value: number): boolean { return value % 10 === 0; } export function pausable<T, O>( on$: Observable<any>, off$: Observable<O>, haltCondition: (value: T) => boolean, pause: () => void, spread: boolean) { return (source: Observable<T>) => defer(() => { // defer is used so that each subscription gets its own buffer let buffer: T[] = []; return merge( source.pipe( bufferToggle(off$, () => on$), tap(values => buffer = buffer.concat(values)), // append values to your custom buffer map(_ => buffer.findIndex(haltCondition)), // find the index of the first element that matches the halt condition tap(haltIndex => haltIndex >= 0 ? pause() : null), // pause the stream when a value matching the halt condition was found map(haltIndex => buffer.splice(0, haltIndex === -1 ? customBuffer.length : haltIndex + 1)), // get all values from your custom buffer until a haltCondition is met mergeMap(toEmit => spread ? from(toEmit) : toEmit.length > 0 ? of(toEmit) : EMPTY) // optional value spread (what your mergeAll did) ), source.pipe( windowToggle(on$, () => off$), mergeMap(x => x), tap(value => haltCondition(value) ? pause() : null), // pause the stream when an unbuffered value matches the halt condition ), ); }); }

El operador en pausa emitirá actualmente valores que coincidan con la condición de detención y luego detendrá el flujo directamente después. Podría ajustarse a sus necesidades específicas, por ejemplo, simplificado con menos parámetros de entrada o se podría incorporar el pausable share en pausable .

Aquí hay un stackblitz: https://stackblitz.com/edit/angular-rbv9by

No distribuí la matriz del búfer en este ejemplo para que pueda ver fácilmente qué valores provienen del búfer y cuáles no.

Original

Use tap en algún lugar de su flujo antes del búfer para verificar el valor y pausar el flujo según alguna condición.

this.stream$.pipe( tap(value => this.active$.value && value % 5 === 0 ? this.active$.next(false) : null), bufferToggle(off$, () => on$), mergeAll(), )


Supongo que el comportamiento deseado no está relacionado con obtener los valores que el temporizador emite per se, y que en lugar de pausar las notificaciones a un flujo continuo (en su ejemplo, el temporizador continúa aunque no veamos los valores impreso), está bien dejar de emitir cuando está en pausa.

Mi solución está inspirada en la receta del cronómetro.

La solución a continuación utiliza dos botones separados para reproducir y pausar, pero puede ajustar esto al gusto. Pasamos los botones (ViewChild) al servicio en el gancho ngAfterViewInit del componente, luego nos suscribimos a la transmisión.

// pausable.component.ts ngAfterViewInit() { this.pausableService.initPausableStream(this.start.nativeElement, this.pause.nativeElement); this.pausableService.counter$ .pipe(takeUntil(this.unsubscribe$)) // don''t forget to unsubscribe :) .subscribe((state: State) => { console.log(state.value); // whatever you need }); }

// pausable.service.ts import { Injectable } from ''@angular/core''; import { merge, fromEvent, Subject, interval, NEVER } from ''rxjs''; import { mapTo, startWith, scan, switchMap, tap, map } from ''rxjs/operators''; export interface State { active: boolean; value: number; } @Injectable({ providedIn: ''root'' }) export class PausableService { public counter$; constructor() { } initPausableStream(start: HTMLElement, pause: HTMLElement) { // convenience functions to map an element click to a result const fromClick = (el: HTMLElement) => fromEvent(el, ''click''); const clickMapTo = (el: HTMLElement, obj: {}) => fromClick(el).pipe(mapTo(obj)); const pauseByCondition$ = new Subject(); const pauseCondition = (state: State): boolean => state.value % 5 === 0 && state.value !== 0; // define the events that may trigger a change const events$ = merge( clickMapTo(start, { active: true }), clickMapTo(pause, { active: false }), pauseByCondition$.pipe(mapTo({ active: false })) ); // switch the counter stream based on events this.counter$ = events$.pipe( startWith({ active: true, value: 0 }), scan((state: State, curr) => ({ ...state, ...curr }), {}), switchMap((state: State) => state.active ? interval(500).pipe( tap(_ => ++state.value), map(_ => state)) : NEVER), tap((state: State) => { if (pauseCondition(state)) { pauseByCondition$.next(); // trigger pause } }) ); } }


Tan simple como se puede obtener con un windowToggle y usar el ejemplo de trabajo active.next (falso): https://stackblitz.com/edit/angular-pdw7kw

defer(() => { let count = 0; return stream$.pipe( windowToggle(on$, () => off$), exhaustMap(obs => obs), mergeMap(_ => { if ((++count) % 5 === 0) { this.active$.next(false) return never() } return of(count) }), ) }).subscribe(console.log)