python postgresql bulkinsert binary-data psycopg2

python - Utilice la tabla de copia binaria FROM con psycopg2



postgresql bulkinsert (2)

Tengo decenas de millones de filas para transferir desde archivos de matriz multidimensional a una base de datos PostgreSQL. Mis herramientas son Python y psycopg2. La forma más eficiente de realizar una gran parte de la información de los datos mediante el uso de copy_from . Sin embargo, mis datos son en su mayoría números de punto flotante de 32 bits (real o float4), así que prefiero no convertir de real → text → real. Aquí hay una base de datos de ejemplo DDL:

CREATE TABLE num_data ( id serial PRIMARY KEY NOT NULL, node integer NOT NULL, ts smallint NOT NULL, val1 real, val2 double precision );

Aquí es donde estoy con Python usando cadenas (texto):

# Just one row of data num_row = [23253, 342, -15.336734, 2494627.949375] import psycopg2 # Python3: from io import StringIO # Python2, use: from cStringIO import StringIO conn = psycopg2.connect("dbname=mydb user=postgres") curs = conn.cursor() # Convert floating point numbers to text, write to COPY input cpy = StringIO() cpy.write(''/t''.join([repr(x) for x in num_row]) + ''/n'') # Insert data; database converts text back to floating point numbers cpy.seek(0) curs.copy_from(cpy, ''num_data'', columns=(''node'', ''ts'', ''val1'', ''val2'')) conn.commit()

¿Hay un equivalente que podría funcionar utilizando un modo binario? Es decir, mantener los números de punto flotante en binario? Esto no solo preservaría la precisión del punto flotante, sino que podría ser más rápido.

(Nota: para ver la misma precisión que en el ejemplo, use SET extra_float_digits=''2'' )


Aquí está el equivalente binario de COPY FROM para Python 3:

from io import BytesIO from struct import pack import psycopg2 # Two rows of data; "id" is not in the upstream data source # Columns: node, ts, val1, val2 data = [(23253, 342, -15.336734, 2494627.949375), (23256, 348, 43.23524, 2494827.949375)] conn = psycopg2.connect("dbname=mydb user=postgres") curs = conn.cursor() # Determine starting value for sequence curs.execute("SELECT nextval(''num_data_id_seq'')") id_seq = curs.fetchone()[0] # Make a binary file object for COPY FROM cpy = BytesIO() # 11-byte signature, no flags, no header extension cpy.write(pack(''!11sii'', b''PGCOPY/n/377/r/n/0'', 0, 0)) # Columns: id, node, ts, val1, val2 # Zip: (column position, format, size) row_format = list(zip(range(-1, 4), (''i'', ''i'', ''h'', ''f'', ''d''), ( 4, 4, 2, 4, 8 ))) for row in data: # Number of columns/fields (always 5) cpy.write(pack(''!h'', 5)) for col, fmt, size in row_format: value = (id_seq if col == -1 else row[col]) cpy.write(pack(''!i'' + fmt, size, value)) id_seq += 1 # manually increment sequence outside of database # File trailer cpy.write(pack(''!h'', -1)) # Copy data to database cpy.seek(0) curs.copy_expert("COPY num_data FROM STDIN WITH BINARY", cpy) # Update sequence on database curs.execute("SELECT setval(''num_data_id_seq'', %s, false)", (id_seq,)) conn.commit()

Actualizar

Reescribí el enfoque anterior para escribir los archivos para COPY. Mis datos en Python están en matrices NumPy, por lo que tiene sentido usarlos. Aquí hay algunos data ejemplo con 1M filas, 7 columnas:

import psycopg2 import numpy as np from struct import pack from io import BytesIO from datetime import datetime conn = psycopg2.connect("dbname=mydb user=postgres") curs = conn.cursor() # NumPy record array shape = (7, 2000, 500) print(''Generating data with %i rows, %i columns'' % (shape[1]*shape[2], shape[0])) dtype = ([(''id'', ''i4''), (''node'', ''i4''), (''ts'', ''i2'')] + [(''s'' + str(x), ''f4'') for x in range(shape[0])]) data = np.empty(shape[1]*shape[2], dtype) data[''id''] = np.arange(shape[1]*shape[2]) + 1 data[''node''] = np.tile(np.arange(shape[1]) + 1, shape[2]) data[''ts''] = np.repeat(np.arange(shape[2]) + 1, shape[1]) data[''s0''] = np.random.rand(shape[1]*shape[2]) * 100 prv = ''s0'' for nxt in data.dtype.names[4:]: data[nxt] = data[prv] + np.random.rand(shape[1]*shape[2]) * 10 prv = nxt

