Skip to content

Commit

Permalink
p2p: add flag to config bucket size of DHT
Browse files Browse the repository at this point in the history
  • Loading branch information
sonhv0212 committed Dec 11, 2024
1 parent 18f0e5b commit 84a44c6
Show file tree
Hide file tree
Showing 13 changed files with 53 additions and 31 deletions.
1 change: 1 addition & 0 deletions cmd/ronin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ var (
utils.DisableRoninProtocol,
utils.AdditionalChainEventFlag,
utils.DBEngineFlag,
utils.DHTBucketSizeFlag,
}

rpcFlags = []cli.Flag{
Expand Down
7 changes: 7 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,12 @@ var (
Usage: "Sets DNS discovery entry points (use \"\" to disable DNS)",
Category: flags.NetworkingCategory,
}
DHTBucketSizeFlag = &cli.IntFlag{
Name: "dht.bucketsize",
Usage: "Size of each DHT bucket",
Value: 16,
Category: flags.NetworkingCategory,
}

// ATM the url is left to the user and deployment to
JSpathFlag = &flags.DirectoryFlag{
Expand Down Expand Up @@ -1542,6 +1548,7 @@ func SetP2PConfig(ctx *cli.Context, cfg *p2p.Config) {
cfg.NoDiscovery = true
cfg.DiscoveryV5 = false
}
cfg.DHTBucketSize = ctx.Int(DHTBucketSizeFlag.Name)
}

// SetNodeConfig applies node-related command line flags to the config.
Expand Down
3 changes: 3 additions & 0 deletions eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,9 @@ type Config struct {

// Send additional chain event
EnableAdditionalChainEvent bool

// Size of each bucket in DHT
DHTBucketSize int
}

// CreateConsensusEngine creates a consensus engine for the given chain configuration.
Expand Down
1 change: 1 addition & 0 deletions p2p/discover/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ type Config struct {
ValidSchemes enr.IdentityScheme // allowed identity schemes
Clock mclock.Clock
FilterFunction NodeFilterFunc // function for filtering ENR entries
DHTBucketSize int // size of each bucket in DHT
}

func (cfg Config) withDefaults() Config {
Expand Down
6 changes: 3 additions & 3 deletions p2p/discover/lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (it *lookup) advance() bool {
for _, n := range nodes {
if n != nil && !it.seen[n.ID()] {
it.seen[n.ID()] = true
it.result.push(n, bucketSize)
it.result.push(n, defaultBucketSize)
it.replyBuffer = append(it.replyBuffer, n)
}
}
Expand Down Expand Up @@ -104,7 +104,7 @@ func (it *lookup) startQueries() bool {

// The first query returns nodes from the local table.
if it.queries == -1 {
closest := it.tab.findnodeByID(it.result.target, bucketSize, false)
closest := it.tab.findnodeByID(it.result.target, defaultBucketSize, false)
// Avoid finishing the lookup too quickly if table is empty. It'd be better to wait
// for the table to fill in this case, but there is no good mechanism for that
// yet.
Expand Down Expand Up @@ -151,7 +151,7 @@ func (it *lookup) query(n *node, reply chan<- []*node) {
// Remove the node from the local table if it fails to return anything useful too
// many times, but only if there are enough other nodes in the bucket.
dropped := false
if fails >= maxFindnodeFailures && it.tab.bucketLen(n.ID()) >= bucketSize/2 {
if fails >= maxFindnodeFailures && it.tab.bucketLen(n.ID()) >= defaultBucketSize/2 {
dropped = true
it.tab.delete(n)
}
Expand Down
30 changes: 18 additions & 12 deletions p2p/discover/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import (

const (
alpha = 3 // Kademlia concurrency factor
bucketSize = 16 // Kademlia bucket size
defaultBucketSize = 16 // Kademlia bucket size
maxReplacements = 10 // Size of per-bucket replacement list
maxWorkerTask = 90 // Maximum number of worker tasks
timeoutWorkerTaskClose = 1 * time.Second // Timeout for waiting workerPoolTask is refill full
Expand All @@ -67,11 +67,12 @@ const (
// itself up-to-date by verifying the liveness of neighbors and requesting their node
// records when announcements of a new record version are received.
type Table struct {
mutex sync.Mutex // protects buckets, bucket content, nursery, rand
buckets [nBuckets]*bucket // index of known nodes by distance
nursery []*node // bootstrap nodes
rand *mrand.Rand // source of randomness, periodically reseeded
ips netutil.DistinctNetSet
mutex sync.Mutex // protects buckets, bucket content, nursery, rand
buckets [nBuckets]*bucket // index of known nodes by distance
nursery []*node // bootstrap nodes
rand *mrand.Rand // source of randomness, periodically reseeded
ips netutil.DistinctNetSet
bucketSize int

log log.Logger
db *enode.DB // database of known nodes
Expand Down Expand Up @@ -105,7 +106,7 @@ type bucket struct {
index int
}

func newTable(t transport, db *enode.DB, bootnodes []*enode.Node, log log.Logger, filter NodeFilterFunc) (*Table, error) {
func newTable(t transport, db *enode.DB, bootnodes []*enode.Node, log log.Logger, filter NodeFilterFunc, bucketSize int) (*Table, error) {
tab := &Table{
net: t,
db: db,
Expand All @@ -118,6 +119,7 @@ func newTable(t transport, db *enode.DB, bootnodes []*enode.Node, log log.Logger
ips: netutil.DistinctNetSet{Subnet: tableSubnet, Limit: tableIPLimit},
log: log,
enrFilter: filter,
bucketSize: bucketSize,
}
if err := tab.setFallbackNodes(bootnodes); err != nil {
return nil, err
Expand All @@ -131,15 +133,19 @@ func newTable(t transport, db *enode.DB, bootnodes []*enode.Node, log log.Logger
for i := 0; i < maxWorkerTask; i++ {
tab.workerPoolTask <- struct{}{}
}
// Set default bucket size if bucketSize is not set
if tab.bucketSize <= 0 {
tab.bucketSize = defaultBucketSize
}

tab.seedRand()
tab.loadSeedNodes()

return tab, nil
}

func newMeteredTable(t transport, db *enode.DB, bootnodes []*enode.Node, log log.Logger, filter NodeFilterFunc) (*Table, error) {
tab, err := newTable(t, db, bootnodes, log, filter)
func newMeteredTable(t transport, db *enode.DB, bootnodes []*enode.Node, log log.Logger, filter NodeFilterFunc, bucketSize int) (*Table, error) {
tab, err := newTable(t, db, bootnodes, log, filter, bucketSize)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -557,7 +563,7 @@ func (tab *Table) addSeenNodeSync(n *node) {
// Already in bucket, don't add.
return
}
if len(b.entries) >= bucketSize {
if len(b.entries) >= tab.bucketSize {
// Bucket full, maybe add as replacement.
tab.addReplacement(b, n)
return
Expand Down Expand Up @@ -636,7 +642,7 @@ func (tab *Table) addVerifiedNodeSync(n *node) {
// Already in bucket, moved to front.
return
}
if len(b.entries) >= bucketSize {
if len(b.entries) >= tab.bucketSize {
// Bucket full, maybe add as replacement.
tab.addReplacement(b, n)
return
Expand All @@ -646,7 +652,7 @@ func (tab *Table) addVerifiedNodeSync(n *node) {
return
}
// Add to front of bucket.
b.entries, _ = pushNode(b.entries, n, bucketSize)
b.entries, _ = pushNode(b.entries, n, tab.bucketSize)
b.replacements = deleteNode(b.replacements, n)
n.addedAt = time.Now()
if tab.nodeAddedHook != nil {
Expand Down
6 changes: 3 additions & 3 deletions p2p/discover/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func testPingReplace(t *testing.T, newNodeIsResponding, lastInBucketIsResponding

tab.mutex.Lock()
defer tab.mutex.Unlock()
wantSize := bucketSize
wantSize := defaultBucketSize
if !lastInBucketIsResponding && !newNodeIsResponding {
wantSize--
}
Expand All @@ -102,7 +102,7 @@ func TestBucket_bumpNoDuplicates(t *testing.T) {
Rand: rand.New(rand.NewSource(time.Now().Unix())),
Values: func(args []reflect.Value, rand *rand.Rand) {
// generate a random list of nodes. this will be the content of the bucket.
n := rand.Intn(bucketSize-1) + 1
n := rand.Intn(defaultBucketSize-1) + 1
nodes := make([]*node, n)
for i := range nodes {
nodes[i] = nodeAtDistance(enode.ID{}, 200, intIP(200))
Expand Down Expand Up @@ -296,7 +296,7 @@ func (*closeTest) Generate(rand *rand.Rand, size int) reflect.Value {
t := &closeTest{
Self: gen(enode.ID{}, rand).(enode.ID),
Target: gen(enode.ID{}, rand).(enode.ID),
N: rand.Intn(bucketSize),
N: rand.Intn(defaultBucketSize),
}
for _, id := range gen([]enode.ID{}, rand).([]enode.ID) {
r := new(enr.Record)
Expand Down
6 changes: 3 additions & 3 deletions p2p/discover/table_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func init() {

func newTestTable(t transport) (*Table, *enode.DB) {
db, _ := enode.OpenDB("")
tab, _ := newTable(t, db, nil, log.Root(), nil)
tab, _ := newTable(t, db, nil, log.Root(), nil, defaultBucketSize)
go tab.loop()
return tab, db
}
Expand Down Expand Up @@ -100,10 +100,10 @@ func intIP(i int) net.IP {
func fillBucket(tab *Table, n *node) (last *node) {
ld := enode.LogDist(tab.self().ID(), n.ID())
b := tab.bucket(n.ID())
for len(b.entries) < bucketSize {
for len(b.entries) < tab.bucketSize {
b.entries = append(b.entries, nodeAtDistance(tab.self().ID(), ld, intIP(ld)))
}
return b.entries[bucketSize-1]
return b.entries[tab.bucketSize-1]
}

// fillTable adds nodes the table to the end of their corresponding bucket
Expand Down
4 changes: 2 additions & 2 deletions p2p/discover/v4_lookup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ func TestUDPv4_Lookup(t *testing.T) {
for _, e := range results {
t.Logf(" ld=%d, %x", enode.LogDist(lookupTestnet.target.id(), e.ID()), e.ID().Bytes())
}
if len(results) != bucketSize {
t.Errorf("wrong number of results: got %d, want %d", len(results), bucketSize)
if len(results) != defaultBucketSize {
t.Errorf("wrong number of results: got %d, want %d", len(results), defaultBucketSize)
}
checkLookupResults(t, lookupTestnet, results)
}
Expand Down
8 changes: 4 additions & 4 deletions p2p/discover/v4_udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func ListenV4(c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) {
log: cfg.Log,
}

tab, err := newMeteredTable(t, ln.Database(), cfg.Bootnodes, t.log, cfg.FilterFunction)
tab, err := newMeteredTable(t, ln.Database(), cfg.Bootnodes, t.log, cfg.FilterFunction, cfg.DHTBucketSize)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -303,7 +303,7 @@ func (t *UDPv4) findnode(toid enode.ID, toaddr *net.UDPAddr, target v4wire.Pubke

// Add a matcher for 'neighbours' replies to the pending reply queue. The matcher is
// active until enough nodes have been received.
nodes := make([]*node, 0, bucketSize)
nodes := make([]*node, 0, defaultBucketSize)
nreceived := 0
rm := t.pending(toid, toaddr.IP, v4wire.NeighborsPacket, func(r v4wire.Packet) (matched bool, requestDone bool) {
reply := r.(*v4wire.Neighbors)
Expand All @@ -316,7 +316,7 @@ func (t *UDPv4) findnode(toid enode.ID, toaddr *net.UDPAddr, target v4wire.Pubke
}
nodes = append(nodes, n)
}
return true, nreceived >= bucketSize
return true, nreceived >= defaultBucketSize
})
t.send(toaddr, toid, &v4wire.Findnode{
Target: target,
Expand Down Expand Up @@ -721,7 +721,7 @@ func (t *UDPv4) handleFindnode(h *packetHandlerV4, from *net.UDPAddr, fromID eno

// Determine closest nodes.
target := enode.ID(crypto.Keccak256Hash(req.Target[:]))
closest := t.tab.findnodeByID(target, bucketSize, true).entries
closest := t.tab.findnodeByID(target, defaultBucketSize, true).entries

// Send neighbors in chunks with at most maxNeighbors per packet
// to stay below the packet size limit.
Expand Down
6 changes: 3 additions & 3 deletions p2p/discover/v4_udp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func TestUDPv4_findnode(t *testing.T) {
// take care not to overflow any bucket.
nodes := &nodesByDistance{target: testTarget.ID()}
live := make(map[enode.ID]bool)
numCandidates := 2 * bucketSize
numCandidates := 2 * defaultBucketSize
for i := 0; i < numCandidates; i++ {
key := newkey()
ip := net.IP{10, 13, 0, byte(i)}
Expand All @@ -278,12 +278,12 @@ func TestUDPv4_findnode(t *testing.T) {
test.table.db.UpdateLastPongReceived(remoteID, test.remoteaddr.IP, time.Now())

// check that closest neighbors are returned.
expected := test.table.findnodeByID(testTarget.ID(), bucketSize, true)
expected := test.table.findnodeByID(testTarget.ID(), defaultBucketSize, true)
test.packetIn(nil, &v4wire.Findnode{Target: testTarget, Expiration: futureExp})
waitNeighbors := func(want []*node) {
test.waitPacketOut(func(p *v4wire.Neighbors, to *net.UDPAddr, hash []byte) {
if len(p.Nodes) != len(want) {
t.Errorf("wrong number of results: got %d, want %d", len(p.Nodes), bucketSize)
t.Errorf("wrong number of results: got %d, want %d", len(p.Nodes), defaultBucketSize)
}
for i, n := range p.Nodes {
if n.ID.ID() != want[i].ID() {
Expand Down
2 changes: 1 addition & 1 deletion p2p/discover/v5_udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func newUDPv5(conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) {
closeCtx: closeCtx,
cancelCloseCtx: cancelCloseCtx,
}
tab, err := newMeteredTable(t, t.db, cfg.Bootnodes, cfg.Log, cfg.FilterFunction)
tab, err := newMeteredTable(t, t.db, cfg.Bootnodes, cfg.Log, cfg.FilterFunction, cfg.DHTBucketSize)
if err != nil {
return nil, err
}
Expand Down
4 changes: 4 additions & 0 deletions p2p/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,9 @@ type Config struct {
Logger log.Logger `toml:",omitempty"`

clock mclock.Clock

// Size of each bucket in DHT
DHTBucketSize int
}

// Server manages all peer connections.
Expand Down Expand Up @@ -630,6 +633,7 @@ func (srv *Server) setupDiscovery() error {
Unhandled: unhandled,
Log: srv.log,
FilterFunction: f,
DHTBucketSize: srv.Config.DHTBucketSize,
}
ntab, err := discover.ListenV4(conn, srv.localnode, cfg)
if err != nil {
Expand Down

0 comments on commit 84a44c6

Please sign in to comment.