Рубрики
Без рубрики

Упростите большие задания ETL с использованием SQLALCHEMY

С точки зрения технической изобретательности Google BigQuery – это, вероятно, самый впечатляющий хранилище данных на … Теги с хранилищами данных, Python, SQL, Google Cloud.

С точки зрения технической изобретательности Google BigQuery, вероятно, является самым впечатляющим складом данных на рынке. BigQuery отличается от других хранилищ данных, в том, что основная архитектура передается всем в области Google Cloud, что означает, что вам не нужно платить за выделенный кластер дорогих серверов, чтобы изредка запускать запросы для крупномасштабных данных.

Перемещение данных в склады данных традиционно включает в себя сброс дерьма неструктурированных или полуструктурированных файлов в хранилище, такие как S3, хранение Google или Data Lakes, прежде чем загружать их в пункт назначения. Не удивительно, что поколения, вдохновленные Maprecuce, увековечивая устаревшие процессы в технологии, как обычно делают фанаты Java (не @ me). Google Установил этот статус-кво с биемой, который мы мучительно работали в предыдущем руководстве. К счастью, среди нас таинственные герои, известные только как «сторонние разработчики».

Pybigquery это разъем для SQLALCHEMY, Что позволяет нам подключаться и запрашивать битийческий стол, как будто это была реляционная база данных, так как вы наверняка выполняли с помощью PymySQL или PSYCOPG2. Это действительно так просто, как это звучит. Чтобы сделать все интересные, мы собираемся построить небольшой скрипт для трубных данных взад и вперед от BigQuery до MySQL, и мы сделаем это, создавая экземпляры класса ароматического аглоства базы данных базы данных для обеих сторон. Теперь мы получаем бэки.

_ Отказ от ответственности : То, что мы можем взаимодействовать с большим путем так же легко, как и кислотно-совместимая реляционная база данных, не означает, что мы всегда должны. Ваша обязанность понять плюсы и минусы складов данных и признают, когда их использовать.

Делать наш первый большой запрос

Давайте настроим вас, чтобы сделать голый минимум: потянув строки от биготь с SQLalchemy. Я спашу нас агония, объясняя, что a GCloud учетные данные JSON File Есть и предположить, у вас уже есть один для вашего проекта.

Очевидные пакеты Python мы должны устанавливать Pybigquery и SQLalchemy Отказ Помните: мы не только легко подключаемся к бижу, но и демонстрируйте, насколько легко доставить информацию взад-вперед между этим современным хранилищем данных и традиционными реляционными базами данных. В моем случае я застрял, используя MySQL:

$ pip3 install pybigquery pymysql sqlalchemy loguru

Так же, как обычные соединения SQLALCHEMY, нам нужно создать соединительную строку URI для подключения к BigQuery. Вы заметите некоторую новую терминологию ниже:

bigquery://[GOOGLE_CLOUD_PROJECT_NAME]/[BIGQUERY_DATASET_NAME]

Никогда не бойтесь – вы можете легко найти ваш GCP название проекта , Биологическое название набора данных и Биологическое название таблицы через Боковочная консоль :

Боковочная терминология.

Бросьте все это добро в файл конфигурации, как тот, который у меня ниже. Как добавленный бонус, построить свой BigQuery_uri Переменная здесь, чтобы избежать прохождения тонн значений конфигурации позже:

"""BigQuery Configuration."""
from os import environ


# Google BigQuery config
gcp_credentials = environ.get('GCP_CREDENTIALS')
gcp_project = environ.get('GCP_PROJECT')
bigquery_dataset = environ.get('GCP_BIGQUERY_DATASET')
bigquery_table = environ.get('GCP_BIGQUERY_TABLE')
bigquery_uri = f'bigquery://{gcp_project}/{bigquery_dataset}'

Подключение к биену

Создание двигателя SQLALCHEMY для BigQuery должна выглядеть так же, как всегда, с единственной заметной разницей, являющейся Complents_Path Параметр теперь указывает на наше GCloud учетные данные JSON файл:

from sqlalchemy.engine import create_engine
from config import bigquery_uri


engine = create_engine(bigquery_uri,
                       credentials_path='/path/to/credentials.json')

Теперь мы можем поразить бодрое, так как мы будем с любым двигателем SQLALCHEMY:

...

query = f'SELECT title, url, referrer FROM {bigquery_dataset}.{bigquery_table} \
          WHERE referrer IS NOT NULL \
          AND title IS NOT NULL \
          ORDER BY RAND () LIMIT 20;'
rows = engine.execute(query).fetchall()

Ряды Переменная, возвращаемая нашим запросом, является типичной SQLALCHEMY РЕЗУЛЬТАТПРОКСИЯ , что позволяет легко пройти эти данные в другие направления. Мы можем просмотреть данные, которые мы получили только для удовольствия:

...

rows = [dict(row) for row in rows]
pp = pprint.PrettyPrinter(indent=2)
pp.pprint(rows)

