Автор оригинала: Doug Hellmann.
До этого момента все примеры избегали смешивания операций параллелизма и ввода-вывода, чтобы сосредоточиться на одной концепции за раз. Однако переключение контекстов при блоках ввода-вывода является одним из основных вариантов использования asyncio
. Основываясь на уже представленных концепциях параллелизма, в этом разделе исследуются два примера программ, реализующих простой эхо-сервер и клиент, аналогичные примерам, используемым в разделах сокетов и сокетов. Клиент может подключиться к серверу, отправить некоторые данные, а затем получить те же данные в качестве ответа. Каждый раз, когда инициируется операция ввода-вывода, исполняемый код передает управление циклу событий, позволяя другим задачам выполняться до тех пор, пока ввод-вывод не будет готов.
Эхо-сервер
Сервер начинает с импорта модулей, необходимых для настройки asyncio
и ведения журнала, а затем создает объект цикла событий.
asyncio_echo_server_protocol.py
import asyncio import logging import sys SERVER_ADDRESS ('localhost', 10000) logging.basicConfig( levellogging.DEBUG, format'%(name)s: %(message)s', streamsys.stderr, ) log logging.getLogger('main') event_loop asyncio.get_event_loop()
Затем он определяет подкласс asyncio.Protocol
для обработки взаимодействия с клиентом. Методы объекта протокола вызываются на основе событий, связанных с серверным сокетом.
class EchoServer(asyncio.Protocol):
Каждое новое клиентское соединение вызывает вызов connection_made ()
. Аргумент транспорта – это экземпляр asyncio.Transport
, который предоставляет абстракцию для выполнения асинхронного ввода-вывода с использованием сокета. Различные типы связи обеспечивают разные реализации транспорта с использованием одного и того же API. Например, существуют отдельные классы транспорта для работы с сокетами и для работы с конвейерами к подпроцессам. Адрес входящего клиента доступен из транспорта через get_extra_info ()
, метод, зависящий от реализации.
def connection_made(self, transport): self.transport transport self.address transport.get_extra_info('peername') self.log logging.getLogger( 'EchoServer_{}_{}'.format(*self.address) ) self.log.debug('connection accepted')
После установления соединения при отправке данных от клиента на сервер вызывается метод протокола data_received ()
для передачи данных для обработки. Данные передаются в виде байтовой строки, и приложение должно декодировать их соответствующим образом. Здесь результаты регистрируются, а затем ответ немедленно отправляется обратно клиенту путем вызова transport.write ()
.
def data_received(self, data): self.log.debug('received {!r}'.format(data)) self.transport.write(data) self.log.debug('sent {!r}'.format(data))
Некоторые транспорты поддерживают специальный индикатор конца файла («EOF»). Когда встречается EOF, вызывается метод eof_received ()
. В этой реализации EOF отправляется обратно клиенту, чтобы указать, что он был получен. Поскольку не все транспорты поддерживают явный EOF, этот протокол сначала спрашивает транспорт, безопасно ли отправлять EOF.
def eof_received(self): self.log.debug('received EOF') if self.transport.can_write_eof(): self.transport.write_eof()
Когда соединение закрывается, обычно или в результате ошибки, вызывается метод протокола connection_lost ()
. Если произошла ошибка, аргумент содержит соответствующий объект исключения. В противном случае это None
.
def connection_lost(self, error): if error: self.log.error('ERROR: {}'.format(error)) else: self.log.debug('closing') super().connection_lost(error)
Запуск сервера состоит из двух шагов. Сначала приложение сообщает циклу обработки событий создать новый объект сервера, используя класс протокола, имя хоста и сокет, на котором он будет прослушиваться. Метод create_server ()
– это сопрограмма, поэтому результаты должны обрабатываться циклом событий, чтобы фактически запустить сервер. При завершении сопрограммы создается экземпляр asyncio.Server
, связанный с циклом событий.
# Create the server and let the loop finish the coroutine before # starting the real event loop. factory event_loop.create_server(EchoServer, *SERVER_ADDRESS) server event_loop.run_until_complete(factory) log.debug('starting up on {} port {}'.format(*SERVER_ADDRESS))
Затем необходимо запустить цикл обработки событий для обработки событий и обработки клиентских запросов. Для долго работающей службы метод run_forever ()
– самый простой способ сделать это. Когда цикл обработки событий останавливается либо кодом приложения, либо сигналом процесса, сервер может быть закрыт для правильной очистки сокета, а затем цикл событий может быть закрыт, чтобы завершить обработку любых других сопрограмм до выхода из программы.
# Enter the event loop permanently to handle all connections. try: event_loop.run_forever() finally: log.debug('closing server') server.close() event_loop.run_until_complete(server.wait_closed()) log.debug('closing event loop') event_loop.close()
Эхо-клиент
Создание клиента с использованием класса протокола очень похоже на создание сервера. Код снова начинается с импорта модулей, необходимых для настройки asyncio
и ведения журнала, а затем создания объекта цикла событий.
asyncio_echo_client_protocol.py
import asyncio import functools import logging import sys MESSAGES [ b'This is the message. ', b'It will be sent ', b'in parts.', ] SERVER_ADDRESS ('localhost', 10000) logging.basicConfig( levellogging.DEBUG, format'%(name)s: %(message)s', streamsys.stderr, ) log logging.getLogger('main') event_loop asyncio.get_event_loop()
Класс клиентского протокола определяет те же методы, что и сервер, с разными реализациями. Конструктор класса принимает два аргумента: список сообщений для отправки и экземпляр Future
для использования, чтобы сигнализировать о том, что клиент завершил цикл работы, получив ответ от сервера.
class EchoClient(asyncio.Protocol): def __init__(self, messages, future): super().__init__() self.messages messages self.log logging.getLogger('EchoClient') self.f future
Когда клиент успешно подключается к серверу, он немедленно начинает связь. Последовательность сообщений отправляется по одному, хотя основной сетевой код может объединять несколько сообщений в одну передачу. Когда все сообщения исчерпаны, отправляется EOF.
Хотя кажется, что все данные отправляются немедленно, на самом деле транспортный объект буферизует исходящие данные и устанавливает обратный вызов для фактической передачи, когда буфер сокета готов к приему данных. Все это выполняется прозрачно, поэтому код приложения может быть написан так, как будто операция ввода-вывода выполняется сразу.
def connection_made(self, transport): self.transport transport self.address transport.get_extra_info('peername') self.log.debug( 'connecting to {} port {}'.format(*self.address) ) # This could be transport.writelines() except that # would make it harder to show each part of the message # being sent. for msg in self.messages: transport.write(msg) self.log.debug('sending {!r}'.format(msg)) if transport.can_write_eof(): transport.write_eof()
Когда получен ответ от сервера, он регистрируется.
def data_received(self, data): self.log.debug('received {!r}'.format(data))
И когда либо получен маркер конца файла, либо соединение закрывается со стороны сервера, локальный транспортный объект закрывается, а будущий объект помечается как выполненный путем установки результата.
def eof_received(self): self.log.debug('received EOF') self.transport.close() if not self.f.done(): self.f.set_result(True) def connection_lost(self, exc): self.log.debug('server closed connection') self.transport.close() if not self.f.done(): self.f.set_result(True) super().connection_lost(exc)
Обычно класс протокола передается в цикл событий для создания соединения. В этом случае, поскольку цикл событий не имеет средства для передачи дополнительных аргументов конструктору протокола, необходимо создать partial
, чтобы обернуть клиентский класс и передать список сообщений для отправки и < code> Будущий экземпляр. Этот новый вызываемый объект затем используется вместо класса при вызове create_connection ()
для установления клиентского соединения.
client_completed asyncio.Future() client_factory functools.partial( EchoClient, messagesMESSAGES, futureclient_completed, ) factory_coroutine event_loop.create_connection( client_factory, *SERVER_ADDRESS, )
Чтобы запустить клиент, цикл событий вызывается один раз с сопрограммой для создания клиента, а затем снова с экземпляром Future
, предоставленным клиенту для связи, когда он завершится. Использование двух подобных вызовов позволяет избежать бесконечного цикла в клиентской программе, которая, вероятно, захочет завершить работу после завершения связи с сервером. Если для ожидания создания клиента сопрограммой использовался только первый вызов, она могла бы не обработать все данные ответа и не очистить соединение с сервером должным образом.
log.debug('waiting for client to complete') try: event_loop.run_until_complete(factory_coroutine) event_loop.run_until_complete(client_completed) finally: log.debug('closing event loop') event_loop.close()
Выход
Запуск сервера в одном окне и клиента в другом дает следующий результат.
$ python3 asyncio_echo_client_protocol.py asyncio: Using selector: KqueueSelector main: waiting for client to complete EchoClient: connecting to ::1 port 10000 EchoClient: sending b'This is the message. ' EchoClient: sending b'It will be sent ' EchoClient: sending b'in parts.' EchoClient: received b'This is the message. It will be sent in parts.' EchoClient: received EOF EchoClient: server closed connection main: closing event loop $ python3 asyncio_echo_client_protocol.py asyncio: Using selector: KqueueSelector main: waiting for client to complete EchoClient: connecting to ::1 port 10000 EchoClient: sending b'This is the message. ' EchoClient: sending b'It will be sent ' EchoClient: sending b'in parts.' EchoClient: received b'This is the message. It will be sent in parts.' EchoClient: received EOF EchoClient: server closed connection main: closing event loop $ python3 asyncio_echo_client_protocol.py asyncio: Using selector: KqueueSelector main: waiting for client to complete EchoClient: connecting to ::1 port 10000 EchoClient: sending b'This is the message. ' EchoClient: sending b'It will be sent ' EchoClient: sending b'in parts.' EchoClient: received b'This is the message. It will be sent in parts.' EchoClient: received EOF EchoClient: server closed connection main: closing event loop
Хотя клиент всегда отправляет сообщения отдельно, при первом запуске клиентом сервер получает одно большое сообщение и возвращает его клиенту. Эти результаты различаются при последующих запусках в зависимости от того, насколько загружена сеть и очищаются ли сетевые буферы до подготовки всех данных.
$ python3 asyncio_echo_server_protocol.py asyncio: Using selector: KqueueSelector main: starting up on localhost port 10000 EchoServer_::1_63347: connection accepted EchoServer_::1_63347: received b'This is the message. It will be sent in parts.' EchoServer_::1_63347: sent b'This is the message. It will be sent in parts.' EchoServer_::1_63347: received EOF EchoServer_::1_63347: closing EchoServer_::1_63387: connection accepted EchoServer_::1_63387: received b'This is the message. ' EchoServer_::1_63387: sent b'This is the message. ' EchoServer_::1_63387: received b'It will be sent in parts.' EchoServer_::1_63387: sent b'It will be sent in parts.' EchoServer_::1_63387: received EOF EchoServer_::1_63387: closing EchoServer_::1_63389: connection accepted EchoServer_::1_63389: received b'This is the message. It will be sent ' EchoServer_::1_63389: sent b'This is the message. It will be sent ' EchoServer_::1_63389: received b'in parts.' EchoServer_::1_63389: sent b'in parts.' EchoServer_::1_63389: received EOF EchoServer_::1_63389: closing