Автор оригинала: Doug Hellmann.
Цель:
Обеспечивает поточно-ориентированную реализацию FIFO
Модуль queue
предоставляет структуру данных «первым пришел – первым обслужен» (FIFO), подходящую для многопоточного программирования. Его можно использовать для безопасной передачи сообщений или других данных между потоками-производителями и потребителями. Блокировка выполняется для вызывающей стороны, поэтому многие потоки могут безопасно и легко работать с одним и тем же экземпляром Queue
. Размер Queue
(количество содержащихся в ней элементов) может быть ограничен для ограничения использования памяти или обработки.
Примечание
Это обсуждение предполагает, что вы уже понимаете общую природу очереди. Если вы этого не сделаете, вы можете прочитать некоторые ссылки, прежде чем продолжить.
Базовая очередь FIFO
Класс Queue
реализует базовый контейнер “первым пришел – первым ушел”. Элементы добавляются к одному «концу» последовательности с помощью put ()
и удаляются с другого конца с помощью get ()
.
queue_fifo.py
import queue q queue.Queue() for i in range(5): q.put(i) while not q.empty(): print(q.get(), end' ') print()
В этом примере используется один поток, чтобы показать, что элементы удаляются из очереди в том же порядке, в котором они вставляются.
$ python3 queue_fifo.py 0 1 2 3 4
Очередь LIFO
В отличие от стандартной реализации FIFO Queue
, LifoQueue
использует порядок «последний пришел – первый ушел» (обычно связанный со структурой данных стека).
queue_lifo.py
import queue q queue.LifoQueue() for i in range(5): q.put(i) while not q.empty(): print(q.get(), end' ') print()
Элемент, который был помещен
в очередь последним, удаляется с помощью get
.
$ python3 queue_lifo.py 4 3 2 1 0
Приоритетная очередь
Иногда порядок обработки элементов в очереди должен основываться на характеристиках этих элементов, а не только на порядке их создания или добавления в очередь. Например, задания на печать из отдела заработной платы могут иметь приоритет над списком кода, который разработчик хочет распечатать. PriorityQueue
использует порядок сортировки содержимого очереди, чтобы решить, какой элемент извлечь.
queue_priority.py
import functools import queue import threading @functools.total_ordering class Job: def __init__(self, priority, description): self.priority priority self.description description print('New job:', description) return def __eq__(self, other): try: return self.priority other.priority except AttributeError: return NotImplemented def __lt__(self, other): try: return self.priority < other.priority except AttributeError: return NotImplemented q queue.PriorityQueue() q.put(Job(3, 'Mid-level job')) q.put(Job(10, 'Low-level job')) q.put(Job(1, 'Important job')) def process_job(q): while True: next_job q.get() print('Processing job:', next_job.description) q.task_done() workers [ threading.Thread(targetprocess_job, args(q,)), threading.Thread(targetprocess_job, args(q,)), ] for w in workers: w.setDaemon(True) w.start() q.join()
В этом примере есть несколько потоков, использующих задания, которые обрабатываются в зависимости от приоритета элементов в очереди на момент вызова get ()
. Порядок обработки элементов, добавленных в очередь, во время работы потребительских потоков зависит от переключения контекста потока.
$ python3 queue_priority.py New job: Mid-level job New job: Low-level job New job: Important job Processing job: Important job Processing job: Mid-level job Processing job: Low-level job
Создание клиента для потокового подкаста
Исходный код для клиента подкастинга в этом разделе демонстрирует, как использовать класс Queue
с несколькими потоками. Программа считывает один или несколько RSS-каналов, ставит в очередь приложения для пяти последних выпусков из каждого канала для загрузки и обрабатывает несколько загрузок параллельно с использованием потоков. У него недостаточно обработки ошибок для производственного использования, но его скелетная реализация иллюстрирует использование модуля queue
.
Сначала устанавливаются некоторые рабочие параметры. Обычно они поступают из пользовательского ввода (например, предпочтений или базы данных). В примере используются жестко запрограммированные значения количества потоков и списка URL-адресов для выборки.
fetch_podcasts.py
from queue import Queue import threading import time import urllib from urllib.parse import urlparse import feedparser # Set up some global variables num_fetch_threads 2 enclosure_queue Queue() # A real app wouldn't use hard-coded data... feed_urls [ 'http://talkpython.fm/episodes/rss', ] def message(s): print('{}: {}'.format(threading.current_thread().name, s))
Функция download_enclosures ()
запускается в рабочем потоке и обрабатывает загрузки с помощью urllib
.
def download_enclosures(q): """This is the worker thread function. It processes items in the queue one after another. These daemon threads go into an infinite loop, and exit only when the main thread ends. """ while True: message('looking for the next enclosure') url q.get() filename url.rpartition('/')[-1] message('downloading {}'.format(filename)) response urllib.request.urlopen(url) data response.read() # Save the downloaded file to the current directory message('writing to {}'.format(filename)) with open(filename, 'wb') as outfile: outfile.write(data) q.task_done()
Как только целевая функция для потоков определена, можно запускать рабочие потоки. Когда download_enclosures ()
обрабатывает оператор url q.get ()
, он блокируется и ожидает, пока в очереди не появится что-нибудь для возврата. Это означает, что можно безопасно запускать потоки до того, как в очереди будет что-либо.
# Set up some threads to fetch the enclosures for i in range(num_fetch_threads): worker threading.Thread( targetdownload_enclosures, args(enclosure_queue,), name'worker-{}'.format(i), ) worker.setDaemon(True) worker.start()
Следующим шагом является получение содержимого канала с помощью модуля feedparser
и постановка URL-адресов вложений в очередь. Как только первый URL-адрес добавляется в очередь, один из рабочих потоков берет его и начинает загрузку. Цикл продолжает добавлять элементы до тех пор, пока канал не будет исчерпан, а рабочие потоки по очереди удаляют URL-адреса из очереди для их загрузки.
# Download the feed(s) and put the enclosure URLs into # the queue. for url in feed_urls: response feedparser.parse(url, agent'fetch_podcasts.py') for entry in response['entries'][:5]: for enclosure in entry.get('enclosures', []): parsed_url urlparse(enclosure['url']) message('queuing {}'.format( parsed_url.path.rpartition('/')[-1])) enclosure_queue.put(enclosure['url'])
Осталось только подождать, пока очередь снова не опустеет, используя join ()
.
# Now wait for the queue to be empty, indicating that we have # processed all of the downloads. message('*** main thread waiting') enclosure_queue.join() message('*** done')
Результат выполнения примера сценария будет примерно таким.
$ python3 fetch_podcasts.py worker-0: looking for the next enclosure worker-1: looking for the next enclosure MainThread: queuing turbogears-and-the-future-of-python-web-frameworks.mp3 MainThread: queuing continuum-scientific-python-and-the-business-of-open-source.mp3 MainThread: queuing openstack-cloud-computing-built-on-python.mp3 MainThread: queuing pypy.js-pypy-python-in-your-browser.mp3 MainThread: queuing machine-learning-with-python-and-scikit-learn.mp3 MainThread: *** main thread waiting worker-0: downloading turbogears-and-the-future-of-python-web-frameworks.mp3 worker-1: downloading continuum-scientific-python-and-the-business-of-open-source.mp3 worker-0: looking for the next enclosure worker-0: downloading openstack-cloud-computing-built-on-python.mp3 worker-1: looking for the next enclosure worker-1: downloading pypy.js-pypy-python-in-your-browser.mp3 worker-0: looking for the next enclosure worker-0: downloading machine-learning-with-python-and-scikit-learn.mp3 worker-1: looking for the next enclosure worker-0: looking for the next enclosure MainThread: *** done
Фактический результат будет зависеть от содержимого используемого RSS-канала.
Смотрите также
- стандартная библиотечная документация для очереди
- deque – Двусторонняя очередь из коллекций
- Структуры данных очереди – статья в Википедии, объясняющая очереди.
- FIFO – статья в Википедии, объясняющая структуры данных «первым пришел – первым обслужен».
- модуль feedparser – модуль для анализа каналов RSS и Atom, созданный Марком Пилигримом и поддерживаемый Куртом Макки.