Рубрики
Без рубрики

Построение широковещательной платформы на Python

Мы описываем решение для разработки сервисов потоковой передачи данных. Мы покажем, как бережливое решение может быть построено с помощью простой, но эффективной 3-уровневой архитектуры, которая приводит к созданию высоко настраиваемого программного продукта.

Автор оригинала: Pietro Grandinetti PhD.

Аннотация – В данной статье мы опишем решение для разработки сервисов потоковой передачи данных. В отличие от платных платформ с большими данными, мы показываем, как бережливое решение может быть построено с помощью простой, но эффективной 3-уровневой архитектуры, которая приводит к созданию высоко настраиваемого программного продукта. Мы показываем примеры кода на Python, которые используют различные современные инструменты, такие как Django, Django-каналы и websockets.

Edit – Работа, описанная в этой статье, теперь находится в прямом эфире по адресу katiedj.com

Ключевые слова : Python, Django, MVP, Websockets

Вступление

Ученые и исследователи программного обеспечения часто сталкиваются с общей ситуацией, когда их работа принесла некоторые интересные результаты с точки зрения анализа данных, которыми они хотели бы поделиться с сообществом.

На самом деле открытые платформы, такие как GitHub, позволяют пользователям публиковать программное обеспечение в обмен на видимость и (возможно, что более важно) обратную связь. Кроме того, совместное использование наборов данных является хорошо известным методом повышения качества данных, а также анализа программного обеспечения.

С другой стороны, часто бывает так, что источник данных не может быть обнародован — например, данные могут генерироваться датчиками (в этом случае мы будем говорить о “реальных” данных) или проприетарной программной системой, которая генерирует журнал в реальном времени (в этом случае мы будем говорить о “синтетических” данных).

Один из самых интересных сценариев возникает, когда стартап почти непреднамеренно создал бережливый продукт, создавая более крупный, и становится возможным получить некоторый первоначальный доход, продавая первый. Продажа его напрямую, через посредство, поставила бы под угрозу более крупные планы запуска, потенциально раскрывая секреты конкурентам.

Таким образом, решение состоит в том, чтобы построить систему вещания, которая сохраняет в тайне способ генерации данных, но продает доступ к самой трансляции.

Вышеприведенные рассуждения являются основной мотивацией нашей работы. Эта статья будет идейно-плотной. Поэтому мы начинаем с обзора контента, облегчающего чтение.

Там будет три раздела, каждый из которых описывает одну часть архитектуры (см. Также рисунок ниже):

  • Система вещания. Нам нравится думать об этом как о радио, которое посылает сообщения тому, кто слушает правильные каналы. Однако, чтобы улучшить модульность всей системы, радио также строится для того, чтобы принимать от внешней системы данные, которые она затем будет транслировать. В нашем приложении мы построили этот модуль на Python, используя Django и Django-каналы.
  • Слушатель(слушатели). Они были бы клиентами, если бы речь шла о стандартной клиент-серверной архитектуре — это агенты, прослушивающие радиоканалы. Мы построили эту часть на Python, используя модуль websockets.
  • Система генератора данных. Это частный модуль, детали реализации которого не сообщаются слушателям радио. Он производит новые данные с помощью нераскрытого программного обеспечения, а затем отправляет их на радио в формате, которым можно поделиться (возможно, продать). Эта часть может быть построена на любом языке, или даже может быть реальной системой датчиков, или любым устройством “интернета вещей”.
Система генератора данных. Это частный модуль, детали реализации которого не сообщаются слушателям радио. Он производит новые данные с помощью нераскрытого программного обеспечения, а затем отправляет их на радио в формате, которым можно поделиться (возможно, продать). Эта часть может быть построена на любом языке, или даже может быть реальной системой датчиков, или любым устройством

