Рубрики
Без рубрики

select – Ожидайте ввода-вывода эффективно

Автор оригинала: Doug Hellmann.

Цель:

Дождитесь уведомления о готовности входного или выходного канала.

Модуль select обеспечивает доступ к функциям мониторинга ввода-вывода для конкретной платформы. Наиболее переносимым интерфейсом является функция POSIX select () , которая доступна в Unix и Windows. Модуль также включает poll () , API только для Unix и несколько параметров, которые работают только с определенными вариантами Unix.

Примечание

Новый модуль селекторов предоставляет интерфейс более высокого уровня, построенный на основе API в select . Легче создавать переносимый код с помощью selectors , поэтому используйте этот модуль, если только низкоуровневые API, предоставляемые select , не требуются каким-либо образом.

Использование select ()

Функция Python select () – это прямой интерфейс для реализации базовой операционной системы. Он отслеживает сокеты, открытые файлы и каналы (все, что имеет метод fileno () , который возвращает действительный дескриптор файла), пока они не станут доступными для чтения или записи или пока не возникнет ошибка связи. select () упрощает одновременный мониторинг нескольких подключений и более эффективен, чем написание цикла опроса в Python с использованием тайм-аутов сокетов, поскольку мониторинг происходит на сетевом уровне операционной системы, а не на переводчик.

Примечание

Использование файловых объектов Python с select () работает для Unix, но не поддерживается в Windows.

Пример эхо-сервера из раздела сокетов можно расширить, чтобы отслеживать более одного соединения одновременно, используя select () . Новая версия начинается с создания неблокирующего сокета TCP/IP и настройки его для прослушивания адреса.

select_echo_server.py

import select
import socket
import sys
import queue

# Create a TCP/IP socket
server  socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(0)

# Bind the socket to the port
server_address  ('localhost', 10000)
print('starting up on {} port {}'.format(*server_address),
      filesys.stderr)
server.bind(server_address)

# Listen for incoming connections
server.listen(5)

Аргументы для select () – это три списка, содержащих каналы связи для отслеживания. Первый – это список объектов, которые необходимо проверить на предмет входящих данных для чтения, второй содержит объекты, которые будут получать исходящие данные, когда в их буфере останется место, а третий – те, которые могут иметь ошибку (обычно комбинация объекты входного и выходного каналов). Следующим шагом на сервере является настройка списков, содержащих источники ввода и назначения вывода, которые будут переданы в select () .

# Sockets from which we expect to read
inputs  [server]

# Sockets to which we expect to write
outputs  []

Соединения добавляются и удаляются из этих списков основным циклом сервера. Поскольку эта версия сервера будет ждать, пока сокет станет доступным для записи, прежде чем отправлять какие-либо данные (вместо немедленной отправки ответа), каждому выходному соединению требуется очередь, которая действует как буфер для данных, которые будут отправлены через него.

# Outgoing message queues (socket:Queue)
message_queues  {}

Основная часть серверной программы зацикливается, вызывая select () для блокировки и ожидания сетевой активности.

while inputs:

    # Wait for at least one of the sockets to be
    # ready for processing
    print('waiting for the next event', filesys.stderr)
    readable, writable, exceptional  select.select(inputs,
                                                    outputs,
                                                    inputs)

select () возвращает три новых списка, содержащих подмножества содержимого переданных списков. Все сокеты в списке readable имеют входящие данные в буфере и доступны для чтения . Все сокеты в списке доступны для записи имеют свободное пространство в их буфере и могут быть записаны. В сокетах, возвращенных в исключительном , произошла ошибка (фактическое определение «исключительного состояния» зависит от платформы).

«Читаемые» сокеты представляют три возможных случая. Если сокет является основным сокетом «сервера», который используется для прослушивания соединений, то условие «читабельность» означает, что он готов принять другое входящее соединение. В дополнение к добавлению нового подключения к списку входов для мониторинга, в этом разделе клиентский сокет не блокируется.

# Handle inputs
    for s in readable:

        if s is server:
            # A "readable" socket is ready to accept a connection
            connection, client_address  s.accept()
            print('  connection from', client_address,
                  filesys.stderr)
            connection.setblocking(0)
            inputs.append(connection)

            # Give the connection a queue for data
            # we want to send
            message_queues[connection]  queue.Queue()

Следующий случай – это установленное соединение с клиентом, который отправил данные. Данные считываются с помощью recv () , затем помещаются в очередь, чтобы их можно было отправить через сокет и обратно клиенту.

