Рубрики
Без рубрики

Создание услуг рекомендации фильма с Apache Spark & Flask – часть 2

Этот урок Spark Apache входит в подробности о том, как использовать модели обучения Spark Machine, или даже еще один вид объектов аналитики данных, в веб-сервисе. Используя язык Python, мы сделаем эту задачу очень простой благодаря собственным возможностям Python, и на основе Python на основе Python, таких как колба.

Автор оригинала: Jose A Dianes.

Этот урок SPACK Apache входит в подробности в том, как использовать модели обучения Spark Machine, или даже еще один вид объектов аналитики данных, в веб-сервисе. Это открывает дверь в онлайн-прогнозы, рекомендации и т. Д., Используя язык Python, мы сделаем эту задачу очень простой благодаря собственным возможностям Python, а также к фреймам на основе Python, таких как колба.

Читайте часть 1 этого Учебника Apache Spark здесь Отказ Большая часть кода в этой первой части, о том, как использовать ALS с набором данных Public Movielens, исходит от моего решения одним из упражнений, предложенных в CS100.1X Введение в большие данные с Apache Sparch от Энтони Д. Джозефа на EDX , это тоже Публично доступно с 2014 года на Spark Summit Отказ Там я добавил с незначительными модификациями для использования более крупного набора данных, а также код о том, как хранить и перезагрузить модель для последующего использования.

Эта первая часть мощно сочетается в себе с этим вторым. Делая это, вы сможете разработать полный Сервис рекомендации в режиме онлайн Отказ

Наш полный веб-сервис содержит три файла Python:

  • Engine.py Определяет механизм рекомендации, упаковка внутри всех вычислений, связанных с искрами.
  • app.py это веб-приложение Flask, которое определяет API для отдыха вокруг двигателя.
  • server.py Инициализы A Cherrypy Веб-сервер После создания Spark Context и веб-приложения Flask, используя предыдущий.

Но давайте подробно объясним каждый из них вместе с особенностями развертывания такой системы, использующей искру в качестве вычисленного двигателя. Мы поставим акцент на том, как использовать модельную модель в веб-контексте, с которым мы имеем дело. Для объяснения данных movielens и как построить модель, используя искру, взгляните на руководство по созданию модели.

Весь код для этого учебника доступен в Github Repo Отказ Есть также Репо, объясняя многие концепции, связанные с искром Отказ Иди туда и сделай их своими.

Рекомендации двигателя

В самом сердцем нашей Рекомендации фильма веб-сервис проживает Рекомендацию (I.E. Engine.py в нашем окончательном развертывании). Он представлен классом Рекомендации медленно И в этом разделе описано шаг за шагом, как его функциональность и реализация.

Запуск двигателя

Когда двигатель инициализируется, нам нужно впервые генерировать модель ALS. При желании (мы не будем делать здесь), мы могли бы загрузить ранее сохраняемую модель, чтобы использовать его для рекомендаций. Более того, нам может потребоваться загрузить или предвкушать любые RDDS, которые будут использоваться позже, чтобы сделать рекомендации.

Мы будем делать такие вещи в __init__ Метод нашего Рекомендации медленно класс (используя два частных метода). В этом случае мы не сохраним время. Мы повторим весь процесс каждый раз, когда двигатель он создает.

import os
from pyspark.mllib.recommendation import ALS
 
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



