Рубрики
Без рубрики

Как вы можете реализовать возобновляемые задачи в сельдерею

Предисловие сельдерея – это высокомасштабная система задач для Python. Это позволяет отправлять задачи на Q … Помечено Python, Showdev, Учебник.

Сельдерей – это высокомасштабная система задач для Python. Это позволяет отправлять задачи в очереди и выполнять их асинхронно через работник. Задачи сельдерей самостоятельно очень полезны, однако много раз я заметил необходимость приостановить/отменить задачу. Теперь сельдерей делает Разрешить выключить очередь задач для отмены задач. Но я хотел что-то масштабируемое, чтобы приостановить, продолжать и отменить задачу (не нарушая очередь) без потери данных.

Поэтому я решил использовать Рабочий процесс шаблон , помогал с Контроль автобуса для достижения этой цели. Возможно, это руководство поможет кому-то еще ищет приостановленные/возобновляемые целлюлозные задачи, а также!

Вы можете найти полную демонстрацию, чтобы играть в мой github repo Отказ

Концепция

С Рабочие процессы сельдерея – Вы можете разработать всю вашу операцию, чтобы разделить на цепь задач. Это не обязательно должно быть чисто цепочкой, но она должна следовать общему концепции одной задачи после очередной задачи (или задачи Группа ).

Как только у вас есть рабочий процесс, вы можете Наконец определить Указывает на паузу на протяжении всего вашего рабочего процесса. На каждый Из этих точек вы можете проверить, есть ли пользователь Frontend запросил операцию паузы и продолжить соответственно. Концепция это: –

Комплексная и трудоемкая операция O разделена на 5 задач сельдерей – T1, T2, T3, T4 и T5 – каждая из этих задач (кроме первого) зависят от возвращаемого значения предыдущей задачи.

Давайте предположим, что мы определим точки, чтобы сделать паузу После каждой задачи так что рабочий процесс выглядит

  • T1 выполняет
  • T1 завершает, проверьте, запросил ли пользователь Pause
    • Если пользователь не запрашивал паузу – продолжить
    • Если пользователь запросил паузу, сериализация Оставшаяся цепочка рабочего процесса и хранить его где-то, чтобы продолжить позже

… и так далее. Поскольку после каждой задачи есть пункт паузы, эта проверка выполняется после каждого из них (кроме последнего конечно).

Но это только теория, я изо всех сил пытался найти реализацию этого в любом месте онлайн, так что вот то, что я придумал

from typing import Any, Optional

from celery import shared_task
from celery.canvas import Signature, chain, signature


@shared_task(bind=True)
def pause_or_continue(
    self, retval: Optional[Any] = None, clause: dict = None, callback: dict = None
):
    # Task to use for deciding whether to pause the operation chain
    if signature(clause)(retval):
        # Pause requested, call given callback with retval and remaining chain
        # chain should be reversed as the order of execution follows from end to start
        signature(callback)(retval, self.request.chain[::-1])
        self.request.chain = None
        return "Pausing"
    else:
        # Continue to the next task in chain
        return retval


def tappable(ch: chain, clause: Signature, callback: Signature, nth: Optional[int] = 1):
    """
    Make a operation workflow chain pause-able/resume-able by inserting
    the pause_or_continue task for every nth task in given chain

    ch: chain
        The workflow chain

    clause: Signature
        Signature of a task that takes one argument - return value of
        last executed task in workflow (if any - othewise `None` is passsed)
        - and returns a boolean, indicating whether or not the operation should continue

        Should return True if operation should continue normally, or be paused

    callback: Signature
        Signature of a task that takes 2 arguments - return value of
        last executed task in workflow (if any - othewise `None` is passsed) and
        remaining chain of the operation workflow as a json dict object
        No return value is expected

        This task will be called when `clause` returns `True` (i.e task is pausing)
        The return value and the remaining chain can be handled accordingly by this task

    nth: Int
        Check `clause` after every nth task in the chain
        Default value is 1, i.e check `clause` after every task
        Hence, by default, user given `clause` is called and checked
        after every task

    NOTE: The passed in chain is mutated in place
    Returns the mutated chain
    """
    newch = []
    for n, sig in enumerate(ch.tasks):
        if n != 0 and n % nth == nth - 1:
            newch.append(pause_or_continue.s(clause=clause, callback=callback))
        newch.append(sig)
    ch.tasks = tuple(newch)
    return ch

