Skip to content

Commit

Permalink
p2psim: Add network simulation for DHT
Browse files Browse the repository at this point in the history
  • Loading branch information
sonhv0212 committed Dec 16, 2024
1 parent 84a44c6 commit dc2410b
Show file tree
Hide file tree
Showing 12 changed files with 699 additions and 26 deletions.
297 changes: 297 additions & 0 deletions cmd/p2psim/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,13 @@ import (
"fmt"
"io"
"os"
"os/signal"
"strings"
"syscall"
"text/tabwriter"
"time"

"github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
Expand Down Expand Up @@ -102,6 +106,22 @@ func main() {
Usage: "load a network snapshot from stdin",
Action: loadSnapshot,
},
{
Name: "network",
Usage: "manage the simulation network",
Subcommands: []*cli.Command{
{
Name: "start",
Usage: "start all nodes in the network",
Action: startNetwork,
},
{
Name: "peer-stats",
Usage: "show peer stats",
Action: getNetworkPeerStats,
},
},
},
{
Name: "node",
Usage: "manage simulation nodes",
Expand Down Expand Up @@ -132,6 +152,74 @@ func main() {
Value: "",
Usage: "node private key (hex encoded)",
},
&cli.BoolFlag{
Name: "sim.dialer",
Usage: "Use the simulation dialer",
},
&cli.BoolFlag{
Name: "fake.iplistener",
Usage: "Use the fake listener",
},
&cli.BoolFlag{
Name: "start",
Usage: "start the node after creation",
},
utils.NoDiscoverFlag,
utils.DHTBucketSizeFlag,
utils.BootnodesFlag,
&cli.BoolFlag{
Name: "autofill.bootnodes",
Usage: "autofill bootnodes with existing bootnodes from manager",
},
},
},
{
Name: "create-multi",
Usage: "create a node",
Action: createMultiNode,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "name",
Value: "",
Usage: "node name",
},
&cli.IntFlag{
Name: "count",
Value: 1,
Usage: "number of nodes to create",
},
&cli.StringFlag{
Name: "services",
Value: "",
Usage: "node services (comma separated)",
},
&cli.BoolFlag{
Name: "sim.dialer",
Usage: "Use the simulation dialer",
},
&cli.BoolFlag{
Name: "fake.iplistener",
Usage: "Use the fake listener",
},
&cli.BoolFlag{
Name: "start",
Usage: "start the node after creation",
},
utils.NoDiscoverFlag,
utils.DHTBucketSizeFlag,
utils.BootnodesFlag,
&cli.BoolFlag{
Name: "only.bootnode",
Usage: "only create bootnodes",
},
&cli.DurationFlag{
Name: "interval",
Usage: "create interval",
},
&cli.BoolFlag{
Name: "autofill.bootnodes",
Usage: "autofill bootnodes with existing bootnodes from manager",
},
},
},
{
Expand Down Expand Up @@ -176,6 +264,29 @@ func main() {
},
},
},
{
Name: "peer-stats",
Usage: "show peer stats",
ArgsUsage: "<node>",
Action: getNodePeerStats,
},
},
},
{
Name: "log-stats",
Usage: "log peer stats to a CSV file",
Action: startLogStats,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "file",
Usage: "output file",
Value: "stats.csv",
},
&cli.DurationFlag{
Name: "interval",
Usage: "log interval",
Value: 15 * time.Second,
},
},
},
}
Expand Down Expand Up @@ -287,6 +398,28 @@ func createNode(ctx *cli.Context) error {
config.ID = enode.PubkeyToIDV4(&privKey.PublicKey)
config.PrivateKey = privKey
}
if ctx.Bool(utils.NoDiscoverFlag.Name) {
config.NoDiscovery = true
}
if ctx.Bool("sim.dialer") {
config.UseTCPDialer = false
} else {
config.UseTCPDialer = true
}
if ctx.Bool("fake.iplistener") {
config.UseFakeIPListener = true
}
config.BootstrapNodeURLs = ctx.String(utils.BootnodesFlag.Name)
if ctx.Bool("autofill.bootnodes") {
bootnodeURLs, err := getBootnodes()
if err != nil {
return err
}
if bootnodeURLs != "" {
config.BootstrapNodeURLs += "," + bootnodeURLs
}
}
config.DHTBucketSize = ctx.Int(utils.DHTBucketSizeFlag.Name)
if services := ctx.String("services"); services != "" {
config.Lifecycles = strings.Split(services, ",")
}
Expand All @@ -295,6 +428,82 @@ func createNode(ctx *cli.Context) error {
return err
}
fmt.Fprintln(ctx.App.Writer, "Created", node.Name)

