Рубрики
Без рубрики

Data Engineering 101: Автоматизация первого извлечения данных

Фото Денниса Куммера на Unsplash. За последние несколько недель мы обсуждали несколько важных топ … Tagged с SQL, Python, базой данных.

Фотография Деннис Куммер на Неспособный .

За последние несколько недель мы обсуждали несколько важных тем в мире Техника и автоматизация данных Анкет

Мы заложили основу для понимания словарный запас и основные концепции, которые использует инженер данных . Теперь пришло время начать создавать свой первый набор партийных работ.

Для этого мы будем использовать Apache Airflow Библиотека, чтобы помочь автоматизировать нашу работу.

Кроме того, для нашего живого источника данных мы будем использовать Sfgov 311 набор данных. Вы можете получить эту информацию в любое время и получить самые современные данные о 311 отчетах. Обычно это включает в себя такие вещи, как вандализм, нарушения парковки и т. Д.

Для этого конвейера мы сначала извлекаем данные в необработанный CSV, а затем загружать их в базу данных MySQL.

В этой статье мы рассмотрим код, необходимый для этого, а также указать на некоторые причины для различных шагов, которые мы предпринимаем.

Прежде чем начать работу, вам нужно настроить среду воздушного потока, чтобы иметь возможность выполнить вместе с каждым из шагов, обсуждаемых в этой статье. Если вы еще этого не сделали, мы находим это статья, чтобы быть одним из наших личных фаворитов Анкет

При создании трубопроводов данных, особенно тех, которые более ориентированы на партии, полезно извлекать данные в необработанный уровень данных. Это позволяет вам иметь резервное копирование необработанных данных.

Когда в данных есть ошибка, наличие необработанного файла может помочь вам выяснить, находится ли ошибка данных в источнике или в вашем общем процессе.

В нашем случае мы будем получать данные из набора данных JSON, который онлайн. Мы можем сделать это, используя функцию Pandas, называемую [read_json] (https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_json.html ) . Это может либо прочитать файл, либо URL.

Мы сделаем это, сделав функцию, которую вы можете вызвать, которая извлечет данные на основе JSON либо из URL, либо файла.

У нас также может быть метка временных времен к имени файла, чтобы убедиться, что мы знаем, когда мы вытащили данные. Это может быть использовано позже при попытке отслеживать любые изменения в данных, которые могут быть более моментальными и обновлением данных.

Это можно сделать с помощью DateTime объект, как показано ниже:

Это имя файла будет использоваться позже с функцией экстракта, которую мы создаем ниже:

# Step 1: Import all necessary packages.

# For scheduling
import datetime as dt

# For function jsonToCsv
import pandas as pd

# For function csvToSql
import csv
import pymysql

# Backwards compatibility of pymysql to mysqldb
pymysql.install_as_MySQLdb()

# Importing MySQLdb now
import MySQLdb

# For Apache Airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator


# Step 2: Define functions for operators.

# A JSON string reader to .csv writer function.
def jsonToCsv(url, outputcsv):

    # Reads the JSON string into a pandas DataFrame object.
    data = pd.read_json(url)

    # Convert the object to a .csv file.
    # It is unnecessary to separate the JSON reading and the .csv writing.
    data.to_csv(outputcsv)

    return 'Read JSON and written to .csv'

Эта задача все еще должна быть реализована. Тем не менее, мы покажем вам, что в конце, как только у нас будет настроена функция загрузки.

После того, как у вас есть извлечение, ваш следующий шаг – загрузить ваши данные в какой -то необработанный слой в вашем хранилище данных.

Ключ на этом шаге – не манипулировать вашими данными. Причина в том, что если ваши данные имеют некоторую форму проблемы или проблемы с данными из источника данных, то легче отследить.

Вы можете сделать это, внедряя проверки качества данных на каждом шаге.   В необработанных проверках вы обычно проверяете, что типы данных имеют смысл.

Например, все ли даты даты поля? Все ли государства действительными штатами? Хотите верьте, хотите нет, у нас были проблемы здесь. «Мы» не государственная аббревиатура.

Это проверки здравомыслия, чтобы убедиться, что данные, которые вы набираете, верны.

Теоретически, ваше приложение должно иметь входные данные с двумя проверками. Тем не менее, мы никогда не доверяем уровню приложения.

Все это в стороне, вы можете использовать код ниже. Что вы заметите, так это то, что у нас сначала есть подключение к базе данных с использованием MySQL, а затем загрузите строку CSV по строке:

def csvToSql():

    # Attempt connection to a database
    try:
        dbconnect = MySQLdb.connect(
                host='localhost',
                user='root',
                passwd='databasepwd',
                db='mydb'
                )
    except:
        print('Can\'t connect.')

    # Define a cursor iterator object to function and to traverse the database.
    cursor = dbconnect.cursor()
    # Open and read from the .csv file
    with open('./rogoben.csv') as csv_file:

        # Assign the .csv data that will be iterated by the cursor.
        csv_data = csv.reader(csv_file)

        # Insert data using SQL statements and Python
        for row in csv_data:
            cursor.execute(
            'INSERT INTO rogobenDB3(number, docusignid, publicurl, filingtype, \
                    cityagencyname, cityagencycontactname, \
                    cityagencycontacttelephone, cityagencycontactemail, \
                    bidrfpnumber, natureofcontract, datesigned, comments, \
                    filenumber, originalfilingdate, amendmentdescription, \
                    additionalnamesrequired, signername, signertitle) ' \
                    'VALUES("%s", "%s", "%s", "%s", "%s", "%s", "%s", "%s", "%s", \
                    "%s", "%s", "%s", "%s", "%s", "%s", "%s", "%s", "%s")',
                    row
                    )

    # Commit the changes
    dbconnect.commit()

    '''
    # Print all rows - FOR DEBUGGING ONLY
    cursor.execute("SELECT * FROM rogobenDB3")
    rows = cursor.fetchall()

    print(cursor.rowcount)
    for row in rows:
        print(row)
    '''

    # Close the connection
    cursor.close()

    # Confirm completion
    return 'Read .csv and written to the MySQL database'

