Рубрики
Без рубрики

Асинхронный ввод-вывод с абстракциями класса протокола

Автор оригинала: Doug Hellmann.

До этого момента все примеры избегали смешивания операций параллелизма и ввода-вывода, чтобы сосредоточиться на одной концепции за раз. Однако переключение контекстов при блоках ввода-вывода является одним из основных вариантов использования asyncio . Основываясь на уже представленных концепциях параллелизма, в этом разделе исследуются два примера программ, реализующих простой эхо-сервер и клиент, аналогичные примерам, используемым в разделах сокетов и сокетов. Клиент может подключиться к серверу, отправить некоторые данные, а затем получить те же данные в качестве ответа. Каждый раз, когда инициируется операция ввода-вывода, исполняемый код передает управление циклу событий, позволяя другим задачам выполняться до тех пор, пока ввод-вывод не будет готов.

Эхо-сервер

Сервер начинает с импорта модулей, необходимых для настройки asyncio и ведения журнала, а затем создает объект цикла событий.

asyncio_echo_server_protocol.py

import asyncio
import logging
import sys

SERVER_ADDRESS  ('localhost', 10000)

logging.basicConfig(
    levellogging.DEBUG,
    format'%(name)s: %(message)s',
    streamsys.stderr,
)
log  logging.getLogger('main')

event_loop  asyncio.get_event_loop()

Затем он определяет подкласс asyncio.Protocol для обработки взаимодействия с клиентом. Методы объекта протокола вызываются на основе событий, связанных с серверным сокетом.

class EchoServer(asyncio.Protocol):

Каждое новое клиентское соединение вызывает вызов connection_made () . Аргумент транспорта – это экземпляр asyncio.Transport , который предоставляет абстракцию для выполнения асинхронного ввода-вывода с использованием сокета. Различные типы связи обеспечивают разные реализации транспорта с использованием одного и того же API. Например, существуют отдельные классы транспорта для работы с сокетами и для работы с конвейерами к подпроцессам. Адрес входящего клиента доступен из транспорта через get_extra_info () , метод, зависящий от реализации.

def connection_made(self, transport):
        self.transport  transport
        self.address  transport.get_extra_info('peername')
        self.log  logging.getLogger(
            'EchoServer_{}_{}'.format(*self.address)
        )
        self.log.debug('connection accepted')

После установления соединения при отправке данных от клиента на сервер вызывается метод протокола data_received () для передачи данных для обработки. Данные передаются в виде байтовой строки, и приложение должно декодировать их соответствующим образом. Здесь результаты регистрируются, а затем ответ немедленно отправляется обратно клиенту путем вызова transport.write () .

def data_received(self, data):
        self.log.debug('received {!r}'.format(data))
        self.transport.write(data)
        self.log.debug('sent {!r}'.format(data))

Некоторые транспорты поддерживают специальный индикатор конца файла («EOF»). Когда встречается EOF, вызывается метод eof_received () . В этой реализации EOF отправляется обратно клиенту, чтобы указать, что он был получен. Поскольку не все транспорты поддерживают явный EOF, этот протокол сначала спрашивает транспорт, безопасно ли отправлять EOF.

def eof_received(self):
        self.log.debug('received EOF')
        if self.transport.can_write_eof():
            self.transport.write_eof()

Когда соединение закрывается, обычно или в результате ошибки, вызывается метод протокола connection_lost () . Если произошла ошибка, аргумент содержит соответствующий объект исключения. В противном случае это None .

def connection_lost(self, error):
        if error:
            self.log.error('ERROR: {}'.format(error))
        else:
            self.log.debug('closing')
        super().connection_lost(error)

Запуск сервера состоит из двух шагов. Сначала приложение сообщает циклу обработки событий создать новый объект сервера, используя класс протокола, имя хоста и сокет, на котором он будет прослушиваться. Метод create_server () – это сопрограмма, поэтому результаты должны обрабатываться циклом событий, чтобы фактически запустить сервер. При завершении сопрограммы создается экземпляр asyncio.Server , связанный с циклом событий.

