Рубрики
Без рубрики

Примитивы синхронизации

Автор оригинала: Doug Hellmann.

Хотя приложения asyncio обычно выполняются как однопоточный процесс, они по-прежнему создаются как параллельные приложения. Каждая сопрограмма или задача могут выполняться в непредсказуемом порядке, основанном на задержках и прерываниях от ввода-вывода и других внешних событий. Для поддержки безопасного параллелизма asyncio включает в себя реализации некоторых из тех же низкоуровневых примитивов, что и в модулях потоковой и многопроцессорной обработки.

Замки

Lock можно использовать для защиты доступа к общему ресурсу. Ресурс может использовать только владелец замка. Множественные попытки получить блокировку будут заблокированы, так что одновременно будет только один держатель.

asyncio_lock.py

import asyncio
import functools


def unlock(lock):
    print('callback releasing lock')
    lock.release()


async def coro1(lock):
    print('coro1 waiting for the lock')
    async with lock:
        print('coro1 acquired lock')
    print('coro1 released lock')


async def coro2(lock):
    print('coro2 waiting for the lock')
    await lock.acquire()
    try:
        print('coro2 acquired lock')
    finally:
        print('coro2 released lock')
        lock.release()


async def main(loop):
    # Create and acquire a shared lock.
    lock  asyncio.Lock()
    print('acquiring the lock before starting coroutines')
    await lock.acquire()
    print('lock acquired: {}'.format(lock.locked()))

    # Schedule a callback to unlock the lock.
    loop.call_later(0.1, functools.partial(unlock, lock))

    # Run the coroutines that want to use the lock.
    print('waiting for coroutines')
    await asyncio.wait([coro1(lock), coro2(lock)]),


event_loop  asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()

Метод блокировки Acquire () может быть вызван напрямую, используя await и вызывая метод release () , когда это выполняется, как в coro2 () в этом примере. Их также можно использовать в качестве асинхронных менеджеров контекста с ключевыми словами with await , как в coro1 () .

$ python3 asyncio_lock.py

acquiring the lock before starting coroutines
lock acquired: True
waiting for coroutines
coro2 waiting for the lock
coro1 waiting for the lock
callback releasing lock
coro2 acquired lock
coro2 released lock
coro1 acquired lock
coro1 released lock

События

asyncio.Event основан на threading.Event и используется, чтобы позволить нескольким потребителям ждать, пока что-то произойдет, не ища конкретное значение, связанное с уведомлением.

asyncio_event.py

import asyncio
import functools


def set_event(event):
    print('setting event in callback')
    event.set()


async def coro1(event):
    print('coro1 waiting for event')
    await event.wait()
    print('coro1 triggered')


async def coro2(event):
    print('coro2 waiting for event')
    await event.wait()
    print('coro2 triggered')


async def main(loop):
    # Create a shared event
    event  asyncio.Event()
    print('event start state: {}'.format(event.is_set()))

    loop.call_later(
        0.1, functools.partial(set_event, event)
    )

    await asyncio.wait([coro1(event), coro2(event)])
    print('event end state: {}'.format(event.is_set()))


event_loop  asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()

Как и в случае с Lock , оба coro1 () и coro2 () ждут, пока событие не будет установлено. Разница в том, что оба они могут запускаться, как только состояние события изменится, и им не нужно приобретать уникальную фиксацию объекта события.

$ python3 asyncio_event.py

event start state: False
coro2 waiting for event
coro1 waiting for event
setting event in callback
coro2 triggered
coro1 triggered
event end state: True

Условия

Condition работает аналогично Event , за исключением того, что вместо уведомления всех ожидающих сопрограмм количество пробужденных официантов контролируется аргументом для notify () .

asyncio_condition.py

import asyncio


async def consumer(condition, n):
    async with condition:
        print('consumer {} is waiting'.format(n))
        await condition.wait()
        print('consumer {} triggered'.format(n))
    print('ending consumer {}'.format(n))


async def manipulate_condition(condition):
    print('starting manipulate_condition')

    # pause to let consumers start
    await asyncio.sleep(0.1)

    for i in range(1, 3):
        async with condition:
            print('notifying {} consumers'.format(i))
            condition.notify(ni)
        await asyncio.sleep(0.1)

    async with condition:
        print('notifying remaining consumers')
        condition.notify_all()

    print('ending manipulate_condition')


