Автор оригинала: Jose A Dianes.
Инструкции
Моя серия учебников Spark & Python может быть рассмотрена индивидуально, хотя существует более или менее линейная “история”, если следовать ей последовательно. Используя один и тот же набор данных, они пытаются решить с ним связанный набор задач.
Это не единственный способ, но хороший способ следовать этим учебникам Spark-сначала клонировать репо GitHub , а затем запускать свой собственный IPython notebook в режиме PySpark . Например, если у нас есть автономная Spark установка, работающая в нашем localhost
с максимальным объемом 6 Гб на узел, назначенный IPython:
MASTER="spark://127.0.0.1:7077" SPARK_EXECUTOR_MEMORY="6G" IPYTHON_OPTS="notebook --pylab inline" ~/spark-1.3.1-bin-hadoop2.6/bin/pyspark
Обратите внимание, что путь к команде pyspark
будет зависеть от вашей конкретной установки. Таким образом, в качестве требования вам нужно установить Spark на той же машине, на которой вы собираетесь запустить IPython notebook
server.
Дополнительные параметры Spark см. в разделе здесь . В общем случае работает правило передачи опций, описанное в виде spark.executor.memory
as SPARK_EXECUTOR_MEMORY
при вызове IPython/PySpark.
Наборы данных
Мы будем использовать наборы данных из KDD Cup 1999 .
Рекомендации
Справочником по этим и другим темам, связанным с Spark, является Learning Spark Holden Karau, Andy Konwinski, Patrick Wendell и Matei Zaharia.
Набор данных соревнований KDD Cup 1999 подробно описан здесь .
Агрегирование данных на RDDs
Мы можем агрегировать данные RDD в Spark с помощью трех различных действий: reduce
, fold
и aggregate
. Последний из них является более общим и каким-то образом включает в себя первые два.
Получение данных и создание RDD
В этом разделе мы будем использовать сокращенный набор данных (10 процентов), предоставленный для KDD Cup 1999 , содержащий почти полмиллиона сетевых взаимодействий. Файл предоставляется в виде файла Gzip, который мы будем загружать локально.
import urllib f = urllib.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz") data_file = "./kddcup.data_10_percent.gz" raw_data = sc.textFile(data_file)
Проверка длительности взаимодействия по тегу
И fold
, и reduce
принимают функцию в качестве аргумента, который применяется к двум элементам RDD. Действие fold
отличается от reduce
тем, что оно получает и дополнительное начальное нулевое значение , которое будет использоваться для начального вызова. Это значение должно быть элементом идентификации для предоставленной функции.
В качестве примера представьте, что мы хотим знать общую продолжительность наших взаимодействий для нормальных и атакующих взаимодействий. Мы можем использовать reduce
следующим образом.
# parse data csv_data = raw_data.map(lambda x: x.split(",")) # separate into different RDDs normal_csv_data = csv_data.filter(lambda x: x[41]=="normal.") attack_csv_data = csv_data.filter(lambda x: x[41]!="normal.")
Функция, которую мы передаем в reduce
, получает и возвращает элементы того же типа RDD. Если мы хотим суммировать длительности, нам нужно извлечь этот элемент в новый RDD.
normal_duration_data = normal_csv_data.map(lambda x: int(x[0])) attack_duration_data = attack_csv_data.map(lambda x: int(x[0]))
Теперь мы можем уменьшить эти новые RDDS.
total_normal_duration = normal_duration_data.reduce(lambda x, y: x + y) total_attack_duration = attack_duration_data.reduce(lambda x, y: x + y) print "Total duration for 'normal' interactions is {}".\ format(total_normal_duration) print "Total duration for 'attack' interactions is {}".\ format(total_attack_duration)
Total duration for 'normal' interactions is 21075991 Total duration for 'attack' interactions is 2626792
Мы можем пойти дальше и использовать подсчеты для вычисления средних продолжительности.
normal_count = normal_duration_data.count() attack_count = attack_duration_data.count() print "Mean duration for 'normal' interactions is {}".\ format(round(total_normal_duration/float(normal_count),3)) print "Mean duration for 'attack' interactions is {}".\ format(round(total_attack_duration/float(attack_count),3))
Mean duration for 'normal' interactions is 216.657 Mean duration for 'attack' interactions is 6.621
У нас есть первый (и слишком упрощенный) подход к выявлению атакующих взаимодействий.
Лучший Способ-Использование aggregate
Действие aggregate
освобождает нас от ограничения того, что возврат должен быть того же типа, что и RDD, над которым мы работаем. Как и в случае с fold
, мы предоставляем начальное нулевое значение типа, который хотим вернуть. Затем мы предоставляем две функции. Первый используется для объединения элементов нашего РДД с аккумулятором. Вторая функция необходима для объединения двух аккумуляторов. Давайте посмотрим на это в действии, вычисляя среднее значение, которое мы делали раньше.
normal_sum_count = normal_duration_data.aggregate( (0,0), # the initial value (lambda acc, value: (acc[0] + value, acc[1] + 1)), # combine val/acc (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])) ) print "Mean duration for 'normal' interactions is {}".\ format(round(normal_sum_count[0]/float(normal_sum_count[1]),3))
Mean duration for 'normal' interactions is 216.657
В предыдущей агрегации первый элемент накопителя сохраняет общую сумму, а второй элемент-счет. Объединение аккумулятора с элементом RDD состоит в суммировании значения и увеличении количества. Объединение двух аккумуляторов требует только попарной суммы.
Мы можем сделать то же самое с взаимодействиями типа атаки.
attack_sum_count = attack_duration_data.aggregate( (0,0), # the initial value (lambda acc, value: (acc[0] + value, acc[1] + 1)), # combine value with acc (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])) # combine accumulators ) print "Mean duration for 'attack' interactions is {}".\ format(round(attack_sum_count[0]/float(attack_sum_count[1]),3))
Mean duration for 'attack' interactions is 6.621
Работа с парой Ключ/значение RDDs
Spark предоставляет специальные функции для работы с RDDS, элементы которых являются парами ключ/значение. Они обычно используются для выполнения агрегаций и других обработок по ключу.
В этом разделе мы покажем, как, работая с парами ключ/значение, мы можем обрабатывать наш набор данных сетевых взаимодействий более практичным и мощным способом, чем тот, который использовался в предыдущих блокнотах. Агрегация пар ключ/значение будет особенно эффективна при попытке исследовать каждый тип тега в наших сетевых атаках индивидуально.
Получение данных и создание RDD
Как и в первом разделе, мы будем использовать сокращенный набор данных (10%), предоставленный для KDD Cup 1999 , содержащий почти полмиллиона сетевых взаимодействий. Файл предоставляется в виде файла Gzip, который мы будем загружать локально.
import urllib f = urllib.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz") data_file = "./kddcup.data_10_percent.gz" raw_data = sc.textFile(data_file)
Создание пары RDD для типов взаимодействия
В этой записной книжке мы хотим провести некоторый исследовательский анализ данных нашего набора данных сетевых взаимодействий. Более конкретно мы хотим охарактеризовать каждый тип сетевого взаимодействия с точки зрения некоторых его переменных, таких как продолжительность. Для этого нам сначала нужно создать подходящий для этого RDD, где каждое взаимодействие анализируется как строка CSV, представляющая значение, и помещается вместе с соответствующим тегом в качестве ключа.
Обычно мы создаем пару ключ/значение RDDS, применяя функцию с помощью map
к исходным данным. Эта функция возвращает соответствующую пару для данного элемента RDD. Мы можем действовать следующим образом.
csv_data = raw_data.map(lambda x: x.split(",")) key_value_data = csv_data.map(lambda x: (x[41], x)) # x[41] contains the network interaction tag
Теперь у нас есть готовые к использованию данные пары ключ/значение. Давайте возьмем первый элемент, чтобы посмотреть, как он выглядит.
key_value_data.take(1)
[(u'normal.', [u'0', u'tcp', u'http', u'SF', u'181', u'5450', u'0', u'0', u'0', u'0', u'0', u'1', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'8', u'8', u'0.00', u'0.00', u'0.00', u'0.00', u'1.00', u'0.00', u'0.00', u'9', u'9', u'1.00', u'0.00', u'0.11', u'0.00', u'0.00', u'0.00', u'0.00', u'0.00', u'normal.'])]
Агрегирование данных с помощью пары Ключ/значение RDDS
Мы можем использовать все преобразования и действия, доступные для обычных RDDS с парой ключ/значение RDDS. Нам просто нужно заставить функции работать с парными элементами. Кроме того, Spark предоставляет специальные функции для работы с RDDS, содержащими парные элементы. Они очень похожи на те, которые доступны для общих RDDS.
Например, у нас есть преобразование reduceByKey
, которое мы можем использовать следующим образом для вычисления общей продолжительности каждого типа сетевого взаимодействия.
key_value_duration = csv_data.map(lambda x: (x[41], float(x[0]))) durations_by_key = key_value_duration.reduceByKey(lambda x, y: x + y) durations_by_key.collect()
[(u'guess_passwd.', 144.0), (u'nmap.', 0.0), (u'warezmaster.', 301.0), (u'rootkit.', 1008.0), (u'warezclient.', 627563.0), (u'smurf.', 0.0), (u'pod.', 0.0), (u'neptune.', 0.0), (u'normal.', 21075991.0), (u'spy.', 636.0), (u'ftp_write.', 259.0), (u'phf.', 18.0), (u'portsweep.', 1991911.0), (u'teardrop.', 0.0), (u'buffer_overflow.', 2751.0), (u'land.', 0.0), (u'imap.', 72.0), (u'loadmodule.', 326.0), (u'perl.', 124.0), (u'multihop.', 1288.0), (u'back.', 284.0), (u'ipsweep.', 43.0), (u'satan.', 64.0)]
У нас есть определенное подсчетное действие для пар ключ/значение.
counts_by_key = key_value_data.countByKey() counts_by_key defaultdict(, {u'guess_passwd.': 53, u'nmap.': 231, u'warezmaster.': 20, u'rootkit.': 10, u'warezclient.': 1020, u'smurf.': 280790, u'pod.': 264, u'neptune.': 107201, u'normal.': 97278, u'spy.': 2, u'ftp_write.': 8, u'phf.': 4, u'portsweep.': 1040, u'teardrop.': 979, u'buffer_overflow.': 30, u'land.': 21, u'imap.': 12, u'loadmodule.': 9, u'perl.': 3, u'multihop.': 7, u'back.': 2203, u'ipsweep.': 1247, u'satan.': 1589})
Использование combineByKey
Это наиболее общая из функций агрегирования по ключам. Большинство других комбинаторов для каждого ключа реализованы с его помощью. Мы можем думать о нем как о эквиваленте aggregate
, поскольку он позволяет пользователю возвращать значения, которые не являются тем же типом, что и наши входные данные.
Например, мы можем использовать его для расчета средней продолжительности каждого типа следующим образом.
sum_counts = key_value_duration.combineByKey( (lambda x: (x, 1)), # the initial value, with value x and count 1 (lambda acc, value: (acc[0]+value, acc[1]+1)), # how to combine a pair value with the accumulator: sum value, and increment count (lambda acc1, acc2: (acc1[0]+acc2[0], acc1[1]+acc2[1])) # combine accumulators ) sum_counts.collectAsMap()
{u'back.': (284.0, 2203), u'buffer_overflow.': (2751.0, 30), u'ftp_write.': (259.0, 8), u'guess_passwd.': (144.0, 53), u'imap.': (72.0, 12), u'ipsweep.': (43.0, 1247), u'land.': (0.0, 21), u'loadmodule.': (326.0, 9), u'multihop.': (1288.0, 7), u'neptune.': (0.0, 107201), u'nmap.': (0.0, 231), u'normal.': (21075991.0, 97278), u'perl.': (124.0, 3), u'phf.': (18.0, 4), u'pod.': (0.0, 264), u'portsweep.': (1991911.0, 1040), u'rootkit.': (1008.0, 10), u'satan.': (64.0, 1589), u'smurf.': (0.0, 280790), u'spy.': (636.0, 2), u'teardrop.': (0.0, 979), u'warezclient.': (627563.0, 1020), u'warezmaster.': (301.0, 20)}
Мы видим, что аргументы очень похожи на те, которые были переданы в aggregate
в предыдущей записной книжке. Результат, связанный с каждым типом, представлен в виде пары. Если мы действительно хотим получить средние значения, нам нужно сделать деление, прежде чем собирать результаты.
duration_means_by_type = sum_counts.map(lambda (key,value): (key, round(value[0]/value[1],3))).collectAsMap() # Print them sorted for tag in sorted(duration_means_by_type, key=duration_means_by_type.get, reverse=True): print tag, duration_means_by_type[tag]
portsweep. 1915.299 warezclient. 615.258 spy. 318.0 normal. 216.657 multihop. 184.0 rootkit. 100.8 buffer_overflow. 91.7 perl. 41.333 loadmodule. 36.222 ftp_write. 32.375 warezmaster. 15.05 imap. 6.0 phf. 4.5 guess_passwd. 2.717 back. 0.129 satan. 0.04 ipsweep. 0.034 nmap. 0.0 smurf. 0.0 pod. 0.0 neptune. 0.0 teardrop. 0.0 land. 0.0
Небольшой шаг к пониманию того, что заставляет сетевое взаимодействие считаться атакой.