diff --git a/nats_events/nats_events/v1_0/nats_queue/events/__init__.py b/nats_events/nats_events/v1_0/nats_queue/events/__init__.py index 6c4a00701..4b577ca95 100644 --- a/nats_events/nats_events/v1_0/nats_queue/events/__init__.py +++ b/nats_events/nats_events/v1_0/nats_queue/events/__init__.py @@ -20,6 +20,7 @@ from nats.aio.client import Client as NATS from nats.aio.errors import ErrConnectionClosed, ErrNoServers, ErrTimeout from nats.js import JetStreamContext +from nats.js.errors import NoStreamResponseError from ..config import EventConfig, OutboundConfig, get_config @@ -57,6 +58,12 @@ async def nats_jetstream_setup(profile: Profile, event: Event) -> JetStreamConte nats = NATS() connect_kwargs = { "servers": [connection_url], + "reconnect_time_wait": 0.5, + "max_reconnect_attempts": -1, + "error_cb": error_callback, + "disconnected_cb": disconnected_callback, + "reconnected_cb": reconnected_callback, + "closed_cb": closed_callback, } if NATS_CREDS_FILE: connect_kwargs["user_credentials"] = NATS_CREDS_FILE @@ -75,6 +82,26 @@ async def nats_jetstream_setup(profile: Profile, event: Event) -> JetStreamConte return js +async def error_callback(e): + """Error callback for NATS.""" + LOGGER.error(f"NATS error: {e}") + + +async def disconnected_callback(): + """Disconnected callback for NATS.""" + LOGGER.warning("Disconnected from NATS server") + + +async def reconnected_callback(): + """Reconnected callback for NATS.""" + LOGGER.info("Reconnected to NATS server") + + +async def closed_callback(): + """Closed callback for NATS.""" + LOGGER.warning("NATS connection closed") + + async def define_stream(js: JetStreamContext, stream_name: str, subjects: list[str]): """Define a JetStream stream.""" try: @@ -185,7 +212,12 @@ async def publish_with_retry( "Published message to subject %s with payload %s", subject, payload ) return - except (ErrConnectionClosed, ErrTimeout, ErrNoServers) as err: + except ( + ErrConnectionClosed, + ErrTimeout, + ErrNoServers, + NoStreamResponseError, + ) as err: LOGGER.warning( "Attempt %d: Failed to publish message to subject %s: %s", attempt + 1,