Skip to content

Commit

Permalink
Merge pull request #229 from attestantio/cache_strategy
Browse files Browse the repository at this point in the history
Use strategy for providers and alter sync committee eligibile logic
  • Loading branch information
Bez625 authored Aug 12, 2024
2 parents 713059b + 4d2f8f8 commit a0cedef
Show file tree
Hide file tree
Showing 11 changed files with 531 additions and 182 deletions.
6 changes: 4 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ dev:
- add 'failed' dimension for root to slot lookup metrics
- add 'deadline' builder bid strategy
- add builder configurations to allow more control over selection of bids
- add sync committee verification metrics to highlight when we were and were not included in a SyncAggregate.
- add config setting to enable the above metrics.
- add sync committee verification metrics to highlight when we were and were not included in a SyncAggregate
- add config setting to enable the above metrics
- alter logic for determining sync committee eligible accounts
- enable first strategies to be defined for beaconblockheader and signedbeaconblock

1.9.0-alpha.3
- add proposal value and blinded status to trace
Expand Down
24 changes: 22 additions & 2 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ log-level: 'debug'
# Overridden by beacon-node-addresses if present.
beacon-node-address: 'localhost:4000'

# beacon-node-addresseses is the list of address of the beacon nodes. Can be lighthouse, nimbus, prysm or teku.
# beacon-node-addresses is the list of address of the beacon nodes. Can be lighthouse, nimbus, prysm or teku.
# If multiple addresses are supplied here it makes Vouch resilient in the situation where a beacon
# node goes offline entirely. If this occurs to the currently used node then the next in the list will
# be used. If a beacon node comes back online it is added to the end of the list of potential nodes to
Expand All @@ -32,7 +32,7 @@ beacon-node-address: 'localhost:4000'
# ensure they are happy with the event output of all beacon nodes in this list.
beacon-node-addresses: ['localhost:4000', 'localhost:5051', 'localhost:5052']

# timeout is the timeout for all validating operations, for example fetching attesation data from beacon nodes.
# timeout is the timeout for all validating operations, for example fetching attestation data from beacon nodes.
timeout: '2s'

# reduced-memory-usage can be set on memory-constrained systems to reduce memory usage, at the cost of increased processing time.
Expand Down Expand Up @@ -155,6 +155,16 @@ strategies:
# This allows Vouch to remain responsive in the situation where some beacon nodes are significantly slower than others, for
# example if one is remote.
timeout: '2s'
# The beaconblockheader strategy obtains the beacon block headers from multiple beacon nodes.
beaconblockheader:
# style can be 'first'. If not defined or set to another value Vouch will default to using the multiclient.
style: 'first'
first:
# beacon-node-addresses are the addresses from which to receive beacon block headers.
beacon-node-addresses: ['localhost:4000', 'localhost:5051', 'localhost:5052']
# timeout defines the maximum amount of time the strategy will wait for a response. Different strategies may return earlier
# if they have obtained enough information from their beacon node(s).
timeout: '2s'
# The beaconblockroot strategy obtains the beacon block root from multiple beacon nodes.
beaconblockroot:
# style can be 'first', which uses the first returned, 'latest', which uses the latest returned, or 'majority', which uses
Expand All @@ -177,6 +187,16 @@ strategies:
deadline: '1s'
# bid-gap is the gap between receiving a response from a relay and querying it again.
bid-gap: '100ms'
# The signedbeaconblock strategy obtains the signed beacon blocks from multiple beacon nodes.
signedbeaconblock:
# style can be 'first'. If not defined or set to another value Vouch will default to using the multiclient.
style: 'first'
first:
# beacon-node-addresses are the addresses from which to receive signed beacon blocks.
beacon-node-addresses: ['localhost:4000', 'localhost:5051', 'localhost:5052']
# timeout defines the maximum amount of time the strategy will wait for a response. Different strategies may return earlier
# if they have obtained enough information from their beacon node(s).
timeout: '2s'
# The synccommitteecontribution strategy obtains sync committee contributions from multiple sources.
synccommitteecontribution:
# style can be 'best', which obtains contributions from all nodes and selects the best, or 'first', which uses the first returned
Expand Down
380 changes: 265 additions & 115 deletions main.go

Large diffs are not rendered by default.

