Skip to content

Commit

Permalink
Support solo validators auth/guards
Browse files Browse the repository at this point in the history
  • Loading branch information
jshufro committed Sep 9, 2023
1 parent ffc8e51 commit d68848b
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 12 deletions.
54 changes: 49 additions & 5 deletions metrics/epoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,13 @@ import (
rptypes "github.com/rocket-pool/rocketpool-go/types"
)

type maps struct {
solo sync.Map
rp sync.Map
}

var currentIdx uint64
var epochs [4]sync.Map
var epochs [4]maps

var registry *MetricsRegistry

Expand All @@ -29,19 +34,30 @@ func OnHead(epoch uint64) {
registry.Counter("head_advanced").Inc()

// Clear the new index
epochs[newCurrentIdx] = sync.Map{}
epochs[newCurrentIdx] = maps{}
// atomic provides us with a memory fence in go 1.19+, which means the above assignments
// are always observable on other goroutines when the newCurrentIdx is loaded
atomic.StoreUint64(&currentIdx, newCurrentIdx)
}

func ObserveSoloValidator(node common.Address, pubkey rptypes.ValidatorPubkey) {
registry.Counter("observed_solo_validator").Inc()
var nodeMap *sync.Map = &sync.Map{}

epoch := &epochs[atomic.LoadUint64(&currentIdx)]

iface, _ := epoch.solo.LoadOrStore(node, nodeMap)
nodeMap = iface.(*sync.Map)
_, _ = nodeMap.LoadOrStore(pubkey, struct{}{})
}

