Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Align to ssv-spec v0.3.3 #1109

Merged
merged 24 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions beacon/goclient/goclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ type Client interface {
eth2client.BlindedBeaconBlockProposalProvider
eth2client.BlindedBeaconBlockSubmitter
eth2client.ValidatorRegistrationsSubmitter
eth2client.VoluntaryExitSubmitter
}

type NodeClientProvider interface {
Expand Down
10 changes: 10 additions & 0 deletions beacon/goclient/voluntary_exit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package goclient

import (
"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/pkg/errors"
)

func (gc *goClient) SubmitVoluntaryExit(voluntaryExit *phase0.SignedVoluntaryExit, sig phase0.BLSSignature) error {
return errors.New("not implemented")
}
14 changes: 9 additions & 5 deletions eth/executionclient/execution_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestFetchHistoricalLogs(t *testing.T) {
httpsrv := httptest.NewServer(rpcServer.WebsocketHandler([]string{"*"}))
defer rpcServer.Stop()
defer httpsrv.Close()
addr := "ws:" + strings.TrimPrefix(httpsrv.URL, "http:")
addr := httpToWebSocketURL(httpsrv.URL)

parsed, _ := abi.JSON(strings.NewReader(callableAbi))
auth, _ := bind.NewKeyedTransactorWithChainID(testKey, big.NewInt(1337))
Expand Down Expand Up @@ -131,7 +131,7 @@ func TestStreamLogs(t *testing.T) {
httpsrv := httptest.NewServer(rpcServer.WebsocketHandler([]string{"*"}))
defer rpcServer.Stop()
defer httpsrv.Close()
addr := "ws:" + strings.TrimPrefix(httpsrv.URL, "http:")
addr := httpToWebSocketURL(httpsrv.URL)

// Deploy the contract
parsed, _ := abi.JSON(strings.NewReader(callableAbi))
Expand Down Expand Up @@ -215,7 +215,7 @@ func TestFetchLogsInBatches(t *testing.T) {
httpsrv := httptest.NewServer(rpcServer.WebsocketHandler([]string{"*"}))
defer rpcServer.Stop()
defer httpsrv.Close()
addr := "ws:" + strings.TrimPrefix(httpsrv.URL, "http:")
addr := httpToWebSocketURL(httpsrv.URL)

// Deploy the contract
parsed, _ := abi.JSON(strings.NewReader(callableAbi))
Expand Down Expand Up @@ -325,7 +325,7 @@ func TestChainReorganizationLogs(t *testing.T) {
// defer rpcServer.Stop()
// defer httpsrv.Close()

// addr := "ws:" + strings.TrimPrefix(httpsrv.URL, "http:")
// addr := httpToWebSocketURL(httpsrv.URL)

// // 1.
// parsed, _ := abi.JSON(strings.NewReader(callableAbi))
Expand Down Expand Up @@ -417,7 +417,7 @@ func TestSimSSV(t *testing.T) {
httpsrv := httptest.NewServer(rpcServer.WebsocketHandler([]string{"*"}))
defer rpcServer.Stop()
defer httpsrv.Close()
addr := "ws:" + strings.TrimPrefix(httpsrv.URL, "http:")
addr := httpToWebSocketURL(httpsrv.URL)

parsed, _ := abi.JSON(strings.NewReader(simcontract.SimcontractMetaData.ABI))
auth, _ := bind.NewKeyedTransactorWithChainID(testKey, big.NewInt(1337))
Expand Down Expand Up @@ -584,3 +584,7 @@ func TestSimSSV(t *testing.T) {
require.NoError(t, client.Close())
require.NoError(t, sim.Close())
}

func httpToWebSocketURL(url string) string {
return "ws:" + strings.TrimPrefix(url, "http:")
}
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/aquasecurity/table v1.8.0
github.com/attestantio/go-eth2-client v0.16.3
github.com/bloxapp/eth2-key-manager v1.3.1
github.com/bloxapp/ssv-spec v0.3.1
github.com/bloxapp/ssv-spec v0.3.3
github.com/btcsuite/btcd/btcec/v2 v2.3.2
github.com/cespare/xxhash/v2 v2.2.0
github.com/cornelk/hashmap v1.0.8
Expand Down Expand Up @@ -222,5 +222,3 @@ require (
replace github.com/google/flatbuffers => github.com/google/flatbuffers v1.11.0

replace github.com/dgraph-io/ristretto => github.com/dgraph-io/ristretto v0.1.1-0.20211108053508-297c39e6640f

replace github.com/bloxapp/ssv-spec => github.com/bloxapp/ssv-spec v0.0.0-20230719131453-1c0044021800
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ github.com/bits-and-blooms/bitset v1.7.0 h1:YjAGVd3XmtK9ktAbX8Zg2g2PwLIMjGREZJHl
github.com/bits-and-blooms/bitset v1.7.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA=
github.com/bloxapp/eth2-key-manager v1.3.1 h1:1olQcOHRY2TN1o8JX9AN1siEIJXWnlM+BlknfBbXoo4=
github.com/bloxapp/eth2-key-manager v1.3.1/go.mod h1:cT+qAJfnAzNz9StFoHQ8xAkyU2eyEukd6xfxvcBWuZA=
github.com/bloxapp/ssv-spec v0.0.0-20230719131453-1c0044021800 h1:ikChvdYVw4GFSlnIS+u1qmNqOvgq2a2H3b2FZ44KBn8=
github.com/bloxapp/ssv-spec v0.0.0-20230719131453-1c0044021800/go.mod h1:zPJR7YnG5iZ6I0h6EzfVly8bTBXaZwcx4TyJ8pzYVd8=
github.com/bloxapp/ssv-spec v0.3.3 h1:iNomqWQjxDDQouHMjl27PmH1hUolJ4u8QQ+HX/TQQcg=
github.com/bloxapp/ssv-spec v0.3.3/go.mod h1:zPJR7YnG5iZ6I0h6EzfVly8bTBXaZwcx4TyJ8pzYVd8=
github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g=
github.com/btcsuite/btcd/btcec/v2 v2.3.2 h1:5n0X6hX0Zk+6omWcihdYvdAlGf2DfasC0GMf7DClJ3U=
github.com/btcsuite/btcd/btcec/v2 v2.3.2/go.mod h1:zYzJ8etWJQIv1Ogk7OzpWjowwOdXY1W/17j2MW85J04=
Expand Down
12 changes: 0 additions & 12 deletions integration/qbft/tests/scenario_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package tests

import (
"context"
"fmt"
"testing"
"time"

Expand All @@ -21,11 +20,9 @@ import (
"github.com/bloxapp/ssv/networkconfig"
"github.com/bloxapp/ssv/operator/validator"
protocolbeacon "github.com/bloxapp/ssv/protocol/v2/blockchain/beacon"
protocolp2p "github.com/bloxapp/ssv/protocol/v2/p2p"
protocolstorage "github.com/bloxapp/ssv/protocol/v2/qbft/storage"
"github.com/bloxapp/ssv/protocol/v2/ssv/queue"
protocolvalidator "github.com/bloxapp/ssv/protocol/v2/ssv/validator"
"github.com/bloxapp/ssv/protocol/v2/sync/handlers"
"github.com/bloxapp/ssv/protocol/v2/types"
"github.com/bloxapp/ssv/storage/basedb"
"github.com/bloxapp/ssv/storage/kv"
Expand Down Expand Up @@ -63,15 +60,6 @@ func (s *Scenario) Run(t *testing.T, role spectypes.BeaconRole) {
for id := 1; id <= s.Committee; id++ {
id := spectypes.OperatorID(id)
s.validators[id] = createValidator(t, ctx, id, getKeySet(s.Committee), logger, s.shared.Nodes[id])

stores := newStores(logger)
s.shared.Nodes[id].RegisterHandlers(logger, protocolp2p.WithHandler(
protocolp2p.LastDecidedProtocol,
handlers.LastDecidedHandler(logger.Named(fmt.Sprintf("decided-handler-%d", id)), stores, s.shared.Nodes[id]),
), protocolp2p.WithHandler(
protocolp2p.DecidedHistoryProtocol,
handlers.HistoryHandler(logger.Named(fmt.Sprintf("history-handler-%d", id)), stores, s.shared.Nodes[id], 25),
))
}

//invoking duties
Expand Down
7 changes: 0 additions & 7 deletions network/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/bloxapp/ssv/network/peers/connections"
"github.com/bloxapp/ssv/network/records"
"github.com/bloxapp/ssv/network/streams"
"github.com/bloxapp/ssv/network/syncing"
"github.com/bloxapp/ssv/network/topics"
operatorstorage "github.com/bloxapp/ssv/operator/storage"
"github.com/bloxapp/ssv/utils/async"
Expand Down Expand Up @@ -72,7 +71,6 @@ type p2pNetwork struct {
backoffConnector *libp2pdiscbackoff.BackoffConnector
subnets []byte
libConnManager connmgrcore.ConnManager
syncer syncing.Syncer
nodeStorage operatorstorage.Storage
operatorPKCache sync.Map
}
Expand Down Expand Up @@ -172,11 +170,6 @@ func (n *p2pNetwork) Start(logger *zap.Logger) error {
return err
}

// Create & start ConcurrentSyncer.
syncer := syncing.NewConcurrent(n.ctx, syncing.New(n, n.msgValidator), 16, syncing.DefaultTimeouts, nil)
go syncer.Run(logger)
n.syncer = syncer

return nil
}

Expand Down
123 changes: 2 additions & 121 deletions network/p2p/p2p_sync.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
package p2pv1

import (
"context"
"encoding/hex"
"fmt"
"math/rand"
"time"

"github.com/bloxapp/ssv-spec/qbft"
specqbft "github.com/bloxapp/ssv-spec/qbft"
spectypes "github.com/bloxapp/ssv-spec/types"
libp2pnetwork "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
Expand All @@ -21,126 +18,8 @@ import (
"github.com/bloxapp/ssv/network/commons"
"github.com/bloxapp/ssv/protocol/v2/message"
p2pprotocol "github.com/bloxapp/ssv/protocol/v2/p2p"
"github.com/bloxapp/ssv/protocol/v2/ssv/queue"
)

func (n *p2pNetwork) SyncHighestDecided(mid spectypes.MessageID) error {
ctx := context.TODO() // TODO: pass context to SyncHighestDecided

return n.syncer.SyncHighestDecided(ctx, n.interfaceLogger, mid, func(msg *queue.DecodedSSVMessage) {
n.msgRouter.Route(ctx, msg)
})
}

func (n *p2pNetwork) SyncDecidedByRange(mid spectypes.MessageID, from, to qbft.Height) {
ctx := context.TODO() // TODO: pass context to SyncDecidedByRange
Comment on lines -27 to -36
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought we said we wanted to leave sync for fullnodes and not just removed entirely. wouldn't just deleting like this stop fullnode/explorer functionality?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but since the history sync was disabled anyway, and we want to redesign the syncing, then we preferred to fully align to spec here by removing SyncDecidedByRange as well.


if !n.cfg.FullNode {
return
}
// TODO: uncomment to fix syncing bug!
// if from < to {
// n.logger.Warn("failed to sync decided by range: from is greater than to",
// zap.String("pubkey", hex.EncodeToString(mid.GetPubKey())),
// zap.String("role", mid.GetRoleType().String()),
// zap.Uint64("from", uint64(from)),
// zap.Uint64("to", uint64(to)))
// return
// }
if to > from {
n.interfaceLogger.Warn("failed to sync decided by range: to is higher than from",
zap.Uint64("from", uint64(from)),
zap.Uint64("to", uint64(to)))
return
}

// TODO: this is a temporary solution to prevent syncing already decided heights.
// Example: Say we received a decided at height 99, and right after we received a decided at height 100
// before we could advance the controller's height. This would cause the controller to call SyncDecidedByRange.
// However, height 99 is already synced, so temporarily we reject such requests here.
// Note: This isn't ideal because sometimes you do want to sync gaps of 1.
const minGap = 2
if to-from < minGap {
return
}

err := n.syncer.SyncDecidedByRange(ctx, n.interfaceLogger, mid, from, to, func(msg *queue.DecodedSSVMessage) {
n.msgRouter.Route(ctx, msg)
})
if err != nil {
n.interfaceLogger.Error("failed to sync decided by range", zap.Error(err))
}
}

// LastDecided fetches last decided from a random set of peers
func (n *p2pNetwork) LastDecided(logger *zap.Logger, mid spectypes.MessageID) ([]p2pprotocol.SyncResult, error) {
const (
minPeers = 3
waitTime = time.Second * 24
)
if !n.isReady() {
return nil, p2pprotocol.ErrNetworkIsNotReady
}
pid, maxPeers := commons.ProtocolID(p2pprotocol.LastDecidedProtocol)
peers, err := waitSubsetOfPeers(logger, n.getSubsetOfPeers, mid.GetPubKey(), minPeers, maxPeers, waitTime, allPeersFilter)
if err != nil {
return nil, errors.Wrap(err, "could not get subset of peers")
}
return n.makeSyncRequest(logger, peers, mid, pid, &message.SyncMessage{
Params: &message.SyncParams{
Identifier: mid,
},
Protocol: message.LastDecidedType,
})
}

// GetHistory sync the given range from a set of peers that supports history for the given identifier
func (n *p2pNetwork) GetHistory(logger *zap.Logger, mid spectypes.MessageID, from, to specqbft.Height, targets ...string) ([]p2pprotocol.SyncResult, specqbft.Height, error) {
if from >= to {
return nil, 0, nil
}

if !n.isReady() {
return nil, 0, p2pprotocol.ErrNetworkIsNotReady
}
protocolID, peerCount := commons.ProtocolID(p2pprotocol.DecidedHistoryProtocol)
peers := make([]peer.ID, 0)
for _, t := range targets {
p, err := peer.Decode(t)
if err != nil {
continue
}
peers = append(peers, p)
}
// if no peers were provided -> select a random set of peers
if len(peers) == 0 {
random, err := n.getSubsetOfPeers(logger, mid.GetPubKey(), peerCount, n.peersWithProtocolsFilter(protocolID))
if err != nil {
return nil, 0, errors.Wrap(err, "could not get subset of peers")
}
peers = random
}
maxBatchRes := specqbft.Height(n.cfg.MaxBatchResponse)

var results []p2pprotocol.SyncResult
var err error
currentEnd := to
if to-from > maxBatchRes {
currentEnd = from + maxBatchRes
}
results, err = n.makeSyncRequest(logger, peers, mid, protocolID, &message.SyncMessage{
Params: &message.SyncParams{
Height: []specqbft.Height{from, currentEnd},
Identifier: mid,
},
Protocol: message.DecidedHistoryType,
})
if err != nil {
return results, 0, err
}
return results, currentEnd, nil
}

// RegisterHandlers registers the given handlers
func (n *p2pNetwork) RegisterHandlers(logger *zap.Logger, handlers ...*p2pprotocol.SyncHandler) {
m := make(map[libp2p_protocol.ID][]p2pprotocol.RequestHandler)
Expand Down Expand Up @@ -277,6 +156,8 @@ func (n *p2pNetwork) makeSyncRequest(logger *zap.Logger, peers []peer.ID, mid sp
}

// peersWithProtocolsFilter is used to accept peers that supports the given protocols
//
//nolint:unused
func (n *p2pNetwork) peersWithProtocolsFilter(protocols ...libp2p_protocol.ID) func(peer.ID) bool {
return func(id peer.ID) bool {
supported, err := n.host.Network().Peerstore().SupportsProtocols(id, protocols...)
Expand Down
31 changes: 27 additions & 4 deletions network/p2p/p2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (
"time"

"github.com/bloxapp/ssv/logging"
"github.com/bloxapp/ssv/network/commons"
"github.com/bloxapp/ssv/networkconfig"
"github.com/bloxapp/ssv/protocol/v2/message"
"github.com/bloxapp/ssv/protocol/v2/ssv/queue"

specqbft "github.com/bloxapp/ssv-spec/qbft"
Expand All @@ -20,7 +22,7 @@ import (
"go.uber.org/zap"

"github.com/bloxapp/ssv/network"
protcolp2p "github.com/bloxapp/ssv/protocol/v2/p2p"
p2pprotocol "github.com/bloxapp/ssv/protocol/v2/p2p"
)

func TestGetMaxPeers(t *testing.T) {
Expand Down Expand Up @@ -141,7 +143,7 @@ func TestP2pNetwork_Stream(t *testing.T) {
<-time.After(time.Second)

node := ln.Nodes[0]
res, err := node.LastDecided(logger, mid)
res, err := node.(*p2pNetwork).LastDecided(logger, mid)
require.NoError(t, err)
select {
case err := <-errors:
Expand Down Expand Up @@ -206,9 +208,30 @@ func TestWaitSubsetOfPeers(t *testing.T) {
}
}

func (n *p2pNetwork) LastDecided(logger *zap.Logger, mid spectypes.MessageID) ([]p2pprotocol.SyncResult, error) {
const (
minPeers = 3
waitTime = time.Second * 24
)
if !n.isReady() {
return nil, p2pprotocol.ErrNetworkIsNotReady
}
pid, maxPeers := commons.ProtocolID(p2pprotocol.LastDecidedProtocol)
peers, err := waitSubsetOfPeers(logger, n.getSubsetOfPeers, mid.GetPubKey(), minPeers, maxPeers, waitTime, allPeersFilter)
if err != nil {
return nil, errors.Wrap(err, "could not get subset of peers")
}
return n.makeSyncRequest(logger, peers, mid, pid, &message.SyncMessage{
Params: &message.SyncParams{
Identifier: mid,
},
Protocol: message.LastDecidedType,
})
}

func registerHandler(logger *zap.Logger, node network.P2PNetwork, mid spectypes.MessageID, height specqbft.Height, round specqbft.Round, counter *int64, errors chan<- error) {
node.RegisterHandlers(logger, &protcolp2p.SyncHandler{
Protocol: protcolp2p.LastDecidedProtocol,
node.RegisterHandlers(logger, &p2pprotocol.SyncHandler{
Protocol: p2pprotocol.LastDecidedProtocol,
Handler: func(message *spectypes.SSVMessage) (*spectypes.SSVMessage, error) {
atomic.AddInt64(counter, 1)
sm := specqbft.SignedMessage{
Expand Down
Loading
Loading