Apache Airflow เป็น Открытый исходный код ที่ เข้า มา จัด การ задача งาน ต่าง ๆ โดย ต้อง เขียน เป็น python code โดย แต่ละ แต่ละ Задача สามารถ ดู Workflow การ ทำ งาน ได้ อย่าง ละเอียด ละเอียด
เรา สามารถ ตั้ง เวลา การ ทำ งาน คล้าย crontab (задания CRON) บน Linux ได้ เช่น เมื่อ รัน Работа A ตอน เวลา 01.00 น. ทุก วัน รัน работа B ตอน 08.00 น. ใน วัน อาทิตย์ โดย Работа ใน. AIRFLOW จะ เรียก ว่า DAGS (направленные ациклические графики) มี веб-интерфейс ให้ ให้ เรา мониторинг Задача ที่ ที่ เกิด ขึ้น การ ทำ งาน ใน แต่ละ задача เรา สามารถ ตั้ง ค่า Alert หาก มี มี มี มี เกิด เกิด ขึ้น ขึ้น
Web UI
Web ui ของ apache airflow ที่ ช่วย ให้ จัด การ dag ต่าง ๆ ๆ
Шаг 1: Гит клон
git clone https://gitlab.com/tanakrit1/apache-airflow-with-docker-compose.git
config ต่าง ๆ ที่ อยู่ ภาย ใน docker-compose.yml
อ่าน docker-compose เพิ่มเติม
airflow-планировщик
– Планировщик контролирует все задачи и DAGS, затем запускает экземпляры задач после завершения их зависимостей.Airflow-WebServer
– Вебсервер доступен в http://localhost: 8080 Отказвоздушный поток
– рабочий, который выполняет задачи, заданные планировщиком.Airflow-init
– Служба инициализации.цветок
– цветочное приложение для мониторинга окружающей среды. Это доступно в http://localhost: 8080 ОтказPostgres
– База данных.Redis
– Redis – брокер, который пересылает сообщения от планировщика на работника.
โฟลเดอร์ จะ ถูก เชื่อมต่อ ระหว่าง Хост กับ Контейнер
./dags
– Вы можете поставить ваши файлы DAG здесь../logs
– Содержит журналы из выполнения задач и планировщика../plugins
– Вы можете поставить свои пользовательские плагины здесь.
Шаг 2: ก่อน เริ่ม Запустить воздушный поток เป็น ครั้ง แรก
เรา ต้อง เรียก ใช้ การ สร้าง ฐาน ข้อมูล ใน สร้าง ดำเนิน ให้ เรียก ใน ใช้ ใช้ ใช้ ใช้ ใช้ ใช้เรียก
docker-compose up airflow-init
หลัง จาก การ เริ่ม ต้น เสร็จ สมบูรณ์ จะ เห็น ข้อ ข้อ ความ ดัง ไป ไป นี้ นี้ นี้
ชื่อ บัญชี ผู้ ใช้ ที่ ถูก สร้าง ขึ้น คือ воздушный поток
และ รหัส ผ่าน расход воздуха
Шаг 3: เริ่ม ต้น การ ใช้ งาน งาน воздушный поток
ตอน นี้ เรา สามารถ Начальная служба ทั้งหมด ด้วย คำ สั่ง
docker-compose up
เรา สามารถ ล็อก อิน เข้า ใช้ งาน งาน u u u u ของ ของ ของ ได้ ได้ ที่ http://localhost: 8080 Отказ
Рабочий процесс ที่ เรา จะ เขียน กัน คือ
- ดึง ค่า COVID-19 ประจำ วัน จาก API ของ กรม ควบคุม โรค โรค
- Вставить ข้อมูล ลง Postgres База данных
- ส่ง อีเมลเตือน การ ทำ งาน (ใช้ Gmail SMTP-сервер)
CREATE SEQUENCE id; CREATE TABLE public.daily_covid19_reports ( id int4 NOT NULL DEFAULT nextval('id'::regclass), confirmed int4 NULL, recovered int4 NULL, hospitalized int4 NULL, deaths int4 NULL, new_confirmed int4 NULL, new_recovered int4 NULL, new_hospitalized int4 NULL, new_deaths int4 NULL, update_date timestamp NULL, "source" varchar(100) NULL, dev_by varchar(100) NULL, server_by varchar(100) NULL, CONSTRAINT daily_covid19_reports_pkey PRIMARY KEY (id) );
โค้ด ทั้งหมด อยู่ ใน./Dags/covid-daily.py แล้ว
Шаг 4: สร้าง dag Объект ด้วย Python เอา ไว้ ใน./Dags
from airflow import DAG default_args = { 'owner': 'airflow', 'start_date': datetime(2021, 2, 17), 'schedule_interval': None, } with DAG('covid19_daily', schedule_interval='@daily', default_args=default_args, description='A simple data pipeline for COVID-19 report', catchup=False) as dag:
schood_interval คือ การ สั่ง รัน เป็น เวลา สามารถ ใช้ เครื่อง มือ กำหนด เวลา ได้ ที่ นี้
Шаг 5: ดึง ข้อมูล จาก API
import json import requests def get_covid19_report_today(): url = 'https://covid19.th-stat.com/api/open/today' response = requests.get(url) data = response.json() with open('data.json', 'w') as f: json.dump(data, f) return data
Шаг 6: Вставка ข้อมูล ลง Postgres База данных
สร้าง Соединение ด้วย Web UI ของ Airflow ไป ที่ admin> Соединения ระบุ Conn ID: airflow_db
ใช้ postgreeshook ใน การ เชื่อมต่อ กับ ฐาน ข้อมูล ข้อมูล
from airflow.hooks.postgres_hook import PostgresHook from airflow.utils.dates import days_ago from datetime import datetime def insert_data(): with open('data.json') as f: data = json.load(f) print(data) mysql_hook = PostgresHook(postgres_conn_id='airflow_db') insert = """ INSERT INTO public.daily_covid19_reports (confirmed, recovered, hospitalized, deaths, new_confirmed, new_recovered, new_hospitalized, new_deaths, update_date, "source", dev_by, server_by) VALUES(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s); """ mysql_hook.run(insert, parameters=(data['Confirmed'], data['Recovered'], data['Hospitalized'], data['Deaths'], data['NewConfirmed'], data['NewRecovered'], data['NewHospitalized'], data['NewDeaths'], datetime.strptime(data['UpdateDate'], '%d/%m/%Y %H:%M'), data['Source'], data['DevBy'], data['SeverBy']))
Шаг 7: สร้าง Задача ด้วย Оператор และ เชื่อมต่อ задача
ใช้ Pythonoperator เพื่อ ไป ใช้ Функция ที่ เขียน ขึ้น ขึ้น
from airflow.operators.python_operator import PythonOperator t1 = PythonOperator( task_id='get_covid19_report_today', python_callable=get_covid19_report_today ) t2 = PythonOperator( task_id='insert_data', python_callable=insert_data ) t1 >> t2
Шаг 8: สร้าง Задача สำหรับ ส่ง อีเมล ด้วย Emailoperator.
Настройка SMTP-сервера สำหรับ ส่ง เมล ด้วย Gmail แก้ไข ภาย ใน ไฟล์ airflow.cfg
โปรเจค นี้ ได้ ได้ เชื่อมต่อ นี้ ระหว่าง Хост กับ Контейнер ให้ แล้ว อยู่ ใน . \ Работник
แก้ไข แล้ว ให้ Перезапуск Docker-Compose
[smtp] smtp_host = smtp.gmail.com smtp_starttls = True smtp_ssl = False smtp_user = YOUR_EMAIL_ADDRESS smtp_password = 16_DIGIT_APP_PASSWORD smtp_port = 587 smtp_mail_from = YOUR_EMAIL_ADDRESS
เพิ่ม. Задача ภาย ใน ไฟล์ python
from airflow.operators.email_operator import EmailOperator t3 = EmailOperator( task_id='send_email', to=['your@gmail.com'], subject='Your covid19 report today is ready', html_content='Please check your dashboard. :)' ) t1 >> t2 >> t3
ดู เพิ่มเติม ได้ ที่
- https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html
- https://towardsdatascience.com/getting-started-with-apache-airflow-df1aa77d7b1b
- https://towardsdatascience.com/getting-started-with-airflow-using-docker-cd8b44dbff98
- https://itorn.net/l-ngaich-apache-airflow/
- https://www.bluebirz.net/th/619/try-apache-airflow/
- https://link.medium.com/Ytno47gW7db
Оригинал: “https://dev.to/tanakritseangnet/apache-airflow-docker-5c9o”