Прежде чем перейти к деталям реализации, мы хотели бы подчеркнуть некоторые положительные моменты такой архитектуры:

  • Четкое разделение между генератором данных и радио (т. е. широковещательным сервером) делает их очень гибкими и настраиваемыми.
  • Как следствие, над двумя модулями могут работать разные люди: легко представить себе специалистов по обработке данных, работающих над генератором данных, и веб-разработчиков, работающих над радио.
  • Как дальнейшее следствие, архитектура полностью независима от языка: мы сказали, что построили все на Python, но на самом деле вполне возможно построить генератор данных на R или C++ (возможно, если это система с высокими вычислениями), а трансляцию на Ruby или Java, просто чтобы назвать несколько альтернатив. Двум модулям нужно только договориться о формате обмениваемых данных, который, скорее всего, будет JSON или XML.
  • В качестве бизнес-модели мы сказали, что наиболее естественным было бы продать доступ к общедоступным каналам для получения данных. На самом деле, даже более тонкая модель имеет смысл: продажа доступа к частному API, который публикует данные.

Давайте теперь продолжим статью, обсудив более технические детали реализации различных модулей.

Широковещательный сервер (Радио)

Основными задачами сервера должны быть получение данных от Генератора данных и публикация их для извлечения клиентами. Таким образом, эти требования переводятся в:

  • Конечная точка API, которая используется генератором данных каждый раз, когда готов новый образец.
  • Публичные каналы, где клиенты регистрируются, чтобы получить новые данные после их публикации.

Стоит отметить, что два вышеприведенных пункта основаны на очень разных поведениях: с одной стороны (первый пункт) используется API, а это значит, что он должен быть вызван другой стороной. С другой стороны (второй пункт), данные просто передаются в эфир, независимо от того, кто их слушает.

Почему такая разница? Проще говоря, было бы очень дорого построить конечную точку, которая может быть использована слушателем, особенно если есть много подключенных слушателей, тогда стандартный подход REST API не является лучшим выбором дизайна — мы использовали для него Websocket.

Напротив, Генератор данных-это управляемая система, поэтому он может свободно вызывать конечную точку RESTful всякий раз, когда новый образец готов к использованию.

Мы использовали Python для создания этого модуля и чистый Django 2.0 для API. У нас есть Django-приложение под названием publisher, чье urls.py файл очень прост:

# urls.py
from django.urls import path
from . import views

app_name = 'publisher'

urlpatterns = [
    path('publish', views.PublisherView.as_view(), name='main'),
]

Обратите внимание , что мы используем функцию path , новую в Django 2.0. Таким образом, конечная точка подвергается воздействию пути /publish . Теперь давайте покажем представление Django, которое получает данные от генератора данных и публикует их в каналах.

# views.py
class PublisherView(View):

    def post(self, request):
        try:
            key = request.META['HTTP_API_KEY']
            if not key in settings.API_KEYS:
                return JsonResponse(
                    {'error': 'API-KEY not valid'}, status=400)
        except:
            return JsonResponse(
                {'error': 'API-KEY missing in headers'}, status=400)
        try:
            body = json.loads(request.body.decode('utf-8'))
        except:
            return JsonResponse(
                {'error': 'POST data is not JSON'}, status=400)
        try:
            group = body['network']
        except:
            return JsonResponse(
                {'error': '*network* key missing'}, status=400)
        if group not in settings.API_KEYS[key]:
            return JsonResponse(
                {'error': 'You cannot broadcast to this channel'}, status=403)
        if not 'data' in body:
            return JsonResponse(
                {'error': '*data* key missing'}, status=400)
        if not isinstance(body['data'], dict):
            return JsonResponse(
                {'error': 'Can only broadcast json data'}, status=400)
        # all tests are OK
        Group(group).send({'text': json.dumps(body['data'])})
        return JsonResponse({'message': 'OK'}, status=200)

