Рубрики
Без рубрики

Spark & Python: SQL и фреймы данных

Этот учебник познакомит вас с возможностями Spark. Используя язык SQL и фреймы данных, вы можете легко выполнять исследовательский анализ данных.

Автор оригинала: Jose A Dianes.

Инструкции

Моя серия учебных пособий Spark & Python может быть рассмотрена индивидуально, хотя существует более или менее линейная “история”, если следовать ей последовательно. Используя один и тот же набор данных, они пытаются решить с ним связанный набор задач.

Это не единственный, но хороший способ следовать этим учебникам Spark-сначала клонировать репо GitHub , а затем запустить свой собственный IPython notebook в режиме PySpark . Например, если у нас есть автономная установка Spark, запущенная в нашем localhost с максимальным объемом 6 ГБ на узел, назначенный IPython:

MASTER="spark://127.0.0.1:7077" SPARK_EXECUTOR_MEMORY="6G" IPYTHON_OPTS="notebook --pylab inline" ~/spark-1.3.1-bin-hadoop2.6/bin/pyspark

Обратите внимание, что путь к команде pyspark будет зависеть от конкретной установки. Поэтому в качестве требования вам необходимо установить Spark на той же машине, на которой вы собираетесь запустить IPython notebook сервер.

Дополнительные параметры Spark см. в разделе здесь . В общем случае работает правило передачи параметров, описанное в форме spark.executor.memory as SPARK_EXECUTOR_MEMORY при вызове IPython/PySpark.

Наборы данных

Мы будем использовать наборы данных из KDD Cup 1999 .

Рекомендации

Справочником по этим и другим темам, связанным с Spark, является Learning Spark Холдена Карау, Энди Конвински, Патрика Уэнделла и Матея Захарии.

Набор данных соревнований KDD Cup 1999 подробно описан здесь .

Вступление

В этом учебном пособии будут представлены возможности Spark для работы с данными структурированным способом. В принципе, все вращается вокруг концепции фрейма данных и использования языка SQL для их запроса. Мы увидим, как абстракция фрейма данных, очень популярная в других экосистемах анализа данных (например, R и Python/Pandas), очень эффективна при выполнении исследовательского анализа данных. На самом деле, очень легко выражать запросы данных при использовании вместе с языком SQL. Кроме того, Spark прозрачно распределяет эту структуру данных на основе столбцов, чтобы сделать процесс запроса максимально эффективным.

Получение данных и создание RDD

Как и в предыдущих блокнотах, мы будем использовать сокращенный набор данных (10 процентов), предоставленный для KDD Cup 1999 , содержащий почти полмиллиона сетевых взаимодействий. Файл предоставляется в виде файла Gzip, который мы загрузим локально.

import urllib
f = urllib.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz")


data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file).cache()

Получение фрейма данных

Spark DataFrame – это распределенная коллекция данных, организованная в именованные столбцы. Концептуально он эквивалентен таблице в реляционной базе данных или фрейму данных в R или Pandas. Они могут быть построены из широкого спектра источников, таких как существующий RDD в нашем случае.

Точкой входа во все функции SQL в Spark является класс SQLContext . Чтобы создать базовый экземпляр, все, что нам нужно, – это ссылка SparkContext . Поскольку мы запускаем Spark в режиме оболочки (используя PySpark), мы можем использовать для этой цели глобальный контекстный объект sc .

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

Вывод схемы

С помощью SQLContext мы готовы создать DataFrame из нашего существующего RDD. Но сначала нам нужно сообщить Spark SQL схему в наших данных.

Spark SQL может преобразовать RDD объектов Row в DataFrame . Строки строятся путем передачи списка пар ключ/значение в виде kwargs классу Row . Ключи определяют имена столбцов, а типы выводятся путем просмотра первой строки. Поэтому важно, чтобы в первой строке RDD не было отсутствующих данных, чтобы правильно вывести схему.

В нашем случае нам сначала нужно разделить данные, разделенные запятыми, а затем использовать информацию в описании задачи KDD 1999 для получения имен столбцов /.

from pyspark.sql import Row

csv_data = raw_data.map(lambda l: l.split(","))
row_data = csv_data.map(lambda p: Row(
    duration=int(p[0]), 
    protocol_type=p[1],
    service=p[2],
    flag=p[3],
    src_bytes=int(p[4]),
    dst_bytes=int(p[5])
    )
)

Как только у нас будет наш RDD Row , мы сможем вывести и зарегистрировать схему.

