Рубрики
Без рубрики

Spark & Python: Работа с RDDs (I)

В этом руководстве представлены два различных способа получения данных в базовую структуру данных Spark, RDD.

Автор оригинала: Jose A Dianes.

Инструкции

Моя серия учебных пособий Spark & Python может быть рассмотрена индивидуально, хотя существует более или менее линейная “история”, если следовать ей последовательно. Используя один и тот же набор данных, они пытаются решить с ним связанный набор задач.

Это не единственный, но хороший способ следовать этим учебникам Spark-сначала клонировать репо GitHub , а затем запустить свой собственный IPython notebook в режиме PySpark . Например, если у нас есть автономная установка Spark, запущенная в нашем localhost с максимальным объемом 6 ГБ на узел, назначенный IPython:

MASTER="spark://127.0.0.1:7077" SPARK_EXECUTOR_MEMORY="6G" IPYTHON_OPTS="notebook --pylab inline" ~/spark-1.3.1-bin-hadoop2.6/bin/pyspark

Обратите внимание, что путь к команде pyspark будет зависеть от конкретной установки. Поэтому в качестве требования вам необходимо установить Spark на той же машине, на которой вы собираетесь запустить IPython notebook сервер.

Дополнительные параметры Spark см. в разделе здесь . В общем случае работает правило передачи параметров, описанное в форме spark.executor.memory as SPARK_EXECUTOR_MEMORY при вызове IPython/PySpark.

Наборы данных

Мы будем использовать наборы данных из KDD Cup 1999 .

Рекомендации

Справочником по этим и другим темам, связанным с Spark, является Learning Spark Холдена Карау, Энди Конвински, Патрика Уэнделла и Матея Захарии.

Набор данных соревнований KDD Cup 1999 подробно описан здесь .

Создание RDD

В этом разделе мы представим два различных способа получения данных в базовую структуру данных Spark – Устойчивый распределенный набор данных или RDD . RDD – это распределенная коллекция элементов. Вся работа в Spark выражается либо в создании новых RDDS, преобразовании существующих RDDs, либо в вызове действий над RDDs для вычисления результата.Spark автоматически распределяет данные, содержащиеся в RDDS, по всему кластеру и распараллеливает операции, которые вы выполняете с ними.

Получение файлов данных

В этом блокноте мы будем использовать сокращенный набор данных (10 процентов), предоставленный для Кубка KDD 1999, содержащий почти полмиллиона сетевых взаимодействий. Файл предоставляется в виде файла Gzip , который мы загрузим локально.

import urllib
f = urllib.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz")

Создание RDD из файла

Наиболее распространенный способ создания RDD-загрузить его из файла. Обратите внимание, что текстовый файл Spark может напрямую обрабатывать сжатые файлы.

data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)

Теперь у нас есть файл данных, загруженный в raw_data RDD.

Не вдаваясь в Spark преобразования и действия , самое основное, что мы можем сделать, чтобы проверить, правильно ли мы получили содержимое RDD, – это подсчитать() количество строк, загруженных из файла в RDD.

raw_data.count()
494021

Мы также можем проверить первые несколько записей в наших данных.

raw_data.take(5)
[u'0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,
9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.',
u'0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,
19,19,1.00,0.00,0.05,0.00,0.00,0.00,0.00,0.00,normal.', u'0,tcp,http,SF,235,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,
0.00,29,29,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.', 
u'0,tcp,http,SF,219,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,
0.00,0.00,39,39,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.', 
u'0,tcp,http,SF,217,2032,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,
0.00,0.00,49,49,1.00,0.00,0.02,0.00,0.00,0.00,0.00,0.00,normal.']  

В дальнейших разделах мы будем использовать эти необработанные данные, чтобы узнать о различных преобразованиях и действиях Spark. Не беспокойтесь слишком много о значении этих элементов. Мы подробно рассмотрим их в дальнейших разделах и учебных пособиях.

Создание и RDD с использованием распараллеливания

Другой способ создания RDD-это распараллеливание уже существующего списка.

a = range(100)
    
data = sc.parallelize(a)

Как и раньше, мы можем count() количество элементов в RDD.

data.count()
100

Как и прежде, мы можем получить доступ к первым нескольким элементам в нашем RDD.

data.take(5)
[0, 1, 2, 3, 4]

Основные операции RDD

В этом разделе будут представлены три основные, но важные операции Spark. Два из них-это преобразования |/карта и фильтр . Другой-это действие |/сбор . В то же время мы введем понятие постоянства в Spark.

Получение данных и создание RDD

Как и в нашем первом разделе, мы снова будем использовать сокращенный набор данных (10 процентов), предоставленный для KDD Cup 1999, содержащий почти полмиллиона сетевых взаимодействий. Файл предоставляется в виде файла Gzip, который мы загрузим локально.

import urllib
f = urllib.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz")

data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)

