python sqlalchemy pyodbc pandas-to-sql

python - Acelerando pandas.DataFrame.to_sql con fast_executemany de pyODBC



sqlalchemy pandas-to-sql (8)

Rendimiento de SQL Server INSERT: pyodbc vs. turbodbc

Cuando se utiliza to_sql para cargar un DataFrame de pandas en SQL Server, turbodbc definitivamente será más rápido que pyodbc sin fast_executemany . Sin embargo, con fast_executemany habilitado para pyodbc, ambos enfoques producen esencialmente el mismo rendimiento.

Entornos de prueba:

[venv1_pyodbc]
pyodbc 2.0.25

[venv2_turbodbc]
turbodbc 3.0.0
sqlalchemy-turbodbc 0.1.0

[común a ambos]
Python 3.6.4 de 64 bits en Windows
SQLAlchemy 1.3.0b1
pandas 0.23.4
numpy 1.15.4

Código de prueba:

# for pyodbc engine = create_engine(''mssql+pyodbc://sa:whatever@SQL_panorama'', fast_executemany=True) # for turbodbc # engine = create_engine(''mssql+turbodbc://sa:whatever@SQL_panorama'') # test data num_rows = 10000 num_cols = 100 df = pd.DataFrame( [[f''row{x:04}col{y:03}'' for y in range(num_cols)] for x in range(num_rows)], columns=[f''col{y:03}'' for y in range(num_cols)] ) t0 = time.time() df.to_sql("sqlalchemy_test", engine, if_exists=''replace'', index=None) print(f"pandas wrote {num_rows} rows in {(time.time() - t0):0.1f} seconds")

Las pruebas se realizaron doce (12) veces para cada entorno, descartando el mejor y el peor momento para cada uno. Resultados (en segundos):

rank pyodbc turbodbc ---- ------ -------- 1 22.8 27.5 2 23.4 28.1 3 24.6 28.2 4 25.2 28.5 5 25.7 29.3 6 26.9 29.9 7 27.0 31.4 8 30.1 32.1 9 33.6 32.5 10 39.8 32.9 ---- ------ -------- average 27.9 30.0

Me gustaría enviar un pandas.DataFrame grande a un servidor remoto que ejecute MS SQL. La forma en que lo hago ahora es mediante la conversión de un objeto data_frame a una lista de tuplas y luego enviarlo con la función executemany() de executemany() . Es algo parecido a esto:

import pyodbc as pdb list_of_tuples = convert_df(data_frame) connection = pdb.connect(cnxn_str) cursor = connection.cursor() cursor.fast_executemany = True cursor.executemany(sql_statement, list_of_tuples) connection.commit() cursor.close() connection.close()

Entonces comencé a preguntarme si las cosas pueden acelerarse (o al menos ser más legibles) utilizando el método data_frame.to_sql() . Se me ha ocurrido la siguiente solución:

import sqlalchemy as sa engine = sa.create_engine("mssql+pyodbc:///?odbc_connect=%s" % cnxn_str) data_frame.to_sql(table_name, engine, index=False)

Ahora el código es más legible, pero la carga es al menos 150 veces más lenta ...

¿Hay alguna forma de fast_executemany la fast_executemany cuando se usa SQLAlchemy?

Estoy usando pandas-0.20.3, pyODBC-4.0.21 y sqlalchemy-1.1.13.


Después de contactar a los desarrolladores de SQLAlchemy, surgió una manera de resolver este problema. Muchas gracias a ellos por el gran trabajo!

Uno tiene que usar un evento de ejecución del cursor y verificar si se ha executemany indicador de executemany . Si ese es el caso, fast_executemany opción fast_executemany . Por ejemplo:

from sqlalchemy import event @event.listens_for(engine, ''before_cursor_execute'') def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany): if executemany: cursor.fast_executemany = True

Más información sobre eventos de ejecución se puede encontrar here .

ACTUALIZACIÓN: Se fast_executemany soporte para fast_executemany de pyodbc en SQLAlchemy 1.3.0 , por lo que este hackeo ya no es necesario.


Me encontré con el mismo problema pero usando PostgreSQL. Ahora simplemente lanzan la versión 0.24.0 de pandas y hay un nuevo parámetro en la función to_sql llamado method que resolvió mi problema.

from sqlalchemy import create_engine engine = create_engine(your_options) data_frame.to_sql(table_name, engine, method="multi")

La velocidad de carga es 100 veces más rápida para mí. También recomiendo establecer el parámetro chunksize si va a enviar muchos datos.