interactions_df = sqlContext.createDataFrame(row_data)
interactions_df.registerTempTable("interactions")

Теперь мы можем выполнять SQL-запросы над нашим фреймом данных, который был зарегистрирован как таблица.

# Select tcp network interactions with more than 1 second duration and no transfer from destination
tcp_interactions = sqlContext.sql("""
    SELECT duration, dst_bytes FROM interactions WHERE protocol_type = 'tcp' AND duration > 1000 AND dst_bytes = 0
""")
tcp_interactions.show()
duration dst_bytes
    5057     0        
    5059     0        
    5051     0        
    5056     0        
    5051     0        
    5039     0        
    5062     0        
    5041     0        
    5056     0        
    5064     0        
    5043     0        
    5061     0        
    5049     0        
    5061     0        
    5048     0        
    5047     0        
    5044     0        
    5063     0        
    5068     0        
    5062     0        

Результаты SQL-запросов являются RDDS и поддерживают все обычные операции RDD.

# Output duration together with dst_bytes
tcp_interactions_out = tcp_interactions.map(lambda p: "Duration: {}, Dest. bytes: {}".format(p.duration, p.dst_bytes))
for ti_out in tcp_interactions_out.collect():
  print ti_out
    Duration: 5057, Dest. bytes: 0  
    Duration: 5059, Dest. bytes: 0  
    Duration: 5051, Dest. bytes: 0  
    Duration: 5056, Dest. bytes: 0  
    Duration: 5051, Dest. bytes: 0  
    Duration: 5039, Dest. bytes: 0  
    Duration: 5062, Dest. bytes: 0  
    Duration: 5041, Dest. bytes: 0  
    Duration: 5056, Dest. bytes: 0  
    Duration: 5064, Dest. bytes: 0  
    Duration: 5043, Dest. bytes: 0  
    Duration: 5061, Dest. bytes: 0  
    Duration: 5049, Dest. bytes: 0  
    Duration: 5061, Dest. bytes: 0  
    Duration: 5048, Dest. bytes: 0  
    Duration: 5047, Dest. bytes: 0  
    Duration: 5044, Dest. bytes: 0  
    Duration: 5063, Dest. bytes: 0  
    Duration: 5068, Dest. bytes: 0  
    Duration: 5062, Dest. bytes: 0  
    Duration: 5046, Dest. bytes: 0  
    Duration: 5052, Dest. bytes: 0  
    Duration: 5044, Dest. bytes: 0  
    Duration: 5054, Dest. bytes: 0  
    Duration: 5039, Dest. bytes: 0  
    Duration: 5058, Dest. bytes: 0  
    Duration: 5051, Dest. bytes: 0  
    Duration: 5032, Dest. bytes: 0  
    Duration: 5063, Dest. bytes: 0  
    Duration: 5040, Dest. bytes: 0  
    Duration: 5051, Dest. bytes: 0  
    Duration: 5066, Dest. bytes: 0  
    Duration: 5044, Dest. bytes: 0  
    Duration: 5051, Dest. bytes: 0  
    Duration: 5036, Dest. bytes: 0  
    Duration: 5055, Dest. bytes: 0  
    Duration: 2426, Dest. bytes: 0  
    Duration: 5047, Dest. bytes: 0  
    Duration: 5057, Dest. bytes: 0  
    Duration: 5037, Dest. bytes: 0  
    Duration: 5057, Dest. bytes: 0  
    Duration: 5062, Dest. bytes: 0  
    Duration: 5051, Dest. bytes: 0  
    Duration: 5051, Dest. bytes: 0  
    Duration: 5053, Dest. bytes: 0  
    Duration: 5064, Dest. bytes: 0  
    Duration: 5044, Dest. bytes: 0  
    Duration: 5051, Dest. bytes: 0  
    Duration: 5033, Dest. bytes: 0  
    Duration: 5066, Dest. bytes: 0  
    Duration: 5063, Dest. bytes: 0  
    Duration: 5056, Dest. bytes: 0  
    Duration: 5042, Dest. bytes: 0  
    Duration: 5063, Dest. bytes: 0  
    Duration: 5060, Dest. bytes: 0  
    Duration: 5056, Dest. bytes: 0  
    Duration: 5049, Dest. bytes: 0  
    Duration: 5043, Dest. bytes: 0  
    Duration: 5039, Dest. bytes: 0  
    Duration: 5041, Dest. bytes: 0  
    Duration: 42448, Dest. bytes: 0  
    Duration: 42088, Dest. bytes: 0  
    Duration: 41065, Dest. bytes: 0  
    Duration: 40929, Dest. bytes: 0  
    Duration: 40806, Dest. bytes: 0  
    Duration: 40682, Dest. bytes: 0  
    Duration: 40571, Dest. bytes: 0  
    Duration: 40448, Dest. bytes: 0  
    Duration: 40339, Dest. bytes: 0  
    Duration: 40232, Dest. bytes: 0  
    Duration: 40121, Dest. bytes: 0  
    Duration: 36783, Dest. bytes: 0  
    Duration: 36674, Dest. bytes: 0  
    Duration: 36570, Dest. bytes: 0  
    Duration: 36467, Dest. bytes: 0  
    Duration: 36323, Dest. bytes: 0  
    Duration: 36204, Dest. bytes: 0  
    Duration: 32038, Dest. bytes: 0  
    Duration: 31925, Dest. bytes: 0  
    Duration: 31809, Dest. bytes: 0  
    Duration: 31709, Dest. bytes: 0  
    Duration: 31601, Dest. bytes: 0  
    Duration: 31501, Dest. bytes: 0  
    Duration: 31401, Dest. bytes: 0  
    Duration: 31301, Dest. bytes: 0  
    Duration: 31194, Dest. bytes: 0  
    Duration: 31061, Dest. bytes: 0  
    Duration: 30935, Dest. bytes: 0  
    Duration: 30835, Dest. bytes: 0  
    Duration: 30735, Dest. bytes: 0  
    Duration: 30619, Dest. bytes: 0  
    Duration: 30518, Dest. bytes: 0  
    Duration: 30418, Dest. bytes: 0  
    Duration: 30317, Dest. bytes: 0  
    Duration: 30217, Dest. bytes: 0  
    Duration: 30077, Dest. bytes: 0  
    Duration: 25420, Dest. bytes: 0  
    Duration: 22921, Dest. bytes: 0  
    Duration: 22821, Dest. bytes: 0  
    Duration: 22721, Dest. bytes: 0  
    Duration: 22616, Dest. bytes: 0  
    Duration: 22516, Dest. bytes: 0  
    Duration: 22416, Dest. bytes: 0  
    Duration: 22316, Dest. bytes: 0  
    Duration: 22216, Dest. bytes: 0  
    Duration: 21987, Dest. bytes: 0  
    Duration: 21887, Dest. bytes: 0  
    Duration: 21767, Dest. bytes: 0  
    Duration: 21661, Dest. bytes: 0  
    Duration: 21561, Dest. bytes: 0  
    Duration: 21455, Dest. bytes: 0  
    Duration: 21334, Dest. bytes: 0  
    Duration: 21223, Dest. bytes: 0  
    Duration: 21123, Dest. bytes: 0  
    Duration: 20983, Dest. bytes: 0  
    Duration: 14682, Dest. bytes: 0  
    Duration: 14420, Dest. bytes: 0  
    Duration: 14319, Dest. bytes: 0  
    Duration: 14198, Dest. bytes: 0  
    Duration: 14098, Dest. bytes: 0  
    Duration: 13998, Dest. bytes: 0  
    Duration: 13898, Dest. bytes: 0  
    Duration: 13796, Dest. bytes: 0  
    Duration: 13678, Dest. bytes: 0  
    Duration: 13578, Dest. bytes: 0  
    Duration: 13448, Dest. bytes: 0  
    Duration: 13348, Dest. bytes: 0  
    Duration: 13241, Dest. bytes: 0  
    Duration: 13141, Dest. bytes: 0  
    Duration: 13033, Dest. bytes: 0  
    Duration: 12933, Dest. bytes: 0  
    Duration: 12833, Dest. bytes: 0  
    Duration: 12733, Dest. bytes: 0  
    Duration: 12001, Dest. bytes: 0  
    Duration: 5678, Dest. bytes: 0  
    Duration: 5010, Dest. bytes: 0  
    Duration: 1298, Dest. bytes: 0  
    Duration: 1031, Dest. bytes: 0  
    Duration: 36438, Dest. bytes: 0  

