Рубрики
Без рубрики

Обзор AsyncIO в Python 3.7

Автор оригинала: Guest Contributor.

Обзор AsyncIO в Python 3.7

Модуль Python 3 asyncio предоставляет фундаментальные инструменты для реализации асинхронного ввода-вывода в Python. Он был представлен в Python 3.4, и с каждым последующим незначительным выпуском модуль значительно эволюционировал.

Этот учебник содержит общий обзор асинхронной парадигмы и того, как она реализована в Python 3.7.

Блокирующий и неблокирующий ввод-вывод

Проблема, которую асинхронность стремится решить, – это блокировка ввода-вывода .

По умолчанию, когда ваша программа обращается к данным из источника ввода-вывода, она ждет завершения этой операции, прежде чем продолжить выполнение программы.

with open('myfile.txt', 'r') as file:
    data = file.read()
    # Until the data is read into memory, the program waits here
print(data)

Программа блокируется от продолжения своего потока выполнения во время доступа к физическому устройству и передачи данных.

Сетевые операции являются еще одним распространенным источником блокировки:

# pip install --user requests
import requests

req = requests.get('https://www.stackabuse.com/')

#
# Blocking occurs here, waiting for completion of an HTTPS request
#

print(req.text)

Во многих случаях задержка, вызванная блокировкой, незначительна. Однако блокировка ввода-вывода масштабируется очень плохо. Если вам нужно подождать 10 10 чтение файлов или сетевые транзакции, производительность будет страдать.

Многопроцессорность, многопоточность и асинхронность

Стратегии минимизации задержек блокирующего ввода-вывода делятся на три основные категории: многопроцессорность, многопоточность и асинхронность.

Многопроцессорная обработка

Многопроцессорная обработка-это форма параллельных вычислений: инструкции выполняются в перекрывающихся временных рамках на нескольких физических процессорах или ядрах. Каждый процесс, порожденный ядром, несет накладные расходы, включая независимо выделенный кусок памяти (куча).

Python реализует параллелизм с модулем multiprocessing .

Ниже приведен пример программы Python3, которая порождает четыре дочерних процесса, каждый из которых демонстрирует случайную независимую задержку. Выходные данные показывают идентификатор процесса каждого дочернего элемента, системное время до и после каждой задержки, а также текущее и пиковое выделение памяти на каждом шаге.

from multiprocessing import Process
import os, time, datetime, random, tracemalloc

tracemalloc.start()
children = 4    # number of child processes to spawn
maxdelay = 6    # maximum delay in seconds

def status():
    return ('Time: ' + 
        str(datetime.datetime.now().time()) +
        '\t Malloc, Peak: ' +
        str(tracemalloc.get_traced_memory()))

def child(num):
    delay = random.randrange(maxdelay)
    print(f"{status()}\t\tProcess {num}, PID: {os.getpid()}, Delay: {delay} seconds...")
    time.sleep(delay)
    print(f"{status()}\t\tProcess {num}: Done.")

if __name__ == '__main__':
    print(f"Parent PID: {os.getpid()}")
    for i in range(children):
        proc = Process(target=child, args=(i,))
        proc.start()

Выход:

Parent PID: 16048
Time: 09:52:47.014906    Malloc, Peak: (228400, 240036)     Process 0, PID: 16051, Delay: 1 seconds...
Time: 09:52:47.016517    Malloc, Peak: (231240, 240036)     Process 1, PID: 16052, Delay: 4 seconds...
Time: 09:52:47.018786    Malloc, Peak: (231616, 240036)     Process 2, PID: 16053, Delay: 3 seconds...
Time: 09:52:47.019398    Malloc, Peak: (232264, 240036)     Process 3, PID: 16054, Delay: 2 seconds...
Time: 09:52:48.017104    Malloc, Peak: (228434, 240036)     Process 0: Done.
Time: 09:52:49.021636    Malloc, Peak: (232298, 240036)     Process 3: Done.
Time: 09:52:50.022087    Malloc, Peak: (231650, 240036)     Process 2: Done.
Time: 09:52:51.020856    Malloc, Peak: (231274, 240036)     Process 1: Done.

Нарезание резьбы

Многопоточность-это альтернатива многопроцессорной обработке, имеющая свои преимущества и недостатки.

