Рубрики
Без рубрики

Как использовать Spark Clusters для параллельной обработки больших данных

Автор оригинала: FreeCodeCapm Team.

Hari Santanam.

Используйте устойчивый набор данных Apache Spark Spark (RDD) с databricks

Из-за физических ограничений индивидуальный компьютерный процессор в значительной степени достиг верхнего потолка для скорости с текущими конструкциями. Таким образом, аппаратные производители добавили больше процессоров на материнскую плату (параллельные ядра CPU, работают на одинаковой скорости).

Но … Большинство программных приложений, написанных за последние несколько десятилетий, не были написаны для параллельной обработки.

Кроме того, сбор данных в геометрической основе получил экспоненциально больше, из-за дешевых устройств, которые могут собирать конкретные данные (такие как температура, звук, скорость …).

Для более эффективного обрабатывания этих данных были более новые методы программирования.

Кластер вычислительных процессов аналогичен группе работников. Команда может работать лучше и эффективнее, чем один работник. Они бассейны ресурсы. Это означает, что они обмениваются информацией, разрушают задачи и собирают обновления и выходы, чтобы придумать один набор результатов.

Точно так же, как фермеры пошли от работы над одним полем для работы с комбайнами и тракторами для эффективного производства продуктов питания от более крупных и больших фермерских хозяйств, а сельскохозяйственные кооперативы облегчают обработку, кластер работает вместе для решения большего и более сложного сбора и обработки данных.

Кластерные вычисления и параллельная обработка были ответами, и сегодня у нас есть Spark apache Spach. Databricks – это унифицированная платформа аналитики, используемая для запуска вычислений Sparch Cluster простым и простом способом.

Что такое искра?

Apache Spark – это мотонасодержащая единая единая аналитика двигателя для больших данных и обучения машин. Первоначально он был разработан в UC Berkeley.

Искра быстро. Это использует преимущества вычислений в памяти и другие оптимизации. В настоящее время он содержит запись для крупномасштабной сортировки на диске.

Spark использует упругие распределенные наборы данных (RDD) для выполнения параллельной обработки в кластере или компьютерных процессорах.

Он имеет простой в использовании API для работы на больших наборах наборах, на различных языках программирования. Он также имеет API для преобразования данных и знакомых API-сигналов кадра данных для манипулирования полуструктурированными данными.

По сути, Spark использует Cluster Manager для координации работы через кластер компьютеров. Кластер – это группа компьютеров, которые соединены и координат друг с другом для обработки данных и вычислительной.

Зажигание, состоят из процессов и процессов и исполнителей водителя.

Кратко поставить, процесс драйвера запускает основную функцию, а анализировать и распределяет работу через исполнители. Изучатели фактически выполняют назначенные задачи – выполняемый код и отчетность к узлу драйвера.

В реальных приложениях в бизнесе и появлении программирования AI параллельная обработка становится необходимостью эффективности, скорости и сложности.

Отлично – так что такое Databricks?

Databricks – это унифицированная платформа аналитики, от создателей Apache Spark. Это облегчает запуск облачных оптимизированных искрных кластеров в минутах.

Подумайте об этом как о пакете All-in-One, чтобы написать свой код. Вы можете использовать Spark (не беспокоясь о базовых деталях) и производить результаты.

Он также включает в себя ноутбуки Jupyter, которые могут быть общими, а также предоставляя интеграцию GitHub, соединения со многими широко используемыми инструментами и мониторинга автоматизации, планирования и отладки. Смотри здесь для дополнительной информации.

Вы можете бесплатно подписаться с помощью сообщества. Это позволит вам играть с зажигающими кластерами. Другие преимущества, в зависимости от плана, включают в себя:

  • Получите кластеры и работают за считанные секунды как на экземплярах AWS и Azure CPU и процессора для максимальной гибкости.
  • Начните быстро с внутренней интеграцией Tensorflow, KERAS и их зависимостей на кластеры Databricks.

