Рубрики
Без рубрики

queue – Поточно-безопасная реализация FIFO

Автор оригинала: Doug Hellmann.

Цель:

Обеспечивает поточно-ориентированную реализацию FIFO

Модуль queue предоставляет структуру данных «первым пришел – первым обслужен» (FIFO), подходящую для многопоточного программирования. Его можно использовать для безопасной передачи сообщений или других данных между потоками-производителями и потребителями. Блокировка выполняется для вызывающей стороны, поэтому многие потоки могут безопасно и легко работать с одним и тем же экземпляром Queue . Размер Queue (количество содержащихся в ней элементов) может быть ограничен для ограничения использования памяти или обработки.

Примечание

Это обсуждение предполагает, что вы уже понимаете общую природу очереди. Если вы этого не сделаете, вы можете прочитать некоторые ссылки, прежде чем продолжить.

Базовая очередь FIFO

Класс Queue реализует базовый контейнер “первым пришел – первым ушел”. Элементы добавляются к одному «концу» последовательности с помощью put () и удаляются с другого конца с помощью get () .

queue_fifo.py

import queue

q  queue.Queue()

for i in range(5):
    q.put(i)

while not q.empty():
    print(q.get(), end' ')
print()

В этом примере используется один поток, чтобы показать, что элементы удаляются из очереди в том же порядке, в котором они вставляются.

$ python3 queue_fifo.py

0 1 2 3 4

Очередь LIFO

В отличие от стандартной реализации FIFO Queue , LifoQueue использует порядок «последний пришел – первый ушел» (обычно связанный со структурой данных стека).

queue_lifo.py

import queue

q  queue.LifoQueue()

for i in range(5):
    q.put(i)

while not q.empty():
    print(q.get(), end' ')
print()

Элемент, который был помещен в очередь последним, удаляется с помощью get .

$ python3 queue_lifo.py

4 3 2 1 0

Приоритетная очередь

Иногда порядок обработки элементов в очереди должен основываться на характеристиках этих элементов, а не только на порядке их создания или добавления в очередь. Например, задания на печать из отдела заработной платы могут иметь приоритет над списком кода, который разработчик хочет распечатать. PriorityQueue использует порядок сортировки содержимого очереди, чтобы решить, какой элемент извлечь.

queue_priority.py

import functools
import queue
import threading


@functools.total_ordering
class Job:

    def __init__(self, priority, description):
        self.priority  priority
        self.description  description
        print('New job:', description)
        return

    def __eq__(self, other):
        try:
            return self.priority  other.priority
        except AttributeError:
            return NotImplemented

    def __lt__(self, other):
        try:
            return self.priority < other.priority
        except AttributeError:
            return NotImplemented


q  queue.PriorityQueue()

q.put(Job(3, 'Mid-level job'))
q.put(Job(10, 'Low-level job'))
q.put(Job(1, 'Important job'))


def process_job(q):
    while True:
        next_job  q.get()
        print('Processing job:', next_job.description)
        q.task_done()


workers  [
    threading.Thread(targetprocess_job, args(q,)),
    threading.Thread(targetprocess_job, args(q,)),
]
for w in workers:
    w.setDaemon(True)
    w.start()

q.join()

В этом примере есть несколько потоков, использующих задания, которые обрабатываются в зависимости от приоритета элементов в очереди на момент вызова get () . Порядок обработки элементов, добавленных в очередь, во время работы потребительских потоков зависит от переключения контекста потока.

$ python3 queue_priority.py

New job: Mid-level job
New job: Low-level job
New job: Important job
Processing job: Important job
Processing job: Mid-level job
Processing job: Low-level job

Создание клиента для потокового подкаста

Исходный код для клиента подкастинга в этом разделе демонстрирует, как использовать класс Queue с несколькими потоками. Программа считывает один или несколько RSS-каналов, ставит в очередь приложения для пяти последних выпусков из каждого канала для загрузки и обрабатывает несколько загрузок параллельно с использованием потоков. У него недостаточно обработки ошибок для производственного использования, но его скелетная реализация иллюстрирует использование модуля queue .

