Рубрики
Без рубрики

Spark & Python: Логистическая регрессия MLlib

В этом уроке вы узнаете, как использовать библиотеку машинного обучения Spark MLlib для построения классификатора логистической регрессии для обнаружения сетевых атак.

Автор оригинала: Jose A Dianes.

Инструкции

Моя серия учебников Spark & Python может быть рассмотрена индивидуально, хотя существует более или менее линейная “история”, если следовать ей последовательно. Используя один и тот же набор данных, они пытаются решить с ним связанный набор задач.

Это не единственный способ, но хороший способ следовать этим учебникам Spark-сначала клонировать репо GitHub , а затем запускать свой собственный IPython notebook в режиме PySpark . Например, если у нас есть автономная Spark установка, работающая в нашем localhost с максимальным объемом 6 Гб на узел, назначенный IPython:

MASTER="spark://127.0.0.1:7077" SPARK_EXECUTOR_MEMORY="6G" IPYTHON_OPTS="notebook --pylab inline" ~/spark-1.3.1-bin-hadoop2.6/bin/pyspark

Обратите внимание, что путь к команде pyspark будет зависеть от вашей конкретной установки. Таким образом, в качестве требования вам необходимо установить Spark на той же машине, на которой вы собираетесь запустить сервер IPython notebook .

Дополнительные параметры Spark см. в разделе здесь . В общем случае работает правило передачи опций, описанное в виде spark.executor.memory as SPARK_EXECUTOR_MEMORY при вызове IPython/PySpark.

Наборы данных

Мы будем использовать наборы данных из KDD Cup 1999 .

Рекомендации

Справочником по этим и другим темам, связанным с Spark, является Learning Spark Holden Karau, Andy Konwinski, Patrick Wendell и Matei Zaharia.

Набор данных соревнований KDD Cup 1999 подробно описан здесь .

Вступление

В этом уроке мы будем использовать библиотеку машинного обучения Spark MLlib для построения классификатора Логистической регрессии для обнаружения сетевых атак. Мы будем использовать полный набор данных KDD Cup 1999 для тестирования возможностей Spark с большими наборами данных.

Кроме того, мы введем два способа выполнения выбора модели : с помощью корреляционной матрицы и с помощью проверки гипотез.

Для вашей справки при показе времени выполнения, во время обработки этого учебника, наш кластер Spark содержит:

  • Восемь узлов, причем один из них выступает в качестве ведущего, а остальные-в качестве рабочих (всего доступно 14 исполнительных ядер).
  • Каждый узел содержит 8 ГБ оперативной памяти, причем для каждого узла используется 6 ГБ.
  • Каждый узел имеет двухъядерный процессор Intel с частотой 2,4 ГГц.

Получение данных и создание RDD

Как мы уже говорили, на этот раз мы будем использовать полный набор данных , предоставленный для KDD Cup 1999 , содержащий почти полмиллиона сетевых взаимодействий. Файл предоставляется в виде файла Gzip, который мы будем загружать локально.

import urllib
f = urllib.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data.gz", "kddcup.data.gz")


data_file = "./kddcup.data.gz"
raw_data = sc.textFile(data_file)

print "Train data size is {}".format(raw_data.count())
Train data size is 4898431

KDD Cup 1999 также предоставляет тестовые данные, которые мы загрузим в отдельный RDD.

ft = urllib.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/corrected.gz", "corrected.gz")


test_data_file = "./corrected.gz"
test_raw_data = sc.textFile(test_data_file)

print "Test data size is {}".format(test_raw_data.count())
Test data size is 311029

Маркированные точки

Помеченная точка – это локальный вектор, связанный с меткой/ответом. В MLlib помеченные точки используются в алгоритмах контролируемого обучения и хранятся в виде двойников. Для бинарной классификации метка должна быть либо 0 (отрицательная), либо 1 (положительная).

Подготовка обучающих данных

В нашем случае мы заинтересованы в обнаружении сетевых атак в целом. Нам не нужно определять, с каким типом атаки мы имеем дело. Поэтому мы будем помечать каждое сетевое взаимодействие как не атаку (т. е. “нормальный” тег) или атаку (т. е. что-либо еще, кроме “нормального”).

from pyspark.mllib.regression import LabeledPoint
from numpy import array

def parse_interaction(line):
    line_split = line.split(",")
    # leave_out = [1,2,3,41]
    clean_line_split = line_split[0:1]+line_split[4:41]
    attack = 1.0
    if line_split[41]=='normal.':
        attack = 0.0
    return LabeledPoint(attack, array([float(x) for x in clean_line_split]))

