Рубрики
Без рубрики

Spark & Python: Работа с RDDs (II)

Это учебник Spark и Python, который научит вас работать с RDDs (часть II).

Автор оригинала: Jose A Dianes.

Инструкции

Моя серия учебников Spark & Python может быть рассмотрена индивидуально, хотя существует более или менее линейная “история”, если следовать ей последовательно. Используя один и тот же набор данных, они пытаются решить с ним связанный набор задач.

Это не единственный способ, но хороший способ следовать этим учебникам Spark-сначала клонировать репо GitHub , а затем запускать свой собственный IPython notebook в режиме PySpark . Например, если у нас есть автономная Spark установка, работающая в нашем localhost с максимальным объемом 6 Гб на узел, назначенный IPython:

MASTER="spark://127.0.0.1:7077" SPARK_EXECUTOR_MEMORY="6G" IPYTHON_OPTS="notebook --pylab inline" ~/spark-1.3.1-bin-hadoop2.6/bin/pyspark

Обратите внимание, что путь к команде pyspark будет зависеть от вашей конкретной установки. Таким образом, в качестве требования вам нужно установить Spark на той же машине, на которой вы собираетесь запустить IPython notebook server.

Дополнительные параметры Spark см. в разделе здесь . В общем случае работает правило передачи опций, описанное в виде spark.executor.memory as SPARK_EXECUTOR_MEMORY при вызове IPython/PySpark.

Наборы данных

Мы будем использовать наборы данных из KDD Cup 1999 .

Рекомендации

Справочником по этим и другим темам, связанным с Spark, является Learning Spark Holden Karau, Andy Konwinski, Patrick Wendell и Matei Zaharia.

Набор данных соревнований KDD Cup 1999 подробно описан здесь .

Агрегирование данных на RDDs

Мы можем агрегировать данные RDD в Spark с помощью трех различных действий: reduce , fold и aggregate . Последний из них является более общим и каким-то образом включает в себя первые два.

Получение данных и создание RDD

В этом разделе мы будем использовать сокращенный набор данных (10 процентов), предоставленный для KDD Cup 1999 , содержащий почти полмиллиона сетевых взаимодействий. Файл предоставляется в виде файла Gzip, который мы будем загружать локально.

import urllib
f = urllib.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz")


data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)

Проверка длительности взаимодействия по тегу

И fold , и reduce принимают функцию в качестве аргумента, который применяется к двум элементам RDD. Действие fold отличается от reduce тем, что оно получает и дополнительное начальное нулевое значение , которое будет использоваться для начального вызова. Это значение должно быть элементом идентификации для предоставленной функции.

В качестве примера представьте, что мы хотим знать общую продолжительность наших взаимодействий для нормальных и атакующих взаимодействий. Мы можем использовать reduce следующим образом.

# parse data
csv_data = raw_data.map(lambda x: x.split(","))

# separate into different RDDs
normal_csv_data = csv_data.filter(lambda x: x[41]=="normal.")
attack_csv_data = csv_data.filter(lambda x: x[41]!="normal.")

Функция, которую мы передаем в reduce , получает и возвращает элементы того же типа RDD. Если мы хотим суммировать длительности, нам нужно извлечь этот элемент в новый RDD.

normal_duration_data = normal_csv_data.map(lambda x: int(x[0]))
attack_duration_data = attack_csv_data.map(lambda x: int(x[0]))

Теперь мы можем уменьшить эти новые RDDS.

total_normal_duration = normal_duration_data.reduce(lambda x, y: x + y)
total_attack_duration = attack_duration_data.reduce(lambda x, y: x + y)

print "Total duration for 'normal' interactions is {}".\
    format(total_normal_duration)
print "Total duration for 'attack' interactions is {}".\
    format(total_attack_duration)
Total duration for 'normal' interactions is 21075991  
Total duration for 'attack' interactions is 2626792  

Мы можем пойти дальше и использовать подсчеты для вычисления средних продолжительности.

normal_count = normal_duration_data.count()
attack_count = attack_duration_data.count()

print "Mean duration for 'normal' interactions is {}".\
    format(round(total_normal_duration/float(normal_count),3))
print "Mean duration for 'attack' interactions is {}".\
    format(round(total_attack_duration/float(attack_count),3))
Mean duration for 'normal' interactions is 216.657  
Mean duration for 'attack' interactions is 6.621  

У нас есть первый (и слишком упрощенный) подход к выявлению атакующих взаимодействий.

Лучший Способ-Использование aggregate

Действие aggregate освобождает нас от ограничения того, что возврат должен быть того же типа, что и RDD, над которым мы работаем. Как и в случае с fold , мы предоставляем начальное нулевое значение типа, который хотим вернуть. Затем мы предоставляем две функции. Первый используется для объединения элементов нашего РДД с аккумулятором. Вторая функция необходима для объединения двух аккумуляторов. Давайте посмотрим на это в действии, вычисляя среднее значение, которое мы делали раньше.