async def main(loop):
    # Create a condition
    condition  asyncio.Condition()

    # Set up tasks watching the condition
    consumers  [
        consumer(condition, i)
        for i in range(5)
    ]

    # Schedule a task to manipulate the condition variable
    loop.create_task(manipulate_condition(condition))

    # Wait for the consumers to be done
    await asyncio.wait(consumers)


event_loop  asyncio.get_event_loop()
try:
    result  event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()

В этом примере запускаются пять потребителей Condition . Каждый из них использует метод wait () для ожидания уведомления о том, что они могут продолжить. manage_condition () уведомляет одного потребителя, затем двух потребителей, а затем всех остальных потребителей.

$ python3 asyncio_condition.py

starting manipulate_condition
consumer 3 is waiting
consumer 0 is waiting
consumer 4 is waiting
consumer 1 is waiting
consumer 2 is waiting
notifying 1 consumers
consumer 3 triggered
ending consumer 3
notifying 2 consumers
consumer 0 triggered
ending consumer 0
consumer 4 triggered
ending consumer 4
notifying remaining consumers
ending manipulate_condition
consumer 1 triggered
ending consumer 1
consumer 2 triggered
ending consumer 2

Очереди

asyncio.Queue предоставляет структуру данных «первым пришел – первым вышел» для сопрограмм, как queue.Queue для потоков или multiprocessing.Queue делает для процессов.

asyncio_queue.py

import asyncio


async def consumer(n, q):
    print('consumer {}: starting'.format(n))
    while True:
        print('consumer {}: waiting for item'.format(n))
        item  await q.get()
        print('consumer {}: has item {}'.format(n, item))
        if item is None:
            # None is the signal to stop.
            q.task_done()
            break
        else:
            await asyncio.sleep(0.01 * item)
            q.task_done()
    print('consumer {}: ending'.format(n))


async def producer(q, num_workers):
    print('producer: starting')
    # Add some numbers to the queue to simulate jobs
    for i in range(num_workers * 3):
        await q.put(i)
        print('producer: added task {} to the queue'.format(i))
    # Add None entries in the queue
    # to signal the consumers to exit
    print('producer: adding stop signals to the queue')
    for i in range(num_workers):
        await q.put(None)
    print('producer: waiting for queue to empty')
    await q.join()
    print('producer: ending')


async def main(loop, num_consumers):
    # Create the queue with a fixed size so the producer
    # will block until the consumers pull some items out.
    q  asyncio.Queue(maxsizenum_consumers)

    # Scheduled the consumer tasks.
    consumers  [
        loop.create_task(consumer(i, q))
        for i in range(num_consumers)
    ]

    # Schedule the producer task.
    prod  loop.create_task(producer(q, num_consumers))

    # Wait for all of the coroutines to finish.
    await asyncio.wait(consumers + [prod])


event_loop  asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop, 2))
finally:
    event_loop.close()

Добавление элементов с помощью put () или удаление элементов с помощью get () являются асинхронными операциями, поскольку размер очереди может быть фиксированным (блокирование добавления) или очередь может быть пустой (блокирование вызова для получения элемента).

$ python3 asyncio_queue.py

consumer 0: starting
consumer 0: waiting for item
consumer 1: starting
consumer 1: waiting for item
producer: starting
producer: added task 0 to the queue
producer: added task 1 to the queue
consumer 0: has item 0
consumer 1: has item 1
producer: added task 2 to the queue
producer: added task 3 to the queue
consumer 0: waiting for item
consumer 0: has item 2
producer: added task 4 to the queue
consumer 1: waiting for item
consumer 1: has item 3
producer: added task 5 to the queue
producer: adding stop signals to the queue
consumer 0: waiting for item
consumer 0: has item 4
consumer 1: waiting for item
consumer 1: has item 5
producer: waiting for queue to empty
consumer 0: waiting for item
consumer 0: has item None
consumer 0: ending
consumer 1: waiting for item
consumer 1: has item None
consumer 1: ending
producer: ending