Skip to content

Commit

Permalink
use retry for backoff
Browse files Browse the repository at this point in the history
  • Loading branch information
cl0ete committed Dec 5, 2024
1 parent dfa342c commit afe043e
Showing 1 changed file with 42 additions and 60 deletions.
102 changes: 42 additions & 60 deletions waypoint/services/nats_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from nats.js.api import ConsumerConfig, DeliverPolicy
from nats.js.client import JetStreamContext
from nats.js.errors import FetchTimeoutError
from retry import retry

from shared.constants import NATS_STATE_STREAM, NATS_STATE_SUBJECT
from shared.log_config import get_logger
Expand All @@ -28,72 +29,53 @@ def __init__(self, jetstream: JetStreamContext):
async def _subscribe(
self, *, group_id: str, wallet_id: str, topic: str, state: str, look_back: int
) -> JetStreamContext.PullSubscription:
try:
logger.trace(
"Subscribing to JetStream for wallet_id: {}, group_id: {}",
wallet_id,
group_id,
)
group_id = group_id or "*"
subscribe_kwargs = {
"subject": f"{NATS_STATE_SUBJECT}.{group_id}.{wallet_id}.{topic}.{state}",
"stream": NATS_STATE_STREAM,
}

# Get the current time in UTC
current_time = datetime.now(timezone.utc)
logger.trace(
"Subscribing to JetStream for wallet_id: {}, group_id: {}",
wallet_id,
group_id,
)

# Subtract look_back time from the current time
look_back_time = current_time - timedelta(seconds=look_back)
group_id = group_id or "*"
subscribe_kwargs = {
"subject": f"{NATS_STATE_SUBJECT}.{group_id}.{wallet_id}.{topic}.{state}",
"stream": NATS_STATE_STREAM,
}

# Format the time in the required format
start_time = look_back_time.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
config = ConsumerConfig(
deliver_policy=DeliverPolicy.BY_START_TIME,
opt_start_time=start_time,
)
attempt = 0
while True:
try:
logger.trace(
"Subscribing to JetStream (Attempt {}) for wallet_id: {}, group_id: {}",
attempt + 1,
wallet_id,
group_id,
)
# Get the current time in UTC
current_time = datetime.now(timezone.utc)

subscription = await self.js_context.pull_subscribe(
config=config, **subscribe_kwargs
)
# Subtract look_back time from the current time
look_back_time = current_time - timedelta(seconds=look_back)

return subscription

except TimeoutError as e:

if attempt < 4:
backoff_time = min(2**attempt, 16)
logger.warning(
f"Timeout error. Retrying in {backoff_time:.2f} seconds "
f"(Attempt {attempt + 1})"
)
await asyncio.sleep(backoff_time)
else:
# After reaching 16 seconds, retry with fixed 16-second delay
logger.warning(
f"Timeout error. Retrying with fixed 16-second delay "
f"(Attempt {attempt + 1})"
)
await asyncio.sleep(16)

attempt += 1

except BadSubscriptionError as e:
logger.error("BadSubscriptionError subscribing to NATS: {}", e)
raise
except Error as e:
logger.error("Error subscribing to NATS: {}", e)
raise
# Format the time in the required format
start_time = look_back_time.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
config = ConsumerConfig(
deliver_policy=DeliverPolicy.BY_START_TIME,
opt_start_time=start_time,
)

@retry(TimeoutError, delay=1, backoff=2, max_delay=16, logger=logger)
async def pull_subscribe(config, **kwargs):
try:
logger.trace(
"Subscribing to JetStream for wallet_id: {}, group_id: {}",
wallet_id,
group_id,
)
subscription = await self.js_context.pull_subscribe(
config=config, **kwargs
)
return subscription
except BadSubscriptionError as e:
logger.error("BadSubscriptionError subscribing to NATS: {}", e)
raise
except Error as e:
logger.error("Error subscribing to NATS: {}", e)
raise

try:
return await pull_subscribe(config, **subscribe_kwargs)
except Exception:
logger.exception("Unknown error subscribing to NATS")
raise
Expand Down

0 comments on commit afe043e

Please sign in to comment.