python postgresql connection-pooling celery

python - Conexión de la base de datos de apio Worker



postgresql connection-pooling (6)

Contribuir con mis hallazgos mediante la implementación y el seguimiento.

Bienvenidos comentarios.

Referencia: uso de la agrupación http://www.prschmid.com/2013/04/using-sqlalchemy-with-celery-tasks.html

Cada proceso de trabajo (modo prefork especificado por -ck) establecerá una nueva conexión a la base de datos sin agrupar o reutilizar. Entonces, si se usa la agrupación, la agrupación se ve solo en cada nivel de proceso de trabajador. Por lo tanto, el tamaño del grupo> 1 no es útil, pero reutilizar la conexión aún está bien para guardar la conexión de abrir y cerrar.

Si se usa una conexión por proceso de trabajador, se establece 1 conexión de base de datos por proceso de trabajador (apio prefork modo -Trabajador de aplicación -k) en la fase de inicialización. Guarda la conexión de abrir y cerrar repetidamente.

No importa la cantidad de subprocesos de trabajo (eventlet), cada subproceso de trabajo (celery -A application worker -P eventlet) solo establece una conexión a la base de datos sin agrupar o reutilizar. Así que para eventlet, todos los subprocesos de trabajo (eventlets) en un proceso de apio (apio - Un trabajador de aplicación ...) tienen una conexión de 1 db en cada momento.

Según los documentos de apio.

pero debe asegurarse de que sus tareas no realicen llamadas de bloqueo, ya que esto detendrá todas las demás operaciones en el trabajador hasta que vuelva la llamada de bloqueo.

Probablemente se deba a que la conexión de base de datos MYSQL está bloqueando llamadas.

Estoy usando el apio independiente (no dentro de Django). Estoy planeando tener un tipo de tarea de trabajador ejecutándose en múltiples máquinas físicas. La tarea hace lo siguiente

  1. Aceptar un documento XML.
  2. Transformalo
  3. Haz múltiples bases de datos de lecturas y escrituras.

Estoy usando PostgreSQL, pero esto se aplicaría por igual a otros tipos de tiendas que usan conexiones. En el pasado, he usado un grupo de conexión de base de datos para evitar crear una nueva conexión de base de datos en cada solicitud o evitar mantener la conexión abierta demasiado tiempo. Sin embargo, dado que cada trabajador de Apio se ejecuta en un proceso separado, no estoy seguro de cómo podrían compartir el grupo. ¿Me estoy perdiendo de algo? Sé que Celery le permite persistir un resultado devuelto por un trabajador de Celery, pero eso no es lo que estoy tratando de hacer aquí. Cada tarea puede realizar varias actualizaciones o inserciones diferentes según los datos procesados.

¿Cuál es la forma correcta de acceder a una base de datos desde un trabajador de apio?

¿Es posible compartir un grupo entre varios trabajadores / tareas o hay alguna otra forma de hacer esto?


Me gusta la idea de tigeronk2 de una conexión por trabajador. Como él dice, Celery mantiene su propio grupo de trabajadores, por lo que realmente no hay necesidad de un grupo de conexión de base de datos separado. Los documentos de Celery Signal explican cómo realizar la inicialización personalizada cuando se crea un trabajador, por lo que agregué el siguiente código a mi tasks.py y parece que funciona exactamente como esperaría. Incluso pude cerrar las conexiones cuando los trabajadores están apagados:

db_conn = None @worker_process_init.connect def init_worker(**kwargs): global db_conn print(''Initializing database connection for worker.'') db_conn = db.connect(DB_CONNECT_STRING) @worker_process_shutdown.connect def shutdown_worker(**kwargs): global db_conn if db_conn: print(''Closing database connectionn for worker.'') db_conn.close()


Puede anular el comportamiento predeterminado para tener trabajadores de subprocesos en lugar de un trabajador por proceso en su configuración de apio:

CELERYD_POOL = "celery.concurrency.threads.TaskPool"

Luego, puede almacenar la instancia de grupo compartido en su instancia de tarea y hacer referencia a ella desde cada invocación de tarea con hilos.


Quizás puedas usar pgbouncer . Para el apio, nada debe cambiar y la agrupación de conexiones se realiza fuera de los procesos. Tengo el mismo issue .

(''quizás'' porque no estoy seguro de si podría haber algún efecto secundario)


Tal vez, celery.concurrency.gevent podría proporcionar el uso compartido de la piscina y no agravar el GIL. Sin embargo, su soporte sigue siendo "experimental".

Y un psycopg2.pool.SimpleConnectionPool para compartir entre greenlets (coroutines) que se ejecutarán en un solo proceso / hilo.

Un poco de otra discusión de stack sobre el tema.


Tener una conexión DB por proceso de trabajo. Dado que el apio en sí mismo mantiene un grupo de procesos de trabajo, sus conexiones db siempre serán iguales a la cantidad de trabajadores de apio. Por otro lado, más o menos, se vinculará la agrupación de conexiones db a la gestión de procesos de apio. Pero eso debería estar bien, dado que GIL solo permite un hilo a la vez en un proceso.