else:
            data  s.recv(1024)
            if data:
                # A readable client socket has data
                print('  received {!r} from {}'.format(
                    data, s.getpeername()), filesys.stderr,
                )
                message_queues[s].put(data)
                # Add output channel for response
                if s not in outputs:
                    outputs.append(s)

Доступный для чтения сокет без данных доступен от клиента, который отключился, и поток готов к закрытию.

else:
                # Interpret empty result as closed connection
                print('  closing', client_address,
                      filesys.stderr)
                # Stop listening for input on the connection
                if s in outputs:
                    outputs.remove(s)
                inputs.remove(s)
                s.close()

                # Remove message queue
                del message_queues[s]

Меньше случаев для записываемых подключений. Если в очереди есть данные для подключения, отправляется следующее сообщение. В противном случае соединение удаляется из списка выходных соединений, так что следующий раз прохождение цикла select () не указывает, что сокет готов к отправке данных.

# Handle outputs
    for s in writable:
        try:
            next_msg  message_queues[s].get_nowait()
        except queue.Empty:
            # No messages waiting so stop checking
            # for writability.
            print('  ', s.getpeername(), 'queue empty',
                  filesys.stderr)
            outputs.remove(s)
        else:
            print('  sending {!r} to {}'.format(next_msg,
                                                s.getpeername()),
                  filesys.stderr)
            s.send(next_msg)

Наконец, если возникает ошибка сокета, он закрывается.

# Handle "exceptional conditions"
    for s in exceptional:
        print('exception condition on', s.getpeername(),
              filesys.stderr)
        # Stop listening for input on the connection
        inputs.remove(s)
        if s in outputs:
            outputs.remove(s)
        s.close()

        # Remove message queue
        del message_queues[s]

В примере клиентской программы используются два сокета, чтобы продемонстрировать, как сервер с помощью select () управляет несколькими подключениями одновременно. Клиент начинает с подключения каждого сокета TCP/IP к серверу.

select_echo_multiclient.py

import socket
import sys

messages  [
    'This is the message. ',
    'It will be sent ',
    'in parts.',
]
server_address  ('localhost', 10000)

# Create a TCP/IP socket
socks  [
    socket.socket(socket.AF_INET, socket.SOCK_STREAM),
    socket.socket(socket.AF_INET, socket.SOCK_STREAM),
]

# Connect the socket to the port where the server is listening
print('connecting to {} port {}'.format(*server_address),
      filesys.stderr)
for s in socks:
    s.connect(server_address)

Затем он отправляет по одному фрагменту сообщения через каждый сокет и считывает все ответы, доступные после записи новых данных.

for message in messages:
    outgoing_data  message.encode()

    # Send messages on both sockets
    for s in socks:
        print('{}: sending {!r}'.format(s.getsockname(),
                                        outgoing_data),
              filesys.stderr)
        s.send(outgoing_data)

    # Read responses on both sockets
    for s in socks:
        data  s.recv(1024)
        print('{}: received {!r}'.format(s.getsockname(),
                                         data),
              filesys.stderr)
        if not data:
            print('closing socket', s.getsockname(),
                  filesys.stderr)
            s.close()

Запустите сервер в одном окне, а клиент – в другом. Вывод будет выглядеть так, с разными номерами портов.

$ python3 select_echo_server.py
starting up on localhost port 10000
waiting for the next event
  connection from ('127.0.0.1', 61003)
waiting for the next event
  connection from ('127.0.0.1', 61004)
waiting for the next event
  received b'This is the message. ' from ('127.0.0.1', 61003)
  received b'This is the message. ' from ('127.0.0.1', 61004)
waiting for the next event
  sending b'This is the message. ' to ('127.0.0.1', 61003)
  sending b'This is the message. ' to ('127.0.0.1', 61004)
waiting for the next event
   ('127.0.0.1', 61003) queue empty
   ('127.0.0.1', 61004) queue empty
waiting for the next event
  received b'It will be sent ' from ('127.0.0.1', 61003)
  received b'It will be sent ' from ('127.0.0.1', 61004)
waiting for the next event
  sending b'It will be sent ' to ('127.0.0.1', 61003)
  sending b'It will be sent ' to ('127.0.0.1', 61004)
waiting for the next event
   ('127.0.0.1', 61003) queue empty
   ('127.0.0.1', 61004) queue empty
waiting for the next event
  received b'in parts.' from ('127.0.0.1', 61003)
