flask celery flask-sqlalchemy

flask - Frasco con create_app, SQLAlchemy y Apio



celery flask-sqlalchemy (2)

Aquí hay una solución que funciona con el patrón de fábrica de aplicaciones de matraz y también crea una tarea de apio con contexto, sin necesidad de usar app.app_context() explícitamente en las tareas. En mi aplicación, es realmente difícil obtener ese objeto de aplicación mientras se evita las importaciones circulares, pero esto lo resuelve. Esto también es bueno para la última versión de apio 4.2 en el momento de escribir este artículo.

Estructura:

repo_name/ manage.py base/ base/__init__.py base/app.py base/runcelery.py base/celeryconfig.py base/utility/celery_util.py base/tasks/workers.py

Así que la base es el paquete de aplicación principal en este ejemplo. En la base/__init__.py creamos la instancia de apio de la siguiente manera:

from celery import Celery celery = Celery(''base'', config_source=''base.celeryconfig'')

El archivo base/app.py contiene la aplicación de fábrica fabrica create_app y init_celery(app, celery) el init_celery(app, celery) que contiene:

from base import celery from base.utility.celery_util import init_celery def create_app(config_obj): """An application factory, as explained here: http://flask.pocoo.org/docs/patterns/appfactories/. :param config_object: The configuration object to use. """ app = Flask(''base'') app.config.from_object(config_obj) init_celery(app, celery=celery) register_extensions(app) register_blueprints(app) register_errorhandlers(app) register_app_context_processors(app) return app

Pasando a la base/runcelery.py contenidos:

from flask.helpers import get_debug_flag from base.settings import DevConfig, ProdConfig from base import celery from base.app import create_app from base.utility.celery_util import init_celery CONFIG = DevConfig if get_debug_flag() else ProdConfig app = create_app(CONFIG) init_celery(app, celery)

A continuación, el archivo base/celeryconfig.py (como ejemplo):

# -*- coding: utf-8 -*- """ Configure Celery. See the configuration guide at -> http://docs.celeryproject.org/en/master/userguide/configuration.html#configuration """ ## Broker settings. broker_url = ''pyamqp://guest:guest@localhost:5672//'' broker_heartbeat=0 # List of modules to import when the Celery worker starts. imports = (''base.tasks.workers'',) ## Using the database to store task state and results. result_backend = ''rpc'' #result_persistent = False accept_content = [''json'', ''application/text''] result_serializer = ''json'' timezone = "UTC" # define periodic tasks / cron here # beat_schedule = { # ''add-every-10-seconds'': { # ''task'': ''workers.add_together'', # ''schedule'': 10.0, # ''args'': (16, 16) # }, # }

Ahora define el init_celery en el archivo base/utility/celery_util.py :

# -*- coding: utf-8 -*- def init_celery(app, celery): """Add flask app context to celery.Task""" TaskBase = celery.Task class ContextTask(TaskBase): abstract = True def __call__(self, *args, **kwargs): with app.app_context(): return TaskBase.__call__(self, *args, **kwargs) celery.Task = ContextTask

Para los trabajadores en base/tasks/workers.py :

from base import celery as celery_app from flask_security.utils import config_value, send_mail from base.bp.users.models.user_models import User @celery_app.task def send_welcome_email(email, user_id, confirmation_link): """Background task to send a welcome email with flask-security''s mail. You don''t need to use with app.app_context() as Task has app context. """ user = User.query.filter_by(id=user_id).first() print(f''sending user {user} a welcome email'') send_mail(config_value(''EMAIL_SUBJECT_REGISTER''), email, ''welcome'', user=user, confirmation_link=confirmation_link) @celery_app.task def do_some_stuff(): print(g)

Luego, debe iniciar el apio y el apio en dos indicaciones de cmd diferentes desde la carpeta repo_name .

En un indicador de cmd, haga un celery -A base.runcelery:celery beat y la otra celery -A base.runcelery:celery worker .

Luego, ejecute la tarea que necesitaba el contexto del matraz. Deberia trabajar.

Estoy realmente luchando para conseguir la configuración adecuada para Flask, SQLAlchemy y Celery. He buscado mucho y he intentado diferentes enfoques, nada parece funcionar. O me perdí el contexto de la aplicación o no puedo ejecutar los trabajadores o hay otros problemas. La estructura es muy general, por lo que puedo construir una aplicación más grande.

Estoy usando: Flask 0.10.1, SQLAlchemy 1.0, Celery 3.1.13, mi configuración actual es la siguiente:

app / __ init__.py

#Empty

app / config.py

import os basedir = os.path.abspath(os.path.dirname(__file__)) class Config: @staticmethod def init_app(app): pass class LocalConfig(Config): DEBUG = True SQLALCHEMY_DATABASE_URI = r"sqlite:///" + os.path.join(basedir, "data-dev.sqlite") CELERY_BROKER_URL = ''amqp://guest:guest@localhost:5672//'' config = { "local": LocalConfig}

app / exstensions.py

from flask.ext.sqlalchemy import SQLAlchemy from celery import Celery db = SQLAlchemy() celery = Celery()

app / factory.py

