Рубрики
Без рубрики

Составление сопрограмм с управляющими структурами

Автор оригинала: Doug Hellmann.

Линейным потоком управления между сериями сопрограмм легко управлять с помощью встроенного ключевого слова языка await . Более сложные структуры, позволяющие одной сопрограмме ждать, пока несколько других завершатся параллельно, также возможны с использованием инструментов в asyncio .

Ожидание нескольких сопрограмм

Часто бывает полезно разделить одну операцию на несколько частей и выполнить их отдельно. Например, загрузка нескольких удаленных ресурсов или запрос удаленных API. В ситуациях, когда порядок выполнения не имеет значения и где может быть произвольное количество операций, можно использовать wait () для приостановки одной сопрограммы до завершения других фоновых операций.

asyncio_wait.py

import asyncio


async def phase(i):
    print('in phase {}'.format(i))
    await asyncio.sleep(0.1 * i)
    print('done with phase {}'.format(i))
    return 'phase {} result'.format(i)


async def main(num_phases):
    print('starting main')
    phases  [
        phase(i)
        for i in range(num_phases)
    ]
    print('waiting for phases to complete')
    completed, pending  await asyncio.wait(phases)
    results  [t.result() for t in completed]
    print('results: {!r}'.format(results))


event_loop  asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(3))
finally:
    event_loop.close()

Внутренне wait () использует set для хранения экземпляров Task , которые он создает. Это приводит к тому, что они запускаются и заканчиваются в непредсказуемом порядке. Возвращаемое значение от wait () – это кортеж, содержащий два набора, содержащих завершенные и ожидающие задачи.

$ python3 asyncio_wait.py

starting main
waiting for phases to complete
in phase 0
in phase 1
in phase 2
done with phase 0
done with phase 1
done with phase 2
results: ['phase 1 result', 'phase 0 result', 'phase 2 result']

Незавершенные операции останутся только в том случае, если wait () используется со значением тайм-аута.

asyncio_wait_timeout.py

import asyncio


async def phase(i):
    print('in phase {}'.format(i))
    try:
        await asyncio.sleep(0.1 * i)
    except asyncio.CancelledError:
        print('phase {} canceled'.format(i))
        raise
    else:
        print('done with phase {}'.format(i))
        return 'phase {} result'.format(i)


async def main(num_phases):
    print('starting main')
    phases  [
        phase(i)
        for i in range(num_phases)
    ]
    print('waiting 0.1 for phases to complete')
    completed, pending  await asyncio.wait(phases, timeout0.1)
    print('{} completed and {} pending'.format(
        len(completed), len(pending),
    ))
    # Cancel remaining tasks so they do not generate errors
    # as we exit without finishing them.
    if pending:
        print('canceling tasks')
        for t in pending:
            t.cancel()
    print('exiting main')


event_loop  asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(3))
finally:
    event_loop.close()

Эти оставшиеся фоновые операции следует либо отменить, либо завершить, дождавшись их. Если оставить их в ожидании, пока продолжается цикл событий, они могут выполняться дальше, что может быть нежелательно, если вся операция считается прерванной. Если оставить их на рассмотрении в конце процесса, появятся предупреждения.

$ python3 asyncio_wait_timeout.py

starting main
waiting 0.1 for phases to complete
in phase 1
in phase 0
in phase 2
done with phase 0
1 completed and 2 pending
cancelling tasks
exiting main
phase 1 cancelled
phase 2 cancelled

Сбор результатов из сопрограмм

Если фоновые фазы четко определены и важны только результаты этих фаз, то gather () может быть более полезным для ожидания нескольких операций.

asyncio_gather.py

import asyncio


async def phase1():
    print('in phase1')
    await asyncio.sleep(2)
    print('done with phase1')
    return 'phase1 result'


async def phase2():
    print('in phase2')
    await asyncio.sleep(1)
    print('done with phase2')
    return 'phase2 result'


async def main():
    print('starting main')
    print('waiting for phases to complete')
    results  await asyncio.gather(
        phase1(),
        phase2(),
    )
    print('results: {!r}'.format(results))


event_loop  asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main())
finally:
    event_loop.close()

Задачи, созданные командой gather, не отображаются, поэтому их нельзя отменить. Возвращаемое значение – это список результатов в том же порядке, что и аргументы, переданные в gather () , независимо от порядка фактического завершения фоновых операций.

$ python3 asyncio_gather.py

starting main
waiting for phases to complete
in phase2
in phase1
done with phase2
done with phase1
results: ['phase1 result', 'phase2 result']

Обработка фоновых операций по мере их завершения

as_completed () – это генератор, который управляет выполнением списка переданных ему сопрограмм и выдает их результаты по одному по мере их завершения. Как и в случае с wait () , порядок не гарантируется с помощью as_completed () , но нет необходимости ждать завершения всех фоновых операций перед выполнением других действий.

asyncio_as_completed.py

import asyncio


async def phase(i):
    print('in phase {}'.format(i))
    await asyncio.sleep(0.5 - (0.1 * i))
    print('done with phase {}'.format(i))
    return 'phase {} result'.format(i)


async def main(num_phases):
    print('starting main')
    phases  [
        phase(i)
        for i in range(num_phases)
    ]
    print('waiting for phases to complete')
    results  []
    for next_to_complete in asyncio.as_completed(phases):
        answer  await next_to_complete
        print('received answer {!r}'.format(answer))
        results.append(answer)
    print('results: {!r}'.format(results))
    return results


event_loop  asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(3))
finally:
    event_loop.close()

В этом примере начинается несколько фоновых фаз, которые заканчиваются в порядке, обратном их началу. По мере использования генератора цикл ожидает результата сопрограммы, используя await .

$ python3 asyncio_as_completed.py

starting main
waiting for phases to complete
in phase 0
in phase 2
in phase 1
done with phase 2
received answer 'phase 2 result'
done with phase 1
received answer 'phase 1 result'
done with phase 0
received answer 'phase 0 result'
results: ['phase 2 result', 'phase 1 result', 'phase 0 result']