From 06a66d8ecdf7d2d3074b88274a1a48c66135acd6 Mon Sep 17 00:00:00 2001 From: cl0ete Date: Wed, 6 Nov 2024 14:18:27 +0200 Subject: [PATCH 1/6] reduce reconnect wait time and add callbacks --- .../v1_0/nats_queue/events/__init__.py | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/nats_events/nats_events/v1_0/nats_queue/events/__init__.py b/nats_events/nats_events/v1_0/nats_queue/events/__init__.py index 598ffe9b8..f5fa8d325 100644 --- a/nats_events/nats_events/v1_0/nats_queue/events/__init__.py +++ b/nats_events/nats_events/v1_0/nats_queue/events/__init__.py @@ -57,6 +57,12 @@ async def nats_jetstream_setup(profile: Profile, event: Event) -> JetStreamConte nats = NATS() connect_kwargs = { "servers": [connection_url], + "reconnect_time_wait": 0.5, + "max_reconnect_attempts": -1, + "error_cb": error_callback, + "disconnected_cb": disconnected_callback, + "reconnected_cb": reconnected_callback, + "closed_cb": closed_callback, } if NATS_CREDS_FILE: connect_kwargs["user_credentials"] = NATS_CREDS_FILE @@ -75,6 +81,22 @@ async def nats_jetstream_setup(profile: Profile, event: Event) -> JetStreamConte return js +async def error_callback(e): + LOGGER.error("NATS error: {}", str(e)) + + +async def disconnected_callback(): + LOGGER.warning("Disconnected from NATS server") + + +async def reconnected_callback(): + LOGGER.info("Reconnected to NATS server") + + +async def closed_callback(): + LOGGER.warning("NATS connection closed") + + async def define_stream(js: JetStreamContext, stream_name: str, subjects: list[str]): """Define a JetStream stream.""" try: From e8d583ad299e646cbe65369b324427415501ba17 Mon Sep 17 00:00:00 2001 From: cl0ete Date: Wed, 6 Nov 2024 15:56:18 +0200 Subject: [PATCH 2/6] add NoStreamResponseError to exception list --- nats_events/nats_events/v1_0/nats_queue/events/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/nats_events/nats_events/v1_0/nats_queue/events/__init__.py b/nats_events/nats_events/v1_0/nats_queue/events/__init__.py index f5fa8d325..18c813e9b 100644 --- a/nats_events/nats_events/v1_0/nats_queue/events/__init__.py +++ b/nats_events/nats_events/v1_0/nats_queue/events/__init__.py @@ -20,6 +20,7 @@ from nats.aio.client import Client as NATS from nats.aio.errors import ErrConnectionClosed, ErrNoServers, ErrTimeout from nats.js import JetStreamContext +from nats.js.errors import NoStreamResponseError from ..config import EventConfig, OutboundConfig, get_config @@ -207,7 +208,7 @@ async def publish_with_retry( "Published message to subject %s with payload %s", subject, payload ) return - except (ErrConnectionClosed, ErrTimeout, ErrNoServers) as err: + except (ErrConnectionClosed, ErrTimeout, ErrNoServers, NoStreamResponseError) as err: LOGGER.warning( "Attempt %d: Failed to publish message to subject %s: %s", attempt + 1, From 03378faee1d2fa62518ec60a6b2f7a300f905517 Mon Sep 17 00:00:00 2001 From: cl0ete Date: Thu, 7 Nov 2024 08:43:46 +0200 Subject: [PATCH 3/6] add docstrings --- nats_events/nats_events/v1_0/nats_queue/events/__init__.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/nats_events/nats_events/v1_0/nats_queue/events/__init__.py b/nats_events/nats_events/v1_0/nats_queue/events/__init__.py index 18c813e9b..432a4f47d 100644 --- a/nats_events/nats_events/v1_0/nats_queue/events/__init__.py +++ b/nats_events/nats_events/v1_0/nats_queue/events/__init__.py @@ -83,18 +83,22 @@ async def nats_jetstream_setup(profile: Profile, event: Event) -> JetStreamConte async def error_callback(e): + """Error callback for NATS.""" LOGGER.error("NATS error: {}", str(e)) async def disconnected_callback(): + """Disconnected callback for NATS.""" LOGGER.warning("Disconnected from NATS server") async def reconnected_callback(): + """Reconnected callback for NATS.""" LOGGER.info("Reconnected to NATS server") async def closed_callback(): + """Closed callback for NATS.""" LOGGER.warning("NATS connection closed") From 44d7833a14e94ee1933f4e943daac5e34e3f1f1c Mon Sep 17 00:00:00 2001 From: cl0ete Date: Thu, 7 Nov 2024 08:46:28 +0200 Subject: [PATCH 4/6] :art: --- nats_events/nats_events/v1_0/nats_queue/events/__init__.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/nats_events/nats_events/v1_0/nats_queue/events/__init__.py b/nats_events/nats_events/v1_0/nats_queue/events/__init__.py index 432a4f47d..1f2467e63 100644 --- a/nats_events/nats_events/v1_0/nats_queue/events/__init__.py +++ b/nats_events/nats_events/v1_0/nats_queue/events/__init__.py @@ -212,7 +212,12 @@ async def publish_with_retry( "Published message to subject %s with payload %s", subject, payload ) return - except (ErrConnectionClosed, ErrTimeout, ErrNoServers, NoStreamResponseError) as err: + except ( + ErrConnectionClosed, + ErrTimeout, + ErrNoServers, + NoStreamResponseError, + ) as err: LOGGER.warning( "Attempt %d: Failed to publish message to subject %s: %s", attempt + 1, From 180fd95f3feeabedbf5cf78f83fa3f8988147ef0 Mon Sep 17 00:00:00 2001 From: cl0ete Date: Thu, 7 Nov 2024 10:48:27 +0200 Subject: [PATCH 5/6] update error logging --- nats_events/nats_events/v1_0/nats_queue/events/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nats_events/nats_events/v1_0/nats_queue/events/__init__.py b/nats_events/nats_events/v1_0/nats_queue/events/__init__.py index 1f2467e63..adf15901f 100644 --- a/nats_events/nats_events/v1_0/nats_queue/events/__init__.py +++ b/nats_events/nats_events/v1_0/nats_queue/events/__init__.py @@ -84,7 +84,7 @@ async def nats_jetstream_setup(profile: Profile, event: Event) -> JetStreamConte async def error_callback(e): """Error callback for NATS.""" - LOGGER.error("NATS error: {}", str(e)) + LOGGER.error("NATS error: {}".format(e)) async def disconnected_callback(): From 89b50f9c07372c0ba07cfa4c219614b7cbe1dedd Mon Sep 17 00:00:00 2001 From: cl0ete Date: Thu, 7 Nov 2024 10:53:00 +0200 Subject: [PATCH 6/6] update error logs --- nats_events/nats_events/v1_0/nats_queue/events/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nats_events/nats_events/v1_0/nats_queue/events/__init__.py b/nats_events/nats_events/v1_0/nats_queue/events/__init__.py index adf15901f..3abcf9e65 100644 --- a/nats_events/nats_events/v1_0/nats_queue/events/__init__.py +++ b/nats_events/nats_events/v1_0/nats_queue/events/__init__.py @@ -84,7 +84,7 @@ async def nats_jetstream_setup(profile: Profile, event: Event) -> JetStreamConte async def error_callback(e): """Error callback for NATS.""" - LOGGER.error("NATS error: {}".format(e)) + LOGGER.error(f"NATS error: {e}") async def disconnected_callback():