Рубрики
Без рубрики

threading – управление параллельными операциями в процессе

Автор оригинала: Doug Hellmann.

Цель:

Управляйте несколькими потоками выполнения.

Использование потоков позволяет программе выполнять несколько операций одновременно в одном и том же пространстве процесса.

Объекты потока

Самый простой способ использовать Thread – создать его экземпляр с целевой функцией и вызвать start () , чтобы он начал работать.

threading_simple.py

import threading


def worker():
    """thread worker function"""
    print('Worker')


threads  []
for i in range(5):
    t  threading.Thread(targetworker)
    threads.append(t)
    t.start()

Результатом является пять строк с «Worker» на каждой.

$ python3 threading_simple.py

Worker
Worker
Worker
Worker
Worker

Полезно иметь возможность создавать поток и передавать ему аргументы, чтобы сообщить ему, что делать. В качестве аргумента потоку можно передать любой тип объекта. В этом примере передается число, которое затем выводится потоком.

threading_simpleargs.py

import threading


def worker(num):
    """thread worker function"""
    print('Worker: %s' % num)


threads  []
for i in range(5):
    t  threading.Thread(targetworker, args(i,))
    threads.append(t)
    t.start()

Целочисленный аргумент теперь включен в сообщение, выводимое каждым потоком.

$ python3 threading_simpleargs.py

Worker: 0
Worker: 1
Worker: 2
Worker: 3
Worker: 4

Определение текущего потока

Использование аргументов для идентификации или имени потока громоздко и не нужно. Каждый экземпляр Thread имеет имя со значением по умолчанию, которое может быть изменено при создании потока. Именование потоков полезно в серверных процессах, когда несколько потоков службы обрабатывают разные операции.

threading_names.py

import threading
import time


def worker():
    print(threading.current_thread().getName(), 'Starting')
    time.sleep(0.2)
    print(threading.current_thread().getName(), 'Exiting')


def my_service():
    print(threading.current_thread().getName(), 'Starting')
    time.sleep(0.3)
    print(threading.current_thread().getName(), 'Exiting')


t  threading.Thread(name'my_service', targetmy_service)
w  threading.Thread(name'worker', targetworker)
w2  threading.Thread(targetworker)  # use default name

w.start()
w2.start()
t.start()

Выходные данные отладки включают имя текущего потока в каждой строке. Строки с «Thread-1» в столбце имени потока соответствуют безымянному потоку w2 .

$ python3 threading_names.py

worker Starting
Thread-1 Starting
my_service Starting
worker Exiting
Thread-1 Exiting
my_service Exiting

Большинство программ не используют print для отладки. Модуль ведения журнала поддерживает встраивание имени потока в каждое сообщение журнала с использованием кода средства форматирования % (threadName) s . Включение имен потоков в сообщения журнала позволяет отследить эти сообщения до их источника.

threading_names_log.py

import logging
import threading
import time


def worker():
    logging.debug('Starting')
    time.sleep(0.2)
    logging.debug('Exiting')


def my_service():
    logging.debug('Starting')
    time.sleep(0.3)
    logging.debug('Exiting')


logging.basicConfig(
    levellogging.DEBUG,
    format'[%(levelname)s] (%(threadName)-10s) %(message)s',
)

t  threading.Thread(name'my_service', targetmy_service)
w  threading.Thread(name'worker', targetworker)
w2  threading.Thread(targetworker)  # use default name

w.start()
w2.start()
t.start()

ведение журнала также является потокобезопасным, поэтому сообщения из разных потоков сохраняются в выходных данных отдельно.

$ python3 threading_names_log.py

[DEBUG] (worker    ) Starting
[DEBUG] (Thread-1  ) Starting
[DEBUG] (my_service) Starting
[DEBUG] (worker    ) Exiting
[DEBUG] (Thread-1  ) Exiting
[DEBUG] (my_service) Exiting

Демон против потоков, не являющихся демонами

