From 2ceac413e76196bdf069fbb8d0de4a08bc52461e Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Wed, 6 Sep 2023 13:34:40 +1000 Subject: [PATCH] fix(cannon): Check sync status before deriving events (#182) --- pkg/cannon/cannon.go | 4 ---- pkg/cannon/deriver/beacon/eth/v2/attester_slashing.go | 4 ++++ pkg/cannon/deriver/beacon/eth/v2/bls_to_execution_change.go | 4 ++++ pkg/cannon/deriver/beacon/eth/v2/deposit.go | 4 ++++ pkg/cannon/deriver/beacon/eth/v2/execution_transaction.go | 4 ++++ pkg/cannon/deriver/beacon/eth/v2/proposer_slashing.go | 4 ++++ pkg/cannon/deriver/beacon/eth/v2/voluntary_exit.go | 4 ++++ pkg/cannon/ethereum/beacon.go | 4 +++- 8 files changed, 27 insertions(+), 5 deletions(-) diff --git a/pkg/cannon/cannon.go b/pkg/cannon/cannon.go index fb424ae7..6cfd7d88 100644 --- a/pkg/cannon/cannon.go +++ b/pkg/cannon/cannon.go @@ -247,10 +247,6 @@ func (c *Cannon) syncClockDrift(ctx context.Context) error { } func (c *Cannon) handleNewDecoratedEvent(ctx context.Context, event *xatu.DecoratedEvent) error { - if err := c.beacon.Synced(ctx); err != nil { - return err - } - for _, sink := range c.sinks { if err := sink.HandleNewDecoratedEvent(ctx, event); err != nil { c.log. diff --git a/pkg/cannon/deriver/beacon/eth/v2/attester_slashing.go b/pkg/cannon/deriver/beacon/eth/v2/attester_slashing.go index d8d85d25..94a233ae 100644 --- a/pkg/cannon/deriver/beacon/eth/v2/attester_slashing.go +++ b/pkg/cannon/deriver/beacon/eth/v2/attester_slashing.go @@ -95,6 +95,10 @@ func (a *AttesterSlashingDeriver) run(ctx context.Context) { operation := func() error { time.Sleep(100 * time.Millisecond) + if err := a.beacon.Synced(ctx); err != nil { + return err + } + // Get the next slot location, err := a.iterator.Next(ctx) if err != nil { diff --git a/pkg/cannon/deriver/beacon/eth/v2/bls_to_execution_change.go b/pkg/cannon/deriver/beacon/eth/v2/bls_to_execution_change.go index e2d6147c..43e21e8e 100644 --- a/pkg/cannon/deriver/beacon/eth/v2/bls_to_execution_change.go +++ b/pkg/cannon/deriver/beacon/eth/v2/bls_to_execution_change.go @@ -98,6 +98,10 @@ func (b *BLSToExecutionChangeDeriver) run(ctx context.Context) { operation := func() error { time.Sleep(100 * time.Millisecond) + if err := b.beacon.Synced(ctx); err != nil { + return err + } + // Get the next slot location, err := b.iterator.Next(ctx) if err != nil { diff --git a/pkg/cannon/deriver/beacon/eth/v2/deposit.go b/pkg/cannon/deriver/beacon/eth/v2/deposit.go index cc05f200..9dd47724 100644 --- a/pkg/cannon/deriver/beacon/eth/v2/deposit.go +++ b/pkg/cannon/deriver/beacon/eth/v2/deposit.go @@ -96,6 +96,10 @@ func (b *DepositDeriver) run(ctx context.Context) { operation := func() error { time.Sleep(100 * time.Millisecond) + if err := b.beacon.Synced(ctx); err != nil { + return err + } + // Get the next slot location, err := b.iterator.Next(ctx) if err != nil { diff --git a/pkg/cannon/deriver/beacon/eth/v2/execution_transaction.go b/pkg/cannon/deriver/beacon/eth/v2/execution_transaction.go index 53c31ab7..8f021d59 100644 --- a/pkg/cannon/deriver/beacon/eth/v2/execution_transaction.go +++ b/pkg/cannon/deriver/beacon/eth/v2/execution_transaction.go @@ -99,6 +99,10 @@ func (b *ExecutionTransactionDeriver) run(ctx context.Context) { operation := func() error { time.Sleep(100 * time.Millisecond) + if err := b.beacon.Synced(ctx); err != nil { + return err + } + // Get the next slot location, err := b.iterator.Next(ctx) if err != nil { diff --git a/pkg/cannon/deriver/beacon/eth/v2/proposer_slashing.go b/pkg/cannon/deriver/beacon/eth/v2/proposer_slashing.go index bac6ff87..40257d7c 100644 --- a/pkg/cannon/deriver/beacon/eth/v2/proposer_slashing.go +++ b/pkg/cannon/deriver/beacon/eth/v2/proposer_slashing.go @@ -95,6 +95,10 @@ func (b *ProposerSlashingDeriver) run(ctx context.Context) { operation := func() error { time.Sleep(100 * time.Millisecond) + if err := b.beacon.Synced(ctx); err != nil { + return err + } + // Get the next slot location, err := b.iterator.Next(ctx) if err != nil { diff --git a/pkg/cannon/deriver/beacon/eth/v2/voluntary_exit.go b/pkg/cannon/deriver/beacon/eth/v2/voluntary_exit.go index 6bd50cd8..a23158a3 100644 --- a/pkg/cannon/deriver/beacon/eth/v2/voluntary_exit.go +++ b/pkg/cannon/deriver/beacon/eth/v2/voluntary_exit.go @@ -96,6 +96,10 @@ func (b *VoluntaryExitDeriver) run(ctx context.Context) { operation := func() error { time.Sleep(100 * time.Millisecond) + if err := b.beacon.Synced(ctx); err != nil { + return err + } + // Get the next slot location, err := b.iterator.Next(ctx) if err != nil { diff --git a/pkg/cannon/ethereum/beacon.go b/pkg/cannon/ethereum/beacon.go index 6b29d3dd..29d8c588 100644 --- a/pkg/cannon/ethereum/beacon.go +++ b/pkg/cannon/ethereum/beacon.go @@ -33,12 +33,14 @@ type BeaconNode struct { func NewBeaconNode(ctx context.Context, name string, config *Config, log logrus.FieldLogger) (*BeaconNode, error) { opts := *beacon. DefaultOptions(). - EnableDefaultBeaconSubscription(). + DisableEmptySlotDetection(). DisablePrometheusMetrics() opts.HealthCheck.Interval.Duration = time.Second * 3 opts.HealthCheck.SuccessfulResponses = 1 + opts.BeaconSubscription.Enabled = false + node := beacon.NewNode(log, &beacon.Config{ Name: name, Addr: config.BeaconNodeAddress,