Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nats reconnect #858

Merged
merged 6 commits into from
Nov 12, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion nats_events/nats_events/v1_0/nats_queue/events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrConnectionClosed, ErrNoServers, ErrTimeout
from nats.js import JetStreamContext
from nats.js.errors import NoStreamResponseError

from ..config import EventConfig, OutboundConfig, get_config

Expand Down Expand Up @@ -57,6 +58,12 @@ async def nats_jetstream_setup(profile: Profile, event: Event) -> JetStreamConte
nats = NATS()
connect_kwargs = {
"servers": [connection_url],
"reconnect_time_wait": 0.5,
"max_reconnect_attempts": -1,
"error_cb": error_callback,
"disconnected_cb": disconnected_callback,
"reconnected_cb": reconnected_callback,
"closed_cb": closed_callback,
}
if NATS_CREDS_FILE:
connect_kwargs["user_credentials"] = NATS_CREDS_FILE
Expand All @@ -75,6 +82,26 @@ async def nats_jetstream_setup(profile: Profile, event: Event) -> JetStreamConte
return js


async def error_callback(e):
"""Error callback for NATS."""
LOGGER.error(f"NATS error: {e}")


async def disconnected_callback():
"""Disconnected callback for NATS."""
LOGGER.warning("Disconnected from NATS server")


async def reconnected_callback():
"""Reconnected callback for NATS."""
LOGGER.info("Reconnected to NATS server")


async def closed_callback():
"""Closed callback for NATS."""
LOGGER.warning("NATS connection closed")


async def define_stream(js: JetStreamContext, stream_name: str, subjects: list[str]):
"""Define a JetStream stream."""
try:
Expand Down Expand Up @@ -185,7 +212,12 @@ async def publish_with_retry(
"Published message to subject %s with payload %s", subject, payload
)
return
except (ErrConnectionClosed, ErrTimeout, ErrNoServers) as err:
except (
ErrConnectionClosed,
ErrTimeout,
ErrNoServers,
NoStreamResponseError,
) as err:
LOGGER.warning(
"Attempt %d: Failed to publish message to subject %s: %s",
attempt + 1,
Expand Down
Loading