Рубрики
Без рубрики

Асинхронные задачи с использованием Flask, Redis и Сельдерея

В этой статье мы будем использовать Redis и Сельдерей с колбой для создания асинхронной системы обмена сообщениями, чтобы создать простое приложение напоминания.

Автор оригинала: Robley Gori.

Вступление

По мере развития веб-приложений и увеличения их использования варианты использования также диверсифицируются. Сейчас мы создаем и используем веб-сайты для более сложных задач, чем когда-либо прежде. Некоторые из этих задач могут быть обработаны и обратная связь передана пользователям мгновенно, в то время как другие требуют дальнейшей обработки и ретрансляции результатов позже. Более широкое распространение доступа в Интернет и устройств с поддержкой Интернета привело к увеличению трафика конечных пользователей.

В попытке справиться с возросшим трафиком или усложнением функциональности иногда мы можем отложить работу и передать результаты позже. Таким образом, мы не заставляем пользователя ждать неизвестное время в нашем веб-приложении, а вместо этого отправляем результаты позже. Мы можем достичь этого, используя фоновые задачи для обработки работы при низком трафике или обработки работы пакетами.

Одним из решений, которые мы можем использовать для достижения этой цели, является Сельдерей . Это помогает нам разбивать сложные части работы и выполнять их на разных машинах, чтобы облегчить нагрузку на одну машину или сократить время, необходимое для завершения.

В этой статье мы рассмотрим использование сельдерея для планирования фоновых задач в приложении Flask для разгрузки ресурсоемких задач и определения приоритетов реагирования на запросы конечных пользователей.

Что такое очередь задач?

Очередь задач-это механизм распределения небольших единиц работы или задач, которые могут быть выполнены без вмешательства в цикл запроса-ответа большинства веб-приложений.

Очереди задач полезны при делегировании работы, которая в противном случае замедлила бы работу приложений в ожидании ответов. Они также могут использоваться для выполнения ресурсоемких задач, в то время как основная машина или процесс взаимодействуют с пользователем.

Таким образом, взаимодействие с пользователем является последовательным, своевременным и не зависит от рабочей нагрузки.

Что такое сельдерей?

Сельдерей-это асинхронная очередь задач, основанная на распределенной передаче сообщений для распределения рабочей нагрузки между машинами или потоками. Система сельдерея состоит из клиента, брокера и нескольких работников.

Эти работники отвечают за выполнение задач или частей работы, которые помещаются в очередь и ретранслируют результаты. С помощью Celery вы можете иметь как локальных, так и удаленных работников, что означает, что работа может быть делегирована различным и более способным машинам через Интернет, а результаты переданы обратно клиенту.

Таким образом, нагрузка на основную машину уменьшается, и появляется больше ресурсов для обработки запросов пользователей по мере их поступления.

Клиент в установке Сельдерея отвечает за выдачу заданий рабочим, а также за связь с ними с помощью брокера сообщений. Брокер облегчает связь между клиентом и рабочими в установке Сельдерея через очередь сообщений, где сообщение добавляется в очередь, и брокер доставляет его клиенту.

Примеры таких брокеров сообщений включают Redis и RabbitMQ .

Зачем использовать сельдерей?

Существуют различные причины, по которым мы должны использовать сельдерей для наших фоновых задач. Во-первых, она достаточно масштабируема, позволяя добавлять больше рабочих по требованию для удовлетворения возросшей нагрузки или трафика. Сельдерей также все еще находится в активной разработке, что означает, что он является поддерживаемым проектом наряду с его краткой документацией и активным сообществом пользователей.

Еще одним преимуществом является то, что Сельдерей легко интегрируется в несколько веб-фреймворков, причем большинство из них имеют библиотеки для облегчения интеграции.

Он также предоставляет функциональность для взаимодействия с другими веб-приложениями через webhooks, где нет библиотеки для поддержки этого взаимодействия.

Сельдерей также может использовать различные брокеры сообщений, которые предлагают нам гибкость. Рекомендуется использовать RabbitMQ, но он также может поддерживать Redis и Beanstalk .

Демонстрационное Приложение

Мы создадим приложение Flask, которое позволит пользователям устанавливать напоминания, которые будут доставляться на их электронную почту в определенное время.

Мы также предоставим функциональность для настройки количества времени до вызова сообщения или напоминания и отправки сообщения пользователю.

Установка

Как и любой другой проект, наша работа будет проходить в виртуальной среде, которую мы создадим и будем управлять с помощью инструмента Pipenv :

$ pipenv install --three
$ pipenv shell

Для этого проекта нам нужно будет установить пакеты Flask и Celery, чтобы начать работу:

$ pipenv install flask celery

Вот как будет выглядеть файловая структура нашего приложения Flask:

.
├── Pipfile                    # manage our environment
├── Pipfile.lock
├── README.md
├── __init__.py
├── app.py                     # main Flask application implementation
├── config.py                  # to host the configuration
├── requirements.txt           # store our requirements
└── templates
    └── index.html             # the landing page

