Рубрики
Без рубрики

Сколько Python вам нужно знать, чтобы написать воздушный поток DAG? Часть 1.

Не уверен в вас, но мне интересно об этом, когда мне нужно в борту новых коллег, коллег с … Теги от воздушного потока, Python.

Сколько питона написать DAG? (3 часть серии)

Не уверен в тебе, но мне интересно, когда мне нужно в борту новых коллег, коллег, которые пишут в основном SQL только до накануне.

Этот пост для людей, которым необходимо начать писать писать трубопроводы (или DAGS) для Apache Airflow , знайте уже, как обрабатывать их данные, но мало или не знают о питоне.

Этот пост не скажет вам, как установить воздушный поток на машине, это Краткое руководство Воля, но проверь со своими коллегами, как вы делаете в своей организации. Мое предположение состоит в том, что вы работаете в среде, где воздушный поток уже используется в производстве, и вам не нужно беспокоиться об этом. Если это не так, вам понадобится нечто большее, чем этот пост.

Я надеюсь, что после прочтения этого вы сможете понять код, написанный вашим коллегами, и чувствую себя достаточно уверенно, чтобы начать писать свои собственные даги.

Я постараюсь сохранить это как можно более практичным, как будто вы должны работать со мной. Я предполагаю, что вы ничего не знаете о Python, не стесняйтесь пропустить разделы, с которыми вы знакомы. Также не стесняйтесь задавать вопросы, я всегда готов к чату.

Скучные вещи

Я напишу еще один пост о строительных блоках воздушного потока, пока позвольте мне просто поделиться этим:

  • DAG – это процесс ETL (пока 1 файл dag)
  • DAG изготовлен из нескольких задач
  • Задача является экземпляром оператора
  • Оператор делает вещи (перемещает данные, отправляет электронные письма, пишет сообщение на dev.to)

Если вам нужно знать, на каком DAG означает, просто щелкните значок очага, и я скажу вам.

from datetime import datetime

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator

my_dag = DAG(dag_id="sample_dag",
             start_date=datetime(2020, 4, 29),
             schedule_interval="0 0 * * *"
             )

start = DummyOperator(dag=my_dag,
                      task_id="start"
                      )

end = DummyOperator(dag=my_dag,
                    task_id="end"
                    )

start >> end

В конце этого поста вы сможете распознать, что этот DAG работает каждый день в полночь и делает более или менее ничего.

Но давайте попробуем прочитать его линию по линии.

Импорт

Python может сделать много вещей, но для экономии ресурсов не все доступно все время. Что нужно загружать в компьютерную память, остается разработчикам.

Когда вы хотите использовать что-то, что не является частью основного Python, вы можете Импорт нужный модуль или библиотека Python.

import antigravity

Теперь, если я хочу использовать метод антигравитационный Мне нужно называть это, используя имя модуля: antigrivity.fly ()

Если имя модуля слишком длинное, вы можете использовать псевдоним:

import antigravity as ag

ag.fly()

Широко используемые модули имеют стандартизированные псевдонимы (но это еще одна история).

Некоторые модули довольно большие, и можно импортировать только часть их.

from datetime import datetime

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator

Это позволяет избежать имени модуля (см. Ниже).

Не все модули установлены на вашем компьютере при установке Python, вы можете установить больше модуля, используя инструмент, похожий на Пип Отказ

Определение Дага

my_dag = DAG(dag_id="sample_dag",
             start_date=datetime(2020, 4, 29),
             schedule_interval="0 0 * * *"
             )

Это создаст наш DAG, который Python будет хранить в переменной my_dag что мы будем использовать позже.

Этот даг будет поднят планировщиком воздушного потока и выполнен в зависимости от его Schood_Interval и start_date Отказ dag_id. Идентификатор используется внутри воздушного потока, у вас не может быть другого DAG с тем же именем.

Даг создан с использованием аргументов, которые мы передаем к его конструктору ( dag () ), если это первый раз, когда вы передаете аргументы в метод Python, позвольте мне выделить несколько вещей:

  • Проходим три аргумента с форматом param_name = ценность
  • Мы проходим три аргумента этого DAG, но класс DAG принимает больше, пропавшее отсутствующее заполнено значениями по умолчанию. Вы можете полный список параметров здесь
  • dag_id на самом деле единственный обязательный параметр. Как вы, вероятно, угадывались обязательные параметры, не имеют по умолчанию и должны быть переданы каждый раз, когда вы вызываете функцию
  • формат param_name = значение не нужен, но позволяет нам пропускать значения в порядке, который мы предпочитаем. Использование имен позволяет нам пройти только необходимые параметры, и мы не вынуждены менять наш код, если функция, которую мы используем, добавляет дополнительные параметры.

