python - Utilice la tabla de copia binaria FROM con psycopg2
postgresql bulkinsert (2)
Tengo decenas de millones de filas para transferir desde archivos de matriz multidimensional a una base de datos PostgreSQL. Mis herramientas son Python y psycopg2. La forma más eficiente de realizar una gran parte de la información de los datos mediante el uso de copy_from
. Sin embargo, mis datos son en su mayoría números de punto flotante de 32 bits (real o float4), así que prefiero no convertir de real → text → real. Aquí hay una base de datos de ejemplo DDL:
CREATE TABLE num_data
(
id serial PRIMARY KEY NOT NULL,
node integer NOT NULL,
ts smallint NOT NULL,
val1 real,
val2 double precision
);
Aquí es donde estoy con Python usando cadenas (texto):
# Just one row of data
num_row = [23253, 342, -15.336734, 2494627.949375]
import psycopg2
# Python3:
from io import StringIO
# Python2, use: from cStringIO import StringIO
conn = psycopg2.connect("dbname=mydb user=postgres")
curs = conn.cursor()
# Convert floating point numbers to text, write to COPY input
cpy = StringIO()
cpy.write(''/t''.join([repr(x) for x in num_row]) + ''/n'')
# Insert data; database converts text back to floating point numbers
cpy.seek(0)
curs.copy_from(cpy, ''num_data'', columns=(''node'', ''ts'', ''val1'', ''val2''))
conn.commit()
¿Hay un equivalente que podría funcionar utilizando un modo binario? Es decir, mantener los números de punto flotante en binario? Esto no solo preservaría la precisión del punto flotante, sino que podría ser más rápido.
(Nota: para ver la misma precisión que en el ejemplo, use SET extra_float_digits=''2''
)
Aquí está el equivalente binario de COPY FROM para Python 3:
from io import BytesIO
from struct import pack
import psycopg2
# Two rows of data; "id" is not in the upstream data source
# Columns: node, ts, val1, val2
data = [(23253, 342, -15.336734, 2494627.949375),
(23256, 348, 43.23524, 2494827.949375)]
conn = psycopg2.connect("dbname=mydb user=postgres")
curs = conn.cursor()
# Determine starting value for sequence
curs.execute("SELECT nextval(''num_data_id_seq'')")
id_seq = curs.fetchone()[0]
# Make a binary file object for COPY FROM
cpy = BytesIO()
# 11-byte signature, no flags, no header extension
cpy.write(pack(''!11sii'', b''PGCOPY/n/377/r/n/0'', 0, 0))
# Columns: id, node, ts, val1, val2
# Zip: (column position, format, size)
row_format = list(zip(range(-1, 4),
(''i'', ''i'', ''h'', ''f'', ''d''),
( 4, 4, 2, 4, 8 )))
for row in data:
# Number of columns/fields (always 5)
cpy.write(pack(''!h'', 5))
for col, fmt, size in row_format:
value = (id_seq if col == -1 else row[col])
cpy.write(pack(''!i'' + fmt, size, value))
id_seq += 1 # manually increment sequence outside of database
# File trailer
cpy.write(pack(''!h'', -1))
# Copy data to database
cpy.seek(0)
curs.copy_expert("COPY num_data FROM STDIN WITH BINARY", cpy)
# Update sequence on database
curs.execute("SELECT setval(''num_data_id_seq'', %s, false)", (id_seq,))
conn.commit()
Actualizar
Reescribí el enfoque anterior para escribir los archivos para COPY. Mis datos en Python están en matrices NumPy, por lo que tiene sentido usarlos. Aquí hay algunos data
ejemplo con 1M filas, 7 columnas:
import psycopg2
import numpy as np
from struct import pack
from io import BytesIO
from datetime import datetime
conn = psycopg2.connect("dbname=mydb user=postgres")
curs = conn.cursor()
# NumPy record array
shape = (7, 2000, 500)
print(''Generating data with %i rows, %i columns'' % (shape[1]*shape[2], shape[0]))
dtype = ([(''id'', ''i4''), (''node'', ''i4''), (''ts'', ''i2'')] +
[(''s'' + str(x), ''f4'') for x in range(shape[0])])
data = np.empty(shape[1]*shape[2], dtype)
data[''id''] = np.arange(shape[1]*shape[2]) + 1
data[''node''] = np.tile(np.arange(shape[1]) + 1, shape[2])
data[''ts''] = np.repeat(np.arange(shape[2]) + 1, shape[1])
data[''s0''] = np.random.rand(shape[1]*shape[2]) * 100
prv = ''s0''
for nxt in data.dtype.names[4:]:
data[nxt] = data[prv] + np.random.rand(shape[1]*shape[2]) * 10
prv = nxt
En mi base de datos, tengo dos tablas que parecen:
CREATE TABLE num_data_binary
(
id integer PRIMARY KEY,
node integer NOT NULL,
ts smallint NOT NULL,
s0 real,
s1 real,
s2 real,
s3 real,
s4 real,
s5 real,
s6 real
) WITH (OIDS=FALSE);
y otra tabla similar llamada num_data_text
.
Aquí hay algunas funciones de ayuda sencillas para preparar los datos para COPY (formatos de texto y binarios) utilizando la información en la matriz de registros NumPy:
def prepare_text(dat):
cpy = BytesIO()
for row in dat:
cpy.write(''/t''.join([repr(x) for x in row]) + ''/n'')
return(cpy)
def prepare_binary(dat):
pgcopy_dtype = [(''num_fields'',''>i2'')]
for field, dtype in dat.dtype.descr:
pgcopy_dtype += [(field + ''_length'', ''>i4''),
(field, dtype.replace(''<'', ''>''))]
pgcopy = np.empty(dat.shape, pgcopy_dtype)
pgcopy[''num_fields''] = len(dat.dtype)
for i in range(len(dat.dtype)):
field = dat.dtype.names[i]
pgcopy[field + ''_length''] = dat.dtype[i].alignment
pgcopy[field] = dat[field]
cpy = BytesIO()
cpy.write(pack(''!11sii'', b''PGCOPY/n/377/r/n/0'', 0, 0))
cpy.write(pgcopy.tostring()) # all rows
cpy.write(pack(''!h'', -1)) # file trailer
return(cpy)
Así es como estoy usando las funciones de ayuda para comparar los dos métodos de formato COPY:
def time_pgcopy(dat, table, binary):
print(''Processing copy object for '' + table)
tstart = datetime.now()
if binary:
cpy = prepare_binary(dat)
else: # text
cpy = prepare_text(dat)
tendw = datetime.now()
print(''Copy object prepared in '' + str(tendw - tstart) + ''; '' +
str(cpy.tell()) + '' bytes; transfering to database'')
cpy.seek(0)
if binary:
curs.copy_expert(''COPY '' + table + '' FROM STDIN WITH BINARY'', cpy)
else: # text
curs.copy_from(cpy, table)
conn.commit()
tend = datetime.now()
print(''Database copy time: '' + str(tend - tendw))
print('' Total time: '' + str(tend - tstart))
return
time_pgcopy(data, ''num_data_text'', binary=False)
time_pgcopy(data, ''num_data_binary'', binary=True)
Aquí está la salida de los dos time_pgcopy
comandos time_pgcopy
:
Processing copy object for num_data_text
Copy object prepared in 0:01:15.288695; 84355016 bytes; transfering to database
Database copy time: 0:00:37.929166
Total time: 0:01:53.217861
Processing copy object for num_data_binary
Copy object prepared in 0:00:01.296143; 80000021 bytes; transfering to database
Database copy time: 0:00:23.325952
Total time: 0:00:24.622095
Por lo tanto, los pasos NumPy → file y file → database son mucho más rápidos con el enfoque binario. La diferencia obvia es cómo Python prepara el archivo COPY, que es muy lento para el texto. En términos generales, el formato binario se carga en la base de datos en 2/3 del tiempo como el formato de texto para este esquema.
Por último, comparé los valores en ambas tablas dentro de la base de datos para ver si los números eran diferentes. Alrededor de 1.46% de las filas tienen valores diferentes para la columna s0
, y esta fracción aumenta a 6.17% para s6
(probablemente relacionado con el método aleatorio que usé). Las diferencias absolutas distintas de cero entre todos los valores flotantes de 70M y 32 bits oscilan entre 9.3132257e-010 y 7.6293945e-006. Estas pequeñas diferencias entre el texto y los métodos de carga binarios se deben a la pérdida de precisión de las conversiones float → text → float requeridas para el método de formato de texto.
Here está mi versión. Basado en la versión de Mike.
Es muy ad-hoc pero hay dos ventajas:
- Espere el generador y actúe como una corriente sobrecargando
readline
- Ejemplo de cómo escribir en formato binario
hstore
.