panda nueva libreria insertar funciones filtrar documentacion datos crear como columna python pandas sqlalchemy

python - nueva - Bulk Insertar un marco de datos de Pandas usando SQLAlchemy



nueva columna en pandas (6)

Tengo algunos marcos de datos de pandas bastante grandes y me gustaría usar los nuevos mapeos de SQL para subirlos a un servidor de Microsoft SQL a través de la Alquimia de SQL. El método pandas.to_sql, aunque agradable, es lento.

Estoy teniendo problemas para escribir el código ...

Me gustaría poder pasar esta función un marco de datos de pandas al que llamo table , un nombre de esquema al que llamo schema y un nombre de tabla al que llamo name . Idealmente, la función 1.) eliminará la tabla si ya existe. 2.) crear una nueva tabla 3.) crear un asignador y 4.) realizar una inserción masiva utilizando el asignador y los datos de pandas. Estoy atrapado en la parte 3.

Aquí está mi código (ciertamente áspero) Estoy teniendo problemas para conseguir que la función del asignador funcione con mis claves principales. Realmente no necesito claves primarias pero la función del asignador lo requiere.

Gracias por las ideas.

from sqlalchemy import create_engine Table, Column, MetaData from sqlalchemy.orm import mapper, create_session from sqlalchemy.ext.declarative import declarative_base from pandas.io.sql import SQLTable, SQLDatabase def bulk_upload(table, schema, name): e = create_engine(''mssql+pyodbc://MYDB'') s = create_session(bind=e) m = MetaData(bind=e,reflect=True,schema=schema) Base = declarative_base(bind=e,metadata=m) t = Table(name,m) m.remove(t) t.drop(checkfirst=True) sqld = SQLDatabase(e, schema=schema,meta=m) sqlt = SQLTable(name, sqld, table).table sqlt.metadata = m m.create_all(bind=e,tables=[sqlt]) class MyClass(Base): return mapper(MyClass, sqlt) s.bulk_insert_mappings(MyClass, table.to_dict(orient=''records'')) return


Basado en las respuestas de @ansonw:

def to_sql(engine, df, table, if_exists=''fail'', sep=''/t'', encoding=''utf8''): # Create Table df[:0].to_sql(table, engine, if_exists=if_exists) # Prepare data output = cStringIO.StringIO() df.to_csv(output, sep=sep, header=False, encoding=encoding) output.seek(0) # Insert data connection = engine.raw_connection() cursor = connection.cursor() cursor.copy_from(output, table, sep=sep, null='''') connection.commit() cursor.close()

Inserto 200000 líneas en 5 segundos en lugar de 4 minutos


Como se trata de una gran carga de trabajo de E / S, también puede utilizar el módulo de subprocesamiento de python mediante multiprocessing.dummy . Esto aceleró las cosas para mí:

import math from multiprocessing.dummy import Pool as ThreadPool ... def insert_df(df, *args, **kwargs): nworkers = 4 chunksize = math.floor(df.shape[0] / nworkers) chunks = [(chunksize * i, (chunksize * i) + chunksize) for i in range(nworkers)] chunks.append((chunksize * nworkers, df.shape[0])) pool = ThreadPool(nworkers) def worker(chunk): i, j = chunk df.iloc[i:j, :].to_sql(*args, **kwargs) pool.map(worker, chunks) pool.close() pool.join() .... insert_df(df, "foo_bar", engine, if_exists=''append'')


Esto podría haber sido respondido para entonces, pero encontré la solución al recopilar diferentes respuestas en este sitio y alinearme con el documento de SQLAlchemy.

  1. La tabla ya debe existir en db1; con un índice configurado con auto_increment activado.
  2. La clase actual debe alinearse con el marco de datos importado en el CSV y la tabla en el db1.

Espero que esto ayude a quienquiera que venga aquí y quiera mezclar Panda y SQLAlchemy de una manera rápida.

from urllib import quote_plus as urlquote import sqlalchemy from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, Numeric from sqlalchemy.orm import sessionmaker import pandas as pd # Set up of the engine to connect to the database # the urlquote is used for passing the password which might contain special characters such as "/" engine = create_engine(''mysql://root:%s@localhost/db1'' % urlquote(''weirdPassword*withsp€cialcharacters''), echo=False) conn = engine.connect() Base = declarative_base() #Declaration of the class in order to write into the database. This structure is standard and should align with SQLAlchemy''s doc. class Current(Base): __tablename__ = ''tableName'' id = Column(Integer, primary_key=True) Date = Column(String(500)) Type = Column(String(500)) Value = Column(Numeric()) def __repr__(self): return "(id=''%s'', Date=''%s'', Type=''%s'', Value=''%s'')" % (self.id, self.Date, self.Type, self.Value) # Set up of the table in db and the file to import fileToRead = ''file.csv'' tableToWriteTo = ''tableName'' # Panda to create a lovely dataframe df_to_be_written = pd.read_csv(fileToRead) # The orient=''records'' is the key of this, it allows to align with the format mentioned in the doc to insert in bulks. listToWrite = df_to_be_written.to_dict(orient=''records'') metadata = sqlalchemy.schema.MetaData(bind=engine,reflect=True) table = sqlalchemy.Table(tableToWriteTo, metadata, autoload=True) # Open the session Session = sessionmaker(bind=engine) session = Session() # Inser the dataframe into the database in one bulk conn.execute(table.insert(), listToWrite) # Commit the changes session.commit() # Close the session session.close()


Me encontré con un problema similar con pd.to_sql que tardaba horas en cargar datos. El código masivo a continuación insertó los mismos datos en unos pocos segundos.

from sqlalchemy import create_engine import psycopg2 as pg #load python script that batch loads pandas df to sql import cStringIO address = ''postgresql://<username>:<pswd>@<host>:<port>/<database>'' engine = create_engine(address) connection = engine.raw_connection() cursor = connection.cursor() #df is the dataframe containing an index and the columns "Event" and "Day" #create Index column to use as primary key df.reset_index(inplace=True) df.rename(columns={''index'':''Index''}, inplace =True) #create the table but first drop if it already exists command = ''''''DROP TABLE IF EXISTS localytics_app2; CREATE TABLE localytics_app2 ( "Index" serial primary key, "Event" text, "Day" timestamp without time zone, );'''''' cursor.execute(command) connection.commit() #stream the data using ''to_csv'' and StringIO(); then use sql''s ''copy_from'' function output = cStringIO.StringIO() #ignore the index df.to_csv(output, sep=''/t'', header=False, index=False) #jump to start of stream output.seek(0) contents = output.getvalue() cur = connection.cursor() #null values become '''' cur.copy_from(output, ''localytics_app2'', null="") connection.commit() cur.close()


Mi solución específica de postgres a continuación crea automáticamente la tabla de la base de datos utilizando el marco de datos de pandas y realiza una inserción masiva rápida utilizando la COPY my_table FROM ...

import io import pandas as pd from sqlalchemy import create_engine def write_to_table(df, db_engine, schema, table_name, if_exists=''fail''): string_data_io = io.StringIO() df.to_csv(string_data_io, sep=''|'', index=False) pd_sql_engine = pd.io.sql.pandasSQL_builder(db_engine, schema=schema) table = pd.io.sql.SQLTable(table_name, pd_sql_engine, frame=df, index=False, if_exists=if_exists, schema=schema) table.create() string_data_io.seek(0) string_data_io.readline() # remove header with db_engine.connect() as connection: with connection.connection.cursor() as cursor: copy_cmd = "COPY %s.%s FROM STDIN HEADER DELIMITER ''|'' CSV" % (schema, table_name) cursor.copy_expert(copy_cmd, string_data_io) connection.connection.commit()


Para cualquier persona que enfrente este problema y tenga la base de datos de destino como Redshift, tenga en cuenta que Redshift no implementa el conjunto completo de comandos de Postgres, por lo que algunas de las respuestas que usan COPY FROM o copy_from() Postgres no funcionarán. psycopg2.ProgrammingError: error de sintaxis en o cerca del error "estándar" al intentar copiar desde un desplazamiento al rojo

La solución para acelerar los INSERTs a Redshift es usar una ingesta de archivos u Odo.

Referencia:
Acerca de Odo http://odo.pydata.org/en/latest/perf.html
Odo con desplazamiento al rojo
https://github.com/blaze/odo/blob/master/docs/source/aws.rst
Redshift COPY (desde archivo S3)
https://docs.aws.amazon.com/redshift/latest/dg/r_COPY.html