Skip to content

Commit

Permalink
Nats reconnect (#858)
Browse files Browse the repository at this point in the history
* reduce reconnect wait time and add callbacks

* add NoStreamResponseError to exception list

* add docstrings

* 🎨

* update error logging

* update error logs

---------

Co-authored-by: cl0ete <cloete.dupreez@gmail.com>
  • Loading branch information
ff137 and cl0ete authored Nov 12, 2024
1 parent 509929b commit 8fe294b
Showing 1 changed file with 33 additions and 1 deletion.
34 changes: 33 additions & 1 deletion nats_events/nats_events/v1_0/nats_queue/events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrConnectionClosed, ErrNoServers, ErrTimeout
from nats.js import JetStreamContext
from nats.js.errors import NoStreamResponseError

from ..config import EventConfig, OutboundConfig, get_config

Expand Down Expand Up @@ -57,6 +58,12 @@ async def nats_jetstream_setup(profile: Profile, event: Event) -> JetStreamConte
nats = NATS()
connect_kwargs = {
"servers": [connection_url],
"reconnect_time_wait": 0.5,
"max_reconnect_attempts": -1,
"error_cb": error_callback,
"disconnected_cb": disconnected_callback,
"reconnected_cb": reconnected_callback,
"closed_cb": closed_callback,
}
if NATS_CREDS_FILE:
connect_kwargs["user_credentials"] = NATS_CREDS_FILE
Expand All @@ -75,6 +82,26 @@ async def nats_jetstream_setup(profile: Profile, event: Event) -> JetStreamConte
return js


async def error_callback(e):
"""Error callback for NATS."""
LOGGER.error(f"NATS error: {e}")


async def disconnected_callback():
"""Disconnected callback for NATS."""
LOGGER.warning("Disconnected from NATS server")


async def reconnected_callback():
"""Reconnected callback for NATS."""
LOGGER.info("Reconnected to NATS server")


async def closed_callback():
"""Closed callback for NATS."""
LOGGER.warning("NATS connection closed")


async def define_stream(js: JetStreamContext, stream_name: str, subjects: list[str]):
"""Define a JetStream stream."""
try:
Expand Down Expand Up @@ -185,7 +212,12 @@ async def publish_with_retry(
"Published message to subject %s with payload %s", subject, payload
)
return
except (ErrConnectionClosed, ErrTimeout, ErrNoServers) as err:
except (
ErrConnectionClosed,
ErrTimeout,
ErrNoServers,
NoStreamResponseError,
) as err:
LOGGER.warning(
"Attempt %d: Failed to publish message to subject %s: %s",
attempt + 1,
Expand Down

0 comments on commit 8fe294b

Please sign in to comment.