Недавно я столкнулся с задачей пакетной обработки обработки тысяч изображений из S3. Поскольку обработка была относительно легким, большая часть времени вычислений была потрачена на загрузку и загрузка изображений, то есть ввода/вывода. Такие оперативные задачи ввода/вывода являются отличными подходящими для многопотативных, контрастных задач CPU, лучше подходящих для многопроцессора со всеми его причудливыми, связанными с коммуникацией интерпретирования. В этом посте я хотел бы поделиться небольшим примером, как запускать задачи в пуле резьбы.
Мы будем использовать ThreadPoolexecutor
Для выполнения задач в настраиваемом количестве рабочих потоков. Один вариант будет представлять задачи в бассейн с Execututor.submit ()
Отказ Этот метод возвращает одновременно. Будущее
объект, к которому можно добавить обратные вызовы с add_done_callback ()
Отказ Однако обратные вызовы являются злом, и лучше избегать их, если это возможно.
С асинсио
мы можем вместо этого использовать Очередным asyncio.future
объекты и избегать обратных вызовов.
Наша игрушечная задача определяется следующим образом:
def execute_hello(ind): print(f"Thread {threading.current_thread().name}: Starting task: {ind}...") time.sleep(1) print(f"Thread {threading.current_thread().name}: Finished task {ind}!") return ind
Задача будет спать одну секунду между печатными информационными сообщениями стандартным выходом. Он принимает целочисленный аргумент, поэтому мы можем отслеживать, какую задачу выполняется.
Въездная точка нашей программы выглядит следующим образом:
import asyncio def execute_hello(ind): ... async def main(tasks=20): ... if __name__ == "__main__": asyncio.run(main())
asyncio.run
Функция выполняет Coroutine Главная
начиная с петли события. Обратите внимание, что вам нужен Python> 3.7 для использования asyncio.run
Отказ
В Корудине мы объявим ThreadPoolexecutor
С, например, четыре рабочие потоки:
MAX_WORKERS = 4 async def main(tasks=20): with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool: ...
Использование ThreadPoolexecutor
В качестве менеджера контекста гарантирует, что пул выключен, когда контекстные менеджеры выходит.
Теперь, когда у нас есть бассейн, мы можем подать заданиям к нему. Вместо использования pool.submit.submit.
мы будем использовать loop.run_in_executor ()
от asyncio
:
async def main(tasks=20): with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool: loop = asyncio.get_running_loop() futures = [ loop.run_in_executor(pool, execute_hello, task) for task in range(tasks) ]
Функция asyncio.get_running_loop ()
Возвращает цикл запущенного события. Затем мы создаем будущее для каждого из 20 задач, которые мы хотим работать с помощью loop.run_in_executor (пул, execute_hello, задача)
Отказ
Все идет нормально. Если мы теперь выполним сценарий, мы видим, что пул выполняет четыре задания одновременно и займет около пяти секунд, чтобы завершить, как ожидалось. Тем не менее, мы все еще отсутствуем какого-либо очистки и обработки ошибок.
Чтобы собрать результаты поставленных задач, мы будем использовать asyncio.ghather
Отказ Эта функция может быть использована для ждут нескольких вспомогательных пользователей, чтобы закончить и добавить обработку ошибок, как это:
async def main(tasks=20): with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool: loop = asyncio.get_running_loop() futures = [ loop.run_in_executor(pool, execute_hello, task) for task in range(tasks) ] try: results = await asyncio.gather(*futures, return_exceptions=False) except Exception as ex: print("Caught error executing task", ex) raise print(f"Finished processing, got results: {results}")
Когда наш сценарий сейчас заканчивается, он печатает результат от каждой задачи в порядке:
Finished processing, got results: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
Давайте теперь проверим нашу обработку ошибок, изменяя задачу следующим образом:
def execute_hello(ind): print(f"Thread {threading.current_thread().name}: Starting task: {ind}...") time.sleep(1) if ind == 2: print("BOOM!") raise Exception("Boom!") print(f"Thread {threading.current_thread().name}: Finished task {ind}!") return ind
Если теперь мы запустим сценарий, мы замечаем, что выполнение продолжается до тех пор, пока все задачи не будут запущены, даже если третья задача в первой партии не удается. Это может или не может быть то, что мы хотим. В моем случае мне нужно было избежать запуска новых задач после того, как какая-либо единственная задача не удалась. Это может быть достигнуто путем установки глобального _shutdown
Переменная к Правда
Когда первая задача терпит неудачу и пропускает работу в любых оставшихся задачах, если _shutdown
это Правда
:
_shutdown = False def execute_hello(ind): if _shutdown: print(f"Thread {threading.current_thread().name}: Skipping task {ind} as shutdown was requested") return None ... async def main(tasks=20): global _shutdown with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool: loop = asyncio.get_running_loop() futures = [ loop.run_in_executor(pool, execute_hello, task) for task in range(tasks) ] try: results = await asyncio.gather(*futures, return_exceptions=False) except Exception as ex: print("Caught error executing task", ex) _shutdown = True raise print(f"Finished processing, got results: {results}")
Теперь любые задачи, которые не были начаты во время первого исключения, пропущены. Любые задачи, которые были начаты, все еще ожидаются, чтобы закончить.
Если у вас есть какие-либо вопросы или комментарии, пожалуйста, оставьте один!
Оригинал: “https://dev.to/ksaaskil/how-to-speed-up-i-o-intensive-tasks-with-multithreading-and-asyncio-3j2m”