class RecommendationEngine:
    """A movie recommendation engine
    """
 
    def __count_and_average_ratings(self):
        """Updates the movies ratings counts from 
        the current data self.ratings_RDD
        """
        logger.info("Counting movie ratings...")
        movie_ID_with_ratings_RDD = self.ratings_RDD.map(lambda x: (x[1], x[2])).groupByKey()
        movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD.map(get_counts_and_averages)
        self.movies_rating_counts_RDD = movie_ID_with_avg_ratings_RDD.map(lambda x: (x[0], x[1][0]))
 
 
    def __train_model(self):
        """Train the ALS model with the current dataset
        """
        logger.info("Training the ALS model...")
        self.model = ALS.train(self.ratings_RDD, self.rank, seed=self.seed,
                               iterations=self.iterations, lambda_=self.regularization_parameter)
        logger.info("ALS model built!")
 
 
    def __init__(self, sc, dataset_path):
        """Init the recommendation engine given a Spark context and a dataset path
        """
 
        logger.info("Starting up the Recommendation Engine: ")
 
        self.sc = sc
 
        # Load ratings data for later use
        logger.info("Loading Ratings data...")
        ratings_file_path = os.path.join(dataset_path, 'ratings.csv')
        ratings_raw_RDD = self.sc.textFile(ratings_file_path)
        ratings_raw_data_header = ratings_raw_RDD.take(1)[0]
        self.ratings_RDD = ratings_raw_RDD.filter(lambda line: line!=ratings_raw_data_header)\
            .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),int(tokens[1]),float(tokens[2]))).cache()
        # Load movies data for later use
        logger.info("Loading Movies data...")
        movies_file_path = os.path.join(dataset_path, 'movies.csv')
        movies_raw_RDD = self.sc.textFile(movies_file_path)
        movies_raw_data_header = movies_raw_RDD.take(1)[0]
        self.movies_RDD = movies_raw_RDD.filter(lambda line: line!=movies_raw_data_header)\
            .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),tokens[1],tokens[2])).cache()
        self.movies_titles_RDD = self.movies_RDD.map(lambda x: (int(x[0]),x[1])).cache()
        # Pre-calculate movies ratings counts
        self.__count_and_average_ratings()
 
        # Train the model
        self.rank = 8
        self.seed = 5L
        self.iterations = 10
        self.regularization_parameter = 0.1
        self.__train_model() 

Вся форма кода __init__ И два частных метода были объяснены в руководстве по поводу создания модели.

Добавление новых рейтингов

При использовании Совместная фильтрация и искра Чередующиеся наименьшие квадраты Нам нужно пересчитать модель прогнозирования для каждой новой партии пользовательских рейтингов. Это было объяснено в нашем предыдущем руководстве по созданию модели.

def add_ratings(self, ratings):
    """Add additional movie ratings in the format (user_id, movie_id, rating)
    """
    # Convert ratings to an RDD
    new_ratings_RDD = self.sc.parallelize(ratings)
    # Add new ratings to the existing ones
    self.ratings_RDD = self.ratings_RDD.union(new_ratings_RDD)
    # Re-compute movie ratings count
    self.__count_and_average_ratings()
    # Re-train the ALS model with the new ratings
    self.__train_model()

    return ratings

# Attach the function to a class method
RecommendationEngine.add_ratings = add_ratings

Рекомендации

Мы также объяснили, как сделать рекомендации с помощью нашей модели ALS в руководстве по поводу создания рекомендателя фильма. Здесь мы в основном повторяем эквивалентный код, завернутый в метод нашего РекомендацииНинер Класс и использование частного метода, который будет использоваться для каждого метода прогнозирования.

def __predict_ratings(self, user_and_movie_RDD):
    """Gets predictions for a given (userID, movieID) formatted RDD
    Returns: an RDD with format (movieTitle, movieRating, numRatings)
    """
    predicted_RDD = self.model.predictAll(user_and_movie_RDD)
    predicted_rating_RDD = predicted_RDD.map(lambda x: (x.product, x.rating))
    predicted_rating_title_and_count_RDD = \
        predicted_rating_RDD.join(self.movies_titles_RDD).join(self.movies_rating_counts_RDD)
    predicted_rating_title_and_count_RDD = \
        predicted_rating_title_and_count_RDD.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))

    return predicted_rating_title_and_count_RDD
    
def get_top_ratings(self, user_id, movies_count):
    """Recommends up to movies_count top unrated movies to user_id
    """
    # Get pairs of (userID, movieID) for user_id unrated movies
    user_unrated_movies_RDD = self.movies_RDD.filter(lambda rating: not rating[1]==user_id).map(lambda x: (user_id, x[0]))
    # Get predicted ratings
    ratings = self.__predict_ratings(user_unrated_movies_RDD).filter(lambda r: r[2]>=25).takeOrdered(movies_count, key=lambda x: -x[1])

    return ratings