// Start node if needed
if ctx.Bool("start") {
if err := client.StartNode(node.Name); err != nil {
return err
}
fmt.Fprintln(ctx.App.Writer, "Started", node.Name)
}

return nil
}

func getBootnodes() (string, error) {
nodes, err := client.GetNodes()
if err != nil {
return "", err
}

bootnodes := make([]string, 0)
for _, node := range nodes {
if strings.HasPrefix(node.Name, "bootnode") {
bootnodes = append(bootnodes, node.Enode)
}
}

return strings.Join(bootnodes, ","), nil
}

func createMultiNode(ctx *cli.Context) error {
if ctx.Args().Len() != 0 {
return cli.ShowCommandHelp(ctx, ctx.Command.Name)
}

t := time.Now()

onlyCreateBootnode := ctx.Bool("only.bootnode")
createInterval := ctx.Duration("interval")
bootNodeURLs := ctx.String(utils.BootnodesFlag.Name)
if ctx.Bool("autofill.bootnodes") {
existedBootnodeURLs, err := getBootnodes()
if err != nil {
return err
}
if existedBootnodeURLs != "" {
bootNodeURLs += "," + existedBootnodeURLs
}
}

// Overwrite the bootnodes flag if not create bootnodes
if !onlyCreateBootnode {
ctx.Set(utils.BootnodesFlag.Name, bootNodeURLs)
}

// Create nodes
count := ctx.Int("count")
for i := 0; i < count; i++ {
nodeName := fmt.Sprintf("node-%d-%d", t.Unix(), i)
if onlyCreateBootnode {
nodeName = fmt.Sprintf("bootnode-%d-%d", t.Unix(), i)
}
ctx.Set("name", nodeName)
for {
if err := createNode(ctx); err != nil {
fmt.Fprintln(ctx.App.Writer, "Failed to create node", nodeName, err)
// Try to create the node again
client.DeleteNode(nodeName)
time.Sleep(500 * time.Millisecond)
} else {
break
}
}
if createInterval > 0 {
time.Sleep(createInterval)
}
}

return nil
}

Expand Down Expand Up @@ -429,3 +638,91 @@ func rpcSubscribe(client *rpc.Client, out io.Writer, method string, args ...stri
}
}
}

func startNetwork(ctx *cli.Context) error {
if ctx.Args().Len() != 0 {
return cli.ShowCommandHelp(ctx, ctx.Command.Name)
}
if err := client.StartNetwork(); err != nil {
return err
}
fmt.Fprintln(ctx.App.Writer, "Started network")
return nil
}

func getNodePeerStats(ctx *cli.Context) error {
if ctx.Args().Len() != 1 {
return cli.ShowCommandHelp(ctx, ctx.Command.Name)
}
nodeName := ctx.Args().Get(0)
stats, err := client.GetNodePeerStats(nodeName)
if err != nil {
return err
}
fmt.Fprintln(ctx.App.Writer, "Peer stats of", ctx.String("node"))
fmt.Fprintln(ctx.App.Writer, "Peer count: ", stats.PeerCount)
fmt.Fprintln(ctx.App.Writer, "Tried: ", stats.Tried)
fmt.Fprintln(ctx.App.Writer, "Failed: ", stats.Failed)
fmt.Fprintln(ctx.App.Writer, "Nodes count: ", stats.DifferentNodesDiscovered)
fmt.Fprintln(ctx.App.Writer, "DHT: ", stats.DHTBuckets)
return nil
}

