Merge pull request #18 from real-digital/enable-retrying-offset-commits
Added the retry logic.
ognjen-j authored Jul 13, 2020
2 parents e2fa950 + d6fa0cf commit f34ea72
Showing 6 changed files with 50 additions and 4 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,6 @@
### Version 0.4.5
* Enabled the Sink to retry committing offsets in case of failure. The maximum number of commit attempts is configurable through
  the `sink_commit_retry_count` field.
### Version 0.4.4
* Enable JSON logging using loguru
### Version 0.4.3
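The CHANGELOG entry above is the user-facing surface of this change. A minimal usage sketch, assuming `SinkConfig` is importable from `pyconnect.config` (the tests further down suggest it is); every value here is illustrative:

```python
# Hypothetical configuration sketch: values are examples, not recommendations.
from pyconnect.config import SinkConfig

config = SinkConfig(
    dict(
        bootstrap_servers="localhost",
        schema_registry="localhost",
        offset_commit_interval=1,
        sink_commit_retry_count=3,  # up to 3 commit attempts before the error propagates
        group_id="my-group",
        topics="my-topic",
    )
)
```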
2 changes: 1 addition & 1 deletion setup.py
@@ -5,7 +5,7 @@

setup(
name="pyconnect",
version="0.4.4",
version="0.4.5",
packages=["pyconnect"],
package_dir={"": "src"},
# minimal requirements to run pyconnect
11 changes: 11 additions & 0 deletions src/pyconnect/config.py
@@ -232,6 +232,10 @@ def bool_from_string_parser(string: str) -> bool:
return string.lower() == "true"


def int_from_string_parser(string: str) -> int:
return int(string)


def csv_line_reader(separator=",", quoter='"', escaper="\\", strip_chars="\r\t\n ") -> Callable[[str], List[str]]:
"""
Creates a function that parses a **line** in csv format using the given parameters and returns a list of strings.
@@ -395,6 +399,10 @@ class BaseConfig(dict):
30 minutes. See :func:`pyconnect.config.timedelta_parser` for more info.
*Default is 30m*
**sink_commit_retry_count**: int
The maximum number of attempts the Sink makes when committing offsets.
*Default is 2*
**hash_sensitive_values**: bool
If true, sensitive values (e.g. sasl.password) from the kafka_opts configurations are
hashed and logged with the hashing parameters, so that the values can be validated.
@@ -412,13 +420,15 @@ class BaseConfig(dict):

__sanity_checks = [
"{offset_commit_interval}>0",
"{sink_commit_retry_count}>0",
check_field_is_valid_url("schema_registry"),
check_field_is_valid_url("bootstrap_servers"),
]

__parsers = {
"bootstrap_servers": csv_line_reader(),
"offset_commit_interval": timedelta_parser,
"sink_commit_retry_count": int_from_string_parser,
"hash_sensitive_values": bool_from_string_parser,
"unify_logging": bool_from_string_parser,
"kafka_opts": json.loads,
@@ -429,6 +439,7 @@ def __init__(self, conf_dict: Dict[str, Any]) -> None:
self["bootstrap_servers"] = conf_dict.pop("bootstrap_servers")
self["schema_registry"] = conf_dict.pop("schema_registry")
self["offset_commit_interval"] = conf_dict.pop("offset_commit_interval", "30m")
self["sink_commit_retry_count"] = conf_dict.pop("sink_commit_retry_count", "2")
self["hash_sensitive_values"] = conf_dict.pop("hash_sensitive_values", "true")
self["unify_logging"] = conf_dict.pop("unify_logging", "true")
self["kafka_opts"] = conf_dict.pop("kafka_opts", {})
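Two details of the config plumbing above are easy to miss: the default enters as the string `"2"`, is converted by the parser registered in `__parsers`, and only then faces the `"{sink_commit_retry_count}>0"` sanity check. A simplified, self-contained sketch of that parse-then-validate flow (the helper and dict names are illustrative, not `BaseConfig`'s internals):

```python
# Simplified sketch of parse-then-validate; not BaseConfig's actual implementation.
from typing import Any, Callable, Dict

def int_from_string_parser(string: str) -> int:
    return int(string)

parsers: Dict[str, Callable[[str], Any]] = {
    "sink_commit_retry_count": int_from_string_parser,
}

conf: Dict[str, Any] = {"sink_commit_retry_count": "2"}  # the default arrives as a string

for key, parser in parsers.items():
    if isinstance(conf.get(key), str):
        conf[key] = parser(conf[key])

# Mirrors the "{sink_commit_retry_count}>0" sanity check registered above.
assert conf["sink_commit_retry_count"] > 0
```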
19 changes: 16 additions & 3 deletions src/pyconnect/pyconnectsink.py
@@ -6,7 +6,7 @@

from confluent_kafka import Message, TopicPartition
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.cimpl import KafkaError
from confluent_kafka.cimpl import KafkaError, KafkaException
from loguru import logger

from pyconnect.config import configure_logging
@@ -378,8 +378,21 @@ def _commit(self) -> None:
if not offsets:
logger.info("No offsets to commit.")
else:
logger.info(f"Committing offsets: {offsets}")
self._consumer.commit(offsets=offsets, asynchronous=False)
max_attempts: int = self.config["sink_commit_retry_count"]
attempt_count: int = 1
while attempt_count <= max_attempts:
try:
logger.info(f"Committing offsets: {offsets}")
self._consumer.commit(offsets=offsets, asynchronous=False)
break
except KafkaException as ke:
logger.warning(
f"Kafka exception occurred while comitting offsets (attempt {attempt_count}): {str(ke)}"
)
if attempt_count == max_attempts:
raise
else:
attempt_count += 1

def on_shutdown(self):
"""
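Stripped of pyconnect specifics, the new `_commit` body is a bounded-retry loop: attempt the synchronous commit, log a warning on failure, and re-raise once the attempt budget is spent so the caller still sees the error. Note that with `sink_commit_retry_count=2` the sink makes two attempts in total, i.e. one retry. A self-contained sketch of the same pattern, with generic stand-in names rather than pyconnect's API:

```python
# Generic bounded-retry sketch mirroring the _commit loop above; illustrative only.
import logging
from typing import Callable

logger = logging.getLogger(__name__)

def commit_with_retries(commit_fn: Callable[[], None], max_attempts: int) -> None:
    """Invoke commit_fn up to max_attempts times, re-raising the final failure."""
    for attempt in range(1, max_attempts + 1):
        try:
            commit_fn()
            return  # success: stop retrying
        except Exception as exc:  # pyconnect narrows this to KafkaException
            logger.warning("Commit failed (attempt %d/%d): %s", attempt, max_attempts, exc)
            if attempt == max_attempts:
                raise  # budget exhausted: surface the error to the caller
```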
18 changes: 18 additions & 0 deletions test/test_config.py
@@ -100,6 +100,7 @@ def test_host_splitting():
bootstrap_servers=servers,
schema_registry="localhost",
offset_commit_interval=1,
sink_commit_retry_count=2,
group_id="groupid",
topics="topics",
)
@@ -114,12 +115,14 @@ def test_sanity_check_success():
bootstrap_servers="localhost",
schema_registry="localhost",
offset_commit_interval=1,
sink_commit_retry_count=2,
group_id="groupid",
topics="topics",
)
)

assert config["offset_commit_interval"] == 1
assert config["sink_commit_retry_count"] == 2


def test_sanity_check_failure(caplog):
@@ -140,3 +143,18 @@ def test_sanity_check_failure_subclass(caplog):
topics="topics",
)
)


def test_sanity_check_failure_commit_retry(caplog):
caplog.set_level(logging.DEBUG)
with pytest.raises(SanityError):
SinkConfig(
dict(
bootstrap_servers="localhost",
schema_registry="locahlost",
offset_commit_interval=5,
sink_commit_retry_count=-2,
group_id="groupid",
topics="topics",
)
)
1 change: 1 addition & 0 deletions test/test_pyconnectsink.py
@@ -18,6 +18,7 @@ def sink_factory():
bootstrap_servers="localhost",
schema_registry="localhost",
offset_commit_interval=1,
sink_commit_retry_count=2,
group_id="group_id",
poll_timeout=1,
topics="",