До этого момента программы примеров неявно ожидали завершения, пока все потоки не завершат свою работу. Иногда программы порождают поток как демон , который запускается, не блокируя выход из основной программы. Использование потоков демона полезно для служб, где может быть нелегко прервать поток или где, если позволить потоку умереть в середине его работы, не будут потеряны или повреждены данные (например, поток, который генерирует «сердцебиение») для инструмента мониторинга услуг). Чтобы пометить поток как демон, передайте при его создании или вызовите его метод set_daemon () с True . По умолчанию потоки не являются демонами.

threading_daemon.py

import threading
import time
import logging


def daemon():
    logging.debug('Starting')
    time.sleep(0.2)
    logging.debug('Exiting')


def non_daemon():
    logging.debug('Starting')
    logging.debug('Exiting')


logging.basicConfig(
    levellogging.DEBUG,
    format'(%(threadName)-10s) %(message)s',
)

d  threading.Thread(name'daemon', targetdaemon, daemonTrue)

t  threading.Thread(name'non-daemon', targetnon_daemon)

d.start()
t.start()

Вывод не включает сообщение "Exiting" от потока демона, поскольку все потоки, не являющиеся демонами (включая основной поток), завершаются до того, как поток демона пробуждается из сна ( ) вызов.

$ python3 threading_daemon.py

(daemon    ) Starting
(non-daemon) Starting
(non-daemon) Exiting

Чтобы подождать, пока поток демона завершит свою работу, используйте метод join () .

threading_daemon_join.py

import threading
import time
import logging


def daemon():
    logging.debug('Starting')
    time.sleep(0.2)
    logging.debug('Exiting')


def non_daemon():
    logging.debug('Starting')
    logging.debug('Exiting')


logging.basicConfig(
    levellogging.DEBUG,
    format'(%(threadName)-10s) %(message)s',
)

d  threading.Thread(name'daemon', targetdaemon, daemonTrue)

t  threading.Thread(name'non-daemon', targetnon_daemon)

d.start()
t.start()

d.join()
t.join()

Ожидание завершения потока демона с помощью join () означает, что у него есть шанс создать сообщение «Exiting» .

$ python3 threading_daemon_join.py

(daemon    ) Starting
(non-daemon) Starting
(non-daemon) Exiting
(daemon    ) Exiting

По умолчанию join () блокируется на неопределенный срок. Также можно передать значение с плавающей запятой, представляющее количество секунд ожидания, пока поток станет неактивным. Если поток не завершается в течение периода ожидания, join () все равно возвращается.

threading_daemon_join_timeout.py

import threading
import time
import logging


def daemon():
    logging.debug('Starting')
    time.sleep(0.2)
    logging.debug('Exiting')


def non_daemon():
    logging.debug('Starting')
    logging.debug('Exiting')


logging.basicConfig(
    levellogging.DEBUG,
    format'(%(threadName)-10s) %(message)s',
)

d  threading.Thread(name'daemon', targetdaemon, daemonTrue)

t  threading.Thread(name'non-daemon', targetnon_daemon)

d.start()
t.start()

d.join(0.1)
print('d.isAlive()', d.isAlive())
t.join()

Поскольку прошедшее время ожидания меньше, чем время, в течение которого поток демона находится в спящем режиме, поток все еще остается «живым» после возврата join () .

$ python3 threading_daemon_join_timeout.py

(daemon    ) Starting
(non-daemon) Starting
(non-daemon) Exiting
d.isAlive() True

Перечисление всех потоков

Нет необходимости сохранять явный дескриптор для всех потоков демона, чтобы гарантировать их завершение перед выходом из основного процесса. enumerate () возвращает список активных экземпляров Thread . Список включает текущий поток, и, поскольку присоединение к текущему потоку приводит к тупиковой ситуации, его необходимо пропустить.

threading_enumerate.py

import random
import threading
import time
import logging


def worker():
    """thread worker function"""
    pause  random.randint(1, 5) / 10
    logging.debug('sleeping %0.2f', pause)
    time.sleep(pause)
    logging.debug('ending')


