write tutorial streams read nodejs node example create javascript node.js stream

javascript - tutorial - Implementando un flujo de transformación en buffer



stream nodejs (2)

Aquí está mi opinión sobre su problema.

La idea básica es crear un flujo de Transform , que nos permitirá ejecutar su lógica de almacenamiento en búfer personalizada antes de enviar los datos en la salida del flujo:

var util = require(''util'') var stream = require(''stream'') var BufferStream = function (streamOptions) { stream.Transform.call(this, streamOptions) this.buffer = new Buffer('''') } util.inherits(BufferStream, stream.Transform) BufferStream.prototype._transform = function (chunk, encoding, done) { // custom buffering logic // ie. add chunk to this.buffer, check buffer size, etc. this.buffer = new Buffer(chunk) this.push(chunk) done() }

Luego, debemos anular el método .pipe() para que se nos notifique cuando BufferStream se canaliza en una secuencia, lo que nos permite escribir datos en él automáticamente:

BufferStream.prototype.pipe = function (destination, options) { var res = BufferStream.super_.prototype.pipe.call(this, destination, options) res.write(this.buffer) return res }

De esta manera, cuando escribimos buffer.pipe(someStream) , realizamos la tubería como se buffer.pipe(someStream) y escribimos el buffer interno en el flujo de salida. Después de eso, la clase Transform se encarga de todo, a la vez que realiza un seguimiento de la contrapresión y todo eso.

Aquí hay una esencia de trabajo . Tenga en cuenta que no me molesté en escribir una lógica de almacenamiento en búfer correcta (es decir, no me importa el tamaño del búfer interno), pero esto debería ser fácil de solucionar.

Estoy tratando de implementar un flujo con la nueva API de flujos de Node.js que almacenará una cierta cantidad de datos. Cuando este flujo se canaliza a otro flujo, o si algo consume eventos readable , este flujo debe vaciar su búfer y simplemente convertirse en transferencia. El problema es que este flujo se canalizará a muchos otros flujos, y cuando se adjunta cada flujo de destino, el búfer debe vaciarse incluso si ya se ha descargado a otro flujo .

Por ejemplo:

  1. BufferStream implementa stream.Transform y mantiene un búfer de anillo interno de 512 KB
  2. ReadableStreamA se canaliza a una instancia de BufferStream
  3. BufferStream escribe en su búfer de anillo, leyendo los datos de ReadableStreamA medida que entran. (No importa si los datos se pierden, ya que el búfer sobrescribe los datos antiguos).
  4. BufferStream se canaliza a WritableStreamB
  5. WritableStreamB recibe el búfer completo de 512KB y continúa obteniendo datos a medida que se escriben desde ReadableStreamA través de BufferStream .
  6. BufferStream se canaliza a WritableStreamC
  7. WritableStreamC también recibe el búfer completo de 512 KB, pero este búfer ahora es diferente de lo que recibió WritableStreamB , porque desde entonces se han escrito más datos en BufferStream .

¿Es esto posible con la API de flujos? El único método en el que puedo pensar sería crear un objeto con un método que haga girar una nueva secuencia PassThrough para cada destino, lo que significa que no podría simplemente canalizar hacia y desde ella.

Para lo que vale, he hecho esto con la antigua API "fluida" simplemente escuchando nuevos controladores en eventos de data . Cuando se adjuntaba una nueva función con .on(''data'') , la llamaría directamente con una copia del búfer de anillo.


La respuesta de Paul es buena, pero no creo que cumpla con los requisitos exactos. Parece que lo que debe suceder es que cada vez que se llama a pipe () en este flujo de transformación, primero debe vaciar el búfer que representa toda la acumulación de datos entre el momento en que se creó el flujo de transformación / (conectado al flujo de origen) y la hora en que se conectó a la secuencia actual de escritura / destino.

Algo como esto podría ser más correcto:

var BufferStream = function () { stream.Transform.apply(this, arguments); this.buffer = []; //I guess an array will do }; util.inherits(BufferStream, stream.Transform); BufferStream.prototype._transform = function (chunk, encoding, done) { this.push(chunk ? String(chunk) : null); this.buffer.push(chunk ? String(chunk) : null); done() }; BufferStream.prototype.pipe = function (destination, options) { var res = BufferStream.super_.prototype.pipe.apply(this, arguments); this.buffer.forEach(function (b) { res.write(String(b)); }); return res; }; return new BufferStream();

Supongo que esto:

BufferStream.super_.prototype.pipe.apply(this, arguments);

es equivalente a esto:

stream.Transform.prototype.pipe.apply(this, arguments);

Probablemente podría optimizar esto y usar algunas banderas cuando se llame a pipe / unpipe.