Автор оригинала: Doug Hellmann.
Цель:
Дождитесь уведомления о готовности входного или выходного канала.
Модуль select
обеспечивает доступ к функциям мониторинга ввода-вывода для конкретной платформы. Наиболее переносимым интерфейсом является функция POSIX select ()
, которая доступна в Unix и Windows. Модуль также включает poll ()
, API только для Unix и несколько параметров, которые работают только с определенными вариантами Unix.
Примечание
Новый модуль селекторов предоставляет интерфейс более высокого уровня, построенный на основе API в select
. Легче создавать переносимый код с помощью selectors
, поэтому используйте этот модуль, если только низкоуровневые API, предоставляемые select
, не требуются каким-либо образом.
Использование select ()
Функция Python select ()
– это прямой интерфейс для реализации базовой операционной системы. Он отслеживает сокеты, открытые файлы и каналы (все, что имеет метод fileno ()
, который возвращает действительный дескриптор файла), пока они не станут доступными для чтения или записи или пока не возникнет ошибка связи. select ()
упрощает одновременный мониторинг нескольких подключений и более эффективен, чем написание цикла опроса в Python с использованием тайм-аутов сокетов, поскольку мониторинг происходит на сетевом уровне операционной системы, а не на переводчик.
Примечание
Использование файловых объектов Python с select ()
работает для Unix, но не поддерживается в Windows.
Пример эхо-сервера из раздела сокетов можно расширить, чтобы отслеживать более одного соединения одновременно, используя select ()
. Новая версия начинается с создания неблокирующего сокета TCP/IP и настройки его для прослушивания адреса.
select_echo_server.py
import select import socket import sys import queue # Create a TCP/IP socket server socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setblocking(0) # Bind the socket to the port server_address ('localhost', 10000) print('starting up on {} port {}'.format(*server_address), filesys.stderr) server.bind(server_address) # Listen for incoming connections server.listen(5)
Аргументы для select ()
– это три списка, содержащих каналы связи для отслеживания. Первый – это список объектов, которые необходимо проверить на предмет входящих данных для чтения, второй содержит объекты, которые будут получать исходящие данные, когда в их буфере останется место, а третий – те, которые могут иметь ошибку (обычно комбинация объекты входного и выходного каналов). Следующим шагом на сервере является настройка списков, содержащих источники ввода и назначения вывода, которые будут переданы в select ()
.
# Sockets from which we expect to read inputs [server] # Sockets to which we expect to write outputs []
Соединения добавляются и удаляются из этих списков основным циклом сервера. Поскольку эта версия сервера будет ждать, пока сокет станет доступным для записи, прежде чем отправлять какие-либо данные (вместо немедленной отправки ответа), каждому выходному соединению требуется очередь, которая действует как буфер для данных, которые будут отправлены через него.
# Outgoing message queues (socket:Queue) message_queues {}
Основная часть серверной программы зацикливается, вызывая select ()
для блокировки и ожидания сетевой активности.
while inputs: # Wait for at least one of the sockets to be # ready for processing print('waiting for the next event', filesys.stderr) readable, writable, exceptional select.select(inputs, outputs, inputs)
select ()
возвращает три новых списка, содержащих подмножества содержимого переданных списков. Все сокеты в списке readable
имеют входящие данные в буфере и доступны для чтения . Все сокеты в списке доступны для записи
имеют свободное пространство в их буфере и могут быть записаны. В сокетах, возвращенных в исключительном
, произошла ошибка (фактическое определение «исключительного состояния» зависит от платформы).
«Читаемые» сокеты представляют три возможных случая. Если сокет является основным сокетом «сервера», который используется для прослушивания соединений, то условие «читабельность» означает, что он готов принять другое входящее соединение. В дополнение к добавлению нового подключения к списку входов для мониторинга, в этом разделе клиентский сокет не блокируется.
# Handle inputs for s in readable: if s is server: # A "readable" socket is ready to accept a connection connection, client_address s.accept() print(' connection from', client_address, filesys.stderr) connection.setblocking(0) inputs.append(connection) # Give the connection a queue for data # we want to send message_queues[connection] queue.Queue()
Следующий случай – это установленное соединение с клиентом, который отправил данные. Данные считываются с помощью recv ()
, затем помещаются в очередь, чтобы их можно было отправить через сокет и обратно клиенту.
else: data s.recv(1024) if data: # A readable client socket has data print(' received {!r} from {}'.format( data, s.getpeername()), filesys.stderr, ) message_queues[s].put(data) # Add output channel for response if s not in outputs: outputs.append(s)
Доступный для чтения сокет без данных доступен от клиента, который отключился, и поток готов к закрытию.
else: # Interpret empty result as closed connection print(' closing', client_address, filesys.stderr) # Stop listening for input on the connection if s in outputs: outputs.remove(s) inputs.remove(s) s.close() # Remove message queue del message_queues[s]
Меньше случаев для записываемых подключений. Если в очереди есть данные для подключения, отправляется следующее сообщение. В противном случае соединение удаляется из списка выходных соединений, так что следующий раз прохождение цикла select ()
не указывает, что сокет готов к отправке данных.
# Handle outputs for s in writable: try: next_msg message_queues[s].get_nowait() except queue.Empty: # No messages waiting so stop checking # for writability. print(' ', s.getpeername(), 'queue empty', filesys.stderr) outputs.remove(s) else: print(' sending {!r} to {}'.format(next_msg, s.getpeername()), filesys.stderr) s.send(next_msg)
Наконец, если возникает ошибка сокета, он закрывается.
# Handle "exceptional conditions" for s in exceptional: print('exception condition on', s.getpeername(), filesys.stderr) # Stop listening for input on the connection inputs.remove(s) if s in outputs: outputs.remove(s) s.close() # Remove message queue del message_queues[s]
В примере клиентской программы используются два сокета, чтобы продемонстрировать, как сервер с помощью select ()
управляет несколькими подключениями одновременно. Клиент начинает с подключения каждого сокета TCP/IP к серверу.
select_echo_multiclient.py
import socket import sys messages [ 'This is the message. ', 'It will be sent ', 'in parts.', ] server_address ('localhost', 10000) # Create a TCP/IP socket socks [ socket.socket(socket.AF_INET, socket.SOCK_STREAM), socket.socket(socket.AF_INET, socket.SOCK_STREAM), ] # Connect the socket to the port where the server is listening print('connecting to {} port {}'.format(*server_address), filesys.stderr) for s in socks: s.connect(server_address)
Затем он отправляет по одному фрагменту сообщения через каждый сокет и считывает все ответы, доступные после записи новых данных.
for message in messages: outgoing_data message.encode() # Send messages on both sockets for s in socks: print('{}: sending {!r}'.format(s.getsockname(), outgoing_data), filesys.stderr) s.send(outgoing_data) # Read responses on both sockets for s in socks: data s.recv(1024) print('{}: received {!r}'.format(s.getsockname(), data), filesys.stderr) if not data: print('closing socket', s.getsockname(), filesys.stderr) s.close()
Запустите сервер в одном окне, а клиент – в другом. Вывод будет выглядеть так, с разными номерами портов.
$ python3 select_echo_server.py starting up on localhost port 10000 waiting for the next event connection from ('127.0.0.1', 61003) waiting for the next event connection from ('127.0.0.1', 61004) waiting for the next event received b'This is the message. ' from ('127.0.0.1', 61003) received b'This is the message. ' from ('127.0.0.1', 61004) waiting for the next event sending b'This is the message. ' to ('127.0.0.1', 61003) sending b'This is the message. ' to ('127.0.0.1', 61004) waiting for the next event ('127.0.0.1', 61003) queue empty ('127.0.0.1', 61004) queue empty waiting for the next event received b'It will be sent ' from ('127.0.0.1', 61003) received b'It will be sent ' from ('127.0.0.1', 61004) waiting for the next event sending b'It will be sent ' to ('127.0.0.1', 61003) sending b'It will be sent ' to ('127.0.0.1', 61004) waiting for the next event ('127.0.0.1', 61003) queue empty ('127.0.0.1', 61004) queue empty waiting for the next event received b'in parts.' from ('127.0.0.1', 61003) waiting for the next event received b'in parts.' from ('127.0.0.1', 61004) sending b'in parts.' to ('127.0.0.1', 61003) waiting for the next event ('127.0.0.1', 61003) queue empty sending b'in parts.' to ('127.0.0.1', 61004) waiting for the next event ('127.0.0.1', 61004) queue empty waiting for the next event closing ('127.0.0.1', 61004) closing ('127.0.0.1', 61004) waiting for the next event
В выводе клиента отображаются данные, отправляемые и получаемые с использованием обоих сокетов.
$ python3 select_echo_multiclient.py connecting to localhost port 10000 ('127.0.0.1', 61003): sending b'This is the message. ' ('127.0.0.1', 61004): sending b'This is the message. ' ('127.0.0.1', 61003): received b'This is the message. ' ('127.0.0.1', 61004): received b'This is the message. ' ('127.0.0.1', 61003): sending b'It will be sent ' ('127.0.0.1', 61004): sending b'It will be sent ' ('127.0.0.1', 61003): received b'It will be sent ' ('127.0.0.1', 61004): received b'It will be sent ' ('127.0.0.1', 61003): sending b'in parts.' ('127.0.0.1', 61004): sending b'in parts.' ('127.0.0.1', 61003): received b'in parts.' ('127.0.0.1', 61004): received b'in parts.'
Неблокирующий ввод-вывод с таймаутом
select ()
также принимает необязательный четвертый параметр, который представляет собой количество секунд ожидания перед прерыванием мониторинга, если ни один канал не стал активным. Использование значения тайм-аута позволяет основной программе вызывать select ()
как часть более крупного цикла обработки, выполняя другие действия между проверками сетевого ввода.
По истечении времени ожидания select ()
возвращает три пустых списка. Обновление примера сервера для использования тайм-аута требует добавления дополнительного аргумента к вызову select ()
и обработки пустых списков после возврата select ()
.
select_echo_server_timeout.py
readable, writable, exceptional select.select(inputs, outputs, inputs, timeout) if not (readable or writable or exceptional): print(' timed out, do some other work here', filesys.stderr) continue
Эта «медленная» версия клиентской программы делает паузу после отправки каждого сообщения, чтобы имитировать задержку или другую задержку при передаче.
select_echo_slow_client.py
import socket import sys import time # Create a TCP/IP socket sock socket.socket(socket.AF_INET, socket.SOCK_STREAM) # Connect the socket to the port where the server is listening server_address ('localhost', 10000) print('connecting to {} port {}'.format(*server_address), filesys.stderr) sock.connect(server_address) time.sleep(1) messages [ 'Part one of the message.', 'Part two of the message.', ] amount_expected len(''.join(messages)) try: # Send data for message in messages: data message.encode() print('sending {!r}'.format(data), filesys.stderr) sock.sendall(data) time.sleep(1.5) # Look for the response amount_received 0 while amount_received < amount_expected: data sock.recv(16) amount_received len(data) print('received {!r}'.format(data), filesys.stderr) finally: print('closing socket', filesys.stderr) sock.close()
Запуск нового сервера с медленным клиентом дает:
$ python3 select_echo_server_timeout.py starting up on localhost port 10000 waiting for the next event timed out, do some other work here waiting for the next event connection from ('127.0.0.1', 61144) waiting for the next event timed out, do some other work here waiting for the next event received b'Part one of the message.' from ('127.0.0.1', 61144) waiting for the next event sending b'Part one of the message.' to ('127.0.0.1', 61144) waiting for the next event ('127.0.0.1', 61144) queue empty waiting for the next event timed out, do some other work here waiting for the next event received b'Part two of the message.' from ('127.0.0.1', 61144) waiting for the next event sending b'Part two of the message.' to ('127.0.0.1', 61144) waiting for the next event ('127.0.0.1', 61144) queue empty waiting for the next event timed out, do some other work here waiting for the next event closing ('127.0.0.1', 61144) waiting for the next event timed out, do some other work here
И это результат клиента:
$ python3 select_echo_slow_client.py connecting to localhost port 10000 sending b'Part one of the message.' sending b'Part two of the message.' received b'Part one of the ' received b'message.Part two' received b' of the message.' closing socket
Использование poll ()
Функция poll ()
предоставляет функции, аналогичные функции select ()
, но базовая реализация более эффективна. Компромисс заключается в том, что poll ()
не поддерживается в Windows, поэтому программы, использующие poll ()
, менее переносимы.
Эхо-сервер, построенный на poll ()
, запускается с тем же кодом конфигурации сокета, который использовался в других примерах.
select_poll_echo_server.py
import select import socket import sys import queue # Create a TCP/IP socket server socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setblocking(0) # Bind the socket to the port server_address ('localhost', 10000) print('starting up on {} port {}'.format(*server_address), filesys.stderr) server.bind(server_address) # Listen for incoming connections server.listen(5) # Keep up with the queues of outgoing messages message_queues {}
Значение тайм-аута, переданное в poll ()
, представлено в миллисекундах, а не в секундах, поэтому для того, чтобы сделать паузу на полную секунду, тайм-аут должен быть установлен на 1000
.
# Do not block forever (milliseconds) TIMEOUT 1000
Python реализует poll ()
с классом, который управляет отслеживаемыми зарегистрированными каналами данных. Каналы добавляются путем вызова register ()
с флагами, указывающими, какие события интересны для этого канала. Полный набор флагов приведен в таблице ниже.
Флаги событий для опроса ()
Мероприятие
Описание
ПОЛЛИН
Вход готов
ПОЛЛПРИ
Приоритетный ввод готов
ЗАГРЯЗНЕНИЕ
Возможность получать вывод
ПОЛЛЕРР
Ошибка
ПОЛЛХУП
Канал закрыт
POLLNVAL
Канал не открыт
Эхо-сервер будет настраивать одни сокеты только для чтения, а другие – для чтения или записи. Соответствующие комбинации флагов сохраняются в локальных переменных READ_ONLY
и READ_WRITE
.
# Commonly used flag sets READ_ONLY ( select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR ) READ_WRITE READ_ONLY | select.POLLOUT
Сокет server
зарегистрирован, поэтому любые входящие соединения или данные вызывают событие.
# Set up the poller poller select.poll() poller.register(server, READ_ONLY)
Поскольку poll ()
возвращает список кортежей, содержащих дескриптор файла для сокета и флаг события, необходимо сопоставление номеров файловых дескрипторов с объектами для извлечения socket
в читать или писать оттуда.
# Map file descriptors to socket objects fd_to_socket { server.fileno(): server, }
Цикл сервера вызывает poll ()
, а затем обрабатывает «события», возвращаемые путем поиска сокета и выполнения действий на основе флага в событии.
while True: # Wait for at least one of the sockets to be # ready for processing print('waiting for the next event', filesys.stderr) events poller.poll(TIMEOUT) for fd, flag in events: # Retrieve the actual socket from its file descriptor s fd_to_socket[fd]
Как и в случае с select ()
, когда сокет основного сервера «доступен для чтения», это на самом деле означает наличие ожидающего подключения от клиента. Новое соединение регистрируется с помощью флагов READ_ONLY
для отслеживания новых данных, которые пройдут через него.
# Handle inputs if flag & (select.POLLIN | select.POLLPRI): if s is server: # A readable socket is ready # to accept a connection connection, client_address s.accept() print(' connection', client_address, filesys.stderr) connection.setblocking(0) fd_to_socket[connection.fileno()] connection poller.register(connection, READ_ONLY) # Give the connection a queue for data to send message_queues[connection] queue.Queue()
Другие сокеты, кроме сервера, являются существующими клиентами, и recv ()
используется для доступа к данным, ожидающим чтения.
else: data s.recv(1024)
Если recv ()
возвращает какие-либо данные, они помещаются в исходящую очередь для сокета, а флаги для этого сокета изменяются с помощью modify ()
, поэтому poll ()
будет следить за тем, чтобы сокет был готов к приему данных.
if data: # A readable client socket has data print(' received {!r} from {}'.format( data, s.getpeername()), filesys.stderr, ) message_queues[s].put(data) # Add output channel for response poller.modify(s, READ_WRITE)
Пустая строка, возвращаемая recv ()
, означает, что клиент отключился, поэтому unregister ()
используется для указания объекту poll
игнорировать сокет.
else: # Interpret empty result as closed connection print(' closing', client_address, filesys.stderr) # Stop listening for input on the connection poller.unregister(s) s.close() # Remove message queue del message_queues[s]
Флаг POLLHUP
указывает на клиента, который «разорвал» соединение, не закрывая его полностью. Сервер перестает опрашивать исчезающих клиентов.
elif flag & select.POLLHUP: # Client hung up print(' closing', client_address, '(HUP)', filesys.stderr) # Stop listening for input on the connection poller.unregister(s) s.close()
Обработка доступных для записи сокетов похожа на версию, использованную в примере для select ()
, за исключением того, что modify ()
используется для изменения флагов для сокета в опросчике, вместо того, чтобы удалить его из списка вывода.
elif flag & select.POLLOUT: # Socket is ready to send data, # if there is any to send. try: next_msg message_queues[s].get_nowait() except queue.Empty: # No messages waiting so stop checking print(s.getpeername(), 'queue empty', filesys.stderr) poller.modify(s, READ_ONLY) else: print(' sending {!r} to {}'.format( next_msg, s.getpeername()), filesys.stderr, ) s.send(next_msg)
И, наконец, любые события с POLLERR
заставляют сервер закрывать сокет.
elif flag & select.POLLERR: print(' exception on', s.getpeername(), filesys.stderr) # Stop listening for input on the connection poller.unregister(s) s.close() # Remove message queue del message_queues[s]
Когда сервер на основе опроса запускается вместе с select_echo_multiclient.py
(клиентская программа, использующая несколько сокетов), это результат.
$ python3 select_poll_echo_server.py starting up on localhost port 10000 waiting for the next event waiting for the next event waiting for the next event waiting for the next event connection ('127.0.0.1', 61253) waiting for the next event connection ('127.0.0.1', 61254) waiting for the next event received b'This is the message. ' from ('127.0.0.1', 61253) received b'This is the message. ' from ('127.0.0.1', 61254) waiting for the next event sending b'This is the message. ' to ('127.0.0.1', 61253) sending b'This is the message. ' to ('127.0.0.1', 61254) waiting for the next event ('127.0.0.1', 61253) queue empty ('127.0.0.1', 61254) queue empty waiting for the next event received b'It will be sent ' from ('127.0.0.1', 61253) received b'It will be sent ' from ('127.0.0.1', 61254) waiting for the next event sending b'It will be sent ' to ('127.0.0.1', 61253) sending b'It will be sent ' to ('127.0.0.1', 61254) waiting for the next event ('127.0.0.1', 61253) queue empty ('127.0.0.1', 61254) queue empty waiting for the next event received b'in parts.' from ('127.0.0.1', 61253) received b'in parts.' from ('127.0.0.1', 61254) waiting for the next event sending b'in parts.' to ('127.0.0.1', 61253) sending b'in parts.' to ('127.0.0.1', 61254) waiting for the next event ('127.0.0.1', 61253) queue empty ('127.0.0.1', 61254) queue empty waiting for the next event closing ('127.0.0.1', 61254) waiting for the next event closing ('127.0.0.1', 61254) waiting for the next event
Параметры для конкретной платформы
Менее переносимые параметры, предоставляемые select
: epoll
, API edge polling , поддерживаемый Linux; kqueue
, который использует очередь ядра BSD; и kevent
, интерфейс события ядра BSD. Обратитесь к документации библиотеки операционной системы для получения более подробной информации о том, как они работают.
Смотрите также
- Стандартная библиотека документации для избранных
- селекторы – абстракция более высокого уровня поверх
select
. - HOWOTO по программированию сокетов – учебное руководство Гордона Макмиллана, включенное в стандартную документацию библиотеки.
- socket – низкоуровневая сетевая связь.
SocketServer
– платформа для создания сетевых серверных приложений.- asyncio – фреймворк асинхронного ввода-вывода
- Сетевое программирование Unix, Том 1: Сетевой API сокетов, 3/E У. Ричард Стивенс, Билл Феннер и Эндрю М. Рудофф. Опубликовано Addison-Wesley Professional, 2004. ISBN-10: 0131411551
- Основы сетевого программирования Python, 3/E Авторы Брэндон Родс и Джон Герцен. Опубликовано Apress, 2014. ISBN-10: 1430258543