Рубрики
Без рубрики

Как построить конвейер извлечения данных с помощью Apache Air flow

Конвейеры извлечения данных могут быть сложны в построении и управлении, поэтому рекомендуется использовать инструмент, который может помочь вам в решении этих задач. Apache Air flow (https://airflow.apache.org/) является популярным…

Автор оригинала: Déborah Mesquita.

Конвейеры извлечения данных могут быть сложны в построении и управлении, поэтому рекомендуется использовать инструмент, который может помочь вам в решении этих задач. Apache Air flow -это популярная платформа управления рабочими процессами с открытым исходным кодом, и в этой статье вы узнаете, как использовать ее для автоматизации вашего первого рабочего процесса .

Я предполагаю, что вы уже знаете, как создавать и запускать скрипты Bash и Python. Этот учебник был построен с использованием Ubuntu 20.04 с установленными ImageMagick , tesseract и Python3.

Одна важная концепция заключается в том, что вы будете использовать воздушный поток только для автоматизации и управления задачами. Это означает, что вам все равно придется проектировать и разбивать свой рабочий процесс на скрипты Bash и/или Python.

Чтобы увидеть, как это работает, мы сначала создадим рабочий процесс и запустим его вручную, а затем посмотрим, как автоматизировать его с помощью Airflow.

Задача, которую вы автоматизируете

В этом уроке вы извлечете данные из файла .pdf и сохраните их в файл .csv. Это основные задачи для достижения этой цели:

Скриншот с 2020-12-08 15-53-50.png
  1. Извлеките текст из файла .pdf и сохраните его в файле .txt
  2. Извлеките нужные метаданные из текстового файла и сохраните их в файле .csv

Для запуска первой задачи вы будете использовать инструмент ImageMagick для преобразования страницы .pdf в a .png-файл, а затем используйте tesseract для преобразования изображения в файл .txt. Эти задачи будут определены в скрипте Bash. Для извлечения метаданных вы будете использовать Python и регулярные выражения.

1. Скрипт для извлечения текста из файла .pdf

Вы создадите сценарий вот так:

  1. Получите имя файла .pdf в качестве параметра
  2. Преобразуйте страницу в a .png файл
  3. Преобразуйте изображение в файл .txt

Это сценарий, чтобы сделать все это:

#!/bin/bash
PDF_FILENAME="$1"
convert -density 600 "$PDF_FILENAME" "$PDF_FILENAME.png"
tesseract "$PDF_FILENAME.png" "$PDF_FILENAME"

Сохраните это в файле с именем pdf_to_text.sh , затем запустите chmod +x pdf_to_text.sh и, наконец, запустить ./pdf_to_text.sh pdf_filename для создания файла .txt. Я использую этот pdf в качестве примера.

2. Скрипт для извлечения метаданных и сохранения их в файл .csv

Теперь, когда у вас есть txt – файл, пришло время создать правило регулярных выражений для извлечения данных. Цель состоит в том, чтобы извлечь то, что происходит в каждый час встречи. Вы можете извлечь данные с помощью шаблона, который фиксирует часы и то, что происходит сразу после этого и до разрыва строки. С регулярным выражением вы можете сделать это с помощью такого шаблона: (\d:\d\d)-(\d:\d\d) (.*\n) . Затем вы сохраните эти данные в файл .csv. Вот код:

import re
import csv
if __name__ == "__main__":
    pattern = "(\d:\d\d)-(\d:\d\d) (.*\n)"
with open('metadata.csv', 'w', newline='') as file:
   writer = csv.writer(file)
   writer.writerow(["start_hour","end_hour","activity"])
 
   txt = open("extracted_text.txt", "r", encoding="ISO-8859–1").read()
   extracted_data = re.findall(pattern,txt)
   for data in extracted_data:
      moment = [data[0].strip(),data[1].strip(),data[2].strip()]
      writer.writerow(moment)

Сохраните это в файле с именем extract_metadata.py и запустите его с помощью python3 extrac_metadata.py . Вы можете видеть, что файл metadata.csv был создан. Красиво, правда?

Скриншот с 2020-12-08 15-59-52.png

В этой статье мы не имеем дела с лучшими практиками создания рабочих процессов, но если вы хотите узнать об этом больше, я настоятельно рекомендую этот доклад самого создателя Airflow: Functional Data Engineering – A Set of Best Practices .

Хорошо, теперь, когда у вас есть свой рабочий процесс, пришло время автоматизировать его с помощью воздушного потока.

Чтобы использовать Air flow, мы запускаем веб-сервер, а затем получаем доступ к пользовательскому интерфейсу через браузер. Вы можете запланировать выполнение заданий автоматически, поэтому помимо сервера вам также потребуется запустить планировщик. В производственных условиях мы обычно запускаем его на выделенном сервере, но здесь мы запустим его локально, чтобы понять, как он работает. Вы создадите виртуальную среду и выполните следующие команды для установки всего:

$ python3 -m venv .env
$ source .env/bin/activate
$ pip3 install apache-airflow
$ pip3 install cattrs==1.0.0. #I had to run this to work
$ airflow version # check if everything is ok
$ airflow initdb #start the database Airflow uses
$ airflow scheduler #start the scheduler

Затем откройте другое окно терминала и запустите сервер:

$ source .env/bin/activate
$ airflow webserver -p 8080

А теперь давай открывай https://localhost:8080 чтобы получить доступ к пользовательскому интерфейсу воздушного потока.

Создание вашего первого DAG

С помощью Airflow вы задаете свой рабочий процесс в DAG (направленный ациклический граф). DAG-это скрипт Python, который содержит коллекцию всех задач, организованных таким образом, чтобы отразить их отношения и зависимости, как указано здесь .

Для задания задач используются Операторы . Существуют операторы Bash (для выполнения команд bash), операторы Python (для вызова функций Python), операторы MySQL (для выполнения команд SQL) и так далее. В этом уроке вы будете использовать только оператор Bash для запуска скриптов.

Когда вы запускаете рабочий процесс , он создает DAG Run , объект, представляющий экземпляр DAG во времени. Поскольку вы впервые используете Air flow, в этом уроке мы запустим программу вручную и пока не будем использовать планировщик.

Давайте создадим first_dag.py скрипт, чтобы понять, как все это сочетается.

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'Déborah Mesquita',
    'start_date': days_ago(1)
    }