Преобразование фильтра

Это преобразование может быть применено к RDDS, чтобы сохранить только элементы, удовлетворяющие определенному условию. Более конкретно, функция вычисляется для каждого элемента в исходном RDD. Новый результирующий RDD будет содержать только те элементы, которые заставляют функцию возвращать True .

Например, представьте, что мы хотим подсчитать, сколько нормально. взаимодействия, которые мы имеем в нашем наборе данных. Мы можем отфильтровать наши raw_data RDD следующим образом.

normal_raw_data = raw_data.filter(lambda x: 'normal.' in x)

Теперь мы можем подсчитать, сколько элементов у нас есть в новом RDD.

  from time import time
  t0 = time()
  normal_count = normal_raw_data.count()
  tt = time() - t0
  print "There are {} 'normal' interactions".format(normal_count)
  print "Count completed in {} seconds".format(round(tt,3))

There are 97278 'normal' interactions  
Count completed in 5.951 seconds

Помните из первого раздела, что у нас есть в общей сложности 494021 образец в нашем 10-процентном наборе данных. Здесь мы видим, что 97278 из них содержат слово normal tag.

Обратите также внимание, что мы измерили прошедшее время для подсчета элементов в RDD. Мы сделали это, потому что хотели указать, что фактические (распределенные) вычисления в Spark происходят, когда мы выполняем действия , а не преобразования . В этом случае count – это действие, которое мы выполняем на RDD. Мы можем применить столько преобразований, сколько захотим, к нашему RDD, и никакие вычисления не будут выполняться до тех пор, пока мы не вызовем первое действие, которое в данном случае занимает несколько секунд.

Преобразование карты

Используя преобразование map в Spark, мы можем применить функцию к каждому элементу в нашем RDD. Лямбды Python особенно выразительны для этого.

В этом случае мы хотим прочитать наш файл данных в формате CSV. Мы можем сделать это, применив лямбда-функцию к каждому элементу в RDD следующим образом.

  from pprint import pprint
  csv_data = raw_data.map(lambda x: x.split(","))
  t0 = time()
  head_rows = csv_data.take(5)
  tt = time() - t0
  print "Parse completed in {} seconds".format(round(tt,3))
  pprint(head_rows[0])

Parse completed in 1.715 seconds  
    [u'0',
     u'tcp',
     u'http',
     u'SF',
     u'181',
     u'5450',
     u'0',
     u'0',
     u'0',
     u'0',
     u'0',
     u'1',
     u'0',
     u'0',
     u'0',
     u'0',
     u'0',
     u'0',
     u'0',
     u'0',
     u'0',
     u'0',
     u'8',
     u'8',
     u'0.00',
     u'0.00',
     u'0.00',
     u'0.00',
     u'1.00',
     u'0.00',
     u'0.00',
     u'9',
     u'9',
     u'1.00',
     u'0.00',
     u'0.11',
     u'0.00',
     u'0.00',
     u'0.00',
     u'0.00',
     u'0.00',
     u'normal.']   

Опять же, все действия происходят, как только мы вызываем первую искру action (т. Е. take в данном случае). Что, если мы возьмем много элементов, а не только первые несколько из них?

    t0 = time()
    head_rows = csv_data.take(100000)
    tt = time() - t0
    print "Parse completed in {} seconds".format(round(tt,3))

Parse completed in 8.629 seconds

Мы видим, что это занимает больше времени. Функция map теперь применяется распределенным образом ко многим элементам в RDD, что увеличивает время выполнения.

Использование карты с предопределенными функциями

Конечно, мы можем использовать предопределенные функции с map , а не только lambda . Представьте, что мы хотим иметь каждый элемент в RDD в виде пары ключ-значение, где ключ-это тег (например, normal ), а значение-это весь список элементов, представляющих строку в файле формата CSV. Мы могли бы действовать следующим образом.

