diff --git a/cmd/p2psim/main.go b/cmd/p2psim/main.go index 451b0d942d..f2a92841a6 100644 --- a/cmd/p2psim/main.go +++ b/cmd/p2psim/main.go @@ -40,10 +40,15 @@ import ( "encoding/json" "fmt" "io" + "math/rand" "os" + "os/signal" "strings" + "syscall" "text/tabwriter" + "time" + "github.com/ethereum/go-ethereum/cmd/utils" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" @@ -102,6 +107,32 @@ func main() { Usage: "load a network snapshot from stdin", Action: loadSnapshot, }, + { + Name: "network", + Usage: "manage the simulation network", + Subcommands: []*cli.Command{ + { + Name: "start", + Usage: "start all nodes in the network", + Action: startNetwork, + }, + { + Name: "peer-stats", + Usage: "show peer stats", + Action: getNetworkPeerStats, + }, + { + Name: "dht", + Usage: "Get all nodes in the DHT of all nodes", + Action: getAllDHT, + }, + { + Name: "peers", + Usage: "Get all peers of all nodes", + Action: getAllNodePeersInfo, + }, + }, + }, { Name: "node", Usage: "manage simulation nodes", @@ -132,6 +163,106 @@ func main() { Value: "", Usage: "node private key (hex encoded)", }, + &cli.BoolFlag{ + Name: "sim.dialer", + Usage: "Use the simulation dialer", + }, + &cli.BoolFlag{ + Name: "fake.iplistener", + Usage: "Use the fake listener to random remote ip when accepting connections", + }, + &cli.BoolFlag{ + Name: "start", + Usage: "start the node after creating successfully", + }, + &cli.BoolFlag{ + Name: "autofill.bootnodes", + Usage: "autofill bootnodes with existing bootnodes from manager", + }, + &cli.StringFlag{ + Name: "node.type", + Value: "default", + Usage: "Set node type (default, outbound, dirty, bootnode)", + }, + &cli.BoolFlag{ + Name: "enable.enrfilter", + Usage: "Enable ENR filter when adding nodes to the DHT", + }, + &cli.BoolFlag{ + Name: "only.outbound", + Usage: "Only allow outbound connections", + }, + utils.NoDiscoverFlag, + utils.DHTBucketSizeFlag, + utils.BootnodesFlag, + utils.MaxPeersFlag, + }, + }, + { + Name: "create-multi", + Usage: "create a node", + Action: createMultiNode, + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "name", + Value: "", + Usage: "node name", + }, + &cli.IntFlag{ + Name: "count", + Value: 1, + Usage: "number of nodes to create", + }, + &cli.StringFlag{ + Name: "services", + Value: "", + Usage: "node services (comma separated)", + }, + &cli.BoolFlag{ + Name: "sim.dialer", + Usage: "Use the simulation dialer", + }, + &cli.BoolFlag{ + Name: "fake.iplistener", + Usage: "Use the fake listener to random remote ip when accepting connections", + }, + &cli.BoolFlag{ + Name: "start", + Usage: "start the node after creating successfully", + }, + &cli.BoolFlag{ + Name: "autofill.bootnodes", + Usage: "autofill bootnodes with existing bootnodes from manager", + }, + &cli.StringFlag{ + Name: "node.type", + Value: "default", + Usage: "Set node type (default, outbound, dirty, bootnode)", + }, + &cli.BoolFlag{ + Name: "enable.enrfilter", + Usage: "Enable ENR filter when adding nodes to the DHT", + }, + &cli.DurationFlag{ + Name: "interval", + Usage: "create interval", + }, + &cli.IntFlag{ + Name: "dirty.rate", + Usage: "Rate of dirty nodes", + }, + &cli.IntFlag{ + Name: "only.outbound.rate", + Usage: "Rate of nodes that only allow outbound connections", + }, + &cli.BoolFlag{ + Name: "only.outbound", + Usage: "Only allow outbound connections", + }, + utils.NoDiscoverFlag, + utils.DHTBucketSizeFlag, + utils.BootnodesFlag, + utils.MaxPeersFlag, }, }, { @@ -176,6 +307,29 @@ func main() { }, }, }, + { + Name: "peer-stats", + Usage: "show peer stats", + ArgsUsage: "", + Action: getNodePeerStats, + }, + }, + }, + { + Name: "log-stats", + Usage: "log peer stats to a CSV file", + Action: startLogStats, + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "file", + Usage: "output file", + Value: "stats.csv", + }, + &cli.DurationFlag{ + Name: "interval", + Usage: "log interval", + Value: 15 * time.Second, + }, }, }, } @@ -287,6 +441,31 @@ func createNode(ctx *cli.Context) error { config.ID = enode.PubkeyToIDV4(&privKey.PublicKey) config.PrivateKey = privKey } + if ctx.Bool(utils.NoDiscoverFlag.Name) { + config.NoDiscovery = true + } + if ctx.Bool("sim.dialer") { + config.UseTCPDialer = false + } else { + config.UseTCPDialer = true + } + if ctx.Bool("fake.iplistener") { + config.UseFakeIPListener = true + } + config.BootstrapNodeURLs = ctx.String(utils.BootnodesFlag.Name) + if ctx.Bool("autofill.bootnodes") { + bootnodeURLs, err := getBootnodes() + if err != nil { + return err + } + if bootnodeURLs != "" { + config.BootstrapNodeURLs += "," + bootnodeURLs + } + } + config.MaxPeers = ctx.Int(utils.MaxPeersFlag.Name) + config.DHTBucketSize = ctx.Int(utils.DHTBucketSizeFlag.Name) + config.DisableTCPListener = ctx.Bool("only.outbound") + config.EnableENRFilter = ctx.Bool("enable.enrfilter") if services := ctx.String("services"); services != "" { config.Lifecycles = strings.Split(services, ",") } @@ -295,6 +474,112 @@ func createNode(ctx *cli.Context) error { return err } fmt.Fprintln(ctx.App.Writer, "Created", node.Name) + + // Start node if needed + if ctx.Bool("start") { + if err := client.StartNode(node.Name); err != nil { + return err + } + fmt.Fprintln(ctx.App.Writer, "Started", node.Name) + } + + return nil +} + +func getBootnodes() (string, error) { + nodes, err := client.GetNodes() + if err != nil { + return "", err + } + + bootnodes := make([]string, 0) + for _, node := range nodes { + if strings.HasPrefix(node.Name, "bootnode") { + bootnodes = append(bootnodes, node.Enode) + } + } + + return strings.Join(bootnodes, ","), nil +} + +func createMultiNode(ctx *cli.Context) error { + if ctx.Args().Len() != 0 { + return cli.ShowCommandHelp(ctx, ctx.Command.Name) + } + + t := time.Now() + + createInterval := ctx.Duration("interval") + bootNodeURLs := ctx.String(utils.BootnodesFlag.Name) + if ctx.Bool("autofill.bootnodes") { + existedBootnodeURLs, err := getBootnodes() + if err != nil { + return err + } + if existedBootnodeURLs != "" { + bootNodeURLs += "," + existedBootnodeURLs + } + } + + // Create nodes + count := ctx.Int("count") + outboundRate := ctx.Int("only.outbound.rate") + dirtyRate := ctx.Int("dirty.rate") + per := make([]int, 0) + for i := 0; i < count; i++ { + if i < outboundRate*count/100 { + per = append(per, 1) + } else if i < (outboundRate+dirtyRate)*count/100 { + per = append(per, 2) + } else { + per = append(per, 0) + } + } + rand.Shuffle(len(per), func(i, j int) { per[i], per[j] = per[j], per[i] }) + + isBootnode := ctx.String("node.type") == "bootnode" + + for i := 0; i < count; i++ { + var nodeName string + if isBootnode { + nodeName = fmt.Sprintf("bootnode-%d-%d", t.Unix(), i) + ctx.Set(utils.BootnodesFlag.Name, "") + } else { + nodeType := per[i%len(per)] + switch nodeType { + case 1: + ctx.Set("only.outbound", "true") + ctx.Set("node.type", "outbound") + ctx.Set("services", "valid") + nodeName = fmt.Sprintf("outbound-%d-%d", t.Unix(), i) + case 2: + ctx.Set("only.outbound", "false") + ctx.Set("node.type", "dirty") + ctx.Set("services", "invalid") + nodeName = fmt.Sprintf("dirty-%d-%d", t.Unix(), i) + default: + ctx.Set("only.outbound", "false") + ctx.Set("node.type", "default") + ctx.Set("services", "valid") + nodeName = fmt.Sprintf("node-%d-%d", t.Unix(), i) + } + } + ctx.Set("name", nodeName) + for { + if err := createNode(ctx); err != nil { + fmt.Fprintln(ctx.App.Writer, "Failed to create node", nodeName, err) + // Try to create the node again + client.DeleteNode(nodeName) + time.Sleep(500 * time.Millisecond) + } else { + break + } + } + if createInterval > 0 { + time.Sleep(createInterval) + } + } + return nil } @@ -429,3 +714,151 @@ func rpcSubscribe(client *rpc.Client, out io.Writer, method string, args ...stri } } } + +func startNetwork(ctx *cli.Context) error { + if ctx.Args().Len() != 0 { + return cli.ShowCommandHelp(ctx, ctx.Command.Name) + } + if err := client.StartNetwork(); err != nil { + return err + } + fmt.Fprintln(ctx.App.Writer, "Started network") + return nil +} + +func getNodePeerStats(ctx *cli.Context) error { + if ctx.Args().Len() != 1 { + return cli.ShowCommandHelp(ctx, ctx.Command.Name) + } + nodeName := ctx.Args().Get(0) + stats, err := client.GetNodePeerStats(nodeName) + if err != nil { + return err + } + fmt.Fprintln(ctx.App.Writer, "Peer stats of", ctx.String("node")) + fmt.Fprintln(ctx.App.Writer, "Peer count: ", stats.PeerCount) + fmt.Fprintln(ctx.App.Writer, "Tried: ", stats.Tried) + fmt.Fprintln(ctx.App.Writer, "Failed: ", stats.Failed) + fmt.Fprintln(ctx.App.Writer, "Nodes count: ", stats.DifferentNodesDiscovered) + fmt.Fprintln(ctx.App.Writer, "DHT: ", stats.DHTBuckets) + return nil +} + +func getNetworkPeerStats(ctx *cli.Context) error { + if ctx.Args().Len() != 0 { + return cli.ShowCommandHelp(ctx, ctx.Command.Name) + } + stats, err := client.GetAllNodePeerStats() + if err != nil { + return err + } + for nodeID, stats := range stats { + fmt.Fprintln(ctx.App.Writer, "Peer stats of", nodeID) + fmt.Fprintln(ctx.App.Writer, "Peer count: ", stats.PeerCount) + fmt.Fprintln(ctx.App.Writer, "Tried: ", stats.Tried) + fmt.Fprintln(ctx.App.Writer, "Failed: ", stats.Failed) + fmt.Fprintln(ctx.App.Writer, "Nodes count: ", stats.DifferentNodesDiscovered) + fmt.Fprintln(ctx.App.Writer, "DHT: ", stats.DHTBuckets) + } + return nil +} + +func getAllDHT(ctx *cli.Context) error { + if ctx.Args().Len() != 0 { + return cli.ShowCommandHelp(ctx, ctx.Command.Name) + } + + nodes, err := client.GetNodes() + if err != nil { + return err + } + nodeID2Name := make(map[string]string) + for _, node := range nodes { + nodeID2Name[node.ID] = node.Name + } + + dht, err := client.GetAllNodeDHT() + if err != nil { + return err + } + for nodeName, buckets := range dht { + fmt.Fprintf(ctx.App.Writer, "%s: ", nodeName) + for _, bucket := range buckets { + fmt.Fprintf(ctx.App.Writer, "[") + for _, node := range bucket { + fmt.Fprintf(ctx.App.Writer, "%s ", nodeID2Name[node.ID().String()]) + } + fmt.Fprintf(ctx.App.Writer, "],") + } + fmt.Fprintf(ctx.App.Writer, "\n") + } + return nil +} + +func getAllNodePeersInfo(ctx *cli.Context) error { + if ctx.Args().Len() != 0 { + return cli.ShowCommandHelp(ctx, ctx.Command.Name) + } + + nodes, err := client.GetNodes() + if err != nil { + return err + } + nodeID2Name := make(map[string]string) + for _, node := range nodes { + nodeID2Name[node.ID] = node.Name + } + + peers, err := client.GetAllNodePeersInfo() + if err != nil { + return err + } + for nodeName, peerInfos := range peers { + fmt.Fprintf(ctx.App.Writer, "%s: ", nodeName) + for _, peerInfo := range peerInfos { + fmt.Fprintf(ctx.App.Writer, "(%s %v), ", nodeID2Name[peerInfo.ID], peerInfo.Network.Inbound) + } + fmt.Fprintf(ctx.App.Writer, "\n") + } + return nil +} + +func startLogStats(ctx *cli.Context) error { + if ctx.Args().Len() != 0 { + return cli.ShowCommandHelp(ctx, ctx.Command.Name) + } + csvFile := ctx.String("file") + f, err := os.OpenFile(csvFile, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644) + if err != nil { + return err + } + defer f.Close() + + sig := make(chan os.Signal, 1) + signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) + timer := time.NewTicker(ctx.Duration("interval")) + + f.WriteString("node,timestamp,type,value\n") + +loop: + for { + select { + case <-sig: + return nil + case <-timer.C: + stats, err := client.GetAllNodePeerStats() + if err != nil { + fmt.Fprintln(ctx.App.Writer, err) + goto loop + } + for nodeID, stats := range stats { + t := time.Now() + f.WriteString(fmt.Sprintf("%s,%d,%s,%d\n", nodeID, t.Unix(), "PeerCount", stats.PeerCount)) + f.WriteString(fmt.Sprintf("%s,%d,%s,%d\n", nodeID, t.Unix(), "Tried", stats.Tried)) + f.WriteString(fmt.Sprintf("%s,%d,%s,%d\n", nodeID, t.Unix(), "Failed", stats.Failed)) + f.WriteString(fmt.Sprintf("%s,%d,%s,%d\n", nodeID, t.Unix(), "DifferentNodesDiscovered", stats.DifferentNodesDiscovered)) + f.WriteString(fmt.Sprintf("%s,%d,%s,%d\n", nodeID, t.Unix(), "DHTBuckets", stats.DHTBuckets)) + } + } + } +} diff --git a/p2p/discover/v4_udp.go b/p2p/discover/v4_udp.go index bf1e9080a4..8195629ccf 100644 --- a/p2p/discover/v4_udp.go +++ b/p2p/discover/v4_udp.go @@ -155,6 +155,21 @@ func ListenV4(c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) { return t, nil } +// NodesInDHT returns all nodes in the DHT. +func (t *UDPv4) NodesInDHT() [][]enode.Node { + if t == nil || t.tab == nil { + return nil + } + nodes := make([][]enode.Node, len(t.tab.buckets)) + for i, bucket := range t.tab.buckets { + nodes[i] = make([]enode.Node, len(bucket.entries)) + for j, entry := range bucket.entries { + nodes[i][j] = entry.Node + } + } + return nodes +} + // Self returns the local node. func (t *UDPv4) Self() *enode.Node { return t.localNode.Node() diff --git a/p2p/server.go b/p2p/server.go index 2577a32218..13f5818357 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -308,6 +308,16 @@ func (c *conn) set(f connFlag, val bool) { } } +// SetListenFunc sets the function used to accept inbound connections (testing only) +func (srv *Server) SetListenFunc(f func(network, addr string) (net.Listener, error)) { + srv.listenFunc = f +} + +// UDPv4 returns the UDPv4 discovery table. +func (srv *Server) UDPv4() *discover.UDPv4 { + return srv.ntab +} + // LocalNode returns the local node record. func (srv *Server) LocalNode() *enode.LocalNode { return srv.localnode diff --git a/p2p/simulations/adapters/exec.go b/p2p/simulations/adapters/exec.go index 35ccdfb068..e4d80cb2ae 100644 --- a/p2p/simulations/adapters/exec.go +++ b/p2p/simulations/adapters/exec.go @@ -364,6 +364,21 @@ func (n *ExecNode) Snapshots() (map[string][]byte, error) { return snapshots, n.client.Call(&snapshots, "simulation_snapshot") } +// Empty PeerStats +func (n *ExecNode) PeerStats() *PeerStats { + return &PeerStats{} +} + +// Empty DHT +func (n *ExecNode) NodesInDHT() [][]enode.Node { + return nil +} + +// Empty PeersInfo +func (n *ExecNode) PeersInfo() []*p2p.PeerInfo { + return nil +} + // execNodeConfig is used to serialize the node configuration so it can be // passed to the child process as a JSON encoded environment variable type execNodeConfig struct { diff --git a/p2p/simulations/adapters/inproc.go b/p2p/simulations/adapters/inproc.go index 1cb26a8ea0..b86b2b1d4f 100644 --- a/p2p/simulations/adapters/inproc.go +++ b/p2p/simulations/adapters/inproc.go @@ -20,16 +20,21 @@ import ( "context" "errors" "fmt" - "math" + "math/rand" "net" + "strings" "sync" + "time" + "github.com/ethereum/go-ethereum/core/forkid" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/enr" "github.com/ethereum/go-ethereum/p2p/simulations/pipes" + "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" "github.com/gorilla/websocket" ) @@ -91,14 +96,34 @@ func (s *SimAdapter) NewNode(config *NodeConfig) (Node, error) { return nil, err } + p2pCfg := p2p.Config{ + PrivateKey: config.PrivateKey, + MaxPeers: config.MaxPeers, + NoDiscovery: config.NoDiscovery, + EnableMsgEvents: config.EnableMsgEvents, + DHTBucketSize: config.DHTBucketSize, + } + if !config.DisableTCPListener { + p2pCfg.ListenAddr = fmt.Sprintf(":%d", config.Port) + } else { + p2pCfg.ListenAddr = "" + } + if len(config.BootstrapNodeURLs) > 0 { + for _, url := range strings.Split(config.BootstrapNodeURLs, ",") { + if len(url) == 0 { + continue + } + n, err := enode.Parse(enode.ValidSchemes, url) + if err != nil { + log.Warn("invalid bootstrap node URL", "url", url, "err", err) + continue + } + p2pCfg.BootstrapNodes = append(p2pCfg.BootstrapNodes, n) + } + } + n, err := node.New(&node.Config{ - P2P: p2p.Config{ - PrivateKey: config.PrivateKey, - MaxPeers: math.MaxInt32, - NoDiscovery: true, - Dialer: s, - EnableMsgEvents: config.EnableMsgEvents, - }, + P2P: p2pCfg, ExternalSigner: config.ExternalSigner, Logger: log.New("node.id", id.String()), }) @@ -106,6 +131,10 @@ func (s *SimAdapter) NewNode(config *NodeConfig) (Node, error) { return nil, err } + if config.UseFakeIPListener { + n.Server().SetListenFunc(listenFakeAddrFunc) + } + simNode := &SimNode{ ID: id, config: config, @@ -113,6 +142,34 @@ func (s *SimAdapter) NewNode(config *NodeConfig) (Node, error) { adapter: s, running: make(map[string]node.Lifecycle), } + if !config.UseTCPDialer { + n.Server().Dialer = s + } else { + simNode.dialer = &wrapTCPDialerStats{ + d: &net.Dialer{Timeout: 15 * time.Second}, + resultCh: make(chan resultDial, 10000), + } + n.Server().Dialer = simNode.dialer + } + + if config.EnableENRFilter { + n.Server().SetFilter(func(id forkid.ID) error { + var eth struct { + ForkID forkid.ID + Rest []rlp.RawValue `rlp:"tail"` + } + if err := n.Server().Self().Record().Load(enr.WithEntry("eth", ð)); err != nil { + log.Warn("failed to load eth entry", "err", err) + return err + } + + if id == eth.ForkID { + return nil + } + return forkid.ErrLocalIncompatibleOrStale + }) + } + s.nodes[id] = simNode return simNode, nil } @@ -162,6 +219,8 @@ func (s *SimAdapter) GetNode(id enode.ID) (*SimNode, bool) { // net.Pipe (see SimAdapter.Dial), running devp2p protocols directly over that // pipe type SimNode struct { + ctx context.Context + cancel context.CancelFunc lock sync.RWMutex ID enode.ID config *NodeConfig @@ -170,6 +229,11 @@ type SimNode struct { running map[string]node.Lifecycle client *rpc.Client registerOnce sync.Once + dialer *wrapTCPDialerStats + + // Track different nodes discovered by the node + discoveredNodes sync.Map + differentNodeCount int } // Close closes the underlaying node.Node to release @@ -240,6 +304,15 @@ func (sn *SimNode) Snapshots() (map[string][]byte, error) { // Start registers the services and starts the underlying devp2p node func (sn *SimNode) Start(snapshots map[string][]byte) error { + sn.lock.Lock() + if sn.cancel != nil { + sn.lock.Unlock() + return errors.New("node already started") + } + + sn.ctx, sn.cancel = context.WithCancel(context.Background()) + sn.lock.Unlock() + // ensure we only register the services once in the case of the node // being stopped and then started again var regErr error @@ -282,6 +355,8 @@ func (sn *SimNode) Start(snapshots map[string][]byte) error { sn.client = client sn.lock.Unlock() + go sn.trackDiscoveredNode() + return nil } @@ -292,6 +367,10 @@ func (sn *SimNode) Stop() error { sn.client.Close() sn.client = nil } + if sn.cancel != nil { + sn.cancel() + sn.cancel = nil + } sn.lock.Unlock() return sn.node.Close() } @@ -351,3 +430,120 @@ func (sn *SimNode) NodeInfo() *p2p.NodeInfo { } return server.NodeInfo() } + +// PeerStats returns statistics about the node's peers +func (sn *SimNode) PeerStats() *PeerStats { + if sn.dialer == nil || sn.node.Server() == nil || sn.node.Server().UDPv4() == nil { + return &PeerStats{} + } + + nodesCount := 0 + sn.discoveredNodes.Range(func(_, _ interface{}) bool { + nodesCount++ + return true + }) + buckets := sn.node.Server().UDPv4().NodesInDHT() + bucketSizes := make([]int, len(buckets)) + for i, bucket := range buckets { + bucketSizes[i] = len(bucket) + } + return &PeerStats{ + PeerCount: sn.node.Server().PeerCount(), + Failed: sn.dialer.failed, + Tried: sn.dialer.tried, + DifferentNodesDiscovered: nodesCount, + DHTBuckets: bucketSizes, + } +} + +// NodesInDHT returns the nodes in the DHT buckets +func (sn *SimNode) NodesInDHT() [][]enode.Node { + if sn.node.Server() == nil || sn.node.Server().UDPv4() == nil { + return nil + } + return sn.node.Server().UDPv4().NodesInDHT() +} + +// PeersInfo returns information about the node's peers +func (sn *SimNode) PeersInfo() []*p2p.PeerInfo { + if sn.node.Server() == nil { + return nil + } + return sn.node.Server().PeersInfo() +} + +// trackDiscoveredNodes tracks all nodes discovered by the node and dial by wrapTCPDialerStats +func (sn *SimNode) trackDiscoveredNode() { + if sn.dialer == nil { + return + } + + for { + select { + case <-sn.ctx.Done(): + return + case r := <-sn.dialer.resultCh: + if _, ok := sn.discoveredNodes.LoadOrStore(r.node, struct{}{}); !ok { + sn.differentNodeCount++ + } + if r.err != nil { + log.Info("dial failed", "node", r.node, "err", r.err) + sn.dialer.failed++ + } + log.Info("dial tried", "from", sn.ID, "to", r.node) + sn.dialer.tried++ + } + } +} + +func listenFakeAddrFunc(network, laddr string) (net.Listener, error) { + l, err := net.Listen(network, laddr) + if err != nil { + return nil, err + } + fakeAddr := &net.TCPAddr{IP: net.IP{byte(rand.Intn(255)), byte(rand.Intn(255)), byte(rand.Intn(255)), byte(rand.Intn(255))}, Port: rand.Intn(65535)} + return &fakeAddrListener{l, fakeAddr}, nil +} + +// fakeAddrListener is a listener that creates connections with a mocked remote address. +type fakeAddrListener struct { + net.Listener + remoteAddr net.Addr +} + +type fakeAddrConn struct { + net.Conn + remoteAddr net.Addr +} + +func (l *fakeAddrListener) Accept() (net.Conn, error) { + c, err := l.Listener.Accept() + if err != nil { + return nil, err + } + return &fakeAddrConn{c, l.remoteAddr}, nil +} + +func (c *fakeAddrConn) RemoteAddr() net.Addr { + return c.remoteAddr +} + +// wrapTCPDialerStats is a wrapper around the net.Dialer which tracks nodes that have been tried to dial +type wrapTCPDialerStats struct { + d *net.Dialer + failed int + tried int + resultCh chan resultDial +} + +type resultDial struct { + err error + node enode.ID +} + +func (d wrapTCPDialerStats) Dial(ctx context.Context, dest *enode.Node) (net.Conn, error) { + nodeAddr := &net.TCPAddr{IP: dest.IP(), Port: dest.TCP()} + conn, err := d.d.DialContext(ctx, "tcp", nodeAddr.String()) + d.resultCh <- resultDial{err, dest.ID()} + return conn, err +} diff --git a/p2p/simulations/adapters/types.go b/p2p/simulations/adapters/types.go index aeb8ef7772..68088c2f8b 100644 --- a/p2p/simulations/adapters/types.go +++ b/p2p/simulations/adapters/types.go @@ -42,7 +42,6 @@ import ( // * SimNode - An in-memory node // * ExecNode - A child process node // * DockerNode - A Docker container node -// type Node interface { // Addr returns the node's address (e.g. an Enode URL) Addr() []byte @@ -65,6 +64,15 @@ type Node interface { // Snapshots creates snapshots of the running services Snapshots() (map[string][]byte, error) + + // PeerStats returns the node's peer statistics + PeerStats() *PeerStats + + // NodesInDHT returns all nodes in the DHT + NodesInDHT() [][]enode.Node + + // PeersInfo returns information about the node's peers + PeersInfo() []*p2p.PeerInfo } // NodeAdapter is used to create Nodes in a simulation network @@ -119,7 +127,8 @@ type NodeConfig struct { // function to sanction or prevent suggesting a peer Reachable func(id enode.ID) bool - Port uint16 + Port uint16 + DisableTCPListener bool // LogFile is the log file name of the p2p node at runtime. // @@ -131,34 +140,71 @@ type NodeConfig struct { // // The default verbosity is INFO. LogVerbosity log.Lvl + + // NoDiscovery disables the peer discovery mechanism (manual peer addition) + NoDiscovery bool + + // Use default TCP dialer + UseTCPDialer bool + + // UseFakeIPListener is used to fake the remote IP address when accepting incoming connections + UseFakeIPListener bool + + // DHTBucketSize is the bucket size of DHT + DHTBucketSize int + + // BootstrapNodes is the list of bootstrap nodes + BootstrapNodeURLs string + + // MaxPeers is the maximum number of peers + MaxPeers int + + // EnableENRFilter enables the ENR filter when adding node into the DHT + EnableENRFilter bool } // nodeConfigJSON is used to encode and decode NodeConfig as JSON by encoding // all fields as strings type nodeConfigJSON struct { - ID string `json:"id"` - PrivateKey string `json:"private_key"` - Name string `json:"name"` - Lifecycles []string `json:"lifecycles"` - Properties []string `json:"properties"` - EnableMsgEvents bool `json:"enable_msg_events"` - Port uint16 `json:"port"` - LogFile string `json:"logfile"` - LogVerbosity int `json:"log_verbosity"` + ID string `json:"id"` + PrivateKey string `json:"private_key"` + Name string `json:"name"` + Lifecycles []string `json:"lifecycles"` + Properties []string `json:"properties"` + EnableMsgEvents bool `json:"enable_msg_events"` + Port uint16 `json:"port"` + DisableTCPListener bool `json:"disable_tcp_listener"` + LogFile string `json:"logfile"` + LogVerbosity int `json:"log_verbosity"` + NoDiscovery bool `json:"no_discovery"` + UseTCPDialer bool `json:"use_tcp_dialer"` + UseFakeIPListener bool `json:"use_fake_ip_listener"` + DHTBucketSize int `json:"dht_bucket_size"` + BootstrapNodeURLs string `json:"bootstrap_node_urls"` + MaxPeers int `json:"max_peers"` + EnableENRFilter bool `json:"enable_enr_filter"` } // MarshalJSON implements the json.Marshaler interface by encoding the config // fields as strings func (n *NodeConfig) MarshalJSON() ([]byte, error) { confJSON := nodeConfigJSON{ - ID: n.ID.String(), - Name: n.Name, - Lifecycles: n.Lifecycles, - Properties: n.Properties, - Port: n.Port, - EnableMsgEvents: n.EnableMsgEvents, - LogFile: n.LogFile, - LogVerbosity: int(n.LogVerbosity), + ID: n.ID.String(), + Name: n.Name, + Lifecycles: n.Lifecycles, + Properties: n.Properties, + Port: n.Port, + DisableTCPListener: n.DisableTCPListener, + EnableMsgEvents: n.EnableMsgEvents, + LogFile: n.LogFile, + LogVerbosity: int(n.LogVerbosity), + NoDiscovery: n.NoDiscovery, + UseTCPDialer: n.UseTCPDialer, + UseFakeIPListener: n.UseFakeIPListener, + DHTBucketSize: n.DHTBucketSize, + BootstrapNodeURLs: n.BootstrapNodeURLs, + MaxPeers: n.MaxPeers, + EnableENRFilter: n.EnableENRFilter, } if n.PrivateKey != nil { confJSON.PrivateKey = hex.EncodeToString(crypto.FromECDSA(n.PrivateKey)) @@ -196,9 +242,17 @@ func (n *NodeConfig) UnmarshalJSON(data []byte) error { n.Lifecycles = confJSON.Lifecycles n.Properties = confJSON.Properties n.Port = confJSON.Port + n.DisableTCPListener = confJSON.DisableTCPListener n.EnableMsgEvents = confJSON.EnableMsgEvents n.LogFile = confJSON.LogFile n.LogVerbosity = log.Lvl(confJSON.LogVerbosity) + n.NoDiscovery = confJSON.NoDiscovery + n.UseTCPDialer = confJSON.UseTCPDialer + n.UseFakeIPListener = confJSON.UseFakeIPListener + n.DHTBucketSize = confJSON.DHTBucketSize + n.BootstrapNodeURLs = confJSON.BootstrapNodeURLs + n.MaxPeers = confJSON.MaxPeers + n.EnableENRFilter = confJSON.EnableENRFilter return nil } @@ -223,12 +277,15 @@ func RandomNodeConfig() *NodeConfig { enodId := enode.PubkeyToIDV4(&prvkey.PublicKey) return &NodeConfig{ - PrivateKey: prvkey, - ID: enodId, - Name: fmt.Sprintf("node_%s", enodId.String()), - Port: port, - EnableMsgEvents: true, - LogVerbosity: log.LvlInfo, + PrivateKey: prvkey, + ID: enodId, + Name: fmt.Sprintf("node_%s", enodId.String()), + Port: port, + EnableMsgEvents: true, + LogVerbosity: log.LvlInfo, + DisableTCPListener: true, + UseTCPDialer: false, + MaxPeers: node.DefaultConfig.P2P.MaxPeers, } } @@ -324,3 +381,12 @@ func (n *NodeConfig) initEnode(ip net.IP, tcpport int, udpport int) error { func (n *NodeConfig) initDummyEnode() error { return n.initEnode(net.IPv4(127, 0, 0, 1), int(n.Port), 0) } + +// PeerStats is a struct that holds the statistics of a node's peers +type PeerStats struct { + PeerCount int `json:"peer_count"` + Tried int `json:"tried"` + DifferentNodesDiscovered int `json:"different_nodes_discovered"` + Failed int `json:"failed"` + DHTBuckets []int `json:"dht_buckets"` +} diff --git a/p2p/simulations/discovery/discovery.go b/p2p/simulations/discovery/discovery.go new file mode 100644 index 0000000000..c20d8b5262 --- /dev/null +++ b/p2p/simulations/discovery/discovery.go @@ -0,0 +1,133 @@ +package main + +import ( + "context" + "flag" + "fmt" + "net/http" + "os" + + "github.com/ethereum/go-ethereum/core/forkid" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/enr" + "github.com/ethereum/go-ethereum/p2p/simulations" + "github.com/ethereum/go-ethereum/p2p/simulations/adapters" + "github.com/ethereum/go-ethereum/rlp" +) + +var ( + verbosity = flag.Int("verbosity", 3, "logging verbosity") + port = flag.Int("port", 8888, "port to listen on") +) + +var ( + validETHEntry = mockETHEntry{ForkID: forkid.ID{Hash: [4]byte{1, 2, 3, 4}}} + invalidETHEntry = mockETHEntry{ForkID: forkid.ID{Hash: [4]byte{5, 6, 7, 8}}} +) + +// main() starts a simulation network which contains nodes to benchmark the discovery process +func main() { + flag.Parse() + + // set the log level + log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*verbosity), log.StreamHandler(os.Stdout, log.TerminalFormat(false)))) + + // available services + // - valid: supports the valid fork ID, used for the valid node in the network + // - invalid: supports the invalid fork ID, used for the dirty node in the network + services := map[string]adapters.LifecycleConstructor{ + "valid": func(ctx *adapters.ServiceContext, stack *node.Node) (node.Lifecycle, error) { + s := newMockService("valid") + s.SetAttributes([]enr.Entry{validETHEntry}) + stack.RegisterProtocols(s.Protocols()) + return s, nil + }, + "invalid": func(ctx *adapters.ServiceContext, stack *node.Node) (node.Lifecycle, error) { + s := newMockService("invalid") + s.SetAttributes([]enr.Entry{invalidETHEntry}) + stack.RegisterProtocols(s.Protocols()) + return s, nil + }, + } + adapters.RegisterLifecycles(services) + + // use simulation adapter (inproc) + adapter := adapters.NewSimAdapter(services) + + // start the simulation server + log.Info("starting simulation server", "port", *port) + network := simulations.NewNetwork(adapter, &simulations.NetworkConfig{ + DefaultService: "valid", + }) + if err := http.ListenAndServe(fmt.Sprintf(":%d", *port), simulations.NewServer(network)); err != nil { + log.Crit("error starting simulation server", "err", err) + } +} + +// mockService is a simple protocol to verify the compatibility of the fork ID +// between nodes in the simulation network +type mockService struct { + name string + attrs []enr.Entry + ctx context.Context + cancel context.CancelFunc +} + +func newMockService(name string) *mockService { + s := &mockService{ + name: name, + } + s.ctx, s.cancel = context.WithCancel(context.Background()) + return s +} + +func (s *mockService) SetAttributes(attrs []enr.Entry) { + s.attrs = attrs +} + +func (s *mockService) Protocols() []p2p.Protocol { + return []p2p.Protocol{{ + Name: s.name, + Version: 1, + Length: 1, + Run: s.Run, + NodeInfo: s.Info, + Attributes: s.attrs, + }} +} + +func (s *mockService) Start() error { + return nil +} + +func (s *mockService) Stop() error { + s.cancel() + return nil +} + +func (s *mockService) Info() interface{} { + return nil +} + +func (s *mockService) Run(peer *p2p.Peer, rw p2p.MsgReadWriter) error { + if !peer.RunningCap(s.name, []uint{1}) { + log.Error("peer does not support protocol", "peer", peer.ID()) + return fmt.Errorf("peer does not support protocol %s", s.name) + } + + <-s.ctx.Done() + return nil +} + +// mockETHEntry is a mock Ethereum entry for the ENR +// mockService uses this entry to verify the compatibility of the fork ID +type mockETHEntry struct { + ForkID forkid.ID + Rest []rlp.RawValue `rlp:"tail"` +} + +func (e mockETHEntry) ENRKey() string { + return "eth" +} diff --git a/p2p/simulations/discovery/discovery.sh b/p2p/simulations/discovery/discovery.sh new file mode 100755 index 0000000000..d8627c64ca --- /dev/null +++ b/p2p/simulations/discovery/discovery.sh @@ -0,0 +1,71 @@ +#!/bin/bash +# +# Boot a simulation network and start some benchmarking tests. +# For each test, export the logs, peers info, and DHT info to files for visualization. +# Test strategy: +# +# We have some types of nodes: +# 1. Dirty nodes: Nodes that are not compatible with the valid nodes +# 2. Valid nodes +# 3. Valid nodes that only accept outbound connections +# +# We will test the performance of the DHT with different configurations: +# 1. DHT with bucket size 16 and no filter +# 2. DHT with bucket size 16 and filter +# 3. DHT with bucket size 256 and no filter +# 4. DHT with bucket size 256 and filter +# +# For each configuration, the following steps will be executed: +# 1. Start the simulation server, 2 bootnodes and rolling out nodes in batch 1 and sleep for a while +# 2. Rolling out nodes in batch 2 and sleep for a while +# 3. Rolling out nodes in batch 3 and sleep for a while +# 4. Export the DHT and peers info + +main_cmd="go run ." +p2psim_cmd="go run ../../../cmd/p2psim" + +# Number of nodes to start for each batch +distribution=(150 100 100) + +# Rate of dirty node that not compatible with the valid node +dirty_rate=60 + +# Rate of valid node but only accept outbound connection +only_outbound_rate=20 + +benchmark() { + local bucket_size=$1 + local sleep_time=$2 + local other=$3 + local test_name=$4 + + pids=() + echo "Start server $test_name..." + $main_cmd > tmp_$test_name.log 2> tmp_$test_name.err & + echo "Start stats $test_name..." + $p2psim_cmd log-stats --file ./stats_$test_name.csv & + + echo "Start bootnodes $test_name..." + $p2psim_cmd node create-multi --count 2 --fake.iplistener --start --dht.bucketsize $bucket_size --node.type bootnode $other + + for num_node in ${distribution[@]}; do + echo "Start $num_node nodes..." + $p2psim_cmd node create-multi --count $num_node --fake.iplistener --start --dht.bucketsize 16 --autofill.bootnodes --interval 1s --dirty.rate 60 --only.outbound.rate 20 $other + echo "Sleep $sleep_time..." + sleep $sleep_time + done + + $p2psim_cmd network dht > dht_$test_name.log + $p2psim_cmd network peers > peers_$test_name.log + + echo "Kill server and stats $test_name..." + kill -9 $(lsof -t -i:8888) + ps aux | grep "p2psim" | grep -v "grep" | awk '{print $2}' | xargs kill -9 + + sleep 10 +} + +benchmark 16 1200 "" "16_disable_filter" +benchmark 16 1200 "--enable.enrfilter" "16_enable_filter" +benchmark 256 1200 "" "256_disable_filter" +benchmark 256 1200 "--enable.enrfilter" "256_enable_filter" diff --git a/p2p/simulations/http.go b/p2p/simulations/http.go index 27ed5b75d2..e172f3f0b3 100644 --- a/p2p/simulations/http.go +++ b/p2p/simulations/http.go @@ -179,6 +179,11 @@ func (c *Client) CreateNode(config *adapters.NodeConfig) (*p2p.NodeInfo, error) return node, c.Post("/nodes", config, node) } +// DeleteNode deletes a node from the network +func (c *Client) DeleteNode(nodeID string) error { + return c.Delete(fmt.Sprintf("/nodes/%s", nodeID)) +} + // GetNode returns details of a node func (c *Client) GetNode(nodeID string) (*p2p.NodeInfo, error) { node := &p2p.NodeInfo{} @@ -211,6 +216,30 @@ func (c *Client) RPCClient(ctx context.Context, nodeID string) (*rpc.Client, err return rpc.DialWebsocket(ctx, fmt.Sprintf("%s/nodes/%s/rpc", baseURL, nodeID), "") } +// GetNodePeerStats returns the peer stats of a node +func (c *Client) GetNodePeerStats(nodeID string) (*adapters.PeerStats, error) { + stats := &adapters.PeerStats{} + return stats, c.Get(fmt.Sprintf("/peerstats/%s", nodeID), stats) +} + +// GetAllNodePeerStats returns the peer stats of all nodes +func (c *Client) GetAllNodePeerStats() (map[string]*adapters.PeerStats, error) { + stats := make(map[string]*adapters.PeerStats) + return stats, c.Get("/peerstats", &stats) +} + +// GetAllNodeDHT returns the DHT of all nodes +func (c *Client) GetAllNodeDHT() (map[string][][]enode.Node, error) { + nodesDHT := make(map[string][][]enode.Node) + return nodesDHT, c.Get("/dht", &nodesDHT) +} + +// GetAllNodePeersInfo returns the peers info of all nodes +func (c *Client) GetAllNodePeersInfo() (map[string][]*p2p.PeerInfo, error) { + peersInfo := make(map[string][]*p2p.PeerInfo) + return peersInfo, c.Get("/peers", &peersInfo) +} + // Get performs a HTTP GET request decoding the resulting JSON response // into "out" func (c *Client) Get(path string, out interface{}) error { @@ -296,6 +325,11 @@ func NewServer(network *Network) *Server { s.POST("/nodes/:nodeid/conn/:peerid", s.ConnectNode) s.DELETE("/nodes/:nodeid/conn/:peerid", s.DisconnectNode) s.GET("/nodes/:nodeid/rpc", s.NodeRPC) + s.GET("/peerstats/:nodeid", s.GetNodePeerStats) + s.GET("/peerstats", s.GetAllNodePeerStats) + s.DELETE("/nodes/:nodeid", s.DeleteNode) + s.GET("/dht", s.GetAllNodeDHT) + s.GET("/peers", s.GetAllNodePeersInfo) return s } @@ -592,6 +626,13 @@ func (s *Server) GetNode(w http.ResponseWriter, req *http.Request) { s.JSON(w, http.StatusOK, node.NodeInfo()) } +// DeleteNode deletes a node from the network +func (s *Server) DeleteNode(w http.ResponseWriter, req *http.Request) { + node := req.Context().Value("node").(*Node) + s.network.DeleteNode(node.NodeInfo().Name) + s.JSON(w, http.StatusOK, node.NodeInfo()) +} + // StartNode starts a node func (s *Server) StartNode(w http.ResponseWriter, req *http.Request) { node := req.Context().Value("node").(*Node) @@ -642,6 +683,42 @@ func (s *Server) DisconnectNode(w http.ResponseWriter, req *http.Request) { s.JSON(w, http.StatusOK, node.NodeInfo()) } +// GetNodePeerStats returns the peer stats of a node +func (s *Server) GetNodePeerStats(w http.ResponseWriter, req *http.Request) { + node := req.Context().Value("node").(*Node) + s.JSON(w, http.StatusOK, node.PeerStats()) +} + +// GetAllNodePeerStats returns the peer stats of all nodes +func (s *Server) GetAllNodePeerStats(w http.ResponseWriter, req *http.Request) { + stats := make(map[string]*adapters.PeerStats) + for _, node := range s.network.GetNodes() { + stats[node.Config.Name] = node.PeerStats() + } + + s.JSON(w, http.StatusOK, stats) +} + +// GetAllNodeDHT returns the DHT of all nodes +func (s *Server) GetAllNodeDHT(w http.ResponseWriter, req *http.Request) { + nodesDHT := make(map[string][][]enode.Node) + for _, node := range s.network.GetNodes() { + nodesDHT[node.Config.Name] = node.NodesInDHT() + } + + s.JSON(w, http.StatusOK, nodesDHT) +} + +// GetAllNodePeersInfo returns the peers info of all nodes +func (s *Server) GetAllNodePeersInfo(w http.ResponseWriter, req *http.Request) { + peersInfo := make(map[string][]*p2p.PeerInfo) + for _, node := range s.network.GetNodes() { + peersInfo[node.Config.Name] = node.PeersInfo() + } + + s.JSON(w, http.StatusOK, peersInfo) +} + // Options responds to the OPTIONS HTTP method by returning a 200 OK response // with the "Access-Control-Allow-Headers" header set to "Content-Type" func (s *Server) Options(w http.ResponseWriter, req *http.Request) { diff --git a/p2p/simulations/network.go b/p2p/simulations/network.go index 962910dd25..e18266a5c1 100644 --- a/p2p/simulations/network.go +++ b/p2p/simulations/network.go @@ -584,6 +584,20 @@ func (net *Network) getRandomNode(ids []enode.ID, excludeIDs []enode.ID) *Node { return net.getNode(filtered[rand.Intn(l)]) } +// DeleteNode deletes the node with the given ID from the network +func (net *Network) DeleteNode(name string) { + net.lock.Lock() + defer net.lock.Unlock() + + for i, node := range net.Nodes { + if node.Config.Name == name { + delete(net.nodeMap, node.ID()) + net.Nodes = append(net.Nodes[:i], net.Nodes[i+1:]...) + break + } + } +} + func filterIDs(ids []enode.ID, excludeIDs []enode.ID) []enode.ID { exclude := make(map[enode.ID]bool) for _, id := range excludeIDs {