Автор оригинала: Déborah Mesquita.
Конвейеры извлечения данных могут быть сложны в построении и управлении, поэтому рекомендуется использовать инструмент, который может помочь вам в решении этих задач. Apache Air flow -это популярная платформа управления рабочими процессами с открытым исходным кодом, и в этой статье вы узнаете, как использовать ее для автоматизации вашего первого рабочего процесса .
Я предполагаю, что вы уже знаете, как создавать и запускать скрипты Bash и Python. Этот учебник был построен с использованием Ubuntu 20.04 с установленными ImageMagick , tesseract и Python3.
Одна важная концепция заключается в том, что вы будете использовать воздушный поток только для автоматизации и управления задачами. Это означает, что вам все равно придется проектировать и разбивать свой рабочий процесс на скрипты Bash и/или Python.
Чтобы увидеть, как это работает, мы сначала создадим рабочий процесс и запустим его вручную, а затем посмотрим, как автоматизировать его с помощью Airflow.
Задача, которую вы автоматизируете
В этом уроке вы извлечете данные из файла .pdf и сохраните их в файл .csv. Это основные задачи для достижения этой цели:
- Извлеките текст из файла .pdf и сохраните его в файле .txt
- Извлеките нужные метаданные из текстового файла и сохраните их в файле .csv
Для запуска первой задачи вы будете использовать инструмент ImageMagick для преобразования страницы .pdf в a .png-файл, а затем используйте tesseract для преобразования изображения в файл .txt. Эти задачи будут определены в скрипте Bash. Для извлечения метаданных вы будете использовать Python и регулярные выражения.
1. Скрипт для извлечения текста из файла .pdf
Вы создадите сценарий вот так:
- Получите имя файла .pdf в качестве параметра
- Преобразуйте страницу в a .png файл
- Преобразуйте изображение в файл .txt
Это сценарий, чтобы сделать все это:
#!/bin/bash PDF_FILENAME="$1" convert -density 600 "$PDF_FILENAME" "$PDF_FILENAME.png" tesseract "$PDF_FILENAME.png" "$PDF_FILENAME"
Сохраните это в файле с именем pdf_to_text.sh
, затем запустите chmod +x pdf_to_text.sh
и, наконец, запустить ./pdf_to_text.sh pdf_filename
для создания файла .txt. Я использую этот pdf в качестве примера.
2. Скрипт для извлечения метаданных и сохранения их в файл .csv
Теперь, когда у вас есть txt – файл, пришло время создать правило регулярных выражений для извлечения данных. Цель состоит в том, чтобы извлечь то, что происходит в каждый час встречи. Вы можете извлечь данные с помощью шаблона, который фиксирует часы и то, что происходит сразу после этого и до разрыва строки. С регулярным выражением вы можете сделать это с помощью такого шаблона: (\d:\d\d)-(\d:\d\d) (.*\n)
. Затем вы сохраните эти данные в файл .csv. Вот код:
import re import csv if __name__ == "__main__": pattern = "(\d:\d\d)-(\d:\d\d) (.*\n)" with open('metadata.csv', 'w', newline='') as file: writer = csv.writer(file) writer.writerow(["start_hour","end_hour","activity"]) txt = open("extracted_text.txt", "r", encoding="ISO-8859–1").read() extracted_data = re.findall(pattern,txt) for data in extracted_data: moment = [data[0].strip(),data[1].strip(),data[2].strip()] writer.writerow(moment)
Сохраните это в файле с именем extract_metadata.py
и запустите его с помощью python3 extrac_metadata.py
. Вы можете видеть, что файл metadata.csv
был создан. Красиво, правда?
В этой статье мы не имеем дела с лучшими практиками создания рабочих процессов, но если вы хотите узнать об этом больше, я настоятельно рекомендую этот доклад самого создателя Airflow: Functional Data Engineering – A Set of Best Practices .
Хорошо, теперь, когда у вас есть свой рабочий процесс, пришло время автоматизировать его с помощью воздушного потока.
Чтобы использовать Air flow, мы запускаем веб-сервер, а затем получаем доступ к пользовательскому интерфейсу через браузер. Вы можете запланировать выполнение заданий автоматически, поэтому помимо сервера вам также потребуется запустить планировщик. В производственных условиях мы обычно запускаем его на выделенном сервере, но здесь мы запустим его локально, чтобы понять, как он работает. Вы создадите виртуальную среду и выполните следующие команды для установки всего:
$ python3 -m venv .env $ source .env/bin/activate $ pip3 install apache-airflow $ pip3 install cattrs==1.0.0. #I had to run this to work $ airflow version # check if everything is ok $ airflow initdb #start the database Airflow uses $ airflow scheduler #start the scheduler
Затем откройте другое окно терминала и запустите сервер:
$ source .env/bin/activate $ airflow webserver -p 8080
А теперь давай открывай https://localhost:8080 чтобы получить доступ к пользовательскому интерфейсу воздушного потока.
Создание вашего первого DAG
С помощью Airflow вы задаете свой рабочий процесс в DAG (направленный ациклический граф). DAG-это скрипт Python, который содержит коллекцию всех задач, организованных таким образом, чтобы отразить их отношения и зависимости, как указано здесь .
Для задания задач используются Операторы . Существуют операторы Bash (для выполнения команд bash), операторы Python (для вызова функций Python), операторы MySQL (для выполнения команд SQL) и так далее. В этом уроке вы будете использовать только оператор Bash для запуска скриптов.
Когда вы запускаете рабочий процесс , он создает DAG Run , объект, представляющий экземпляр DAG во времени. Поскольку вы впервые используете Air flow, в этом уроке мы запустим программу вручную и пока не будем использовать планировщик.
Давайте создадим first_dag.py
скрипт, чтобы понять, как все это сочетается.
from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.utils.dates import days_ago default_args = { 'owner': 'Déborah Mesquita', 'start_date': days_ago(1) } # Defining the DAG using Context Manager with DAG( 'extract-meeting-activities', default_args=default_args, schedule_interval=None, ) as dag: t1 = BashOperator( task_id = 'extract_text_from_pdf', bash_command = 'YOUR-LOCAL-PATH/airflow-tutorial/pdf_to_text.sh {{ dag_run.conf["working_path"] if dag_run else "" }} {{ dag_run.conf["pdf_filename"] if dag_run else "" }}', ) t2 = BashOperator( task_id = 'extract_metadata_from_text', bash_command = 'python3 YOUR-LOCAL-PATH/airflow-tutorial/extract_metadata.py {{ dag_run.conf["working_path"] if dag_run else "" }}', ) t1 >> t2 # Defining the task dependencies
Вы указываете два параметра в default_args
dict, владельца и дату начала. Для создания экземпляра DAG мы используем контекстный менеджер. Первый параметр-это имя DAG ( extract-meeting-activities
), затем мы передаем default_args
и устанавливаем schedule_interval
в None, так как запускаем рабочий процесс вручную.
После этого пришло время создавать задачи. В этом рабочем процессе вы будете использовать оператор Bash, так как рабочий процесс состоит из запуска скриптов Bash и Python.
Помните, что первый скрипт получает имя файла pdf в качестве параметра? В этот ДЕНЬ вы передадите этот параметр при запуске рабочего процесса вручную. Параметры хранятся в файле dag_run.conf
dict, и вы можете использовать ключи для доступа к ним.
Чтобы рабочий процесс работал должным образом, вам нужно будет внести некоторые изменения в сценарии. Когда Airflow запускает скрипты, он делает это во временном каталоге, что означает, что это не будет тот же каталог, в котором мы запускали скрипты вручную. Таким образом, вы передадите ПУТЬ, содержащий PDF-файл, который будет тем же путем, по которому вы хотите сохранить csv-файл в качестве параметра. Вы можете увидеть окончательные скрипты здесь, в этом репозитории .
Затем пришло время определить зависимости задач. Для этого вы используете оператор bitshift, >>, что означает, что t1 работает первым, а t2-вторым.
Запуск вашего первого рабочего процесса
По умолчанию Airflow просматривает каталог ~/airflow/dag
для поиска DAG. Поскольку мы не изменили конфигурацию воздушного потока, это должно быть для вас. Так что давай копируй файл first_dag.py в этот каталог.
$ cp first_dag.py ~/airflow/dags/
Теперь, если вы идете в https://localhost:8080 вы можете увидеть ДЕНЬ.
Чтобы вызвать DAQ, нажмите на его имя ( extract-meeting-activities
), а затем нажмите на кнопку “Trigger DAQ”:
Теперь передайте параметр в текстовое поле и нажмите кнопку триггера. Вот эти параметры:
{"working_path": "YOUR-LOCAL-PATH/airflow-tutorial/", "pdf_filename": "sample_meeting_agenda.pdf"}
Тогда, если вы зайдете в “TreeView”, вы сможете увидеть все пробеги за этот ДЕНЬ. Зеленый означает, что все прошло хорошо, красный означает, что задача провалилась, а желтый означает, что задача еще не запущена и готова к повторной попытке. Ознакомьтесь с жизненным циклом задачи здесь . Чтобы проверить журналы для каждой задачи, вы можете нажать на квадрат, а затем нажать на кнопку “Просмотреть журнал”.
И вот он у вас, ваш первый рабочий процесс, работающий с воздушным потоком! Некоторые хорошие следующие шаги могут заключаться в том , чтобы узнать больше о DAG Runs , проверить интерфейс командной строки или использовать оператор для выполнения SQL-команд .
Спасибо за чтение!