Merge pull request #62 from keep-network/resubscriptions-dance
Subscription dance: backoff, logging, and test coverage


This PR is the first in a series of PRs refactoring the Ethereum event subscription mechanism that I plan to open in the next few days.

You can see it in action here: keep-network/keep-ecdsa#663.

Until now, the resubscription logic was implemented in the contract binding templates. There were three problems with that mechanism:
1. No resubscribe backoff. Each failed subscription was retried 5 seconds after it failed. Because the ECDSA client opens many individual subscriptions, a mass retry with no backoff could be interpreted as misbehavior by a third-party Ethereum provider, which could then block further attempts completely. Some operators experienced this with Alchemy.
2. All logging was at the warning level. Combined with the lack of backoff described in the previous point, this could produce a deadly mixture: the keep client could keep trying to reconnect for a long time, the Ethereum client could keep rejecting those attempts as DoS/misbehavior, and the operator would receive only warnings and not a single error.
3. No test coverage for the resubscription mechanism. All the subscription code lived in the templates used to generate Go contract bindings and was very hard, if possible at all, to test.

Here we address all those problems.

Instead of implementing resubscriptions on our side, we wrap the code from `github.com/ethereum/go-ethereum/event` with some additional logging logic. The contract templates use this wrapper to keep subscriptions alive. We rely on `go-ethereum` to get the resubscriptions right, and we cover the logic we added with unit tests on our side.

The code from `go-ethereum` implements backoff, doubling the delay until it reaches the maximum backoff time, which our bindings set to 2 minutes.
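
To illustrate the backoff described above, here is a minimal sketch of a delay that doubles and is capped at a maximum. It is not the `go-ethereum` implementation: `nextBackoff` is a hypothetical helper, and the 5-second starting delay is an assumption made only for this example. The 2-minute cap is the value set in the bindings.

```go
package main

import (
	"fmt"
	"time"
)

// nextBackoff doubles the current delay and caps the result at max.
// Hypothetical helper for illustration; go-ethereum's resubscriber has its
// own internal logic for choosing the delay.
func nextBackoff(current, max time.Duration) time.Duration {
	next := current * 2
	if next > max {
		return max
	}
	return next
}

func main() {
	// Assumption: the 5s starting delay is made up for this example.
	delay := 5 * time.Second
	// The cap matches the maximum backoff set in the bindings.
	backoffMax := 2 * time.Minute

	for attempt := 1; attempt <= 7; attempt++ {
		fmt.Printf("attempt %d: wait %v\n", attempt, delay)
		delay = nextBackoff(delay, backoffMax)
	}
}
```

With these assumed values the delay grows from 5 seconds to the 2-minute cap within a few attempts and then stays there, so a provider is not hit with a steady stream of retries.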

The new code in `ethutil`, which wraps `go-ethereum`'s resubscriber, logs an error if the subscription is dropped too often, which may indicate problems with the Ethereum client. The threshold is set to 15 minutes.
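
The threshold check can be sketched in isolation as follows. This is a simplified illustration, not the code from this PR: `attemptTracker` and `shouldAlert` are hypothetical names, and the timestamps are made up. Only the 15-minute threshold comes from the bindings.

```go
package main

import (
	"fmt"
	"time"
)

// shouldAlert reports whether the time since the previous subscription
// attempt is shorter than the alert threshold.
func shouldAlert(elapsed, alertThreshold time.Duration) bool {
	return elapsed < alertThreshold
}

// attemptTracker remembers when the last subscription attempt happened.
// Hypothetical type used only for this sketch.
type attemptTracker struct {
	lastAttempt    time.Time
	alertThreshold time.Duration
}

// record registers an attempt at the given time. It returns the time elapsed
// since the previous attempt and whether that gap should raise an alert.
func (t *attemptTracker) record(now time.Time) (time.Duration, bool) {
	elapsed := now.Sub(t.lastAttempt)
	t.lastAttempt = now
	return elapsed, shouldAlert(elapsed, t.alertThreshold)
}

func main() {
	tracker := &attemptTracker{alertThreshold: 15 * time.Minute}
	start := time.Date(2021, time.January, 27, 12, 0, 0, 0, time.UTC)

	// Initial subscription: lastAttempt is the zero time, so the gap is
	// far above the threshold and no alert is raised.
	_, alert := tracker.record(start)
	fmt.Println("initial subscription, alert:", alert)

	// The subscription is dropped after 2 minutes: below the threshold.
	elapsed, alert := tracker.record(start.Add(2 * time.Minute))
	fmt.Println("resubscribed after", elapsed, "alert:", alert)

	// The next drop happens an hour later: no alert.
	elapsed, alert = tracker.record(start.Add(62 * time.Minute))
	fmt.Println("resubscribed after", elapsed, "alert:", alert)
}
```

Occasional drops of a long-lived subscription stay quiet in this model, while a subscription that fails again within the threshold is flagged.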
nkuba authored Jan 27, 2021
2 parents c7eca9a + c6adefd commit bb217b9
Showing 6 changed files with 460 additions and 194 deletions.
55 changes: 55 additions & 0 deletions pkg/chain/ethereum/ethutil/resubscribe.go
@@ -0,0 +1,55 @@
package ethutil

import (
	"context"
	"time"

	"github.com/ethereum/go-ethereum/event"
)

// WithResubscription wraps the subscribe function to call it repeatedly
// to keep a subscription alive. When a subscription is established, it is
// monitored and in the case of a failure, resubscribe is attempted by
// calling the subscribe function again.
//
// The mechanism applies backoff between resubscription attempts.
// The time between calls is adapted based on the error rate, but will never
// exceed backoffMax.
//
// The mechanism monitors the time elapsed between resubscription attempts and
// if it is shorter than the specified alertThreshold, it calls
// thresholdViolatedFn passing the time elapsed between resubscription attempts.
// This function alarms about potential problems with the stability of the
// subscription.
//
// In case of an error returned by the wrapped subscription function,
// subscriptionFailedFn is called with the underlying error.
//
// thresholdViolatedFn and subscriptionFailedFn calls are executed in a separate
// goroutine and thus are non-blocking.
func WithResubscription(
	backoffMax time.Duration,
	subscribeFn event.ResubscribeFunc,
	alertThreshold time.Duration,
	thresholdViolatedFn func(time.Duration),
	subscriptionFailedFn func(error),
) event.Subscription {
	// The zero time makes the very first attempt look infinitely far from
	// the previous one, so it never counts as a threshold violation.
	lastAttempt := time.Time{}
	wrappedResubscribeFn := func(ctx context.Context) (event.Subscription, error) {
		now := time.Now()
		elapsed := now.Sub(lastAttempt)
		if elapsed < alertThreshold {
			go thresholdViolatedFn(elapsed)
		}

		lastAttempt = now

		sub, err := subscribeFn(ctx)
		if err != nil {
			go subscriptionFailedFn(err)
		}
		return sub, err
	}

	return event.Resubscribe(backoffMax, wrappedResubscribeFn)
}
289 changes: 289 additions & 0 deletions pkg/chain/ethereum/ethutil/resubscribe_test.go
@@ -0,0 +1,289 @@
package ethutil

import (
	"context"
	"fmt"
	"testing"
	"time"

	"github.com/ethereum/go-ethereum/event"
)

func TestEmitOriginalError(t *testing.T) {
	backoffMax := 100 * time.Millisecond
	alertThreshold := 100 * time.Millisecond

	failedOnce := false
	expectedFailMessage := "wherever I go, he goes"
	subscribeFn := func(ctx context.Context) (event.Subscription, error) {
		if !failedOnce {
			failedOnce = true
			// Pass the message as an argument, not as the format string.
			return nil, fmt.Errorf("%s", expectedFailMessage)
		}
		delegate := event.NewSubscription(func(unsubscribed <-chan struct{}) error {
			return nil
		})
		return delegate, nil
	}

	// Using buffered channels so that writes do not block.
	// There should never be a need to write more to those channels if the code
	// under test works as expected.
	thresholdViolated := make(chan time.Duration, 10)
	subscriptionFailed := make(chan error, 10)
	subscription := WithResubscription(
		backoffMax,
		subscribeFn,
		alertThreshold,
		func(elapsed time.Duration) { thresholdViolated <- elapsed },
		func(err error) { subscriptionFailed <- err },
	)
	<-subscription.Err()

	// Subscription failed one time so there should be one error in the channel.
	subscriptionFailCount := len(subscriptionFailed)
	if subscriptionFailCount != 1 {
		t.Fatalf(
			"subscription failure reported [%v] times, expected [1]",
			subscriptionFailCount,
		)
	}

	// That failure should refer to the original error.
	err := <-subscriptionFailed
	if err.Error() != expectedFailMessage {
		t.Errorf(
			"unexpected subscription error message\nexpected: [%v]\nactual: [%v]",
			expectedFailMessage,
			err.Error(),
		)
	}
}

func TestResubscribeAboveThreshold(t *testing.T) {
	backoffMax := 100 * time.Millisecond
	alertThreshold := 100 * time.Millisecond

	plannedSubscriptionFailures := 3
	elapsedBetweenFailures := 150 * time.Millisecond

	resubscribeFnCalls := 0
	subscribeFn := func(ctx context.Context) (event.Subscription, error) {
		resubscribeFnCalls++
		time.Sleep(elapsedBetweenFailures) // 150ms > 100ms, above alert threshold
		if resubscribeFnCalls <= plannedSubscriptionFailures {
			return nil, fmt.Errorf("this is the way")
		}
		delegate := event.NewSubscription(func(unsubscribed <-chan struct{}) error {
			return nil
		})
		return delegate, nil
	}

	// Using buffered channels so that writes do not block.
	// There should never be a need to write more to those channels if the code
	// under test works as expected.
	thresholdViolated := make(chan time.Duration, 10)
	subscriptionFailed := make(chan error, 10)
	subscription := WithResubscription(
		backoffMax,
		subscribeFn,
		alertThreshold,
		func(elapsed time.Duration) { thresholdViolated <- elapsed },
		func(err error) { subscriptionFailed <- err },
	)
	<-subscription.Err()

	// Nothing expected in thresholdViolated channel.
	// Alert threshold is set to 100ms and no two consecutive resubscription
	// attempts were less than 150ms apart.
	violationCount := len(thresholdViolated)
	if violationCount != 0 {
		t.Errorf(
			"threshold violation reported [%v] times, expected none",
			violationCount,
		)
	}

	// Subscription failed plannedSubscriptionFailures times and resubscription
	// function should be called plannedSubscriptionFailures + 1 times. One time
	// for each failure and one time at the end - that subscription was
	// successful and did not have to be retried.
	expectedResubscriptionCalls := plannedSubscriptionFailures + 1
	if resubscribeFnCalls != expectedResubscriptionCalls {
		t.Errorf(
			"resubscription called [%v] times, expected [%v]",
			resubscribeFnCalls,
			expectedResubscriptionCalls,
		)
	}

	// Expect all subscription failures to be reported.
	subscriptionFailCount := len(subscriptionFailed)
	if subscriptionFailCount != plannedSubscriptionFailures {
		t.Errorf(
			"subscription failure reported [%v] times, expected [%v]",
			subscriptionFailCount,
			plannedSubscriptionFailures,
		)
	}
}

func TestResubscribeBelowThreshold(t *testing.T) {
	backoffMax := 50 * time.Millisecond
	alertThreshold := 100 * time.Millisecond

	plannedSubscriptionFailures := 5
	elapsedBetweenFailures := 50 * time.Millisecond

	resubscribeFnCalls := 0
	subscribeFn := func(ctx context.Context) (event.Subscription, error) {
		resubscribeFnCalls++
		time.Sleep(elapsedBetweenFailures) // 50ms < 100ms, below alert threshold
		if resubscribeFnCalls <= plannedSubscriptionFailures {
			return nil, fmt.Errorf("i have spoken")
		}
		delegate := event.NewSubscription(func(unsubscribed <-chan struct{}) error {
			return nil
		})
		return delegate, nil
	}

	// Using buffered channels so that writes do not block.
	// There should never be a need to write more to those channels if the code
	// under test works as expected.
	thresholdViolated := make(chan time.Duration, 10)
	subscriptionFailed := make(chan error, 10)
	subscription := WithResubscription(
		backoffMax,
		subscribeFn,
		alertThreshold,
		func(elapsed time.Duration) { thresholdViolated <- elapsed },
		func(err error) { subscriptionFailed <- err },
	)
	<-subscription.Err()

	// Threshold violation should be reported for each subscription failure if
	// the time elapsed since the previous resubscription was shorter than the
	// threshold.
	// In this test, alert threshold is set to 100ms and delays between failures
	// are just 50ms. Thus, we expect the same number of threshold violations as
	// subscription failures.
	violationCount := len(thresholdViolated)
	if violationCount != plannedSubscriptionFailures {
		t.Errorf(
			"threshold violation reported [%v] times, expected [%v]",
			violationCount,
			plannedSubscriptionFailures,
		)
	}

	// All violations reported should have correct values - all of them should
	// be longer than the time elapsed between failures and shorter than the
	// alert threshold. It is not possible to assert on a precise value.
	for i := 0; i < violationCount; i++ {
		violation := <-thresholdViolated
		if violation < elapsedBetweenFailures {
			t.Errorf(
				"violation reported should be longer than the time elapsed "+
					"between failures; is: [%v] and should be longer than [%v]",
				violation,
				elapsedBetweenFailures,
			)
		}
		if violation > alertThreshold {
			t.Errorf(
				"violation reported should be shorter than the alert threshold; "+
					"is: [%v] and should be shorter than [%v]",
				violation,
				alertThreshold,
			)
		}
	}

	// Subscription failed plannedSubscriptionFailures times and resubscription
	// function should be called plannedSubscriptionFailures + 1 times. One time
	// for each failure and one time at the end - that subscription was
	// successful and did not have to be retried.
	expectedResubscriptionCalls := plannedSubscriptionFailures + 1
	if resubscribeFnCalls != expectedResubscriptionCalls {
		t.Errorf(
			"resubscription called [%v] times, expected [%v]",
			resubscribeFnCalls,
			expectedResubscriptionCalls,
		)
	}

	// Expect all subscription failures to be reported.
	subscriptionFailCount := len(subscriptionFailed)
	if subscriptionFailCount != plannedSubscriptionFailures {
		t.Errorf(
			"subscription failure reported [%v] times, expected [%v]",
			subscriptionFailCount,
			plannedSubscriptionFailures,
		)
	}
}

func TestDoNotBlockOnChannelWrites(t *testing.T) {
	backoffMax := 50 * time.Millisecond
	alertThreshold := 100 * time.Millisecond

	plannedSubscriptionFailures := 5
	elapsedBetweenFailures := 10 * time.Millisecond

	resubscribeFnCalls := 0
	subscribeFn := func(ctx context.Context) (event.Subscription, error) {
		resubscribeFnCalls++
		time.Sleep(elapsedBetweenFailures) // 10ms < 100ms, below alert threshold
		if resubscribeFnCalls <= plannedSubscriptionFailures {
			return nil, fmt.Errorf("Groku?")
		}
		delegate := event.NewSubscription(func(unsubscribed <-chan struct{}) error {
			return nil
		})
		return delegate, nil
	}

	// Unbuffered channels with no receivers; writes to them block.
	thresholdViolated := make(chan time.Duration)
	subscriptionFailed := make(chan error)

	// Cancelling the context on test exit releases the callback goroutines
	// blocked on the channel writes.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	subscription := WithResubscription(
		backoffMax,
		subscribeFn,
		alertThreshold,
		func(elapsed time.Duration) {
			select {
			case thresholdViolated <- elapsed:
			case <-ctx.Done():
				return
			}
		},
		func(err error) {
			select {
			case subscriptionFailed <- err:
			case <-ctx.Done():
				return
			}
		},
	)
	<-subscription.Err()

	// Subscription failed plannedSubscriptionFailures times and resubscription
	// function should be called plannedSubscriptionFailures + 1 times. One time
	// for each failure and one time at the end - that subscription was
	// successful and did not have to be retried. No resubscription attempt
	// should be blocked by the lack of receivers on the unbuffered channels.
	expectedResubscriptionCalls := plannedSubscriptionFailures + 1
	if resubscribeFnCalls != expectedResubscriptionCalls {
		t.Errorf(
			"resubscription called [%v] times, expected [%v]",
			resubscribeFnCalls,
			expectedResubscriptionCalls,
		)
	}
}
15 changes: 15 additions & 0 deletions tools/generators/ethereum/contract.go.tmpl
@@ -25,6 +25,21 @@ import (
// included or excluded from logging at startup by name.
var {{.ShortVar}}Logger = log.Logger("keep-contract-{{.Class}}")

const (
	// Maximum backoff time between event resubscription attempts.
	{{.ShortVar}}SubscriptionBackoffMax = 2 * time.Minute

	// Threshold below which event resubscription emits an error to the logs.
	// WS connection can be dropped at any moment and event resubscription will
	// follow. However, if WS connection for event subscription is getting
	// dropped too often, it may indicate something is wrong with Ethereum
	// client. This constant defines the minimum time an event subscription
	// must stay alive before it fails so that the resubscription that follows
	// does not emit an error to the logs alerting about potential problems
	// with Ethereum client.
	{{.ShortVar}}SubscriptionAlertThreshold = 15 * time.Minute
)

type {{.Class}} struct {
	contract *abi.{{.AbiClass}}
	contractAddress common.Address