Давайте также прокомментируем поведение этого представления шаг за шагом:

  1. Конечная точка включена для POST-запросов.
  2. Заголовок запроса должен содержать поле с именем API-KEY . Кроме того, это поле должно соответствовать одному из включенных ключей (хранится в settings.API_KEYS ). Это очень стандартный способ защиты конечной точки. В принципе, только пользователи, у которых есть секретный личный буквенно-цифровой ключ, могут действительно использовать API.
  3. Тело запроса должно быть в хорошо сформированном формате JSON.
  4. Тело должно содержать в поле network имя допустимой группы, то есть имя канала, на который будут отправляться данные.
  5. Имя такого канала должно быть среди каналов, которые включены для данного конкретного API-КЛЮЧА. Это делается для того, чтобы избежать того, что авторизованный пользователь отправляет данные на канал, на который он не должен их отправлять.
  6. Данные, которые будут транслироваться, должны быть в формате JSON.
  7. Наконец, после того, как все тесты пройдены, данные передаются по каналу. Обратите внимание, что мы используем объект Group пакета Django-channels.

Давайте теперь “соединим точки” и покажем модуль, который обрабатывает соединения, то есть каналы. Как мы уже упоминали, мы использовали пакет Django-channels, который в основном реализует протокол Websocket очень по-джанго. На самом деле, используя Django-каналы, становится чрезвычайно легко обрабатывать простые соединения слушателей. Вот весь модуль (очень мало строк!):

# consumers.py
from channels.generic import websockets

class MyBroadcast(websockets.WebsocketConsumer):

    http_user = True
    strict_ordering = False

    def connection_groups(self, **kwargs):
        """
        Called to return the list of groups to automatically add/remove
        this connection to/from.
        """
        return [self.channel_name]

    def receive(self, text=None, bytes=None, **kwargs):
        # Not talking!
        pass

class SampleBroadcast(MyBroadcast):
    channel_name = "sample_net"

Чтобы дать больше понимания, каждый подкласс MyBroadcast просто должен создать имя своего собственного канала ( sample_net в приведенном выше коде), а остальное автоматически обрабатывается большой работой, проделанной внутри пакета Django-channels.

Слушатель трансляции

Хотя самой интересной частью архитектуры является Широковещательный сервер, радио бесполезно, когда его никто не слушает. Поэтому в нашей концепции также важно предоставить готовые к использованию клиентские модули, которые пользователи могут просто загрузить и запустить для получения данных.

В идеале эти модули должны быть опубликованы (например, в GitHub), а затем опытные пользователи могут настроить поведение в соответствии со своими потребностями, отправить запросы на слияние и внести свой вклад в дальнейшем.

Кроме того, в идеале мы должны предоставить клиентские модули на нескольких языках (Python, R, MatLab и т. Д.), Чтобы каждый пользователь мог использовать свою любимую среду для анализа данных.

Давайте теперь покажем, как написать подобный модуль на Python. Мы использовали отличный пакет websockets, построенный поверх стандартной библиотеки asyncio.

# radio_listener.py
class WSClient():

    def __init__(self, url):
        self.url = url
        # constant values, but can be passed as params
        self.reply_timeout = 10
        self.ping_timeout = 5
        self.sleep_time = 5

    async def listen_forever(self):
        while True:
            logger.debug('Creating new connection...')
            try:
                async with websockets.connect(self.url) as ws:
                    while True:
                        try:
                            reply = await asyncio.wait_for(ws.recv(), timeout=self.reply_timeout)
                        except (asyncio.TimeoutError, websockets.exceptions.ConnectionClosed):
                            try:
                                pong = await ws.ping()
                                await asyncio.wait_for(pong, timeout=self.ping_timeout)
                                logger.debug('Ping OK, keeping connection alive...')
                                continue
                            except:
                                logger.debug('Ping error - retrying connection in {} sec (Ctrl-C to quit)'.format(self.sleep_time))
                                await asyncio.sleep(self.sleep_time)
                                break
                        # Here do something with the data
                        logger.debug('Got data > {}'.format(reply))
            except socket.gaierror:
                logger.debug('Socket error - retrying connection in {} sec (Ctrl-C to quit)'.format(self.sleep_time))
                await asyncio.sleep(self.sleep_time)
               	continue
            except ConnectionRefusedError:
                logger.debug('Nobody seems to listen to this URL')
                logger.debug('Exiting...')
                break