# Attach the functions to class methods
RecommendationEngine.__predict_ratings = __predict_ratings
RecommendationEngine.get_top_ratings = get_top_ratings

Разделение формы получения главных безупрежных фильмов, мы также захочем получить рейтинги к определенным фильмам. Мы сделаем это с новым методом в нашем Рекомендательный двигатель Отказ

def get_ratings_for_movie_ids(self, user_id, movie_ids):
    """Given a user_id and a list of movie_ids, predict ratings for them 
    """
    requested_movies_RDD = self.sc.parallelize(movie_ids).map(lambda x: (user_id, x))
    # Get predicted ratings
    ratings = self.__predict_ratings(requested_movies_RDD).collect()

    return ratings

# Attach the function to a class method
RecommendationEngine.get_ratings_for_movie_ids = get_ratings_for_movie_ids

Создание веб-API вокруг нашего двигателя с помощью колбы

Колбу это веб-микрозаменение для Python. Очень легко запустить веб-API, просто импортируя в наш скрипт и используя некоторые аннотации, чтобы связать наш сервисный конечные точки с функциями Python. В нашем случае мы будем обернуть наше Рекомендации медленно Методы вокруг некоторых из этих конечных точек и обмена JSON Отформатированные данные с веб-клиентом.

На самом деле так просто, что мы покажем весь app.py Здесь вместо того, чтобы идти по частям.

from flask import Blueprint
main = Blueprint('main', __name__)
 
import json
from engine import RecommendationEngine
 
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
 
from flask import Flask, request
 
@main.route("//ratings/top/", methods=["GET"])
def top_ratings(user_id, count):
    logger.debug("User %s TOP ratings requested", user_id)
    top_ratings = recommendation_engine.get_top_ratings(user_id,count)
    return json.dumps(top_ratings)
 
@main.route("//ratings/", methods=["GET"])
def movie_ratings(user_id, movie_id):
    logger.debug("User %s rating requested for movie %s", user_id, movie_id)
    ratings = recommendation_engine.get_ratings_for_movie_ids(user_id, [movie_id])
    return json.dumps(ratings)
 
 
@main.route("//ratings", methods = ["POST"])
def add_ratings(user_id):
    # get the ratings from the Flask POST request object
    ratings_list = request.form.keys()[0].strip().split("\n")
    ratings_list = map(lambda x: x.split(","), ratings_list)
    # create a list with the format required by the negine (user_id, movie_id, rating)
    ratings = map(lambda x: (user_id, int(x[0]), float(x[1])), ratings_list)
    # add them to the model using then engine API
    recommendation_engine.add_ratings(ratings)
 
    return json.dumps(ratings)
 
 
def create_app(spark_context, dataset_path):
    global recommendation_engine 
 
    recommendation_engine = RecommendationEngine(spark_context, dataset_path)    
    
    app = Flask(__name__)
    app.register_blueprint(main)
    return app

В основном мы используем приложение следующим образом:

  • Мы инициализируем, что при звонке create_app Отказ Здесь Рекомендации медленно Объект создан, а затем мы связываем @ main.ruoute Аннотации, определенные выше. Каждая аннотация определяется (см. Flask Docs ):
  • Маршрут, это его URL и может содержать параметры между <>. Они отображаются с аргументами функции.
  • Список доступных HTTP-методов.
  • Есть три из этих аннотаций, определенные, которые соответствуют трем Рекомендации медленно Методы:

    • Получить//ratings/top Получите лучшие рекомендации от двигателя.
    • Получить//ratings Получите прогнозируемый рейтинг для индивидуального фильма.
    • Post//ratings Добавьте новые рейтинги. Формат представляет собой серию строк (заканчивая новой сепаратором) с movie_id и Рейтинг разделенных запятыми. Например, следующий файл соответствует десяти новых рейтингах пользователя, используемых в качестве примера в руководстве по созданию модели:
260,9  
1,8  
16,7  
25,8  
32,9  
335,4  
379,3  
296,7  
858,10  
50,8  

Развертывание сервера WSGI с помощью Cherrypy

Среди прочего, Cherrypy Framework Особенности надежного, HTTP/1.1, совместимый WebServer WebServer WebServer WSGI. Также легко запустить несколько HTTP-серверов (например, на нескольких портах) одновременно. Все это делает его идеальным кандидатом для простого развертывания производственного веб-сервера для нашей онлайн-рекомендации.

Используйте, что мы сделаем из сервера Cherrypy, относительно простой. Снова мы покажем здесь полный server.py Скрипт, а затем объяснить это немного.

import time, sys, cherrypy, os
from paste.translogger import TransLogger
from app import create_app
from pyspark import SparkContext, SparkConf
 
def init_spark_context():
    # load spark context
    conf = SparkConf().setAppName("movie_recommendation-server")
    # IMPORTANT: pass aditional Python modules to each worker
    sc = SparkContext(conf=conf, pyFiles=['engine.py', 'app.py'])
 
    return sc
 
 
def run_server(app):
 
    # Enable WSGI access logging via Paste
    app_logged = TransLogger(app)
 
    # Mount the WSGI callable object (app) on the root directory
    cherrypy.tree.graft(app_logged, '/')
 
    # Set the configuration of the web server
    cherrypy.config.update({
        'engine.autoreload.on': True,
        'log.screen': True,
        'server.socket_port': 5432,
        'server.socket_host': '0.0.0.0'
    })
 
    # Start the CherryPy WSGI web server
    cherrypy.engine.start()
    cherrypy.engine.block()
 
 
if __name__ == "__main__":
    # Init spark context and load libraries
    sc = init_spark_context()
    dataset_path = os.path.join('datasets', 'ml-latest')
    app = create_app(sc, dataset_path)
 
    # start web server
    run_server(app)

Это довольно стандартное использование Cherrypy Отказ Если мы посмотрим на __main__ Точка входа, мы делаем три вещи:

  • Создайте контекст искры, как определено в функции init_spark_context прохождение адированных модулей Python там.
  • Создайте приложение Flask, вызывая create_app Мы определены в app.py Отказ
  • Запустите сам сервер.

Смотрите следующий раздел о запуске сервера.

Запуск сервера с искрой

Для того, чтобы сервер запустился при удалении доступа к контексту искры и кластера, нам нужно отправить server.py файл на Pyspark Используя Зажигание Отправить Отказ Разные параметры при использовании этой команды лучше объяснены в Искра Docummentaion. В нашем случае мы будем использовать что-то вроде следующего: ~/Spark-1.3.1-bin-hadoop2.6/bin/spark-ade - master spark://169.254.206.2: 7077 --total-Executor-Cors 14 - Dexecutor-Memory 6G Server.py Важные биты:

  • Использовать Зажигание Отправить И не Pyspark напрямую.
  • - мастер Параметры должны указывать на настройку вашей Spark Cluster (может быть локальным).
  • Вы можете пройти дополнительные параметры конфигурации, такие как - Тотальный исполнитель-сердечники и --Executor-Memory.

Вы увидите вывод, как следующее:

INFO:engine:Starting up the Recommendation Engine: 
INFO:engine:Loading Ratings data...
INFO:engine:Loading Movies data...
INFO:engine:Counting movie ratings...
INFO:engine:Training the ALS model...
       ... More Spark and CherryPy logging
INFO:engine:ALS model built!                                                                                                 
[05/Jul/2015:14:06:29] ENGINE Bus STARTING
[05/Jul/2015:14:06:29] ENGINE Started monitor thread 'Autoreloader'.
[05/Jul/2015:14:06:29] ENGINE Started monitor thread '_TimeoutMonitor'.
[05/Jul/2015:14:06:29] ENGINE Serving on http://0.0.0.0:5432
[05/Jul/2015:14:06:29] ENGINE Bus STARTED

