Рубрики
Без рубрики

Хранение информации о контексте для асинсио

Реализуйте хранилище контекстной информации для Asyncio в Python. Tagged с Dev, Python, OpenSource.

В SQREEN , мы строим агент на основе динамических инструментов. Он обнаруживает инциденты безопасности изнутри приложения (инъекции, перекрестные сценарии и т. Д.) И позволяет пользователям настраивать действия (блокировка атаки, регистрация трассировки стека и т. Д.), Не требуя изменения кода. Механизмы динамического инструментария в питоне описаны в Предыдущий пост в блоге Анкет Динамическая инструментария также используется в решениях управления производительностью приложений (APM), таких как DataDog , Instana и Новая реликвия Анкет

Инструментация кода позволяет нам выполнять обратные вызовы перед вызовом потенциально опасных функций. Например, к Защита от инъекций SQL , мы прозрачно завершаем метод Cursor.execute с помощью уровня безопасности:

def sqreen_execute(self, sql_stmt, *sql_params):
    # Before executing the SQL statement, check it was not built with
    # malicious, unescaped request parameters in it.
    if has_malicious_param(sql_stmt, request.params):
        # If there is, this is an SQL injection. Abort!
        raise SQLinjection(remote_addr=request.remote_addr)
    else:
        # If not, we can safely call the original method.
        return self.execute(sql_stmt, *sql_params)

Давайте предположим, что инструментальный код содержит уязвимый шаблон, такой как:

@app.route('/posts')
def posts(request):
    sql_stmt = 'SELECT * FROM posts WHERE id=%s' % request.params['id']

    # With dynamic instrumentation, sqreen_execute is transparently called
    # instead of cursor.execute.
    posts = cursor.execute(sql_stmt)

    return posts_template.render(posts)

Затем будет выполнен номинальный запрос (хотя параметр запроса не является злонамеренным), но злонамеренный запрос или не будет. Таким образом, мы можем защитить приложение, не нарушая его!

Контекстная информация хранение

Как мы видели выше, функция SQReen_Execute должна знать текущий запрос на проверку безопасности оператора SQL. Как это может получить?

Поскольку наша функция прозрачно заменяет Cursor.execute, она должна иметь одинаковую подпись, поэтому мы не можем передать запрос в качестве параметра SQREEN_EXECUTE. Некоторые веб -фреймворки предоставляют функции для получения текущих запросов, но не все из них, и мы стремимся к универсальному решению.

Что мы можем сделать, так это вставить промежуточное программное обеспечение (или придать механизм обработки запросов Framework) для хранения текущего запроса в глобальной переменной:

CURRENT_REQUEST = None

def set_request(request):
    global CURRENT_REQUEST
    current_request = request

def get_request():
    return CURRENT_REQUEST

Но есть улов: веб -фреймворки могут выполнять несколько запросов одновременно, по очевидным причинам производительности. Таким образом, приведенная выше шаблон не будет работать: мы можем получить первую запрос запроса_1 (и сохранить его в current_request), и перед тем, как он получить второй запрос на запрос, request_2 (переоценка request_1 в current_request). В то время, когда мы ищем SQL -инъекции в request_1, мы испортим это с request_2! Таким образом, нам нужен более сильный, параллельный защитный механизм для хранения текущего запроса.

Резьбое локальное хранилище

Чтобы решить эту проблему, мы должны знать, как реализуется параллелизм. Большинство веб -фреймворков Python используют потоки: это, в частности, случай Django В Колба а также Пирамида , которые, вероятно, самые популярные. Они реализуют общий протокол связи с веб -серверами, называемым WSGI (интерфейс шлюза веб -сервера) и первоначально описанный в PEP 333 Анкет

Серверы WSGI также используют потоки, наряду с процессами, чтобы породить несколько экземпляров приложения. Многоподобность здесь не является проблемой, поскольку каждый процесс будет обрабатывать свою собственную копию current_request. Таким образом, нам просто нужно найти решение, чтобы предоставить сервисному потоку хранить запрос, с которым он в настоящее время имеет дело, не влияя на другие потоки.

И Python предлагает решение для этого. Функциональная потока.local В стандартной библиотеке Верните объект пространства имен, значения которого являются специфичными для потока. Это позволяет нам реализовать хранилище запросов, безопасных для потока следующим образом:

import threading

RUNTIME_STORAGE = threading.local()
RUNTIME_STORAGE.request = None

def set_request(request):
    RUNTIME_STORAGE.request = request

def get_request():
    return RUNTIME_STORAGE.request

А как насчет асинсио?

В Python 3.4 была введена новая модель параллелизма: асинсио . Он обеспечивает инфраструктуру для однопоточного асинхронного программирования, включая:

  • Функции Coroutine, определяемые с Async def, чье выполнение можно приостановить, используя ключевое слово wait с другой коратиной, и возобновится после завершения другой коратики.
  • Цикл события, чтобы запланировать и выполнить Coroutines.

Вот пример асинхронного кода.

import asyncio

async def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    await asyncio.sleep(1.0)
    return x + y

async def print_sum(x, y):
    result = await compute(x, y)
    print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

Есть две функции Coroutine, print_sum и вычисление. Во время исполнения

  • Цикл события входит в Print_Sum и сразу же вручает в вычисление.
  • Вычислите распечатки вычисления и передает Asyncio.sleep.
  • Ничего не сделано в следующую секунду. Если бы другие задачи были запланированы в цикле событий, они могли бы быть выполнены в то же время, что -то невозможно при времени блокировки.
  • Вычисление возобновляется и завершено.
  • print_sum возобновлен и завершен.

Asyncio является отличной моделью для параллелизма, когда участвует IO: когда выполняемый код заблокирован в ожидании ответа (например, результаты БД), программа может переключаться на другие задачи и вернуться к нему позже. Он менее системный, чем потоки, и обычно быстрее, когда участвуют медленные операции ввода-вывода.

Это делает Asyncio хорошо подходящим для сетевых операций и, несмотря на то, что он относительно молод, Несколько веб -фреймворков были разработаны вокруг него. Среди них недавно мы принесли поддержку aiohttp в нашем агенте. Это была очень интересная и сложная задача, так как до сих пор мы вообще не поддерживали AIOHTTP, и важная проблема, с которой мы столкнулись с механизмом хранения запросов.

Вот что может произойти: мы получаем первую запрос запроса_1 и начинаем иметь дело с ним в коратике. В какой -то момент коратика приостановлена, а событие переходит к другому, который обрабатывает request_2. Важным моментом является то, что эти две коратики выполняются в та же нить , таким образом, потоковое. Когда первая Coroutine возобновится, Runtime_storage.Request был установлен на request_2: это именно то, что мы хотим предотвратить.

Первая попытка: давайте используем задачи!

Нам нужен механизм, похожий на потоки. , то есть позволяет нам хранить переменные контекста и отслеживать значения в контексте асинхронного выполнения.

К сожалению, в настоящее время в Python нет встроенного механизма. Были представлены различные предложения для предоставления общего решения в будущих версиях Python ( PEP 550 , PEP 567 ), но в то же время мы должны разработать решение самостоятельно.

Давайте немного углубимся в внутреннюю часть Асинсио. Корука, чье исполнение запланировано, обернута в асинсио. Задача объект Отвечает за выполнение объекта Coroutine в цикле событий.

Схема последовательности примера

Существует также функция асинсио. Task.current_task, который возвращает в настоящее время запущенное задание. MMH … мы могли бы использовать это для сопоставления текущего запроса с задачей, обрабатывающей его. Что -то подобное может сработать:

import asyncio

TASK_REQUESTS = {}

def set_request(request):
    task = asyncio.Task.current_task()
    TASK_REQUESTS[id(task)] = request

def get_request():
    task = asyncio.Task.current_task()
    return TASK_REQUESTS.get(id(task))

При этой реализации нам также понадобится механизм, чтобы обеспечить удаление запроса после выполнения задачи, чтобы избежать накопления старых запросов и вызвать утечку памяти. Способ не иметь отношения к этому – хранить запрос внутри объект задачи, как дополнительный атрибут:

def set_request(request):
     task = asyncio.Task.current_task()
     setattr(task, 'current_request', request)

 def get_request():
     task = asyncio.Task.current_task()
     return getattr(task, 'current_request', None)

Итак, это работает? Давайте протестируем!

import random

class Request:
    # Dummy request object, for the sake of testing.
    pass

