Skip to content

Commit

Permalink
devp2p: Add network simulation
Browse files Browse the repository at this point in the history
  • Loading branch information
sonhv0212 committed Dec 11, 2024
1 parent 84a44c6 commit d682390
Show file tree
Hide file tree
Showing 4 changed files with 271 additions and 0 deletions.
1 change: 1 addition & 0 deletions cmd/devp2p/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func init() {
dnsCommand,
nodesetCommand,
rlpxCommand,
simNetworkCommand,
}
app.Before = func(ctx *cli.Context) error {
flags.MigrateGlobalFlags(ctx)
Expand Down
253 changes: 253 additions & 0 deletions cmd/devp2p/simnetwork.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
package main

import (
"context"
"crypto/ecdsa"
"fmt"
"net"
"os"
"os/signal"
"strings"
"syscall"
"time"

"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/urfave/cli/v2"
"golang.org/x/exp/rand"
)

var (
simNetworkCommand = &cli.Command{
Name: "network",
Usage: "Simulated p2p network",
Subcommands: []*cli.Command{
bootNodeCommand,
nodesCommand,
},
}
bootNodeCommand = &cli.Command{
Name: "bootnode",
Usage: "Start a bootnode",
Action: simNetworkStartBootnode,
Flags: []cli.Flag{
&cli.IntFlag{
Name: "max.peers",
Value: 50,
Usage: "maximum number of peers per node",
},
&cli.IntFlag{
Name: "dht.bucketsize",
Value: 16,
Usage: "Size of each bucket in DHT",
},
&cli.IntFlag{
Name: "port",
Value: 30303,
Usage: "Port to listen on",
},
},
}
nodesCommand = &cli.Command{
Name: "nodes",
Usage: "Start nodes",
Action: simNetworkStartNodes,
Flags: []cli.Flag{
&cli.IntFlag{
Name: "num.nodes",
Value: 100,
Usage: "number of nodes to create",
},
&cli.IntFlag{
Name: "max.peers",
Value: 50,
Usage: "maximum number of peers per node",
},
&cli.IntFlag{
Name: "dht.bucketsize",
Value: 16,
Usage: "Size of each bucket in DHT",
},
&cli.IntFlag{
Name: "port",
Value: 50000,
Usage: "Port of the first node",
},
&cli.StringFlag{
Name: "bootnodes",
Usage: "Comma separated nodes used for bootstrapping",
},
},
}
statsPeriod = 5 * time.Second
)

func simNetworkStartBootnode(ctx *cli.Context) error {
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
maxPeers := ctx.Int("max.peers")
dhtBucketSize := ctx.Int("dht.bucketsize")
port := ctx.Int("port")

// Start the bootnode
bootstrapServer := &p2p.Server{
Config: p2p.Config{
PrivateKey: newkey(),
ListenAddr: fmt.Sprintf(":%d", port),
MaxPeers: maxPeers,
Protocols: []p2p.Protocol{},
Logger: log.New(),
DHTBucketSize: dhtBucketSize,
},
}
bootstrapServer.SetListenFunc(listenFakeAddrFunc)
log.Info("Starting bootnode", "url", bootstrapServer.Self().URLv4())
if err := bootstrapServer.Start(); err != nil {
log.Error("Failed to start bootnode", "err", err)
return err
}
defer bootstrapServer.Stop()

go showPeersStats(ctx.Context, []*p2p.Server{bootstrapServer}, statsPeriod)

<-sig

return nil
}

func simNetworkStartNodes(ctx *cli.Context) error {
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
maxPeers := ctx.Int("max.peers")
dhtBucketSize := ctx.Int("dht.bucketsize")
port := ctx.Int("port")
numNodes := ctx.Int("num.nodes")
bootnodes := []*enode.Node{}
if ctx.IsSet("bootnodes") {
urls := strings.Split(ctx.String("bootnodes"), ",")
for _, url := range urls {
if url != "" {
node, err := enode.Parse(enode.ValidSchemes, url)
if err == nil {
bootnodes = append(bootnodes, node)
}
}
}
}

nodes := make([]*p2p.Server, numNodes)
for i := 0; i < numNodes; i++ {
// Create bootnodes if needed
srv := &p2p.Server{
Config: p2p.Config{
PrivateKey: newkey(),
ListenAddr: fmt.Sprintf(":%d", port+i),
MaxPeers: maxPeers,
Protocols: []p2p.Protocol{},
Logger: log.New("node", fmt.Sprintf("node%02d", i)),
DHTBucketSize: dhtBucketSize,
BootstrapNodes: bootnodes,
},
}
srv.SetListenFunc(listenFakeAddrFunc)
nodes[i] = srv
}

for _, node := range nodes {
node.Start()
}
defer func() {
for _, node := range nodes {
node.Stop()
}
}()

go showPeersStats(ctx.Context, nodes, statsPeriod)

<-sig
log.Info("Stopped nodes")

return nil
}

func newkey() *ecdsa.PrivateKey {
key, err := crypto.GenerateKey()
if err != nil {
panic("couldn't generate key: " + err.Error())
}
return key
}

// fakeAddrListener is a listener that creates connections with a mocked remote address.
type fakeAddrListener struct {
net.Listener
remoteAddr net.Addr
}

type fakeAddrConn struct {
net.Conn
remoteAddr net.Addr
}

func (l *fakeAddrListener) Accept() (net.Conn, error) {
c, err := l.Listener.Accept()
if err != nil {
return nil, err
}
return &fakeAddrConn{c, l.remoteAddr}, nil
}

func (c *fakeAddrConn) RemoteAddr() net.Addr {
return c.remoteAddr
}

func listenFakeAddrFunc(network, laddr string) (net.Listener, error) {
l, err := net.Listen(network, laddr)
if err != nil {
return nil, err
}
fakeAddr := &net.TCPAddr{IP: net.IP{byte(rand.Intn(255)), byte(rand.Intn(255)), byte(rand.Intn(255)), byte(rand.Intn(255))}, Port: rand.Intn(65535)}
return &fakeAddrListener{l, fakeAddr}, nil
}

func showPeersStats(ctx context.Context, servers []*p2p.Server, period time.Duration) {
ticker := time.NewTicker(period)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
minPeers, maxPeers, avgPeers := 0xffffffff, 0, 0
for _, s := range servers {
n := s.PeerCount()
if n < minPeers {
minPeers = n
}
if n > maxPeers {
maxPeers = n
}
avgPeers += n
}
avgPeers /= len(servers)

avgBucketSizes := make([]int, 0)
for i := 0; i < len(servers); i++ {
bucketSizes := servers[i].UDPv4().BucketSizes()
for j, size := range bucketSizes {
if j >= len(avgBucketSizes) {
avgBucketSizes = append(avgBucketSizes, size)
} else {
avgBucketSizes[j] += size
}
}
}
for i := 0; i < len(avgBucketSizes); i++ {
avgBucketSizes[i] /= len(servers)
}

log.Info("Peers stats", "min", minPeers, "max", maxPeers, "avg", avgPeers, "avgBucketSizes", avgBucketSizes)
}
}
}
8 changes: 8 additions & 0 deletions p2p/discover/v4_udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,14 @@ func ListenV4(c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) {
return t, nil
}

func (t *UDPv4) BucketSizes() []int {
sizes := make([]int, len(t.tab.buckets))
for i, bucket := range t.tab.buckets {
sizes[i] = len(bucket.entries)
}
return sizes
}

// Self returns the local node.
func (t *UDPv4) Self() *enode.Node {
return t.localNode.Node()
Expand Down
9 changes: 9 additions & 0 deletions p2p/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,15 @@ func (c *conn) set(f connFlag, val bool) {
}
}

// SetListenFunc sets the function used to accept inbound connections (testing only)
func (srv *Server) SetListenFunc(f func(network, addr string) (net.Listener, error)) {
srv.listenFunc = f
}

func (srv *Server) UDPv4() *discover.UDPv4 {
return srv.ntab
}

// LocalNode returns the local node record.
func (srv *Server) LocalNode() *enode.LocalNode {
return srv.localnode
Expand Down

0 comments on commit d682390

Please sign in to comment.