def parse_interaction(line):
    elems = line.split(",")
    tag = elems[41]
    return (tag, elems)

key_csv_data = raw_data.map(parse_interaction)
head_rows = key_csv_data.take(5)
pprint(head_rows[0])
(u'normal.',
     [u'0',
      u'tcp',
      u'http',
      u'SF',
      u'181',
      u'5450',
      u'0',
      u'0',
      u'0',
      u'0',
      u'0',
      u'1',
      u'0',
      u'0',
      u'0',
      u'0',
      u'0',
      u'0',
      u'0',
      u'0',
      u'0',
      u'0',
      u'8',
      u'8',
      u'0.00',
      u'0.00',
      u'0.00',
      u'0.00',
      u'1.00',
      u'0.00',
      u'0.00',
      u'9',
      u'9',
      u'1.00',
      u'0.00',
      u'0.11',
      u'0.00',
      u'0.00',
      u'0.00',
      u'0.00',
      u'0.00',
      u'normal.'])

Это было легко, не так ли?

В разделе о работе с парами ключ-значение мы будем использовать этот тип RDDS для выполнения агрегирования данных (например, подсчет по ключу).

Действие по сбору

До сих пор мы использовали действия count и take . Еще одно основное действие, которому мы должны научиться, – это собирать . В основном это приведет к тому, что все элементы в RDD будут помещены в память, чтобы мы могли работать с ними. По этой причине его следует использовать с осторожностью, особенно при работе с большими RDDS.

Пример использования наших необработанных данных.

t0 = time()
all_raw_data = raw_data.collect()
tt = time() - t0
print "Data collected in {} seconds".format(round(tt,3))
Data collected in 17.927 seconds

Это заняло больше времени, чем любое другое действие, которое мы использовали раньше, конечно. Каждый рабочий узел Spark, у которого есть фрагмент RDD, должен быть скоординирован, чтобы получить его часть, а затем уменьшить все вместе.

В качестве последнего примера, объединяющего все предыдущие, мы хотим собрать все взаимодействия normal в виде пар ключ-значение.

# get data from file
data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)

# parse into key-value pairs
key_csv_data = raw_data.map(parse_interaction)

# filter normal key interactions
normal_key_interactions = key_csv_data.filter(lambda x: x[0] == "normal.")

# collect all
t0 = time()
all_normal = normal_key_interactions.collect()
tt = time() - t0
normal_count = len(all_normal)
print "Data collected in {} seconds".format(round(tt,3))
print "There are {} 'normal' interactions".format(normal_count)
Data collected in 12.485 seconds
There are 97278 normal interactions

Это число совпадает с предыдущим числом для нормальных взаимодействий. Новая процедура занимает больше времени. Это связано с тем, что мы извлекаем все данные с помощью collect , а затем используем len Python в результирующем списке. До этого мы просто подсчитывали общее количество элементов в RDD с помощью count .

Выборка Rds

До сих пор мы вводили создание RDD вместе с некоторыми базовыми преобразованиями, такими как map и filter , и некоторыми действиями, такими как count , take и collect .

В этом разделе будет показано, как выполнить выборку RDDS. Что касается преобразований, то будет введена выборка , поскольку она будет полезна во многих сценариях статистического обучения. Затем мы сравним результаты с действием take Sample .

Получение данных и создание RDD

В этом случае мы будем использовать полный набор данных , предоставленный для KDD Cup 1999, содержащий почти полмиллиона сетевых взаимодействий. Файл предоставляется в виде файла Gzip, который мы загрузим локально.

import urllib
f = urllib.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data.gz", "kddcup.data.gz")

Теперь мы можем использовать этот файл для создания нашего RDD.

data_file = "./kddcup.data.gz"
raw_data = sc.textFile(data_file)

Выборка Rds

В Spark есть две операции выборки: преобразование образец и действие взять образец . Используя преобразование, мы можем сказать Spark, чтобы применить последовательное преобразование к образцу данного RDD. Используя действие, мы извлекаем данный образец и можем поместить его в локальную память для использования в любой другой стандартной библиотеке (например, Scikit-learn).

Преобразование образца

Преобразование sample принимает до трех параметров. Во-первых, выполняется ли выборка с заменой или нет. Во-вторых, размер выборки в виде дроби. Наконец, мы можем дополнительно предоставить случайное семя .

