Рубрики
Без рубрики

Сервер громкой игры с высокой пропускной способностью с WebSockets Python

Отслеживание ошибки в моей игре, подчеркивая по исправлению, и код, чтобы заменить все это. Теги с WebDev, Python, программированием, Геймедугом.

Ошибка возникла во время соревнования с Моя игра Отказ Один из 80 игроков застрял. Как действительно застрял: нарушение дефекта! Ошибка не должна произошла, не может произойти, но это сделало. Единственная возможность была использование стека веб-розетки. Оказывается, что слой не работал как предназначен, и не было никакого способа исправить это. Увы, я искал альтернативное решение.

Здесь я опишу, что я придумал: новый, более целенаправленный стек, используя WebShets Python. Множество COROUTINES, ASYNCIO и очередей. У меня полный рабочий пример в конце этой статьи. Это, вероятно, последнее состояние, в котором он может автономным; Я буду изменять его, чтобы соответствовать еще более крепче с моим игровым кодом.

Моя игра Это многопользовательская игра-головоломка, согласованная сообщениями. Я разделяю каждый экземпляр игры от других. Для сообщений я делаю это с классической концепцией «комнат». Это название, которое также использует Flask-Socketio, и это то, что используется моя первая реализация.

В основном, сообщения в игре не нуждаются в определенном общем порядке. Они могут прибыть в другое приказ различным клиентам. Есть несколько ситуаций, когда это не правда, мест, где мне нужны некоторые сообщения, чтобы иметь определенный общий заказ. Там нет многих из них, и это, вероятно, почему я не заметил дефект раньше.

Когда я впервые начал проект, я спросил, будь то библиотека, если используется на одной клиентской системе, эхо в порядке, может поддерживать заказ клиентам. Ответ был да, но правда нет. Учитывая нагрузку в основном связанной сети, она обычно поддерживает заказ. Только в нескольких временах стресса или из-за глупых удачи, он отправляет сообщения вне заказа.

Хорошо, я, вероятно, могу поставить очередь вокруг него. Тьфу, пропускная способность падает на Abysmal 70msgs/s. Без очереди было уже медленное 1200msg/s, но этого было достаточно для моей игры. После немного назад, я и автор библиотеки не согласны на то, что является приемлемой пропускной способностью.

Так что я схватил WebSockets Вместо этого библиотека собила доказательство концепции и получила 12 000 мсг/с. Да, это больше, как я ожидал.

На самом деле, я бы ожидал еще больше. И долгосрочный, если я получу достаточно трафика, я перепишу это в C ++. Пропускная способность должна быть полностью связана с сетью, но это все еще CPU, связанная на сервере. Я сделал много низкоуровневых сетей, прежде чем узнать, что я могу подтолкнуть его выше, но для моих нужд 12K/S – более чем достаточно. Я, вероятно, масштабирую количество серверов, прежде чем беспокоиться о оптимизации одного из них.

На код!

Модуль «Websockets» – минимальная реализация WebSockets Отказ Это звучало как то, что я хотел. Я не хотел идти на низкий уровень обработки протокола. Это оставило меня, пишущую всю свою логику высокого уровня, в частности клиентов.

Библиотека проста в использовании. У меня есть основной пример, работающий с небольшими усилиями. Конечно, тогда есть много деталей для покрытия. Вот быстрый список вещей, которые мне нужно было поддержать, первые функции:

-Handle входящий клиент, где «ручка» в основном делается в основном в библиотеке – позвоните клиенту присоединиться к игровой комнате (каждый клиент может присоединиться только к одной комнате в моей системе, упрощающий код) «Установить другой клиент, чтобы присоединиться к той же игровой комнате – allowlobla других клиентов присоединиться к другим номерам – выталкиваю клиента, чтобы отправить сообщение -Provide общий заказ к сообщению, с помощью идентификатора сообщения – Dispatch это сообщение для всех клиентов в комнате

В моем финальном коде я сохраню сообщения в магазине Redis, затем в конце концов на монгодб. Это не является частью моего примера кода.

И есть несколько ситуаций или ошибок, которые мне придется иметь дело.

-A клиент отключает чисто или резко – клиент отправляет дерьмо – клиент медленный – замедленный номер в комнате, если в нем больше нет клиентов

Мой сервер поддерживает список клиентов в списке комнат:

