Рубрики
Без рубрики

Spark & Python: Деревья решений MLlib

В этом руководстве вы узнаете, как использовать библиотеку машинного обучения Spark MLlib для построения классификатора дерева решений для обнаружения сетевых атак и использовать полные наборы данных для тестирования возможностей Spark с большими наборами данных.

Автор оригинала: Jose A Dianes.

Инструкции

Моя серия учебных пособий Spark & Python может быть рассмотрена индивидуально, хотя существует более или менее линейная “история”, если следовать ей последовательно. Используя один и тот же набор данных, они пытаются решить с ним связанный набор задач.

Это не единственный, но хороший способ следовать этим учебникам Spark-сначала клонировать репо GitHub , а затем запустить свой собственный IPython notebook в режиме PySpark . Например, если у нас есть автономная установка Spark, запущенная в нашем localhost с максимальным объемом 6 ГБ на узел, назначенный IPython:

MASTER="spark://127.0.0.1:7077" SPARK_EXECUTOR_MEMORY="6G" IPYTHON_OPTS="notebook --pylab inline" ~/spark-1.3.1-bin-hadoop2.6/bin/pyspark

Обратите внимание, что путь к команде pyspark будет зависеть от конкретной установки. Поэтому в качестве требования вам необходимо установить Spark на той же машине, на которой вы собираетесь запустить IPython notebook сервер.

Дополнительные параметры Spark см. в разделе здесь . В общем случае работает правило передачи параметров, описанное в форме spark.executor.memory as SPARK_EXECUTOR_MEMORY при вызове IPython/PySpark.

Наборы данных

Мы будем использовать наборы данных из KDD Cup 1999 .

Рекомендации

Справочником по этим и другим темам, связанным с Spark, является Learning Spark Холдена Карау, Энди Конвински, Патрика Уэнделла и Матея Захарии.

Набор данных соревнований KDD Cup 1999 подробно описан здесь .

Вступление

В этом уроке мы будем использовать библиотеку машинного обучения Spark MLlib для построения Дерева решений классификатора для обнаружения сетевых атак. Мы будем использовать полный набор данных KDD Cup 1999 для тестирования возможностей Spark с большими наборами данных.