waiting for the next event
  received b'in parts.' from ('127.0.0.1', 61004)
  sending b'in parts.' to ('127.0.0.1', 61003)
waiting for the next event
   ('127.0.0.1', 61003) queue empty
  sending b'in parts.' to ('127.0.0.1', 61004)
waiting for the next event
   ('127.0.0.1', 61004) queue empty
waiting for the next event
  closing ('127.0.0.1', 61004)
  closing ('127.0.0.1', 61004)
waiting for the next event

В выводе клиента отображаются данные, отправляемые и получаемые с использованием обоих сокетов.

$ python3 select_echo_multiclient.py
connecting to localhost port 10000
('127.0.0.1', 61003): sending b'This is the message. '
('127.0.0.1', 61004): sending b'This is the message. '
('127.0.0.1', 61003): received b'This is the message. '
('127.0.0.1', 61004): received b'This is the message. '
('127.0.0.1', 61003): sending b'It will be sent '
('127.0.0.1', 61004): sending b'It will be sent '
('127.0.0.1', 61003): received b'It will be sent '
('127.0.0.1', 61004): received b'It will be sent '
('127.0.0.1', 61003): sending b'in parts.'
('127.0.0.1', 61004): sending b'in parts.'
('127.0.0.1', 61003): received b'in parts.'
('127.0.0.1', 61004): received b'in parts.'

Неблокирующий ввод-вывод с таймаутом

select () также принимает необязательный четвертый параметр, который представляет собой количество секунд ожидания перед прерыванием мониторинга, если ни один канал не стал активным. Использование значения тайм-аута позволяет основной программе вызывать select () как часть более крупного цикла обработки, выполняя другие действия между проверками сетевого ввода.

По истечении времени ожидания select () возвращает три пустых списка. Обновление примера сервера для использования тайм-аута требует добавления дополнительного аргумента к вызову select () и обработки пустых списков после возврата select () .

select_echo_server_timeout.py

readable, writable, exceptional  select.select(inputs,
                                                    outputs,
                                                    inputs,
                                                    timeout)

    if not (readable or writable or exceptional):
        print('  timed out, do some other work here',
              filesys.stderr)
        continue

Эта «медленная» версия клиентской программы делает паузу после отправки каждого сообщения, чтобы имитировать задержку или другую задержку при передаче.

select_echo_slow_client.py

import socket
import sys
import time

# Create a TCP/IP socket
sock  socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# Connect the socket to the port where the server is listening
server_address  ('localhost', 10000)
print('connecting to {} port {}'.format(*server_address),
      filesys.stderr)
sock.connect(server_address)

time.sleep(1)

messages  [
    'Part one of the message.',
    'Part two of the message.',
]
amount_expected  len(''.join(messages))

try:

    # Send data
    for message in messages:
        data  message.encode()
        print('sending {!r}'.format(data), filesys.stderr)
        sock.sendall(data)
        time.sleep(1.5)

    # Look for the response
    amount_received  0

    while amount_received < amount_expected:
        data  sock.recv(16)
        amount_received  len(data)
        print('received {!r}'.format(data), filesys.stderr)

finally:
    print('closing socket', filesys.stderr)
    sock.close()

Запуск нового сервера с медленным клиентом дает:

$ python3 select_echo_server_timeout.py
starting up on localhost port 10000
waiting for the next event
  timed out, do some other work here
waiting for the next event
  connection from ('127.0.0.1', 61144)
waiting for the next event
  timed out, do some other work here
waiting for the next event
  received b'Part one of the message.' from ('127.0.0.1', 61144)
waiting for the next event
  sending b'Part one of the message.' to ('127.0.0.1', 61144)
waiting for the next event
('127.0.0.1', 61144) queue empty
waiting for the next event
  timed out, do some other work here
waiting for the next event
  received b'Part two of the message.' from ('127.0.0.1', 61144)
waiting for the next event
  sending b'Part two of the message.' to ('127.0.0.1', 61144)
waiting for the next event
('127.0.0.1', 61144) queue empty
waiting for the next event
  timed out, do some other work here
waiting for the next event
closing ('127.0.0.1', 61144)
waiting for the next event
  timed out, do some other work here

И это результат клиента:

$ python3 select_echo_slow_client.py
connecting to localhost port 10000
sending b'Part one of the message.'
sending b'Part two of the message.'
received b'Part one of the '
received b'message.Part two'
received b' of the message.'
closing socket

Использование poll ()