Потоки независимо планируются, и их выполнение может происходить в течение перекрывающегося периода времени. Однако, в отличие от многопроцессорной обработки, потоки существуют полностью в одном процессе ядра и совместно используют одну выделенную кучу.

Потоки Python являются параллельными — несколько последовательностей машинного кода выполняются в перекрывающихся временных интервалах. Но они не являются параллельными — выполнение не происходит одновременно на нескольких физических ядрах.

Основными недостатками потоковой передачи Python являются безопасность памяти и условия гонки . Все дочерние потоки родительского процесса работают в одном и том же общем пространстве памяти. Без дополнительной защиты один поток может перезаписать общее значение в памяти без ведома других потоков. Такое искажение данных было бы катастрофическим.

Для обеспечения потокобезопасности реализации Python используют глобальную блокировку интерпретатора (GIL). GIL-это механизм мьютекса, который предотвращает одновременное выполнение нескольких потоков на объектах Python. Фактически это означает, что в любой момент времени выполняется только один поток.

Вот потоковая версия примера многопроцессорной обработки из предыдущего раздела. Обратите внимание, что очень мало изменилось: многопроцессорная обработка.Процесс заменяется на нарезание резьбы.Нить . Как указано в выходных данных, все происходит в одном процессе, и объем памяти значительно меньше.

from threading import Thread
import os, time, datetime, random, tracemalloc

tracemalloc.start()
children = 4    # number of child threads to spawn
maxdelay = 6    # maximum delay in seconds

def status():
    return ('Time: ' + 
        str(datetime.datetime.now().time()) +
        '\t Malloc, Peak: ' +
        str(tracemalloc.get_traced_memory()))

def child(num):
    delay = random.randrange(maxdelay)
    print(f"{status()}\t\tProcess {num}, PID: {os.getpid()}, Delay: {delay} seconds...")
    time.sleep(delay)
    print(f"{status()}\t\tProcess {num}: Done.")

if __name__ == '__main__':
    print(f"Parent PID: {os.getpid()}")
    for i in range(children):
        thr = Thread(target=child, args=(i,))
        thr.start()

Выход:

Parent PID: 19770
Time: 10:44:40.942558    Malloc, Peak: (9150, 9264)     Process 0, PID: 19770, Delay: 3 seconds...
Time: 10:44:40.942937    Malloc, Peak: (13989, 14103)       Process 1, PID: 19770, Delay: 5 seconds...
Time: 10:44:40.943298    Malloc, Peak: (18734, 18848)       Process 2, PID: 19770, Delay: 3 seconds...
Time: 10:44:40.943746    Malloc, Peak: (23959, 24073)       Process 3, PID: 19770, Delay: 2 seconds...
Time: 10:44:42.945896    Malloc, Peak: (26599, 26713)       Process 3: Done.
Time: 10:44:43.945739    Malloc, Peak: (26741, 27223)       Process 0: Done.
Time: 10:44:43.945942    Malloc, Peak: (26851, 27333)       Process 2: Done.
Time: 10:44:45.948107    Malloc, Peak: (24639, 27475)       Process 1: Done.

Асинхронность

Асинхронность-это альтернатива потоковой обработке для написания параллельных приложений. Асинхронные события происходят по независимым расписаниям, “несинхронизированным” друг с другом, полностью в пределах одного потока .

В отличие от потоковой передачи, в асинхронных программах программист контролирует, когда и как происходит добровольное упреждение, что облегчает изоляцию и предотвращение условий гонки.

Введение в модуль Python 3.7 asyncio

В Python 3.7 асинхронные операции обеспечиваются модулем asyncio .

Высокоуровневый и низкоуровневый asyncio API

Компоненты Asyncio делятся на высокоуровневые API (для написания программ) и низкоуровневые API (для написания библиотек или фреймворков на основе asyncio ).

Каждая программа asyncio |/может быть написана с использованием только высокоуровневых API. Если вы не пишете фреймворк или библиотеку, вам никогда не нужно прикасаться к низкоуровневым вещам.

С учетом сказанного давайте рассмотрим основные высокоуровневые API и обсудим основные концепции.

Сопрограммы