normal_sum_count = normal_duration_data.aggregate(
    (0,0), # the initial value
    (lambda acc, value: (acc[0] + value, acc[1] + 1)), # combine val/acc
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))
)

print "Mean duration for 'normal' interactions is {}".\
    format(round(normal_sum_count[0]/float(normal_sum_count[1]),3))
Mean duration for 'normal' interactions is 216.657  

В предыдущей агрегации первый элемент накопителя сохраняет общую сумму, а второй элемент-счет. Объединение аккумулятора с элементом RDD состоит в суммировании значения и увеличении количества. Объединение двух аккумуляторов требует только попарной суммы.

Мы можем сделать то же самое с взаимодействиями типа атаки.

attack_sum_count = attack_duration_data.aggregate(
    (0,0), # the initial value
    (lambda acc, value: (acc[0] + value, acc[1] + 1)), # combine value with acc
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])) # combine accumulators
)

print "Mean duration for 'attack' interactions is {}".\
    format(round(attack_sum_count[0]/float(attack_sum_count[1]),3))
Mean duration for 'attack' interactions is 6.621  

Работа с парой Ключ/значение RDDs

Spark предоставляет специальные функции для работы с RDDS, элементы которых являются парами ключ/значение. Они обычно используются для выполнения агрегаций и других обработок по ключу.

В этом разделе мы покажем, как, работая с парами ключ/значение, мы можем обрабатывать наш набор данных сетевых взаимодействий более практичным и мощным способом, чем тот, который использовался в предыдущих блокнотах. Агрегация пар ключ/значение будет особенно эффективна при попытке исследовать каждый тип тега в наших сетевых атаках индивидуально.

Получение данных и создание RDD

Как и в первом разделе, мы будем использовать сокращенный набор данных (10%), предоставленный для KDD Cup 1999 , содержащий почти полмиллиона сетевых взаимодействий. Файл предоставляется в виде файла Gzip, который мы будем загружать локально.

import urllib
f = urllib.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz")

data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)

Создание пары RDD для типов взаимодействия

В этой записной книжке мы хотим провести некоторый исследовательский анализ данных нашего набора данных сетевых взаимодействий. Более конкретно мы хотим охарактеризовать каждый тип сетевого взаимодействия с точки зрения некоторых его переменных, таких как продолжительность. Для этого нам сначала нужно создать подходящий для этого RDD, где каждое взаимодействие анализируется как строка CSV, представляющая значение, и помещается вместе с соответствующим тегом в качестве ключа.

Обычно мы создаем пару ключ/значение RDDS, применяя функцию с помощью map к исходным данным. Эта функция возвращает соответствующую пару для данного элемента RDD. Мы можем действовать следующим образом.

csv_data = raw_data.map(lambda x: x.split(","))
key_value_data = csv_data.map(lambda x: (x[41], x)) # x[41] contains the network interaction tag

Теперь у нас есть готовые к использованию данные пары ключ/значение. Давайте возьмем первый элемент, чтобы посмотреть, как он выглядит.

  key_value_data.take(1)
[(u'normal.',
[u'0',
 u'tcp',
 u'http',
 u'SF',
 u'181',
 u'5450',
 u'0',
 u'0',
 u'0',
 u'0',
 u'0',
 u'1',
 u'0',
 u'0',
 u'0',
 u'0',
 u'0',
 u'0',
 u'0',
 u'0',
 u'0',
 u'0',
 u'8',
 u'8',
 u'0.00',
 u'0.00',
 u'0.00',
 u'0.00',
 u'1.00',
 u'0.00',
 u'0.00',
 u'9',
 u'9',
 u'1.00',
 u'0.00',
 u'0.11',
 u'0.00',
 u'0.00',
 u'0.00',
 u'0.00',
 u'0.00',
 u'normal.'])]

Агрегирование данных с помощью пары Ключ/значение RDDS

Мы можем использовать все преобразования и действия, доступные для обычных RDDS с парой ключ/значение RDDS. Нам просто нужно заставить функции работать с парными элементами. Кроме того, Spark предоставляет специальные функции для работы с RDDS, содержащими парные элементы. Они очень похожи на те, которые доступны для общих RDDS.

Например, у нас есть преобразование reduceByKey , которое мы можем использовать следующим образом для вычисления общей продолжительности каждого типа сетевого взаимодействия.

key_value_duration = csv_data.map(lambda x: (x[41], float(x[0]))) 
durations_by_key = key_value_duration.reduceByKey(lambda x, y: x + y)

