node.js - node - Iteración sobre un cursor mongodb en serie(a la espera de devoluciones de llamadas antes de pasar al siguiente documento)
node js mongodb foreach (8)
Usando mongoskin, puedo hacer una consulta como esta, que devolverá un cursor:
myCollection.find({}, function(err, resultCursor) {
resultCursor.each(function(err, result) {
}
}
Sin embargo, me gustaría llamar a algunas funciones asíncronas para cada documento, y solo pasar al siguiente elemento del cursor después de que esto haya devuelto la llamada (similar a la estructura de cada serie en el módulo async.js). P.ej:
myCollection.find({}, function(err, resultCursor) {
resultCursor.each(function(err, result) {
externalAsyncFunction(result, function(err) {
//externalAsyncFunction completed - now want to move to next doc
});
}
}
¿Cómo podría hacer esto?
Gracias
ACTUALIZAR:
No quiero usar toArray()
ya que esta es una operación por lotes grande, y los resultados pueden no ajustarse en la memoria de una vez.
Esto funciona con un gran conjunto de datos al usar setImmediate:
var cursor = collection.find({filter...}).cursor();
cursor.nextObject(function fn(err, item) {
if (err || !item) return;
setImmediate(fnAction, item, arg1, arg2, function() {
cursor.nextObject(fn);
});
});
function fnAction(item, arg1, arg2, callback) {
// Here you can do whatever you want to do with your item.
return callback();
}
Puede obtener el resultado en una Array
e iterar usando una función recursiva, algo como esto.
myCollection.find({}).toArray(function (err, items) {
var count = items.length;
var fn = function () {
externalAsyncFuntion(items[count], function () {
count -= 1;
if (count) fn();
})
}
fn();
});
Editar:
Esto solo es aplicable para conjuntos de datos pequeños, para los más grandes debe usar cursores como se menciona en otras respuestas.
Puedes hacer algo como esto usando la biblioteca asíncrona. El punto clave aquí es verificar si el documento actual es nulo. Si es así, significa que has terminado.
async.series([
function (cb) {
cursor.each(function (err, doc) {
if (err) {
cb(err);
} else if (doc === null) {
cb();
} else {
console.log(doc);
array.push(doc);
}
});
}
], function (err) {
callback(err, array);
});
Puedes usar setTimeOut''s simple. Este es un ejemplo en mecanografiado que se ejecuta en nodejs (estoy usando promesas a través del módulo ''cuándo'' pero también se puede hacer sin ellos):
import mongodb = require("mongodb");
var dbServer = new mongodb.Server(''localhost'', 27017, {auto_reconnect: true}, {});
var db = new mongodb.Db(''myDb'', dbServer);
var util = require(''util'');
var when = require(''when''); //npm install when
var dbDefer = when.defer();
db.open(function() {
console.log(''db opened...'');
dbDefer.resolve(db);
});
dbDefer.promise.then(function(db : mongodb.Db){
db.collection(''myCollection'', function (error, dataCol){
if(error) {
console.error(error); return;
}
var doneReading = when.defer();
var processOneRecordAsync = function(record) : When.Promise{
var result = when.defer();
setTimeout (function() {
//simulate a variable-length operation
console.log(util.inspect(record));
result.resolve(''record processed'');
}, Math.random()*5);
return result.promise;
}
var runCursor = function (cursor : MongoCursor){
cursor.next(function(error : any, record : any){
if (error){
console.log(''an error occurred: '' + error);
return;
}
if (record){
processOneRecordAsync(record).then(function(r){
setTimeout(function() {runCursor(cursor)}, 1);
});
}
else{
//cursor up
doneReading.resolve(''done reading data.'');
}
});
}
dataCol.find({}, function(error, cursor : MongoCursor){
if (!error)
{
setTimeout(function() {runCursor(cursor)}, 1);
}
});
doneReading.promise.then(function(message : string){
//message=''done reading data''
console.log(message);
});
});
});
Puedes usar un futuro:
myCollection.find({}, function(err, resultCursor) {
resultCursor.count(Meteor.bindEnvironment(function(err,count){
for(var i=0;i<count;i++)
{
var itemFuture=new Future();
resultCursor.nextObject(function(err,item)){
itemFuture.result(item);
}
var item=itemFuture.wait();
//do what you want with the item,
//and continue with the loop if so
}
}));
});
Si alguien está buscando una forma Promesa de hacer esto (en lugar de usar callbacks de nextObject), aquí está. Estoy usando Node v4.2.2 y mongo driver v2.1.7. Esta es una especie de versión Cursor.forEach()
de Cursor.forEach()
:
function forEachSeries(cursor, iterator) {
return new Promise(function(resolve, reject) {
var count = 0;
function processDoc(doc) {
if (doc != null) {
count++;
return iterator(doc).then(function() {
return cursor.next().then(processDoc);
});
} else {
resolve(count);
}
}
cursor.next().then(processDoc);
});
}
Para usar esto, pase el cursor y un iterador que opera en cada documento de forma asincrónica (como lo haría con Cursor.forEach). El iterador necesita devolver una promesa, como lo hacen la mayoría de las funciones de controlador nativas de mongodb.
Digamos que desea actualizar todos los documentos en la test
recopilación. Así es como lo harías:
var theDb;
MongoClient.connect(dbUrl).then(function(db) {
theDb = db; // save it, we''ll need to close the connection when done.
var cur = db.collection(''test'').find();
return forEachSeries(cur, function(doc) { // this is the iterator
return db.collection(''test'').updateOne(
{_id: doc._id},
{$set: {updated: true}} // or whatever else you need to change
);
// updateOne returns a promise, if not supplied a callback. Just return it.
});
})
.then(function(count) {
console.log("All Done. Processed", count, "records");
theDb.close();
})
Si no desea cargar todos los resultados en la memoria utilizando paraArray, puede repetir usando el cursor con algo como lo siguiente.
myCollection.find({}, function(err, resultCursor) {
function processItem(err, item) {
if(item === null) {
return; // All done!
}
externalAsyncFunction(item, function(err) {
resultCursor.nextObject(processItem);
});
}
resultCursor.nextObject(processItem);
}
Un enfoque más moderno que utiliza async
/ await
:
const cursor = db.collection("foo").find({});
while(await cursor.hasNext()) {
const doc = await cursor.next();
// process doc here
}
Notas:
- Esto puede ser aún más fácil de hacer cuando llegan iteradores asíncronos .
- Probablemente desee agregar try / catch para la comprobación de errores.
- La función contenedora debe ser
async
o el código debe estar envuelto en(async function() { ... })()
ya que usaawait
. - Si lo desea,
await new Promise(resolve => setTimeout(resolve, 1000));
(pausa durante 1 segundo) al final del ciclo while para mostrar que procesa documentos uno después del otro.