В общем случае сопрограмма (сокращение от кооперативная подпрограмма ) – это функция, предназначенная для добровольной упреждающей многозадачности: она проактивно уступает другим подпрограммам и процессам, а не принудительно вытесняется ядром. Термин “сопрограмма” был придуман в 1958 году Мелвином Конвеем (известным как “Закон Конвея”) для описания кода, который активно облегчает потребности других частей системы.

В asyncio это добровольное упреждение называется ожидание .

Ожидаемый, Асинхронный и Ожидающий

Любой объект, который может быть ожидаем (добровольно вытеснен сопрограммой), называется ожидаемым .

Ключевое слово await приостанавливает выполнение текущей сопрограммы и вызывает указанную доступную.

В Python 3.7 доступны три объекта: coroutine , task и future .

Asyncio coroutine – это любая функция Python, определение которой имеет префикс с ключевым словом async .

async def my_coro():
    pass

Asyncio task – это объект, который обертывает сопрограмму, предоставляя методы для управления ее выполнением и запроса ее состояния. Задача может быть создана с помощью asyncio.create_task () или asyncio.gather() .

Asyncio future -это низкоуровневый объект, который действует как заполнитель для данных, которые еще не были вычислены или извлечены. Он может обеспечить пустую структуру, которая будет заполнена данными позже, и механизм обратного вызова, который запускается, когда данные готовы.

Задача наследует все методы , кроме двух, доступных для future , поэтому в Python 3.7 вам никогда не нужно создавать объект future напрямую.

Циклы событий

В asyncio цикл событий управляет планированием и связью доступных объектов. Для использования available требуется цикл событий. Каждая программа asyncio имеет по крайней мере один цикл событий. Можно иметь несколько циклов событий, но несколько циклов событий настоятельно не рекомендуется использовать в Python 3.7 .

Ссылка на текущий запущенный объект цикла получается вызовом asyncio.get_running_loop() .

Спящий

asyncio.sleep(delay) сопрограмма блокирует delay секунды. Это полезно для моделирования блокирующего ввода-вывода.

import asyncio

async def main():
    print("Sleep now.")
    await asyncio.sleep(1.5)
    print("OK, wake up!")

asyncio.run(main())
Инициирование основного цикла событий

Канонической точкой входа в программу asyncio является asyncio.run(main()) , где main() является сопрограммой верхнего уровня.

import asyncio

async def my_coro(arg):
    "A coroutine."  
    print(arg)

async def main():
    "The top-level coroutine."
    await my_coro(42)

asyncio.run(main())

Вызов asyncio.run() неявно создает и запускает цикл событий. Объект loop имеет много полезных методов, включая loop.time() , который возвращает float, представляющий текущее время, измеренное внутренними часами цикла.

Примечание : Функция asyncio.run() не может быть вызвана из существующего цикла событий. Поэтому вполне возможно, что вы видите ошибки, если вы запускаете программу в контролирующей среде, такой как Anaconda или Jupyter, которая запускает собственный цикл событий. Примеры программ в этом разделе и следующих разделах должны запускаться непосредственно из командной строки путем выполнения файла python.

Следующая программа печатает строки текста, блокируя на одну секунду после каждой строки до последней.

import asyncio

async def my_coro(delay):
    loop = asyncio.get_running_loop()
    end_time = loop.time() + delay
    while True:
        print("Blocking...")
        await asyncio.sleep(1)
        if loop.time() > end_time:
            print("Done.")
            break

async def main():
    await my_coro(3.0)

asyncio.run(main())

Выход:

Blocking...
Blocking...
Blocking...
Done.
Задачи

Задача-это ожидаемый объект, который обертывает сопрограмму. Чтобы создать и немедленно запланировать задачу, можно вызвать следующие команды:

asyncio.create_task(coro(args...))

Это вернет объект задачи. Создание задачи говорит циклу: “Продолжайте и запустите эту сопрограмму как можно скорее.”

Если вы ждете задачи, выполнение текущей сопрограммы блокируется до тех пор, пока эта задача не будет завершена.

import asyncio

async def my_coro(n):
    print(f"The answer is {n}.")

async def main():
    # By creating the task, it's scheduled to run 
    # concurrently, at the event loop's discretion.
    mytask = asyncio.create_task(my_coro(42))
    
    # If we later await the task, execution stops there
    # until the task is complete. If the task is already
    # complete before it is awaited, nothing is awaited. 
    await mytask