Функция poll () предоставляет функции, аналогичные функции select () , но базовая реализация более эффективна. Компромисс заключается в том, что poll () не поддерживается в Windows, поэтому программы, использующие poll () , менее переносимы.

Эхо-сервер, построенный на poll () , запускается с тем же кодом конфигурации сокета, который использовался в других примерах.

select_poll_echo_server.py

import select
import socket
import sys
import queue

# Create a TCP/IP socket
server  socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(0)

# Bind the socket to the port
server_address  ('localhost', 10000)
print('starting up on {} port {}'.format(*server_address),
      filesys.stderr)
server.bind(server_address)

# Listen for incoming connections
server.listen(5)

# Keep up with the queues of outgoing messages
message_queues  {}

Значение тайм-аута, переданное в poll () , представлено в миллисекундах, а не в секундах, поэтому для того, чтобы сделать паузу на полную секунду, тайм-аут должен быть установлен на 1000 .

# Do not block forever (milliseconds)
TIMEOUT  1000

Python реализует poll () с классом, который управляет отслеживаемыми зарегистрированными каналами данных. Каналы добавляются путем вызова register () с флагами, указывающими, какие события интересны для этого канала. Полный набор флагов приведен в таблице ниже.

Флаги событий для опроса ()

Мероприятие

Описание

ПОЛЛИН

Вход готов

ПОЛЛПРИ

Приоритетный ввод готов

ЗАГРЯЗНЕНИЕ

Возможность получать вывод

ПОЛЛЕРР

Ошибка

ПОЛЛХУП

Канал закрыт

POLLNVAL

Канал не открыт

Эхо-сервер будет настраивать одни сокеты только для чтения, а другие – для чтения или записи. Соответствующие комбинации флагов сохраняются в локальных переменных READ_ONLY и READ_WRITE .

# Commonly used flag sets
READ_ONLY  (
    select.POLLIN |
    select.POLLPRI |
    select.POLLHUP |
    select.POLLERR
)
READ_WRITE  READ_ONLY | select.POLLOUT

Сокет server зарегистрирован, поэтому любые входящие соединения или данные вызывают событие.

# Set up the poller
poller  select.poll()
poller.register(server, READ_ONLY)

Поскольку poll () возвращает список кортежей, содержащих дескриптор файла для сокета и флаг события, необходимо сопоставление номеров файловых дескрипторов с объектами для извлечения socket в читать или писать оттуда.

# Map file descriptors to socket objects
fd_to_socket  {
    server.fileno(): server,
}

Цикл сервера вызывает poll () , а затем обрабатывает «события», возвращаемые путем поиска сокета и выполнения действий на основе флага в событии.

while True:

    # Wait for at least one of the sockets to be
    # ready for processing
    print('waiting for the next event', filesys.stderr)
    events  poller.poll(TIMEOUT)

    for fd, flag in events:

        # Retrieve the actual socket from its file descriptor
        s  fd_to_socket[fd]

Как и в случае с select () , когда сокет основного сервера «доступен для чтения», это на самом деле означает наличие ожидающего подключения от клиента. Новое соединение регистрируется с помощью флагов READ_ONLY для отслеживания новых данных, которые пройдут через него.

# Handle inputs
        if flag & (select.POLLIN | select.POLLPRI):

            if s is server:
                # A readable socket is ready
                # to accept a connection
                connection, client_address  s.accept()
                print('  connection', client_address,
                      filesys.stderr)
                connection.setblocking(0)
                fd_to_socket[connection.fileno()]  connection
                poller.register(connection, READ_ONLY)

                # Give the connection a queue for data to send
                message_queues[connection]  queue.Queue()

Другие сокеты, кроме сервера, являются существующими клиентами, и recv () используется для доступа к данным, ожидающим чтения.

else:
                data  s.recv(1024)

Если recv () возвращает какие-либо данные, они помещаются в исходящую очередь для сокета, а флаги для этого сокета изменяются с помощью modify () , поэтому poll () будет следить за тем, чтобы сокет был готов к приему данных.

if data:
                    # A readable client socket has data
                    print('  received {!r} from {}'.format(
                        data, s.getpeername()), filesys.stderr,
                    )
                    message_queues[s].put(data)
                    # Add output channel for response
                    poller.modify(s, READ_WRITE)

Пустая строка, возвращаемая recv () , означает, что клиент отключился, поэтому unregister () используется для указания объекту poll игнорировать сокет.

else:
                    # Interpret empty result as closed connection
                    print('  closing', client_address,
                          filesys.stderr)
                    # Stop listening for input on the connection
                    poller.unregister(s)
                    s.close()

                    # Remove message queue
                    del message_queues[s]

