Рубрики
Без рубрики

Как создать конвейер данных для обогащения ваших данных с помощью анализа Pyspark и настроений

В этом уроке вы узнаете, как обогатить данные COVID19 твитов с положительным результатом. Y … Tagged с учебником, новичками, Python, Codenewbie.

В этом уроке вы узнаете, как обогатить данные COVID19 твитов с положительным результатом. Вы будете использовать Pyspark и когнитивные услуги и узнаете о дополненной аналитике.

🤯 О, подожди! Что такое дополненная аналитика?

По словам Гартнера Отчет , дополненная аналитика – это использование таких технологий, как машинное обучение и ИИ, для оказания помощи в подготовке данных и генерации понимания. Его главная цель – помочь большему количеству людей извлечь ценность из данных и генерировать понимание в простом, разговорном порядке. Для нашего примера мы извлекаем положительный результат из твита, чтобы помочь понять общее настроение к Covid-19. Да, это дополненная аналитика!

Что такое pyspark?

Pyspark – это структура, которую мы используем для работы с Apache Spark и Python Анкет Если вы хотите узнать больше о Spark, проверьте это Bitesize Series Анкет

Что такое анализ настроений?

Анализ настроений является частью использования NLP (обработка естественного языка), которое сочетает в себе аналитику текста, лингвистику вычислений и многое другое для систематического изучения аффективных состояний и субъективной информации, такой как твиты.

Проще говоря, понимайте чувство текста, если текст положительный, отрицательный или нейтральный. Это может подумать о человеке, который написал это, были ли они довольны ситуацией, когда написали твит?

В нашем примере мы увидим, как мы можем извлечь положительные результаты из текста твитов Covid-19, чтобы лучше понять чувства людей, пишущих эти твиты.

Я знаю, чувства, верно?! Это сложно!

ХОРОШО! Понял тебя!

В этом уроке вы собираетесь использовать Azure Cognitive Service , что дает нам возможности анализа настроений из коробки.

При работе с этим мы можем использовать Textanalyticsclient Клиентская библиотека или рычаг REST API Анкет Сегодня вы будете использовать API REST, поскольку он дает нам большую гибкость.

Предварительные условия

  • Apache Spark Environment с ноутбуками, это может быть DataBricks , или вы можете начать локальную среду с Docker, выполнив следующую команду: Docker Run -p 8888: 8888 Jupyter/pyspark -notebook
  • Azure Free Account
  • Скачать kaggle Данные твита Covid-19
  • Когнитивные услуги Бесплатная учетная запись (посмотрите на картинку ниже)

Пошаговый учебник – Полный конвейер данных:

В этом пошаговом учебном пособии вы узнаете, как загрузить данные с помощью Pyspark, создать пользователь определить функцию для подключения к API аналитики настроений, добавить данные настроения и сохранить все в файлы формата Parquet.

Теперь вам нужно извлечь загрузку данных в вашу среду Apache Spark, скорее, это DataBricks или ноутбук Jupyter Pyspark. Для DataBricks Используйте это, для Юпитер использовать это.

Для обоих случаев вам понадобится file_location Обязательно следите за этим.

Шаг 1: Загрузка данных с помощью pyspark

Вот как вы загружаете данные в объект DataFrame Pyspark, Spark попытается вывести схему непосредственно из CSV. Одна из вещей, которые вы заметите, заключается в том, что при работе с CSV и выводом схемы Spark часто относится к большинству столбцов как Строка формат.

inputDF = spark.read.\
format("com.databricks.spark.csv").\
option("header", "true").\
option("inferSchema", "true").load("/FileStore/tables/covid19_tweets.csv")

Шаг 2: Предоставьте более точную схему нашим данным:

Здесь вы определяете ожидаемый шлема а позже разыгрывает данные, чтобы соответствовать им. Вы будете использовать Structtype и Structfield которые являются датами данных Spark SQL, которые помогают вам определить схему.

с Column Функциональность создает новый DataFrame с желаемым столбцом в соответствии с именем и значением, с которым вы его предоставляете.

from pyspark.sql.types import *
from pyspark.sql.functions import *

