example await async all javascript node.js es6-promise

javascript - await - promise.all example



¿Cuál es la mejor manera de limitar la concurrencia al usar Promise.all() de ES6? (7)

Tengo un código que está iterando sobre una lista que fue consultada desde una base de datos y haciendo una solicitud HTTP para cada elemento en esa lista. Esa lista a veces puede ser un número razonablemente grande (en miles), y me gustaría asegurarme de que no esté llegando a un servidor web con miles de solicitudes HTTP simultáneas.

Una versión abreviada de este código actualmente se parece a esto ...

function getCounts() { return users.map(user => { return new Promise(resolve => { remoteServer.getCount(user) // makes an HTTP request .then(() => { /* snip */ resolve(); }); }); }); } Promise.all(getCounts()).then(() => { /* snip */});

Este código se está ejecutando en el nodo 4.3.2. Para reiterar, ¿se puede administrar Promise.all para que solo un cierto número de Promises estén en curso en un momento dado?


Así que traté de hacer que algunos ejemplos mostrados funcionaran para mi código, pero como esto era solo para un script de importación y no un código de producción, usar el paquete npm batch-promises fue sin duda el camino más fácil para mí

NOTA: Requiere tiempo de ejecución para ser compatible con Promise o para ser rellenado.

Api batchPromises (int: batchSize, array: Collection, i => Promise: Iteratee) Promise: Iteratee se llamará después de cada lote.

Utilizar:

const identifyTransactions = async function() { let promises = [] let concurrency = 0 for (let tx of this.transactions) { if (concurrency > 4) await Promise.all(promises).then(r => { promises = []; concurrency = 0 }) promises.push(tx.identifyTransaction()) concurrency++ } if (promises.length > 0) await Promise.all(promises) //resolve the rest }


En lugar de usar promesas para limitar las solicitudes de http, use el http.Agent.maxSockets incorporado del http.Agent.maxSockets . Esto elimina el requisito de usar una biblioteca o escribir su propio código de agrupación, y tiene la ventaja adicional de que tiene más control sobre lo que está limitando.

agent.maxSockets

Por defecto está configurado en Infinito. Determina cuántos sockets concurrentes puede abrir el agente por origen. Origin es una combinación de ''host: puerto'' o ''host: puerto: localAddress''.

Por ejemplo:

var http = require(''http''); var agent = new http.Agent({maxSockets: 5}); // 5 concurrent connections per origin var request = http.request({..., agent: agent}, ...);

Si realiza múltiples solicitudes al mismo origen, también podría beneficiarle establecer keepAlive en true (consulte la documentación anterior para obtener más información).


Esto fue lo que hice usando Promise.all , dentro de mi código aquí

const pLimit = require(''p-limit''); // Example Concurrency of 3 promise at once const limit = pLimit(3); let urls = [ "http://www.exampleone.com/", "http://www.exampletwo.com/", "http://www.examplethree.com/", "http://www.examplefour.com/", ] // Create an array of our promises using map (fetchData() returns a promise) let promises = urls.map(url => { // wrap the function we are calling in the limit function we defined above return limit(() => fetchData(url)); }); (async () => { // Only three promises are run at once (as defined above) const result = await Promise.all(promises); console.log(result); })();

Si quieres ver un ejemplo: https://jsfiddle.net/thecodermarcelo/av2tp83o/5/


Si sabe cómo funcionan los iteradores y cómo se consumen, no necesitará ninguna biblioteca adicional, ya que puede ser muy fácil crear su propia concurrencia. Déjame demostrar:

/* [Symbol.iterator]() is equivalent to .values() const iterator = [1,2,3][Symbol.iterator]() */ const iterator = [1,2,3].values() // loop over all items with for..of for (const x of iterator) { console.log(''x:'', x) // notices how this loop continues the same iterator // and consumes the rest of the iterator, making the // outer loop not logging any more x''s for (const y of iterator) { console.log(''y:'', y) } }

Podemos usar el mismo iterador y compartirlo entre los trabajadores.
Si hubiera usado .entries() lugar de .values() , habría obtenido una matriz 2D con [index, value] que demostraré a continuación con una concurrencia de 2

const sleep = n => new Promise(rs => setTimeout(rs,n)) async function doWork(iterator) { for (let [index, item] of iterator) { await sleep(1000) console.log(index + '': '' + item) } } const arr = Array.from(''abcdefghij'') const workers = new Array(2).fill(arr.entries()).map(doWork) // ^--- starts two workers sharing the same iterator Promise.all(workers).then(() => console.log(''done''))

Nota: lo diferente de esto en comparación con el ejemplo de async-pool es que genera dos trabajadores, por lo que si un trabajador lanza un error por alguna razón en el índice 5, no impedirá que el otro trabajador haga el resto. Así que pasas de hacer 2 concurrencias a 1. (para que no se detenga ahí) Y luego será más difícil saber cuándo terminan todos los trabajadores, ya que Promise.all . Promise.all saldrán pronto si uno falla. Así que mi consejo es que atrapes todos los errores dentro de la función doWork


Tenga en cuenta que Promise.all() no Promise.all() las promesas para comenzar su trabajo, al crear la promesa sí lo hace.

Teniendo esto en cuenta, una solución sería verificar cada vez que se resuelva una promesa si se debe iniciar una nueva promesa o si ya se encuentra en el límite.

Sin embargo, realmente no hay necesidad de reinventar la rueda aquí. Una biblioteca que podría usar para este propósito es es6-promise-pool . De sus ejemplos:

// On the Web, leave out this line and use the script tag above instead. var PromisePool = require(''es6-promise-pool'') var promiseProducer = function () { // Your code goes here. // If there is work left to be done, return the next work item as a promise. // Otherwise, return null to indicate that all promises have been created. // Scroll down for an example. } // The number of promises to process simultaneously. var concurrency = 3 // Create a pool. var pool = new PromisePool(promiseProducer, concurrency) // Start the pool. var poolPromise = pool.start() // Wait for the pool to settle. poolPromise.then(function () { console.log(''All promises fulfilled'') }, function (error) { console.log(''Some promise rejected: '' + error.message) })


http://bluebirdjs.com/docs/api/promise.map.html de bluebird puede tomar una opción de concurrencia para controlar cuántas promesas deben ejecutarse en paralelo. A veces es más fácil que .all porque no necesita crear la matriz de promesa.

const Promise = require(''bluebird'') function getCounts() { return Promise.map(users, user => { return new Promise(resolve => { remoteServer.getCount(user) // makes an HTTP request .then(() => { /* snip */ resolve(); }); }); }, {concurrency: 10}); // <---- at most 10 http requests at a time }


P-Limit

He comparado la limitación de concurrencia de promesa con un script personalizado, bluebird, es6-promise-pool y p-limit. Creo que npmjs.com/package/p-limit tiene la implementación más simple y simplificada para esta necesidad. npmjs.com/package/p-limit .

Requerimientos

Ser compatible con async en el ejemplo.

Mi ejemplo

En este ejemplo, necesitamos ejecutar una función para cada URL en la matriz (como, tal vez, una solicitud de API). Aquí esto se llama fetchData() . Si tuviéramos que procesar una serie de miles de elementos, la concurrencia definitivamente sería útil para ahorrar recursos de CPU y memoria.

batch-promises Easily batch promises NOTE: Requires runtime to support Promise or to be polyfilled. Api batchPromises(int: batchSize, array: Collection, i => Promise: Iteratee) The Promise: Iteratee will be called after each batch. Use: import batchPromises from ''batch-promises''; batchPromises(2, [1,2,3,4,5], i => new Promise((resolve, reject) => { // The iteratee will fire after each batch resulting in the following behaviour: // @ 100ms resolve items 1 and 2 (first batch of 2) // @ 200ms resolve items 3 and 4 (second batch of 2) // @ 300ms resolve remaining item 5 (last remaining batch) setTimeout(() => { resolve(i); }, 100); })) .then(results => { console.log(results); // [1,2,3,4,5] });

El resultado del registro de la consola es una matriz de los datos de respuesta de promesas resueltos.