Мы можем легко взглянуть на нашу схему фрейма данных, используя print Schema .

interactions_df.printSchema()

root  
 |-- dst_bytes: long (nullable = true)  
 |-- duration: long (nullable = true)  
 |-- flag: string (nullable = true)  
 |-- protocol_type: string (nullable = true)  
 |-- service: string (nullable = true)  
 |-- src_bytes: long (nullable = true)  

Запросы как Операции с фреймами данных

Spark DataFrame предоставляет специфичный для домена язык для обработки структурированных данных. Этот язык включает в себя методы, которые мы можем объединить, чтобы сделать выбор, фильтрацию, группировку и т. Д. Например, предположим, что мы хотим подсчитать, сколько взаимодействий существует для каждого типа протокола. Мы можем действовать следующим образом.

from time import time

t0 = time()
interactions_df.select("protocol_type", "duration", "dst_bytes").groupBy("protocol_type").count().show()
tt = time() - t0

print "Query performed in {} seconds".format(round(tt,3))
protocol_type count   
udp           20354   
tcp           190065  
icmp          283602  
Query performed in 20.568 seconds  

Теперь представьте, что мы хотим подсчитать, сколько взаимодействий длится более 1 секунды, без передачи данных из пункта назначения, сгруппированных по типу протокола. Мы можем просто добавить для фильтрации вызовы к предыдущим.