logging.basicConfig(
    levellogging.DEBUG,
    format'(%(threadName)-10s) %(message)s',
)

for i in range(3):
    t  threading.Thread(targetworker, daemonTrue)
    t.start()

main_thread  threading.main_thread()
for t in threading.enumerate():
    if t is main_thread:
        continue
    logging.debug('joining %s', t.getName())
    t.join()

Поскольку рабочий спит случайное время, результат работы этой программы может отличаться.

$ python3 threading_enumerate.py

(Thread-1  ) sleeping 0.20
(Thread-2  ) sleeping 0.30
(Thread-3  ) sleeping 0.40
(MainThread) joining Thread-1
(Thread-1  ) ending
(MainThread) joining Thread-3
(Thread-2  ) ending
(Thread-3  ) ending
(MainThread) joining Thread-2

Создание подкласса Thread

При запуске Thread выполняет базовую инициализацию, а затем вызывает свой метод run () , который вызывает целевую функцию, переданную конструктору. Чтобы создать подкласс Thread , переопределите run () , чтобы сделать все необходимое.

threading_subclass.py

import threading
import logging


class MyThread(threading.Thread):

    def run(self):
        logging.debug('running')


logging.basicConfig(
    levellogging.DEBUG,
    format'(%(threadName)-10s) %(message)s',
)

for i in range(5):
    t  MyThread()
    t.start()

Возвращаемое значение run () игнорируется.

$ python3 threading_subclass.py

(Thread-1  ) running
(Thread-2  ) running
(Thread-3  ) running
(Thread-4  ) running
(Thread-5  ) running

Поскольку значения args и kwargs , переданные в конструктор Thread , сохраняются в частных переменных с использованием имен с префиксом '__' , к ним нелегко получить доступ из подкласса. Чтобы передать аргументы пользовательскому типу потока, переопределите конструктор, чтобы сохранить значения в атрибуте экземпляра, который можно увидеть в подклассе.

threading_subclass_args.py

import threading
import logging


class MyThreadWithArgs(threading.Thread):

    def __init__(self, groupNone, targetNone, nameNone,
                 args(), kwargsNone, *, daemonNone):
        super().__init__(groupgroup, targettarget, namename,
                         daemondaemon)
        self.args  args
        self.kwargs  kwargs

    def run(self):
        logging.debug('running with %s and %s',
                      self.args, self.kwargs)


logging.basicConfig(
    levellogging.DEBUG,
    format'(%(threadName)-10s) %(message)s',
)

for i in range(5):
    t  MyThreadWithArgs(args(i,), kwargs{'a': 'A', 'b': 'B'})
    t.start()

MyThreadWithArgs использует тот же API, что и Thread , но другой класс может легко изменить метод конструктора, чтобы он принимал больше или другие аргументы, более непосредственно связанные с целью потока, как и любой другой другой класс.

$ python3 threading_subclass_args.py

(Thread-1  ) running with (0,) and {'b': 'B', 'a': 'A'}
(Thread-2  ) running with (1,) and {'b': 'B', 'a': 'A'}
(Thread-3  ) running with (2,) and {'b': 'B', 'a': 'A'}
(Thread-4  ) running with (3,) and {'b': 'B', 'a': 'A'}
(Thread-5  ) running with (4,) and {'b': 'B', 'a': 'A'}

Потоки таймера

Один из примеров причины для создания подкласса Thread предоставляется Timer , также включенным в threading . Таймер начинает свою работу после задержки и может быть отменен в любой момент в течение этого периода времени задержки.

threading_timer.py

import threading
import time
import logging


def delayed():
    logging.debug('worker running')


logging.basicConfig(
    levellogging.DEBUG,
    format'(%(threadName)-10s) %(message)s',
)

t1  threading.Timer(0.3, delayed)
t1.setName('t1')
t2  threading.Timer(0.3, delayed)
t2.setName('t2')

logging.debug('starting timers')
t1.start()
t2.start()

logging.debug('waiting before canceling %s', t2.getName())
time.sleep(0.2)
logging.debug('canceling %s', t2.getName())
t2.cancel()
logging.debug('done')

