Автор оригинала: Jose A Dianes.
Инструкции
Моя серия учебников Spark & Python может быть рассмотрена индивидуально, хотя существует более или менее линейная “история”, если следовать ей последовательно. Используя один и тот же набор данных, они пытаются решить с ним связанный набор задач.
Это не единственный способ, но хороший способ следовать этим учебникам Spark-сначала клонировать репо GitHub , а затем запускать свой собственный IPython notebook в режиме PySpark . Например, если у нас есть автономная Spark установка, работающая в нашем localhost
с максимальным объемом 6 Гб на узел, назначенный IPython:
MASTER="spark://127.0.0.1:7077" SPARK_EXECUTOR_MEMORY="6G" IPYTHON_OPTS="notebook --pylab inline" ~/spark-1.3.1-bin-hadoop2.6/bin/pyspark
Обратите внимание, что путь к команде pyspark
будет зависеть от вашей конкретной установки. Таким образом, в качестве требования вам необходимо установить Spark на той же машине, на которой вы собираетесь запустить сервер IPython notebook
.
Дополнительные параметры Spark см. в разделе здесь . В общем случае работает правило передачи опций, описанное в виде spark.executor.memory
as SPARK_EXECUTOR_MEMORY
при вызове IPython/PySpark.
Наборы данных
Мы будем использовать наборы данных из KDD Cup 1999 .
Рекомендации
Справочником по этим и другим темам, связанным с Spark, является Learning Spark Holden Karau, Andy Konwinski, Patrick Wendell и Matei Zaharia.
Набор данных соревнований KDD Cup 1999 подробно описан здесь .
Вступление
В этом уроке мы будем использовать библиотеку машинного обучения Spark MLlib для построения классификатора Логистической регрессии для обнаружения сетевых атак. Мы будем использовать полный набор данных KDD Cup 1999 для тестирования возможностей Spark с большими наборами данных.
Кроме того, мы введем два способа выполнения выбора модели : с помощью корреляционной матрицы и с помощью проверки гипотез.
Для вашей справки при показе времени выполнения, во время обработки этого учебника, наш кластер Spark содержит:
- Восемь узлов, причем один из них выступает в качестве ведущего, а остальные-в качестве рабочих (всего доступно 14 исполнительных ядер).
- Каждый узел содержит 8 ГБ оперативной памяти, причем для каждого узла используется 6 ГБ.
- Каждый узел имеет двухъядерный процессор Intel с частотой 2,4 ГГц.
Получение данных и создание RDD
Как мы уже говорили, на этот раз мы будем использовать полный набор данных , предоставленный для KDD Cup 1999 , содержащий почти полмиллиона сетевых взаимодействий. Файл предоставляется в виде файла Gzip, который мы будем загружать локально.
import urllib f = urllib.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data.gz", "kddcup.data.gz") data_file = "./kddcup.data.gz" raw_data = sc.textFile(data_file) print "Train data size is {}".format(raw_data.count())
Train data size is 4898431
KDD Cup 1999 также предоставляет тестовые данные, которые мы загрузим в отдельный RDD.
ft = urllib.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/corrected.gz", "corrected.gz") test_data_file = "./corrected.gz" test_raw_data = sc.textFile(test_data_file) print "Test data size is {}".format(test_raw_data.count())
Test data size is 311029
Маркированные точки
Помеченная точка – это локальный вектор, связанный с меткой/ответом. В MLlib помеченные точки используются в алгоритмах контролируемого обучения и хранятся в виде двойников. Для бинарной классификации метка должна быть либо 0 (отрицательная), либо 1 (положительная).
Подготовка обучающих данных
В нашем случае мы заинтересованы в обнаружении сетевых атак в целом. Нам не нужно определять, с каким типом атаки мы имеем дело. Поэтому мы будем помечать каждое сетевое взаимодействие как не атаку (т. е. “нормальный” тег) или атаку (т. е. что-либо еще, кроме “нормального”).
from pyspark.mllib.regression import LabeledPoint from numpy import array def parse_interaction(line): line_split = line.split(",") # leave_out = [1,2,3,41] clean_line_split = line_split[0:1]+line_split[4:41] attack = 1.0 if line_split[41]=='normal.': attack = 0.0 return LabeledPoint(attack, array([float(x) for x in clean_line_split])) training_data = raw_data.map(parse_interaction)
Подготовка тестовых данных
Аналогично мы обрабатываем наш файл тестовых данных.
test_data = test_raw_data.map(parse_interaction)
Обнаружение сетевых атак с помощью логистической регрессии
Логистическая регрессия широко используется для прогнозирования бинарного ответа. Spark реализует два алгоритма для решения логистической регрессии: мини-пакетный градиентный спуск и L-BFGS. L-BFGS рекомендуется использовать по сравнению с мини-пакетным градиентным спуском для более быстрой конвергенции.
Обучение классификатора
from pyspark.mllib.classification import LogisticRegressionWithLBFGS from time import time # Build the model t0 = time() logit_model = LogisticRegressionWithLBFGS.train(training_data) tt = time() - t0 print "Classifier trained in {} seconds".format(round(tt,3))
Classifier trained in 365.599 seconds
Оценка модели на новых данных
Чтобы измерить ошибку классификации на наших тестовых данных, мы используем map
на test_data
RDD и модель для прогнозирования каждого класса тестовых точек.
labels_and_preds = test_data.map(lambda p: (p.label, logit_model.predict(p.features)))
Результаты классификации возвращаются в парс с фактической тестовой меткой и прогнозируемой меткой. Это используется для вычисления ошибки классификации с помощью filter
и count
следующим образом.
t0 = time() test_accuracy = labels_and_preds.filter(lambda (v, p): v == p).count() / float(test_data.count()) tt = time() - t0 print "Prediction made in {} seconds. Test accuracy is {}".format(round(tt,3), round(test_accuracy,4))
Prediction made in 34.291 seconds. Test accuracy is 0.9164
Это приличная точность. Мы знаем, что есть пространство для улучшения с лучшим выбором переменных, а также путем включения категориальных переменных (например, мы исключили “протокол” и “сервис”).
Выбор модели
Выбор модели или объекта помогает нам строить более интерпретируемые и эффективные модели (или классификатор в данном случае). Для иллюстрации мы рассмотрим два различных подхода: корреляционные матрицы и проверку гипотез.
Использование корреляционной матрицы
В предыдущей записной книжке мы рассчитали корреляционную матрицу, чтобы найти предикторы, которые сильно коррелируют. Там есть много возможных вариантов, чтобы упростить нашу модель. Мы можем выбрать различные комбинации коррелированных переменных и оставить только те, которые их представляют. Читатель может попробовать разные комбинации. Здесь мы выберем для иллюстрации следующее:
Из описания задачи KDD Cup 99 мы знаем, что переменная
dst_host_same_src_port_rate
ссылается на процент последних 100 подключений к одному и тому же порту для одного и того же хоста назначения. В нашей корреляционной матрице (и вспомогательных фреймах данных) мы находим, что эта матрица сильно и положительно коррелирует сsrc_bytes
иsrv_count
. Первый-это количество байтов, отправленных от источника к месту назначения. Позднее-это количество подключений к той же службе, что и текущее соединение за последние 2 секунды. Мы решили не включатьdst_host_same_src_port_rate
в нашу модель, так как мы включаем два других.Переменные
error_rate
иsrv_error_rate
(% соединений, имеющих ошибки SYN для одного и того же хоста и одной и той же службы соответственно) имеют высокую положительную корреляцию. Более того, набор переменных, с которыми они сильно коррелируют, практически одинаков. Они выглядят очень похожими на нашу модель. Мы будем держать толькоerror_rate
.Аналогичная ситуация происходит с
rerror_rate
иsrv_rerror_rate
(% соединений, имеющих REJ ошибки), поэтому мы будем держать толькоrerror_rate
.То же самое с переменными с префиксом
dst_host_
для предыдущих (например,dst_host_srv_serror_rate
).
Здесь мы остановимся, хотя читатель может продолжать экспериментировать с удалением коррелированных переменных (например, same_srv_rate
и diff_srv_rate
являются хорошими кандидатами. Наш список переменных, которые мы отбросим, включает в себя:
dst_host_same_src_port_rate
, (столбец 35).srv_serror_rate
(столбец 25).srv_rerror_rate
(столбец 27).dst_host_srv_serror_rate
(столбец 38).dst_host_srv_rerror_rate
(столбец 40).
Оценка новой модели
Перейдем к оценке нашей редуцированной модели. Сначала нам нужно предоставить обучающие и тестовые наборы данных, содержащие только выбранные переменные. Для этого мы определим новую функцию ot parse raw data, которая сохраняет только то, что нам нужно.
def parse_interaction_corr(line): line_split = line.split(",") # leave_out = [1,2,3,25,27,35,38,40,41] clean_line_split = line_split[0:1]+line_split[4:25]+line_split[26:27]+line_split[28:35]+line_split[36:38]+line_split[39:40] attack = 1.0 if line_split[41]=='normal.': attack = 0.0 return LabeledPoint(attack, array([float(x) for x in clean_line_split])) corr_reduced_training_data = raw_data.map(parse_interaction_corr) corr_reduced_test_data = test_raw_data.map(parse_interaction_corr)
Примечание: при выборе элементов в разбиении, понимание списка с leave_out список для фильтрации действительно более питонен, чем нарезка и конкатенация, но мы обнаружили, что он менее эффективен. Это очень важно при работе с большими наборами данных. То parse_interaction функции будут вызываться для каждого элемента в RDD, поэтому нам нужно сделать их как можно более эффективными.
Теперь мы можем обучить модель.
# Build the model t0 = time() logit_model_2 = LogisticRegressionWithLBFGS.train(corr_reduced_training_data) tt = time() - t0 print "Classifier trained in {} seconds".format(round(tt,3))
Classifier trained in 319.124 seconds
И оценить его точность по данным испытаний.
labels_and_preds = corr_reduced_test_data.map(lambda p: (p.label, logit_model_2.predict(p.features))) t0 = time() test_accuracy = labels_and_preds.filter(lambda (v, p): v == p).count() / float(corr_reduced_test_data.count()) tt = time() - t0 print "Prediction made in {} seconds. Test accuracy is {}".format(round(tt,3), round(test_accuracy,4))
Prediction made in 34.102 seconds. Test accuracy is 0.8599
Как и ожидалось, мы сократили точность, а также время обучения. Однако это не кажется хорошей сделкой! По крайней мере, не для логистической регрессии, а с учетом предикторов, которые мы решили опустить. Мы потеряли довольно много точности и не получили много времени выполнения во время тренировки. Более того, время предсказания не улучшилось.
Использование проверки гипотез
Проверка гипотез является мощным инструментом статистического вывода и обучения, чтобы определить, является ли результат статистически значимым. MLlib поддерживает тесты Пирсона на хи-квадрат ( χ2) для хорошей подгонки и независимости. Тест на добротность подгонки требует входного типа Vector
, в то время как тест на независимость требует Matrix
в качестве входных данных. Кроме того, MLlib также поддерживает входной тип RDD[LabeledPoint]
для включения выбора объектов с помощью тестов независимости хи-квадрат. Опять же, эти методы являются частью пакета Statistics
.
В нашем случае мы хотим выполнить какой-то выбор функции, поэтому мы предоставим RDD LabeledPoint
. Внутренне MLlib вычислит матрицу непредвиденных обстоятельств и выполнит тест хи-квадрат (χ2) человека. Характеристики должны быть категоричными. Вещественнозначные признаки будут рассматриваться как категориальные в каждом из своих различных значений. Существует ограничение в 1000 различных значений, поэтому нам нужно либо исключить некоторые функции, либо классифицировать их. В этом случае мы будем рассматривать только объекты, которые либо принимают логические значения, либо просто несколько различных числовых значений в нашем наборе данных. Мы могли бы преодолеть это ограничение, определив более сложную функцию parse_interaction
, которая правильно классифицирует каждый объект.
feature_names = ["land","wrong_fragment", "urgent","hot","num_failed_logins", "logged_in","num_compromised", "root_shell","su_attempted", "num_root","num_file_creations", "num_shells","num_access_files", "num_outbound_cmds", "is_hot_login","is_guest_login", "count","srv_count","serror_rate", "srv_serror_rate","rerror_rate", "srv_rerror_rate","same_srv_rate", "diff_srv_rate","srv_diff_host_rate", "dst_host_count","dst_host_srv_count", "dst_host_same_srv_rate","dst_host_diff_srv_rate", "dst_host_same_src_port_rate", "dst_host_srv_diff_host_rate","dst_host_serror_rate", "dst_host_srv_serror_rate", "dst_host_rerror_rate","dst_host_srv_rerror_rate"] def parse_interaction_categorical(line): line_split = line.split(",") clean_line_split = line_split[6:41] attack = 1.0 if line_split[41]=='normal.': attack = 0.0 return LabeledPoint(attack, array([float(x) for x in clean_line_split])) training_data_categorical = raw_data.map(parse_interaction_categorical) from pyspark.mllib.stat import Statistics chi = Statistics.chiSqTest(training_data_categorical)
Теперь мы можем проверить полученные значения, поместив их в фрейм данных Pandas .
import pandas as pd pd.set_option('display.max_colwidth', 30) records = [(result.statistic, result.pValue) for result in chi] chi_df = pd.DataFrame(data=records, index= feature_names, columns=["Statistic","p-value"]) chi_df
земля | 0.495304 | 4.649840e-01 |
неправильный фрагмент | 0.000000 | 3.068555e+02 |
срочный | 0.000000 | 3.871844e+01 |
горячий | 0.000000 | 1.946331e+04 |
num_failed_logins | 0.000000 | 1.277691e+02 |
logged_in | 0.000000 | 3.273098e+06 |
num_compromised | 0.000000 | 2.011863e+03 |
root_shell | 0.000000 | 1.044918e+03 |
su_attempted | 0.000000 | 4.340000e+02 |
num_root | 0.000000 | 2.287168e+04 |
num_file_creations | 0.000000 | 9.179739e+03 |
num_shells | 0.000000 | 1.380028e+03 |
num_access_files | 0.000000 | 1.873477e+04 |
num_outbound_cmds | 1.000000 | 0.000000e+00 |
is_hot_login | 0.004498 | 8.070987e+00 |
is_guest_login | 0.000000 | 1.350051e+04 |
считать | 0.000000 | 4.546398e+06 |
srv_count | 0.000000 | 2.296060e+06 |
error_rate | 0.000000 | 2.684199e+05 |
srv_serror_rate | 0.000000 | 3.026270e+05 |
error_rate | 0.000000 | 9.860453e+03 |
srv_rerror_rate | 0.000000 | 3.247639e+04 |
same_srv_rate | 0.000000 | 3.999124e+05 |
diff_srv_rate | 0.000000 | 3.909998e+05 |
srv_diff_host_rate | 0.000000 | 1.365459e+06 |
dst_host_count | 0.000000 | 2.520479e+06 |
dst_host_srv_count | 0.000000 | 1.439086e+06 |
dst_host_same_srv_rate | 0.000000 | 1.237932e+06 |
dst_host_diff_srv_rate | 0.000000 | 1.339002e+06 |
dst_host_same_src_port_rate | 0.000000 | 2.915195e+06 |
dst_host_srv_diff_host_rate | 0.000000 | 2.226291e+06 |
dst_host_serror_rate | 0.000000 | 4.074546e+05 |
dst_host_srv_serror_rate | 0.000000 | 4.550990e+05 |
dst_host_rerror_rate | 0.000000 | 1.364790e+05 |
dst_host_srv_rerror_rate | 0.000000 | 2.545474e+05 |
Из этого мы заключаем, что предикторы land
и num_outbound_cmds
могут быть удалены из нашей модели, не оказывая существенного влияния на вашу точность. Давай попробуем.
Оценка новой модели
Таким образом, единственной модификацией нашей первой функции parse_interaction
будет удаление столбцов 6 и 19, соответствующих двум предикторам, которые мы не хотим быть частью нашей модели.
def parse_interaction_chi(line): line_split = line.split(",") # leave_out = [1,2,3,6,19,41] clean_line_split = line_split[0:1] + line_split[4:6] + line_split[7:19] + line_split[20:41] attack = 1.0 if line_split[41]=='normal.': attack = 0.0 return LabeledPoint(attack, array([float(x) for x in clean_line_split])) training_data_chi = raw_data.map(parse_interaction_chi) test_data_chi = test_raw_data.map(parse_interaction_chi)
Теперь мы снова построим классификатор логистической регрессии.
# Build the model t0 = time() logit_model_chi = LogisticRegressionWithLBFGS.train(training_data_chi) tt = time() - t0 print "Classifier trained in {} seconds".format(round(tt,3))
Classifier trained in 356.507 seconds
И оценить в тестовых данных.
labels_and_preds = test_data_chi.map(lambda p: (p.label, logit_model_chi.predict(p.features))) t0 = time() test_accuracy = labels_and_preds.filter(lambda (v, p): v == p).count() / float(test_data_chi.count()) tt = time() - t0 print "Prediction made in {} seconds. Test accuracy is {}".format(round(tt,3), round(test_accuracy,4))
Prediction made in 34.656 seconds. Test accuracy is 0.9164
Таким образом, мы можем видеть, что, используя проверку гипотез, мы смогли удалить два предиктора без снижения точности тестирования вообще. Время тренировок также немного улучшилось. Сейчас это может показаться большим сокращением модели, но это нечто при работе с большими источниками данных. Более того, мы должны иметь возможность классифицировать те пять предикторов, которые мы оставили по разным причинам, и либо включить их в модель, либо исключить, если они не являются статистически значимыми.
Кроме того, мы могли бы попытаться удалить некоторые из тех предикторов, которые сильно коррелируют, стараясь не слишком снижать точность. В конце концов, мы должны получить модель, более легкую для понимания и использования.