Skip to content

Commit

Permalink
Merge pull request #40 from Rocket-Rescue-Node/jms/update-guards
Browse files Browse the repository at this point in the history
Pull out consensus cache into file, add withdrawal creds to cached info
  • Loading branch information
jshufro authored Oct 4, 2023
2 parents a423372 + 6a81290 commit 72f16b6
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 37 deletions.
54 changes: 54 additions & 0 deletions consensuslayer/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package consensuslayer

import (
"context"

"github.com/allegro/bigcache/v3"
"github.com/ethereum/go-ethereum/common"
rptypes "github.com/rocket-pool/rocketpool-go/types"
)

const pubkeyLength = 48
const withdrawalLength = 20

type validatorCache struct {
*bigcache.BigCache
}

// The cache value will be a byte slice of 68 length
// First 48 bytes for the publick key
// Last 20 bytes for the address of the 0x01 credential, or a guardian address if a BLS key.

func newValidatorCache(ctx context.Context, config bigcache.Config) (*validatorCache, error) {
bc, err := bigcache.New(ctx, config)
if err != nil {
return nil, err
}

return &validatorCache{bc}, nil
}

func (c *validatorCache) Get(index string) *ValidatorInfo {
blob, err := c.BigCache.Get(index)
if err != nil {
return nil
}

if len(blob) != pubkeyLength+withdrawalLength {
return nil
}

out := ValidatorInfo{}
out.Pubkey = rptypes.BytesToValidatorPubkey(blob[:pubkeyLength])
out.WithdrawalAddress = common.BytesToAddress(blob[pubkeyLength:])
return &out
}

func (c *validatorCache) Set(index string, v *ValidatorInfo) {
var blob [pubkeyLength + withdrawalLength]byte

copy(blob[:], v.Pubkey[:])
copy(blob[pubkeyLength:], v.WithdrawalAddress[:])

_ = c.BigCache.Set(index, blob[:])
}
47 changes: 47 additions & 0 deletions consensuslayer/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package consensuslayer

import (
"bytes"
"context"
"encoding/hex"
"testing"
"time"

"github.com/allegro/bigcache/v3"
"github.com/ethereum/go-ethereum/common"
rptypes "github.com/rocket-pool/rocketpool-go/types"
)

func TestCacheRoundTrip(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

config := bigcache.DefaultConfig(10 * time.Hour)

cache, err := newValidatorCache(ctx, config)
if err != nil {
t.Error(err)
}

expectedKey, _ := hex.DecodeString("b20fb4a9340f8b23197b8449db7a5d3d8d068570a2b61a8d78817537aac4fd5645434d3e89a918a3ba9d0b7707cbeae0")
expectedAddr, _ := hex.DecodeString("6a6d731664115Ff3C823807442a4dC94999b0923")

cache.Set("test", &ValidatorInfo{
Pubkey: rptypes.BytesToValidatorPubkey(expectedKey),
WithdrawalAddress: common.BytesToAddress(expectedAddr),
})

vInfo := cache.Get("test")
if vInfo == nil {
t.Fail()
return
}

if !bytes.EqualFold(vInfo.Pubkey[:], expectedKey) {
t.Fail()
}

if !bytes.EqualFold(vInfo.WithdrawalAddress[:], expectedAddr) {
t.Fail()
}
}
68 changes: 38 additions & 30 deletions consensuslayer/consensus-layer.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package consensuslayer

