Рубрики
Без рубрики

Microservices & Rabbitmq На докере

Архитектура на основе микросервисов включает в себя разложение вашего приложения Monolith на множественные, полностью … Tagged с Python, Go, Microservices, Rabbitmq.

Архитектура на основе микросервисов включает в себя разложение вашего приложения Monolith на множественные, полностью независимо развертываемые и масштабируемые услуги Анкет Помимо этого базового определения, то, что представляет собой микросервис, может быть несколько субъективным, хотя есть несколько испытанных боевых действий, принятых такими гигантами, как Netflix и Uber Это всегда следует учитывать. И я обсуждаю некоторые из них. В конечном счете, мы хотим Разделите наше приложение на более мелкие приложения, каждое из которых является отдельной системой и имеет дело только с одним аспектом всего приложения и делает это действительно хорошо Анкет Это разложение является очень важным шагом и может быть сделано на основе субдомены , которые должны быть идентифицированы правильно. Меньшие приложения больше модульный и управляемый С четко определенными границами могут быть написаны с помощью разные языки/фреймворки , неудача в изоляции так что все приложение не падает (без SPOF). Возьмите пример билета в кино:

Источник: https://codeburst.io/build-a-nodejs-cinema-api-gateway-and-deploying-it-to-docker-part-4-703c2b0dd269

Давайте разберем этот птичий глаз:

i) Пользовательским приложением может быть мобильный клиент, спа -салон и т. Д. или любой клиент, потребляющий наши бэкэнд -сервисы.

ii) Считается плохой практикой, чтобы попросить наших клиентов общаться с каждой из наших услуг отдельно , по причинам, которые я пытался объяснить Здесь Анкет Это то, для чего предназначены шлюзы API: для получения запросов клиентов, позвонить в наши услуги, вернуть ответ. Таким образом, клиент должен поговорить только с одним сервером, давая иллюзию монолита. Несколько шлюзов могут использоваться для Различные виды клиентов (Мобильные приложения, планшеты, браузеры и т. Д.). Они могут и должны нести ответственность за ограниченную функциональность, такие как слияние/присоединение ответов от услуг, аутентификация, ACLS. В крупных приложениях, которые должны масштабироваться и динамически перемещаться, шлюзы также нуждаются в доступе к Реестр услуг который содержит местоположения наших экземпляров микросервиса, баз данных и т. Д.

iii) Каждая служба имеет свое собственное хранилище Анкет Это ключевой момент и обеспечивает свободную связь. Затем некоторые запросы должны будут присоединиться к данным, которые принадлежат нескольким сервисам. Чтобы избежать этого серьезного удара, данные могут быть воспроизведены и оскорблены. Этот принцип Не только терпимы в микросервисах, но и поощряли Анкет

iv) Звонки отдыха, сделанные в нашем API Gateway передаются в Сервисы, которые, в свою очередь, разговаривают с другими услугами, возвращают результат в шлюз, которые, возможно, компилируют его и реагируют на него клиенту. Общение между такими услугами по одному запросу клиента в приложение не должно произойти. В противном случае мы будем жертвовать производительностью из-за еще одной http-обратной поездки за недавно введенную модульность.

В идеале единственный запрос должен вызовать только одну услугу, чтобы получить ответ Анкет Это означает, что любые синхронные запросы между службами должны быть сведены к минимуму, и это не всегда возможно; Механизмы, такие как grpc , Благодарность или даже простые HTTP (как в нашем примере) обычно используются при необходимости. Как вы уже догадались, это означает, что данные должны быть воспроизведены в наших услугах. Скажи, GET/CATALOG/<< CITYID >> Конечная точка также должна вернуть премьеры в каждом кинотеатре города в то время. С нашей новой стратегией, премьеры должны храниться в базе данных для Каталог кино обслуживание. Следовательно, точка iii) Анкет

Асинхронно общение между услугами