Давайте начнем. Если вы уже использовали Databricks ранее, пропустите вниз к следующей части. В противном случае вы можете зарегистрироваться здесь и выберите «Сообщество Edition», чтобы попробовать его бесплатно.

Следуйте указаниям там. Они ясны, кратко и легко:

  • Создать кластер
  • Прикрепите ноутбук к кластеру и запустите команды в ноутбуке в кластере
  • Манипулировать данными и создать график
  • Операции на API Python DataFrame; Создайте DataFrame из Databricks DataSet
  • Манипулировать результатами данных и отображения

Теперь, когда вы создали программу данных на кластере, давайте перейдем к другому набору данных, с большим количеством операций, чтобы вы могли иметь больше данных.

DataSet – это доклад о счастье мира 2017 года по стране, на основе различных факторов, таких как ВВП, щедрость, доверие, семья и другие. Поля и их описания перечислены дальше в статье.

Ранее я загрузил набор данных, затем переместил его в DBFS DataBricks (систему файлов databricks), просто перетаскивая и падая в окно в databricks.

Или вы можете нажать на данные из левой навигационной панели, нажмите «Добавить данные», затем перетащите или перезагрузите и добавьте.

# File location and type#this file was dragged and dropped into Databricks from stored #location; https://www.kaggle.com/unsdsn/world-happiness#2017.csv 
file_location = "/FileStore/tables/2017.csv"file_type = "csv"
# CSV options# The applied options are for CSV files. For other file types, these # will be ignored: Schema is inferred; first row is header - I # deleted header row in editor and intentionally left it 'false' to #contrast with later rdd parsing, #delimiter # separated, #file_location; if you don't delete header row, instead of reading #C0, C1, it would read "country", "dystopia" etc.infer_schema = "true"first_row_is_header = "false"delimiter = ","df = spark.read.format(file_type) \  .option("inferSchema", infer_schema) \  .option("header", first_row_is_header) \  .option("sep", delimiter) \  .load(file_location)
display(df)

Теперь давайте загрузим файл в эластичный распределенный наборный набор данных SPRASET (RDD). RDD выполняет параллельную обработку по кластеру или компьютерным процессорам и делает операции данных быстрее и эффективнее.

#load the file into Spark's Resilient Distributed Dataset(RDD)data_file = "/FileStore/tables/2017.csv"raw_rdd = sc.textFile(data_file).cache()#show the top 5 lines of the fileraw_rdd.take(5)

Обратите внимание на «Зажигание» ниже, чуть выше вывода. Нажмите на вид, чтобы увидеть подробности, как показано в окне встроенного вправо.

Databricks и Sparks имеют отличные визуализации процессов.

В Spark работа связана с цепочкой зависимостей RDD, организованных в прямом ациклическом графе (DAG). В даге ветви направлены с одного узла в другое, без спинки петли. Задачи отправляются в планировщик, который выполняет их с использованием трубопроводов для оптимизации работы и преобразовать на минимальные этапы.

Не волнуйтесь, если вышеуказанные предметы кажутся сложными. На определенном этапе возникают визуальные снимки процессов, для которых вы нажали кнопку просмотра SPRAL. Вы можете или не понадобиться этой информации – это там, если вы.

Записи RDD разделены запятыми, которые нам нужно разделить перед анализом и созданием данных DataFrame. Затем мы будем делать определенные столбцы из набора данных для использования.

