Рубрики
Без рубрики

SQLalchemy делает ETL волшебным образом легко

Автор оригинала: Peter Gleeson.

Одним из ключевых аспектов любого рабочего процесса данных науки является источник, очистка и хранение необработанных данных в форме, которую можно использовать вверх по течению. Этот процесс обычно называют «нагрузкой на преобразование экстракта» или ETL.

Важно разработать эффективные, надежные и надежные процессы ETL или «трубопроводы передачи данных». Неэффективный трубопровод сделает работу с данными медленным и непродуктивным. Невоздушный трубопровод сломается легко, оставляя пробелы.

Хуже того, ненадежный канатный трубопровод будет молча загрязнять вашу базу данных с ложными данными, которые могут не стать очевидными, пока ущерб не будет сделан.

Хотя критически важно развитие ETL может быть медленным и громоздким процессом в разрядниках. К счастью, есть решения с открытым исходным кодом, которые делают жизнь намного проще.

Что такое SQLALCHEMY?

Одним из таких растворов является модуль Python, называемый SQLalchemy. Это позволяет инженерам и разработчикам данных определять схемы, записи запросов и манипулировать баз данных SQL полностью через Python.

Объект SQLALCHEMY REALACTION MAPPER MAPPER (ORM) и функциональные возможности языка экспрессии утриктируют некоторые характеристики, очевидные между различными реализациями SQL, позволяя связать классы Python и конструировать с таблицами и выражениями данных.

Здесь мы пройдем несколько основных моментов SQLALCHEMY, чтобы узнать, что это может сделать, и как он может сделать развитие ETL более гладким процессом.

Установка

Вы можете установить SQLALCHEMY, используя установщик пакета PIP.

$ sudo pip install sqlalchemy

Что касается самого SQL, существует много разных версий, включая MySQL, Postgres, Oracle и Microsoft SQL Server. Для этой статьи мы будем использовать SQLite.

SQLite – это реализация с открытым исходным кодом SQL, которая обычно представляется предварительно установленным с Linux и Mac OS X. Он также доступен для Windows. Если у вас уже нет его в вашей системе, вы можете следовать Эти инструкции встать и бежать.

В новом каталоге используйте терминал для создания новой базы данных:

$ mkdir sqlalchemy-demo && cd sqlalchemy-demo
$ touch demo.db

Определение схемы

А Схема базы данных Определяет структуру системы базы данных с точки зрения таблиц, столбцов, полей и отношений между ними. Схемы могут быть определены в RAW SQL или через использование функции SQLALCHEMY.

Ниже приведен пример, показывающий, как определить схему двух таблиц для воображаемой платформы блогов. Одним из них является таблица пользователей, а другая – это таблица загрузки постов.

from sqlalchemy import *
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.sql import *

engine = create_engine('sqlite:///demo.db')
Base = declarative_base()

class Users(Base):
    __tablename__ = "users"
    UserId = Column(Integer, primary_key=True)
    Title = Column(String)
    FirstName = Column(String)
    LastName = Column(String)
    Email = Column(String)
    Username = Column(String)
    DOB = Column(DateTime)

class Uploads(Base):
    __tablename__ = "uploads"
    UploadId = Column(Integer, primary_key=True)
    UserId = Column(Integer)
    Title = Column(String)
    Body = Column(String)
    Timestamp = Column(DateTime)

Users.__table__.create(bind=engine, checkfirst=True)
Uploads.__table__.create(bind=engine, checkfirst=True)

Во-первых, импортируйте все необходимое от SQLalchemy. Тогда используйте create_engine (connection_string) Для подключения к вашей базе данных. Точная строка подключения будет зависеть от версии SQL, с которой вы работаете. Этот пример использует относительный путь к базе данных SQLite, создаваемой ранее.

Далее начните определять ваши таблицы классов. Первый в примере это Пользователи Отказ Каждый столбец в этой таблице определен как переменная класса с использованием SQLALCHEMYS Столбец (тип) , где Тип Это тип данных (например, целое число , String , dateTime и т. Д.). Использовать Primary_key = True обозначить столбцы, которые будут использоваться в качестве основных ключей.

Следующая таблица определена здесь Загрузите Отказ Это очень такая же идея – каждый столбец определяется как раньше.

Последние две строки фактически создают таблицы. CheckFirst = True Параметр гарантирует, что новые таблицы создаются только в том случае, если они в настоящее время не существуют в базе данных.

Извлекать

