Рубрики
Без рубрики

Выберите COUNT (*) из Group Dynamodb с помощью потоков

Я недавно играл с потоками Dynamodb для бокового проекта. Я узнал несколько … Помечено с AWS, Lambda, Dynamodb, Python.

Я недавно играл с потоками Dynamodb для бокового проекта. Я узнал несколько вещей, пока я получал его настройки И я собрал эту статью, чтобы поделиться тем, что я узнал.

Этот учебник покажет вам, как использовать потоки DynamOdb для поддержания подсчета различных группировков элементов в той же таблице, что и сами элементы. Для каждого уникального значения PK1 в таблице я буду поддерживать подсчет продаж в день, в месяц, в год и за все время. Ключ сортировки будет значение даты. Каждая вставка новой продажи приведет к четырем обновлениям таблицы. Если элемент графика не существует на дату, оно будет автоматически создано. Смотрите пример подсчета элементов ниже.

Я мог бы использовать традиционную реляционную базу данных и рассчитать счет на запросе каждый раз, когда мне нужно. Мой запрос будет выглядеть что-то подобное:

SELECT COUNT(*)
FROM tablea a
WHERE a.key = 'PROD#0001'
AND a.orderdate BETWEEN '2021-03-01 00:00:00.000' AND '2021-03-31 23:59:59.999';

Это всегда будет возвращать количество соответствующих пунктов в таблице на 2021 марта. В большинстве случаев при условии, что вы используете первичный ключ и/или индексируемое поле, это будет достаточно быстро вернуться. Однако это не масштабируется и производительность не будет последовательным. Поскольку данные растут в таблице, запрос займет больше времени. Также, в зависимости от нагрузки на базу данных, производительность этого запроса может быть затронута.

Именно по этой причине я хочу использовать Dynamodb. Я не знаю ни одной другой базы данных, которая дает вам этот уровень согласованности и быстрой производительности с помощью модели оплаты за использование.

Однако dynamodb не является традиционным СУБД, это база данных NoSQL, предназначенная для масштабирования. Это основная архитектура предназначена для того, чтобы таблица была оценена практически безграничным размером и для последовательного периода времени работы в миллисекундном запросе. Для этого dynamodb накладывает определенные ограничения на запросы, которые вы можете выпускать против таблицы. Заявление SQL, в котором вы пересчитаете количество на лету каждый раз, невозможно в Dynamodb.

Теоретически вы можете выдать широкий запрос, который сканирует самый широкий спектр предметов. Однако это неэффективно и много накладных на каждом вызове. Алекс Дебри делает гораздо лучше объяснять это, чем когда-либо мог в своей статье о выражениях фильтра, https://www.alexdebrie.com/posts/dynamodb-filter-expressions/ . Отказ Оставляя выпуск производительности в одну сторону, очевидную причину не использовать это то, что одному операции запроса DynamOdb может только прочитать до максимума 1 МБ. Поэтому, если объединенный размер всех предметов, которые вам нужно подсчитать, больше этого требуется, ваша транзакция запросов также будет ограничена этим лимитом.

Вы можете выдать описательный запрос на ваш стол. Это возвращает ItemCount для таблицы и другой для каждого GSI и LSI на столе. Он возвращает счет количества предметов в каждом индексе, но не сможет получить счет на любом более тонком зерне, чем это.

Поэтому, если мы не можем посчитать соответствующие предметы, читая каждую один из них на лету, какие варианты? Dynamodb имеет функцию называемую потоками, которые можно использовать для поддержания записанных элементов, удаленных и обновленных на таблице. Остальная часть этой статьи показывает, как я реализовал это для создания решения, в конечном итоге, в конечном итоге более эффективно, чем традиционный подход (*).

Я поделился ссылками GitHub со всем кодом и шаблон AWS SAM в конце статьи, которая соединяет все это вместе. На данный момент я покажу вам, как настроить это, пройдя через консоль. Я думаю, что это лучший способ проиллюстрировать решение.

Динамодб

1) Войдите в консоль AWS и перейдите на консоль dynamodb. 2) Нажмите «Создать таблицу» и введите данные, как показано ниже.

Оставьте все остальные настройки по умолчанию и нажмите «Создать».

3) После создания таблицы вы будете доставлены на вкладку «Обзор» для вашей новой таблицы. Нажмите Управление кнопкой потока DynamOdb на этой вкладке.

Выберите новую и старую опцию изображений. Нам нужно будет отфильтровать записи агрегации из общего количества, поэтому потребуют данные элемента для этого.

