Автор оригинала: Doug Hellmann.
Цель:
Легко управляйте задачами, выполняемыми одновременно и параллельно.
Модули concurrent.futures
предоставляют интерфейсы для выполнения задач с использованием пулов потоков или рабочих процессов. API-интерфейсы одинаковы, поэтому приложения могут переключаться между потоками и процессами с минимальными изменениями.
Модуль предоставляет два типа классов для взаимодействия с пулами. Executors используются для управления пулами рабочих, а Futures используются для управления результатами, вычисляемыми рабочими. Чтобы использовать пул рабочих процессов, приложение создает экземпляр соответствующего класса исполнителя и затем отправляет задачи для его запуска. При запуске каждой задачи возвращается экземпляр Future
. Когда требуется результат задачи, приложение может использовать Future
для блокировки, пока результат не станет доступен. Предоставляются различные API-интерфейсы, чтобы было удобно ждать завершения задач, чтобы не нужно было напрямую управлять объектами Future
.
Использование map () с базовым пулом потоков
ThreadPoolExecutor
управляет набором рабочих потоков, передавая им задачи, когда они становятся доступными для дополнительной работы. В этом примере используется map ()
для одновременного создания набора результатов из итерируемого ввода. Задача использует time.sleep ()
, чтобы приостановить различное количество времени, чтобы продемонстрировать, что, независимо от порядка выполнения параллельных задач, map ()
всегда возвращает значения по порядку на основе входов.
futures_thread_pool_map.py
from concurrent import futures import threading import time def task(n): print('{}: sleeping {}'.format( threading.current_thread().name, n) ) time.sleep(n / 10) print('{}: done with {}'.format( threading.current_thread().name, n) ) return n / 10 ex futures.ThreadPoolExecutor(max_workers2) print('main: starting') results ex.map(task, range(5, 0, -1)) print('main: unprocessed results {}'.format(results)) print('main: waiting for real results') real_results list(results) print('main: results: {}'.format(real_results))
Возвращаемое значение от map ()
на самом деле является итератором особого типа, который знает, что нужно ждать каждого ответа, когда основная программа выполняет итерацию по нему.
$ python3 futures_thread_pool_map.py main: starting ThreadPoolExecutor-0_0: sleeping 5 ThreadPoolExecutor-0_1: sleeping 4 main: unprocessed results.result_iterator at 0x103e12780> main: waiting for real results ThreadPoolExecutor-0_1: done with 4 ThreadPoolExecutor-0_1: sleeping 3 ThreadPoolExecutor-0_0: done with 5 ThreadPoolExecutor-0_0: sleeping 2 ThreadPoolExecutor-0_0: done with 2 ThreadPoolExecutor-0_0: sleeping 1 ThreadPoolExecutor-0_1: done with 3 ThreadPoolExecutor-0_0: done with 1 main: results: [0.5, 0.4, 0.3, 0.2, 0.1]
Планирование индивидуальных задач
Помимо использования map ()
, можно запланировать отдельную задачу с исполнителем с помощью submit ()
и использовать экземпляр Future
вернулся, чтобы дождаться результатов этой задачи.
futures_thread_pool_submit.py
from concurrent import futures import threading import time def task(n): print('{}: sleeping {}'.format( threading.current_thread().name, n) ) time.sleep(n / 10) print('{}: done with {}'.format( threading.current_thread().name, n) ) return n / 10 ex futures.ThreadPoolExecutor(max_workers2) print('main: starting') f ex.submit(task, 5) print('main: future: {}'.format(f)) print('main: waiting for results') result f.result() print('main: result: {}'.format(result)) print('main: future after result: {}'.format(f))
Статус будущего меняется после того, как задачи будут выполнены и результат станет доступен.
$ python3 futures_thread_pool_submit.py main: starting ThreadPoolExecutor-0_0: sleeping 5 main: future:main: waiting for results ThreadPoolExecutor-0_0: done with 5 main: result: 0.5 main: future after result:
Ожидание задач в любом порядке
Вызов метода result ()
в Future
блокируется до тех пор, пока задача не завершится (либо путем возврата значения или создания исключения), либо не будет отменена. Доступ к результатам нескольких задач можно получить в порядке их планирования с помощью map ()
. Если не имеет значения, в каком порядке должны обрабатываться результаты, используйте as_completed ()
для их обработки по завершении каждой задачи.
futures_as_completed.py
from concurrent import futures import random import time def task(n): time.sleep(random.random()) return (n, n / 10) ex futures.ThreadPoolExecutor(max_workers5) print('main: starting') wait_for [ ex.submit(task, i) for i in range(5, 0, -1) ] for f in futures.as_completed(wait_for): print('main: result: {}'.format(f.result()))
Поскольку в пуле столько рабочих, сколько задач, можно запускать все задачи. Они заканчиваются в случайном порядке, поэтому значения, сгенерированные as_completed ()
, будут разными при каждом запуске примера.
$ python3 futures_as_completed.py main: starting main: result: (1, 0.1) main: result: (5, 0.5) main: result: (3, 0.3) main: result: (2, 0.2) main: result: (4, 0.4)
Будущие обратные вызовы
Чтобы предпринять какое-либо действие после завершения задачи, не ожидая явно результата, используйте add_done_callback ()
, чтобы указать новую функцию, которая будет вызываться, когда Future
завершится. Обратный вызов должен быть вызываемым, принимающим единственный аргумент, экземпляр Future
.
futures_future_callback.py
from concurrent import futures import time def task(n): print('{}: sleeping'.format(n)) time.sleep(0.5) print('{}: done'.format(n)) return n / 10 def done(fn): if fn.cancelled(): print('{}: canceled'.format(fn.arg)) elif fn.done(): error fn.exception() if error: print('{}: error returned: {}'.format( fn.arg, error)) else: result fn.result() print('{}: value returned: {}'.format( fn.arg, result)) if __name__ '__main__': ex futures.ThreadPoolExecutor(max_workers2) print('main: starting') f ex.submit(task, 5) f.arg 5 f.add_done_callback(done) result f.result()
Обратный вызов вызывается независимо от того, почему Future
считается “выполненным”, поэтому необходимо проверить статус объекта, переданного в обратный вызов, прежде чем использовать его каким-либо образом.
$ python3 futures_future_callback.py main: starting 5: sleeping 5: done 5: value returned: 0.5
Отмена задач
Future
можно отменить, если он был отправлен, но не запущен, путем вызова его метода cancel ()
.
futures_future_callback_cancel.py
from concurrent import futures import time def task(n): print('{}: sleeping'.format(n)) time.sleep(0.5) print('{}: done'.format(n)) return n / 10 def done(fn): if fn.cancelled(): print('{}: canceled'.format(fn.arg)) elif fn.done(): print('{}: not canceled'.format(fn.arg)) if __name__ '__main__': ex futures.ThreadPoolExecutor(max_workers2) print('main: starting') tasks [] for i in range(10, 0, -1): print('main: submitting {}'.format(i)) f ex.submit(task, i) f.arg i f.add_done_callback(done) tasks.append((i, f)) for i, t in reversed(tasks): if not t.cancel(): print('main: did not cancel {}'.format(i)) ex.shutdown()
cancel ()
возвращает логическое значение, указывающее, можно ли было отменить задачу.
$ python3 futures_future_callback_cancel.py main: starting main: submitting 10 10: sleeping main: submitting 9 9: sleeping main: submitting 8 main: submitting 7 main: submitting 6 main: submitting 5 main: submitting 4 main: submitting 3 main: submitting 2 main: submitting 1 1: canceled 2: canceled 3: canceled 4: canceled 5: canceled 6: canceled 7: canceled 8: canceled main: did not cancel 9 main: did not cancel 10 10: done 10: not canceled 9: done 9: not canceled
Исключения в задачах
Если задача вызывает необработанное исключение, она сохраняется в Future
для задачи и становится доступной через result ()
или exception ()
методы.
futures_future_exception.py
from concurrent import futures def task(n): print('{}: starting'.format(n)) raise ValueError('the value {} is no good'.format(n)) ex futures.ThreadPoolExecutor(max_workers2) print('main: starting') f ex.submit(task, 5) error f.exception() print('main: error: {}'.format(error)) try: result f.result() except ValueError as e: print('main: saw error "{}" when accessing result'.format(e))
Если result ()
вызывается после возникновения необработанного исключения в функции задачи, то же исключение повторно возникает в текущем контексте.
$ python3 futures_future_exception.py main: starting 5: starting main: error: the value 5 is no good main: saw error "the value 5 is no good" when accessing result
Менеджер контекста
Исполнители работают как менеджеры контекста, выполняя задачи одновременно и ожидая их завершения. Когда диспетчер контекста завершает работу, вызывается метод исполнителя shutdown ()
.
futures_context_manager.py
from concurrent import futures def task(n): print(n) with futures.ThreadPoolExecutor(max_workers2) as ex: print('main: starting') ex.submit(task, 1) ex.submit(task, 2) ex.submit(task, 3) ex.submit(task, 4) print('main: done')
Этот режим использования исполнителя полезен, когда ресурсы потока или процесса должны быть очищены, когда выполнение выходит за пределы текущей области.
$ python3 futures_context_manager.py main: starting 1 2 3 4 main: done
Пулы процессов
ProcessPoolExecutor
работает так же, как ThreadPoolExecutor
, но использует процессы вместо потоков. Это позволяет операциям с интенсивным использованием ЦП использовать отдельный ЦП и не блокировать глобальную блокировку интерпретатора интерпретатора CPython.
futures_process_pool_map.py
from concurrent import futures import os def task(n): return (n, os.getpid()) ex futures.ProcessPoolExecutor(max_workers2) results ex.map(task, range(5, 0, -1)) for n, pid in results: print('ran task {} in process {}'.format(n, pid))
Как и в случае с пулом потоков, отдельные рабочие процессы повторно используются для нескольких задач.
$ python3 futures_process_pool_map.py ran task 5 in process 40854 ran task 4 in process 40854 ran task 3 in process 40854 ran task 2 in process 40854 ran task 1 in process 40854
Если что-то происходит с одним из рабочих процессов, вызывая его неожиданный выход, ProcessPoolExecutor
считается “неисправным” и больше не будет планировать задачи.
futures_process_pool_broken.py
from concurrent import futures import os import signal with futures.ProcessPoolExecutor(max_workers2) as ex: print('getting the pid for one worker') f1 ex.submit(os.getpid) pid1 f1.result() print('killing process {}'.format(pid1)) os.kill(pid1, signal.SIGHUP) print('submitting another task') f2 ex.submit(os.getpid) try: pid2 f2.result() except futures.process.BrokenProcessPool as e: print('could not start new tasks: {}'.format(e))
Исключение BrokenProcessPool
фактически вызывается при обработке результатов, а не при отправке новой задачи.
$ python3 futures_process_pool_broken.py getting the pid for one worker killing process 40858 submitting another task could not start new tasks: A process in the process pool was terminated abruptly while the future was running or pending.
Смотрите также
- Стандартная библиотека документации для concurrent.futures
- PEP 3148 – предложение по созданию
concurrent.futures набор функций.
- Объединение сопрограмм с потоками и процессами
- заправка
- многопроцессорность