python - nueva - Bulk Insertar un marco de datos de Pandas usando SQLAlchemy
nueva columna en pandas (6)
Tengo algunos marcos de datos de pandas bastante grandes y me gustaría usar los nuevos mapeos de SQL para subirlos a un servidor de Microsoft SQL a través de la Alquimia de SQL. El método pandas.to_sql, aunque agradable, es lento.
Estoy teniendo problemas para escribir el código ...
Me gustaría poder pasar esta función un marco de datos de pandas al que llamo table
, un nombre de esquema al que llamo schema
y un nombre de tabla al que llamo name
. Idealmente, la función 1.) eliminará la tabla si ya existe. 2.) crear una nueva tabla 3.) crear un asignador y 4.) realizar una inserción masiva utilizando el asignador y los datos de pandas. Estoy atrapado en la parte 3.
Aquí está mi código (ciertamente áspero) Estoy teniendo problemas para conseguir que la función del asignador funcione con mis claves principales. Realmente no necesito claves primarias pero la función del asignador lo requiere.
Gracias por las ideas.
from sqlalchemy import create_engine Table, Column, MetaData
from sqlalchemy.orm import mapper, create_session
from sqlalchemy.ext.declarative import declarative_base
from pandas.io.sql import SQLTable, SQLDatabase
def bulk_upload(table, schema, name):
e = create_engine(''mssql+pyodbc://MYDB'')
s = create_session(bind=e)
m = MetaData(bind=e,reflect=True,schema=schema)
Base = declarative_base(bind=e,metadata=m)
t = Table(name,m)
m.remove(t)
t.drop(checkfirst=True)
sqld = SQLDatabase(e, schema=schema,meta=m)
sqlt = SQLTable(name, sqld, table).table
sqlt.metadata = m
m.create_all(bind=e,tables=[sqlt])
class MyClass(Base):
return
mapper(MyClass, sqlt)
s.bulk_insert_mappings(MyClass, table.to_dict(orient=''records''))
return
Basado en las respuestas de @ansonw:
def to_sql(engine, df, table, if_exists=''fail'', sep=''/t'', encoding=''utf8''):
# Create Table
df[:0].to_sql(table, engine, if_exists=if_exists)
# Prepare data
output = cStringIO.StringIO()
df.to_csv(output, sep=sep, header=False, encoding=encoding)
output.seek(0)
# Insert data
connection = engine.raw_connection()
cursor = connection.cursor()
cursor.copy_from(output, table, sep=sep, null='''')
connection.commit()
cursor.close()
Inserto 200000 líneas en 5 segundos en lugar de 4 minutos
Como se trata de una gran carga de trabajo de E / S, también puede utilizar el módulo de subprocesamiento de python mediante multiprocessing.dummy . Esto aceleró las cosas para mí:
import math
from multiprocessing.dummy import Pool as ThreadPool
...
def insert_df(df, *args, **kwargs):
nworkers = 4
chunksize = math.floor(df.shape[0] / nworkers)
chunks = [(chunksize * i, (chunksize * i) + chunksize) for i in range(nworkers)]
chunks.append((chunksize * nworkers, df.shape[0]))
pool = ThreadPool(nworkers)
def worker(chunk):
i, j = chunk
df.iloc[i:j, :].to_sql(*args, **kwargs)
pool.map(worker, chunks)
pool.close()
pool.join()
....
insert_df(df, "foo_bar", engine, if_exists=''append'')
Esto podría haber sido respondido para entonces, pero encontré la solución al recopilar diferentes respuestas en este sitio y alinearme con el documento de SQLAlchemy.
- La tabla ya debe existir en db1; con un índice configurado con auto_increment activado.
- La clase actual debe alinearse con el marco de datos importado en el CSV y la tabla en el db1.
Espero que esto ayude a quienquiera que venga aquí y quiera mezclar Panda y SQLAlchemy de una manera rápida.
from urllib import quote_plus as urlquote
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Numeric
from sqlalchemy.orm import sessionmaker
import pandas as pd
# Set up of the engine to connect to the database
# the urlquote is used for passing the password which might contain special characters such as "/"
engine = create_engine(''mysql://root:%s@localhost/db1'' % urlquote(''weirdPassword*withsp€cialcharacters''), echo=False)
conn = engine.connect()
Base = declarative_base()
#Declaration of the class in order to write into the database. This structure is standard and should align with SQLAlchemy''s doc.
class Current(Base):
__tablename__ = ''tableName''
id = Column(Integer, primary_key=True)
Date = Column(String(500))
Type = Column(String(500))
Value = Column(Numeric())
def __repr__(self):
return "(id=''%s'', Date=''%s'', Type=''%s'', Value=''%s'')" % (self.id, self.Date, self.Type, self.Value)
# Set up of the table in db and the file to import
fileToRead = ''file.csv''
tableToWriteTo = ''tableName''
# Panda to create a lovely dataframe
df_to_be_written = pd.read_csv(fileToRead)
# The orient=''records'' is the key of this, it allows to align with the format mentioned in the doc to insert in bulks.
listToWrite = df_to_be_written.to_dict(orient=''records'')
metadata = sqlalchemy.schema.MetaData(bind=engine,reflect=True)
table = sqlalchemy.Table(tableToWriteTo, metadata, autoload=True)
# Open the session
Session = sessionmaker(bind=engine)
session = Session()
# Inser the dataframe into the database in one bulk
conn.execute(table.insert(), listToWrite)
# Commit the changes
session.commit()
# Close the session
session.close()
Me encontré con un problema similar con pd.to_sql que tardaba horas en cargar datos. El código masivo a continuación insertó los mismos datos en unos pocos segundos.
from sqlalchemy import create_engine
import psycopg2 as pg
#load python script that batch loads pandas df to sql
import cStringIO
address = ''postgresql://<username>:<pswd>@<host>:<port>/<database>''
engine = create_engine(address)
connection = engine.raw_connection()
cursor = connection.cursor()
#df is the dataframe containing an index and the columns "Event" and "Day"
#create Index column to use as primary key
df.reset_index(inplace=True)
df.rename(columns={''index'':''Index''}, inplace =True)
#create the table but first drop if it already exists
command = ''''''DROP TABLE IF EXISTS localytics_app2;
CREATE TABLE localytics_app2
(
"Index" serial primary key,
"Event" text,
"Day" timestamp without time zone,
);''''''
cursor.execute(command)
connection.commit()
#stream the data using ''to_csv'' and StringIO(); then use sql''s ''copy_from'' function
output = cStringIO.StringIO()
#ignore the index
df.to_csv(output, sep=''/t'', header=False, index=False)
#jump to start of stream
output.seek(0)
contents = output.getvalue()
cur = connection.cursor()
#null values become ''''
cur.copy_from(output, ''localytics_app2'', null="")
connection.commit()
cur.close()
Mi solución específica de postgres a continuación crea automáticamente la tabla de la base de datos utilizando el marco de datos de pandas y realiza una inserción masiva rápida utilizando la COPY my_table FROM ...
import io
import pandas as pd
from sqlalchemy import create_engine
def write_to_table(df, db_engine, schema, table_name, if_exists=''fail''):
string_data_io = io.StringIO()
df.to_csv(string_data_io, sep=''|'', index=False)
pd_sql_engine = pd.io.sql.pandasSQL_builder(db_engine, schema=schema)
table = pd.io.sql.SQLTable(table_name, pd_sql_engine, frame=df,
index=False, if_exists=if_exists, schema=schema)
table.create()
string_data_io.seek(0)
string_data_io.readline() # remove header
with db_engine.connect() as connection:
with connection.connection.cursor() as cursor:
copy_cmd = "COPY %s.%s FROM STDIN HEADER DELIMITER ''|'' CSV" % (schema, table_name)
cursor.copy_expert(copy_cmd, string_data_io)
connection.connection.commit()
Para cualquier persona que enfrente este problema y tenga la base de datos de destino como Redshift, tenga en cuenta que Redshift no implementa el conjunto completo de comandos de Postgres, por lo que algunas de las respuestas que usan COPY FROM
o copy_from()
Postgres no funcionarán. psycopg2.ProgrammingError: error de sintaxis en o cerca del error "estándar" al intentar copiar desde un desplazamiento al rojo
La solución para acelerar los INSERTs a Redshift es usar una ingesta de archivos u Odo.
Referencia:
Acerca de Odo http://odo.pydata.org/en/latest/perf.html
Odo con desplazamiento al rojo
https://github.com/blaze/odo/blob/master/docs/source/aws.rst
Redshift COPY (desde archivo S3)
https://docs.aws.amazon.com/redshift/latest/dg/r_COPY.html