Skip to content

Commit

Permalink
feat(cannon): Checkpoint iterator (#188)
Browse files Browse the repository at this point in the history
* feat(cannon): Checkpoint iterator

* refactor(checkpoint_iterator): improve sleep duration calculation
  • Loading branch information
samcm authored Sep 13, 2023
1 parent 61c2efe commit 99f2e42
Show file tree
Hide file tree
Showing 12 changed files with 634 additions and 309 deletions.
46 changes: 27 additions & 19 deletions pkg/cannon/cannon.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,101 +276,109 @@ func (c *Cannon) startBeaconBlockProcessor(ctx context.Context) error {
return err
}

slotIteratorMetrics := iterator.NewSlotMetrics("xatu_cannon")
checkpointIteratorMetrics := iterator.NewCheckpointMetrics("xatu_cannon")

finalizedCheckpoint := "finalized"

eventDerivers := []deriver.EventDeriver{
v2.NewAttesterSlashingDeriver(
c.log,
&c.Config.Derivers.AttesterSlashingConfig,
iterator.NewSlotIterator(
iterator.NewCheckpointIterator(
c.log,
networkName,
networkID,
xatu.CannonType_BEACON_API_ETH_V2_BEACON_BLOCK_ATTESTER_SLASHING,
c.coordinatorClient,
wallclock,
&slotIteratorMetrics,
*c.Config.Derivers.AttesterSlashingConfig.HeadSlotLag,
&checkpointIteratorMetrics,
c.beacon,
finalizedCheckpoint,
),
c.beacon,
clientMeta,
),
v2.NewProposerSlashingDeriver(
c.log,
&c.Config.Derivers.ProposerSlashingConfig,
iterator.NewSlotIterator(
iterator.NewCheckpointIterator(
c.log,
networkName,
networkID,
xatu.CannonType_BEACON_API_ETH_V2_BEACON_BLOCK_PROPOSER_SLASHING,
c.coordinatorClient,
wallclock,
&slotIteratorMetrics,
*c.Config.Derivers.ProposerSlashingConfig.HeadSlotLag,
&checkpointIteratorMetrics,
c.beacon,
finalizedCheckpoint,
),
c.beacon,
clientMeta,
),
v2.NewVoluntaryExitDeriver(
c.log,
&c.Config.Derivers.VoluntaryExitConfig,
iterator.NewSlotIterator(
iterator.NewCheckpointIterator(
c.log,
networkName,
networkID,
xatu.CannonType_BEACON_API_ETH_V2_BEACON_BLOCK_VOLUNTARY_EXIT,
c.coordinatorClient,
wallclock,
&slotIteratorMetrics,
*c.Config.Derivers.VoluntaryExitConfig.HeadSlotLag,
&checkpointIteratorMetrics,
c.beacon,
finalizedCheckpoint,
),
c.beacon,
clientMeta,
),
v2.NewDepositDeriver(
c.log,
&c.Config.Derivers.DepositConfig,
iterator.NewSlotIterator(
iterator.NewCheckpointIterator(
c.log,
networkName,
networkID,
xatu.CannonType_BEACON_API_ETH_V2_BEACON_BLOCK_DEPOSIT,
c.coordinatorClient,
wallclock,
&slotIteratorMetrics,
*c.Config.Derivers.DepositConfig.HeadSlotLag,
&checkpointIteratorMetrics,
c.beacon,
finalizedCheckpoint,
),
c.beacon,
clientMeta,
),
v2.NewBLSToExecutionChangeDeriver(
c.log,
&c.Config.Derivers.BLSToExecutionConfig,
iterator.NewSlotIterator(
iterator.NewCheckpointIterator(
c.log,
networkName,
networkID,
xatu.CannonType_BEACON_API_ETH_V2_BEACON_BLOCK_BLS_TO_EXECUTION_CHANGE,
c.coordinatorClient,
wallclock,
&slotIteratorMetrics,
*c.Config.Derivers.BLSToExecutionConfig.HeadSlotLag,
&checkpointIteratorMetrics,
c.beacon,
finalizedCheckpoint,
),
c.beacon,
clientMeta,
),
v2.NewExecutionTransactionDeriver(
c.log,
&c.Config.Derivers.ExecutionTransactionConfig,
iterator.NewSlotIterator(
iterator.NewCheckpointIterator(
c.log,
networkName,
networkID,
xatu.CannonType_BEACON_API_ETH_V2_BEACON_BLOCK_EXECUTION_TRANSACTION,
c.coordinatorClient,
wallclock,
&slotIteratorMetrics,
*c.Config.Derivers.ExecutionTransactionConfig.HeadSlotLag,
&checkpointIteratorMetrics,
c.beacon,
finalizedCheckpoint,
),
c.beacon,
clientMeta,
Expand Down
34 changes: 28 additions & 6 deletions pkg/cannon/deriver/beacon/eth/v2/attester_slashing.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ type AttesterSlashingDeriverConfig struct {
type AttesterSlashingDeriver struct {
log logrus.FieldLogger
cfg *AttesterSlashingDeriverConfig
iterator *iterator.SlotIterator
iterator *iterator.CheckpointIterator
onEventCallbacks []func(ctx context.Context, event *xatu.DecoratedEvent) error
onLocationCallbacks []func(ctx context.Context, loc uint64) error
beacon *ethereum.BeaconNode
clientMeta *xatu.ClientMeta
}

func NewAttesterSlashingDeriver(log logrus.FieldLogger, config *AttesterSlashingDeriverConfig, iter *iterator.SlotIterator, beacon *ethereum.BeaconNode, clientMeta *xatu.ClientMeta) *AttesterSlashingDeriver {
func NewAttesterSlashingDeriver(log logrus.FieldLogger, config *AttesterSlashingDeriverConfig, iter *iterator.CheckpointIterator, beacon *ethereum.BeaconNode, clientMeta *xatu.ClientMeta) *AttesterSlashingDeriver {
return &AttesterSlashingDeriver{
log: log.WithField("module", "cannon/event/beacon/eth/v2/attester_slashing"),
cfg: config,
Expand Down Expand Up @@ -106,15 +106,15 @@ func (a *AttesterSlashingDeriver) run(ctx context.Context) {
}

for _, fn := range a.onLocationCallbacks {
if errr := fn(ctx, location.GetEthV2BeaconBlockAttesterSlashing().GetSlot()); errr != nil {
if errr := fn(ctx, location.GetEthV2BeaconBlockAttesterSlashing().GetEpoch()); errr != nil {
a.log.WithError(errr).Error("Failed to send location")
}
}

// Process the slot
events, err := a.processSlot(ctx, phase0.Slot(location.GetEthV2BeaconBlockAttesterSlashing().GetSlot()))
// Process the epoch
events, err := a.processEpoch(ctx, phase0.Epoch(location.GetEthV2BeaconBlockAttesterSlashing().GetEpoch()))
if err != nil {
a.log.WithError(err).Error("Failed to process slot")
a.log.WithError(err).Error("Failed to process epoch")

return err
}
Expand Down Expand Up @@ -147,6 +147,28 @@ func (a *AttesterSlashingDeriver) run(ctx context.Context) {
}
}

func (a *AttesterSlashingDeriver) processEpoch(ctx context.Context, epoch phase0.Epoch) ([]*xatu.DecoratedEvent, error) {
sp, err := a.beacon.Node().Spec()
if err != nil {
return nil, errors.Wrap(err, "failed to obtain spec")
}

allEvents := []*xatu.DecoratedEvent{}

for i := uint64(0); i <= uint64(sp.SlotsPerEpoch); i++ {
slot := phase0.Slot(i + uint64(epoch))

events, err := a.processSlot(ctx, slot)
if err != nil {
return nil, errors.Wrapf(err, "failed to process slot %d", slot)
}

allEvents = append(allEvents, events...)
}

return allEvents, nil
}

func (a *AttesterSlashingDeriver) processSlot(ctx context.Context, slot phase0.Slot) ([]*xatu.DecoratedEvent, error) {
// Get the block
block, err := a.beacon.GetBeaconBlock(ctx, xatuethv1.SlotAsString(slot))
Expand Down
34 changes: 28 additions & 6 deletions pkg/cannon/deriver/beacon/eth/v2/bls_to_execution_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@ type BLSToExecutionChangeDeriverConfig struct {
type BLSToExecutionChangeDeriver struct {
log logrus.FieldLogger
cfg *BLSToExecutionChangeDeriverConfig
iterator *iterator.SlotIterator
iterator *iterator.CheckpointIterator
onEventCallbacks []func(ctx context.Context, event *xatu.DecoratedEvent) error
onLocationCallbacks []func(ctx context.Context, loc uint64) error
beacon *ethereum.BeaconNode
clientMeta *xatu.ClientMeta
}

func NewBLSToExecutionChangeDeriver(log logrus.FieldLogger, config *BLSToExecutionChangeDeriverConfig, iter *iterator.SlotIterator, beacon *ethereum.BeaconNode, clientMeta *xatu.ClientMeta) *BLSToExecutionChangeDeriver {
func NewBLSToExecutionChangeDeriver(log logrus.FieldLogger, config *BLSToExecutionChangeDeriverConfig, iter *iterator.CheckpointIterator, beacon *ethereum.BeaconNode, clientMeta *xatu.ClientMeta) *BLSToExecutionChangeDeriver {
return &BLSToExecutionChangeDeriver{
log: log.WithField("module", "cannon/event/beacon/eth/v2/bls_to_execution_change"),
cfg: config,
Expand Down Expand Up @@ -109,15 +109,15 @@ func (b *BLSToExecutionChangeDeriver) run(ctx context.Context) {
}

for _, fn := range b.onLocationCallbacks {
if errr := fn(ctx, location.GetEthV2BeaconBlockBlsToExecutionChange().GetSlot()); errr != nil {
if errr := fn(ctx, location.GetEthV2BeaconBlockBlsToExecutionChange().GetEpoch()); errr != nil {
b.log.WithError(errr).Error("Failed to send location")
}
}

// Process the slot
events, err := b.processSlot(ctx, phase0.Slot(location.GetEthV2BeaconBlockBlsToExecutionChange().GetSlot()))
// Process the epoch
events, err := b.processEpoch(ctx, phase0.Epoch(location.GetEthV2BeaconBlockBlsToExecutionChange().GetEpoch()))
if err != nil {
b.log.WithError(err).Error("Failed to process slot")
b.log.WithError(err).Error("Failed to process epoch")

return err
}
Expand Down Expand Up @@ -150,6 +150,28 @@ func (b *BLSToExecutionChangeDeriver) run(ctx context.Context) {
}
}

func (b *BLSToExecutionChangeDeriver) processEpoch(ctx context.Context, epoch phase0.Epoch) ([]*xatu.DecoratedEvent, error) {
sp, err := b.beacon.Node().Spec()
if err != nil {
return nil, errors.Wrap(err, "failed to obtain spec")
}

allEvents := []*xatu.DecoratedEvent{}

for i := uint64(0); i <= uint64(sp.SlotsPerEpoch); i++ {
slot := phase0.Slot(i + uint64(epoch))

events, err := b.processSlot(ctx, slot)
if err != nil {
return nil, errors.Wrapf(err, "failed to process slot %d", slot)
}

allEvents = append(allEvents, events...)
}

return allEvents, nil
}

func (b *BLSToExecutionChangeDeriver) processSlot(ctx context.Context, slot phase0.Slot) ([]*xatu.DecoratedEvent, error) {
// Get the block
block, err := b.beacon.GetBeaconBlock(ctx, xatuethv1.SlotAsString(slot))
Expand Down
34 changes: 28 additions & 6 deletions pkg/cannon/deriver/beacon/eth/v2/deposit.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ type DepositDeriverConfig struct {
type DepositDeriver struct {
log logrus.FieldLogger
cfg *DepositDeriverConfig
iterator *iterator.SlotIterator
iterator *iterator.CheckpointIterator
onEventCallbacks []func(ctx context.Context, event *xatu.DecoratedEvent) error
onLocationCallbacks []func(ctx context.Context, location uint64) error
beacon *ethereum.BeaconNode
clientMeta *xatu.ClientMeta
}

func NewDepositDeriver(log logrus.FieldLogger, config *DepositDeriverConfig, iter *iterator.SlotIterator, beacon *ethereum.BeaconNode, clientMeta *xatu.ClientMeta) *DepositDeriver {
func NewDepositDeriver(log logrus.FieldLogger, config *DepositDeriverConfig, iter *iterator.CheckpointIterator, beacon *ethereum.BeaconNode, clientMeta *xatu.ClientMeta) *DepositDeriver {
return &DepositDeriver{
log: log.WithField("module", "cannon/event/beacon/eth/v2/deposit"),
cfg: config,
Expand Down Expand Up @@ -107,15 +107,15 @@ func (b *DepositDeriver) run(ctx context.Context) {
}

for _, fn := range b.onLocationCallbacks {
if errr := fn(ctx, location.GetEthV2BeaconBlockDeposit().GetSlot()); errr != nil {
if errr := fn(ctx, location.GetEthV2BeaconBlockDeposit().GetEpoch()); errr != nil {
b.log.WithError(errr).Error("Failed to send location")
}
}

// Process the slot
events, err := b.processSlot(ctx, phase0.Slot(location.GetEthV2BeaconBlockDeposit().GetSlot()))
// Process the epoch
events, err := b.processEpoch(ctx, phase0.Epoch(location.GetEthV2BeaconBlockDeposit().GetEpoch()))
if err != nil {
b.log.WithError(err).Error("Failed to process slot")
b.log.WithError(err).Error("Failed to process epoch")

return err
}
Expand Down Expand Up @@ -148,6 +148,28 @@ func (b *DepositDeriver) run(ctx context.Context) {
}
}

func (b *DepositDeriver) processEpoch(ctx context.Context, epoch phase0.Epoch) ([]*xatu.DecoratedEvent, error) {
sp, err := b.beacon.Node().Spec()
if err != nil {
return nil, errors.Wrap(err, "failed to obtain spec")
}

allEvents := []*xatu.DecoratedEvent{}

for i := uint64(0); i <= uint64(sp.SlotsPerEpoch); i++ {
slot := phase0.Slot(i + uint64(epoch))

events, err := b.processSlot(ctx, slot)
if err != nil {
return nil, errors.Wrapf(err, "failed to process slot %d", slot)
}

allEvents = append(allEvents, events...)
}

return allEvents, nil
}

func (b *DepositDeriver) processSlot(ctx context.Context, slot phase0.Slot) ([]*xatu.DecoratedEvent, error) {
// Get the block
block, err := b.beacon.GetBeaconBlock(ctx, xatuethv1.SlotAsString(slot))
Expand Down
34 changes: 28 additions & 6 deletions pkg/cannon/deriver/beacon/eth/v2/execution_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
type ExecutionTransactionDeriver struct {
log logrus.FieldLogger
cfg *ExecutionTransactionDeriverConfig
iterator *iterator.SlotIterator
iterator *iterator.CheckpointIterator
onEventCallbacks []func(ctx context.Context, event *xatu.DecoratedEvent) error
onLocationCallbacks []func(ctx context.Context, location uint64) error
beacon *ethereum.BeaconNode
Expand All @@ -42,7 +42,7 @@ const (
ExecutionTransactionDeriverName = xatu.CannonType_BEACON_API_ETH_V2_BEACON_BLOCK_EXECUTION_TRANSACTION
)

func NewExecutionTransactionDeriver(log logrus.FieldLogger, config *ExecutionTransactionDeriverConfig, iter *iterator.SlotIterator, beacon *ethereum.BeaconNode, clientMeta *xatu.ClientMeta) *ExecutionTransactionDeriver {
func NewExecutionTransactionDeriver(log logrus.FieldLogger, config *ExecutionTransactionDeriverConfig, iter *iterator.CheckpointIterator, beacon *ethereum.BeaconNode, clientMeta *xatu.ClientMeta) *ExecutionTransactionDeriver {
return &ExecutionTransactionDeriver{
log: log.WithField("module", "cannon/event/beacon/eth/v2/execution_transaction"),
cfg: config,
Expand Down Expand Up @@ -110,15 +110,15 @@ func (b *ExecutionTransactionDeriver) run(ctx context.Context) {
}

for _, fn := range b.onLocationCallbacks {
if errr := fn(ctx, location.GetEthV2BeaconBlockExecutionTransaction().GetSlot()); errr != nil {
if errr := fn(ctx, location.GetEthV2BeaconBlockExecutionTransaction().GetEpoch()); errr != nil {
b.log.WithError(errr).Error("Failed to send location")
}
}

// Process the slot
events, err := b.processSlot(ctx, phase0.Slot(location.GetEthV2BeaconBlockExecutionTransaction().GetSlot()))
// Process the epoch
events, err := b.processEpoch(ctx, phase0.Epoch(location.GetEthV2BeaconBlockExecutionTransaction().GetEpoch()))
if err != nil {
b.log.WithError(err).Error("Failed to process slot")
b.log.WithError(err).Error("Failed to process epoch")

return err
}
Expand Down Expand Up @@ -151,6 +151,28 @@ func (b *ExecutionTransactionDeriver) run(ctx context.Context) {
}
}

func (b *ExecutionTransactionDeriver) processEpoch(ctx context.Context, epoch phase0.Epoch) ([]*xatu.DecoratedEvent, error) {
sp, err := b.beacon.Node().Spec()
if err != nil {
return nil, errors.Wrap(err, "failed to obtain spec")
}

allEvents := []*xatu.DecoratedEvent{}

for i := uint64(0); i <= uint64(sp.SlotsPerEpoch); i++ {
slot := phase0.Slot(i + uint64(epoch))

events, err := b.processSlot(ctx, slot)
if err != nil {
return nil, errors.Wrapf(err, "failed to process slot %d", slot)
}

allEvents = append(allEvents, events...)
}

return allEvents, nil
}

func (b *ExecutionTransactionDeriver) processSlot(ctx context.Context, slot phase0.Slot) ([]*xatu.DecoratedEvent, error) {
// Get the block
block, err := b.beacon.GetBeaconBlock(ctx, xatuethv1.SlotAsString(slot))
Expand Down
Loading

0 comments on commit 99f2e42

Please sign in to comment.