Автор оригинала: Jose A Dianes.
Этот урок SPACK Apache входит в подробности в том, как использовать модели обучения Spark Machine, или даже еще один вид объектов аналитики данных, в веб-сервисе. Это открывает дверь в онлайн-прогнозы, рекомендации и т. Д., Используя язык Python, мы сделаем эту задачу очень простой благодаря собственным возможностям Python, а также к фреймам на основе Python, таких как колба.
Читайте часть 1 этого Учебника Apache Spark здесь Отказ Большая часть кода в этой первой части, о том, как использовать ALS с набором данных Public Movielens, исходит от моего решения одним из упражнений, предложенных в CS100.1X Введение в большие данные с Apache Sparch от Энтони Д. Джозефа на EDX , это тоже Публично доступно с 2014 года на Spark Summit Отказ Там я добавил с незначительными модификациями для использования более крупного набора данных, а также код о том, как хранить и перезагрузить модель для последующего использования.
Эта первая часть мощно сочетается в себе с этим вторым. Делая это, вы сможете разработать полный Сервис рекомендации в режиме онлайн Отказ
Наш полный веб-сервис содержит три файла Python:
Engine.pyОпределяет механизм рекомендации, упаковка внутри всех вычислений, связанных с искрами.app.pyэто веб-приложение Flask, которое определяет API для отдыха вокруг двигателя.server.pyИнициализы A Cherrypy Веб-сервер После создания Spark Context и веб-приложения Flask, используя предыдущий.
Но давайте подробно объясним каждый из них вместе с особенностями развертывания такой системы, использующей искру в качестве вычисленного двигателя. Мы поставим акцент на том, как использовать модельную модель в веб-контексте, с которым мы имеем дело. Для объяснения данных movielens и как построить модель, используя искру, взгляните на руководство по созданию модели.
Весь код для этого учебника доступен в Github Repo Отказ Есть также Репо, объясняя многие концепции, связанные с искром Отказ Иди туда и сделай их своими.
Рекомендации двигателя
В самом сердцем нашей Рекомендации фильма веб-сервис проживает Рекомендацию (I.E. Engine.py в нашем окончательном развертывании). Он представлен классом Рекомендации медленно И в этом разделе описано шаг за шагом, как его функциональность и реализация.
Запуск двигателя
Когда двигатель инициализируется, нам нужно впервые генерировать модель ALS. При желании (мы не будем делать здесь), мы могли бы загрузить ранее сохраняемую модель, чтобы использовать его для рекомендаций. Более того, нам может потребоваться загрузить или предвкушать любые RDDS, которые будут использоваться позже, чтобы сделать рекомендации.
Мы будем делать такие вещи в __init__ Метод нашего Рекомендации медленно класс (используя два частных метода). В этом случае мы не сохраним время. Мы повторим весь процесс каждый раз, когда двигатель он создает.
import os
from pyspark.mllib.recommendation import ALS
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class RecommendationEngine:
"""A movie recommendation engine
"""
def __count_and_average_ratings(self):
"""Updates the movies ratings counts from
the current data self.ratings_RDD
"""
logger.info("Counting movie ratings...")
movie_ID_with_ratings_RDD = self.ratings_RDD.map(lambda x: (x[1], x[2])).groupByKey()
movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD.map(get_counts_and_averages)
self.movies_rating_counts_RDD = movie_ID_with_avg_ratings_RDD.map(lambda x: (x[0], x[1][0]))
def __train_model(self):
"""Train the ALS model with the current dataset
"""
logger.info("Training the ALS model...")
self.model = ALS.train(self.ratings_RDD, self.rank, seed=self.seed,
iterations=self.iterations, lambda_=self.regularization_parameter)
logger.info("ALS model built!")
def __init__(self, sc, dataset_path):
"""Init the recommendation engine given a Spark context and a dataset path
"""
logger.info("Starting up the Recommendation Engine: ")
self.sc = sc
# Load ratings data for later use
logger.info("Loading Ratings data...")
ratings_file_path = os.path.join(dataset_path, 'ratings.csv')
ratings_raw_RDD = self.sc.textFile(ratings_file_path)
ratings_raw_data_header = ratings_raw_RDD.take(1)[0]
self.ratings_RDD = ratings_raw_RDD.filter(lambda line: line!=ratings_raw_data_header)\
.map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),int(tokens[1]),float(tokens[2]))).cache()
# Load movies data for later use
logger.info("Loading Movies data...")
movies_file_path = os.path.join(dataset_path, 'movies.csv')
movies_raw_RDD = self.sc.textFile(movies_file_path)
movies_raw_data_header = movies_raw_RDD.take(1)[0]
self.movies_RDD = movies_raw_RDD.filter(lambda line: line!=movies_raw_data_header)\
.map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),tokens[1],tokens[2])).cache()
self.movies_titles_RDD = self.movies_RDD.map(lambda x: (int(x[0]),x[1])).cache()
# Pre-calculate movies ratings counts
self.__count_and_average_ratings()
# Train the model
self.rank = 8
self.seed = 5L
self.iterations = 10
self.regularization_parameter = 0.1
self.__train_model()
Вся форма кода __init__ И два частных метода были объяснены в руководстве по поводу создания модели.
Добавление новых рейтингов
При использовании Совместная фильтрация и искра Чередующиеся наименьшие квадраты Нам нужно пересчитать модель прогнозирования для каждой новой партии пользовательских рейтингов. Это было объяснено в нашем предыдущем руководстве по созданию модели.
def add_ratings(self, ratings):
"""Add additional movie ratings in the format (user_id, movie_id, rating)
"""
# Convert ratings to an RDD
new_ratings_RDD = self.sc.parallelize(ratings)
# Add new ratings to the existing ones
self.ratings_RDD = self.ratings_RDD.union(new_ratings_RDD)
# Re-compute movie ratings count
self.__count_and_average_ratings()
# Re-train the ALS model with the new ratings
self.__train_model()
return ratings
# Attach the function to a class method
RecommendationEngine.add_ratings = add_ratings
Рекомендации
Мы также объяснили, как сделать рекомендации с помощью нашей модели ALS в руководстве по поводу создания рекомендателя фильма. Здесь мы в основном повторяем эквивалентный код, завернутый в метод нашего РекомендацииНинер Класс и использование частного метода, который будет использоваться для каждого метода прогнозирования.
def __predict_ratings(self, user_and_movie_RDD):
"""Gets predictions for a given (userID, movieID) formatted RDD
Returns: an RDD with format (movieTitle, movieRating, numRatings)
"""
predicted_RDD = self.model.predictAll(user_and_movie_RDD)
predicted_rating_RDD = predicted_RDD.map(lambda x: (x.product, x.rating))
predicted_rating_title_and_count_RDD = \
predicted_rating_RDD.join(self.movies_titles_RDD).join(self.movies_rating_counts_RDD)
predicted_rating_title_and_count_RDD = \
predicted_rating_title_and_count_RDD.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))
return predicted_rating_title_and_count_RDD
def get_top_ratings(self, user_id, movies_count):
"""Recommends up to movies_count top unrated movies to user_id
"""
# Get pairs of (userID, movieID) for user_id unrated movies
user_unrated_movies_RDD = self.movies_RDD.filter(lambda rating: not rating[1]==user_id).map(lambda x: (user_id, x[0]))
# Get predicted ratings
ratings = self.__predict_ratings(user_unrated_movies_RDD).filter(lambda r: r[2]>=25).takeOrdered(movies_count, key=lambda x: -x[1])
return ratings
# Attach the functions to class methods
RecommendationEngine.__predict_ratings = __predict_ratings
RecommendationEngine.get_top_ratings = get_top_ratings
Разделение формы получения главных безупрежных фильмов, мы также захочем получить рейтинги к определенным фильмам. Мы сделаем это с новым методом в нашем Рекомендательный двигатель Отказ
def get_ratings_for_movie_ids(self, user_id, movie_ids):
"""Given a user_id and a list of movie_ids, predict ratings for them
"""
requested_movies_RDD = self.sc.parallelize(movie_ids).map(lambda x: (user_id, x))
# Get predicted ratings
ratings = self.__predict_ratings(requested_movies_RDD).collect()
return ratings
# Attach the function to a class method
RecommendationEngine.get_ratings_for_movie_ids = get_ratings_for_movie_ids
Создание веб-API вокруг нашего двигателя с помощью колбы
Колбу это веб-микрозаменение для Python. Очень легко запустить веб-API, просто импортируя в наш скрипт и используя некоторые аннотации, чтобы связать наш сервисный конечные точки с функциями Python. В нашем случае мы будем обернуть наше Рекомендации медленно Методы вокруг некоторых из этих конечных точек и обмена JSON Отформатированные данные с веб-клиентом.
На самом деле так просто, что мы покажем весь app.py Здесь вместо того, чтобы идти по частям.
from flask import Blueprint
main = Blueprint('main', __name__)
import json
from engine import RecommendationEngine
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
from flask import Flask, request
@main.route("//ratings/top/", methods=["GET"])
def top_ratings(user_id, count):
logger.debug("User %s TOP ratings requested", user_id)
top_ratings = recommendation_engine.get_top_ratings(user_id,count)
return json.dumps(top_ratings)
@main.route("//ratings/", methods=["GET"])
def movie_ratings(user_id, movie_id):
logger.debug("User %s rating requested for movie %s", user_id, movie_id)
ratings = recommendation_engine.get_ratings_for_movie_ids(user_id, [movie_id])
return json.dumps(ratings)
@main.route("//ratings", methods = ["POST"])
def add_ratings(user_id):
# get the ratings from the Flask POST request object
ratings_list = request.form.keys()[0].strip().split("\n")
ratings_list = map(lambda x: x.split(","), ratings_list)
# create a list with the format required by the negine (user_id, movie_id, rating)
ratings = map(lambda x: (user_id, int(x[0]), float(x[1])), ratings_list)
# add them to the model using then engine API
recommendation_engine.add_ratings(ratings)
return json.dumps(ratings)
def create_app(spark_context, dataset_path):
global recommendation_engine
recommendation_engine = RecommendationEngine(spark_context, dataset_path)
app = Flask(__name__)
app.register_blueprint(main)
return app
В основном мы используем приложение следующим образом:
- Мы инициализируем, что при звонке
create_appОтказ ЗдесьРекомендации медленноОбъект создан, а затем мы связываем@ main.ruouteАннотации, определенные выше. Каждая аннотация определяется (см. Flask Docs ): - Маршрут, это его URL и может содержать параметры между <>. Они отображаются с аргументами функции.
- Список доступных HTTP-методов.
Есть три из этих аннотаций, определенные, которые соответствуют трем
Рекомендации медленноМетоды:Получить/Получите лучшие рекомендации от двигателя./ratings/top Получить/Получите прогнозируемый рейтинг для индивидуального фильма./ratings Post/Добавьте новые рейтинги. Формат представляет собой серию строк (заканчивая новой сепаратором) с/ratings movie_idиРейтингразделенных запятыми. Например, следующий файл соответствует десяти новых рейтингах пользователя, используемых в качестве примера в руководстве по созданию модели:
260,9 1,8 16,7 25,8 32,9 335,4 379,3 296,7 858,10 50,8
Развертывание сервера WSGI с помощью Cherrypy
Среди прочего, Cherrypy Framework Особенности надежного, HTTP/1.1, совместимый WebServer WebServer WebServer WSGI. Также легко запустить несколько HTTP-серверов (например, на нескольких портах) одновременно. Все это делает его идеальным кандидатом для простого развертывания производственного веб-сервера для нашей онлайн-рекомендации.
Используйте, что мы сделаем из сервера Cherrypy, относительно простой. Снова мы покажем здесь полный server.py Скрипт, а затем объяснить это немного.
import time, sys, cherrypy, os
from paste.translogger import TransLogger
from app import create_app
from pyspark import SparkContext, SparkConf
def init_spark_context():
# load spark context
conf = SparkConf().setAppName("movie_recommendation-server")
# IMPORTANT: pass aditional Python modules to each worker
sc = SparkContext(conf=conf, pyFiles=['engine.py', 'app.py'])
return sc
def run_server(app):
# Enable WSGI access logging via Paste
app_logged = TransLogger(app)
# Mount the WSGI callable object (app) on the root directory
cherrypy.tree.graft(app_logged, '/')
# Set the configuration of the web server
cherrypy.config.update({
'engine.autoreload.on': True,
'log.screen': True,
'server.socket_port': 5432,
'server.socket_host': '0.0.0.0'
})
# Start the CherryPy WSGI web server
cherrypy.engine.start()
cherrypy.engine.block()
if __name__ == "__main__":
# Init spark context and load libraries
sc = init_spark_context()
dataset_path = os.path.join('datasets', 'ml-latest')
app = create_app(sc, dataset_path)
# start web server
run_server(app)
Это довольно стандартное использование Cherrypy Отказ Если мы посмотрим на __main__ Точка входа, мы делаем три вещи:
- Создайте контекст искры, как определено в функции
init_spark_contextпрохождение адированных модулей Python там. - Создайте приложение Flask, вызывая
create_appМы определены вapp.pyОтказ - Запустите сам сервер.
Смотрите следующий раздел о запуске сервера.
Запуск сервера с искрой
Для того, чтобы сервер запустился при удалении доступа к контексту искры и кластера, нам нужно отправить server.py файл на Pyspark Используя Зажигание Отправить Отказ Разные параметры при использовании этой команды лучше объяснены в Искра Docummentaion. В нашем случае мы будем использовать что-то вроде следующего: ~/Spark-1.3.1-bin-hadoop2.6/bin/spark-ade - master spark://169.254.206.2: 7077 --total-Executor-Cors 14 - Dexecutor-Memory 6G Server.py Важные биты:
- Использовать
Зажигание ОтправитьИ неPysparkнапрямую. -
- мастерПараметры должны указывать на настройку вашей Spark Cluster (может быть локальным). - Вы можете пройти дополнительные параметры конфигурации, такие как
- Тотальный исполнитель-сердечникии--Executor-Memory.
Вы увидите вывод, как следующее:
INFO:engine:Starting up the Recommendation Engine:
INFO:engine:Loading Ratings data...
INFO:engine:Loading Movies data...
INFO:engine:Counting movie ratings...
INFO:engine:Training the ALS model...
... More Spark and CherryPy logging
INFO:engine:ALS model built!
[05/Jul/2015:14:06:29] ENGINE Bus STARTING
[05/Jul/2015:14:06:29] ENGINE Started monitor thread 'Autoreloader'.
[05/Jul/2015:14:06:29] ENGINE Started monitor thread '_TimeoutMonitor'.
[05/Jul/2015:14:06:29] ENGINE Serving on http://0.0.0.0:5432
[05/Jul/2015:14:06:29] ENGINE Bus STARTED
Некоторые соображения при использовании нескольких сценариев и Spark-Sead
Есть два вопроса, которые нам нужно работать при использовании искры в развертывании, как это. Первый в том, что искровой кластер является дистрибьютой средой Рабочие Оршестрированный из искры Мастер где запущен сценарий Python. Это означает, что мастер является единственным с доступом к представленному скрипту и локальным дополнительным файлам. Если бы мы хотим, чтобы рабочие могли получить доступ к дополнительным импортированным моем Python, они либо должны быть частью нашего Python Distributubutuon, либо нам нужен неявно. Мы делаем это, используя pyfiles = ['Engine.py', 'app.py'] Параметр при создании SparkContext объект.
Второй вопрос связан с предыдущим, но немного сложнее. В Spark, при использовании преобразований (например map на RDD) мы не можем ссылаться на другие RDDS или объекты, которые не доступны глобально в контексте выполнения. Например, мы не можем ссылаться на переменные экземпляра класса. Из-за этого мы определили все функции, которые передаются в преобразования RDD за пределами Рекомендации класс.
Попробуя сервис
Теперь давайте попробуем службу попробую, используя те же данные, которые мы использовали на руководстве по созданию модели. То есть сначала мы собираемся добавить рейтинги, а затем мы собираемся получить лучшие рейтинги и отдельные рейтинги.
Размещение новых рейтингов
Итак, во-первых, во-первых, нам нужно пройти наш сервис, как объяснено в предыдущем разделе. Когда-то работает, мы будем использовать Curl размещать новые рейтинги из оболочки. Если у нас есть файл user_ratings.file (см. Получение исходного кода ниже) В текущей папке просто выполните следующую команду: Curl --data-binare @ user_ratings.file http:// Замена С IP-адресом, в котором у вас работает сервер (например, localhost или 127.0.0.1 Если его работает локально). Эта команда начнет несколько вычислений и в конечном итоге с выходом, представляющим рейтинги, которые были представлены в виде списка списков (это просто для проверки того, что процесс был успешным).
В окне «Вывод сервера» вы увидите фактические вывод вычисления искры, вместе с выходными сообщениями Cherrypy о HTTP-запросах.
Получить лучшие рекомендации
Этот вы можете сделать это, используя также скручивание, но рендеринг JSON будет лучше, если вы просто используете свой веб-браузер. Просто пойти в http:// Чтобы получить лучшие 10 рекомендаций фильмов для пользователей, которые мы размещены рекомендации. Результаты должны соответствовать тем, кто видел в руководстве по поводу создания модели.
Получение индивидуальных рейтингов
Точно так же мы можем скрутить/перейти к http:// Чтобы получить прогнозируемый рейтинг для фильма Викторина (1994) Отказ
Получение исходного кода
Исходный код для трех файлов Python вместе с дополнительными файлами, которые составляют наш веб-сервис, можно найти в следующие цики Отказ
Выводы
В этом уроке мы построили спокойную API вокруг механизма для искровой рекомендации с использованием микро-каркасных фреймворков колба. Мы также связали API на производственный веб-сервер, такой как предоставленный Cherrypy Framework. Делая так, что у нас есть Масштабируемое Сервис Мы можем использовать для получения персонализированных рекомендаций фильмов для нескольких веб-приложений.
Более того, подход и технологии, описанные здесь, могут использоваться аналогичным образом с другими зажимающими моделями. Например, мы могли бы подключить услугу с моделью обучения машины Mllib для классификации или прогнозирования или с помощью светильника для анализа данных в реальном времени.
То есть искра обеспечивает масштабируемую аналитику данных и, используя его из Python, мы открываем дверь к использованию легких веб-каркасов, таких как колба и Creberpy, которые являются выразительными, мощными и очень простыми в использовании и развертывании.