Skip to content

Commit

Permalink
exporter: update participants atomically
Browse files Browse the repository at this point in the history
  • Loading branch information
nkryuchkov committed Oct 30, 2024
1 parent a020f5a commit 8e45af2
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 37 deletions.
8 changes: 5 additions & 3 deletions exporter/api/query_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,15 @@ func TestHandleDecidedQuery(t *testing.T) {
})
require.NoError(t, err)

// save decided
// save participants
for _, d := range decided250Seq {
require.NoError(t, ibftStorage.Get(role).SaveInstance(d))
require.NoError(t, ibftStorage.Get(role).SaveParticipants(convert.MessageID(d.DecidedMessage.SSVMessage.MsgID),
_, err := ibftStorage.Get(role).UpdateParticipants(
convert.MessageID(d.DecidedMessage.SSVMessage.MsgID),
phase0.Slot(d.State.Height),
d.DecidedMessage.OperatorIDs),
d.DecidedMessage.OperatorIDs,
)
require.NoError(t, err)
}

t.Run("valid range", func(t *testing.T) {
Expand Down
89 changes: 70 additions & 19 deletions ibft/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package storage
import (
"encoding/binary"
"fmt"
"slices"

"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/pkg/errors"
Expand Down Expand Up @@ -55,7 +56,7 @@ func New(db basedb.Database, prefix string) qbftstorage.QBFTStore {

// GetHighestInstance returns the StoredInstance for the highest instance.
func (i *ibftStorage) GetHighestInstance(identifier []byte) (*qbftstorage.StoredInstance, error) {
val, found, err := i.get(highestInstanceKey, identifier[:])
val, found, err := i.get(nil, highestInstanceKey, identifier[:])
if !found {
return nil, nil
}
Expand Down Expand Up @@ -90,14 +91,14 @@ func (i *ibftStorage) saveInstance(inst *qbftstorage.StoredInstance, toHistory,
}

if asHighest {
err = i.save(value, highestInstanceKey, inst.State.ID)
err = i.save(nil, value, highestInstanceKey, inst.State.ID)
if err != nil {
return errors.Wrap(err, "could not save highest instance")
}
}

if toHistory {
err = i.save(value, instanceKey, inst.State.ID, uInt64ToByteSlice(uint64(inst.State.Height)))
err = i.save(nil, value, instanceKey, inst.State.ID, uInt64ToByteSlice(uint64(inst.State.Height)))
if err != nil {
return errors.Wrap(err, "could not save historical instance")
}
Expand All @@ -108,7 +109,7 @@ func (i *ibftStorage) saveInstance(inst *qbftstorage.StoredInstance, toHistory,

// GetInstance returns historical StoredInstance for the given identifier and height.
func (i *ibftStorage) GetInstance(identifier []byte, height specqbft.Height) (*qbftstorage.StoredInstance, error) {
val, found, err := i.get(instanceKey, identifier[:], uInt64ToByteSlice(uint64(height)))
val, found, err := i.get(nil, instanceKey, identifier[:], uInt64ToByteSlice(uint64(height)))
if !found {
return nil, nil
}
Expand Down Expand Up @@ -140,7 +141,7 @@ func (i *ibftStorage) GetInstancesInRange(identifier []byte, from specqbft.Heigh
}

// CleanAllInstances removes all StoredInstance's & highest StoredInstance's for msgID.
func (i *ibftStorage) CleanAllInstances(logger *zap.Logger, msgID []byte) error {
func (i *ibftStorage) CleanAllInstances(msgID []byte) error {
prefix := i.prefix
prefix = append(prefix, msgID[:]...)
prefix = append(prefix, []byte(instanceKey)...)
Expand All @@ -149,22 +150,35 @@ func (i *ibftStorage) CleanAllInstances(logger *zap.Logger, msgID []byte) error
return errors.Wrap(err, "failed to remove decided")
}

if err := i.delete(highestInstanceKey, msgID[:]); err != nil {
if err := i.delete(nil, highestInstanceKey, msgID[:]); err != nil {
return errors.Wrap(err, "failed to remove last decided")
}
return nil
}

func (i *ibftStorage) SaveParticipants(identifier convert.MessageID, slot phase0.Slot, operators []spectypes.OperatorID) error {
bytes, err := encodeOperators(operators)
func (i *ibftStorage) UpdateParticipants(identifier convert.MessageID, slot phase0.Slot, newParticipants []spectypes.OperatorID) (updated bool, err error) {
txn := i.db.Begin()
defer txn.Discard()

existingParticipants, err := i.getParticipants(txn, identifier, slot)
if err != nil {
return err
return false, fmt.Errorf("could not get participants %w", err)
}
if err := i.save(bytes, participantsKey, identifier[:], uInt64ToByteSlice(uint64(slot))); err != nil {
return fmt.Errorf("could not save participants: %w", err)

mergedParticipants := mergeParticipants(existingParticipants, newParticipants)
if slices.Equal(mergedParticipants, existingParticipants) {
return false, nil
}

return nil
if err := i.saveParticipants(txn, identifier, slot, mergedParticipants); err != nil {
return false, fmt.Errorf("could not save participants: %w", err)
}

if err := txn.Commit(); err != nil {
return false, fmt.Errorf("commit transaction: %w", err)
}

return true, nil
}

func (i *ibftStorage) GetParticipantsInRange(identifier convert.MessageID, from, to phase0.Slot) ([]qbftstorage.ParticipantsRangeEntry, error) {
Expand All @@ -191,7 +205,11 @@ func (i *ibftStorage) GetParticipantsInRange(identifier convert.MessageID, from,
}

func (i *ibftStorage) GetParticipants(identifier convert.MessageID, slot phase0.Slot) ([]spectypes.OperatorID, error) {
val, found, err := i.get(participantsKey, identifier[:], uInt64ToByteSlice(uint64(slot)))
return i.getParticipants(nil, identifier, slot)
}

func (i *ibftStorage) getParticipants(txn basedb.ReadWriter, identifier convert.MessageID, slot phase0.Slot) ([]spectypes.OperatorID, error) {
val, found, err := i.get(txn, participantsKey, identifier[:], uInt64ToByteSlice(uint64(slot)))
if err != nil {
return nil, err
}
Expand All @@ -203,16 +221,49 @@ func (i *ibftStorage) GetParticipants(identifier convert.MessageID, slot phase0.
return operators, nil
}

func (i *ibftStorage) save(value []byte, id string, pk []byte, keyParams ...[]byte) error {
func (i *ibftStorage) saveParticipants(txn basedb.ReadWriter, identifier convert.MessageID, slot phase0.Slot, operators []spectypes.OperatorID) error {
bytes, err := encodeOperators(operators)
if err != nil {
return err
}
if err := i.save(txn, bytes, participantsKey, identifier[:], uInt64ToByteSlice(uint64(slot))); err != nil {
return fmt.Errorf("could not save participants: %w", err)
}

return nil
}

func mergeParticipants(existingParticipants, newParticipants []spectypes.OperatorID) []spectypes.OperatorID {
seen := make(map[spectypes.OperatorID]struct{})

for _, operatorID := range existingParticipants {
seen[operatorID] = struct{}{}
}

for _, operatorID := range newParticipants {
seen[operatorID] = struct{}{}
}

result := make([]spectypes.OperatorID, 0, len(seen))
for operatorID := range seen {
result = append(result, operatorID)
}

slices.Sort(result)

return result
}

func (i *ibftStorage) save(txn basedb.ReadWriter, value []byte, id string, pk []byte, keyParams ...[]byte) error {
prefix := append(i.prefix, pk...)
key := i.key(id, keyParams...)
return i.db.Set(prefix, key, value)
return i.db.Using(txn).Set(prefix, key, value)
}

func (i *ibftStorage) get(id string, pk []byte, keyParams ...[]byte) ([]byte, bool, error) {
func (i *ibftStorage) get(txn basedb.ReadWriter, id string, pk []byte, keyParams ...[]byte) ([]byte, bool, error) {
prefix := append(i.prefix, pk...)
key := i.key(id, keyParams...)
obj, found, err := i.db.Get(prefix, key)
obj, found, err := i.db.Using(txn).Get(prefix, key)
if !found {
return nil, found, nil
}
Expand All @@ -222,10 +273,10 @@ func (i *ibftStorage) get(id string, pk []byte, keyParams ...[]byte) ([]byte, bo
return obj.Value, found, nil
}

func (i *ibftStorage) delete(id string, pk []byte, keyParams ...[]byte) error {
func (i *ibftStorage) delete(txn basedb.ReadWriter, id string, pk []byte, keyParams ...[]byte) error {
prefix := append(i.prefix, pk...)
key := i.key(id, keyParams...)
return i.db.Delete(prefix, key)
return i.db.Using(txn).Delete(prefix, key)
}

func (i *ibftStorage) key(id string, params ...[]byte) []byte {
Expand Down
67 changes: 66 additions & 1 deletion ibft/storage/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestCleanInstances(t *testing.T) {
require.Equal(t, specqbft.Height(msgsCount), last.State.Height)

// remove all instances
require.NoError(t, storage.CleanAllInstances(logger, msgID[:]))
require.NoError(t, storage.CleanAllInstances(msgID[:]))
res, err = storage.GetInstancesInRange(msgID[:], 0, specqbft.Height(msgsCount))
require.NoError(t, err)
require.Equal(t, 0, len(res))
Expand Down Expand Up @@ -208,3 +208,68 @@ func TestEncodeDecodeOperators(t *testing.T) {
})
}
}

func Test_mergeQuorums(t *testing.T) {
tests := []struct {
name string
participants1 []spectypes.OperatorID
participants2 []spectypes.OperatorID
expected []spectypes.OperatorID
}{
{
name: "Both participants empty",
participants1: []spectypes.OperatorID{},
participants2: []spectypes.OperatorID{},
expected: []spectypes.OperatorID{},
},
{
name: "First participants empty",
participants1: []spectypes.OperatorID{},
participants2: []spectypes.OperatorID{1, 2, 3},
expected: []spectypes.OperatorID{1, 2, 3},
},
{
name: "Second participants empty",
participants1: []spectypes.OperatorID{1, 2, 3},
participants2: []spectypes.OperatorID{},
expected: []spectypes.OperatorID{1, 2, 3},
},
{
name: "No duplicates",
participants1: []spectypes.OperatorID{1, 3, 5},
participants2: []spectypes.OperatorID{2, 4, 6},
expected: []spectypes.OperatorID{1, 2, 3, 4, 5, 6},
},
{
name: "With duplicates",
participants1: []spectypes.OperatorID{1, 2, 3, 5},
participants2: []spectypes.OperatorID{3, 4, 5, 6},
expected: []spectypes.OperatorID{1, 2, 3, 4, 5, 6},
},
{
name: "All duplicates",
participants1: []spectypes.OperatorID{1, 2, 3},
participants2: []spectypes.OperatorID{1, 2, 3},
expected: []spectypes.OperatorID{1, 2, 3},
},
{
name: "Unsorted input participants",
participants1: []spectypes.OperatorID{5, 1, 3},
participants2: []spectypes.OperatorID{4, 2, 6},
expected: []spectypes.OperatorID{1, 2, 3, 4, 5, 6},
},
{
name: "Large participants size",
participants1: []spectypes.OperatorID{1, 3, 5, 7, 9, 11, 13},
participants2: []spectypes.OperatorID{2, 4, 6, 8, 10, 12, 14},
expected: []spectypes.OperatorID{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := mergeParticipants(tt.participants1, tt.participants2)
require.Equal(t, tt.expected, result)
})
}
}
12 changes: 6 additions & 6 deletions protocol/v2/qbft/storage/ibft_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package qbftstorage

import (
"encoding/json"

"github.com/attestantio/go-eth2-client/spec/phase0"
specqbft "github.com/ssvlabs/ssv-spec/qbft"
spectypes "github.com/ssvlabs/ssv-spec/types"
"github.com/ssvlabs/ssv/exporter/convert"
"go.uber.org/zap"

specqbft "github.com/ssvlabs/ssv-spec/qbft"
"github.com/ssvlabs/ssv/exporter/convert"
)

// StoredInstance contains instance state alongside with a decided message (aggregated commits).
Expand Down Expand Up @@ -53,10 +53,10 @@ type InstanceStore interface {
GetInstance(identifier []byte, height specqbft.Height) (*StoredInstance, error)

// CleanAllInstances removes all historical and highest instances for the given identifier.
CleanAllInstances(logger *zap.Logger, msgID []byte) error
CleanAllInstances(msgID []byte) error

// SaveParticipants save participants in quorum.
SaveParticipants(identifier convert.MessageID, slot phase0.Slot, operators []spectypes.OperatorID) error
// UpdateParticipants updates participants in quorum.
UpdateParticipants(identifier convert.MessageID, slot phase0.Slot, newParticipants []spectypes.OperatorID) (bool, error)

// GetParticipantsInRange returns participants in quorum for the given slot range.
GetParticipantsInRange(identifier convert.MessageID, from, to phase0.Slot) ([]ParticipantsRangeEntry, error)
Expand Down
11 changes: 3 additions & 8 deletions protocol/v2/ssv/validator/non_committee_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,20 +158,15 @@ func (ncv *CommitteeObserver) ProcessMessage(msg *queue.SSVMessage) error {
return fmt.Errorf("role storage doesn't exist: %v", beaconRole)
}

existingQuorum, err := roleStorage.GetParticipants(msgID, slot)
updated, err := roleStorage.UpdateParticipants(msgID, slot, quorum)
if err != nil {
return fmt.Errorf("could not get participants %w", err)
return fmt.Errorf("could not save participants: %w", err)
}

mergedQuorum := mergeQuorums(existingQuorum, quorum)
if slices.Equal(existingQuorum, existingQuorum) {
if !updated {
continue
}

if err := roleStorage.SaveParticipants(msgID, slot, mergedQuorum); err != nil {
return fmt.Errorf("could not save participants: %w", err)
}

logger.Info("✅ saved participants",
zap.String("converted_role", beaconRole.ToBeaconRole()),
zap.Uint64("validator_index", uint64(key.ValidatorIndex)),
Expand Down

0 comments on commit 8e45af2

Please sign in to comment.