Рубрики
Без рубрики

Создайте свой собственный поток данных для кафки с Python и Faker

Мы все были там: мы установили или приобрели новый и блестящий инструмент управления данными, чтобы TE … Теги с апачекафкой, Python, Data, Faker.

Мы все были там: мы установили или приобрели новый и блестящий инструмент управления данными, чтобы проверить его, но сразу реализуется у нас нет данных для использования для нашего проб Может быть, мы знаем схему данных, но мы не можем использовать находства нашей компании по ряду причин.

Как мы можем дать платформу правильную попытку? Что если я скажу вам все, что вам нужно, это несколько строк кода Python? Мы будем использовать реальный случай цепочки доставки пиццы, чтобы продемонстрировать, насколько легко создавать правильные поддельные данные.

TL; DR : Если вы просто заинтересованы в готовом продюсе на основе kafka на основе пиццы, проверьте Связанный github repo Действительно

(Быстрые ссылки здесь:

Настройка кафки

  • Получение учетных данных обслуживания
  • Автоматическая тема Создание и включение apis apis kafka

Настройки клиента Python

  • Создание поддельных наборов данных с Faker
  • Создать пользовательский поставщик данных
  • Создание заказа Создание поддельного производителя

Наслаждаться!)

Платформы данных являются пустыми оболочками

Базы данных, Data Lakes и DataStores в целом – это прохладные технологии, позволяющие кому-либо управлять и анализировать данные, но они имеют общую проблему: они полностью пустыются по умолчанию. Это делает их трудно использовать для тестирования, которые все еще необходимы для оценки того, соответствует ли решение вашему применению. Несколько платформ решают эту проблему, предлагая предварительно заполненные наборы данных образца, которые могут быть установлены или включены с помощью нескольких команд. Эти наборы данных, однако, довольно статичны и не всегда предоставляют множество функций, полей или мощности, которые вам необходимо выполнить пробные испытания.

С Apache Kafka, масштаб этой проблемы еще больше. Он не только пустым по умолчанию, но это также платформа потоковой передачи данных, которая работает путем проглатывания, преобразования и распределения данных на лету – и она ожидает непрерывного потока данных. Поиск потоковых источников данных – это тяжелая задача, и, особенно если вы просто хотите проверить основы платформы, устанавливая их правильно, может быть довольно громоздкой.

Создание поддельных данных вручную также не тривиально. Даже если вы знаете свою схему данных, создавая когерентный набор рядов с нуля, является сложной задачей. В этом посте в блоге я пойду через вас, как создать такой поддельный набор данных для Кафки с горячей темой: Pizza!

Настройка кафки

Давайте начнем с Tech Setup. Для этого примера нам понадобится кластер кафки. Создание его на Aiven.io действительно легко:

  1. Перейдите к Aiven.io Console и войдите в систему (или зарегистрируйтесь; это быстро и легко).
  2. Нажмите + Создайте новый сервис
  3. Выберите Кафка Сервис (если мы разбираемся, мы также можем выбрать наш любимый версию Apache Kafka)
  4. Выберите поставщика облака, мы хотим развернуть наши услуги, вместе с областью облака
  5. Выберите план обслуживания на основе наших потребностей
  6. Дать услугу имя

Так как я базируюсь в Италии, я мог легко пойти на недавно созданные AWS-EU-South AWS Область, расположенная в Милане, чтобы минимизировать задержку. Ваш выбор зависит от того, где вы находитесь или где вы планируете предоставить свои услуги. Для нашего первоначального теста мы в порядке, используя План запуска Знание, мы всегда можем обновить (или понизить) в будущем.

Тогда мы просто нажимаем на Создать сервис Отказ Это займет несколько минут до нашего кластера Kafka 3-узла в Бег государство.

Получение учетных данных обслуживания

Хотя мы ждем, пока служба будет запущена и работает, мы уже можем начать готовиться к следующему шагу: загрузка сертификатов, необходимых для подключения.

Мы можем пойти в Обзор вкладка нашего экземпляра кафки в Aiven.io Console где мы можем легко найти Ключ доступа , Сертификат доступа и Сертификат CA. . Затем скачайте их на наш компьютер создает 3 файла:

  • Сервис. Кей : ключ доступа
  • Service.Cert : Сертификат доступа
  • CA.PEM : сертификат CA.

На Обзор Вкладка, я также могу принять к сведению Сервис URI (Обычно в форме - .aivencloud.Come: ), который мы будем использовать правильно указать наш производитель в кластер Kafka.

Автоматическая тема Создание и включение apis apis kafka

По умолчанию производители Kafka могут нажать данные только на предварительно созданные темы. Для того, чтобы темы были созданы на лету во время нажатия первой записи, в этом руководстве мы включим kafka.auto_create_topics_enable Параметр в Aiven.io Console Обзор вкладка; Прокрутите вниз до Расширенная конфигурация раздел, а затем нажмите на + Добавить опцию конфигурацию меню.

