Рубрики
Без рубрики

Программирование PySpark

Этот учебник по программированию PySpark познакомит вас с тем, что такое PySpark, и расскажет о фундаментальных концепциях PySpark, таких как RDDs, DataFrame и потоковая передача PySpark.

Автор оригинала: Swatee.

Python и Apache Spark-самые популярные модные слова в аналитической индустрии. Apache Spark-это популярная платформа с открытым исходным кодом, которая обеспечивает молниеносную обработку данных и поддерживает различные языки, такие как Scala, Python, Java и R. Затем все сводится к вашим языковым предпочтениям и объему работы. В этой статье о программировании PySpark я буду говорить о Spark с Python, чтобы продемонстрировать, как Python использует функциональные возможности Apache Spark.

PySpark – это совместная работа Apache Spark и Python.

Apache Spark -это платформа кластерных вычислений с открытым исходным кодом, основанная на скорости, простоте использования и потоковой аналитике, в то время как Python -это универсальный язык программирования высокого уровня. Он предоставляет широкий спектр библиотек и в основном используется для машинного обучения и потоковой аналитики в реальном времени.

Другими словами, это API Python для Spark, который позволяет вам использовать простоту Python и мощь Apache Spark для укрощения больших данных.

Другими словами, это API Python для Spark, который позволяет вам использовать простоту Python и мощь Apache Spark для укрощения больших данных.

Возможно, вам интересно, почему я выбрал Python для работы с Spark, когда есть другие доступные языки. Чтобы ответить на этот вопрос, я перечислил несколько преимуществ, которыми вы будете пользоваться с Python:

  • Python очень прост в освоении и реализации.
  • Он предоставляет простой и всеобъемлющий API.
  • С Python читаемость кода, обслуживание и знакомство намного лучше.
  • Он предоставляет различные варианты визуализации данных, что затруднительно при использовании Scala или Java.
  • Python поставляется с широким спектром библиотек, таких как numpy, pandas, scikit-learn, seaborn, matplotlib и т. Д.
  • Он поддерживается огромным и активным сообществом.

Теперь, когда вы знаете преимущества программирования PySpark, давайте просто погрузимся в основы PySpark.

Устойчивые распределенные наборы данных (RDDs)

RDDs являются строительными блоками любого приложения Spark. Rds расшифровывается как:

  • Resilient: Он отказоустойчив и способен восстанавливать данные при сбое.
  • Распределенные: Данные распределяются между несколькими узлами в кластере.
  • Набор данных: Набор секционированных данных со значениями.

Это слой абстрактных данных над распределенной коллекцией. Она неизменна по своей природе и следует ленивой трансформации .

С помощью RDDs можно выполнять два типа операций:

  1. Преобразования: Эти операции применяются для создания нового RDD.
  2. Действия: Эти операции применяются к RDD для указания Apache Spark применить вычисления и передать результат обратно драйверу.

Фрейм данных

Dataframe в PySpark-это распределенная коллекция структурированных или полуструктурированных данных. Эти данные в фрейме данных хранятся в строках под именованными столбцами, которые аналогичны таблицам реляционной базы данных или листам Excel.

Он также имеет некоторые общие атрибуты с RDD, такие как Неизменяемый в природе, следует ленивым оценкам и распределяется в природе. Он поддерживает широкий спектр форматов, таких как JSON, CSV, TXT и многие другие. Кроме того, вы можете загрузить его из существующих RDDS или программно указав схему.

PySpark SQL

PySpark SQL-это модуль абстракции более высокого уровня над ядром PySpark. Он в основном используется для обработки структурированных и полуструктурированных наборов данных. Он также предоставляет оптимизированный API, который может считывать данные из различных источников данных, содержащих различные форматы файлов. Таким образом, с помощью PySpark вы можете обрабатывать данные, используя SQL, а также HiveQL. Благодаря этой функции PySpark SQL постепенно набирает популярность среди программистов баз данных и пользователей Apache Hive.

Потоковая передача PySpark