Ваш поток теперь создан для таблицы. Каждый добавленный элемент, обновленный или удаленный из таблицы, приведет к новой записи в потоке, содержащий значения нового элемента и старого элемента в случае обновления существующего элемента. Скопируйте таблицу dynamodb arn. Вам понадобится это при настройке политики IAM.

Я

Прежде чем отправиться в Lambda, вы должны создать политику IAM и ассоциируйте ее к роли.

4) Перейдите в IAM Console и выберите Роли в меню управления доступом в левой части экрана. Это откроет новый экран, показывающий все пользовательские роли, созданные на вашей учетной записи. Нажмите Создать роль.

5) Выберите лямбда в разделе «Выбрать корпус на использование» и нажмите «Далее: разрешения».

6) Как мы хотим ограничить разрешение на отдельных ресурсов, поэтому мы хотим создать пользовательскую политику. Нажмите Создать политику. Это откроет новый экран.

7) На экране «Создать политики» щелкните вкладку JSON и скопируйте вставку JSON на следующих 3 разделах ниже квадратных скобок после оператора. Обновите ARN в полях ресурсов в каждом.

7.1) Разрешить роль бревенчатый CloudWatch

    {
    "Effect": "Allow",
    "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents"
    ],
    "Resource": [
        "arn:aws:logs:[region name]:[aws account number]:*"
        ]
    },

7.2) Разрешить роль читать из потока

    {
        "Effect": "Allow",
        "Action": [
            "dynamodb:GetShardIterator",
            "dynamodb:DescribeStream",
            "dynamodb:GetRecords",
            "dynamodb:ListStreams"
        ],
        "Resource": [
            "[Put your DynamoDB table arn here]/stream/*"
        ]
    },

7.3) Разрешить элементы обновления роли на столе

    {
        "Effect": "Allow",
        "Action": "dynamodb:UpdateItem",
        "Resource": "[Put your DynamoDB table arn here]"
    }

8) Нажмите Далее: Теги, а затем следующие: Обзор. 9) Вызовите Policy StreamsDemo и нажмите Создать политику. 10) С созданной политикой вы можете вернуться к экрану создания роли и прикрепить политику StreamsDemo.

11) Нажмите Далее: Теги, а затем следующие: Обзор. 12) На экране обзора вызовите роль StreamsDemo и нажмите «Создать роль».

Лямбда

13) Перейдите на консоль лямбда и нажмите «Создать функцию». Выберите автор с нуля, используйте StreamsDemo в качестве имени функции и выберите Python 3.8 в качестве времени выполнения. Разверните раздел роль выполнения по умолчанию Изменить экран. Выберите использовать существующую роль и выберите роль streamsDemo. Оставьте все остальные настройки, как они. Нажмите Создать функцию.

14) На вкладке «Обзор функций» нажмите + «Добавить триггер». Выберите DynamOdb. Я думаю, что это должно сказать, что потоки Dynamodb, поскольку триггер – это то, что добавляется к потоку, а не таблицу, но это может быть только я педантичным.

Заполните детали, как показано ниже. Выберите ARN таблицы, создаваемую ранее из раскрывающегося списка таблицы Dynamodb. Установите партию и запуск горизонта.

15) Теперь мы можем начать кодирование. На экране обзора функции лямбда Перейдите в код и нажмите на Lambda_function .py

15.1) Импортируйте следующие библиотеки.

import json
import boto3
import traceback
from botocore.exceptions import ClientError

15.2) Каждый лямбда должен иметь функцию lambda_handler. Это точка входа в приложение. Тем не менее, хорошая практика, чтобы сохранить это как можно просто и поместить логику в другие функции.

def lambda_handler(event, context):
    try:
        return _lambda_handler(event, context)
    except Exception:
        print (traceback.format_exc())

15.3) Используйте эту отдельную функцию для цикла через записи и логику Orchestrate.

def _lambda_handler(event, context):

    records = event['Records']

    record1 = records[0]
    tableName = parseStreamArn(record1['eventSourceARN'])

    for record in records:

        event_name = record['eventName'].upper()  # INSERT, MODIFY, REMOVE
        pkValue = record['dynamodb']['Keys']['pk1']['S']
        skValue = record['dynamodb']['Keys']['sk1']['S']
        #print(keyValue)

        if (event_name == 'INSERT') and "sales_cnt" not in record['dynamodb']["NewImage"]:
            print(event_name)
            updateCounter(tableName,pkValue,skValue,1)

        #if (event_name == 'REMOVE') and "sales_cnt" not in record['dynamodb']["NewImage"]:
        #    updateCounter(tableName,pkValue,skValue,-1)

    return 'Successfully processed {} records.'.format(len(event['Records']))