И вот это:

[
  {
    "referrer": "https://www.google.com/",
    "title": "The Art of Routing in Flask",
    "url": "https://hackersandslackers.com/the-art-of-building-flask-routes/"
  },
  {
    "referrer": "https://www.facebook.com/",
    "title": "Demystifying Flask\"s Application Factory And the Application Context",
    "url": "https://hackersandslackers.com/demystifying-flask-application-factory/"
  },
  {
    "referrer": "https://www.google.com/",
    "title": "Connect Flask to a Database with Flask-SQLAlchemy",
    "url": "https://hackersandslackers.com/manage-database-models-with-flask-sqlalchemy/"
  },
  {
    "referrer": "https://www.google.com/",
    "title": "Constructing Database Queries with SQLAlchemy",
    "url": "https://hackersandslackers.com/constructuing-database-queries-with-the-sqlalchemy-orm/"
  },
  {
    "referrer": "https://www.google.com/",
    "title": "Constructing Database Queries with SQLAlchemy",
    "url": "https://hackersandslackers.com/constructuing-database-queries-with-the-sqlalchemy-orm/"
  }
]

ETL с базами данных BeagQuery и SQL

Если вытягивание рядов от BigQuery было все, что вы надеялись достичь, не стесняйтесь пропустить счастливо. Для этих амбициозных немногих, которые остаются, я дам вам в секрете: это не 5-лайнерский «трюк» учебник. О, нет … Этот учебник по созданию основы для нестабильных данных. Это миссия для выделения табличных данных, чтобы свободно перемещаться от хранилища данных в базу данных, не знаю о границах, которые когда-то стояли между ними. Мы строим противоположность Brexit.

По мере роста сообщества SQLALCHEMY, новый ландшафт начинает разворачиваться там, где Каждый источник данных и назначение в нашем стеке совместим с SQLALCHEMY . Это может не звучать поразительно, но идея источников данных, обменивающихся одним диалеком, несколько беспрецедентно. Забудьте о зажигательных кластерах или очередях обмена сообщениями: мы собираемся стандартизировать способ работы с каждым источником данных. Мы сделаем это с помощью абстрагированных баз данных через классы Python, так как таковы, что выборные строки из MySQL будут использовать один и тот же метод, что и выясняет из Postgres или BigQuery без каких-либо особенности.

Классы находятся в сессии

Я собираюсь создать один класс Python, достаточно гибкий для подключения к любому источнику. Этот класс будет иметь методы _ извлекать _ или вставлять Строки, поэтому мы можем легко использовать один и тот же класс для каждого источника данных, независимо от того, что является источником данных по сравнению с получателем. Наш класс имеет три заметных метода:

  1. fetch_rows () : Выполняет запрос, предназначенный для Выберите ряды.
  2. insluet_rows () : Принимает коллекцию строк, которые будут вынуждены по горло из таблицы базы данных. Мы можем выбрать заменить существующую таблицу, если вы существуете, используя заменить Ключевое слово Arg.
  3. contruct_response () Создает хороший человеческий синопсис конечного результата паломничества наших данных.

Довольно простые вещи:

"""Generic Data Client."""
from sqlalchemy import MetaData, Table


class DataClient:

    def __init__(self, engine):
        self.engine = engine
        self.metadata = MetaData(bind=self.engine)
        self.table_name = None

    @property
    def table(self):
        if self.table_name:
            return Table(self.table_name, self.metadata, autoload=True)
        return None

    def fetch_rows(self, query, table=None):
        """Fetch all rows via query."""
        rows = self.engine.execute(query).fetchall()
        return rows

    def insert_rows(self, rows, table=None, replace=None):
        """Insert rows into table."""
        if replace:
            self.engine.execute(f'TRUNCATE TABLE {table}')
        self.table_name = table
        self.engine.execute(self.table.insert(), rows)
        return self.construct_response(rows, table)

    @staticmethod
    def construct_response(rows, table):
        """Summarize results of an executed query."""
        columns = rows[0].keys()
        column_names = ", ".join(columns)
        num_rows = len(rows)
        return f'Inserted {num_rows} rows into `{table}` with {len(columns)} columns: {column_names}'

Теперь мы используем то же самое Dataclient Класс дважды: по одному за источник данных/пункт назначения. Если мы посмотрим поближе, мы можем видеть это insluet_rows () ожидает ввода Ряды , что происходит, чтобы быть такими же типами данных, которые fetch_rows () выходы. Это означает, что мы могли бы «привлечь» строки из источника A и «вставить» эти строки в пункт назначения B только в двух строках кода. Перемещение данных в противоположном направлении так же просто. Разве это не отлично, когда мы все согласны со стандартными библиотеками?

Нам все еще нужно создать для создания клиента для нашей базы данных MySQL, которая не пота. Давайте выбиваем это, похоже на то, что мы только что сделали.

Клиент базы данных SQL

Возвращаясь к нашему config.py Файл, давайте добавим переменные для подключения к MySQL:

from os import environ

# Google BigQuery config
gcp_credentials = environ.get('GCP_CREDENTIALS')
gcp_project = environ.get('GCP_PROJECT')
bigquery_dataset = environ.get('GCP_BIGQUERY_DATASET')
bigquery_table = environ.get('GCP_BIGQUERY_TABLE')
bigquery_uri = f'bigquery://{gcp_project}/{bigquery_dataset}'

# SQL database config
rdbms_user = environ.get('DATABASE_USERNAME')
rdbms_pass = environ.get('DATABASE_PASSWORD')
rdbms_host = environ.get('DATABASE_HOST')
rdbms_port = environ.get('DATABASE_PORT')
rdbms_name = environ.get('DATABASE_NAME')
rdbms_uri = f'mysql+pymysql://{rdbms_user}:{rdbms_pass}@{rdbms_host}:{rdbms_port}/{rdbms_name}'

# Locally stored queries
local_sql_folder = 'sql'

Мы только что расширились на то, что нам пришлось приспособиться к второму источнику данных (мы можем ссылаться на нашу базу данных SQL как RDBMS Каждый сейчас, и тогда, если кто-то смущен. Теперь давайте создадим наш второй двигатель:

"""SQL Database Engine."""
from sqlalchemy.engine import create_engine
from config import rdbms_uri


rdbms_engine = create_engine(rdbms_uri)

Время связать все вместе.

Он принимает два

Давайте танго. Во-первых, мы создаем два источника данных:

from biquery_sql_etl.engines import bigquery_engine, rdbms_engine
from biquery_sql_etl.client import DataClient


def init_pipeline():
    """Move data between Bigquery and MySQL."""
    bqc = DataClient(bigquery_engine)
    dbc = DataClient(rdbms_engine)

Ничего не сумасшедшего здесь! BQC Это наш бигкий клиент, а DBC Наша реляционная база данных клиента.

Теперь, какие данные мы перемещаемся между двумя? Я пошел вперед и беспокоился об этом от нашего имени, импортируя кучу SQL-запросов, которые я хранил в словаре, именованном sql_queries Действительно Если вы заинтересованы, почему я хранил их в качестве словаря, проверьте исходный код в конце.

Вот как это пойдет: каждый запрос собирается выбрать кучу рядов от BigQuery, и сбросьте эти ряды в MySQL. В конце концов, мы будем войти, сколько строк мигрировали:

...

from loguru import logger
from biquery_sql_etl.queries import sql_queries


logger.add('logs/queries.log', format="{time} {message}", level="INFO")


def init_pipeline():
    """Move data between Bigquery and MySQL."""
    num_rows = 0
    bqc = DataClient(bigquery_engine)
    dbc = DataClient(rdbms_engine)
    for table_name, query in sql_queries.items():
        rows = bqc.fetch_rows(query)
        insert = dbc.insert_rows(rows, table_name, replace=True)
        logger.info(insert)
        num_rows += len(rows)
    logger.info(f"Completed migration of {num_rows} rows from BigQuery to MySQL.")

Aaaand выпуска:

2020-02-11 07:29:26.499 - Inserted 100 rows into `weekly_stats` with 4 columns: title, url, slug, views

2020-02-11 07:29:28.552 - Inserted 500 rows into `monthly_stats` with 4 columns: title, url, slug, views

2020-02-11 07:29:28.552 - Completed migration of 600 rows from BigQuery to MySQL.

Что мы можем взять из этого?

Капитализм имеет тенденцию консолидировать все время со временем (предприятиями, продуктами, вариантами), тем не менее, кажется, у нас есть дополнительные варианты хранения данных и данных, чем когда-либо. Последнее десятилетие привело нас Redshift, BigQuery, Cabrachdb, Cassandra, Snowflake … Список продолжается. Единственная тенденция более смешна, чем это обилие выбора – это тенденция на предприятиях, чтобы включить все они. Я готов поспорить, что у вашей компании в средне разместная компания имеет пакет данных, который включает в себя:

  • Несколько типов RDBMS
  • Хранилище данных
  • Необъяснимо неподоведенный экземпляр динамода
  • Забытая база данных MongoDB, которая настроила какой-то парень Frontend лет назад и активно взломается, как мы говорим.

Принятие решений заключается в том, как не являющиеся вкладчикам в актуальность офиса. Больше вариантов означают больше возможностей для людей, чтобы сделать выбор самообслуживания. В программных командах это обычно переводит в бессмысленную архитектуру, где каждая услуга, база данных или хранилище данных является реликвией невидимой политической пьесы. Я не вижу этого изменения в ближайшее время, так что, возможно, мы на что-то в конце концов.

Исходный код для этого руководства нашел здесь:

https://github.com/hackersandslackers/bigquery-sqlalchemy-tutorial

Оригинал: “https://dev.to/hackersandslackers/simplify-bigquery-etl-jobs-using-sqlalchemy-3554”