* Реализация также можно увидеть в Tappable.py.py

Объяснение – Pause_or_continue.

Здесь Pause_or_continue вышеупомянутый Точка паузы . Это задача, которая будет называться по определенным интервалам (интервалы как в интервалах задач, не так, как через интервалы времени). Эта задача затем вызывает предоставленную пользователем функцию (на самом деле задача) – пункт – Чтобы проверить, следует ли продолжить задачу.

Если пункт Функция (на самом деле задача) возвращает Правда , пользователь предоставил Перезвони Функция называется, последнее возвращаемое значение ( Если есть – Нет в противном случае) передается на этот обратный вызов, а также Оставшаяся цепочка задач Отказ Обратный вызов делает то, что нужно сделать, и Pause_or_continue Наборы self.request.Chain. к Нет , который рассказывает сельдерей «Цепочка задач теперь пуста – все закончено».

Если пункт Функция (на самом деле задача) возвращает Ложь , возвращаемое значение из предыдущей задачи ( Если есть – Нет В противном случае) возвращается назад для следующей задачи, чтобы получить – и цепочка продолжается. Следовательно, рабочий процесс продолжается.

Почему оговорки и обратный вызов подписи задач и не регулярные функции?

Оба пункт и Обратный вызов называются напрямую – без задерживать или apply_async. . Он выполнен в текущем процессе, в текущем контексте. Так что это ведет себя точно так же, как нормальная функция, то зачем использовать подписи ?

Ответ сериализация. Вы не можете удобно пройти регулярный объект функции к задаче сельдерея. Но ты может пройти подпись задач. Это именно то, что я здесь делаю. Оба пункт и Обратный вызов должно быть обычный Подпись объект целлюлозного задания.

Что self.request.Chain?

Self.request.Chain Сохраняет список диктопов (представляющих JSONS в качестве сериализатора задачи сельдерей, по умолчанию JSON) – каждый из них представляет собой подпись задач. Каждая задача из этого списка выполняется в обратном порядке. Вот почему список перевернут, прежде чем пройти от предоставленного пользователя Обратный вызов Функция (на самом деле задача) – пользователь, вероятно, ожидает, что порядок задач остается вправо.

Быстрое примечание : Необходимо к этому обсуждению, но если вы используете ссылка Параметр из apply_async построить цепочку вместо цепь примитивный сам. self.request.callback это свойство, которое будет изменено (то есть набор Нет Чтобы удалить обратный вызов и остановить цепь) вместо self.request.Chain.

Объяснение – Tappable

Tappable это просто базовая функция, которая принимает цепь (которая является единственным примитивным процессом, примитивным здесь, для краткости) и вставляет Pause_or_continue. после каждого nth задача. Вы можете вставить их везде, где вы действительно хотите, это зависит от вас, чтобы определить очки паузы в вашей работе. Это просто пример!

Для каждого цепь Объект, фактические подписи задач (в порядке, идущих слева направо) хранятся в .таки имущество. Это кортеж подписей задач. Итак, все, что нам нужно сделать, это принять этот кортеж, преобразовать в список, вставьте очки паузы и обратно обратно в кортеж, чтобы назначить цепочку. Затем верните модифицированный объект цепи.

пункт и Обратный вызов также прикреплен к Pause_or_continue подпись. Нормальный материал сельдерея.

Это охватывает основную концепцию, но чтобы продемонстрировать реальный проект, используя этот шаблон (а также демонстрировать возобновление части приостановленной задачи), вот небольшая демонстрация всех необходимых ресурсов

Этот пример использования предполагает концепцию базового веб-сервера с базой данных. Всякий раз, когда начинается операция (цепочка рабочего процесса I.E), это назначен идентификатор и хранится в базе данных. Схема этой таблицы выглядит

