Skip to content

Commit

Permalink
Merge pull request #336 from gridsingularity/kafka_default_url
Browse files Browse the repository at this point in the history
Removed the Kafka default URL variable.
  • Loading branch information
spyrostz authored Mar 15, 2022
2 parents fd57017 + a7ff72c commit 3da716e
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 16 deletions.
15 changes: 8 additions & 7 deletions gsy_framework/kafka_communication/__init__.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
import ssl
from os import environ

DEFAULT_KAFKA_URL = "localhost:9092"
KAFKA_URL = environ.get("KAFKA_URL", DEFAULT_KAFKA_URL)

IS_KAFKA_RUNNING_LOCALLY = environ.get("IS_KAFKA_RUNNING_LOCALLY", "true") == "true"
KAFKA_URL = environ.get("KAFKA_URL", "localhost:9092")
KAFKA_USERNAME = environ.get("KAFKA_USERNAME", None)
KAFKA_PASSWORD = environ.get("KAFKA_PASSWORD", None)
KAFKA_COMMUNICATION_SECURITY_PROTOCOL = \
environ.get("KAFKA_COMMUNICATION_SECURITY_PROTOCOL", "SASL_SSL")
KAFKA_SASL_AUTH_MECHANISM = \
environ.get("KAFKA_SASL_AUTH_MECHANISM", "SCRAM-SHA-512")
KAFKA_COMMUNICATION_SECURITY_PROTOCOL = (
environ.get("KAFKA_COMMUNICATION_SECURITY_PROTOCOL", "SASL_SSL"))
KAFKA_SASL_AUTH_MECHANISM = (
environ.get("KAFKA_SASL_AUTH_MECHANISM", "SCRAM-SHA-512"))
KAFKA_API_VERSION = (0, 10)
KAFKA_RESULTS_TOPIC = environ.get("KAFKA_RESULTS_TOPIC", "d3a-results")
KAFKA_CONSUMER_GROUP_ID = environ.get("KAFKA_RESULTS_GROUP_ID", "d3a-results-group")


def create_kafka_new_ssl_context():
# Create a new context using system defaults, disable all but TLS1.2
"""Create a new context using system defaults, disable all but TLS1.2."""
ssl_context = ssl.create_default_context()
ssl_context.options &= ssl.OP_NO_TLSv1
ssl_context.options &= ssl.OP_NO_TLSv1_1
Expand Down
10 changes: 5 additions & 5 deletions gsy_framework/kafka_communication/kafka_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from kafka import KafkaConsumer

from gsy_framework.kafka_communication import (
KAFKA_URL, DEFAULT_KAFKA_URL, KAFKA_USERNAME,
KAFKA_URL, IS_KAFKA_RUNNING_LOCALLY, KAFKA_USERNAME,
KAFKA_PASSWORD, KAFKA_COMMUNICATION_SECURITY_PROTOCOL,
KAFKA_SASL_AUTH_MECHANISM,
KAFKA_API_VERSION, create_kafka_new_ssl_context, KAFKA_RESULTS_TOPIC, KAFKA_CONSUMER_GROUP_ID)
Expand All @@ -16,7 +16,10 @@

class KafkaConnection:
def __init__(self, callback):
if KAFKA_URL != DEFAULT_KAFKA_URL:
if IS_KAFKA_RUNNING_LOCALLY:
kwargs = {"bootstrap_servers": KAFKA_URL,
"consumer_timeout_ms": KAFKA_CONSUMER_TIMEOUT_MS}
else:
kwargs = {"bootstrap_servers": KAFKA_URL,
"sasl_plain_username": KAFKA_USERNAME,
"sasl_plain_password": KAFKA_PASSWORD,
Expand All @@ -29,9 +32,6 @@ def __init__(self, callback):
"max_poll_records": KAFKA_MAX_POLL_RECORDS,
"consumer_timeout_ms": KAFKA_CONSUMER_TIMEOUT_MS,
"group_id": KAFKA_CONSUMER_GROUP_ID}
else:
kwargs = {"bootstrap_servers": DEFAULT_KAFKA_URL,
"consumer_timeout_ms": KAFKA_CONSUMER_TIMEOUT_MS}

self._consumer = KafkaConsumer(KAFKA_RESULTS_TOPIC, **kwargs)
self._callback = callback
Expand Down
8 changes: 4 additions & 4 deletions gsy_framework/kafka_communication/kafka_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from kafka import KafkaProducer

from gsy_framework.kafka_communication import (
KAFKA_URL, DEFAULT_KAFKA_URL, KAFKA_USERNAME, KAFKA_PASSWORD,
KAFKA_URL, IS_KAFKA_RUNNING_LOCALLY, KAFKA_USERNAME, KAFKA_PASSWORD,
KAFKA_COMMUNICATION_SECURITY_PROTOCOL, KAFKA_SASL_AUTH_MECHANISM, KAFKA_API_VERSION,
create_kafka_new_ssl_context, KAFKA_RESULTS_TOPIC)

Expand Down Expand Up @@ -38,7 +38,9 @@ def is_enabled():
class KafkaConnection(DisabledKafkaConnection):
def __init__(self):
super().__init__()
if KAFKA_URL != DEFAULT_KAFKA_URL:
if IS_KAFKA_RUNNING_LOCALLY:
kwargs = {"bootstrap_servers": KAFKA_URL}
else:
kwargs = {"bootstrap_servers": KAFKA_URL,
"sasl_plain_username": KAFKA_USERNAME,
"sasl_plain_password": KAFKA_PASSWORD,
Expand All @@ -49,8 +51,6 @@ def __init__(self):
"retries": KAFKA_PUBLISH_RETRIES,
"buffer_memory": KAFKA_BUFFER_MEMORY_BYTES,
"max_request_size": KAFKA_MAX_REQUEST_SIZE_BYTES}
else:
kwargs = {"bootstrap_servers": KAFKA_URL}

self.producer = KafkaProducer(**kwargs)

Expand Down

0 comments on commit 3da716e

Please sign in to comment.