func getNetworkPeerStats(ctx *cli.Context) error {
if ctx.Args().Len() != 0 {
return cli.ShowCommandHelp(ctx, ctx.Command.Name)
}
stats, err := client.GetAllNodePeerStats()
if err != nil {
return err
}
for nodeID, stats := range stats {
fmt.Fprintln(ctx.App.Writer, "Peer stats of", nodeID)
fmt.Fprintln(ctx.App.Writer, "Peer count: ", stats.PeerCount)
fmt.Fprintln(ctx.App.Writer, "Tried: ", stats.Tried)
fmt.Fprintln(ctx.App.Writer, "Failed: ", stats.Failed)
fmt.Fprintln(ctx.App.Writer, "Nodes count: ", stats.DifferentNodesDiscovered)
fmt.Fprintln(ctx.App.Writer, "DHT: ", stats.DHTBuckets)
}
return nil
}

func startLogStats(ctx *cli.Context) error {
if ctx.Args().Len() != 0 {
return cli.ShowCommandHelp(ctx, ctx.Command.Name)
}
csvFile := ctx.String("file")
f, err := os.OpenFile(csvFile, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
if err != nil {
return err
}
defer f.Close()

sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
timer := time.NewTicker(ctx.Duration("interval"))

f.WriteString("node,timestamp,type,value\n")

loop:
for {
select {
case <-sig:
return nil
case <-timer.C:
stats, err := client.GetAllNodePeerStats()
if err != nil {
fmt.Fprintln(ctx.App.Writer, err)
goto loop
}
for nodeID, stats := range stats {
t := time.Now()
f.WriteString(fmt.Sprintf("%s,%d,%s,%d\n", nodeID, t.Unix(), "PeerCount", stats.PeerCount))
f.WriteString(fmt.Sprintf("%s,%d,%s,%d\n", nodeID, t.Unix(), "Tried", stats.Tried))
f.WriteString(fmt.Sprintf("%s,%d,%s,%d\n", nodeID, t.Unix(), "Failed", stats.Failed))
f.WriteString(fmt.Sprintf("%s,%d,%s,%d\n", nodeID, t.Unix(), "DifferentNodesDiscovered", stats.DifferentNodesDiscovered))
f.WriteString(fmt.Sprintf("%s,%d,%s,%d\n", nodeID, t.Unix(), "DHTBuckets", stats.DHTBuckets))
}
}
}
}
15 changes: 15 additions & 0 deletions p2p/discover/v4_udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,21 @@ func ListenV4(c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) {
return t, nil
}

func (t *UDPv4) BucketSizes() []int {
if t == nil || t.tab == nil {
return []int{}
}
sizes := make([]int, len(t.tab.buckets))
for i, bucket := range t.tab.buckets {
if bucket == nil {
sizes[i] = 0
} else {
sizes[i] = len(bucket.entries)
}
}
return sizes
}

// Self returns the local node.
func (t *UDPv4) Self() *enode.Node {
return t.localNode.Node()
Expand Down
9 changes: 9 additions & 0 deletions p2p/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,15 @@ func (c *conn) set(f connFlag, val bool) {
}
}

// SetListenFunc sets the function used to accept inbound connections (testing only)
func (srv *Server) SetListenFunc(f func(network, addr string) (net.Listener, error)) {
srv.listenFunc = f
}

func (srv *Server) UDPv4() *discover.UDPv4 {
return srv.ntab
}

// LocalNode returns the local node record.
func (srv *Server) LocalNode() *enode.LocalNode {
return srv.localnode
Expand Down
5 changes: 5 additions & 0 deletions p2p/simulations/adapters/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,11 @@ func (n *ExecNode) Snapshots() (map[string][]byte, error) {
return snapshots, n.client.Call(&snapshots, "simulation_snapshot")
}

// Empty PeerStats
func (n *ExecNode) PeerStats() *PeerStats {
return &PeerStats{}
}

// execNodeConfig is used to serialize the node configuration so it can be
// passed to the child process as a JSON encoded environment variable
type execNodeConfig struct {
Expand Down
Loading

0 comments on commit dc2410b

Please sign in to comment.