Рубрики
Без рубрики

Расширение Apache Pig с помощью Python UDFs

Apache Pig-популярная система для выполнения сложных потоков данных на основе карт Hadoop. Свинья особенно велика, потому что она расширяема. К концу этого урока вы сможете писать латинские скрипты Pig, которые выполняют код Python как часть более крупного рабочего процесса сокращения карт.

Автор оригинала: Sheena.

( источник изображения )

Вступление

Apache Pig -популярная система для выполнения сложных потоков данных на основе карт Hadoop. Он добавляет слой абстракции поверх механизмов mapreduce Hadoop, чтобы позволить разработчикам получить представление о данных и операциях с этими данными на высоком уровне. Свинья позволяет вам делать вещи более явно. Например, вы можете объединить два или более источника данных (так же, как соединение SQL). Написание соединения в виде функции map и reduce немного затягивает, и обычно этого стоит избегать. Таким образом, Pig великолепен, потому что он упрощает сложные задачи – он предоставляет высокоуровневый язык сценариев, который позволяет пользователям более полно представлять свой поток данных.

Свинья особенно велика, потому что она расширяема. Этот учебник будет посвящен его расширяемости. К концу этого урока вы сможете писать латинские скрипты Pig, которые выполняют код Python как часть более крупного рабочего процесса сокращения карт. Pig можно расширить и на другие языки, но пока мы будем придерживаться Python.

Прежде чем мы продолжим

Этот учебник опирается на кучу знаний. Это будет очень полезно, если вы немного знаете латынь питона и свиньи. Также будет полезно немного узнать о том, как работает mapreduce в контексте Hadoop.

Определяемые пользователем функции (UDFs)

UDF свиньи-это функция, доступная Свинье, но написанная на языке, который не является латиницей Свиньи. Pig позволяет регистрировать UDFS для использования в скрипте PigLatin. UDF должен соответствовать определенному прототипу – вы не можете просто написать свою функцию так, как хотите, потому что тогда Pig не будет знать, как вызывать вашу функцию, он не будет знать, какие аргументы ему нужны, и он не будет знать, какого возвращаемого значения ожидать. Существует несколько основных типов UDF:

Eval Udf

Это наиболее распространенный тип UDF. Он используется в операторах FOREACH type. Вот пример функции eval в действии:

users = LOAD 'user_data' AS (name: chararray);
upper_users = FOREACH users GENERATE my_udfs.to_upper_case(name);

Этот код довольно прост – Pig на самом деле не выполняет обработку строк, поэтому мы вводим UDF, который это делает. Есть некоторые недостающие части, к которым я перейду позже, в частности, как Свинья знает, что означает my_udfs и тому подобное.

Агрегация UDFs

Это всего лишь частный случай оценки UDF. Агрегатная функция обычно применяется к сгруппированным данным. Например:

user_sales = LOAD 'user_sales' AS (name: chararray, price: float);
grouped_sales = GROUP user_sales BY name;
number_of_sales = FOREACH grouped_sales GENERATE group, COUNT(user_sales);

Другими словами, совокупный UDF-это UDF, который используется для объединения нескольких фрагментов информации. Здесь мы агрегируем данные о продажах, чтобы показать, сколько покупок было совершено каждым пользователем.

Фильтр UDFs

Фильтр UDF возвращает логическое значение. Если у вас есть источник данных с кучей строк, и только часть этих строк полезна для текущего анализа, то какая-то функция фильтра будет полезна. Примером функции фильтра является следующее действие:

user_messages = LOAD 'user_twits' AS (name:chararray, message:chararray);
rude_messages = FILTER user_messages by my_udfs.contains_naughty_words(message);

Хватит разговоров, давайте закодируем

В этом разделе мы напишем пару UDF на Python и сделаем их доступными в скриптах PigLatin.

Вот о простейшем Python UDF, который вы можете написать:

from pig_util import outputSchema

@outputSchema('word:chararray')
def hi_world():
    return "hello world"

Вывод данных из функции имеет определенную форму. Свинье нравится, если вы указываете схему данных, потому что тогда она знает, что она может сделать с этими данными. Вот для чего нужен output_schema декоратор. Существует множество различных способов указать схему, мы немного поговорим об этом.

