Рубрики
Без рубрики

Работа с сигналами сельдерея

Если вы новичок в сельдерею, начните здесь. Иногда при использовании сельдерея вы можете получить уведомление, когда … с меткой Python, сельдерей, сигналы.

Если вы новичок в сельдерею, Начните здесь Отказ

Иногда при использовании сельдерея вы можете получить уведомление, когда задача работает в фоновом режиме успешно или когда она не удается. Вы также можете захотеть выполнить функцию каждый раз, прежде чем задача сельдерей запускается или после его завершения. Эти и многие другие по той же линии – все ситуации, где сигналы будет пригодиться.

Я создам простой файл Tasks.py и создать сельдерей, чтобы продемонстрировать, как использовать конденситы сельдерей.

from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0') 

@app.task
def add(x, y):
    return x + y

Убедитесь, что ваш сервер Redis работает и начнет работник сельдерея:

(env) $ celery -A tasks worker --loglevel=INFO

Затем запустите файл Tasks.py и выполните Добавить задача:

(env) $ python -i tasks.py

>>> add.delay(4, 4)

>>> 

По умолчанию, что возвращено, это Asyncresult пример Но это не то, что мы заинтересованы. На терминале с рабочим средством вашего сельдерея вы должны увидеть что-то похожее на это:

[2020-11-03 07:01:02,024: INFO/ForkPoolWorker-2] Task tasks.add[ce1ee079-6434-4f54-ace2-360ff316546b] succeeded in 0.0005510429999979749s: 8

Задача успешно выполняется, и 8 является результатом, как и ожидалось.

Сигналы

Есть много из сигналов, которые предлагает сельдерей, но я сосредоточусь на 4 простых, чтобы продемонстрировать, как работает сигналы в целом.

  1. task_preerun.
  2. task_postrun.
  3. task_success.
  4. task_failure.

task_preerun.

Этот сигнал отправляется до Задача выполнена.

from celery import Celery
from celery.signals import task_prerun

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def add(x, y):
    return x + y

@task_prerun.connect(sender=add)
def task_prerun_notifier(sender=None, **kwargs):
    print("From task_prerun_notifier ==> Running just before add() executes")

Отправитель – это объект задачи, выполненный (Add Add функция в этом случае).

Бег add.deLay (4, 4) Как до сих пор дает следующий вывод на терминале сельдерея:

[2020-11-03 07:23:19,183: WARNING/ForkPoolWorker-2] From task_prerun_notifier ==> Running just before add() executes
[2020-11-03 07:23:19,184: INFO/ForkPoolWorker-2] Task tasks.add[1ef11c46-f461-4eb8-84ca-5c5cdab62a74] succeeded in 0.0016491969999998801s: 8

Незадолго до выполнения задачи, распечатки сигналов и отпечатки, как ожидалось.

task_postrun.

Отправляется после Задача была выполнена.

from celery.signals import task_prerun, task_postrun

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def add(x, y):
    return x + y

@task_prerun.connect(sender=add)
def task_prerun_notifier(sender=None, **kwargs):
    print("From task_prerun_notifier ==> Running just before add() executes")

@task_postrun.connect(sender=add)
def task_postrun_notifier(sender=None, **kwargs):
    print("From task_postrun_notifier ==> Ok, done!")

Запуск это должно дать следующий результат:

[2020-11-03 17:03:51,655: WARNING/ForkPoolWorker-2] From task_prerun_notifier ==> Running just before add() executes
[2020-11-03 17:03:51,656: INFO/ForkPoolWorker-2] Task tasks.add[7da6ee71-1941-4a87-b993-8136d94ac067] succeeded in 0.0017917519999999243s: 8
[2020-11-03 17:03:51,657: WARNING/ForkPoolWorker-2] From task_postrun_notifier ==> Ok, done!

task_success.

Отправляется, когда задача преуспевает.

from celery import Celery
from celery.signals import task_prerun, task_postrun, task_success

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def add(x, y):
    return x + y

@task_prerun.connect(sender=add)
def task_prerun_notifier(sender=None, **kwargs):
    print("From task_prerun_notifier ==> Running just before add() executes")

@task_postrun.connect(sender=add)
def task_postrun_notifier(sender=None, **kwargs):
    print("From task_postrun_notifier ==> Ok, done!")

@task_success.connect(sender=add)
def task_success_notifier(sender=None, **kwargs):
    print("From task_success_notifier ==> Task run successfully!")

Результат:

[2020-11-03 17:40:47,276: INFO/MainProcess] Received task: tasks.add[6603eb49-75ab-4653-b32f-ebe760a52de0]  
[2020-11-03 17:40:47,279: WARNING/ForkPoolWorker-2] From task_prerun_notifier ==> Running just before add() executes
[2020-11-03 17:40:47,281: WARNING/ForkPoolWorker-2] From task_success_notifier ==> Task run successfully!
[2020-11-03 17:40:47,281: INFO/ForkPoolWorker-2] Task tasks.add[6603eb49-75ab-4653-b32f-ebe760a52de0] succeeded in 0.00201471799999986s: 8
[2020-11-03 17:40:47,282: WARNING/ForkPoolWorker-2] From task_postrun_notifier ==> Ok, done!

task_failure.

Отправляется, когда задача терпит неудачу.

from celery import Celery
from celery.signals import task_prerun, task_postrun, task_failure

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def add(x, y):
    raise Exception

@task_prerun.connect(sender=add)
def task_prerun_notifier(sender=None, **kwargs):
    print("From task_prerun_notifier ==> Running just before add() executes")

@task_postrun.connect(sender=add)
def task_postrun_notifier(sender=None, **kwargs):
    print("From task_postrun_notifier ==> Ok, done!")

@task_failure.connect(sender=add)
def task_failure_notifier(sender=None, **kwargs):
    print("From task_failure_notifier ==> Task failed successfully! 😅")

Результат:

[2020-11-03 17:44:36,082: INFO/MainProcess] Received task: tasks.add[da4a03e8-5530-4c9e-afeb-75f8e0b1be5d]  
[2020-11-03 17:44:36,085: WARNING/ForkPoolWorker-2] From task_prerun_notifier ==> Running just before add() executes
[2020-11-03 17:44:36,096: WARNING/ForkPoolWorker-2] From task_failure_notifier ==> Task failed successfully! 😅
[2020-11-03 17:44:36,096: ERROR/ForkPoolWorker-2] Task tasks.add[da4a03e8-5530-4c9e-afeb-75f8e0b1be5d] raised unexpected: Exception()
Traceback (most recent call last):
  ...
   in add
    raise Exception
Exception
[2020-11-03 17:44:36,097: WARNING/ForkPoolWorker-2] From task_postrun_notifier ==> Ok, done!

Оригинал: “https://dev.to/wangonya/working-with-celery-signals-38lh”