Рубрики
Без рубрики

Кафка – API производителя с третьим лицами

Это продолжение записи до предыдущего поста – Кафка – игра с потребителем API, где я COV … Теги с Python, Kafka, MessagePack.

Это продолжение записи на предыдущий пост – кафка – игра с потребительской API, где я охватил некоторые своеобразные случаи использования потребительских API. Здесь я хотел бы сыграть с API-производителем, чтобы создать сообщение, используя сторонние инструменты, такие как MessagePack для сериализации и кафдроп для мониторинга

MessagePack.

MessagePack – одна из лучших доступных библиотек сериализации схемы для развязки данных между гетерогенными приложениями. Это двоичный для простых структур данных и предназначен для эффективной передачи через провод.

Кодовые фрагменты

Следующие примеры показывают, как упаковать и распаковать общие типы данных, работать с пользовательскими типами данных, такими как dateTime.dateTime, проверьте пример dateTime

  • Простой пример
from msgpack import packb, unpackb

data = {'a': 5}
msg_packet = packb(data)

recreated_data = unpackb(msg_packet)
  • Используя datetime.
from datetime import datetime as dtm
from dateutil import parser

def dtm_encode(obj):
    if isinstance(obj, dtm):
        return {
            '__dtm__': True,
            'obj_as_str': obj.isoformat()
        }
    return obj

def dtm_decode(obj):
    if '__dtm__' in obj:
        return parser.isoparse(obj['obj_as_str'])
    return obj

dat = {'a':5, 'dates': {'created_at': dtm.now()}}
serialized_dat = packb(dat, default=dtm_encode)
deserialzed_dat = unpackb(serialized_dat, object_hook=dtm_decode)

Почему сериализация сообщений?

Сериализация – это процесс преобразования объекта в формат, который он может быть сохранен, в пункте назначения данные могут быть десериализованы для воссоздания исходного объекта. Поскольку KAFKA использует файловую систему для хранения сообщений, нам нужно сериализовать и десетировать данные. Существует два протокола для сериализации сообщения либо с использованием IDL на основе схемы (язык определения интерфейса), либо с использованием схемы. Хотя вы можете найти многочисленные статьи, касающиеся этих протоколов в Интернете, я представлял собой одноклассник дифференцирующую их.

Схемы и схема IDL

В IDL на основе схемы примитивы сообщения предварительно определены, где издатели и потребители могут подтвердить сообщение, прежде чем работать над ним. С другой стороны, схема может иметь пользовательские примитивы для каждого сообщения. Если у вас есть стандартная и жесткая схема базы данных, вы, возможно, захотите использовать Apache AVRO, который является лучшим в форме и популярно используется в сочетании с Apache Kafka. Напротив, если вы работаете с схемыми или ориентированными на бигдата, где примитивы являются крайне непредсказуемыми, существует накладные расходы на поддержание схемы AVRO и возникает необходимость в схеме IDL. Это некоторые из стандартных протоколов наиболее часто выбраны.

Схемы Комиссия
Схемы Avro.
Схемы Json.
Схемы MessagePack.
Схемы Босоз

Я выбрал MessagePack среди списка, потому что он имеет невероятную производительность, Super Simple для настройки и начать работать, а также намного меньше и быстрее, чем JSON.

Kafdrop.

KAFDrop – это веб-интерфейс OpenSource для просмотра темы кафки и просмотра групп потребителей. Инструмент отображает такую информацию, как брокеры, темы, разделы, потребители и позволяет просматривать сообщения.

Пример экрана Как выглядит кафдроп

  • Главная Экран Главная Экран не только охватывает обзор, но он также имеет информацию о брокерах и темах, доступных в кластере
  • Тема страницы раскрывает все разделы информации темы и оказывает список групп потребителей, подписанных на тему

Функции

  • Перечислите все темы кластера
  • Возможность просматривать данные разбиения темы
  • Отслеживает Сообщение Смещения всех потребительских групп, подписанных на тему
  • Создание и удаление темы
  • Поиск и фильтры сообщений в разделе

Производить сообщение и увидеть в кафкропе

from datetime import datetime as dtm

from msgpack import packb, unpackb
from confluent_kafka import Producer

'''
Note: Import or recreate "dtm_encode" function from the Code Snippet section 
'''


def delivery_report(err, m):
    if err is not None:
        print(f'Message -{m}, delivery failed: {err}')
    else:
        print(f'Message delivered to {m.topic()} [{m.partition()}]')


producer_config = {
    'bootstrap.servers': '172.16.6.9'
}
producer_topic = 'raghu-producer-test'
P = Producer(producer_config)
data = {
    'a':5,
    'created_at': dtm.now()
}
data_packet = packb(data, default=dtm_encode, use_bin_type=True)

P.produce(
    producer_topic,
    value=data_packet,
    callback=delivery_report,
)
P.poll(0.01)

Примечание: См. Образцы кода в моем GitHub репо

Снимок кафдроп

Лучшие практики

Прежде чем мы закрываемся, я хотел бы упомянуть некоторые из лучших практик

  • Сжатие уменьшает размер пакета и помогает провести гораздо быстрее по сети, однако, MessagePack сокращает размер пакета на 30%, что выступает в качестве сжатия и заставило меня держаться подальше от обычных библиотек сжатия. Большинство стандартных рабочих процессов разработаны с использованием механизма сжатия Сырые & Zlib наиболее широко приняты библиотеки для сжатия.

  • Шифрование, которое я больше предпочитаю, чем сжатие, является хорошей практикой, чтобы гарантировать, что данные защищены в сети. Это не утомительная задача для шифрования сообщений перед точностью до продюсера и расшифровки на конце потребителя. Существует множество ключевых шифрованных систем шифрования, которая помогает нам шифровать и расшифровать пакеты. Для моего требования мы разработали тонкую обертку на криптографической библиотеке для шифрования и расшифровки сообщений, использующих гибридный механизм (симметричный + асимметричный), это функционирует, как и любое другое микровидное устройство для шифрования и расшифровки сообщений.

  • Заголовки сообщений, это не редкая практика для включения заголовков в сообщении для нескольких технических или архитектурных целей. Моя команда использует заголовки, чтобы определить версию сообщения, однако, ожидать нового использования с заголовками, обратитесь к дело Узнать больше.

Оригинал: “https://dev.to/nraghu/kafka-producer-api-with-third-party-tools-1j30”