Эта статья конкретно рассказывает о том, как написать производителя и потребителя для кластера KAFKA, обеспеченного SSL с помощью Python. Я не буду получать, как создать клиентские сертификаты в этой статье, это тема, зарезервированная для другой статьи:).
Предварительные условия
- Кафка кластер с SSL
- Клиентский сертификат (ключевой таблиц) в формате JKS
- Среда Linux с keytool и openssl установлена
- Python 3.6.
Шаг 1 – Преобразование JKS в PEM-файл
Почему мне нужен этот шаг?
В отличие от Java, Python и C # использует .pem Файлы для подключения к кафке. Для этого нам придется преобразовать файлы JKS в PEM с помощью keytool. и openssl команды. Если вы работаете над Windows 10, вы можете обратиться к моей статье о том, как запустить WSL в Windows здесь Отказ
Чтобы ваша жизнь легкой, я создал скрипт оболочки, чтобы быстро преобразовать JKS в PEM.
#!/bin/bash srcFolder=$1 keyStore=$1/$2 password=$3 alias=$4 outputFolder=$5 echo $keyStore echo "Generating certificate.pem" keytool -exportcert -alias $alias -keystore $keyStore -rfc -file $outputFolder/certificate.pem -storepass $password echo "Generating key.pem" keytool -v -importkeystore -srckeystore $keyStore -srcalias $alias -destkeystore $outputFolder/cert_and_key.p12 -deststoretype PKCS12 -storepass $password -srcstorepass $password openssl pkcs12 -in $outputFolder/cert_and_key.p12 -nodes -nocerts -out $outputFolder/key.pem -passin pass:$password echo "Generating CARoot.pem" keytool -exportcert -alias $alias -keystore $keyStore -rfc -file $outputFolder/CARoot.pem -storepass $password
Сценарий генерирует следующие файлы из файла клавишных магазинов,
- key.pem.pem.
- сертификат
- Caroot.pem.pem.
Как запустить этот скрипт?
Сохраните скрипт в файле E.g. jkstopem.sh и дать выполнение разрешений, как ниже,
chmod +x jkstopem.sh
Генерировать файлы PEM. Запустите сценарий оболочки, как показано на примере ниже,
./jkstopem.sh
Как найти псевдоним?
Если вы не знаете о том, что имеет псевдоним ваш сертификат. Запустите следующую команду в папке, в которой у вас есть файл клавишного магазина.
keytool -list -v -keystore kafka.client.keystore.jks
Вам будет предложено ввести пароль. Введите пароль ключевого магазина, это перечисляет содержимое файла ключевого магазина. Вы сможете увидеть * Псевдоним .
Ниже приведен пример для запуска скрипта оболочки,
./jkstopem.sh ~/client-cert kafka.client.keystore.jks welcome123 client-alias ~/client-cert/pem
Теперь вы должны уметь видеть следующие файлы в выходной папке,
- key.pem.pem.
- сертификат
- Caroot.pem.pem.
Теперь, как у нас есть все PEM Файлы, давайте получим растрескивание.
Шаг 2 – Написание производителя Кафки в Python
Мы будем использовать пакет «Кафка-Питона» для подключения к Кафке. Вы можете установить его, используя PIP,
pip install kafka-python
Теперь, давайте напишем наш продюсер.
#Producer.py from kafka import KafkaProducer kafkaBrokers='kafka1.xyz.com:443,kafka2.xyz.com:443,kafka3.xyz.com:443' caRootLocation='CARoot.pem' certLocation='certificate.pem' keyLocation='key.pem' topic='test-topic' password='welcome123' producer = KafkaProducer(bootstrap_servers=kafkaBrokers, security_protocol='SSL', ssl_check_hostname=True, ssl_cafile=caRootLocation, ssl_certfile=certLocation, ssl_keyfile=keyLocation, ssl_password=password) producer.send(topic, bytes('Hello Kafka!','utf-8')) # Send to a particular partition producer.send(topic, bytes('Hello Kafka!','utf-8'),partition=0) producer.flush()
В приведенном выше примере мы используем файлы PEM, которые мы создали на последнем шаге с паролем для чтения файла PEM.
kafkaBrokers='kafka1.xyz.com:443,kafka2.xyz.com:443,kafka3.xyz.com:443' caRootLocation='CARoote.pem' certLocation='certificate.pem' keyLocation='key.pem' password='welcome123' producer = KafkaProducer(bootstrap_servers=kafkaBrokers, security_protocol='SSL', ssl_check_hostname=True, ssl_cafile=caRootLocation, ssl_certfile=certLocation, ssl_keyfile=keyLocation, ssl_password=password)
Отправка данных в случайный раздел тематики
Ниже приведен фрагмент кода отправит данные в случайное раздел, определяющую KAFKA.
producer.send(topic, bytes('Hello Kafka!','utf-8')) producer.flush()
Отправка данных в определенный раздел тематики
Чтобы отправить данные в конкретный раздел, вам просто нужно указать раздел, как показано в следующем фрагменте кода,
producer.send(topic, bytes('Hello Kafka - Partition 0!','utf-8'),partition=0) producer.flush()
Поэтому мы построили наш производитель Python для кафки. В следующей части мы напишем потребитель, чтобы потреблять сообщение с темы.
Оригинал: “https://dev.to/adityakanekar/connecting-to-kafka-cluster-using-ssl-with-python-k2e”