@dataclass
class Client:
    socket: Any # What type?
    id: int
    disconnected: bool = False

@dataclass
class Room:
    key: str
    clients: Dict[int,Client] = field(default_factory=dict)
    new_clients: List[Client] = field(default_factory=list)
    msg_id: int = 0
    event_queue: asyncio.Queue = field(default_factory=asyncio.Queue)
    listening: bool = False
    future: Any = None # What Type?

Я использую аннотации типа для моего Python, наряду с Mypy, чтобы проверить типы. Увы, для нескольких классов библиотеки я не уверен в типов. Поскольку многие из них создаются автоматически, или возвращаются из других функций, трудно определить тип. В конечном итоге я пойму все типы.

В этих типах данных розетка является единственной частью, непосредственно подключенной к модулю «Websockets». Он отслеживает входящее соединение и используется для отправки и получения данных.

Вкратце, listay_room Функция обрабатывает входящие клиентские соединения. Я толкаю все сообщения на Event_Queue из Комната Отказ listay_room Функция слушает эту очередь и отправляет сообщения всем клиентам в комнату.

Один слушатель на комнату

Я изначально имел одну очередь прослушивания, которая обрабатывала все комнаты. Когда я в конечном итоге напишу сервер нижнего уровня, как в C ++, я бы сохранил эту структуру. Когда вы получаете недостаточный уровень, вы можете контролировать гораздо больше деталей, удаляя потребность в CITROUTINES полностью.

Но в Python есть несколько причин, по которым я использую один слушатель на комнату:

не накладные расходы -Дедис -плохой клиенты

Накладные расходы – это мой мой код Python, а библиотечный код, окружающий пишу клиентов. Это не много, но он может складывать с большим количеством активности. Я подозреваю, что разборы и форматирование JSON – самая большая часть этого. Но это не причина у меня есть один слушатель на комнату. Поскольку код Python работает как один реальный поток, не имеет значения, является ли этот код в одном слушателе или многих слушателей. Это все неизбежная вычислительная нагрузка.

Первая реальная причина, redis, это хорошо поведение мотиватора. Для каждого исходящего сообщения я должен создать уникальный идентификатор сообщения. В пример кода я отслеживаю это в Python, в Комната класс. На моем финальном сервере я отследую это в целочисленном ключ Redis. Кроме того, я буду хранить все сообщения в списке Redis. Отдельный процесс очистит это регулярно и сохраняет сообщения MongoDB. Вызывы redis найдите время, время, когда сервер может вместо этого обрабатывать сообщения для других помещений. Таким образом, я хочу отделить комнаты. В то время как одна комната ждет redis, остальные могут продолжать обработку.

Вторая причина, плохие клиенты, является неудачной необходимостью. Возможно, что клиент отключен или не удается обработать сообщения достаточно быстро. По большей части это обрабатывается буферами. Звонки на Socket.Send эффективно асинхронные, по меньшей мере, пока очередь не заполнится. Когда это произойдет, Отправить Будет подождать, пока в очереди есть место. В ожидании всех остальных номеров остановится, не в состоянии отправить любые сообщения. Имея одну очередь в комнату, я ограничиваю урон клиента только к этой комнате.

Это, вероятно, произойдет. Во-первых, библиотека WebSockets имеет функцию тайм-аута. Не отвечающие клиентам будут отключены долго, до того, как буферы исходящих сокет. Моя игра просто не генерирует достаточно сообщений, чтобы когда-либо заполнить буферы. Экстраполяция из моего стрессового теста, с предполагаемым средним размером сообщения, есть место для игровых сообщений 25K в стандартных буферах. И типичный проход моей игры, с командой, генерирует только от 3 до 4 тысяч сообщений.

В любом случае, это хорошая защита.

клиенты, new_clients и память

Одним из преимуществ одной реальной нити не требуется беспокоиться о фактических Расы данных Отказ Они просто не происходят, так как они будут в многопоточной приложении. Ура! Нет возможности повреждения памяти.

Это не означает, что гоночные условия, что-то другое, не происходит. Логические проблемы параллелизма все еще существуют, хотя и в меньшей степени. Спасибо Cooperative Threading! Самая значительная проблема в моем коде в моем коде состоит с Клиенты объект. Прослушиватель очереди, итерации над клиентами. Если список модифицирован во время итерации, Python бросит одновременный исключение модификации. Это строгий нет – нет, поскольку итератор не имеет представления о том, что он должен сделать.