Флаг POLLHUP указывает на клиента, который «разорвал» соединение, не закрывая его полностью. Сервер перестает опрашивать исчезающих клиентов.

elif flag & select.POLLHUP:
            # Client hung up
            print('  closing', client_address, '(HUP)',
                  filesys.stderr)
            # Stop listening for input on the connection
            poller.unregister(s)
            s.close()

Обработка доступных для записи сокетов похожа на версию, использованную в примере для select () , за исключением того, что modify () используется для изменения флагов для сокета в опросчике, вместо того, чтобы удалить его из списка вывода.

elif flag & select.POLLOUT:
            # Socket is ready to send data,
            # if there is any to send.
            try:
                next_msg  message_queues[s].get_nowait()
            except queue.Empty:
                # No messages waiting so stop checking
                print(s.getpeername(), 'queue empty',
                      filesys.stderr)
                poller.modify(s, READ_ONLY)
            else:
                print('  sending {!r} to {}'.format(
                    next_msg, s.getpeername()), filesys.stderr,
                )
                s.send(next_msg)

И, наконец, любые события с POLLERR заставляют сервер закрывать сокет.

elif flag & select.POLLERR:
            print('  exception on', s.getpeername(),
                  filesys.stderr)
            # Stop listening for input on the connection
            poller.unregister(s)
            s.close()

            # Remove message queue
            del message_queues[s]

Когда сервер на основе опроса запускается вместе с select_echo_multiclient.py (клиентская программа, использующая несколько сокетов), это результат.

$ python3 select_poll_echo_server.py
starting up on localhost port 10000
waiting for the next event
waiting for the next event
waiting for the next event
waiting for the next event
  connection ('127.0.0.1', 61253)
waiting for the next event
  connection ('127.0.0.1', 61254)
waiting for the next event
  received b'This is the message. ' from ('127.0.0.1', 61253)
  received b'This is the message. ' from ('127.0.0.1', 61254)
waiting for the next event
  sending b'This is the message. ' to ('127.0.0.1', 61253)
  sending b'This is the message. ' to ('127.0.0.1', 61254)
waiting for the next event
('127.0.0.1', 61253) queue empty
('127.0.0.1', 61254) queue empty
waiting for the next event
  received b'It will be sent ' from ('127.0.0.1', 61253)
  received b'It will be sent ' from ('127.0.0.1', 61254)
waiting for the next event
  sending b'It will be sent ' to ('127.0.0.1', 61253)
  sending b'It will be sent ' to ('127.0.0.1', 61254)
waiting for the next event
('127.0.0.1', 61253) queue empty
('127.0.0.1', 61254) queue empty
waiting for the next event
  received b'in parts.' from ('127.0.0.1', 61253)
  received b'in parts.' from ('127.0.0.1', 61254)
waiting for the next event
  sending b'in parts.' to ('127.0.0.1', 61253)
  sending b'in parts.' to ('127.0.0.1', 61254)
waiting for the next event
('127.0.0.1', 61253) queue empty
('127.0.0.1', 61254) queue empty
waiting for the next event
  closing ('127.0.0.1', 61254)
waiting for the next event
  closing ('127.0.0.1', 61254)
waiting for the next event

Параметры для конкретной платформы

Менее переносимые параметры, предоставляемые select : epoll , API edge polling , поддерживаемый Linux; kqueue , который использует очередь ядра BSD; и kevent , интерфейс события ядра BSD. Обратитесь к документации библиотеки операционной системы для получения более подробной информации о том, как они работают.

Смотрите также

  • Стандартная библиотека документации для избранных
  • селекторы – абстракция более высокого уровня поверх select .
  • HOWOTO по программированию сокетов – учебное руководство Гордона Макмиллана, включенное в стандартную документацию библиотеки.
  • socket – низкоуровневая сетевая связь.
  • SocketServer – платформа для создания сетевых серверных приложений.
  • asyncio – фреймворк асинхронного ввода-вывода
  • Сетевое программирование Unix, Том 1: Сетевой API сокетов, 3/E У. Ричард Стивенс, Билл Феннер и Эндрю М. Рудофф. Опубликовано Addison-Wesley Professional, 2004. ISBN-10: 0131411551
  • Основы сетевого программирования Python, 3/E Авторы Брэндон Родс и Джон Герцен. Опубликовано Apress, 2014. ISBN-10: 1430258543