python - Acelerando pandas.DataFrame.to_sql con fast_executemany de pyODBC
sqlalchemy pandas-to-sql (8)
Rendimiento de SQL Server INSERT: pyodbc vs. turbodbc
Cuando se utiliza to_sql
para cargar un DataFrame de pandas en SQL Server, turbodbc definitivamente será más rápido que pyodbc sin fast_executemany
. Sin embargo, con fast_executemany
habilitado para pyodbc, ambos enfoques producen esencialmente el mismo rendimiento.
Entornos de prueba:
[venv1_pyodbc]
pyodbc 2.0.25
[venv2_turbodbc]
turbodbc 3.0.0
sqlalchemy-turbodbc 0.1.0
[común a ambos]
Python 3.6.4 de 64 bits en Windows
SQLAlchemy 1.3.0b1
pandas 0.23.4
numpy 1.15.4
Código de prueba:
# for pyodbc
engine = create_engine(''mssql+pyodbc://sa:whatever@SQL_panorama'', fast_executemany=True)
# for turbodbc
# engine = create_engine(''mssql+turbodbc://sa:whatever@SQL_panorama'')
# test data
num_rows = 10000
num_cols = 100
df = pd.DataFrame(
[[f''row{x:04}col{y:03}'' for y in range(num_cols)] for x in range(num_rows)],
columns=[f''col{y:03}'' for y in range(num_cols)]
)
t0 = time.time()
df.to_sql("sqlalchemy_test", engine, if_exists=''replace'', index=None)
print(f"pandas wrote {num_rows} rows in {(time.time() - t0):0.1f} seconds")
Las pruebas se realizaron doce (12) veces para cada entorno, descartando el mejor y el peor momento para cada uno. Resultados (en segundos):
rank pyodbc turbodbc
---- ------ --------
1 22.8 27.5
2 23.4 28.1
3 24.6 28.2
4 25.2 28.5
5 25.7 29.3
6 26.9 29.9
7 27.0 31.4
8 30.1 32.1
9 33.6 32.5
10 39.8 32.9
---- ------ --------
average 27.9 30.0
Me gustaría enviar un pandas.DataFrame
grande a un servidor remoto que ejecute MS SQL. La forma en que lo hago ahora es mediante la conversión de un objeto data_frame
a una lista de tuplas y luego enviarlo con la función executemany()
de executemany()
. Es algo parecido a esto:
import pyodbc as pdb
list_of_tuples = convert_df(data_frame)
connection = pdb.connect(cnxn_str)
cursor = connection.cursor()
cursor.fast_executemany = True
cursor.executemany(sql_statement, list_of_tuples)
connection.commit()
cursor.close()
connection.close()
Entonces comencé a preguntarme si las cosas pueden acelerarse (o al menos ser más legibles) utilizando el método data_frame.to_sql()
. Se me ha ocurrido la siguiente solución:
import sqlalchemy as sa
engine = sa.create_engine("mssql+pyodbc:///?odbc_connect=%s" % cnxn_str)
data_frame.to_sql(table_name, engine, index=False)
Ahora el código es más legible, pero la carga es al menos 150 veces más lenta ...
¿Hay alguna forma de fast_executemany
la fast_executemany
cuando se usa SQLAlchemy?
Estoy usando pandas-0.20.3, pyODBC-4.0.21 y sqlalchemy-1.1.13.
Después de contactar a los desarrolladores de SQLAlchemy, surgió una manera de resolver este problema. Muchas gracias a ellos por el gran trabajo!
Uno tiene que usar un evento de ejecución del cursor y verificar si se ha executemany
indicador de executemany
. Si ese es el caso, fast_executemany
opción fast_executemany
. Por ejemplo:
from sqlalchemy import event
@event.listens_for(engine, ''before_cursor_execute'')
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
if executemany:
cursor.fast_executemany = True
Más información sobre eventos de ejecución se puede encontrar here .
ACTUALIZACIÓN: Se fast_executemany
soporte para fast_executemany
de pyodbc
en SQLAlchemy 1.3.0 , por lo que este hackeo ya no es necesario.
Me encontré con el mismo problema pero usando PostgreSQL. Ahora simplemente lanzan la versión 0.24.0 de pandas y hay un nuevo parámetro en la función to_sql
llamado method
que resolvió mi problema.
from sqlalchemy import create_engine
engine = create_engine(your_options)
data_frame.to_sql(table_name, engine, method="multi")
La velocidad de carga es 100 veces más rápida para mí. También recomiendo establecer el parámetro chunksize
si va a enviar muchos datos.
Parece que los Pandas 0.23.0 y 0.24.0 usan inserciones de múltiples valores con PyODBC, lo que impide que la ejecución rápida ayude: se emite una única INSERT ... VALUES ...
por fragmento. Los fragmentos de inserción de valores múltiples son una mejora con respecto al antiguo valor predeterminado de ejecución lenta, pero al menos en las pruebas simples todavía prevalece el método de ejecución rápida, por no mencionar que no es necesario chunksize
cálculos de chunksize
manual, como se requiere con las inserciones de valores múltiples. Se puede forzar el comportamiento anterior mediante la aplicación de pareo, si no se proporciona una opción de configuración en el futuro:
import pandas.io.sql
def insert_statement(self, data, conn):
return self.table.insert(), data
pandas.io.sql.SQLTable.insert_statement = insert_statement
El futuro está aquí y, al menos en la rama master
, el método de inserción se puede controlar mediante el method=
argumento de palabra clave method=
de to_sql()
. El valor predeterminado es None
, lo que fuerza el método de ejecución. Al pasar el method=''multi''
obtiene el uso de la inserción de valores múltiples. Incluso se puede utilizar para implementar enfoques específicos de DBMS, como COPY
Postgresql.
Según lo señalado por @Pylander
¡Turbodbc es la mejor opción para la ingesta de datos, por mucho!
Me emocioné tanto que escribí un ''blog'' en mi github y medio: por favor, visite https://medium.com/@erickfis/etl-process-with-turbodbc-1d19ed71510e
para un ejemplo de trabajo y comparación con pandas.to_sql
Larga historia corta,
con turbodbc tengo 10000 líneas (77 columnas) en 3 segundos
con pandas.to_sql Tengo las mismas 10000 líneas (77 columnas) en 198 segundos ...
Y esto es lo que estoy haciendo con todo detalle.
Las importaciones:
import sqlalchemy
import pandas as pd
import numpy as np
import turbodbc
import time
Cargue y trate algunos datos: sustituya mi sample.pkl por el suyo:
df = pd.read_pickle(''sample.pkl'')
df.columns = df.columns.str.strip() # remove white spaces around column names
df = df.applymap(str.strip) # remove white spaces around values
df = df.replace('''', np.nan) # map nans, to drop NAs rows and columns later
df = df.dropna(how=''all'', axis=0) # remove rows containing only NAs
df = df.dropna(how=''all'', axis=1) # remove columns containing only NAs
df = df.replace(np.nan, ''NA'') # turbodbc hates null values...
Crea la tabla usando sqlAlchemy
Desafortunadamente, turbodbc requiere mucha sobrecarga con una gran cantidad de trabajo manual de SQL, para crear las tablas y para insertar datos en ellas.
Afortunadamente, Python es pura alegría y podemos automatizar este proceso de escritura de código SQL.
El primer paso es crear la tabla que recibirá nuestros datos. Sin embargo, crear la tabla manualmente escribiendo el código SQL puede ser problemático si su tabla tiene más de unas pocas columnas. En mi caso, muy a menudo las tablas tienen 240 columnas!
Aquí es donde sqlAlchemy y pandas aún pueden ayudarnos: los pandas son malos para escribir un gran número de filas (10000 en este ejemplo), pero ¿qué pasa con solo 6 filas, la cabecera de la tabla? De esta manera, automatizamos el proceso de creación de las tablas.
Crear conexión sqlAlchemy:
mydb = ''someDB''
def make_con(db):
"""Connect to a specified db."""
database_connection = sqlalchemy.create_engine(
''mssql+pymssql://{0}:{1}@{2}/{3}''.format(
myuser, mypassword,
myhost, db
)
)
return database_connection
pd_connection = make_con(mydb)
Crear tabla en SQL Server
Usando pandas + sqlAlchemy, pero solo para preparar espacio para turbodbc como se mencionó anteriormente. Tenga en cuenta que df.head () aquí: estamos utilizando pandas + sqlAlchemy para insertar solo 6 filas de nuestros datos. Esto se ejecutará bastante rápido y se está haciendo para automatizar la creación de la tabla.
table = ''testing''
df.head().to_sql(table, con=pd_connection, index=False)
Ahora que la mesa ya está en su lugar, seamos serios aquí.
Conexión Turbodbc:
def turbo_conn(mydb):
"""Connect to a specified db - turbo."""
database_connection = turbodbc.connect(
driver=''ODBC Driver 17 for SQL Server'',
server=myhost,
database=mydb,
uid=myuser,
pwd=mypassword
)
return database_connection
Preparando comandos de sql y datos para turbodbc. Vamos a automatizar esta creación de código siendo creativo:
def turbo_write(mydb, df, table):
"""Use turbodbc to insert data into sql."""
start = time.time()
# preparing columns
colunas = ''(''
colunas += '', ''.join(df.columns)
colunas += '')''
# preparing value place holders
val_place_holder = [''?'' for col in df.columns]
sql_val = ''(''
sql_val += '', ''.join(val_place_holder)
sql_val += '')''
# writing sql query for turbodbc
sql = f"""
INSERT INTO {mydb}.dbo.{table} {colunas}
VALUES {sql_val}
"""
# writing array of values for turbodbc
valores_df = [df[col].values for col in df.columns]
# cleans the previous head insert
with connection.cursor() as cursor:
cursor.execute(f"delete from {mydb}.dbo.{table}")
connection.commit()
# inserts data, for real
with connection.cursor() as cursor:
try:
cursor.executemanycolumns(sql, valores_df)
connection.commit()
except Exception:
connection.rollback()
print(''something went wrong'')
stop = time.time() - start
return print(f''finished in {stop} seconds'')
Escribiendo datos usando turbodbc - Tengo 10000 líneas (77 columnas) en 3 segundos:
turbo_write(mydb, df.sample(10000), table)
Comparación del método de pandas: tengo las mismas 10000 líneas (77 columnas) en 198 segundos ...
table = ''pd_testing''
def pandas_comparisson(df, table):
"""Load data using pandas."""
start = time.time()
df.to_sql(table, con=pd_connection, index=False)
stop = time.time() - start
return print(f''finished in {stop} seconds'')
pandas_comparisson(df.sample(10000), table)
Ambiente y condiciones
Python 3.6.7 :: Anaconda, Inc.
TURBODBC version ‘3.0.0’
sqlAlchemy version ‘1.2.12’
pandas version ‘0.23.4’
Microsoft SQL Server 2014
user with bulk operations privileges
Consulte https://erickfis.github.io/loose-code/ para ver las actualizaciones de este código.
Solo quería añadir a la respuesta de @JK.
Si está utilizando este enfoque:
@event.listens_for(engine, ''before_cursor_execute'')
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
if executemany:
cursor.fast_executemany = True
Y está recibiendo este error:
"sqlalchemy.exc.DBAPIError: (pyodbc.Error) (''HY010'', ''[HY010] [Microsoft] [SQL Server Native Client 11.0] Error de secuencia de función (0) (SQLParamData)'') [SQL: ''INSERT INTO .. . (...) VALORES (?,?) ''] [Parámetros: ((..., ...), (..., ...)] (Antecedentes de este error en: http://sqlalche.me/e/dbapi ) "
Codifique sus valores de cadena de esta manera: ''yourStringValue''.encode(''ascii'')
Esto resolverá tu problema.
Solo quería publicar este ejemplo completo como una opción adicional de alto rendimiento para aquellos que pueden usar la nueva biblioteca turbodbc: http://turbodbc.readthedocs.io/en/latest/
Claramente hay muchas opciones en el flujo entre pandas .to_sql (), activando fast_executemany a través de sqlalchemy, usando pyodbc directamente con tuplas / listas / etc, o incluso intentando BULK UPLOAD con archivos planos.
Con suerte, lo siguiente podría hacer la vida más placentera a medida que la funcionalidad evolucione en el proyecto pandas actual o incluya algo como la integración de turbodbc en el futuro.
import pandas as pd
import numpy as np
from turbodbc import connect, make_options
from io import StringIO
test_data = ''''''id,transaction_dt,units,measures
1,2018-01-01,4,30.5
1,2018-01-03,4,26.3
2,2018-01-01,3,12.7
2,2018-01-03,3,8.8''''''
df_test = pd.read_csv(StringIO(test_data), sep='','')
df_test[''transaction_dt''] = pd.to_datetime(df_test[''transaction_dt''])
options = make_options(parameter_sets_to_buffer=1000)
conn = connect(driver=''{SQL Server}'', server=''server_nm'', database=''db_nm'', turbodbc_options=options)
test_query = ''''''DROP TABLE IF EXISTS [db_name].[schema].[test]
CREATE TABLE [db_name].[schema].[test]
(
id int NULL,
transaction_dt datetime NULL,
units int NULL,
measures float NULL
)
INSERT INTO [db_name].[schema].[test] (id,transaction_dt,units,measures)
VALUES (?,?,?,?) ''''''
cursor.executemanycolumns(test_query, [df_test[''id''].values, df_test[''transaction_dt''].values, df_test[''units''].values, df_test[''measures''].values]
turbodbc debería ser MUY rápido en muchos casos de uso (particularmente con matrices numpy). Observe lo sencillo que es pasar las matrices de números subyacentes de las columnas del marco de datos como parámetros a la consulta directamente. También creo que esto ayuda a prevenir la creación de objetos intermedios que aumentan el consumo de memoria. Espero que esto sea de ayuda!
EDITAR (08/03/2019): Gord Thompson comentó a continuación con buenas noticias de los registros de actualización de sqlalchemy: Desde SQLAlchemy 1.3.0, publicado el 2019-03-04, sqlalchemy ahora admite engine = create_engine(sqlalchemy_url, fast_executemany=True)
para mssql+pyodbc
dialecto mssql+pyodbc
. Es decir, ya no es necesario definir una función y usar @event.listens_for(engine, ''before_cursor_execute'')
significa que la función que se muestra a continuación se puede eliminar y solo se debe establecer el indicador en la declaración create_engine, y aún se mantiene la velocidad arriba.
Mensaje original:
Acaba de hacer una cuenta para publicar esto. Quería comentar debajo del hilo anterior ya que es un seguimiento de la respuesta ya provista. La solución anterior me funcionó con el controlador SQL de la Versión 17 en una escritura de almacenamiento Microsft SQL desde una instalación basada en Ubuntu.
El código completo que utilicé para acelerar las cosas significativamente (hablando> 100 veces la velocidad) está debajo. Este es un fragmento de código llave en mano siempre que altere la cadena de conexión con sus detalles relevantes. Al cartel de arriba, muchas gracias por la solución, ya que estaba buscando bastante tiempo para esto.
import pandas as pd
import numpy as np
import time
from sqlalchemy import create_engine, event
from urllib.parse import quote_plus
conn = "DRIVER={ODBC Driver 17 for SQL Server};SERVER=IP_ADDRESS;DATABASE=DataLake;UID=USER;PWD=PASS"
quoted = quote_plus(conn)
new_con = ''mssql+pyodbc:///?odbc_connect={}''.format(quoted)
engine = create_engine(new_con)
@event.listens_for(engine, ''before_cursor_execute'')
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
print("FUNC call")
if executemany:
cursor.fast_executemany = True
table_name = ''fast_executemany_test''
df = pd.DataFrame(np.random.random((10**4, 100)))
s = time.time()
df.to_sql(table_name, engine, if_exists = ''replace'', chunksize = None)
print(time.time() - s)
Basado en los comentarios a continuación, quería tomarme un tiempo para explicar algunas limitaciones sobre la implementación de pandas to_sql
y la forma en que se maneja la consulta. Hay 2 cosas que pueden hacer que se MemoryError
el MemoryError
afaik:
1) Suponiendo que estás escribiendo en un almacenamiento de SQL remoto. Cuando intenta escribir un DataFrame de pandas grande con el método to_sql
, convierte todo el marco de datos en una lista de valores. Esta transformación ocupa mucho más RAM que el DataFrame original (además de eso, ya que el DataFrame antiguo todavía está presente en la RAM). Esta lista se proporciona a la última llamada de executemany
para su conector ODBC. Creo que el conector ODBC tiene algunos problemas para manejar consultas tan grandes. Una forma de resolver esto es proporcionar al método to_sql
un argumento chunksize (10 ** 5 parece estar alrededor de lo óptimo, lo que da aproximadamente 600 mbit / s (!) Velocidades en una aplicación MSSQL Storage de 2 CPU 7GB ram de Azure; recomiendo Azure por cierto). Por lo tanto, la primera limitación, que es el tamaño de la consulta, se puede eludir proporcionando un argumento de chunksize
. Sin embargo, esto no le permitirá escribir un marco de datos del tamaño de 10 ** 7 o mayor, (al menos no en la máquina virtual con la que estoy trabajando, que tiene ~ 55 GB de RAM), siendo el número 2.
Esto se puede sortear dividiendo el DataFrame con np.split
(siendo 10 ** 6 trozos de DataFrame de tamaño). Estos se pueden escribir de forma iterativa. Intentaré realizar una solicitud de extracción cuando tenga una solución preparada para el método to_sql
en el núcleo de los pandas, por lo que no tendrá que hacer esto antes de la to_sql
. De todos modos, terminé escribiendo una función similar (no llave en mano) a la siguiente:
import pandas as pd
import numpy as np
def write_df_to_sql(df, **kwargs):
chunks = np.split(df, df.shape()[0] / 10**6)
for chunk in chunks:
chunk.to_sql(**kwargs)
return True
Un ejemplo más completo del fragmento de código anterior se puede ver aquí: https://gitlab.com/timelord/timelord/blob/master/timelord/utils/connector.py
Es una clase que escribí que incorpora el parche y facilita algunos de los gastos generales necesarios que vienen con la configuración de conexiones con SQL. Todavía tengo que escribir alguna documentación. También estaba planeando contribuir con el parche a los pandas en sí, pero aún no he encontrado una buena manera de hacerlo.
Espero que esto ayude.