Skip to content

Commit

Permalink
Merge branch 'master' into feat/inclusion-distance
Browse files Browse the repository at this point in the history
  • Loading branch information
samcm committed Jan 12, 2024
2 parents 626a4f9 + 7d9507e commit 015ff43
Show file tree
Hide file tree
Showing 15 changed files with 157 additions and 20 deletions.
90 changes: 82 additions & 8 deletions pkg/cannon/cannon.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import (
//nolint:gosec // only exposed if pprofAddr config is set
_ "net/http/pprof"

"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/beevik/ntp"
"github.com/ethpandaops/ethwallclock"
aBlockprint "github.com/ethpandaops/xatu/pkg/cannon/blockprint"
"github.com/ethpandaops/xatu/pkg/cannon/coordinator"
"github.com/ethpandaops/xatu/pkg/cannon/deriver"
Expand Down Expand Up @@ -550,25 +552,97 @@ func (c *Cannon) startBeaconBlockProcessor(ctx context.Context) error {

c.eventDerivers = eventDerivers

// Refresh the spec every epoch
c.beacon.Metadata().Wallclock().OnEpochChanged(func(current ethwallclock.Epoch) {
_, err := c.beacon.Node().FetchSpec(ctx)
if err != nil {
c.log.WithError(err).Error("Failed to refresh spec")
}
})

for _, deriver := range c.eventDerivers {
d := deriver

d.OnEventsDerived(ctx, func(ctx context.Context, events []*xatu.DecoratedEvent) error {
return c.handleNewDecoratedEvents(ctx, events)
})

c.log.
WithField("deriver", deriver.Name()).
WithField("type", deriver.CannonType()).
Info("Starting cannon event deriver")

if err := deriver.Start(ctx); err != nil {
return err
}
go func() {
if err := c.startDeriverWhenReady(ctx, d); err != nil {
c.log.WithError(err).Error("Failed to start deriver")
}
}()
}

return nil
})

return nil
}

func (c *Cannon) startDeriverWhenReady(ctx context.Context, d deriver.EventDeriver) error {
for {
// Handle derivers that require phase0, since its not actually a fork it'll never appear
// in the spec.
if d.ActivationFork() != ethereum.ForkNamePhase0 {
spec, err := c.beacon.Node().Spec()
if err != nil {
c.log.WithError(err).Error("Failed to get spec")

time.Sleep(5 * time.Second)

continue
}

slot := c.beacon.Node().Wallclock().Slots().Current()

fork, err := spec.ForkEpochs.GetByName(d.ActivationFork())
if err != nil {
c.log.WithError(err).Errorf("unknown activation fork: %s", d.ActivationFork())

epoch := c.beacon.Metadata().Wallclock().Epochs().Current()

time.Sleep(time.Until(epoch.TimeWindow().End()))

continue
}

if !fork.Active(phase0.Slot(slot.Number()), spec.SlotsPerEpoch) {
// Sleep until the next epochl and then retrty
currentEpoch := c.beacon.Metadata().Wallclock().Epochs().Current()

activationForkEpoch := c.beacon.Node().Wallclock().Epochs().FromNumber(uint64(fork.Epoch))

sleepFor := time.Until(activationForkEpoch.TimeWindow().End())

if activationForkEpoch.Number()-currentEpoch.Number() > 100000 {
// If the fork epoch is over 100k epochs away we are most likely dealing with a
// placeholder fork epoch. We should sleep until the end of the current fork epoch and then
// wait for the spec to refresh. This gives the beacon node a chance to give us the real
// fork epoch once its scheduled.
sleepFor = time.Until(currentEpoch.TimeWindow().End())
}

c.log.
WithField("current_epoch", currentEpoch.Number()).
WithField("activation_fork_name", d.ActivationFork()).
WithField("activation_fork_epoch", fork.Epoch).
WithField("estimated_time_until_fork", time.Until(
activationForkEpoch.TimeWindow().Start(),
)).
WithField("check_again_in", sleepFor).
Warn("Deriver required fork is not active yet")

time.Sleep(sleepFor)

continue
}
}

c.log.
WithField("deriver", d.Name()).
Info("Starting cannon event deriver")

return d.Start(ctx)
}
}
8 changes: 6 additions & 2 deletions pkg/cannon/deriver/beacon/eth/v1/beacon_blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const (
)

type BeaconBlobDeriverConfig struct {
Enabled bool `yaml:"enabled" default:"false"`
Enabled bool `yaml:"enabled" default:"true"`
}

