Рубрики
Без рубрики

Apache Beam: пример питона

Небольшой пример пучкового трубопровода Apache в Python.

Автор оригинала: Bruno Ripa.

В настоящее время, будучи способным обрабатывать огромные объемы данных, могут быть интересным навыком: аналитики, профилирование пользователей, статистика – практически любой бизнес, который требует экстраполяции информации от любых данных, так или иначе, используя некоторые большие инструменты для данных или платформы.

Одним из самых интересных инструментов является Beam Apache, каркас, который дает нам приборы для создания процедур для преобразования, процесса, совокупности и манипулирования данными для наших потребностей.

Давайте попробуем посмотреть, как мы можем использовать его в очень простом сценарии.

Контекст

Представьте, что у нас есть база данных с информацией о пользователях, посещающих веб-сайт, с каждой записью:

  • Страна посещения пользователя
  • Продолжительность визита
  • имя пользователя

Мы хотим создать некоторые отчеты, содержащие:

  1. Для каждой страны Количество пользователей посещение сайта
  2. Для каждой страны среднее время посещения

Мы будем использовать Луч Apache , Google SDK (ранее называемый DataFlow), представляющий Модель программирования Направлен на упрощение механизма масштабной обработки данных.

Он был пожертвован на Фонду Apache и называемый лучом, потому что он способен обрабатывать данные в любой необходимости в нужной форме: партии и Потоки (луч). Это дает вам возможность определить трубопроводы Для обработки данных в режиме реального времени ( потоки ) и исторические данные ( партии ).

Определение трубопровода полностью рассмотрено контекстом, который вы будете использовать, чтобы запустить его, поэтому Beam дает вам возможность выбрать один из поддерживаемых бегунов, которые вы можете использовать:

  • Модель луча: локальное исполнение вашего трубопровода
  • Google Cloud Dataflow: DataFlow в качестве службы
  • Apache Flink.
  • Apache Spark.
  • Apache GearPump
  • Apache Hadoop MapReduce
  • Jstorm.
  • IBM Streams

Мы будем запустить модель Beam One, что в основном выполняет все на локальной машине.

Модель программирования

Хотя это не будет глубоким объяснением модели программирования DataFlow, необходимо понять, какой является трубопровод: набор манипуляций, выполненных на наборе входных данных, которые обеспечивают новый набор данных. Точнее, трубопровод изготовлен из трансформирует применяется к коллекции.

Прямо из Веб-сайт Apache Beam :

Трубопровод инкапсулирует все ваши задачи обработки данных, от начала до конца. Это включает в себя данные ввода для чтения, преобразующие эти данные и запись выходных данных.

Трубопровод получает данные, вводимые извне и представляют его как Коллекции (формально названо Pholelection ы), каждый из них

потенциально распределенный, многоэлементный, набор данных

Когда один или несколько Трансформировать S применяется к Packollection , новый Packollection Сгенерировано (и по этой причине в результате оформление Pholelection S ж неизменные объекты).

Первый и последний шаг трубопровода, конечно, те, которые могут читать и записывать данные в и из нескольких хранилищ – вы можете найти список здесь Отказ

Приложение

У нас будут данные в CSV Файл, поэтому первое, что нам нужно сделать, это прочитать содержимое файла и предоставить структурированное представление всех строк.

Общий ряд CSV Файл будет похоже на следующее:

United States Of America, 0.5, John Doe

С колоннами, являющимися Страна , Время посещения … в считанные секунды и Имя пользователя , соответственно.

Учитывая данные, которые мы хотим предоставить, давайте посмотрим, что будет делать наш трубопровод и как.

Прочитайте набор входных данных

Первый шаг будет читать входной файл.

with apache_beam.Pipeline(options=options) as p:

    rows = (
        p |
        ReadFromText(input_filename) |
        apache_beam.ParDo(Split())
    )

В приведенном выше контексте P это пример apache_beam.pipeline И первое, что мы делаем, это применить встроенный преобразование, apache_beam.io.textio.readfromText Это загрузит содержимое файла в Packollection Отказ После этого мы применяем конкретную логику, Сплит , чтобы обработать каждую строку в входном файле и предоставить более удобное представление (словарь, конкретно).

Вот …| Сплит Функция:

class Split(apache_beam.DoFn):

    def process(self, element):
        country, duration, user = element.split(",")

        return [{
            'country': country,
            'duration': float(duration),
            'user': user
        }]

Pardo Преобразование – это ядро, и, согласно официальной документации Apache Beam Documentation:

Pardo Полезно для различных общих операций обработки данных, в том числе:

  • Фильтрация набора данных. Вы можете использовать Pardo Рассмотреть каждый элемент в Packollection и либо выведите этот элемент к новой коллекции или отказаться от него.
  • Форматирование или тип преобразования каждого элемента в наборе данных. Если ваш ввод Packollection Содержит элементы, которые имеют другой тип или формат, чем вы хотите, вы можете использовать Pardo выполнять преобразование на каждый элемент и вывести результат к новому Packollection Отказ
  • Извлечение частей каждого элемента в наборе данных. Если у вас есть Packollection Записи с несколькими полями, например, вы можете использовать Pardo Чтобы разбирать только поля, которые вы хотите рассмотреть в новый Packollection Отказ
  • Выполнение вычислений на каждом элементе в наборе данных. Вы можете использовать Pardo выполнять простые или сложные вычисления на каждом элементе или определенные элементы, из Packollection и выводить результаты как новый Packollection Отказ

Пожалуйста, прочитайте больше этого здесь Отказ

Группировка соответствующей информации в соответствующих ключах

На данный момент у нас есть список действительных строк, но нам нужно реорганизовать информацию по ключам, которые являются странами, на которые ссылаются такие строки. Например, если у нас есть три ряда, как следующее:

Испания (ES), 2.2, Джон Доу> Испания (ES), 2.9, Джон Уэйн> Великобритания (Великобритания), 4.2, Фрэнк Синатра

Нам нужно переставить такую информацию:

{
    "Spain (ES)": [2.2, 2.9],
    "United kingdom (UK)": [4.2]
}

Если мы сделаем это, у нас есть вся информация в хорошей форме, чтобы все расчеты нам нужны.

Вот так:

timings = (
    rows |
    apache_beam.ParDo(CollectTimings()) |
    "Grouping timings" >> apache_beam.GroupByKey() |
    "Calculating average" >> apache_beam.CombineValues(
        apache_beam.combiners.MeanCombineFn()
    )
)

users = (
    rows |
    apache_beam.ParDo(CollectUsers()) |
    "Grouping users" >> apache_beam.GroupByKey() |
    "Counting users" >> apache_beam.CombineValues(
        apache_beam.combiners.CountCombineFn()
    )
)

Классы Коллегирование и Коллекционировать В основном фильтруйте строки, которые представляют интерес для нашей цели. Они также переставят каждый из них в правильной форме, это что-то вроде:

(«Испания (ES)», 2.2)

На данный момент мы можем использовать Groupbykey Преобразовать, что создаст единую запись, что, невероятно, группирует всю информацию, которая разделяет ту же клавиши:

(«Испания (ES)», (2.2, 2.9))

Примечание. Ключ всегда первый элемент кортежа.

Самый последний недостающий бит логики для применения – это тот, который должен обрабатывать значения, связанные с каждым ключом. Встроенный преобразование – apache_beam.combinevalues , что в значительной степени я объясняет.

Логика, которые применяются apache_beam.combiners.mancombinefn и apache_beam.combiners.countCombinefn Соответственно: первые рассчитывают среднее значение арифметики, последний подсчитывает элемент набора.

Ради полноты, вот определение двух классов Коллегирование и Коллекционировать :

class CollectTimings(apache_beam.DoFn):

    def process(self, element):
        """
        Returns a list of tuples containing country and duration
        """

        result = [
            (element['country'], element['duration'])
        ]
        return result


class CollectUsers(apache_beam.DoFn):

    def process(self, element):
        """
        Returns a list of tuples containing country and user name
        """
        result = [
            (element['country'], element['user'])
        ]
        return result

Примечание. Операция применения несколько раз некоторых преобразований к данному Packollection генерирует несколько новых коллекций. Это называется Сборная ветвление Отказ Здесь очень хорошо представлено:

Источник: https://beam.apache.org/images/design-your-pipeline-multiple-pcollections.png

По сути, теперь у нас есть два набора информации – среднее время посещения для каждой страны и количество пользователей для каждой страны. То, что мы пропавшим, – это одна структура, содержащая всю необходимую информацию.

Также, сделав ветвление трубопроводов, нам нужно компенсировать данные. Мы можем сделать это с помощью CogroupbyKey , что не менее чем Присоединяйтесь к сделано на Два или более Коллекции, которые имеют одинаковые ключи.

Последние два преобразования являются теми, которые форматируют информацию CSV Записи, в то время как другой пишет их в файл.

После этого, в результате ?| yump.txt Файл будет содержать строки, как этот:

Италия (ИТ), 36,2.23611111111

Это означает, что 36 человек посетили сайт из Италии, расходующихся в среднем 2,23 секунды на сайте.

Входные данные

Данные, используемые для этого моделирования, были получены процедуры: 10 000 строк, максимум 200 разных пользователей, расходует от 1 до 5 секунд на сайте. Это было необходимо, чтобы иметь приблизительную оценку в результате полученных полученных ценностей. Новая статья о Тестирование трубопроводов Вероятно, последует.

Репозиторий GitHub

Репозиторий GitHub для этой статьи здесь Отказ

Файл readme.md содержит все необходимое, чтобы попробовать его локально.!