diff --git a/pkg/cannon/cannon.go b/pkg/cannon/cannon.go index 2d0b3144..2c3b8ab8 100644 --- a/pkg/cannon/cannon.go +++ b/pkg/cannon/cannon.go @@ -14,6 +14,7 @@ import ( //nolint:gosec // only exposed if pprofAddr config is set _ "net/http/pprof" + "github.com/attestantio/go-eth2-client/spec/phase0" "github.com/beevik/ntp" aBlockprint "github.com/ethpandaops/xatu/pkg/cannon/blockprint" "github.com/ethpandaops/xatu/pkg/cannon/coordinator" @@ -557,14 +558,11 @@ func (c *Cannon) startBeaconBlockProcessor(ctx context.Context) error { return c.handleNewDecoratedEvents(ctx, events) }) - c.log. - WithField("deriver", deriver.Name()). - WithField("type", deriver.CannonType()). - Info("Starting cannon event deriver") - - if err := deriver.Start(ctx); err != nil { - return err - } + go func() { + if err := c.startDeriverWhenReady(ctx, d); err != nil { + c.log.WithError(err).Error("Failed to start deriver") + } + }() } return nil @@ -572,3 +570,48 @@ func (c *Cannon) startBeaconBlockProcessor(ctx context.Context) error { return nil } + +func (c *Cannon) startDeriverWhenReady(ctx context.Context, d deriver.EventDeriver) error { + for { + // Handle derivers that require phase0, since its not actually a fork it'll never appear + // in the spec. + if d.ActivationFork() != ethereum.ForkNamePhase0 { + spec, err := c.beacon.Node().Spec() + if err != nil { + c.log.WithError(err).Error("Failed to get spec") + + time.Sleep(5 * time.Second) + + continue + } + + slot := c.beacon.Node().Wallclock().Slots().Current() + + fork, err := spec.ForkEpochs.GetByName(d.ActivationFork()) + if err != nil { + c.log.WithError(err).Errorf("unknown activation fork: %s", d.ActivationFork()) + + time.Sleep(5 * time.Second) + + continue + } + + if !fork.Active(phase0.Slot(slot.Number()), spec.SlotsPerEpoch) { + // Sleep until the next epochl and then retrty + c.log.Debug("Derived epoch is not active yet, sleeping until next epoch") + + epoch := c.beacon.Metadata().Wallclock().Epochs().Current() + + time.Sleep(time.Until(epoch.TimeWindow().End())) + + continue + } + } + + c.log. + WithField("deriver", d.Name()). + Info("Starting cannon event deriver") + + return d.Start(ctx) + } +} diff --git a/pkg/cannon/deriver/beacon/eth/v1/beacon_blob.go b/pkg/cannon/deriver/beacon/eth/v1/beacon_blob.go index 40fa79e1..87f5a618 100644 --- a/pkg/cannon/deriver/beacon/eth/v1/beacon_blob.go +++ b/pkg/cannon/deriver/beacon/eth/v1/beacon_blob.go @@ -31,7 +31,7 @@ const ( ) type BeaconBlobDeriverConfig struct { - Enabled bool `yaml:"enabled" default:"false"` + Enabled bool `yaml:"enabled" default:"true"` } type BeaconBlobDeriver struct { @@ -57,6 +57,10 @@ func (b *BeaconBlobDeriver) CannonType() xatu.CannonType { return BeaconBlobDeriverName } +func (b *BeaconBlobDeriver) ActivationFork() string { + return ethereum.ForkNameDeneb +} + func (b *BeaconBlobDeriver) Name() string { return BeaconBlobDeriverName.String() } @@ -75,7 +79,7 @@ func (b *BeaconBlobDeriver) Start(ctx context.Context) error { b.log.Info("Beacon blob deriver enabled") // Start our main loop - go b.run(ctx) + b.run(ctx) return nil } diff --git a/pkg/cannon/deriver/beacon/eth/v1/proposer_duty.go b/pkg/cannon/deriver/beacon/eth/v1/proposer_duty.go index 3370c5a0..0773e529 100644 --- a/pkg/cannon/deriver/beacon/eth/v1/proposer_duty.go +++ b/pkg/cannon/deriver/beacon/eth/v1/proposer_duty.go @@ -56,6 +56,10 @@ func (b *ProposerDutyDeriver) CannonType() xatu.CannonType { return ProposerDutyDeriverName } +func (b *ProposerDutyDeriver) ActivationFork() string { + return ethereum.ForkNamePhase0 +} + func (b *ProposerDutyDeriver) Name() string { return ProposerDutyDeriverName.String() } @@ -74,7 +78,7 @@ func (b *ProposerDutyDeriver) Start(ctx context.Context) error { b.log.Info("Proposer duty deriver enabled") // Start our main loop - go b.run(ctx) + b.run(ctx) return nil } diff --git a/pkg/cannon/deriver/beacon/eth/v2/attester_slashing.go b/pkg/cannon/deriver/beacon/eth/v2/attester_slashing.go index 464d6e71..56a9f28a 100644 --- a/pkg/cannon/deriver/beacon/eth/v2/attester_slashing.go +++ b/pkg/cannon/deriver/beacon/eth/v2/attester_slashing.go @@ -54,6 +54,10 @@ func (a *AttesterSlashingDeriver) CannonType() xatu.CannonType { return AttesterSlashingDeriverName } +func (a *AttesterSlashingDeriver) ActivationFork() string { + return ethereum.ForkNamePhase0 +} + func (a *AttesterSlashingDeriver) Name() string { return AttesterSlashingDeriverName.String() } diff --git a/pkg/cannon/deriver/beacon/eth/v2/beacon_block.go b/pkg/cannon/deriver/beacon/eth/v2/beacon_block.go index bd074ca8..9d223ff3 100644 --- a/pkg/cannon/deriver/beacon/eth/v2/beacon_block.go +++ b/pkg/cannon/deriver/beacon/eth/v2/beacon_block.go @@ -59,6 +59,10 @@ func (b *BeaconBlockDeriver) CannonType() xatu.CannonType { return BeaconBlockDeriverName } +func (b *BeaconBlockDeriver) ActivationFork() string { + return ethereum.ForkNamePhase0 +} + func (b *BeaconBlockDeriver) Name() string { return BeaconBlockDeriverName.String() } @@ -77,7 +81,7 @@ func (b *BeaconBlockDeriver) Start(ctx context.Context) error { b.log.Info("Beacon block deriver enabled") // Start our main loop - go b.run(ctx) + b.run(ctx) return nil } diff --git a/pkg/cannon/deriver/beacon/eth/v2/bls_to_execution_change.go b/pkg/cannon/deriver/beacon/eth/v2/bls_to_execution_change.go index b65df10d..9b57ae57 100644 --- a/pkg/cannon/deriver/beacon/eth/v2/bls_to_execution_change.go +++ b/pkg/cannon/deriver/beacon/eth/v2/bls_to_execution_change.go @@ -64,6 +64,10 @@ func (b *BLSToExecutionChangeDeriver) OnEventsDerived(ctx context.Context, fn fu b.onEventsCallbacks = append(b.onEventsCallbacks, fn) } +func (b *BLSToExecutionChangeDeriver) ActivationFork() string { + return ethereum.ForkNameCapella +} + func (b *BLSToExecutionChangeDeriver) Start(ctx context.Context) error { if !b.cfg.Enabled { b.log.Info("BLS to execution change deriver disabled") @@ -74,7 +78,7 @@ func (b *BLSToExecutionChangeDeriver) Start(ctx context.Context) error { b.log.Info("BLS to execution change deriver enabled") // Start our main loop - go b.run(ctx) + b.run(ctx) return nil } diff --git a/pkg/cannon/deriver/beacon/eth/v2/deposit.go b/pkg/cannon/deriver/beacon/eth/v2/deposit.go index 50457266..6c03eea8 100644 --- a/pkg/cannon/deriver/beacon/eth/v2/deposit.go +++ b/pkg/cannon/deriver/beacon/eth/v2/deposit.go @@ -58,6 +58,10 @@ func (b *DepositDeriver) Name() string { return DepositDeriverName.String() } +func (b *DepositDeriver) ActivationFork() string { + return ethereum.ForkNamePhase0 +} + func (b *DepositDeriver) OnEventsDerived(ctx context.Context, fn func(ctx context.Context, events []*xatu.DecoratedEvent) error) { b.onEventsCallbacks = append(b.onEventsCallbacks, fn) } @@ -72,7 +76,7 @@ func (b *DepositDeriver) Start(ctx context.Context) error { b.log.Info("Deposit deriver enabled") // Start our main loop - go b.run(ctx) + b.run(ctx) return nil } diff --git a/pkg/cannon/deriver/beacon/eth/v2/elaborated_attestation.go b/pkg/cannon/deriver/beacon/eth/v2/elaborated_attestation.go index 45be8dce..98c165ac 100644 --- a/pkg/cannon/deriver/beacon/eth/v2/elaborated_attestation.go +++ b/pkg/cannon/deriver/beacon/eth/v2/elaborated_attestation.go @@ -59,6 +59,10 @@ func (b *ElaboratedAttestationDeriver) Name() string { return ElaboratedAttestationDeriverName.String() } +func (b *ElaboratedAttestationDeriver) ActivationFork() string { + return ethereum.ForkNamePhase0 +} + func (b *ElaboratedAttestationDeriver) OnEventsDerived(ctx context.Context, fn func(ctx context.Context, events []*xatu.DecoratedEvent) error) { b.onEventsCallbacks = append(b.onEventsCallbacks, fn) } @@ -73,7 +77,7 @@ func (b *ElaboratedAttestationDeriver) Start(ctx context.Context) error { b.log.Info("Elaborated attestation deriver enabled") // Start our main loop - go b.run(ctx) + b.run(ctx) return nil } diff --git a/pkg/cannon/deriver/beacon/eth/v2/execution_transaction.go b/pkg/cannon/deriver/beacon/eth/v2/execution_transaction.go index 198da075..fa820b41 100644 --- a/pkg/cannon/deriver/beacon/eth/v2/execution_transaction.go +++ b/pkg/cannon/deriver/beacon/eth/v2/execution_transaction.go @@ -57,6 +57,10 @@ func (b *ExecutionTransactionDeriver) CannonType() xatu.CannonType { return ExecutionTransactionDeriverName } +func (b *ExecutionTransactionDeriver) ActivationFork() string { + return ethereum.ForkNameBellatrix +} + func (b *ExecutionTransactionDeriver) Name() string { return ExecutionTransactionDeriverName.String() } @@ -75,7 +79,7 @@ func (b *ExecutionTransactionDeriver) Start(ctx context.Context) error { b.log.Info("Execution transaction deriver enabled") // Start our main loop - go b.run(ctx) + b.run(ctx) return nil } diff --git a/pkg/cannon/deriver/beacon/eth/v2/proposer_slashing.go b/pkg/cannon/deriver/beacon/eth/v2/proposer_slashing.go index 5d0086a1..72aeaf8f 100644 --- a/pkg/cannon/deriver/beacon/eth/v2/proposer_slashing.go +++ b/pkg/cannon/deriver/beacon/eth/v2/proposer_slashing.go @@ -58,6 +58,10 @@ func (b *ProposerSlashingDeriver) Name() string { return ProposerSlashingDeriverName.String() } +func (b *ProposerSlashingDeriver) ActivationFork() string { + return ethereum.ForkNamePhase0 +} + func (b *ProposerSlashingDeriver) OnEventsDerived(ctx context.Context, fn func(ctx context.Context, events []*xatu.DecoratedEvent) error) { b.onEventsCallbacks = append(b.onEventsCallbacks, fn) } @@ -72,7 +76,7 @@ func (b *ProposerSlashingDeriver) Start(ctx context.Context) error { b.log.Info("Proposer slashing deriver enabled") // Start our main loop - go b.run(ctx) + b.run(ctx) return nil } diff --git a/pkg/cannon/deriver/beacon/eth/v2/voluntary_exit.go b/pkg/cannon/deriver/beacon/eth/v2/voluntary_exit.go index f028bab7..270aa48e 100644 --- a/pkg/cannon/deriver/beacon/eth/v2/voluntary_exit.go +++ b/pkg/cannon/deriver/beacon/eth/v2/voluntary_exit.go @@ -54,6 +54,10 @@ func (b *VoluntaryExitDeriver) CannonType() xatu.CannonType { return VoluntaryExitDeriverName } +func (b *VoluntaryExitDeriver) ActivationFork() string { + return ethereum.ForkNamePhase0 +} + func (b *VoluntaryExitDeriver) Name() string { return VoluntaryExitDeriverName.String() } @@ -72,7 +76,7 @@ func (b *VoluntaryExitDeriver) Start(ctx context.Context) error { b.log.Info("Voluntary exit deriver enabled") // Start our main loop - go b.run(ctx) + b.run(ctx) return nil } diff --git a/pkg/cannon/deriver/beacon/eth/v2/withdrawal.go b/pkg/cannon/deriver/beacon/eth/v2/withdrawal.go index 77ecf156..6ee25042 100644 --- a/pkg/cannon/deriver/beacon/eth/v2/withdrawal.go +++ b/pkg/cannon/deriver/beacon/eth/v2/withdrawal.go @@ -58,6 +58,10 @@ func (b *WithdrawalDeriver) Name() string { return WithdrawalDeriverName.String() } +func (b *WithdrawalDeriver) ActivationFork() string { + return ethereum.ForkNameCapella +} + func (b *WithdrawalDeriver) OnEventsDerived(ctx context.Context, fn func(ctx context.Context, events []*xatu.DecoratedEvent) error) { b.onEventsCallbacks = append(b.onEventsCallbacks, fn) } @@ -72,7 +76,7 @@ func (b *WithdrawalDeriver) Start(ctx context.Context) error { b.log.Info("Withdrawal deriver enabled") // Start our main loop - go b.run(ctx) + b.run(ctx) return nil } diff --git a/pkg/cannon/deriver/blockprint/block_classification.go b/pkg/cannon/deriver/blockprint/block_classification.go index 46f742bc..2f96b70d 100644 --- a/pkg/cannon/deriver/blockprint/block_classification.go +++ b/pkg/cannon/deriver/blockprint/block_classification.go @@ -67,6 +67,10 @@ func (b *BlockClassificationDeriver) CannonType() xatu.CannonType { return BlockClassificationName } +func (b *BlockClassificationDeriver) ActivationFork() string { + return ethereum.ForkNamePhase0 +} + func (b *BlockClassificationDeriver) Name() string { return BlockClassificationName.String() } @@ -85,7 +89,7 @@ func (b *BlockClassificationDeriver) Start(ctx context.Context) error { b.log.Info("BlockClassification deriver enabled") // Start our main loop - go b.run(ctx) + b.run(ctx) return nil } diff --git a/pkg/cannon/deriver/event_deriver.go b/pkg/cannon/deriver/event_deriver.go index cbf6ec0a..a4853d33 100644 --- a/pkg/cannon/deriver/event_deriver.go +++ b/pkg/cannon/deriver/event_deriver.go @@ -3,6 +3,7 @@ package deriver import ( "context" + v1 "github.com/ethpandaops/xatu/pkg/cannon/deriver/beacon/eth/v1" v2 "github.com/ethpandaops/xatu/pkg/cannon/deriver/beacon/eth/v2" "github.com/ethpandaops/xatu/pkg/cannon/deriver/blockprint" "github.com/ethpandaops/xatu/pkg/proto/xatu" @@ -15,6 +16,8 @@ type EventDeriver interface { CannonType() xatu.CannonType // Callbacks OnEventsDerived(ctx context.Context, fn func(ctx context.Context, events []*xatu.DecoratedEvent) error) + // ActivationFork is the fork at which the deriver should start deriving events + ActivationFork() string } // Ensure that derivers implements the EventDeriver interface @@ -28,3 +31,5 @@ var _ EventDeriver = &v2.WithdrawalDeriver{} var _ EventDeriver = &v2.BeaconBlockDeriver{} var _ EventDeriver = &blockprint.BlockClassificationDeriver{} var _ EventDeriver = &v2.ElaboratedAttestationDeriver{} +var _ EventDeriver = &v1.ProposerDutyDeriver{} +var _ EventDeriver = &v1.BeaconBlobDeriver{} diff --git a/pkg/cannon/ethereum/forks.go b/pkg/cannon/ethereum/forks.go new file mode 100644 index 00000000..2fd6d1d5 --- /dev/null +++ b/pkg/cannon/ethereum/forks.go @@ -0,0 +1,10 @@ +package ethereum + +var ( + ForkNamePhase0 = "PHASE0" + ForkNameAltair = "ALTAIR" + ForkNameBellatrix = "BELLATRIX" + ForkNameCapella = "CAPELLA" + ForkNameDeneb = "DENEB" + ForkNameElectra = "ELECTRA" +)