Воздушный поток является инструментом для организации сложного рабочего процесса, который был создан в Airbnb в 2014 году. В последние дни воздушный поток набрал значительную поддержку в нескольких организациях из -за возможности создавать сложные конвейеры данных с легкостью. Пользовательский интерфейс (пользовательский интерфейс) воздушного потока служит операционной панелью панели для планирования, мониторинга и управления любыми сценариями или приложениями.
Несмотря на то, что воздушный поток был принят несколькими организациями, развертывание и управление инфраструктурой всегда было сложным и вводило операционные накладные расходы. Чтобы бороться с этой проблемой, многие компании придумали идею представления управляемой инфраструктуры воздушного потока, такой как MWAA из Amazon Web Services (AWS). AWS берет на себя ответственность за сохранение вашей инфраструктуры практически без простоя и актуальных исправлений безопасности на месте вместе с последней версией, доступной для использования.
Кроме того, другими ключевыми преимуществами использования AWS MWAA является его эластичность, возможность масштабироваться вверх и вниз в зависимости от рабочей нагрузки и легкость создания и развертывания безопасной инфраструктуры производственного уровня с бесшовной интеграцией с другими услугами AWS.
Основы воздушного потока
Давайте погрузимся в некоторые ключевые понятия воздушного потока:)
В воздушном потоке рабочие процессы определяются в файле Python, который также называется DAG. Вы можете представить себе DAG как единую работу, которая может иметь несколько задач в целом. В каждом DAG есть три общие части, которые следующие:
- DAG инициализация
- Задачи
- Задачи зависимости
DAG (направленные ациклические графики)
Даги написаны на Python и часто идентифицируются по их уникальным dag_id
Анкет Во время инициализации мы указываем, когда начать, запланированное время и так далее. Вот простой DAG ниже:
from airflow.models import DAG from airflow.utils.dates import days_ago dag = DAG( dag_id="sample_dag", start_date=days_ago(2), description="Sample DAG", schedule_interval='@daily')
Задача
Задачи выполняют разные действия от выполнения сценария оболочки до запуска заданий EMR. Необходимо иметь DAG, прежде чем мы создам какую -либо задачу. Кроме того, каждая задача в DAG определяется оператором и аналогична dag_id
, task_id
должен быть уникальным в DAG.
def function_a (**kwargs): name = kwargs['name'] return f'hello {name} !!' first_task = PythonOperator( task_id="first_task", python_callable= function_a, op_kwargs= {'name': 'Fayaz'}, dag= dag) second_task = DummyOperator(task_id="second_task", dag=dag)
Задача задачи
Теперь последняя часть DAG – создать зависимости между задачами. В этом случае мы собираемся запустить First_task
сначала, а затем запустить second_task
Как только первая_task завершается. Так что это будет выглядеть так:
first_task >> second_task
Теперь мы поняли, что такое воздушный поток и как создать простой DAG, так что давайте раскручим AWS MWAA, чтобы запустить этот DAG.
Вам нужно использовать свою учетную запись AWS для выполнения следующих нескольких шагов, которые могут понести некоторые сборы.
Настройка управляемого экземпляра воздушного потока в AWS
Прежде чем создать новую среду MWAA, нам нужно будет создать S3 Bucket
который должен иметь версии включены.
Шаг 1. Перейти к Управляемая консоль воздушного потока и нажмите «Создать среду»
Шаг 2. Введите имя и выберите версию воздушного потока как 2.0.2 (последний)
Шаг 3. Выберите ведро S3 – то, что вы создали Шаг 4. Для типа папки DAGS s3://{bucket-name}/dags Шаг 5. Нажмите Следующий
Шаг 6. Нажмите на Создайте MWAA VPC
Шаг 7. Это приведет вас к странице с кучей VPC, деталями подсетей. Нажмите Создать стек который может занять несколько минут.
Шаг 8. Выберите только что созданный вами VPC и прокрутите вниз, чтобы ввести класс среды и другие конфигурации
Шаг 9. Выберите Создать новую роль и нажмите Далее
Шаг 10. Проверьте все детали и нажмите Создать среду
Как правило, для раскрытия инфраструктуры воздушного потока требуется 10-20 минут, так что настало время получить кофе ☕, прежде чем мы развертываем наш самый первый DAG 🤣 😂
🎉🎉 🎉 Когда вы обновите через несколько минут, вы увидите статус среды как Доступно
Так нажмите Открытый интерфейс Airflow
Ура!! Теперь мы получили окружающую среду и готовы к работе. Итак, давайте развернуть наш первый DAG.
Развертывание даг в AWS MWAA
Шаг 1: Чтобы развернуть DAG, нам нужно скопировать .py
файл в наш S3/DAGS
расположение. Скопируйте приведенный ниже код и поместите его в .py
Файл и сохраните его как demo_dag.py в вашем локале.
""" Importing necessary modules """ from airflow.models import DAG from airflow.utils.dates import days_ago from airflow.operators.dummy_operator import DummyOperator from airflow.operators.python_operator import PythonOperator """ Initializing DAGs """ dag = DAG( dag_id="grepy_sample_dag", start_date=days_ago(2), description="DAG which orchestrates a simple ML workflow", schedule_interval='@daily') """ Creating Tasks """ def function_a (**kwargs): name = kwargs['name'] return f'hello {name} !!' first_task = PythonOperator( task_id="first_task", python_callable= function_a, op_kwargs= {'name': 'Fayaz'}, dag= dag) second_task = DummyOperator(task_id="second_task", dag=dag) """ Dependencies """ first_task >> second_task
Шаг 2: Загрузите demo_dag.py
Задайте в свою папку S3/DAGS.
Шаг 3: Вот и все!! Теперь снова может потребоваться несколько минут, когда вы впервые разместите DAG, но это будет выглядеть так
Когда вы включите в первый раз, DAG будет автоматически запускаться, поэтому нажмите на имя DAG, которое приведет вас к представлению дерева, где вы увидите статус работы и зависимости задачи.
Одна из лучших частей – вы можете увидеть все журналы, когда вы переходите к представлению графика, который также доступен в CloudWatch, чтобы вы могли пригласить его в свой Splunk или Elk для дальнейшего анализа.
Вывод
В этом посте мы взглянули на высокий уровень оснований воздушного потока, и мы погрузились в управляемый воздушный поток AWS вместе с образцом DAG развертыванием, которое выполняет простую функцию Python. Аналогичным образом, вы можете организовать любой тип задачи, используя различные операторы, которые доступны в любой инфраструктуре воздушного потока.
Для дальнейшего чтения
Спасибо за ваше время и счастливое обучение !! 😊
Оригинал: “https://dev.to/aws-builders/aws-managed-airflow-for-your-complex-workflows-16d”