-- Create operations table
-- Keeps track of operations and the users that started them
CREATE TABLE operations (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  requester_id INTEGER NOT NULL,
  completion TEXT NOT NULL,
  workflow_store TEXT,
  result TEXT,
  FOREIGN KEY (requester_id) REFERENCES user (id)
);

Единственное поле, которое нужно знать о прямо сейчас, это Завершение Отказ Это просто хранит статус операции

  • Когда операция запускается и создается запись БД, это установлено на В ХОДЕ ВЫПОЛНЕНИЯ
  • Когда пользовательская запрашивает паузу, контроллер маршрута (I.E View) изменяет это на Запрашивая паузу
  • Когда операция на самом деле делает паузу и Обратный вызов (из …| Tappable , внутри Pause_or_continue ) называется Обратный вызов должен изменить это на Пауз Когда задача завершена, это должно быть изменено на
  • ЗАВЕРШЕННЫЙ

Пример пункта

@celery.task()
def should_pause(_, operation_id: int):
    # This is the `clause` to be used for `tappable`
    # i.e it lets celery know whether to pause or continue
    db = get_db()

    # Check the database to see if user has requested pause on the operation
    operation = db.execute(
        "SELECT * FROM operations WHERE id = ?", (operation_id,)
    ).fetchone()
    return operation["completion"] == "REQUESTING PAUSE"

Это задача, чтобы позвонить в очках паузы, определить, следует ли делать или нет. Это функция, которая занимает 2 параметра ….. Хорошо вроде. Первый является обязательным, Tappable требует пункт Чтобы иметь один (и ровно один) аргумент – значит, он может передавать возвращаемое значение предыдущей задачи (даже если это возвращаемое значение – Нет ). В этом примере значение возврата не требуется для использования – поэтому мы можем просто игнорировать его.

Второй параметр – это операция I d. Смотри, все это пункт Подходит – проверяет базу данных для работы операции (рабочего процесса) и посмотреть, имеет ли она статус Запрашивая паузу Отказ Для этого необходимо знать идентификатор операции. Но пункт Должно быть задача с одним аргументом, что дает?

Ну, хорошая вещь подписи может быть частичным. Когда задача впервые начата и Tappable цепь создана. Операция ID Известен И, следовательно, мы можем сделать Dover_pause.s (application_id) Чтобы получить подпись задачи, которая берет один Параметр, который является возвращаемой стоимостью предыдущей задачи. Что квалифицируется как пункт Действительно

Пример обратного вызова

import os
import json
from typing import Any, List

@celery.task()
def save_state(retval: Any, chains: dict, operation_id: int):
    # This is the `callback` to be used for `tappable`
    # i.e this is called when an operation is pausing
    db = get_db()

    # Prepare directories to store the workflow
    operation_dir = os.path.join(app.config["OPERATIONS"], f"{operation_id}")
    workflow_file = os.path.join(operation_dir, "workflow.json")
    if not os.path.isdir(operation_dir):
        os.makedirs(operation_dir, exist_ok=True)

    # Store the remaining workflow chain, serialized into json
    with open(workflow_file, "w") as f:
        json.dump(chains, f)

    # Store the result from the last task and the workflow json path
    db.execute(
        """
        UPDATE operations
        SET completion = ?,
            workflow_store = ?,
            result = ?
        WHERE id = ?
        """,
        ("PAUSED", workflow_file, f"{retval}", operation_id),
    )
    db.commit()

И вот задача, которую нужно вызвать, когда задача приоритет Отказ Помните, что это должно принять последнюю величину возврата выполненной задачи и оставшийся список подписей (в порядке, слева направо). Есть дополнительный параметр – apport_id – снова. Объяснение этого такое же, как и один для пункт .

Эта функция хранит оставшуюся цепочку в файле json (поскольку это список диктопов). Помните, что вы можете использовать другой сериализатор – я использую JSON, поскольку это Serializer задач по умолчанию, используемый сельдереем.

