Skip to content

Commit

Permalink
Merge pull request #202 from attestantio/fast-track
Browse files Browse the repository at this point in the history
Add fast track flag.
  • Loading branch information
mcdee authored Apr 18, 2024
2 parents 5fa7a12 + 5358faa commit 86276f9
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 33 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ dev:
- add reduced memory mode for memory-constrained systems
- reduce memory usage when obtaining Dirk accounts
- reduce memory usage when generating beacon committee subscriptions
- add "controller.fast-track" flag

1.9.0-alpha.2:
- allow Vouch to act as an MEV-boost client for non-Vouch validators
Expand Down
6 changes: 6 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ graffiti:
static:
value: 'My graffiti'

# controller controls when validating actions take place.
controller:
# If fast-track is true then Vouch will attest and send sync committee messages as soon as it receives notification
# of receipt of a block for the duties' slot.
fast-track: true

# beaconblockproposer provides control of the beacon block proposal process.
beaconblockproposer:
# If unblind-from-all-relays is true then Vouch will use all relays that it asked for blocks to unblind the
Expand Down
2 changes: 2 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ func fetchConfig() error {
viper.SetDefault("controller.max-sync-committee-message-delay", 4*time.Second)
viper.SetDefault("controller.attestation-aggregation-delay", 8*time.Second)
viper.SetDefault("controller.sync-committee-aggregation-delay", 8*time.Second)
viper.SetDefault("controller.fast-track", true)
viper.SetDefault("blockrelay.timeout", 1*time.Second)
viper.SetDefault("blockrelay.listen-address", "0.0.0.0:18550")
viper.SetDefault("blockrelay.fallback-gas-limit", uint64(30000000))
Expand Down Expand Up @@ -431,6 +432,7 @@ func startServices(ctx context.Context,
standardcontroller.WithAttestationAggregationDelay(viper.GetDuration("controller.attestation-aggregation-delay")),
standardcontroller.WithMaxSyncCommitteeMessageDelay(viper.GetDuration("controller.max-sync-committee-message-delay")),
standardcontroller.WithSyncCommitteeAggregationDelay(viper.GetDuration("controller.sync-committee-aggregation-delay")),
standardcontroller.WithFastTrack(viper.GetBool("controller.fast-track")),
)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to start controller service")
Expand Down
81 changes: 49 additions & 32 deletions services/controller/standard/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,24 +48,13 @@ func (s *Service) HandleHeadEvent(event *api.Event) {
return
}

var zeroRoot phase0.Root

data := event.Data.(*api.HeadEvent)
log := log.With().Uint64("slot", uint64(data.Slot)).Logger()
log.Trace().Msg("Received head event")
log.Trace().Uint64("slot", uint64(data.Slot)).Msg("Received head event")

if data.Slot != s.chainTimeService.CurrentSlot() {
return
}

// Old versions of teku send a synthetic head event when they don't receive a block
// by a certain time after start of the slot. We only care about real block updates
// for the purposes of this function, so ignore them.
if !bytes.Equal(s.lastBlockRoot[:], zeroRoot[:]) &&
bytes.Equal(s.lastBlockRoot[:], data.Block[:]) {
log.Trace().Msg("Synthetic head event; ignoring")
return
}
s.lastBlockRoot = data.Block
epoch := s.chainTimeService.SlotToEpoch(data.Slot)

Expand All @@ -76,73 +65,101 @@ func (s *Service) HandleHeadEvent(event *api.Event) {
if data.Slot == s.chainTimeService.CurrentSlot()-1 && s.maxProposalDelay > 0 {
proposalJobName := fmt.Sprintf("Beacon block proposal for slot %d", s.chainTimeService.CurrentSlot())
if s.scheduler.JobExists(ctx, proposalJobName) {
log.Trace().Msg("Kicking off proposal for slot now that parent block for last slot has arrived")
log.Trace().Uint64("slot", uint64(data.Slot)).Msg("Kicking off proposal for slot now that parent block for last slot has arrived")
s.scheduler.RunJobIfExists(ctx, proposalJobName)
}
}

s.checkEventForReorg(ctx, epoch, data.Slot, data.PreviousDutyDependentRoot, data.CurrentDutyDependentRoot)

if s.fastTrack {
s.fastTrackJobs(ctx, data.Slot)
}

// Remove old subscriptions if present.
s.subscriptionInfosMutex.Lock()
delete(s.subscriptionInfos, s.chainTimeService.SlotToEpoch(data.Slot)-2)
s.subscriptionInfosMutex.Unlock()
}

// checkEventForReorg check data in the event against information that we already have to see if
// a chain reorg may have occurred, and if so handle it.
func (s *Service) checkEventForReorg(ctx context.Context,
epoch phase0.Epoch,
slot phase0.Slot,
previousDutyDependentRoot phase0.Root,
currentDutyDependentRoot phase0.Root,
) {
var zeroRoot phase0.Root

// Check to see if there is a reorganisation that requires re-fetching duties.
if s.lastBlockEpoch != 0 {
if epoch > s.lastBlockEpoch {
log.Trace().
Uint64("slot", uint64(slot)).
Str("old_previous_dependent_root", fmt.Sprintf("%#x", s.previousDutyDependentRoot)).
Str("new_previous_dependent_root", fmt.Sprintf("%#x", data.PreviousDutyDependentRoot)).
Str("new_previous_dependent_root", fmt.Sprintf("%#x", previousDutyDependentRoot)).
Str("old_current_dependent_root", fmt.Sprintf("%#x", s.currentDutyDependentRoot)).
Str("new_current_dependent_root", fmt.Sprintf("%#x", data.CurrentDutyDependentRoot)).
Str("new_current_dependent_root", fmt.Sprintf("%#x", currentDutyDependentRoot)).
Msg("Change of epoch")
// Change of epoch. Ensure that the new previous dependent root is the same as
// the old current root.
if !bytes.Equal(s.previousDutyDependentRoot[:], zeroRoot[:]) &&
!bytes.Equal(s.currentDutyDependentRoot[:], data.PreviousDutyDependentRoot[:]) {
!bytes.Equal(s.currentDutyDependentRoot[:], previousDutyDependentRoot[:]) {
log.Debug().
Uint64("slot", uint64(slot)).
Str("old_current_dependent_root", fmt.Sprintf("%#x", s.currentDutyDependentRoot[:])).
Str("new_previous_dependent_root", fmt.Sprintf("%#x", data.PreviousDutyDependentRoot[:])).
Str("new_previous_dependent_root", fmt.Sprintf("%#x", previousDutyDependentRoot[:])).
Msg("Previous duty dependent root has changed on epoch transition")
go s.handlePreviousDependentRootChanged(ctx)
}
} else {
// Existing epoch. Ensure that the roots are the same.
if !bytes.Equal(s.previousDutyDependentRoot[:], zeroRoot[:]) &&
!bytes.Equal(s.previousDutyDependentRoot[:], data.PreviousDutyDependentRoot[:]) {
!bytes.Equal(s.previousDutyDependentRoot[:], previousDutyDependentRoot[:]) {
log.Debug().
Uint64("slot", uint64(slot)).
Str("old_dependent_root", fmt.Sprintf("%#x", s.previousDutyDependentRoot[:])).
Str("new_dependent_root", fmt.Sprintf("%#x", data.PreviousDutyDependentRoot[:])).
Str("new_dependent_root", fmt.Sprintf("%#x", previousDutyDependentRoot[:])).
Msg("Previous duty dependent root has changed")
go s.handlePreviousDependentRootChanged(ctx)
}

if !bytes.Equal(s.currentDutyDependentRoot[:], zeroRoot[:]) &&
!bytes.Equal(s.currentDutyDependentRoot[:], data.CurrentDutyDependentRoot[:]) {
!bytes.Equal(s.currentDutyDependentRoot[:], currentDutyDependentRoot[:]) {
log.Debug().
Uint64("slot", uint64(slot)).
Str("old_dependent_root", fmt.Sprintf("%#x", s.currentDutyDependentRoot[:])).
Str("new_dependent_root", fmt.Sprintf("%#x", data.CurrentDutyDependentRoot[:])).
Str("new_dependent_root", fmt.Sprintf("%#x", currentDutyDependentRoot[:])).
Msg("Current duty dependent root has changed")
go s.handleCurrentDependentRootChanged(ctx)
}
}
}

s.lastBlockEpoch = epoch
s.previousDutyDependentRoot = data.PreviousDutyDependentRoot
s.currentDutyDependentRoot = data.CurrentDutyDependentRoot
s.previousDutyDependentRoot = previousDutyDependentRoot
s.currentDutyDependentRoot = currentDutyDependentRoot
}

// fastTrackJobs kicks off jobs when a block has been seen early.
//
// We wait before fast tracking jobs to allow the block some time to propagate around the rest
// of the network before kicking off attestations and sync committees for the block's slot.
func (s *Service) fastTrackJobs(ctx context.Context,
	slot phase0.Slot,
) {
	// Give the block a brief propagation window before acting on it.
	time.Sleep(200 * time.Millisecond)

	// Run the attestation job for this slot early if it is still scheduled.
	jobName := fmt.Sprintf("Attestations for slot %d", slot)
	if s.scheduler.JobExists(ctx, jobName) {
		log.Trace().Msg("Kicking off attestations for slot early due to receiving relevant block")
		s.scheduler.RunJobIfExists(ctx, jobName)
	}

	// Likewise the sync committee message job for this slot.
	jobName = fmt.Sprintf("Sync committee messages for slot %d", slot)
	if s.scheduler.JobExists(ctx, jobName) {
		log.Trace().Msg("Kicking off sync committee contributions for slot early due to receiving relevant block")
		s.scheduler.RunJobIfExists(ctx, jobName)
	}
}

// handlePreviousDependentRootChanged handles the situation where the previous
Expand Down
10 changes: 9 additions & 1 deletion services/controller/standard/parameters.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2020, 2021 Attestant Limited.
// Copyright © 2020 - 2024 Attestant Limited.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -65,6 +65,7 @@ type parameters struct {
attestationAggregationDelay time.Duration
maxSyncCommitteeMessageDelay time.Duration
syncCommitteeAggregationDelay time.Duration
fastTrack bool
}

// Parameter is the interface for service parameters.
Expand Down Expand Up @@ -274,6 +275,13 @@ func WithSyncCommitteeAggregationDelay(delay time.Duration) Parameter {
})
}

// WithFastTrack sets the fast track flag, attesting as soon as possible.
// When true, attestations and sync committee messages are sent as soon as
// notification of the duties' block is received rather than waiting for the
// configured delay (see docs/configuration.md "controller.fast-track").
func WithFastTrack(fastTrack bool) Parameter {
	return parameterFunc(func(p *parameters) {
		p.fastTrack = fastTrack
	})
}

// parseAndCheckParameters parses and checks parameters to ensure that mandatory parameters are present and correct.
func parseAndCheckParameters(params ...Parameter) (*parameters, error) {
parameters := parameters{
Expand Down
2 changes: 2 additions & 0 deletions services/controller/standard/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ type Service struct {
attestationAggregationDelay time.Duration
maxSyncCommitteeMessageDelay time.Duration
syncCommitteeAggregationDelay time.Duration
fastTrack bool

// Hard fork control
handlingAltair bool
Expand Down Expand Up @@ -190,6 +191,7 @@ func New(ctx context.Context, params ...Parameter) (*Service, error) {
attestationAggregationDelay: parameters.attestationAggregationDelay,
maxSyncCommitteeMessageDelay: parameters.maxSyncCommitteeMessageDelay,
syncCommitteeAggregationDelay: parameters.syncCommitteeAggregationDelay,
fastTrack: parameters.fastTrack,
subscriptionInfos: make(map[phase0.Epoch]map[phase0.Slot]map[phase0.CommitteeIndex]*beaconcommitteesubscriber.Subscription),
handlingAltair: handlingAltair,
altairForkEpoch: altairForkEpoch,
Expand Down

0 comments on commit 86276f9

Please sign in to comment.