43 changes: 37 additions & 6 deletions services/accountmanager/dirk/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
eth2client "github.com/attestantio/go-eth2-client"
api "github.com/attestantio/go-eth2-client/api/v1"
"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/attestantio/vouch/services/accountmanager/utils"
"github.com/attestantio/vouch/services/chaintime"
"github.com/attestantio/vouch/services/metrics"
"github.com/attestantio/vouch/services/validatorsmanager"
Expand Down Expand Up @@ -288,7 +289,22 @@ func credentialsFromCerts(ctx context.Context, clientCert []byte, clientKey []by

// ValidatingAccountsForEpoch obtains the validating accounts for a given epoch.
func (s *Service) ValidatingAccountsForEpoch(ctx context.Context, epoch phase0.Epoch) (map[phase0.ValidatorIndex]e2wtypes.Account, error) {
ctx, span := otel.Tracer("attestantio.vouch.services.accountmanager.dirk").Start(ctx, "ValidatingAccountsForEpoch", trace.WithAttributes(
filterFunc := func(state api.ValidatorState) bool {
return state == api.ValidatorStateActiveOngoing || state == api.ValidatorStateActiveExiting
}
return s.accountsForEpochWithFilter(ctx, epoch, "Validating", filterFunc)
}

// SyncCommitteeAccountsForEpoch obtains the accounts eligible for Sync Committee duty for a given epoch.
// The Ethereum specification has different criteria for Sync Committee eligibility compared to other validating duties.
// This includes an edge case where we are still in scope for sync committee duty between exited and withdrawal states.
func (s *Service) SyncCommitteeAccountsForEpoch(ctx context.Context, epoch phase0.Epoch) (map[phase0.ValidatorIndex]e2wtypes.Account, error) {
return s.accountsForEpochWithFilter(ctx, epoch, "SyncCommittee", utils.IsSyncCommitteeEligible)
}

// accountsForEpochWithFilter obtains the accounts for a given epoch with a filter on the state of validators returned.
func (s *Service) accountsForEpochWithFilter(ctx context.Context, epoch phase0.Epoch, accountType string, filterFunc func(state api.ValidatorState) bool) (map[phase0.ValidatorIndex]e2wtypes.Account, error) {
ctx, span := otel.Tracer("attestantio.vouch.services.accountmanager.dirk").Start(ctx, fmt.Sprintf("%sAccountsForEpoch", accountType), trace.WithAttributes(
attribute.Int64("epoch", int64(epoch)),
))
defer span.End()
Expand Down Expand Up @@ -317,20 +333,20 @@ func (s *Service) ValidatingAccountsForEpoch(ctx context.Context, epoch phase0.E
for index, validator := range validators {
state := api.ValidatorToState(validator, nil, epoch, s.farFutureEpoch)
stateCount[state]++
if state == api.ValidatorStateActiveOngoing || state == api.ValidatorStateActiveExiting {
if filterFunc(state) {
account := s.accounts[validator.PublicKey]
s.log.Trace().
Str("name", account.Name()).
Str("public_key", fmt.Sprintf("%x", account.PublicKey().Marshal())).
Uint64("index", uint64(index)).
Str("state", state.String()).
Msg("Validating account")
Msg(fmt.Sprintf("%s account", accountType))
validatingAccounts[index] = account
} else {
s.log.Trace().
Stringer("pubkey", validator.PublicKey).
Stringer("state", state).
Msg("Non-validating account")
Msg(fmt.Sprintf("Non-%s account", strings.ToLower(accountType)))
}
}
s.mutex.RUnlock()
Expand All @@ -348,7 +364,22 @@ func (s *Service) ValidatingAccountsForEpoch(ctx context.Context, epoch phase0.E

// ValidatingAccountsForEpochByIndex obtains the specified validating accounts for a given epoch.
func (s *Service) ValidatingAccountsForEpochByIndex(ctx context.Context, epoch phase0.Epoch, indices []phase0.ValidatorIndex) (map[phase0.ValidatorIndex]e2wtypes.Account, error) {
ctx, span := otel.Tracer("attestantio.vouch.services.accountmanager.dirk").Start(ctx, "ValidatingAccountsForEpochByIndex", trace.WithAttributes(
filterFunc := func(state api.ValidatorState) bool {
return state == api.ValidatorStateActiveOngoing || state == api.ValidatorStateActiveExiting
}
return s.accountsForEpochByIndexWithFilter(ctx, epoch, indices, "Validating", filterFunc)
}

// SyncCommitteeAccountsForEpochByIndex obtains the specified Sync Committee eligible accounts for a given epoch.
// The Ethereum specification has different criteria for Sync Committee eligibility compared to other validating duties.
// This includes an edge case where we are still in scope for sync committee duty between exited and withdrawal states.
func (s *Service) SyncCommitteeAccountsForEpochByIndex(ctx context.Context, epoch phase0.Epoch, indices []phase0.ValidatorIndex) (map[phase0.ValidatorIndex]e2wtypes.Account, error) {
return s.accountsForEpochByIndexWithFilter(ctx, epoch, indices, "SyncCommittee", utils.IsSyncCommitteeEligible)
}

// accountsForEpochByIndexWithFilter obtains the specified accounts for a given epoch.
func (s *Service) accountsForEpochByIndexWithFilter(ctx context.Context, epoch phase0.Epoch, indices []phase0.ValidatorIndex, accountType string, filterFunc func(state api.ValidatorState) bool) (map[phase0.ValidatorIndex]e2wtypes.Account, error) {
ctx, span := otel.Tracer("attestantio.vouch.services.accountmanager.dirk").Start(ctx, fmt.Sprintf("%sAccountsForEpochByIndex", accountType), trace.WithAttributes(
attribute.Int64("epoch", int64(epoch)),
))
defer span.End()
Expand All @@ -368,7 +399,7 @@ func (s *Service) ValidatingAccountsForEpochByIndex(ctx context.Context, epoch p
continue
}
state := api.ValidatorToState(validator, nil, epoch, s.farFutureEpoch)
if state == api.ValidatorStateActiveOngoing || state == api.ValidatorStateActiveExiting {
if filterFunc(state) {
s.mutex.RLock()
validatingAccounts[index] = s.accounts[validator.PublicKey]
s.mutex.RUnlock()
Expand Down
39 changes: 39 additions & 0 deletions services/accountmanager/mock/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,29 @@ func (s *ValidatingAccountsProvider) ValidatingAccountsForEpochByIndex(_ context
return accounts, nil
}

// SyncCommitteeAccountsForEpoch is a mock.
func (s *ValidatingAccountsProvider) SyncCommitteeAccountsForEpoch(_ context.Context, _ phase0.Epoch) (map[phase0.ValidatorIndex]e2wtypes.Account, error) {
return s.validatingAccounts, nil
}

// SyncCommitteeAccountsForEpochByIndex obtains the specified validating accounts for a given epoch.
func (s *ValidatingAccountsProvider) SyncCommitteeAccountsForEpochByIndex(_ context.Context,
_ phase0.Epoch,
indices []phase0.ValidatorIndex,
) (
map[phase0.ValidatorIndex]e2wtypes.Account,
error,
) {
accounts := make(map[phase0.ValidatorIndex]e2wtypes.Account)
for _, index := range indices {
if account, exists := s.validatingAccounts[index]; exists {
accounts[index] = account
}
}

return accounts, nil
}

type accountsProvider struct{}

// NewAccountsProvider is a mock.
Expand Down Expand Up @@ -106,3 +129,19 @@ func (*erroringValidatingAccountsProvider) ValidatingAccountsForEpochByIndex(_ c
) {
return nil, errors.New("error")
}

// SyncCommitteeAccountsForEpoch is a mock.
func (*erroringValidatingAccountsProvider) SyncCommitteeAccountsForEpoch(_ context.Context, _ phase0.Epoch) (map[phase0.ValidatorIndex]e2wtypes.Account, error) {
return nil, errors.New("error")
}

// SyncCommitteeAccountsForEpochByIndex obtains the specified validating accounts for a given epoch.
func (*erroringValidatingAccountsProvider) SyncCommitteeAccountsForEpochByIndex(_ context.Context,
_ phase0.Epoch,
_ []phase0.ValidatorIndex,
) (
map[phase0.ValidatorIndex]e2wtypes.Account,
error,
) {
return nil, errors.New("error")
}
16 changes: 16 additions & 0 deletions services/accountmanager/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,22 @@ type ValidatingAccountsProvider interface {
map[phase0.ValidatorIndex]e2wtypes.Account,
error,
)

// SyncCommitteeAccountsForEpoch obtains the accounts eligible for Sync Committee duty for a given epoch.
// The Ethereum specification has different criteria for Sync Committee eligibility compared to other validating duties.
// This includes an edge case where we are still in scope for sync committee duty between exited and withdrawal states.
SyncCommitteeAccountsForEpoch(ctx context.Context, epoch phase0.Epoch) (map[phase0.ValidatorIndex]e2wtypes.Account, error)

// SyncCommitteeAccountsForEpochByIndex obtains the specified Sync Committee eligible accounts for a given epoch.
// The Ethereum specification has different criteria for Sync Committee eligibility compared to other validating duties.
// This includes an edge case where we are still in scope for sync committee duty between exited and withdrawal states.
SyncCommitteeAccountsForEpochByIndex(ctx context.Context,
epoch phase0.Epoch,
indices []phase0.ValidatorIndex,
) (
map[phase0.ValidatorIndex]e2wtypes.Account,
error,
)
}

// Refresher refreshes account information from the remote source.
Expand Down
10 changes: 10 additions & 0 deletions services/accountmanager/utils/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package utils

import apiv1 "github.com/attestantio/go-eth2-client/api/v1"

// IsSyncCommitteeEligible returns true if the validator is in a state that is eligible for Sync Committee duty.
func IsSyncCommitteeEligible(state apiv1.ValidatorState) bool {
return state == apiv1.ValidatorStateActiveOngoing || state == apiv1.ValidatorStateActiveExiting ||
state == apiv1.ValidatorStateExitedUnslashed || state == apiv1.ValidatorStateActiveSlashed ||
state == apiv1.ValidatorStateExitedSlashed || state == apiv1.ValidatorStateWithdrawalPossible
}
41 changes: 36 additions & 5 deletions services/accountmanager/wallet/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/attestantio/go-eth2-client/api"
apiv1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/attestantio/vouch/services/accountmanager/utils"
"github.com/attestantio/vouch/services/chaintime"
"github.com/attestantio/vouch/services/metrics"
"github.com/attestantio/vouch/services/validatorsmanager"
Expand Down Expand Up @@ -204,7 +205,22 @@ func (s *Service) refreshValidators(ctx context.Context) error {

// ValidatingAccountsForEpoch obtains the validating accounts for a given epoch.
func (s *Service) ValidatingAccountsForEpoch(ctx context.Context, epoch phase0.Epoch) (map[phase0.ValidatorIndex]e2wtypes.Account, error) {
ctx, span := otel.Tracer("attestantio.vouch.services.accountmanager.wallet").Start(ctx, "ValidatingAccountsForEpoch", trace.WithAttributes(
filterFunc := func(state apiv1.ValidatorState) bool {
return state == apiv1.ValidatorStateActiveOngoing || state == apiv1.ValidatorStateActiveExiting
}
return s.accountsForEpochWithFilter(ctx, epoch, "Validating", filterFunc)
}

// SyncCommitteeAccountsForEpoch obtains the accounts eligible for Sync Committee duty for a given epoch.
// The Ethereum specification has different criteria for Sync Committee eligibility compared to other validating duties.
// This includes an edge case where we are still in scope for sync committee duty between exited and withdrawal states.
func (s *Service) SyncCommitteeAccountsForEpoch(ctx context.Context, epoch phase0.Epoch) (map[phase0.ValidatorIndex]e2wtypes.Account, error) {
return s.accountsForEpochWithFilter(ctx, epoch, "SyncCommittee", utils.IsSyncCommitteeEligible)
}

// accountsForEpochWithFilter obtains the accounts for a given epoch with a filter on the state of validators returned.
func (s *Service) accountsForEpochWithFilter(ctx context.Context, epoch phase0.Epoch, accountType string, filterFunc func(state apiv1.ValidatorState) bool) (map[phase0.ValidatorIndex]e2wtypes.Account, error) {
ctx, span := otel.Tracer("attestantio.vouch.services.accountmanager.wallet").Start(ctx, fmt.Sprintf("%sAccountsForEpoch", accountType), trace.WithAttributes(
attribute.Int64("epoch", int64(epoch)),
))
defer span.End()
Expand Down Expand Up @@ -233,14 +249,14 @@ func (s *Service) ValidatingAccountsForEpoch(ctx context.Context, epoch phase0.E
for index, validator := range validators {
state := apiv1.ValidatorToState(validator, nil, epoch, s.farFutureEpoch)
stateCount[state]++
if state == apiv1.ValidatorStateActiveOngoing || state == apiv1.ValidatorStateActiveExiting {
if filterFunc(state) {
account := s.accounts[validator.PublicKey]
s.log.Trace().
Str("name", account.Name()).
Str("public_key", fmt.Sprintf("%x", account.PublicKey().Marshal())).
Uint64("index", uint64(index)).
Str("state", state.String()).
Msg("Validating account")
Msg(fmt.Sprintf("%s account", accountType))
validatingAccounts[index] = account
}
}
Expand All @@ -258,7 +274,22 @@ func (s *Service) ValidatingAccountsForEpoch(ctx context.Context, epoch phase0.E

// ValidatingAccountsForEpochByIndex obtains the specified validating accounts for a given epoch.
func (s *Service) ValidatingAccountsForEpochByIndex(ctx context.Context, epoch phase0.Epoch, indices []phase0.ValidatorIndex) (map[phase0.ValidatorIndex]e2wtypes.Account, error) {
ctx, span := otel.Tracer("attestantio.vouch.services.accountmanager.wallet").Start(ctx, "ValidatingAccountsForEpochByIndex", trace.WithAttributes(
filterFunc := func(state apiv1.ValidatorState) bool {
return state == apiv1.ValidatorStateActiveOngoing || state == apiv1.ValidatorStateActiveExiting
}
return s.accountsForEpochByIndexWithFilter(ctx, epoch, indices, "Validating", filterFunc)
}

// SyncCommitteeAccountsForEpochByIndex obtains the specified Sync Committee eligible accounts for a given epoch.
// The Ethereum specification has different criteria for Sync Committee eligibility compared to other validating duties.
// This includes an edge case where we are still in scope for sync committee duty between exited and withdrawal states.
func (s *Service) SyncCommitteeAccountsForEpochByIndex(ctx context.Context, epoch phase0.Epoch, indices []phase0.ValidatorIndex) (map[phase0.ValidatorIndex]e2wtypes.Account, error) {
return s.accountsForEpochByIndexWithFilter(ctx, epoch, indices, "SyncCommittee", utils.IsSyncCommitteeEligible)
}

// accountsForEpochByIndexWithFilter obtains the specified accounts for a given epoch with a filter on the state of validators returned.
func (s *Service) accountsForEpochByIndexWithFilter(ctx context.Context, epoch phase0.Epoch, indices []phase0.ValidatorIndex, accountType string, filterFunc func(state apiv1.ValidatorState) bool) (map[phase0.ValidatorIndex]e2wtypes.Account, error) {
ctx, span := otel.Tracer("attestantio.vouch.services.accountmanager.wallet").Start(ctx, fmt.Sprintf("%sAccountsForEpochByIndex", accountType), trace.WithAttributes(
attribute.Int64("epoch", int64(epoch)),
))
defer span.End()
Expand All @@ -279,7 +310,7 @@ func (s *Service) ValidatingAccountsForEpochByIndex(ctx context.Context, epoch p
continue
}
state := apiv1.ValidatorToState(validator, nil, epoch, s.farFutureEpoch)
if state == apiv1.ValidatorStateActiveOngoing || state == apiv1.ValidatorStateActiveExiting {
if filterFunc(state) {
validatingAccounts[index] = s.accounts[validator.PublicKey]
}
}
Expand Down
10 changes: 5 additions & 5 deletions services/controller/standard/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,10 @@ func (s *Service) fastTrackJobs(ctx context.Context,
// handlePreviousDependentRootChanged handles the situation where the previous
// dependent root changed.
func (s *Service) handlePreviousDependentRootChanged(ctx context.Context) {
// Refreshes run in parallel.
// NOT running task in goroutine as there is only one task and this function is always called in a goroutine.

// We need to refresh the attester duties for this epoch.
go s.refreshAttesterDutiesForEpoch(ctx, s.chainTimeService.CurrentEpoch())
s.refreshAttesterDutiesForEpoch(ctx, s.chainTimeService.CurrentEpoch())
}

// handleCurrentDependentRootChanged handles the situation where the current
Expand Down Expand Up @@ -314,15 +314,15 @@ func (s *Service) refreshSyncCommitteeDutiesForEpochPeriod(ctx context.Context,
}
}

_, validatorIndices, err := s.accountsAndIndicesForEpoch(ctx, firstEpoch)
validatorIndices, err := s.syncCommitteeIndicesForEpoch(ctx, firstEpoch)
if err != nil {
s.log.Error().Err(err).Uint64("epoch", uint64(firstEpoch)).Msg("Failed to obtain active validators for epoch")
s.log.Error().Err(err).Uint64("epoch", uint64(firstEpoch)).Msg("Failed to obtain sync committee eligible validators for epoch")
return
}

// Expect at least one validator.
if len(validatorIndices) == 0 {
s.log.Warn().Msg("No active validators; not validating")
s.log.Warn().Msg("No eligible sync committee validators for epoch; not scheduling sync committee messages")
return
}

Expand Down
Loading

0 comments on commit a0cedef

Please sign in to comment.