diff --git a/cmd/devp2p/main.go b/cmd/devp2p/main.go index 4d7b3d872..2e73ba7b7 100644 --- a/cmd/devp2p/main.go +++ b/cmd/devp2p/main.go @@ -64,6 +64,7 @@ func init() { dnsCommand, nodesetCommand, rlpxCommand, + simNetworkCommand, } app.Before = func(ctx *cli.Context) error { flags.MigrateGlobalFlags(ctx) diff --git a/cmd/devp2p/simnetwork.go b/cmd/devp2p/simnetwork.go new file mode 100644 index 000000000..1796d3500 --- /dev/null +++ b/cmd/devp2p/simnetwork.go @@ -0,0 +1,253 @@ +package main + +import ( + "context" + "crypto/ecdsa" + "fmt" + "net" + "os" + "os/signal" + "strings" + "syscall" + "time" + + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/urfave/cli/v2" + "golang.org/x/exp/rand" +) + +var ( + simNetworkCommand = &cli.Command{ + Name: "network", + Usage: "Simulated p2p network", + Subcommands: []*cli.Command{ + bootNodeCommand, + nodesCommand, + }, + } + bootNodeCommand = &cli.Command{ + Name: "bootnode", + Usage: "Start a bootnode", + Action: simNetworkStartBootnode, + Flags: []cli.Flag{ + &cli.IntFlag{ + Name: "max.peers", + Value: 50, + Usage: "maximum number of peers per node", + }, + &cli.IntFlag{ + Name: "dht.bucketsize", + Value: 16, + Usage: "Size of each bucket in DHT", + }, + &cli.IntFlag{ + Name: "port", + Value: 30303, + Usage: "Port to listen on", + }, + }, + } + nodesCommand = &cli.Command{ + Name: "nodes", + Usage: "Start nodes", + Action: simNetworkStartNodes, + Flags: []cli.Flag{ + &cli.IntFlag{ + Name: "num.nodes", + Value: 100, + Usage: "number of nodes to create", + }, + &cli.IntFlag{ + Name: "max.peers", + Value: 50, + Usage: "maximum number of peers per node", + }, + &cli.IntFlag{ + Name: "dht.bucketsize", + Value: 16, + Usage: "Size of each bucket in DHT", + }, + &cli.IntFlag{ + Name: "port", + Value: 50000, + Usage: "Port of the first node", + }, + &cli.StringFlag{ + Name: "bootnodes", + Usage: "Comma separated nodes used for bootstrapping", + }, + }, + } + statsPeriod = 5 * time.Second +) + +func simNetworkStartBootnode(ctx *cli.Context) error { + sig := make(chan os.Signal, 1) + signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) + maxPeers := ctx.Int("max.peers") + dhtBucketSize := ctx.Int("dht.bucketsize") + port := ctx.Int("port") + + // Start the bootnode + bootstrapServer := &p2p.Server{ + Config: p2p.Config{ + PrivateKey: newkey(), + ListenAddr: fmt.Sprintf(":%d", port), + MaxPeers: maxPeers, + Protocols: []p2p.Protocol{}, + Logger: log.New(), + DHTBucketSize: dhtBucketSize, + }, + } + bootstrapServer.SetListenFunc(listenFakeAddrFunc) + log.Info("Starting bootnode", "url", bootstrapServer.Self().URLv4()) + if err := bootstrapServer.Start(); err != nil { + log.Error("Failed to start bootnode", "err", err) + return err + } + defer bootstrapServer.Stop() + + go showPeersStats(ctx.Context, []*p2p.Server{bootstrapServer}, statsPeriod) + + <-sig + + return nil +} + +func simNetworkStartNodes(ctx *cli.Context) error { + sig := make(chan os.Signal, 1) + signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) + maxPeers := ctx.Int("max.peers") + dhtBucketSize := ctx.Int("dht.bucketsize") + port := ctx.Int("port") + numNodes := ctx.Int("num.nodes") + bootnodes := []*enode.Node{} + if ctx.IsSet("bootnodes") { + urls := strings.Split(ctx.String("bootnodes"), ",") + for _, url := range urls { + if url != "" { + node, err := enode.Parse(enode.ValidSchemes, url) + if err == nil { + bootnodes = append(bootnodes, node) + } + } + } + } + + nodes := make([]*p2p.Server, numNodes) + for i := 0; i < numNodes; i++ { + // Create bootnodes if needed + srv := &p2p.Server{ + Config: p2p.Config{ + PrivateKey: newkey(), + ListenAddr: fmt.Sprintf(":%d", port+i), + MaxPeers: maxPeers, + Protocols: []p2p.Protocol{}, + Logger: log.New("node", fmt.Sprintf("node%02d", i)), + DHTBucketSize: dhtBucketSize, + BootstrapNodes: bootnodes, + }, + } + srv.SetListenFunc(listenFakeAddrFunc) + nodes[i] = srv + } + + for _, node := range nodes { + node.Start() + } + defer func() { + for _, node := range nodes { + node.Stop() + } + }() + + go showPeersStats(ctx.Context, nodes, statsPeriod) + + <-sig + log.Info("Stopped nodes") + + return nil +} + +func newkey() *ecdsa.PrivateKey { + key, err := crypto.GenerateKey() + if err != nil { + panic("couldn't generate key: " + err.Error()) + } + return key +} + +// fakeAddrListener is a listener that creates connections with a mocked remote address. +type fakeAddrListener struct { + net.Listener + remoteAddr net.Addr +} + +type fakeAddrConn struct { + net.Conn + remoteAddr net.Addr +} + +func (l *fakeAddrListener) Accept() (net.Conn, error) { + c, err := l.Listener.Accept() + if err != nil { + return nil, err + } + return &fakeAddrConn{c, l.remoteAddr}, nil +} + +func (c *fakeAddrConn) RemoteAddr() net.Addr { + return c.remoteAddr +} + +func listenFakeAddrFunc(network, laddr string) (net.Listener, error) { + l, err := net.Listen(network, laddr) + if err != nil { + return nil, err + } + fakeAddr := &net.TCPAddr{IP: net.IP{byte(rand.Intn(255)), byte(rand.Intn(255)), byte(rand.Intn(255)), byte(rand.Intn(255))}, Port: rand.Intn(65535)} + return &fakeAddrListener{l, fakeAddr}, nil +} + +func showPeersStats(ctx context.Context, servers []*p2p.Server, period time.Duration) { + ticker := time.NewTicker(period) + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + minPeers, maxPeers, avgPeers := 0xffffffff, 0, 0 + for _, s := range servers { + n := s.PeerCount() + if n < minPeers { + minPeers = n + } + if n > maxPeers { + maxPeers = n + } + avgPeers += n + } + avgPeers /= len(servers) + + avgBucketSizes := make([]int, 0) + for i := 0; i < len(servers); i++ { + bucketSizes := servers[i].UDPv4().BucketSizes() + for j, size := range bucketSizes { + if j >= len(avgBucketSizes) { + avgBucketSizes = append(avgBucketSizes, size) + } else { + avgBucketSizes[j] += size + } + } + } + for i := 0; i < len(avgBucketSizes); i++ { + avgBucketSizes[i] /= len(servers) + } + + log.Info("Peers stats", "min", minPeers, "max", maxPeers, "avg", avgPeers, "avgBucketSizes", avgBucketSizes) + } + } +} diff --git a/p2p/discover/v4_udp.go b/p2p/discover/v4_udp.go index bf1e9080a..7923ad9a9 100644 --- a/p2p/discover/v4_udp.go +++ b/p2p/discover/v4_udp.go @@ -155,6 +155,14 @@ func ListenV4(c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) { return t, nil } +func (t *UDPv4) BucketSizes() []int { + sizes := make([]int, len(t.tab.buckets)) + for i, bucket := range t.tab.buckets { + sizes[i] = len(bucket.entries) + } + return sizes +} + // Self returns the local node. func (t *UDPv4) Self() *enode.Node { return t.localNode.Node() diff --git a/p2p/server.go b/p2p/server.go index 2577a3221..5bf67483b 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -308,6 +308,15 @@ func (c *conn) set(f connFlag, val bool) { } } +// SetListenFunc sets the function used to accept inbound connections (testing only) +func (srv *Server) SetListenFunc(f func(network, addr string) (net.Listener, error)) { + srv.listenFunc = f +} + +func (srv *Server) UDPv4() *discover.UDPv4 { + return srv.ntab +} + // LocalNode returns the local node record. func (srv *Server) LocalNode() *enode.LocalNode { return srv.localnode