Как только схема определена, следующая задача – Экстракт необработанные данные из его источника. Точные данные могут дико варьироваться в зависимости от случая в случае, в зависимости от того, как предоставляется необработанные данные. Может быть, ваше приложение вызывает внутреннюю или стороннюю API или, возможно, вам нужно прочитать данные, зарегистрированные в файле CSV.

Ниже приведен пример использует два API для моделирования данных для платформы вымышленного блогов, описанной выше. Пользователи Таблица будет заполнена профилями, сгенерированными в RandomUser.me и Загрузите Стол будет содержать лориам IPSUM-вдохновленные данные об организации Jsonsholder Отказ

Python’s Запросы Модуль можно использовать для вызова этих API, как показано ниже:

import requests

url = 'https://randomuser.me/api/?results=10'
users_json = requests.get(url).json()
url2 = 'https://jsonplaceholder.typicode.com/posts/'
uploads_json = requests.get(url2).json()

Данные в настоящее время проводятся в двух объектах ( users_json и uploads_json ) в формате json. Следующим шагом будет трансформироваться и загружать эти данные в таблицы, определенные ранее.

Преобразовать

До того, как данные могут быть загружены в базу данных, важно убедиться, что она находится в правильном формате. Объекты JSON, созданные в вышеупомянутом коде выше, вложены и содержат больше данных, чем требуется для определения таблиц.

Важный посреднический шаг для трансформировать Данные из его текущего вложенного формата JSON в плоский формат, который можно безопасно записать в базу данных без ошибок.

Для примера, проходящего через эту статью, данные относительно просты и не понадобится много преобразований. Код ниже создает два списка, Пользователи и Загрузите , который будет использоваться на последнем шаге:

from datetime import datetime, timedelta
from random import randint

users, uploads = [], []

for i, result in enumerate(users_json['results']):
    row = {}
    row['UserId'] = i
    row['Title'] = result['name']['title']
    row['FirstName'] = result['name']['first']
    row['LastName'] = result['name']['last']
    row['Email'] = result['email']
    row['Username'] = result['login']['username']
    dob = datetime.strptime(result['dob'],'%Y-%m-%d %H:%M:%S')    
    row['DOB'] = dob.date()
    
    users.append(row)
    
for result in uploads_json:
    row = {}
    row['UploadId'] = result['id']
    row['UserId'] = result['userId']
    row['Title'] = result['title']
    row['Body'] = result['body']
    delta = timedelta(seconds=randint(1,86400))
    row['Timestamp'] = datetime.now() - delta
    
    uploads.append(row)

Главный шаг здесь – проиграть через объекты JSON, созданные ранее. Для каждого результата создайте новый объект словаря Python с ключами, соответствующими каждому столбцу, определенному для соответствующей таблицы в схеме. Это гарантирует, что данные больше не вложены, и сохраняет только данные, необходимые для таблиц.

Другой шаг – использовать Python’s datetime Модуль для манипулирования датами и преобразовать их в Datetime Введите объекты, которые можно записать в базу данных. Ради этого примера случайного Datetime Объекты генерируются с использованием TimEdelta () Метод из модуля DateTime Python.

Каждый созданный словарь добавляется в список, который будет использоваться на последнем этапе трубопровода.

Нагрузка

Наконец, данные в форме, которая может быть загружен в базу данных. SQLalchemy делает этот шаг прямо через свою сессию API.

API сеанса действует немного похожи на посредник или «холдинг-зона» для объектов Python, который вы либо загружены, либо связаны с базой данных. Эти объекты могут быть манипулируемыми в сеансе, прежде чем быть приверженным к базе данных.

В приведенном ниже коде создается новый объект сеанса, добавляет к нему строки, а затем сливаются и совершают их в базу данных:

Session = sessionmaker(bind=engine)
session = Session()

for user in users:
    row = Users(**user)
    session.add(row)
    
for upload in uploads:
    row = Uploads(**upload)
    session.add(row)

session.commit()

SessionMaker Фабрика используется для создания вновь настроенных Сессия классы. Сессия это каждый день Python класс, который создается на второй строке как сессия Отказ

Далее два цикла, которые повторяются через Пользователи и Загрузите Списки, созданные ранее. Элементы этих списков являются объектами словаря, клавиши которых соответствуют столбцам, приведенным в Пользователи и Загрузите классы, определенные ранее.

Каждый объект используется для создания нового экземпляра соответствующего класса (используя Handy Python quey_function (** uDe_dict) трюк). Этот объект добавляется к текущему сеансу с session.add () Отказ

Наконец, когда сеанс содержит строки, которые будут добавлены, session.commit () используется для совершения транзакции в базу данных.

Агрегация