Последний шаг, необходимый в конфигурации, состоит в том, чтобы включить Kafka Read API (Karapace) в консоли Кафки Обзор Вкладка для нашего кластера. Этот шаг, строго говоря, не фундаментальный, но позволит нам проверить наш производитель по Обзор нажатых записей в консоли AIVEN.IO Темы вкладка.

Установка клиента Python

Мы будем использовать Кафка-Питон клиент, чтобы построить наш продюсер. Все, что нам нужно сделать, это установить это:

pip install kafka-python

А затем установить производитель. Добавьте этот код на новый main.py файл

import json
from kafka import KafkaProducer

folderName = "~/kafkaCerts/kafka-pizza/"
producer = KafkaProducer(
    bootstrap_servers="-.aivencloud.com:",
    security_protocol="SSL",
    ssl_cafile=folderName+"ca.pem",
    ssl_certfile=folderName+"service.cert",
    ssl_keyfile=folderName+"service.key",
    value_serializer=lambda v: json.dumps(v).encode('ascii'),
    key_serializer=lambda v: json.dumps(v).encode('ascii')

)

В примерном коде мы импортировали зависимости и установите правильные параметры, такие как bootstrap_servers , SSL_CAFILE С ssl_certfile и ssl_keyfile которые относятся к URI подключения и три файла сертификатов, упомянутых в разделе выше.

value_serializer и key_serializer Параметры нуждаются в отдельном объяснении. Позже мы сделаем каждую запись и ключ в формате JSON. Чтобы правильно подтолкнуть его к кафке, нам нужно преобразовать их в строковый формат и кодировать. Это именно то, какой код лямбда v: json.dumps (v) .encode ('ascii') делает.

Теперь мы готовы подтолкнуть наше первое сообщение для Kafka’s Тестовая тема с участием

producer.send("test-topic",
                key={"key": 1},
                value={"message": "hello world"}
            )
producer.flush()

Flush () Команда блокирует код из выполнения до тех пор, пока все сообщения Async не отправлены.

Если мы сделали нашу домашнюю работу правильно, теперь мы можем выполнить main.py Код с

python main.py

Мы должны быть в состоянии увидеть в Aiven.io Console под Темы вкладка, тема правильно создана

И после нажатия на название темы, на Сообщения и выбирая json. как ФОРМАТ Мы должны иметь возможность просматривать наше сообщение

Создание поддельных наборов данных с Faker

Итак, вернемся к нашему Главная Тема: Пицца.

Мы являемся владельцами цепочки доставки пиццы, и, конечно, мы хотим подталкивать наши приказы в Apache Kafka. Мы знаем, что мы получаем звонки и запишите клиент Имя , Адрес и Номер телефона (Вы никогда не знаете, мы могли бы потеряться во время доставки). Как мы можем имитировать эту информацию?

Добро пожаловать в Faker , библиотека Python, позволяющая нам создавать правильные поддельные данные! Мы должны быть на Python 3.5 и выше, и мы можем установить его с

pip install Faker

Нам просто нужен простой код для создания одного (или более) кортежа данных, содержащих Имя , Адрес и Телефонный номер

from faker import Faker
fake = Faker()
message= {
        'name':fake.name(),
        'address':fake.address(),
        'phone':fake.phone_number()
        }
print(message)

Который будет распечатать запись, как ниже

{
  'name': 'Adrian Cole', 
  'address': '9395 Smith Canyon\nSullivanport, UT 22200', 
  'phone': '001-959-108-3651'
}

Мы даже можем локализовать вывод, передавая локаль как аргумент, как

fake = Faker('it_IT')

Тот же пример выше, локализован на итальянском с it_it Параметр будет генерировать

{
  'name': 'Sig. Leopoldo Piacentini', 
  'address': 'Piazza Cocci 707 Piano 3\nSesto Isabella lido, 53704 Forlì-Cesena (FE)', 
  'phone': '+39 12 26548428'
}

Идеально, теперь наши основные поддельные генератор данных готов!

Ну … а <Имя, адрес, номер телефона> Tupple не совсем ракетная наука, а также ничего не говорит нам о нашем бизнесе. Мы пиццерия, где пицца? Удивительно Стандартные поставщики Faker Не включайте генератор пиццы, но не волнуйтесь, мы можем создать нашего собственного Отказ

Создать пользовательский поставщик данных

Мы знаем, что у нас есть стандартное меню для пиццы, состоящее из нескольких вариантов, начиная от традиционного Маргерита к Mari & Monti Смешивание морепродуктов и ветчины. Создание поддельного генератора для пиццы – это просто вопрос возврата случайного выбора между доступными параметрами. Мы можем создать новый поставщик Faker в отдельном PizzaProducer.py файл.

import random
from faker.providers import BaseProvider

