Рубрики
Без рубрики

Асинхронный ввод / вывод, используя COROUTINES и TORESS

Автор оригинала: Doug Hellmann.

В этом разделе рассматриваются альтернативные версии двух образцов программ, реализующих простой сервер Echo и клиент, используя COROUTINES и ASYNCIO потоков API вместо протокола и абстракций транспортировки. Примеры работают на более низком уровне абстракции, чем протокол API, но обработанные ранее событиями событиями являются аналогичными.

Echo server.

Сервер запускается, импортируя модули, которые необходимо настроить <код> ASYNCIO и журнал, а затем создает объект цикла событий.

asyncio_echo_server_coroutine.py

import asyncio
import logging
import sys

SERVER_ADDRESS  ('localhost', 10000)
logging.basicConfig(
    levellogging.DEBUG,
    format'%(name)s: %(message)s',
    streamsys.stderr,
)
log  logging.getLogger('main')

event_loop  asyncio.get_event_loop()

Затем он определяет COROUTINE для обработки коммуникации. Каждый раз, когда клиент подключается, будет вызываться новый экземпляр COROUTINE, так что в функции код подключается только с одним клиентом одновременно. Язык Runtime Python управляет состоянием для каждого экземпляра COROUTINE, поэтому код приложения не должен управлять любыми дополнительными структурами данных для отслеживания отдельных клиентов.

Аргументы в COROUTENINE – Streamreader и <код> streamwriter экземпляры, связанные с новым соединением. Как и в случае с Transport , клиентский адрес можно получить через метод Writer <код> get_extra_info () .

async def echo(reader, writer):
    address  writer.get_extra_info('peername')
    log  logging.getLogger('echo_{}_{}'.format(*address))
    log.debug('connection accepted')

Хотя COROUTINE вызывается, когда соединение установлено, пока не может быть никаких данных для чтения. Чтобы избежать блокировки во время чтения, COROUTINE использует <код> ждать с вызовом ждать () , чтобы позволить цикле события продолжать обработку других задач, пока нет данных для чтения.

while True:
        data  await reader.read(128)

Если клиент отправляет данные, возвращается из <код> a ждать и может быть отправлен обратно клиенту, передавая его к писателю. Несколько вызовов на WRITE () могут использоваться для выходящих данных буфера, а затем <код> сливают () используется для промывки результатов. С момента промывки сети ввода/вывода можно заблокировать, снова <код> a ждать используется для восстановления элементов управления в контуре события, что следит за сокеткой записи и вызывает писатель, когда можно отправить больше данных.

if data:
            log.debug('received {!r}'.format(data))
            writer.write(data)
            await writer.drain()
            log.debug('sent {!r}'.format(data))

Если клиент не отправил никаких данных, READ () возвращает пустую байтовую строку, чтобы указать, что соединение закрыто. Сервер должен закрыть розетку для записи клиенту, а затем COROUTINE может вернуться, чтобы указать, что он закончен.

else:
            log.debug('closing')
            writer.close()
            return

Есть два шага для запуска сервера. Сначала приложение говорит о цикле событий для создания нового объекта сервера, используя COROUTINE и имя хоста и сокета, на котором для прослушивания. Метод start_server () сам по себе является COROUTINE, поэтому результаты должны быть обработаны контуром события, чтобы фактически запустить сервер. Завершение COROUTINE производит <код> asyncio.server привязанный к контуре события.

# Create the server and let the loop finish the coroutine before
# starting the real event loop.
factory  asyncio.start_server(echo, *SERVER_ADDRESS)
server  event_loop.run_until_complete(factory)
log.debug('starting up on {} port {}'.format(*SERVER_ADDRESS))

Затем петля событий должна быть запускается, чтобы обработать события и обрабатывать запросы клиентов. Для длительной службы <код> run_forever () – самый простой способ сделать это. Когда цикл событий останавливается либо по коду приложения, либо путем сигнализации процесса, сервер может быть закрыт для правильной очистки сокета, а затем цикл событий может быть закрыт для завершения обработки любых других Ciboutines перед выходами на программу.

# Enter the event loop permanently to handle all connections.
try:
    event_loop.run_forever()
except KeyboardInterrupt:
    pass
finally:
    log.debug('closing server')
    server.close()
    event_loop.run_until_complete(server.wait_closed())
    log.debug('closing event loop')
    event_loop.close()

Echo Client.

Построение клиента с использованием COROUTINE очень похоже на построение сервера. Код снова начинается, импортируя модули, которые необходимо настроить ASYNCIO и LOGUATING, а затем создание объекта Loop события.

asyncio_echo_client_coroutine.py

import asyncio
import logging
import sys

MESSAGES  [
    b'This is the message. ',
    b'It will be sent ',
    b'in parts.',
]
SERVER_ADDRESS  ('localhost', 10000)

logging.basicConfig(
    levellogging.DEBUG,
    format'%(name)s: %(message)s',
    streamsys.stderr,
)
log  logging.getLogger('main')

event_loop  asyncio.get_event_loop()