Теперь, если бы это было сохранено в файле под названием “my_udfs.py” вы могли бы использовать его в латинском алфавите Свиньи, как это:

-- first register it to make it available
REGISTER 'myudf.py' using jython as my_special_udfs

users = LOAD 'user_data' AS (name: chararray);
hello_users = FOREACH users GENERATE name, my_special_udfs.hi_world();

Указание схемы вывода UDF

Теперь UDF имеет вход и выход. Этот небольшой раздел посвящен выходам. Здесь мы рассмотрим различные способы, которыми вы можете указать формат вывода Python UDF с помощью схемы вывода декоратора. У нас есть несколько вариантов, вот они:

# our original udf
# it returns a single chararray (that's PigLatin for String)
@outputSchema('word:chararray')
def hi_world():
    return "hello world"
    
# this one returns a Python tuple. Pig recognises the first element 
# of the tuple as a chararray like before, and the next one as a 
# long (a kind of integer)
@outputSchema("word:chararray,number:long")
def hi_everyone():
  return "hi there", 15

#we can use outputSchema to define nested schemas too, here is a bag of tuples
@outputSchema('some_bag:bag{t:(field_1:chararray, field_2:int)}')
def bag_udf():
    return [
        ('hi',1000),
        ('there',2000),
        ('bill',0)
    ]

#and here is a map
@outputSchema('something_nice:map[]')
def my_map_maker():
    return {"a":"b", "c":"d", "e","f"}

Таким образом, схема вывода может использоваться для обозначения того, что функция выводит один или комбинацию основных типов. Эти типы:

  • массив символов: как строка
  • bytearray: набор байтов в строке. Как струна, но не так дружелюбно по-человечески
  • long: длинное целое число
  • int: нормальное целое число
  • double: число с плавающей запятой
  • дата и время
  • логический

Если схема не указана, то Pig предполагает, что UDF выводит массив байтов.

Аргументы UDF

У UDF есть не только выходы, но и входы! Это предложение должно быть подано в разделе “dah”. Я зарезервировал его для отдельного раздела, чтобы не загромождать обсуждение схем вывода. Эта часть довольно прямолинейна, поэтому я просто пройдусь по ней…

Сначала несколько UDF:

def deal_with_a_string(s1):
    return s1 + " for the win!"

def deal_with_two_strings(s1,s2):
    return s1 + " " + s2
    
def square_a_number(i):
    return i*i
    
def now_for_a_bag(lBag):
    lOut = []
    for i,l in enumerate(lBag):
        lNew = [i,] + l
        lOut.append(lNew)
    return lOut

И здесь мы используем эти UDF в латинском алфавите свиньи:

REGISTER 'myudf.py' using jython as myudfs

users = LOAD 'user_data' AS (firstname: chararray, lastname:chararray,some_integer:int);

winning_users    = FOREACH users GENERATE myudfs.deal_with_a_string(firstname);
full_names       = FOREACH users GENERATE myudfs.deal_with_two_strings(firstname,lastname);
squared_integers = FOREACH users GENERATE myudfs.square_a_number(some_integer);

users_by_number = GROUP users by some_integer;
indexed_users_by_number = FOREACH users_by_number GENERATE group,myudfs.now_for_a_bag(users);

Помимо стандартных UDFS Python

Есть несколько ошибок в использовании Python в форме UDF. Во-первых, несмотря на то, что мы пишем наши UDF на Python, Pig выполняет их в Jython. Jython-это реализация Python, которая работает на виртуальной машине Java (JVM). В большинстве случаев это не проблема, поскольку Jython стремится реализовать все те же функции CPython, но есть некоторые библиотеки, которые он не позволяет. Например, вы не можете использовать numpy из Python.

Кроме того, Pig на самом деле не допускает UDF-фильтров Python. Вы можете делать только такие вещи, как это:

user_messages = LOAD 'user_twits' AS (name:chararray, message:chararray);
--add a field that says whether it is naughty (1) or not (0)
messages_with_rudeness = FOREACH user_messages GENERATE name,message,contains_naughty_words(message) as naughty;     
--then filter by the naughty field
filtered_messages = FILTER messages_with_rudeness by (naughty==1);    
-- and finally strip away the naughty field                  
rude_messages = FOREACH filtered_messages GENERATE name,message;  

Python Streaming UDFs

Pig позволяет подключаться к потоковому API Hadoop, это позволяет нам обойти проблему Jython, когда нам это нужно. Если вы раньше не слышали о потоковой передаче Hadoop, вот в чем суть: Hadoop позволяет писать картографы и редукторы на любом языке, который дает вам доступ к stdin и stdout. Так что это практически любой язык, который вы хотите. Как Python 3 или даже Корова . Поскольку это учебник по Python, приведенные ниже примеры будут на Python, но вы можете подключить все, что захотите.

Вот простой потоковый скрипт Python, назовем его simple_stream.py :

#! /usr/bin/env python

import sys
import string

for line in sys.stdin:
    if len(line) == 0: continue   
    l = line.split()    #split the line by whitespace
    for i,s in enumerate(l):
        print "{key}\t{value}\n".format(key=i,value=s) # give out a key value pair for each word in the line

Цель состоит в том, чтобы заставить Hadoop запускать скрипт на каждом узле. Это означает, что строка хэширования ( #! ) должна быть действительна на каждом узле, все операторы импорта должны быть действительны на каждом узле (любые импортированные пакеты должны быть установлены на каждом узле); и любые другие файлы системного уровня или ресурсы, доступные в скрипте Python, должны быть доступны таким же образом на каждом узле.

Ладно, перейдем к свинячьим штучкам…

Чтобы сделать потоковое UDF доступным для Pig, мы используем оператор define . Вы можете прочитать все об этом здесь

Вот как мы можем использовать его с нашим скриптом simple_stream:

DEFINE stream_alias 'simple_stream.py' SHIP('simple_stream.py');
user_messages = LOAD 'user_twits' AS (name:chararray, message:chararray);
just_messages = FOREACH user_messages generate message;
streamed = STREAM just_messages THROUGH stream_alias;
DUMP streamed;

Давайте посмотрим на то, что ОПРЕДЕЛЯЕТ оператор немного ближе. Общий формат, который мы используем, таков:

DEFINE alias 'command' SHIP('files');

Псевдоним-это имя, которое мы используем для доступа к нашей потоковой функции из нашего латинского алфавита свиньи. Команда-это системная команда, которую Pig вызовет, когда ему нужно будет использовать нашу потоковую функцию. И, наконец, SHIP сообщает Pig, какие файлы и зависимости Pig должен распространять на узлы Hadoop, чтобы команда могла работать.

Затем, как только у нас есть ресурсы, которые мы хотим передать через нашу потоковую функцию, мы просто используем команду STREAM , как указано выше.

И это все

Ну, вроде того. Свиная латынь-довольно большая вещь, этот учебник едва коснулся поверхности ее возможностей. Если бы вся загрузка, обучение и тому подобное не имели для вас смысла, я бы предложил проверить более вводный учебник по Пиглатину, прежде чем возвращаться сюда. Этого урока должно быть достаточно, чтобы вы начали использовать Python из заданий Pig.

Python – это тоже довольно большая вещь. Понимание системы импорта Python действительно полезно, если вы хотите использовать Python в кластере Hadoop. Также стоит понять некоторые мелкие детали, например, как работают декораторы Python.

Есть также несколько более технических способов вызова Python из Pig, этот учебник призван стать введением в UDFs, а не окончательным руководством. Для получения дополнительных примеров и более подробных обсуждений различных декораторов и тому подобного, что Pig предоставляет UDFS на основе Jython, я бы предложил взглянуть на официальную документацию Pig.

Еще одна тема, затронутая лишь вкратце, касалась потоковой передачи Hadoop, это сама по себе мощная технология, но на самом деле довольно проста в использовании, как только вы начнете. Я много раз использовал потоковый API, не нуждаясь ни в чем таком сложном, как Pig Latin – стоит иметь возможность использовать этот API как автономную вещь.