Рубрики
Без рубрики

Техника данных 101: написание вашего первого трубопровода

Одной из главных ролей инженера-данных с данными может быть подведена в соответствии с получением данных из точки A до точки B. W … Tagged Pitced Python, База данных, Datascity, Nownners.

Одна из главных ролей инженера данных может быть суммирована как получение данных из точки A до точки B.

Нам часто нужно вытащить данные из одной системы и вставить ее в другую. Это может быть для различных целей. Это включает в себя аналитику, интеграцию и машинное обучение.

Но для того, чтобы получить эти данные, мы должны использовать то, что известно как ETL/Data Publics.

Это процессы, которые используют данные из одной системы данных в другую.

Одним из вопросов, который нам нужно ответить, так как инженеры данных являются тем, как часто нам нужно обновлять эти данные. Именно здесь вопрос о пакете против потока вступает в игру. Это два основных типа ETL/ELTS, которые существуют.

В течение очень долгого времени почти каждый конвейер данных был тем, что мы рассмотрим пакетный трубопровод. Это означает, что трубопровод обычно работает один раз в день, час, неделю и т. Д. Есть какой-то конкретный интервал времени, но данные не живут.

Пакетные задания относится к данным, которые загружаются в куски или партии, а не сразу. Таким образом, термин пакетные задания, поскольку данные загружаются в партии.

Сравните это с потоковыми данными, где, как только новая строка добавляется в базу данных приложения, она передается в аналитическую систему.

Это обычно делается с использованием различных форм моделей типов шины PUB/SUB или событий. Все эти системы позволяют передавать транзакционные данные почти сразу, как только возникает транзакция.

Некоторые могут спросить, почему мы не просто используем потоковое для всего. Разве не лучше иметь живые данные все время?

В некоторых случаях это правда.   Но часто создание потоковых систем технически более сложным, и поддерживает его также сложно.

Принимая во внимание, что в то время как пакетные задания работают по нормальным интервалам, могут потерпеть неудачу, им не нужно исправить сразу, потому что у них часто есть несколько часов или дней, прежде чем они запускаются снова.

В сравнении, потоковая система живет все время. Сбои и ошибки должны быть исправлены как можно скорее.

На данный момент мы собираемся сосредоточиться на разработке того, что традиционно больше пакетных рабочих мест.

Помимо подбора вашей общей парадигмы для вашего ETL, вам нужно будет определиться с вашим инструментом ETL.

Если вы просто хотите добраться до раздела кодирования, не стесняйтесь пропустить в разделе ниже.

Но мы не можем остолить слишком далеко в разработке трубопроводов, не ссылаются на несколько вариантов, с которыми ваша команда данных должна работать.

Существует множество инструментов автоматизации трубопроводов и рабочего процесса.

Давайте сломаем их на два определенных варианта.

Перетащите по сравнению с рамки.

Параметры перетаскивания предлагают вам возможность узнать почти ничего о коде – – это было бы похоже на SSIS и Informatica.

Они отлично подходят для людей, которые не требуют никакого пользовательского кода для реализации.

Хотя многие из этих инструментов предлагают настраиваемый код, который будет добавлен, оно разбегает цель.

Если ваша команда может написать код, мы находим его более полезным для записи трубопроводов, используя каркасы, так как они часто позволяют лучше использовать настройку. Хотя Informatica довольно мощна и делает много тяжелого подъема, пока вы можете подать счет.

Тем не менее, многие люди полагаются на кодовые рамки для своих ETL (некоторые компании, такие как Airbnb и Spotify, разработали свои собственные).

Эти рамки часто реализуются в Python и называются Воздушный поток и луйджи Отказ Обе эти рамки могут быть использованы в качестве рабочих процессов и предлагают различные преимущества.

Но на данный момент давайте посмотрим на то, что это похоже на строительство базового трубопровода в воздухе и луиджи.

У более поздних постов мы поговорим больше о дизайне. Но на данный момент мы просто демонстрируем, как писать трубопроводы ETL.

Чтобы сделать трубопроводы в воздушном потоке, есть несколько конкретных конфигураций, которые вам нужно настроить.

Существует набор аргументов, которые вы хотите установить, а затем вам также нужно будет вызовать фактический DAG, который вы создаете с помощью этих args по умолчанию.

Смотрите конфиг ниже.

default_args = {
'owner': 'default_user',
'start_date': airflow.utils.dates.days_ago(1),
'depends_on_past': True,
  #With this set to true, the pipeline won't run if the previous day failed
'email': ['info@example.com'],
'email_on_failure': True,
 #upon failure this pipeline will send an email to your email set above
'email_on_retry': False,
'retries': 5,
'retry_delay': timedelta(minutes=30),
}


dag = DAG(
'basic_dag_1',
default_args=default_args,
schedule_interval=timedelta(days=1),
)

Это только база вашего дага.