Некоторые соображения при использовании нескольких сценариев и Spark-Sead

Есть два вопроса, которые нам нужно работать при использовании искры в развертывании, как это. Первый в том, что искровой кластер является дистрибьютой средой Рабочие Оршестрированный из искры Мастер где запущен сценарий Python. Это означает, что мастер является единственным с доступом к представленному скрипту и локальным дополнительным файлам. Если бы мы хотим, чтобы рабочие могли получить доступ к дополнительным импортированным моем Python, они либо должны быть частью нашего Python Distributubutuon, либо нам нужен неявно. Мы делаем это, используя pyfiles = ['Engine.py', 'app.py'] Параметр при создании SparkContext объект.

Второй вопрос связан с предыдущим, но немного сложнее. В Spark, при использовании преобразований (например map на RDD) мы не можем ссылаться на другие RDDS или объекты, которые не доступны глобально в контексте выполнения. Например, мы не можем ссылаться на переменные экземпляра класса. Из-за этого мы определили все функции, которые передаются в преобразования RDD за пределами Рекомендации класс.

Попробуя сервис

Теперь давайте попробуем службу попробую, используя те же данные, которые мы использовали на руководстве по созданию модели. То есть сначала мы собираемся добавить рейтинги, а затем мы собираемся получить лучшие рейтинги и отдельные рейтинги.

Размещение новых рейтингов

Итак, во-первых, во-первых, нам нужно пройти наш сервис, как объяснено в предыдущем разделе. Когда-то работает, мы будем использовать Curl размещать новые рейтинги из оболочки. Если у нас есть файл user_ratings.file (см. Получение исходного кода ниже) В текущей папке просто выполните следующую команду: Curl --data-binare @ user_ratings.file http://: 5432/0/itings Замена С IP-адресом, в котором у вас работает сервер (например, localhost или 127.0.0.1 Если его работает локально). Эта команда начнет несколько вычислений и в конечном итоге с выходом, представляющим рейтинги, которые были представлены в виде списка списков (это просто для проверки того, что процесс был успешным).

В окне «Вывод сервера» вы увидите фактические вывод вычисления искры, вместе с выходными сообщениями Cherrypy о HTTP-запросах.

Получить лучшие рекомендации

Этот вы можете сделать это, используя также скручивание, но рендеринг JSON будет лучше, если вы просто используете свой веб-браузер. Просто пойти в http://: 5432/0/Ratings/Top/10 Чтобы получить лучшие 10 рекомендаций фильмов для пользователей, которые мы размещены рекомендации. Результаты должны соответствовать тем, кто видел в руководстве по поводу создания модели.

Получение индивидуальных рейтингов

Точно так же мы можем скрутить/перейти к http://: 5432/0/Ratings/500 Чтобы получить прогнозируемый рейтинг для фильма Викторина (1994) Отказ

Получение исходного кода

Исходный код для трех файлов Python вместе с дополнительными файлами, которые составляют наш веб-сервис, можно найти в следующие цики Отказ

Выводы

В этом уроке мы построили спокойную API вокруг механизма для искровой рекомендации с использованием микро-каркасных фреймворков колба. Мы также связали API на производственный веб-сервер, такой как предоставленный Cherrypy Framework. Делая так, что у нас есть Масштабируемое Сервис Мы можем использовать для получения персонализированных рекомендаций фильмов для нескольких веб-приложений.

Более того, подход и технологии, описанные здесь, могут использоваться аналогичным образом с другими зажимающими моделями. Например, мы могли бы подключить услугу с моделью обучения машины Mllib для классификации или прогнозирования или с помощью светильника для анализа данных в реальном времени.

То есть искра обеспечивает масштабируемую аналитику данных и, используя его из Python, мы открываем дверь к использованию легких веб-каркасов, таких как колба и Creberpy, которые являются выразительными, мощными и очень простыми в использовании и развертывании.