training_data = raw_data.map(parse_interaction)

Подготовка тестовых данных

Аналогично мы обрабатываем наш файл тестовых данных.

test_data = test_raw_data.map(parse_interaction)

Обнаружение сетевых атак с помощью логистической регрессии

Логистическая регрессия широко используется для прогнозирования бинарного ответа. Spark реализует два алгоритма для решения логистической регрессии: мини-пакетный градиентный спуск и L-BFGS. L-BFGS рекомендуется использовать по сравнению с мини-пакетным градиентным спуском для более быстрой конвергенции.

Обучение классификатора

from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from time import time

# Build the model
t0 = time()
logit_model = LogisticRegressionWithLBFGS.train(training_data)
tt = time() - t0

print "Classifier trained in {} seconds".format(round(tt,3))
Classifier trained in 365.599 seconds

Оценка модели на новых данных

Чтобы измерить ошибку классификации на наших тестовых данных, мы используем map на test_data RDD и модель для прогнозирования каждого класса тестовых точек.

labels_and_preds = test_data.map(lambda p: (p.label, logit_model.predict(p.features)))

Результаты классификации возвращаются в парс с фактической тестовой меткой и прогнозируемой меткой. Это используется для вычисления ошибки классификации с помощью filter и count следующим образом.

t0 = time()
test_accuracy = labels_and_preds.filter(lambda (v, p): v == p).count() / float(test_data.count())
tt = time() - t0

print "Prediction made in {} seconds. Test accuracy is {}".format(round(tt,3), round(test_accuracy,4))
Prediction made in 34.291 seconds. Test accuracy is 0.9164

Это приличная точность. Мы знаем, что есть пространство для улучшения с лучшим выбором переменных, а также путем включения категориальных переменных (например, мы исключили “протокол” и “сервис”).

Выбор модели

Выбор модели или объекта помогает нам строить более интерпретируемые и эффективные модели (или классификатор в данном случае). Для иллюстрации мы рассмотрим два различных подхода: корреляционные матрицы и проверку гипотез.

Использование корреляционной матрицы

В предыдущей записной книжке мы рассчитали корреляционную матрицу, чтобы найти предикторы, которые сильно коррелируют. Там есть много возможных вариантов, чтобы упростить нашу модель. Мы можем выбрать различные комбинации коррелированных переменных и оставить только те, которые их представляют. Читатель может попробовать разные комбинации. Здесь мы выберем для иллюстрации следующее:

  • Из описания задачи KDD Cup 99 мы знаем, что переменная dst_host_same_src_port_rate ссылается на процент последних 100 подключений к одному и тому же порту для одного и того же хоста назначения. В нашей корреляционной матрице (и вспомогательных фреймах данных) мы находим, что эта матрица сильно и положительно коррелирует с src_bytes и srv_count . Первый-это количество байтов, отправленных от источника к месту назначения. Позднее-это количество подключений к той же службе, что и текущее соединение за последние 2 секунды. Мы решили не включать dst_host_same_src_port_rate в нашу модель, так как мы включаем два других.

  • Переменные error_rate и srv_error_rate (% соединений, имеющих ошибки SYN для одного и того же хоста и одной и той же службы соответственно) имеют высокую положительную корреляцию. Более того, набор переменных, с которыми они сильно коррелируют, практически одинаков. Они выглядят очень похожими на нашу модель. Мы будем держать только error_rate .

  • Аналогичная ситуация происходит с rerror_rate и srv_rerror_rate (% соединений, имеющих REJ ошибки), поэтому мы будем держать только rerror_rate .

  • То же самое с переменными с префиксом dst_host_ для предыдущих (например, dst_host_srv_serror_rate ).