raw_data_sample = raw_data.sample(False, 0.1, 1234)
sample_size = raw_data_sample.count()
total_size = raw_data.count()
print "Sample size is {} of {}".format(sample_size, total_size)
Sample size is 489957 of 4898431

Но сила выборки как преобразования заключается в том, что она выполняется как часть последовательности дополнительных преобразований. Это станет более мощным, как только мы начнем выполнять агрегации и операции с парами ключ-значение, и будет особенно полезно при использовании библиотеки машинного обучения Spark MLlib.

А пока представьте, что мы хотим получить приблизительное соотношение normal. взаимодействия в нашем наборе данных. Мы могли бы сделать это, подсчитав общее количество тегов, как мы делали в предыдущих блокнотах. Однако мы хотим получить более быстрый ответ, и нам нужен не точный ответ, а просто приближение. Мы можем сделать это следующим образом.

from time import time

# transformations to be applied
raw_data_sample_items = raw_data_sample.map(lambda x: x.split(","))
sample_normal_tags = raw_data_sample_items.filter(lambda x: "normal." in x)

# actions + time
t0 = time()
sample_normal_tags_count = sample_normal_tags.count()
tt = time() - t0

sample_normal_ratio = sample_normal_tags_count / float(sample_size)
print "The ratio of 'normal' interactions is {}".format(round(sample_normal_ratio,3)) 
print "Count done in {} seconds".format(round(tt,3))
The ratio of 'normal' interactions is 0.199  

Count done in 44.523 seconds

Давайте сравним это с вычислением соотношения без выборки.

# transformations to be applied
raw_data_items = raw_data.map(lambda x: x.split(","))
normal_tags = raw_data_items.filter(lambda x: "normal." in x)

# actions + time
t0 = time()
normal_tags_count = normal_tags.count()
tt = time() - t0

normal_ratio = normal_tags_count / float(total_size)
print "The ratio of 'normal' interactions is {}".format(round(normal_ratio,3)) 
print "Count done in {} seconds".format(round(tt,3))
The ratio of 'normal' interactions is 0.199  
Count done in 91.09 seconds  

Со временем мы снова сможем видеть. Чем больше преобразований мы применяем после выборки, тем больше этот выигрыш. Это связано с тем, что без выборки все преобразования применяются к полному набору данных.

Действие взять Образец

Если нам нужно захватить образец необработанных данных из нашего RDD в локальную память для использования другими библиотеками, не являющимися Spark, можно использовать takeSample .

Синтаксис очень похож, но в этом случае мы указываем количество элементов вместо размера выборки как часть полного размера данных.

t0 = time()
raw_data_sample = raw_data.takeSample(False, 400000, 1234)
normal_data_sample = [x.split(",") for x in raw_data_sample if "normal." in x]
tt = time() - t0

normal_sample_size = len(normal_data_sample)

normal_ratio = normal_sample_size / 400000.0
print "The ratio of 'normal' interactions is {}".format(normal_ratio)
print "Count done in {} seconds".format(round(tt,3))
The ratio of 'normal' interactions is 0.1988025  
Count done in 76.166 seconds  

Процесс был очень похож на предыдущий. Мы получили выборку примерно из 10 процентов данных, а затем отфильтровали и разделили.

Однако это заняло больше времени, даже с немного меньшей выборкой. Причина в том, что Spark просто распределил выполнение процесса выборки. Фильтрация и разделение результатов выполнялись локально в одном узле.

Установка операций на RDDS

Spark поддерживает многие операции, которые мы имеем в математических наборах, таких как объединение и пересечение, даже если сами красные не являются правильными наборами. Важно отметить, что эти операции требуют, чтобы RDDS, с которыми работают, были одного типа.

Операции набора довольно просты для понимания, поскольку они работают так, как ожидалось. Единственное соображение исходит из того факта, что RDDS не являются реальными наборами, и поэтому такие операции, как объединение RDDs, не удаляют дубликаты. В этой тетради мы кратко рассмотрим вычитание , различение и декартово .

Получение данных и создание RDD

Как и в нашем первом разделе, мы снова будем использовать сокращенный набор данных (10 процентов), предоставленный для KDD Cup 1999, содержащий почти полмиллиона сетевых взаимодействий. Файл предоставляется в виде файла Gzip, который мы загрузим локально.