Вы можете установить такие вещи, как часто, как часто вы запускаете фактический конвейер передачи данных — как если вы хотите запускать свое расписание ежедневно, затем используйте следующие параметры кода. Например, вы можете использовать Schood_Interval = '@ Daily' Отказ Или вместо этого вы можете использовать CRON, как это: Schood_Interval = '0 0 * * *' Отказ

Как только вы настроите вашу базовую конфигурацию, вы можете начать собрать операторы для воздушного потока.

Операторы по сути являются изолированными задачами, которые вы хотите сделать. Это может быть извлечь данные, перемещая файл, запуская некоторые данные преобразования и т. Д.

Например, если вы посмотрите ниже, мы используем несколько операторов. К ним относятся Pythonoperator и Bashoperator.

Это позволяет запустить команды в Python или Bash и создавать зависимости между указанными задачами.

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

default_args = {
'owner': 'default_user',
'start_date': airflow.utils.dates.days_ago(1),
'depends_on_past': True,
  #With this set to true, the pipeline won't run if the previous day failed
'email': ['info@example.com'],
'email_on_failure': True,
 #upon failure this pipeline will send an email to your email set above
'email_on_retry': False,
'retries': 5,
'retry_delay': timedelta(minutes=30),
}

dag = DAG(
'basic_dag_1',
default_args=default_args,
schedule_interval=timedelta(days=1),
)

def my_func():
    print('Hello from my_func')


bashtask = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag,
)

dummy_task  = DummyOperator(task_id='dummy_task', retries=3)

python_task = PythonOperator(task_id='python_task', python_callable=my_func)

dummy_task.set_downstream(bashtask)
python_task.set_downstream(bashtask)

Мы идем немного более глубоко на воздушных трубопроводах здесь. Но это главный гид этого.

Вы можете продолжать создавать больше задач или разработать абстракции, чтобы помочь управлять сложностью трубопровода. Но использование выше операторов воздушных потоков является отличное введение.

Сейчас на Луиджи.

Luigi – это еще один рабочий процесс, который можно использовать для разработки трубопроводов. В некоторых отношениях мы находим это проще, а другими способами, он может быстро стать более сложным.

Причина, по которой мы лично найдем Luigi Chireter, это потому, что это нарушает основные задачи на три основных шага.

Это можно увидеть в том, что Luigi определяет как «задание».

Задача

В задаче Luigi функций класса три, которые наиболее используются, требуются (), RUN () и вывода ().

Что делают каждая из этих функций в Луиджи?

Требуется () аналогичен зависимостям в воздухе.

Вы по сути, ссылаясь на предыдущий класс задач, вывод файла или другой выход.

Например:

import luigi

class BasicTask(luigi.Task):

  def requires(self): 
    [FileExistsTask(self.input_filepath)]

В этом случае требуется функция ждет файла на землю.

Но это также может дождаться задачи, чтобы закончить или какой-то другой вывод. Не каждое задание нуждается в требуемой функции. Но его можно использовать для ссылки на предыдущую задачу, которая должна быть завершена для запуска текущей задачи.

Но задачи нуждаются в функции Run (). Функция Run () по существу является фактической задачей самой задачей. Что вы хотите сделать? Например:

class LoadTask(luigi.Task):

file_path = luigi.Parameter()
    def run(self):
        cnx = mysql.connector.connect(user='joe', database='test')
        stmt = "insert into basic_date(col1,col2,col3)  select distinct col1, col2, col3 from table1" 
        curs=cnx.cursor()
        curs.execute(stmt)
        curs.commit()
        curs.close()
        with self.output().open('w') as out_file:
            print >> out_file, strDate1, strDate2
    def output(self):
            return luigi.file.LocalTarget(path='/tmp/123')

Выход задачи является целью, которая может быть файлом в локальной файловой системе, файл на S3 Amazon, некоторые данные в базе данных и т. Д.

Вы можете увидеть небольшую разницу между двумя конвейерами. Воздушный поток завернут в одном конкретном операторе, тогда как Luigi разработан как больший класс.

В конце дня эта небольшая разница может привести к большому количеству изменений дизайна в вашем трубопроводе.

Лично нам нравится воздушный поток из-за большего сообщества. Однако во многих отношениях Луиджи может иметь слегка нижнюю панель к вступлению, насколько это выясняет. Нет много разных операторов, которые могут быть использованы. Вместо этого вы решаете, что на самом деле делает каждую задачу.

Это может позволить немного больше свободы, но и гораздо большее, через дизайн и развитие.

Так что в конце концов, вам придется выбрать то, с чем вы хотите иметь дело. Независимо от того, что вы выбираете, в вашем коде всегда будут ошибки.

Удачи, и спасибо за чтение!

Техника данных 101: Введение в технику

Каковы различные виды облачных вычислений

4 простых идеи Python для автоматизации Ваш рабочий процесс

4 должны иметь навыки для ученых данных

SQL лучшие практики — Проектирование видео ETL

5 Отличные библиотеки для управления большими данными с Python

Присоединение к данным в Dynamodb и S3 для Live Ad HOC анализ

Оригинал: “https://dev.to/seattledataguy/data-engineering-101-writing-your-first-pipeline-2k23”