Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v2.2.0 fix(ec): enable time based out of sync tolerance #1959

Draft
wants to merge 5 commits into
base: stage
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 35 additions & 6 deletions beacon/goclient/goclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ type GoClient struct {

syncDistanceTolerance phase0.Slot
nodeSyncingFn func(ctx context.Context, opts *api.NodeSyncingOpts) (*api.Response[*apiv1.SyncState], error)
lastHealthy time.Time

operatorDataStore operatordatastore.OperatorDataStore

Expand Down Expand Up @@ -233,6 +234,19 @@ func (gc *GoClient) NodeClient() NodeClient {

var errSyncing = errors.New("syncing")

const unhealthyDurationTolerance = 1 * time.Minute

// Checks if the error occurred within the tolerance window
// and suppresses it if true
func (gc *GoClient) checkIsWithinTolerance(err error) error {
unhealthyDuration := time.Since(gc.lastHealthy)
if unhealthyDuration < unhealthyDurationTolerance {
return nil
}

return err
}

// Healthy returns if beacon node is currently healthy: responds to requests, not in the syncing state, not optimistic
// (for optimistic see https://github.com/ethereum/consensus-specs/blob/dev/sync/optimistic.md#block-production).
func (gc *GoClient) Healthy(ctx context.Context) error {
Expand All @@ -244,36 +258,51 @@ func (gc *GoClient) Healthy(ctx context.Context) error {
)
// TODO: get rid of global variable, pass metrics to goClient
recordBeaconClientStatus(ctx, statusUnknown, gc.client.Address())
return fmt.Errorf("failed to obtain node syncing status: %w", err)
err = fmt.Errorf("failed to obtain node syncing status: %w", err)
return gc.checkIsWithinTolerance(err)
}

if nodeSyncingResp == nil {
gc.log.Error(clNilResponseErrMsg,
zap.String("api", "NodeSyncing"),
)
recordBeaconClientStatus(ctx, statusUnknown, gc.client.Address())
return fmt.Errorf("node syncing response is nil")
err = fmt.Errorf("node syncing response is nil")
return gc.checkIsWithinTolerance(err)
}
if nodeSyncingResp.Data == nil {
gc.log.Error(clNilResponseDataErrMsg,
zap.String("api", "NodeSyncing"),
)
recordBeaconClientStatus(ctx, statusUnknown, gc.client.Address())
return fmt.Errorf("node syncing data is nil")
err = fmt.Errorf("node syncing data is nil")
return gc.checkIsWithinTolerance(err)
}
syncState := nodeSyncingResp.Data
recordBeaconClientStatus(ctx, statusSyncing, gc.client.Address())
recordSyncDistance(ctx, syncState.SyncDistance, gc.client.Address())

// TODO: also check if syncState.ElOffline when github.com/attestantio/go-eth2-client supports it
if syncState.IsSyncing && syncState.SyncDistance > gc.syncDistanceTolerance {
gc.log.Error("Consensus client is not synced")
return errSyncing
if syncState.IsSyncing {
// check within the allowed distance
if syncState.SyncDistance > gc.syncDistanceTolerance {
gc.log.Error("Consensus client is not synced")
return errSyncing
}
unhealthyDuration := time.Since(gc.lastHealthy)
// within the distance but out of time
if unhealthyDuration > unhealthyDurationTolerance {
return fmt.Errorf("not synced for too long (%d): %w", unhealthyDuration, errSyncing)
}
}

if syncState.IsOptimistic {
gc.log.Error("Consensus client is in optimistic mode")
return fmt.Errorf("optimistic")
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure how to treat this optimistic case @moshe-blox


gc.lastHealthy = time.Now()

recordBeaconClientStatus(ctx, statusSynced, gc.client.Address())

return nil
Expand Down
73 changes: 73 additions & 0 deletions beacon/goclient/goclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package goclient
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"net/http/httptest"
Expand All @@ -18,6 +19,7 @@ import (
"github.com/ssvlabs/ssv/operator/slotticker"
"github.com/ssvlabs/ssv/protocol/v2/blockchain/beacon"
registrystorage "github.com/ssvlabs/ssv/registry/storage"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -68,6 +70,77 @@ func TestHealthy(t *testing.T) {
err = client.Healthy(ctx)
require.NoError(t, err)
})

t.Run("within distance/time allowance", func(t *testing.T) {
lh := time.Now().Add(-59 * time.Second)
client.lastHealthy = lh
client.nodeSyncingFn = func(ctx context.Context, opts *api.NodeSyncingOpts) (*api.Response[*v1.SyncState], error) {
r := new(api.Response[*v1.SyncState])
r.Data = &v1.SyncState{
IsSyncing: true,
}
return r, nil
}

err = client.Healthy(ctx)
require.NoError(t, err)
assert.True(t, client.lastHealthy.After(lh))
})

t.Run("outside time allowance", func(t *testing.T) {
lh := time.Now().Add(-time.Minute)
client.lastHealthy = lh
client.nodeSyncingFn = func(ctx context.Context, opts *api.NodeSyncingOpts) (*api.Response[*v1.SyncState], error) {
r := new(api.Response[*v1.SyncState])
r.Data = &v1.SyncState{
IsSyncing: true,
}
return r, nil
}

err = client.Healthy(ctx)
require.ErrorIs(t, err, errSyncing)
assert.True(t, client.lastHealthy == lh)
})

// error suppression
t.Run("sync error is propagated if outside of time limits", func(t *testing.T) {
syncErr := errors.New("sync err")
client.lastHealthy = time.Now().Add(-61 * time.Second)
client.nodeSyncingFn = func(ctx context.Context, opts *api.NodeSyncingOpts) (*api.Response[*v1.SyncState], error) {
return nil, syncErr
}

err = client.Healthy(ctx)
require.ErrorIs(t, err, syncErr)
})
t.Run("sync error suppressed if within time limits", func(t *testing.T) {
client.lastHealthy = time.Now().Add(-59 * time.Second)
client.nodeSyncingFn = func(ctx context.Context, opts *api.NodeSyncingOpts) (*api.Response[*v1.SyncState], error) {
return nil, errors.New("some err")
}

err = client.Healthy(ctx)
require.NoError(t, err)
})
t.Run("sync nil response err suppressed if within time limits", func(t *testing.T) {
client.lastHealthy = time.Now().Add(-59 * time.Second)
client.nodeSyncingFn = func(ctx context.Context, opts *api.NodeSyncingOpts) (*api.Response[*v1.SyncState], error) {
return nil, nil
}

err = client.Healthy(ctx)
require.NoError(t, err)
})
t.Run("sync nil response data err suppressed if within time ", func(t *testing.T) {
client.lastHealthy = time.Now().Add(-59 * time.Second)
client.nodeSyncingFn = func(ctx context.Context, opts *api.NodeSyncingOpts) (*api.Response[*v1.SyncState], error) {
return new(api.Response[*v1.SyncState]), nil
}

err = client.Healthy(ctx)
require.NoError(t, err)
})
}

func TestTimeouts(t *testing.T) {
Expand Down
20 changes: 19 additions & 1 deletion eth/executionclient/execution_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,15 @@ type ExecutionClient struct {

syncDistanceTolerance uint64
syncProgressFn func(context.Context) (*ethereum.SyncProgress, error)
lastHealthy time.Time

// variables
client *ethclient.Client
closed chan struct{}
}

const unhealthyDurationTolerance = 1 * time.Minute

// New creates a new instance of ExecutionClient.
func New(ctx context.Context, nodeAddr string, contractAddr ethcommon.Address, opts ...Option) (*ExecutionClient, error) {
client := &ExecutionClient{
Expand All @@ -67,6 +70,7 @@ func New(ctx context.Context, nodeAddr string, contractAddr ethcommon.Address, o
reconnectionMaxInterval: DefaultReconnectionMaxInterval,
logBatchSize: DefaultHistoricalLogsBatchSize, // TODO Make batch of logs adaptive depending on "websocket: read limit"
closed: make(chan struct{}),
lastHealthy: time.Now(),
}
for _, opt := range opts {
opt(client)
Expand Down Expand Up @@ -256,7 +260,14 @@ func (ec *ExecutionClient) Healthy(ctx context.Context) error {
ec.logger.Error(elResponseErrMsg,
zap.String("method", "eth_syncing"),
zap.Error(err))
return err

unhealthyDuration := time.Since(ec.lastHealthy)
if unhealthyDuration < unhealthyDurationTolerance {
// override error if we're in the tolerance window
return nil
}

return errors.Join(fmt.Errorf("check sync progress: %w", err), errSyncing)
}
recordRequestDuration(ctx, ec.nodeAddr, time.Since(start))

Expand All @@ -271,6 +282,13 @@ func (ec *ExecutionClient) Healthy(ctx context.Context) error {
if syncDistance > ec.syncDistanceTolerance {
return fmt.Errorf("sync distance exceeds tolerance (%d): %w", syncDistance, errSyncing)
}
} else {
ec.lastHealthy = time.Now()
}

unhealthyDuration := time.Since(ec.lastHealthy)
if unhealthyDuration > unhealthyDurationTolerance {
return fmt.Errorf("not synced for too long (%d): %w", unhealthyDuration, errSyncing)
}

recordExecutionClientStatus(ctx, statusReady, ec.nodeAddr)
Expand Down
63 changes: 60 additions & 3 deletions eth/executionclient/execution_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package executionclient

import (
"context"
"errors"
"math/big"
"net/http/httptest"
"strings"
Expand Down Expand Up @@ -605,7 +606,7 @@ func TestSimSSV(t *testing.T) {
require.NoError(t, sim.Close())
}

func TestSyncProgress(t *testing.T) {
func TestHealthy(t *testing.T) {
const testTimeout = 1 * time.Second
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
Expand Down Expand Up @@ -643,7 +644,7 @@ func TestSyncProgress(t *testing.T) {
err = client.Healthy(ctx)
require.NoError(t, err)

t.Run("out of sync", func(t *testing.T) {
t.Run("distance out of sync", func(t *testing.T) {
client.syncProgressFn = func(context.Context) (*ethereum.SyncProgress, error) {
p := new(ethereum.SyncProgress)
p.CurrentBlock = 5
Expand All @@ -655,7 +656,7 @@ func TestSyncProgress(t *testing.T) {
require.ErrorIs(t, err, errSyncing)
})

t.Run("within tolerable limits", func(t *testing.T) {
t.Run("sync distance within tolerable limits", func(t *testing.T) {
client, err := New(ctx, addr, contractAddr, WithSyncDistanceTolerance(2))
require.NoError(t, err)

Expand All @@ -669,6 +670,62 @@ func TestSyncProgress(t *testing.T) {
err = client.Healthy(ctx)
require.NoError(t, err)
})

t.Run("overrides error if within time tolerance", func(t *testing.T) {
client, err := New(ctx, addr, contractAddr)
require.NoError(t, err)

client.syncProgressFn = func(context.Context) (*ethereum.SyncProgress, error) {
return nil, errors.New("connection refused")
}

err = client.Healthy(ctx)
require.NoError(t, err)
})

t.Run("propagates error if outside of time tolerance", func(t *testing.T) {
client, err := New(ctx, addr, contractAddr)
require.NoError(t, err)

client.syncProgressFn = func(context.Context) (*ethereum.SyncProgress, error) {
return nil, errors.New("connection refused")
}

client.lastHealthy = time.Now().Add(-61 * time.Second)
err = client.Healthy(ctx)
require.ErrorIs(t, err, errSyncing)
})

t.Run("within block distance but outside of time tolerance", func(t *testing.T) {
client, err := New(ctx, addr, contractAddr)
require.NoError(t, err)

client.syncProgressFn = func(context.Context) (*ethereum.SyncProgress, error) {
p := new(ethereum.SyncProgress)
return p, nil
}

client.lastHealthy = time.Now().Add(-61 * time.Second)
err = client.Healthy(ctx)
require.ErrorIs(t, err, errSyncing)
})

t.Run("overwrites checkpoint on OK response", func(t *testing.T) {
client, err := New(ctx, addr, contractAddr)
require.NoError(t, err)

old := client.lastHealthy

client.syncProgressFn = func(context.Context) (*ethereum.SyncProgress, error) {
time.Sleep(time.Millisecond)
return nil, nil
}

err = client.Healthy(ctx)
require.NoError(t, err)

require.True(t, client.lastHealthy.After(old))
})
}

func httpToWebSocketURL(url string) string {
Expand Down
Loading