From d68848ba6c4094028766505e86642a5ffeecd59f Mon Sep 17 00:00:00 2001 From: Jacob Shufro Date: Sat, 9 Sep 2023 11:37:44 -0400 Subject: [PATCH] Support solo validators auth/guards --- metrics/epoch.go | 54 ++++++++++++++++++-- router/authentication_test.go | 11 +++-- router/router.go | 92 ++++++++++++++++++++++++++++++++++- 3 files changed, 145 insertions(+), 12 deletions(-) diff --git a/metrics/epoch.go b/metrics/epoch.go index 39c8b40..c47b308 100644 --- a/metrics/epoch.go +++ b/metrics/epoch.go @@ -8,8 +8,13 @@ import ( rptypes "github.com/rocket-pool/rocketpool-go/types" ) +type maps struct { + solo sync.Map + rp sync.Map +} + var currentIdx uint64 -var epochs [4]sync.Map +var epochs [4]maps var registry *MetricsRegistry @@ -29,19 +34,30 @@ func OnHead(epoch uint64) { registry.Counter("head_advanced").Inc() // Clear the new index - epochs[newCurrentIdx] = sync.Map{} + epochs[newCurrentIdx] = maps{} // atomic provides us with a memory fence in go 1.19+, which means the above assignments // are always observable on other goroutines when the newCurrentIdx is loaded atomic.StoreUint64(¤tIdx, newCurrentIdx) } +func ObserveSoloValidator(node common.Address, pubkey rptypes.ValidatorPubkey) { + registry.Counter("observed_solo_validator").Inc() + var nodeMap *sync.Map = &sync.Map{} + + epoch := &epochs[atomic.LoadUint64(¤tIdx)] + + iface, _ := epoch.solo.LoadOrStore(node, nodeMap) + nodeMap = iface.(*sync.Map) + _, _ = nodeMap.LoadOrStore(pubkey, struct{}{}) +} + func ObserveValidator(node common.Address, pubkey rptypes.ValidatorPubkey) { registry.Counter("observed_validator").Inc() var nodeMap *sync.Map = &sync.Map{} epoch := &epochs[atomic.LoadUint64(¤tIdx)] - iface, _ := epoch.LoadOrStore(node, nodeMap) + iface, _ := epoch.rp.LoadOrStore(node, nodeMap) nodeMap = iface.(*sync.Map) _, _ = nodeMap.LoadOrStore(pubkey, struct{}{}) } @@ -58,7 +74,7 @@ func previousEpochIdx() uint64 { func PreviousEpochNodes() (out float64) { epoch := &epochs[previousEpochIdx()] - epoch.Range(func(key, value any) bool { + epoch.rp.Range(func(key, value any) bool { out += 1 return true }) @@ -69,7 +85,33 @@ func PreviousEpochNodes() (out float64) { func PreviousEpochValidators() (out float64) { epoch := &epochs[previousEpochIdx()] - epoch.Range(func(key, value any) bool { + epoch.rp.Range(func(key, value any) bool { + m := value.(*sync.Map) + m.Range(func(key2, value2 any) bool { + out += 1 + return true + }) + return true + }) + + return +} + +func PreviousEpochSoloNodes() (out float64) { + + epoch := &epochs[previousEpochIdx()] + epoch.solo.Range(func(key, value any) bool { + out += 1 + return true + }) + + return +} + +func PreviousEpochSoloValidators() (out float64) { + + epoch := &epochs[previousEpochIdx()] + epoch.solo.Range(func(key, value any) bool { m := value.(*sync.Map) m.Range(func(key2, value2 any) bool { out += 1 @@ -95,6 +137,8 @@ func InitEpochMetrics() { registry = NewMetricsRegistry("epoch") registry.GaugeFunc("validators_seen", PreviousEpochValidators) registry.GaugeFunc("nodes_seen", PreviousEpochNodes) + registry.GaugeFunc("validators_seen_solo", PreviousEpochSoloValidators) + registry.GaugeFunc("nodes_seen_solo", PreviousEpochSoloNodes) registry.GaugeFunc("current_idx", CurrentIdx) registry.GaugeFunc("previous_idx", PreviousIdx) } diff --git a/router/authentication_test.go b/router/authentication_test.go index f7b27a7..28ee461 100644 --- a/router/authentication_test.go +++ b/router/authentication_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/Rocket-Pool-Rescue-Node/credentials" + "github.com/Rocket-Pool-Rescue-Node/credentials/pb" "github.com/Rocket-Pool-Rescue-Node/rescue-proxy/metrics" ) @@ -31,7 +32,7 @@ func TestValidCredential(t *testing.T) { defer teardown() // Create a valid credential - cred, err := cm.Create(time.Now(), nodeId) + cred, err := cm.Create(time.Now(), nodeId, pb.OperatorType_OT_ROCKETPOOL) if err != nil { t.Error(err) } @@ -55,7 +56,7 @@ func TestExpiredCredential(t *testing.T) { defer teardown() // Create a valid credential - cred, err := cm.Create(time.Now().Add(-time.Hour), nodeId) + cred, err := cm.Create(time.Now().Add(-time.Hour), nodeId, pb.OperatorType_OT_ROCKETPOOL) if err != nil { t.Error(err) } @@ -78,7 +79,7 @@ func TestEmptyUsername(t *testing.T) { defer teardown() // Create a valid credential - cred, err := cm.Create(time.Now().Add(-time.Hour), nodeId) + cred, err := cm.Create(time.Now().Add(-time.Hour), nodeId, pb.OperatorType_OT_ROCKETPOOL) if err != nil { t.Error(err) } @@ -101,7 +102,7 @@ func TestEmptyPassword(t *testing.T) { defer teardown() // Create a valid credential - cred, err := cm.Create(time.Now().Add(-time.Hour), nodeId) + cred, err := cm.Create(time.Now().Add(-time.Hour), nodeId, pb.OperatorType_OT_ROCKETPOOL) if err != nil { t.Error(err) } @@ -121,7 +122,7 @@ func TestFutureCredential(t *testing.T) { defer teardown() // Create a valid credential - cred, err := cm.Create(time.Now().Add(time.Hour), nodeId) + cred, err := cm.Create(time.Now().Add(time.Hour), nodeId, pb.OperatorType_OT_ROCKETPOOL) if err != nil { t.Error(err) } diff --git a/router/router.go b/router/router.go index 3b37833..fa97661 100644 --- a/router/router.go +++ b/router/router.go @@ -8,6 +8,7 @@ import ( "strings" "time" + "github.com/Rocket-Pool-Rescue-Node/credentials/pb" "github.com/Rocket-Pool-Rescue-Node/rescue-proxy/consensuslayer" "github.com/Rocket-Pool-Rescue-Node/rescue-proxy/executionlayer" "github.com/Rocket-Pool-Rescue-Node/rescue-proxy/metrics" @@ -39,6 +40,38 @@ type ProxyRouter struct { // see: https://pkg.go.dev/context#WithValue type prContextKey string +func (pr *ProxyRouter) pbpGuardSolo(withdrawalAddress common.Address, proposers gbp.PrepareBeaconProposerRequest) (gbp.AuthenticationStatus, error) { + + indices := make([]string, 0, len(proposers)) + + for _, proposer := range proposers { + + // Solo validators building local blocks must use their withdrawal address as their fee recipient + if !strings.EqualFold(withdrawalAddress.String(), proposer.FeeRecipient) { + pr.m.Counter("prepare_beacon_incorrect_fee_recipient_solo").Inc() + return gbp.Conflict, fmt.Errorf("Solo validator fee recipient didn't match 0x01 credential for validator %s: expected %s", + proposer.ValidatorIndex, withdrawalAddress.String()) + } + + // Collect indices for metrics + indices = append(indices, proposer.ValidatorIndex) + pr.m.Counter("prepare_beacon_correct_fee_recipient_solo").Inc() + } + + pubkeys, err := pr.CL.GetValidatorPubkey(indices) + if err != nil { + // Return here and skip metrics for now. + pr.Logger.Warn("Failed to query solo validator pubkeys for metrics", zap.Error(err)) + return gbp.Allowed, nil + } + + for _, pubkey := range pubkeys { + metrics.ObserveSoloValidator(withdrawalAddress, pubkey) + } + + return gbp.Allowed, nil +} + func (pr *ProxyRouter) prepareBeaconProposerGuard(proposers gbp.PrepareBeaconProposerRequest, ctx context.Context) (gbp.AuthenticationStatus, error) { pr.m.Counter("prepare_beacon_proposer").Inc() @@ -63,7 +96,18 @@ func (pr *ProxyRouter) prepareBeaconProposerGuard(proposers gbp.PrepareBeaconPro pr.Logger.Warn("Unable to retrieve node address cached on request context") return gbp.InternalError, nil } + + // Grab the operator type + operatorType, ok := ctx.Value(prContextKey("operator_type")).(pb.OperatorType) + if !ok { + pr.Logger.Warn("Unable to retrieve operator_type cached on request context") + return gbp.InternalError, nil + } + authedNodeAddr := common.BytesToAddress(authedNode) + if operatorType == pb.OperatorType_OT_SOLO { + return pr.pbpGuardSolo(authedNodeAddr, proposers) + } // Iterate the results and check the fee recipients against our expected values // Note: we iterate the map from the HTTP request to ensure every key is present in the @@ -114,6 +158,27 @@ func (pr *ProxyRouter) prepareBeaconProposerGuard(proposers gbp.PrepareBeaconPro return gbp.Allowed, nil } +func (pr *ProxyRouter) rvGuardSolo(withdrawalAddress common.Address, validators gbp.RegisterValidatorRequest) (gbp.AuthenticationStatus, error) { + + // We don't care to guard register_validator for solo credentials, since it requires a bls signature, and unlike RP, + // there are no constraints on valid fee recipients. + // + // Collect metrics only. + + for _, validator := range validators { + pubkeyStr := strings.TrimPrefix(validator.Message.Pubkey, "0x") + + pubkey, err := rptypes.HexToValidatorPubkey(pubkeyStr) + if err != nil { + pr.Logger.Warn("Malformed pubkey in register_validator_request", zap.Error(err), zap.String("pubkey", pubkeyStr)) + continue + } + metrics.ObserveSoloValidator(withdrawalAddress, pubkey) + } + + return gbp.Allowed, nil +} + func (pr *ProxyRouter) registerValidatorGuard(validators gbp.RegisterValidatorRequest, ctx context.Context) (gbp.AuthenticationStatus, error) { pr.m.Counter("register_validator").Inc() @@ -123,8 +188,20 @@ func (pr *ProxyRouter) registerValidatorGuard(validators gbp.RegisterValidatorRe pr.Logger.Warn("Unable to retrieve node address cached on request context") return gbp.InternalError, nil } + + // Grab the operator type + operatorType, ok := ctx.Value(prContextKey("operator_type")).(pb.OperatorType) + if !ok { + pr.Logger.Warn("Unable to retrieve operator_type cached on request context") + return gbp.InternalError, nil + } + authedNodeAddr := common.BytesToAddress(authedNode) + if operatorType == pb.OperatorType_OT_SOLO { + return pr.rvGuardSolo(authedNodeAddr, validators) + } + for _, validator := range validators { pubkeyStr := strings.TrimPrefix(validator.Message.Pubkey, "0x") @@ -199,10 +276,16 @@ func (pr *ProxyRouter) authenticate(r *http.Request) (gbp.AuthenticationStatus, } // If auth succeeds: - pr.m.Counter("auth_ok").Inc() + if ac.Credential.OperatorType == pb.OperatorType_OT_ROCKETPOOL { + pr.m.Counter("auth_ok").Inc() + } else { + pr.m.Counter("auth_ok_solo").Inc() + } pr.Logger.Debug("Proxying Guarded URI", zap.String("uri", r.RequestURI)) // Add the node address to the request context ctx := context.WithValue(r.Context(), prContextKey("node"), ac.Credential.NodeId) + // Add the operator type to the request context + ctx = context.WithValue(ctx, prContextKey("operator_type"), ac.Credential.OperatorType) return gbp.Allowed, ctx, nil } @@ -228,9 +311,14 @@ func (pr *ProxyRouter) grpcAuthenticate(md metadata.MD) (gbp.AuthenticationStatu return err.gbpStatus, nil, err } - pr.gm.Counter("auth_ok").Inc() + if ac.Credential.OperatorType == pb.OperatorType_OT_ROCKETPOOL { + pr.gm.Counter("auth_ok").Inc() + } else { + pr.gm.Counter("auth_ok_solo").Inc() + } ctx := context.WithValue(context.Background(), prContextKey("node"), ac.Credential.NodeId) + ctx = context.WithValue(ctx, prContextKey("operator_type"), ac.Credential.OperatorType) return gbp.Allowed, ctx, nil }