Приведенный выше код определяет класс WSClient и использует стандартные методы асинхронного программирования и повторения соединения при работе с веб-приложениями.

Короче говоря, он ждет данных с сервера в бесконечном цикле и каждый раз, когда соединение терпит неудачу (или кажется, что терпит неудачу), сначала отправляет сообщение ping, а затем перезапускает новое соединение. Согласно хорошей практике программирования, почти каждое событие регистрируется (используя стандартное ведение журнала библиотеки Python, мы намеренно пропустили подробности о нем).

Генератор данных

Хотя мы сохранили Генератор данных в качестве последнего раздела этой статьи, это, скорее всего, ядро приложения. Даже более того, это, вероятно, что-то, что было построено по совершенно другой причине, и только позже было понято, что больше людей могут быть заинтересованы в использовании одних и тех же данных.

Как мы уже упоминали ранее, Генератором данных может быть практически что угодно: система датчиков, извлекающих данные из системы, сеть мобильных телефонов, отправляющих данные о местоположении GPS, и/или программное обеспечение, созданное учеными, которое вычисляет некоторую сложную математическую модель.

В нашем случае Генератор данных представлял собой программное обеспечение Python, способное моделировать поведение транспортных средств, предсказывать повороты на перекрестках, учитывать светофоры и в конечном итоге вычислять среднюю плотность транспортных средств на каждой улице крупных городов.

По сути, это симулятор дорожного движения. Общая идея трансляции на самом деле основана на том факте, что мы хотели бы поделиться фактическими данными, а не математикой, лежащей в основе модели. Заинтересованные читатели могут найти (очень) исчерпывающее обсуждение в этой рукописи (не вводите себя в заблуждение титульным листом: рукопись написана на английском языке).

Простой подход к периодической публикации данных, вычисляемых генератором, заключается в следующем:

import requests, json
import secret

def publish(net_name, jdata):
    url = secret.URL
    headers = {'content-type': 'application/json'}
    headers['API-KEY'] = secret.API_KEYS[net_name]
    payload = {'network': net_name}
    payload['data'] = jdata
    resp = requests.post(
        url,
        data=json.dumps(payload),
        headers=headers)
    return resp.status_code

def radio_publish():
    # initialize "net" object
    …
    #
    while True:
        jdata = net.dump_state()
        try:
            resp = publish(net.name, jdata)
            time.sleep(net.step)
        except:
            break
        net.next()
    logger.debug('Last published data is > {}'.format(jdata))
    return jdata

Эти две функции должны быть очень ясны, если вспомнить API широковещательного сервера. Обратите внимание, что в методе radio_publish существует бесконечный цикл, который сначала вызывает net.dump_state для получения последнего вычисленного образца данных, затем публикует его, а затем вызывает net.next , чтобы пригласить вычисление нового образца. Конечно, возможны и другие, более сложные парадигмы, такие как программирование на основе событий.

Выводы и перспективы

В этой статье мы показали путь к созданию платформы потоковой передачи данных. Мы выразили практические мотивы с помощью реальных бизнес-примеров, а также показали основу такой потоковой платформы, построенной на Python.

Мы несколько раз подчеркивали гибкость, которую дает наш выбор дизайна: сетевое разделение между тремя модулями позволяет быстро создавать прототипы и разрабатывать их, а также распределять командные ресурсы.

Наконец, остается множество путей, заслуживающих дальнейшего изучения: какова наилучшая инфраструктура для поддержки развертывания такой архитектуры, какова наилучшая коммуникационная парадигма между генератором данных и широковещательным сервером и, возможно, более интересно, как сообщество с открытым исходным кодом примет и внесет свой вклад в этот тип проектирования?