flask - Frasco con create_app, SQLAlchemy y Apio
celery flask-sqlalchemy (2)
Aquí hay una solución que funciona con el patrón de fábrica de aplicaciones de matraz y también crea una tarea de apio con contexto, sin necesidad de usar app.app_context()
explícitamente en las tareas. En mi aplicación, es realmente difícil obtener ese objeto de aplicación mientras se evita las importaciones circulares, pero esto lo resuelve. Esto también es bueno para la última versión de apio 4.2 en el momento de escribir este artículo.
Estructura:
repo_name/
manage.py
base/
base/__init__.py
base/app.py
base/runcelery.py
base/celeryconfig.py
base/utility/celery_util.py
base/tasks/workers.py
Así que la base
es el paquete de aplicación principal en este ejemplo. En la base/__init__.py
creamos la instancia de apio de la siguiente manera:
from celery import Celery
celery = Celery(''base'', config_source=''base.celeryconfig'')
El archivo base/app.py
contiene la aplicación de fábrica fabrica create_app
y init_celery(app, celery)
el init_celery(app, celery)
que contiene:
from base import celery
from base.utility.celery_util import init_celery
def create_app(config_obj):
"""An application factory, as explained here:
http://flask.pocoo.org/docs/patterns/appfactories/.
:param config_object: The configuration object to use.
"""
app = Flask(''base'')
app.config.from_object(config_obj)
init_celery(app, celery=celery)
register_extensions(app)
register_blueprints(app)
register_errorhandlers(app)
register_app_context_processors(app)
return app
Pasando a la base/runcelery.py
contenidos:
from flask.helpers import get_debug_flag
from base.settings import DevConfig, ProdConfig
from base import celery
from base.app import create_app
from base.utility.celery_util import init_celery
CONFIG = DevConfig if get_debug_flag() else ProdConfig
app = create_app(CONFIG)
init_celery(app, celery)
A continuación, el archivo base/celeryconfig.py
(como ejemplo):
# -*- coding: utf-8 -*-
"""
Configure Celery. See the configuration guide at ->
http://docs.celeryproject.org/en/master/userguide/configuration.html#configuration
"""
## Broker settings.
broker_url = ''pyamqp://guest:guest@localhost:5672//''
broker_heartbeat=0
# List of modules to import when the Celery worker starts.
imports = (''base.tasks.workers'',)
## Using the database to store task state and results.
result_backend = ''rpc''
#result_persistent = False
accept_content = [''json'', ''application/text'']
result_serializer = ''json''
timezone = "UTC"
# define periodic tasks / cron here
# beat_schedule = {
# ''add-every-10-seconds'': {
# ''task'': ''workers.add_together'',
# ''schedule'': 10.0,
# ''args'': (16, 16)
# },
# }
Ahora define el init_celery en el archivo base/utility/celery_util.py
:
# -*- coding: utf-8 -*-
def init_celery(app, celery):
"""Add flask app context to celery.Task"""
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
Para los trabajadores en base/tasks/workers.py
:
from base import celery as celery_app
from flask_security.utils import config_value, send_mail
from base.bp.users.models.user_models import User
@celery_app.task
def send_welcome_email(email, user_id, confirmation_link):
"""Background task to send a welcome email with flask-security''s mail.
You don''t need to use with app.app_context() as Task has app context.
"""
user = User.query.filter_by(id=user_id).first()
print(f''sending user {user} a welcome email'')
send_mail(config_value(''EMAIL_SUBJECT_REGISTER''),
email,
''welcome'', user=user,
confirmation_link=confirmation_link)
@celery_app.task
def do_some_stuff():
print(g)
Luego, debe iniciar el apio y el apio en dos indicaciones de cmd diferentes desde la carpeta repo_name
.
En un indicador de cmd, haga un celery -A base.runcelery:celery beat
y la otra celery -A base.runcelery:celery worker
.
Luego, ejecute la tarea que necesitaba el contexto del matraz. Deberia trabajar.
Estoy realmente luchando para conseguir la configuración adecuada para Flask, SQLAlchemy y Celery. He buscado mucho y he intentado diferentes enfoques, nada parece funcionar. O me perdí el contexto de la aplicación o no puedo ejecutar los trabajadores o hay otros problemas. La estructura es muy general, por lo que puedo construir una aplicación más grande.
Estoy usando: Flask 0.10.1, SQLAlchemy 1.0, Celery 3.1.13, mi configuración actual es la siguiente:
app / __ init__.py
#Empty
app / config.py
import os
basedir = os.path.abspath(os.path.dirname(__file__))
class Config:
@staticmethod
def init_app(app):
pass
class LocalConfig(Config):
DEBUG = True
SQLALCHEMY_DATABASE_URI = r"sqlite:///" + os.path.join(basedir,
"data-dev.sqlite")
CELERY_BROKER_URL = ''amqp://guest:guest@localhost:5672//''
config = {
"local": LocalConfig}
app / exstensions.py
from flask.ext.sqlalchemy import SQLAlchemy
from celery import Celery
db = SQLAlchemy()
celery = Celery()
app / factory.py
from extensions import db, celery
from flask import Flask
from flask import g
from config import config
def create_before_request(app):
def before_request():
g.db = db
return before_request
def create_app(config_name):
app = Flask(__name__)
app.config.from_object(config[config_name])
db.init_app(app)
celery.config_from_object(config)
# Register the blueprints
# Add the before request handler
app.before_request(create_before_request(app))
return app
app / manage.py
from factory import create_app
app = create_app("local")
from flask import render_template
from flask import request
@app.route(''/test'', methods=[''POST''])
def task_simple():
import tasks
tasks.do_some_stuff.delay()
return ""
if __name__ == "__main__":
app.run()
app / models.py
from extensions import db
class User(db.Model):
__tablename__ = "user"
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(128), unique=True, nullable=False)
aplicación / tareas.py
from extensions import celery
from celery.signals import task_prerun
from flask import g, current_app
@task_prerun.connect
def close_session(*args, **kwargs):
with current_app.app_context():
# use g.db
print g
@celery.task()
def do_some_stuff():
with current_app.app_context():
# use g.db
print g
En la carpeta de aplicaciones:
- iniciando el servidor web de desarrollo con:
python.exe manage.py
- iniciando los trabajadores con:
celery.exe worker -A tasks
Recibo un error de importación que no tiene ningún sentido para mí. ¿Debo estructurar la aplicación de manera diferente? Al final, creo que quiero una configuración bastante básica, por ejemplo, usando Flask con el patrón de fábrica, poder usar la extensión Flask-SQLAlchmey y tener algún trabajador que necesite acceder a la base de datos.
Cualquier ayuda es muy apreciada.
El rastreo se ejecuta al arrancar el trabajador de apio.
Traceback (most recent call last):
File "[PATH]/scripts/celery-script.py", line 9, in <module>
load_entry_point(''celery==3.1.13'', ''console_scripts'', ''celery'')()
File "[PATH]/lib/site-packages/celery/__main__.py", line 30, in main
main()
File "[PATH]/lib/site-packages/celery/bin/celery.py", line 81, in main
cmd.execute_from_commandline(argv)
File "[PATH]/lib/site-packages/celery/bin/celery.py", line 769, in execute_from_commandline
super(CeleryCommand, self).execute_from_commandline(argv)))
File "[PATH]/lib/site-packages/celery/bin/base.py", line 305, in execute_from_commandline
argv = self.setup_app_from_commandline(argv)
File "[PATH]/lib/site-packages/celery/bin/base.py", line 473, in setup_app_from_commandline
user_preload = tuple(self.app.user_options[''preload''] or ())
AttributeError: ''Flask'' object has no attribute ''user_options''
ACTUALIZACIÓN Cambio el código de acuerdo con la sugerencia del comentario. El trabajador se inicia ahora, pero cuando lo prueba con una solicitud de obtención a http://127.0.0.1:5000/test
. Me sale el siguiente rastreo:
Traceback (most recent call last):
File "[PATH]/lib/site-packages/celery/app/trace.py", line 230, in trace_task
args=args, kwargs=kwargs)
File "[PATH]/lib/site-packages/celery/utils/dispatch/signal.py", line 166, in send
response = receiver(signal=self, sender=sender, /**named)
File "[PATH]/app/stackoverflow/tasks.py", line 7, in close_session
with current_app.app_context():
File "[PATH]/lib/site-packages/werkzeug/local.py", line 338, in __getattr__
return getattr(self._get_current_object(), name)
File "[PATH]/lib/site-packages/werkzeug/local.py", line 297, in _get_current_object
return self.__local()
File "[PATH]/lib/site-packages/flask/globals.py", line 34, in _find_app
raise RuntimeError(''working outside of application context'')
RuntimeError: working outside of application context exc, exc_info.traceback)))
ACTUALIZACIÓN Basado en los comentarios de Marteen, cambié el código. La versión actual de trabajo se encuentra en: https://gist.github.com/anonymous/fa47834db2f4f3b8b257 . Cualquier mejora adicional o sugerencias son bienvenidas.
Me fui con el consejo de current_app.
Tu objeto de apio necesita acceso al contexto de la aplicación. Encontré información en línea sobre la creación del objeto Celery con una función de fábrica. El siguiente ejemplo se prueba sin un intermediario de mensajes.
#factory.py
from celery import Celery
from config import config
def create_celery_app(app=None):
app = app or create_app(config)
celery = Celery(__name__, broker=app.config[''CELERY_BROKER_URL''])
celery.conf.update(app.config)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
y en task.py:
#tasks.py
from factory import create_celery_app
from celery.signals import task_prerun
from flask import g
celery = create_celery_app()
@task_prerun.connect
def celery_prerun(*args, **kwargs):
#print g
with celery.app.app_context():
# # use g.db
print g
@celery.task()
def do_some_stuff():
with celery.app.app_context():
# use g.db
g.user = "test"
print g.user
Algunos enlaces:
Patrón de matraz para crear una instancia de apio con función de fábrica
Aplicación utilizando tanto la fábrica de aplicaciones como el apio.