# Defining the DAG using Context Manager
with DAG(
        'extract-meeting-activities',
        default_args=default_args,
        schedule_interval=None,
        ) as dag:
        t1 = BashOperator(
                task_id = 'extract_text_from_pdf',
                bash_command = 'YOUR-LOCAL-PATH/airflow-tutorial/pdf_to_text.sh {{ dag_run.conf["working_path"] if dag_run else "" }} {{ dag_run.conf["pdf_filename"] if dag_run else "" }}',
        )

        t2 = BashOperator(
                task_id = 'extract_metadata_from_text',
                bash_command = 'python3 YOUR-LOCAL-PATH/airflow-tutorial/extract_metadata.py {{ dag_run.conf["working_path"] if dag_run else "" }}',
        )

        t1 >> t2 # Defining the task dependencies

Вы указываете два параметра в default_args dict, владельца и дату начала. Для создания экземпляра DAG мы используем контекстный менеджер. Первый параметр-это имя DAG ( extract-meeting-activities ), затем мы передаем default_args и устанавливаем schedule_interval в None, так как запускаем рабочий процесс вручную.

После этого пришло время создавать задачи. В этом рабочем процессе вы будете использовать оператор Bash, так как рабочий процесс состоит из запуска скриптов Bash и Python.

Помните, что первый скрипт получает имя файла pdf в качестве параметра? В этот ДЕНЬ вы передадите этот параметр при запуске рабочего процесса вручную. Параметры хранятся в файле dag_run.conf dict, и вы можете использовать ключи для доступа к ним.

Чтобы рабочий процесс работал должным образом, вам нужно будет внести некоторые изменения в сценарии. Когда Airflow запускает скрипты, он делает это во временном каталоге, что означает, что это не будет тот же каталог, в котором мы запускали скрипты вручную. Таким образом, вы передадите ПУТЬ, содержащий PDF-файл, который будет тем же путем, по которому вы хотите сохранить csv-файл в качестве параметра. Вы можете увидеть окончательные скрипты здесь, в этом репозитории .

Затем пришло время определить зависимости задач. Для этого вы используете оператор bitshift, >>, что означает, что t1 работает первым, а t2-вторым.

Запуск вашего первого рабочего процесса

По умолчанию Airflow просматривает каталог ~/airflow/dag для поиска DAG. Поскольку мы не изменили конфигурацию воздушного потока, это должно быть для вас. Так что давай копируй файл first_dag.py в этот каталог.

$ cp first_dag.py ~/airflow/dags/

Теперь, если вы идете в https://localhost:8080 вы можете увидеть ДЕНЬ.

Скриншот с 2020-12-08 16-14-50.png

Чтобы вызвать DAQ, нажмите на его имя ( extract-meeting-activities ), а затем нажмите на кнопку “Trigger DAQ”:

Скриншот с 2020-12-08 16-20-56.png

Теперь передайте параметр в текстовое поле и нажмите кнопку триггера. Вот эти параметры:

{"working_path": "YOUR-LOCAL-PATH/airflow-tutorial/", 
"pdf_filename": "sample_meeting_agenda.pdf"}

Тогда, если вы зайдете в “TreeView”, вы сможете увидеть все пробеги за этот ДЕНЬ. Зеленый означает, что все прошло хорошо, красный означает, что задача провалилась, а желтый означает, что задача еще не запущена и готова к повторной попытке. Ознакомьтесь с жизненным циклом задачи здесь . Чтобы проверить журналы для каждой задачи, вы можете нажать на квадрат, а затем нажать на кнопку “Просмотреть журнал”.

Скриншот с 2020-12-08 16-26-01.png

И вот он у вас, ваш первый рабочий процесс, работающий с воздушным потоком! Некоторые хорошие следующие шаги могут заключаться в том , чтобы узнать больше о DAG Runs , проверить интерфейс командной строки или использовать оператор для выполнения SQL-команд .

Спасибо за чтение!