Рубрики
Без рубрики

Асинхронные задачи с использованием сельдерея с Django

С помощью сельдерея с Django пользовательский опыт может значительно улучшиться. Вот как заставить асинхронные задачи работать в вашем приложении или веб-сайте!

Автор оригинала: Udit Agarwal.

Предпосылки

Вступление

Сельдерей-это очередь задач, основанная на распределенной передаче сообщений. Он используется для обработки длительных асинхронных задач. С другой стороны, RabbitMQ-это брокер сообщений, который используется Сельдереем для отправки и получения сообщений. Сельдерей идеально подходит для задач, выполнение которых займет некоторое время, но мы не хотим, чтобы наши запросы блокировались во время обработки этих задач. В качестве примера можно привести отправку электронной почты, SMS-сообщения, удаленные вызовы API и т. Д.

Цель

  • В этом уроке мы запустим работника в Сельдерее для обработки длинных задач и отслеживания задач по мере их перехода в разные состояния.
  • Поехали!

Использование локального приложения Django

Структура каталогов локальных файлов

Мы собираемся использовать приложение Django под названием my celery. Наш каталог структурирован таким образом. Корень нашего приложения django – “мой сельдерей”.

  • мой сельдерей
    • мой сельдерей
    • мой сельдерей
      • мой сельдерей
      • мой сельдерей
      • мой сельдерей
      • мой сельдерей

Добавьте строки в settings.py файл, чтобы сообщить сельдерею, что мы будем использовать RabbitMQ в качестве брокера сообщений и принимать данные в формате json.

BROKER_URL = 'amqp://guest:guest@localhost//'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
  • CELERY_ACCEPT_CONTENT – это тип содержимого, которое разрешено получать.
  • CELERY_TASK_SERIALIZER – это строка, используемая для идентификации метода сериализации по умолчанию |. CELERY_RESULT_SERIALIZER
  • - это тип формата сериализации результатов.

После добавления брокера сообщений добавьте строки в новый файл celery.py это говорит Сельдерею, что мы будем использовать настройки в settings.py определено выше.

from __future__ import absolute_import
import os
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mycelery.settings')
    
from django.conf import settings
from celery import Celery
    
app = Celery('mycelery',
    backend='amqp',
    broker='amqp://guest@localhost//')
    
# This reads, e.g., CELERY_ACCEPT_CONTENT = ['json'] from settings.py:
app.config_from_object('django.conf:settings')
    
# For autodiscover_tasks to work, you must define your tasks in a file called 'tasks.py'.
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
    
@app.task(bind=True)
def debug_task(self):
    print("Request: {0!r}".format(self.request))

Создание задач в сельдерее

Создайте приложение под названием my celery app и сделайте tasks.py файл в папке этого приложения. Все задачи будут определены в этом файле.

В tasks.py, мы просто играем с номером, так что сейчас это будет долгая задача.

from celery import shared_task,current_task
from numpy import random
from scipy.fftpack import fft
    
@shared_task
def fft_random(n):
    for i in range(n):
        x = random.normal(0, 0.1, 2000)
        y = fft(x)
        if(i%30 == 0):
            process_percent = int(100 * float(i) / float(n))
            current_task.update_state(state='PROGRESS',
                meta={'process_percent': process_percent})
    return random.random()

Используя метод current_task.update_state () , мы можем передавать статус выполненной задачи брокеру сообщений каждые 30 итераций.

Вызов задач в Django

Для вызова вышеуказанной задачи требуются следующие строки кода. Вы можете поместить эти строки в свои файлы из любого места, где вы хотите их вызвать.

from .tasks import fft_random
job = fft_random.delay(int(n))

Импортируйте метод и сделайте вызов. Вот оно! Теперь ваша операция выполняется в фоновом режиме.

Получить статус задачи

Чтобы получить статус задачи выше, определите следующий метод в views.py

# Create your views here.
def task_state(request):
    data = 'Fail'
    if request.is_ajax():
        if 'task_id' in request.POST.keys() and request.POST['task_id']:
            task_id = request.POST['task_id']
            task = AsyncResult(task_id)
            data = task.result or task.state
        else:
            data = 'No task_id in the request'
    else:
        data = 'This is not an ajax request'

    json_data = json.dumps(data)
    return HttpResponse(json_data, content_type='application/json')

Task_id отправляется методу из JavaScript. Этот метод проверяет состояние задачи с идентификатором task_id и возвращает их в формате json в JavaScript. Затем мы можем вызвать этот метод из нашего JavaScript и показать соответствующую панель.

Вывод

Мы можем использовать сельдерей для выполнения различных типов задач, от отправки электронной почты до очистки веб-сайта . В случае длительных задач мы хотели бы показать статус задачи нашему пользователю, и мы можем использовать простую панель JavaScript, которая вызывает URL-адрес статуса задачи и устанавливает время, затраченное на выполнение задачи. С помощью сельдерея пользовательский опыт на веб-сайтах Django может быть значительно улучшен.