node firebaseapp example español dev javascript asynchronous rxjs

javascript - firebaseapp - Espere una operación asincrónica en el Siguiente de RxJS Observable



rxjs node (3)

Cada operación que desee realizar se puede modelar como observable. Incluso la operación síncrona se puede modelar de esta manera. Luego puede usar el map para convertir su secuencia en una secuencia de secuencias, luego use concatAll para aplanar la secuencia.

someObservable .map(function (item) { if (item === "do-something-async") { // create an Observable that will do the async action when it is subscribed // return Rx.Observable.timer(5000); // or maybe an ajax call? Use `defer` so that the call does not // start until concatAll() actually subscribes. return Rx.Observable.defer(function () { return Rx.Observable.ajaxAsObservable(...); }); } else { // do something synchronous but model it as an async operation (using Observable.return) // Use defer so that the sync operation is not carried out until // concatAll() reaches this item. return Rx.Observable.defer(function () { return Rx.Observable.return(someSyncAction(item)); }); } }) .concatAll() // consume each inner observable in sequence .subscribe(function (result) { }, function (error) { console.log("error", error); }, function () { console.log("complete"); });

Para responder a algunos de sus comentarios ... en algún momento debe forzar algunas expectativas sobre la secuencia de funciones. En la mayoría de los lenguajes, cuando se trata de funciones que son posiblemente asincrónicas, las firmas de función son asíncronas y la naturaleza actual asincrónica vs sincronización de la función se oculta como un detalle de implementación de la función. Esto es cierto tanto si usa promesas de javaScript, observables de Rx, tareas de C #, futuros de C ++, etc. Las funciones terminan devolviendo una promesa / observable / tarea / futuro / etc. y si la función es realmente sincrónica, entonces el objeto que devuelve es solo ya completado.

Habiendo dicho eso, dado que esto es JavaScript, puedes hacer trampa:

var makeObservable = function (func) { return Rx.Observable.defer(function () { // execute the function and then examine the returned value. // if the returned value is *not* an Rx.Observable, then // wrap it using Observable.return var result = func(); return result instanceof Rx.Observable ? result: Rx.Observable.return(result); }); } someObservable .map(makeObservable) .concatAll() .subscribe(function (result) { }, function (error) { console.log("error", error); }, function () { console.log("complete"); });

Tengo una secuencia RxJS siendo consumida de la manera normal ...

Sin embargo, en el controlador "onNext" observable, algunas de las operaciones se completarán sincrónicamente, pero otras requieren devoluciones de llamada asincrónicas, que deben esperar antes de procesar el siguiente elemento en la secuencia de entrada.

... un poco confundido cómo hacer esto. ¿Algunas ideas? ¡Gracias!

someObservable.subscribe( function onNext(item) { if (item == ''do-something-async-and-wait-for-completion'') { setTimeout( function() { console.log(''okay, we can continue''); } , 5000 ); } else { // do something synchronously and keep on going immediately console.log(''ready to go!!!''); } }, function onError(error) { console.log(''error''); }, function onComplete() { console.log(''complete''); } );


En primer lugar, mueva sus operaciones de sincronización de subscribe , no está hecho para operaciones de sincronización.

Lo que puedes usar es mergeMap (alias flatMap ) o concatMap . concatMap ambos, porque concatMap es en realidad mergeMap con el parámetro concurrent establecido en 1. Esto es útil, ya que a veces querrá limitar el número de consultas concurrentes, pero aún ejecutar un par de ellas simultáneamente.

source.concatMap(item => { if (item == ''do-something-async-and-wait-for-completion'') { return Rx.Observable.timer(5000) .mapTo(item) .do(e => console.log(''okay, we can continue'')); } else { // do something synchronously and keep on going immediately return Rx.Observable.of(item) .do(e => console.log(''ready to go!!!'')); } }).subscribe();

También mostraré cómo puedes calificar tus llamadas limitadas. Un consejo: solo límite de velocidad en el punto donde realmente lo necesita, como cuando se llama a una API externa que permite solo un cierto número de solicitudes por segundo o minutos. De lo contrario, es mejor limitar el número de operaciones concurrentes y permitir que el sistema se mueva a la velocidad máxima.

Comenzamos con el siguiente fragmento:

const concurrent; const delay; source.mergeMap(item => selector(item, delay) , concurrent)

A continuación, debemos seleccionar los valores de concurrent , delay e implementar selector . concurrent y el delay están estrechamente relacionados. Por ejemplo, si queremos ejecutar 10 elementos por segundo, podemos usar concurrent = 10 y delay = 1000 (milisegundo), pero también concurrent = 5 y delay = 500 o concurrent = 4 y delay = 400 . La cantidad de elementos por segundo siempre será concurrent / (delay / 1000) .

Ahora permite implementar selector . Tenemos un par de opciones. Podemos establecer un tiempo de ejecución mínimo para el selector , podemos agregarle un retraso constante, podemos emitir los resultados tan pronto como estén disponibles, podemos emitir el resultado solo después de que haya pasado el retraso mínimo, etc. Incluso es posible para agregar un tiempo de espera utilizando los operadores de timeout . Conveniencia.

Establezca un tiempo mínimo, envíe el resultado anticipadamente:

function selector(item, delay) { return Rx.Observable.of(item) .delay(1000) // replace this with your actual call. .merge(Rx.Observable.timer(delay).ignoreElements()) }

Establezca un tiempo mínimo, envíe el resultado tarde:

function selector(item, delay) { return Rx.Observable.of(item) .delay(1000) // replace this with your actual call. .zip(Rx.Observable.timer(delay), (item, _)) }

Agregue tiempo, envíe el resultado anticipadamente:

function selector(item, delay) { return Rx.Observable.of(item) .delay(1000) // replace this with your actual call. .concat(Rx.Observable.timer(delay).ignoreElements()) }

Agregue tiempo, envíe el resultado tarde:

function selector(item, delay) { return Rx.Observable.of(item) .delay(1000) // replace this with your actual call. .delay(delay) }


Otro ejemplo simple para realizar operaciones asincrónicas manuales.

¡Tenga en cuenta que no es una buena práctica reactiva! Si solo desea esperar 1000 ms, use Rx.Observable.timer o demore el operador.

someObservable.flatMap(response => { return Rx.Observable.create(observer => { setTimeout(() => { observer.next(''the returned value'') observer.complete() }, 1000) }) }).subscribe()

Ahora, reemplace setTimeout por su función asíncrona, como Image.onload o fileReader.onload ...