Итак, скажем, премьера изменяется в результате какой -то операции CRUD на Фильмы оказание услуг. Чтобы сохранить данные синхронизации, это событие обновления должно быть излучено и применено к Каталог кино обслуживание. Попробуйте представить наши микросервисы как кластер государственных машин, где обновления в состояниях, возможно, должны быть переданы по всему кластеру для достижения возможной последовательности Анкет Конечно, мы никогда не должны ожидать, что нашим конечным пользователям придется дольше ждать, чтобы запросы закончили и пожертвовали своим временем для модульности на нашу пользу. Таким образом, все это общение должно быть не блокирующим. И вот где Rabbitmq приходит в.

Rabbitmq – это очень мощный Брокер сообщения, который реализует Протокол обмена сообщениями AMQP . Вот реферат: во -первых, вы устанавливаете экземпляр сервера RabbitMQ (брокер) в систему. Тогда a Издатель/продюсер Программа подключается к этому серверу и отправляет сообщение. Rabbitmq очереди это сообщение и откидывает его на один или несколько подписчик/потребитель Программы, которые прослушиваются на сервере RabbitMQ.

Прежде чем я доберусь до сутки этой статьи, я хочу явно заявить, что микросервисы являются путь более сложные, и мы не будем освещать критические темы, такие как терпимость ошибки Из -за сложности распределенных систем полное Роль шлюза API , Обнаружение обслуживания , шаблоны согласованности данных, такие как Саги , предотвращение сбоя обслуживания каскады с использованием Выключатели трассы , проверки здоровья и архитектурные модели, такие как CQRS Анкет Не говоря уже о Как решить, будут ли микросервисы работать на вас или нет Анкет

Как Rabbitmq Работает

Более конкретно, сообщения Опубликовано к Обмен Внутри брокера Rabbitmq. Затем обмен распределяет копии этого сообщения в очереди На основании определенных разработчиков правил, называемых привязки Анкет Эта часть путешествия сообщений называется маршрутизация Анкет И это косвено, конечно, является тем, что делает для неблокирующих передачи сообщений. Потребители Слушая на этих очереди Это получило сообщение получит его. Довольно просто, верно?

Не совсем. Есть Четыре разных типа обмены и каждый, вместе с привязки , определяет алгоритм маршрутизации. «Алгоритм маршрутизации» означает, по сути, то, как сообщения распространяются между очередями. Заниматься подробностями о каждом типе может быть излишним здесь, поэтому я просто расширим тот, который мы будем использовать: Обмен темы :

Чтобы обмен выдвинул сообщение в очередь, эта очередь должна быть связана с обменом. Мы можем создать несколько обменов с уникальными именами, явно. Однако, когда вы развертываете RabbitMQ, он поставляется с дефолтом, безымянным обменом. И каждая очередь, которую мы создаем, будет автоматически связана с этим обменом. Чтобы быть описательным, я буду создавать названный обмен вручную, а затем связывать очередь с ним. Это привязка определяется Ключ для привязки Анкет Точный способ, которым ключ привязки, опять же, зависит от типа обмена. Вот как это работает с обменом темы:

  • A очередь связан с Обмен используя Строка шаблона (Ключ для связывания)
  • Опубликованное сообщение доставляется в Обмен вместе с Ключ маршрутизации
  • Обмен проверяет, что очереди соответствует Ключ маршрутизации на основе Ключ для привязки шаблон определяется ранее.

* может заменить ровно одно слово. # может заменить ноль или более слов. Источник: https://www.rabbitmq.com/tutorials/tutorial-five-python.html

Любое сообщение с ключом маршрутизации "Quick.orange.rabbit" будет доставлен в обе очереди. Однако сообщения с "lazy.rown.fox" достигнет только Q2 Анкет Те, у кого клавиша маршрутизации, не соответствующая каким -либо шаблону, будут потеряны.

Для некоторой перспективы давайте просто просмотрим двух других типов обмена:

  • Fanout Exchange : Сообщения, отправленные на этот вид обмена, будут отправлены во все очереди, связанные с ним. Ключ маршрутизации, если он предоставлен, будет полностью игнорироваться. Это может быть использовано, например, для трансляции обновлений глобальной конфигурации в распределенной системе.
  • Прямой обмен (Самый простой): отправляет сообщение в очередь, привязывающий ключ, это точно равна данному ключу маршрутизации. Если в очереди слушают несколько потребителей, то сообщения будут сбалансированы между ними, следовательно, они обычно используются для распространения задач между несколькими работниками в круглом манере.

