Рубрики
Без рубрики

concurrent.futures – Управление пулами параллельных задач

Автор оригинала: Doug Hellmann.

Цель:

Легко управляйте задачами, выполняемыми одновременно и параллельно.

Модули concurrent.futures предоставляют интерфейсы для выполнения задач с использованием пулов потоков или рабочих процессов. API-интерфейсы одинаковы, поэтому приложения могут переключаться между потоками и процессами с минимальными изменениями.

Модуль предоставляет два типа классов для взаимодействия с пулами. Executors используются для управления пулами рабочих, а Futures используются для управления результатами, вычисляемыми рабочими. Чтобы использовать пул рабочих процессов, приложение создает экземпляр соответствующего класса исполнителя и затем отправляет задачи для его запуска. При запуске каждой задачи возвращается экземпляр Future . Когда требуется результат задачи, приложение может использовать Future для блокировки, пока результат не станет доступен. Предоставляются различные API-интерфейсы, чтобы было удобно ждать завершения задач, чтобы не нужно было напрямую управлять объектами Future .

Использование map () с базовым пулом потоков

ThreadPoolExecutor управляет набором рабочих потоков, передавая им задачи, когда они становятся доступными для дополнительной работы. В этом примере используется map () для одновременного создания набора результатов из итерируемого ввода. Задача использует time.sleep () , чтобы приостановить различное количество времени, чтобы продемонстрировать, что, независимо от порядка выполнения параллельных задач, map () всегда возвращает значения по порядку на основе входов.

futures_thread_pool_map.py

from concurrent import futures
import threading
import time


def task(n):
    print('{}: sleeping {}'.format(
        threading.current_thread().name,
        n)
    )
    time.sleep(n / 10)
    print('{}: done with {}'.format(
        threading.current_thread().name,
        n)
    )
    return n / 10


ex  futures.ThreadPoolExecutor(max_workers2)
print('main: starting')
results  ex.map(task, range(5, 0, -1))
print('main: unprocessed results {}'.format(results))
print('main: waiting for real results')
real_results  list(results)
print('main: results: {}'.format(real_results))

Возвращаемое значение от map () на самом деле является итератором особого типа, который знает, что нужно ждать каждого ответа, когда основная программа выполняет итерацию по нему.

$ python3 futures_thread_pool_map.py

main: starting
ThreadPoolExecutor-0_0: sleeping 5
ThreadPoolExecutor-0_1: sleeping 4
main: unprocessed results .result_iterator at 0x103e12780>
main: waiting for real results
ThreadPoolExecutor-0_1: done with 4
ThreadPoolExecutor-0_1: sleeping 3
ThreadPoolExecutor-0_0: done with 5
ThreadPoolExecutor-0_0: sleeping 2
ThreadPoolExecutor-0_0: done with 2
ThreadPoolExecutor-0_0: sleeping 1
ThreadPoolExecutor-0_1: done with 3
ThreadPoolExecutor-0_0: done with 1
main: results: [0.5, 0.4, 0.3, 0.2, 0.1]

Планирование индивидуальных задач

Помимо использования map () , можно запланировать отдельную задачу с исполнителем с помощью submit () и использовать экземпляр Future вернулся, чтобы дождаться результатов этой задачи.

futures_thread_pool_submit.py

from concurrent import futures
import threading
import time


def task(n):
    print('{}: sleeping {}'.format(
        threading.current_thread().name,
        n)
    )
    time.sleep(n / 10)
    print('{}: done with {}'.format(
        threading.current_thread().name,
        n)
    )
    return n / 10


ex  futures.ThreadPoolExecutor(max_workers2)
print('main: starting')
f  ex.submit(task, 5)
print('main: future: {}'.format(f))
print('main: waiting for results')
result  f.result()
print('main: result: {}'.format(result))
print('main: future after result: {}'.format(f))

Статус будущего меняется после того, как задачи будут выполнены и результат станет доступен.