Другая прохладная особенность SQLALCHEMY – это возможность использовать систему языка экспрессии для записи и выполнения ошибок Backend-Agnostic SQL.

Каковы преимущества записи бэк-агностических запросов? Для начала они делают какие-либо будущие миграционные проекты намного проще. Различные версии SQL имеют несколько несовместимых синтаксисов, но язык выражения SQLachmy, выступает в качестве лингва франки между ними.

Кроме того, возможность запросить и взаимодействовать с вашей базой данных в беспрепятственно, Pythonic, является реальным преимуществом для разработчиков, которые предпочли бы полностью работать на языке, которые они знают лучше всего. Однако SQLALCHEMY также позволит вам работать на простых SQL, для случаев, когда проще использовать предварительно письменный запрос.

Здесь мы расширим пример платформы вымышленного блогов, чтобы проиллюстрировать, как это работает. Как только базовые пользователи и загрузки таблиц были созданы и заполнены, следующий шаг может быть создан для создания Агрегированные Таблица – например, показывая, сколько статей каждый пользователь опубликовал, и время их последнего активного.

Сначала определите класс для агрегированного стола:

class UploadCounts(Base):
    __tablename__ = "upload_counts"
    UserId = Column(Integer, primary_key=True)
    LastActive = Column(DateTime)
    PostCount = Column(Integer)

UploadCounts.__table__.create(bind=engine, checkfirst=True)

Эта таблица будет иметь три столбца. Для каждого UserID Он будет хранить временную метку, когда они были последними активными, и количество загрузки их загрузки.

На простых SQL эта таблица будет заполнена с использованием запроса по линиям:

INSERT INTO upload_counts
SELECT
  UserId,
  MAX(Timestamp) AS LastActive,
  COUNT(UploadId) AS PostCount
FROM
  uploads
GROUP BY 1;

В SQLALCHEMY, это было бы написано как:

connection = engine.connect()

query = select([Uploads.UserId,
    func.max(Uploads.Timestamp).label('LastActive'),
    func.count(Uploads.UploadId).label('PostCount')]).\ 
    group_by('UserId')

results = connection.execute(query)

for result in results:
    row = UploadCounts(**result)
    session.add(row)
    
session.commit()

Первая строка создает Соединение Объект, используя двигатель Объект Подключиться () метод. Далее запрос определяется с помощью Выберите () функция.

Этот запрос такой же, как простая версия SQL, приведенная выше. Это выбирает UserID столбец из Загрузите Таблица. Это также относится к funcc.max () к Timestamp Столбец, который идентифицирует самый последний меток времени. Это помечено ХАДЛИВАЙТЕ используя Этикетка () метод.

Аналогично, запрос относится func.count () Подсчитать количество записей, которые появляются в Название столбец. Это помечено Postcount Отказ

Наконец, запрос использует group_by () Для групповых результатов по UserID Отказ

Чтобы использовать результаты запроса, A для Loop Iterate по объектам строки, возвращаемых Connection.execute (запрос) Отказ Каждая строка используется для создания экземпляра экземпляра UploadsCounts Табличный класс. Как и прежде, каждая строка добавляется к сессия Объект, и, наконец, сеанс предан в базе данных.

Проверка

После того, как вы запустите этот скрипт, вы можете убедить себя, что данные были правильно написаны в Demo.db База данных создана ранее.

После выхода Python откройте базу данных в SQLite:

$ sqlite3 demo.db

Теперь вы должны иметь возможность запускать следующие запросы:

SELECT * FROM users;

SELECT * FROM uploads;

SELECT * FROM upload_counts;

И содержимое каждой таблицы будет напечатано на консоль! Планируем сценарий Python для регулярных интервалов, вы можете быть уверены, что база данных будет сохранена в актуальном состоянии.

Теперь вы можете использовать эти таблицы для записи запросов для дальнейшего анализа, или создавать приборные панели для целей визуализации.

Чтение дальше

Если вы сделали это так далеко, то, надеюсь, вы узнаете что-то или две вещи о том, как SQLalchemy может сделать развитие ETL в Python намного простым!

Для одной статьи невозможно сделать полный справедливость всем особенностям SQLALCHEMY. Однако одним из ключевых преимуществ проекта является глубина и детализация его документации. Вы можете погрузиться в него здесь Отказ

В противном случае проверьте этот читчик Если вы хотите быстро начать.

Полный код для этой статьи можно найти в этот гид Отказ

Спасибо за прочтение! Если у вас есть какие-либо вопросы или комментарии, пожалуйста, оставьте ответ ниже.