1 directory, 8 files

Для нашего проекта на основе сельдерея мы будем использовать Redis в качестве брокера сообщений, и мы можем найти инструкции по его настройке на их домашней странице .

Реализация

Давайте начнем с создания приложения Flask, которое будет отображать форму, позволяющую пользователям вводить детали сообщения, которое будет отправлено в будущем.

Мы добавим следующее к вашему app.py файл:

from flask import Flask, flash, render_template, request, redirect, url_for

app = Flask(__name__)
app.config.from_object("config")
app.secret_key = app.config['SECRET_KEY']

@app.route('/', methods=['GET', 'POST'])
def index():
    if request.method == 'GET':
        return render_template('index.html')

    elif request.method == 'POST':
        email = request.form['email']
        first_name = request.form['first_name']
        last_name = request.form['last_name']
        message = request.form['message']
        duration = request.form['duration']
        duration_unit = request.form['duration_unit']

        flash("Message scheduled")
        return redirect(url_for('index'))


if __name__ == '__main__':
    app.run(debug=True)

Это действительно простое приложение с одним маршрутом для обработки GET и POST запроса формы. Как только детали будут представлены, мы можем передать данные функции, которая будет планировать работу.

Чтобы деклуттерировать наш основной файл приложения, мы поместим переменные конфигурации в отдельный файл config.py файл и загрузить конфигурацию из файла:

app.config.from_object("config")

Ваш config.py файл будет находиться в той же папке, что и app.py файл и содержит некоторые основные конфигурации:

SECRET_KEY = 'very_very_secure_and_secret'
# more config

А пока давайте реализуем целевую страницу как index.html :

{% for message in get_flashed_messages() %}
  

{{ message }}

{% endfor %}
First Name: Last Name: Email: Message: Duration:

Стиль и форматирование были усечены для краткости, не стесняйтесь форматировать/стилизовать свой HTML так, как вам хочется.

Теперь мы можем начать ваше приложение:

целевая страница

Отправка Электронных Писем С Помощью Flask-Mail

Для отправки электронных писем из нашего приложения Flask мы будем использовать библиотеку Flask-Mail , которую добавим в наш проект следующим образом:

$ pipenv install flask-mail

С нашим приложением Flask и формой на месте мы теперь можем интегрировать Flask-Mail в ваш app.py :

from flask_mail import Mail, Message

app = Flask(__name__)
app.config.from_object("config")
app.secret_key = app.config['SECRET_KEY']

# set up Flask-Mail Integration
mail = Mail(app)

def send_mail(data):
    """ Function to send emails.
    """
    with app.app_context():
        msg = Message("Ping!",
                    sender="admin.ping",
                    recipients=[data['email']])
        msg.body = data['message']
        mail.send(msg)

Функция send_main(data) получит отправляемое сообщение и получателя письма, а затем будет вызвана по истечении указанного времени для отправки письма пользователю.

Нам также нужно будет добавить следующие переменные в ваш config.py для того, чтобы Flack-Mail работал:

# Flask-Mail
MAIL_SERVER = 'smtp.googlemail.com'
MAIL_PORT = 587
MAIL_USE_TLS = True
MAIL_USERNAME = 'mail-username'
MAIL_PASSWORD = 'mail-password'

Интеграция Сельдерея

Теперь, когда наше приложение Flask готово и оснащено функцией отправки электронной почты, мы можем интегрировать Сельдерей, чтобы запланировать отправку писем на более поздний срок.

Наш app.py будет изменено снова:

# Existing imports are maintained
from celery import Celery

# Flask app and flask-mail configuration truncated

# Set up celery client
client = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
client.conf.update(app.config)

# Add this decorator to our send_mail function
@client.task
def send_mail(data):
    # Function remains the same

@app.route('/', methods=['GET', 'POST'])
def index():
    if request.method == 'GET':
        return render_template('index.html')

    elif request.method == 'POST':
        data = {}
        data['email'] = request.form['email']
        data['first_name'] = request.form['first_name']
        data['last_name'] = request.form['last_name']
        data['message'] = request.form['message']
        duration = int(request.form['duration'])
        duration_unit = request.form['duration_unit']

        if duration_unit == 'minutes':
            duration *= 60
        elif duration_unit == 'hours':
            duration *= 3600
        elif duration_unit == 'days':
            duration *= 86400

        send_mail.apply_async(args=[data], countdown=duration)
        flash(f"Email will be sent to {data['email']} in {request.form['duration']} {duration_unit}")

        return redirect(url_for('index'))

Мы импортируем celery и используем его для инициализации клиента Celery в нашем приложении Flask, прикрепляя URL-адрес брокера обмена сообщениями. В нашем случае мы будем использовать Redis в качестве брокера, поэтому мы добавим следующее к вашему config.py :

CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

Чтобы ваша функция send_mail() выполнялась как фоновая задача, мы добавим @client.task декоратор, чтобы наш клиент сельдерея знал об этом.