$ python3 futures_thread_pool_submit.py

main: starting
ThreadPoolExecutor-0_0: sleeping 5
main: future: 
main: waiting for results
ThreadPoolExecutor-0_0: done with 5
main: result: 0.5
main: future after result: 

Ожидание задач в любом порядке

Вызов метода result () в Future блокируется до тех пор, пока задача не завершится (либо путем возврата значения или создания исключения), либо не будет отменена. Доступ к результатам нескольких задач можно получить в порядке их планирования с помощью map () . Если не имеет значения, в каком порядке должны обрабатываться результаты, используйте as_completed () для их обработки по завершении каждой задачи.

futures_as_completed.py

from concurrent import futures
import random
import time


def task(n):
    time.sleep(random.random())
    return (n, n / 10)


ex  futures.ThreadPoolExecutor(max_workers5)
print('main: starting')

wait_for  [
    ex.submit(task, i)
    for i in range(5, 0, -1)
]

for f in futures.as_completed(wait_for):
    print('main: result: {}'.format(f.result()))

Поскольку в пуле столько рабочих, сколько задач, можно запускать все задачи. Они заканчиваются в случайном порядке, поэтому значения, сгенерированные as_completed () , будут разными при каждом запуске примера.

$ python3 futures_as_completed.py

main: starting
main: result: (1, 0.1)
main: result: (5, 0.5)
main: result: (3, 0.3)
main: result: (2, 0.2)
main: result: (4, 0.4)

Будущие обратные вызовы

Чтобы предпринять какое-либо действие после завершения задачи, не ожидая явно результата, используйте add_done_callback () , чтобы указать новую функцию, которая будет вызываться, когда Future завершится. Обратный вызов должен быть вызываемым, принимающим единственный аргумент, экземпляр Future .

futures_future_callback.py

from concurrent import futures
import time


def task(n):
    print('{}: sleeping'.format(n))
    time.sleep(0.5)
    print('{}: done'.format(n))
    return n / 10


def done(fn):
    if fn.cancelled():
        print('{}: canceled'.format(fn.arg))
    elif fn.done():
        error  fn.exception()
        if error:
            print('{}: error returned: {}'.format(
                fn.arg, error))
        else:
            result  fn.result()
            print('{}: value returned: {}'.format(
                fn.arg, result))


if __name__  '__main__':
    ex  futures.ThreadPoolExecutor(max_workers2)
    print('main: starting')
    f  ex.submit(task, 5)
    f.arg  5
    f.add_done_callback(done)
    result  f.result()

Обратный вызов вызывается независимо от того, почему Future считается “выполненным”, поэтому необходимо проверить статус объекта, переданного в обратный вызов, прежде чем использовать его каким-либо образом.

$ python3 futures_future_callback.py

main: starting
5: sleeping
5: done
5: value returned: 0.5

Отмена задач

Future можно отменить, если он был отправлен, но не запущен, путем вызова его метода cancel () .

futures_future_callback_cancel.py

from concurrent import futures
import time


def task(n):
    print('{}: sleeping'.format(n))
    time.sleep(0.5)
    print('{}: done'.format(n))
    return n / 10


def done(fn):
    if fn.cancelled():
        print('{}: canceled'.format(fn.arg))
    elif fn.done():
        print('{}: not canceled'.format(fn.arg))


if __name__  '__main__':
    ex  futures.ThreadPoolExecutor(max_workers2)
    print('main: starting')
    tasks  []

    for i in range(10, 0, -1):
        print('main: submitting {}'.format(i))
        f  ex.submit(task, i)
        f.arg  i
        f.add_done_callback(done)
        tasks.append((i, f))

    for i, t in reversed(tasks):
        if not t.cancel():
            print('main: did not cancel {}'.format(i))

    ex.shutdown()

cancel () возвращает логическое значение, указывающее, можно ли было отменить задачу.

$ python3 futures_future_callback_cancel.py

