diff --git a/CHANGELOG.md b/CHANGELOG.md index 5f50866..8897afb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,8 +7,10 @@ dev: - add 'failed' dimension for root to slot lookup metrics - add 'deadline' builder bid strategy - add builder configurations to allow more control over selection of bids - - add sync committee verification metrics to highlight when we were and were not included in a SyncAggregate. - - add config setting to enable the above metrics. + - add sync committee verification metrics to highlight when we were and were not included in a SyncAggregate + - add config setting to enable the above metrics + - alter logic for determining sync committee eligible accounts + - enable first strategies to be defined for beaconblockheader and signedbeaconblock 1.9.0-alpha.3 - add proposal value and blinded status to trace diff --git a/docs/configuration.md b/docs/configuration.md index e52af6a..132b2eb 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -22,7 +22,7 @@ log-level: 'debug' # Overridden by beacon-node-addresses if present. beacon-node-address: 'localhost:4000' -# beacon-node-addresseses is the list of address of the beacon nodes. Can be lighthouse, nimbus, prysm or teku. +# beacon-node-addresses is the list of address of the beacon nodes. Can be lighthouse, nimbus, prysm or teku. # If multiple addresses are supplied here it makes Vouch resilient in the situation where a beacon # node goes offline entirely. If this occurs to the currently used node then the next in the list will # be used. If a beacon node comes back online it is added to the end of the list of potential nodes to @@ -32,7 +32,7 @@ beacon-node-address: 'localhost:4000' # ensure they are happy with the event output of all beacon nodes in this list. beacon-node-addresses: ['localhost:4000', 'localhost:5051', 'localhost:5052'] -# timeout is the timeout for all validating operations, for example fetching attesation data from beacon nodes. +# timeout is the timeout for all validating operations, for example fetching attestation data from beacon nodes. timeout: '2s' # reduced-memory-usage can be set on memory-constrained systems to reduce memory usage, at the cost of increased processing time. @@ -155,6 +155,16 @@ strategies: # This allows Vouch to remain responsive in the situation where some beacon nodes are significantly slower than others, for # example if one is remote. timeout: '2s' + # The beaconblockheader strategy obtains the beacon block headers from multiple beacon nodes. + beaconblockheader: + # style can be 'first'. If not defined or set to another value Vouch will default to using the multiclient. + style: 'first' + first: + # beacon-node-addresses are the addresses from which to receive beacon block headers. + beacon-node-addresses: ['localhost:4000', 'localhost:5051', 'localhost:5052'] + # timeout defines the maximum amount of time the strategy will wait for a response. Different strategies may return earlier + # if they have obtained enough information from their beacon node(s). + timeout: '2s' # The beaconblockroot strategy obtains the beacon block root from multiple beacon nodes. beaconblockroot: # style can be 'first', which uses the first returned, 'latest', which uses the latest returned, or 'majority', which uses @@ -177,6 +187,16 @@ strategies: deadline: '1s' # bid-gap is the gap between receiving a response from a relay and querying it again. bid-gap: '100ms' + # The signedbeaconblock strategy obtains the signed beacon blocks from multiple beacon nodes. 
+ signedbeaconblock: + # style can be 'first'. If not defined or set to another value Vouch will default to using the multiclient. + style: 'first' + first: + # beacon-node-addresses are the addresses from which to receive signed beacon blocks. + beacon-node-addresses: ['localhost:4000', 'localhost:5051', 'localhost:5052'] + # timeout defines the maximum amount of time the strategy will wait for a response. Different strategies may return earlier + # if they have obtained enough information from their beacon node(s). + timeout: '2s' # The synccommitteecontribution strategy obtains sync committee contributions from multiple sources. synccommitteecontribution: # style can be 'best', which obtains contributions from all nodes and selects the best, or 'first', which uses the first returned diff --git a/main.go b/main.go index d59599b..c9fe09f 100644 --- a/main.go +++ b/main.go @@ -82,6 +82,7 @@ import ( bestattestationdatastrategy "github.com/attestantio/vouch/strategies/attestationdata/best" firstattestationdatastrategy "github.com/attestantio/vouch/strategies/attestationdata/first" majorityattestationdatastrategy "github.com/attestantio/vouch/strategies/attestationdata/majority" + firstbeaconblockheaderstrategy "github.com/attestantio/vouch/strategies/beaconblockheader/first" bestbeaconblockproposalstrategy "github.com/attestantio/vouch/strategies/beaconblockproposal/best" firstbeaconblockproposalstrategy "github.com/attestantio/vouch/strategies/beaconblockproposal/first" firstbeaconblockrootstrategy "github.com/attestantio/vouch/strategies/beaconblockroot/first" @@ -89,6 +90,7 @@ import ( "github.com/attestantio/vouch/strategies/builderbid" bestbuilderbidstrategy "github.com/attestantio/vouch/strategies/builderbid/best" deadlinebuilderbidstrategy "github.com/attestantio/vouch/strategies/builderbid/deadline" + firstsignedbeaconblockstrategy "github.com/attestantio/vouch/strategies/signedbeaconblock/first" bestsynccommitteecontributionstrategy "github.com/attestantio/vouch/strategies/synccommitteecontribution/best" firstsynccommitteecontributionstrategy "github.com/attestantio/vouch/strategies/synccommitteecontribution/first" "github.com/attestantio/vouch/util" @@ -108,7 +110,7 @@ import ( ) // ReleaseVersion is the release version for the code. -var ReleaseVersion = "1.9.0-alpha.9-dev" +var ReleaseVersion = "1.9.0-alpha.8-dev" func main() { exitCode := main2() @@ -316,28 +318,9 @@ func startServices(ctx context.Context, } // Some beacon nodes do not respond pre-genesis, so we must wait for genesis before proceeding. - genesisTime := chainTime.GenesisTime() - now := time.Now() - waitedForGenesis := false - if now.Before(genesisTime) { - waitedForGenesis = true - // Wait for genesis (or signal, or context cancel). 
- sigCh := make(chan os.Signal, 1) - signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM, os.Interrupt) - log.Info().Str("genesis", fmt.Sprintf("%v", genesisTime)).Msg("Waiting for genesis") - ctx, cancel := context.WithDeadline(ctx, genesisTime) - defer cancel() - select { - case <-sigCh: - return nil, nil, errors.New("signal received") - case <-ctx.Done(): - switch ctx.Err() { - case context.DeadlineExceeded: - log.Info().Msg("Genesis time") - case context.Canceled: - return nil, nil, errors.New("context cancelled") - } - } + waitedForGenesis, err := waitForGenesis(ctx, chainTime) + if err != nil { + return nil, nil, errors.Wrap(err, "failed to wait for genesis block") } altairCapable, bellatrixCapable, _, err := consensusClientCapabilities(ctx, eth2Client) @@ -345,7 +328,12 @@ func startServices(ctx context.Context, return nil, nil, err } - scheduler, cacheSvc, signerSvc, accountManager, err := startSharedServices(ctx, eth2Client, majordomo, chainTime, monitor) + signedBeaconBlockProvider, beaconBlockHeaderProvider, err := startProviderServices(ctx, monitor, eth2Client) + if err != nil { + return nil, nil, err + } + + schedulerSvc, cacheSvc, signerSvc, accountManager, err := startSharedServices(ctx, eth2Client, majordomo, chainTime, monitor, beaconBlockHeaderProvider, signedBeaconBlockProvider) if err != nil { return nil, nil, err } @@ -355,12 +343,12 @@ func startServices(ctx context.Context, return nil, nil, errors.Wrap(err, "failed to select submitter") } - blockRelay, err := startBlockRelay(ctx, majordomo, monitor, eth2Client, scheduler, chainTime, accountManager, signerSvc) + blockRelay, err := startBlockRelay(ctx, majordomo, monitor, eth2Client, schedulerSvc, chainTime, accountManager, signerSvc) if err != nil { return nil, nil, err } - beaconBlockProposer, attester, attestationAggregator, beaconCommitteeSubscriber, err := startSigningServices(ctx, majordomo, monitor, eth2Client, chainTime, cacheSvc, signerSvc, blockRelay, accountManager, submitter) + beaconBlockProposer, attesterSvc, attestationAggregator, beaconCommitteeSubscriber, err := startSigningServices(ctx, majordomo, monitor, eth2Client, chainTime, cacheSvc, signerSvc, blockRelay, accountManager, submitter) if err != nil { return nil, nil, err } @@ -376,36 +364,32 @@ func startServices(ctx context.Context, } // We need to submit proposal preparations to all nodes that are acting as beacon block proposers. 
- nodeAddresses := util.BeaconNodeAddressesForProposing() - proposalPreparationsSubmitters := make([]eth2client.ProposalPreparationsSubmitter, 0, len(nodeAddresses)) - for _, address := range nodeAddresses { - client, err := fetchClient(ctx, monitor, address) - if err != nil { - return nil, nil, errors.Wrap(err, fmt.Sprintf("failed to fetch client %s for proposal preparation submitter", address)) - } - proposalPreparationsSubmitters = append(proposalPreparationsSubmitters, client.(eth2client.ProposalPreparationsSubmitter)) + proposalPreparer, err := initProposalPreparer(ctx, monitor, chainTime, bellatrixCapable, accountManager, blockRelay) + if err != nil { + return nil, nil, err } - var proposalPreparer proposalpreparer.Service - if bellatrixCapable { - log.Trace().Msg("Starting proposals preparer") - proposalPreparer, err = standardproposalpreparer.New(ctx, - standardproposalpreparer.WithLogLevel(util.LogLevel("proposalspreparor")), - standardproposalpreparer.WithMonitor(monitor), - standardproposalpreparer.WithChainTimeService(chainTime), - standardproposalpreparer.WithValidatingAccountsProvider(accountManager.(accountmanager.ValidatingAccountsProvider)), - standardproposalpreparer.WithProposalPreparationsSubmitters(proposalPreparationsSubmitters), - standardproposalpreparer.WithExecutionConfigProvider(blockRelay.(blockrelay.ExecutionConfigProvider)), - ) - if err != nil { - return nil, nil, errors.Wrap(err, "failed to start proposal preparer service") - } + controller, err := initController(ctx, monitor, chainTime, eth2Client, + schedulerSvc, attesterSvc, cacheSvc, + waitedForGenesis, accountManager, beaconBlockProposer, signedBeaconBlockProvider, proposalPreparer, attestationAggregator, beaconCommitteeSubscriber, + syncCommitteeMessenger, syncCommitteeAggregator, syncCommitteeSubscriber) + if err != nil { + return nil, nil, err } + return chainTime, controller, nil +} + +func initController(ctx context.Context, monitor metrics.Service, chainTime chaintime.Service, eth2Client eth2client.Service, + schedulerSvc scheduler.Service, attesterSvc attester.Service, cacheSvc cache.Service, + waitedForGenesis bool, accountManager accountmanager.Service, beaconBlockProposer beaconblockproposer.Service, signedBeaconBlockProvider eth2client.SignedBeaconBlockProvider, + proposalPreparer proposalpreparer.Service, attestationAggregator attestationaggregator.Service, beaconCommitteeSubscriber beaconcommitteesubscriber.Service, + syncCommitteeMessenger synccommitteemessenger.Service, syncCommitteeAggregator synccommitteeaggregator.Service, syncCommitteeSubscriber synccommitteesubscriber.Service, +) (*standardcontroller.Service, error) { // The events provider for the controller should only use beacon nodes that are used for attestation data. 
eventsConsensusClient, err := fetchMultiClient(ctx, monitor, "events", util.BeaconNodeAddressesForAttesting()) if err != nil { - return nil, nil, errors.Wrap(err, "failed to fetch multiclient for controller") + return nil, errors.Wrap(err, "failed to fetch multiclient for controller") } log.Trace().Msg("Starting controller") @@ -419,14 +403,14 @@ func startServices(ctx context.Context, standardcontroller.WithAttesterDutiesProvider(eth2Client.(eth2client.AttesterDutiesProvider)), standardcontroller.WithSyncCommitteeDutiesProvider(eth2Client.(eth2client.SyncCommitteeDutiesProvider)), standardcontroller.WithEventsProvider(eventsConsensusClient.(eth2client.EventsProvider)), - standardcontroller.WithScheduler(scheduler), + standardcontroller.WithScheduler(schedulerSvc), standardcontroller.WithValidatingAccountsProvider(accountManager.(accountmanager.ValidatingAccountsProvider)), - standardcontroller.WithAttester(attester), + standardcontroller.WithAttester(attesterSvc), standardcontroller.WithSyncCommitteeMessenger(syncCommitteeMessenger), standardcontroller.WithSyncCommitteeAggregator(syncCommitteeAggregator), standardcontroller.WithBeaconBlockProposer(beaconBlockProposer), standardcontroller.WithBeaconBlockHeadersProvider(eth2Client.(eth2client.BeaconBlockHeadersProvider)), - standardcontroller.WithSignedBeaconBlockProvider(eth2Client.(eth2client.SignedBeaconBlockProvider)), + standardcontroller.WithSignedBeaconBlockProvider(signedBeaconBlockProvider), standardcontroller.WithProposalsPreparer(proposalPreparer), standardcontroller.WithAttestationAggregator(attestationAggregator), standardcontroller.WithBeaconCommitteeSubscriber(beaconCommitteeSubscriber), @@ -444,10 +428,82 @@ func startServices(ctx context.Context, standardcontroller.WithFastTrackGrace(viper.GetDuration("controller.fast-track.grace")), ) if err != nil { - return nil, nil, errors.Wrap(err, "failed to start controller service") + return nil, errors.Wrap(err, "failed to start controller service") } + return controller, nil +} - return chainTime, controller, nil +func initProposalPreparer(ctx context.Context, monitor metrics.Service, chainTime chaintime.Service, bellatrixCapable bool, accountManager accountmanager.Service, blockRelay blockrelay.Service) (proposalpreparer.Service, error) { + // We need to submit proposal preparations to all nodes that are acting as beacon block proposers. 
+ nodeAddresses := util.BeaconNodeAddressesForProposing() + proposalPreparationsSubmitters := make([]eth2client.ProposalPreparationsSubmitter, 0, len(nodeAddresses)) + for _, address := range nodeAddresses { + client, err := fetchClient(ctx, monitor, address) + if err != nil { + return nil, errors.Wrap(err, fmt.Sprintf("failed to fetch client %s for proposal preparation submitter", address)) + } + proposalPreparationsSubmitters = append(proposalPreparationsSubmitters, client.(eth2client.ProposalPreparationsSubmitter)) + } + + if bellatrixCapable { + log.Trace().Msg("Starting proposals preparer") + proposalPreparer, err := standardproposalpreparer.New(ctx, + standardproposalpreparer.WithLogLevel(util.LogLevel("proposalspreparor")), + standardproposalpreparer.WithMonitor(monitor), + standardproposalpreparer.WithChainTimeService(chainTime), + standardproposalpreparer.WithValidatingAccountsProvider(accountManager.(accountmanager.ValidatingAccountsProvider)), + standardproposalpreparer.WithProposalPreparationsSubmitters(proposalPreparationsSubmitters), + standardproposalpreparer.WithExecutionConfigProvider(blockRelay.(blockrelay.ExecutionConfigProvider)), + ) + if err != nil { + return nil, errors.Wrap(err, "failed to start proposal preparer service") + } + return proposalPreparer, nil + } + var proposalPreparer proposalpreparer.Service + return proposalPreparer, nil +} + +func waitForGenesis(ctx context.Context, chainTime chaintime.Service) (bool, error) { + genesisTime := chainTime.GenesisTime() + now := time.Now() + waitedForGenesis := false + if now.Before(genesisTime) { + waitedForGenesis = true + // Wait for genesis (or signal, or context cancel). + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM, os.Interrupt) + log.Info().Str("genesis", fmt.Sprintf("%v", genesisTime)).Msg("Waiting for genesis") + ctx, cancel := context.WithDeadline(ctx, genesisTime) + defer cancel() + select { + case <-sigCh: + return false, errors.New("signal received") + case <-ctx.Done(): + switch ctx.Err() { + case context.DeadlineExceeded: + log.Info().Msg("Genesis time") + case context.Canceled: + return false, errors.New("context cancelled") + } + } + } + return waitedForGenesis, nil +} + +func startProviderServices(ctx context.Context, monitor metrics.Service, eth2Client eth2client.Service) (eth2client.SignedBeaconBlockProvider, eth2client.BeaconBlockHeadersProvider, error) { + // The signed beacon block provider from the configured strategy to define how we get signed beacon blocks. + signedBeaconBlockProvider, err := selectSignedBeaconBlockProvider(ctx, monitor, eth2Client) + if err != nil { + return nil, nil, errors.Wrap(err, "failed to fetch signed beacon block provider for controller") + } + + // The block header provider from the configured strategy to define how we get block headers. 
+ beaconBlockHeaderProvider, err := selectBeaconHeaderProvider(ctx, monitor, eth2Client) + if err != nil { + return nil, nil, errors.Wrap(err, "failed to fetch beacon block header provider for controller") + } + return signedBeaconBlockProvider, beaconBlockHeaderProvider, nil } func startBasicServices(ctx context.Context, @@ -498,6 +554,8 @@ func startSharedServices(ctx context.Context, majordomo majordomo.Service, chainTime chaintime.Service, monitor metrics.Service, + beaconBlockHeaderProvider eth2client.BeaconBlockHeadersProvider, + signedBeaconBlockProvider eth2client.SignedBeaconBlockProvider, ) ( scheduler.Service, cache.Service, @@ -512,7 +570,7 @@ func startSharedServices(ctx context.Context, } log.Trace().Msg("Starting cache") - cacheSvc, err := startCache(ctx, monitor, chainTime, scheduler, eth2Client) + cacheSvc, err := startCache(ctx, monitor, chainTime, scheduler, eth2Client, beaconBlockHeaderProvider, signedBeaconBlockProvider) if err != nil { return nil, nil, nil, nil, errors.Wrap(err, "failed to start cache") } @@ -905,6 +963,8 @@ func startCache(ctx context.Context, chainTime chaintime.Service, scheduler scheduler.Service, consensusClient eth2client.Service, + beaconBlockHeaderProvider eth2client.BeaconBlockHeadersProvider, + signedBeaconBlockProvider eth2client.SignedBeaconBlockProvider, ) (cache.Service, error) { log.Trace().Msg("Starting cache") cache, err := standardcache.New(ctx, @@ -913,8 +973,8 @@ func startCache(ctx context.Context, standardcache.WithScheduler(scheduler), standardcache.WithChainTime(chainTime), standardcache.WithEventsProvider(consensusClient.(eth2client.EventsProvider)), - standardcache.WithSignedBeaconBlockProvider(consensusClient.(eth2client.SignedBeaconBlockProvider)), - standardcache.WithBeaconBlockHeadersProvider(consensusClient.(eth2client.BeaconBlockHeadersProvider)), + standardcache.WithSignedBeaconBlockProvider(signedBeaconBlockProvider), + standardcache.WithBeaconBlockHeadersProvider(beaconBlockHeaderProvider), ) if err != nil { return nil, err @@ -1407,85 +1467,85 @@ func selectSubmitterStrategy(ctx context.Context, monitor metrics.Service, eth2C return submitter, nil } +func genericAddressToClientMapper[T any](ctx context.Context, monitor metrics.Service, path, description string) (map[string]T, error) { + addressToClientMap := make(map[string]T) + for _, address := range util.BeaconNodeAddresses(path) { + client, err := fetchClient(ctx, monitor, address) + if err != nil { + return nil, errors.Wrap(err, fmt.Sprintf("failed to fetch client %s for %s", address, description)) + } + clientCast, ok := client.(T) + if !ok { + return nil, fmt.Errorf("failed to cast client %s for %s", address, description) + } + addressToClientMap[address] = clientCast + } + return addressToClientMap, nil +} + func startMultinodeSubmitter(ctx context.Context, monitor metrics.Service, ) ( submitter.Service, error, ) { - aggregateAttestationSubmitters := make(map[string]eth2client.AggregateAttestationsSubmitter) - for _, address := range util.BeaconNodeAddresses("submitter.aggregateattestation.multinode") { - client, err := fetchClient(ctx, monitor, address) - if err != nil { - return nil, errors.Wrap(err, fmt.Sprintf("failed to fetch client %s for aggregate attestation submitter strategy", address)) - } - aggregateAttestationSubmitters[address] = client.(eth2client.AggregateAttestationsSubmitter) + aggregateAttestationSubmitters, err := genericAddressToClientMapper[eth2client.AggregateAttestationsSubmitter](ctx, monitor, + 
"submitter.aggregateattestation.multinode", + "aggregate attestation submitter strategy") + if err != nil { + return nil, err } - attestationsSubmitters := make(map[string]eth2client.AttestationsSubmitter) - for _, address := range util.BeaconNodeAddresses("submitter.attestation.multinode") { - client, err := fetchClient(ctx, monitor, address) - if err != nil { - return nil, errors.Wrap(err, fmt.Sprintf("failed to fetch client %s for attestation submitter strategy", address)) - } - attestationsSubmitters[address] = client.(eth2client.AttestationsSubmitter) + attestationsSubmitters, err := genericAddressToClientMapper[eth2client.AttestationsSubmitter](ctx, monitor, + "submitter.attestation.multinode", + "attestation submitter strategy") + if err != nil { + return nil, err } - proposalSubmitters := make(map[string]eth2client.ProposalSubmitter) - for _, address := range util.BeaconNodeAddresses("submitter.proposal.multinode") { - client, err := fetchClient(ctx, monitor, address) - if err != nil { - return nil, errors.Wrap(err, fmt.Sprintf("failed to fetch client %s for proposal submitter strategy", address)) - } - proposalSubmitters[address] = client.(eth2client.ProposalSubmitter) + proposalSubmitters, err := genericAddressToClientMapper[eth2client.ProposalSubmitter](ctx, monitor, + "submitter.proposal.multinode", + "proposal submitter strategy") + if err != nil { + return nil, err } - beaconCommitteeSubscriptionsSubmitters := make(map[string]eth2client.BeaconCommitteeSubscriptionsSubmitter) - for _, address := range util.BeaconNodeAddresses("submitter.beaconcommitteesubscription.multinode") { - client, err := fetchClient(ctx, monitor, address) - if err != nil { - return nil, errors.Wrap(err, fmt.Sprintf("failed to fetch client %s for beacon committee subscription submitter strategy", address)) - } - beaconCommitteeSubscriptionsSubmitters[address] = client.(eth2client.BeaconCommitteeSubscriptionsSubmitter) + beaconCommitteeSubscriptionsSubmitters, err := genericAddressToClientMapper[eth2client.BeaconCommitteeSubscriptionsSubmitter](ctx, monitor, + "submitter.beaconcommitteesubscription.multinode", + "beacon committee subscription submitter strategy") + if err != nil { + return nil, err } - proposalPreparationSubmitters := make(map[string]eth2client.ProposalPreparationsSubmitter) - for _, address := range util.BeaconNodeAddresses("submitter.proposalpreparation.multinode") { - client, err := fetchClient(ctx, monitor, address) - if err != nil { - return nil, errors.Wrap(err, fmt.Sprintf("failed to fetch client %s for proposal preparation submitter strategy", address)) - } - proposalPreparationSubmitters[address] = client.(eth2client.ProposalPreparationsSubmitter) + proposalPreparationSubmitters, err := genericAddressToClientMapper[eth2client.ProposalPreparationsSubmitter](ctx, monitor, + "submitter.proposalpreparation.multinode", + "proposal preparation submitter strategy") + if err != nil { + return nil, err } - syncCommitteeContributionsSubmitters := make(map[string]eth2client.SyncCommitteeContributionsSubmitter) - for _, address := range util.BeaconNodeAddresses("submitter.synccommitteecontribution.multinode") { - client, err := fetchClient(ctx, monitor, address) - if err != nil { - return nil, errors.Wrap(err, fmt.Sprintf("failed to fetch client %s for sync committee contribution submitter strategy", address)) - } - syncCommitteeContributionsSubmitters[address] = client.(eth2client.SyncCommitteeContributionsSubmitter) + syncCommitteeContributionsSubmitters, err := 
genericAddressToClientMapper[eth2client.SyncCommitteeContributionsSubmitter](ctx, monitor, + "submitter.synccommitteecontribution.multinode", + "sync committee contribution submitter strategy") + if err != nil { + return nil, err } - syncCommitteeMessagesSubmitters := make(map[string]eth2client.SyncCommitteeMessagesSubmitter) - for _, address := range util.BeaconNodeAddresses("submitter.synccommitteemessage.multinode") { - client, err := fetchClient(ctx, monitor, address) - if err != nil { - return nil, errors.Wrap(err, fmt.Sprintf("failed to fetch client %s for sync committee message submitter strategy", address)) - } - syncCommitteeMessagesSubmitters[address] = client.(eth2client.SyncCommitteeMessagesSubmitter) + syncCommitteeMessagesSubmitters, err := genericAddressToClientMapper[eth2client.SyncCommitteeMessagesSubmitter](ctx, monitor, + "submitter.synccommitteemessage.multinode", + "sync committee message submitter strategy") + if err != nil { + return nil, err } - syncCommitteeSubscriptionsSubmitters := make(map[string]eth2client.SyncCommitteeSubscriptionsSubmitter) - for _, address := range util.BeaconNodeAddresses("submitter.synccommitteesubscription.multinode") { - client, err := fetchClient(ctx, monitor, address) - if err != nil { - return nil, errors.Wrap(err, fmt.Sprintf("failed to fetch client %s for sync committee subscription submitter strategy", address)) - } - syncCommitteeSubscriptionsSubmitters[address] = client.(eth2client.SyncCommitteeSubscriptionsSubmitter) + syncCommitteeSubscriptionsSubmitters, err := genericAddressToClientMapper[eth2client.SyncCommitteeSubscriptionsSubmitter](ctx, monitor, + "submitter.synccommitteesubscription.multinode", + "sync committee subscription submitter strategy") + if err != nil { + return nil, err } - submitter, err := multinodesubmitter.New(ctx, + submitterService, err := multinodesubmitter.New(ctx, multinodesubmitter.WithClientMonitor(monitor.(metrics.ClientMonitor)), multinodesubmitter.WithProcessConcurrency(util.ProcessConcurrency("submitter.multinode")), multinodesubmitter.WithLogLevel(util.LogLevel("submitter.multinode")), @@ -1503,7 +1563,7 @@ func startMultinodeSubmitter(ctx context.Context, return nil, err } - return submitter, nil + return submitterService, nil } // runCommands potentially runs commands. @@ -1794,3 +1854,93 @@ func obtainBuilderConfigsForPrivilegedBuilders(_ context.Context, return nil } + +// select the signed beacon block provider based on user input. 
+func selectSignedBeaconBlockProvider(ctx context.Context,
+    monitor metrics.Service,
+    eth2Client eth2client.Service,
+) (
+    eth2client.SignedBeaconBlockProvider,
+    error,
+) {
+    log.Trace().Msg("Selecting signed beacon block strategy")
+
+    var provider eth2client.SignedBeaconBlockProvider
+    var err error
+
+    style := "strategies.signedbeaconblock.style"
+    switch viper.GetString(style) {
+    case "first", "":
+        log.Info().Msg("Starting first signed beacon block strategy")
+        signedBeaconBlockProviders := make(map[string]eth2client.SignedBeaconBlockProvider)
+        path := "strategies.signedbeaconblock.first"
+        for _, address := range util.BeaconNodeAddresses(path) {
+            client, err := fetchClient(ctx, monitor, address)
+            if err != nil {
+                return nil, errors.Wrap(err, fmt.Sprintf("failed to fetch client %s for signed beacon block strategy", address))
+            }
+            signedBeaconBlockProviders[address] = client.(eth2client.SignedBeaconBlockProvider)
+        }
+
+        provider, err = firstsignedbeaconblockstrategy.New(ctx,
+            firstsignedbeaconblockstrategy.WithTimeout(util.Timeout(path)),
+            firstsignedbeaconblockstrategy.WithClientMonitor(monitor.(metrics.ClientMonitor)),
+            firstsignedbeaconblockstrategy.WithLogLevel(util.LogLevel(path)),
+            firstsignedbeaconblockstrategy.WithSignedBeaconBlockProviders(signedBeaconBlockProviders),
+        )
+    default:
+        log.Info().Msg("Starting simple signed block strategy")
+        provider = eth2Client.(eth2client.SignedBeaconBlockProvider)
+    }
+
+    if err != nil {
+        return nil, errors.Wrap(err, "failed to instantiate signed beacon block strategy")
+    }
+
+    return provider, nil
+}
+
+// select the beacon header provider based on user input.
+func selectBeaconHeaderProvider(ctx context.Context,
+    monitor metrics.Service,
+    eth2Client eth2client.Service,
+) (
+    eth2client.BeaconBlockHeadersProvider,
+    error,
+) {
+    log.Trace().Msg("Selecting beacon header strategy")
+
+    var provider eth2client.BeaconBlockHeadersProvider
+    var err error
+
+    style := "strategies.beaconblockheader.style"
+    switch viper.GetString(style) {
+    case "first", "":
+        log.Info().Msg("Starting first beacon block header strategy")
+        beaconBlockHeaderProviders := make(map[string]eth2client.BeaconBlockHeadersProvider)
+        path := "strategies.beaconblockheader.first"
+        for _, address := range util.BeaconNodeAddresses(path) {
+            client, err := fetchClient(ctx, monitor, address)
+            if err != nil {
+                return nil, errors.Wrap(err, fmt.Sprintf("failed to fetch client %s for beacon block header strategy", address))
+            }
+            beaconBlockHeaderProviders[address] = client.(eth2client.BeaconBlockHeadersProvider)
+        }
+
+        provider, err = firstbeaconblockheaderstrategy.New(ctx,
+            firstbeaconblockheaderstrategy.WithTimeout(util.Timeout(path)),
+            firstbeaconblockheaderstrategy.WithClientMonitor(monitor.(metrics.ClientMonitor)),
+            firstbeaconblockheaderstrategy.WithLogLevel(util.LogLevel(path)),
+            firstbeaconblockheaderstrategy.WithBeaconBlockHeadersProviders(beaconBlockHeaderProviders),
+        )
+    default:
+        log.Info().Msg("Starting simple beacon block header strategy")
+        provider = eth2Client.(eth2client.BeaconBlockHeadersProvider)
+    }
+
+    if err != nil {
+        return nil, errors.Wrap(err, "failed to instantiate beacon header strategy")
+    }
+
+    return provider, nil
+}
diff --git a/services/accountmanager/dirk/service.go b/services/accountmanager/dirk/service.go index 0f8021e..b0e7dab 100644 --- a/services/accountmanager/dirk/service.go +++ b/services/accountmanager/dirk/service.go @@ -27,6 +27,7 @@ import ( eth2client "github.com/attestantio/go-eth2-client" api
"github.com/attestantio/go-eth2-client/api/v1" "github.com/attestantio/go-eth2-client/spec/phase0" + "github.com/attestantio/vouch/services/accountmanager/utils" "github.com/attestantio/vouch/services/chaintime" "github.com/attestantio/vouch/services/metrics" "github.com/attestantio/vouch/services/validatorsmanager" @@ -288,7 +289,22 @@ func credentialsFromCerts(ctx context.Context, clientCert []byte, clientKey []by // ValidatingAccountsForEpoch obtains the validating accounts for a given epoch. func (s *Service) ValidatingAccountsForEpoch(ctx context.Context, epoch phase0.Epoch) (map[phase0.ValidatorIndex]e2wtypes.Account, error) { - ctx, span := otel.Tracer("attestantio.vouch.services.accountmanager.dirk").Start(ctx, "ValidatingAccountsForEpoch", trace.WithAttributes( + filterFunc := func(state api.ValidatorState) bool { + return state == api.ValidatorStateActiveOngoing || state == api.ValidatorStateActiveExiting + } + return s.accountsForEpochWithFilter(ctx, epoch, "Validating", filterFunc) +} + +// SyncCommitteeAccountsForEpoch obtains the accounts eligible for Sync Committee duty for a given epoch. +// The Ethereum specification has different criteria for Sync Committee eligibility compared to other validating duties. +// This includes an edge case where we are still in scope for sync committee duty between exited and withdrawal states. +func (s *Service) SyncCommitteeAccountsForEpoch(ctx context.Context, epoch phase0.Epoch) (map[phase0.ValidatorIndex]e2wtypes.Account, error) { + return s.accountsForEpochWithFilter(ctx, epoch, "SyncCommittee", utils.IsSyncCommitteeEligible) +} + +// accountsForEpochWithFilter obtains the accounts for a given epoch with a filter on the state of validators returned. +func (s *Service) accountsForEpochWithFilter(ctx context.Context, epoch phase0.Epoch, accountType string, filterFunc func(state api.ValidatorState) bool) (map[phase0.ValidatorIndex]e2wtypes.Account, error) { + ctx, span := otel.Tracer("attestantio.vouch.services.accountmanager.dirk").Start(ctx, fmt.Sprintf("%sAccountsForEpoch", accountType), trace.WithAttributes( attribute.Int64("epoch", int64(epoch)), )) defer span.End() @@ -317,20 +333,20 @@ func (s *Service) ValidatingAccountsForEpoch(ctx context.Context, epoch phase0.E for index, validator := range validators { state := api.ValidatorToState(validator, nil, epoch, s.farFutureEpoch) stateCount[state]++ - if state == api.ValidatorStateActiveOngoing || state == api.ValidatorStateActiveExiting { + if filterFunc(state) { account := s.accounts[validator.PublicKey] s.log.Trace(). Str("name", account.Name()). Str("public_key", fmt.Sprintf("%x", account.PublicKey().Marshal())). Uint64("index", uint64(index)). Str("state", state.String()). - Msg("Validating account") + Msg(fmt.Sprintf("%s account", accountType)) validatingAccounts[index] = account } else { s.log.Trace(). Stringer("pubkey", validator.PublicKey). Stringer("state", state). - Msg("Non-validating account") + Msg(fmt.Sprintf("Non-%s account", strings.ToLower(accountType))) } } s.mutex.RUnlock() @@ -348,7 +364,22 @@ func (s *Service) ValidatingAccountsForEpoch(ctx context.Context, epoch phase0.E // ValidatingAccountsForEpochByIndex obtains the specified validating accounts for a given epoch. 
func (s *Service) ValidatingAccountsForEpochByIndex(ctx context.Context, epoch phase0.Epoch, indices []phase0.ValidatorIndex) (map[phase0.ValidatorIndex]e2wtypes.Account, error) { - ctx, span := otel.Tracer("attestantio.vouch.services.accountmanager.dirk").Start(ctx, "ValidatingAccountsForEpochByIndex", trace.WithAttributes( + filterFunc := func(state api.ValidatorState) bool { + return state == api.ValidatorStateActiveOngoing || state == api.ValidatorStateActiveExiting + } + return s.accountsForEpochByIndexWithFilter(ctx, epoch, indices, "Validating", filterFunc) +} + +// SyncCommitteeAccountsForEpochByIndex obtains the specified Sync Committee eligible accounts for a given epoch. +// The Ethereum specification has different criteria for Sync Committee eligibility compared to other validating duties. +// This includes an edge case where we are still in scope for sync committee duty between exited and withdrawal states. +func (s *Service) SyncCommitteeAccountsForEpochByIndex(ctx context.Context, epoch phase0.Epoch, indices []phase0.ValidatorIndex) (map[phase0.ValidatorIndex]e2wtypes.Account, error) { + return s.accountsForEpochByIndexWithFilter(ctx, epoch, indices, "SyncCommittee", utils.IsSyncCommitteeEligible) +} + +// accountsForEpochByIndexWithFilter obtains the specified accounts for a given epoch. +func (s *Service) accountsForEpochByIndexWithFilter(ctx context.Context, epoch phase0.Epoch, indices []phase0.ValidatorIndex, accountType string, filterFunc func(state api.ValidatorState) bool) (map[phase0.ValidatorIndex]e2wtypes.Account, error) { + ctx, span := otel.Tracer("attestantio.vouch.services.accountmanager.dirk").Start(ctx, fmt.Sprintf("%sAccountsForEpochByIndex", accountType), trace.WithAttributes( attribute.Int64("epoch", int64(epoch)), )) defer span.End() @@ -368,7 +399,7 @@ func (s *Service) ValidatingAccountsForEpochByIndex(ctx context.Context, epoch p continue } state := api.ValidatorToState(validator, nil, epoch, s.farFutureEpoch) - if state == api.ValidatorStateActiveOngoing || state == api.ValidatorStateActiveExiting { + if filterFunc(state) { s.mutex.RLock() validatingAccounts[index] = s.accounts[validator.PublicKey] s.mutex.RUnlock() diff --git a/services/accountmanager/mock/service.go b/services/accountmanager/mock/service.go index 4d5322e..5078b56 100644 --- a/services/accountmanager/mock/service.go +++ b/services/accountmanager/mock/service.go @@ -62,6 +62,29 @@ func (s *ValidatingAccountsProvider) ValidatingAccountsForEpochByIndex(_ context return accounts, nil } +// SyncCommitteeAccountsForEpoch is a mock. +func (s *ValidatingAccountsProvider) SyncCommitteeAccountsForEpoch(_ context.Context, _ phase0.Epoch) (map[phase0.ValidatorIndex]e2wtypes.Account, error) { + return s.validatingAccounts, nil +} + +// SyncCommitteeAccountsForEpochByIndex obtains the specified validating accounts for a given epoch. +func (s *ValidatingAccountsProvider) SyncCommitteeAccountsForEpochByIndex(_ context.Context, + _ phase0.Epoch, + indices []phase0.ValidatorIndex, +) ( + map[phase0.ValidatorIndex]e2wtypes.Account, + error, +) { + accounts := make(map[phase0.ValidatorIndex]e2wtypes.Account) + for _, index := range indices { + if account, exists := s.validatingAccounts[index]; exists { + accounts[index] = account + } + } + + return accounts, nil +} + type accountsProvider struct{} // NewAccountsProvider is a mock. 
@@ -106,3 +129,19 @@ func (*erroringValidatingAccountsProvider) ValidatingAccountsForEpochByIndex(_ c ) { return nil, errors.New("error") } + +// SyncCommitteeAccountsForEpoch is a mock. +func (*erroringValidatingAccountsProvider) SyncCommitteeAccountsForEpoch(_ context.Context, _ phase0.Epoch) (map[phase0.ValidatorIndex]e2wtypes.Account, error) { + return nil, errors.New("error") +} + +// SyncCommitteeAccountsForEpochByIndex obtains the specified validating accounts for a given epoch. +func (*erroringValidatingAccountsProvider) SyncCommitteeAccountsForEpochByIndex(_ context.Context, + _ phase0.Epoch, + _ []phase0.ValidatorIndex, +) ( + map[phase0.ValidatorIndex]e2wtypes.Account, + error, +) { + return nil, errors.New("error") +} diff --git a/services/accountmanager/service.go b/services/accountmanager/service.go index 3217e97..04a5787 100644 --- a/services/accountmanager/service.go +++ b/services/accountmanager/service.go @@ -37,6 +37,22 @@ type ValidatingAccountsProvider interface { map[phase0.ValidatorIndex]e2wtypes.Account, error, ) + + // SyncCommitteeAccountsForEpoch obtains the accounts eligible for Sync Committee duty for a given epoch. + // The Ethereum specification has different criteria for Sync Committee eligibility compared to other validating duties. + // This includes an edge case where we are still in scope for sync committee duty between exited and withdrawal states. + SyncCommitteeAccountsForEpoch(ctx context.Context, epoch phase0.Epoch) (map[phase0.ValidatorIndex]e2wtypes.Account, error) + + // SyncCommitteeAccountsForEpochByIndex obtains the specified Sync Committee eligible accounts for a given epoch. + // The Ethereum specification has different criteria for Sync Committee eligibility compared to other validating duties. + // This includes an edge case where we are still in scope for sync committee duty between exited and withdrawal states. + SyncCommitteeAccountsForEpochByIndex(ctx context.Context, + epoch phase0.Epoch, + indices []phase0.ValidatorIndex, + ) ( + map[phase0.ValidatorIndex]e2wtypes.Account, + error, + ) } // Refresher refreshes account information from the remote source. diff --git a/services/accountmanager/utils/utils.go b/services/accountmanager/utils/utils.go new file mode 100644 index 0000000..1338380 --- /dev/null +++ b/services/accountmanager/utils/utils.go @@ -0,0 +1,10 @@ +package utils + +import apiv1 "github.com/attestantio/go-eth2-client/api/v1" + +// IsSyncCommitteeEligible returns true if the validator is in a state that is eligible for Sync Committee duty. 
+func IsSyncCommitteeEligible(state apiv1.ValidatorState) bool { + return state == apiv1.ValidatorStateActiveOngoing || state == apiv1.ValidatorStateActiveExiting || + state == apiv1.ValidatorStateExitedUnslashed || state == apiv1.ValidatorStateActiveSlashed || + state == apiv1.ValidatorStateExitedSlashed || state == apiv1.ValidatorStateWithdrawalPossible +} diff --git a/services/accountmanager/wallet/service.go b/services/accountmanager/wallet/service.go index 0259225..6810a19 100644 --- a/services/accountmanager/wallet/service.go +++ b/services/accountmanager/wallet/service.go @@ -24,6 +24,7 @@ import ( "github.com/attestantio/go-eth2-client/api" apiv1 "github.com/attestantio/go-eth2-client/api/v1" "github.com/attestantio/go-eth2-client/spec/phase0" + "github.com/attestantio/vouch/services/accountmanager/utils" "github.com/attestantio/vouch/services/chaintime" "github.com/attestantio/vouch/services/metrics" "github.com/attestantio/vouch/services/validatorsmanager" @@ -204,7 +205,22 @@ func (s *Service) refreshValidators(ctx context.Context) error { // ValidatingAccountsForEpoch obtains the validating accounts for a given epoch. func (s *Service) ValidatingAccountsForEpoch(ctx context.Context, epoch phase0.Epoch) (map[phase0.ValidatorIndex]e2wtypes.Account, error) { - ctx, span := otel.Tracer("attestantio.vouch.services.accountmanager.wallet").Start(ctx, "ValidatingAccountsForEpoch", trace.WithAttributes( + filterFunc := func(state apiv1.ValidatorState) bool { + return state == apiv1.ValidatorStateActiveOngoing || state == apiv1.ValidatorStateActiveExiting + } + return s.accountsForEpochWithFilter(ctx, epoch, "Validating", filterFunc) +} + +// SyncCommitteeAccountsForEpoch obtains the accounts eligible for Sync Committee duty for a given epoch. +// The Ethereum specification has different criteria for Sync Committee eligibility compared to other validating duties. +// This includes an edge case where we are still in scope for sync committee duty between exited and withdrawal states. +func (s *Service) SyncCommitteeAccountsForEpoch(ctx context.Context, epoch phase0.Epoch) (map[phase0.ValidatorIndex]e2wtypes.Account, error) { + return s.accountsForEpochWithFilter(ctx, epoch, "SyncCommittee", utils.IsSyncCommitteeEligible) +} + +// accountsForEpochWithFilter obtains the accounts for a given epoch with a filter on the state of validators returned. +func (s *Service) accountsForEpochWithFilter(ctx context.Context, epoch phase0.Epoch, accountType string, filterFunc func(state apiv1.ValidatorState) bool) (map[phase0.ValidatorIndex]e2wtypes.Account, error) { + ctx, span := otel.Tracer("attestantio.vouch.services.accountmanager.wallet").Start(ctx, fmt.Sprintf("%sAccountsForEpoch", accountType), trace.WithAttributes( attribute.Int64("epoch", int64(epoch)), )) defer span.End() @@ -233,14 +249,14 @@ func (s *Service) ValidatingAccountsForEpoch(ctx context.Context, epoch phase0.E for index, validator := range validators { state := apiv1.ValidatorToState(validator, nil, epoch, s.farFutureEpoch) stateCount[state]++ - if state == apiv1.ValidatorStateActiveOngoing || state == apiv1.ValidatorStateActiveExiting { + if filterFunc(state) { account := s.accounts[validator.PublicKey] s.log.Trace(). Str("name", account.Name()). Str("public_key", fmt.Sprintf("%x", account.PublicKey().Marshal())). Uint64("index", uint64(index)). Str("state", state.String()). 
- Msg("Validating account") + Msg(fmt.Sprintf("%s account", accountType)) validatingAccounts[index] = account } } @@ -258,7 +274,22 @@ func (s *Service) ValidatingAccountsForEpoch(ctx context.Context, epoch phase0.E // ValidatingAccountsForEpochByIndex obtains the specified validating accounts for a given epoch. func (s *Service) ValidatingAccountsForEpochByIndex(ctx context.Context, epoch phase0.Epoch, indices []phase0.ValidatorIndex) (map[phase0.ValidatorIndex]e2wtypes.Account, error) { - ctx, span := otel.Tracer("attestantio.vouch.services.accountmanager.wallet").Start(ctx, "ValidatingAccountsForEpochByIndex", trace.WithAttributes( + filterFunc := func(state apiv1.ValidatorState) bool { + return state == apiv1.ValidatorStateActiveOngoing || state == apiv1.ValidatorStateActiveExiting + } + return s.accountsForEpochByIndexWithFilter(ctx, epoch, indices, "Validating", filterFunc) +} + +// SyncCommitteeAccountsForEpochByIndex obtains the specified Sync Committee eligible accounts for a given epoch. +// The Ethereum specification has different criteria for Sync Committee eligibility compared to other validating duties. +// This includes an edge case where we are still in scope for sync committee duty between exited and withdrawal states. +func (s *Service) SyncCommitteeAccountsForEpochByIndex(ctx context.Context, epoch phase0.Epoch, indices []phase0.ValidatorIndex) (map[phase0.ValidatorIndex]e2wtypes.Account, error) { + return s.accountsForEpochByIndexWithFilter(ctx, epoch, indices, "SyncCommittee", utils.IsSyncCommitteeEligible) +} + +// accountsForEpochByIndexWithFilter obtains the specified accounts for a given epoch with a filter on the state of validators returned. +func (s *Service) accountsForEpochByIndexWithFilter(ctx context.Context, epoch phase0.Epoch, indices []phase0.ValidatorIndex, accountType string, filterFunc func(state apiv1.ValidatorState) bool) (map[phase0.ValidatorIndex]e2wtypes.Account, error) { + ctx, span := otel.Tracer("attestantio.vouch.services.accountmanager.wallet").Start(ctx, fmt.Sprintf("%sAccountsForEpochByIndex", accountType), trace.WithAttributes( attribute.Int64("epoch", int64(epoch)), )) defer span.End() @@ -279,7 +310,7 @@ func (s *Service) ValidatingAccountsForEpochByIndex(ctx context.Context, epoch p continue } state := apiv1.ValidatorToState(validator, nil, epoch, s.farFutureEpoch) - if state == apiv1.ValidatorStateActiveOngoing || state == apiv1.ValidatorStateActiveExiting { + if filterFunc(state) { validatingAccounts[index] = s.accounts[validator.PublicKey] } } diff --git a/services/controller/standard/events.go b/services/controller/standard/events.go index 63f3f39..0e69c42 100644 --- a/services/controller/standard/events.go +++ b/services/controller/standard/events.go @@ -184,10 +184,10 @@ func (s *Service) fastTrackJobs(ctx context.Context, // handlePreviousDependentRootChanged handles the situation where the previous // dependent root changed. func (s *Service) handlePreviousDependentRootChanged(ctx context.Context) { - // Refreshes run in parallel. + // NOT running task in goroutine as there is only one task and this function is always called in a goroutine. // We need to refresh the attester duties for this epoch. 
- go s.refreshAttesterDutiesForEpoch(ctx, s.chainTimeService.CurrentEpoch()) + s.refreshAttesterDutiesForEpoch(ctx, s.chainTimeService.CurrentEpoch()) } // handleCurrentDependentRootChanged handles the situation where the current @@ -314,15 +314,15 @@ func (s *Service) refreshSyncCommitteeDutiesForEpochPeriod(ctx context.Context, } } - _, validatorIndices, err := s.accountsAndIndicesForEpoch(ctx, firstEpoch) + validatorIndices, err := s.syncCommitteeIndicesForEpoch(ctx, firstEpoch) if err != nil { - s.log.Error().Err(err).Uint64("epoch", uint64(firstEpoch)).Msg("Failed to obtain active validators for epoch") + s.log.Error().Err(err).Uint64("epoch", uint64(firstEpoch)).Msg("Failed to obtain sync committee eligible validators for epoch") return } // Expect at least one validator. if len(validatorIndices) == 0 { - s.log.Warn().Msg("No active validators; not validating") + s.log.Warn().Msg("No eligible sync committee validators for epoch; not scheduling sync committee messages") return } diff --git a/services/controller/standard/service.go b/services/controller/standard/service.go index 6e5a676..26fa7b8 100644 --- a/services/controller/standard/service.go +++ b/services/controller/standard/service.go @@ -122,47 +122,9 @@ func New(ctx context.Context, params ...Parameter) (*Service, error) { return nil, err } - // Handling altair if we have the service and spec to do so. - handlingAltair := parameters.syncCommitteeAggregator != nil && epochsPerSyncCommitteePeriod != 0 - // Fetch the altair fork epoch from the fork schedule. - var altairForkEpoch phase0.Epoch - if handlingAltair { - altairForkEpoch, err = fetchAltairForkEpoch(ctx, parameters.specProvider) - if err != nil { - // Not handling altair after all. - handlingAltair = false - } else { - log.Trace().Uint64("epoch", uint64(altairForkEpoch)).Msg("Obtained Altair fork epoch") - } - } - if !handlingAltair { - log.Debug().Msg("Not handling Altair") - } - - // Handling bellatrix if we can obtain its fork epoch. - handlingBellatrix := true - // Fetch the bellatrix fork epoch from the fork schedule. - var bellatrixForkEpoch phase0.Epoch - bellatrixForkEpoch, err = fetchBellatrixForkEpoch(ctx, parameters.specProvider) - if err != nil { - // Not handling bellatrix after all. - handlingBellatrix = false - bellatrixForkEpoch = 0xffffffffffffffff - } else { - log.Trace().Uint64("epoch", uint64(bellatrixForkEpoch)).Msg("Obtained Bellatrix fork epoch") - } - if !handlingBellatrix { - log.Debug().Msg("Not handling Bellatrix") - } - - // Fetch the Capella fork epoch from the fork schedule. 
- var capellaForkEpoch phase0.Epoch - capellaForkEpoch, err = fetchCapellaForkEpoch(ctx, parameters.specProvider) - if err != nil { - capellaForkEpoch = 0xffffffffffffffff - } else { - log.Trace().Uint64("epoch", uint64(capellaForkEpoch)).Msg("Obtained Capella fork epoch") - } + handlingAltair, altairForkEpoch := altairDetails(ctx, log, parameters.specProvider, parameters.syncCommitteeAggregator, epochsPerSyncCommitteePeriod) + handlingBellatrix, bellatrixForkEpoch := bellatrixDetails(ctx, log, parameters.specProvider) + capellaForkEpoch := capellaDetails(ctx, log, parameters.specProvider) s := &Service{ log: log, @@ -233,18 +195,28 @@ func New(ctx context.Context, params ...Parameter) (*Service, error) { log.Info().Int("old_validators", s.activeValidators).Int("new_validators", len(validatorIndices)).Msg("Change in number of active validators") s.activeValidators = len(validatorIndices) } + syncCommitteeValidatorIndices, err := s.syncCommitteeIndicesForEpoch(ctx, epoch) + if err != nil { + return nil, errors.Wrap(err, "failed to obtain sync committee eligible validator indices for the current epoch") + } + nextEpochAccounts, nextEpochValidatorIndices, err := s.accountsAndIndicesForEpoch(ctx, epoch+1) if err != nil { return nil, errors.Wrap(err, "failed to obtain active validator indices for the next epoch") } + syncCommitteeNextEpochValidatorIndices, err := s.syncCommitteeIndicesForEpoch(ctx, epoch+1) + if err != nil { + return nil, errors.Wrap(err, "failed to obtain sync committee eligible validator indices for the next epoch") + } + go s.scheduleProposals(ctx, epoch, validatorIndices, !s.waitedForGenesis) go s.scheduleAttestations(ctx, epoch, validatorIndices, !s.waitedForGenesis) if handlingAltair { thisSyncCommitteePeriodStartEpoch := s.firstEpochOfSyncPeriod(uint64(epoch) / s.epochsPerSyncCommitteePeriod) - go s.scheduleSyncCommitteeMessages(ctx, thisSyncCommitteePeriodStartEpoch, validatorIndices, true /* notCurrentSlot */) + go s.scheduleSyncCommitteeMessages(ctx, thisSyncCommitteePeriodStartEpoch, syncCommitteeValidatorIndices, true /* notCurrentSlot */) nextSyncCommitteePeriodStartEpoch := s.firstEpochOfSyncPeriod(uint64(epoch)/s.epochsPerSyncCommitteePeriod + 1) if uint64(nextSyncCommitteePeriodStartEpoch-epoch) <= syncCommitteePreparationEpochs { - go s.scheduleSyncCommitteeMessages(ctx, nextSyncCommitteePeriodStartEpoch, validatorIndices, true /* notCurrentSlot */) + go s.scheduleSyncCommitteeMessages(ctx, nextSyncCommitteePeriodStartEpoch, syncCommitteeNextEpochValidatorIndices, true /* notCurrentSlot */) } } go s.scheduleAttestations(ctx, epoch+1, nextEpochValidatorIndices, true /* notCurrentSlot */) @@ -364,7 +336,14 @@ func (s *Service) epochTicker(ctx context.Context, data interface{}) { // Update the _next_ period if we close to an EPOCHS_PER_SYNC_COMMITTEE_PERIOD boundary. if uint64(currentEpoch)%s.epochsPerSyncCommitteePeriod == s.epochsPerSyncCommitteePeriod-syncCommitteePreparationEpochs { - go s.scheduleSyncCommitteeMessages(ctx, currentEpoch+phase0.Epoch(syncCommitteePreparationEpochs), validatorIndices, false /* notCurrentSlot */) + syncCommitteeValidatorIndices, err := s.syncCommitteeIndicesForEpoch(ctx, currentEpoch) + // If we error getting accounts we log and carry on. 
+ if err != nil { + s.log.Error().Err(err).Uint64("epoch", uint64(currentEpoch)).Msg("Failed to obtain sync committee eligible validators for epoch") + cancel() + } else { + go s.scheduleSyncCommitteeMessages(ctx, currentEpoch+phase0.Epoch(syncCommitteePreparationEpochs), syncCommitteeValidatorIndices, false /* notCurrentSlot */) + } } } @@ -441,6 +420,26 @@ func (s *Service) accountsAndIndicesForEpoch(ctx context.Context, return accounts, validatorIndices, nil } +// syncCommitteeIndicesForEpoch obtains the sync committee eligible validator indices for the specified epoch. +func (s *Service) syncCommitteeIndicesForEpoch(ctx context.Context, + epoch phase0.Epoch, +) ( + []phase0.ValidatorIndex, + error, +) { + accounts, err := s.validatingAccountsProvider.SyncCommitteeAccountsForEpoch(ctx, epoch) + if err != nil { + return nil, errors.Wrap(err, "failed to obtain sync committee eligible accounts") + } + + validatorIndices := make([]phase0.ValidatorIndex, 0, len(accounts)) + for index := range accounts { + validatorIndices = append(validatorIndices, index) + } + + return validatorIndices, nil +} + func fetchAltairForkEpoch(ctx context.Context, specProvider eth2client.SpecProvider, ) ( @@ -526,7 +525,7 @@ func (s *Service) handleAltairForkEpoch(ctx context.Context) { } go func() { - _, validatorIndices, err := s.accountsAndIndicesForEpoch(ctx, s.altairForkEpoch) + validatorIndices, err := s.syncCommitteeIndicesForEpoch(ctx, s.altairForkEpoch) if err != nil { s.log.Error().Err(err).Msg("Failed to obtain active validator indices for the Altair fork epoch") return @@ -537,7 +536,7 @@ func (s *Service) handleAltairForkEpoch(ctx context.Context) { go func() { nextPeriodEpoch := phase0.Epoch((uint64(s.altairForkEpoch)/s.epochsPerSyncCommitteePeriod + 1) * s.epochsPerSyncCommitteePeriod) if uint64(nextPeriodEpoch-s.altairForkEpoch) <= syncCommitteePreparationEpochs { - _, validatorIndices, err := s.accountsAndIndicesForEpoch(ctx, nextPeriodEpoch) + validatorIndices, err := s.syncCommitteeIndicesForEpoch(ctx, nextPeriodEpoch) if err != nil { s.log.Error().Err(err).Msg("Failed to obtain active validator indices for the period following the Altair fork epoch") return @@ -612,3 +611,54 @@ func obtainSpecValues(ctx context.Context, return slotDuration, slotsPerEpoch, epochsPerSyncCommitteePeriod, nil } + +func capellaDetails(ctx context.Context, log zerolog.Logger, specProvider eth2client.SpecProvider) phase0.Epoch { + // Fetch the Capella fork epoch from the fork schedule. + var capellaForkEpoch phase0.Epoch + capellaForkEpoch, err := fetchCapellaForkEpoch(ctx, specProvider) + if err != nil { + capellaForkEpoch = 0xffffffffffffffff + } else { + log.Trace().Uint64("epoch", uint64(capellaForkEpoch)).Msg("Obtained Capella fork epoch") + } + return capellaForkEpoch +} + +func bellatrixDetails(ctx context.Context, log zerolog.Logger, specProvider eth2client.SpecProvider) (bool, phase0.Epoch) { + // Handling bellatrix if we can obtain its fork epoch. + handlingBellatrix := true + // Fetch the bellatrix fork epoch from the fork schedule. + var bellatrixForkEpoch phase0.Epoch + bellatrixForkEpoch, err := fetchBellatrixForkEpoch(ctx, specProvider) + if err != nil { + // Not handling bellatrix after all. 
+    handlingBellatrix = false
+    bellatrixForkEpoch = 0xffffffffffffffff
+  } else {
+    log.Trace().Uint64("epoch", uint64(bellatrixForkEpoch)).Msg("Obtained Bellatrix fork epoch")
+  }
+  if !handlingBellatrix {
+    log.Debug().Msg("Not handling Bellatrix")
+  }
+  return handlingBellatrix, bellatrixForkEpoch
+}
+
+func altairDetails(ctx context.Context, log zerolog.Logger, specProvider eth2client.SpecProvider, syncCommitteeAggregator synccommitteeaggregator.Service, epochsPerSyncCommitteePeriod uint64) (bool, phase0.Epoch) {
+  // Handling altair if we have the service and spec to do so.
+  handlingAltair := syncCommitteeAggregator != nil && epochsPerSyncCommitteePeriod != 0
+  // Fetch the altair fork epoch from the fork schedule.
+  var altairForkEpoch phase0.Epoch
+  if handlingAltair {
+    var err error
+    altairForkEpoch, err = fetchAltairForkEpoch(ctx, specProvider)
+    if err != nil {
+      // Not handling altair after all.
+      handlingAltair = false
+    } else {
+      log.Trace().Uint64("epoch", uint64(altairForkEpoch)).Msg("Obtained Altair fork epoch")
+    }
+  }
+  if !handlingAltair {
+    log.Debug().Msg("Not handling Altair")
+  }
+  return handlingAltair, altairForkEpoch
+}
diff --git a/services/controller/standard/synccommitteemessenger.go b/services/controller/standard/synccommitteemessenger.go
index 872e4bd..1b2aaed 100644
--- a/services/controller/standard/synccommitteemessenger.go
+++ b/services/controller/standard/synccommitteemessenger.go
@@ -81,7 +81,7 @@ func (s *Service) scheduleSyncCommitteeMessages(ctx context.Context,
   }

   // Obtain the accounts for the validator indices.
-  accounts, err := s.validatingAccountsProvider.ValidatingAccountsForEpochByIndex(ctx, firstEpoch, validatorIndices)
+  accounts, err := s.validatingAccountsProvider.SyncCommitteeAccountsForEpochByIndex(ctx, firstEpoch, validatorIndices)
   if err != nil {
     s.log.Error().Err(err).Msg("Failed to obtain validating accounts for epoch")
     return
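The new SyncCommitteeAccountsForEpoch and SyncCommitteeAccountsForEpochByIndex methods differ from their Validating counterparts only in the state filter, so the behavioural change is easiest to see by exercising utils.IsSyncCommitteeEligible directly. The following standalone sketch is not part of the change; it assumes the vouch module at this branch plus the go-eth2-client apiv1 state constants already used above, and shows the exited-but-not-yet-withdrawn edge case that keeps a validator in scope for sync committee duty:

package main

import (
	"fmt"

	apiv1 "github.com/attestantio/go-eth2-client/api/v1"
	"github.com/attestantio/vouch/services/accountmanager/utils"
)

func main() {
	// Exited (but not withdrawn) validators remain eligible for sync committee duty,
	// unlike attestation and proposal duties, which require an active state.
	states := []apiv1.ValidatorState{
		apiv1.ValidatorStateActiveOngoing,      // eligible for validating and sync committee duties
		apiv1.ValidatorStateExitedUnslashed,    // sync committee only
		apiv1.ValidatorStateWithdrawalPossible, // sync committee only
		apiv1.ValidatorStateWithdrawalDone,     // eligible for neither
	}
	for _, state := range states {
		fmt.Printf("%-24s sync committee eligible: %t\n", state.String(), utils.IsSyncCommitteeEligible(state))
	}
}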
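Separately, genericAddressToClientMapper collapses seven near-identical loops in startMultinodeSubmitter into one generic helper that fetches each configured client and type-asserts it to the requested interface. A reduced standalone sketch of the same pattern is below; the names here (fetchClient, AttestationsSubmitter, mapAddressesToClients) are illustrative stand-ins rather than Vouch's actual types:

package main

import "fmt"

// AttestationsSubmitter is an illustrative stand-in for eth2client.AttestationsSubmitter.
type AttestationsSubmitter interface {
	SubmitAttestations() error
}

// exampleClient is a dummy client returned by the stand-in fetchClient.
type exampleClient struct{ address string }

func (c exampleClient) SubmitAttestations() error { return nil }

// fetchClient stands in for Vouch's fetchClient: it returns a client whose
// concrete type may or may not satisfy the interface the caller needs.
func fetchClient(address string) (any, error) {
	return exampleClient{address: address}, nil
}

// mapAddressesToClients mirrors genericAddressToClientMapper: fetch each client,
// type-assert it to the requested interface T, and fail fast on any mismatch.
func mapAddressesToClients[T any](addresses []string, description string) (map[string]T, error) {
	out := make(map[string]T, len(addresses))
	for _, address := range addresses {
		client, err := fetchClient(address)
		if err != nil {
			return nil, fmt.Errorf("failed to fetch client %s for %s: %w", address, description, err)
		}
		cast, ok := client.(T)
		if !ok {
			return nil, fmt.Errorf("failed to cast client %s for %s", address, description)
		}
		out[address] = cast
	}
	return out, nil
}

func main() {
	submitters, err := mapAddressesToClients[AttestationsSubmitter](
		[]string{"localhost:4000", "localhost:5051"}, "attestation submitter strategy")
	if err != nil {
		panic(err)
	}
	fmt.Printf("configured %d attestation submitters\n", len(submitters))
}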