После настройки клиента Сельдерея изменяется основная функция, которая также обрабатывает ввод формы.

Сначала мы упаковываем входные данные для функции send_mail() в словарь. Затем мы вызываем нашу почтовую функцию через API вызова задачи Сельдерея, используя функцию apply_async , которая принимает аргументы, требуемые нашей функцией.

Устанавливается необязательный параметр countdown , определяющий задержку между запуском кода и выполнением задачи.

Эта длительность выражается в секундах, поэтому мы преобразуем длительность, пройденную пользователем, в секунды в зависимости от выбранной им единицы времени.

После того, как пользователь отправит форму, мы подтвердим прием и уведомим его через баннер, когда сообщение будет отправлено.

Сведение Всего Воедино

Чтобы запустить наш проект, нам понадобятся два терминала, один для запуска нашего приложения Flask, а другой для запуска работника сельдерея, который будет отправлять сообщения в фоновом режиме.

Запустите приложение Flask в первом терминале:

$ python app.py

Во втором терминале запустите виртуальную среду, а затем запустите Celery worker:

# start the virtualenv
$ pipenv shell
$ celery worker -A app.client --loglevel=info

Если все пойдет хорошо, мы получим следующую обратную связь в терминале, работающем с клиентом Celery:

сельдерей стартует

Теперь перейдем к http://localhost:5000 и заполните детали, планируя, что письмо придет через 2 минуты после отправки.

Над формой появится сообщение с указанием адреса, по которому будет отправлено письмо, и срока, по истечении которого оно будет отправлено. В нашем терминале сельдерея мы также сможем увидеть запись журнала, которая означает, что наша электронная почта была запланирована:

[2019-10-23 16:27:25,399: INFO/MainProcess] Received task: app.send_mail[d65025c8-a291-40d0-aea2-e816cb40cd78]  ETA:[2019-10-23 13:29:25.170622+00:00]

Раздел ETA записи показывает, когда будет вызвана функция send_email() и, следовательно, когда будет отправлено электронное письмо.

Пока все идет хорошо. Наши электронные письма планируются и отправляются в указанное время, однако одной вещи не хватает. У нас нет видимости задач до или после их выполнения, и мы не можем сказать, действительно ли письмо было отправлено или нет.

По этой причине давайте внедрим решение мониторинга для наших фоновых задач, чтобы мы могли просматривать задачи, а также быть в курсе, если что-то пойдет не так и задачи не будут выполнены так, как планировалось.

Мониторинг нашего Кластера Сельдерея С Помощью Цветка

Flower -это веб-инструмент, который обеспечивает видимость нашей настройки сельдерея и предоставляет функциональные возможности для просмотра хода выполнения задач, истории, подробностей и статистики, включая показатели успеха или неудач. Мы также можем отслеживать всех работников в нашем кластере и задачи, которые они в настоящее время выполняют.

Установка Flower так же проста, как:

$ pipenv install flower

Ранее мы уточнили детали нашего клиента сельдерея в нашем app.py файл. Нам нужно будет передать этого клиента Флауэру, чтобы следить за ним.

Чтобы достичь этого, нам нужно открыть третье окно терминала, перейти в нашу виртуальную среду и запустить наш инструмент мониторинга:

$ pipenv shell
$ flower -A app.client --port=5555

При запуске Flower мы указываем клиент Celery, передавая его через аргумент application ( -A ), а также указывая порт, который будет использоваться через аргумент --port .

С нашим мониторингом на месте, давайте запланируем еще одно электронное письмо, которое будет отправлено на приборную панель, а затем перейдем к http://localhost:5555 , где нас приветствуют следующие:

цветочная посадочная страница

На этой странице мы можем увидеть список работников нашего кластера сельдерея, который в настоящее время состоит только из нашей машины.

Чтобы просмотреть электронную почту, которую мы только что запланировали, нажмите на кнопку Tasks в левом верхнем углу панели мониторинга, и это приведет нас на страницу, где мы можем увидеть запланированные задачи:

запланированные задачи

В этом разделе мы видим, что у нас было запланировано два письма, и одно из них было успешно отправлено в назначенное время. Электронные письма должны были быть отправлены через 1 минуту и 5 минут соответственно для целей тестирования.

Мы также можем видеть время получения текста и когда он был выполнен из этого раздела.

В разделе монитор есть графики, отображающие успешность и частоту отказов фоновых задач.

Мы можем планировать сообщения так долго, как пожелаем, но это также означает, что наш работник должен быть в Сети и функционировать в то время, когда задача должна быть выполнена.

Вывод

Мы успешно создали кластер сельдерея и интегрировали его в наше приложение Flask, которое позволяет пользователям планировать отправку электронных писем через определенное время в будущем.

Функция отправки электронной почты была делегирована фоновой задаче и помещена в очередь, где она будет выбрана и выполнена работником в нашем локальном кластере сельдерея.

Исходный код этого проекта, как всегда, доступен на Github .