Автор оригинала: Doug Hellmann.
Линейным потоком управления между сериями сопрограмм легко управлять с помощью встроенного ключевого слова языка await
. Более сложные структуры, позволяющие одной сопрограмме ждать, пока несколько других завершатся параллельно, также возможны с использованием инструментов в asyncio
.
Ожидание нескольких сопрограмм
Часто бывает полезно разделить одну операцию на несколько частей и выполнить их отдельно. Например, загрузка нескольких удаленных ресурсов или запрос удаленных API. В ситуациях, когда порядок выполнения не имеет значения и где может быть произвольное количество операций, можно использовать wait ()
для приостановки одной сопрограммы до завершения других фоновых операций.
asyncio_wait.py
import asyncio async def phase(i): print('in phase {}'.format(i)) await asyncio.sleep(0.1 * i) print('done with phase {}'.format(i)) return 'phase {} result'.format(i) async def main(num_phases): print('starting main') phases [ phase(i) for i in range(num_phases) ] print('waiting for phases to complete') completed, pending await asyncio.wait(phases) results [t.result() for t in completed] print('results: {!r}'.format(results)) event_loop asyncio.get_event_loop() try: event_loop.run_until_complete(main(3)) finally: event_loop.close()
Внутренне wait ()
использует set
для хранения экземпляров Task
, которые он создает. Это приводит к тому, что они запускаются и заканчиваются в непредсказуемом порядке. Возвращаемое значение от wait ()
– это кортеж, содержащий два набора, содержащих завершенные и ожидающие задачи.
$ python3 asyncio_wait.py starting main waiting for phases to complete in phase 0 in phase 1 in phase 2 done with phase 0 done with phase 1 done with phase 2 results: ['phase 1 result', 'phase 0 result', 'phase 2 result']
Незавершенные операции останутся только в том случае, если wait ()
используется со значением тайм-аута.
asyncio_wait_timeout.py
import asyncio async def phase(i): print('in phase {}'.format(i)) try: await asyncio.sleep(0.1 * i) except asyncio.CancelledError: print('phase {} canceled'.format(i)) raise else: print('done with phase {}'.format(i)) return 'phase {} result'.format(i) async def main(num_phases): print('starting main') phases [ phase(i) for i in range(num_phases) ] print('waiting 0.1 for phases to complete') completed, pending await asyncio.wait(phases, timeout0.1) print('{} completed and {} pending'.format( len(completed), len(pending), )) # Cancel remaining tasks so they do not generate errors # as we exit without finishing them. if pending: print('canceling tasks') for t in pending: t.cancel() print('exiting main') event_loop asyncio.get_event_loop() try: event_loop.run_until_complete(main(3)) finally: event_loop.close()
Эти оставшиеся фоновые операции следует либо отменить, либо завершить, дождавшись их. Если оставить их в ожидании, пока продолжается цикл событий, они могут выполняться дальше, что может быть нежелательно, если вся операция считается прерванной. Если оставить их на рассмотрении в конце процесса, появятся предупреждения.
$ python3 asyncio_wait_timeout.py starting main waiting 0.1 for phases to complete in phase 1 in phase 0 in phase 2 done with phase 0 1 completed and 2 pending cancelling tasks exiting main phase 1 cancelled phase 2 cancelled
Сбор результатов из сопрограмм
Если фоновые фазы четко определены и важны только результаты этих фаз, то gather ()
может быть более полезным для ожидания нескольких операций.
asyncio_gather.py
import asyncio async def phase1(): print('in phase1') await asyncio.sleep(2) print('done with phase1') return 'phase1 result' async def phase2(): print('in phase2') await asyncio.sleep(1) print('done with phase2') return 'phase2 result' async def main(): print('starting main') print('waiting for phases to complete') results await asyncio.gather( phase1(), phase2(), ) print('results: {!r}'.format(results)) event_loop asyncio.get_event_loop() try: event_loop.run_until_complete(main()) finally: event_loop.close()
Задачи, созданные командой gather, не отображаются, поэтому их нельзя отменить. Возвращаемое значение – это список результатов в том же порядке, что и аргументы, переданные в gather ()
, независимо от порядка фактического завершения фоновых операций.
$ python3 asyncio_gather.py starting main waiting for phases to complete in phase2 in phase1 done with phase2 done with phase1 results: ['phase1 result', 'phase2 result']
Обработка фоновых операций по мере их завершения
as_completed ()
– это генератор, который управляет выполнением списка переданных ему сопрограмм и выдает их результаты по одному по мере их завершения. Как и в случае с wait ()
, порядок не гарантируется с помощью as_completed ()
, но нет необходимости ждать завершения всех фоновых операций перед выполнением других действий.
asyncio_as_completed.py
import asyncio async def phase(i): print('in phase {}'.format(i)) await asyncio.sleep(0.5 - (0.1 * i)) print('done with phase {}'.format(i)) return 'phase {} result'.format(i) async def main(num_phases): print('starting main') phases [ phase(i) for i in range(num_phases) ] print('waiting for phases to complete') results [] for next_to_complete in asyncio.as_completed(phases): answer await next_to_complete print('received answer {!r}'.format(answer)) results.append(answer) print('results: {!r}'.format(results)) return results event_loop asyncio.get_event_loop() try: event_loop.run_until_complete(main(3)) finally: event_loop.close()
В этом примере начинается несколько фоновых фаз, которые заканчиваются в порядке, обратном их началу. По мере использования генератора цикл ожидает результата сопрограммы, используя await
.
$ python3 asyncio_as_completed.py starting main waiting for phases to complete in phase 0 in phase 2 in phase 1 done with phase 2 received answer 'phase 2 result' done with phase 1 received answer 'phase 1 result' done with phase 0 received answer 'phase 0 result' results: ['phase 2 result', 'phase 1 result', 'phase 0 result']