async def handle_request(request):
    set_request(request)
    await asyncio.sleep(random.uniform(0, 2))
    await check_request(request)

async def check_request(request):
    # Check that the stored request corresponds to the current request. If not,
    # an AssertionError is raised and the test is interrupted with an error.
    assert get_request() is request, "wrong request"

NUM_REQUESTS = 1000

loop = asyncio.get_event_loop()
coros = [handle_request(Request()) for _ in range(NUM_REQUESTS)]
loop.run_until_complete(asyncio.gather(*coros))
loop.close()
print("Success!")

Этот тест имитирует тысячу параллельных запросов. Каждый обрабатывается в выделенной Coroutine handle_request. Эта функция хранит запрос, затем делает паузу для случайной продолжительности (это имитирует асинхронную операцию, такую как доступ DB, и гарантирует, что поток выполнения COROUTINE пройден). Когда возобновится, вложенная Coroutine check_request, которая гарантирует, что get_request возвращает правильный запрос. Если нет, тест прерывается ошибкой.

А вот несколько хороших новостей: тест работает плавно с хранением запросов на основе задач. Он также не работает с хранением нитока, которое ожидалось, но показывает, что тест актуален. Итак, мы решили нашу проблему?

Контекст наследования между задачами

Давайте попробуем что -нибудь немного более скрученное:

async def handle_request(request):
    set_request(request)
    await asyncio.gather(
        asyncio.sleep(random.uniform(0, 2)),
        check_request(request),
    )

Вместо того, чтобы выполнять асинсио. Это не должно иметь большого значения: код немного более одновременно, но он не влияет на обработку запросов. В частности, Check_Request все еще называется после set_request для каждого запроса.

Тем не менее, этот новый тест не удается! Что -то пошло не так, когда мы представили Asyncio.gather, но что?

Ну, помните, что запланированные коратики обернуты в задачи? Это именно то, что происходит здесь: Asyncio.gather создает задачи вокруг аргументов asyncio.sleep () и check_request (), и эти задачи выполняются с помощью цикла событий.

async def handle_request(request):
    set_request(request)                              # Running in task 1.
    await asyncio.gather(
        asyncio.sleep(random.uniform(0, 2)),          # Create child task 2.
        check_request(request),                       # Create child task 3.
    )

async def check_request(request):
    assert get_request() is request, "wrong request"  # Running in task 3.

Следствием этого является то, что set_request и get_request вызываются в разных задачах, что делает тест неудачным. Это не связано с запросом, как мы можем проверить, установив num_requests на 1: тест продолжает сбой.

Фактически, при вызове get_request из детской задачи нам нужен механизм для извлечения запроса из родительской задачи, если он не определен в детской задаче. Но Asyncio не позволяет нам получить доступ к родительской задаче, так что это не сработает.

С другой стороны, что -то асинсио давайте сделаем это, это заменить функцию, вызванную для создания новых задач, a.k.a. Заводка задачи Анкет Эта функция вызывается в контексте родительской задачи и возвращает новую дочернюю задачу. Что ж, давайте воспользуемся его для украшения дочерней задачи с помощью текущего запроса!

Вот как будет выглядеть заводская завод «Аквация с запросом»:

def request_task_factory(loop, coro):
    # This is the default way to create a child task.
    child_task = asyncio.tasks.Task(coro, loop=loop)

    # Retrieve the request from the parent task...
    parent_task = asyncio.Task.current_task(loop=loop)
    current_request = getattr(parent_task, 'current_request', None)

    # ...and store it in the child task too.
    setattr(child_task, 'current_request', current_request)

    return child_task

Чтобы установить заводскую завод, нам также необходимо вызовать LOOP.Set_task_factory (request_task_factory), прежде чем запустить цикл. Итак, вот окончательная версия нашего кода:

import asyncio
import random

class Request:
    pass

def set_request(request):
    task = asyncio.Task.current_task()
    setattr(task, 'current_request', request)

def get_request():
    task = asyncio.Task.current_task()
    return getattr(task, 'current_request', None)

def request_task_factory(loop, coro):
    child_task = asyncio.tasks.Task(coro, loop=loop)
    parent_task = asyncio.Task.current_task(loop=loop)
    current_request = getattr(parent_task, 'current_request', None)
    setattr(child_task, 'current_request', current_request)
    return child_task