t0 = time()
interactions_df.select("protocol_type", "duration", "dst_bytes").filter(interactions_df.duration>1000).filter(interactions_df.dst_bytes==0).groupBy("protocol_type").count().show()
tt = time() - t0

print "Query performed in {} seconds".format(round(tt,3))
protocol_type count  
tcp           139    
Query performed in 16.641 seconds  

Мы можем использовать это для выполнения некоторого исследовательского анализа данных . Давайте посчитаем, сколько у нас атак и нормальных взаимодействий. Сначала нам нужно добавить столбец label в наши данные.

def get_label_type(label):
    if label!="normal.":
        return "attack"
    else:
        return "normal"
    
row_labeled_data = csv_data.map(lambda p: Row(
    duration=int(p[0]), 
    protocol_type=p[1],
    service=p[2],
    flag=p[3],
    src_bytes=int(p[4]),
    dst_bytes=int(p[5]),
    label=get_label_type(p[41])
    )
)
interactions_labeled_df = sqlContext.createDataFrame(row_labeled_data)

На этот раз нам не нужно регистрировать схему, так как мы собираемся использовать интерфейс OOqueryinterface.

Давайте проверим, что предыдущее действительно работает, подсчитав атаку и нормальные данные в нашем фрейме данных.

    t0 = time()
    interactions_labeled_df.select("label").groupBy("label").count().show()
    tt = time() - t0
    
    print "Query performed in {} seconds".format(round(tt,3))
label  count   
attack 396743  
normal 97278   
Query performed in 17.325 seconds  

Теперь мы хотим подсчитать их по метке и типу протокола, чтобы увидеть, насколько важен тип протокола для определения того, является ли взаимодействие атакой или нет.

    t0 = time()
    interactions_labeled_df.select("label", "protocol_type").groupBy("label", "protocol_type").count().show()
    tt = time() - t0
    
    print "Query performed in {} seconds".format(round(tt,3))
label  protocol_type count   
attack udp           1177    
attack tcp           113252  
attack icmp          282314  
normal udp           19177   
normal tcp           76813  
normal icmp          1288   
Query performed in 17.253 seconds 

На первый взгляд кажется, что udp взаимодействия находятся в более низкой пропорции между сетевыми атаками по сравнению с другими типами протоколов.

И мы можем делать гораздо более сложные группировки. Например, добавьте к предыдущему “разделение” на основе передачи данных от цели.

t0 = time()
interactions_labeled_df.select("label", "protocol_type", "dst_bytes").groupBy("label", "protocol_type", interactions_labeled_df.dst_bytes==0).count().show()
tt = time() - t0

print "Query performed in {} seconds".format(round(tt,3))
label  protocol_type (dst_bytes = 0) count  
normal icmp          true            1288   
attack udp           true            1166  
attack udp           false           11    
normal udp           true            3594  
normal udp           false           15583  
attack tcp           true            110583  
attack tcp           false           2669  
normal tcp           true            9313  
normal tcp           false           67500  
attack icmp          true            282314  
Query performed in 17.284 seconds  

Мы видим, насколько актуально это новое разделение для определения того, является ли сетевое взаимодействие атакой.

На этом мы остановимся, но мы можем увидеть, насколько эффективен этот тип запросов для изучения наших данных. На самом деле мы можем повторить все разбиения, которые мы видели в предыдущих блокнотах при введении деревьев классификации, просто выбрав, нащупав и отфильтровав наш фрейм данных. Для получения более подробного (но менее реального) списка операций и источников данных Spark DataFrame ознакомьтесь с официальной документацией здесь .