python - procesos - programacion asincrona en java
Realizar una tarea asincrónica en Flask (2)
Enhebrar es otra posible solución. Aunque la solución basada en Celery es mejor para aplicaciones a escala, si no espera demasiado tráfico en el punto final en cuestión, el enhebrado es una alternativa viable.
Esta solución se basa en la presentación PyCon 2016 Flask at Scale de Miguel Grinberg , específicamente la diapositiva 41 en su plataforma de diapositivas. Su código también está disponible en github para aquellos interesados en la fuente original.
Desde la perspectiva del usuario, el código funciona de la siguiente manera:
- Realiza una llamada al punto final que realiza la tarea de ejecución larga.
- Este punto final devuelve 202 Aceptado con un enlace para verificar el estado de la tarea.
- Las llamadas al enlace de estado devuelven 202 mientras la tarea aún se está ejecutando, y devuelve 200 (y el resultado) cuando se completa la tarea.
Para convertir una llamada API a una tarea en segundo plano, simplemente agregue el decorador @async_api.
Aquí hay un ejemplo completamente contenido:
from flask import Flask, g, abort, current_app, request, url_for
from werkzeug.exceptions import HTTPException, InternalServerError
from flask_restful import Resource, Api
from datetime import datetime
from functools import wraps
import threading
import time
import uuid
tasks = {}
app = Flask(__name__)
api = Api(app)
@app.before_first_request
def before_first_request():
"""Start a background thread that cleans up old tasks."""
def clean_old_tasks():
"""
This function cleans up old tasks from our in-memory data structure.
"""
global tasks
while True:
# Only keep tasks that are running or that finished less than 5
# minutes ago.
five_min_ago = datetime.timestamp(datetime.utcnow()) - 5 * 60
tasks = {task_id: task for task_id, task in tasks.items()
if ''t'' not in task or task[''t''] > five_min_ago}
time.sleep(60)
if not current_app.config[''TESTING'']:
thread = threading.Thread(target=clean_old_tasks)
thread.start()
def async_api(f):
@wraps(f)
def wrapped(*args, **kwargs):
def task(flask_app, environ):
# Create a request context similar to that of the original request
# so that the task can have access to flask.g, flask.request, etc.
with flask_app.request_context(environ):
try:
tasks[task_id][''rv''] = f(*args, **kwargs)
except HTTPException as e:
tasks[task_id][''rv''] = current_app.handle_http_exception(e)
except Exception as e:
# The function raised an exception, so we set a 500 error
tasks[task_id][''rv''] = InternalServerError()
if current_app.debug:
# We want to find out if something happened so reraise
raise
finally:
# We record the time of the response, to help in garbage
# collecting old tasks
tasks[task_id][''t''] = datetime.timestamp(datetime.utcnow())
# close the database session (if any)
# Assign an id to the asynchronous task
task_id = uuid.uuid4().hex
# Record the task, and then launch it
tasks[task_id] = {''task'': threading.Thread(
target=task, args=(current_app._get_current_object(),
request.environ))}
tasks[task_id][''task''].start()
# Return a 202 response, with a link that the client can use to
# obtain task status
print(url_for(''gettaskstatus'', task_id=task_id))
return ''accepted'', 202, {''Location'': url_for(''gettaskstatus'', task_id=task_id)}
return wrapped
class GetTaskStatus(Resource):
def get(self, task_id):
"""
Return status about an asynchronous task. If this request returns a 202
status code, it means that task hasn''t finished yet. Else, the response
from the task is returned.
"""
task = tasks.get(task_id)
if task is None:
abort(404)
if ''rv'' not in task:
return '''', 202, {''Location'': url_for(''gettaskstatus'', task_id=task_id)}
return task[''rv'']
class CatchAll(Resource):
@async_api
def get(self, path=''''):
# perform some intensive processing
print("starting processing task")
time.sleep(10)
print("completed processing task")
return f''The answer is: {path}''
api.add_resource(CatchAll, ''/<path:path>'', ''/'')
api.add_resource(GetTaskStatus, ''/status/<task_id>'')
if __name__ == ''__main__'':
app.run(debug=True)
Estoy escribiendo una aplicación en Flask, que funciona muy bien, excepto que
WSGI
es sincrónico y bloqueante.
Tengo una tarea en particular que llama a una API de terceros y esa tarea puede tardar varios minutos en completarse.
Me gustaría hacer esa llamada (en realidad es una serie de llamadas) y dejar que se ejecute.
mientras se devuelve el control a Flask.
Mi vista se ve así:
@app.route(''/render/<id>'', methods=[''POST''])
def render_script(id=None):
...
data = json.loads(request.data)
text_list = data.get(''text_list'')
final_file = audio_class.render_audio(data=text_list)
# do stuff
return Response(
mimetype=''application/json'',
status=200
)
Ahora, lo que quiero hacer es tener la línea
final_file = audio_class.render_audio()
ejecutar y proporcionar una devolución de llamada que se ejecutará cuando regrese el método, mientras que Flask puede continuar procesando solicitudes. Esta es la única tarea que necesito para que Flask se ejecute de forma asincrónica, y me gustaría obtener algunos consejos sobre la mejor manera de implementar esto.
He mirado a Twisted y Klein, pero no estoy seguro de que sean exagerados, ya que quizás Threading sea suficiente. ¿O tal vez el apio es una buena opción para esto?
Celery para manejar la tarea asincrónica por ti. Deberá instalar un agente para que sirva como cola de tareas (se recomiendan RabbitMQ y Redis).
app.py
:
from flask import Flask
from celery import Celery
broker_url = ''amqp://guest@localhost'' # Broker URL for RabbitMQ task queue
app = Flask(__name__)
celery = Celery(app.name, broker=broker_url)
celery.config_from_object(''celeryconfig'') # Your celery configurations in a celeryconfig.py
@celery.task(bind=True)
def some_long_task(self, x, y):
# Do some long task
...
@app.route(''/render/<id>'', methods=[''POST''])
def render_script(id=None):
...
data = json.loads(request.data)
text_list = data.get(''text_list'')
final_file = audio_class.render_audio(data=text_list)
some_long_task.delay(x, y) # Call your async task and pass whatever necessary variables
return Response(
mimetype=''application/json'',
status=200
)
Ejecute su aplicación Flask e inicie otro proceso para ejecutar su trabajador de apio.
$ celery worker -A app.celery --loglevel=debug
También me referiría a la redacción de Miguel Gringberg para obtener una guía más detallada sobre el uso de Celery with Flask.