tag strong script comment node.js stream eventemitter

node.js - strong - Concatenar dos(o n) secuencias



pugjs comment (6)

  • 2 secuencias:

    Dadas las transmisiones legibles stream1 y stream2 , ¿cuál es una forma idiomática (concisa) de concatenar una secuencia que contenga stream1 y stream2 ?

    No puedo hacer stream1.pipe(outStream); stream2.pipe(outStream) stream1.pipe(outStream); stream2.pipe(outStream) , porque los contenidos de la secuencia se mezclan.

  • n transmisiones:

    Dado un EventEmitter que emite un número indeterminado de transmisiones, por ejemplo

    eventEmitter.emit(''stream'', stream1) eventEmitter.emit(''stream'', stream2) eventEmitter.emit(''stream'', stream3) ... eventEmitter.emit(''end'')

    ¿Cuál es una forma idiomática (concisa) de obtener una transmisión con todas las secuencias concatenadas ?


Puede que sea más conciso, pero aquí hay uno que funciona:

var util = require(''util''); var EventEmitter = require(''events'').EventEmitter; function ConcatStream(streamStream) { EventEmitter.call(this); var isStreaming = false, streamsEnded = false, that = this; var streams = []; streamStream.on(''stream'', function(stream){ stream.pause(); streams.push(stream); ensureState(); }); streamStream.on(''end'', function() { streamsEnded = true; ensureState(); }); var ensureState = function() { if(isStreaming) return; if(streams.length == 0) { if(streamsEnded) that.emit(''end''); return; } isStreaming = true; streams[0].on(''data'', onData); streams[0].on(''end'', onEnd); streams[0].resume(); }; var onData = function(data) { that.emit(''data'', data); }; var onEnd = function() { isStreaming = false; streams[0].removeAllListeners(''data''); streams[0].removeAllListeners(''end''); streams.shift(); ensureState(); }; } util.inherits(ConcatStream, EventEmitter);

Realizamos un seguimiento del estado con streams (la cola de transmisiones, push hacia atrás y shift desde la parte frontal), isStreaming y streamsEnded . Cuando recibimos un nuevo flujo, lo presionamos y, cuando termina un ciclo, dejamos de escuchar y lo cambiamos. Cuando termina la secuencia de transmisiones, establecemos streamsEnded .

En cada uno de estos eventos, verificamos el estado en el que nos encontramos. Si ya estamos transmitiendo (canalizando una transmisión), no hacemos nada. Si la cola está vacía y se establece streamsEnded , emitimos el evento end . Si hay algo en la cola, lo reanudamos y escuchamos sus eventos.

* Tenga en cuenta que la pause y el resume son orientativos, por lo que algunas secuencias pueden no comportarse correctamente y requerirán almacenamiento en búfer. Este ejercicio se deja al lector.

Habiendo hecho todo esto, haría el caso n=2 construyendo un EventEmitter , creando un ConcatStream con él y emitiendo dos eventos de stream seguidos por un evento end . Estoy seguro de que se podría hacer de manera más concisa, pero podemos usar lo que tenemos.


El paquete de flujo combinado concatena transmisiones. Ejemplo del archivo README:

var CombinedStream = require(''combined-stream''); var fs = require(''fs''); var combinedStream = CombinedStream.create(); combinedStream.append(fs.createReadStream(''file1.txt'')); combinedStream.append(fs.createReadStream(''file2.txt'')); combinedStream.pipe(fs.createWriteStream(''combined.txt''));

Creo que debes agregar todas las transmisiones a la vez. Si la cola se ejecuta en blanco, el combinedStream termina automáticamente. Ver el número 5 .

La biblioteca stream-stream es una alternativa que tiene un .end explícito, pero es mucho menos popular y presumiblemente no tan bien probado. Utiliza la API streams2 del Nodo 0.10 (ver esta discusión ).


streamee.js es un conjunto de transformadores de flujo y compositores basados ​​en el nodo 1.0+ streams e incluye un método de concatenación:

var stream1ThenStream2 = streamee.concatenate([stream1, stream2]);


https://github.com/joepie91/node-combined-stream2 es un reemplazo instantáneo compatible con Streams2 para el módulo de flujo combinado (que se describe arriba). Se envuelve automáticamente las transmisiones Streams1.

Código de ejemplo para combined-stream2:

var CombinedStream = require(''combined-stream2''); var fs = require(''fs''); var combinedStream = CombinedStream.create(); combinedStream.append(fs.createReadStream(''file1.txt'')); combinedStream.append(fs.createReadStream(''file2.txt'')); combinedStream.pipe(fs.createWriteStream(''combined.txt''));


esto se puede hacer con nodejs vainilla

import { PassThrough } from ''stream'' const merge = (...streams) => { let pass = new PassThrough() let waiting = streams.length for (let stream of streams) { pass = stream.pipe(pass, {end: false}) stream.once(''end'', () => --waiting === 0 && pass.emit(''end'')) } return pass }


¡Una operación de reducción simple debería estar bien en nodejs !

const {PassThrough} = require(''stream'') let joined = [s0, s1, s2, ...sN].reduce((pt, s, i, a) => { s.pipe(pt, {end: false}) s.once(''end'', () => a.every(s => s.ended) && pt.emit(''end'')) return pt }, new PassThrough())

Saludos;)