Parece que los Pandas 0.23.0 y 0.24.0 usan inserciones de múltiples valores con PyODBC, lo que impide que la ejecución rápida ayude: se emite una única INSERT ... VALUES ... por fragmento. Los fragmentos de inserción de valores múltiples son una mejora con respecto al antiguo valor predeterminado de ejecución lenta, pero al menos en las pruebas simples todavía prevalece el método de ejecución rápida, por no mencionar que no es necesario chunksize cálculos de chunksize manual, como se requiere con las inserciones de valores múltiples. Se puede forzar el comportamiento anterior mediante la aplicación de pareo, si no se proporciona una opción de configuración en el futuro:

import pandas.io.sql def insert_statement(self, data, conn): return self.table.insert(), data pandas.io.sql.SQLTable.insert_statement = insert_statement

El futuro está aquí y, al menos en la rama master , el método de inserción se puede controlar mediante el method= argumento de palabra clave method= de to_sql() . El valor predeterminado es None , lo que fuerza el método de ejecución. Al pasar el method=''multi'' obtiene el uso de la inserción de valores múltiples. Incluso se puede utilizar para implementar enfoques específicos de DBMS, como COPY Postgresql.


Según lo señalado por @Pylander

¡Turbodbc es la mejor opción para la ingesta de datos, por mucho!

Me emocioné tanto que escribí un ''blog'' en mi github y medio: por favor, visite https://medium.com/@erickfis/etl-process-with-turbodbc-1d19ed71510e

para un ejemplo de trabajo y comparación con pandas.to_sql

Larga historia corta,

con turbodbc tengo 10000 líneas (77 columnas) en 3 segundos

con pandas.to_sql Tengo las mismas 10000 líneas (77 columnas) en 198 segundos ...

Y esto es lo que estoy haciendo con todo detalle.

Las importaciones:

import sqlalchemy import pandas as pd import numpy as np import turbodbc import time

Cargue y trate algunos datos: sustituya mi sample.pkl por el suyo:

df = pd.read_pickle(''sample.pkl'') df.columns = df.columns.str.strip() # remove white spaces around column names df = df.applymap(str.strip) # remove white spaces around values df = df.replace('''', np.nan) # map nans, to drop NAs rows and columns later df = df.dropna(how=''all'', axis=0) # remove rows containing only NAs df = df.dropna(how=''all'', axis=1) # remove columns containing only NAs df = df.replace(np.nan, ''NA'') # turbodbc hates null values...

Crea la tabla usando sqlAlchemy

Desafortunadamente, turbodbc requiere mucha sobrecarga con una gran cantidad de trabajo manual de SQL, para crear las tablas y para insertar datos en ellas.

Afortunadamente, Python es pura alegría y podemos automatizar este proceso de escritura de código SQL.

El primer paso es crear la tabla que recibirá nuestros datos. Sin embargo, crear la tabla manualmente escribiendo el código SQL puede ser problemático si su tabla tiene más de unas pocas columnas. En mi caso, muy a menudo las tablas tienen 240 columnas!

Aquí es donde sqlAlchemy y pandas aún pueden ayudarnos: los pandas son malos para escribir un gran número de filas (10000 en este ejemplo), pero ¿qué pasa con solo 6 filas, la cabecera de la tabla? De esta manera, automatizamos el proceso de creación de las tablas.

Crear conexión sqlAlchemy:

mydb = ''someDB'' def make_con(db): """Connect to a specified db.""" database_connection = sqlalchemy.create_engine( ''mssql+pymssql://{0}:{1}@{2}/{3}''.format( myuser, mypassword, myhost, db ) ) return database_connection pd_connection = make_con(mydb)

Crear tabla en SQL Server

Usando pandas + sqlAlchemy, pero solo para preparar espacio para turbodbc como se mencionó anteriormente. Tenga en cuenta que df.head () aquí: estamos utilizando pandas + sqlAlchemy para insertar solo 6 filas de nuestros datos. Esto se ejecutará bastante rápido y se está haciendo para automatizar la creación de la tabla.

table = ''testing'' df.head().to_sql(table, con=pd_connection, index=False)

Ahora que la mesa ya está en su lugar, seamos serios aquí.

Conexión Turbodbc:

def turbo_conn(mydb): """Connect to a specified db - turbo.""" database_connection = turbodbc.connect( driver=''ODBC Driver 17 for SQL Server'', server=myhost, database=mydb, uid=myuser, pwd=mypassword ) return database_connection

Preparando comandos de sql y datos para turbodbc. Vamos a automatizar esta creación de código siendo creativo:

def turbo_write(mydb, df, table): """Use turbodbc to insert data into sql.""" start = time.time() # preparing columns colunas = ''('' colunas += '', ''.join(df.columns) colunas += '')'' # preparing value place holders val_place_holder = [''?'' for col in df.columns] sql_val = ''('' sql_val += '', ''.join(val_place_holder) sql_val += '')'' # writing sql query for turbodbc sql = f""" INSERT INTO {mydb}.dbo.{table} {colunas} VALUES {sql_val} """ # writing array of values for turbodbc valores_df = [df[col].values for col in df.columns] # cleans the previous head insert with connection.cursor() as cursor: cursor.execute(f"delete from {mydb}.dbo.{table}") connection.commit() # inserts data, for real with connection.cursor() as cursor: try: cursor.executemanycolumns(sql, valores_df) connection.commit() except Exception: connection.rollback() print(''something went wrong'') stop = time.time() - start return print(f''finished in {stop} seconds'')

Escribiendo datos usando turbodbc - Tengo 10000 líneas (77 columnas) en 3 segundos:

turbo_write(mydb, df.sample(10000), table)

Comparación del método de pandas: tengo las mismas 10000 líneas (77 columnas) en 198 segundos ...

table = ''pd_testing'' def pandas_comparisson(df, table): """Load data using pandas.""" start = time.time() df.to_sql(table, con=pd_connection, index=False) stop = time.time() - start return print(f''finished in {stop} seconds'') pandas_comparisson(df.sample(10000), table)

Ambiente y condiciones

Python 3.6.7 :: Anaconda, Inc. TURBODBC version ‘3.0.0’ sqlAlchemy version ‘1.2.12’ pandas version ‘0.23.4’ Microsoft SQL Server 2014 user with bulk operations privileges

Consulte https://erickfis.github.io/loose-code/ para ver las actualizaciones de este código.


Solo quería añadir a la respuesta de @JK.

Si está utilizando este enfoque:

@event.listens_for(engine, ''before_cursor_execute'') def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany): if executemany: cursor.fast_executemany = True

Y está recibiendo este error:

"sqlalchemy.exc.DBAPIError: (pyodbc.Error) (''HY010'', ''[HY010] [Microsoft] [SQL Server Native Client 11.0] Error de secuencia de función (0) (SQLParamData)'') [SQL: ''INSERT INTO .. . (...) VALORES (?,?) ''] [Parámetros: ((..., ...), (..., ...)] (Antecedentes de este error en: http://sqlalche.me/e/dbapi ) "

Codifique sus valores de cadena de esta manera: ''yourStringValue''.encode(''ascii'')

Esto resolverá tu problema.


Solo quería publicar este ejemplo completo como una opción adicional de alto rendimiento para aquellos que pueden usar la nueva biblioteca turbodbc: http://turbodbc.readthedocs.io/en/latest/

Claramente hay muchas opciones en el flujo entre pandas .to_sql (), activando fast_executemany a través de sqlalchemy, usando pyodbc directamente con tuplas / listas / etc, o incluso intentando BULK UPLOAD con archivos planos.

Con suerte, lo siguiente podría hacer la vida más placentera a medida que la funcionalidad evolucione en el proyecto pandas actual o incluya algo como la integración de turbodbc en el futuro.