15.4) Я пытаюсь сделать этот код лямбда как можно более многоразовым. Поэтому я не хочу жесткокодировать имя таблицы, поэтому я включил эту функцию для анализа имени таблицы из записи потока.

def parseStreamArn(streamARN):
    tableName = streamARN.split(':')[5].split('/')[1]
    return(tableName)

15.5) Чтобы связать обработчик и функцию обновления, я создал эту функцию для создания словаря значений, которые будут вставлены или обновлены в виде клавиш сортировки в элементах агрегации в сочетании с ключом перегородки элемента запуска.

def updateCounter(tableName,pkValue,skValue,counter):
    #always increment 0000 entry
    counterKey = [skValue[0:10], skValue[0:8]+ "00",skValue[0:5]+ "00-00","0000-00-00"]
    #persist changes to table
    updateDDBTable(tableName,pkValue,counterKey,counter)

15.6) Заявление об обновлении является секретный соус здесь, особенно параметр UpdateExpression. Условие IF_NOT_EXISS гарантирует, что приложение может вставить и обновлять записи с одним вызовом без четки явного существования. Если запись с этим первичным ключом не существует, он будет создан с помощью поля Sales_cnt, установленным значение init (0 в этом случае), а затем значение NUM (1 в этом случае) будет добавлено к нему все в один звонок. Этот код необходим, поскольку он автоматически добавит новые записи агрегации каждый день, который продается продукт. Каждая другая продажа того же продукта в тот же день/месяц/год будет просто обновлять ту же запись, созданную первоначальной продажей.

def updateDDBTable(tableName,pkValue,counterKey,counter):
    dynamodb = boto3.resource('dynamodb')

    #Get table name from stream. Updates will be written back to same table
    dynamodb_table = dynamodb.Table(tableName)

    #loop through collection
    for i in counterKey:
        dynamodb_table.update_item(
                Key={
                        'pk1': pkValue,
                        'sk1': i,
                    },
                UpdateExpression="set sales_cnt = ((if_not_exists(sales_cnt,:init)) + :num)", #if record doesn't exist, create it
                ExpressionAttributeValues={
                        ':init': 0, #new record will be created with 0 + num value
                        ':num': counter
                    },
                ReturnValues="NONE"
                )

15.7) Это код лямбда в полном объеме.

import json
import boto3
import traceback
from botocore.exceptions import ClientError

def updateDDBTable(tableName,pkValue,counterKey,counter):
    dynamodb = boto3.resource('dynamodb')

    #Get table name from stream. Updates will be written back to same table
    dynamodb_table = dynamodb.Table(tableName)

    #loop through collection
    for i in counterKey:
        dynamodb_table.update_item(
                Key={
                        'pk1': pkValue,
                        'sk1': i,
                    },
                UpdateExpression="set sales_cnt = ((if_not_exists(sales_cnt,:init)) + :num)", #if record doesn't exist, create it
                ExpressionAttributeValues={
                        ':init': 0, #new record will be created with 0 + num value
                        ':num': counter
                    },
                ReturnValues="NONE"
                )

def updateCounter(tableName,pkValue,skValue,counter):
    #always increment 0000 entry
    counterKey = [skValue[0:10], skValue[0:8]+ "00",skValue[0:5]+ "00-00","0000-00-00"]
    #persist changes to table
    updateDDBTable(tableName,pkValue,counterKey,counter)

def parseStreamArn(streamARN):
    tableName = streamARN.split(':')[5].split('/')[1]
    return(tableName)

def _lambda_handler(event, context):

    records = event['Records']

    record1 = records[0]
    tableName = parseStreamArn(record1['eventSourceARN'])

    for record in records:

        event_name = record['eventName'].upper()  # INSERT, MODIFY, REMOVE
        pkValue = record['dynamodb']['Keys']['pk1']['S']
        skValue = record['dynamodb']['Keys']['sk1']['S']
        #print(keyValue)

        if (event_name == 'INSERT') and "sales_cnt" not in record['dynamodb']["NewImage"]:
            print(event_name)
            updateCounter(tableName,pkValue,skValue,1)

        #if (event_name == 'REMOVE') and "sales_cnt" not in record['dynamodb']["NewImage"]:
        #    updateCounter(tableName,pkValue,skValue,-1)

    return 'Successfully processed {} records.'.format(len(event['Records']))

def lambda_handler(event, context):
    try:
        return _lambda_handler(event, context)
    except Exception:
        print (traceback.format_exc())