Второй таймер в этом примере никогда не запускается, а первый таймер запускается после завершения остальной части основной программы. Поскольку это не поток демона, он присоединяется неявно, когда основной поток завершается.

$ python3 threading_timer.py

(MainThread) starting timers
(MainThread) waiting before canceling t2
(MainThread) canceling t2
(MainThread) done
(t1        ) worker running

Передача сигналов между потоками

Хотя суть использования нескольких потоков заключается в одновременном выполнении отдельных операций, бывают случаи, когда важно иметь возможность синхронизировать операции в двух или более потоках. Объекты событий – это простой способ безопасного обмена данными между потоками. Событие управляет внутренним флагом, которым вызывающие абоненты могут управлять с помощью методов set () и clear () . Другие потоки могут использовать wait () для приостановки, пока не будет установлен флаг, эффективно блокируя выполнение, пока не будет разрешено продолжить.

threading_event.py

import logging
import threading
import time


def wait_for_event(e):
    """Wait for the event to be set before doing anything"""
    logging.debug('wait_for_event starting')
    event_is_set  e.wait()
    logging.debug('event set: %s', event_is_set)


def wait_for_event_timeout(e, t):
    """Wait t seconds and then timeout"""
    while not e.is_set():
        logging.debug('wait_for_event_timeout starting')
        event_is_set  e.wait(t)
        logging.debug('event set: %s', event_is_set)
        if event_is_set:
            logging.debug('processing event')
        else:
            logging.debug('doing other work')


logging.basicConfig(
    levellogging.DEBUG,
    format'(%(threadName)-10s) %(message)s',
)

e  threading.Event()
t1  threading.Thread(
    name'block',
    targetwait_for_event,
    args(e,),
)
t1.start()

t2  threading.Thread(
    name'nonblock',
    targetwait_for_event_timeout,
    args(e, 2),
)
t2.start()

logging.debug('Waiting before calling Event.set()')
time.sleep(0.3)
e.set()
logging.debug('Event is set')

Метод wait () принимает аргумент, представляющий количество секунд ожидания события до истечения времени ожидания. Он возвращает логическое значение, указывающее, установлено событие или нет, поэтому вызывающая сторона знает, почему возвращается wait () . Метод is_set () можно использовать отдельно для события, не опасаясь блокировки.

В этом примере wait_for_event_timeout () проверяет состояние события без блокировки на неопределенный срок. wait_for_event () блокирует вызов wait () , который не возвращается, пока состояние события не изменится.

$ python3 threading_event.py

(block     ) wait_for_event starting
(nonblock  ) wait_for_event_timeout starting
(MainThread) Waiting before calling Event.set()
(MainThread) Event is set
(nonblock  ) event set: True
(nonblock  ) processing event
(block     ) event set: True

Контроль доступа к ресурсам

Помимо синхронизации операций потоков, также важно иметь возможность контролировать доступ к общим ресурсам для предотвращения повреждения или пропуска данных. Встроенные в Python структуры данных (списки, словари и т. Д.) Являются поточно-ориентированными, поскольку являются побочным эффектом наличия атомарных байтовых кодов для управления ими (глобальная блокировка интерпретатора, используемая для защиты внутренних структур данных Python, не освобождается посередине. обновления). Другие структуры данных, реализованные в Python, или более простые типы, такие как целые числа и числа с плавающей запятой, не имеют такой защиты. Для защиты от одновременного доступа к объекту используйте объект Lock .

threading_lock.py

import logging
import random
import threading
import time


class Counter:

    def __init__(self, start0):
        self.lock  threading.Lock()
        self.value  start

    def increment(self):
        logging.debug('Waiting for lock')
        self.lock.acquire()
        try:
            logging.debug('Acquired lock')
            self.value  self.value + 1
        finally:
            self.lock.release()


def worker(c):
    for i in range(2):
        pause  random.random()
        logging.debug('Sleeping %0.02f', pause)
        time.sleep(pause)
        c.increment()
    logging.debug('Done')


