В настоящее время, будучи способным обрабатывать огромные объемы данных, могут быть интересным навыком: аналитики, профилирование пользователей, статистика – практически любой бизнес, который требует экстраполяции информации от любых данных, так или иначе, используя некоторые большие инструменты для данных или платформы.
Одним из самых интересных инструментов является Beam Apache, каркас, который дает нам приборы для создания процедур для преобразования, процесса, совокупности и манипулирования данными для наших потребностей.
Давайте попробуем посмотреть, как мы можем использовать его в очень простом сценарии.
Контекст
Представьте, что у нас есть база данных с информацией о пользователях, посещающих веб-сайт, с каждой записью:
- Страна посещения пользователя
- Продолжительность визита
- имя пользователя
Мы хотим создать некоторые отчеты, содержащие:
- Для каждой страны Количество пользователей посещение сайта
- Для каждой страны среднее время посещения
Мы будем использовать Луч Apache , Google SDK (ранее называемый DataFlow), представляющий Модель программирования Направлен на упрощение механизма масштабной обработки данных.
Он был пожертвован на Фонду Apache и называемый лучом, потому что он способен обрабатывать данные в любой необходимости в нужной форме: партии и Потоки (луч). Это дает вам возможность определить трубопроводы Для обработки данных в режиме реального времени ( потоки ) и исторические данные ( партии ).
Определение трубопровода полностью рассмотрено контекстом, который вы будете использовать, чтобы запустить его, поэтому Beam дает вам возможность выбрать один из поддерживаемых бегунов, которые вы можете использовать:
- Модель луча: локальное исполнение вашего трубопровода
- Google Cloud Dataflow: DataFlow в качестве службы
- Apache Flink.
- Apache Spark.
- Apache GearPump
- Apache Hadoop MapReduce
- Jstorm.
- IBM Streams
Мы будем запустить модель Beam One, что в основном выполняет все на локальной машине.
Модель программирования
Хотя это не будет глубоким объяснением модели программирования DataFlow, необходимо понять, какой является трубопровод: набор манипуляций, выполненных на наборе входных данных, которые обеспечивают новый набор данных. Точнее, трубопровод изготовлен из трансформирует применяется к коллекции.
Прямо из Веб-сайт Apache Beam :
Трубопровод инкапсулирует все ваши задачи обработки данных, от начала до конца. Это включает в себя данные ввода для чтения, преобразующие эти данные и запись выходных данных.
Трубопровод получает данные, вводимые извне и представляют его как Коллекции (формально названо Pholelection
ы), каждый из них
потенциально распределенный, многоэлементный, набор данных
Когда один или несколько Трансформировать
S применяется к Packollection
, новый Packollection
Сгенерировано (и по этой причине в результате оформление Pholelection
S ж неизменные объекты).
Первый и последний шаг трубопровода, конечно, те, которые могут читать и записывать данные в и из нескольких хранилищ – вы можете найти список здесь Отказ
Приложение
У нас будут данные в CSV
Файл, поэтому первое, что нам нужно сделать, это прочитать содержимое файла и предоставить структурированное представление всех строк.
Общий ряд CSV
Файл будет похоже на следующее:
United States Of America, 0.5, John Doe
С колоннами, являющимися Страна , Время посещения … в считанные секунды и Имя пользователя , соответственно.
Учитывая данные, которые мы хотим предоставить, давайте посмотрим, что будет делать наш трубопровод и как.
Прочитайте набор входных данных
Первый шаг будет читать входной файл.
with apache_beam.Pipeline(options=options) as p: rows = ( p | ReadFromText(input_filename) | apache_beam.ParDo(Split()) )
В приведенном выше контексте P
это пример apache_beam.pipeline
И первое, что мы делаем, это применить встроенный преобразование, apache_beam.io.textio.readfromText
Это загрузит содержимое файла в Packollection
Отказ После этого мы применяем конкретную логику, Сплит
, чтобы обработать каждую строку в входном файле и предоставить более удобное представление (словарь, конкретно).
Вот …| Сплит Функция:
class Split(apache_beam.DoFn): def process(self, element): country, duration, user = element.split(",") return [{ 'country': country, 'duration': float(duration), 'user': user }]
Pardo
Преобразование – это ядро, и, согласно официальной документации Apache Beam Documentation:
Pardo
Полезно для различных общих операций обработки данных, в том числе:
- Фильтрация набора данных. Вы можете использовать
Pardo
Рассмотреть каждый элемент вPackollection
и либо выведите этот элемент к новой коллекции или отказаться от него. - Форматирование или тип преобразования каждого элемента в наборе данных. Если ваш ввод
Packollection
Содержит элементы, которые имеют другой тип или формат, чем вы хотите, вы можете использоватьPardo
выполнять преобразование на каждый элемент и вывести результат к новомуPackollection
Отказ - Извлечение частей каждого элемента в наборе данных. Если у вас есть
Packollection
Записи с несколькими полями, например, вы можете использоватьPardo
Чтобы разбирать только поля, которые вы хотите рассмотреть в новыйPackollection
Отказ - Выполнение вычислений на каждом элементе в наборе данных. Вы можете использовать
Pardo
выполнять простые или сложные вычисления на каждом элементе или определенные элементы, изPackollection
и выводить результаты как новыйPackollection
Отказ
Пожалуйста, прочитайте больше этого здесь Отказ
Группировка соответствующей информации в соответствующих ключах
На данный момент у нас есть список действительных строк, но нам нужно реорганизовать информацию по ключам, которые являются странами, на которые ссылаются такие строки. Например, если у нас есть три ряда, как следующее:
Испания (ES), 2.2, Джон Доу> Испания (ES), 2.9, Джон Уэйн> Великобритания (Великобритания), 4.2, Фрэнк Синатра
Нам нужно переставить такую информацию:
{ "Spain (ES)": [2.2, 2.9], "United kingdom (UK)": [4.2] }
Если мы сделаем это, у нас есть вся информация в хорошей форме, чтобы все расчеты нам нужны.
Вот так:
timings = ( rows | apache_beam.ParDo(CollectTimings()) | "Grouping timings" >> apache_beam.GroupByKey() | "Calculating average" >> apache_beam.CombineValues( apache_beam.combiners.MeanCombineFn() ) ) users = ( rows | apache_beam.ParDo(CollectUsers()) | "Grouping users" >> apache_beam.GroupByKey() | "Counting users" >> apache_beam.CombineValues( apache_beam.combiners.CountCombineFn() ) )
Классы Коллегирование
и Коллекционировать
В основном фильтруйте строки, которые представляют интерес для нашей цели. Они также переставят каждый из них в правильной форме, это что-то вроде:
(«Испания (ES)», 2.2)
На данный момент мы можем использовать Groupbykey
Преобразовать, что создаст единую запись, что, невероятно, группирует всю информацию, которая разделяет ту же клавиши:
(«Испания (ES)», (2.2, 2.9))
Примечание. Ключ всегда первый элемент кортежа.
Самый последний недостающий бит логики для применения – это тот, который должен обрабатывать значения, связанные с каждым ключом. Встроенный преобразование – apache_beam.combinevalues
, что в значительной степени я объясняет.
Логика, которые применяются apache_beam.combiners.mancombinefn
и apache_beam.combiners.countCombinefn
Соответственно: первые рассчитывают среднее значение арифметики, последний подсчитывает элемент набора.
Ради полноты, вот определение двух классов Коллегирование
и Коллекционировать
:
class CollectTimings(apache_beam.DoFn): def process(self, element): """ Returns a list of tuples containing country and duration """ result = [ (element['country'], element['duration']) ] return result class CollectUsers(apache_beam.DoFn): def process(self, element): """ Returns a list of tuples containing country and user name """ result = [ (element['country'], element['user']) ] return result
Примечание. Операция применения несколько раз некоторых преобразований к данному Packollection
генерирует несколько новых коллекций. Это называется Сборная ветвление Отказ Здесь очень хорошо представлено:
Источник: https://beam.apache.org/images/design-your-pipeline-multiple-pcollections.png
По сути, теперь у нас есть два набора информации – среднее время посещения для каждой страны и количество пользователей для каждой страны. То, что мы пропавшим, – это одна структура, содержащая всю необходимую информацию.
Также, сделав ветвление трубопроводов, нам нужно компенсировать данные. Мы можем сделать это с помощью CogroupbyKey
, что не менее чем Присоединяйтесь к сделано на Два или более Коллекции, которые имеют одинаковые ключи.
Последние два преобразования являются теми, которые форматируют информацию CSV
Записи, в то время как другой пишет их в файл.
После этого, в результате ?| yump.txt Файл будет содержать строки, как этот:
Италия (ИТ), 36,2.23611111111
Это означает, что 36 человек посетили сайт из Италии, расходующихся в среднем 2,23 секунды на сайте.
Входные данные
Данные, используемые для этого моделирования, были получены процедуры: 10 000 строк, максимум 200 разных пользователей, расходует от 1 до 5 секунд на сайте. Это было необходимо, чтобы иметь приблизительную оценку в результате полученных полученных ценностей. Новая статья о Тестирование трубопроводов Вероятно, последует.
Репозиторий GitHub
Репозиторий GitHub для этой статьи здесь Отказ
Файл readme.md содержит все необходимое, чтобы попробовать его локально.!