Деревья решений являются популярным инструментом машинного обучения отчасти потому, что они легко интерпретируются, обрабатывают категориальные функции, расширяются до параметров многоклассовой классификации, не требуют масштабирования объектов и способны фиксировать нелинейности и взаимодействия объектов. В этой тетради мы сначала обучим дерево классификации, включающее каждый отдельный предиктор. Затем мы будем использовать наши результаты для выбора модели. Как только мы выясним наиболее важные из них (основные расколы в дереве), мы построим минимальное дерево, используя только три из них (первые два уровня дерева, чтобы сравнить производительность и точность.

Для вашей справки при отображении времени выполнения во время обработки этого руководства наш кластер Spark содержит:

  • Восемь узлов, один из которых выступает в качестве ведущего, а остальные-в качестве рабочих (в общей сложности доступно 14 исполнительных ядер).
  • Каждый узел содержит 8 ГБ оперативной памяти, причем для каждого узла используется 6 ГБ.
  • Каждый узел имеет двухъядерный процессор Intel с частотой 2,4 ГГц.

Получение данных и создание RDD

Как мы уже говорили, на этот раз мы будем использовать полный набор данных , предоставленный для KDD Cup 1999 , содержащий почти полмиллиона взаимодействий в Нью-Йорке. Файл предоставляется в виде файла Gzip, который мы загрузим локально.

import urllib


f = urllib.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data.gz", "kddcup.data.gz")


data_file = "./kddcup.data.gz"
raw_data = sc.textFile(data_file)

print "Train data size is {}".format(raw_data.count())
Train data size is 4898431

KDD Cup 1999 также предоставляет тестовые данные, которые мы загрузим в отдельный RDD.

ft = urllib.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/corrected.gz", "corrected.gz")


test_data_file = "./corrected.gz"
test_raw_data = sc.textFile(test_data_file)

print "Test data size is {}".format(test_raw_data.count())
Test data size is 311029

Обнаружение сетевых атак с использованием Деревьев решений

В этом разделе мы обучим дерево классификации , которое , как и в случае с логистической регрессией , будет предсказывать, является ли сетевое взаимодействие нормальным или атакой .

Обучение дерева классификации с использованием MLlib требует некоторых параметров:

  • Данные обучения
  • Классы Num
  • Категориальные функции в: отображение от столбца к категориальным переменным arity. Это необязательно, хотя это должно повысить точность модели. Однако это требует, чтобы мы заранее знали уровни в наших категориальных переменных. во-вторых, нам нужно проанализировать наши данные, чтобы преобразовать метки в целочисленные значения в диапазоне arity.
  • Метрика примесей
  • Максимальная глубина дерева
  • И дерево максимальное количество бункеров

В следующем разделе мы рассмотрим, как получить все метки в наборе данных и преобразовать их в числовые коэффициенты.

Подготовка данных

Как мы уже говорили, для того, чтобы извлечь выгоду из способности деревьев легко работать с категориальными переменными, нам нужно преобразовать их в числовые коэффициенты. Но сначала нам нужно получить все возможные уровни. Мы будем использовать преобразования set в RDD, проанализированном в формате CSV.

from pyspark.mllib.regression import LabeledPoint
from numpy import array

csv_data = raw_data.map(lambda x: x.split(","))
test_csv_data = test_raw_data.map(lambda x: x.split(","))

protocols = csv_data.map(lambda x: x[1]).distinct().collect()
services = csv_data.map(lambda x: x[2]).distinct().collect()
flags = csv_data.map(lambda x: x[3]).distinct().collect()

И теперь мы можем использовать эти списки Python в нашей функции create_labeled_point . Если уровень фактора отсутствует в данных обучения, мы назначаем особый уровень. Помните, что мы не можем использовать данные тестирования для обучения нашей модели, даже уровни факторов. Данные тестирования представляют собой неизвестное нам в реальном случае.

def create_labeled_point(line_split):
    # leave_out = [41]
    clean_line_split = line_split[0:41]
    
    # convert protocol to numeric categorical variable
    try: 
        clean_line_split[1] = protocols.index(clean_line_split[1])
    except:
        clean_line_split[1] = len(protocols)
        
    # convert service to numeric categorical variable
    try:
        clean_line_split[2] = services.index(clean_line_split[2])
    except:
        clean_line_split[2] = len(services)
    
    # convert flag to numeric categorical variable
    try:
        clean_line_split[3] = flags.index(clean_line_split[3])
    except:
        clean_line_split[3] = len(flags)
    
    # convert label to binary label
    attack = 1.0
    if line_split[41]=='normal.':
        attack = 0.0
        
    return LabeledPoint(attack, array([float(x) for x in clean_line_split]))

training_data = csv_data.map(create_labeled_point)
test_data = test_csv_data.map(create_labeled_point)

Обучение классификатора

Теперь мы готовы обучить наше классификационное дерево. Мы сохраним значение maxDepth небольшим. Это приведет к меньшей точности, но мы получим меньше расщеплений, чтобы позже мы могли лучше интерпретировать дерево. В производственной системе мы постараемся увеличить это значение, чтобы найти лучшую точность.

from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from time import time

# Build the model
t0 = time()
tree_model = DecisionTree.trainClassifier(training_data, numClasses=2, 
                                          categoricalFeaturesInfo={1: len(protocols), 2: len(services), 3: len(flags)},
                                          impurity='gini', maxDepth=4, maxBins=100)
tt = time() - t0

print "Classifier trained in {} seconds".format(round(tt,3))
Classifier trained in 439.971 seconds

Оценка модели

Чтобы измерить ошибку классификации в наших тестовых данных, мы используем map в test_data RDD и модель для прогнозирования каждого класса тестовых точек.

predictions = tree_model.predict(test_data.map(lambda p: p.features))
labels_and_preds = test_data.map(lambda p: p.label).zip(predictions)

Результаты классификации возвращаются в pars с фактической меткой теста и прогнозируемой меткой. Это используется для вычисления ошибки классификации с помощью filter и count следующим образом.

t0 = time()
test_accuracy = labels_and_preds.filter(lambda (v, p): v == p).count() / float(test_data.count())
tt = time() - t0

print "Prediction made in {} seconds. Test accuracy is {}".format(round(tt,3), round(test_accuracy,4))
Prediction made in 38.651 seconds. Test accuracy is 0.9155

ПРИМЕЧАНИЕ: преобразование zip не работает должным образом с PySpark 1.2.1. Это происходит в 1.3

Интерпретация модели

Понимание наших расщеплений дерева-отличное упражнение для того, чтобы объяснить наши классификационные метки с точки зрения предикторов и значений, которые они принимают. Используя метод toDebugString в нашей трехмодельной модели, мы можем получить много информации о разделениях, узлах и т. Д.

print "Learned classification tree model:"
print tree_model.toDebugString()

Изученная модель дерева классификации:

DecisionTreeModel classifier of depth 4 with 27 nodes
  If (feature 22 <= 89.0)
   If (feature 3 in {2.0,3.0,4.0,7.0,9.0,10.0})
    If (feature 36 <= 0.43)
     If (feature 28 <= 0.19)
      Predict: 1.0
     Else (feature 28 > 0.19)
      Predict: 0.0
    Else (feature 36 > 0.43)
     If (feature 2 in {0.0,3.0,15.0,26.0,27.0,36.0,42.0,58.0,67.0})
      Predict: 0.0
     Else (feature 2 not in {0.0,3.0,15.0,26.0,27.0,36.0,42.0,58.0,67.0})
      Predict: 1.0
   Else (feature 3 not in {2.0,3.0,4.0,7.0,9.0,10.0})
    If (feature 2 in {50.0,51.0})
     Predict: 0.0
    Else (feature 2 not in {50.0,51.0})
     If (feature 32 <= 168.0)
      Predict: 1.0
     Else (feature 32 > 168.0)
      Predict: 0.0
  Else (feature 22 > 89.0)
   If (feature 5 <= 0.0)
    If (feature 11 <= 0.0)
     If (feature 31 <= 253.0)
      Predict: 1.0
     Else (feature 31 > 253.0)
      Predict: 1.0
    Else (feature 11 > 0.0)
     If (feature 2 in {12.0})
      Predict: 0.0
     Else (feature 2 not in {12.0})
      Predict: 1.0
   Else (feature 5 > 0.0)
    If (feature 29 <= 0.08)
     If (feature 2 in {3.0,4.0,26.0,36.0,42.0,58.0,68.0})
      Predict: 0.0
     Else (feature 2 not in {3.0,4.0,26.0,36.0,42.0,58.0,68.0})
      Predict: 1.0
    Else (feature 29 > 0.08)
     Predict: 1.0

Например, сетевое взаимодействие со следующими функциями (см. Описание здесь ) будет классифицироваться нашей моделью как атака:

  • count , количество подключений к тому же хосту, что и текущее соединение за последние две секунды, превышает 32.
  • dst_bytes , количество байтов данных от места назначения до источника, равно 0.
  • сервис не является ни уровнем 0, ни 52.
  • logged_in имеет значение false.

Из нашего списка услуг мы знаем, что:

print "Service 0 is {}".format(services[0])
print "Service 52 is {}".format(services[52])
Service 0 is urp_i  
Service 52 is tftp_u  

Таким образом, мы можем охарактеризовать сетевые взаимодействия с более чем 32 подключениями к одному и тому же серверу за последние 2 секунды, передавая нулевые байты из пункта назначения в источник , где служба не является ни urp_i , ни tftp_u и не вошла в систему, как сетевые атаки. Аналогичный подход может быть использован для каждого терминального узла дерева.

Мы видим, что count – это первый узел, разделенный в дереве. Помните, что каждый раздел выбирается жадно, выбирая лучшее разделение из набора возможных разбиений, чтобы максимизировать прирост информации в узле дерева (см. Подробнее здесь ). На втором уровне мы находим переменные flag (нормальное или ошибочное состояние соединения) и dst_bytes (количество байтов данных от места назначения до источника) и так далее.

Эта объясняющая способность дерева классификации (или регрессии) является одним из его основных преимуществ. Понимание данных является ключевым фактором для построения более совершенных моделей.

Построение минимальной модели С использованием Трех основных расщеплений

Итак, теперь, когда мы знаем основные функции, предсказывающие сетевую атаку, благодаря нашим разделениям дерева классификации, давайте использовать их для построения минимального дерева классификации только с тремя основными переменными: count , dst_bytes и flag .

Нам нужно определить соответствующую функцию для создания помеченных точек.

def create_labeled_point_minimal(line_split):
    # leave_out = [41]
    clean_line_split = line_split[3:4] + line_split[5:6] + line_split[22:23]
    
    # convert flag to numeric categorical variable
    try:
        clean_line_split[0] = flags.index(clean_line_split[0])
    except:
        clean_line_split[0] = len(flags)
    
    # convert label to binary label
    attack = 1.0
    if line_split[41]=='normal.':
        attack = 0.0
        
    return LabeledPoint(attack, array([float(x) for x in clean_line_split]))

training_data_minimal = csv_data.map(create_labeled_point_minimal)
test_data_minimal = test_csv_data.map(create_labeled_point_minimal)

Которые мы используем для обучения модели.

# Build the model
t0 = time()
tree_model_minimal = DecisionTree.trainClassifier(
    training_data_minimal, numClasses=2, 
    categoricalFeaturesInfo={0: len(flags)},
    impurity='gini', maxDepth=3, maxBins=32)
tt = time() - t0

print "Classifier trained in {} seconds".format(round(tt,3))
Classifier trained in 226.519 seconds

Теперь мы можем предсказать по данным тестирования и рассчитать точность.

predictions_minimal = tree_model_minimal.predict(test_data_minimal.map(lambda p: p.features))
labels_and_preds_minimal = test_data_minimal.map(lambda p: p.label).zip(predictions_minimal)


t0 = time()
test_accuracy = labels_and_preds_minimal.filter(lambda (v, p): v == p).count() / float(test_data_minimal.count())
tt = time() - t0

print "Prediction made in {} seconds. Test accuracy is {}".format(round(tt,3), round(test_accuracy,4))
Prediction made in 23.202 seconds. Test accuracy is 0.909

Таким образом, мы обучили дерево классификации только с тремя наиболее важными предикторами в половине случаев и с не такой уж плохой точностью. На самом деле, дерево классификации-это очень хороший инструмент выбора модели!