Skip to content

Commit

Permalink
Merge pull request #106 from getAlby/task-backoff
Browse files Browse the repository at this point in the history
feat: add exponential backoff to subscriptions
  • Loading branch information
im-adithya authored Aug 27, 2024
2 parents f73fec3 + 8bc1433 commit 12d12b1
Showing 1 changed file with 21 additions and 10 deletions.
31 changes: 21 additions & 10 deletions internal/nostr/nostr.go
Original file line number Diff line number Diff line change
Expand Up @@ -699,33 +699,43 @@ func (svc *Service) startSubscription(ctx context.Context, subscription *Subscri
var relay *nostr.Relay
var isCustomRelay bool
var err error
waitToReconnectSeconds := 0

for {
// close relays with connection errors before connecting again
// because context expiration has no effect on relays
// TODO: Call relay.Connect on already initialized relays
if relay != nil && isCustomRelay {
// context expiration has no effect on relays
if relay != nil && relay.Connection != nil && isCustomRelay {
relay.Close()
}
if ctx.Err() != nil {
svc.Logger.WithFields(logrus.Fields{
"subscription_id": subscription.ID,
"relay_url": subscription.RelayUrl,
}).Debug("Context canceled, stopping subscription")
svc.stopSubscription(subscription)
return
}
time.Sleep(time.Duration(waitToReconnectSeconds) * time.Second)
relay, isCustomRelay, err = svc.getRelayConnection(ctx, subscription.RelayUrl)
if err != nil {
// TODO: notify user about relay failure
waitToReconnectSeconds = max(waitToReconnectSeconds, 1)
waitToReconnectSeconds = min(waitToReconnectSeconds * 2, 900)
svc.Logger.WithError(err).WithFields(logrus.Fields{
"subscription_id": subscription.ID,
"relay_url": subscription.RelayUrl,
}).Error("Failed get relay connection, retrying in 5s...")
time.Sleep(5 * time.Second) // sleep for 5 seconds
}).Errorf("Failed to connect to relay, retrying in %vs...", waitToReconnectSeconds)
continue
}

sub, err := relay.Subscribe(ctx, []nostr.Filter{*filter})
if err != nil {
// TODO: notify user about subscription failure
waitToReconnectSeconds = max(waitToReconnectSeconds, 1)
waitToReconnectSeconds = min(waitToReconnectSeconds * 2, 900)
svc.Logger.WithError(err).WithFields(logrus.Fields{
"subscription_id": subscription.ID,
"relay_url": subscription.RelayUrl,
}).Error("Failed to subscribe to relay, retrying in 5s...")
time.Sleep(5 * time.Second) // sleep for 5 seconds
}).Errorf("Failed to subscribe to relay, retrying in %vs...", waitToReconnectSeconds)
continue
}

Expand All @@ -738,15 +748,16 @@ func (svc *Service) startSubscription(ctx context.Context, subscription *Subscri
"relay_url": subscription.RelayUrl,
}).Debug("Started subscription")

waitToReconnectSeconds = 0

err = svc.processEvents(ctx, subscription, onReceiveEOS, handleEvent)

if err != nil {
// TODO: notify user about subscription failure
svc.Logger.WithError(err).WithFields(logrus.Fields{
"subscription_id": subscription.ID,
"relay_url": subscription.RelayUrl,
}).Error("Subscription stopped due to relay error, reconnecting in 5s...")
time.Sleep(5 * time.Second) // sleep for 5 seconds
}).Error("Subscription stopped due to relay error, reconnecting...")
continue
} else {
if isCustomRelay {
Expand Down

0 comments on commit 12d12b1

Please sign in to comment.