#split RDD before parsing and building dataframecsv_rdd = raw_rdd.map(lambda row: row.split(","))#print 2 rowsprint(csv_rdd.take(2))#print typesprint(type(csv_rdd))print('potential # of columns: ', len(csv_rdd.take(1)[0]))
#use specific columns from dataset
from pyspark.sql import Row
parsed_rdd = csv_rdd.map(lambda r: Row(    country = r[0],   #country, position 1, type=string    happiness_rank = r[1],    happiness_score = r[2],    gdp_per_capita = r[5],    family = r[6],    health = r[7],    freedom = r[8],    generosity = r[9],    trust = r[10],    dystopia = r[11],    label = r[-1]    ))parsed_rdd.take(5)

Вот столбцы и определения для набора данных счастья:

Колонны и определения набора данных счастья

Страна – название страны.

Регион – регион Страна принадлежит.

Ранг счастья – звание страны на основе балла счастья.

Оценка счастья – метрическая, измеренная в 2015 году, задавая выборку людей на вопрос: «Как бы вы оценили ваше счастье по шкале от 0 до 10, где 10 является самым счастливым».

Экономика (ВВП на душу населения) – степень, в которой ВВП (валовой внутренний продукт) способствует расчету оценки счастья

Семья – степень, в которой семья способствует расчету оценки счастья

Здоровье – (ожидаемая продолжительность жизни) степень, в которой ожидаемая продолжительность жизни способствовала расчету оценки счастья

Свобода – степень, в которой свобода способствовала расчету оценки счастья.

Доверие – (правительственная коррупция) степень, в которой восприятие коррупции способствует оценке счастья.

Щедрость – степень, в которой щедрость способствовала расчету оценки счастья.

Остаток дистопии – степень, в которой находится дистопия остаток вклад в расчет место счастья или состояния, в котором все неприятное или плохое, как правило, тоталитарная или экологичность деградировала. Остаток – что осталось или осталось после того, как все остальное учитывается или забрано).

# Create a view or table
temp_table_name = "2017_csv"
df.createOrReplaceTempView(temp_table_name)
#build dataframe from RDD created earlierdf = sqlContext.createDataFrame(parsed_rdd)display(df.head(10)#view the dataframe's schemadf.printSchema()
#build temporary table to run SQL commands#table only alive for the session#table scoped to the cluster; highly optimizeddf.registerTempTable("happiness")#display happiness_score counts using dataframe syntaxdisplay(df.groupBy('happiness_score')          .count()          .orderBy('count', ascending=False)       )
df.registerTempTable("happiness")
#display happiness_score counts using dataframe syntaxdisplay(df.groupBy('happiness_score')          .count()          .orderBy('count', ascending=False)       )

Теперь давайте будем использовать SQL, чтобы запустить запрос, чтобы сделать то же самое. Цель состоит в том, чтобы показать вам разные способы обработки данных и сравнить методы.

#use SQL to run query to do same thing as previously done with dataframe (count by happiness_score)happ_query = sqlContext.sql("""                        SELECT happiness_score, count(*) as freq                        FROM happiness                        GROUP BY happiness_score                        ORDER BY 2 DESC                        """)display(happ_query)

Другой запрос SQL для практики нашей обработки данных:

#another sql queryhapp_stats = sqlContext.sql("""                            SELECT                              country,                              happiness_rank,                              dystopia                            FROM happiness                            WHERE happiness_rank > 20                            """)display(happ_stats)

Там! Вы сделали это – создали скопление с зажиганием и завершили процесс запроса на DataSet, используя этот кластер. Вы можете использовать это своими наборами набора данных для обработки и вывода ваших больших проектов данных.

Вы также можете играть с помощью диаграмм, щелкните значок диаграммы/графика в нижней части любого вывода, укажите значения и тип графика и посмотрите, что происходит. Это весело.

Код опубликован в ноутбуке здесь На Databricks Public Forum и будет доступен около 6 месяцев в соответствии с Databricks.

  • Для получения дополнительной информации об использовании искр с глубоким обучением, прочитайте Это отличная статья По Favio Vázquez.

Спасибо за прочтение! Я надеюсь, что у вас есть интересные программы с databricks и наслаждаются ею столько, сколько у меня есть. Пожалуйста, хлопайте, если вы нашли это интересно или полезно.

Полный список моих статей см. здесь Отказ