que observables multiple forkjoin ejemplo angular reactjs react-native rxjs observable

angular - multiple - MĂșltiples observables trabajando juntos



rxjs subscribe to multiple observables (2)

Tengo una entrada que, a medida que el usuario escribe, realiza una búsqueda en tiempo real. Por ejemplo, supongamos que ha buscado lo siguiente:

coche

El resultado sería:

[ { id: "1" name: "Ferrari" }, { id: "2" name: "Porsche" } ]

Esto lo pude hacer con éxito, así es cómo:

class WordComponent { word: Subject<string> = new Subject<string>(); result: any[] = []; constructor(private http: Http) { this.subscribe(); } subscribe(): void { this.word.debounceTime(400) .distinctUntilChanged() .switchMap((word: string): Observable<any[]> => this.http.get(word)) .subscribe((result: any[]): any[] => this.result = result); } search(event: any): void { this.word.next(event.target.value); } }

Y la vista:

<input type="text" placeholder="Word" (keyup)="search($event)">

Quiero que el usuario pueda escribir varias palabras al mismo tiempo y realizar una búsqueda en tiempo real de cada palabra por separado. Por ejemplo, supongamos que ha buscado lo siguiente:

sol de comida del coche

El resultado para el automóvil sería:

[ { id: "1" name: "Ferrari" }, { id: "2" name: "Porsche" } ]

El resultado para la comida sería:

[ { id: "3" name: "egg" }, { id: "4" name: "cheese" } ]

El resultado para el sol sería:

[ { id: "5" name: "star" }, { id: "6" name: "sky" } ]

Y también fusiona los resultados de cada palabra, en este caso se vería así:

[ [{ id: "1" name: "Ferrari" }, { id: "2" name: "Porsche" } ], [{ id: "3" name: "egg" }, { id: "4" name: "cheese" } ], [{ id: "5" name: "star" }, { id: "6" name: "sky" } ] ]

Pero digamos que el usuario, después de escribir todas las palabras y realizar la búsqueda, desea cambiar una de ellas. Solo se debe rehacer la búsqueda de la palabra que se modificó, y la fusión del resultado final también deberá rehacerse.

Todavía no conozco todas las características de rxjs y no sé cuál sería la forma ideal de lograrlo. Si desea una referencia, el sitio Display Purposes tiene un motor de búsqueda muy similar.


Creo que necesitas algo como esto:

subscribe(): void { this.word.debounceTime(400) .distinctUntilChanged() .switchMap((words: string): Observable<any[]> => Observable.forkJoin(words.split('' '').map(word => this.http.get(word))) ) .map(arrayWithArrays => [].concat(arrayWithArrays) .subscribe((result: any[]): any[] => this.result = result); }


He encontrado esta solución enorme pero parcial:

Violín aquí: https://jsfiddle.net/mtawrhLs/1/

Rx.Observable.prototype.combineAllCont = function () { let lastValues = []; return Rx.Observable.create(obs => { let i = 0; let subscription = this.subscribe(stream => { const streamIndex = i; subscription.add(stream.subscribe(res => { lastValues[streamIndex] = res; obs.next(lastValues); })); i++; }); return subscription; }); } /** STUFF TO DEMO **/ let searchBox = [ '''', ''car'', ''car '', ''car food'', ''car food sun'', ''cat food sun'', ''cat sun'' ] function performSearch(word) { return Rx.Observable.defer(() => { console.log(''sending search for '' + word); return Rx.Observable.of({ word: word, results: word.split('''') }); }) } let searchBoxStream = Rx.Observable.interval(500) .take(searchBox.length * 2) .map((i) => searchBox[i % searchBox.length]); /** GOOD STUFF STARTS HERE **/ let wordsStream = searchBoxStream .map((str) => str.trim().split('' '').filter((w) => w.trim() != '''')); let wordSearchSubjects = []; let wordSearchStreamSubject = new Rx.ReplaySubject(1); wordsStream.subscribe((words) => { const nWords = words.length; const nSubjects = wordSearchSubjects.length; // Update streams for (i = 0; i < nWords && i < nSubjects; i++) { wordSearchSubjects[i].next(words[i]); } // Create streams for (i = nSubjects; i < nWords; i++) { const wordSearchSubject = new Rx.ReplaySubject(1); wordSearchSubjects.push(wordSearchSubject); wordSearchStreamSubject.next( wordSearchSubject.asObservable() .distinctUntilChanged() .flatMap((w) => performSearch(w)) .concat(Rx.Observable.of(false)) // Ending signal ) wordSearchSubjects[i].next(words[i]); } // Delete streams for (i = nWords; i < nSubjects; i++) { wordSearchSubjects[i].complete(); } wordSearchSubjects.length = nWords; }); let wordSearchStream = wordSearchStreamSubject .combineAllCont() .map((arr) => arr.filter((r) => r !== false)); resultingStream = wordSearchStream .map((arr) => { let ret = []; arr.forEach(search => { ret.push(search.results); }); return ret; }) .subscribe((arr) => console.log(arr));

Cosas para mejorar:

  1. Tuve que usar una combineAll personalizada. combineAll implementación que no espera la transmisión original para completarse. Esto puede proporcionar pérdidas de memoria si no se implementa correctamente, y debe eliminar las suscripciones internas que se completan (no lo hace ahora)
  2. La detección de cambio de palabras debe mejorarse: si quita una palabra del medio, la mitad de las palabras se buscará nuevamente.

Probablemente hay una solución mejor y más directa, pero podría salir con este lío. Se siente muy hacky / peligroso.