traducir significa qué inglés ingles español cómo python celery

python - significa - trabajador de cierre de apio después de tarea particular



qué significa en español (3)

Estoy usando apio (grupo solo con concurrencia = 1) y quiero poder cerrar al trabajador una vez que se haya ejecutado una tarea en particular. Una advertencia es que quiero evitar cualquier posibilidad de que el trabajador realice más tareas después de esa.

Aquí está mi intento en el esquema:

from __future__ import absolute_import, unicode_literals from celery import Celery from celery.exceptions import WorkerShutdown from celery.signals import task_postrun app = Celery() app.config_from_object(''celeryconfig'') @app.task def add(x, y): return x + y @task_postrun.connect(sender=add) def shutdown(*args, **kwargs): raise WorkerShutdown()

Sin embargo, cuando corro el trabajador

celery -A celeryapp worker --concurrency=1 --pool=solo

y ejecuta la tarea

add.delay(1,4)

Me sale lo siguiente:

-------------- celery@sam-APOLLO-2000 v4.0.2 (latentcall) ---- **** ----- --- * *** * -- Linux-4.4.0-116-generic-x86_64-with-Ubuntu-16.04-xenial 2018-03-18 14:08:37 -- * - **** --- - ** ---------- [config] - ** ---------- .> app: __main__:0x7f596896ce90 - ** ---------- .> transport: redis://localhost:6379/0 - ** ---------- .> results: redis://localhost/ - *** --- * --- .> concurrency: 4 (solo) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** ----- -------------- [queues] .> celery exchange=celery(direct) key=celery [2018-03-18 14:08:39,892: WARNING/MainProcess] Restoring 1 unacknowledged message(s)

La tarea se vuelve a poner en cola y se ejecutará de nuevo en otro trabajador, lo que lleva a un bucle.

Esto también sucede cuando muevo la excepción WorkerShutdown dentro de la tarea en sí.

@app.task def add(x, y): print(x + y) raise WorkerShutdown()

¿Hay alguna manera de que pueda cerrar al trabajador después de una tarea en particular, mientras evito este desafortunado efecto secundario?


El proceso recomendado para cerrar a un trabajador es enviar la señal TERM . Esto hará que un trabajador de apio se cierre después de completar cualquier tarea actualmente en ejecución. Si envía una señal de QUIT al proceso principal del trabajador, el trabajador se cerrará de inmediato.

Sin embargo, los documentos de apio generalmente discuten esto en términos de manejo de apio desde una línea de comando o vía systemd / initd, pero adicionalmente proporciona un API de control de trabajador remoto a través de celery.app.control .
Puede revoke una tarea para evitar que los trabajadores ejecuten la tarea. Esto debería evitar el bucle que estás experimentando. Además, el control también permite el shutdown de un trabajador de esta manera.

Así que imagino que lo siguiente te dará el comportamiento que deseas.

@app.task(bind=True) def shutdown(self): app.control.revoke(self.id) # prevent this task from being executed again app.control.shutdown() # send shutdown signal to all workers

Dado que actualmente no es posible realizar la tarea desde dentro de la tarea, luego continuar ejecutando dicha tarea, este método de usar revoke evita este problema, de modo que, incluso si la tarea se pone en cola nuevamente, el nuevo trabajador simplemente la ignorará.

Alternativamente, lo siguiente también evitaría que una tarea entregada de nuevo se ejecute por segunda vez ...

@app.task(bind=True) def some_task(self): if self.request.delivery_info[''redelivered'']: raise Ignore() # ignore if this task was redelivered print(''This should only execute on first receipt of task'')

También vale la pena mencionar que AsyncResult también tiene un método de revoke que llama self.app.control.revoke por ti.


Si apaga al trabajador, una vez finalizada la tarea, no se volverá a poner en cola nuevamente.

@task_postrun.connect(sender=add) def shutdown(*args, **kwargs): app.control.broadcast(''shutdown'')

Esto cerrará con gracia al trabajador después de que se completen las tareas.

[2018-04-01 18:44:14,627: INFO/MainProcess] Connected to redis://localhost:6379/0 [2018-04-01 18:44:14,656: INFO/MainProcess] mingle: searching for neighbors [2018-04-01 18:44:15,719: INFO/MainProcess] mingle: all alone [2018-04-01 18:44:15,742: INFO/MainProcess] celery@foo ready. [2018-04-01 18:46:28,572: INFO/MainProcess] Received task: celery_worker_stop.add[ac8a65ff-5aad-41a6-a2d6-a659d021fb9b] [2018-04-01 18:46:28,585: INFO/ForkPoolWorker-4] Task celery_worker_stop.add[ac8a65ff-5aad-41a6-a2d6-a659d021fb9b] succeeded in 0.005628278013318777s: 3 [2018-04-01 18:46:28,665: WARNING/MainProcess] Got shutdown from remote

Nota: la transmisión cerrará todos los trabajadores. Si desea cerrar un trabajador específico, comience con un nombre

celery -A celeryapp worker -n self_killing --concurrency=1 --pool=solo

Ahora puedes apagar esto con el parámetro de destino.

app.control.broadcast(''shutdown'', destination=[''celery@self_killing''])


Si necesita cerrar un trabajador específico y no sabe de antemano su nombre, puede obtenerlo de las propiedades de la tarea. Basado en las respuestas anteriores, puedes usar:

app.control.shutdown(destination=[self.request.hostname])

o

app.control.broadcast(''shutdown'', destination=[self.request.hostname])

Nota:

  • Un trabajador debe iniciarse con un nombre (opción ''-n'' );
  • La tarea debe definirse con el parámetro bind=True .