python - create_engine - sqlalchemy sql server
¿Cómo manejar SQLAlchemy Connections en ProcessPool? (3)
@roman: Buen desafío que tienes ahí.
He estado en un escenario similar antes, así que aquí están mis 2 centavos : a menos que este consumidor solo "lea" y "escriba" el mensaje, sin hacer ningún procesamiento real de él, podría rediseñar este consumidor como consumidor / productor que consumirá el mensaje, procesará el mensaje y luego pondrá el resultado en otra cola, esa cola (mensajes procesados, por ejemplo) podría leerse por 1..N procesos asíncronos no agrupados que tendrían que abrir la conexión de DB en su propia cuenta Ciclo de vida completo.
Puedo extender mi respuesta, pero no sé si este enfoque se ajusta a sus necesidades; si es así, puedo brindarle más detalles sobre el diseño ampliado.
Tengo un reactor que obtiene mensajes de un agente RabbitMQ y activa métodos de trabajo para procesar estos mensajes en un grupo de procesos, algo como esto:
Esto se implementa utilizando python asyncio
, loop.run_in_executor()
y concurrent.futures.ProcessPoolExecutor
.
Ahora quiero acceder a la base de datos en los métodos de trabajo utilizando SQLAlchemy. Principalmente el procesamiento será muy sencillo y rápido de las operaciones de CRUD.
El reactor procesará 10-50 mensajes por segundo al principio, por lo que no es aceptable abrir una nueva conexión de base de datos para cada solicitud. Más bien me gustaría mantener una conexión persistente por proceso.
Mis preguntas son: ¿Cómo puedo hacer esto? ¿Puedo simplemente almacenarlos en una variable global? ¿El grupo de conexión SQA manejará esto por mí? ¿Cómo limpiar cuando el reactor se detiene?
[Actualizar]
- La base de datos es MySQL con InnoDB.
¿Por qué elegir este patrón con un grupo de procesos?
La implementación actual utiliza un patrón diferente donde cada consumidor se ejecuta en su propio hilo. De alguna manera esto no funciona muy bien. Ya hay alrededor de 200 consumidores cada uno ejecutando en su propio subproceso, y el sistema está creciendo rápidamente. Para escalar mejor, la idea era separar las preocupaciones y consumir mensajes en un bucle de E / S y delegar el procesamiento a un grupo. Por supuesto, el rendimiento de todo el sistema está principalmente vinculado a la E / S. Sin embargo, la CPU es un problema al procesar grandes conjuntos de resultados.
La otra razón fue la "facilidad de uso". Si bien el manejo de la conexión y el consumo de mensajes se implementan de forma asíncrona, el código en el trabajador puede ser sincrónico y simple.
Pronto se hizo evidente que el acceso a los sistemas remotos a través de conexiones de red persistentes desde el trabajador es un problema. Esto es para lo que son los canales de comunicación: Dentro del trabajador, puedo otorgar solicitudes al bus de mensajes a través de estos canales.
Una de mis ideas actuales es manejar el acceso a la base de datos de una manera similar: pasar las declaraciones a través de una cola al bucle de eventos donde se envían a la base de datos. Sin embargo, no tengo idea de cómo hacer esto con SQLAlchemy. ¿Dónde estaría el punto de entrada? Los objetos deben ser pickled
cuando pasan a través de una cola. ¿Cómo obtengo tal objeto de una consulta SQA? La comunicación con la base de datos debe funcionar de forma asíncrona para no bloquear el bucle de eventos. ¿Puedo usar, por ejemplo, aiomysql como un controlador de base de datos para SQA?
Su requisito de una conexión de base de datos por proceso de agrupación de procesos puede satisfacerse fácilmente si se tiene cuidado de cómo ejemplificar la session
, suponiendo que está trabajando con el orm, en los procesos de trabajo.
Una solución simple sería tener una session global que reutilizar en todas las solicitudes:
# db.py
engine = create_engine("connection_uri", pool_size=1, max_overflow=0)
DBSession = scoped_session(sessionmaker(bind=engine))
Y en la tarea del trabajador:
# task.py
from db import engine, DBSession
def task():
DBSession.begin() # each task will get its own transaction over the global connection
...
DBSession.query(...)
...
DBSession.close() # cleanup on task end
Los argumentos pool_size
y max_overflow
customize el QueuePool predeterminado utilizado por create_engine. pool_size
se asegurará de que su proceso solo mantenga viva una conexión por proceso en el grupo de procesos.
Si desea que se vuelva a conectar, puede usar DBSession.remove()
que eliminará la sesión del registro y la volverá a conectar la próxima vez que use DBSession. También puede usar el argumento de recycle
de Pool para hacer que la conexión se vuelva a conectar después de la cantidad de tiempo especificada.
Durante el desarrollo / depuración de errores, puede usar AssertionPool que generará una excepción si más de una conexión se retira del grupo, vea cómo cambiar las implementaciones de los grupos de servidores.
Un enfoque que me ha servido muy bien es usar un servidor web para manejar y escalar el grupo de procesos. flask-sqlalchemy incluso en su estado predeterminado mantendrá un grupo de conexiones y no cerrará cada conexión en cada ciclo de respuesta de solicitud.
El ejecutor asyncio puede simplemente llamar a los puntos finales de url para ejecutar sus funciones. El beneficio adicional es que dado que todos los procesos que realizan el trabajo están detrás de una url, puede escalar de manera trivial su grupo de trabajadores a través de máquinas múltiples, agregando más procesos a través de Gunicorn o uno de los muchos otros métodos para escalar un servidor wsgi simple. Además, obtienes toda la bondad tolerante a los fallos.
El inconveniente es que podría estar pasando más información a través de la red. Sin embargo, como usted dice, el problema está vinculado a la CPU y probablemente pasará muchos más datos hacia y desde la base de datos.