Сначала устанавливаются некоторые рабочие параметры. Обычно они поступают из пользовательского ввода (например, предпочтений или базы данных). В примере используются жестко запрограммированные значения количества потоков и списка URL-адресов для выборки.

fetch_podcasts.py

from queue import Queue
import threading
import time
import urllib
from urllib.parse import urlparse

import feedparser

# Set up some global variables
num_fetch_threads  2
enclosure_queue  Queue()

# A real app wouldn't use hard-coded data...
feed_urls  [
    'http://talkpython.fm/episodes/rss',
]


def message(s):
    print('{}: {}'.format(threading.current_thread().name, s))

Функция download_enclosures () запускается в рабочем потоке и обрабатывает загрузки с помощью urllib .

def download_enclosures(q):
    """This is the worker thread function.
    It processes items in the queue one after
    another.  These daemon threads go into an
    infinite loop, and exit only when
    the main thread ends.
    """
    while True:
        message('looking for the next enclosure')
        url  q.get()
        filename  url.rpartition('/')[-1]
        message('downloading {}'.format(filename))
        response  urllib.request.urlopen(url)
        data  response.read()
        # Save the downloaded file to the current directory
        message('writing to {}'.format(filename))
        with open(filename, 'wb') as outfile:
            outfile.write(data)
        q.task_done()

Как только целевая функция для потоков определена, можно запускать рабочие потоки. Когда download_enclosures () обрабатывает оператор url q.get () , он блокируется и ожидает, пока в очереди не появится что-нибудь для возврата. Это означает, что можно безопасно запускать потоки до того, как в очереди будет что-либо.

# Set up some threads to fetch the enclosures
for i in range(num_fetch_threads):
    worker  threading.Thread(
        targetdownload_enclosures,
        args(enclosure_queue,),
        name'worker-{}'.format(i),
    )
    worker.setDaemon(True)
    worker.start()

Следующим шагом является получение содержимого канала с помощью модуля feedparser и постановка URL-адресов вложений в очередь. Как только первый URL-адрес добавляется в очередь, один из рабочих потоков берет его и начинает загрузку. Цикл продолжает добавлять элементы до тех пор, пока канал не будет исчерпан, а рабочие потоки по очереди удаляют URL-адреса из очереди для их загрузки.

# Download the feed(s) and put the enclosure URLs into
# the queue.
for url in feed_urls:
    response  feedparser.parse(url, agent'fetch_podcasts.py')
    for entry in response['entries'][:5]:
        for enclosure in entry.get('enclosures', []):
            parsed_url  urlparse(enclosure['url'])
            message('queuing {}'.format(
                parsed_url.path.rpartition('/')[-1]))
            enclosure_queue.put(enclosure['url'])

Осталось только подождать, пока очередь снова не опустеет, используя join () .

# Now wait for the queue to be empty, indicating that we have
# processed all of the downloads.
message('*** main thread waiting')
enclosure_queue.join()
message('*** done')

Результат выполнения примера сценария будет примерно таким.

$ python3 fetch_podcasts.py

worker-0: looking for the next enclosure
worker-1: looking for the next enclosure
MainThread: queuing turbogears-and-the-future-of-python-web-frameworks.mp3
MainThread: queuing continuum-scientific-python-and-the-business-of-open-source.mp3
MainThread: queuing openstack-cloud-computing-built-on-python.mp3
MainThread: queuing pypy.js-pypy-python-in-your-browser.mp3
MainThread: queuing machine-learning-with-python-and-scikit-learn.mp3
MainThread: *** main thread waiting
worker-0: downloading turbogears-and-the-future-of-python-web-frameworks.mp3
worker-1: downloading continuum-scientific-python-and-the-business-of-open-source.mp3
worker-0: looking for the next enclosure
worker-0: downloading openstack-cloud-computing-built-on-python.mp3
worker-1: looking for the next enclosure
worker-1: downloading pypy.js-pypy-python-in-your-browser.mp3
worker-0: looking for the next enclosure
worker-0: downloading machine-learning-with-python-and-scikit-learn.mp3
worker-1: looking for the next enclosure
worker-0: looking for the next enclosure
MainThread: *** done

Фактический результат будет зависеть от содержимого используемого RSS-канала.

Смотрите также