Автор оригинала: Jose A Dianes.
Инструкции
Моя серия учебных пособий Spark & Python может быть рассмотрена индивидуально, хотя существует более или менее линейная “история”, если следовать ей последовательно. Используя один и тот же набор данных, они пытаются решить с ним связанный набор задач.
Это не единственный, но хороший способ следовать этим учебникам Spark-сначала клонировать репо GitHub , а затем запустить свой собственный IPython notebook в режиме PySpark . Например, если у нас есть автономная установка Spark, запущенная в нашем localhost
с максимальным объемом 6 ГБ на узел, назначенный IPython:
MASTER="spark://127.0.0.1:7077" SPARK_EXECUTOR_MEMORY="6G" IPYTHON_OPTS="notebook --pylab inline" ~/spark-1.3.1-bin-hadoop2.6/bin/pyspark
Обратите внимание, что путь к команде pyspark
будет зависеть от конкретной установки. Поэтому в качестве требования вам необходимо установить Spark на той же машине, на которой вы собираетесь запустить IPython notebook
сервер.
Дополнительные параметры Spark см. в разделе здесь . В общем случае работает правило передачи параметров, описанное в форме spark.executor.memory
as SPARK_EXECUTOR_MEMORY
при вызове IPython/PySpark.
Наборы данных
Мы будем использовать наборы данных из KDD Cup 1999 .
Рекомендации
Справочником по этим и другим темам, связанным с Spark, является Learning Spark Холдена Карау, Энди Конвински, Патрика Уэнделла и Матея Захарии.
Набор данных соревнований KDD Cup 1999 подробно описан здесь .
Вступление
В этом учебном пособии будут представлены возможности Spark для работы с данными структурированным способом. В принципе, все вращается вокруг концепции фрейма данных и использования языка SQL для их запроса. Мы увидим, как абстракция фрейма данных, очень популярная в других экосистемах анализа данных (например, R и Python/Pandas), очень эффективна при выполнении исследовательского анализа данных. На самом деле, очень легко выражать запросы данных при использовании вместе с языком SQL. Кроме того, Spark прозрачно распределяет эту структуру данных на основе столбцов, чтобы сделать процесс запроса максимально эффективным.
Получение данных и создание RDD
Как и в предыдущих блокнотах, мы будем использовать сокращенный набор данных (10 процентов), предоставленный для KDD Cup 1999 , содержащий почти полмиллиона сетевых взаимодействий. Файл предоставляется в виде файла Gzip, который мы загрузим локально.
import urllib f = urllib.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz") data_file = "./kddcup.data_10_percent.gz" raw_data = sc.textFile(data_file).cache()
Получение фрейма данных
Spark DataFrame
– это распределенная коллекция данных, организованная в именованные столбцы. Концептуально он эквивалентен таблице в реляционной базе данных или фрейму данных в R или Pandas. Они могут быть построены из широкого спектра источников, таких как существующий RDD в нашем случае.
Точкой входа во все функции SQL в Spark является класс SQLContext
. Чтобы создать базовый экземпляр, все, что нам нужно, – это ссылка SparkContext
. Поскольку мы запускаем Spark в режиме оболочки (используя PySpark), мы можем использовать для этой цели глобальный контекстный объект sc
.
from pyspark.sql import SQLContext sqlContext = SQLContext(sc)
Вывод схемы
С помощью SQLContext
мы готовы создать DataFrame
из нашего существующего RDD. Но сначала нам нужно сообщить Spark SQL схему в наших данных.
Spark SQL может преобразовать RDD объектов Row
в DataFrame
. Строки строятся путем передачи списка пар ключ/значение в виде kwargs классу Row
. Ключи определяют имена столбцов, а типы выводятся путем просмотра первой строки. Поэтому важно, чтобы в первой строке RDD не было отсутствующих данных, чтобы правильно вывести схему.
В нашем случае нам сначала нужно разделить данные, разделенные запятыми, а затем использовать информацию в описании задачи KDD 1999 для получения имен столбцов /.
from pyspark.sql import Row csv_data = raw_data.map(lambda l: l.split(",")) row_data = csv_data.map(lambda p: Row( duration=int(p[0]), protocol_type=p[1], service=p[2], flag=p[3], src_bytes=int(p[4]), dst_bytes=int(p[5]) ) )
Как только у нас будет наш RDD Row
, мы сможем вывести и зарегистрировать схему.
interactions_df = sqlContext.createDataFrame(row_data) interactions_df.registerTempTable("interactions")
Теперь мы можем выполнять SQL-запросы над нашим фреймом данных, который был зарегистрирован как таблица.
# Select tcp network interactions with more than 1 second duration and no transfer from destination tcp_interactions = sqlContext.sql(""" SELECT duration, dst_bytes FROM interactions WHERE protocol_type = 'tcp' AND duration > 1000 AND dst_bytes = 0 """) tcp_interactions.show()
duration dst_bytes 5057 0 5059 0 5051 0 5056 0 5051 0 5039 0 5062 0 5041 0 5056 0 5064 0 5043 0 5061 0 5049 0 5061 0 5048 0 5047 0 5044 0 5063 0 5068 0 5062 0
Результаты SQL-запросов являются RDDS и поддерживают все обычные операции RDD.
# Output duration together with dst_bytes tcp_interactions_out = tcp_interactions.map(lambda p: "Duration: {}, Dest. bytes: {}".format(p.duration, p.dst_bytes)) for ti_out in tcp_interactions_out.collect(): print ti_out
Duration: 5057, Dest. bytes: 0 Duration: 5059, Dest. bytes: 0 Duration: 5051, Dest. bytes: 0 Duration: 5056, Dest. bytes: 0 Duration: 5051, Dest. bytes: 0 Duration: 5039, Dest. bytes: 0 Duration: 5062, Dest. bytes: 0 Duration: 5041, Dest. bytes: 0 Duration: 5056, Dest. bytes: 0 Duration: 5064, Dest. bytes: 0 Duration: 5043, Dest. bytes: 0 Duration: 5061, Dest. bytes: 0 Duration: 5049, Dest. bytes: 0 Duration: 5061, Dest. bytes: 0 Duration: 5048, Dest. bytes: 0 Duration: 5047, Dest. bytes: 0 Duration: 5044, Dest. bytes: 0 Duration: 5063, Dest. bytes: 0 Duration: 5068, Dest. bytes: 0 Duration: 5062, Dest. bytes: 0 Duration: 5046, Dest. bytes: 0 Duration: 5052, Dest. bytes: 0 Duration: 5044, Dest. bytes: 0 Duration: 5054, Dest. bytes: 0 Duration: 5039, Dest. bytes: 0 Duration: 5058, Dest. bytes: 0 Duration: 5051, Dest. bytes: 0 Duration: 5032, Dest. bytes: 0 Duration: 5063, Dest. bytes: 0 Duration: 5040, Dest. bytes: 0 Duration: 5051, Dest. bytes: 0 Duration: 5066, Dest. bytes: 0 Duration: 5044, Dest. bytes: 0 Duration: 5051, Dest. bytes: 0 Duration: 5036, Dest. bytes: 0 Duration: 5055, Dest. bytes: 0 Duration: 2426, Dest. bytes: 0 Duration: 5047, Dest. bytes: 0 Duration: 5057, Dest. bytes: 0 Duration: 5037, Dest. bytes: 0 Duration: 5057, Dest. bytes: 0 Duration: 5062, Dest. bytes: 0 Duration: 5051, Dest. bytes: 0 Duration: 5051, Dest. bytes: 0 Duration: 5053, Dest. bytes: 0 Duration: 5064, Dest. bytes: 0 Duration: 5044, Dest. bytes: 0 Duration: 5051, Dest. bytes: 0 Duration: 5033, Dest. bytes: 0 Duration: 5066, Dest. bytes: 0 Duration: 5063, Dest. bytes: 0 Duration: 5056, Dest. bytes: 0 Duration: 5042, Dest. bytes: 0 Duration: 5063, Dest. bytes: 0 Duration: 5060, Dest. bytes: 0 Duration: 5056, Dest. bytes: 0 Duration: 5049, Dest. bytes: 0 Duration: 5043, Dest. bytes: 0 Duration: 5039, Dest. bytes: 0 Duration: 5041, Dest. bytes: 0 Duration: 42448, Dest. bytes: 0 Duration: 42088, Dest. bytes: 0 Duration: 41065, Dest. bytes: 0 Duration: 40929, Dest. bytes: 0 Duration: 40806, Dest. bytes: 0 Duration: 40682, Dest. bytes: 0 Duration: 40571, Dest. bytes: 0 Duration: 40448, Dest. bytes: 0 Duration: 40339, Dest. bytes: 0 Duration: 40232, Dest. bytes: 0 Duration: 40121, Dest. bytes: 0 Duration: 36783, Dest. bytes: 0 Duration: 36674, Dest. bytes: 0 Duration: 36570, Dest. bytes: 0 Duration: 36467, Dest. bytes: 0 Duration: 36323, Dest. bytes: 0 Duration: 36204, Dest. bytes: 0 Duration: 32038, Dest. bytes: 0 Duration: 31925, Dest. bytes: 0 Duration: 31809, Dest. bytes: 0 Duration: 31709, Dest. bytes: 0 Duration: 31601, Dest. bytes: 0 Duration: 31501, Dest. bytes: 0 Duration: 31401, Dest. bytes: 0 Duration: 31301, Dest. bytes: 0 Duration: 31194, Dest. bytes: 0 Duration: 31061, Dest. bytes: 0 Duration: 30935, Dest. bytes: 0 Duration: 30835, Dest. bytes: 0 Duration: 30735, Dest. bytes: 0 Duration: 30619, Dest. bytes: 0 Duration: 30518, Dest. bytes: 0 Duration: 30418, Dest. bytes: 0 Duration: 30317, Dest. bytes: 0 Duration: 30217, Dest. bytes: 0 Duration: 30077, Dest. bytes: 0 Duration: 25420, Dest. bytes: 0 Duration: 22921, Dest. bytes: 0 Duration: 22821, Dest. bytes: 0 Duration: 22721, Dest. bytes: 0 Duration: 22616, Dest. bytes: 0 Duration: 22516, Dest. bytes: 0 Duration: 22416, Dest. bytes: 0 Duration: 22316, Dest. bytes: 0 Duration: 22216, Dest. bytes: 0 Duration: 21987, Dest. bytes: 0 Duration: 21887, Dest. bytes: 0 Duration: 21767, Dest. bytes: 0 Duration: 21661, Dest. bytes: 0 Duration: 21561, Dest. bytes: 0 Duration: 21455, Dest. bytes: 0 Duration: 21334, Dest. bytes: 0 Duration: 21223, Dest. bytes: 0 Duration: 21123, Dest. bytes: 0 Duration: 20983, Dest. bytes: 0 Duration: 14682, Dest. bytes: 0 Duration: 14420, Dest. bytes: 0 Duration: 14319, Dest. bytes: 0 Duration: 14198, Dest. bytes: 0 Duration: 14098, Dest. bytes: 0 Duration: 13998, Dest. bytes: 0 Duration: 13898, Dest. bytes: 0 Duration: 13796, Dest. bytes: 0 Duration: 13678, Dest. bytes: 0 Duration: 13578, Dest. bytes: 0 Duration: 13448, Dest. bytes: 0 Duration: 13348, Dest. bytes: 0 Duration: 13241, Dest. bytes: 0 Duration: 13141, Dest. bytes: 0 Duration: 13033, Dest. bytes: 0 Duration: 12933, Dest. bytes: 0 Duration: 12833, Dest. bytes: 0 Duration: 12733, Dest. bytes: 0 Duration: 12001, Dest. bytes: 0 Duration: 5678, Dest. bytes: 0 Duration: 5010, Dest. bytes: 0 Duration: 1298, Dest. bytes: 0 Duration: 1031, Dest. bytes: 0 Duration: 36438, Dest. bytes: 0
Мы можем легко взглянуть на нашу схему фрейма данных, используя print Schema
.
interactions_df.printSchema() root |-- dst_bytes: long (nullable = true) |-- duration: long (nullable = true) |-- flag: string (nullable = true) |-- protocol_type: string (nullable = true) |-- service: string (nullable = true) |-- src_bytes: long (nullable = true)
Запросы как Операции с фреймами данных
Spark DataFrame
предоставляет специфичный для домена язык для обработки структурированных данных. Этот язык включает в себя методы, которые мы можем объединить, чтобы сделать выбор, фильтрацию, группировку и т. Д. Например, предположим, что мы хотим подсчитать, сколько взаимодействий существует для каждого типа протокола. Мы можем действовать следующим образом.
from time import time t0 = time() interactions_df.select("protocol_type", "duration", "dst_bytes").groupBy("protocol_type").count().show() tt = time() - t0 print "Query performed in {} seconds".format(round(tt,3))
protocol_type count udp 20354 tcp 190065 icmp 283602 Query performed in 20.568 seconds
Теперь представьте, что мы хотим подсчитать, сколько взаимодействий длится более 1 секунды, без передачи данных из пункта назначения, сгруппированных по типу протокола. Мы можем просто добавить для фильтрации вызовы к предыдущим.
t0 = time() interactions_df.select("protocol_type", "duration", "dst_bytes").filter(interactions_df.duration>1000).filter(interactions_df.dst_bytes==0).groupBy("protocol_type").count().show() tt = time() - t0 print "Query performed in {} seconds".format(round(tt,3))
protocol_type count tcp 139 Query performed in 16.641 seconds
Мы можем использовать это для выполнения некоторого исследовательского анализа данных . Давайте посчитаем, сколько у нас атак и нормальных взаимодействий. Сначала нам нужно добавить столбец label в наши данные.
def get_label_type(label): if label!="normal.": return "attack" else: return "normal" row_labeled_data = csv_data.map(lambda p: Row( duration=int(p[0]), protocol_type=p[1], service=p[2], flag=p[3], src_bytes=int(p[4]), dst_bytes=int(p[5]), label=get_label_type(p[41]) ) ) interactions_labeled_df = sqlContext.createDataFrame(row_labeled_data)
На этот раз нам не нужно регистрировать схему, так как мы собираемся использовать интерфейс OOqueryinterface.
Давайте проверим, что предыдущее действительно работает, подсчитав атаку и нормальные данные в нашем фрейме данных.
t0 = time() interactions_labeled_df.select("label").groupBy("label").count().show() tt = time() - t0 print "Query performed in {} seconds".format(round(tt,3))
label count attack 396743 normal 97278 Query performed in 17.325 seconds
Теперь мы хотим подсчитать их по метке и типу протокола, чтобы увидеть, насколько важен тип протокола для определения того, является ли взаимодействие атакой или нет.
t0 = time() interactions_labeled_df.select("label", "protocol_type").groupBy("label", "protocol_type").count().show() tt = time() - t0 print "Query performed in {} seconds".format(round(tt,3))
label protocol_type count attack udp 1177 attack tcp 113252 attack icmp 282314 normal udp 19177 normal tcp 76813 normal icmp 1288 Query performed in 17.253 seconds
На первый взгляд кажется, что udp взаимодействия находятся в более низкой пропорции между сетевыми атаками по сравнению с другими типами протоколов.
И мы можем делать гораздо более сложные группировки. Например, добавьте к предыдущему “разделение” на основе передачи данных от цели.
t0 = time() interactions_labeled_df.select("label", "protocol_type", "dst_bytes").groupBy("label", "protocol_type", interactions_labeled_df.dst_bytes==0).count().show() tt = time() - t0 print "Query performed in {} seconds".format(round(tt,3))
label protocol_type (dst_bytes = 0) count normal icmp true 1288 attack udp true 1166 attack udp false 11 normal udp true 3594 normal udp false 15583 attack tcp true 110583 attack tcp false 2669 normal tcp true 9313 normal tcp false 67500 attack icmp true 282314 Query performed in 17.284 seconds
Мы видим, насколько актуально это новое разделение для определения того, является ли сетевое взаимодействие атакой.
На этом мы остановимся, но мы можем увидеть, насколько эффективен этот тип запросов для изучения наших данных. На самом деле мы можем повторить все разбиения, которые мы видели в предыдущих блокнотах при введении деревьев классификации, просто выбрав, нащупав и отфильтровав наш фрейм данных. Для получения более подробного (но менее реального) списка операций и источников данных Spark DataFrame
ознакомьтесь с официальной документацией здесь .