class PizzaProvider(BaseProvider):
    def pizza_name(self):
        validPizzaNames= ['Margherita',
                          'Marinara',
                          'Diavola',
                          'Mari & Monti',
                          'Salami',
                          'Pepperoni'
                        ]
        return validPizzaNames[random.randint(0,len(validPizzaNames)-1)]

Теперь мы можем импортировать PizzaProvider В нашем main.py Файл и запустить его на 10 образцов

from pizzaproducer import PizzaProvider
fake.add_provider(PizzaProvider)
for i in range(0,10):
    print(fake.pizza_name())

Мы правильно получаем

Mari & Monti
Salami
Marinara
Pepperoni
Marinara
Pepperoni
Salami
Pepperoni
Margherita
Pepperoni

Но, как в любой респектабельной пиццерии, мы позволяем людям добавить начинки Из списка, и аналогично вышеперечисленному мы можем определить пользовательский Pizza_toppings. функция. То же самое касается записи, какой Пицца магазин В нашей цепи получает порядок, поколению которого требуется Pizza_shop функция.

Полная фальшивая пицца код поставщика данных может быть найден здесь , готов к копированию в нашу PizzaProducer.py файл.

Создание заказа

Теперь у нас есть все строительные блоки, давайте создадим заказ. Для каждого вызова мы отмечаем Имя , Адрес и Номер телефона Отказ Клиент однако может заказать 1-10 пиццеров, а для каждой пиццы, 0-5 дополнительных начин. Для генерации поддельных заказов можно определить функцию, которая принимает случайно генерировать номер заказа и возвращает заказ сообщение и связанные с этим ключ Отказ Одно следует отметить: мы решили показать наши сообщения с представлением названия магазина в формате JSON. Это обеспечит все заказы из того же магазина, чтобы появиться в том же Пицца-заказ Разделение темы, тем самым убедившись, что запросы в магазине будут выполнены после времени прибытия заказа.

# creating function to generate the pizza Order
def produce_pizza_order (orderid = 1):
    shop = fake.pizza_shop()
    # Each Order can have 1-10 pizzas in it
    pizzas = []
    for pizza in range(random.randint(1, MAX_NUMBER_PIZZAS_IN_ORDER)):
        # Each Pizza can have 0-5 additional toppings on it
        toppings = []
        for topping in range(random.randint(0, MAX_ADDITIONAL_TOPPINGS_IN_PIZZA)):
            toppings.append(fake.pizza_topping())
        pizzas.append({
            'pizzaName': fake.pizza_name(),
            'additionalToppings': toppings
        })
    # message composition
    message = {
        'id': orderid,
        'shop': shop,
        'name': fake.unique.name(),
        'phoneNumber': fake.unique.phone_number(),
        'address': fake.address(),
        'pizzas': pizzas
    }
    return message, key

Вызывая вышеуказанный код с

produce_pizza_order(704)

Будет генерировать ключ как

{
  "shop":"Luigis Pizza"
}

И а сообщение как

{
  "id": 704,
  "shop": "Luigis Pizza",
  "name": "Jessica Green",
  "phoneNumber": "(549)966-3806x9591",
  "address": "458 Conway Dale Apt. 510\nZacharyborough, TX 48185",
  "pizzas": [
    {
      "pizzaName": "Mari & Monti",
      "additionalToppings": [
        "banana"
      ]
    },
    {
      "pizzaName": "Peperoni",
      "additionalToppings": [
        "ham"
      ]
    }
  ]
}

Создание поддельного производителя

Мы описали выше два блока Lego: настройки производителя кафки и генератор поддельных заказов. Чего не хватает? Нам нужен непрерывный поток событий. Это можно легко моделировать, для 100 Сообщения, например, с циклом:

import time
while i <  100:
    message, key = produce_pizza_order(i)

    print("Sending: {}".format(message))
    # sending the message to Kafka
    producer.send(topic_name,
                  key=key,
                  value=message)
    # 2 seconds of sleep time before the next message
    time.sleep(2)

    # Force sending of all messages
    if (i % 100) == 0:
        producer.flush()
    i=i+1
producer.flush()

Следуя вышеуказанным шагам, вы должны быть в состоянии правильно производить события для кафки. Но, если вы хотите смотреть на готовый проект, проверьте Связанный github repo что позволит вам генерировать данные в течение нескольких минут.

После выполнения кода мы можем проверить, что производитель сделал свое задание, перейдя на Темы вкладка в Aiven.io Console и проверка Пицца-заказы компенсировать:

И, поскольку мы включили API для отдыха Kafka, также путем просмотра фактической темы содержимого:

Мы любим работать с Кафкой, и наши клиенты тоже! Если вы хотите узнать больше, то проверьте наши ресурсы:

Оригинал: “https://dev.to/ftisiot/create-your-own-data-stream-for-kafka-with-python-and-faker-49af”