From 6a8129099aa973aa64a329153976f7243f605053 Mon Sep 17 00:00:00 2001
From: Jacob Shufro
Date: Wed, 4 Oct 2023 14:41:19 -0400
Subject: [PATCH] Pull out consensus cache into file, add withdrawal creds to
 cached info

---
 consensuslayer/cache.go           | 54 ++++++++++++++++++++++++
 consensuslayer/cache_test.go      | 47 +++++++++++++++++++++
 consensuslayer/consensus-layer.go | 68 +++++++++++++++++--------------
 router/router.go                  | 16 ++++----
 4 files changed, 148 insertions(+), 37 deletions(-)
 create mode 100644 consensuslayer/cache.go
 create mode 100644 consensuslayer/cache_test.go

diff --git a/consensuslayer/cache.go b/consensuslayer/cache.go
new file mode 100644
index 0000000..1060793
--- /dev/null
+++ b/consensuslayer/cache.go
@@ -0,0 +1,54 @@
+package consensuslayer
+
+import (
+	"context"
+
+	"github.com/allegro/bigcache/v3"
+	"github.com/ethereum/go-ethereum/common"
+	rptypes "github.com/rocket-pool/rocketpool-go/types"
+)
+
+const pubkeyLength = 48
+const withdrawalLength = 20
+
+type validatorCache struct {
+	*bigcache.BigCache
+}
+
+// The cache value will be a byte slice of length 68:
+// First 48 bytes for the public key
+// Last 20 bytes for the address of the 0x01 credential, or a guardian address if a BLS key.
+
+func newValidatorCache(ctx context.Context, config bigcache.Config) (*validatorCache, error) {
+	bc, err := bigcache.New(ctx, config)
+	if err != nil {
+		return nil, err
+	}
+
+	return &validatorCache{bc}, nil
+}
+
+func (c *validatorCache) Get(index string) *ValidatorInfo {
+	blob, err := c.BigCache.Get(index)
+	if err != nil {
+		return nil
+	}
+
+	if len(blob) != pubkeyLength+withdrawalLength {
+		return nil
+	}
+
+	out := ValidatorInfo{}
+	out.Pubkey = rptypes.BytesToValidatorPubkey(blob[:pubkeyLength])
+	out.WithdrawalAddress = common.BytesToAddress(blob[pubkeyLength:])
+	return &out
+}
+
+func (c *validatorCache) Set(index string, v *ValidatorInfo) {
+	var blob [pubkeyLength + withdrawalLength]byte
+
+	copy(blob[:], v.Pubkey[:])
+	copy(blob[pubkeyLength:], v.WithdrawalAddress[:])
+
+	_ = c.BigCache.Set(index, blob[:])
+}
diff --git a/consensuslayer/cache_test.go b/consensuslayer/cache_test.go
new file mode 100644
index 0000000..2415326
--- /dev/null
+++ b/consensuslayer/cache_test.go
@@ -0,0 +1,47 @@
+package consensuslayer
+
+import (
+	"bytes"
+	"context"
+	"encoding/hex"
+	"testing"
+	"time"
+
+	"github.com/allegro/bigcache/v3"
+	"github.com/ethereum/go-ethereum/common"
+	rptypes "github.com/rocket-pool/rocketpool-go/types"
+)
+
+func TestCacheRoundTrip(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	config := bigcache.DefaultConfig(10 * time.Hour)
+
+	cache, err := newValidatorCache(ctx, config)
+	if err != nil {
+		t.Error(err)
+	}
+
+	expectedKey, _ := hex.DecodeString("b20fb4a9340f8b23197b8449db7a5d3d8d068570a2b61a8d78817537aac4fd5645434d3e89a918a3ba9d0b7707cbeae0")
+	expectedAddr, _ := hex.DecodeString("6a6d731664115Ff3C823807442a4dC94999b0923")
+
+	cache.Set("test", &ValidatorInfo{
+		Pubkey:            rptypes.BytesToValidatorPubkey(expectedKey),
+		WithdrawalAddress: common.BytesToAddress(expectedAddr),
+	})
+
+	vInfo := cache.Get("test")
+	if vInfo == nil {
+		t.Fail()
+		return
+	}
+
+	if !bytes.Equal(vInfo.Pubkey[:], expectedKey) {
+		t.Fail()
+	}
+
+	if !bytes.Equal(vInfo.WithdrawalAddress[:], expectedAddr) {
+		t.Fail()
+	}
+}
diff --git a/consensuslayer/consensus-layer.go b/consensuslayer/consensus-layer.go
index 8bd8bfa..45b68f5 100644
--- a/consensuslayer/consensus-layer.go
+++ b/consensuslayer/consensus-layer.go
@@ -1,8 +1,8 @@
 package consensuslayer
 
 import (
+	"bytes"
 	"context"
-	"encoding/hex"
 	"net/url"
 	"strconv"
 	"time"
@@ -12,16 +12,12 @@ import (
 	apiv1 "github.com/attestantio/go-eth2-client/api/v1"
 	"github.com/attestantio/go-eth2-client/http"
 	"github.com/attestantio/go-eth2-client/spec/phase0"
+	"github.com/ethereum/go-ethereum/common"
 	rptypes "github.com/rocket-pool/rocketpool-go/types"
 	"github.com/rs/zerolog"
 	"go.uber.org/zap"
 )
 
-const cacheTTL time.Duration = 10 * time.Hour
-const cacheShards int = 32
-const cacheGC time.Duration = 30 * time.Second
-const cacheHardMaxMB int = 512
-
 // ConsensusLayer provides an abstraction for the rescue proxy over the consensus layer
 // It's specifically needed to map validator indices to pubkeys prior to EL validation
 type ConsensusLayer struct {
@@ -31,8 +27,8 @@ type ConsensusLayer struct {
 	// Client for the BN
 	client *http.Service
 
-	// Caches index->pubkey for prepare_beacon_proposer
-	pubkeyCache *bigcache.BigCache
+	// Caches index->validatorInfo for prepare_beacon_proposer
+	validatorCache *validatorCache
 
 	// Disconnects from the bn
 	disconnect func()
@@ -41,6 +37,11 @@ type ConsensusLayer struct {
 	slotsPerEpoch uint64
 }
 
+type ValidatorInfo struct {
+	Pubkey            rptypes.ValidatorPubkey
+	WithdrawalAddress common.Address
+}
+
 // NewConsensusLayer creates a new consensus layer client using the provided url and logger
 func NewConsensusLayer(bnURL *url.URL, logger *zap.Logger) *ConsensusLayer {
 	out := &ConsensusLayer{}
@@ -85,7 +86,9 @@ func (c *ConsensusLayer) Init() error {
 	}
 	c.client = client.(*http.Service)
 
-	c.slotsPerEpoch, err = c.client.SlotsPerEpoch(context.Background())
+	speCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+	c.slotsPerEpoch, err = c.client.SlotsPerEpoch(speCtx)
 	if err != nil {
 		c.logger.Warn("Couldn't get slots per epoch, defaulting to 32", zap.Error(err))
 		c.slotsPerEpoch = 32
@@ -101,12 +104,12 @@ func (c *ConsensusLayer) Init() error {
 		c.logger.Warn("Clouldn't subscribe to CL events. Metrics will be inaccurate", zap.Error(err))
 	}
 
-	cacheConfig := bigcache.DefaultConfig(cacheTTL)
-	cacheConfig.CleanWindow = cacheGC
-	cacheConfig.Shards = cacheShards
-	cacheConfig.HardMaxCacheSize = cacheHardMaxMB
+	validatorCacheConfig := bigcache.DefaultConfig(10 * time.Hour)
+	validatorCacheConfig.CleanWindow = 30 * time.Second
+	validatorCacheConfig.Shards = 32
+	validatorCacheConfig.HardMaxCacheSize = 512
 
-	c.pubkeyCache, err = bigcache.New(ctx, cacheConfig)
+	c.validatorCache, err = newValidatorCache(ctx, validatorCacheConfig)
 	if err != nil {
 		return err
 	}
@@ -116,30 +119,24 @@ func (c *ConsensusLayer) Init() error {
 	return nil
 }
 
-const pubkeyBytes = 48
-
-// GetValidatorPubkey maps a validator index to a pubkey.
+// GetValidatorInfo maps a validator index to a pubkey and withdrawal credential.
 // It caches responses from the beacon client in memory for an arbitrary amount of time to save resources.
-func (c *ConsensusLayer) GetValidatorPubkey(validatorIndices []string) (map[string]rptypes.ValidatorPubkey, error) {
+func (c *ConsensusLayer) GetValidatorInfo(validatorIndices []string) (map[string]*ValidatorInfo, error) {
 	// Pre-allocate the retval based on the argument length
-	out := make(map[string]rptypes.ValidatorPubkey, len(validatorIndices))
+	out := make(map[string]*ValidatorInfo, len(validatorIndices))
 	missing := make([]phase0.ValidatorIndex, 0, len(validatorIndices))
 	for _, validatorIndex := range validatorIndices {
 		// Check the cache first
-		pubkey, err := c.pubkeyCache.Get(validatorIndex)
-		if err == nil {
-			if len(pubkey) != pubkeyBytes {
-				c.logger.Warn("Invalid pubkey from beacon node", zap.String("key", hex.EncodeToString(pubkey)))
-				continue
-			}
+		validatorInfo := c.validatorCache.Get(validatorIndex)
+		if validatorInfo != nil {
 			// Add the pubkey to the output. We have to cast it to an array, but the length is correct (see above)
-			out[validatorIndex] = *(*rptypes.ValidatorPubkey)(pubkey)
+			out[validatorIndex] = validatorInfo
 			c.logger.Debug("Cache hit", zap.String("validator", validatorIndex))
 			c.m.Counter("cache_hit").Inc()
 		} else {
-			// An error means the record wasn't in the cache
+			// A nil value means the record wasn't in the cache or there was an error
 			// Add the index to the list to be queried against the BN
 			index, err := strconv.ParseUint(validatorIndex, 10, 64)
 			if err != nil {
@@ -165,10 +162,21 @@ func (c *ConsensusLayer) GetValidatorPubkey(validatorIndices []string) (map[stri
 	for index, validator := range resp {
 		strIndex := strconv.FormatUint(uint64(index), 10)
 		pubkey := rptypes.ValidatorPubkey(validator.Validator.PublicKey)
-		out[strIndex] = pubkey
+		withdrawalCredentials := validator.Validator.WithdrawalCredentials
+
+		out[strIndex] = &ValidatorInfo{
+			Pubkey: pubkey,
+		}
+
+		if !bytes.HasPrefix(withdrawalCredentials, []byte{0x01}) {
+			c.logger.Warn("0x00 Validator seen", zap.Binary("pubkey", pubkey.Bytes()))
+		} else {
+			// BytesToAddress will cut off all but the last 20 bytes
+			out[strIndex].WithdrawalAddress = common.BytesToAddress(withdrawalCredentials)
+		}
 
 		// Add it to the cache. Ignore errors, we can always look the key up later
-		_ = c.pubkeyCache.Set(strIndex, pubkey[:])
+		c.validatorCache.Set(strIndex, out[strIndex])
 		c.m.Counter("cache_add").Inc()
 	}
 
@@ -196,7 +204,7 @@ func (c *ConsensusLayer) GetValidators() ([]*apiv1.Validator, error) {
 
 // Deinit shuts down the consensus layer client
 func (c *ConsensusLayer) Deinit() {
-	c.pubkeyCache.Close()
+	c.validatorCache.Close()
 	c.disconnect()
 	c.logger.Debug("HTTP Client Disconnected from the BN")
 }
diff --git a/router/router.go b/router/router.go
index eeeb683..befce21 100644
--- a/router/router.go
+++ b/router/router.go
@@ -59,15 +59,15 @@ func (pr *ProxyRouter) pbpGuardSolo(withdrawalAddress common.Address, proposers
 		pr.m.Counter("prepare_beacon_correct_fee_recipient_solo").Inc()
 	}
 
-	pubkeys, err := pr.CL.GetValidatorPubkey(indices)
+	validatorInfo, err := pr.CL.GetValidatorInfo(indices)
 	if err != nil {
 		// Return here and skip metrics for now.
-		pr.Logger.Warn("Failed to query solo validator pubkeys for metrics", zap.Error(err))
+		pr.Logger.Warn("Failed to query solo validator info for metrics", zap.Error(err))
 		return gbp.Allowed, nil
 	}
 
-	for _, pubkey := range pubkeys {
-		metrics.ObserveSoloValidator(withdrawalAddress, pubkey)
+	for _, info := range validatorInfo {
+		metrics.ObserveSoloValidator(info.WithdrawalAddress, info.Pubkey)
 	}
 
 	return gbp.Allowed, nil
@@ -85,9 +85,9 @@ func (pr *ProxyRouter) prepareBeaconProposerGuard(proposers gbp.PrepareBeaconPro
 	}
 
 	// Get the index->pubkey map
-	pubkeyMap, err := pr.CL.GetValidatorPubkey(indices)
+	validatorMap, err := pr.CL.GetValidatorInfo(indices)
 	if err != nil {
-		pr.Logger.Error("Error while querying CL for validator pubkeys", zap.Error(err))
+		pr.Logger.Error("Error while querying CL for validator info", zap.Error(err))
 		return gbp.InternalError, nil
 	}
 
@@ -114,13 +114,15 @@ func (pr *ProxyRouter) prepareBeaconProposerGuard(proposers gbp.PrepareBeaconPro
 	// Note: we iterate the map from the HTTP request to ensure every key is present in the
 	// response from the consensuslayer abstraction
 	for _, proposer := range proposers {
-		pubkey, found := pubkeyMap[proposer.ValidatorIndex]
+		validatorInfo, found := validatorMap[proposer.ValidatorIndex]
 		if !found {
 			pr.Logger.Warn("Pubkey for index not found in response from cl.", zap.String("requested index", proposer.ValidatorIndex))
 			return gbp.BadRequest, fmt.Errorf("unknown validator index %s", proposer.ValidatorIndex)
 		}
 
+		pubkey := validatorInfo.Pubkey
+
 		// Next we need to get the expected fee recipient for the pubkey
 		expectedFeeRecipient, unowned := pr.EL.ValidatorFeeRecipient(pubkey, &authedNodeAddr)
 		if expectedFeeRecipient == nil {