Потоковая передача PySpark – это масштабируемая, отказоустойчивая система, которая следует парадигме пакетной обработки RDD. Он в основном работает в мини-пакетах или пакетных интервалах, которые могут варьироваться от 500 мс до больших интервальных окон.

При этом Spark Streaming получает непрерывный поток входных данных из таких источников, как Apache Flume, Kinesis, Kafka, TCP-сокеты и т. Д. Эти потоковые данные затем внутренне разбиваются на несколько небольших пакетов на основе интервала batch и пересылаются в механизм Spark. Spark Engine обрабатывает эти пакеты данных, используя сложные алгоритмы, выраженные с помощью функций высокого уровня, таких как map, reduce, join и window. После завершения обработки обработанные пакеты затем отправляются в базы данных, файловые системы и живые информационные панели.

При этом Spark Streaming получает непрерывный поток входных данных из таких источников, как Apache Flume, Kinesis, Kafka, TCP-сокеты и т. Д. Эти потоковые данные затем внутренне разбиваются на несколько небольших пакетов на основе интервала || batch || и пересылаются в механизм Spark. Spark Engine обрабатывает эти пакеты данных, используя сложные алгоритмы, выраженные с помощью функций высокого уровня, таких как map, reduce, join и window. После завершения обработки обработанные пакеты затем отправляются в базы данных, файловые системы и живые информационные панели.

Ключевой абстракцией для потоковой передачи Spark является дискретизированный поток (DStream). Потоки построены на RDDS, что позволяет разработчикам Spark работать в одном контексте RDDs и пакетов для решения проблем потоковой передачи. Кроме того, Spark Streaming также интегрируется с MLlib, SQL, фреймами данных и GraphX, что расширяет ваш горизонт функциональных возможностей. Будучи высокоуровневым API, Spark Streaming обеспечивает отказоустойчивую семантику “ровно один раз” для операций с отслеживанием состояния.

ЗАПИСКА : семантика “ровно один раз” означает, что события будут обрабатываться “ровно один раз” всеми операторами в потоковом приложении, даже если произойдет какой-либо сбой.

На приведенной ниже диаграмме представлены основные компоненты потоковой передачи искры.

На приведенной ниже диаграмме представлены основные компоненты потоковой передачи искры.

Как вы можете видеть, данные поступают в поток Spark из различных источников, таких как Kafka, Flume, Twitter, ZeroMQ, Kinesis или TCP-сокеты и многие другие. Кроме того, эти данные обрабатываются с использованием сложных алгоритмов, выраженных с помощью функций высокого уровня, таких как map, reduce, join и window. Наконец, эти обработанные данные передаются в различные файловые системы, базы данных и живые информационные панели для дальнейшего использования.

Я надеюсь, что это дало вам четкое представление о том, как работает потоковая передача PySpark. Давайте теперь перейдем к последней, но самой заманчивой теме этой статьи о программировании PySpark, а именно к машинному обучению.

Машинное обучение

Как вы уже знаете, Python-это зрелый язык, который с незапамятных времен активно используется для науки о данных и машинного обучения. В PySpark машинное обучение облегчается библиотекой Python под названием MLlib (Библиотека машинного обучения). Это не что иное, как оболочка над ядром PySpark, которая выполняет анализ данных с использованием алгоритмов машинного обучения, таких как классификация, кластеризация, линейная регрессия и некоторые другие.

Одной из привлекательных особенностей машинного обучения с помощью PySpark является то, что он работает в распределенных системах и обладает высокой масштабируемостью.

MLlib предоставляет три основные функции машинного обучения с помощью PySpark:

  1. Подготовка данных: It предоставляет различные функции, такие как извлечение, преобразование, выбор, хэширование и т.д.
  2. Алгоритмы машинного обучения: Он использует некоторые популярные и продвинутые алгоритмы регрессии, классификации и кластеризации для машинного обучения.
  3. Утилиты: Он имеет статистические методы, такие как тестирование хи-квадрат, описательная статистика, линейная алгебра и методы оценки моделей.

