Skip to content

Commit

Permalink
p2psim: Enhance p2psim for DHT benchmarks
Browse files Browse the repository at this point in the history
This commit enhances the p2p simulation server to facilitate benchmarking of the discovery process in DHT. Key updates include:
- Exposing APIs for improved control and observability during simulation.
- Adding configurable parameters for simulation nodes (inproc) to support benchmarking scenarios.
- Implementing a new benchmark strategy to measure the impact of applying ENR filtering and increasing the DHT bucket size.
  • Loading branch information
sonhv0212 committed Dec 23, 2024
1 parent dfbbfea commit f22279e
Show file tree
Hide file tree
Showing 11 changed files with 1,193 additions and 33 deletions.
430 changes: 430 additions & 0 deletions cmd/p2psim/main.go

Large diffs are not rendered by default.

25 changes: 25 additions & 0 deletions p2p/discover/v4_udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ type UDPv4 struct {
tab *Table
closeOnce sync.Once
wg sync.WaitGroup
dirty bool // for testing

addReplyMatcher chan *replyMatcher
gotreply chan reply
Expand Down Expand Up @@ -155,6 +156,27 @@ func ListenV4(c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) {
return t, nil
}

// SetDirty sets the dirty flag for testing purposes.
func (t *UDPv4) SetDirty(dirty bool) {
t.dirty = dirty
}

// NodesInDHT returns all nodes in the DHT.
// For testing only.
func (t *UDPv4) NodesInDHT() [][]enode.Node {
if t == nil || t.tab == nil {
return nil
}
nodes := make([][]enode.Node, len(t.tab.buckets))
for i, bucket := range t.tab.buckets {
nodes[i] = make([]enode.Node, len(bucket.entries))
for j, entry := range bucket.entries {
nodes[i][j] = entry.Node
}
}
return nodes
}

// Self returns the local node.
func (t *UDPv4) Self() *enode.Node {
return t.localNode.Node()
Expand Down Expand Up @@ -771,6 +793,9 @@ func (t *UDPv4) verifyENRRequest(h *packetHandlerV4, from *net.UDPAddr, fromID e
}

func (t *UDPv4) handleENRRequest(h *packetHandlerV4, from *net.UDPAddr, fromID enode.ID, mac []byte) {
if t.dirty { // simulate dirty node, for testing purposes only
return
}
t.send(from, fromID, &v4wire.ENRResponse{
ReplyTok: mac,
Record: *t.localNode.Node().Record(),
Expand Down
20 changes: 20 additions & 0 deletions p2p/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@ type Config struct {
// Logger is a custom logger to use with the p2p.Server.
Logger log.Logger `toml:",omitempty"`

// Dirty is set to true if this node is dirty (for testing purposes).
Dirty bool

clock mclock.Clock
}

Expand Down Expand Up @@ -305,6 +308,18 @@ func (c *conn) set(f connFlag, val bool) {
}
}

// SetListenFunc sets the function used to accept inbound connections.
// For testing only.
func (srv *Server) SetListenFunc(f func(network, addr string) (net.Listener, error)) {
srv.listenFunc = f
}

// UDPv4 returns the UDPv4 discovery table.
// For testing only.
func (srv *Server) UDPv4() *discover.UDPv4 {
return srv.ntab
}

// LocalNode returns the local node record.
func (srv *Server) LocalNode() *enode.LocalNode {
return srv.localnode
Expand Down Expand Up @@ -637,6 +652,11 @@ func (srv *Server) setupDiscovery() error {
}
srv.ntab = ntab
srv.discmix.AddSource(ntab.RandomNodes())

// Mark the node as dirty (for testing purposes).
if srv.Dirty {
srv.ntab.SetDirty(true)
}
}

// Discovery V5
Expand Down
15 changes: 15 additions & 0 deletions p2p/simulations/adapters/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,21 @@ func (n *ExecNode) Snapshots() (map[string][]byte, error) {
return snapshots, n.client.Call(&snapshots, "simulation_snapshot")
}

// Empty PeerStats
func (n *ExecNode) PeerStats() *PeerStats {
return &PeerStats{}
}

// Empty DHT
func (n *ExecNode) NodesInDHT() [][]enode.Node {
return nil
}

// Empty PeersInfo
func (n *ExecNode) PeersInfo() []*p2p.PeerInfo {
return nil
}

// execNodeConfig is used to serialize the node configuration so it can be
// passed to the child process as a JSON encoded environment variable
type execNodeConfig struct {
Expand Down
212 changes: 204 additions & 8 deletions p2p/simulations/adapters/inproc.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,21 @@ import (
"context"
"errors"
"fmt"
"math"
"math/rand"
"net"
"strings"
"sync"
"time"

"github.com/ethereum/go-ethereum/core/forkid"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/ethereum/go-ethereum/p2p/simulations/pipes"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
"github.com/gorilla/websocket"
)
Expand Down Expand Up @@ -91,28 +96,80 @@ func (s *SimAdapter) NewNode(config *NodeConfig) (Node, error) {
return nil, err
}

p2pCfg := p2p.Config{
PrivateKey: config.PrivateKey,
MaxPeers: config.MaxPeers,
NoDiscovery: config.NoDiscovery,
EnableMsgEvents: config.EnableMsgEvents,
Dirty: strings.HasPrefix(config.Name, "dirty"),
}
if !config.DisableTCPListener {
p2pCfg.ListenAddr = fmt.Sprintf(":%d", config.Port)
} else {
p2pCfg.ListenAddr = ""
}
if len(config.BootstrapNodeURLs) > 0 {
for _, url := range strings.Split(config.BootstrapNodeURLs, ",") {
if len(url) == 0 {
continue
}
n, err := enode.Parse(enode.ValidSchemes, url)
if err != nil {
log.Warn("invalid bootstrap node URL", "url", url, "err", err)
continue
}
p2pCfg.BootstrapNodes = append(p2pCfg.BootstrapNodes, n)
}
}

n, err := node.New(&node.Config{
P2P: p2p.Config{
PrivateKey: config.PrivateKey,
MaxPeers: math.MaxInt32,
NoDiscovery: true,
Dialer: s,
EnableMsgEvents: config.EnableMsgEvents,
},
P2P: p2pCfg,
ExternalSigner: config.ExternalSigner,
Logger: log.New("node.id", id.String()),
})
if err != nil {
return nil, err
}

if config.UseFakeIPListener {
n.Server().SetListenFunc(listenFakeAddrFunc)
}

simNode := &SimNode{
ID: id,
config: config,
node: n,
adapter: s,
running: make(map[string]node.Lifecycle),
}
if !config.UseTCPDialer {
n.Server().Dialer = s
} else {
simNode.dialer = &wrapTCPDialerStats{
d: &net.Dialer{Timeout: 15 * time.Second},
resultCh: make(chan resultDial, 10000),
}
n.Server().Dialer = simNode.dialer
}

if config.EnableENRFilter {
n.Server().SetFilter(func(id forkid.ID) error {
var eth struct {
ForkID forkid.ID
Rest []rlp.RawValue `rlp:"tail"`
}
if err := n.Server().Self().Record().Load(enr.WithEntry("eth", &eth)); err != nil {
log.Warn("failed to load eth entry", "err", err)
return err
}

if id == eth.ForkID {
return nil
}
return forkid.ErrLocalIncompatibleOrStale
})
}

s.nodes[id] = simNode
return simNode, nil
}
Expand Down Expand Up @@ -162,6 +219,8 @@ func (s *SimAdapter) GetNode(id enode.ID) (*SimNode, bool) {
// net.Pipe (see SimAdapter.Dial), running devp2p protocols directly over that
// pipe
type SimNode struct {
ctx context.Context
cancel context.CancelFunc
lock sync.RWMutex
ID enode.ID
config *NodeConfig
Expand All @@ -170,6 +229,11 @@ type SimNode struct {
running map[string]node.Lifecycle
client *rpc.Client
registerOnce sync.Once
dialer *wrapTCPDialerStats

// Track different nodes discovered by the node
discoveredNodes sync.Map
differentNodeCount int
}

// Close closes the underlaying node.Node to release
Expand Down Expand Up @@ -240,6 +304,15 @@ func (sn *SimNode) Snapshots() (map[string][]byte, error) {

// Start registers the services and starts the underlying devp2p node
func (sn *SimNode) Start(snapshots map[string][]byte) error {
sn.lock.Lock()
if sn.cancel != nil {
sn.lock.Unlock()
return errors.New("node already started")
}

sn.ctx, sn.cancel = context.WithCancel(context.Background())
sn.lock.Unlock()

// ensure we only register the services once in the case of the node
// being stopped and then started again
var regErr error
Expand Down Expand Up @@ -282,6 +355,8 @@ func (sn *SimNode) Start(snapshots map[string][]byte) error {
sn.client = client
sn.lock.Unlock()

go sn.trackDiscoveredNode()

return nil
}

Expand All @@ -292,6 +367,10 @@ func (sn *SimNode) Stop() error {
sn.client.Close()
sn.client = nil
}
if sn.cancel != nil {
sn.cancel()
sn.cancel = nil
}
sn.lock.Unlock()
return sn.node.Close()
}
Expand Down Expand Up @@ -351,3 +430,120 @@ func (sn *SimNode) NodeInfo() *p2p.NodeInfo {
}
return server.NodeInfo()
}

// PeerStats returns statistics about the node's peers
func (sn *SimNode) PeerStats() *PeerStats {
if sn.dialer == nil || sn.node.Server() == nil || sn.node.Server().UDPv4() == nil {
return &PeerStats{}
}

nodesCount := 0
sn.discoveredNodes.Range(func(_, _ interface{}) bool {
nodesCount++
return true
})
buckets := sn.node.Server().UDPv4().NodesInDHT()
bucketSizes := make([]int, len(buckets))
for i, bucket := range buckets {
bucketSizes[i] = len(bucket)
}
return &PeerStats{
PeerCount: sn.node.Server().PeerCount(),
Failed: sn.dialer.failed,
Tried: sn.dialer.tried,
DifferentNodesDiscovered: nodesCount,
DHTBuckets: bucketSizes,
}
}

// NodesInDHT returns the nodes in the DHT buckets
func (sn *SimNode) NodesInDHT() [][]enode.Node {
if sn.node.Server() == nil || sn.node.Server().UDPv4() == nil {
return nil
}
return sn.node.Server().UDPv4().NodesInDHT()
}

// PeersInfo returns information about the node's peers
func (sn *SimNode) PeersInfo() []*p2p.PeerInfo {
if sn.node.Server() == nil {
return nil
}
return sn.node.Server().PeersInfo()
}

// trackDiscoveredNodes tracks all nodes discovered by the node and dial by wrapTCPDialerStats
func (sn *SimNode) trackDiscoveredNode() {
if sn.dialer == nil {
return
}

for {
select {
case <-sn.ctx.Done():
return
case r := <-sn.dialer.resultCh:
if _, ok := sn.discoveredNodes.LoadOrStore(r.node, struct{}{}); !ok {
sn.differentNodeCount++
}
if r.err != nil {
log.Info("dial failed", "node", r.node, "err", r.err)
sn.dialer.failed++
}
log.Info("dial tried", "from", sn.ID, "to", r.node)
sn.dialer.tried++
}
}
}

func listenFakeAddrFunc(network, laddr string) (net.Listener, error) {
l, err := net.Listen(network, laddr)
if err != nil {
return nil, err
}
fakeAddr := &net.TCPAddr{IP: net.IP{byte(rand.Intn(255)), byte(rand.Intn(255)), byte(rand.Intn(255)), byte(rand.Intn(255))}, Port: rand.Intn(65535)}
return &fakeAddrListener{l, fakeAddr}, nil
}

// fakeAddrListener is a listener that creates connections with a mocked remote address.
type fakeAddrListener struct {
net.Listener
remoteAddr net.Addr
}

type fakeAddrConn struct {
net.Conn
remoteAddr net.Addr
}

func (l *fakeAddrListener) Accept() (net.Conn, error) {
c, err := l.Listener.Accept()
if err != nil {
return nil, err
}
return &fakeAddrConn{c, l.remoteAddr}, nil
}

func (c *fakeAddrConn) RemoteAddr() net.Addr {
return c.remoteAddr
}

// wrapTCPDialerStats is a wrapper around the net.Dialer which tracks nodes that have been tried to dial
type wrapTCPDialerStats struct {
d *net.Dialer
failed int
tried int
resultCh chan resultDial
}

type resultDial struct {
err error
node enode.ID
}

func (d wrapTCPDialerStats) Dial(ctx context.Context, dest *enode.Node) (net.Conn, error) {
nodeAddr := &net.TCPAddr{IP: dest.IP(), Port: dest.TCP()}
conn, err := d.d.DialContext(ctx, "tcp", nodeAddr.String())
d.resultCh <- resultDial{err, dest.ID()}
return conn, err
}
Loading

0 comments on commit f22279e

Please sign in to comment.