Это продолжение записи на предыдущий пост – кафка – игра с потребительской API, где я охватил некоторые своеобразные случаи использования потребительских API. Здесь я хотел бы сыграть с API-производителем, чтобы создать сообщение, используя сторонние инструменты, такие как MessagePack для сериализации и кафдроп для мониторинга
MessagePack.
MessagePack – одна из лучших доступных библиотек сериализации схемы для развязки данных между гетерогенными приложениями. Это двоичный для простых структур данных и предназначен для эффективной передачи через провод.
Кодовые фрагменты
Следующие примеры показывают, как упаковать и распаковать общие типы данных, работать с пользовательскими типами данных, такими как dateTime.dateTime, проверьте пример dateTime
- Простой пример
from msgpack import packb, unpackb data = {'a': 5} msg_packet = packb(data) recreated_data = unpackb(msg_packet)
- Используя datetime.
from datetime import datetime as dtm from dateutil import parser def dtm_encode(obj): if isinstance(obj, dtm): return { '__dtm__': True, 'obj_as_str': obj.isoformat() } return obj def dtm_decode(obj): if '__dtm__' in obj: return parser.isoparse(obj['obj_as_str']) return obj dat = {'a':5, 'dates': {'created_at': dtm.now()}} serialized_dat = packb(dat, default=dtm_encode) deserialzed_dat = unpackb(serialized_dat, object_hook=dtm_decode)
Почему сериализация сообщений?
Сериализация – это процесс преобразования объекта в формат, который он может быть сохранен, в пункте назначения данные могут быть десериализованы для воссоздания исходного объекта. Поскольку KAFKA использует файловую систему для хранения сообщений, нам нужно сериализовать и десетировать данные. Существует два протокола для сериализации сообщения либо с использованием IDL на основе схемы (язык определения интерфейса), либо с использованием схемы. Хотя вы можете найти многочисленные статьи, касающиеся этих протоколов в Интернете, я представлял собой одноклассник дифференцирующую их.
Схемы и схема IDL
В IDL на основе схемы примитивы сообщения предварительно определены, где издатели и потребители могут подтвердить сообщение, прежде чем работать над ним. С другой стороны, схема может иметь пользовательские примитивы для каждого сообщения. Если у вас есть стандартная и жесткая схема базы данных, вы, возможно, захотите использовать Apache AVRO, который является лучшим в форме и популярно используется в сочетании с Apache Kafka. Напротив, если вы работаете с схемыми или ориентированными на бигдата, где примитивы являются крайне непредсказуемыми, существует накладные расходы на поддержание схемы AVRO и возникает необходимость в схеме IDL. Это некоторые из стандартных протоколов наиболее часто выбраны.
Схемы | Комиссия |
Схемы | Avro. |
Схемы | Json. |
Схемы | MessagePack. |
Схемы | Босоз |
Я выбрал MessagePack среди списка, потому что он имеет невероятную производительность, Super Simple для настройки и начать работать, а также намного меньше и быстрее, чем JSON.
Kafdrop.
KAFDrop – это веб-интерфейс OpenSource для просмотра темы кафки и просмотра групп потребителей. Инструмент отображает такую информацию, как брокеры, темы, разделы, потребители и позволяет просматривать сообщения.
Пример экрана Как выглядит кафдроп
- Главная Экран Главная Экран не только охватывает обзор, но он также имеет информацию о брокерах и темах, доступных в кластере
- Тема страницы раскрывает все разделы информации темы и оказывает список групп потребителей, подписанных на тему
Функции
- Перечислите все темы кластера
- Возможность просматривать данные разбиения темы
- Отслеживает Сообщение Смещения всех потребительских групп, подписанных на тему
- Создание и удаление темы
- Поиск и фильтры сообщений в разделе
Производить сообщение и увидеть в кафкропе
from datetime import datetime as dtm from msgpack import packb, unpackb from confluent_kafka import Producer ''' Note: Import or recreate "dtm_encode" function from the Code Snippet section ''' def delivery_report(err, m): if err is not None: print(f'Message -{m}, delivery failed: {err}') else: print(f'Message delivered to {m.topic()} [{m.partition()}]') producer_config = { 'bootstrap.servers': '172.16.6.9' } producer_topic = 'raghu-producer-test' P = Producer(producer_config) data = { 'a':5, 'created_at': dtm.now() } data_packet = packb(data, default=dtm_encode, use_bin_type=True) P.produce( producer_topic, value=data_packet, callback=delivery_report, ) P.poll(0.01)
Примечание: См. Образцы кода в моем GitHub репо
Снимок кафдроп
Лучшие практики
Прежде чем мы закрываемся, я хотел бы упомянуть некоторые из лучших практик
Сжатие уменьшает размер пакета и помогает провести гораздо быстрее по сети, однако, MessagePack сокращает размер пакета на 30%, что выступает в качестве сжатия и заставило меня держаться подальше от обычных библиотек сжатия. Большинство стандартных рабочих процессов разработаны с использованием механизма сжатия Сырые & Zlib наиболее широко приняты библиотеки для сжатия.
Шифрование, которое я больше предпочитаю, чем сжатие, является хорошей практикой, чтобы гарантировать, что данные защищены в сети. Это не утомительная задача для шифрования сообщений перед точностью до продюсера и расшифровки на конце потребителя. Существует множество ключевых шифрованных систем шифрования, которая помогает нам шифровать и расшифровать пакеты. Для моего требования мы разработали тонкую обертку на криптографической библиотеке для шифрования и расшифровки сообщений, использующих гибридный механизм (симметричный + асимметричный), это функционирует, как и любое другое микровидное устройство для шифрования и расшифровки сообщений.
Заголовки сообщений, это не редкая практика для включения заголовков в сообщении для нескольких технических или архитектурных целей. Моя команда использует заголовки, чтобы определить версию сообщения, однако, ожидать нового использования с заголовками, обратитесь к дело Узнать больше.
Оригинал: “https://dev.to/nraghu/kafka-producer-api-with-third-party-tools-1j30”