node.js - strong - Concatenar dos(o n) secuencias
pugjs comment (6)
2 secuencias:
Dadas las transmisiones legibles
stream1
ystream2
, ¿cuál es una forma idiomática (concisa) de concatenar una secuencia que contengastream1
ystream2
?No puedo hacer
stream1.pipe(outStream); stream2.pipe(outStream)
stream1.pipe(outStream); stream2.pipe(outStream)
, porque los contenidos de la secuencia se mezclan.n transmisiones:
Dado un EventEmitter que emite un número indeterminado de transmisiones, por ejemplo
eventEmitter.emit(''stream'', stream1) eventEmitter.emit(''stream'', stream2) eventEmitter.emit(''stream'', stream3) ... eventEmitter.emit(''end'')
¿Cuál es una forma idiomática (concisa) de obtener una transmisión con todas las secuencias concatenadas ?
Puede que sea más conciso, pero aquí hay uno que funciona:
var util = require(''util'');
var EventEmitter = require(''events'').EventEmitter;
function ConcatStream(streamStream) {
EventEmitter.call(this);
var isStreaming = false,
streamsEnded = false,
that = this;
var streams = [];
streamStream.on(''stream'', function(stream){
stream.pause();
streams.push(stream);
ensureState();
});
streamStream.on(''end'', function() {
streamsEnded = true;
ensureState();
});
var ensureState = function() {
if(isStreaming) return;
if(streams.length == 0) {
if(streamsEnded)
that.emit(''end'');
return;
}
isStreaming = true;
streams[0].on(''data'', onData);
streams[0].on(''end'', onEnd);
streams[0].resume();
};
var onData = function(data) {
that.emit(''data'', data);
};
var onEnd = function() {
isStreaming = false;
streams[0].removeAllListeners(''data'');
streams[0].removeAllListeners(''end'');
streams.shift();
ensureState();
};
}
util.inherits(ConcatStream, EventEmitter);
Realizamos un seguimiento del estado con streams
(la cola de transmisiones, push
hacia atrás y shift
desde la parte frontal), isStreaming
y streamsEnded
. Cuando recibimos un nuevo flujo, lo presionamos y, cuando termina un ciclo, dejamos de escuchar y lo cambiamos. Cuando termina la secuencia de transmisiones, establecemos streamsEnded
.
En cada uno de estos eventos, verificamos el estado en el que nos encontramos. Si ya estamos transmitiendo (canalizando una transmisión), no hacemos nada. Si la cola está vacía y se establece streamsEnded
, emitimos el evento end
. Si hay algo en la cola, lo reanudamos y escuchamos sus eventos.
* Tenga en cuenta que la pause
y el resume
son orientativos, por lo que algunas secuencias pueden no comportarse correctamente y requerirán almacenamiento en búfer. Este ejercicio se deja al lector.
Habiendo hecho todo esto, haría el caso n=2
construyendo un EventEmitter
, creando un ConcatStream
con él y emitiendo dos eventos de stream
seguidos por un evento end
. Estoy seguro de que se podría hacer de manera más concisa, pero podemos usar lo que tenemos.
El paquete de flujo combinado concatena transmisiones. Ejemplo del archivo README:
var CombinedStream = require(''combined-stream'');
var fs = require(''fs'');
var combinedStream = CombinedStream.create();
combinedStream.append(fs.createReadStream(''file1.txt''));
combinedStream.append(fs.createReadStream(''file2.txt''));
combinedStream.pipe(fs.createWriteStream(''combined.txt''));
Creo que debes agregar todas las transmisiones a la vez. Si la cola se ejecuta en blanco, el combinedStream
termina automáticamente. Ver el número 5 .
La biblioteca stream-stream es una alternativa que tiene un .end
explícito, pero es mucho menos popular y presumiblemente no tan bien probado. Utiliza la API streams2 del Nodo 0.10 (ver esta discusión ).
streamee.js es un conjunto de transformadores de flujo y compositores basados en el nodo 1.0+ streams e incluye un método de concatenación:
var stream1ThenStream2 = streamee.concatenate([stream1, stream2]);
https://github.com/joepie91/node-combined-stream2 es un reemplazo instantáneo compatible con Streams2 para el módulo de flujo combinado (que se describe arriba). Se envuelve automáticamente las transmisiones Streams1.
Código de ejemplo para combined-stream2:
var CombinedStream = require(''combined-stream2'');
var fs = require(''fs'');
var combinedStream = CombinedStream.create();
combinedStream.append(fs.createReadStream(''file1.txt''));
combinedStream.append(fs.createReadStream(''file2.txt''));
combinedStream.pipe(fs.createWriteStream(''combined.txt''));
esto se puede hacer con nodejs vainilla
import { PassThrough } from ''stream''
const merge = (...streams) => {
let pass = new PassThrough()
let waiting = streams.length
for (let stream of streams) {
pass = stream.pipe(pass, {end: false})
stream.once(''end'', () => --waiting === 0 && pass.emit(''end''))
}
return pass
}