# create expected schema
expectedSchema = StructType([
  StructField("user_name", StringType(), True),
  StructField("user_location", StringType(), True),
  StructField("user_description", StringType(), True),
  StructField("user_created", StringType(), True),
  StructField("user_followers", FloatType(), True),
  StructField("user_friends", FloatType(), True),
  StructField("user_favourites", FloatType(), True),
  StructField("user_verified", BooleanType(), True),
  StructField("date", StringType(), True),
  StructField("text", StringType(), True),
  StructField("hashtags", StringType(), True),
  StructField("source", StringType(), True),
  StructField("is_retweet", BooleanType(), True)])

Теперь давайте создадим свой новый DataFrame с правильной схемой!

Обратите внимание, что вы назначаете новую схему inputdf , что означает, что у вас больше не будет доступа к старому DataFrame.

# Set data types - cast the data in columns to match the schema

inputDF = inputDF \
  .withColumn("user_name", inputDF["user_name"].cast("string")) \
  .withColumn("user_location", inputDF["user_location"].cast("string")) \
  .withColumn("user_description", inputDF["user_description"].cast("string")) \
  .withColumn("user_created", inputDF["user_created"].cast("string")) \
  .withColumn("user_followers", inputDF["user_followers"].cast("float")) \
  .withColumn("user_friends", inputDF["user_friends"].cast("float")) \
  .withColumn("user_favourites", inputDF["user_favourites"].cast("float")) \
  .withColumn("user_verified", inputDF["user_verified"].cast("boolean")) \
  .withColumn("date", inputDF["date"].cast("string")) \
  .withColumn("text", inputDF["text"].cast("string")) \
withColumn("hashtags", inputDF["hashtags"].cast("string"))\
withColumn("source", inputDF["source"].cast("string")) \
  .withColumn("is_retweet", inputDF["is_retweet"].cast("boolean")) \

Шаг 3: Подключитесь к анализу настроений с API REST

Для подключения и потребления услуг анализа настроений нам необходимо предоставить конечную точку анализа настроений и ключ доступа. Оба могут быть найдены в.

Поиск конечной точки, это может быть из раздела обзора или из ключей и конечных точек.

Поиск ключа доступа:

Найдя ключ и конечную точку, для производства и работы в команде, вам необходимо хранить их в каком -то безопасном месте, попробуйте обеспечить сохраненные ключи в бесплатном тексту в коде, это не безопасно. Вы можете получить хакеры, добывая свою облачную среду для биткойнов.

Для данных DataBricks вы можете использовать dbutils.secrets функциональность. Это как Установите это Анкет

Если вы работаете локально с ноутбуком Juypter Pyspark, вы можете использовать простой текст, но не забудьте удалить его, когда вы совершаете свой код в репозиторе GIT.

Это как работать с dbutils, предоставляя ему объем и ключевое имя.

В этом фрагменте кода применение названа – mle2ebigdatakv, а имя для ключа – undimentendpoint и SentimentAccesskeys .

# provide endpoint and key 
sentimentEndpoint = dbutils.secrets.get(scope="mle2ebigdatakv", key="sentimentEndpoint")
sentimentAccessKeys = dbutils.secrets.get(scope="mle2ebigdatakv", key="sentimentAccessKeys")

Давайте создадим сами соединения, анализ настроений ожидает получить документ, подобный объекту, для этого вы будете работать с Python Dictionary и построите запрос DOC с ID. Идентификатор должен быть уникальным для каждого запроса.

Заметьте здесь ranguage_api_url Переменная, именно здесь вы строите запрос на когнитивный анализ, просят настроение анализа текста с версией 3.0.

import requests

# build the rest API request with language_api_url
language_api_url = sentimentEndpoint + "/text/analytics/v3.0/sentiment"
headers = {"Ocp-Apim-Subscription-Key": sentimentAccessKeys}
def constractDocRequest(text):
  docRequest = {}
  doc = {}
  doc["id"]= text
  doc["text"]= text
  docRequest["documents"] = [doc]
  return docRequest

Попробуйте запустить его с некоторым текстом, вы увидите, что ответ согласуется с мнением оценки для Положительный, нейтральный и отрицательный Анкет