После хранения оставшейся цепи он обновляет Завершение Статус к Пауза А также регистрирует путь к файлу JSON в DB.

Теперь давайте посмотрим в действии-

Пример запуска рабочего процесса

def start_operation(user_id, *operation_args, **operation_kwargs):
    db = get_db()
    operation_id: int = db.execute(
        "INSERT INTO operations (requester_id, completion) VALUES (?, ?)",
        (user_id, "IN PROGRESS"),
    ).lastrowid
    # Convert a regular workflow chain to a tappable one
    tappable_workflow = tappable(
        (T1.s() | T2.s() | T3.s() | T4.s() | T5.s(operation_id)),
        should_pause.s(operation_id),
        save_state.s(operation_id),
    )
    # Start the chain (i.e send task to celery to run asynchronously)
    tappable_workflow(*operation_args, **operation_kwargs)
    db.commit()
    return operation_id

Функция, которая принимает идентификатор пользователя и запускает рабочий процесс работы. Это более или менее непрактично, моделирующаяся в виде контроллера по представлению/маршруту. Но я думаю, что это получает общую идею.

Предположить T [1-4] Все являются единичными задачами операции, каждый из которых принимает доход предыдущей задачи в качестве аргумента. Просто пример обычной цепочки сельдерея, не стесняйтесь идти с ума со своими цепями.

T5 Это задача, которая сохраняет окончательный результат (результат T4 ) в базу данных. Так наряду с возвращаемой стоимостью от T4 Это нужна apport_id Отказ Который передается в подпись.

Пример приостановки рабочего процесса

def pause(operation_id):
    db = get_db()

    operation = db.execute(
        "SELECT * FROM operations WHERE id = ?", (operation_id,)
    ).fetchone()

    if operation and operation["completion"] == "IN PROGRESS":
        # Pause only if the operation is in progress
        db.execute(
            """
            UPDATE operations
            SET completion = ?
            WHERE id = ?
            """,
            ("REQUESTING PAUSE", operation_id),
        )
        db.commit()
        return 'success'

    return 'invalid id'

Это использует ранее упомянутую концепцию изменения входа БД для изменения Завершение к Запрашивая паузу Отказ Как только это будет совершено, в следующий раз Pause_or_continue звонки должен_pause , это будет знать, что пользователь запросил операцию на паузу И это будет делать это соответственно.

Пример возобновления рабочего процесса

def resume(operation_id):
    db = get_db()

    operation = db.execute(
        "SELECT * FROM operations WHERE id = ?", (operation_id,)
    ).fetchone()

    if operation and operation["completion"] == "PAUSED":
        # Resume only if the operation is paused
        with open(operation["workflow_store"]) as f:
            # Load the remaining workflow from the json
            workflow_json = json.load(f)
        # Load the chain from the json (i.e deserialize)
        workflow_chain = chain(signature(x) for x in serialized_ch)
        # Start the chain and feed in the last executed task result
        workflow_chain(operation["result"])

        db.execute(
            """
            UPDATE operations
            SET completion = ?
            WHERE id = ?
            """,
            ("IN PROGRESS", operation_id),
        )
        db.commit()
        return 'success'

    return 'invalid id'

Напомним, что, когда операция приостановлена – оставшийся рабочий процесс хранится в JSON. Так как мы в настоящее время ограничиваем рабочий процесс к цепь объект. Мы знаем, что это JSON – это список подписей, которые должны быть превращены в цепь . Итак, мы десерилизируем его соответственно и отправляем его на радость сельдерея.

Обратите внимание, что этот оставшийся рабочий процесс все еще есть Pause_or_continue Задачи, поскольку они были изначально – так что этот рабочий процесс сам, вновь приостановит/возобновляется. Когда он делает паузу, Workflow.json будет просто обновляться.

И это все! Вы можете проверить более сложный пример в самом репо. Задачи в Tasks.py Эти задачи управляются флейскими маршрутами в operations.py

Оригинал: “https://dev.to/totally_chase/how-you-can-implement-resumable-tasks-in-celery-3nk5”