type BeaconBlobDeriver struct {
Expand All @@ -57,6 +57,10 @@ func (b *BeaconBlobDeriver) CannonType() xatu.CannonType {
return BeaconBlobDeriverName
}

func (b *BeaconBlobDeriver) ActivationFork() string {
return ethereum.ForkNameDeneb
}

func (b *BeaconBlobDeriver) Name() string {
return BeaconBlobDeriverName.String()
}
Expand All @@ -75,7 +79,7 @@ func (b *BeaconBlobDeriver) Start(ctx context.Context) error {
b.log.Info("Beacon blob deriver enabled")

// Start our main loop
go b.run(ctx)
b.run(ctx)

return nil
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/cannon/deriver/beacon/eth/v1/proposer_duty.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ func (b *ProposerDutyDeriver) CannonType() xatu.CannonType {
return ProposerDutyDeriverName
}

func (b *ProposerDutyDeriver) ActivationFork() string {
return ethereum.ForkNamePhase0
}

func (b *ProposerDutyDeriver) Name() string {
return ProposerDutyDeriverName.String()
}
Expand All @@ -74,7 +78,7 @@ func (b *ProposerDutyDeriver) Start(ctx context.Context) error {
b.log.Info("Proposer duty deriver enabled")

// Start our main loop
go b.run(ctx)
b.run(ctx)

return nil
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/cannon/deriver/beacon/eth/v2/attester_slashing.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ func (a *AttesterSlashingDeriver) CannonType() xatu.CannonType {
return AttesterSlashingDeriverName
}

func (a *AttesterSlashingDeriver) ActivationFork() string {
return ethereum.ForkNamePhase0
}

func (a *AttesterSlashingDeriver) Name() string {
return AttesterSlashingDeriverName.String()
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/cannon/deriver/beacon/eth/v2/beacon_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ func (b *BeaconBlockDeriver) CannonType() xatu.CannonType {
return BeaconBlockDeriverName
}

func (b *BeaconBlockDeriver) ActivationFork() string {
return ethereum.ForkNamePhase0
}

func (b *BeaconBlockDeriver) Name() string {
return BeaconBlockDeriverName.String()
}
Expand All @@ -77,7 +81,7 @@ func (b *BeaconBlockDeriver) Start(ctx context.Context) error {
b.log.Info("Beacon block deriver enabled")

// Start our main loop
go b.run(ctx)
b.run(ctx)

return nil
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/cannon/deriver/beacon/eth/v2/bls_to_execution_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ func (b *BLSToExecutionChangeDeriver) OnEventsDerived(ctx context.Context, fn fu
b.onEventsCallbacks = append(b.onEventsCallbacks, fn)
}

func (b *BLSToExecutionChangeDeriver) ActivationFork() string {
return ethereum.ForkNameCapella
}

func (b *BLSToExecutionChangeDeriver) Start(ctx context.Context) error {
if !b.cfg.Enabled {
b.log.Info("BLS to execution change deriver disabled")
Expand All @@ -74,7 +78,7 @@ func (b *BLSToExecutionChangeDeriver) Start(ctx context.Context) error {
b.log.Info("BLS to execution change deriver enabled")

// Start our main loop
go b.run(ctx)
b.run(ctx)

return nil
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/cannon/deriver/beacon/eth/v2/deposit.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ func (b *DepositDeriver) Name() string {
return DepositDeriverName.String()
}

func (b *DepositDeriver) ActivationFork() string {
return ethereum.ForkNamePhase0
}

func (b *DepositDeriver) OnEventsDerived(ctx context.Context, fn func(ctx context.Context, events []*xatu.DecoratedEvent) error) {
b.onEventsCallbacks = append(b.onEventsCallbacks, fn)
}
Expand All @@ -72,7 +76,7 @@ func (b *DepositDeriver) Start(ctx context.Context) error {
b.log.Info("Deposit deriver enabled")

// Start our main loop
go b.run(ctx)
b.run(ctx)

return nil
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/cannon/deriver/beacon/eth/v2/elaborated_attestation.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ func (b *ElaboratedAttestationDeriver) Name() string {
return ElaboratedAttestationDeriverName.String()
}

func (b *ElaboratedAttestationDeriver) ActivationFork() string {
return ethereum.ForkNamePhase0
}

func (b *ElaboratedAttestationDeriver) OnEventsDerived(ctx context.Context, fn func(ctx context.Context, events []*xatu.DecoratedEvent) error) {
b.onEventsCallbacks = append(b.onEventsCallbacks, fn)
}
Expand All @@ -73,7 +77,7 @@ func (b *ElaboratedAttestationDeriver) Start(ctx context.Context) error {
b.log.Info("Elaborated attestation deriver enabled")

// Start our main loop
go b.run(ctx)
b.run(ctx)

return nil
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/cannon/deriver/beacon/eth/v2/execution_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ func (b *ExecutionTransactionDeriver) CannonType() xatu.CannonType {
return ExecutionTransactionDeriverName
}

func (b *ExecutionTransactionDeriver) ActivationFork() string {
return ethereum.ForkNameBellatrix
}

func (b *ExecutionTransactionDeriver) Name() string {
return ExecutionTransactionDeriverName.String()
}
Expand All @@ -75,7 +79,7 @@ func (b *ExecutionTransactionDeriver) Start(ctx context.Context) error {
b.log.Info("Execution transaction deriver enabled")

// Start our main loop
go b.run(ctx)
b.run(ctx)

return nil
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/cannon/deriver/beacon/eth/v2/proposer_slashing.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ func (b *ProposerSlashingDeriver) Name() string {
return ProposerSlashingDeriverName.String()
}

func (b *ProposerSlashingDeriver) ActivationFork() string {
return ethereum.ForkNamePhase0
}

func (b *ProposerSlashingDeriver) OnEventsDerived(ctx context.Context, fn func(ctx context.Context, events []*xatu.DecoratedEvent) error) {
b.onEventsCallbacks = append(b.onEventsCallbacks, fn)
}
Expand All @@ -72,7 +76,7 @@ func (b *ProposerSlashingDeriver) Start(ctx context.Context) error {
b.log.Info("Proposer slashing deriver enabled")

// Start our main loop
go b.run(ctx)
b.run(ctx)

return nil
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/cannon/deriver/beacon/eth/v2/voluntary_exit.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ func (b *VoluntaryExitDeriver) CannonType() xatu.CannonType {
return VoluntaryExitDeriverName
}

func (b *VoluntaryExitDeriver) ActivationFork() string {
return ethereum.ForkNamePhase0
}

func (b *VoluntaryExitDeriver) Name() string {
return VoluntaryExitDeriverName.String()
}
Expand All @@ -72,7 +76,7 @@ func (b *VoluntaryExitDeriver) Start(ctx context.Context) error {
b.log.Info("Voluntary exit deriver enabled")

// Start our main loop
go b.run(ctx)
b.run(ctx)

return nil
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/cannon/deriver/beacon/eth/v2/withdrawal.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ func (b *WithdrawalDeriver) Name() string {
return WithdrawalDeriverName.String()
}

func (b *WithdrawalDeriver) ActivationFork() string {
return ethereum.ForkNameCapella
}

func (b *WithdrawalDeriver) OnEventsDerived(ctx context.Context, fn func(ctx context.Context, events []*xatu.DecoratedEvent) error) {
b.onEventsCallbacks = append(b.onEventsCallbacks, fn)
}
Expand All @@ -72,7 +76,7 @@ func (b *WithdrawalDeriver) Start(ctx context.Context) error {
b.log.Info("Withdrawal deriver enabled")

// Start our main loop
go b.run(ctx)
b.run(ctx)

return nil
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/cannon/deriver/blockprint/block_classification.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ func (b *BlockClassificationDeriver) CannonType() xatu.CannonType {
return BlockClassificationName
}

func (b *BlockClassificationDeriver) ActivationFork() string {
return ethereum.ForkNamePhase0
}

func (b *BlockClassificationDeriver) Name() string {
return BlockClassificationName.String()
}
Expand All @@ -85,7 +89,7 @@ func (b *BlockClassificationDeriver) Start(ctx context.Context) error {
b.log.Info("BlockClassification deriver enabled")

// Start our main loop
go b.run(ctx)
b.run(ctx)

return nil
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/cannon/deriver/event_deriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package deriver
import (
"context"

v1 "github.com/ethpandaops/xatu/pkg/cannon/deriver/beacon/eth/v1"
v2 "github.com/ethpandaops/xatu/pkg/cannon/deriver/beacon/eth/v2"
"github.com/ethpandaops/xatu/pkg/cannon/deriver/blockprint"
"github.com/ethpandaops/xatu/pkg/proto/xatu"
Expand All @@ -15,6 +16,8 @@ type EventDeriver interface {
CannonType() xatu.CannonType
// Callbacks
OnEventsDerived(ctx context.Context, fn func(ctx context.Context, events []*xatu.DecoratedEvent) error)
// ActivationFork is the fork at which the deriver should start deriving events
ActivationFork() string
}

// Ensure that derivers implements the EventDeriver interface
Expand All @@ -28,3 +31,5 @@ var _ EventDeriver = &v2.WithdrawalDeriver{}
var _ EventDeriver = &v2.BeaconBlockDeriver{}
var _ EventDeriver = &blockprint.BlockClassificationDeriver{}
var _ EventDeriver = &v2.ElaboratedAttestationDeriver{}
var _ EventDeriver = &v1.ProposerDutyDeriver{}
var _ EventDeriver = &v1.BeaconBlobDeriver{}
10 changes: 10 additions & 0 deletions pkg/cannon/ethereum/forks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package ethereum

var (
ForkNamePhase0 = "PHASE0"
ForkNameAltair = "ALTAIR"
ForkNameBellatrix = "BELLATRIX"
ForkNameCapella = "CAPELLA"
ForkNameDeneb = "DENEB"
ForkNameElectra = "ELECTRA"
)

0 comments on commit 015ff43

Please sign in to comment.