- python3
- Kafka topic created
virtualenv virtual-environment-name
source ./virtual-environment-name/bin/activate
pip3 install -r requirements.txt
# topic name
topic = 'topic-name'
To specify the desired quantity of produced messages, in the "for i range(3)" of the "main" function. In this case range(3) will be the messages produced
# Create data messages
def main():
for i in range (3):
# Generate Fake Data
data = {
'id': dataFake.random_int(min=20000, max=100000),
'name': dataFake.name(),
'address': dataFake.street_address() + ' | ' + dataFake.city() + ' | ' + dataFake.country_code(),
'platform': random.choice(['phone', 'laptop', 'Tablet']),
'date': str(dataFake.date_time_this_month())
}
python3 kafka-producer.py
producer.log
python3 kafka-consumer.py
If necessary, convert kafka.client.truststore.jks to cacert.pem and certificate.pem for Kafka authentication
keytool -list -rfc -keystore kafka.client.truststore.jks -storepass pass-kafka.client.truststore | awk '/BEGIN CERTIFICATE/,/END CERTIFICATE/ {print $0}' > cacert.pem
keytool -list -rfc -keystore kafka.client.truststore.jks
keytool -exportcert -alias root-users.pem -keystore kafka.client.truststore.jks \
-rfc -file certificate.pem
# Connection Kafka Brokers
producer = KafkaProducer(
bootstrap_servers='server:port,server:port',
security_protocol='SSL',
ssl_check_hostname=False,
ssl_cafile='./cacert.pem',
ssl_certfile='./certificate.pem',
# ssl_keyfile='key.pem'
)
# Connection Kafka Brokers
consumer = KafkaConsumer(
topic,
bootstrap_servers='server:port,server:port,
security_protocol='SSL',
ssl_check_hostname=False,
ssl_cafile='./cacert.pem',
ssl_certfile='./certificate.pem',
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id=consumer_group
)