Рубрики
Без рубрики

การ ใช้ งาน งาน apache airflow บน docker เบื้อง ต้น

Apache Airflow Apache Airflow เป็น Открытый источник ที่ เข้า มา จัด การ задача งาน ต่าง ๆ โดย ต้อง เขีย … Тег с новичками, докером, Python.

Apache Airflow เป็น Открытый исходный код ที่ เข้า มา จัด การ задача งาน ต่าง ๆ โดย ต้อง เขียน เป็น python code โดย แต่ละ แต่ละ Задача สามารถ ดู Workflow การ ทำ งาน ได้ อย่าง ละเอียด ละเอียด

เรา สามารถ ตั้ง เวลา การ ทำ งาน คล้าย crontab (задания CRON) บน Linux ได้ เช่น เมื่อ รัน Работа A ตอน เวลา 01.00 น. ทุก วัน รัน работа B ตอน 08.00 น. ใน วัน อาทิตย์ โดย Работа ใน. AIRFLOW จะ เรียก ว่า DAGS (направленные ациклические графики) มี веб-интерфейс ให้ ให้ เรา мониторинг Задача ที่ ที่ เกิด ขึ้น การ ทำ งาน ใน แต่ละ задача เรา สามารถ ตั้ง ค่า Alert หาก มี มี มี มี เกิด เกิด ขึ้น ขึ้น

Web UI

Web ui ของ apache airflow ที่ ช่วย ให้ จัด การ dag ต่าง ๆ ๆ

Шаг 1: Гит клон

git clone https://gitlab.com/tanakrit1/apache-airflow-with-docker-compose.git

config ต่าง ๆ ที่ อยู่ ภาย ใน docker-compose.yml

อ่าน docker-compose เพิ่มเติม
  • airflow-планировщик – Планировщик контролирует все задачи и DAGS, затем запускает экземпляры задач после завершения их зависимостей.
  • Airflow-WebServer – Вебсервер доступен в http://localhost: 8080 Отказ
  • воздушный поток – рабочий, который выполняет задачи, заданные планировщиком.
  • Airflow-init – Служба инициализации.
  • цветок – цветочное приложение для мониторинга окружающей среды. Это доступно в http://localhost: 8080 Отказ
  • Postgres – База данных.
  • Redis – Redis – брокер, который пересылает сообщения от планировщика на работника.

โฟลเดอร์ จะ ถูก เชื่อมต่อ ระหว่าง Хост กับ Контейнер

  • ./dags – Вы можете поставить ваши файлы DAG здесь.
  • ./logs – Содержит журналы из выполнения задач и планировщика.
  • ./plugins – Вы можете поставить свои пользовательские плагины здесь.

Шаг 2: ก่อน เริ่ม Запустить воздушный поток เป็น ครั้ง แรก

เรา ต้อง เรียก ใช้ การ สร้าง ฐาน ข้อมูล ใน สร้าง ดำเนิน ให้ เรียก ใน ใช้ ใช้ ใช้ ใช้ ใช้ ใช้เรียก

docker-compose up airflow-init

หลัง จาก การ เริ่ม ต้น เสร็จ สมบูรณ์ จะ เห็น ข้อ ข้อ ความ ดัง ไป ไป นี้ นี้ นี้

ชื่อ บัญชี ผู้ ใช้ ที่ ถูก สร้าง ขึ้น คือ воздушный поток และ รหัส ผ่าน расход воздуха

Шаг 3: เริ่ม ต้น การ ใช้ งาน งาน воздушный поток

ตอน นี้ เรา สามารถ Начальная служба ทั้งหมด ด้วย คำ สั่ง

docker-compose up

เรา สามารถ ล็อก อิน เข้า ใช้ งาน งาน u u u u ของ ของ ของ ได้ ได้ ที่ http://localhost: 8080 Отказ

Рабочий процесс ที่ เรา จะ เขียน กัน คือ

  • ดึง ค่า COVID-19 ประจำ วัน จาก API ของ กรม ควบคุม โรค โรค
  • Вставить ข้อมูล ลง Postgres База данных
  • ส่ง อีเมลเตือน การ ทำ งาน (ใช้ Gmail SMTP-сервер)