Так структурирован ответ:

{
    "documents": [
        {
            "id": "1",
            "sentiment": "positive",
            "confidenceScores": {
                "positive": 1.0,
                "neutral": 0.0,
                "negative": 0.0
            },
            "sentences": [
                {
                    "sentiment": "positive",
                    "confidenceScores": {
                        "positive": 1.0,
                        "neutral": 0.0,
                        "negative": 0.0
                    },
                    "offset": 0,
                    "length": 66,
                    "text": "covid19 is not scary at all, it's actually an opportunity to thrive"
                }
            ],
            "warnings": []
        }
    ],
    "errors": [],
    "modelVersion": "2020-04-01"
}

Давайте создадим функциональность Python, чтобы извлечь настроения и зарегистрировать функцию с помощью PYSPARK через API UDF (определенная пользовательская функция).

Вы должны убедиться, что вы на самом деле получаете документ от API REST, а также защищали свои функции от отправки пустого текста в службу анализа настроений, поскольку он будет разрешаться в ошибке.

Вот как вы все соединяете вместе:

from pyspark.sql.functions import UDF

# extract the sentiment out of the returned json doc
def extractSentiment(doc,sentimentType):
  if doc == {} or not 'documents' in doc:
    return 0.0
  return float(doc['documents'][0]['confidenceScores'][sentimentType])
#function for extracting the positive sentiment 
def getPositiveSentiment(text):
  if bool(text.strip()) == False:
    return 0.0
  positive = extractSentiment(constructDocRequest(text),'positive')
  return positive
# creating the udf function pointer
get_positive_sentiment = udf(getPositiveSentiment, StringType())
# create a new DF with new column represetning positive sentiment score        
enrichedDF_positiveSentiment = inputDF.withColumn('positive_sentiment', get_positive_sentiment(inputStream["text"])) 

После обогащения ваших данных важно сохранить их в хранилище для будущих потребностей, вы сохраните их в формате Parquet, который сохранит схему нетронутой. Формат Parquet Apache предназначен для обеспечения эффективного столбчатого хранилища данных по сравнению с файлами на основе строк, такими как CSV, поскольку он позволяет лучше сжать и более быстрое сканирование подмножества столбцов в дальнейшем. VS CSV, где мы должны прочитать весь файл и столбцы, чтобы запросить только их подмножество.

Вот как это делается с pyspark:

# Stream processed data to parquet for the Data Science to explore and build ML models

enrichedDF_poisitveSentiment.write \
  .format("parquet") \
  .save("/DATALAKE/COVID19_TWEETS/REFINED/WITH_SENTIMENT/")

Вы можете определить имя и структуру ваших данных, но убедитесь, что эти данные теперь содержали новый столбец с положительным настроением.

Эти данные могут позже использоваться в различных инструментах визуализации данных или для исследователей.

Шаг 4:

Там нет шага 4, вы закончили !!!

🐞 А как насчет ошибок?

Давайте посмотрим на это исключение JSON:

{'error': {'code': 'InvalidRequest',           
           'innererror': {'code': 'EmptyRequest',                                                           'message': 'Request body must be present.'
                         },         
           'message': 'Invalid Request.'}
}

Оплатить уведомление Сообщение об ошибке Когда вы видите такую ошибку, возможно, что вы не подвергаетесь цитате с когнитивными службами. Если вы узнаете о сервисе и пробуете его, лучше использовать несколько образцов, а не целых наборов данных, так как у вас может закончиться квота на свободном уровне, поскольку это хорошо для транзакций до 5K.

Этот учебник провел вас через то, как использовать существующие услуги API REST для обогащения ваших данных для будущей работы и дополненной аналитики.

Чтобы узнать больше, ознакомьтесь с GitHub Repo: COVID-19-E2E-BIG-DATA-ML-SYSTEM Анкет

Рад ответить на ваши вопросы и следить за вами в Adi Polak – Data и AI @ Twitter 🐦.

Оригинал: “https://dev.to/adipolak/how-to-build-a-data-pipeline-to-enrich-your-data-with-pyspark-and-sentiment-analysis-3ad6”