Рубрики
Без рубрики

Начало работы с балкой Apache для распределенной обработки данных

Запустите вашу первую большую работу обработки данных в Google Cloud Dataflow с Apache Beam Python SDK. Теги с лучом, Python, учебником, DataFlow.

Maprecuce Был революционным, когда он был впервые опубликован в 2004 году. Он предоставил программированную модель для наборов данных пакетных обработчиков с терабайтами данных. Mapreatuce был построен на трех, казалось бы, простых этапах: карта, сортировка и уменьшение. Он использовал общего назначения HDFS (Hadoop Распределенная файловая система) Файловая система для ввода/вывода и, следовательно, была способна обработать практически любые данные.

Вакансии в Maprectuce были общежития, чтобы написать. API на высоком уровне, такие как Улей а также Свинья Предоставил API на более высокий уровень упаковки API и сделали его намного легче получить вещи, сделанные в MapReduce.

Тем не менее, модель Maprecuce имел другие недостатки. Например, жесткая карта-сортировка-уменьшение потока не является оптимальным для каждого вида работы: фаза сортировки часто не нужна, а иногда это будет гораздо более полезно для цепных редукторов непосредственно без новой фазы сортировки карты. MapReduce также был построен для толерантности неисправностей, который на самом деле не требуется за пределами Google Scale: он настаивает на том, чтобы написать все промежуточное состояние на HDFS, что делает более медленнее и принимает много места для хранения.

Новые рамки и модели программирования, такие как Apache Spark , Apache Tez. и Apache Flink появился, чтобы решить эти короткие сочетания. SPARK делает все возможное, чтобы сохранить данные рядом с исполнителями или даже в памяти между задачами, которые могут много ускорить. Это абстракции набора данных, такие как Устойчивый распределенный набор данных (RDD) и DataSet Также намного легче рассуждать и написать программы, работающие в распределенной настройке.

Apache Beam Еще одна абстракция для массивно параллельных рабочих мест обработки. Луч позволяет объявлять оба пакетных, так и потоковых заданий в унифицированной моде, и они могут работать в каком-либо исполнении, например, Spark, Flink, Google Cloud Dataflow , Apache Samza или Twister 2 Отказ

В этой статье мы объявим перерабатывающий трубопровод с Python Python Python Apache Picon и выполнить трубопровод в DataFlow. В качестве альтернативы вы можете выполнить конвейер в локальной машине.

Настраивать

Код для примера можно найти в WordCount/ папка Этот репозиторий Отказ Начать, перейти к папке и установить требования к

$ pip install -r requirements.txt

Вы, вероятно, захотите создать виртуальную среду перед запуском команды. Вы также можете установить Apache Beam Python SDK напрямую с PIP Установите Apache-Beam [GCP] , где GCP Пакет включает в себя все необходимое для использования Runner Cloud Dataflow Cloud Dataflow в качестве операционного двигателя.

Настройка Google Cloud

ПРИМЕЧАНИЕ. Запуск трубопровода потребуется заряды на вашей учетной записи Google Cloud. Видеть Ценообразование Dataflow . Для меня запуск этого трубопровода несколько раз понесенные заряды составляет 0,01 доллара (охватываются кредитами свободного уровня).

Если вы хотите выполнить конвейер в DataFlow, вам нужно будет выполнить следующие шаги:

  1. Создать Google Cloud аккаунт и проект в пределах аккаунта в Консоль Отказ
  2. Установить и настроить Google Cloud CLI, если вы хотите управлять ресурсами из командной строки.
  3. Включить API DataFlow Как указано здесь. Вам также может потребоваться включить другие API: вы получите подробные инструкции, когда ваш трубопровод не удается.
  4. Создайте ведро GCP для записи вывода, желательно в том же регион где вы запускаете DataFlow. Создать ведро из командной строки, запустите: GSUTIL MB -P Project_ID -C Стандартный регион --b на GS://Bucket_name Отказ Заполните идентификатор проекта, область и имя ведра здесь.
  5. Создайте учетную запись услуг Загрузите ключ JSON и сохраните его где-нибудь в машине.

Запуск трубопровода

Определение трубопровода находится в main.py Отказ Файл почти идентичен wordcount_minim.py от примеров луча. Мы скоро пойдем через код трубопровода, но если вы спешите, вот детали, как выполнить трубопровод первый локально с локальным исполнением двигателя ( DirectRunner ) без учетной записи Google Cloud:

$ python main.py --runner DirectRunner --input gs://dataflow-samples/shakespeare/kinglear.txt --output output/counts

Следует лишь занять несколько секунд для выполнения трубопровода. Проверьте вывод в Выход/счетчик папка:

$ cat output/counts-00000-of-00001 | head -n 5
KING: 243
LEAR: 236
DRAMATIS: 1
PERSONAE: 1
king: 65

Чтобы запустить тот же скрипт с Dataflowrunner. , запись вывода в ведро GCS:

$ export GOOGLE_APPLICATION_CREDENTIALS=/path/to/credentials.json
$ export GCP_PROJECT=your-gcp-project-11111111
$ export GCP_BUCKET=your-gcp-bucket
$ export GCP_REGION=europe-west-1
$ python main.py --runner DataflowRunner --project ${GCP_PROJECT} --region=${GCP_REGION} --staging_location=gs://${GCP_BUCKET}/staging --temp_location gs://${GCP_BUCKET}/temp --job_name wordcount-job --input gs://dataflow-samples/shakespeare/kinglear.txt --output gs://${GCP_BUCKET}/output/counts

Запуск трубопровода в облаке может занять до трех минут до конца.

Понять трубопровод

Полное определение трубопровода здесь (без разборки аргументов и т. Д.):

import re
from past.builtins import unicode

import apache_beam as beam

from apache_beam.options.pipeline_options import (
    PipelineOptions,
)

from apache_beam.io import ReadFromText, WriteToText

with beam.Pipeline(options=options) as p:

    lines = p | ReadFromText(input_file)

    counts = (
        lines
        | "Split"
        >> (
            beam.FlatMap(lambda x: re.findall(r"[A-Za-z\']+", x)).with_output_types(
                unicode
            )
        )
        | "PairWithOne" >> beam.Map(lambda x: (x, 1))
        | "GroupAndSum" >> beam.CombinePerKey(sum)
    )

    def format_result(word_count):
        (word, count) = word_count
        return "%s: %s" % (word, count)

    output = counts | "Format" >> beam.Map(format_result)
    output | WriteToText(output_file)

Давайте начнем с создания трубопровода. Объект трубопровода создан с Варианты Указание, например, конфигурация для выполнения двигателя. Трубопровод используется в качестве менеджера контекста С) как p так что это может быть выполнен в __exit__ Отказ

Мы начинаем трубопровод с ReadFromText от apache_beam.io упаковка:

lines = p | ReadFromText(input_file)

ReadFromText Возвращает Packollection , который является лучшим сроком для DataSet :

Плоскостроение представляет собой потенциально распределенный многоэлементный набор данных, который действует как данные трубопровода. Apache Beam Transforms Используйте объекты PCOLLECUTER в качестве входных данных и выходов для каждого шага в вашем трубопроводе. Pcollection может удерживать набор данных фиксированного размера или неограниченным набором данных из непрерывно обновления источника данных.

ReadFromText Сама – это Ptransform :

Преобразование представляет собой операцию обработки, которая преобразует данные. Преобразование принимает одну или несколько PhoLlections в качестве входа, выполняет операцию, которую вы указываете на каждом элементе в этой коллекции, и производит один или несколько Phoillections в качестве вывода. Преобразование может выполнять почти любой вид операции обработки, включая выполнение математических вычислений на данные, преобразовывать данные из одного формата в другую, группировать данные вместе, считывание и запись данных, фильтрацию данных для вывода только элементов, которые вы хотите или объединяют элементы данных в отдельные значения.

В Python SDK преобразования приковываются вертикальной баром |. . Посмотреть здесь Для определения __OR__ Отказ

Так что сейчас линии это Packollection Содержащие все строки в входном текстовом файле. Вот следующие шаги:

counts = (
    lines
    | "Split"
    >> (
        beam.FlatMap(lambda x: re.findall(r"[A-Za-z\']+", x)).with_output_types(
            unicode
        )
    )
    | "PairWithOne" >> beam.Map(lambda x: (x, 1))
    | "GroupAndSum" >> beam.CombinePerKey(sum)
)

Оператор битового сдвига – Переопределен Определение __rrshift__ для Ptransform разрешить название этому. В «Сплит» Преобразование, каждая линия разделена на слова. Эта коллекция коллекций сплющена в коллекцию с луч. Плоскость . «Парвитоне» Преобразовать карты каждое слово в кортеж (х, 1) Отказ Первый элемент – это ключ, а второй элемент является значением. Пары ключа-значений затем подают в преобразование «GroupAndsum», где все значения суммируются ключом. Это распаралленнее количество слов!

Наконец, выход отформатирован и написан:

def format_result(word_count):
    (word, count) = word_count
    return "%s: %s" % (word, count)

output = counts | "Format" >> beam.Map(format_result)
output | WriteToText(output_file)

«Формат» Преобразовать карты каждый (Слово, счет) Пара с format_result функция. Выход записан на emput_file с WriteTotext преобразовать.

Заключение

Это завершает мой быстрый запуск балки Apache. Это очень многообещающий проект, и есть много, чтобы учиться. Пожалуйста, оставьте комментарии или вопросы, если у вас есть, большое спасибо за чтение!

Оригинал: “https://dev.to/ksaaskil/getting-started-with-apache-beam-for-distributed-data-processing-3i2d”