Моя иллюстрация будет очень Просто: питон Колба Приложение с одной конечной точкой пост, которая при вызове будет направлено на обновление информации пользователя, издайте сообщение брокеру RabbitMQ (не блокировка, конечно) и вернет 201. Отдельная служба GO будет прислушиваться к сообщению от брокера и, следовательно, будет иметь возможность соответствующим образом обновить свои данные. Все три будут размещены на отдельных контейнерах.

Настройка наших контейнерных микросервисов и брокера с помощью Docker Compose

Предоставление кучу контейнеров, и все, что сопровождает их, может быть болью, поэтому я всегда полагаюсь на Docker Compose .

Вот весь код . Мы объявляем три услуги, которые будут использоваться для трех контейнеров. Два тома необходимы для размещения нашего кода в контейнеры:

# docker-compose.yml

version: "3.2"
services:
    rabbitmq-server:
        build: ./rabbitmq-server

    python-service:
        build: ./python-service
        # 'rabbitmq-server' will be available as a network reference inside this service 
        # and this service will start only after the RabbitMQ service does.
        depends_on:
            - rabbitmq-server
        # Keep it running.  
        tty: true
        # Map port 3000 on the host machine to port 3000 of the container.
        # This will be used to receive HTTP requests made to the service.
        ports:
            - "3000:3000"
        volumes:
            - './python-service:/python-service'

    go-service:
        build: ./go-service
        depends_on:
            - rabbitmq-server
        tty: true
        volumes:
            - './go-service:/go-service'

# Host volumes used to store code.
volumes:
    python-service:
    go-service:

Dockerfiles – это в значительной степени стандартные из Docker Hub , к которому я добавил:

  • /go-service Рабочий каталог в контейнере сервиса GO.
  • /python-service Рабочий каталог в контейнере Python Service.
  • Клиентская библиотека Go Rabbitmq под названием AMQP
  • Python’s Rabbitmq Client Пика & Фляжка

Наше приложение Flask имеет только одну конечную точку, которая получает user_id и full_name , который будет использоваться для обновления профиля пользователя. Сообщение, указывающее на это обновление, будет отправлено брокеру RabbitMQ.

# main.py

from flask import Flask
from flask import request
from flask import jsonify
from services.user_event_handler import emit_user_profile_update

app = Flask(__name__)

@app.route('/users/', methods=['POST'])
def update(user_id):
    new_name = request.form['full_name']

    # Update the user in the datastore using a local transaction...

    emit_user_profile_update(user_id, {'full_name': new_name})

    return jsonify({'full_name': new_name}), 201

Логика для излучения событий в другие службы всегда должна быть отделена от остальной части приложения, поэтому я извлекла его в модуль. Обмен следует четко проверять и создано и создано издателем и потребителем, потому что мы не можем знать (и не должны полагаться), на какую службу начинается первым. Это хорошая практика, которую большинство клиентских библиотек RabbitMQ облегчают:

# services/user_event_handler.py

import pika
import json

def emit_user_profile_update(user_id, new_data):
    # 'rabbitmq-server' is the network reference we have to the broker, 
    # thanks to Docker Compose.
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='rabbitmq-server'))
    channel    = connection.channel()

    exchange_name = 'user_updates'
    routing_key   = 'user.profile.update'

    # This will create the exchange if it doesn't already exist.
    channel.exchange_declare(exchange=exchange_name, exchange_type='topic', durable=True)

    new_data['id'] = user_id

    channel.basic_publish(exchange=exchange_name,
                          routing_key=routing_key,
                          body=json.dumps(new_data),
                          # Delivery mode 2 makes the broker save the message to disk.
                          # This will ensure that the message be restored on reboot even  
                          # if RabbitMQ crashes before having forwarded the message.
                          properties=pika.BasicProperties(
                            delivery_mode = 2,
                        ))

    print("%r sent to exchange %r with data: %r" % (routing_key, exchange_name, new_data))
    connection.close()