logging.basicConfig(
    levellogging.DEBUG,
    format'(%(threadName)-10s) %(message)s',
)

counter  Counter()
for i in range(2):
    t  threading.Thread(targetworker, args(counter,))
    t.start()

logging.debug('Waiting for worker threads')
main_thread  threading.main_thread()
for t in threading.enumerate():
    if t is not main_thread:
        t.join()
logging.debug('Counter: %d', counter.value)

В этом примере функция worker () увеличивает экземпляр Counter , который управляет Lock , чтобы два потока не могли изменить свое внутреннее состояние в то же время. Если Lock не использовался, есть вероятность пропустить изменение значения атрибута.

$ python3 threading_lock.py

(Thread-1  ) Sleeping 0.18
(Thread-2  ) Sleeping 0.93
(MainThread) Waiting for worker threads
(Thread-1  ) Waiting for lock
(Thread-1  ) Acquired lock
(Thread-1  ) Sleeping 0.11
(Thread-1  ) Waiting for lock
(Thread-1  ) Acquired lock
(Thread-1  ) Done
(Thread-2  ) Waiting for lock
(Thread-2  ) Acquired lock
(Thread-2  ) Sleeping 0.81
(Thread-2  ) Waiting for lock
(Thread-2  ) Acquired lock
(Thread-2  ) Done
(MainThread) Counter: 4

Чтобы узнать, получил ли другой поток блокировку, не задерживая текущий поток, передайте False для аргумента blocking в Acquire () . В следующем примере worker () пытается получить блокировку три разных раза и подсчитывает, сколько попыток он должен сделать для этого. Между тем, lock_holder () циклически переключает между удержанием и снятием блокировки с короткими паузами в каждом состоянии, используемом для имитации нагрузки.

threading_lock_noblock.py

import logging
import threading
import time


def lock_holder(lock):
    logging.debug('Starting')
    while True:
        lock.acquire()
        try:
            logging.debug('Holding')
            time.sleep(0.5)
        finally:
            logging.debug('Not holding')
            lock.release()
        time.sleep(0.5)


def worker(lock):
    logging.debug('Starting')
    num_tries  0
    num_acquires  0
    while num_acquires < 3:
        time.sleep(0.5)
        logging.debug('Trying to acquire')
        have_it  lock.acquire(0)
        try:
            num_tries  1
            if have_it:
                logging.debug('Iteration %d: Acquired',
                              num_tries)
                num_acquires  1
            else:
                logging.debug('Iteration %d: Not acquired',
                              num_tries)
        finally:
            if have_it:
                lock.release()
    logging.debug('Done after %d iterations', num_tries)


logging.basicConfig(
    levellogging.DEBUG,
    format'(%(threadName)-10s) %(message)s',
)

lock  threading.Lock()

holder  threading.Thread(
    targetlock_holder,
    args(lock,),
    name'LockHolder',
    daemonTrue,
)
holder.start()

worker  threading.Thread(
    targetworker,
    args(lock,),
    name'Worker',
)
worker.start()

worker () требует более трех итераций, чтобы трижды установить блокировку.

$ python3 threading_lock_noblock.py

(LockHolder) Starting
(LockHolder) Holding
(Worker    ) Starting
(LockHolder) Not holding
(Worker    ) Trying to acquire
(Worker    ) Iteration 1: Acquired
(LockHolder) Holding
(Worker    ) Trying to acquire
(Worker    ) Iteration 2: Not acquired
(LockHolder) Not holding
(Worker    ) Trying to acquire
(Worker    ) Iteration 3: Acquired
(LockHolder) Holding
(Worker    ) Trying to acquire
(Worker    ) Iteration 4: Not acquired
(LockHolder) Not holding
(Worker    ) Trying to acquire
(Worker    ) Iteration 5: Acquired
(Worker    ) Done after 5 iterations

Повторные замки

