Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

p2psim: Enhance p2psim for DHT benchmarks #643

Merged
merged 8 commits into from
Dec 26, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
420 changes: 420 additions & 0 deletions cmd/p2psim/main.go

Large diffs are not rendered by default.

16 changes: 16 additions & 0 deletions p2p/discover/v4_udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,22 @@ func ListenV4(c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) {
return t, nil
}

// NodesInDHT returns all nodes in the DHT.
// For testing only.
func (t *UDPv4) NodesInDHT() [][]enode.Node {
if t == nil || t.tab == nil {
return nil
}
nodes := make([][]enode.Node, len(t.tab.buckets))
for i, bucket := range t.tab.buckets {
nodes[i] = make([]enode.Node, len(bucket.entries))
for j, entry := range bucket.entries {
nodes[i][j] = entry.Node
}
}
return nodes
}

// Self returns the local node.
func (t *UDPv4) Self() *enode.Node {
return t.localNode.Node()
Expand Down
12 changes: 12 additions & 0 deletions p2p/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,18 @@ func (c *conn) set(f connFlag, val bool) {
}
}

// SetListenFunc sets the function used to accept inbound connections.
// For testing only.
func (srv *Server) SetListenFunc(f func(network, addr string) (net.Listener, error)) {
srv.listenFunc = f
}

// UDPv4 returns the UDPv4 discovery table.
// For testing only.
func (srv *Server) UDPv4() *discover.UDPv4 {
return srv.ntab
}

