javascript - sagas - redux saga tutorial español
¿Cómo vincular los eventos de eventos emitidos a la saga redux? (5)
Estoy tratando de usar la saga redux para conectar eventos desde PouchDB a mi aplicación React.js , pero estoy luchando por descubrir cómo conectar los eventos emitidos desde PouchDB a mi Saga. Dado que el evento utiliza una función de devolución de llamada (y no puedo pasarle un generador), no puedo usar yield put()
dentro de la devolución de llamada, sino que produce errores extraños después de la compilación ES2015 (usando Webpack).
Así que esto es lo que estoy tratando de lograr, la parte que no funciona está dentro de replication.on(''change'' (info) => {})
.
function * startReplication (wrapper) {
while (yield take(DATABASE_SET_CONFIGURATION)) {
yield call(wrapper.connect.bind(wrapper))
// Returns a promise, or false.
let replication = wrapper.replicate()
if (replication) {
replication.on(''change'', (info) => {
yield put(replicationChange(info))
})
}
}
}
export default [ startReplication ]
Podemos usar eventChannel
de redux-saga
Aquí está mi ejemplo
// fetch history messages
function* watchMessageEventChannel(client) {
const chan = eventChannel(emitter => {
client.on(''message'', (message) => emitter(message));
return () => {
client.close().then(() => console.log(''logout''));
};
});
while (true) {
const message = yield take(chan);
yield put(receiveMessage(message));
}
}
function* fetchMessageHistory(action) {
const client = yield realtime.createIMClient(''demo_uuid'');
// listen message event
yield fork(watchMessageEventChannel, client);
}
Tenga en cuenta :
los mensajes en un eventChannel no están almacenados de forma predeterminada. Si desea procesar el message event
solo uno por uno, no puede usar la llamada de bloqueo después de const message = yield take(chan);
O bien, debe proporcionar un búfer a la fábrica de EventChannel para especificar la estrategia de búfer para el canal (por ejemplo, eventChannel (subscriber, buffer)). Ver los documentos de la API redux-saga para más información
Como lo explicó Nirrek, cuando necesite conectarse a fuentes de datos push, tendrá que crear un iterador de eventos para esa fuente.
Me gustaría agregar que el mecanismo anterior podría volverse reutilizable. Entonces no tenemos que volver a crear un iterador de evento para cada fuente diferente.
La solución es crear un canal genérico con métodos de put
y take
. Puede llamar al método take
desde dentro del generador y conectar el método put
a la interfaz del oyente de su fuente de datos.
Aquí hay una posible implementación. Tenga en cuenta que el canal almacena temporalmente los mensajes si nadie los está esperando (p. Ej., El generador está ocupado haciendo alguna llamada remota)
function createChannel () {
const messageQueue = []
const resolveQueue = []
function put (msg) {
// anyone waiting for a message ?
if (resolveQueue.length) {
// deliver the message to the oldest one waiting (First In First Out)
const nextResolve = resolveQueue.shift()
nextResolve(msg)
} else {
// no one is waiting ? queue the event
messageQueue.push(msg)
}
}
// returns a Promise resolved with the next message
function take () {
// do we have queued messages ?
if (messageQueue.length) {
// deliver the oldest queued message
return Promise.resolve(messageQueue.shift())
} else {
// no queued messages ? queue the taker until a message arrives
return new Promise((resolve) => resolveQueue.push(resolve))
}
}
return {
take,
put
}
}
Entonces, el canal anterior se puede usar cada vez que desee escuchar una fuente de datos de inserción externa. Para tu ejemplo
function createChangeChannel (replication) {
const channel = createChannel()
// every change event will call put on the channel
replication.on(''change'', channel.put)
return channel
}
function * startReplication (getState) {
// Wait for the configuration to be set. This can happen multiple
// times during the life cycle, for example when the user wants to
// switch database/workspace.
while (yield take(DATABASE_SET_CONFIGURATION)) {
let state = getState()
let wrapper = state.database.wrapper
// Wait for a connection to work.
yield apply(wrapper, wrapper.connect)
// Trigger replication, and keep the promise.
let replication = wrapper.replicate()
if (replication) {
yield call(monitorChangeEvents, createChangeChannel(replication))
}
}
}
function * monitorChangeEvents (channel) {
while (true) {
const info = yield call(channel.take) // Blocks until the promise resolves
yield put(databaseActions.replicationChange(info))
}
}
Gracias a @Yassine Elouafi
Creé la implementación corta de canales generales con licencia del MIT como extensión redux-saga para lenguaje TypeScript, basada en la solución de @Yassine Elouafi.
// redux-saga/channels.ts
import { Saga } from ''redux-saga'';
import { call, fork } from ''redux-saga/effects'';
export interface IChannel<TMessage> {
take(): Promise<TMessage>;
put(message: TMessage): void;
}
export function* takeEvery<TMessage>(channel: IChannel<TMessage>, saga: Saga) {
while (true) {
const message: TMessage = yield call(channel.take);
yield fork(saga, message);
}
}
export function createChannel<TMessage>(): IChannel<TMessage> {
const messageQueue: TMessage[] = [];
const resolveQueue: ((message: TMessage) => void)[] = [];
function put(message: TMessage): void {
if (resolveQueue.length) {
const nextResolve = resolveQueue.shift();
nextResolve(message);
} else {
messageQueue.push(message);
}
}
function take(): Promise<TMessage> {
if (messageQueue.length) {
return Promise.resolve(messageQueue.shift());
} else {
return new Promise((resolve: (message: TMessage) => void) => resolveQueue.push(resolve));
}
}
return {
take,
put
};
}
Y el uso de ejemplo similar a redux-saga * takeEvery construction
// example-socket-action-binding.ts
import { put } from ''redux-saga/effects'';
import {
createChannel,
takeEvery as takeEveryChannelMessage
} from ''./redux-saga/channels'';
export function* socketBindActions(
socket: SocketIOClient.Socket
) {
const socketChannel = createSocketChannel(socket);
yield* takeEveryChannelMessage(socketChannel, function* (action: IAction) {
yield put(action);
});
}
function createSocketChannel(socket: SocketIOClient.Socket) {
const socketChannel = createChannel<IAction>();
socket.on(''action'', (action: IAction) => socketChannel.put(action));
return socketChannel;
}
También tuve el mismo problema usando PouchDB y encontré las respuestas proporcionadas extremadamente útiles e interesantes. Sin embargo, hay muchas maneras de hacer lo mismo en PouchDB y busqué un poco y encontré un enfoque diferente que quizás sea más fácil de razonar.
Si no adjunta los oyentes a la solicitud db.change
, devuelve los datos de cambio directamente a la persona que llama y agrega " continuous: true
a la opción hará que emita una "longpoll" y no la devuelva hasta que se haya producido algún cambio. Por lo tanto, se puede lograr el mismo resultado con la siguiente
export function * monitorDbChanges() {
var info = yield call([db, db.info]); // get reference to last change
let lastSeq = info.update_seq;
while(true){
try{
var changes = yield call([db, db.changes], { since: lastSeq, continuous: true, include_docs: true, heartbeat: 20000 });
if (changes){
for(let i = 0; i < changes.results.length; i++){
yield put({type: ''CHANGED_DOC'', doc: changes.results[i].doc});
}
lastSeq = changes.last_seq;
}
}catch (error){
yield put({type: ''monitor-changes-error'', err: error})
}
}
}
Hay una cosa que no he llegado al fondo. Si reemplazo el ciclo for
por change.results.forEach((change)=>{...})
entonces obtengo un error de sintaxis no válido en el yield
. Supongo que tiene algo que ver con algún choque en el uso de iteradores.
El problema fundamental que tenemos que resolver es que los emisores de eventos están ''basados en el empuje'', mientras que las sagas están ''basadas en el tirón''.
Si se suscribe a un evento como el siguiente: replication.on(''change'', (info) => {})
, entonces la devolución de llamada se ejecuta cada vez que el emisor del evento de replication
decide presionar un nuevo valor.
Con sagas, tenemos que darle la vuelta al control. Es la saga la que debe tener el control cuando decide responder a la disponibilidad de nueva información de cambio. Dicho de otra manera, una saga necesita extraer la nueva información.
A continuación se muestra un ejemplo de una forma de lograr esto:
function* startReplication(wrapper) {
while (yield take(DATABASE_SET_CONFIGURATION)) {
yield apply(wrapper, wrapper.connect);
let replication = wrapper.replicate()
if (replication)
yield call(monitorChangeEvents, replication);
}
}
function* monitorChangeEvents(replication) {
const stream = createReadableStreamOfChanges(replication);
while (true) {
const info = yield stream.read(); // Blocks until the promise resolves
yield put(replicationChange(info));
}
}
// Returns a stream object that has read() method we can use to read new info.
// The read() method returns a Promise that will be resolved when info from a
// change event becomes available. This is what allows us to shift from working
// with a ''push-based'' model to a ''pull-based'' model.
function createReadableStreamOfChanges(replication) {
let deferred;
replication.on(''change'', info => {
if (!deferred) return;
deferred.resolve(info);
deferred = null;
});
return {
read() {
if (deferred)
return deferred.promise;
deferred = {};
deferred.promise = new Promise(resolve => deferred.resolve = resolve);
return deferred.promise;
}
};
}
Hay un JSbin del ejemplo anterior aquí: http://jsbin.com/cujudes/edit?js,console
También debería echar un vistazo a la respuesta de Yassine Elouafi a una pregunta similar: ¿Puedo usar los generadores es6 de redux-saga como un oyente de mensajes para websockets o fuentes de eventos?