python - remove - query document mongodb
Fast or Bulk Upsert en pymongo (7)
¿Cómo puedo hacer un postre a granel en pymongo? Quiero actualizar un montón de entradas y hacerlas de una en una es muy lento.
La respuesta a una pregunta casi idéntica es aquí: Bulk update / upsert en MongoDB?
La respuesta aceptada en realidad no responde la pregunta. Simplemente da un enlace a la CLI de mongo para hacer importaciones / exportaciones.
También estaría abierto a que alguien me explique por qué no es posible / no es una mejor práctica realizar un aporte masivo, pero explique cuál es la solución preferida para este tipo de problema.
¡Gracias!
Actualización masiva más rápida con Python 3.5+, motor y asyncio:
import asyncio
import datetime
import logging
import random
import time
import motor.motor_asyncio
import pymongo.errors
async def execute_bulk(bulk):
try:
await bulk.execute()
except pymongo.errors.BulkWriteError as err:
logging.error(err.details)
async def main():
cnt = 0
bulk = db.initialize_unordered_bulk_op()
tasks = []
async for document in db.find({}, {}, no_cursor_timeout=True):
cnt += 1
bulk.find({''_id'': document[''_id'']}).update({''$set'': {"random": random.randint(0,10)}})
if not cnt % 1000:
task = asyncio.ensure_future(execute_bulk(bulk))
tasks.append(task)
bulk = db.initialize_unordered_bulk_op()
if cnt % 1000:
task = asyncio.ensure_future(bulk.execute(bulk))
tasks.append(task)
logging.info(''%s processed'', cnt)
await asyncio.gather(*tasks)
logging.basicConfig(level=''INFO'')
db = motor.motor_asyncio.AsyncIOMotorClient()[''database''][''collection'']
start_time = time.time()
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
execution_time = time.time() - start_time
logging.info(''Execution time: %s'', datetime.timedelta(seconds=execution_time))
La respuesta sigue siendo la misma: no hay soporte para envíos masivos.
Las versiones modernas de pymongo (mayor que 3.x) envuelven operaciones masivas en una interfaz consistente que baja la calificación donde la versión del servidor no admite operaciones masivas. Esto ahora es consistente en los controladores oficialmente compatibles con MongoDB.
Por lo tanto, el método preferido para la codificación es usar bulk_write()
lugar, donde utiliza una acción de operación UpdateOne
otra adecuada. Y ahora, por supuesto, es preferible utilizar las listas de idiomas naturales en lugar de un constructor específico
La traducción directa de la antigua documentación:
from pymongo import UpdateOne
operations = [
UpdateOne({ "field1": 1},{ "$push": { "vals": 1 } },upsert=True),
UpdateOne({ "field1": 1},{ "$push": { "vals": 2 } },upsert=True),
UpdateOne({ "field1": 1},{ "$push": { "vals": 3 } },upsert=True)
]
result = collection.bulk_write(operations)
O el ciclo clásico de transformación de documentos:
import random
from pymongo import UpdateOne
random.seed()
operations = []
for doc in collection.find():
# Set a random number on every document update
operations.append(
UpdateOne({ "_id": doc["_id"] },{ "$set": { "random": random.randint(0,10) } })
)
# Send once every 1000 in batch
if ( len(operations) == 1000 ):
collection.bulk_write(operations,ordered=False)
operations = []
if ( len(operations) > 0 ):
collection.bulk_write(operations,ordered=False)
El resultado devuelto es de BulkWriteResult
que contendrá los contadores de los documentos coincidentes y actualizados, así como los valores _id
devueltos para los "upserts" que se produzcan.
Existe un error de concepción sobre el tamaño de la matriz de operaciones masivas. La solicitud real tal como se envió al servidor no puede superar el límite de BSON de 16 MB, ya que ese límite también se aplica a la "solicitud" enviada al servidor que también utiliza el formato BSON.
Sin embargo, eso no gobierna el tamaño de la matriz de solicitud que puede compilar, ya que las operaciones reales solo se enviarán y procesarán en lotes de 1000 de todos modos. La única restricción real es que esas 1000 instrucciones de operación en sí mismas no crean realmente un documento BSON de más de 16 MB. Que de hecho es una orden bastante difícil.
El concepto general de los métodos masivos es "menos tráfico", como resultado de enviar muchas cosas a la vez y solo tratar con una respuesta del servidor. La reducción de esa sobrecarga adjunta a cada solicitud de actualización ahorra mucho tiempo.
MongoDB 2.6+ tiene soporte para operaciones masivas. Esto incluye inserciones en bloque, actualizaciones, actualizaciones, etc. El objetivo de esto es reducir / eliminar las demoras de la latencia de ida y vuelta de las operaciones de registro por registro (''documento por documento'' es correcto).
¿Entonces, cómo funciona esto? Ejemplo en Python, porque eso es en lo que estoy trabajando
>>> import pymongo
>>> pymongo.version
''2.7rc0''
Para usar esta característica, creamos un objeto ''masivo'', le agregamos documentos, luego ejecutamos y enviaremos todas las actualizaciones a la vez. Advertencias: El tamaño BSON de las operaciones recopiladas (suma de los bsonsizes) no puede sobrepasar el límite de tamaño de documento de 16 MB. Por supuesto, el número de operaciones puede variar significativamente, su millaje puede variar.
Ejemplo en Pymongo de la operación de inserción a granel:
import pymongo
conn = pymongo.MongoClient(''myserver'', 8839)
db = conn[''mydbname'']
coll = db.myCollection
bulkop = coll.initialize_ordered_bulk_op()
retval = bulkop.find({''field1'':1}).upsert().update({''$push'':{''vals'':1})
retval = bulkop.find({''field1'':1}).upsert().update({''$push'':{''vals'':2})
retval = bulkop.find({''field1'':1}).upsert().update({''$push'':{''vals'':3})
retval = bulkop.execute()
Este es el método esencial. Más información disponible en:
Puede actualizar todos los documentos que coincidan con su especificación de consulta utilizando multi = True.
Aquí hay un error acerca de hacer un lote de comandos de la manera que desee.
disponible solo en pymongo 2.7+
si tiene muchos datos, y desea usar "_id" para juzgar si existen datos,
puedes probar...
import pymongo
from pymongo import UpdateOne
client = pymongo.MongoClient(''localhost'', 27017)
db=client[''sampleDB'']
collectionInfo = db.sample
#sample data
datas=[
{"_id":123456,"name":"aaa","N":1,"comment":"first sample","lat":22,"lng":33},
{"_id":234567,"name":"aaa","N":1,"comment":"second sample","lat":22,"lng":33},
{"_id":345678,"name":"aaa","N":1,"comment":"xxx sample","lat":22,"lng":33},
{"_id":456789,"name":"aaa","N":1,"comment":"yyy sample","lat":22,"lng":33},
{"_id":123456,"name":"aaaaaaaaaaaaaaaaaa","N":1,"comment":"zzz sample","lat":22,"lng":33},
{"_id":11111111,"name":"aaa","N":1,"comment":"zzz sample","lat":22,"lng":33}
]
#you should split judge item and other data
ids=[data.pop("_id") for data in datas]
operations=[UpdateOne({"_id":idn},{''$set'':data},upsert=True) for idn ,data in zip(ids,datas)]
collectionInfo.bulk_write(operations)
Mi inglés es muy pobre, si no puedes entender lo que digo, lo siento