En mi base de datos, tengo dos tablas que parecen:

CREATE TABLE num_data_binary ( id integer PRIMARY KEY, node integer NOT NULL, ts smallint NOT NULL, s0 real, s1 real, s2 real, s3 real, s4 real, s5 real, s6 real ) WITH (OIDS=FALSE);

y otra tabla similar llamada num_data_text .

Aquí hay algunas funciones de ayuda sencillas para preparar los datos para COPY (formatos de texto y binarios) utilizando la información en la matriz de registros NumPy:

def prepare_text(dat): cpy = BytesIO() for row in dat: cpy.write(''/t''.join([repr(x) for x in row]) + ''/n'') return(cpy) def prepare_binary(dat): pgcopy_dtype = [(''num_fields'',''>i2'')] for field, dtype in dat.dtype.descr: pgcopy_dtype += [(field + ''_length'', ''>i4''), (field, dtype.replace(''<'', ''>''))] pgcopy = np.empty(dat.shape, pgcopy_dtype) pgcopy[''num_fields''] = len(dat.dtype) for i in range(len(dat.dtype)): field = dat.dtype.names[i] pgcopy[field + ''_length''] = dat.dtype[i].alignment pgcopy[field] = dat[field] cpy = BytesIO() cpy.write(pack(''!11sii'', b''PGCOPY/n/377/r/n/0'', 0, 0)) cpy.write(pgcopy.tostring()) # all rows cpy.write(pack(''!h'', -1)) # file trailer return(cpy)

Así es como estoy usando las funciones de ayuda para comparar los dos métodos de formato COPY:

def time_pgcopy(dat, table, binary): print(''Processing copy object for '' + table) tstart = datetime.now() if binary: cpy = prepare_binary(dat) else: # text cpy = prepare_text(dat) tendw = datetime.now() print(''Copy object prepared in '' + str(tendw - tstart) + ''; '' + str(cpy.tell()) + '' bytes; transfering to database'') cpy.seek(0) if binary: curs.copy_expert(''COPY '' + table + '' FROM STDIN WITH BINARY'', cpy) else: # text curs.copy_from(cpy, table) conn.commit() tend = datetime.now() print(''Database copy time: '' + str(tend - tendw)) print('' Total time: '' + str(tend - tstart)) return time_pgcopy(data, ''num_data_text'', binary=False) time_pgcopy(data, ''num_data_binary'', binary=True)

Aquí está la salida de los dos time_pgcopy comandos time_pgcopy :

Processing copy object for num_data_text Copy object prepared in 0:01:15.288695; 84355016 bytes; transfering to database Database copy time: 0:00:37.929166 Total time: 0:01:53.217861 Processing copy object for num_data_binary Copy object prepared in 0:00:01.296143; 80000021 bytes; transfering to database Database copy time: 0:00:23.325952 Total time: 0:00:24.622095

Por lo tanto, los pasos NumPy → file y file → database son mucho más rápidos con el enfoque binario. La diferencia obvia es cómo Python prepara el archivo COPY, que es muy lento para el texto. En términos generales, el formato binario se carga en la base de datos en 2/3 del tiempo como el formato de texto para este esquema.

Por último, comparé los valores en ambas tablas dentro de la base de datos para ver si los números eran diferentes. Alrededor de 1.46% de las filas tienen valores diferentes para la columna s0 , y esta fracción aumenta a 6.17% para s6 (probablemente relacionado con el método aleatorio que usé). Las diferencias absolutas distintas de cero entre todos los valores flotantes de 70M y 32 bits oscilan entre 9.3132257e-010 y 7.6293945e-006. Estas pequeñas diferencias entre el texto y los métodos de carga binarios se deben a la pérdida de precisión de las conversiones float → text → float requeridas para el método de formato de texto.


Here está mi versión. Basado en la versión de Mike.

Es muy ad-hoc pero hay dos ventajas:

  • Espere el generador y actúe como una corriente sobrecargando readline
  • Ejemplo de cómo escribir en formato binario hstore .