async def handle_request(request):
    set_request(request)
    await asyncio.gather(
        asyncio.sleep(random.uniform(0, 2)),
        check_request(request),
    )

async def check_request(request):
    assert get_request() is request

NUM_REQUESTS = 1000

loop = asyncio.get_event_loop()
loop.set_task_factory(request_task_factory)
coros = [handle_request(Request()) for _ in range(NUM_REQUESTS)]
loop.run_until_complete(asyncio.gather(*coros))
loop.close()

И это работает безупречно!

И что теперь?

Теперь у нас есть основы для решения проблемы хранения запросов в нашем агенте. Поскольку мы хотим, чтобы агент работал максимально прозрачным, а не требовал изменения кода от пользователя, все еще есть две незначительные проблемы, которые необходимо решить:

  • Мы хотим автоматически настроить заводскую завод. Это будет сделано с помощью динамического инструментария.
  • Но если пользователь настроен пользовательская задача, мы не хотим перезаписать ее. Вместо этого мы завершим его самостоятельно.

Давайте начнем со второй проблемы. Мы можем определить универсальную функцию wroud_request_task_factory, которая принимает заводскую завод в качестве аргумента и возвращает его вариант, который поддерживает распространение запроса. Код обернутой функции действительно близок к запросу request_task_factory выше:

from functools import wraps

def wrap_request_task_factory(task_factory):

    @wraps(task_factory)
    def wrapped(loop, coro):
        child_task = task_factory(loop, coro)
        parent_task = asyncio.Task.current_task(loop=loop)
        current_request = getattr(parent_task, 'current_request', None)
        setattr(child_task, 'current_request', current_request)
        return child_task

    return wrapped

Затем определение request_task_factory может быть упрощено:

@wrap_request_task_factory
def request_task_factory(loop, coro):
    asyncio.Task.current_task(loop=loop)

Время вернуться к динамическим инструментальным инструментам. Подключив систему импорта, мы можем прозрачно заменить импортный класс на пользовательский. Итак, давайте определим функцию patch_loop_cls, которая создает пользовательский класс цикла с желаемым поведением:

def wrap_loop_cls(loop_cls):

    class SqreenLoop(loop_cls):

        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            # We want to use request_task_factory to be the default task
            # factory.
            super().set_task_factory(request_task_factory)

        def set_task_factory(self, task_factory):
            # If the user sets up a custom task factory, let's wrap it with
            # request propagation.
            wrapped_task_factory = wrap_request_task_factory(task_factory)
            super().set_task_factory(wrapped_task_factory)

    return SqreenLoop

Этот класс цикла прозрачно заменяет базовый. По умолчанию он использует правильную заводскую задачу и позволяет пользователю изменить его при сохранении уровня управления запросами.

Заключительные слова

Мы опубликовали большую часть этой работы (без инструментальной части) в библиотеке Python под названием AioContext Анкет Это поставляется с общими контекстными объектами, которые ведут себя как словаря. Это также позволяет восстановить исходную заводскую завод, если контексты больше не нужны, и хранит контексты в качестве дополнительного атрибута заводской завод, чтобы не возиться с асинсио. Сам класс задания. Документация доступна Здесь Анкет

import asyncio
import aiocontext
import random

class Request:
    pass

CONTEXT = aiocontext.Context()

async def handle_request(request):
    CONTEXT['current_request'] = request
    await asyncio.gather(
        asyncio.sleep(random.uniform(0, 2)),
        check_request(request),
    )

async def check_request(request):
    assert CONTEXT['current_request'] is request

NUM_REQUESTS = 1000

loop = asyncio.get_event_loop()
aiocontext.wrap_task_factory(loop)
CONTEXT.attach(loop)
coros = [handle_request(Request()) for _ in range(NUM_REQUESTS)]
loop.run_until_complete(asyncio.gather(*coros))
loop.close()

Эта работа была сильно вдохновлена внедрением в блоге в руководстве Миранды От колбы до aiohttp и библиотека aiotask-context Анкет Мы хотим поблагодарить его за большой вклад.

Пост Хранение информации о контексте для Asyncio появился первым на SQREEN BLOG | Современная безопасность приложений и был написан Vivien Maisonneuve.

Оригинал: “https://dev.to/sqreenio/context-information-storage-for-asyncio-5303”