CREATE SEQUENCE id;
CREATE TABLE public.daily_covid19_reports (
    id int4 NOT NULL DEFAULT nextval('id'::regclass),
    confirmed int4 NULL,
    recovered int4 NULL,
    hospitalized int4 NULL,
    deaths int4 NULL,
    new_confirmed int4 NULL,
    new_recovered int4 NULL,
    new_hospitalized int4 NULL,
    new_deaths int4 NULL,
    update_date timestamp NULL,
    "source" varchar(100) NULL,
    dev_by varchar(100) NULL,
    server_by varchar(100) NULL,
    CONSTRAINT daily_covid19_reports_pkey PRIMARY KEY (id)
);
โค้ด ทั้งหมด อยู่ ใน./Dags/covid-daily.py แล้ว

Шаг 4: สร้าง dag Объект ด้วย Python เอา ไว้ ใน./Dags

from airflow import DAG

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2021, 2, 17),
    'schedule_interval': None,
}
with DAG('covid19_daily',
         schedule_interval='@daily',
         default_args=default_args,
         description='A simple data pipeline for COVID-19 report',
         catchup=False) as dag:

schood_interval คือ การ สั่ง รัน เป็น เวลา สามารถ ใช้ เครื่อง มือ กำหนด เวลา ได้ ที่ นี้

Шаг 5: ดึง ข้อมูล จาก API

import json
import requests

def get_covid19_report_today():
    url = 'https://covid19.th-stat.com/api/open/today'
    response = requests.get(url)
    data = response.json()

    with open('data.json', 'w') as f:
        json.dump(data, f)

    return data

Шаг 6: Вставка ข้อมูล ลง Postgres База данных

สร้าง Соединение ด้วย Web UI ของ Airflow ไป ที่ admin> Соединения ระบุ Conn ID: airflow_db ใช้ postgreeshook ใน การ เชื่อมต่อ กับ ฐาน ข้อมูล ข้อมูล

from airflow.hooks.postgres_hook import PostgresHook
from airflow.utils.dates import days_ago
from datetime import datetime

def insert_data():
    with open('data.json') as f:
        data = json.load(f)

    print(data) 

    mysql_hook = PostgresHook(postgres_conn_id='airflow_db')

    insert = """
        INSERT INTO public.daily_covid19_reports
            (confirmed, recovered, hospitalized, deaths, new_confirmed, new_recovered, new_hospitalized, new_deaths, update_date, "source", dev_by, server_by)
        VALUES(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
    """

    mysql_hook.run(insert, parameters=(data['Confirmed'],
                                       data['Recovered'],
                                       data['Hospitalized'],
                                       data['Deaths'],
                                       data['NewConfirmed'],
                                       data['NewRecovered'],
                                       data['NewHospitalized'],
                                       data['NewDeaths'],
                                       datetime.strptime(data['UpdateDate'], '%d/%m/%Y %H:%M'),
                                       data['Source'],
                                       data['DevBy'],
                                       data['SeverBy']))

Шаг 7: สร้าง Задача ด้วย Оператор และ เชื่อมต่อ задача

ใช้ Pythonoperator เพื่อ ไป ใช้ Функция ที่ เขียน ขึ้น ขึ้น

from airflow.operators.python_operator import PythonOperator

    t1 = PythonOperator(
        task_id='get_covid19_report_today',
        python_callable=get_covid19_report_today
    )

    t2 = PythonOperator(
        task_id='insert_data',
        python_callable=insert_data
    )

    t1 >> t2

Шаг 8: สร้าง Задача สำหรับ ส่ง อีเมล ด้วย Emailoperator.

Настройка SMTP-сервера สำหรับ ส่ง เมล ด้วย Gmail แก้ไข ภาย ใน ไฟล์ airflow.cfg โปรเจค นี้ ได้ ได้ เชื่อมต่อ นี้ ระหว่าง Хост กับ Контейнер ให้ แล้ว อยู่ ใน . \ Работник แก้ไข แล้ว ให้ Перезапуск Docker-Compose

[smtp]
smtp_host = smtp.gmail.com
smtp_starttls = True
smtp_ssl = False
smtp_user = YOUR_EMAIL_ADDRESS
smtp_password = 16_DIGIT_APP_PASSWORD
smtp_port = 587
smtp_mail_from = YOUR_EMAIL_ADDRESS

เพิ่ม. Задача ภาย ใน ไฟล์ python

from airflow.operators.email_operator import EmailOperator

    t3 = EmailOperator(
        task_id='send_email',
        to=['your@gmail.com'],
        subject='Your covid19 report today is ready',
        html_content='Please check your dashboard. :)'
    )

    t1 >> t2 >> t3

ดู เพิ่มเติม ได้ ที่

Оригинал: “https://dev.to/tanakritseangnet/apache-airflow-docker-5c9o”