Автор оригинала: Doug Hellmann.
Цель:
Управляйте несколькими потоками выполнения.
Использование потоков позволяет программе выполнять несколько операций одновременно в одном и том же пространстве процесса.
Объекты потока
Самый простой способ использовать Thread
– создать его экземпляр с целевой функцией и вызвать start ()
, чтобы он начал работать.
threading_simple.py
import threading def worker(): """thread worker function""" print('Worker') threads [] for i in range(5): t threading.Thread(targetworker) threads.append(t) t.start()
Результатом является пять строк с «Worker»
на каждой.
$ python3 threading_simple.py Worker Worker Worker Worker Worker
Полезно иметь возможность создавать поток и передавать ему аргументы, чтобы сообщить ему, что делать. В качестве аргумента потоку можно передать любой тип объекта. В этом примере передается число, которое затем выводится потоком.
threading_simpleargs.py
import threading def worker(num): """thread worker function""" print('Worker: %s' % num) threads [] for i in range(5): t threading.Thread(targetworker, args(i,)) threads.append(t) t.start()
Целочисленный аргумент теперь включен в сообщение, выводимое каждым потоком.
$ python3 threading_simpleargs.py Worker: 0 Worker: 1 Worker: 2 Worker: 3 Worker: 4
Определение текущего потока
Использование аргументов для идентификации или имени потока громоздко и не нужно. Каждый экземпляр Thread
имеет имя со значением по умолчанию, которое может быть изменено при создании потока. Именование потоков полезно в серверных процессах, когда несколько потоков службы обрабатывают разные операции.
threading_names.py
import threading import time def worker(): print(threading.current_thread().getName(), 'Starting') time.sleep(0.2) print(threading.current_thread().getName(), 'Exiting') def my_service(): print(threading.current_thread().getName(), 'Starting') time.sleep(0.3) print(threading.current_thread().getName(), 'Exiting') t threading.Thread(name'my_service', targetmy_service) w threading.Thread(name'worker', targetworker) w2 threading.Thread(targetworker) # use default name w.start() w2.start() t.start()
Выходные данные отладки включают имя текущего потока в каждой строке. Строки с «Thread-1»
в столбце имени потока соответствуют безымянному потоку w2
.
$ python3 threading_names.py worker Starting Thread-1 Starting my_service Starting worker Exiting Thread-1 Exiting my_service Exiting
Большинство программ не используют print
для отладки. Модуль ведения журнала поддерживает встраивание имени потока в каждое сообщение журнала с использованием кода средства форматирования % (threadName) s
. Включение имен потоков в сообщения журнала позволяет отследить эти сообщения до их источника.
threading_names_log.py
import logging import threading import time def worker(): logging.debug('Starting') time.sleep(0.2) logging.debug('Exiting') def my_service(): logging.debug('Starting') time.sleep(0.3) logging.debug('Exiting') logging.basicConfig( levellogging.DEBUG, format'[%(levelname)s] (%(threadName)-10s) %(message)s', ) t threading.Thread(name'my_service', targetmy_service) w threading.Thread(name'worker', targetworker) w2 threading.Thread(targetworker) # use default name w.start() w2.start() t.start()
ведение журнала также является потокобезопасным, поэтому сообщения из разных потоков сохраняются в выходных данных отдельно.
$ python3 threading_names_log.py [DEBUG] (worker ) Starting [DEBUG] (Thread-1 ) Starting [DEBUG] (my_service) Starting [DEBUG] (worker ) Exiting [DEBUG] (Thread-1 ) Exiting [DEBUG] (my_service) Exiting
Демон против потоков, не являющихся демонами
До этого момента программы примеров неявно ожидали завершения, пока все потоки не завершат свою работу. Иногда программы порождают поток как демон , который запускается, не блокируя выход из основной программы. Использование потоков демона полезно для служб, где может быть нелегко прервать поток или где, если позволить потоку умереть в середине его работы, не будут потеряны или повреждены данные (например, поток, который генерирует «сердцебиение») для инструмента мониторинга услуг). Чтобы пометить поток как демон, передайте
при его создании или вызовите его метод set_daemon ()
с True
. По умолчанию потоки не являются демонами.
threading_daemon.py
import threading import time import logging def daemon(): logging.debug('Starting') time.sleep(0.2) logging.debug('Exiting') def non_daemon(): logging.debug('Starting') logging.debug('Exiting') logging.basicConfig( levellogging.DEBUG, format'(%(threadName)-10s) %(message)s', ) d threading.Thread(name'daemon', targetdaemon, daemonTrue) t threading.Thread(name'non-daemon', targetnon_daemon) d.start() t.start()
Вывод не включает сообщение "Exiting"
от потока демона, поскольку все потоки, не являющиеся демонами (включая основной поток), завершаются до того, как поток демона пробуждается из сна ( )
вызов.
$ python3 threading_daemon.py (daemon ) Starting (non-daemon) Starting (non-daemon) Exiting
Чтобы подождать, пока поток демона завершит свою работу, используйте метод join ()
.
threading_daemon_join.py
import threading import time import logging def daemon(): logging.debug('Starting') time.sleep(0.2) logging.debug('Exiting') def non_daemon(): logging.debug('Starting') logging.debug('Exiting') logging.basicConfig( levellogging.DEBUG, format'(%(threadName)-10s) %(message)s', ) d threading.Thread(name'daemon', targetdaemon, daemonTrue) t threading.Thread(name'non-daemon', targetnon_daemon) d.start() t.start() d.join() t.join()
Ожидание завершения потока демона с помощью join ()
означает, что у него есть шанс создать сообщение «Exiting»
.
$ python3 threading_daemon_join.py (daemon ) Starting (non-daemon) Starting (non-daemon) Exiting (daemon ) Exiting
По умолчанию join ()
блокируется на неопределенный срок. Также можно передать значение с плавающей запятой, представляющее количество секунд ожидания, пока поток станет неактивным. Если поток не завершается в течение периода ожидания, join ()
все равно возвращается.
threading_daemon_join_timeout.py
import threading import time import logging def daemon(): logging.debug('Starting') time.sleep(0.2) logging.debug('Exiting') def non_daemon(): logging.debug('Starting') logging.debug('Exiting') logging.basicConfig( levellogging.DEBUG, format'(%(threadName)-10s) %(message)s', ) d threading.Thread(name'daemon', targetdaemon, daemonTrue) t threading.Thread(name'non-daemon', targetnon_daemon) d.start() t.start() d.join(0.1) print('d.isAlive()', d.isAlive()) t.join()
Поскольку прошедшее время ожидания меньше, чем время, в течение которого поток демона находится в спящем режиме, поток все еще остается «живым» после возврата join ()
.
$ python3 threading_daemon_join_timeout.py (daemon ) Starting (non-daemon) Starting (non-daemon) Exiting d.isAlive() True
Перечисление всех потоков
Нет необходимости сохранять явный дескриптор для всех потоков демона, чтобы гарантировать их завершение перед выходом из основного процесса. enumerate ()
возвращает список активных экземпляров Thread
. Список включает текущий поток, и, поскольку присоединение к текущему потоку приводит к тупиковой ситуации, его необходимо пропустить.
threading_enumerate.py
import random import threading import time import logging def worker(): """thread worker function""" pause random.randint(1, 5) / 10 logging.debug('sleeping %0.2f', pause) time.sleep(pause) logging.debug('ending') logging.basicConfig( levellogging.DEBUG, format'(%(threadName)-10s) %(message)s', ) for i in range(3): t threading.Thread(targetworker, daemonTrue) t.start() main_thread threading.main_thread() for t in threading.enumerate(): if t is main_thread: continue logging.debug('joining %s', t.getName()) t.join()
Поскольку рабочий спит случайное время, результат работы этой программы может отличаться.
$ python3 threading_enumerate.py (Thread-1 ) sleeping 0.20 (Thread-2 ) sleeping 0.30 (Thread-3 ) sleeping 0.40 (MainThread) joining Thread-1 (Thread-1 ) ending (MainThread) joining Thread-3 (Thread-2 ) ending (Thread-3 ) ending (MainThread) joining Thread-2
Создание подкласса Thread
При запуске Thread
выполняет базовую инициализацию, а затем вызывает свой метод run ()
, который вызывает целевую функцию, переданную конструктору. Чтобы создать подкласс Thread
, переопределите run ()
, чтобы сделать все необходимое.
threading_subclass.py
import threading import logging class MyThread(threading.Thread): def run(self): logging.debug('running') logging.basicConfig( levellogging.DEBUG, format'(%(threadName)-10s) %(message)s', ) for i in range(5): t MyThread() t.start()
Возвращаемое значение run ()
игнорируется.
$ python3 threading_subclass.py (Thread-1 ) running (Thread-2 ) running (Thread-3 ) running (Thread-4 ) running (Thread-5 ) running
Поскольку значения args
и kwargs
, переданные в конструктор Thread
, сохраняются в частных переменных с использованием имен с префиксом '__'
, к ним нелегко получить доступ из подкласса. Чтобы передать аргументы пользовательскому типу потока, переопределите конструктор, чтобы сохранить значения в атрибуте экземпляра, который можно увидеть в подклассе.
threading_subclass_args.py
import threading import logging class MyThreadWithArgs(threading.Thread): def __init__(self, groupNone, targetNone, nameNone, args(), kwargsNone, *, daemonNone): super().__init__(groupgroup, targettarget, namename, daemondaemon) self.args args self.kwargs kwargs def run(self): logging.debug('running with %s and %s', self.args, self.kwargs) logging.basicConfig( levellogging.DEBUG, format'(%(threadName)-10s) %(message)s', ) for i in range(5): t MyThreadWithArgs(args(i,), kwargs{'a': 'A', 'b': 'B'}) t.start()
MyThreadWithArgs
использует тот же API, что и Thread
, но другой класс может легко изменить метод конструктора, чтобы он принимал больше или другие аргументы, более непосредственно связанные с целью потока, как и любой другой другой класс.
$ python3 threading_subclass_args.py (Thread-1 ) running with (0,) and {'b': 'B', 'a': 'A'} (Thread-2 ) running with (1,) and {'b': 'B', 'a': 'A'} (Thread-3 ) running with (2,) and {'b': 'B', 'a': 'A'} (Thread-4 ) running with (3,) and {'b': 'B', 'a': 'A'} (Thread-5 ) running with (4,) and {'b': 'B', 'a': 'A'}
Потоки таймера
Один из примеров причины для создания подкласса Thread
предоставляется Timer
, также включенным в threading
. Таймер
начинает свою работу после задержки и может быть отменен в любой момент в течение этого периода времени задержки.
threading_timer.py
import threading import time import logging def delayed(): logging.debug('worker running') logging.basicConfig( levellogging.DEBUG, format'(%(threadName)-10s) %(message)s', ) t1 threading.Timer(0.3, delayed) t1.setName('t1') t2 threading.Timer(0.3, delayed) t2.setName('t2') logging.debug('starting timers') t1.start() t2.start() logging.debug('waiting before canceling %s', t2.getName()) time.sleep(0.2) logging.debug('canceling %s', t2.getName()) t2.cancel() logging.debug('done')
Второй таймер в этом примере никогда не запускается, а первый таймер запускается после завершения остальной части основной программы. Поскольку это не поток демона, он присоединяется неявно, когда основной поток завершается.
$ python3 threading_timer.py (MainThread) starting timers (MainThread) waiting before canceling t2 (MainThread) canceling t2 (MainThread) done (t1 ) worker running
Передача сигналов между потоками
Хотя суть использования нескольких потоков заключается в одновременном выполнении отдельных операций, бывают случаи, когда важно иметь возможность синхронизировать операции в двух или более потоках. Объекты событий – это простой способ безопасного обмена данными между потоками. Событие
управляет внутренним флагом, которым вызывающие абоненты могут управлять с помощью методов set ()
и clear ()
. Другие потоки могут использовать wait ()
для приостановки, пока не будет установлен флаг, эффективно блокируя выполнение, пока не будет разрешено продолжить.
threading_event.py
import logging import threading import time def wait_for_event(e): """Wait for the event to be set before doing anything""" logging.debug('wait_for_event starting') event_is_set e.wait() logging.debug('event set: %s', event_is_set) def wait_for_event_timeout(e, t): """Wait t seconds and then timeout""" while not e.is_set(): logging.debug('wait_for_event_timeout starting') event_is_set e.wait(t) logging.debug('event set: %s', event_is_set) if event_is_set: logging.debug('processing event') else: logging.debug('doing other work') logging.basicConfig( levellogging.DEBUG, format'(%(threadName)-10s) %(message)s', ) e threading.Event() t1 threading.Thread( name'block', targetwait_for_event, args(e,), ) t1.start() t2 threading.Thread( name'nonblock', targetwait_for_event_timeout, args(e, 2), ) t2.start() logging.debug('Waiting before calling Event.set()') time.sleep(0.3) e.set() logging.debug('Event is set')
Метод wait ()
принимает аргумент, представляющий количество секунд ожидания события до истечения времени ожидания. Он возвращает логическое значение, указывающее, установлено событие или нет, поэтому вызывающая сторона знает, почему возвращается wait ()
. Метод is_set ()
можно использовать отдельно для события, не опасаясь блокировки.
В этом примере wait_for_event_timeout ()
проверяет состояние события без блокировки на неопределенный срок. wait_for_event ()
блокирует вызов wait ()
, который не возвращается, пока состояние события не изменится.
$ python3 threading_event.py (block ) wait_for_event starting (nonblock ) wait_for_event_timeout starting (MainThread) Waiting before calling Event.set() (MainThread) Event is set (nonblock ) event set: True (nonblock ) processing event (block ) event set: True
Контроль доступа к ресурсам
Помимо синхронизации операций потоков, также важно иметь возможность контролировать доступ к общим ресурсам для предотвращения повреждения или пропуска данных. Встроенные в Python структуры данных (списки, словари и т. Д.) Являются поточно-ориентированными, поскольку являются побочным эффектом наличия атомарных байтовых кодов для управления ими (глобальная блокировка интерпретатора, используемая для защиты внутренних структур данных Python, не освобождается посередине. обновления). Другие структуры данных, реализованные в Python, или более простые типы, такие как целые числа и числа с плавающей запятой, не имеют такой защиты. Для защиты от одновременного доступа к объекту используйте объект Lock
.
threading_lock.py
import logging import random import threading import time class Counter: def __init__(self, start0): self.lock threading.Lock() self.value start def increment(self): logging.debug('Waiting for lock') self.lock.acquire() try: logging.debug('Acquired lock') self.value self.value + 1 finally: self.lock.release() def worker(c): for i in range(2): pause random.random() logging.debug('Sleeping %0.02f', pause) time.sleep(pause) c.increment() logging.debug('Done') logging.basicConfig( levellogging.DEBUG, format'(%(threadName)-10s) %(message)s', ) counter Counter() for i in range(2): t threading.Thread(targetworker, args(counter,)) t.start() logging.debug('Waiting for worker threads') main_thread threading.main_thread() for t in threading.enumerate(): if t is not main_thread: t.join() logging.debug('Counter: %d', counter.value)
В этом примере функция worker ()
увеличивает экземпляр Counter
, который управляет Lock
, чтобы два потока не могли изменить свое внутреннее состояние в то же время. Если Lock
не использовался, есть вероятность пропустить изменение значения атрибута.
$ python3 threading_lock.py (Thread-1 ) Sleeping 0.18 (Thread-2 ) Sleeping 0.93 (MainThread) Waiting for worker threads (Thread-1 ) Waiting for lock (Thread-1 ) Acquired lock (Thread-1 ) Sleeping 0.11 (Thread-1 ) Waiting for lock (Thread-1 ) Acquired lock (Thread-1 ) Done (Thread-2 ) Waiting for lock (Thread-2 ) Acquired lock (Thread-2 ) Sleeping 0.81 (Thread-2 ) Waiting for lock (Thread-2 ) Acquired lock (Thread-2 ) Done (MainThread) Counter: 4
Чтобы узнать, получил ли другой поток блокировку, не задерживая текущий поток, передайте False
для аргумента blocking
в Acquire ()
. В следующем примере worker ()
пытается получить блокировку три разных раза и подсчитывает, сколько попыток он должен сделать для этого. Между тем, lock_holder ()
циклически переключает между удержанием и снятием блокировки с короткими паузами в каждом состоянии, используемом для имитации нагрузки.
threading_lock_noblock.py
import logging import threading import time def lock_holder(lock): logging.debug('Starting') while True: lock.acquire() try: logging.debug('Holding') time.sleep(0.5) finally: logging.debug('Not holding') lock.release() time.sleep(0.5) def worker(lock): logging.debug('Starting') num_tries 0 num_acquires 0 while num_acquires < 3: time.sleep(0.5) logging.debug('Trying to acquire') have_it lock.acquire(0) try: num_tries 1 if have_it: logging.debug('Iteration %d: Acquired', num_tries) num_acquires 1 else: logging.debug('Iteration %d: Not acquired', num_tries) finally: if have_it: lock.release() logging.debug('Done after %d iterations', num_tries) logging.basicConfig( levellogging.DEBUG, format'(%(threadName)-10s) %(message)s', ) lock threading.Lock() holder threading.Thread( targetlock_holder, args(lock,), name'LockHolder', daemonTrue, ) holder.start() worker threading.Thread( targetworker, args(lock,), name'Worker', ) worker.start()
worker ()
требует более трех итераций, чтобы трижды установить блокировку.
$ python3 threading_lock_noblock.py (LockHolder) Starting (LockHolder) Holding (Worker ) Starting (LockHolder) Not holding (Worker ) Trying to acquire (Worker ) Iteration 1: Acquired (LockHolder) Holding (Worker ) Trying to acquire (Worker ) Iteration 2: Not acquired (LockHolder) Not holding (Worker ) Trying to acquire (Worker ) Iteration 3: Acquired (LockHolder) Holding (Worker ) Trying to acquire (Worker ) Iteration 4: Not acquired (LockHolder) Not holding (Worker ) Trying to acquire (Worker ) Iteration 5: Acquired (Worker ) Done after 5 iterations
Повторные замки
Обычные объекты Lock
не могут быть получены более одного раза даже одним потоком. Это может вызвать нежелательные побочные эффекты, если к блокировке обращается более одной функции в одной цепочке вызовов.
threading_lock_reacquire.py
import threading lock threading.Lock() print('First try :', lock.acquire()) print('Second try:', lock.acquire(0))
В этом случае для второго вызова Acquire ()
дается нулевой тайм-аут, чтобы предотвратить его блокировку, потому что блокировка была получена первым вызовом.
$ python3 threading_lock_reacquire.py First try : True Second try: False
В ситуации, когда отдельный код из того же потока должен «повторно получить» блокировку, используйте вместо этого RLock
.
threading_rlock.py
import threading lock threading.RLock() print('First try :', lock.acquire()) print('Second try:', lock.acquire(0))
Единственное изменение в коде из предыдущего примера – замена RLock
на Lock
.
$ python3 threading_rlock.py First try : True Second try: True
Блокировки как менеджеры контекста
Блокировки реализуют API диспетчера контекста и совместимы с оператором with
. Использование with
устраняет необходимость явно устанавливать и снимать блокировку.
threading_lock_with.py
import threading import logging def worker_with(lock): with lock: logging.debug('Lock acquired via with') def worker_no_with(lock): lock.acquire() try: logging.debug('Lock acquired directly') finally: lock.release() logging.basicConfig( levellogging.DEBUG, format'(%(threadName)-10s) %(message)s', ) lock threading.Lock() w threading.Thread(targetworker_with, args(lock,)) nw threading.Thread(targetworker_no_with, args(lock,)) w.start() nw.start()
Две функции worker_with ()
и worker_no_with ()
управляют блокировкой одинаковым образом.
$ python3 threading_lock_with.py (Thread-1 ) Lock acquired via with (Thread-2 ) Lock acquired directly
Синхронизация потоков
Помимо использования Events
, еще один способ синхронизации потоков – использование объекта Condition
. Поскольку Condition
использует Lock
, он может быть привязан к общему ресурсу, что позволяет нескольким потокам ждать обновления ресурса. В этом примере потоки consumer ()
ожидают установки Condition
, прежде чем продолжить. Поток Producer ()
отвечает за установку условия и уведомление других потоков о том, что они могут продолжить.
threading_condition.py
import logging import threading import time def consumer(cond): """wait for the condition and use the resource""" logging.debug('Starting consumer thread') with cond: cond.wait() logging.debug('Resource is available to consumer') def producer(cond): """set up the resource to be used by the consumer""" logging.debug('Starting producer thread') with cond: logging.debug('Making resource available') cond.notifyAll() logging.basicConfig( levellogging.DEBUG, format'%(asctime)s (%(threadName)-2s) %(message)s', ) condition threading.Condition() c1 threading.Thread(name'c1', targetconsumer, args(condition,)) c2 threading.Thread(name'c2', targetconsumer, args(condition,)) p threading.Thread(name'p', targetproducer, args(condition,)) c1.start() time.sleep(0.2) c2.start() time.sleep(0.2) p.start()
Потоки используют with
для получения блокировки, связанной с Condition
. Также работает явное использование методов collect ()
и release ()
.
$ python3 threading_condition.py 2016-07-10 10:45:28,170 (c1) Starting consumer thread 2016-07-10 10:45:28,376 (c2) Starting consumer thread 2016-07-10 10:45:28,581 (p ) Starting producer thread 2016-07-10 10:45:28,581 (p ) Making resource available 2016-07-10 10:45:28,582 (c1) Resource is available to consumer 2016-07-10 10:45:28,582 (c2) Resource is available to consumer
Барьеры – еще один механизм синхронизации потоков. Барьер
устанавливает контрольную точку, и все участвующие потоки блокируются, пока все участвующие «стороны» не достигнут этой точки. Он позволяет потокам запускаться по отдельности, а затем приостанавливать работу, пока все они не будут готовы к продолжению.
threading_barrier.py
import threading import time def worker(barrier): print(threading.current_thread().name, 'waiting for barrier with {} others'.format( barrier.n_waiting)) worker_id barrier.wait() print(threading.current_thread().name, 'after barrier', worker_id) NUM_THREADS 3 barrier threading.Barrier(NUM_THREADS) threads [ threading.Thread( name'worker-%s' % i, targetworker, args(barrier,), ) for i in range(NUM_THREADS) ] for t in threads: print(t.name, 'starting') t.start() time.sleep(0.1) for t in threads: t.join()
В этом примере Barrier
настроен на блокировку, пока три потока не ожидают. Когда условие выполняется, все потоки освобождаются за контрольную точку одновременно. Возвращаемое значение wait ()
указывает номер освобождаемой стороны и может использоваться для ограничения некоторых потоков от выполнения таких действий, как очистка общего ресурса.
$ python3 threading_barrier.py worker-0 starting worker-0 waiting for barrier with 0 others worker-1 starting worker-1 waiting for barrier with 1 others worker-2 starting worker-2 waiting for barrier with 2 others worker-2 after barrier 2 worker-0 after barrier 0 worker-1 after barrier 1
Метод abort ()
для Barrier
заставляет все ожидающие потоки получать BrokenBarrierError
. Это позволяет потокам очищаться, если обработка остановлена, пока они заблокированы wait ()
.
threading_barrier_abort.py
import threading import time def worker(barrier): print(threading.current_thread().name, 'waiting for barrier with {} others'.format( barrier.n_waiting)) try: worker_id barrier.wait() except threading.BrokenBarrierError: print(threading.current_thread().name, 'aborting') else: print(threading.current_thread().name, 'after barrier', worker_id) NUM_THREADS 3 barrier threading.Barrier(NUM_THREADS + 1) threads [ threading.Thread( name'worker-%s' % i, targetworker, args(barrier,), ) for i in range(NUM_THREADS) ] for t in threads: print(t.name, 'starting') t.start() time.sleep(0.1) barrier.abort() for t in threads: t.join()
В этом примере Barrier
настраивается для ожидания на один участвующий поток больше, чем фактически запущено, так что обработка во всех потоках блокируется. Вызов abort ()
вызывает исключение в каждом заблокированном потоке.
$ python3 threading_barrier_abort.py worker-0 starting worker-0 waiting for barrier with 0 others worker-1 starting worker-1 waiting for barrier with 1 others worker-2 starting worker-2 waiting for barrier with 2 others worker-0 aborting worker-2 aborting worker-1 aborting
Ограничение одновременного доступа к ресурсам
Иногда бывает полезно разрешить доступ к ресурсу более чем одному исполнителю одновременно, при этом ограничивая общее количество. Например, пул подключений может поддерживать фиксированное количество одновременных подключений, или сетевое приложение может поддерживать фиксированное количество одновременных загрузок. Семафор
– это один из способов управления этими подключениями.
threading_semaphore.py
import logging import random import threading import time class ActivePool: def __init__(self): super(ActivePool, self).__init__() self.active [] self.lock threading.Lock() def makeActive(self, name): with self.lock: self.active.append(name) logging.debug('Running: %s', self.active) def makeInactive(self, name): with self.lock: self.active.remove(name) logging.debug('Running: %s', self.active) def worker(s, pool): logging.debug('Waiting to join the pool') with s: name threading.current_thread().getName() pool.makeActive(name) time.sleep(0.1) pool.makeInactive(name) logging.basicConfig( levellogging.DEBUG, format'%(asctime)s (%(threadName)-2s) %(message)s', ) pool ActivePool() s threading.Semaphore(2) for i in range(4): t threading.Thread( targetworker, namestr(i), args(s, pool), ) t.start()
В этом примере класс ActivePool
просто служит удобным способом отслеживать, какие потоки могут выполняться в данный момент. Реальный пул ресурсов выделит соединение или какое-либо другое значение новому активному потоку и восстановит значение, когда поток будет завершен. Здесь он просто используется для хранения имен активных потоков, чтобы показать, что не более двух выполняются одновременно.
$ python3 threading_semaphore.py 2016-07-10 10:45:29,398 (0 ) Waiting to join the pool 2016-07-10 10:45:29,398 (0 ) Running: ['0'] 2016-07-10 10:45:29,399 (1 ) Waiting to join the pool 2016-07-10 10:45:29,399 (1 ) Running: ['0', '1'] 2016-07-10 10:45:29,399 (2 ) Waiting to join the pool 2016-07-10 10:45:29,399 (3 ) Waiting to join the pool 2016-07-10 10:45:29,501 (1 ) Running: ['0'] 2016-07-10 10:45:29,501 (0 ) Running: [] 2016-07-10 10:45:29,502 (3 ) Running: ['3'] 2016-07-10 10:45:29,502 (2 ) Running: ['3', '2'] 2016-07-10 10:45:29,607 (3 ) Running: ['2'] 2016-07-10 10:45:29,608 (2 ) Running: []
Данные, относящиеся к потоку
В то время как некоторые ресурсы необходимо заблокировать, чтобы их могли использовать несколько потоков, другие необходимо защитить, чтобы они были скрыты от потоков, которым они не принадлежат. Класс local ()
создает объект, способный скрывать значения от представления в отдельных потоках.
threading_local.py
import random import threading import logging def show_value(data): try: val data.value except AttributeError: logging.debug('No value yet') else: logging.debug(%s', val) def worker(data): show_value(data) data.value random.randint(1, 100) show_value(data) logging.basicConfig( levellogging.DEBUG, format'(%(threadName)-10s) %(message)s', ) local_data threading.local() show_value(local_data) local_data.value 1000 show_value(local_data) for i in range(2): t threading.Thread(targetworker, args(local_data,)) t.start()
Атрибут local_data.value
не присутствует ни в одном потоке, пока он не будет установлен в этом потоке.
$ python3 threading_local.py (MainThread) No value yet (MainThread) (Thread-1 ) No value yet (Thread-1 ) (Thread-2 ) No value yet (Thread-2 )
Чтобы инициализировать настройки, чтобы все потоки начинались с одного и того же значения, используйте подкласс и установите атрибуты в __init __ ()
.
threading_local_defaults.py
import random import threading import logging def show_value(data): try: val data.value except AttributeError: logging.debug('No value yet') else: logging.debug(%s', val) def worker(data): show_value(data) data.value random.randint(1, 100) show_value(data) class MyLocal(threading.local): def __init__(self, value): super().__init__() logging.debug('Initializing %r', self) self.value value logging.basicConfig( levellogging.DEBUG, format'(%(threadName)-10s) %(message)s', ) local_data MyLocal(1000) show_value(local_data) for i in range(2): t threading.Thread(targetworker, args(local_data,)) t.start()
__init __ ()
вызывается для того же объекта (обратите внимание на значение id ()
) один раз в каждом потоке для установки значений по умолчанию.
$ python3 threading_local_defaults.py (MainThread) Initializing <__main__.MyLocal object at 0x101c6c288> (MainThread) (Thread-1 ) Initializing <__main__.MyLocal object at 0x101c6c288> (Thread-1 ) (Thread-1 ) (Thread-2 ) Initializing <__main__.MyLocal object at 0x101c6c288> (Thread-2 ) (Thread-2 )
Смотрите также
- стандартная библиотечная документация для потоковой передачи
- Заметки о переносе Python 2 на 3 для потоковой передачи
thread
– API потока нижнего уровня.Queue
– потокобезопасная очередь, полезная для передачи сообщений между потоками.- multiprocessing – API для работы с процессами, который отражает API
threading
.