javascript - tutorial - Implementando un flujo de transformación en buffer
stream nodejs (2)
Aquí está mi opinión sobre su problema.
La idea básica es crear un flujo de Transform
, que nos permitirá ejecutar su lógica de almacenamiento en búfer personalizada antes de enviar los datos en la salida del flujo:
var util = require(''util'')
var stream = require(''stream'')
var BufferStream = function (streamOptions) {
stream.Transform.call(this, streamOptions)
this.buffer = new Buffer('''')
}
util.inherits(BufferStream, stream.Transform)
BufferStream.prototype._transform = function (chunk, encoding, done) {
// custom buffering logic
// ie. add chunk to this.buffer, check buffer size, etc.
this.buffer = new Buffer(chunk)
this.push(chunk)
done()
}
Luego, debemos anular el método .pipe()
para que se nos notifique cuando BufferStream
se canaliza en una secuencia, lo que nos permite escribir datos en él automáticamente:
BufferStream.prototype.pipe = function (destination, options) {
var res = BufferStream.super_.prototype.pipe.call(this, destination, options)
res.write(this.buffer)
return res
}
De esta manera, cuando escribimos buffer.pipe(someStream)
, realizamos la tubería como se buffer.pipe(someStream)
y escribimos el buffer interno en el flujo de salida. Después de eso, la clase Transform
se encarga de todo, a la vez que realiza un seguimiento de la contrapresión y todo eso.
Aquí hay una esencia de trabajo . Tenga en cuenta que no me molesté en escribir una lógica de almacenamiento en búfer correcta (es decir, no me importa el tamaño del búfer interno), pero esto debería ser fácil de solucionar.
Estoy tratando de implementar un flujo con la nueva API de flujos de Node.js que almacenará una cierta cantidad de datos. Cuando este flujo se canaliza a otro flujo, o si algo consume eventos readable
, este flujo debe vaciar su búfer y simplemente convertirse en transferencia. El problema es que este flujo se canalizará a muchos otros flujos, y cuando se adjunta cada flujo de destino, el búfer debe vaciarse incluso si ya se ha descargado a otro flujo .
Por ejemplo:
-
BufferStream
implementastream.Transform
y mantiene un búfer de anillo interno de 512 KB -
ReadableStreamA
se canaliza a una instancia deBufferStream
-
BufferStream
escribe en su búfer de anillo, leyendo los datos deReadableStreamA
medida que entran. (No importa si los datos se pierden, ya que el búfer sobrescribe los datos antiguos). -
BufferStream
se canaliza aWritableStreamB
-
WritableStreamB
recibe el búfer completo de 512KB y continúa obteniendo datos a medida que se escriben desdeReadableStreamA
través deBufferStream
. -
BufferStream
se canaliza aWritableStreamC
-
WritableStreamC
también recibe el búfer completo de 512 KB, pero este búfer ahora es diferente de lo que recibióWritableStreamB
, porque desde entonces se han escrito más datos enBufferStream
.
¿Es esto posible con la API de flujos? El único método en el que puedo pensar sería crear un objeto con un método que haga girar una nueva secuencia PassThrough para cada destino, lo que significa que no podría simplemente canalizar hacia y desde ella.
Para lo que vale, he hecho esto con la antigua API "fluida" simplemente escuchando nuevos controladores en eventos de data
. Cuando se adjuntaba una nueva función con .on(''data'')
, la llamaría directamente con una copia del búfer de anillo.
La respuesta de Paul es buena, pero no creo que cumpla con los requisitos exactos. Parece que lo que debe suceder es que cada vez que se llama a pipe () en este flujo de transformación, primero debe vaciar el búfer que representa toda la acumulación de datos entre el momento en que se creó el flujo de transformación / (conectado al flujo de origen) y la hora en que se conectó a la secuencia actual de escritura / destino.
Algo como esto podría ser más correcto:
var BufferStream = function () {
stream.Transform.apply(this, arguments);
this.buffer = []; //I guess an array will do
};
util.inherits(BufferStream, stream.Transform);
BufferStream.prototype._transform = function (chunk, encoding, done) {
this.push(chunk ? String(chunk) : null);
this.buffer.push(chunk ? String(chunk) : null);
done()
};
BufferStream.prototype.pipe = function (destination, options) {
var res = BufferStream.super_.prototype.pipe.apply(this, arguments);
this.buffer.forEach(function (b) {
res.write(String(b));
});
return res;
};
return new BufferStream();
Supongo que esto:
BufferStream.super_.prototype.pipe.apply(this, arguments);
es equivalente a esto:
stream.Transform.prototype.pipe.apply(this, arguments);
Probablemente podría optimizar esto y usar algunas banderas cuando se llame a pipe / unpipe.