Рубрики
Без рубрики

Объединение сопрограмм с потоками и процессами

Автор оригинала: Doug Hellmann.

Многие существующие библиотеки изначально не готовы к использованию с asyncio . Они могут блокировать или зависеть от функций параллелизма, недоступных в модуле. Эти библиотеки по-прежнему можно использовать в приложении, основанном на asyncio , с помощью исполнителя из concurrent.futures для выполнения кода либо в отдельном потоке, либо в отдельном процессе.

Потоки

Метод run_in_executor () цикла событий принимает экземпляр исполнителя, вызываемый обычный вызываемый объект и любые аргументы, передаваемые вызываемому объекту. Он возвращает Future , который можно использовать для ожидания, пока функция завершит свою работу и что-то вернет. Если исполнитель не передан, создается ThreadPoolExecutor . В этом примере явно создается исполнитель, чтобы ограничить количество доступных ему рабочих потоков.

ThreadPoolExecutor запускает свои рабочие потоки, а затем вызывает каждую из предоставленных функций один раз в потоке. В этом примере показано, как объединить run_in_executor () и wait () , чтобы сопрограмма передавала управление циклу событий, в то время как функции блокировки выполняются в отдельных потоках, а затем снова пробуждается когда эти функции будут завершены.

asyncio_executor_thread.py

import asyncio
import concurrent.futures
import logging
import sys
import time


def blocks(n):
    log  logging.getLogger('blocks({})'.format(n))
    log.info('running')
    time.sleep(0.1)
    log.info('done')
    return n ** 2


async def run_blocking_tasks(executor):
    log  logging.getLogger('run_blocking_tasks')
    log.info('starting')

    log.info('creating executor tasks')
    loop  asyncio.get_event_loop()
    blocking_tasks  [
        loop.run_in_executor(executor, blocks, i)
        for i in range(6)
    ]
    log.info('waiting for executor tasks')
    completed, pending  await asyncio.wait(blocking_tasks)
    results  [t.result() for t in completed]
    log.info('results: {!r}'.format(results))

    log.info('exiting')


if __name__  '__main__':
    # Configure logging to show the name of the thread
    # where the log message originates.
    logging.basicConfig(
        levellogging.INFO,
        format'%(threadName)10s %(name)18s: %(message)s',
        streamsys.stderr,
    )

    # Create a limited thread pool.
    executor  concurrent.futures.ThreadPoolExecutor(
        max_workers3,
    )

    event_loop  asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(
            run_blocking_tasks(executor)
        )
    finally:
        event_loop.close()

asyncio_executor_thread.py использует ведение журнала, чтобы удобно указать, какой поток и функция создают каждое сообщение журнала. Поскольку при каждом вызове blocks () используется отдельное средство ведения журнала, выходные данные ясно показывают, что одни и те же потоки повторно используются для вызова нескольких копий функции с разными аргументами.

$ python3 asyncio_executor_thread.py

MainThread run_blocking_tasks: starting
MainThread run_blocking_tasks: creating executor tasks
ThreadPoolExecutor-0_0          blocks(0): running
ThreadPoolExecutor-0_1          blocks(1): running
ThreadPoolExecutor-0_2          blocks(2): running
MainThread run_blocking_tasks: waiting for executor tasks
ThreadPoolExecutor-0_0          blocks(0): done
ThreadPoolExecutor-0_1          blocks(1): done
ThreadPoolExecutor-0_2          blocks(2): done
ThreadPoolExecutor-0_0          blocks(3): running
ThreadPoolExecutor-0_1          blocks(4): running
ThreadPoolExecutor-0_2          blocks(5): running
ThreadPoolExecutor-0_0          blocks(3): done
ThreadPoolExecutor-0_2          blocks(5): done
ThreadPoolExecutor-0_1          blocks(4): done
MainThread run_blocking_tasks: results: [0, 9, 16, 25, 1, 4]
MainThread run_blocking_tasks: exiting

Процессы

ProcessPoolExecutor работает почти так же, создавая набор рабочих процессов вместо потоков. Использование отдельных процессов требует больше системных ресурсов, но для ресурсоемких операций может иметь смысл запускать отдельную задачу на каждом ядре ЦП.

asyncio_executor_process.py

# changes from asyncio_executor_thread.py

if __name__  '__main__':
    # Configure logging to show the id of the process
    # where the log message originates.
    logging.basicConfig(
        levellogging.INFO,
        format'PID %(process)5s %(name)18s: %(message)s',
        streamsys.stderr,
    )

    # Create a limited process pool.
    executor  concurrent.futures.ProcessPoolExecutor(
        max_workers3,
    )

    event_loop  asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(
            run_blocking_tasks(executor)
        )
    finally:
        event_loop.close()

Единственное изменение, необходимое для перехода от потоков к процессам, – это создание другого типа исполнителя. В этом примере также изменяется строка формата ведения журнала, чтобы включить идентификатор процесса вместо имени потока, чтобы продемонстрировать, что задачи фактически выполняются в отдельных процессах.

$ python3 asyncio_executor_process.py

PID 40498 run_blocking_tasks: starting
PID 40498 run_blocking_tasks: creating executor tasks
PID 40498 run_blocking_tasks: waiting for executor tasks
PID 40499          blocks(0): running
PID 40500          blocks(1): running
PID 40501          blocks(2): running
PID 40499          blocks(0): done
PID 40500          blocks(1): done
PID 40501          blocks(2): done
PID 40500          blocks(3): running
PID 40499          blocks(4): running
PID 40501          blocks(5): running
PID 40499          blocks(4): done
PID 40500          blocks(3): done
PID 40501          blocks(5): done
PID 40498 run_blocking_tasks: results: [1, 4, 9, 0, 16, 25]
PID 40498 run_blocking_tasks: exiting