Рубрики
Без рубрики

Как обрабатывать I / O-o-интенсивные задачи одновременно с Asyncio

Это 2021, не более обратных вызовов. Теги от обучения, Python, Asyncio.

Недавно я столкнулся с задачей пакетной обработки обработки тысяч изображений из S3. Поскольку обработка была относительно легким, большая часть времени вычислений была потрачена на загрузку и загрузка изображений, то есть ввода/вывода. Такие оперативные задачи ввода/вывода являются отличными подходящими для многопотативных, контрастных задач CPU, лучше подходящих для многопроцессора со всеми его причудливыми, связанными с коммуникацией интерпретирования. В этом посте я хотел бы поделиться небольшим примером, как запускать задачи в пуле резьбы.

Мы будем использовать ThreadPoolexecutor Для выполнения задач в настраиваемом количестве рабочих потоков. Один вариант будет представлять задачи в бассейн с Execututor.submit () Отказ Этот метод возвращает одновременно. Будущее объект, к которому можно добавить обратные вызовы с add_done_callback () Отказ Однако обратные вызовы являются злом, и лучше избегать их, если это возможно.

С асинсио мы можем вместо этого использовать Очередным asyncio.future объекты и избегать обратных вызовов.

Наша игрушечная задача определяется следующим образом:

def execute_hello(ind):
    print(f"Thread {threading.current_thread().name}: Starting task: {ind}...")
    time.sleep(1)
    print(f"Thread {threading.current_thread().name}: Finished task {ind}!")
    return ind

Задача будет спать одну секунду между печатными информационными сообщениями стандартным выходом. Он принимает целочисленный аргумент, поэтому мы можем отслеживать, какую задачу выполняется.

Въездная точка нашей программы выглядит следующим образом:

import asyncio

def execute_hello(ind):
    ...

async def main(tasks=20):
    ...

if __name__ == "__main__":
    asyncio.run(main())

asyncio.run Функция выполняет Coroutine Главная начиная с петли события. Обратите внимание, что вам нужен Python> 3.7 для использования asyncio.run Отказ

В Корудине мы объявим ThreadPoolexecutor С, например, четыре рабочие потоки:

MAX_WORKERS = 4

async def main(tasks=20):
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
        ...

Использование ThreadPoolexecutor В качестве менеджера контекста гарантирует, что пул выключен, когда контекстные менеджеры выходит.

Теперь, когда у нас есть бассейн, мы можем подать заданиям к нему. Вместо использования pool.submit.submit. мы будем использовать loop.run_in_executor () от asyncio :

async def main(tasks=20):
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
        loop = asyncio.get_running_loop()
        futures = [
            loop.run_in_executor(pool, execute_hello, task)
            for task in range(tasks)
        ]

Функция asyncio.get_running_loop () Возвращает цикл запущенного события. Затем мы создаем будущее для каждого из 20 задач, которые мы хотим работать с помощью loop.run_in_executor (пул, execute_hello, задача) Отказ

Все идет нормально. Если мы теперь выполним сценарий, мы видим, что пул выполняет четыре задания одновременно и займет около пяти секунд, чтобы завершить, как ожидалось. Тем не менее, мы все еще отсутствуем какого-либо очистки и обработки ошибок.

Чтобы собрать результаты поставленных задач, мы будем использовать asyncio.ghather Отказ Эта функция может быть использована для ждут нескольких вспомогательных пользователей, чтобы закончить и добавить обработку ошибок, как это:

async def main(tasks=20):
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
        loop = asyncio.get_running_loop()
        futures = [
            loop.run_in_executor(pool, execute_hello, task)
            for task in range(tasks)
        ]
        try:
            results = await asyncio.gather(*futures, return_exceptions=False)
        except Exception as ex:
            print("Caught error executing task", ex)
            raise
    print(f"Finished processing, got results: {results}")

Когда наш сценарий сейчас заканчивается, он печатает результат от каждой задачи в порядке:

Finished processing, got results: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

Давайте теперь проверим нашу обработку ошибок, изменяя задачу следующим образом:

def execute_hello(ind):
    print(f"Thread {threading.current_thread().name}: Starting task: {ind}...")

    time.sleep(1)

    if ind == 2:
        print("BOOM!")
        raise Exception("Boom!")

    print(f"Thread {threading.current_thread().name}: Finished task {ind}!")
    return ind

Если теперь мы запустим сценарий, мы замечаем, что выполнение продолжается до тех пор, пока все задачи не будут запущены, даже если третья задача в первой партии не удается. Это может или не может быть то, что мы хотим. В моем случае мне нужно было избежать запуска новых задач после того, как какая-либо единственная задача не удалась. Это может быть достигнуто путем установки глобального _shutdown Переменная к Правда Когда первая задача терпит неудачу и пропускает работу в любых оставшихся задачах, если _shutdown это Правда :

_shutdown = False

def execute_hello(ind):
    if _shutdown:
        print(f"Thread {threading.current_thread().name}: Skipping task {ind} as shutdown was requested")
        return None
    ...

async def main(tasks=20):
    global _shutdown
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
        loop = asyncio.get_running_loop()
        futures = [
            loop.run_in_executor(pool, execute_hello, task)
            for task in range(tasks)
        ]
        try:
            results = await asyncio.gather(*futures, return_exceptions=False)
        except Exception as ex:
            print("Caught error executing task", ex)
            _shutdown = True
            raise
    print(f"Finished processing, got results: {results}")

Теперь любые задачи, которые не были начаты во время первого исключения, пропущены. Любые задачи, которые были начаты, все еще ожидаются, чтобы закончить.

Если у вас есть какие-либо вопросы или комментарии, пожалуйста, оставьте один!

Оригинал: “https://dev.to/ksaaskil/how-to-speed-up-i-o-intensive-tasks-with-multithreading-and-asyncio-3j2m”