func ObserveValidator(node common.Address, pubkey rptypes.ValidatorPubkey) {
registry.Counter("observed_validator").Inc()
var nodeMap *sync.Map = &sync.Map{}

epoch := &epochs[atomic.LoadUint64(&currentIdx)]

iface, _ := epoch.LoadOrStore(node, nodeMap)
iface, _ := epoch.rp.LoadOrStore(node, nodeMap)
nodeMap = iface.(*sync.Map)
_, _ = nodeMap.LoadOrStore(pubkey, struct{}{})
}
Expand All @@ -58,7 +74,7 @@ func previousEpochIdx() uint64 {
func PreviousEpochNodes() (out float64) {

epoch := &epochs[previousEpochIdx()]
epoch.Range(func(key, value any) bool {
epoch.rp.Range(func(key, value any) bool {
out += 1
return true
})
Expand All @@ -69,7 +85,33 @@ func PreviousEpochNodes() (out float64) {
func PreviousEpochValidators() (out float64) {

epoch := &epochs[previousEpochIdx()]
epoch.Range(func(key, value any) bool {
epoch.rp.Range(func(key, value any) bool {
m := value.(*sync.Map)
m.Range(func(key2, value2 any) bool {
out += 1
return true
})
return true
})

return
}

func PreviousEpochSoloNodes() (out float64) {

epoch := &epochs[previousEpochIdx()]
epoch.solo.Range(func(key, value any) bool {
out += 1
return true
})

return
}

func PreviousEpochSoloValidators() (out float64) {

epoch := &epochs[previousEpochIdx()]
epoch.solo.Range(func(key, value any) bool {
m := value.(*sync.Map)
m.Range(func(key2, value2 any) bool {
out += 1
Expand All @@ -95,6 +137,8 @@ func InitEpochMetrics() {
registry = NewMetricsRegistry("epoch")
registry.GaugeFunc("validators_seen", PreviousEpochValidators)
registry.GaugeFunc("nodes_seen", PreviousEpochNodes)
registry.GaugeFunc("validators_seen_solo", PreviousEpochSoloValidators)
registry.GaugeFunc("nodes_seen_solo", PreviousEpochSoloNodes)
registry.GaugeFunc("current_idx", CurrentIdx)
registry.GaugeFunc("previous_idx", PreviousIdx)
}
11 changes: 6 additions & 5 deletions router/authentication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/Rocket-Pool-Rescue-Node/credentials"
"github.com/Rocket-Pool-Rescue-Node/credentials/pb"
"github.com/Rocket-Pool-Rescue-Node/rescue-proxy/metrics"
)

Expand All @@ -31,7 +32,7 @@ func TestValidCredential(t *testing.T) {
defer teardown()

// Create a valid credential
cred, err := cm.Create(time.Now(), nodeId)
cred, err := cm.Create(time.Now(), nodeId, pb.OperatorType_OT_ROCKETPOOL)
if err != nil {
t.Error(err)
}
Expand All @@ -55,7 +56,7 @@ func TestExpiredCredential(t *testing.T) {
defer teardown()

// Create a valid credential
cred, err := cm.Create(time.Now().Add(-time.Hour), nodeId)
cred, err := cm.Create(time.Now().Add(-time.Hour), nodeId, pb.OperatorType_OT_ROCKETPOOL)
if err != nil {
t.Error(err)
}
Expand All @@ -78,7 +79,7 @@ func TestEmptyUsername(t *testing.T) {
defer teardown()

// Create a valid credential
cred, err := cm.Create(time.Now().Add(-time.Hour), nodeId)
cred, err := cm.Create(time.Now().Add(-time.Hour), nodeId, pb.OperatorType_OT_ROCKETPOOL)
if err != nil {
t.Error(err)
}
Expand All @@ -101,7 +102,7 @@ func TestEmptyPassword(t *testing.T) {
defer teardown()

// Create a valid credential
cred, err := cm.Create(time.Now().Add(-time.Hour), nodeId)
cred, err := cm.Create(time.Now().Add(-time.Hour), nodeId, pb.OperatorType_OT_ROCKETPOOL)
if err != nil {
t.Error(err)
}
Expand All @@ -121,7 +122,7 @@ func TestFutureCredential(t *testing.T) {
defer teardown()

// Create a valid credential
cred, err := cm.Create(time.Now().Add(time.Hour), nodeId)
cred, err := cm.Create(time.Now().Add(time.Hour), nodeId, pb.OperatorType_OT_ROCKETPOOL)
if err != nil {
t.Error(err)
}
Expand Down
92 changes: 90 additions & 2 deletions router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"
"time"

"github.com/Rocket-Pool-Rescue-Node/credentials/pb"
"github.com/Rocket-Pool-Rescue-Node/rescue-proxy/consensuslayer"
"github.com/Rocket-Pool-Rescue-Node/rescue-proxy/executionlayer"
"github.com/Rocket-Pool-Rescue-Node/rescue-proxy/metrics"
Expand Down Expand Up @@ -39,6 +40,38 @@ type ProxyRouter struct {
// see: https://pkg.go.dev/context#WithValue
type prContextKey string

func (pr *ProxyRouter) pbpGuardSolo(withdrawalAddress common.Address, proposers gbp.PrepareBeaconProposerRequest) (gbp.AuthenticationStatus, error) {

indices := make([]string, 0, len(proposers))

for _, proposer := range proposers {

// Solo validators building local blocks must use their withdrawal address as their fee recipient
if !strings.EqualFold(withdrawalAddress.String(), proposer.FeeRecipient) {
pr.m.Counter("prepare_beacon_incorrect_fee_recipient_solo").Inc()
return gbp.Conflict, fmt.Errorf("Solo validator fee recipient didn't match 0x01 credential for validator %s: expected %s",
proposer.ValidatorIndex, withdrawalAddress.String())
}

// Collect indices for metrics
indices = append(indices, proposer.ValidatorIndex)
pr.m.Counter("prepare_beacon_correct_fee_recipient_solo").Inc()
}

pubkeys, err := pr.CL.GetValidatorPubkey(indices)
if err != nil {
// Return here and skip metrics for now.
pr.Logger.Warn("Failed to query solo validator pubkeys for metrics", zap.Error(err))
return gbp.Allowed, nil
}

for _, pubkey := range pubkeys {
metrics.ObserveSoloValidator(withdrawalAddress, pubkey)
}

return gbp.Allowed, nil
}

func (pr *ProxyRouter) prepareBeaconProposerGuard(proposers gbp.PrepareBeaconProposerRequest, ctx context.Context) (gbp.AuthenticationStatus, error) {
pr.m.Counter("prepare_beacon_proposer").Inc()

Expand All @@ -63,7 +96,18 @@ func (pr *ProxyRouter) prepareBeaconProposerGuard(proposers gbp.PrepareBeaconPro
pr.Logger.Warn("Unable to retrieve node address cached on request context")
return gbp.InternalError, nil
}

// Grab the operator type
operatorType, ok := ctx.Value(prContextKey("operator_type")).(pb.OperatorType)
if !ok {
pr.Logger.Warn("Unable to retrieve operator_type cached on request context")
return gbp.InternalError, nil
}

authedNodeAddr := common.BytesToAddress(authedNode)
if operatorType == pb.OperatorType_OT_SOLO {
return pr.pbpGuardSolo(authedNodeAddr, proposers)
}

// Iterate the results and check the fee recipients against our expected values
// Note: we iterate the map from the HTTP request to ensure every key is present in the
Expand Down Expand Up @@ -114,6 +158,27 @@ func (pr *ProxyRouter) prepareBeaconProposerGuard(proposers gbp.PrepareBeaconPro
return gbp.Allowed, nil
}

func (pr *ProxyRouter) rvGuardSolo(withdrawalAddress common.Address, validators gbp.RegisterValidatorRequest) (gbp.AuthenticationStatus, error) {

// We don't care to guard register_validator for solo credentials, since it requires a bls signature, and unlike RP,
// there are no constraints on valid fee recipients.
//
// Collect metrics only.

for _, validator := range validators {
pubkeyStr := strings.TrimPrefix(validator.Message.Pubkey, "0x")

pubkey, err := rptypes.HexToValidatorPubkey(pubkeyStr)
if err != nil {
pr.Logger.Warn("Malformed pubkey in register_validator_request", zap.Error(err), zap.String("pubkey", pubkeyStr))
continue
}
metrics.ObserveSoloValidator(withdrawalAddress, pubkey)
}

return gbp.Allowed, nil
}

func (pr *ProxyRouter) registerValidatorGuard(validators gbp.RegisterValidatorRequest, ctx context.Context) (gbp.AuthenticationStatus, error) {
pr.m.Counter("register_validator").Inc()

Expand All @@ -123,8 +188,20 @@ func (pr *ProxyRouter) registerValidatorGuard(validators gbp.RegisterValidatorRe
pr.Logger.Warn("Unable to retrieve node address cached on request context")
return gbp.InternalError, nil
}

// Grab the operator type
operatorType, ok := ctx.Value(prContextKey("operator_type")).(pb.OperatorType)
if !ok {
pr.Logger.Warn("Unable to retrieve operator_type cached on request context")
return gbp.InternalError, nil
}

authedNodeAddr := common.BytesToAddress(authedNode)

if operatorType == pb.OperatorType_OT_SOLO {
return pr.rvGuardSolo(authedNodeAddr, validators)
}

for _, validator := range validators {
pubkeyStr := strings.TrimPrefix(validator.Message.Pubkey, "0x")

Expand Down Expand Up @@ -199,10 +276,16 @@ func (pr *ProxyRouter) authenticate(r *http.Request) (gbp.AuthenticationStatus,
}

// If auth succeeds:
pr.m.Counter("auth_ok").Inc()
if ac.Credential.OperatorType == pb.OperatorType_OT_ROCKETPOOL {
pr.m.Counter("auth_ok").Inc()
} else {
pr.m.Counter("auth_ok_solo").Inc()
}
pr.Logger.Debug("Proxying Guarded URI", zap.String("uri", r.RequestURI))
// Add the node address to the request context
ctx := context.WithValue(r.Context(), prContextKey("node"), ac.Credential.NodeId)
// Add the operator type to the request context
ctx = context.WithValue(ctx, prContextKey("operator_type"), ac.Credential.OperatorType)
return gbp.Allowed, ctx, nil
}

Expand All @@ -228,9 +311,14 @@ func (pr *ProxyRouter) grpcAuthenticate(md metadata.MD) (gbp.AuthenticationStatu
return err.gbpStatus, nil, err
}

pr.gm.Counter("auth_ok").Inc()
if ac.Credential.OperatorType == pb.OperatorType_OT_ROCKETPOOL {
pr.gm.Counter("auth_ok").Inc()
} else {
pr.gm.Counter("auth_ok_solo").Inc()
}

ctx := context.WithValue(context.Background(), prContextKey("node"), ac.Credential.NodeId)
ctx = context.WithValue(ctx, prContextKey("operator_type"), ac.Credential.OperatorType)
return gbp.Allowed, ctx, nil
}

Expand Down

0 comments on commit d68848b

Please sign in to comment.