durations_by_key.collect()
[(u'guess_passwd.', 144.0),
 (u'nmap.', 0.0),
 (u'warezmaster.', 301.0),
 (u'rootkit.', 1008.0),
 (u'warezclient.', 627563.0),
 (u'smurf.', 0.0),
 (u'pod.', 0.0),
 (u'neptune.', 0.0),
 (u'normal.', 21075991.0),
 (u'spy.', 636.0),
 (u'ftp_write.', 259.0),
 (u'phf.', 18.0),
 (u'portsweep.', 1991911.0),
 (u'teardrop.', 0.0),
 (u'buffer_overflow.', 2751.0),
 (u'land.', 0.0),
 (u'imap.', 72.0),
 (u'loadmodule.', 326.0),
 (u'perl.', 124.0),
 (u'multihop.', 1288.0),
 (u'back.', 284.0),
 (u'ipsweep.', 43.0),
 (u'satan.', 64.0)]

У нас есть определенное подсчетное действие для пар ключ/значение.

counts_by_key = key_value_data.countByKey()
counts_by_key

defaultdict(, {u'guess_passwd.': 53, u'nmap.': 231, u'warezmaster.': 20, u'rootkit.': 10, u'warezclient.': 1020, u'smurf.': 280790, u'pod.': 264, u'neptune.': 107201, u'normal.': 97278, u'spy.': 2, u'ftp_write.': 8, u'phf.': 4, u'portsweep.': 1040, u'teardrop.': 979, u'buffer_overflow.': 30, u'land.': 21, u'imap.': 12, u'loadmodule.': 9, u'perl.': 3, u'multihop.': 7, u'back.': 2203, u'ipsweep.': 1247, u'satan.': 1589})

Использование combineByKey

Это наиболее общая из функций агрегирования по ключам. Большинство других комбинаторов для каждого ключа реализованы с его помощью. Мы можем думать о нем как о эквиваленте aggregate , поскольку он позволяет пользователю возвращать значения, которые не являются тем же типом, что и наши входные данные.

Например, мы можем использовать его для расчета средней продолжительности каждого типа следующим образом.

sum_counts = key_value_duration.combineByKey(
    (lambda x: (x, 1)), # the initial value, with value x and count 1
    (lambda acc, value: (acc[0]+value, acc[1]+1)), # how to combine a pair value with the accumulator: sum value, and increment count
    (lambda acc1, acc2: (acc1[0]+acc2[0], acc1[1]+acc2[1])) # combine accumulators
)

sum_counts.collectAsMap()
{u'back.': (284.0, 2203),
u'buffer_overflow.': (2751.0, 30),
u'ftp_write.': (259.0, 8),
u'guess_passwd.': (144.0, 53),
u'imap.': (72.0, 12),
u'ipsweep.': (43.0, 1247),
u'land.': (0.0, 21),
u'loadmodule.': (326.0, 9),
u'multihop.': (1288.0, 7),
u'neptune.': (0.0, 107201),
u'nmap.': (0.0, 231),
u'normal.': (21075991.0, 97278),
u'perl.': (124.0, 3),
u'phf.': (18.0, 4),
u'pod.': (0.0, 264),
u'portsweep.': (1991911.0, 1040),
u'rootkit.': (1008.0, 10),
u'satan.': (64.0, 1589),
u'smurf.': (0.0, 280790),
u'spy.': (636.0, 2),
u'teardrop.': (0.0, 979),
u'warezclient.': (627563.0, 1020),
u'warezmaster.': (301.0, 20)}

Мы видим, что аргументы очень похожи на те, которые были переданы в aggregate в предыдущей записной книжке. Результат, связанный с каждым типом, представлен в виде пары. Если мы действительно хотим получить средние значения, нам нужно сделать деление, прежде чем собирать результаты.

duration_means_by_type = sum_counts.map(lambda (key,value): (key, round(value[0]/value[1],3))).collectAsMap()

# Print them sorted
for tag in sorted(duration_means_by_type, key=duration_means_by_type.get, reverse=True):
    print tag, duration_means_by_type[tag]
portsweep. 1915.299  
warezclient. 615.258  
spy. 318.0  
normal. 216.657  
multihop. 184.0  
rootkit. 100.8  
buffer_overflow. 91.7  
perl. 41.333  
loadmodule. 36.222  
ftp_write. 32.375  
warezmaster. 15.05  
imap. 6.0  
phf. 4.5  
guess_passwd. 2.717  
back. 0.129  
satan. 0.04  
ipsweep. 0.034  
nmap. 0.0  
smurf. 0.0  
pod. 0.0  
neptune. 0.0  
teardrop. 0.0  
land. 0.0  

Небольшой шаг к пониманию того, что заставляет сетевое взаимодействие считаться атакой.