<Код> echo_client coroutine принимает аргументы, рассказывая ей, где сервер и какие сообщения отправлять.

async def echo_client(address, messages):

COROUTINE вызывается, когда задача начинается, но не имеет активного подключения к работе. Поэтому первый шаг, чтобы клиент устанавливал свое собственное соединение. Он использует a ждать , чтобы избежать блокировки другой активности, когда <код> open_connection () работает COROUTINE.

log  logging.getLogger('echo_client')

    log.debug('connecting to {} port {}'.format(*address))
    reader, writer  await asyncio.open_connection(*address)

Open_Connection () COROUTINE возвращает <код> streamreader и <код> streamwriter экземпляры, связанные с новым сокетом. Следующим шагом является использование писателя для отправки данных на сервер. Как и на сервере, писатель будет буфет исходящие данные, пока сокет не будет готов или Drint () для промывки результатов. С момента промывки сети ввода/вывода можно заблокировать, снова <код> a ждать используется для восстановления элементов управления в контуре события, что следит за сокеткой записи и вызывает писатель, когда можно отправить больше данных.

# This could be writer.writelines() except that
    # would make it harder to show each part of the message
    # being sent.
    for msg in messages:
        writer.write(msg)
        log.debug('sending {!r}'.format(msg))
    if writer.can_write_eof():
        writer.write_eof()
    await writer.drain()

Затем клиент ищет ответ с сервера, пытаясь прочитать данные, пока не осталось ничего читать. Чтобы избежать блокировки индивидуального READ () CALL, a ждать дает контроль обратно к контуре события. Если сервер отправил данные, это заново. Если сервер не отправил никаких данных, READ () возвращает пустую байтовую строку, чтобы указать, что соединение закрыто. Клиент должен закрыть розетку для отправки на сервер, а затем вернуться, чтобы указать, что он закончен.

log.debug('waiting for response')
    while True:
        data  await reader.read(128)
        if data:
            log.debug('received {!r}'.format(data))
        else:
            log.debug('closing')
            writer.close()
            return

Чтобы запустить клиент, цикл события вызывается с COROUTINE для создания клиента. Использование RUN_UNTIL_COMPLETE () позволяет избежать бесконечного цикла в клиентской программе. В отличие от примера протокола, не требуется отдельное будущее для сигнала, когда COROUTINE завершится, потому что <код> echo_client () содержит все саму логику клиента, и она не возвращается до тех пор, пока она не получила ответ и закрывающую Серверное соединение.

try:
    event_loop.run_until_complete(
        echo_client(SERVER_ADDRESS, MESSAGES)
    )
finally:
    log.debug('closing event loop')
    event_loop.close()

Выход

Запуск сервера в одном окне и клиент в другом производит следующий вывод.

$ python3 asyncio_echo_client_coroutine.py
asyncio: Using selector: KqueueSelector
echo_client: connecting to localhost port 10000
echo_client: sending b'This is the message. '
echo_client: sending b'It will be sent '
echo_client: sending b'in parts.'
echo_client: waiting for response
echo_client: received b'This is the message. It will be sent in parts.'
echo_client: closing
main: closing event loop

$ python3 asyncio_echo_client_coroutine.py
asyncio: Using selector: KqueueSelector
echo_client: connecting to localhost port 10000
echo_client: sending b'This is the message. '
echo_client: sending b'It will be sent '
echo_client: sending b'in parts.'
echo_client: waiting for response
echo_client: received b'This is the message. It will be sent in parts.'
echo_client: closing
main: closing event loop

$ python3 asyncio_echo_client_coroutine.py
asyncio: Using selector: KqueueSelector
echo_client: connecting to localhost port 10000
echo_client: sending b'This is the message. '
echo_client: sending b'It will be sent '
echo_client: sending b'in parts.'
echo_client: waiting for response
echo_client: received b'This is the message. It will be sent '
echo_client: received b'in parts.'
echo_client: closing
main: closing event loop

Хотя клиент всегда отправляет сообщения отдельно, в первые два раза клиент запускает сервер, получает одно большое сообщение и отголовать обратно к клиенту. Эти результаты варьируются в последующих прогонах, основанные на том, насколько занят сеть, и могут ли сетевые буферы промыты до того, как все данные получают.

$ python3 asyncio_echo_server_coroutine.py
asyncio: Using selector: KqueueSelector
main: starting up on localhost port 10000
echo_::1_64624: connection accepted
echo_::1_64624: received b'This is the message. It will be sent in parts.'
echo_::1_64624: sent b'This is the message. It will be sent in parts.'
echo_::1_64624: closing

echo_::1_64626: connection accepted
echo_::1_64626: received b'This is the message. It will be sent in parts.'
echo_::1_64626: sent b'This is the message. It will be sent in parts.'
echo_::1_64626: closing

echo_::1_64627: connection accepted
echo_::1_64627: received b'This is the message. It will be sent '
echo_::1_64627: sent b'This is the message. It will be sent '
echo_::1_64627: received b'in parts.'
echo_::1_64627: sent b'in parts.'
echo_::1_64627: closing