Не путайтесь по канал . A канал это просто Виртуальное, легкое соединение внутри соединение TCP, которое предназначено для предотвращения открытия нескольких дорогих соединений TCP. Особенно в многопоточных средах.

долговечный Параметр гарантирует, что Обмен сохраняется на диске и может быть восстановлен, если брокер вылетел или не по какой -либо причине остается в автономном режиме. Издатель (Service Python) создает Обмен названный user_updates и отправляет обновленные данные пользователя с user.profile.update как Ключ маршрутизации Анкет Это будет сопоставлено с Профиль пользователя. * Ключ для привязки , что наше обслуживание GO определит:

// main.go

package main

import (
    "fmt"
    "log"
    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        // Exit the program.
        panic(fmt.Sprintf("%s: %s", msg, err))
    }
}

func main() {
    // 'rabbitmq-server' is the network reference we have to the broker, 
    // thanks to Docker Compose.
    conn, err := amqp.Dial("amqp://guest:guest@rabbitmq-server:5672/")
    failOnError(err, "Error connecting to the broker")
    // Make sure we close the connection whenever the program is about to exit.
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    // Make sure we close the channel whenever the program is about to exit.
    defer ch.Close()

    exchangeName := "user_updates"
    bindingKey   := "user.profile.*"

    // Create the exchange if it doesn't already exist.
    err = ch.ExchangeDeclare(
            exchangeName,   // name
            "topic",        // type
            true,           // durable
            false,
            false,
            false,
            nil,
    )
    failOnError(err, "Error creating the exchange")

    // Create the queue if it doesn't already exist.
    // This does not need to be done in the publisher because the
    // queue is only relevant to the consumer, which subscribes to it.
    // Like the exchange, let's make it durable (saved to disk) too.
    q, err := ch.QueueDeclare(
            "",    // name - empty means a random, unique name will be assigned
            true,  // durable
            false, // delete when the last consumer unsubscribes
            false, 
            false, 
            nil,   
    )
    failOnError(err, "Error creating the queue")

    // Bind the queue to the exchange based on a string pattern (binding key).
    err = ch.QueueBind(
            q.Name,       // queue name
            bindingKey,   // binding key
            exchangeName, // exchange
            false,
            nil,
    )
    failOnError(err, "Error binding the queue")

    // Subscribe to the queue.
    msgs, err := ch.Consume(
            q.Name, // queue
            "",     // consumer id - empty means a random, unique id will be assigned
            false,  // auto acknowledgement of message delivery
            false,  
            false,  
            false,  
            nil,
    )
    failOnError(err, "Failed to register as a consumer")


    forever := make(chan bool)

    go func() {
        for d := range msgs {
            log.Printf("Received message: %s", d.Body)

            // Update the user's data on the service's 
            // associated datastore using a local transaction...

            // The 'false' indicates the success of a single delivery, 'true' would
            // mean that this delivery and all prior unacknowledged deliveries on this
            // channel will be acknowledged, which I find no reason for in this example.
            d.Ack(false)
        }
    }()

    fmt.Println("Service listening for events...")

    // Block until 'forever' receives a value, which will never happen.
    <-forever
}

Rabbitmq использует порт 5672 по умолчанию для не TLS-соединений и «гость» в качестве имени пользователя и пароля. Вы можете изучать множество параметров конфигурации Доступно и как их использовать с Пика а также Go amqp Анкет

Вам может быть интересно, для чего эта линия: D.ack (ложь)

Это говорит брокеру, что сообщение было доставлено, успешно обработано и может быть удалено. По умолчанию эти подтверждения происходят автоматически. Но мы указали так иначе, когда подписались на очередь: гнездо Потреблять Анкет

Теперь, если сервис GO сбой (по какой-либо непредвиденной причине), подтверждение не будет отправлено, и это приведет к тому, что брокер будет повторно заполнить сообщение, чтобы ему был предоставлен еще один шанс обработать.

Запуск микросервисов

Хорошо, давайте заставим:

Запустить Docker составить

Когда будут построены три службы (в первый раз займет не менее нескольких минут), проверьте их имена, используя Docker PS :

Откройте два новых терминала, SSH в Python и перейдите в контейнеры, используя соответствующие названия контейнеров и запустите серверы:

Docker Exec -it MicroServicesUsingRabbitmq_python -service_1 bash Flask_app = main.py Python -m Flask Run -Port 3000 -Хост 0.0.0.0

Docker Exec -it MicroServicesUsingRabbitmq_go -service_1 bash Иди запустить main.go

Откройте третий терминал, чтобы отправить запрос POST. Я использую Curl:

curl -d “-x post http://localhost: 3000/users/1

И вы увидите передачу:

В любой момент вы также можете SSH в контейнер RabbitMQ и просто осмотреть:

  • rabbitmqctl list_exchanges (Перечислите все обмены на этом брокерском узле)
  • rabbitmqctl list_queues (Перечислите все очереди на этом брокерском узле)
  • rabbitmqctl list_bindings (Перечислите все привязки в этом брокером)
  • rabbitmqctl list_queues name_ready messages_unacknowged (Перечислите все очереди с количеством сообщений, которые есть в каждом, которые являются готовы к доставке клиентам, но еще не доставляются , а те, которые были доставлены, но еще не признаны )

Как я уже упоминал в начале, это ни в коем случае не глубоко погружение в микросервисы. Есть много вопросов, которые нужно задавать, и я постараюсь ответить на важную: как мы делаем эту коммуникационную транзакцию? Итак, что произойдет, если наш сервис GO (потребитель) вызовет исключение при обновлении состояния на его конце, и мы должны убедиться, что событие обновления возвращается на все услуги, которые были затронуты им? . Представьте себе, как это может быть сложно получить, когда у нас есть несколько микросервисов и тысячи таких «событий обновления». По сути, нам нужно включить отдельные события, которые выполняют откат Анкет

В нашем случае, если сервис GO выдвигает исключение при обновлении данных, ей придется отправить сообщение обратно в службу Python, в котором говорится, что он откатится от обновления. Также важно отметить, что в случае с такими ошибками доставка сообщений должна быть подтверждена (даже если обработка не была успешной), чтобы сообщение не было повторно переполнено брокером . При написании нашего потребителя нам придется решить, какие ошибки означают, что сообщение должно быть переполнено (еще раз), а какие означают, что сообщение не должно быть переполнено и просто откатится.

Но как мы указываем, какое событие обновления для отката и как именно произойдет откат? SAGA PATTER вместе с Сообщение о событиях широко используется для обеспечения такой согласованности данных.

Несколько слов о разработке брокера

Рассмотрим две вещи: Типы обменов для использования и Как сгруппировать обмены Анкет

Если вам нужно транслировать определенные виды сообщений во все услуги в вашей системе, посмотрите на Тип обмена фанатом Анкет Затем один из способов сгруппировать обмены может быть на основе событий, например Три биржи поклонника по имени user.profile.updated , user.profile.deleted , user.profile.added . Это может быть не то, что вы хотите все время, так как вы можете получить слишком много обменов и не сможете фильтровать сообщения для конкретных потребителей, не создавая новый обмен , очередь и Переплет Анкет

Другим способом может быть создание Обмен темы С точки зрения объектов в вашей системе. Итак, в нашем первом примере, Пользователь , Фильм , Кино и т. д. могут быть сущности и, скажем, очереди Связан с Пользователь Можно использовать привязывающие клавиши, такие как user.created (Получите сообщение, когда пользователь создан), user.login (Получите сообщение, когда пользователь только что вошел в систему), user.roles.grant (Получите сообщение о том, что пользователю получили роль авторизации), user.notify (Отправьте пользователю уведомление) и т. Д.

Всегда используйте маршрутизацию для фильтрации и доставки сообщений конкретным потребителям. Написание кода, чтобы отказаться от определенных входящих сообщений при принятии других, является анти-паттерном. Потребители должны получать только сообщения, которые им требуются.

Окончательно, Если ваши потребности сложны, и вы требуете, чтобы сообщения были отфильтрованы до определенных потребителей на основе нескольких свойств, используйте Заголовки обмен Анкет

Наслаждаться!

Оригинал: “https://dev.to/usamaashraf/microservices–rabbitmq-on-docker-e2f”