main: starting
main: submitting 10
10: sleeping
main: submitting 9
9: sleeping
main: submitting 8
main: submitting 7
main: submitting 6
main: submitting 5
main: submitting 4
main: submitting 3
main: submitting 2
main: submitting 1
1: canceled
2: canceled
3: canceled
4: canceled
5: canceled
6: canceled
7: canceled
8: canceled
main: did not cancel 9
main: did not cancel 10
10: done
10: not canceled
9: done
9: not canceled

Исключения в задачах

Если задача вызывает необработанное исключение, она сохраняется в Future для задачи и становится доступной через result () или exception () методы.

futures_future_exception.py

from concurrent import futures


def task(n):
    print('{}: starting'.format(n))
    raise ValueError('the value {} is no good'.format(n))


ex  futures.ThreadPoolExecutor(max_workers2)
print('main: starting')
f  ex.submit(task, 5)

error  f.exception()
print('main: error: {}'.format(error))

try:
    result  f.result()
except ValueError as e:
    print('main: saw error "{}" when accessing result'.format(e))

Если result () вызывается после возникновения необработанного исключения в функции задачи, то же исключение повторно возникает в текущем контексте.

$ python3 futures_future_exception.py

main: starting
5: starting
main: error: the value 5 is no good
main: saw error "the value 5 is no good" when accessing result

Менеджер контекста

Исполнители работают как менеджеры контекста, выполняя задачи одновременно и ожидая их завершения. Когда диспетчер контекста завершает работу, вызывается метод исполнителя shutdown () .

futures_context_manager.py

from concurrent import futures


def task(n):
    print(n)


with futures.ThreadPoolExecutor(max_workers2) as ex:
    print('main: starting')
    ex.submit(task, 1)
    ex.submit(task, 2)
    ex.submit(task, 3)
    ex.submit(task, 4)

print('main: done')

Этот режим использования исполнителя полезен, когда ресурсы потока или процесса должны быть очищены, когда выполнение выходит за пределы текущей области.

$ python3 futures_context_manager.py

main: starting
1
2
3
4
main: done

Пулы процессов

ProcessPoolExecutor работает так же, как ThreadPoolExecutor , но использует процессы вместо потоков. Это позволяет операциям с интенсивным использованием ЦП использовать отдельный ЦП и не блокировать глобальную блокировку интерпретатора интерпретатора CPython.

futures_process_pool_map.py

from concurrent import futures
import os


def task(n):
    return (n, os.getpid())


ex  futures.ProcessPoolExecutor(max_workers2)
results  ex.map(task, range(5, 0, -1))
for n, pid in results:
    print('ran task {} in process {}'.format(n, pid))

Как и в случае с пулом потоков, отдельные рабочие процессы повторно используются для нескольких задач.

$ python3 futures_process_pool_map.py

ran task 5 in process 40854
ran task 4 in process 40854
ran task 3 in process 40854
ran task 2 in process 40854
ran task 1 in process 40854

Если что-то происходит с одним из рабочих процессов, вызывая его неожиданный выход, ProcessPoolExecutor считается “неисправным” и больше не будет планировать задачи.

futures_process_pool_broken.py

from concurrent import futures
import os
import signal


with futures.ProcessPoolExecutor(max_workers2) as ex:
    print('getting the pid for one worker')
    f1  ex.submit(os.getpid)
    pid1  f1.result()

    print('killing process {}'.format(pid1))
    os.kill(pid1, signal.SIGHUP)

    print('submitting another task')
    f2  ex.submit(os.getpid)
    try:
        pid2  f2.result()
    except futures.process.BrokenProcessPool as e:
        print('could not start new tasks: {}'.format(e))

Исключение BrokenProcessPool фактически вызывается при обработке результатов, а не при отправке новой задачи.

$ python3 futures_process_pool_broken.py

getting the pid for one worker
killing process 40858
submitting another task
could not start new tasks: A process in the process pool was
terminated abruptly while the future was running or pending.

Смотрите также