javascript - remedios - ¿Cómo puedo controlar el flujo del programa usando eventos y promesas?
tratamiento para el flujo abundante (3)
Si sus llamadas a send()
se mezclan entre sí, debe guardarlas en caché. Para asegurarse de que el mensaje recibido coincida con el enviado, debe asignar una id
única para cada mensaje a la carga útil.
Entonces, el remitente de tu mensaje se verá así
class MyClass extends EventEmitter {
constructor() {
// [redacted]
this.messages = new Map();
}
handle(data) {
this.socket.on(''data'', data => {
this.messages.get(data.id)(data);
this.messages.delete(data.id);
});
}
send(data) {
return return new Promise((resolve, reject) => {
this.messages.set(data.id, resolve);
this.socket.write(data);
});
}
}
Este código no será sensible al orden de los mensajes y obtendrá la API que desee.
Tengo una clase así:
import net from ''net'';
import {EventEmitter} from ''events'';
import Promise from ''bluebird'';
class MyClass extends EventEmitter {
constructor(host = ''localhost'', port = 10011) {
super(EventEmitter);
this.host = host;
this.port = port;
this.socket = null;
this.connect();
}
connect() {
this.socket = net.connect(this.port, this.host);
this.socket.on(''connect'', this.handle.bind(this));
}
handle(data) {
this.socket.on(''data'', data => {
});
}
send(data) {
this.socket.write(data);
}
}
¿Cómo convertiría el método de send
en una promesa, que devuelve un valor del evento de data
del socket? El servidor solo envía datos cuando se le envían datos, que no sean un mensaje de conexión que se puede suprimir fácilmente.
He intentado algo como:
handle(data) {
this.socket.on(''data'', data => {
return this.socket.resolve(data);
});
this.socket.on(''error'', this.socket.reject.bind(this));
}
send(data) {
return new Promise((resolve, reject) => {
this.socket.resolve = resolve;
this.socket.reject = reject;
this.socket.write(data);
});
}
Obviamente, esto no funcionará porque resolve
/ reject
se sobreescribirá entre sí al encadenar y / o send
llamadas múltiples veces en paralelo.
También existe el problema de llamar a send
dos veces en paralelo y resolver cualquier respuesta que se reciba primero.
Actualmente tengo una implementación que usa una cola y difiere, pero se siente desordenada ya que la fila se revisa constantemente.
Me gustaría poder hacer lo siguiente:
let c = new MyClass(''localhost'', 10011);
c.send(''foo'').then(response => {
return c.send(''bar'', response.param);
//`response` should be the data returned from `this.socket.on(''data'')`.
}).then(response => {
console.log(response);
}).catch(error => console.log(error));
Solo para agregar, no tengo ningún control sobre los datos que se reciben, lo que significa que no se pueden modificar fuera de la transmisión.
Editar : Parece que esto es bastante imposible, debido a que TCP no tiene un flujo de solicitud-respuesta. ¿Cómo se puede implementar esto aún utilizando promesas, pero usando una cadena de promesa de una sola ejecución (una solicitud a la vez) o una cola?
Destilé el problema al mínimo y lo convertí en ejecutable del navegador:
- La clase Socket es burlada.
- Se eliminó la información sobre el puerto, el host y la herencia de
EventEmitter
.
La solución funciona agregando nuevas solicitudes a la cadena de promesas, pero permitiendo un máximo de solicitudes abiertas / no respondidas en un punto de tiempo determinado. .send
devuelve una nueva promesa cada vez que se llama y la clase se encarga de toda la sincronización interna. Por lo tanto, se puede llamar .send
varias veces y se garantiza el procesamiento ordenado correcto (FIFO) de las solicitudes. Una característica adicional que agregué es recortar la cadena de promesas, si no hay solicitudes pendientes.
Advertencia : omití el manejo de errores por completo, pero debe ajustarse a su caso de uso particular de todos modos.
class SocketMock {
constructor(){
this.connected = new Promise( (resolve, reject) => setTimeout(resolve,200) );
this.listeners = {
// ''error'' : [],
''data'' : []
}
}
send(data){
console.log(`SENDING DATA: ${data}`);
var response = `SERVER RESPONSE TO: ${data}`;
setTimeout( () => this.listeners[''data''].forEach(cb => cb(response)),
Math.random()*2000 + 250);
}
on(event, callback){
this.listeners[event].push(callback);
}
}
class SingleRequestCoordinator {
constructor() {
this._openRequests = 0;
this.socket = new SocketMock();
this._promiseChain = this.socket
.connected.then( () => console.log(''SOCKET CONNECTED''));
this.socket.on(''data'', (data) => {
this._openRequests -= 1;
console.log(this._openRequests);
if(this._openRequests === 0){
console.log(''NO PENDING REQUEST --- trimming the chain'');
this._promiseChain = this.socket.connected
}
this._deferred.resolve(data);
});
}
send(data) {
this._openRequests += 1;
this._promiseChain = this._promiseChain
.then(() => {
this._deferred = Promise.defer();
this.socket.send(data);
return this._deferred.promise;
});
return this._promiseChain;
}
}
var sender = new SingleRequestCoordinator();
sender.send(''data-1'').then(data => console.log(`GOT DATA FROM SERVER --- ${data}`));
sender.send(''data-2'').then(data => console.log(`GOT DATA FROM SERVER --- ${data}`));
sender.send(''data-3'').then(data => console.log(`GOT DATA FROM SERVER --- ${data}`));
setTimeout(() => sender.send(''data-4'')
.then(data => console.log(`GOT DATA FROM SERVER --- ${data}`)), 10000);
socket.write(data[, encoding][, callback])
toma una devolución de llamada. Puede rechazar o resolver en esta devolución de llamada.
class MyClass extends EventEmitter {
constructor(host = ''localhost'', port = 10011) {
super(EventEmitter);
this.host = host;
this.port = port;
this.socket = null;
this.requests = null;
this.connect();
}
connect() {
this.socket = net.connect(this.port, this.host);
this.socket.on(''connect'', () => {
this.requests = [];
this.socket.on(''data'', this.handle.bind(this));
this.socket.on(''error'', this.error.bind(this));
});
}
handle(data) {
var [request, resolve, reject] = this.requests.pop();
// I''m not sure what will happen with the destructuring if requests is empty
if(resolve) {
resolve(data);
}
}
error(error) {
var [request, resolve, reject] = this.requests.pop();
if(reject) {
reject(error);
}
}
send(data) {
return new Promise((resolve, reject) => {
if(this.requests === null) {
return reject(''Not connected'');
}
this.requests.push([data, resolve, reject]);
this.socket.write(data);
});
}
}
No probado y, por lo tanto, no estoy seguro sobre las firmas de métodos, pero esa es la idea básica.
Esto supone que habrá un solo handle
o evento de error
por cada solicitud realizada.
Cuanto más lo pienso, parece imposible sin información adicional en los datos de su aplicación, como los números de paquete para que coincida con una respuesta a una solicitud.
La forma en que se implementa ahora (y también la forma en que está en su pregunta), ni siquiera está seguro de que una respuesta coincida exactamente con un evento de handle
.