diff --git a/CHANGELOG.md b/CHANGELOG.md index ac1376d1..4a3ec0f4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ dev: - add reduced memory mode for memory-constrained systems - reduce memory usage when obtaining Dirk accounts - reduce memory usage when generating beacon committee subscriptions + - add "controller.fast-track" flag 1.9.0-alpha.2: - allow Vouch to act as an MEV-boost client for non-Vouch validators diff --git a/docs/configuration.md b/docs/configuration.md index 5056e45d..709320f1 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -62,6 +62,12 @@ graffiti: static: value: 'My graffiti' +# controller controls when validating actions take place. +controller: + # If fast-track is true then Vouch will attest and send sync committee messages as soon as it receives notification + # of receipt of a block for the duties' slot. + fast-track: true + # beaconblockproposer provides control of the beacon block proposal process. beaconblockproposer: # If unblind-from-all-relays is true then Vouch will use all relays that it asked for blocks to unblind the diff --git a/main.go b/main.go index c462fe9d..436007a8 100644 --- a/main.go +++ b/main.go @@ -233,6 +233,7 @@ func fetchConfig() error { viper.SetDefault("controller.max-sync-committee-message-delay", 4*time.Second) viper.SetDefault("controller.attestation-aggregation-delay", 8*time.Second) viper.SetDefault("controller.sync-committee-aggregation-delay", 8*time.Second) + viper.SetDefault("controller.fast-track", true) viper.SetDefault("blockrelay.timeout", 1*time.Second) viper.SetDefault("blockrelay.listen-address", "0.0.0.0:18550") viper.SetDefault("blockrelay.fallback-gas-limit", uint64(30000000)) @@ -431,6 +432,7 @@ func startServices(ctx context.Context, standardcontroller.WithAttestationAggregationDelay(viper.GetDuration("controller.attestation-aggregation-delay")), standardcontroller.WithMaxSyncCommitteeMessageDelay(viper.GetDuration("controller.max-sync-committee-message-delay")), standardcontroller.WithSyncCommitteeAggregationDelay(viper.GetDuration("controller.sync-committee-aggregation-delay")), + standardcontroller.WithFastTrack(viper.GetBool("controller.fast-track")), ) if err != nil { return nil, nil, errors.Wrap(err, "failed to start controller service") diff --git a/services/controller/standard/events.go b/services/controller/standard/events.go index 740cbc07..751e8802 100644 --- a/services/controller/standard/events.go +++ b/services/controller/standard/events.go @@ -48,24 +48,13 @@ func (s *Service) HandleHeadEvent(event *api.Event) { return } - var zeroRoot phase0.Root - data := event.Data.(*api.HeadEvent) - log := log.With().Uint64("slot", uint64(data.Slot)).Logger() - log.Trace().Msg("Received head event") + log.Trace().Uint64("slot", uint64(data.Slot)).Msg("Received head event") if data.Slot != s.chainTimeService.CurrentSlot() { return } - // Old versions of teku send a synthetic head event when they don't receive a block - // by a certain time after start of the slot. We only care about real block updates - // for the purposes of this function, so ignore them. - if !bytes.Equal(s.lastBlockRoot[:], zeroRoot[:]) && - bytes.Equal(s.lastBlockRoot[:], data.Block[:]) { - log.Trace().Msg("Synthetic head event; ignoring") - return - } s.lastBlockRoot = data.Block epoch := s.chainTimeService.SlotToEpoch(data.Slot) @@ -76,73 +65,101 @@ func (s *Service) HandleHeadEvent(event *api.Event) { if data.Slot == s.chainTimeService.CurrentSlot()-1 && s.maxProposalDelay > 0 { proposalJobName := fmt.Sprintf("Beacon block proposal for slot %d", s.chainTimeService.CurrentSlot()) if s.scheduler.JobExists(ctx, proposalJobName) { - log.Trace().Msg("Kicking off proposal for slot now that parent block for last slot has arrived") + log.Trace().Uint64("slot", uint64(data.Slot)).Msg("Kicking off proposal for slot now that parent block for last slot has arrived") s.scheduler.RunJobIfExists(ctx, proposalJobName) } } + s.checkEventForReorg(ctx, epoch, data.Slot, data.PreviousDutyDependentRoot, data.CurrentDutyDependentRoot) + + if s.fastTrack { + s.fastTrackJobs(ctx, data.Slot) + } + + // Remove old subscriptions if present. + s.subscriptionInfosMutex.Lock() + delete(s.subscriptionInfos, s.chainTimeService.SlotToEpoch(data.Slot)-2) + s.subscriptionInfosMutex.Unlock() +} + +// checkEventForReorg check data in the event against information that we already have to see if +// a chain reorg may have occurred, and if so handle it. +func (s *Service) checkEventForReorg(ctx context.Context, + epoch phase0.Epoch, + slot phase0.Slot, + previousDutyDependentRoot phase0.Root, + currentDutyDependentRoot phase0.Root, +) { + var zeroRoot phase0.Root + // Check to see if there is a reorganisation that requires re-fetching duties. if s.lastBlockEpoch != 0 { if epoch > s.lastBlockEpoch { log.Trace(). + Uint64("slot", uint64(slot)). Str("old_previous_dependent_root", fmt.Sprintf("%#x", s.previousDutyDependentRoot)). - Str("new_previous_dependent_root", fmt.Sprintf("%#x", data.PreviousDutyDependentRoot)). + Str("new_previous_dependent_root", fmt.Sprintf("%#x", previousDutyDependentRoot)). Str("old_current_dependent_root", fmt.Sprintf("%#x", s.currentDutyDependentRoot)). - Str("new_current_dependent_root", fmt.Sprintf("%#x", data.CurrentDutyDependentRoot)). + Str("new_current_dependent_root", fmt.Sprintf("%#x", currentDutyDependentRoot)). Msg("Change of epoch") // Change of epoch. Ensure that the new previous dependent root is the same as // the old current root. if !bytes.Equal(s.previousDutyDependentRoot[:], zeroRoot[:]) && - !bytes.Equal(s.currentDutyDependentRoot[:], data.PreviousDutyDependentRoot[:]) { + !bytes.Equal(s.currentDutyDependentRoot[:], previousDutyDependentRoot[:]) { log.Debug(). + Uint64("slot", uint64(slot)). Str("old_current_dependent_root", fmt.Sprintf("%#x", s.currentDutyDependentRoot[:])). - Str("new_previous_dependent_root", fmt.Sprintf("%#x", data.PreviousDutyDependentRoot[:])). + Str("new_previous_dependent_root", fmt.Sprintf("%#x", previousDutyDependentRoot[:])). Msg("Previous duty dependent root has changed on epoch transition") go s.handlePreviousDependentRootChanged(ctx) } } else { // Existing epoch. Ensure that the roots are the same. if !bytes.Equal(s.previousDutyDependentRoot[:], zeroRoot[:]) && - !bytes.Equal(s.previousDutyDependentRoot[:], data.PreviousDutyDependentRoot[:]) { + !bytes.Equal(s.previousDutyDependentRoot[:], previousDutyDependentRoot[:]) { log.Debug(). + Uint64("slot", uint64(slot)). Str("old_dependent_root", fmt.Sprintf("%#x", s.previousDutyDependentRoot[:])). - Str("new_dependent_root", fmt.Sprintf("%#x", data.PreviousDutyDependentRoot[:])). + Str("new_dependent_root", fmt.Sprintf("%#x", previousDutyDependentRoot[:])). Msg("Previous duty dependent root has changed") go s.handlePreviousDependentRootChanged(ctx) } if !bytes.Equal(s.currentDutyDependentRoot[:], zeroRoot[:]) && - !bytes.Equal(s.currentDutyDependentRoot[:], data.CurrentDutyDependentRoot[:]) { + !bytes.Equal(s.currentDutyDependentRoot[:], currentDutyDependentRoot[:]) { log.Debug(). + Uint64("slot", uint64(slot)). Str("old_dependent_root", fmt.Sprintf("%#x", s.currentDutyDependentRoot[:])). - Str("new_dependent_root", fmt.Sprintf("%#x", data.CurrentDutyDependentRoot[:])). + Str("new_dependent_root", fmt.Sprintf("%#x", currentDutyDependentRoot[:])). Msg("Current duty dependent root has changed") go s.handleCurrentDependentRootChanged(ctx) } } } + s.lastBlockEpoch = epoch - s.previousDutyDependentRoot = data.PreviousDutyDependentRoot - s.currentDutyDependentRoot = data.CurrentDutyDependentRoot + s.previousDutyDependentRoot = previousDutyDependentRoot + s.currentDutyDependentRoot = currentDutyDependentRoot +} - // We give the block some time to propagate around the rest of the - // nodes before kicking off attestations and sync committees for the block's slot. +// fastTrackJobs kicks off jobs when a block has been seen early. +func (s *Service) fastTrackJobs(ctx context.Context, + slot phase0.Slot, +) { + // We wait before fast tracking jobs to allow the block some time to propagate around the rest + // of the network before kicking off attestations and sync committees for the block's slot. time.Sleep(200 * time.Millisecond) - jobName := fmt.Sprintf("Attestations for slot %d", data.Slot) + + jobName := fmt.Sprintf("Attestations for slot %d", slot) if s.scheduler.JobExists(ctx, jobName) { log.Trace().Msg("Kicking off attestations for slot early due to receiving relevant block") s.scheduler.RunJobIfExists(ctx, jobName) } - jobName = fmt.Sprintf("Sync committee messages for slot %d", data.Slot) + jobName = fmt.Sprintf("Sync committee messages for slot %d", slot) if s.scheduler.JobExists(ctx, jobName) { log.Trace().Msg("Kicking off sync committee contributions for slot early due to receiving relevant block") s.scheduler.RunJobIfExists(ctx, jobName) } - - // Remove old subscriptions if present. - s.subscriptionInfosMutex.Lock() - delete(s.subscriptionInfos, s.chainTimeService.SlotToEpoch(data.Slot)-2) - s.subscriptionInfosMutex.Unlock() } // handlePreviousDependentRootChanged handles the situation where the previous diff --git a/services/controller/standard/parameters.go b/services/controller/standard/parameters.go index c36192c2..3d44fcd9 100644 --- a/services/controller/standard/parameters.go +++ b/services/controller/standard/parameters.go @@ -1,4 +1,4 @@ -// Copyright © 2020, 2021 Attestant Limited. +// Copyright © 2020 - 2024 Attestant Limited. // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -65,6 +65,7 @@ type parameters struct { attestationAggregationDelay time.Duration maxSyncCommitteeMessageDelay time.Duration syncCommitteeAggregationDelay time.Duration + fastTrack bool } // Parameter is the interface for service parameters. @@ -274,6 +275,13 @@ func WithSyncCommitteeAggregationDelay(delay time.Duration) Parameter { }) } +// WithFastTrack sets the fast track flag, attesting as soon as possible. +func WithFastTrack(fastTrack bool) Parameter { + return parameterFunc(func(p *parameters) { + p.fastTrack = fastTrack + }) +} + // parseAndCheckParameters parses and checks parameters to ensure that mandatory parameters are present and correct. func parseAndCheckParameters(params ...Parameter) (*parameters, error) { parameters := parameters{ diff --git a/services/controller/standard/service.go b/services/controller/standard/service.go index e7799a95..e8782b73 100644 --- a/services/controller/standard/service.go +++ b/services/controller/standard/service.go @@ -80,6 +80,7 @@ type Service struct { attestationAggregationDelay time.Duration maxSyncCommitteeMessageDelay time.Duration syncCommitteeAggregationDelay time.Duration + fastTrack bool // Hard fork control handlingAltair bool @@ -190,6 +191,7 @@ func New(ctx context.Context, params ...Parameter) (*Service, error) { attestationAggregationDelay: parameters.attestationAggregationDelay, maxSyncCommitteeMessageDelay: parameters.maxSyncCommitteeMessageDelay, syncCommitteeAggregationDelay: parameters.syncCommitteeAggregationDelay, + fastTrack: parameters.fastTrack, subscriptionInfos: make(map[phase0.Epoch]map[phase0.Slot]map[phase0.CommitteeIndex]*beaconcommitteesubscriber.Subscription), handlingAltair: handlingAltair, altairForkEpoch: altairForkEpoch,