Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v2.2.0 fix(ec): enable time based out of sync tolerance #1959

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion eth/executionclient/execution_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,15 @@ type ExecutionClient struct {

syncDistanceTolerance uint64
syncProgressFn func(context.Context) (*ethereum.SyncProgress, error)
lastHealthy time.Time

// variables
client *ethclient.Client
closed chan struct{}
}

// syncTimeTolerance is the threshold that Healthy compares against the time
// elapsed since lastHealthy: a sync-progress error is overridden while the
// elapsed time is below it, and the client is reported as syncing once the
// elapsed time exceeds it.
const syncTimeTolerance = time.Minute
anatolie-ssv marked this conversation as resolved.
Show resolved Hide resolved

// New creates a new instance of ExecutionClient.
func New(ctx context.Context, nodeAddr string, contractAddr ethcommon.Address, opts ...Option) (*ExecutionClient, error) {
client := &ExecutionClient{
Expand All @@ -67,6 +70,7 @@ func New(ctx context.Context, nodeAddr string, contractAddr ethcommon.Address, o
reconnectionMaxInterval: DefaultReconnectionMaxInterval,
logBatchSize: DefaultHistoricalLogsBatchSize, // TODO Make batch of logs adaptive depending on "websocket: read limit"
closed: make(chan struct{}),
lastHealthy: time.Now(),
}
for _, opt := range opts {
opt(client)
Expand Down Expand Up @@ -256,7 +260,14 @@ func (ec *ExecutionClient) Healthy(ctx context.Context) error {
ec.logger.Error(elResponseErrMsg,
zap.String("method", "eth_syncing"),
zap.Error(err))
return err

unhealthyDuration := time.Since(ec.lastHealthy)
if unhealthyDuration < syncTimeTolerance {
// override error if we're in the tolerance window
return nil
}

return errors.Join(fmt.Errorf("check sync progress: %w", err), errSyncing)
}
recordRequestDuration(ctx, ec.nodeAddr, time.Since(start))

Expand All @@ -271,6 +282,13 @@ func (ec *ExecutionClient) Healthy(ctx context.Context) error {
if syncDistance > ec.syncDistanceTolerance {
return fmt.Errorf("sync distance exceeds tolerance (%d): %w", syncDistance, errSyncing)
}
} else {
ec.lastHealthy = time.Now()
}

unhealthyDuration := time.Since(ec.lastHealthy)
if unhealthyDuration > syncTimeTolerance {
return fmt.Errorf("not synced for too long (%d): %w", unhealthyDuration, errSyncing)
}

recordExecutionClientStatus(ctx, statusReady, ec.nodeAddr)
Expand Down
63 changes: 60 additions & 3 deletions eth/executionclient/execution_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package executionclient

import (
"context"
"errors"
"math/big"
"net/http/httptest"
"strings"
Expand Down Expand Up @@ -605,7 +606,7 @@ func TestSimSSV(t *testing.T) {
require.NoError(t, sim.Close())
}

func TestSyncProgress(t *testing.T) {
func TestHealthy(t *testing.T) {
const testTimeout = 1 * time.Second
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
Expand Down Expand Up @@ -643,7 +644,7 @@ func TestSyncProgress(t *testing.T) {
err = client.Healthy(ctx)
require.NoError(t, err)

t.Run("out of sync", func(t *testing.T) {
t.Run("distance out of sync", func(t *testing.T) {
client.syncProgressFn = func(context.Context) (*ethereum.SyncProgress, error) {
p := new(ethereum.SyncProgress)
p.CurrentBlock = 5
Expand All @@ -655,7 +656,7 @@ func TestSyncProgress(t *testing.T) {
require.ErrorIs(t, err, errSyncing)
})

t.Run("within tolerable limits", func(t *testing.T) {
t.Run("sync distance within tolerable limits", func(t *testing.T) {
client, err := New(ctx, addr, contractAddr, WithSyncDistanceTolerance(2))
require.NoError(t, err)

Expand All @@ -669,6 +670,62 @@ func TestSyncProgress(t *testing.T) {
err = client.Healthy(ctx)
require.NoError(t, err)
})

t.Run("overrides error if within time tolerance", func(t *testing.T) {
client, err := New(ctx, addr, contractAddr)
require.NoError(t, err)

client.syncProgressFn = func(context.Context) (*ethereum.SyncProgress, error) {
return nil, errors.New("connection refused")
}

err = client.Healthy(ctx)
require.NoError(t, err)
})

t.Run("propagates error if outside of time tolerance", func(t *testing.T) {
client, err := New(ctx, addr, contractAddr)
require.NoError(t, err)

client.syncProgressFn = func(context.Context) (*ethereum.SyncProgress, error) {
return nil, errors.New("connection refused")
}

client.lastHealthy = time.Now().Add(-61 * time.Second)
err = client.Healthy(ctx)
require.ErrorIs(t, err, errSyncing)
})

t.Run("within block distance but outside of time tolerance", func(t *testing.T) {
client, err := New(ctx, addr, contractAddr)
require.NoError(t, err)

client.syncProgressFn = func(context.Context) (*ethereum.SyncProgress, error) {
p := new(ethereum.SyncProgress)
return p, nil
}

client.lastHealthy = time.Now().Add(-61 * time.Second)
err = client.Healthy(ctx)
require.ErrorIs(t, err, errSyncing)
})

t.Run("overwrites checkpoint on OK response", func(t *testing.T) {
client, err := New(ctx, addr, contractAddr)
require.NoError(t, err)

old := client.lastHealthy

client.syncProgressFn = func(context.Context) (*ethereum.SyncProgress, error) {
time.Sleep(time.Millisecond)
return nil, nil
}

err = client.Healthy(ctx)
require.NoError(t, err)

require.True(t, client.lastHealthy.After(old))
})
}

func httpToWebSocketURL(url string) string {
Expand Down
Loading