tutorial quick guide español create python multithreading session sqlalchemy

python - quick - Manejo adecuado de sesiones de SQLAlchemy en aplicaciones de múltiples hilos



sqlalchemy select (1)

Tengo problemas para entender cómo abrir y cerrar las sesiones de bases de datos de manera eficiente, como entendí en la documentación de sqlalchemy, si uso scoped_session para construir mi objeto Session, y luego uso el objeto Sesión devuelto para crear sesiones, es threadsafe, así que básicamente cada hilo obtendrá su propia sesión, y no habrá problemas con ella. Ahora el ejemplo siguiente funciona, lo puse en un ciclo infinito para ver si cierra las sesiones correctamente y si lo controlé correctamente (en mysql ejecutando "SHOW PROCESSLIST;"), las conexiones siguen creciendo, no las cierra , aunque utilicé session.close (), e incluso eliminé el objeto scoped_session al final de cada ejecución. ¿Qué estoy haciendo mal? Mi objetivo en una aplicación más grande es usar la cantidad mínima de conexiones de base de datos requeridas, porque mi implementación de trabajo actual crea una nueva sesión en cada método donde es necesaria y la cierra antes de regresar, lo que parece ineficaz.

from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker, scoped_session from threading import Thread from Queue import Queue, Empty as QueueEmpty from models import MyModel DATABASE_CONNECTION_INFO = ''mysql://username:password@localhost:3306/dbname'' class MTWorker(object): def __init__(self, worker_count=5): self.task_queue = Queue() self.worker_count = worker_count self.db_engine = create_engine(DATABASE_CONNECTION_INFO, echo=False) self.DBSession = scoped_session( sessionmaker( autoflush=True, autocommit=False, bind=self.db_engine ) ) def _worker(self): db_session = self.DBSession() while True: try: task_id = self.task_queue.get(False) try: item = db_session.query(MyModel).filter(MyModel.id == task_id).one() # do something with item except Exception as exc: # if an error occurrs we skip it continue finally: db_session.commit() self.task_queue.task_done() except QueueEmpty: db_session.close() return def start(self): try: db_session = self.DBSession() all_items = db_session.query(MyModel).all() for item in all_items: self.task_queue.put(item.id) for _i in range(self.worker_count): t = Thread(target=self._worker) t.start() self.task_queue.join() finally: db_session.close() self.DBSession.remove() if __name__ == ''__main__'': while True: mt_worker = MTWorker(worker_count=50) mt_worker.start()


Solo debe llamar a create_engine y scoped_session una vez por proceso (por base de datos). Cada uno obtendrá su propio grupo de conexiones o sesiones (respectivamente), por lo que debe asegurarse de crear solo un grupo. Simplemente conviértalo en un nivel de módulo global. si necesita administrar sus sesiones de manera más concreta, probablemente no deba usar scoped_session

Otro cambio es usar DBSession directamente como si fuera una sesión. los métodos de sesión de llamada en scoped_session crearán de forma transparente una sesión local de subprocesos, si es necesario, y reenviarán la llamada de método a la sesión.

Otra cosa a tener en cuenta es el pool_size del grupo de conexiones, que es 5 por defecto. Para muchas aplicaciones está bien, pero si está creando muchos subprocesos, es posible que deba ajustar ese parámetro

DATABASE_CONNECTION_INFO = ''mysql://username:password@localhost:3306/dbname'' db_engine = create_engine(DATABASE_CONNECTION_INFO, echo=False) DBSession = scoped_session( sessionmaker( autoflush=True, autocommit=False, bind=db_engine ) ) class MTWorker(object): def __init__(self, worker_count=5): self.task_queue = Queue() self.worker_count = worker_count # snip