В моем Git Reppo я включил скрипт (/tests/dataload.py), который проходит через два итератора для генерации 100 уникальных записей для вставки. Вы можете запустить это несколько раз, чтобы создать больше записей. Просто обновите параметр имени таблицы на свой собственный стол, если вы использовали другой.

Вы можете использовать запросы ниже, чтобы запросить таблицу с CLI.

aws dynamodb query --table-name streamsdemo --key-condition-expression "pk1 = :name" --expression-attribute-values  '{":name":{"S":"PROD#0001"}}'

aws dynamodb query --table-name streamsdemo --key-condition-expression "pk1 = :name and sk1 = :date" --expression-attribute-values  '{":name":{"S":"PROD#0001"},":date":{"S":"2021-03-00"}}'

aws dynamodb describe-table --table-name streamsdemo

Я также включил сценарии Python в репо, чтобы запустить одни и те же вопросы. Смотрите/tests/query1.py,/tests/query1.py,/tests/query2.py.

Если у вас еще нет AWS Cloudshell, вы должны попробовать это. Это отличный способ выполнить эти скрипты без необходимости настроить любые ключи доступа/секретные ключи от вашей машины.

Размер партии vs timeout vs память

При использовании шаблона SAM для развертывания приложения я изначально имел размер партии, установленный на 10. Это означает, что лямбда попытается привлечь до 10 записей от потока, когда он вызывается. Если у вас есть большая сумма записей, это уменьшит количество раз, когда лямбда вызывает с одним из разных записей один раз на 10 записей. Когда я обрабатываю каждую запись в сингулярно, это означает, что моя функция Lambda будет работать для O (n) в одном вызове. Обработка 10 записей займет 10 раз, если обработка 1 запись. Я не установил явную тайм-аут, чтобы по умолчанию по умолчанию до 3 секунд. Попытка обрабатывать 10 записей за 3 секунды привело к таймаутам и выделил ошибку в моей установке. Когда тайм-ауты происходят при размере партии, превышающие 1, все записи будут возвращены в очередь для обработки. Это приводит к записям, считается более одного раза и в конечном итоге неверное количество. Например, если бы время ожидания после 5-й записи был подсчитан, Lambda будет думать, что никаких записей не обрабатывалась и поставила все 10 обратно на очередь.

Чтобы исправить это, у меня было 3 рычагов, чтобы попробовать, уменьшить размер партии, увеличить тайм-аут или увеличить память. Увеличение памяти не имело никакого эффекта, так как это небольшая функция. Память потребляется в обоих случаях нормализована около 75 МБ, значительно ниже минимальной 128 МБ. Увеличение тайм-аута до 30 секунд, когда размер партии был установлен на 10, решил проблему тайм-аута в этом случае. Однако я в конечном итоге пошел с уменьшением размера пакета до 1. Тайм-ауты могут случиться по другим причинам и установить размер пакетного размера до 1, уменьшает радиус взрыва любого таймаута до 1 записи, минимизируя шансы подсчета записей несколько раз.

Расходы

Лямбда заряжена за количество вызовов и длительности *, выделенную памятью. Решение по установке размера партии до 1 записи результатов в 10 раз больше вызовов, чем размер партии из 10 записей. Это не приводит к увеличению затрат на 10x, однако, как выступальная продолжительность 10x ниже. Более высокое количество вызовов, по-видимому, рассчитывает больше до стоимости, чем платежную продолжительность, но не массивно. Разница в размере 1,80 долл. США будет стоить того, чтобы обеспечить более точные счетчики.

128 МБ. 3 секунды 1 запись 365 мс. 9,29 долл. США / 10 миллионов
128 МБ. 30 секунд 10 записей 3650 мс. $ 7,49 / 1 миллион долларов

Все цены на ценообразование происходят из калькулятора цен на лямбда на этой странице.

https://aws.amazon.com/lambda/pricing/

Этот шаблон не будет для каждого случая использования, и есть и другие способы выполнения требований. Для бокового проекта я работаю, таблица DynamoDB будет иметь высокую читающую к соотношению записи, что означает, что ему придется поддерживать гораздо больше читать, чем пишет. С низким количеством пишет, используемые потоки Dynamodb и лямбда, будут очень низкими. Наличие количества предметов, предварительно рассчитанных и кэшированных в отдельной записи, значительно уменьшит время запроса для приложения. Пожалуйста, прокомментируйте ниже, если у вас есть какие-либо вопросы. Я рад помочь.

https://github.com/thomasmilner/ddbstreamsdemo

Оригинал: “https://dev.to/aws-builders/select-count-from-dynamodb-group-by-pk1-sk1-with-streams-43dj”