Обычные объекты Lock не могут быть получены более одного раза даже одним потоком. Это может вызвать нежелательные побочные эффекты, если к блокировке обращается более одной функции в одной цепочке вызовов.

threading_lock_reacquire.py

import threading

lock  threading.Lock()

print('First try :', lock.acquire())
print('Second try:', lock.acquire(0))

В этом случае для второго вызова Acquire () дается нулевой тайм-аут, чтобы предотвратить его блокировку, потому что блокировка была получена первым вызовом.

$ python3 threading_lock_reacquire.py

First try : True
Second try: False

В ситуации, когда отдельный код из того же потока должен «повторно получить» блокировку, используйте вместо этого RLock .

threading_rlock.py

import threading

lock  threading.RLock()

print('First try :', lock.acquire())
print('Second try:', lock.acquire(0))

Единственное изменение в коде из предыдущего примера – замена RLock на Lock .

$ python3 threading_rlock.py

First try : True
Second try: True

Блокировки как менеджеры контекста

Блокировки реализуют API диспетчера контекста и совместимы с оператором with . Использование with устраняет необходимость явно устанавливать и снимать блокировку.

threading_lock_with.py

import threading
import logging


def worker_with(lock):
    with lock:
        logging.debug('Lock acquired via with')


def worker_no_with(lock):
    lock.acquire()
    try:
        logging.debug('Lock acquired directly')
    finally:
        lock.release()


logging.basicConfig(
    levellogging.DEBUG,
    format'(%(threadName)-10s) %(message)s',
)

lock  threading.Lock()
w  threading.Thread(targetworker_with, args(lock,))
nw  threading.Thread(targetworker_no_with, args(lock,))

w.start()
nw.start()

Две функции worker_with () и worker_no_with () управляют блокировкой одинаковым образом.

$ python3 threading_lock_with.py

(Thread-1  ) Lock acquired via with
(Thread-2  ) Lock acquired directly

Синхронизация потоков

Помимо использования Events , еще один способ синхронизации потоков – использование объекта Condition . Поскольку Condition использует Lock , он может быть привязан к общему ресурсу, что позволяет нескольким потокам ждать обновления ресурса. В этом примере потоки consumer () ожидают установки Condition , прежде чем продолжить. Поток Producer () отвечает за установку условия и уведомление других потоков о том, что они могут продолжить.

threading_condition.py

import logging
import threading
import time


def consumer(cond):
    """wait for the condition and use the resource"""
    logging.debug('Starting consumer thread')
    with cond:
        cond.wait()
        logging.debug('Resource is available to consumer')


def producer(cond):
    """set up the resource to be used by the consumer"""
    logging.debug('Starting producer thread')
    with cond:
        logging.debug('Making resource available')
        cond.notifyAll()


logging.basicConfig(
    levellogging.DEBUG,
    format'%(asctime)s (%(threadName)-2s) %(message)s',
)

condition  threading.Condition()
c1  threading.Thread(name'c1', targetconsumer,
                      args(condition,))
c2  threading.Thread(name'c2', targetconsumer,
                      args(condition,))
p  threading.Thread(name'p', targetproducer,
                     args(condition,))

c1.start()
time.sleep(0.2)
c2.start()
time.sleep(0.2)
p.start()

Потоки используют with для получения блокировки, связанной с Condition . Также работает явное использование методов collect () и release () .

$ python3 threading_condition.py

2016-07-10 10:45:28,170 (c1) Starting consumer thread
2016-07-10 10:45:28,376 (c2) Starting consumer thread
2016-07-10 10:45:28,581 (p ) Starting producer thread
2016-07-10 10:45:28,581 (p ) Making resource available
2016-07-10 10:45:28,582 (c1) Resource is available to consumer
2016-07-10 10:45:28,582 (c2) Resource is available to consumer

Барьеры – еще один механизм синхронизации потоков. Барьер устанавливает контрольную точку, и все участвующие потоки блокируются, пока все участвующие «стороны» не достигнут этой точки. Он позволяет потокам запускаться по отдельности, а затем приостанавливать работу, пока все они не будут готовы к продолжению.

