Автор оригинала: Doug Hellmann.
Многие существующие библиотеки изначально не готовы к использованию с asyncio
. Они могут блокировать или зависеть от функций параллелизма, недоступных в модуле. Эти библиотеки по-прежнему можно использовать в приложении, основанном на asyncio
, с помощью исполнителя из concurrent.futures для выполнения кода либо в отдельном потоке, либо в отдельном процессе.
Потоки
Метод run_in_executor ()
цикла событий принимает экземпляр исполнителя, вызываемый обычный вызываемый объект и любые аргументы, передаваемые вызываемому объекту. Он возвращает Future
, который можно использовать для ожидания, пока функция завершит свою работу и что-то вернет. Если исполнитель не передан, создается ThreadPoolExecutor
. В этом примере явно создается исполнитель, чтобы ограничить количество доступных ему рабочих потоков.
ThreadPoolExecutor
запускает свои рабочие потоки, а затем вызывает каждую из предоставленных функций один раз в потоке. В этом примере показано, как объединить run_in_executor ()
и wait ()
, чтобы сопрограмма передавала управление циклу событий, в то время как функции блокировки выполняются в отдельных потоках, а затем снова пробуждается когда эти функции будут завершены.
asyncio_executor_thread.py
import asyncio import concurrent.futures import logging import sys import time def blocks(n): log logging.getLogger('blocks({})'.format(n)) log.info('running') time.sleep(0.1) log.info('done') return n ** 2 async def run_blocking_tasks(executor): log logging.getLogger('run_blocking_tasks') log.info('starting') log.info('creating executor tasks') loop asyncio.get_event_loop() blocking_tasks [ loop.run_in_executor(executor, blocks, i) for i in range(6) ] log.info('waiting for executor tasks') completed, pending await asyncio.wait(blocking_tasks) results [t.result() for t in completed] log.info('results: {!r}'.format(results)) log.info('exiting') if __name__ '__main__': # Configure logging to show the name of the thread # where the log message originates. logging.basicConfig( levellogging.INFO, format'%(threadName)10s %(name)18s: %(message)s', streamsys.stderr, ) # Create a limited thread pool. executor concurrent.futures.ThreadPoolExecutor( max_workers3, ) event_loop asyncio.get_event_loop() try: event_loop.run_until_complete( run_blocking_tasks(executor) ) finally: event_loop.close()
asyncio_executor_thread.py
использует ведение журнала, чтобы удобно указать, какой поток и функция создают каждое сообщение журнала. Поскольку при каждом вызове blocks ()
используется отдельное средство ведения журнала, выходные данные ясно показывают, что одни и те же потоки повторно используются для вызова нескольких копий функции с разными аргументами.
$ python3 asyncio_executor_thread.py MainThread run_blocking_tasks: starting MainThread run_blocking_tasks: creating executor tasks ThreadPoolExecutor-0_0 blocks(0): running ThreadPoolExecutor-0_1 blocks(1): running ThreadPoolExecutor-0_2 blocks(2): running MainThread run_blocking_tasks: waiting for executor tasks ThreadPoolExecutor-0_0 blocks(0): done ThreadPoolExecutor-0_1 blocks(1): done ThreadPoolExecutor-0_2 blocks(2): done ThreadPoolExecutor-0_0 blocks(3): running ThreadPoolExecutor-0_1 blocks(4): running ThreadPoolExecutor-0_2 blocks(5): running ThreadPoolExecutor-0_0 blocks(3): done ThreadPoolExecutor-0_2 blocks(5): done ThreadPoolExecutor-0_1 blocks(4): done MainThread run_blocking_tasks: results: [0, 9, 16, 25, 1, 4] MainThread run_blocking_tasks: exiting
Процессы
ProcessPoolExecutor
работает почти так же, создавая набор рабочих процессов вместо потоков. Использование отдельных процессов требует больше системных ресурсов, но для ресурсоемких операций может иметь смысл запускать отдельную задачу на каждом ядре ЦП.
asyncio_executor_process.py
# changes from asyncio_executor_thread.py if __name__ '__main__': # Configure logging to show the id of the process # where the log message originates. logging.basicConfig( levellogging.INFO, format'PID %(process)5s %(name)18s: %(message)s', streamsys.stderr, ) # Create a limited process pool. executor concurrent.futures.ProcessPoolExecutor( max_workers3, ) event_loop asyncio.get_event_loop() try: event_loop.run_until_complete( run_blocking_tasks(executor) ) finally: event_loop.close()
Единственное изменение, необходимое для перехода от потоков к процессам, – это создание другого типа исполнителя. В этом примере также изменяется строка формата ведения журнала, чтобы включить идентификатор процесса вместо имени потока, чтобы продемонстрировать, что задачи фактически выполняются в отдельных процессах.
$ python3 asyncio_executor_process.py PID 40498 run_blocking_tasks: starting PID 40498 run_blocking_tasks: creating executor tasks PID 40498 run_blocking_tasks: waiting for executor tasks PID 40499 blocks(0): running PID 40500 blocks(1): running PID 40501 blocks(2): running PID 40499 blocks(0): done PID 40500 blocks(1): done PID 40501 blocks(2): done PID 40500 blocks(3): running PID 40499 blocks(4): running PID 40501 blocks(5): running PID 40499 blocks(4): done PID 40500 blocks(3): done PID 40501 blocks(5): done PID 40498 run_blocking_tasks: results: [1, 4, 9, 0, 16, 25] PID 40498 run_blocking_tasks: exiting