Skip to content

Commit

Permalink
optimization: (eventsyncer) don't remove QBFT instances to speed up s…
Browse files Browse the repository at this point in the history
…ync [main] (#1824)

* optimization: (eventsyncer) don't remove QBFT instances to speed up historical syncing (#1615)

* optimize badger delete action

* try another optimization

* use dropprefix instead of delete all keys

* drop messages only when own share or fullnode

* removed usage of store.CleanAllInstances

* remove storageMap from eventhandler

---------

Co-authored-by: y0sher <lyosher@gmail.com>
Co-authored-by: Matus Kysel <matus@ssvlabs.io>

* fix build and lint

---------

Co-authored-by: Anton Korpusenko <antokorp@gmail.com>
Co-authored-by: Matus Kysel <matus@ssvlabs.io>
Co-authored-by: Nikita Kryuchkov <nkryuchkov10@gmail.com>
  • Loading branch information
4 people authored Oct 28, 2024
1 parent 027ec07 commit 662f423
Show file tree
Hide file tree
Showing 12 changed files with 12 additions and 54 deletions.
3 changes: 0 additions & 3 deletions cli/operator/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,6 @@ var StartNodeCmd = &cobra.Command{
logger,
executionClient,
validatorCtrl,
storageMap,
metricsReporter,
networkConfig,
nodeStorage,
Expand Down Expand Up @@ -613,7 +612,6 @@ func setupEventHandling(
logger *zap.Logger,
executionClient *executionclient.ExecutionClient,
validatorCtrl validator.Controller,
storageMap *ibftstorage.QBFTStores,
metricsReporter metricsreporter.MetricsReporter,
networkConfig networkconfig.NetworkConfig,
nodeStorage operatorstorage.Storage,
Expand All @@ -636,7 +634,6 @@ func setupEventHandling(
operatorDecrypter,
cfg.SSVOptions.ValidatorOptions.KeyManager,
cfg.SSVOptions.ValidatorOptions.Beacon,
storageMap,
eventhandler.WithFullNode(),
eventhandler.WithLogger(logger),
eventhandler.WithMetrics(metricsReporter),
Expand Down
2 changes: 0 additions & 2 deletions eth/ethtest/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,6 @@ func setupEventHandler(
operator.privateKey,
keyManager,
bc,
storageMap,
eventhandler.WithFullNode(),
eventhandler.WithLogger(logger),
)
Expand Down Expand Up @@ -222,7 +221,6 @@ func setupEventHandler(
operator.privateKey,
keyManager,
bc,
storageMap,
eventhandler.WithFullNode(),
eventhandler.WithLogger(logger),
)
Expand Down
4 changes: 0 additions & 4 deletions eth/eventhandler/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/bloxapp/ssv/eth/eventparser"
"github.com/bloxapp/ssv/eth/executionclient"
"github.com/bloxapp/ssv/eth/localevents"
qbftstorage "github.com/bloxapp/ssv/ibft/storage"
"github.com/bloxapp/ssv/logging/fields"
"github.com/bloxapp/ssv/networkconfig"
operatordatastore "github.com/bloxapp/ssv/operator/datastore"
Expand Down Expand Up @@ -65,7 +64,6 @@ type EventHandler struct {
operatorDecrypter keys.OperatorDecrypter
keyManager spectypes.KeyManager
beacon beaconprotocol.BeaconNode
storageMap *qbftstorage.QBFTStores

fullNode bool
logger *zap.Logger
Expand All @@ -81,7 +79,6 @@ func New(
operatorDecrypter keys.OperatorDecrypter,
keyManager spectypes.KeyManager,
beacon beaconprotocol.BeaconNode,
storageMap *qbftstorage.QBFTStores,
opts ...Option,
) (*EventHandler, error) {
eh := &EventHandler{
Expand All @@ -93,7 +90,6 @@ func New(
operatorDecrypter: operatorDecrypter,
keyManager: keyManager,
beacon: beacon,
storageMap: storageMap,
logger: zap.NewNop(),
metrics: nopMetrics{},
}
Expand Down
2 changes: 0 additions & 2 deletions eth/eventhandler/event_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1316,7 +1316,6 @@ func setupEventHandler(t *testing.T, ctx context.Context, logger *zap.Logger, ne
operator.privateKey,
keyManager,
bc,
storageMap,
WithFullNode(),
WithLogger(logger))
if err != nil {
Expand Down Expand Up @@ -1353,7 +1352,6 @@ func setupEventHandler(t *testing.T, ctx context.Context, logger *zap.Logger, ne
operator.privateKey,
keyManager,
bc,
storageMap,
WithFullNode(),
WithLogger(logger))
if err != nil {
Expand Down
16 changes: 4 additions & 12 deletions eth/eventhandler/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/bloxapp/ssv/eth/contract"
"github.com/bloxapp/ssv/logging/fields"
"github.com/bloxapp/ssv/operator/duties"
qbftstorage "github.com/bloxapp/ssv/protocol/v2/qbft/storage"
ssvtypes "github.com/bloxapp/ssv/protocol/v2/types"
registrystorage "github.com/bloxapp/ssv/registry/storage"
"github.com/bloxapp/ssv/storage/basedb"
Expand Down Expand Up @@ -224,6 +223,7 @@ func (eh *EventHandler) handleShareCreation(
encryptedKeys [][]byte,
) (*ssvtypes.SSVShare, error) {
share, shareSecret, err := eh.validatorAddedEventToShare(
txn,
validatorEvent,
sharePublicKeys,
encryptedKeys,
Expand Down Expand Up @@ -252,6 +252,7 @@ func (eh *EventHandler) handleShareCreation(
}

func (eh *EventHandler) validatorAddedEventToShare(
txn basedb.Txn,
event *contract.ContractValidatorAdded,
sharePublicKeys [][]byte,
encryptedKeys [][]byte,
Expand Down Expand Up @@ -341,16 +342,7 @@ func (eh *EventHandler) handleValidatorRemoved(txn basedb.Txn, event *contract.C
return nil, &MalformedEventError{Err: ErrShareBelongsToDifferentOwner}
}

removeDecidedMessages := func(role spectypes.BeaconRole, store qbftstorage.QBFTStore) error {
messageID := spectypes.NewMsgID(eh.networkConfig.Domain, share.ValidatorPubKey, role)
return store.CleanAllInstances(logger, messageID[:])
}
err := eh.storageMap.Each(removeDecidedMessages)
if err != nil {
return nil, fmt.Errorf("could not clean all decided messages: %w", err)
}

if err := eh.nodeStorage.Shares().Delete(txn, share.ValidatorPubKey); err != nil {
if err := eh.nodeStorage.Shares().Delete(txn, share.ValidatorPubKey[:]); err != nil {
return nil, fmt.Errorf("could not remove validator share: %w", err)
}

Expand All @@ -359,7 +351,7 @@ func (eh *EventHandler) handleValidatorRemoved(txn basedb.Txn, event *contract.C
logger = logger.With(zap.String("validator_pubkey", hex.EncodeToString(share.ValidatorPubKey)))
}
if isOperatorShare {
err = eh.keyManager.RemoveShare(hex.EncodeToString(share.SharePubKey))
err := eh.keyManager.RemoveShare(hex.EncodeToString(share.SharePubKey))
if err != nil {
return nil, fmt.Errorf("could not remove share from ekm storage: %w", err)
}
Expand Down
4 changes: 0 additions & 4 deletions eth/eventsyncer/event_syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/bloxapp/ssv/eth/eventparser"
"github.com/bloxapp/ssv/eth/executionclient"
"github.com/bloxapp/ssv/eth/simulator/simcontract"
ibftstorage "github.com/bloxapp/ssv/ibft/storage"
"github.com/bloxapp/ssv/networkconfig"
operatorstorage "github.com/bloxapp/ssv/operator/storage"
"github.com/bloxapp/ssv/operator/validator"
Expand Down Expand Up @@ -137,8 +136,6 @@ func setupEventHandler(t *testing.T, ctx context.Context, logger *zap.Logger) *e
})
require.NoError(t, err)

storageMap := ibftstorage.NewStores()

privateKey, err := keys.GeneratePrivateKey()
if err != nil {
logger.Fatal("failed generating operator key %v", zap.Error(err))
Expand Down Expand Up @@ -179,7 +176,6 @@ func setupEventHandler(t *testing.T, ctx context.Context, logger *zap.Logger) *e
privateKey,
keyManager,
bc,
storageMap,
eventhandler.WithFullNode(),
eventhandler.WithLogger(logger))

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/btcsuite/btcd/btcec/v2 v2.3.2
github.com/cespare/xxhash/v2 v2.2.0
github.com/cornelk/hashmap v1.0.8
github.com/dgraph-io/badger/v4 v4.1.0
github.com/dgraph-io/badger/v4 v4.2.0
github.com/dgraph-io/ristretto v0.1.1
github.com/ethereum/go-ethereum v1.13.5
github.com/ferranbt/fastssz v0.1.3
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0/go.mod h1:v57UDF4pDQJcEfFUCRop3
github.com/dgraph-io/badger v1.6.0/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4=
github.com/dgraph-io/badger/v4 v4.1.0 h1:E38jc0f+RATYrycSUf9LMv/t47XAy+3CApyYSq4APOQ=
github.com/dgraph-io/badger/v4 v4.1.0/go.mod h1:P50u28d39ibBRmIJuQC/NSdBOg46HnHw7al2SW5QRHg=
github.com/dgraph-io/badger/v4 v4.2.0 h1:kJrlajbXXL9DFTNuhhu9yCx7JJa4qpYWxtE8BzuWsEs=
github.com/dgraph-io/badger/v4 v4.2.0/go.mod h1:qfCqhPoWDFJRx1gp5QwwyGo8xk1lbHUxvK9nK0OGAak=
github.com/dgraph-io/ristretto v0.1.1-0.20211108053508-297c39e6640f h1:NBGp2JpfMtXmanFWt6f3gEdBtnLO5LupRvm3w4TXrvs=
github.com/dgraph-io/ristretto v0.1.1-0.20211108053508-297c39e6640f/go.mod h1:fux0lOrBhrVCJd3lcTHsIJhq1T2rokOu6v9Vcb3Q9ug=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
Expand Down
2 changes: 1 addition & 1 deletion ibft/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (i *ibftStorage) CleanAllInstances(logger *zap.Logger, msgID []byte) error
prefix := i.prefix
prefix = append(prefix, msgID[:]...)
prefix = append(prefix, []byte(instanceKey)...)
_, err := i.db.DeletePrefix(prefix)
err := i.db.DropPrefix(prefix)
if err != nil {
return errors.Wrap(err, "failed to remove decided")
}
Expand Down
1 change: 0 additions & 1 deletion storage/basedb/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ type Database interface {

// TODO: consider moving these functions into Reader and ReadWriter interfaces?
CountPrefix(prefix []byte) (int64, error)
DeletePrefix(prefix []byte) (int, error)
DropPrefix(prefix []byte) error
Update(fn func(Txn) error) error
Close() error
Expand Down
22 changes: 4 additions & 18 deletions storage/kv/badger.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,22 +158,6 @@ func (b *BadgerDB) Delete(prefix []byte, key []byte) error {
})
}

// DeletePrefix all items with this prefix
func (b *BadgerDB) DeletePrefix(prefix []byte) (int, error) {
count := 0
err := b.db.Update(func(txn *badger.Txn) error {
rawKeys := b.listRawKeys(prefix, txn)
for _, k := range rawKeys {
if err := txn.Delete(k); err != nil {
return err
}
count++
}
return nil
})
return count, err
}

// GetAll returns all the items of a given collection
func (b *BadgerDB) GetAll(prefix []byte, handler func(int, basedb.Obj) error) error {
// we got issues when reading more than 100 items with iterator (items get mixed up)
Expand Down Expand Up @@ -249,11 +233,13 @@ func (b *BadgerDB) listRawKeys(prefix []byte, txn *badger.Txn) [][]byte {

opt := badger.DefaultIteratorOptions
opt.Prefix = prefix
opt.PrefetchValues = false

it := txn.NewIterator(opt)
defer it.Close()

for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
item := it.Item()
keys = append(keys, item.KeyCopy(nil))
keys = append(keys, it.Item().KeyCopy(nil))
}

return keys
Expand Down
6 changes: 0 additions & 6 deletions storage/kv/badger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,6 @@ func TestBadgerEndToEnd(t *testing.T) {
require.False(t, found)

require.NoError(t, db.DropPrefix([]byte("prefix2")))
deleted, err := db.DeletePrefix([]byte("prefix1"))
require.NoError(t, err)
require.Equal(t, 1, deleted)
}

func TestBadgerDb_GetAll(t *testing.T) {
Expand Down Expand Up @@ -194,7 +191,4 @@ func getAllTest(t *testing.T, n int, db basedb.Database) {
visited[string(item.Key)] = item.Value
}
require.Equal(t, n, len(visited))
count, err := db.DeletePrefix(prefix)
require.NoError(t, err)
require.Equal(t, n, count)
}

0 comments on commit 662f423

Please sign in to comment.