Есть три случая, когда список должен быть изменен:

-Когда клиент отключается в listay_socket. -Когда клиент отключается в listay_room. -Когда новый клиент присоединяется к комнате

Сначала я обрабатывал отключение в listay_socket Функция, но через тестирование заметило, что это может быть Socket.Send () Вызов, который обнаруживает отключение сначала. Таким образом, отключение происходит в нескольких местах. В обоих случаях я просто отмечаю клиента как Отключить в Клиент состав. listay_room пропускает отключенные клиенты при отправке сообщений. Это отследит их и благополучно удаляет их из комнаты после петли итерации.

Когда новый клиент присоединяется к комнате, listay_socket добавляет его к new_clients список. listay_room Затем добавляют новые клиенты перед каждым циклом сообщений. Это сделает это сразу после получения сообщения, чтобы убедиться, что все новые клиенты получают сообщение. Это означает, что сообщения комнаты могут прибыть на клиента до «присоединения» ответа от присоединения к комнате. В моей игре, получая этот заказ вместе с отправкой старых сообщений, для клиентов важно получать постоянное игровое состояние. Я, вероятно, придется отрегулировать этот код немного.

В коем случае не делает listay_socket. знать, если это безопасно работать с клиенты , так как он не может сказать Если listay_room находится внутри или снаружи цикла. Блокировка не плохая идея, но она вводит удаленную задержку на входящей стороне прослушивания и задержек в слушатель комнаты. Почему замок, когда мне не нужно?

В ретроспективе может быть недостаток, что большая часть параллелизма COROUTINE является неявной, особенно если использование чего-то вроде Eventlets. Как программист, это менее очевидно, где происходит логическое переключение потока. Вам просто нужно знать, что каждая операция ждут, каждый звонок Asyncio, и каждый вызов Websocket – это потенциальное местоположение для переключателя потока. Было бы приятно сказать, что вы должны предположить, что выключатель возможен в любое время, но тогда я не мог положиться на нее, не включив в некоторые места и потребует куча замков.

Использовать замки Если вы не уверены. Производительность не имеет значения, если ваша игра перерывается. ворчать

Статистика и пропускная способность

Я добавил простой Статистика класс для пропускной способности на сервере. Он излучает время для всех входящих и исходящих сообщений на комнату. 12K/S – это то, что произойдет, если у меня есть несколько клиентов, связанных с уникальными комнатами. Моя машина ударяет, что ограничение 12K с процессом сервера, привязанного при 100% процессора.

К сожалению, я должен скорректировать свой номер до 10k. Как только я переместил комнаты на отдельных слушателей, я ударил гораздо больше над головой. Я не совсем уверен, почему – я не могу представить, что это дополнительное количество COROUTINES. Скорее всего, в асинковых вещах есть некоторые настройки, но все еще достаточно быстро, что я не обеспокоен.

Как любопытство, я измерил один клиент, подключенный к серверу. Это немного немного на 5KMSG/s. Поскольку это клиент и сервер, у меня есть два процесса. Они оба при использовании 58% процессора. В идеале они должны быть на 50% процессора, поскольку они отправляют сообщение взад и вперед. Эта дополнительная 8% обрабатывается, потраченная на вещи, кроме обрабатывающих сообщение. Возможно, если бы я написал систему в C ++, она стала ближе к 50%, но никогда не добраться туда полностью. Пропускная способность, однако, должна подняться.

Когда я говорю, C ++ будет быстрее, это из-за моего опыта. У меня лучше контролировать то, что происходит и умело использовать этот контроль. Легко добраться неправильно и вверх с парящей кучей, которая хуже, чем чистая версия Python. Серверный код жесткий!

Статистика не напрямую измеряет время отклика. Но зная природа пинг-понга в тестах, я могу рассчитать примерно то, что это было. При фракционных миллисекундах на сообщение оно будет шум по сравнению с накладной головкой истинной сети, когда развернута.

Эта статистика я рассчитываю здесь не отлично. Если бы я пытался написать действительно высокопроизводительный сервер, я бы отслеживал средние, стандартные отклонения, крайности и записи все лучше. Числа, которые он показывает, настолько переведен для моей игры, хотя, что нет необходимости для большего.

