Airflow-это проект с открытым исходным кодом, запущенный в Airbnb. Это инструмент для динамической организации потока желаний вашего приложения, который легко масштабируется до бесконечности благодаря модульной архитектуре и механизму очереди сообщений.
Его также можно понимать как предварительное приложение cron, которое выполняет задачи, когда выполняются их зависимости. И даже может повторить задачу, если она не удалась, в течение определенного количества времени, настроенного для нее.
Вот как выглядит трубопровод с воздушным потоком:
В приведенном выше примере каждый блок представляет задачу, и некоторые из задач связаны с другими задачами, отражающими их зависимости и отношения.
Допустим, вам нужно разработать приложение, которое поможет вашему клиенту найти некоторые общие продукты, доступные в Интернете на какой-то выбранной платформе электронной коммерции, и создать отчет, а затем отправить их. Для этой цели вы можете разработать рабочий процесс, в котором одна задача предназначена для сбора данных с платформы электронной коммерции, другая-для классификации данных в зависимости от их типа и так далее.
Эти задачи создаются в файле python с именем DAG(Directed Acyclic Graph) file. A DAG может иметь произвольное количество задач. И один DAG представляет собой единый логический рабочий процесс.
Пример DAG:
from airflow import DAG from airflow.operators.bash_operator import BashOperator from datetime import datetime, timedelta args = { 'owner': 'owner', 'start_date': datetime.today() } dag = DAG( dag_id='common_products', default_args=args, schedule_interval=timedelta(1) ) init = BashOperator( task_id='init', bash_command='python /opt/jobs/init.py', dag=dag ) data_config = BashOperator( task_id='data_config', bash_command='python /opt/jobs/data_config.py', dag=dag ) platform_a = BashOperator( task_id='platform_a', bash_command='python /opt/jobs/collect_data.py platform_a', dag=dag ) platform_b = BashOperator( task_id='platform_b', bash_command='python /opt/jobs/collect_data.py platform_b', dag=dag ) platform_c = BashOperator( task_id='platform_c', bash_command='python /opt/jobs/collect_data.py platform_c', dag=dag ) platform_d = BashOperator( task_id='platform_d', bash_command='python /opt/jobs/collect_data.py platform_d', dag=dag ) categorise_data = BashOperator( task_id='categorise_data', bash_command='python /opt/jobs/categorise_data.py', dag=dag ) find_most_common = BashOperator( task_id='find_most_common', bash_command='python /opt/jobs/find_most_common.py', dag=dag ) compare_price = BashOperator( task_id='compare_price', bash_command='python /opt/jobs/compare_price.py', dag=dag ) generate_report = BashOperator( task_id='generate_report', bash_command='/usr/bin/python /opt/jobs/generate_report.py', dag=dag ) # setup the logical flow beetween each tasks data_config.set_upstream(init) platform_a.set_upstream(data_config) platform_b.set_upstream(data_config) platform_c.set_upstream(data_config) platform_d.set_upstream(data_config) categorise_data.set_upstream(platform_a) categorise_data.set_upstream(platform_b) categorise_data.set_upstream(platform_c) categorise_data.set_upstream(platform_d) find_most_common.set_upstream(categorise_data) compare_price.set_upstream(find_most_common) generate_report.set_upstream(compare_price)
Файл DAG можно сохранить в каталоге dag по умолчанию airflow ~/airflow/dag
. В строке конфигурации dag schedule_interval=timedelta(1)
сообщит планировщику воздушного потока, чтобы он выполнял этот поток один раз в день.
Вот как будет выглядеть эта СУМКА.
Воздушный поток имеет очень элегантный интерфейс для мониторинга рабочего процесса и просмотра журнала для отдельных задач, очень приятно.
Это очень простой поток о том, как можно использовать воздушный поток. Потенциально он может быть использован для проектирования любого вида рабочего процесса, независимо от его сложности.