with shared_task run how celery_result_backend celery_broker_url autodiscover_tasks app python mysql sqlalchemy celery django-celery

python - shared_task - Crear una conexión de base de datos separada para cada trabajador de apio



python django celery (1)

Sigo teniendo problemas con mysql extraños mientras que los trabajadores ejecutan tareas justo después de la creación.

Usamos django 1.3, apio 3.1.17, djorm-ext-pool 0.5

Comenzamos el proceso de apio con concurrencia 3. Hasta el momento, mi obeservación es que, cuando comienza el proceso de los trabajadores, todos obtienen la misma conexión de mysql. Registramos el ID de conexión de db como a continuación.

from django.db import connection connection.cursor() logger.info("Task %s processing with db connection %s", str(task_id), str(connection.connection.thread_id()))

Cuando todos los trabajadores obtienen tareas, la primera se ejecuta con éxito, pero las otras dos dan errores Mysql extraños. O bien los errores con "servidor Mysql desaparecido", o con una condición en la que Django arroja el error "DoesNotExist". claramente existen los objetos que Django está consultando.

Después de este error, cada trabajador comienza a obtener su propia conexión de base de datos después de lo cual no encontramos ningún problema.

¿Cuál es el comportamiento predeterminado del apio? Está diseñado para compartir la misma conexión de base de datos. Si es así, ¿cómo se maneja la comunicación entre procesos? Idealmente, preferiría una conexión de base de datos diferente para cada trabajador.

Probé el código mencionado en el siguiente enlace que no funcionó. Conexión de la base de datos del trabajador de apio

También hemos corregido el código de apio que se sugiere a continuación. https://github.com/celery/celery/issues/2453

Para aquellos que rechazaron la pregunta, amablemente háganme saber el motivo del voto a la baja.


El apio se inicia con el comando debajo

celery -A myproject worker --loglevel=debug --concurrency=3 -Q testqueue

myproject.py como parte del proceso maestro realizaba algunas consultas a la base de datos mysql antes de bifurcar los procesos de trabajo.

Como parte del flujo de consultas en el proceso principal, django ORM crea un conjunto de conexiones sqlalchemy si aún no existe. Los procesos de trabajo se crean.

El apio como parte de las reparaciones de django cierra las conexiones existentes.

def close_database(self, **kwargs): if self._close_old_connections: return self._close_old_connections() # Django 1.6 if not self.db_reuse_max: return self._close_database() if self._db_recycles >= self.db_reuse_max * 2: self._db_recycles = 0 self._close_database() self._db_recycles += 1

En efecto, lo que podría estar sucediendo es que el objeto del conjunto sqlalchemy con una conexión db no utilizada se copia en el proceso de 3 trabajadores cuando se bifurca. Entonces, los 3 grupos diferentes tienen 3 objetos de conexión que apuntan al mismo descriptor de archivo de conexión.

Los trabajadores mientras ejecutan las tareas cuando se les solicita una conexión db, todos los trabajadores obtienen la misma conexión no utilizada del grupo sqlalchemy porque actualmente no se utiliza. El hecho de que todas las conexiones apuntan al mismo descriptor de archivo ha causado que la conexión MySQL desaparezca.

Las nuevas conexiones creadas allí después son todas nuevas y no apuntan al mismo descriptor de archivo de socket.

Solución:

En el proceso principal, agregue

from django.db import connection connection.cursor()

antes de que se realice cualquier importación es decir, antes incluso se djorm-ext-pool módulo djorm-ext-pool .

De esta forma, todas las consultas db usarán la conexión creada por django fuera del grupo. Cuando la corrección aprya django cierra la conexión, la conexión realmente se cierra en lugar de volver al grupo de alquimia que sale del grupo de alquimia sin conexiones en el momento de hacer frente a todos los trabajadores cuando se bifurcan. Luego, cuando los trabajadores piden conexión db, sqlalchemy devuelve una de las conexiones recién creadas.