Теперь мне нужно получить это интегрированное с моим существующим сервером. Я думаю, что потеряю способность использовать один порт и один экземпляр Python. Это не большая потеря, это то, что я собирался делать в какой-то момент, в любом случае. Игровой сервер не должен быть таким же, как веб-сервер. Это как для производительности, так и для стабильности. В конце концов, я должен иметь достаточно нагрузки, я должен запустить несколько игровых серверов (несколько серверов, обработающих веб-сокетные соединения). У меня есть план, чтобы масштабировать это направление, но это будет долгое время, прежде чем доберешься.

Ниже приведен код для сервера, клиента Python и образец клиента для браузера. Хотя детали могут немного измениться, структура останется рядом с этим. Учитывая размер кода, лучше не вращать это в библиотеку. Я прямо сейчас адаптирую это для Моя игра сервер.

Этот код довольно стабилен и должен работать отправной точкой для ваших собственных нужд. Хотя, конечно, я не могу сделать никаких обещаний этого. Я, скорее всего, обнаружим вещи в моем приложении, требующим исправлениях.

ws_server.py

from typing import *
from dataclasses import dataclass, field
import asyncio, websockets, json, time
from collections import defaultdict

#sys.path.append('../server')
#from escape.live_game_state import LiveGameState

def encode_msg(msg: Dict) -> str:
    return json.dumps(msg, ensure_ascii=False)

def decode_msg(text: str) -> Dict:
    return json.loads(text)


@dataclass
class Client:
    socket: Any # What type?
    id: int
    disconnected: bool = False

@dataclass
class Room:
    key: str
    clients: Dict[int,Client] = field(default_factory=dict)
    new_clients: List[Client] = field(default_factory=list)
    msg_id: int = 0
    event_queue: asyncio.Queue = field(default_factory=asyncio.Queue)
    listening: bool = False
    future: Any = None # What Type?

    def client_count(self) -> int:
        return len([c.id for c in self.clients.values() if not c.disconnected])

client_id_count = 0

rooms: Dict[str, Room] = {}


# Used to get a basic idea of throughput
class Stats:
    def __init__(self, name):
        self._name = name
        self._count = 0
        self._time = time.monotonic()

    def incr(self, amount = 1):
        self._count += amount
        if self._count > 5000:
            end_time = time.monotonic()
            print( f'{self._name} {self._count / (end_time-self._time)}/s' )
            self._count = 0
            self._time = end_time


async def listen_room(room):
    if room.listening:
        raise Exception(f'Already listening to {room.key}')

    room.listening = True
    print(f'Listen Room {room.key}')
    stats = Stats(f'Outgoing {room.key}')
    while True:
        qevent = await room.event_queue.get()
        if qevent == None:
            break

        # Add any new clients that have shown up, this handler must control this to avoid it
        # happening inside the loop below
        if len(room.new_clients) > 0:
            for client in room.new_clients:
                room.clients[client.id] = client
            room.new_clients = []

        # In my game I'll track IDs in Redis, to survie unexpected failures.
        # The messages will also be pushed there, to be picked up by another process for DB storage
        room.msg_id += 1
        qevent['msg_id'] = room.msg_id

        count = 0
        disconnected: List[int] = []
        for client in room.clients.values():
            if client.disconnected:
                disconnected.append(client.id)
                continue
            count += 1

            # There's likely some asyncio technique to do this in parallel
            try:
                await client.socket.send(encode_msg(qevent))
            except websockets.ConnectionClosed:
                print("Lost client in send")
                client.disconnected = True
                # Hoping incoming will detect disconnected as well

        stats.incr(count)

        # Remove clients that aren't there anymore. I don't really need this in my game, but it's
        # good to not let long-lived rooms build-up cruft.
        for d in disconnected:
            # Check again since they may have reconnected in other loop
            if room.clients[d]:
                del room.clients[d]

    print(f'Unlisten Room {room.key}')
    room.listening = False