Позвольте мне показать вам, как реализовать машинное обучение с использованием классификации с помощью логистической регрессии.

Здесь я проведу простой прогнозный анализ данных инспекции пищевых продуктов города Чикаго.

##Importing the required libraries
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import * ##creating a RDD by importing and parsing the input data
def csvParse(s):
import csv
from StringIO import StringIO
sio = StringIO(s)
value = csv.reader(sio).next()
sio.close()
return value food_inspections = sc.textFile('file:////home/edureka/Downloads/Food_Inspections_Chicago_data.csv')\
.map(csvParse) ##Display data format
food_inspections.take(1)
Здесь я проведу простой прогнозный анализ данных инспекции пищевых продуктов города Чикаго.
#Structuring the data
schema = StructType([
StructField("id", IntegerType(), False),
StructField("name", StringType(), False),
StructField("results", StringType(), False),
StructField("violations", StringType(), True)])
#creating a dataframe and a temporary table (Results) required for the predictive analysis. ##sqlContext is used to perform transformations on structured data
ins_df = spark.createDataFrame(food_inspections.map(lambda l: (int(l[0]), l[1], l[12], l[13])) , schema)
ins_df.registerTempTable('Count_Results')
ins_df.show()
Здесь я проведу простой прогнозный анализ данных инспекции пищевых продуктов города Чикаго.
##Let's now understand our dataset
#show the distinct values in the results column
result_data = ins_df.select('results').distinct().show()
Здесь я проведу простой прогнозный анализ данных инспекции пищевых продуктов города Чикаго.
##converting the existing dataframe into a new dataframe ###each inspection is represented as a label-violations pair. ####Here 0.0 represents a failure, 1.0 represents a success, and -1.0 represents some results besides those two
def label_Results(s):
if s == 'Fail':
return 0.0
elif s == 'Pass with Conditions' or s == 'Pass':
return 1.0
else:
return -1.0
ins_label = UserDefinedFunction(label_Results, DoubleType())
labeled_Data = ins_df.select(ins_label(ins_df.results).alias('label'), ins_df.violations).where('label >= 0')
labeled_Data.take(1)
Здесь я проведу простой прогнозный анализ данных инспекции пищевых продуктов города Чикаго.
##Creating a logistic regression model from the input dataframe
tokenizer = Tokenizer(inputCol="violations", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
model = pipeline.fit(labeled_Data)
## Evaluating with Test Data test_Data = sc.textFile('file:////home/edureka/Downloads/Food_Inspections_test.csv')\
.map(csvParse) \
.map(lambda l: (int(l[0]), l[1], l[12], l[13]))
test_df = spark.createDataFrame(test_Data, schema).where("results = 'Fail' OR results = 'Pass' OR results = 'Pass with Conditions'")
predict_Df = model.transform(test_df)
predict_Df.registerTempTable('Predictions')
predict_Df.columns
Здесь я проведу простой прогнозный анализ данных инспекции пищевых продуктов города Чикаго.
## Printing 1st row
predict_Df.take(1)
Здесь я проведу простой прогнозный анализ данных инспекции пищевых продуктов города Чикаго.
## Predicting the final result
numOfSuccess = predict_Df.where("""(prediction = 0 AND results = 'Fail') OR
(prediction = 1 AND (results = 'Pass' OR
results = 'Pass with Conditions'))""").count()
numOfInspections = predict_Df.count()
print "There were", numOfInspections, "inspections and there were", numOfSuccess, "successful predictions"
print "This is a", str((float(numOfSuccess) / float(numOfInspections)) * 100) + "%", "success rate"
Здесь я проведу простой прогнозный анализ данных инспекции пищевых продуктов города Чикаго.

На этом мы подходим к концу этого блога о программировании PySpark. Надеюсь, это помогло добавить некоторую ценность вашим знаниям.

Если вы нашли этот блог программирования PySpark интересным, вы можете продолжить и прочитать похожие блоги здесь.

У вас есть к нам вопрос? Пожалуйста, упомяните об этом в разделе комментариев, и мы свяжемся с вами.