# Create the server and let the loop finish the coroutine before
# starting the real event loop.
factory  event_loop.create_server(EchoServer, *SERVER_ADDRESS)
server  event_loop.run_until_complete(factory)
log.debug('starting up on {} port {}'.format(*SERVER_ADDRESS))

Затем необходимо запустить цикл обработки событий для обработки событий и обработки клиентских запросов. Для долго работающей службы метод run_forever () – самый простой способ сделать это. Когда цикл обработки событий останавливается либо кодом приложения, либо сигналом процесса, сервер может быть закрыт для правильной очистки сокета, а затем цикл событий может быть закрыт, чтобы завершить обработку любых других сопрограмм до выхода из программы.

# Enter the event loop permanently to handle all connections.
try:
    event_loop.run_forever()
finally:
    log.debug('closing server')
    server.close()
    event_loop.run_until_complete(server.wait_closed())
    log.debug('closing event loop')
    event_loop.close()

Эхо-клиент

Создание клиента с использованием класса протокола очень похоже на создание сервера. Код снова начинается с импорта модулей, необходимых для настройки asyncio и ведения журнала, а затем создания объекта цикла событий.

asyncio_echo_client_protocol.py

import asyncio
import functools
import logging
import sys

MESSAGES  [
    b'This is the message. ',
    b'It will be sent ',
    b'in parts.',
]
SERVER_ADDRESS  ('localhost', 10000)

logging.basicConfig(
    levellogging.DEBUG,
    format'%(name)s: %(message)s',
    streamsys.stderr,
)
log  logging.getLogger('main')

event_loop  asyncio.get_event_loop()

Класс клиентского протокола определяет те же методы, что и сервер, с разными реализациями. Конструктор класса принимает два аргумента: список сообщений для отправки и экземпляр Future для использования, чтобы сигнализировать о том, что клиент завершил цикл работы, получив ответ от сервера.

class EchoClient(asyncio.Protocol):

    def __init__(self, messages, future):
        super().__init__()
        self.messages  messages
        self.log  logging.getLogger('EchoClient')
        self.f  future

Когда клиент успешно подключается к серверу, он немедленно начинает связь. Последовательность сообщений отправляется по одному, хотя основной сетевой код может объединять несколько сообщений в одну передачу. Когда все сообщения исчерпаны, отправляется EOF.

Хотя кажется, что все данные отправляются немедленно, на самом деле транспортный объект буферизует исходящие данные и устанавливает обратный вызов для фактической передачи, когда буфер сокета готов к приему данных. Все это выполняется прозрачно, поэтому код приложения может быть написан так, как будто операция ввода-вывода выполняется сразу.

def connection_made(self, transport):
        self.transport  transport
        self.address  transport.get_extra_info('peername')
        self.log.debug(
            'connecting to {} port {}'.format(*self.address)
        )
        # This could be transport.writelines() except that
        # would make it harder to show each part of the message
        # being sent.
        for msg in self.messages:
            transport.write(msg)
            self.log.debug('sending {!r}'.format(msg))
        if transport.can_write_eof():
            transport.write_eof()

Когда получен ответ от сервера, он регистрируется.

def data_received(self, data):
        self.log.debug('received {!r}'.format(data))

И когда либо получен маркер конца файла, либо соединение закрывается со стороны сервера, локальный транспортный объект закрывается, а будущий объект помечается как выполненный путем установки результата.

def eof_received(self):
        self.log.debug('received EOF')
        self.transport.close()
        if not self.f.done():
            self.f.set_result(True)

    def connection_lost(self, exc):
        self.log.debug('server closed connection')
        self.transport.close()
        if not self.f.done():
            self.f.set_result(True)
        super().connection_lost(exc)

Обычно класс протокола передается в цикл событий для создания соединения. В этом случае, поскольку цикл событий не имеет средства для передачи дополнительных аргументов конструктору протокола, необходимо создать partial , чтобы обернуть клиентский класс и передать список сообщений для отправки и < code> Будущий экземпляр. Этот новый вызываемый объект затем используется вместо класса при вызове create_connection () для установления клиентского соединения.