import pandas as pd import numpy as np from turbodbc import connect, make_options from io import StringIO test_data = ''''''id,transaction_dt,units,measures 1,2018-01-01,4,30.5 1,2018-01-03,4,26.3 2,2018-01-01,3,12.7 2,2018-01-03,3,8.8'''''' df_test = pd.read_csv(StringIO(test_data), sep='','') df_test[''transaction_dt''] = pd.to_datetime(df_test[''transaction_dt'']) options = make_options(parameter_sets_to_buffer=1000) conn = connect(driver=''{SQL Server}'', server=''server_nm'', database=''db_nm'', turbodbc_options=options) test_query = ''''''DROP TABLE IF EXISTS [db_name].[schema].[test] CREATE TABLE [db_name].[schema].[test] ( id int NULL, transaction_dt datetime NULL, units int NULL, measures float NULL ) INSERT INTO [db_name].[schema].[test] (id,transaction_dt,units,measures) VALUES (?,?,?,?) '''''' cursor.executemanycolumns(test_query, [df_test[''id''].values, df_test[''transaction_dt''].values, df_test[''units''].values, df_test[''measures''].values]

turbodbc debería ser MUY rápido en muchos casos de uso (particularmente con matrices numpy). Observe lo sencillo que es pasar las matrices de números subyacentes de las columnas del marco de datos como parámetros a la consulta directamente. También creo que esto ayuda a prevenir la creación de objetos intermedios que aumentan el consumo de memoria. Espero que esto sea de ayuda!


EDITAR (08/03/2019): Gord Thompson comentó a continuación con buenas noticias de los registros de actualización de sqlalchemy: Desde SQLAlchemy 1.3.0, publicado el 2019-03-04, sqlalchemy ahora admite engine = create_engine(sqlalchemy_url, fast_executemany=True) para mssql+pyodbc dialecto mssql+pyodbc . Es decir, ya no es necesario definir una función y usar @event.listens_for(engine, ''before_cursor_execute'') significa que la función que se muestra a continuación se puede eliminar y solo se debe establecer el indicador en la declaración create_engine, y aún se mantiene la velocidad arriba.

Mensaje original:

Acaba de hacer una cuenta para publicar esto. Quería comentar debajo del hilo anterior ya que es un seguimiento de la respuesta ya provista. La solución anterior me funcionó con el controlador SQL de la Versión 17 en una escritura de almacenamiento Microsft SQL desde una instalación basada en Ubuntu.

El código completo que utilicé para acelerar las cosas significativamente (hablando> 100 veces la velocidad) está debajo. Este es un fragmento de código llave en mano siempre que altere la cadena de conexión con sus detalles relevantes. Al cartel de arriba, muchas gracias por la solución, ya que estaba buscando bastante tiempo para esto.

import pandas as pd import numpy as np import time from sqlalchemy import create_engine, event from urllib.parse import quote_plus conn = "DRIVER={ODBC Driver 17 for SQL Server};SERVER=IP_ADDRESS;DATABASE=DataLake;UID=USER;PWD=PASS" quoted = quote_plus(conn) new_con = ''mssql+pyodbc:///?odbc_connect={}''.format(quoted) engine = create_engine(new_con) @event.listens_for(engine, ''before_cursor_execute'') def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany): print("FUNC call") if executemany: cursor.fast_executemany = True table_name = ''fast_executemany_test'' df = pd.DataFrame(np.random.random((10**4, 100))) s = time.time() df.to_sql(table_name, engine, if_exists = ''replace'', chunksize = None) print(time.time() - s)

Basado en los comentarios a continuación, quería tomarme un tiempo para explicar algunas limitaciones sobre la implementación de pandas to_sql y la forma en que se maneja la consulta. Hay 2 cosas que pueden hacer que se MemoryError el MemoryError afaik:

1) Suponiendo que estás escribiendo en un almacenamiento de SQL remoto. Cuando intenta escribir un DataFrame de pandas grande con el método to_sql , convierte todo el marco de datos en una lista de valores. Esta transformación ocupa mucho más RAM que el DataFrame original (además de eso, ya que el DataFrame antiguo todavía está presente en la RAM). Esta lista se proporciona a la última llamada de executemany para su conector ODBC. Creo que el conector ODBC tiene algunos problemas para manejar consultas tan grandes. Una forma de resolver esto es proporcionar al método to_sql un argumento chunksize (10 ** 5 parece estar alrededor de lo óptimo, lo que da aproximadamente 600 mbit / s (!) Velocidades en una aplicación MSSQL Storage de 2 CPU 7GB ram de Azure; recomiendo Azure por cierto). Por lo tanto, la primera limitación, que es el tamaño de la consulta, se puede eludir proporcionando un argumento de chunksize . Sin embargo, esto no le permitirá escribir un marco de datos del tamaño de 10 ** 7 o mayor, (al menos no en la máquina virtual con la que estoy trabajando, que tiene ~ 55 GB de RAM), siendo el número 2.

Esto se puede sortear dividiendo el DataFrame con np.split (siendo 10 ** 6 trozos de DataFrame de tamaño). Estos se pueden escribir de forma iterativa. Intentaré realizar una solicitud de extracción cuando tenga una solución preparada para el método to_sql en el núcleo de los pandas, por lo que no tendrá que hacer esto antes de la to_sql . De todos modos, terminé escribiendo una función similar (no llave en mano) a la siguiente:

import pandas as pd import numpy as np def write_df_to_sql(df, **kwargs): chunks = np.split(df, df.shape()[0] / 10**6) for chunk in chunks: chunk.to_sql(**kwargs) return True

Un ejemplo más completo del fragmento de código anterior se puede ver aquí: https://gitlab.com/timelord/timelord/blob/master/timelord/utils/connector.py

Es una clase que escribí que incorpora el parche y facilita algunos de los gastos generales necesarios que vienen con la configuración de conexiones con SQL. Todavía tengo que escribir alguna documentación. También estaba planeando contribuir con el parche a los pandas en sí, pero aún no he encontrado una buena manera de hacerlo.

Espero que esto ayude.