import urllib
f = urllib.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz")

data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)

Получение Атакующих Взаимодействий С Помощью вычитания

Для иллюстрации представьте, что у нас уже есть RDD с ненормальными (нормальными) взаимодействиями из некоторого предыдущего анализа.

normal_raw_data = raw_data.filter(lambda x: "normal." in x)

Мы можем получить взаимодействия атаки, вычитая нормальные взаимодействия из исходного нефильтрованного RDD следующим образом.

attack_raw_data = raw_data.subtract(normal_raw_data)

Давайте проведем некоторые подсчеты, чтобы проверить наши результаты.

from time import time

# count all
t0 = time()
raw_data_count = raw_data.count()
tt = time() - t0
print "All count in {} secs".format(round(tt,3))
All count in 5.261 secs
# count normal
t0 = time()
normal_raw_data_count = normal_raw_data.count()
tt = time() - t0
print "Normal count in {} secs".format(round(tt,3))

Нормальный отсчет в 5,571 секунды

# count attacks
t0 = time()
attack_raw_data_count = attack_raw_data.count()
tt = time() - t0
print "Attack count in {} secs".format(round(tt,3))

Количество атак за 12,075 секунды

print "There are {} normal interactions and {} attacks, \
from a total of {} interactions".format(normal_raw_data_count,attack_raw_data_count,raw_data_count)
There are 97278 normal interactions and 396743 attacks, from a total of 494021 interactions

Итак, теперь у нас есть два RDDS, один с нормальным взаимодействием, а другой с атаками.

Комбинации протоколов и служб С использованием декартовой

Мы можем вычислить декартово произведение между двумя RDDS, используя декартово преобразование. Он возвращает все возможные пары элементов между двумя RDDS. В нашем случае мы будем использовать его для генерации всех возможных комбинаций между сервисом и протоколом в наших сетевых взаимодействиях.

Прежде всего, нам нужно изолировать каждую коллекцию значений в двух отдельных RDDS. Для этого мы будем использовать distinct в наборе данных, проанализированном CSV. Из описания набора данных мы знаем, что протокол-это второй столбец, а служба-третий (тег-последний, а не первый, как показано на странице).

Итак, сначала давайте разберемся с протоколами.

csv_data = raw_data.map(lambda x: x.split(","))
protocols = csv_data.map(lambda x: x[1]).distinct()
protocols.collect()

[u’udp’, u’icmp’, u’tcp’]

Теперь мы делаем то же самое для услуг.

services = csv_data.map(lambda x: x[2]).distinct()
services.collect()
[u'domain',
u'http_443',
u'Z39_50',
u'smtp',
u'urp_i',
u'private',
u'echo',
u'shell',
u'red_i',
u'eco_i',
u'sunrpc',
u'ftp_data',
u'urh_i',
u'pm_dump',
u'pop_3',
u'pop_2',
u'systat',
u'ftp',
u'uucp',
u'whois',
u'netbios_dgm',
u'efs',
u'remote_job',
u'daytime',
u'ntp_u',
u'finger',
u'ldap',
u'netbios_ns',
u'kshell',
u'iso_tsap',
u'ecr_i',
u'nntp',
u'printer',
u'domain_u',
u'uucp_path',
u'courier',
u'exec',
u'time',
u'netstat',
u'telnet',
u'gopher',
u'rje',
u'sql_net',
u'link',
u'auth',
u'netbios_ssn',
u'csnet_ns',
u'X11',
u'IRC',
u'tftp_u',
u'login',
u'supdup',
u'name',
u'nnsp',
u'mtp',
u'http',
u'bgp',
u'ctf',
u'hostnames',
u'klogin',
u'vmnet',
u'tim_i',
u'discard',
u'imap4',
u'other',
u'ssh']

В данном случае это более длинный список.

Теперь мы можем сделать декартово произведение.

product = protocols.cartesian(services).collect()
print "There are {} combinations of protocol X service".format(len(product))
There are 198 combinations of protocol X service

Очевидно, что для таких небольших RDDS на самом деле не имеет смысла использовать декартово произведение. Мы могли бы прекрасно собрать значения после использования distinct и сделать декартово произведение локально. Кроме того, distinct и декартовы являются дорогостоящими операциями, поэтому их следует использовать с осторожностью, когда операционные наборы данных велики.