Если бы вы создали более надежную систему, то вы, вероятно, создали бы какую -то форму класса менеджера базы данных, которая просто взяла строку подключения, которую вы импортировали.

Однако, поскольку мы просто создаем это для демонстрации, мы поместили этот код в одну функцию.

Со всеми этими функциями вы можете официально настроить свой DAG. Как я упоминал в предыдущих статьях , DAG действует как блок -схема. Он направляет, какие задачи выполняются первыми, а какие задачи зависят от других задач.

В нашем случае у нас есть задача, которая извлекает данные и другую задачу, которая загружает указанные данные в таблицу MySQL. Эти две основные задачи помогут запустить ваш трубопровод, и это будет выглядеть так, как приведенный ниже.

Теперь со всеми этими функциями мы можем настроить трубопровод.

Настройка фактического трубопровода в воздушном потоке требует, чтобы вы установили набор аргументов по умолчанию. Это позволяет вам установить владельца, дату начала, как часто будет повторно трубопровод, и несколько других параметров:

# Step 3: Define the DAG, i.e. the workflow

# DAG's arguments
default_args = {
        'owner': 'rogoben',
        'start_date':dt.datetime(2020, 4, 16, 11, 00, 00),
        'concurrency': 1,
        'retries': 0
        }

# DAG's operators, or bones of the workflow
with DAG('parsing_govt_data',
        catchup=False, # To skip any intervals we didn't run
        default_args=default_args,
        schedule_interval='* 1 * * * *', # 's m h d mo y'; set to run every minute.
        ) as dag:

    opr_json_to_csv = PythonOperator(
            task_id='json_to_csv',
            python_callable=jsonToCsv,
            op_kwargs={
                'url':'https://data.sfgov.org/resource/pv99-gzft.json',
                'outputcsv':'./rogoben.csv'
                }
            )

    opr_csv_to_sql = PythonOperator(
            task_id='csv_to_sql',
            python_callable=csvToSql
            )

# The actual workflow
opr_json_to_csv >> opr_csv_to_sql

В дополнение к параметрам вам нужно будет настройка ваших конкретных операторов. В этом случае у нас есть две функции: Jsontocsv и CSVTOSQL Анкет Они будут использоваться в Pythonoperator Анкет Это позволяет вам создавать то, что мы называем задачами.

Чтобы убедиться, что задачи работают в порядке, который будет иметь смысл, вам необходимо определить зависимости.

Вы можете определить зависимость, используя оператор сдвига битов. Для тех, кто не знаком с оператором сдвига битов, он выглядит как >> или << Анкет

В этом случае вы бы определили это как OPR_JSON_TO_CSV >> OPR_CSV_TO_SQL Анкет

Это гарантирует, что OPR_JSON_TO_CSV Бежит до OPR_CSV_TO_SQL Анкет

По правде говоря, у вас будет дубликат данных, загружающих данные таким образом.

Чтобы справиться с дублирующими данными, вы можете загрузить необработанный слой, а затем проверить, чтобы убедиться, что вы не загружаете дубликаты данных в более позднем этаже. Поэтому мы сейчас не будем беспокоиться об этом.

С этим вы, по сути, завершили свой первый трубопровод.

Итак, куда вы положили свой трубопровод?

Для того, чтобы пробежать этот трубопровод, вам нужно будет сохранить его в своем воздушный поток/dags Папка, которую вы настроили. Если вы все еще не настроили его, то вы должны использовать наш Любимое руководство по настройке воздушного потока Анкет

airflow                  # airflow root directory.
├── dags                 # the dag root folder
│   ├── first_dag.py        # where you put your first task

Как только этот трубопровод будет сохранен-и до тех пор, пока у вас будет воздушный поток, работающий на заднем плане-ваш DAG автоматически будет забирается воздушным потоком.

Вы можете проверить это, отправившись в свой Localhost: 8080, где приборная панель Airflow работает по умолчанию.

Оттуда ваш даг должен появиться.

Как только он появится, вы можете начать изучать свой DAG, чтобы убедиться, что все различные задачи настроены.

Это будет выглядеть как изображения ниже:

Теперь ваш трубопровод готов к работе.

Поздравляю с строительством и автоматизацией вашего первого конвейера данных воздушного потока! Теперь вы можете принять эту структуру и использовать ее в других ваших ETL и конвейерах данных.

Это, конечно, является лишь первым уровнем платформы данных или хранилища данных. Отсюда вам все равно потребуется создать производственный уровень, уровень метрик и какой -то визуализацию данных или слой науки данных.

Тогда вы действительно можете начать оказывать влияние на ваши данные.

Если вы хотите узнать больше о науке о данных, облачных вычислениях и технологии, ознакомьтесь с статьями ниже!

Data Engineering 101: Написание вашего первого трубопровода

5 отличных библиотек для управления большими данными

Каковы различные виды облачных вычислений

4 простые идеи Python для автоматизации Ваш рабочий процесс

4 должны иметь навыки для ученых данных

Лучшие практики SQL — Проектирование видео ETL

5 отличных библиотек для управления большими данными с Python

Соединение данных в DynamoDB и S3 для живой рекламы HOC -анализ

Оригинал: “https://dev.to/seattledataguy/data-engineering-101-automating-your-first-data-extract-g6j”