create django postgresql bulk-load

Django bulk_create con ignorar filas que causan IntegrityError?



bulk insert postgresql (5)

Esto ahora es posible en Django 2.2

Django 2.2 agrega una nueva opción bulk_create método bulk_create , de la documentation :

En las bases de datos que lo admiten (todas excepto PostgreSQL <9.5 y Oracle), establecer el parámetro ignore_conflicts en True indica a la base de datos que ignore el error de insertar filas que no superen las restricciones, como valores únicos duplicados. Al habilitar este parámetro, se desactiva la configuración de la clave principal en cada instancia del modelo (si la base de datos normalmente lo admite).

Ejemplo:

Entry.objects.bulk_create([ Entry(headline=''This is a test''), Entry(headline=''This is only a test''), ], ignore_conflicts=True)

Estoy utilizando bulk_create para cargar miles o filas en una base de datos postgresql. Desafortunadamente, algunas de las filas están causando IntegrityError y deteniendo el proceso bulk_create. Me preguntaba si habría una manera de decirle a Django que ignore esas filas y guarde la mayor cantidad posible del lote.


(Nota: no uso Django, por lo que puede haber respuestas más específicas para el marco)

Django no puede hacer esto simplemente ignorando las fallas de INSERT porque PostgreSQL anula toda la transacción en el primer error.

Django necesitaría uno de estos enfoques:

  1. INSERT cada fila en una transacción separada e ignore los errores (muy lento);
  2. Cree un SAVEPOINT antes de cada inserción (puede tener problemas de escala);
  3. Use un procedimiento o consulta para insertar solo si la fila no existe (complicado y lento); o
  4. Inserción masiva o (mejor) COPY los datos en una tabla TEMPORARY , luego fusionarlos en el lado del servidor de la tabla principal.

El enfoque de tipo "upsert" (3) parece una buena idea, pero " upsert" y "insert-si-no-existe" es sorprendentemente complicado .

Personalmente, tomaría (4): Me insertaría de forma masiva en una nueva tabla separada, probablemente UNLOGGED o TEMPORARY , luego ejecutaría algo de SQL manual para:

LOCK TABLE realtable IN EXCLUSIVE MODE; INSERT INTO realtable SELECT * FROM temptable WHERE NOT EXISTS ( SELECT 1 FROM realtable WHERE temptable.id = realtable.id );

La LOCK TABLE ... IN EXCLUSIVE MODE evita que una inserción concurrente que crea una fila provoque un conflicto con una inserción realizada por la declaración anterior y falle. No impide SELECT concurrentes, solo SELECT ... FOR UPDATE , INSERT , UPDATE y DELETE , por lo que las lecturas de la tabla continúan de manera normal.

Si no puede permitirse bloquear las escrituras concurrentes durante demasiado tiempo, podría utilizar un CTE de escritura para copiar los rangos de filas de temptable a realtable , reintentando cada bloque si falla.


Incluso en Django 1.11 no hay manera de hacer esto. Encontré una mejor opción que usar Raw SQL. Se utiliza djnago-query-builder . Tiene un método de upsert

from querybuilder.query import Query q = Query().from_table(YourModel) # replace with your real objects rows = [YourModel() for i in range(10)] q.upsert(rows, [''unique_fld1'', ''unique_fld2''], [''fld1_to_update'', ''fld2_to_update''])

Nota: la librería solo soporta postgreSQL

Aquí hay una gist que uso para la inserción masiva que admite ignorar IntegrityErrors y devuelve los registros insertados.


O 5. Divide y vencerás.

No probé ni comparé esto a fondo, pero funciona bastante bien para mí. YMMV, dependiendo en particular de cuántos errores espera obtener en una operación masiva.

def psql_copy(records): count = len(records) if count < 1: return True try: pg.copy_bin_values(records) return True except IntegrityError: if count == 1: # found culprit! msg = "Integrity error copying record:/n%r" logger.error(msg % records[0], exc_info=True) return False finally: connection.commit() # There was an integrity error but we had more than one record. # Divide and conquer. mid = count / 2 return psql_copy(records[:mid]) and psql_copy(records[mid:]) # or just return False


Una solución rápida y sucia para esto que no involucra el SQL manual y las tablas temporales es simplemente intentar insertar de forma masiva los datos. Si falla, vuelva a la inserción en serie.

objs = [(Event), (Event), (Event)...] try: Event.objects.bulk_create(objs) except IntegrityError: for obj in objs: try: obj.save() except IntegrityError: continue

Si tiene muchos y muchos errores, puede que esto no sea tan eficiente (pasará más tiempo insertándose en serie que haciéndolo a granel), pero estoy trabajando en un conjunto de datos de alta cardinalidad con algunos duplicados, por lo que esto resuelve la mayor parte de mi problemas.