angular - multiple - MĂșltiples observables trabajando juntos
rxjs subscribe to multiple observables (2)
Tengo una entrada que, a medida que el usuario escribe, realiza una búsqueda en tiempo real. Por ejemplo, supongamos que ha buscado lo siguiente:
coche
El resultado sería:
[
{
id: "1"
name: "Ferrari"
},
{
id: "2"
name: "Porsche"
}
]
Esto lo pude hacer con éxito, así es cómo:
class WordComponent {
word: Subject<string> = new Subject<string>();
result: any[] = [];
constructor(private http: Http) {
this.subscribe();
}
subscribe(): void {
this.word.debounceTime(400)
.distinctUntilChanged()
.switchMap((word: string): Observable<any[]> => this.http.get(word))
.subscribe((result: any[]): any[] => this.result = result);
}
search(event: any): void {
this.word.next(event.target.value);
}
}
Y la vista:
<input type="text" placeholder="Word" (keyup)="search($event)">
Quiero que el usuario pueda escribir varias palabras al mismo tiempo y realizar una búsqueda en tiempo real de cada palabra por separado. Por ejemplo, supongamos que ha buscado lo siguiente:
sol de comida del coche
El resultado para el automóvil sería:
[
{
id: "1"
name: "Ferrari"
},
{
id: "2"
name: "Porsche"
}
]
El resultado para la comida sería:
[
{
id: "3"
name: "egg"
},
{
id: "4"
name: "cheese"
}
]
El resultado para el sol sería:
[
{
id: "5"
name: "star"
},
{
id: "6"
name: "sky"
}
]
Y también fusiona los resultados de cada palabra, en este caso se vería así:
[
[{
id: "1"
name: "Ferrari"
},
{
id: "2"
name: "Porsche"
}
],
[{
id: "3"
name: "egg"
},
{
id: "4"
name: "cheese"
}
],
[{
id: "5"
name: "star"
},
{
id: "6"
name: "sky"
}
]
]
Pero digamos que el usuario, después de escribir todas las palabras y realizar la búsqueda, desea cambiar una de ellas. Solo se debe rehacer la búsqueda de la palabra que se modificó, y la fusión del resultado final también deberá rehacerse.
Todavía no conozco todas las características de rxjs y no sé cuál sería la forma ideal de lograrlo. Si desea una referencia, el sitio Display Purposes tiene un motor de búsqueda muy similar.
Creo que necesitas algo como esto:
subscribe(): void {
this.word.debounceTime(400)
.distinctUntilChanged()
.switchMap((words: string): Observable<any[]> =>
Observable.forkJoin(words.split('' '').map(word => this.http.get(word)))
)
.map(arrayWithArrays => [].concat(arrayWithArrays)
.subscribe((result: any[]): any[] => this.result = result);
}
He encontrado esta solución enorme pero parcial:
Violín aquí: https://jsfiddle.net/mtawrhLs/1/
Rx.Observable.prototype.combineAllCont = function () {
let lastValues = [];
return Rx.Observable.create(obs => {
let i = 0;
let subscription = this.subscribe(stream => {
const streamIndex = i;
subscription.add(stream.subscribe(res => {
lastValues[streamIndex] = res;
obs.next(lastValues);
}));
i++;
});
return subscription;
});
}
/** STUFF TO DEMO **/
let searchBox = [
'''',
''car'',
''car '',
''car food'',
''car food sun'',
''cat food sun'',
''cat sun''
]
function performSearch(word) {
return Rx.Observable.defer(() => {
console.log(''sending search for '' + word);
return Rx.Observable.of({
word: word,
results: word.split('''')
});
})
}
let searchBoxStream = Rx.Observable.interval(500)
.take(searchBox.length * 2)
.map((i) => searchBox[i % searchBox.length]);
/** GOOD STUFF STARTS HERE **/
let wordsStream = searchBoxStream
.map((str) => str.trim().split('' '').filter((w) => w.trim() != ''''));
let wordSearchSubjects = [];
let wordSearchStreamSubject = new Rx.ReplaySubject(1);
wordsStream.subscribe((words) => {
const nWords = words.length;
const nSubjects = wordSearchSubjects.length;
// Update streams
for (i = 0; i < nWords && i < nSubjects; i++) {
wordSearchSubjects[i].next(words[i]);
}
// Create streams
for (i = nSubjects; i < nWords; i++) {
const wordSearchSubject = new Rx.ReplaySubject(1);
wordSearchSubjects.push(wordSearchSubject);
wordSearchStreamSubject.next(
wordSearchSubject.asObservable()
.distinctUntilChanged()
.flatMap((w) => performSearch(w))
.concat(Rx.Observable.of(false)) // Ending signal
)
wordSearchSubjects[i].next(words[i]);
}
// Delete streams
for (i = nWords; i < nSubjects; i++) {
wordSearchSubjects[i].complete();
}
wordSearchSubjects.length = nWords;
});
let wordSearchStream = wordSearchStreamSubject
.combineAllCont()
.map((arr) => arr.filter((r) => r !== false));
resultingStream = wordSearchStream
.map((arr) => {
let ret = [];
arr.forEach(search => {
ret.push(search.results);
});
return ret;
})
.subscribe((arr) => console.log(arr));
Cosas para mejorar:
- Tuve que usar una
combineAll
personalizada.combineAll
implementación que no espera la transmisión original para completarse. Esto puede proporcionar pérdidas de memoria si no se implementa correctamente, y debe eliminar las suscripciones internas que se completan (no lo hace ahora) - La detección de cambio de palabras debe mejorarse: si quita una palabra del medio, la mitad de las palabras se buscará nuevamente.
Probablemente hay una solución mejor y más directa, pero podría salir con este lío. Se siente muy hacky / peligroso.