django multiprocessing

Django multiprocesamiento y conexiones a bases de datos.



multiprocessing (8)

Fondo:

Estoy trabajando en un proyecto que utiliza Django con una base de datos de Postgres. También estamos usando mod_wsgi en caso de que eso importe, ya que algunas de mis búsquedas en la web han hecho mención de ello. Al enviar el formulario web, la vista de Django inicia un trabajo que llevará una cantidad de tiempo considerable (más de lo que el usuario desearía esperar), por lo que iniciamos el trabajo a través de una llamada al sistema en segundo plano. El trabajo que se está ejecutando ahora necesita poder leer y escribir en la base de datos. Debido a que este trabajo lleva mucho tiempo, usamos el multiprocesamiento para ejecutar partes de él en paralelo.

Problema:

La secuencia de comandos de nivel superior tiene una conexión de base de datos y cuando genera procesos secundarios, parece que la conexión de los padres está disponible para los niños. Luego hay una excepción acerca de cómo se debe llamar a SET TRANSACTION ISOLATION LEVEL antes de una consulta. La investigación ha indicado que esto se debe a tratar de usar la misma conexión de base de datos en múltiples procesos. Un hilo que encontré sugirió llamar a connection.close () al inicio de los procesos secundarios para que Django cree automáticamente una nueva conexión cuando la necesite, y por lo tanto, cada proceso secundario tendrá una conexión única, es decir, no compartida. Esto no funcionó para mí, ya que llamar a connection.close () en el proceso secundario hizo que el proceso principal se quejara de que se perdió la conexión.

Otros hallazgos:

Algunas cosas que leí parecían indicar que realmente no se puede hacer esto, y que el multiprocesamiento, mod_wsgi y Django no juegan bien juntos. Eso parece difícil de creer, supongo.

Algunos sugirieron usar apio, lo que podría ser una solución a largo plazo, pero no puedo instalar el apio en este momento, a la espera de algunos procesos de aprobación, por lo que no es una opción en este momento.

Encontré varias referencias en SO y en otros lugares sobre conexiones de base de datos persistentes, lo que creo que es un problema diferente.

También se encontraron referencias a psycopg2.pool y pgpool y algo acerca de bouncer. Es cierto que no entendía la mayor parte de lo que estaba leyendo sobre ellos, pero ciertamente no me llamó la atención por ser lo que estaba buscando.

"Work-Around" actual:

Por ahora, he vuelto a ejecutar las cosas en serie, y funciona, pero es más lento de lo que me gustaría.

¿Alguna sugerencia sobre cómo puedo usar el multiprocesamiento para ejecutar en paralelo? Parece que si los padres y los dos hijos tuvieran conexiones independientes con la base de datos, las cosas estarían bien, pero parece que no puedo tener ese comportamiento.

Gracias, y lo siento por la longitud!


(No es una gran solución, pero una posible solución)

¿Si no puede usar apio, tal vez podría implementar su propio sistema de colas, básicamente agregando tareas a alguna tabla de tareas y teniendo un cron regular que las elimine y procese? (a través de un comando de gestión)


Cuando use múltiples bases de datos, debe cerrar todas las conexiones.

from django import db for connection_name in db.connections.databases: db.connections[connection_name].close()

EDITAR

Use lo mismo que se menciona en @lechup para cerrar todas las conexiones (no estoy seguro desde qué versión de django se agregó este método):

from django import db db.connections.close_all()


El multiprocesamiento copia los objetos de conexión entre los procesos porque procesa los procesos y, por lo tanto, copia todos los descriptores de archivo del proceso principal. Dicho esto, una conexión al servidor SQL es solo un archivo, puede verlo en linux en / proc // fd / .... cualquier archivo abierto se compartirá entre procesos bifurcados. Puedes encontrar más sobre forking here .

Mi solución fue simplemente cerrar la conexión db justo antes de iniciar los procesos, cada proceso recrea la conexión en sí misma cuando la necesitará (probado en django 1.4):

from django import db db.connections.close_all() def db_worker(): some_paralell_code() Process(target = db_worker,args = ())

Pgbouncer / pgpool no está conectado con subprocesos en un significado de multiprocesamiento. Es más bien una solución para no cerrar la conexión en cada solicitud = acelerar la conexión a postgres mientras está bajo una carga alta.

Actualizar:

Para eliminar por completo los problemas con la conexión de la base de datos, simplemente mueva toda la lógica conectada a la base de datos a db_worker. Quería pasar QueryDict como argumento ... Mejor idea es simplemente pasar la lista de identificadores ... Ver QueryDict y values_list (''id'', flat = ¡Cierto), y no te olvides de convertirlo en lista! list (QueryDict) antes de pasar a db_worker. Gracias a eso no copiamos modelos de conexión de base de datos.

def db_worker(models_ids): obj = PartModelWorkerClass(model_ids) # here You do Model.objects.filter(id__in = model_ids) obj.run() model_ids = Model.objects.all().values_list(''id'', flat=True) model_ids = list(model_ids) # cast to list process_count = 5 delta = (len(model_ids) / process_count) + 1 # do all the db stuff here ... # here you can close db connection from django import db db.connections.close_all() for it in range(0:process_count): Process(target = db_worker,args = (model_ids[it*delta:(it+1)*delta]))


Hola, me encontré con este problema y pude resolverlo realizando lo siguiente (estamos implementando un sistema de tareas limitadas)

task.py

from django.db import connection def as_task(fn): """ this is a decorator that handles task duties, like setting up loggers, reporting on status...etc """ connection.close() # this is where i kill the database connection VERY IMPORTANT # This will force django to open a new unique connection, since on linux at least # Connections do not fare well when forked #...etc

ScheduledJob.py

from django.db import connection def run_task(request, job_id): """ Just a simple view that when hit with a specific job id kicks of said job """ # your logic goes here # ... processor = multiprocessing.Queue() multiprocessing.Process( target=call_command, # all of our tasks are setup as management commands in django args=[ job_info.management_command, ], kwargs= { ''web_processor'': processor, }.items() + vars(options).items()).start() result = processor.get(timeout=10) # wait to get a response on a successful init # Result is a tuple of [TRUE|FALSE,<ErrorMessage>] if not result[0]: raise Exception(result[1]) else: # THE VERY VERY IMPORTANT PART HERE, notice that up to this point we haven''t touched the db again, but now we absolutely have to call connection.close() connection.close() # we do some database accessing here to get the most recently updated job id in the database

Honestamente, para prevenir las condiciones de carrera (con múltiples usuarios simultáneos) sería mejor llamar a database.close () lo más rápido posible después de que haya finalizado el proceso. Todavía puede haber una posibilidad de que otro usuario en algún lugar de la línea realice una solicitud al db antes de que tenga la oportunidad de vaciar la base de datos.

Honestamente, sería más seguro y más inteligente que su fork no llame directamente al comando, ¡sino que llame a un script en el sistema operativo para que la tarea generada se ejecute en su propia shell django!


Para Python 3 y Django 1.9 esto es lo que funcionó para mí:

import multiprocessing import django django.setup() # Must call setup def db_worker(): for name, info in django.db.connections.databases.items(): # Close the DB connections django.db.connection.close() # Execute parallel code here if __name__ == ''__main__'': multiprocessing.Process(target=db_worker)

Tenga en cuenta que sin el django.setup () no podría hacer que esto funcione. Supongo que es necesario inicializar algo de nuevo para el multiprocesamiento.


Podría dar más recursos a Postgre, en Debian / Ubuntu puede editar:

nano /etc/postgresql/9.4/main/postgresql.conf

reemplazando 9.4 por su versión de postgre.

Aquí hay algunas líneas útiles que deben actualizarse con valores de ejemplo para hacerlo, los nombres hablan por sí solos:

max_connections=100 shared_buffers = 3000MB temp_buffers = 800MB effective_io_concurrency = 300 max_worker_processes = 80

Tenga cuidado de no aumentar demasiado estos parámetros, ya que podría provocar errores si Postgre intenta tomar más recursos que los disponibles. Los ejemplos anteriores se están ejecutando bien en una máquina Ram Debian de 8 GB equipada con 4 núcleos.


Si todo lo que necesita es paralelismo de E / S y no procesar el paralelismo, puede evitar este problema cambiando sus procesos a subprocesos. Reemplazar

from multiprocessing import Process

con

from threading import Thread

El objeto Thread tiene la misma interfaz que Procsess


Tuve problemas de "conexión cerrada" al ejecutar los casos de prueba de Django secuencialmente. Además de las pruebas, también hay otro proceso que modifica intencionalmente la base de datos durante la ejecución de la prueba. Este proceso se inicia en cada caso de prueba setUp ().

Una solución simple fue heredar mis clases de prueba de TransactionTestCase lugar de TestCase . Esto asegura que la base de datos se haya escrito realmente y que el otro proceso tenga una vista actualizada de los datos.