Skip to content

Commit

Permalink
feat(cannon): Convert all to backfilling checkpoint (#345)
Browse files Browse the repository at this point in the history
* Backfilling

* refactor: Update comments in checkpoint iterator file

* Convert all to backfilling checkpoint

* refactor: Remove unused checkAndMigrateFromCheckpointIterator function

* feat: Add lag metrics to backfilling checkpoint iterator

* style: Remove unnecessary blank line
  • Loading branch information
samcm authored Jul 5, 2024
1 parent 033a368 commit ec94902
Show file tree
Hide file tree
Showing 22 changed files with 1,177 additions and 1,180 deletions.
50 changes: 30 additions & 20 deletions pkg/cannon/cannon.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,6 @@ func (c *Cannon) startBeaconBlockProcessor(ctx context.Context) error {
return err
}

checkpointIteratorMetrics := iterator.NewCheckpointMetrics("xatu_cannon")

backfillingCheckpointIteratorMetrics := iterator.NewBackfillingCheckpointMetrics("xatu_cannon")

blockprintIteratorMetrics := iterator.NewBlockprintMetrics("xatu_cannon")
Expand All @@ -354,135 +352,143 @@ func (c *Cannon) startBeaconBlockProcessor(ctx context.Context) error {
v2.NewAttesterSlashingDeriver(
c.log,
&c.Config.Derivers.AttesterSlashingConfig,
iterator.NewCheckpointIterator(
iterator.NewBackfillingCheckpoint(
c.log,
networkName,
networkID,
xatu.CannonType_BEACON_API_ETH_V2_BEACON_BLOCK_ATTESTER_SLASHING,
c.coordinatorClient,
wallclock,
&checkpointIteratorMetrics,
&backfillingCheckpointIteratorMetrics,
c.beacon,
finalizedCheckpoint,
3,
),
c.beacon,
clientMeta,
),
v2.NewProposerSlashingDeriver(
c.log,
&c.Config.Derivers.ProposerSlashingConfig,
iterator.NewCheckpointIterator(
iterator.NewBackfillingCheckpoint(
c.log,
networkName,
networkID,
xatu.CannonType_BEACON_API_ETH_V2_BEACON_BLOCK_PROPOSER_SLASHING,
c.coordinatorClient,
wallclock,
&checkpointIteratorMetrics,
&backfillingCheckpointIteratorMetrics,
c.beacon,
finalizedCheckpoint,
3,
),
c.beacon,
clientMeta,
),
v2.NewVoluntaryExitDeriver(
c.log,
&c.Config.Derivers.VoluntaryExitConfig,
iterator.NewCheckpointIterator(
iterator.NewBackfillingCheckpoint(
c.log,
networkName,
networkID,
xatu.CannonType_BEACON_API_ETH_V2_BEACON_BLOCK_VOLUNTARY_EXIT,
c.coordinatorClient,
wallclock,
&checkpointIteratorMetrics,
&backfillingCheckpointIteratorMetrics,
c.beacon,
finalizedCheckpoint,
3,
),
c.beacon,
clientMeta,
),
v2.NewDepositDeriver(
c.log,
&c.Config.Derivers.DepositConfig,
iterator.NewCheckpointIterator(
iterator.NewBackfillingCheckpoint(
c.log,
networkName,
networkID,
xatu.CannonType_BEACON_API_ETH_V2_BEACON_BLOCK_DEPOSIT,
c.coordinatorClient,
wallclock,
&checkpointIteratorMetrics,
&backfillingCheckpointIteratorMetrics,
c.beacon,
finalizedCheckpoint,
3,
),
c.beacon,
clientMeta,
),
v2.NewBLSToExecutionChangeDeriver(
c.log,
&c.Config.Derivers.BLSToExecutionConfig,
iterator.NewCheckpointIterator(
iterator.NewBackfillingCheckpoint(
c.log,
networkName,
networkID,
xatu.CannonType_BEACON_API_ETH_V2_BEACON_BLOCK_BLS_TO_EXECUTION_CHANGE,
c.coordinatorClient,
wallclock,
&checkpointIteratorMetrics,
&backfillingCheckpointIteratorMetrics,
c.beacon,
finalizedCheckpoint,
3,
),
c.beacon,
clientMeta,
),
v2.NewExecutionTransactionDeriver(
c.log,
&c.Config.Derivers.ExecutionTransactionConfig,
iterator.NewCheckpointIterator(
iterator.NewBackfillingCheckpoint(
c.log,
networkName,
networkID,
xatu.CannonType_BEACON_API_ETH_V2_BEACON_BLOCK_EXECUTION_TRANSACTION,
c.coordinatorClient,
wallclock,
&checkpointIteratorMetrics,
&backfillingCheckpointIteratorMetrics,
c.beacon,
finalizedCheckpoint,
3,
),
c.beacon,
clientMeta,
),
v2.NewWithdrawalDeriver(
c.log,
&c.Config.Derivers.WithdrawalConfig,
iterator.NewCheckpointIterator(
iterator.NewBackfillingCheckpoint(
c.log,
networkName,
networkID,
xatu.CannonType_BEACON_API_ETH_V2_BEACON_BLOCK_WITHDRAWAL,
c.coordinatorClient,
wallclock,
&checkpointIteratorMetrics,
&backfillingCheckpointIteratorMetrics,
c.beacon,
finalizedCheckpoint,
3,
),
c.beacon,
clientMeta,
),
v2.NewBeaconBlockDeriver(
c.log,
&c.Config.Derivers.BeaconBlockConfig,
iterator.NewCheckpointIterator(
iterator.NewBackfillingCheckpoint(
c.log,
networkName,
networkID,
xatu.CannonType_BEACON_API_ETH_V2_BEACON_BLOCK,
c.coordinatorClient,
wallclock,
&checkpointIteratorMetrics,
&backfillingCheckpointIteratorMetrics,
c.beacon,
finalizedCheckpoint,
3,
),
c.beacon,
clientMeta,
Expand All @@ -506,16 +512,17 @@ func (c *Cannon) startBeaconBlockProcessor(ctx context.Context) error {
v1.NewBeaconBlobDeriver(
c.log,
&c.Config.Derivers.BeaconBlobSidecarConfig,
iterator.NewCheckpointIterator(
iterator.NewBackfillingCheckpoint(
c.log,
networkName,
networkID,
xatu.CannonType_BEACON_API_ETH_V1_BEACON_BLOB_SIDECAR,
c.coordinatorClient,
wallclock,
&checkpointIteratorMetrics,
&backfillingCheckpointIteratorMetrics,
c.beacon,
finalizedCheckpoint,
3,
),
c.beacon,
clientMeta,
Expand All @@ -533,6 +540,7 @@ func (c *Cannon) startBeaconBlockProcessor(ctx context.Context) error {
&backfillingCheckpointIteratorMetrics,
c.beacon,
finalizedCheckpoint,
3,
),
c.beacon,
clientMeta,
Expand All @@ -550,6 +558,7 @@ func (c *Cannon) startBeaconBlockProcessor(ctx context.Context) error {
&backfillingCheckpointIteratorMetrics,
c.beacon,
finalizedCheckpoint,
3,
),
c.beacon,
clientMeta,
Expand All @@ -567,6 +576,7 @@ func (c *Cannon) startBeaconBlockProcessor(ctx context.Context) error {
&backfillingCheckpointIteratorMetrics,
c.beacon,
finalizedCheckpoint,
2,
),
c.beacon,
clientMeta,
Expand Down
24 changes: 10 additions & 14 deletions pkg/cannon/deriver/beacon/eth/v1/beacon_blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ type BeaconBlobDeriverConfig struct {
type BeaconBlobDeriver struct {
log logrus.FieldLogger
cfg *BeaconBlobDeriverConfig
iterator *iterator.CheckpointIterator
iterator *iterator.BackfillingCheckpoint
onEventsCallbacks []func(ctx context.Context, events []*xatu.DecoratedEvent) error
beacon *ethereum.BeaconNode
clientMeta *xatu.ClientMeta
}

func NewBeaconBlobDeriver(log logrus.FieldLogger, config *BeaconBlobDeriverConfig, iter *iterator.CheckpointIterator, beacon *ethereum.BeaconNode, clientMeta *xatu.ClientMeta) *BeaconBlobDeriver {
func NewBeaconBlobDeriver(log logrus.FieldLogger, config *BeaconBlobDeriverConfig, iter *iterator.BackfillingCheckpoint, beacon *ethereum.BeaconNode, clientMeta *xatu.ClientMeta) *BeaconBlobDeriver {
return &BeaconBlobDeriver{
log: log.WithField("module", "cannon/event/beacon/eth/v1/beacon_blob"),
cfg: config,
Expand Down Expand Up @@ -78,6 +78,10 @@ func (b *BeaconBlobDeriver) Start(ctx context.Context) error {

b.log.Info("Beacon blob deriver enabled")

if err := b.iterator.Start(ctx); err != nil {
return errors.Wrap(err, "failed to start iterator")
}

// Start our main loop
b.run(ctx)

Expand Down Expand Up @@ -118,18 +122,16 @@ func (b *BeaconBlobDeriver) run(rctx context.Context) {

span.AddEvent("Grabbing next location")

// Get the next slot
location, _, err := b.iterator.Next(ctx)
// Get the next position.
position, err := b.iterator.Next(ctx)
if err != nil {
span.SetStatus(codes.Error, err.Error())

return err
}

span.AddEvent("Obtained next location. Processing epoch...", trace.WithAttributes(attribute.Int64("location", int64(location.GetEthV1BeaconBlobSidecar().GetEpoch()))))

// Process the epoch
events, err := b.processEpoch(ctx, phase0.Epoch(location.GetEthV1BeaconBlobSidecar().GetEpoch()))
events, err := b.processEpoch(ctx, position.Next)
if err != nil {
b.log.WithError(err).Error("Failed to process epoch")

Expand All @@ -138,8 +140,6 @@ func (b *BeaconBlobDeriver) run(rctx context.Context) {
return err
}

span.AddEvent("Epoch processing complete. Sending events...")

// Send the events
for _, fn := range b.onEventsCallbacks {
if err := fn(ctx, events); err != nil {
Expand All @@ -149,17 +149,13 @@ func (b *BeaconBlobDeriver) run(rctx context.Context) {
}
}

span.AddEvent("Events sent. Updating location...")

// Update our location
if err := b.iterator.UpdateLocation(ctx, location); err != nil {
if err := b.iterator.UpdateLocation(ctx, position.Next, position.Direction); err != nil {
span.SetStatus(codes.Error, err.Error())

return err
}

span.AddEvent("Location updated. Done.")

bo.Reset()

return nil
Expand Down
Loading

0 comments on commit ec94902

Please sign in to comment.