Рубрики
Без рубрики

Рабочий процесс с воздушным потоком

Airflow-это проект с открытым исходным кодом, запущенный в Airbnb. Это инструмент для динамической организации рабочего процесса вашего приложения. Который легко масштабируется до бесконечности благодаря своей модульной конструкции.

Автор оригинала: feroz khan.

Airflow-это проект с открытым исходным кодом, запущенный в Airbnb. Это инструмент для динамической организации потока желаний вашего приложения, который легко масштабируется до бесконечности благодаря модульной архитектуре и механизму очереди сообщений.

Его также можно понимать как предварительное приложение cron, которое выполняет задачи, когда выполняются их зависимости. И даже может повторить задачу, если она не удалась, в течение определенного количества времени, настроенного для нее.

Вот как выглядит трубопровод с воздушным потоком:

Вот как выглядит трубопровод с воздушным потоком:

В приведенном выше примере каждый блок представляет задачу, и некоторые из задач связаны с другими задачами, отражающими их зависимости и отношения.

Допустим, вам нужно разработать приложение, которое поможет вашему клиенту найти некоторые общие продукты, доступные в Интернете на какой-то выбранной платформе электронной коммерции, и создать отчет, а затем отправить их. Для этой цели вы можете разработать рабочий процесс, в котором одна задача предназначена для сбора данных с платформы электронной коммерции, другая-для классификации данных в зависимости от их типа и так далее.

Эти задачи создаются в файле python с именем DAG(Directed Acyclic Graph) file. A DAG может иметь произвольное количество задач. И один DAG представляет собой единый логический рабочий процесс.

Пример DAG:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

args = {
    'owner': 'owner',
    'start_date': datetime.today()
}

dag = DAG(
    dag_id='common_products',
    default_args=args,
    schedule_interval=timedelta(1)
)

init = BashOperator(
    task_id='init',
    bash_command='python /opt/jobs/init.py',
    dag=dag
)

data_config = BashOperator(
    task_id='data_config',
    bash_command='python /opt/jobs/data_config.py',
    dag=dag
)

platform_a = BashOperator(
    task_id='platform_a',
    bash_command='python /opt/jobs/collect_data.py platform_a',
    dag=dag
)

platform_b = BashOperator(
    task_id='platform_b',
    bash_command='python /opt/jobs/collect_data.py platform_b',
    dag=dag
)

platform_c = BashOperator(
    task_id='platform_c',
    bash_command='python /opt/jobs/collect_data.py platform_c',
    dag=dag
)

platform_d = BashOperator(
    task_id='platform_d',
    bash_command='python /opt/jobs/collect_data.py platform_d',
    dag=dag
)

categorise_data = BashOperator(
    task_id='categorise_data',
    bash_command='python /opt/jobs/categorise_data.py',
    dag=dag
)

find_most_common = BashOperator(
    task_id='find_most_common',
    bash_command='python /opt/jobs/find_most_common.py',
    dag=dag
)

compare_price = BashOperator(
    task_id='compare_price',
    bash_command='python /opt/jobs/compare_price.py',
    dag=dag
)

generate_report = BashOperator(
    task_id='generate_report',
    bash_command='/usr/bin/python /opt/jobs/generate_report.py',
    dag=dag
)

# setup the logical flow beetween each tasks
data_config.set_upstream(init)
platform_a.set_upstream(data_config)
platform_b.set_upstream(data_config)
platform_c.set_upstream(data_config)
platform_d.set_upstream(data_config)
categorise_data.set_upstream(platform_a)
categorise_data.set_upstream(platform_b)
categorise_data.set_upstream(platform_c)
categorise_data.set_upstream(platform_d)
find_most_common.set_upstream(categorise_data)
compare_price.set_upstream(find_most_common)
generate_report.set_upstream(compare_price)

Файл DAG можно сохранить в каталоге dag по умолчанию airflow ~/airflow/dag . В строке конфигурации dag schedule_interval=timedelta(1) сообщит планировщику воздушного потока, чтобы он выполнял этот поток один раз в день.

Вот как будет выглядеть эта СУМКА.

Снимок экрана 2017-08-24 в 10.49.42 вечера.png

Воздушный поток имеет очень элегантный интерфейс для мониторинга рабочего процесса и просмотра журнала для отдельных задач, очень приятно.

Это очень простой поток о том, как можно использовать воздушный поток. Потенциально он может быть использован для проектирования любого вида рабочего процесса, независимо от его сложности.