import (
"bytes"
"context"
"encoding/hex"
"net/url"
"strconv"
"time"
Expand All @@ -12,16 +12,12 @@ import (
apiv1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/attestantio/go-eth2-client/http"
"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/ethereum/go-ethereum/common"
rptypes "github.com/rocket-pool/rocketpool-go/types"
"github.com/rs/zerolog"
"go.uber.org/zap"
)

const cacheTTL time.Duration = 10 * time.Hour
const cacheShards int = 32
const cacheGC time.Duration = 30 * time.Second
const cacheHardMaxMB int = 512

// ConsensusLayer provides an abstraction for the rescue proxy over the consensus layer
// It's specifically needed to map validator indices to pubkeys prior to EL validation
type ConsensusLayer struct {
Expand All @@ -31,8 +27,8 @@ type ConsensusLayer struct {
// Client for the BN
client *http.Service

// Caches index->pubkey for prepare_beacon_proposer
pubkeyCache *bigcache.BigCache
// Caches index->validatorInfo for prepare_beacon_proposer
validatorCache *validatorCache

// Disconnects from the bn
disconnect func()
Expand All @@ -41,6 +37,11 @@ type ConsensusLayer struct {
slotsPerEpoch uint64
}

type ValidatorInfo struct {
Pubkey rptypes.ValidatorPubkey
WithdrawalAddress common.Address
}

// NewConsensusLayer creates a new consensus layer client using the provided url and logger
func NewConsensusLayer(bnURL *url.URL, logger *zap.Logger) *ConsensusLayer {
out := &ConsensusLayer{}
Expand Down Expand Up @@ -85,7 +86,9 @@ func (c *ConsensusLayer) Init() error {
}
c.client = client.(*http.Service)

c.slotsPerEpoch, err = c.client.SlotsPerEpoch(context.Background())
speCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
c.slotsPerEpoch, err = c.client.SlotsPerEpoch(speCtx)
if err != nil {
c.logger.Warn("Couldn't get slots per epoch, defaulting to 32", zap.Error(err))
c.slotsPerEpoch = 32
Expand All @@ -101,12 +104,12 @@ func (c *ConsensusLayer) Init() error {
c.logger.Warn("Clouldn't subscribe to CL events. Metrics will be inaccurate", zap.Error(err))
}

cacheConfig := bigcache.DefaultConfig(cacheTTL)
cacheConfig.CleanWindow = cacheGC
cacheConfig.Shards = cacheShards
cacheConfig.HardMaxCacheSize = cacheHardMaxMB
validatorCacheConfig := bigcache.DefaultConfig(10 * time.Hour)
validatorCacheConfig.CleanWindow = 30 * time.Second
validatorCacheConfig.Shards = 32
validatorCacheConfig.HardMaxCacheSize = 512

c.pubkeyCache, err = bigcache.New(ctx, cacheConfig)
c.validatorCache, err = newValidatorCache(ctx, validatorCacheConfig)
if err != nil {
return err
}
Expand All @@ -116,30 +119,24 @@ func (c *ConsensusLayer) Init() error {
return nil
}

const pubkeyBytes = 48

// GetValidatorPubkey maps a validator index to a pubkey.
// GetValidatorIfno maps a validator index to a pubkey and withdrawal credential.
// It caches responses from the beacon client in memory for an arbitrary amount of time to save resources.
func (c *ConsensusLayer) GetValidatorPubkey(validatorIndices []string) (map[string]rptypes.ValidatorPubkey, error) {
func (c *ConsensusLayer) GetValidatorInfo(validatorIndices []string) (map[string]*ValidatorInfo, error) {

// Pre-allocate the retval based on the argument length
out := make(map[string]rptypes.ValidatorPubkey, len(validatorIndices))
out := make(map[string]*ValidatorInfo, len(validatorIndices))
missing := make([]phase0.ValidatorIndex, 0, len(validatorIndices))

for _, validatorIndex := range validatorIndices {
// Check the cache first
pubkey, err := c.pubkeyCache.Get(validatorIndex)
if err == nil {
if len(pubkey) != pubkeyBytes {
c.logger.Warn("Invalid pubkey from beacon node", zap.String("key", hex.EncodeToString(pubkey)))
continue
}
validatorInfo := c.validatorCache.Get(validatorIndex)
if validatorInfo != nil {
// Add the pubkey to the output. We have to cast it to an array, but the length is correct (see above)
out[validatorIndex] = *(*rptypes.ValidatorPubkey)(pubkey)
out[validatorIndex] = validatorInfo
c.logger.Debug("Cache hit", zap.String("validator", validatorIndex))
c.m.Counter("cache_hit").Inc()
} else {
// An error means the record wasn't in the cache
// A nil value means the record wasn't in the cache or there was an error
// Add the index to the list to be queried against the BN
index, err := strconv.ParseUint(validatorIndex, 10, 64)
if err != nil {
Expand All @@ -165,10 +162,21 @@ func (c *ConsensusLayer) GetValidatorPubkey(validatorIndices []string) (map[stri
for index, validator := range resp {
strIndex := strconv.FormatUint(uint64(index), 10)
pubkey := rptypes.ValidatorPubkey(validator.Validator.PublicKey)
out[strIndex] = pubkey
withdrawalCredentials := validator.Validator.WithdrawalCredentials

out[strIndex] = &ValidatorInfo{
Pubkey: pubkey,
}

if !bytes.HasPrefix(withdrawalCredentials, []byte{0x01}) {
c.logger.Warn("0x00 Validator seen", zap.Binary("pubkey", pubkey.Bytes()))
} else {
// BytesToAddress will cut off all but the last 20 bytes
out[strIndex].WithdrawalAddress = common.BytesToAddress(withdrawalCredentials)
}

// Add it to the cache. Ignore errors, we can always look the key up later
_ = c.pubkeyCache.Set(strIndex, pubkey[:])
c.validatorCache.Set(strIndex, out[strIndex])
c.m.Counter("cache_add").Inc()
}

Expand Down Expand Up @@ -196,7 +204,7 @@ func (c *ConsensusLayer) GetValidators() ([]*apiv1.Validator, error) {

// Deinit shuts down the consensus layer client
func (c *ConsensusLayer) Deinit() {
c.pubkeyCache.Close()
c.validatorCache.Close()
c.disconnect()
c.logger.Debug("HTTP Client Disconnected from the BN")
}
16 changes: 9 additions & 7 deletions router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,15 @@ func (pr *ProxyRouter) pbpGuardSolo(withdrawalAddress common.Address, proposers
pr.m.Counter("prepare_beacon_correct_fee_recipient_solo").Inc()
}

pubkeys, err := pr.CL.GetValidatorPubkey(indices)
validatorInfo, err := pr.CL.GetValidatorInfo(indices)
if err != nil {
// Return here and skip metrics for now.
pr.Logger.Warn("Failed to query solo validator pubkeys for metrics", zap.Error(err))
pr.Logger.Warn("Failed to query solo validator info for metrics", zap.Error(err))
return gbp.Allowed, nil
}

for _, pubkey := range pubkeys {
metrics.ObserveSoloValidator(withdrawalAddress, pubkey)
for _, info := range validatorInfo {
metrics.ObserveSoloValidator(info.WithdrawalAddress, info.Pubkey)
}

return gbp.Allowed, nil
Expand All @@ -85,9 +85,9 @@ func (pr *ProxyRouter) prepareBeaconProposerGuard(proposers gbp.PrepareBeaconPro
}

// Get the index->pubkey map
pubkeyMap, err := pr.CL.GetValidatorPubkey(indices)
validatorMap, err := pr.CL.GetValidatorInfo(indices)
if err != nil {
pr.Logger.Error("Error while querying CL for validator pubkeys", zap.Error(err))
pr.Logger.Error("Error while querying CL for validator info", zap.Error(err))
return gbp.InternalError, nil
}

Expand All @@ -114,13 +114,15 @@ func (pr *ProxyRouter) prepareBeaconProposerGuard(proposers gbp.PrepareBeaconPro
// Note: we iterate the map from the HTTP request to ensure every key is present in the
// response from the consensuslayer abstraction
for _, proposer := range proposers {
pubkey, found := pubkeyMap[proposer.ValidatorIndex]
validatorInfo, found := validatorMap[proposer.ValidatorIndex]
if !found {
pr.Logger.Warn("Pubkey for index not found in response from cl.",
zap.String("requested index", proposer.ValidatorIndex))
return gbp.BadRequest, fmt.Errorf("unknown validator index %s", proposer.ValidatorIndex)
}

pubkey := validatorInfo.Pubkey

// Next we need to get the expected fee recipient for the pubkey
expectedFeeRecipient, unowned := pr.EL.ValidatorFeeRecipient(pubkey, &authedNodeAddr)
if expectedFeeRecipient == nil {
Expand Down

0 comments on commit 72f16b6

Please sign in to comment.