tutorial - Analizar enormes archivos de registro en Node.js-leer en línea por línea
npm (9)
Necesito hacer algunos análisis de archivos de registro grandes (5-10 Gb) en Javascript / Node.js (estoy usando Cube).
El logline se ve algo así como:
10:00:43.343423 I''m a friendly log message. There are 5 cats, and 7 dogs. We are in state "SUCCESS".
Necesitamos leer cada línea, hacer algunos análisis (por ejemplo, quitar 5
, 7
y SUCCESS
), luego bombear estos datos en Cube ( https://github.com/square/cube ) usando su cliente JS.
En primer lugar, ¿cuál es la forma canónica en Node para leer en un archivo, línea por línea?
Parece ser una pregunta bastante común en línea:
- http://www.quora.com/What-is-the-best-way-to-read-a-file-line-by-line-in-node-js
- ¿Lee un archivo una línea a la vez en node.js?
Muchas de las respuestas parecen apuntar a un grupo de módulos de terceros:
- https://github.com/nickewing/line-reader
- https://github.com/jahewson/node-byline
- https://github.com/pkrumins/node-lazy
- https://github.com/Gagle/Node-BufferedReader
Sin embargo, esto parece una tarea bastante básica: seguramente, hay una manera simple dentro del archivo stdlib de leer en un archivo de texto, línea por línea.
En segundo lugar, necesito procesar cada línea (por ejemplo, convertir la marca de tiempo en un objeto Date y extraer campos útiles).
¿Cuál es la mejor manera de hacerlo, maximizando el rendimiento? ¿Hay alguna manera de que no se bloquee en la lectura en cada línea o en el envío a Cube?
En tercer lugar, supongo que usar divisiones de cadena, y el equivalente JS de contiene (IndexOf! = -1?) Será mucho más rápido que las expresiones regulares. ¿Alguien ha tenido mucha experiencia en analizar cantidades masivas de datos de texto en Node.js?
Saludos, Victor
Además de leer el gran archivo línea por línea, también puede leerlo en porciones. Para más, consulte este artículo
var offset = 0;
var chunkSize = 2048;
var chunkBuffer = new Buffer(chunkSize);
var fp = fs.openSync(''filepath'', ''r'');
var bytesRead = 0;
while(bytesRead = fs.readSync(fp, chunkBuffer, 0, chunkSize, offset)) {
offset += bytesRead;
var str = chunkBuffer.slice(0, bytesRead).toString();
var arr = str.split(''/n'');
if(bytesRead = chunkSize) {
// the last item of the arr may be not a full line, leave it to the next chunk
offset -= arr.pop().length;
}
lines.push(arr);
}
console.log(lines);
Busqué una solución para analizar archivos muy grandes (gbs) línea por línea usando una secuencia. Todas las bibliotecas y ejemplos de terceros no se ajustaban a mis necesidades, ya que procesaban los archivos, línea por línea (como 1, 2, 3, 4 ...) o leían todo el archivo en la memoria.
La siguiente solución puede analizar archivos muy grandes, línea por línea usando stream & pipe. Para las pruebas utilicé un archivo de 2.1 gb con 17.000.000 de registros. El uso de Ram no excedió los 60 mb.
var fs = require(''fs'')
, es = require(''event-stream'');
var lineNr = 0;
var s = fs.createReadStream(''very-large-file.csv'')
.pipe(es.split())
.pipe(es.mapSync(function(line){
// pause the readstream
s.pause();
lineNr += 1;
// process line here and call s.resume() when rdy
// function below was for logging memory usage
logMemoryUsage(lineNr);
// resume the readstream, possibly from a callback
s.resume();
})
.on(''error'', function(err){
console.log(''Error while reading file.'', err);
})
.on(''end'', function(){
console.log(''Read entire file.'')
})
);
¡Por favor déjame saber cómo va!
En base a this respuesta a las preguntas, implementé una clase que puede usar para leer un archivo de forma sincronizada línea por línea con fs.readSync()
. Puede hacer esta "pausa" y "reanudar" usando una promesa Q
( jQuery
parece requerir un DOM así que no puede ejecutarlo con nodejs
):
var fs = require(''fs'');
var Q = require(''q'');
var lr = new LineReader(filenameToLoad);
lr.open();
var promise;
workOnLine = function () {
var line = lr.readNextLine();
promise = complexLineTransformation(line).then(
function() {console.log(''ok'');workOnLine();},
function() {console.log(''error'');}
);
}
workOnLine();
complexLineTransformation = function (line) {
var deferred = Q.defer();
// ... async call goes here, in callback: deferred.resolve(''done ok''); or deferred.reject(new Error(error));
return deferred.promise;
}
function LineReader (filename) {
this.moreLinesAvailable = true;
this.fd = undefined;
this.bufferSize = 1024*1024;
this.buffer = new Buffer(this.bufferSize);
this.leftOver = '''';
this.read = undefined;
this.idxStart = undefined;
this.idx = undefined;
this.lineNumber = 0;
this._bundleOfLines = [];
this.open = function() {
this.fd = fs.openSync(filename, ''r'');
};
this.readNextLine = function () {
if (this._bundleOfLines.length === 0) {
this._readNextBundleOfLines();
}
this.lineNumber++;
var lineToReturn = this._bundleOfLines[0];
this._bundleOfLines.splice(0, 1); // remove first element (pos, howmany)
return lineToReturn;
};
this.getLineNumber = function() {
return this.lineNumber;
};
this._readNextBundleOfLines = function() {
var line = "";
while ((this.read = fs.readSync(this.fd, this.buffer, 0, this.bufferSize, null)) !== 0) { // read next bytes until end of file
this.leftOver += this.buffer.toString(''utf8'', 0, this.read); // append to leftOver
this.idxStart = 0
while ((this.idx = this.leftOver.indexOf("/n", this.idxStart)) !== -1) { // as long as there is a newline-char in leftOver
line = this.leftOver.substring(this.idxStart, this.idx);
this._bundleOfLines.push(line);
this.idxStart = this.idx + 1;
}
this.leftOver = this.leftOver.substring(this.idxStart);
if (line !== "") {
break;
}
}
};
}
He creado un módulo de nodos para leer archivos de gran tamaño de forma asíncrona o JSON. Probado en archivos grandes.
var fs = require(''fs'')
, util = require(''util'')
, stream = require(''stream'')
, es = require(''event-stream'');
module.exports = FileReader;
function FileReader(){
}
FileReader.prototype.read = function(pathToFile, callback){
var returnTxt = '''';
var s = fs.createReadStream(pathToFile)
.pipe(es.split())
.pipe(es.mapSync(function(line){
// pause the readstream
s.pause();
//console.log(''reading line: ''+line);
returnTxt += line;
// resume the readstream, possibly from a callback
s.resume();
})
.on(''error'', function(){
console.log(''Error while reading file.'');
})
.on(''end'', function(){
console.log(''Read entire file.'');
callback(returnTxt);
})
);
};
FileReader.prototype.readJSON = function(pathToFile, callback){
try{
this.read(pathToFile, function(txt){callback(JSON.parse(txt));});
}
catch(err){
throw new Error(''json file is not valid! ''+err.stack);
}
};
Simplemente guarde el archivo como file-reader.js, y úselo de esta manera:
var FileReader = require(''./file-reader'');
var fileReader = new FileReader();
fileReader.readJSON(__dirname + ''/largeFile.json'', function(jsonObj){/*callback logic here*/});
Puede usar el paquete readline
incorporado, consulte los documentos here . Yo uso stream para crear una nueva corriente de salida.
var fs = require(''fs''),
readline = require(''readline''),
stream = require(''stream'');
var instream = fs.createReadStream(''/path/to/file'');
var outstream = new stream;
outstream.readable = true;
outstream.writable = true;
var rl = readline.createInterface({
input: instream,
output: outstream,
terminal: false
});
rl.on(''line'', function(line) {
console.log(line);
//Do your stuff ...
//Then write to outstream
rl.write(cubestuff);
});
Los archivos grandes tardarán un tiempo en procesarse. Dime si funciona.
Realmente me gustó la respuesta que en realidad merece ser la respuesta correcta aquí. Hice algunas mejoras:
- El código está en una clase (modular)
- El análisis está incluido
- La capacidad de reanudar se da al exterior en caso de que haya un trabajo asincrónico encadenado a leer el CSV, como insertar en DB, o una solicitud HTTP.
- Lectura en bloques / tamaños de lote que el usuario puede declarar. También me encargué de la codificación en la transmisión, en caso de que tengas archivos con codificación diferente.
Aquí está el código:
''use strict''
const fs = require(''fs''),
util = require(''util''),
stream = require(''stream''),
es = require(''event-stream''),
parse = require("csv-parse"),
iconv = require(''iconv-lite'');
class CSVReader {
constructor(filename, batchSize, columns) {
this.reader = fs.createReadStream(filename).pipe(iconv.decodeStream(''utf8''))
this.batchSize = batchSize || 1000
this.lineNumber = 0
this.data = []
this.parseOptions = {delimiter: ''/t'', columns: true, escape: ''/'', relax: true}
}
read(callback) {
this.reader
.pipe(es.split())
.pipe(es.mapSync(line => {
++this.lineNumber
parse(line, this.parseOptions, (err, d) => {
this.data.push(d[0])
})
if (this.lineNumber % this.batchSize === 0) {
callback(this.data)
}
})
.on(''error'', function(){
console.log(''Error while reading file.'')
})
.on(''end'', function(){
console.log(''Read entirefile.'')
}))
}
continue () {
this.data = []
this.reader.resume()
}
}
module.exports = CSVReader
Entonces, básicamente, así es como lo usarás:
let reader = CSVReader(''path_to_file.csv'')
reader.read(() => reader.continue())
Probé esto con un archivo CSV de 35GB y funcionó para mí y es por eso que elegí compilarlo en la respuesta de , las retroalimentaciones son bienvenidas.
Todavía tengo el mismo problema. Después de comparar varios módulos que parecen tener esta característica, decidí hacerlo yo mismo, es más simple de lo que pensaba.
esencia: https://gist.github.com/deemstone/8279565
var fetchBlock = lineByline(filepath, onEnd);
fetchBlock(function(lines, start){ ... }); //lines{array} start{int} lines[0] No.
fetchBlock()
el archivo abierto en un cierre, que fetchBlock()
devuelto obtendrá un bloque del archivo, final dividir en matriz (tratará el segmento de la última búsqueda).
Establecí el tamaño del bloque en 1024 para cada operación de lectura. Esto puede tener errores, pero la lógica del código es obvia, pruébalo tú mismo.
Usé https://www.npmjs.com/package/line-by-line para leer más de 1 000 000 líneas de un archivo de texto. En este caso, una capacidad ocupada de RAM era de aproximadamente 50-60 megabytes.
const LineByLineReader = require(''line-by-line''),
lr = new LineByLineReader(''big_file.txt'');
lr.on(''error'', function (err) {
// ''err'' contains error object
});
lr.on(''line'', function (line) {
// pause emitting of lines...
lr.pause();
// ...do your asynchronous line processing..
setTimeout(function () {
// ...and continue emitting lines.
lr.resume();
}, 100);
});
lr.on(''end'', function () {
// All lines are read, file is closed now.
});
node-byline usa streams, por lo que preferiría ese para tus enormes archivos.
para sus conversiones de fecha usaría moment.js .
para maximizar su rendimiento, podría pensar en utilizar un clúster de software. hay algunos bonitos módulos que envuelven bastante bien el módulo de clúster nativo del nodo. me gusta cluster-master de isaacs. por ejemplo, puedes crear un grupo de x trabajadores que calculen un archivo.
para la comparación de divisiones frente a las expresiones regulares, use benchmark.js . No lo he probado hasta ahora. benchmark.js está disponible como un nodo-módulo