client_completed  asyncio.Future()

client_factory  functools.partial(
    EchoClient,
    messagesMESSAGES,
    futureclient_completed,
)
factory_coroutine  event_loop.create_connection(
    client_factory,
    *SERVER_ADDRESS,
)

Чтобы запустить клиент, цикл событий вызывается один раз с сопрограммой для создания клиента, а затем снова с экземпляром Future , предоставленным клиенту для связи, когда он завершится. Использование двух подобных вызовов позволяет избежать бесконечного цикла в клиентской программе, которая, вероятно, захочет завершить работу после завершения связи с сервером. Если для ожидания создания клиента сопрограммой использовался только первый вызов, она могла бы не обработать все данные ответа и не очистить соединение с сервером должным образом.

log.debug('waiting for client to complete')
try:
    event_loop.run_until_complete(factory_coroutine)
    event_loop.run_until_complete(client_completed)
finally:
    log.debug('closing event loop')
    event_loop.close()

Выход

Запуск сервера в одном окне и клиента в другом дает следующий результат.

$ python3 asyncio_echo_client_protocol.py
asyncio: Using selector: KqueueSelector
main: waiting for client to complete
EchoClient: connecting to ::1 port 10000
EchoClient: sending b'This is the message. '
EchoClient: sending b'It will be sent '
EchoClient: sending b'in parts.'
EchoClient: received b'This is the message. It will be sent in parts.'
EchoClient: received EOF
EchoClient: server closed connection
main: closing event loop

$ python3 asyncio_echo_client_protocol.py
asyncio: Using selector: KqueueSelector
main: waiting for client to complete
EchoClient: connecting to ::1 port 10000
EchoClient: sending b'This is the message. '
EchoClient: sending b'It will be sent '
EchoClient: sending b'in parts.'
EchoClient: received b'This is the message. It will be sent in parts.'
EchoClient: received EOF
EchoClient: server closed connection
main: closing event loop

$ python3 asyncio_echo_client_protocol.py
asyncio: Using selector: KqueueSelector
main: waiting for client to complete
EchoClient: connecting to ::1 port 10000
EchoClient: sending b'This is the message. '
EchoClient: sending b'It will be sent '
EchoClient: sending b'in parts.'
EchoClient: received b'This is the message. It will be sent in parts.'
EchoClient: received EOF
EchoClient: server closed connection
main: closing event loop

Хотя клиент всегда отправляет сообщения отдельно, при первом запуске клиентом сервер получает одно большое сообщение и возвращает его клиенту. Эти результаты различаются при последующих запусках в зависимости от того, насколько загружена сеть и очищаются ли сетевые буферы до подготовки всех данных.

$ python3 asyncio_echo_server_protocol.py
asyncio: Using selector: KqueueSelector
main: starting up on localhost port 10000
EchoServer_::1_63347: connection accepted
EchoServer_::1_63347: received b'This is the message. It will be sent in parts.'
EchoServer_::1_63347: sent b'This is the message. It will be sent in parts.'
EchoServer_::1_63347: received EOF
EchoServer_::1_63347: closing

EchoServer_::1_63387: connection accepted
EchoServer_::1_63387: received b'This is the message. '
EchoServer_::1_63387: sent b'This is the message. '
EchoServer_::1_63387: received b'It will be sent in parts.'
EchoServer_::1_63387: sent b'It will be sent in parts.'
EchoServer_::1_63387: received EOF
EchoServer_::1_63387: closing

EchoServer_::1_63389: connection accepted
EchoServer_::1_63389: received b'This is the message. It will be sent '
EchoServer_::1_63389: sent b'This is the message. It will be sent '
EchoServer_::1_63389: received b'in parts.'
EchoServer_::1_63389: sent b'in parts.'
EchoServer_::1_63389: received EOF
EchoServer_::1_63389: closing