threading_barrier.py

import threading
import time


def worker(barrier):
    print(threading.current_thread().name,
          'waiting for barrier with {} others'.format(
              barrier.n_waiting))
    worker_id  barrier.wait()
    print(threading.current_thread().name, 'after barrier',
          worker_id)


NUM_THREADS  3

barrier  threading.Barrier(NUM_THREADS)

threads  [
    threading.Thread(
        name'worker-%s' % i,
        targetworker,
        args(barrier,),
    )
    for i in range(NUM_THREADS)
]

for t in threads:
    print(t.name, 'starting')
    t.start()
    time.sleep(0.1)

for t in threads:
    t.join()

В этом примере Barrier настроен на блокировку, пока три потока не ожидают. Когда условие выполняется, все потоки освобождаются за контрольную точку одновременно. Возвращаемое значение wait () указывает номер освобождаемой стороны и может использоваться для ограничения некоторых потоков от выполнения таких действий, как очистка общего ресурса.

$ python3 threading_barrier.py

worker-0 starting
worker-0 waiting for barrier with 0 others
worker-1 starting
worker-1 waiting for barrier with 1 others
worker-2 starting
worker-2 waiting for barrier with 2 others
worker-2 after barrier 2
worker-0 after barrier 0
worker-1 after barrier 1

Метод abort () для Barrier заставляет все ожидающие потоки получать BrokenBarrierError . Это позволяет потокам очищаться, если обработка остановлена, пока они заблокированы wait () .

threading_barrier_abort.py

import threading
import time


def worker(barrier):
    print(threading.current_thread().name,
          'waiting for barrier with {} others'.format(
              barrier.n_waiting))
    try:
        worker_id  barrier.wait()
    except threading.BrokenBarrierError:
        print(threading.current_thread().name, 'aborting')
    else:
        print(threading.current_thread().name, 'after barrier',
              worker_id)


NUM_THREADS  3

barrier  threading.Barrier(NUM_THREADS + 1)

threads  [
    threading.Thread(
        name'worker-%s' % i,
        targetworker,
        args(barrier,),
    )
    for i in range(NUM_THREADS)
]

for t in threads:
    print(t.name, 'starting')
    t.start()
    time.sleep(0.1)

barrier.abort()

for t in threads:
    t.join()

В этом примере Barrier настраивается для ожидания на один участвующий поток больше, чем фактически запущено, так что обработка во всех потоках блокируется. Вызов abort () вызывает исключение в каждом заблокированном потоке.

$ python3 threading_barrier_abort.py

worker-0 starting
worker-0 waiting for barrier with 0 others
worker-1 starting
worker-1 waiting for barrier with 1 others
worker-2 starting
worker-2 waiting for barrier with 2 others
worker-0 aborting
worker-2 aborting
worker-1 aborting

Ограничение одновременного доступа к ресурсам

Иногда бывает полезно разрешить доступ к ресурсу более чем одному исполнителю одновременно, при этом ограничивая общее количество. Например, пул подключений может поддерживать фиксированное количество одновременных подключений, или сетевое приложение может поддерживать фиксированное количество одновременных загрузок. Семафор – это один из способов управления этими подключениями.

threading_semaphore.py

import logging
import random
import threading
import time


class ActivePool:

    def __init__(self):
        super(ActivePool, self).__init__()
        self.active  []
        self.lock  threading.Lock()

    def makeActive(self, name):
        with self.lock:
            self.active.append(name)
            logging.debug('Running: %s', self.active)

    def makeInactive(self, name):
        with self.lock:
            self.active.remove(name)
            logging.debug('Running: %s', self.active)


def worker(s, pool):
    logging.debug('Waiting to join the pool')
    with s:
        name  threading.current_thread().getName()
        pool.makeActive(name)
        time.sleep(0.1)
        pool.makeInactive(name)


logging.basicConfig(
    levellogging.DEBUG,
    format'%(asctime)s (%(threadName)-2s) %(message)s',
)

