Автор оригинала: Doug Hellmann.
Хотя приложения asyncio
обычно выполняются как однопоточный процесс, они по-прежнему создаются как параллельные приложения. Каждая сопрограмма или задача могут выполняться в непредсказуемом порядке, основанном на задержках и прерываниях от ввода-вывода и других внешних событий. Для поддержки безопасного параллелизма asyncio
включает в себя реализации некоторых из тех же низкоуровневых примитивов, что и в модулях потоковой и многопроцессорной обработки.
Замки
Lock
можно использовать для защиты доступа к общему ресурсу. Ресурс может использовать только владелец замка. Множественные попытки получить блокировку будут заблокированы, так что одновременно будет только один держатель.
asyncio_lock.py
import asyncio import functools def unlock(lock): print('callback releasing lock') lock.release() async def coro1(lock): print('coro1 waiting for the lock') async with lock: print('coro1 acquired lock') print('coro1 released lock') async def coro2(lock): print('coro2 waiting for the lock') await lock.acquire() try: print('coro2 acquired lock') finally: print('coro2 released lock') lock.release() async def main(loop): # Create and acquire a shared lock. lock asyncio.Lock() print('acquiring the lock before starting coroutines') await lock.acquire() print('lock acquired: {}'.format(lock.locked())) # Schedule a callback to unlock the lock. loop.call_later(0.1, functools.partial(unlock, lock)) # Run the coroutines that want to use the lock. print('waiting for coroutines') await asyncio.wait([coro1(lock), coro2(lock)]), event_loop asyncio.get_event_loop() try: event_loop.run_until_complete(main(event_loop)) finally: event_loop.close()
Метод блокировки Acquire ()
может быть вызван напрямую, используя await
и вызывая метод release ()
, когда это выполняется, как в coro2 ()
в этом примере. Их также можно использовать в качестве асинхронных менеджеров контекста с ключевыми словами with await
, как в coro1 ()
.
$ python3 asyncio_lock.py acquiring the lock before starting coroutines lock acquired: True waiting for coroutines coro2 waiting for the lock coro1 waiting for the lock callback releasing lock coro2 acquired lock coro2 released lock coro1 acquired lock coro1 released lock
События
asyncio.Event
основан на threading.Event
и используется, чтобы позволить нескольким потребителям ждать, пока что-то произойдет, не ища конкретное значение, связанное с уведомлением.
asyncio_event.py
import asyncio import functools def set_event(event): print('setting event in callback') event.set() async def coro1(event): print('coro1 waiting for event') await event.wait() print('coro1 triggered') async def coro2(event): print('coro2 waiting for event') await event.wait() print('coro2 triggered') async def main(loop): # Create a shared event event asyncio.Event() print('event start state: {}'.format(event.is_set())) loop.call_later( 0.1, functools.partial(set_event, event) ) await asyncio.wait([coro1(event), coro2(event)]) print('event end state: {}'.format(event.is_set())) event_loop asyncio.get_event_loop() try: event_loop.run_until_complete(main(event_loop)) finally: event_loop.close()
Как и в случае с Lock
, оба coro1 ()
и coro2 ()
ждут, пока событие не будет установлено. Разница в том, что оба они могут запускаться, как только состояние события изменится, и им не нужно приобретать уникальную фиксацию объекта события.
$ python3 asyncio_event.py event start state: False coro2 waiting for event coro1 waiting for event setting event in callback coro2 triggered coro1 triggered event end state: True
Условия
Condition
работает аналогично Event
, за исключением того, что вместо уведомления всех ожидающих сопрограмм количество пробужденных официантов контролируется аргументом для notify ()
.
asyncio_condition.py
import asyncio async def consumer(condition, n): async with condition: print('consumer {} is waiting'.format(n)) await condition.wait() print('consumer {} triggered'.format(n)) print('ending consumer {}'.format(n)) async def manipulate_condition(condition): print('starting manipulate_condition') # pause to let consumers start await asyncio.sleep(0.1) for i in range(1, 3): async with condition: print('notifying {} consumers'.format(i)) condition.notify(ni) await asyncio.sleep(0.1) async with condition: print('notifying remaining consumers') condition.notify_all() print('ending manipulate_condition') async def main(loop): # Create a condition condition asyncio.Condition() # Set up tasks watching the condition consumers [ consumer(condition, i) for i in range(5) ] # Schedule a task to manipulate the condition variable loop.create_task(manipulate_condition(condition)) # Wait for the consumers to be done await asyncio.wait(consumers) event_loop asyncio.get_event_loop() try: result event_loop.run_until_complete(main(event_loop)) finally: event_loop.close()
В этом примере запускаются пять потребителей Condition
. Каждый из них использует метод wait ()
для ожидания уведомления о том, что они могут продолжить. manage_condition ()
уведомляет одного потребителя, затем двух потребителей, а затем всех остальных потребителей.
$ python3 asyncio_condition.py starting manipulate_condition consumer 3 is waiting consumer 0 is waiting consumer 4 is waiting consumer 1 is waiting consumer 2 is waiting notifying 1 consumers consumer 3 triggered ending consumer 3 notifying 2 consumers consumer 0 triggered ending consumer 0 consumer 4 triggered ending consumer 4 notifying remaining consumers ending manipulate_condition consumer 1 triggered ending consumer 1 consumer 2 triggered ending consumer 2
Очереди
asyncio.Queue
предоставляет структуру данных «первым пришел – первым вышел» для сопрограмм, как queue.Queue
для потоков или multiprocessing.Queue
делает для процессов.
asyncio_queue.py
import asyncio async def consumer(n, q): print('consumer {}: starting'.format(n)) while True: print('consumer {}: waiting for item'.format(n)) item await q.get() print('consumer {}: has item {}'.format(n, item)) if item is None: # None is the signal to stop. q.task_done() break else: await asyncio.sleep(0.01 * item) q.task_done() print('consumer {}: ending'.format(n)) async def producer(q, num_workers): print('producer: starting') # Add some numbers to the queue to simulate jobs for i in range(num_workers * 3): await q.put(i) print('producer: added task {} to the queue'.format(i)) # Add None entries in the queue # to signal the consumers to exit print('producer: adding stop signals to the queue') for i in range(num_workers): await q.put(None) print('producer: waiting for queue to empty') await q.join() print('producer: ending') async def main(loop, num_consumers): # Create the queue with a fixed size so the producer # will block until the consumers pull some items out. q asyncio.Queue(maxsizenum_consumers) # Scheduled the consumer tasks. consumers [ loop.create_task(consumer(i, q)) for i in range(num_consumers) ] # Schedule the producer task. prod loop.create_task(producer(q, num_consumers)) # Wait for all of the coroutines to finish. await asyncio.wait(consumers + [prod]) event_loop asyncio.get_event_loop() try: event_loop.run_until_complete(main(event_loop, 2)) finally: event_loop.close()
Добавление элементов с помощью put ()
или удаление элементов с помощью get ()
являются асинхронными операциями, поскольку размер очереди может быть фиксированным (блокирование добавления) или очередь может быть пустой (блокирование вызова для получения элемента).
$ python3 asyncio_queue.py consumer 0: starting consumer 0: waiting for item consumer 1: starting consumer 1: waiting for item producer: starting producer: added task 0 to the queue producer: added task 1 to the queue consumer 0: has item 0 consumer 1: has item 1 producer: added task 2 to the queue producer: added task 3 to the queue consumer 0: waiting for item consumer 0: has item 2 producer: added task 4 to the queue consumer 1: waiting for item consumer 1: has item 3 producer: added task 5 to the queue producer: adding stop signals to the queue consumer 0: waiting for item consumer 0: has item 4 consumer 1: waiting for item consumer 1: has item 5 producer: waiting for queue to empty consumer 0: waiting for item consumer 0: has item None consumer 0: ending consumer 1: waiting for item consumer 1: has item None consumer 1: ending producer: ending