Рубрики
Без рубрики

Подписчик SNS для вашего API

В этом посте мы увидим, как добавить подписчика API для уведомлений SNS. Для этого случая … с меткой AWS, колба, Python, SNS.

В этом посте мы увидим, как добавить подписчика API для уведомлений SNS. Для этого случая сценарий будет следующим:

  • API реализован в колбе
  • Бюджет AWS будет производителем сообщений
  • У нас есть бюджет, настроенный с порогом 90%
  • Подписчик будет как тип HTTP/HTTPS

Этот пост будет объяснен на этих шагах:

  1. Понимание того, как подписчик API для уведомлений SNS должен обрабатывать сообщение.
  2. Аутентификация сообщения от AWS.
  3. Подтвердите подписку на тему SNS.
  4. Как подтвердить подписку на предупреждение о бюджете.
  5. Выполнить действие.

Пойдем!

Обработка уведомления SNS

Абонент API для уведомлений SNS должен иметь возможность выполнить эти три действия:

  1. Проверьте, что сообщение идет от AWS.
  2. Подтвердите подписчик.
  3. Запустить желаемое действие.

Проверка подписи AWS

AWS определил необходимые шаги для успешной проверки подписи. Вы можете прочитать об этом здесь. Я написал этот шаг в качестве декоратора, так как, вероятно, будет использоваться в подписчиках фьючерса:

SNS_MESSAGE_HEADER_TYPE = 'x-amz-sns-message-type'
SUBCRIPTION_TYPE = 'SubscriptionConfirmation'
SUBCRIPTION_FORMAT = ["Message", "MessageId", "SubscribeURL", "Timestamp", "Token", "TopicArn", "Type"]
NOTIFICATION_TYPE = 'Notification'
NOTIFICATION_FORMAT = ["Message", "MessageId", "Subject", "Timestamp", "TopicArn", "Type"]

def needs_sns_notification_auth(f):

    @wraps(f)
    def verify_aws_origin(*args, **kwargs):
        if request.headers.get(SNS_MESSAGE_HEADER_TYPE) == SUBCRIPTION_TYPE:
            msg_format = SUBCRIPTION_FORMAT
        elif request.headers.get(SNS_MESSAGE_HEADER_TYPE) == NOTIFICATION_TYPE:
            msg_format = NOTIFICATION_FORMAT
        else:
            logger.error("Message not valid")
            return jsonify({'message': "Not valid"}), 401
        try:
            message_json = request.get_json(force=True)  # Notification will come without Content-type header
            decoded_signature = b64decode(message_json.get("Signature"))
            canonical_message = _message_builder(message_json, msg_format)
            amazon_url = message_json.get("SigningCertURL")
            if not urlparse(amazon_url).hostname.endswith('.amazonaws.com'):
                return jsonify({'message': "Not authorized"}), 403
            cert_content = urlopen(amazon_url).read()
            cert = x509.load_pem_x509_certificate(cert_content, default_backend())
            pubkey = cert.public_key()
            pubkey.verify(
                decoded_signature,
                canonical_message.encode(),
                padding.PKCS1v15(),
                hashes.SHA1()
            )
        except InvalidSignature as e:
            logger.error(e)
            return jsonify({'message': "Not authorized"}), 403
        except Exception as e:
            logger.error(e)
            error_msg = "Couldn't authenticate sns notification."
            return jsonify({'message': error_msg}), 500

        return f(*args, **kwargs)
    return verify_aws_origin

Много кода там. Давайте немного объясним:

  1. Итак, первое, что нужно сделать, это знать, какой это тип. Тип сообщения будет указан заголовком X-AMZ-SNS-Message. Таким образом, единственными возможными вариантами являются подписка и уведомление. Это важно, потому что тело сообщения будет другим. Мы также определяем правильный формат сообщения.
  2. Способ проверить сообщение AWS состоит в том, чтобы проверить этот домен имени хоста запроса Amazonaws.com. Также в сообщении мы получим сообщение, подписанное Amazon, и URL -адрес сертификата X509 (PEM), который использовался для его подписи. Таким образом, мы строим сообщение с определенным форматом, который определяет AWS и подписывает его с ключом, который мы получаем из этого сертификата.
  3. В случае, если подпись не совпадает, мы поймаем сигнал Invalidsignaturation и обрабатываем его по мере необходимости. Мы также захватываем неожиданные ошибки.
  4. Если все работает нормально, мы выполняем украшенную функцию.

Подтверждение подписки

Для этого шага мы можем предположить, что подпись действительна. Я также написал это как декоратор по тем же причинам:

def sns_subscriber(f):

    @wraps(f)
    def confirm_subscription(*args, **kwargs):
        if request.headers.get(SNS_MESSAGE_HEADER_TYPE) == SUBCRIPTION_TYPE:
            client = boto3.client('sns')
            json_data = request.get_json(force=True)  # Notification will come without Content-type header
            topic_arn = json_data.get('TopicArn')
            token = json_data.get('Token')
            try:
                response = client.confirm_subscription(
                    TopicArn=topic_arn,
                    Token=token
                )
            except ClientError as e:
                code = e.response.get('Error').get('Code')
                failed_msg = f"Subscription failed: {code}"
                logger.error(failed_msg)
                return jsonify({'message': failed_msg}), 409
            except Exception as e:
                logger.error(e)
                error_msg = f"Couldn't not process subscription for {request.endpoint} to {topic_arn}"
                logger.error(error_msg)
                return jsonify({'message': error_msg}), 500

            return jsonify({'message': success_msg})

        return f(*args, **kwargs)

    return confirm_subscription
  1. Мы проверяем тип сообщения.
  2. Есть два варианта подтверждения подписки. Одним из них является запрос получить запрос на URL -адрес подтверждения, который входит в сообщение. Другой – это выполнение запроса AWS API с темой ARN и токеном, который мы получаем из сообщения. Я рассмотрел более надежный способ, выполнив этот шаг через вызов API. Поэтому я извлекал информацию и использовал BOTO3, чтобы позвонить.
  3. В случае сбоя вызова, я поймаю ошибку за исключением ClientError. Это способ поймать ошибки Boto3. Вы можете увидеть больше информации об этом в документах библиотеки.
  4. Я рассматривал разные ответы в каждом случае. В конце этого поста я оставлю ссылку на полный код этого, который содержит подробные документы.

Бюджетное подтверждение

Проверка подписи и подтверждение SNS уже должно быть сделано. Поскольку сообщение будет отправлено изначально из бюджета, когда мы добавим подписчика SNS темы в его оповещение, оно автоматически запустит сообщение, в котором наш абонент должен ответить. О, и это будет также декоратор 🙂:

def budget_subscriber(f):

    @wraps(f)
    def confirm_subscription(*args, **kwargs):
        if request.headers.get(SNS_MESSAGE_HEADER_TYPE) == NOTIFICATION_TYPE:
            json_data = request.get_json(force=True)
            message_subject = json_data.get('Subject')
            message = json_data.get('Message')
            topic_arn = json_data.get('TopicArn')
            if message_subject == 'SNS Topic Verified!':
                success_msg = f"Successfully subscribed to budget"
                return jsonify({'message': success_msg})

        return f(*args, **kwargs)

    return confirm_subscription

Это более проще, чем предыдущий. Нам просто нужно прочитать сообщение. Если все в порядке, мы отвечаем 200 кодом.

Условия подписчиков SNS Met, триггер

Для моего подписчика я отправлю электронное письмо, чтобы сообщить пользователю, что его бюджетный порог достигнут:

@app.route('/budget-subscriber', methods=['POST'])
@needs_sns_notification_auth
@sns_subscriber
@budget_subscriber
def notify_threshold():
    json_data = request.get_json(force=True)
    user_summary = process_notification(json_data)
    if not user_summary:
        error_msg = "Not able to process the sns notification."
        logger.error(error_msg)
        return jsonify({'message': error_msg}), 400

    user_email = user_summary.get('user_email')
    user_name = user_summary.get('user_name')
    threshold = user_summary.get('threshold')
    amount = user_summary.get('budget_amount')

    subject = f"Cost control alert - {threshold}% warning threshold exceeded"
    body = (f"Hi {user_name},

You have exceeded {threshold}% of the ${amount} authorized for the current month. " ) message_html = f'{body}' try: sent = send_email([user_email], subject, message_html) except ClientError as e: error_code = e.response.get('Error').get('Code') error_message = f"notification email to user failed: {error_code}" logger.error(error_message) return jsonify({'message': error_message}), 409 if not sent: error_message = f"Unexpected error trying to send notification to user." logger.error(error_message) return jsonify({'message': error_message}), 500 return jsonify({'message': "success"})

Этот код сделает следующее:

  1. Я укрась этой конечной точкой определенными декораторами. Таким образом, до того, как код попадет здесь, он выполнит необходимые предыдущие шаги.
  2. Я собираю некоторую информацию, необходимую для создания сообщения, которое я отправлю пользователю с помощью Process_notification. Ниже я оставлю ссылку на полный образец кода.
  3. Создайте сообщение.
  4. Отправьте сообщение с пользовательской функцией send_email.
  5. Обработка ошибок.

ВЫВОД

Как видите, есть немного работы, прежде чем мы просто запустим наше действие подписчика. AWS предоставляет нам уровень безопасности, в котором мы наверняка должны слушать. Для выполнения нашей задачи может потребоваться больше времени и усилий, но в долгосрочной перспективе это будет стоить усилий. Проблемы интернет -безопасности могут быть реальной проблемой в наши дни, если у нас нет серьезных соображений. Все должно быть принято.

Надеюсь, вам понравился этот пост. Если вам это нравится, пожалуйста, поделитесь!

Получить код Здесь !

Оригинал: “https://dev.to/alegpereira/sns-subscriber-for-your-api-48ic”