python - flask marshmallow
Cómo usar Flask-SQLAlchemy en una tarea de apio (4)
Actualización: desde entonces hemos comenzado a usar una mejor manera de manejar el desmontaje de la aplicación y configurarla por tarea, según el patrón descrito en la documentación más reciente del matraz .
extensions.py
import flask
from flask.ext.sqlalchemy import SQLAlchemy
from celery import Celery
class FlaskCelery(Celery):
def __init__(self, *args, **kwargs):
super(FlaskCelery, self).__init__(*args, **kwargs)
self.patch_task()
if ''app'' in kwargs:
self.init_app(kwargs[''app''])
def patch_task(self):
TaskBase = self.Task
_celery = self
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
if flask.has_app_context():
return TaskBase.__call__(self, *args, **kwargs)
else:
with _celery.app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
self.Task = ContextTask
def init_app(self, app):
self.app = app
self.config_from_object(app.config)
celery = FlaskCelery()
db = SQLAlchemy()
app.py
from flask import Flask
from extensions import celery, db
def create_app():
app = Flask()
#configure/initialize all your extensions
db.init_app(app)
celery.init_app(app)
return app
Una vez que haya configurado su aplicación de esta manera, puede ejecutar y usar apio sin tener que ejecutarlo explícitamente desde un contexto de aplicación, ya que todas sus tareas se ejecutarán automáticamente en un contexto de aplicación si es necesario, y usted no tiene Preocuparse explícitamente por el desmontaje posterior a la tarea, que es un tema importante de gestionar (ver otras respuestas a continuación).
La respuesta anterior a continuación sigue funcionando pero no es una solución tan limpia
Prefiero ejecutar todo el apio dentro del contexto de la aplicación creando un archivo separado que invoque apio.start () con el contexto de la aplicación. Esto significa que su archivo de tareas no tiene que estar lleno de configuraciones de contexto y desmontables. También se presta bien al patrón de fábrica de aplicaciones del matraz.
extensions.py
from from flask.ext.sqlalchemy import SQLAlchemy
from celery import Celery
db = SQLAlchemy()
celery = Celery()
tasks.py
from extensions import celery, db
from flask.globals import current_app
from celery.signals import task_postrun
@celery.task
def do_some_stuff():
current_app.logger.info("I have the application context")
#you can now use the db object from extensions
@task_postrun.connect
def close_session(*args, **kwargs):
# Flask SQLAlchemy will automatically create new sessions for you from
# a scoped session factory, given that we are maintaining the same app
# context, this ensures tasks have a fresh session (e.g. session errors
# won''t propagate across tasks)
db.session.remove()
app.py
from extensions import celery, db
def create_app():
app = Flask()
#configure/initialize all your extensions
db.init_app(app)
celery.config_from_object(app.config)
return app
RunCelery.py
from app import create_app
from extensions import celery
app = create_app()
if __name__ == ''__main__'':
with app.app_context():
celery.start()
Recientemente cambié a Celery 3.0. Antes de eso, estaba usando Flask-Celery para integrar Apio con Frasco. Aunque tenía muchos problemas, como ocultar algunas potentes funcionalidades de Aplery, pero me permitió utilizar el contexto completo de la aplicación Flask y, especialmente, Flask-SQLAlchemy.
En mis tareas de fondo estoy procesando datos y SQLAlchemy ORM para almacenar los datos. El mantenedor de Flask-Celery ha dejado de admitir el complemento. El complemento estaba reduciendo la instancia de Flask en la tarea para poder tener acceso completo a SQLAlchemy.
Estoy intentando replicar este comportamiento en mi archivo tasks.py pero sin éxito. ¿Tiene alguna pista sobre cómo lograr esto?
En su archivo tasks.py haga lo siguiente:
from main import create_app
app = create_app()
celery = Celery(__name__)
celery.add_defaults(lambda: app.config)
@celery.task
def create_facet(project_id, **kwargs):
with app.test_request_context():
# your code
Usé la respuesta de Paul Gibbs con dos diferencias. En lugar de task_postrun usé worker_process_init. Y en lugar de .remove () utilicé db.session.expire_all ().
No estoy 100% seguro, pero por lo que entiendo, la forma en que esto funciona es cuando Celery crea un proceso de trabajo, todas las sesiones de db heredadas / compartidas caducarán, y SQLAlchemy creará nuevas sesiones a pedido exclusivas de ese proceso de trabajo.
Hasta ahora parece que ha arreglado mi problema. Con la solución de Paul, cuando un trabajador terminaba y eliminaba la sesión, otro trabajador que usaba la misma sesión seguía ejecutando su consulta, por lo que db.session.remove () cerró la conexión mientras se estaba utilizando, lo que me dio una "conexión perdida a MySQL". servidor durante la consulta "excepción.
¡Gracias Paul por dirigirme en la dirección correcta!
No importa que no funcionó. Terminé teniendo una discusión en mi fábrica de aplicaciones Flask para no ejecutar db.init_app (aplicación) si Apio lo llamaba. En cambio, los trabajadores lo llamarán después de que Aplery los bifurca. Ahora veo varias conexiones en mi lista de procesos MySQL.
from extensions import db
from celery.signals import worker_process_init
from flask import current_app
@worker_process_init.connect
def celery_worker_init_db(**_):
db.init_app(current_app)
from flask import Flask
from werkzeug.utils import import_string
from celery.signals import worker_process_init, celeryd_init
from flask_celery import Celery
from src.app import config_from_env, create_app
celery = Celery()
def get_celery_conf():
config = import_string(''src.settings'')
config = {k: getattr(config, k) for k in dir(config) if k.isupper()}
config[''BROKER_URL''] = config[''CELERY_BROKER_URL'']
return config
@celeryd_init.connect
def init_celeryd(conf=None, **kwargs):
conf.update(get_celery_conf())
@worker_process_init.connect
def init_celery_flask_app(**kwargs):
app = create_app()
app.app_context().push()
- Actualizar la configuración de apio en apical init
- Use la fábrica de su matraz para iniciar todas las extensiones de matraz, incluida la extensión SQLAlchemy.
Al hacer esto, podemos mantener la conexión de la base de datos por trabajador.
Si desea ejecutar su tarea en contexto de matraz, puede subclase Task.__call__
:
class SmartTask(Task):
abstract = True
def __call__(self, *_args, **_kwargs):
with self.app.flask_app.app_context():
with self.app.flask_app.test_request_context():
result = super(SmartTask, self).__call__(*_args, **_kwargs)
return result
class SmartCelery(Celery):
def init_app(self, app):
super(SmartCelery, self).init_app(app)
self.Task = SmartTask