pool  ActivePool()
s  threading.Semaphore(2)
for i in range(4):
    t  threading.Thread(
        targetworker,
        namestr(i),
        args(s, pool),
    )
    t.start()

В этом примере класс ActivePool просто служит удобным способом отслеживать, какие потоки могут выполняться в данный момент. Реальный пул ресурсов выделит соединение или какое-либо другое значение новому активному потоку и восстановит значение, когда поток будет завершен. Здесь он просто используется для хранения имен активных потоков, чтобы показать, что не более двух выполняются одновременно.

$ python3 threading_semaphore.py

2016-07-10 10:45:29,398 (0 ) Waiting to join the pool
2016-07-10 10:45:29,398 (0 ) Running: ['0']
2016-07-10 10:45:29,399 (1 ) Waiting to join the pool
2016-07-10 10:45:29,399 (1 ) Running: ['0', '1']
2016-07-10 10:45:29,399 (2 ) Waiting to join the pool
2016-07-10 10:45:29,399 (3 ) Waiting to join the pool
2016-07-10 10:45:29,501 (1 ) Running: ['0']
2016-07-10 10:45:29,501 (0 ) Running: []
2016-07-10 10:45:29,502 (3 ) Running: ['3']
2016-07-10 10:45:29,502 (2 ) Running: ['3', '2']
2016-07-10 10:45:29,607 (3 ) Running: ['2']
2016-07-10 10:45:29,608 (2 ) Running: []

Данные, относящиеся к потоку

В то время как некоторые ресурсы необходимо заблокировать, чтобы их могли использовать несколько потоков, другие необходимо защитить, чтобы они были скрыты от потоков, которым они не принадлежат. Класс local () создает объект, способный скрывать значения от представления в отдельных потоках.

threading_local.py

import random
import threading
import logging


def show_value(data):
    try:
        val  data.value
    except AttributeError:
        logging.debug('No value yet')
    else:
        logging.debug(%s', val)


def worker(data):
    show_value(data)
    data.value  random.randint(1, 100)
    show_value(data)


logging.basicConfig(
    levellogging.DEBUG,
    format'(%(threadName)-10s) %(message)s',
)

local_data  threading.local()
show_value(local_data)
local_data.value  1000
show_value(local_data)

for i in range(2):
    t  threading.Thread(targetworker, args(local_data,))
    t.start()

Атрибут local_data.value не присутствует ни в одном потоке, пока он не будет установлен в этом потоке.

$ python3 threading_local.py

(MainThread) No value yet
(MainThread)
(Thread-1  ) No value yet
(Thread-1  )
(Thread-2  ) No value yet
(Thread-2  )

Чтобы инициализировать настройки, чтобы все потоки начинались с одного и того же значения, используйте подкласс и установите атрибуты в __init __ () .

threading_local_defaults.py

import random
import threading
import logging


def show_value(data):
    try:
        val  data.value
    except AttributeError:
        logging.debug('No value yet')
    else:
        logging.debug(%s', val)


def worker(data):
    show_value(data)
    data.value  random.randint(1, 100)
    show_value(data)


class MyLocal(threading.local):

    def __init__(self, value):
        super().__init__()
        logging.debug('Initializing %r', self)
        self.value  value


logging.basicConfig(
    levellogging.DEBUG,
    format'(%(threadName)-10s) %(message)s',
)

local_data  MyLocal(1000)
show_value(local_data)

for i in range(2):
    t  threading.Thread(targetworker, args(local_data,))
    t.start()

__init __ () вызывается для того же объекта (обратите внимание на значение id () ) один раз в каждом потоке для установки значений по умолчанию.

$ python3 threading_local_defaults.py

(MainThread) Initializing <__main__.MyLocal object at
0x101c6c288>
(MainThread)
(Thread-1  ) Initializing <__main__.MyLocal object at
0x101c6c288>
(Thread-1  )
(Thread-1  )
(Thread-2  ) Initializing <__main__.MyLocal object at
0x101c6c288>
(Thread-2  )
(Thread-2  )

Смотрите также