arithmetic - mongodb date operators
¿Cómo escuchar los cambios en una colección MongoDB? (9)
Mira esto: Cambiar las transmisiones
10 de enero de 2018 - Versión 3.6
* EDIT: escribí un artículo sobre cómo hacer esto https://medium.com/riow/mongodb-data-collection-change-85b63d96ff76
https://docs.mongodb.com/v3.6/changeStreams/
Es nuevo en mongodb 3.6 https://docs.mongodb.com/manual/release-notes/3.6/ 2018/01/10
$ mongod --version
db version v3.6.2
Para usar changeStreams, la base de datos debe ser un conjunto de replicación
Más acerca de los conjuntos de replicación: https://docs.mongodb.com/manual/replication/
Su base de datos será un " Standalone " por defecto.
Cómo convertir un Standalone a un conjunto de réplicas: https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set/
El siguiente ejemplo es una aplicación práctica de cómo puede usar esto.
* Específicamente para Nodo.
/* file.js */
''use strict''
module.exports = function (
app,
io,
User // Collection Name
) {
// SET WATCH ON COLLECTION
const changeStream = User.watch();
// Socket Connection
io.on(''connection'', function (socket) {
console.log(''Connection!'');
// USERS - Change
changeStream.on(''change'', function(change) {
console.log(''COLLECTION CHANGED'');
User.find({}, (err, data) => {
if (err) throw err;
if (data) {
// RESEND ALL USERS
socket.emit(''users'', data);
}
});
});
});
};
/* END - file.js */
Enlaces útiles:
https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set
https://docs.mongodb.com/manual/tutorial/change-streams-example
https://docs.mongodb.com/v3.6/tutorial/change-streams-example
http://plusnconsulting.com/post/MongoDB-Change-Streams
Estoy creando una especie de sistema de cola de trabajos en segundo plano con MongoDB como almacén de datos. ¿Cómo puedo "escuchar" insertos en una colección MongoDB antes de generar trabajadores para procesar el trabajo? ¿Tengo que sondear cada pocos segundos para ver si hay algún cambio desde la última vez, o hay alguna manera en que mi script pueda esperar que ocurran las inserciones? Este es un proyecto de PHP en el que estoy trabajando, pero no dude en responder en Ruby o en un idioma independiente.
Alternativamente, puede usar el método Mongo FindAndUpdate estándar, y dentro de la devolución de llamada, desencadenar un evento EventEmitter (en Node) cuando se ejecuta la devolución de llamada.
Cualquier otra parte de la aplicación o arquitectura que escuche este evento será notificada de la actualización y cualquier información relevante enviada allí también. Esta es una forma muy simple de lograr notificaciones de Mongo.
Desde MongoDB 3.6 habrá una nueva API de notificaciones llamada Change Streams que puede usar para esto. Vea esta publicación en el blog para ver un ejemplo . Ejemplo de eso:
cursor = client.my_db.my_collection.changes([
{''$match'': {
''operationType'': {''$in'': [''insert'', ''replace'']}
}},
{''$match'': {
''newDocument.n'': {''$gte'': 1}
}}
])
# Loops forever.
for change in cursor:
print(change[''newDocument''])
En realidad, en lugar de ver resultados, ¿por qué no se avisa cuando se inserta algo nuevo utilizando middleware provisto por el esquema de mangosta?
Puede ver el evento de insertar un nuevo documento y hacer algo después de esta inserción
Hay un ejemplo de Java que se puede encontrar here .
MongoClient mongoClient = new MongoClient();
DBCollection coll = mongoClient.getDatabase("local").getCollection("oplog.rs");
DBCursor cur = coll.find().sort(BasicDBObjectBuilder.start("$natural", 1).get())
.addOption(Bytes.QUERYOPTION_TAILABLE | Bytes.QUERYOPTION_AWAITDATA);
System.out.println("== open cursor ==");
Runnable task = () -> {
System.out.println("/tWaiting for events");
while (cur.hasNext()) {
DBObject obj = cur.next();
System.out.println( obj );
}
};
new Thread(task).start();
La clave es OPCIONES DE CONSULTA dadas aquí.
También puede cambiar la consulta de búsqueda, si no necesita cargar todos los datos cada vez.
BasicDBObject query= new BasicDBObject();
query.put("ts", new BasicDBObject("$gt", new BsonTimestamp(1471952088, 1))); //timestamp is within some range
query.put("op", "i"); //Only insert operation
DBCursor cur = coll.find(query).sort(BasicDBObjectBuilder.start("$natural", 1).get())
.addOption(Bytes.QUERYOPTION_TAILABLE | Bytes.QUERYOPTION_AWAITDATA);
La versión 3.6 de MongoDB ahora incluye flujos de cambios, que es esencialmente una API en la parte superior del OpLog que permite casos de uso similares a los de activación / notificación.
Aquí hay un enlace a un ejemplo de Java: http://mongodb.github.io/mongo-java-driver/3.6/driver/tutorials/change-streams/
Un ejemplo de NodeJS podría ser algo así como:
var MongoClient = require(''mongodb'').MongoClient;
MongoClient.connect("mongodb://localhost:22000/MyStore?readConcern=majority")
.then(function(client){
let db = client.db(''MyStore'')
let change_streams = db.collection(''products'').watch()
change_streams.on(''change'', function(change){
console.log(JSON.stringify(change));
});
});
Lo que estás pensando suena como disparadores. MongoDB no tiene ningún soporte para desencadenantes, sin embargo, algunas personas han "hecho las suyas" utilizando algunos trucos. La clave aquí es el oplog.
Cuando ejecuta MongoDB en un conjunto de réplicas, todas las acciones de MongoDB se registran en un registro de operaciones (conocido como oplog). El oplog es básicamente una lista de las modificaciones realizadas en los datos. Réplicas Establece la función escuchando los cambios en este oplog y luego aplicando los cambios localmente.
¿Suena familiar?
Aquí no puedo detallar todo el proceso, hay varias páginas de documentación, pero las herramientas que necesita están disponibles.
Primero algunos comentarios sobre el oplog - Breve descripción - Diseño de la colección local
(que contiene el oplog)
También querrás aprovechar los cursores disponibles . Estos le proporcionarán una forma de escuchar los cambios en lugar de sondearlos. Tenga en cuenta que la replicación utiliza cursores disponibles, por lo que esta es una función admitida.
MongoDB tiene lo que se denomina capped collections
y tailable cursors
que permiten a MongoDB enviar datos a los oyentes.
Una capped collection
es esencialmente una colección que tiene un tamaño fijo y solo permite inserciones. Esto es lo que se vería para crear uno:
db.createCollection("messages", { capped: true, size: 100000000 })
Cursores disponibles de MongoDB ( publicación original de Jonathan H. Wage )
Rubí
coll = db.collection(''my_collection'')
cursor = Mongo::Cursor.new(coll, :tailable => true)
loop do
if doc = cursor.next_document
puts doc
else
sleep 1
end
end
PHP
$mongo = new Mongo();
$db = $mongo->selectDB(''my_db'')
$coll = $db->selectCollection(''my_collection'');
$cursor = $coll->find()->tailable(true);
while (true) {
if ($cursor->hasNext()) {
$doc = $cursor->getNext();
print_r($doc);
} else {
sleep(1);
}
}
Python (por Robert Stewart)
from pymongo import Connection
import time
db = Connection().my_db
coll = db.my_collection
cursor = coll.find(tailable=True)
while cursor.alive:
try:
doc = cursor.next()
print doc
except StopIteration:
time.sleep(1)
Perl (por Max )
use 5.010;
use strict;
use warnings;
use MongoDB;
my $db = MongoDB::Connection->new;
my $coll = $db->my_db->my_collection;
my $cursor = $coll->find->tailable(1);
for (;;)
{
if (defined(my $doc = $cursor->next))
{
say $doc;
}
else
{
sleep 1;
}
}
Recursos adicionales:
Un artículo que habla de cursores disponibles con más detalle.
Ejemplos de PHP, Ruby, Python y Perl del uso de cursores disponibles.
Muchas de estas respuestas solo le darán nuevos registros y no actualizaciones y / o son extremadamente ineficaces.
La única manera confiable y efectiva de hacerlo es crear un cursor de disponibilidad en la colección local db: oplog.rs para obtener TODOS los cambios en MongoDB y hacer con él lo que usted desee. (¡MongoDB incluso hace esto más o menos para soportar la replicación!)
Explicación de lo que contiene el oplog: https://www.compose.com/articles/the-mongodb-oplog-and-node-js/
Ejemplo de una biblioteca Node.js que proporciona una API sobre lo que está disponible para hacer con el oplog: https://github.com/cayasso/mongo-oplog