Рубрики
Без рубрики

Издеваться в очередь к Sqs с помощью Moto

Пример использования библиотеки Python для издевания API AWS SQS. Tagged с Python, Moto, Boto3, AWS.

Ниже приведен простой пример кода для издевания API AWS SQS. Это может быть полезно для разработки, поэтому вам не нужно на самом деле управлять реальной очередью SQS/подключения/IAM разрешения, пока не будете готовы.

Простая функция для написания сообщения в ранее существовавшую очередь

QUEUE_URL = os.environ.get('QUEUE_URL', '')
def write_message(data):
    sqs = boto3.client('sqs', region_name = 'us-east-1')
    r = sqs.send_message(
        MessageBody = json.dumps(data),
        QueueUrl = QUEUE_URL
    )

Pytest светильники, чтобы издеваться над API AWS SQS. aws_credentials () Также гарантирует, что ваши функции Pytest на самом деле не будут писать ресурсам AWS.

REGION='us-east-'
@pytest.fixture(scope='function')
def aws_credentials():
    """Mocked AWS Credentials for moto."""
    os.environ['AWS_ACCESS_KEY_ID'] = 'testing'
    os.environ['AWS_SECRET_ACCESS_KEY'] = 'testing'
    os.environ['AWS_SECURITY_TOKEN'] = 'testing'
    os.environ['AWS_SESSION_TOKEN'] = 'testing'


@pytest.fixture(scope='function')
def sqs_client(aws_credentials):
    # setup
    with mock_sqs():
        yield boto3.client('sqs', region_name=REGION)
    # teardown

Пример функции тестирования. Создать очередь, используя макет клиента из conftest.py (№ afly sqs_client Параметр соответствует имени функции Conftest sqs_client ), вызвать функцию модуля Python app.write_message () Анкет Проверить возвращенное сообщение соответствует тому, что вы отправили

def test_write_message(sqs_client):
    queue = sqs_client.create_queue(QueueName='test-msg-sender')
    queue_url = queue['QueueUrl']
    # override function global URL variable
    app.QUEUE_URL = queue_url
    expected_msg = str({'msg':f'this is a test'})
    app.write_message(expected_msg)
    sqs_messages = sqs_client.receive_message(QueueUrl=queue_url)

    assert json.loads(sqs_messages['Messages'][0]['Body']) == expected_msg

Дополнительный

В случае, если вы захотите увидеть мою структуру файла:

├── README.md
├── app.py
├── requirements.txt
├── requirements_dev.txt
└── tests
    ├── __init__.py
    ├── conftest.py
    └── unit
        ├── __init__.py
        └── test_sqs.py

Спасибо

Престижность Arunkumar Muralidharan Кто меня начал

Оригинал: “https://dev.to/drewmullen/mock-a-sqs-queue-with-moto-4ppd”