Здесь мы остановимся, хотя читатель может продолжать экспериментировать с удалением коррелированных переменных (например, same_srv_rate и diff_srv_rate являются хорошими кандидатами. Наш список переменных, которые мы отбросим, включает в себя:

  • dst_host_same_src_port_rate , (столбец 35).
  • srv_serror_rate (столбец 25).
  • srv_rerror_rate (столбец 27).
  • dst_host_srv_serror_rate (столбец 38).
  • dst_host_srv_rerror_rate (столбец 40).

Оценка новой модели

Перейдем к оценке нашей редуцированной модели. Сначала нам нужно предоставить обучающие и тестовые наборы данных, содержащие только выбранные переменные. Для этого мы определим новую функцию ot parse raw data, которая сохраняет только то, что нам нужно.

def parse_interaction_corr(line):
    line_split = line.split(",")
    # leave_out = [1,2,3,25,27,35,38,40,41]
    clean_line_split = line_split[0:1]+line_split[4:25]+line_split[26:27]+line_split[28:35]+line_split[36:38]+line_split[39:40]
    attack = 1.0
    if line_split[41]=='normal.':
        attack = 0.0
    return LabeledPoint(attack, array([float(x) for x in clean_line_split]))

corr_reduced_training_data = raw_data.map(parse_interaction_corr)
corr_reduced_test_data = test_raw_data.map(parse_interaction_corr)

Примечание: при выборе элементов в разбиении, понимание списка с leave_out список для фильтрации действительно более питонен, чем нарезка и конкатенация, но мы обнаружили, что он менее эффективен. Это очень важно при работе с большими наборами данных. То parse_interaction функции будут вызываться для каждого элемента в RDD, поэтому нам нужно сделать их как можно более эффективными.

Теперь мы можем обучить модель.

# Build the model
t0 = time()
logit_model_2 = LogisticRegressionWithLBFGS.train(corr_reduced_training_data)
tt = time() - t0

print "Classifier trained in {} seconds".format(round(tt,3))
Classifier trained in 319.124 seconds

И оценить его точность по данным испытаний.

labels_and_preds = corr_reduced_test_data.map(lambda p: (p.label, logit_model_2.predict(p.features)))
t0 = time()
test_accuracy = labels_and_preds.filter(lambda (v, p): v == p).count() / float(corr_reduced_test_data.count())
tt = time() - t0

print "Prediction made in {} seconds. Test accuracy is {}".format(round(tt,3), round(test_accuracy,4))
Prediction made in 34.102 seconds. Test accuracy is 0.8599

Как и ожидалось, мы сократили точность, а также время обучения. Однако это не кажется хорошей сделкой! По крайней мере, не для логистической регрессии, а с учетом предикторов, которые мы решили опустить. Мы потеряли довольно много точности и не получили много времени выполнения во время тренировки. Более того, время предсказания не улучшилось.

Использование проверки гипотез

Проверка гипотез является мощным инструментом статистического вывода и обучения, чтобы определить, является ли результат статистически значимым. MLlib поддерживает тесты Пирсона на хи-квадрат ( χ2) для хорошей подгонки и независимости. Тест на добротность подгонки требует входного типа Vector , в то время как тест на независимость требует Matrix в качестве входных данных. Кроме того, MLlib также поддерживает входной тип RDD[LabeledPoint] для включения выбора объектов с помощью тестов независимости хи-квадрат. Опять же, эти методы являются частью пакета Statistics .

В нашем случае мы хотим выполнить какой-то выбор функции, поэтому мы предоставим RDD LabeledPoint . Внутренне MLlib вычислит матрицу непредвиденных обстоятельств и выполнит тест хи-квадрат (χ2) человека. Характеристики должны быть категоричными. Вещественнозначные признаки будут рассматриваться как категориальные в каждом из своих различных значений. Существует ограничение в 1000 различных значений, поэтому нам нужно либо исключить некоторые функции, либо классифицировать их. В этом случае мы будем рассматривать только объекты, которые либо принимают логические значения, либо просто несколько различных числовых значений в нашем наборе данных. Мы могли бы преодолеть это ограничение, определив более сложную функцию parse_interaction , которая правильно классифицирует каждый объект.

feature_names = ["land","wrong_fragment",
             "urgent","hot","num_failed_logins",
             "logged_in","num_compromised",
             "root_shell","su_attempted",
             "num_root","num_file_creations",
             "num_shells","num_access_files",
            "num_outbound_cmds",
             "is_hot_login","is_guest_login",
             "count","srv_count","serror_rate",
             "srv_serror_rate","rerror_rate",
             "srv_rerror_rate","same_srv_rate",
             "diff_srv_rate","srv_diff_host_rate",
             "dst_host_count","dst_host_srv_count",
             "dst_host_same_srv_rate","dst_host_diff_srv_rate",
             "dst_host_same_src_port_rate",
             "dst_host_srv_diff_host_rate","dst_host_serror_rate",
             "dst_host_srv_serror_rate",
             "dst_host_rerror_rate","dst_host_srv_rerror_rate"]


def parse_interaction_categorical(line):
    line_split = line.split(",")
    clean_line_split = line_split[6:41]
    attack = 1.0
    if line_split[41]=='normal.':
        attack = 0.0
    return LabeledPoint(attack, 
        array([float(x) for x in clean_line_split]))

training_data_categorical = raw_data.map(parse_interaction_categorical)


from pyspark.mllib.stat import Statistics

chi = Statistics.chiSqTest(training_data_categorical)

Теперь мы можем проверить полученные значения, поместив их в фрейм данных Pandas .

import pandas as pd
pd.set_option('display.max_colwidth', 30)

records = [(result.statistic, result.pValue) for result in chi]

chi_df = pd.DataFrame(data=records, index= feature_names, columns=["Statistic","p-value"])

chi_df 
земля 0.495304 4.649840e-01
неправильный фрагмент 0.000000 3.068555e+02
срочный 0.000000 3.871844e+01
горячий 0.000000 1.946331e+04
num_failed_logins 0.000000 1.277691e+02
logged_in 0.000000 3.273098e+06
num_compromised 0.000000 2.011863e+03
root_shell 0.000000 1.044918e+03
su_attempted 0.000000 4.340000e+02
num_root 0.000000 2.287168e+04
num_file_creations 0.000000 9.179739e+03
num_shells 0.000000 1.380028e+03
num_access_files 0.000000 1.873477e+04
num_outbound_cmds 1.000000 0.000000e+00
is_hot_login 0.004498 8.070987e+00
is_guest_login 0.000000 1.350051e+04
считать 0.000000 4.546398e+06
srv_count 0.000000 2.296060e+06
error_rate 0.000000 2.684199e+05
srv_serror_rate 0.000000 3.026270e+05
error_rate 0.000000 9.860453e+03
srv_rerror_rate 0.000000 3.247639e+04
same_srv_rate 0.000000 3.999124e+05
diff_srv_rate 0.000000 3.909998e+05
srv_diff_host_rate 0.000000 1.365459e+06
dst_host_count 0.000000 2.520479e+06
dst_host_srv_count 0.000000 1.439086e+06
dst_host_same_srv_rate 0.000000 1.237932e+06
dst_host_diff_srv_rate 0.000000 1.339002e+06
dst_host_same_src_port_rate 0.000000 2.915195e+06
dst_host_srv_diff_host_rate 0.000000 2.226291e+06
dst_host_serror_rate 0.000000 4.074546e+05
dst_host_srv_serror_rate 0.000000 4.550990e+05
dst_host_rerror_rate 0.000000 1.364790e+05
dst_host_srv_rerror_rate 0.000000 2.545474e+05

Из этого мы заключаем, что предикторы land и num_outbound_cmds могут быть удалены из нашей модели, не оказывая существенного влияния на вашу точность. Давай попробуем.

Оценка новой модели

Таким образом, единственной модификацией нашей первой функции parse_interaction будет удаление столбцов 6 и 19, соответствующих двум предикторам, которые мы не хотим быть частью нашей модели.

def parse_interaction_chi(line):
    line_split = line.split(",")
    # leave_out = [1,2,3,6,19,41]
    clean_line_split = line_split[0:1] + line_split[4:6] + line_split[7:19] + line_split[20:41]
    attack = 1.0
    if line_split[41]=='normal.':
        attack = 0.0
    return LabeledPoint(attack, array([float(x) for x in clean_line_split]))

training_data_chi = raw_data.map(parse_interaction_chi)
test_data_chi = test_raw_data.map(parse_interaction_chi)

Теперь мы снова построим классификатор логистической регрессии.

# Build the model
t0 = time()
logit_model_chi = LogisticRegressionWithLBFGS.train(training_data_chi)
tt = time() - t0

print "Classifier trained in {} seconds".format(round(tt,3))
Classifier trained in 356.507 seconds

И оценить в тестовых данных.

labels_and_preds = test_data_chi.map(lambda p: (p.label, logit_model_chi.predict(p.features)))
t0 = time()
test_accuracy = labels_and_preds.filter(lambda (v, p): v == p).count() / float(test_data_chi.count())
tt = time() - t0

print "Prediction made in {} seconds. Test accuracy is {}".format(round(tt,3), round(test_accuracy,4))
Prediction made in 34.656 seconds. Test accuracy is 0.9164

Таким образом, мы можем видеть, что, используя проверку гипотез, мы смогли удалить два предиктора без снижения точности тестирования вообще. Время тренировок также немного улучшилось. Сейчас это может показаться большим сокращением модели, но это нечто при работе с большими источниками данных. Более того, мы должны иметь возможность классифицировать те пять предикторов, которые мы оставили по разным причинам, и либо включить их в модель, либо исключить, если они не являются статистически значимыми.

Кроме того, мы могли бы попытаться удалить некоторые из тех предикторов, которые сильно коррелируют, стараясь не слишком снижать точность. В конце концов, мы должны получить модель, более легкую для понимания и использования.