from extensions import db, celery from flask import Flask from flask import g from config import config def create_before_request(app): def before_request(): g.db = db return before_request def create_app(config_name): app = Flask(__name__) app.config.from_object(config[config_name]) db.init_app(app) celery.config_from_object(config) # Register the blueprints # Add the before request handler app.before_request(create_before_request(app)) return app

app / manage.py

from factory import create_app app = create_app("local") from flask import render_template from flask import request @app.route(''/test'', methods=[''POST'']) def task_simple(): import tasks tasks.do_some_stuff.delay() return "" if __name__ == "__main__": app.run()

app / models.py

from extensions import db class User(db.Model): __tablename__ = "user" id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(128), unique=True, nullable=False)

aplicación / tareas.py

from extensions import celery from celery.signals import task_prerun from flask import g, current_app @task_prerun.connect def close_session(*args, **kwargs): with current_app.app_context(): # use g.db print g @celery.task() def do_some_stuff(): with current_app.app_context(): # use g.db print g

En la carpeta de aplicaciones:

  • iniciando el servidor web de desarrollo con: python.exe manage.py
  • iniciando los trabajadores con: celery.exe worker -A tasks

Recibo un error de importación que no tiene ningún sentido para mí. ¿Debo estructurar la aplicación de manera diferente? Al final, creo que quiero una configuración bastante básica, por ejemplo, usando Flask con el patrón de fábrica, poder usar la extensión Flask-SQLAlchmey y tener algún trabajador que necesite acceder a la base de datos.

Cualquier ayuda es muy apreciada.

El rastreo se ejecuta al arrancar el trabajador de apio.

Traceback (most recent call last): File "[PATH]/scripts/celery-script.py", line 9, in <module> load_entry_point(''celery==3.1.13'', ''console_scripts'', ''celery'')() File "[PATH]/lib/site-packages/celery/__main__.py", line 30, in main main() File "[PATH]/lib/site-packages/celery/bin/celery.py", line 81, in main cmd.execute_from_commandline(argv) File "[PATH]/lib/site-packages/celery/bin/celery.py", line 769, in execute_from_commandline super(CeleryCommand, self).execute_from_commandline(argv))) File "[PATH]/lib/site-packages/celery/bin/base.py", line 305, in execute_from_commandline argv = self.setup_app_from_commandline(argv) File "[PATH]/lib/site-packages/celery/bin/base.py", line 473, in setup_app_from_commandline user_preload = tuple(self.app.user_options[''preload''] or ()) AttributeError: ''Flask'' object has no attribute ''user_options''

ACTUALIZACIÓN Cambio el código de acuerdo con la sugerencia del comentario. El trabajador se inicia ahora, pero cuando lo prueba con una solicitud de obtención a http://127.0.0.1:5000/test . Me sale el siguiente rastreo:

Traceback (most recent call last): File "[PATH]/lib/site-packages/celery/app/trace.py", line 230, in trace_task args=args, kwargs=kwargs) File "[PATH]/lib/site-packages/celery/utils/dispatch/signal.py", line 166, in send response = receiver(signal=self, sender=sender, /**named) File "[PATH]/app/stackoverflow/tasks.py", line 7, in close_session with current_app.app_context(): File "[PATH]/lib/site-packages/werkzeug/local.py", line 338, in __getattr__ return getattr(self._get_current_object(), name) File "[PATH]/lib/site-packages/werkzeug/local.py", line 297, in _get_current_object return self.__local() File "[PATH]/lib/site-packages/flask/globals.py", line 34, in _find_app raise RuntimeError(''working outside of application context'') RuntimeError: working outside of application context exc, exc_info.traceback)))

ACTUALIZACIÓN Basado en los comentarios de Marteen, cambié el código. La versión actual de trabajo se encuentra en: https://gist.github.com/anonymous/fa47834db2f4f3b8b257 . Cualquier mejora adicional o sugerencias son bienvenidas.


Me fui con el consejo de current_app.

Tu objeto de apio necesita acceso al contexto de la aplicación. Encontré información en línea sobre la creación del objeto Celery con una función de fábrica. El siguiente ejemplo se prueba sin un intermediario de mensajes.

#factory.py from celery import Celery from config import config def create_celery_app(app=None): app = app or create_app(config) celery = Celery(__name__, broker=app.config[''CELERY_BROKER_URL'']) celery.conf.update(app.config) TaskBase = celery.Task class ContextTask(TaskBase): abstract = True def __call__(self, *args, **kwargs): with app.app_context(): return TaskBase.__call__(self, *args, **kwargs) celery.Task = ContextTask return celery

y en task.py:

#tasks.py from factory import create_celery_app from celery.signals import task_prerun from flask import g celery = create_celery_app() @task_prerun.connect def celery_prerun(*args, **kwargs): #print g with celery.app.app_context(): # # use g.db print g @celery.task() def do_some_stuff(): with celery.app.app_context(): # use g.db g.user = "test" print g.user

Algunos enlaces:

Patrón de matraz para crear una instancia de apio con función de fábrica

Aplicación utilizando tanto la fábrica de aplicaciones como el apio.

Fuente para la aplicación factory.py.

Fuente para la aplicación tasks.py