asyncio.run(main())

Выход:

The answer is 42.

Задачи имеют несколько полезных методов управления обернутой сопрограммой. Примечательно, что вы можете запросить отмену задачи, вызвав метод задачи .cancel () . Задача будет запланирована для отмены в следующем цикле цикла событий. Отмена не гарантируется: задача может завершиться до этого цикла, и в этом случае отмена не произойдет.

Сбор Доступен

Доступные могут быть собраны как группа, предоставив их в качестве аргумента списка встроенной сопрограмме asyncio.gather(awaitables) .

Функция asyncio.gather() возвращает ожидаемое, представляющее собранное доступное, и поэтому должна иметь префикс await .

Если какой-либо элемент awaitables является сопрограммой, он немедленно запланирован как задача.

Сбор-это удобный способ запланировать одновременное выполнение нескольких сопрограмм в качестве задач. Он также связывает собранные задачи некоторыми полезными способами:

  • Когда все собранные задачи завершены, их совокупные возвращаемые значения возвращаются в виде списка, упорядоченного в соответствии с порядком списка awaitables .
  • Любая собранная задача может быть отменена, не отменяя другие задачи.
  • Сам сбор можно отменить, отменив все задания.
Пример: Асинхронные веб-запросы с aiohttp

Следующий пример иллюстрирует, как эти высокоуровневые API asyncio могут быть реализованы. Ниже приводится модифицированная версия, обновленная для Python 3.7, из изящного примера asyncio Скотта Робинсона . Его программа использует модуль aiohttp , чтобы захватить верхние посты на Reddit и вывести их на консоль.

Убедитесь, что у вас установлен модуль aiohttp , прежде чем запускать приведенный ниже скрипт. Вы можете загрузить модуль с помощью следующей команды pip:

$ pip install --user aiohttp
import sys  
import asyncio  
import aiohttp  
import json
import datetime

async def get_json(client, url):  
    async with client.get(url) as response:
        assert response.status == 200
        return await response.read()

async def get_reddit_top(subreddit, client, numposts):  
    data = await get_json(client, 'https://www.reddit.com/r/' + 
        subreddit + '/top.json?sort=top&t=day&limit=' +
        str(numposts))

    print(f'\n/r/{subreddit}:')

    j = json.loads(data.decode('utf-8'))
    for i in j['data']['children']:
        score = i['data']['score']
        title = i['data']['title']
        link = i['data']['url']
        print('\t' + str(score) + ': ' + title + '\n\t\t(' + link + ')')

async def main():
    print(datetime.datetime.now().strftime("%A, %B %d, %I:%M %p"))
    print('---------------------------')
    loop = asyncio.get_running_loop()  
    async with aiohttp.ClientSession(loop=loop) as client:
        await asyncio.gather(
            get_reddit_top('python', client, 3),
            get_reddit_top('programming', client, 4),
            get_reddit_top('asyncio', client, 2),
            get_reddit_top('dailyprogrammer', client, 1)
            )

asyncio.run(main())

Если вы запустите программу несколько раз, вы увидите, что порядок вывода меняется. Это связано с тем, что запросы JSON отображаются по мере их получения, что зависит от времени отклика сервера и промежуточной задержки сети. В системе Linux вы можете наблюдать это в действии, запустив скрипт с префиксом (например) watch-n 5 , который будет обновлять выходные данные каждые 5 секунд:

Другие высокоуровневые API

Надеюсь, этот обзор даст вам прочную основу для того, как, когда и почему использовать asyncio. Другие высокоуровневые API asyncio, не рассматриваемые здесь, включают:

  • stream , набор высокоуровневых сетевых примитивов для управления асинхронными TCP-событиями.
  • lock , event , condition , асинхронные аналоги примитивов синхронизации, представленных в модуле threading .
  • subprocess , набор инструментов для запуска асинхронных подпроцессов, таких как команды оболочки.
  • queue , асинхронный аналог модуля queue .
  • exception , для обработки исключений в асинхронном коде.

Вывод

Имейте в виду, что даже если ваша программа не требует асинхронности по соображениям производительности, вы все равно можете использовать asyncio , если предпочитаете писать в асинхронной парадигме. Я надеюсь, что этот обзор даст вам четкое представление о том, как, когда и почему начать использовать use asyncio .