diff --git a/services/controller/standard/events.go b/services/controller/standard/events.go index 7b1e010c..f29fef1f 100644 --- a/services/controller/standard/events.go +++ b/services/controller/standard/events.go @@ -22,6 +22,7 @@ import ( "github.com/attestantio/go-eth2-client/api" apiv1 "github.com/attestantio/go-eth2-client/api/v1" "github.com/attestantio/go-eth2-client/spec/phase0" + "github.com/attestantio/vouch/services/metrics" e2wtypes "github.com/wealdtech/go-eth2-wallet-types/v2" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" @@ -345,6 +346,8 @@ func (s *Service) VerifySyncCommitteeMessages(ctx context.Context, data *apiv1.H _, span := otel.Tracer("attestantio.vouch.services.controller.standard").Start(ctx, "VerifySyncCommitteeMessages") defer span.End() + messengerMonitor := s.monitor.(metrics.SyncCommitteeValidationMonitor) + // We verify against the previous slot as that is when the sync committee will have reported. previousSlot := data.Slot - 1 currentSlot := data.Slot @@ -367,6 +370,7 @@ func (s *Service) VerifySyncCommitteeMessages(ctx context.Context, data *apiv1.H }) if err != nil { log.Debug().Err(err).Msg("Failed to retrieve head block for sync committee validation") + messengerMonitor.SyncCommitteeGetHeadBlockFailedInc(previousSlot, data.Block.String()) return } parentRoot, err := blockResponse.Data.ParentRoot() @@ -375,8 +379,11 @@ func (s *Service) VerifySyncCommitteeMessages(ctx context.Context, data *apiv1.H return } if !bytes.Equal(parentRoot[:], previousSlotData.Root[:]) { - log.Trace().Stringer("head_parent_root", parentRoot).Stringer("broadcast_root", previousSlotData.Root). + parentRootString := parentRoot.String() + previousSlotRoot := previousSlotData.Root.String() + log.Trace().Str("head_parent_root", parentRootString).Str("broadcast_root", previousSlotRoot). Msg("Parent root does not equal sync committee root broadcast") + messengerMonitor.SyncCommitteeMessagesHeadMismatchInc(previousSlot, parentRootString, previousSlotRoot) return } syncAggregate, err := blockResponse.Data.SyncAggregate() @@ -393,7 +400,10 @@ func (s *Service) VerifySyncCommitteeMessages(ctx context.Context, data *apiv1.H log.Debug().Uint64("validator_index", uint64(validatorIndex)). Uint64("committee_index", uint64(committeeIndex)). Msg("Validator not included in SyncAggregate SyncCommitteeBits") + messengerMonitor.SyncCommitteeSyncAggregateMissingInc(previousSlot, validatorIndex, committeeIndex) + continue } + messengerMonitor.SyncCommitteeSyncAggregateFoundInc(previousSlot, validatorIndex, committeeIndex) } } } diff --git a/services/metrics/null/service.go b/services/metrics/null/service.go index 7880937f..4b2b0ce7 100644 --- a/services/metrics/null/service.go +++ b/services/metrics/null/service.go @@ -108,3 +108,18 @@ func (*Service) SyncCommitteeSubscriptionCompleted(_ time.Time, _ string) { // SyncCommitteeSubscribers sets the number of sync committees to which our validators are subscribed. func (*Service) SyncCommitteeSubscribers(_ int) { } + +// SyncCommitteeSyncAggregateFoundInc is called when our sync committee participation was included in the SyncAggregate for the next head. +func (*Service) SyncCommitteeSyncAggregateFoundInc(_ phase0.Slot, _ phase0.ValidatorIndex, _ phase0.CommitteeIndex) { +} + +// SyncCommitteeSyncAggregateMissingInc is called when our sync committee participation was not included in the SyncAggregate for the next head. +func (*Service) SyncCommitteeSyncAggregateMissingInc(_ phase0.Slot, _ phase0.ValidatorIndex, _ phase0.CommitteeIndex) { +} + +// SyncCommitteeGetHeadBlockFailedInc is called when validation for a sync committee fails due to being unable to retrieve the head block. +func (*Service) SyncCommitteeGetHeadBlockFailedInc(_ phase0.Slot, _ string) {} + +// SyncCommitteeMessagesHeadMismatchInc is called when a sync committee message was known to not match the next head block. +func (*Service) SyncCommitteeMessagesHeadMismatchInc(_ phase0.Slot, _, _ string) { +} diff --git a/services/metrics/prometheus/service.go b/services/metrics/prometheus/service.go index ef19b93b..9d6e2d5b 100644 --- a/services/metrics/prometheus/service.go +++ b/services/metrics/prometheus/service.go @@ -53,6 +53,11 @@ type Service struct { syncCommitteeMessageMarkTimer prometheus.Histogram syncCommitteeMessageProcessLatestSlot prometheus.Gauge + syncCommitteeValidationHeadMismatches *prometheus.CounterVec + syncCommitteeValidationAggregateFound *prometheus.CounterVec + syncCommitteeValidationAggregateMissing *prometheus.CounterVec + syncCommitteeValidationGetHeadFailures *prometheus.CounterVec + syncCommitteeAggregationProcessTimer prometheus.Histogram syncCommitteeAggregationProcessRequests *prometheus.CounterVec syncCommitteeAggregationCoverageRatio prometheus.Histogram @@ -111,6 +116,9 @@ func New(_ context.Context, params ...Parameter) (*Service, error) { if err := s.setupSyncCommitteeMessageMetrics(); err != nil { return nil, errors.Wrap(err, "failed to set up sync committee message metrics") } + if err := s.setupSyncCommitteeValidationMetrics(); err != nil { + return nil, errors.Wrap(err, "failed to set up sync committee validation metrics") + } if err := s.setupSyncCommitteeAggregationMetrics(); err != nil { return nil, errors.Wrap(err, "failed to set up sync committee aggregation metrics") } diff --git a/services/metrics/prometheus/synccommitteemessenger.go b/services/metrics/prometheus/synccommitteemessenger.go index 1976661f..95215065 100644 --- a/services/metrics/prometheus/synccommitteemessenger.go +++ b/services/metrics/prometheus/synccommitteemessenger.go @@ -99,7 +99,6 @@ func (s *Service) setupSyncCommitteeMessageMetrics() error { return err } } - return nil } diff --git a/services/metrics/prometheus/synccommitteevalidation.go b/services/metrics/prometheus/synccommitteevalidation.go new file mode 100644 index 00000000..64aa5daf --- /dev/null +++ b/services/metrics/prometheus/synccommitteevalidation.go @@ -0,0 +1,112 @@ +// Copyright © 2021 - 2024 Attestant Limited. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package prometheus + +import ( + "errors" + "strconv" + + "github.com/attestantio/go-eth2-client/spec/phase0" + "github.com/prometheus/client_golang/prometheus" +) + +func (s *Service) setupSyncCommitteeValidationMetrics() error { + s.syncCommitteeValidationHeadMismatches = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "vouch", + Subsystem: "synccommitteevalidation", + Name: "mismatches_total", + Help: "The number of sync committee messages we broadcast that did not match the next head root.", + }, []string{"slot", "head_parent_root", "broadcast_root"}) + if err := prometheus.Register(s.syncCommitteeValidationHeadMismatches); err != nil { + var alreadyRegisteredError prometheus.AlreadyRegisteredError + if ok := errors.As(err, &alreadyRegisteredError); ok { + s.syncCommitteeValidationHeadMismatches = alreadyRegisteredError.ExistingCollector.(*prometheus.CounterVec) + } else { + return err + } + } + + s.syncCommitteeValidationGetHeadFailures = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "vouch", + Subsystem: "synccommitteevalidation", + Name: "get_head_failures_total", + Help: "The number of times sync committee validation failed due to being unable to retrieve the head block.", + }, []string{"slot", "block"}) + if err := prometheus.Register(s.syncCommitteeValidationGetHeadFailures); err != nil { + var alreadyRegisteredError prometheus.AlreadyRegisteredError + if ok := errors.As(err, &alreadyRegisteredError); ok { + s.syncCommitteeValidationGetHeadFailures = alreadyRegisteredError.ExistingCollector.(*prometheus.CounterVec) + } else { + return err + } + } + + s.syncCommitteeValidationAggregateFound = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "vouch", + Subsystem: "synccommitteevalidation", + Name: "found_total", + Help: "The number of sync committee messages that were included in the sync aggregate.", + }, []string{"slot", "validator_index", "contribution_index"}) + if err := prometheus.Register(s.syncCommitteeValidationAggregateFound); err != nil { + var alreadyRegisteredError prometheus.AlreadyRegisteredError + if ok := errors.As(err, &alreadyRegisteredError); ok { + s.syncCommitteeValidationAggregateFound = alreadyRegisteredError.ExistingCollector.(*prometheus.CounterVec) + } else { + return err + } + } + + s.syncCommitteeValidationAggregateMissing = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "vouch", + Subsystem: "synccommitteevalidation", + Name: "missing_total", + Help: "The number of sync committee messages that were not included in the sync aggregate.", + }, []string{"slot", "validator_index", "contribution_index"}) + if err := prometheus.Register(s.syncCommitteeValidationAggregateMissing); err != nil { + var alreadyRegisteredError prometheus.AlreadyRegisteredError + if ok := errors.As(err, &alreadyRegisteredError); ok { + s.syncCommitteeValidationAggregateMissing = alreadyRegisteredError.ExistingCollector.(*prometheus.CounterVec) + } else { + return err + } + } + + return nil +} + +// SyncCommitteeSyncAggregateFoundInc is called when our sync committee participation was included in the SyncAggregate for the next head. +func (s *Service) SyncCommitteeSyncAggregateFoundInc(slot phase0.Slot, validatorIndex phase0.ValidatorIndex, committeeIndex phase0.CommitteeIndex) { + converter := func(unitToConvert uint64) string { + return strconv.FormatUint(unitToConvert, 10) + } + s.syncCommitteeValidationAggregateFound.WithLabelValues(converter(uint64(slot)), converter(uint64(validatorIndex)), converter(uint64(committeeIndex))).Add(1) +} + +// SyncCommitteeSyncAggregateMissingInc is called when our sync committee participation was not included in the SyncAggregate for the next head. +func (s *Service) SyncCommitteeSyncAggregateMissingInc(slot phase0.Slot, validatorIndex phase0.ValidatorIndex, committeeIndex phase0.CommitteeIndex) { + converter := func(unitToConvert uint64) string { + return strconv.FormatUint(unitToConvert, 10) + } + s.syncCommitteeValidationAggregateMissing.WithLabelValues(converter(uint64(slot)), converter(uint64(validatorIndex)), converter(uint64(committeeIndex))).Add(1) +} + +// SyncCommitteeGetHeadBlockFailedInc is called when validation for a sync committee fails due to being unable to retrieve the head block. +func (s *Service) SyncCommitteeGetHeadBlockFailedInc(slot phase0.Slot, block string) { + s.syncCommitteeValidationGetHeadFailures.WithLabelValues(strconv.FormatUint(uint64(slot), 10), block).Add(1) +} + +// SyncCommitteeMessagesHeadMismatchInc is called when a sync committee message was known to not match the next head block. +func (s *Service) SyncCommitteeMessagesHeadMismatchInc(slot phase0.Slot, headParentRoot, broadcastRoot string) { + s.syncCommitteeValidationAggregateMissing.WithLabelValues(strconv.FormatUint(uint64(slot), 10), headParentRoot, broadcastRoot).Add(1) +} diff --git a/services/metrics/service.go b/services/metrics/service.go index d00a1d52..f9931370 100644 --- a/services/metrics/service.go +++ b/services/metrics/service.go @@ -103,6 +103,18 @@ type SyncCommitteeSubscriptionMonitor interface { SyncCommitteeSubscribers(subscribers int) } +// SyncCommitteeValidationMonitor provides methods to monitor the sync committee validation process. +type SyncCommitteeValidationMonitor interface { + // SyncCommitteeSyncAggregateFoundInc is called when our sync committee participation was included in the SyncAggregate for the next head. + SyncCommitteeSyncAggregateFoundInc(slot phase0.Slot, validatorIndex phase0.ValidatorIndex, committeeIndex phase0.CommitteeIndex) + // SyncCommitteeSyncAggregateMissingInc is called when our sync committee participation was not included in the SyncAggregate for the next head. + SyncCommitteeSyncAggregateMissingInc(slot phase0.Slot, validatorIndex phase0.ValidatorIndex, committeeIndex phase0.CommitteeIndex) + // SyncCommitteeGetHeadBlockFailedInc is called when validation for a sync committee fails due to being unable to retrieve the head block. + SyncCommitteeGetHeadBlockFailedInc(slot phase0.Slot, block string) + // SyncCommitteeMessagesHeadMismatchInc is called when a sync committee message was known to not match the next head block. + SyncCommitteeMessagesHeadMismatchInc(slot phase0.Slot, headParentRoot, broadcastRoot string) +} + // AccountManagerMonitor provides methods to monitor the account manager. type AccountManagerMonitor interface { // Accounts sets the number of accounts in a given state.