Ниже приведен простой пример кода для издевания API AWS SQS. Это может быть полезно для разработки, поэтому вам не нужно на самом деле управлять реальной очередью SQS/подключения/IAM разрешения, пока не будете готовы.
Простая функция для написания сообщения в ранее существовавшую очередь
QUEUE_URL = os.environ.get('QUEUE_URL', '') def write_message(data): sqs = boto3.client('sqs', region_name = 'us-east-1') r = sqs.send_message( MessageBody = json.dumps(data), QueueUrl = QUEUE_URL )
Pytest светильники, чтобы издеваться над API AWS SQS. aws_credentials ()
Также гарантирует, что ваши функции Pytest на самом деле не будут писать ресурсам AWS.
REGION='us-east-' @pytest.fixture(scope='function') def aws_credentials(): """Mocked AWS Credentials for moto.""" os.environ['AWS_ACCESS_KEY_ID'] = 'testing' os.environ['AWS_SECRET_ACCESS_KEY'] = 'testing' os.environ['AWS_SECURITY_TOKEN'] = 'testing' os.environ['AWS_SESSION_TOKEN'] = 'testing' @pytest.fixture(scope='function') def sqs_client(aws_credentials): # setup with mock_sqs(): yield boto3.client('sqs', region_name=REGION) # teardown
Пример функции тестирования. Создать очередь, используя макет клиента из conftest.py (№ afly sqs_client
Параметр соответствует имени функции Conftest sqs_client
), вызвать функцию модуля Python app.write_message ()
Анкет Проверить возвращенное сообщение соответствует тому, что вы отправили
def test_write_message(sqs_client): queue = sqs_client.create_queue(QueueName='test-msg-sender') queue_url = queue['QueueUrl'] # override function global URL variable app.QUEUE_URL = queue_url expected_msg = str({'msg':f'this is a test'}) app.write_message(expected_msg) sqs_messages = sqs_client.receive_message(QueueUrl=queue_url) assert json.loads(sqs_messages['Messages'][0]['Body']) == expected_msg
Дополнительный
В случае, если вы захотите увидеть мою структуру файла:
├── README.md ├── app.py ├── requirements.txt ├── requirements_dev.txt └── tests ├── __init__.py ├── conftest.py └── unit ├── __init__.py └── test_sqs.py
Спасибо
Престижность Arunkumar Muralidharan Кто меня начал
Оригинал: “https://dev.to/drewmullen/mock-a-sqs-queue-with-moto-4ppd”