Skip to content

Commit

Permalink
Add metrics for sync committee validation
Browse files Browse the repository at this point in the history
  • Loading branch information
Bez625 committed Jul 16, 2024
1 parent ab243bc commit 9eddb13
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 8 deletions.
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ import (
)

// ReleaseVersion is the release version for the code.
var ReleaseVersion = "1.9.0-alpha.4-dev"
var ReleaseVersion = "1.9.0-alpha.5-dev"

func main() {
exitCode := main2()
Expand Down
12 changes: 11 additions & 1 deletion services/controller/standard/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/attestantio/go-eth2-client/api"
apiv1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/attestantio/vouch/services/metrics"
e2wtypes "github.com/wealdtech/go-eth2-wallet-types/v2"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
Expand Down Expand Up @@ -345,6 +346,8 @@ func (s *Service) VerifySyncCommitteeMessages(ctx context.Context, data *apiv1.H
_, span := otel.Tracer("attestantio.vouch.services.controller.standard").Start(ctx, "VerifySyncCommitteeMessages")
defer span.End()

messengerMonitor := s.monitor.(metrics.SyncCommitteeValidationMonitor)

// We verify against the previous slot as that is when the sync committee will have reported.
previousSlot := data.Slot - 1
currentSlot := data.Slot
Expand All @@ -367,6 +370,7 @@ func (s *Service) VerifySyncCommitteeMessages(ctx context.Context, data *apiv1.H
})
if err != nil {
log.Debug().Err(err).Msg("Failed to retrieve head block for sync committee validation")
messengerMonitor.SyncCommitteeGetHeadBlockFailedInc(previousSlot, data.Block.String())
return
}
parentRoot, err := blockResponse.Data.ParentRoot()
Expand All @@ -375,8 +379,11 @@ func (s *Service) VerifySyncCommitteeMessages(ctx context.Context, data *apiv1.H
return
}
if !bytes.Equal(parentRoot[:], previousSlotData.Root[:]) {
log.Trace().Stringer("head_parent_root", parentRoot).Stringer("broadcast_root", previousSlotData.Root).
parentRootString := parentRoot.String()
previousSlotRoot := previousSlotData.Root.String()
log.Trace().Str("head_parent_root", parentRootString).Str("broadcast_root", previousSlotRoot).
Msg("Parent root does not equal sync committee root broadcast")
messengerMonitor.SyncCommitteeMessagesHeadMismatchInc(previousSlot, parentRootString, previousSlotRoot)
return
}
syncAggregate, err := blockResponse.Data.SyncAggregate()
Expand All @@ -393,7 +400,10 @@ func (s *Service) VerifySyncCommitteeMessages(ctx context.Context, data *apiv1.H
log.Debug().Uint64("validator_index", uint64(validatorIndex)).
Uint64("committee_index", uint64(committeeIndex)).
Msg("Validator not included in SyncAggregate SyncCommitteeBits")
messengerMonitor.SyncCommitteeSyncAggregateMissingInc(previousSlot, validatorIndex, committeeIndex)
continue
}
messengerMonitor.SyncCommitteeSyncAggregateFoundInc(previousSlot, validatorIndex, committeeIndex)
}
}
}
15 changes: 15 additions & 0 deletions services/metrics/null/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,18 @@ func (*Service) SyncCommitteeSubscriptionCompleted(_ time.Time, _ string) {
// SyncCommitteeSubscribers sets the number of sync committees to which our validators are subscribed.
func (*Service) SyncCommitteeSubscribers(_ int) {
}

// SyncCommitteeSyncAggregateFoundInc is the hook invoked when our sync committee
// participation was included in the SyncAggregate for the next head.
// This implementation ignores its arguments and does nothing.
func (*Service) SyncCommitteeSyncAggregateFoundInc(_ phase0.Slot, _ phase0.ValidatorIndex, _ phase0.CommitteeIndex) {}

// SyncCommitteeSyncAggregateMissingInc is the hook invoked when our sync committee
// participation was not included in the SyncAggregate for the next head.
// This implementation ignores its arguments and does nothing.
func (*Service) SyncCommitteeSyncAggregateMissingInc(_ phase0.Slot, _ phase0.ValidatorIndex, _ phase0.CommitteeIndex) {}

// SyncCommitteeGetHeadBlockFailedInc is the hook invoked when sync committee validation
// fails because the head block could not be retrieved.
func (*Service) SyncCommitteeGetHeadBlockFailedInc(_ phase0.Slot, _ string) {
	// Intentionally empty: both arguments are ignored.
}

// SyncCommitteeMessagesHeadMismatchInc is the hook invoked when a sync committee message
// is known not to match the next head block.
// This implementation ignores its arguments and does nothing.
func (*Service) SyncCommitteeMessagesHeadMismatchInc(_ phase0.Slot, _ string, _ string) {}
25 changes: 20 additions & 5 deletions services/metrics/prometheus/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ type Service struct {
syncCommitteeMessageMarkTimer prometheus.Histogram
syncCommitteeMessageProcessLatestSlot prometheus.Gauge

syncCommitteeValidationHeadMismatches *prometheus.CounterVec
syncCommitteeValidationAggregateFound *prometheus.CounterVec
syncCommitteeValidationAggregateMissing *prometheus.CounterVec
syncCommitteeValidationGetHeadFailures *prometheus.CounterVec

syncCommitteeAggregationProcessTimer prometheus.Histogram
syncCommitteeAggregationProcessRequests *prometheus.CounterVec
syncCommitteeAggregationCoverageRatio prometheus.Histogram
Expand Down Expand Up @@ -108,11 +113,8 @@ func New(_ context.Context, params ...Parameter) (*Service, error) {
if err := s.setupAttestationAggregationMetrics(); err != nil {
return nil, errors.Wrap(err, "failed to set up attestation aggregation metrics")
}
if err := s.setupSyncCommitteeMessageMetrics(); err != nil {
return nil, errors.Wrap(err, "failed to set up sync committee message metrics")
}
if err := s.setupSyncCommitteeAggregationMetrics(); err != nil {
return nil, errors.Wrap(err, "failed to set up sync committee aggregation metrics")
if err := s.setupAllSyncCommitteeMetrics(); err != nil {
return nil, err
}
if err := s.setupBeaconCommitteeSubscriptionMetrics(); err != nil {
return nil, errors.Wrap(err, "failed to set up beacon committee subscription metrics")
Expand Down Expand Up @@ -143,6 +145,19 @@ func New(_ context.Context, params ...Parameter) (*Service, error) {
return s, nil
}

// setupAllSyncCommitteeMetrics runs the sync committee message, validation and
// aggregation metric setup routines, in that order. It stops at the first
// routine that fails and returns its error wrapped with a message naming the
// step; it returns nil when all three succeed.
func (s *Service) setupAllSyncCommitteeMetrics() error {
	steps := []struct {
		setup   func() error
		failure string
	}{
		{s.setupSyncCommitteeMessageMetrics, "failed to set up sync committee message metrics"},
		{s.setupSyncCommitteeValidationMetrics, "failed to set up sync committee validation metrics"},
		{s.setupSyncCommitteeAggregationMetrics, "failed to set up sync committee aggregation metrics"},
	}

	for _, step := range steps {
		if err := step.setup(); err != nil {
			return errors.Wrap(err, step.failure)
		}
	}

	return nil
}

// Presenter returns the presenter for the events.
func (*Service) Presenter() string {
return "prometheus"
Expand Down
1 change: 0 additions & 1 deletion services/metrics/prometheus/synccommitteemessenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ func (s *Service) setupSyncCommitteeMessageMetrics() error {
return err
}
}

return nil
}

Expand Down
112 changes: 112 additions & 0 deletions services/metrics/prometheus/synccommitteevalidation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright © 2021 - 2024 Attestant Limited.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package prometheus

import (
"errors"
"strconv"

"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/prometheus/client_golang/prometheus"
)

// setupSyncCommitteeValidationMetrics creates the four sync committee validation
// counters (head mismatches, head block retrieval failures, aggregate found and
// aggregate missing), registers each with prometheus.Register and stores the
// resulting collectors on the service.
//
// If a counter with the same identity is already registered, the existing
// collector is reused rather than treated as an error. Any other registration
// error is returned as-is, as is an error when the already-registered collector
// is not a *prometheus.CounterVec.
//
// NOTE(review): every counter carries a "slot" label (and some carry roots or
// validator indices), so a new series is created for each distinct value seen.
// Confirm the resulting series growth is acceptable for the deployment.
func (s *Service) setupSyncCommitteeValidationMetrics() error {
	// register builds a counter vector in the vouch/synccommitteevalidation
	// namespace and registers it, falling back to the collector that is already
	// registered when there is one.
	register := func(name, help string, labels ...string) (*prometheus.CounterVec, error) {
		counter := prometheus.NewCounterVec(prometheus.CounterOpts{
			Namespace: "vouch",
			Subsystem: "synccommitteevalidation",
			Name:      name,
			Help:      help,
		}, labels)

		err := prometheus.Register(counter)
		if err == nil {
			return counter, nil
		}

		var alreadyRegisteredError prometheus.AlreadyRegisteredError
		if !errors.As(err, &alreadyRegisteredError) {
			return nil, err
		}

		// Use a checked assertion so that an unexpected collector type surfaces
		// as an error instead of a panic.
		existing, ok := alreadyRegisteredError.ExistingCollector.(*prometheus.CounterVec)
		if !ok {
			return nil, errors.New("existing collector for " + name + " is not a counter vector")
		}

		return existing, nil
	}

	var err error

	if s.syncCommitteeValidationHeadMismatches, err = register(
		"mismatches_total",
		"The number of sync committee messages we broadcast that did not match the next head root.",
		"slot", "head_parent_root", "broadcast_root",
	); err != nil {
		return err
	}

	if s.syncCommitteeValidationGetHeadFailures, err = register(
		"get_head_failures_total",
		"The number of times sync committee validation failed due to being unable to retrieve the head block.",
		"slot", "block",
	); err != nil {
		return err
	}

	if s.syncCommitteeValidationAggregateFound, err = register(
		"found_total",
		"The number of sync committee messages that were included in the sync aggregate.",
		"slot", "validator_index", "contribution_index",
	); err != nil {
		return err
	}

	if s.syncCommitteeValidationAggregateMissing, err = register(
		"missing_total",
		"The number of sync committee messages that were not included in the sync aggregate.",
		"slot", "validator_index", "contribution_index",
	); err != nil {
		return err
	}

	return nil
}

// SyncCommitteeSyncAggregateFoundInc adds one to the aggregate-found counter for the
// supplied slot, validator index and committee index. It is called when our sync
// committee participation was included in the SyncAggregate for the next head.
func (s *Service) SyncCommitteeSyncAggregateFoundInc(slot phase0.Slot, validatorIndex phase0.ValidatorIndex, committeeIndex phase0.CommitteeIndex) {
	// Label values are the base-10 renderings of the three identifiers.
	slotLabel := strconv.FormatUint(uint64(slot), 10)
	validatorLabel := strconv.FormatUint(uint64(validatorIndex), 10)
	committeeLabel := strconv.FormatUint(uint64(committeeIndex), 10)

	s.syncCommitteeValidationAggregateFound.WithLabelValues(slotLabel, validatorLabel, committeeLabel).Inc()
}

// SyncCommitteeSyncAggregateMissingInc adds one to the aggregate-missing counter for the
// supplied slot, validator index and committee index. It is called when our sync
// committee participation was not included in the SyncAggregate for the next head.
func (s *Service) SyncCommitteeSyncAggregateMissingInc(slot phase0.Slot, validatorIndex phase0.ValidatorIndex, committeeIndex phase0.CommitteeIndex) {
	// Render each identifier in base 10, keeping the order slot, validator, committee.
	labels := []string{
		strconv.FormatUint(uint64(slot), 10),
		strconv.FormatUint(uint64(validatorIndex), 10),
		strconv.FormatUint(uint64(committeeIndex), 10),
	}

	s.syncCommitteeValidationAggregateMissing.WithLabelValues(labels...).Inc()
}

// SyncCommitteeGetHeadBlockFailedInc adds one to the head-block-retrieval-failure counter
// for the supplied slot and block. It is called when sync committee validation fails
// because the head block could not be retrieved.
func (s *Service) SyncCommitteeGetHeadBlockFailedInc(slot phase0.Slot, block string) {
	slotLabel := strconv.FormatUint(uint64(slot), 10)
	counter := s.syncCommitteeValidationGetHeadFailures.WithLabelValues(slotLabel, block)
	counter.Inc()
}

// SyncCommitteeMessagesHeadMismatchInc is called when a sync committee message was known to not match the next head block.
// It adds one to the head mismatch counter, labelled with the slot (in base 10),
// the head block's parent root and the root that was broadcast.
func (s *Service) SyncCommitteeMessagesHeadMismatchInc(slot phase0.Slot, headParentRoot, broadcastRoot string) {
	// This must target the head mismatch counter. The aggregate-missing counter
	// also takes three labels, so using it here would not fail; it would record
	// roots under the validator and contribution index labels instead.
	slotLabel := strconv.FormatUint(uint64(slot), 10)
	s.syncCommitteeValidationHeadMismatches.WithLabelValues(slotLabel, headParentRoot, broadcastRoot).Add(1)
}
12 changes: 12 additions & 0 deletions services/metrics/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,18 @@ type SyncCommitteeSubscriptionMonitor interface {
SyncCommitteeSubscribers(subscribers int)
}

// SyncCommitteeValidationMonitor is the set of hooks used to report the outcome
// of sync committee validation.
type SyncCommitteeValidationMonitor interface {
	// SyncCommitteeGetHeadBlockFailedInc reports that validation could not proceed
	// because the head block could not be retrieved.
	SyncCommitteeGetHeadBlockFailedInc(slot phase0.Slot, block string)

	// SyncCommitteeMessagesHeadMismatchInc reports that a sync committee message
	// was known not to match the next head block.
	SyncCommitteeMessagesHeadMismatchInc(slot phase0.Slot, headParentRoot string, broadcastRoot string)

	// SyncCommitteeSyncAggregateFoundInc reports that our sync committee
	// participation was included in the SyncAggregate for the next head.
	SyncCommitteeSyncAggregateFoundInc(slot phase0.Slot, validatorIndex phase0.ValidatorIndex, committeeIndex phase0.CommitteeIndex)

	// SyncCommitteeSyncAggregateMissingInc reports that our sync committee
	// participation was not included in the SyncAggregate for the next head.
	SyncCommitteeSyncAggregateMissingInc(slot phase0.Slot, validatorIndex phase0.ValidatorIndex, committeeIndex phase0.CommitteeIndex)
}

// AccountManagerMonitor provides methods to monitor the account manager.
type AccountManagerMonitor interface {
// Accounts sets the number of accounts in a given state.
Expand Down

0 comments on commit 9eddb13

Please sign in to comment.