async def listen_socket(websocket, path):
    global rooms, client_id_count
    print("connect", path)
    client_id_count += 1
    room: Optional[Room] = None
    client = Client(id=client_id_count, socket=websocket)

    stats = Stats('Incoming')
    try:
        async for message_raw in websocket:
            message = decode_msg(message_raw)
            if message['type'] == 'join':
                # Get/create room
                room_key = message['room']
                if not room_key in rooms:
                    room = Room(key=room_key)
                    rooms[room_key] = room

                    room.future = asyncio.ensure_future(listen_room(room))
                else:
                    room = rooms[room_key]

                # Add client to the room
                room.new_clients.append(client)

                # Tell the client which id they are.
                await websocket.send(encode_msg({
                    'type': 'joined',
                    'client_id': client.id
                }))

            elif room:
                # Identify message and pass it off to the room queue
                message['client_id'] = client.id
                await room.event_queue.put(message)
            else:
                # Behave as trival echo server if not in room (will be removed in my final version)
                await websocket.send(encode_msg(message))
            stats.incr()
    except websockets.ConnectionClosed:
        pass
    except Exception as e:
        # In case something else happens we want to ditch this client.  This won't come from
        # websockets, but likely the code above, like having a broken JSON message
        print(e)
        pass

    # Only mark disconnected for queue loop on clients isn't broken
    client.disconnected = True
    if room is not None:
        # Though if zero we can kill the listener and clean up fully
        if room.client_count() == 0:
            await room.event_queue.put(None)
            del rooms[room.key]
            await room.future
            print(f"Cleaned Room {room.key}")

    print("disconnect", rooms)


def main() -> None:
    start_server = websockets.serve(listen_socket, "localhost", 8765, ping_interval=5, ping_timeout=5)

    asyncio.get_event_loop().run_until_complete(start_server)

    asyncio.get_event_loop().run_forever()


main()

ws_client.py

Простой клиент, который подтверждает правильное упорядочение сообщений. Предоставить комнату в командной строке.

Есть возможность замедлить этот клиент, который заставляет сервер отключить его, когда буферы заполняют.

Вы отметиете, что мой код теста клиента является более жестким, чем мой код сервера и не хватает многих определений типов. Этот код не будет использоваться долгосрочный, но серверный код будет.

from typing import *
import asyncio, json, websockets, time, sys

if len(sys.argv) < 2:
    print(f"Sytnax {sys.argv[0]} room (delay)" )
    sys.exit(-1)

room = sys.argv[1]
# A non-zero slow creates a client that can't keep up. If there are other clients in the room
# it will end up breaking, causing the server to disconnect it.
slow = 0.0
if len(sys.argv) > 2:
    slow = float(sys.argv[2])

def encode_msg(msg: Dict) -> str:
    return json.dumps(msg, ensure_ascii=False)

def decode_msg(text: str) -> Dict:
    return json.loads(text)

# An even simpler stats tracker than the server 
trigger_count = 5000.0
if slow > 0:
    trigger_count /= (1+slow) * 100


async def reader(websocket):
    count = 0
    seq = 0
    last_time = time.monotonic()
    client_id = None
    last_msg_id = None

    async for message_raw in websocket:
        count += 1
        msg = decode_msg(message_raw)

        if msg['type'] == 'joined':
            client_id = msg['client_id']
        else:
            # Ensure the messages have a single total order
            msg_id = msg['msg_id']
            if last_msg_id is None:
                last_msg_id == msg_id
            else:
                if msg_id != (last_msg_id+1):
                    print(last_msg_id, msg_id)
                    raise Exception("bad msg sequence")

        if msg['type'] == 'ping' and client_id == msg['client_id']:
            # Ensure our own measures retain the order we sent them
            if msg['seq'] != seq:
                print(seq, message_raw)
                raise Exception("bad message seq")

        # Track rough throughput
        if count >= trigger_count:
            next_time = time.monotonic()
            print( f'{count /(next_time - last_time)}/s {room}' )
            last_time = time.monotonic()
            count = 0

        if client_id == msg['client_id']:
            seq += 1
            await websocket.send(encode_msg({'type': 'ping', 'seq': seq }))

        if slow > 0:
            await asyncio.sleep(slow)


async def hello():
    uri = "ws://localhost:8765"
    async with websockets.connect(uri) as websocket:
        print("Connect")
        await websocket.send( encode_msg({ 'type': 'join', 'room': room }) )
        consumer_task = asyncio.ensure_future(
            reader(websocket))
        done = await asyncio.wait(
            [consumer_task],
            return_when=asyncio.FIRST_COMPLETED,
        )


asyncio.get_event_loop().run_until_complete(hello())

ws_client.html.

Простой веб-браузерный клиент, чтобы доказать, что он работает там, где мне это нужно.




    

Text

Оригинал: “https://dev.to/mortoray/high-throughput-game-message-server-with-python-websockets-3mnf”