По этой причине вы могли бы удалить dag_id = , инвертировать названные аргументы, и код все равно будет работать:

my_dag = DAG("sample_dag",
             schedule_interval="0 0 * * *"
             start_date=datetime(2020, 4, 29)
             )

Пока это …

my_dag = DAG("sample_dag",
             "0 0 * * *"
             datetime(2020, 4, 29)
             )

… заставляет меня открывать ссылку выше, чтобы увидеть, что на самом деле являются вторыми и третьим параметрами конструктора DAG.

Дополнительные примечания:

  • datetime (2020, 4, 29) Возвращает 29 апреля 2019 года в формате, понятно для метода DAG
  • Schood_Interval Использует формат, называемый выражением Crontab. Этот даг будет работать каждый день в полночь. Вы можете узнать больше о выражении Crontab, используя что-то вроде Crontab.guru (или нажав единорог Так что я напишу сообщение о выражении Crontab для вас)
  • Технически говоря, мы создаем экземпляр класса DAG (Но я не буду писать пост об этом, и вы должны чувствовать себя плохо, думая, что щелчок сделает меня вашим объектом ориентированным писателем)

На данный момент мы готовы добавить задания на наш DAG.

Задачи

start = DummyOperator(dag=my_dag,
                      task_id="start"
                      )

end = DummyOperator(dag=my_dag,
                    task_id="end"
                    )

start >> end

Определение задачи выглядит похоже на определение DAG. Здесь мы используем очень важный оператор воздушного потока: декуператор, который делает абсолютно ничего, но позволяет нам сосредоточиться на других вещах:

  • Задача нуждается в уникальном task_id.
  • task_id является обязательным, это моя связь, чтобы держать его по имени
  • Помните переменную DAG? Задача должна знать своего родительского DAG, поэтому мы используем эту переменную здесь: даг = My_dag

Наконец у нас есть линия

start >> end

Если вы уже догадались, что это означает Начать выполнение сопровождается конец Исполнение вы догадались правильно. Мы также можем написать это как конец << Начать , но, конечно, он менее непосредственно.

>> и << называются оператором Bitshift (вы можете использовать эти новые знания как ледокол на следующем встрече).

Если вы читаете некоторые старые потоки по воздушным потокам, вы могли бы найти этот древний синтаксис (но мы современные люди Лучше выкл. Прилипать к операторам битов):

start.set_downstream(end)

Если вы прокрутите вверх, теперь вы сможете понять, что делает этот даг.

Вот несколько вопросов, чтобы проверить, есть ли вы все правильно:

  1. Из какого модуля мы импортируем Дукетперантор ?
  2. В определении DAG я могу заменить start_date = datetime (2020, 4, 29), с start_date = datetime.dateTime (2020, 4, 29), ?
  3. Потому что Дульеператор ничего не делает, будет задача Начать и конец бежать вместе?
  4. Мой родительский день в этом году находится 7 мая 2020 года, этот даг бежит в тот день?
  5. Будут ли два определения задач?
start = DummyOperator(my_dag,
                      "start"
                      )
start = DummyOperator(dag_id=my_dag,
                      "start"
                      )

Ответы.

На этом этапе вы должны быть знакомы с концепциями Python, как импорт и как вызывать методы с более или менее аргументами. Вы также должны иметь общее представление о взаимосвязи между потоками воздушных потоков и задачами, а также как создавать зависимости между задачами.

В следующем эпизоде мы будем сосредоточиться на некоторых фундаментальных структурах данных Python данных, которые могут сделать код нашего DAGS проще и проще для обслуживания, некоторые менее фигурные операторы и другие приятные вещи. До тех пор остаться … запланировано.

Сколько питона написать DAG? (3 часть серии)

Оригинал: “https://dev.to/mucio/how-much-python-do-you-need-to-know-to-write-an-airflow-dag-part-1-20jp”