library - Creación de una secuencia Node.js a partir de dos secuencias canalizadas
npm (2)
Me gustaría combinar dos flujos de Node.js en uno, canalizándolos, si es posible. Estoy usando flujos de Transform .
En otras palabras, me gustaría que mi biblioteca devuelva myStream
para que la usen las personas. Por ejemplo podrían escribir:
process.stdin.pipe(myStream).pipe(process.stdout);
Y, internamente, estoy usando un vendorStream
terceros vendorStream
que funciona, conectado a mi propia lógica contenida en myInternalStream
. Entonces, lo que está arriba se traduciría a:
process.stdin.pipe(vendorStream).pipe(myInternalStream).pipe(process.stdout);
¿Puedo hacer algo así? He intentado var myStream = vendorStream.pipe(myInternalStream)
pero eso obviamente no funciona.
Para hacer una analogía con bash
, digamos que quiero escribir un programa que verifique si la letra h
está presente en la última línea de algún flujo ( tail -n 1 | grep h
), puedo crear un script de shell:
# myscript.sh
tail -n 1 | grep h
Y luego si la gente lo hace:
$ printf "abc/ndef/nghi" | . myscript.sh
Simplemente funciona.
Esto es lo que tengo hasta ahora:
// Combine a pipe of two streams into one stream
var util = require(''util'')
, Transform = require(''stream'').Transform;
var chunks1 = [];
var stream1 = new Transform();
var soFar = '''';
stream1._transform = function(chunk, encoding, done) {
chunks1.push(chunk.toString());
var pieces = (soFar + chunk).split(''/n'');
soFar = pieces.pop();
for (var i = 0; i < pieces.length; i++) {
var piece = pieces[i];
this.push(piece);
}
return done();
};
var chunks2 = [];
var count = 0;
var stream2 = new Transform();
stream2._transform = function(chunk, encoding, done) {
chunks2.push(chunk.toString());
count = count + 1;
this.push(count + '' '' + chunk.toString() + ''/n'');
done();
};
var stdin = process.stdin;
var stdout = process.stdout;
process.on(''exit'', function () {
console.error(''chunks1: '' + JSON.stringify(chunks1));
console.error(''chunks2: '' + JSON.stringify(chunks2));
});
process.stdout.on(''error'', process.exit);
// stdin.pipe(stream1).pipe(stream2).pipe(stdout);
// $ (printf "abc/nd"; sleep 1; printf "ef/nghi/n") | node streams-combine.js
// Outputs:
// 1 abc
// 2 def
// 3 ghi
// chunks1: ["abc/nd","ef/nghi/n"]
// chunks2: ["abc","def","ghi"]
// Best working solution I could find
var stream3 = function(src) {
return src.pipe(stream1).pipe(stream2);
};
stream3(stdin).pipe(stdout);
// $ (printf "abc/nd"; sleep 1; printf "ef/nghi/n") | node streams-combine.js
// Outputs:
// 1 abc
// 2 def
// 3 ghi
// chunks1: ["abc/nd","ef/nghi/n"]
// chunks2: ["abc","def","ghi"]
¿Es esto posible? Déjame saber si lo que estoy tratando de hacer no está claro.
¡Gracias!
Puedes estar atento a que se unpipe
algo a tu flujo y luego unpipe
y canalizarlo a los flujos que te interesan:
var PassThrough = require(''stream'').PassThrough;
var stream3 = new PassThrough();
// When a source stream is piped to us, undo that pipe, and save
// off the source stream piped into our internally managed streams.
stream3.on(''pipe'', function(source) {
source.unpipe(this);
this.transformStream = source.pipe(stream1).pipe(stream2);
});
// When we''re piped to another stream, instead pipe our internal
// transform stream to that destination.
stream3.pipe = function(destination, options) {
return this.transformStream.pipe(destination, options);
};
stdin.pipe(stream3).pipe(stdout);
Puede extraer esta funcionalidad en su propia clase de secuencia construible:
var util = require(''util'');
var PassThrough = require(''stream'').PassThrough;
var StreamCombiner = function() {
this.streams = Array.prototype.slice.apply(arguments);
this.on(''pipe'', function(source) {
source.unpipe(this);
for(i in this.streams) {
source = source.pipe(this.streams[i]);
}
this.transformStream = source;
});
};
util.inherits(StreamCombiner, PassThrough);
StreamCombiner.prototype.pipe = function(dest, options) {
return this.transformStream.pipe(dest, options);
};
var stream3 = new StreamCombiner(stream1, stream2);
stdin.pipe(stream3).pipe(stdout);
Una opción es tal vez utilizar multipipe que le permite encadenar múltiples transformaciones juntas, envueltas como un solo flujo de transformación:
// my-stream.js
var multipipe = require(''multipipe'');
module.exports = function createMyStream() {
return multipipe(vendorStream, myInternalStream);
};
Entonces puedes hacer:
var createMyStream = require(''./my-stream'');
var myStream = createMyStream();
process.stdin.pipe(myStream).pipe(process.stdout);