// LocalNode returns the local node record.
func (srv *Server) LocalNode() *enode.LocalNode {
return srv.localnode
Expand Down
15 changes: 15 additions & 0 deletions p2p/simulations/adapters/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,21 @@ func (n *ExecNode) Snapshots() (map[string][]byte, error) {
return snapshots, n.client.Call(&snapshots, "simulation_snapshot")
}

// Empty PeerStats
func (n *ExecNode) PeerStats() *PeerStats {
return &PeerStats{}
}

// Empty DHT
func (n *ExecNode) NodesInDHT() [][]enode.Node {
return nil
}

// Empty PeersInfo
func (n *ExecNode) PeersInfo() []*p2p.PeerInfo {
return nil
}

// execNodeConfig is used to serialize the node configuration so it can be
// passed to the child process as a JSON encoded environment variable
type execNodeConfig struct {
Expand Down
176 changes: 167 additions & 9 deletions p2p/simulations/adapters/inproc.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,20 @@ import (
"context"
"errors"
"fmt"
"math"
"net"
"strings"
"sync"
"time"

"github.com/ethereum/go-ethereum/core/forkid"
"github.com/ethereum/go-ethereum/event"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please update inproc_test for adapting the new changes.

"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/ethereum/go-ethereum/p2p/simulations/pipes"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
"github.com/gorilla/websocket"
)
Expand Down Expand Up @@ -91,16 +95,35 @@ func (s *SimAdapter) NewNode(config *NodeConfig) (Node, error) {
return nil, err
}

p2pCfg := p2p.Config{
PrivateKey: config.PrivateKey,
MaxPeers: config.MaxPeers,
NoDiscovery: config.NoDiscovery,
EnableMsgEvents: config.EnableMsgEvents,
}
if !config.DisableTCPListener {
p2pCfg.ListenAddr = fmt.Sprintf(":%d", config.Port)
} else {
p2pCfg.ListenAddr = ""
}
if len(config.BootstrapNodeURLs) > 0 {
for _, url := range strings.Split(config.BootstrapNodeURLs, ",") {
if len(url) == 0 {
continue
}
n, err := enode.Parse(enode.ValidSchemes, url)
if err != nil {
log.Warn("invalid bootstrap node URL", "url", url, "err", err)
continue
}
p2pCfg.BootstrapNodes = append(p2pCfg.BootstrapNodes, n)
}
}

n, err := node.New(&node.Config{
P2P: p2p.Config{
PrivateKey: config.PrivateKey,
MaxPeers: math.MaxInt32,
NoDiscovery: true,
Dialer: s,
EnableMsgEvents: config.EnableMsgEvents,
},
P2P: p2pCfg,
ExternalSigner: config.ExternalSigner,
Logger: log.New("node.id", id.String()),
Logger: log.New("node.name", config.Name),
})
if err != nil {
return nil, err
Expand All @@ -113,6 +136,34 @@ func (s *SimAdapter) NewNode(config *NodeConfig) (Node, error) {
adapter: s,
running: make(map[string]node.Lifecycle),
}
if !config.UseTCPDialer {
n.Server().Dialer = s
} else {
simNode.dialer = &wrapTCPDialerStats{
d: &net.Dialer{Timeout: 15 * time.Second},
resultCh: make(chan resultDial, 10000),
}
n.Server().Dialer = simNode.dialer
}

if config.EnableENRFilter {
n.Server().SetFilter(func(id forkid.ID) error {
var eth struct {
ForkID forkid.ID
Rest []rlp.RawValue `rlp:"tail"`
}
if err := n.Server().Self().Record().Load(enr.WithEntry("eth", &eth)); err != nil {
log.Warn("failed to load eth entry", "err", err)
return err
}

if id == eth.ForkID {
return nil
}
return forkid.ErrLocalIncompatibleOrStale
})
}

s.nodes[id] = simNode
return simNode, nil
}
Expand Down Expand Up @@ -162,6 +213,8 @@ func (s *SimAdapter) GetNode(id enode.ID) (*SimNode, bool) {
// net.Pipe (see SimAdapter.Dial), running devp2p protocols directly over that
// pipe
type SimNode struct {
ctx context.Context
cancel context.CancelFunc
lock sync.RWMutex
ID enode.ID
config *NodeConfig
Expand All @@ -170,6 +223,11 @@ type SimNode struct {
running map[string]node.Lifecycle
client *rpc.Client
registerOnce sync.Once
dialer *wrapTCPDialerStats

// Track different nodes discovered by the node
discoveredNodes sync.Map
differentNodeCount int
}

// Close closes the underlaying node.Node to release
Expand Down Expand Up @@ -240,6 +298,15 @@ func (sn *SimNode) Snapshots() (map[string][]byte, error) {

// Start registers the services and starts the underlying devp2p node
func (sn *SimNode) Start(snapshots map[string][]byte) error {
sn.lock.Lock()
if sn.cancel != nil {
sn.lock.Unlock()
return errors.New("node already started")
}

sn.ctx, sn.cancel = context.WithCancel(context.Background())
sn.lock.Unlock()

// ensure we only register the services once in the case of the node
// being stopped and then started again
var regErr error
Expand Down Expand Up @@ -282,6 +349,8 @@ func (sn *SimNode) Start(snapshots map[string][]byte) error {
sn.client = client
sn.lock.Unlock()

go sn.trackDiscoveredNode()

return nil
}

Expand All @@ -292,6 +361,10 @@ func (sn *SimNode) Stop() error {
sn.client.Close()
sn.client = nil
}
if sn.cancel != nil {
sn.cancel()
sn.cancel = nil
}
sn.lock.Unlock()
return sn.node.Close()
}
Expand Down Expand Up @@ -351,3 +424,88 @@ func (sn *SimNode) NodeInfo() *p2p.NodeInfo {
}
return server.NodeInfo()
}

// PeerStats returns statistics about the node's peers
func (sn *SimNode) PeerStats() *PeerStats {
if sn.dialer == nil || sn.node.Server() == nil || sn.node.Server().UDPv4() == nil {
return &PeerStats{}
}

nodesCount := 0
sn.discoveredNodes.Range(func(_, _ interface{}) bool {
nodesCount++
return true
})
buckets := sn.node.Server().UDPv4().NodesInDHT()
bucketSizes := make([]int, len(buckets))
for i, bucket := range buckets {
bucketSizes[i] = len(bucket)
}
return &PeerStats{
PeerCount: sn.node.Server().PeerCount(),
Failed: sn.dialer.failed,
Tried: sn.dialer.tried,
DifferentNodesDiscovered: nodesCount,
DHTBuckets: bucketSizes,
}
}

// NodesInDHT returns the nodes in the DHT buckets
func (sn *SimNode) NodesInDHT() [][]enode.Node {
if sn.node.Server() == nil || sn.node.Server().UDPv4() == nil {
return nil
}
return sn.node.Server().UDPv4().NodesInDHT()
}

// PeersInfo returns information about the node's peers
func (sn *SimNode) PeersInfo() []*p2p.PeerInfo {
if sn.node.Server() == nil {
return nil
}
return sn.node.Server().PeersInfo()
}

// trackDiscoveredNodes tracks all nodes discovered by the node and dial by wrapTCPDialerStats
func (sn *SimNode) trackDiscoveredNode() {
if sn.dialer == nil {
return
}

for {
select {
case <-sn.ctx.Done():
return
case r := <-sn.dialer.resultCh:
if _, ok := sn.discoveredNodes.LoadOrStore(r.node, struct{}{}); !ok {
sn.differentNodeCount++
}
if r.err != nil {
log.Info("dial failed", "node", r.node, "err", r.err)
sn.dialer.failed++
}
log.Info("dial tried", "from", sn.ID, "to", r.node)
sn.dialer.tried++
}
}
}

// wrapTCPDialerStats is a wrapper around the net.Dialer which tracks nodes that have been tried to dial
type wrapTCPDialerStats struct {
d *net.Dialer
failed int
tried int
resultCh chan resultDial
}

type resultDial struct {
err error
node enode.ID
}

func (d wrapTCPDialerStats) Dial(ctx context.Context, dest *enode.Node) (net.Conn, error) {
nodeAddr := &net.TCPAddr{IP: dest.IP(), Port: dest.TCP()}
conn, err := d.d.DialContext(ctx, "tcp", nodeAddr.String())
d.resultCh <- resultDial{err, dest.ID()}
return conn, err
}
Loading
Loading