python - create - sqlalchemy insert
InserciĆ³n masiva con SQLAlchemy ORM (10)
Esta es una forma:
values = [1, 2, 3]
Foo.__table__.insert().execute([{''bar'': x} for x in values])
Esto se insertará así:
INSERT INTO `foo` (`bar`) VALUES (1), (2), (3)
Referencia: las FAQ SQLAlchemy incluyen puntos de referencia para varios métodos de confirmación.
¿Hay alguna manera de hacer que SQLAlchemy haga una inserción masiva en lugar de insertar cada objeto individualmente? es decir,
obra:
INSERT INTO `foo` (`bar`) VALUES (1), (2), (3)
más bien que:
INSERT INTO `foo` (`bar`) VALUES (1)
INSERT INTO `foo` (`bar`) VALUES (2)
INSERT INTO `foo` (`bar`) VALUES (3)
Acabo de convertir un código para usar sqlalchemy en lugar de raw sql y, aunque ahora es mucho más agradable trabajar con él, ahora parece ser más lento (hasta un factor de 10), me pregunto si esta es la razón.
Puede ser que podría mejorar la situación usando sesiones de manera más eficiente. En este momento tengo autoCommit=False
y hago una session.commit()
después de haber agregado algunas cosas. Aunque esto parece causar que los datos se vuelvan obsoletos si el DB se cambia en otro lugar, como si incluso si hiciera una nueva consulta, ¿todavía obtuviera resultados viejos?
¡Gracias por tu ayuda!
Hasta donde yo sé, no hay forma de que el ORM emita inserciones en bloque. Creo que la razón subyacente es que SQLAlchemy necesita realizar un seguimiento de la identidad de cada objeto (es decir, nuevas claves primarias), y las inserciones masivas interfieren con eso. Por ejemplo, suponiendo que su tabla foo
contiene una columna de id
y está asignada a una clase Foo
:
x = Foo(bar=1)
print x.id
# None
session.add(x)
session.flush()
# BEGIN
# INSERT INTO foo (bar) VALUES(1)
# COMMIT
print x.id
# 1
Como SQLAlchemy recogió el valor de x.id
sin emitir otra consulta, podemos inferir que obtuvo el valor directamente de la INSERT
. Si no necesita acceso posterior a los objetos creados a través de las mismas instancias, puede omitir la capa ORM para su inserción:
Foo.__table__.insert().execute([{''bar'': 1}, {''bar'': 2}, {''bar'': 3}])
# INSERT INTO foo (bar) VALUES ((1,), (2,), (3,))
SQLAlchemy no puede hacer coincidir estas filas nuevas con ningún objeto existente, por lo que tendrá que consultarlas de nuevo para cualquier operación posterior.
En lo que respecta a los datos obsoletos, es útil recordar que la sesión no tiene una forma integrada de saber cuándo se cambia la base de datos fuera de la sesión. Para acceder a datos modificados externamente a través de instancias existentes, las instancias se deben marcar como caducadas . Esto ocurre de manera predeterminada en session.commit()
, pero se puede hacer manualmente llamando a session.expire_all()
o session.expire(instance)
. Un ejemplo (SQL omitido):
x = Foo(bar=1)
session.add(x)
session.commit()
print x.bar
# 1
foo.update().execute(bar=42)
print x.bar
# 1
session.expire(x)
print x.bar
# 42
session.commit()
expira x
, por lo que la primera declaración de impresión abre implícitamente una nueva transacción y vuelve a consultar los atributos de x
. Si comenta la primera declaración impresa, notará que la segunda ahora recoge el valor correcto, porque la nueva consulta no se emite hasta después de la actualización.
Esto tiene sentido desde el punto de vista del aislamiento transaccional: solo debe elegir modificaciones externas entre transacciones. Si esto te está causando problemas, te sugiero que clarifiques o re-pienses los límites de las transacciones de tu aplicación en lugar de llegar inmediatamente a session.expire_all()
.
La mejor respuesta que encontré hasta ahora fue en la documentación de sqlalchemy:
Hay un ejemplo completo de un punto de referencia de posibles soluciones.
Como se muestra en la documentación:
bulk_save_objects no es la mejor solución, pero su rendimiento es correcto.
La segunda mejor implementación en términos de legibilidad, creo que fue con el Core de SQLAlchemy:
def test_sqlalchemy_core(n=100000):
init_sqlalchemy()
t0 = time.time()
engine.execute(
Customer.__table__.insert(),
[{"name": ''NAME '' + str(i)} for i in xrange(n)]
)
El contexto de esta función se proporciona en el artículo de documentación.
La respuesta de Piere es correcta, pero un problema es que bulk_save_objects
por defecto no devuelve las claves primarias de los objetos, si eso le preocupa. Establezca return_defaults
en True
para obtener este comportamiento.
La documentación está here .
foos = [Foo(bar=''a'',), Foo(bar=''b''), Foo(bar=''c'')]
session.bulk_save_objects(foos, return_defaults=True)
for foo in foos:
assert foo.id is not None
session.commit()
Los documentos de sqlalchemy tienen una gran descripción sobre el rendimiento de varias técnicas que se pueden utilizar para inserciones masivas:
Los ORM básicamente no están destinados a inserciones masivas de alto rendimiento: esta es la razón por la cual SQLAlchemy ofrece el Core además del ORM como un componente de primera clase.
Para el caso de uso de inserciones masivas rápidas, el sistema de generación y ejecución de SQL sobre el que el ORM construye es parte del Núcleo. Usando este sistema directamente, podemos producir un INSERT que sea competitivo con el uso de la API de base de datos sin procesar directamente.
Alternativamente, SQLAlchemy ORM ofrece el conjunto de métodos Bulk Operations, que proporciona enlaces a las subsecciones del proceso de unidad de trabajo para emitir construcciones INSERT y UPDATE a nivel Core con un pequeño grado de automatización basada en ORM.
El siguiente ejemplo ilustra las pruebas basadas en el tiempo para varios métodos diferentes de insertar filas, yendo del más automatizado al menos. Con cPython 2.7, los tiempos de ejecución observaron:
classics-MacBook-Pro:sqlalchemy classic$ python test.py SQLAlchemy ORM: Total time for 100000 records 12.0471920967 secs SQLAlchemy ORM pk given: Total time for 100000 records 7.06283402443 secs SQLAlchemy ORM bulk_save_objects(): Total time for 100000 records 0.856323003769 secs SQLAlchemy Core: Total time for 100000 records 0.485800027847 secs sqlite3: Total time for 100000 records 0.487842082977 sec
Guión:
import time import sqlite3 from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, create_engine from sqlalchemy.orm import scoped_session, sessionmaker Base = declarative_base() DBSession = scoped_session(sessionmaker()) engine = None class Customer(Base): __tablename__ = "customer" id = Column(Integer, primary_key=True) name = Column(String(255)) def init_sqlalchemy(dbname=''sqlite:///sqlalchemy.db''): global engine engine = create_engine(dbname, echo=False) DBSession.remove() DBSession.configure(bind=engine, autoflush=False, expire_on_commit=False) Base.metadata.drop_all(engine) Base.metadata.create_all(engine) def test_sqlalchemy_orm(n=100000): init_sqlalchemy() t0 = time.time() for i in xrange(n): customer = Customer() customer.name = ''NAME '' + str(i) DBSession.add(customer) if i % 1000 == 0: DBSession.flush() DBSession.commit() print( "SQLAlchemy ORM: Total time for " + str(n) + " records " + str(time.time() - t0) + " secs") def test_sqlalchemy_orm_pk_given(n=100000): init_sqlalchemy() t0 = time.time() for i in xrange(n): customer = Customer(id=i+1, name="NAME " + str(i)) DBSession.add(customer) if i % 1000 == 0: DBSession.flush() DBSession.commit() print( "SQLAlchemy ORM pk given: Total time for " + str(n) + " records " + str(time.time() - t0) + " secs") def test_sqlalchemy_orm_bulk_insert(n=100000): init_sqlalchemy() t0 = time.time() n1 = n while n1 > 0: n1 = n1 - 10000 DBSession.bulk_insert_mappings( Customer, [ dict(name="NAME " + str(i)) for i in xrange(min(10000, n1)) ] ) DBSession.commit() print( "SQLAlchemy ORM bulk_save_objects(): Total time for " + str(n) + " records " + str(time.time() - t0) + " secs") def test_sqlalchemy_core(n=100000): init_sqlalchemy() t0 = time.time() engine.execute( Customer.__table__.insert(), [{"name": ''NAME '' + str(i)} for i in xrange(n)] ) print( "SQLAlchemy Core: Total time for " + str(n) + " records " + str(time.time() - t0) + " secs") def init_sqlite3(dbname): conn = sqlite3.connect(dbname) c = conn.cursor() c.execute("DROP TABLE IF EXISTS customer") c.execute( "CREATE TABLE customer (id INTEGER NOT NULL, " "name VARCHAR(255), PRIMARY KEY(id))") conn.commit() return conn def test_sqlite3(n=100000, dbname=''sqlite3.db''): conn = init_sqlite3(dbname) c = conn.cursor() t0 = time.time() for i in xrange(n): row = (''NAME '' + str(i),) c.execute("INSERT INTO customer (name) VALUES (?)", row) conn.commit() print( "sqlite3: Total time for " + str(n) + " records " + str(time.time() - t0) + " sec") if __name__ == ''__main__'': test_sqlalchemy_orm(100000) test_sqlalchemy_orm_pk_given(100000) test_sqlalchemy_orm_bulk_insert(100000) test_sqlalchemy_core(100000) test_sqlite3(100000)
Normalmente lo hago usando add_all
.
from app import session
from models import User
objects = [User(name="u1"), User(name="u2"), User(name="u3")]
session.add_all(objects)
session.commit()
SQLAlchemy introdujo eso en la versión 1.0.0
:
Operaciones a granel: documentos de SQLAlchemy
Con estas operaciones, ¡ahora puede hacer inserciones o actualizaciones masivas!
Por ejemplo (si desea la sobrecarga más baja para la tabla simple INSERT), puede usar Session.bulk_insert_mappings()
:
loadme = [
(1, ''a'')
, (2, ''b'')
, (3, ''c'')
]
dicts = []
for i in range(len(loadme)):
dicts.append(dict(bar=loadme[i][0], fly=loadme[i][1]))
s = Session()
s.bulk_insert_mappings(Foo, dicts)
s.commit()
O, si lo desea, omita las tuplas loadme
y escriba los diccionarios directamente en dicts
(pero me resulta más fácil dejar todo el palabreo fuera de los datos y cargar una lista de diccionarios en un bucle).
SQLAlchemy introdujo eso en la versión 1.0.0
:
Operaciones a granel: documentos de SQLAlchemy
Con estas operaciones, ¡ahora puede hacer inserciones o actualizaciones masivas!
Por ejemplo, puedes hacer:
s = Session()
objects = [
User(name="u1"),
User(name="u2"),
User(name="u3")
]
s.bulk_save_objects(objects)
s.commit()
Aquí, se realizará una inserción masiva.
Se agregó soporte directo a SQLAlchemy a partir de la versión 0.8
De acuerdo con los docs , connection.execute(table.insert().values(data))
debería hacer el truco. (Tenga en cuenta que esto no es lo mismo que connection.execute(table.insert(), data)
que da como resultado muchas inserciones de fila individuales a través de una llamada para executemany
). En cualquier cosa que no sea una conexión local, la diferencia en el rendimiento puede ser enorme.
Todos los caminos conducen a Roma , pero algunos de ellos cruzan montañas, requieren transbordadores, pero si quiere llegar rápidamente, tome la autopista.
En este caso, la autopista utilizará la función execute_batch() de psycopg2 . La documentación dice que es la mejor:
La implementación actual de executemany()
es (utilizando una subestimación extremadamente caritativa) que no funciona particularmente. Estas funciones se pueden usar para acelerar la ejecución repetida de una declaración contra un conjunto de parámetros. Al reducir el número de viajes de ida y vuelta del servidor, el rendimiento puede ser de órdenes de magnitud mejor que con executemany()
.
En mi propia prueba execute_batch()
es aproximadamente dos veces más rápido que executemany()
, y le da la opción de configurar el tamaño de la página para realizar más ajustes (si desea exprimir el último 2-3% de rendimiento del controlador).
La misma función se puede habilitar fácilmente si está utilizando SQLAlchemy estableciendo use_batch_mode=True
como parámetro cuando use_batch_mode=True
una instancia del motor con create_engine()