p2p: Allow configuring the DHT bucket size
Currently, the bucket size of the DHT is hardcoded to 16. This commit introduces a new flag, --dht.bucketsize, which lets users configure the bucket size as needed.
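For example, a node could be started with 32-entry buckets via ./ronin --dht.bucketsize 32 (a hypothetical invocation; the exact binary name depends on the build).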
sonhv0212 committed Dec 22, 2024
1 parent dfbbfea commit 12a3d75
Showing 13 changed files with 53 additions and 31 deletions.
1 change: 1 addition & 0 deletions cmd/ronin/main.go
@@ -180,6 +180,7 @@ var (
utils.DisableRoninProtocol,
utils.AdditionalChainEventFlag,
utils.DBEngineFlag,
+utils.DHTBucketSizeFlag,
}

rpcFlags = []cli.Flag{
7 changes: 7 additions & 0 deletions cmd/utils/flags.go
@@ -871,6 +871,12 @@ var (
Usage: "Sets DNS discovery entry points (use \"\" to disable DNS)",
Category: flags.NetworkingCategory,
}
+DHTBucketSizeFlag = &cli.IntFlag{
+Name: "dht.bucketsize",
+Usage: "Size of each DHT bucket",
+Value: 16,
+Category: flags.NetworkingCategory,
+}

// ATM the url is left to the user and deployment to
JSpathFlag = &flags.DirectoryFlag{
@@ -1542,6 +1548,7 @@ func SetP2PConfig(ctx *cli.Context, cfg *p2p.Config) {
cfg.NoDiscovery = true
cfg.DiscoveryV5 = false
}
+cfg.DHTBucketSize = ctx.Int(DHTBucketSizeFlag.Name)
}

// SetNodeConfig applies node-related command line flags to the config.
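The flag follows the usual urfave/cli pattern: the IntFlag declares a default of 16 (matching the previously hardcoded constant), and SetP2PConfig copies the parsed value into the p2p config. A minimal, self-contained sketch of that flow, assuming stock github.com/urfave/cli/v2 rather than the repo's wrapped flags package:

package main

import (
	"fmt"
	"os"

	"github.com/urfave/cli/v2"
)

func main() {
	app := &cli.App{
		Flags: []cli.Flag{
			&cli.IntFlag{
				Name:  "dht.bucketsize",
				Usage: "Size of each DHT bucket",
				Value: 16, // default keeps the old hardcoded behavior
			},
		},
		Action: func(ctx *cli.Context) error {
			// The real code assigns this value to cfg.DHTBucketSize
			// inside SetP2PConfig.
			fmt.Println("DHT bucket size:", ctx.Int("dht.bucketsize"))
			return nil
		},
	}
	if err := app.Run(os.Args); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}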
3 changes: 3 additions & 0 deletions eth/ethconfig/config.go
@@ -228,6 +228,9 @@ type Config struct {

// Send additional chain event
EnableAdditionalChainEvent bool
+
+// Size of each bucket in DHT
+DHTBucketSize int
}

// CreateConsensusEngine creates a consensus engine for the given chain configuration.
1 change: 1 addition & 0 deletions p2p/discover/common.go
@@ -78,6 +78,7 @@ type Config struct {
ValidSchemes enr.IdentityScheme // allowed identity schemes
Clock mclock.Clock
FilterFunction NodeFilterFunc // function for filtering ENR entries
+DHTBucketSize int // size of each bucket in DHT
}

func (cfg Config) withDefaults() Config {
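With the new field on discover.Config, discovery can be stood up with a custom bucket size. A minimal sketch, assuming upstream go-ethereum package paths (the fork's paths may differ) and eliding error handling:

package main

import (
	"net"

	"github.com/ethereum/go-ethereum/crypto"
	"github.com/ethereum/go-ethereum/p2p/discover"
	"github.com/ethereum/go-ethereum/p2p/enode"
)

func main() {
	key, _ := crypto.GenerateKey()
	db, _ := enode.OpenDB("") // in-memory node database
	ln := enode.NewLocalNode(db, key)
	conn, _ := net.ListenUDP("udp", &net.UDPAddr{Port: 0})

	cfg := discover.Config{
		PrivateKey:    key,
		DHTBucketSize: 32, // values <= 0 fall back to the default of 16
	}
	tab, _ := discover.ListenV4(conn, ln, cfg)
	defer tab.Close()
}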
6 changes: 3 additions & 3 deletions p2p/discover/lookup.go
@@ -73,7 +73,7 @@ func (it *lookup) advance() bool {
for _, n := range nodes {
if n != nil && !it.seen[n.ID()] {
it.seen[n.ID()] = true
-it.result.push(n, bucketSize)
+it.result.push(n, defaultBucketSize)
it.replyBuffer = append(it.replyBuffer, n)
}
}
@@ -104,7 +104,7 @@ func (it *lookup) startQueries() bool {

// The first query returns nodes from the local table.
if it.queries == -1 {
-closest := it.tab.findnodeByID(it.result.target, bucketSize, false)
+closest := it.tab.findnodeByID(it.result.target, defaultBucketSize, false)
// Avoid finishing the lookup too quickly if table is empty. It'd be better to wait
// for the table to fill in this case, but there is no good mechanism for that
// yet.
@@ -151,7 +151,7 @@ func (it *lookup) query(n *node, reply chan<- []*node) {
// Remove the node from the local table if it fails to return anything useful too
// many times, but only if there are enough other nodes in the bucket.
dropped := false
-if fails >= maxFindnodeFailures && it.tab.bucketLen(n.ID()) >= bucketSize/2 {
+if fails >= maxFindnodeFailures && it.tab.bucketLen(n.ID()) >= defaultBucketSize/2 {
dropped = true
it.tab.delete(n)
}
30 changes: 18 additions & 12 deletions p2p/discover/table.go
@@ -41,7 +41,7 @@ import (

const (
alpha = 3 // Kademlia concurrency factor
-bucketSize = 16 // Kademlia bucket size
+defaultBucketSize = 16 // Kademlia bucket size
maxReplacements = 10 // Size of per-bucket replacement list
maxWorkerTask = 90 // Maximum number of worker tasks
timeoutWorkerTaskClose = 1 * time.Second // Timeout for waiting workerPoolTask is refill full
@@ -67,11 +67,12 @@ const (
// itself up-to-date by verifying the liveness of neighbors and requesting their node
// records when announcements of a new record version are received.
type Table struct {
-mutex sync.Mutex // protects buckets, bucket content, nursery, rand
-buckets [nBuckets]*bucket // index of known nodes by distance
-nursery []*node // bootstrap nodes
-rand *mrand.Rand // source of randomness, periodically reseeded
-ips netutil.DistinctNetSet
+mutex sync.Mutex // protects buckets, bucket content, nursery, rand
+buckets [nBuckets]*bucket // index of known nodes by distance
+nursery []*node // bootstrap nodes
+rand *mrand.Rand // source of randomness, periodically reseeded
+ips netutil.DistinctNetSet
+bucketSize int

log log.Logger
db *enode.DB // database of known nodes
@@ -105,7 +106,7 @@ type bucket struct {
index int
}

-func newTable(t transport, db *enode.DB, bootnodes []*enode.Node, log log.Logger, filter NodeFilterFunc) (*Table, error) {
+func newTable(t transport, db *enode.DB, bootnodes []*enode.Node, log log.Logger, filter NodeFilterFunc, bucketSize int) (*Table, error) {
tab := &Table{
net: t,
db: db,
@@ -118,6 +119,7 @@ func newTable(t transport, db *enode.DB, bootnodes []*enode.Node, log log.Logger
ips: netutil.DistinctNetSet{Subnet: tableSubnet, Limit: tableIPLimit},
log: log,
enrFilter: filter,
+bucketSize: bucketSize,
}
if err := tab.setFallbackNodes(bootnodes); err != nil {
return nil, err
@@ -131,15 +133,19 @@ func newTable(t transport, db *enode.DB, bootnodes []*enode.Node, log log.Logger
for i := 0; i < maxWorkerTask; i++ {
tab.workerPoolTask <- struct{}{}
}
+// Set default bucket size if bucketSize is not set
+if tab.bucketSize <= 0 {
+tab.bucketSize = defaultBucketSize
+}

tab.seedRand()
tab.loadSeedNodes()

return tab, nil
}

-func newMeteredTable(t transport, db *enode.DB, bootnodes []*enode.Node, log log.Logger, filter NodeFilterFunc) (*Table, error) {
-tab, err := newTable(t, db, bootnodes, log, filter)
+func newMeteredTable(t transport, db *enode.DB, bootnodes []*enode.Node, log log.Logger, filter NodeFilterFunc, bucketSize int) (*Table, error) {
+tab, err := newTable(t, db, bootnodes, log, filter, bucketSize)
if err != nil {
return nil, err
}
@@ -557,7 +563,7 @@ func (tab *Table) addSeenNodeSync(n *node) {
// Already in bucket, don't add.
return
}
-if len(b.entries) >= bucketSize {
+if len(b.entries) >= tab.bucketSize {
// Bucket full, maybe add as replacement.
tab.addReplacement(b, n)
return
@@ -636,7 +642,7 @@ func (tab *Table) addVerifiedNodeSync(n *node) {
// Already in bucket, moved to front.
return
}
-if len(b.entries) >= bucketSize {
+if len(b.entries) >= tab.bucketSize {
// Bucket full, maybe add as replacement.
tab.addReplacement(b, n)
return
@@ -646,7 +652,7 @@ func (tab *Table) addVerifiedNodeSync(n *node) {
return
}
// Add to front of bucket.
-b.entries, _ = pushNode(b.entries, n, bucketSize)
+b.entries, _ = pushNode(b.entries, n, tab.bucketSize)
b.replacements = deleteNode(b.replacements, n)
n.addedAt = time.Now()
if tab.nodeAddedHook != nil {
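Note that newTable falls back to defaultBucketSize when the configured value is zero or negative, and the capacity checks in the add/replace paths now consult tab.bucketSize, while protocol-level uses (the FINDNODE result counts in lookup.go and v4_udp.go) stay pinned to defaultBucketSize. The bounded front-insert that pushNode performs can be illustrated with a simplified, hypothetical analogue (the real helper also reports the evicted entry):

package main

import "fmt"

// pushFront puts v at the front of list, keeping at most max entries.
// It is a simplified stand-in for the table's pushNode helper, which now
// receives tab.bucketSize instead of a fixed constant.
func pushFront(list []string, v string, max int) []string {
	list = append([]string{v}, list...)
	if len(list) > max {
		list = list[:max]
	}
	return list
}

func main() {
	bucket := []string{"b", "c"}
	bucket = pushFront(bucket, "a", 3)
	fmt.Println(bucket) // [a b c]
	bucket = pushFront(bucket, "x", 3)
	fmt.Println(bucket) // [x a b] (oldest entry dropped)
}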
6 changes: 3 additions & 3 deletions p2p/discover/table_test.go
@@ -79,7 +79,7 @@ func testPingReplace(t *testing.T, newNodeIsResponding, lastInBucketIsResponding

tab.mutex.Lock()
defer tab.mutex.Unlock()
-wantSize := bucketSize
+wantSize := defaultBucketSize
if !lastInBucketIsResponding && !newNodeIsResponding {
wantSize--
}
@@ -102,7 +102,7 @@ func TestBucket_bumpNoDuplicates(t *testing.T) {
Rand: rand.New(rand.NewSource(time.Now().Unix())),
Values: func(args []reflect.Value, rand *rand.Rand) {
// generate a random list of nodes. this will be the content of the bucket.
-n := rand.Intn(bucketSize-1) + 1
+n := rand.Intn(defaultBucketSize-1) + 1
nodes := make([]*node, n)
for i := range nodes {
nodes[i] = nodeAtDistance(enode.ID{}, 200, intIP(200))
@@ -296,7 +296,7 @@ func (*closeTest) Generate(rand *rand.Rand, size int) reflect.Value {
t := &closeTest{
Self: gen(enode.ID{}, rand).(enode.ID),
Target: gen(enode.ID{}, rand).(enode.ID),
-N: rand.Intn(bucketSize),
+N: rand.Intn(defaultBucketSize),
}
for _, id := range gen([]enode.ID{}, rand).([]enode.ID) {
r := new(enr.Record)
6 changes: 3 additions & 3 deletions p2p/discover/table_util_test.go
@@ -43,7 +43,7 @@ func init() {

func newTestTable(t transport) (*Table, *enode.DB) {
db, _ := enode.OpenDB("")
-tab, _ := newTable(t, db, nil, log.Root(), nil)
+tab, _ := newTable(t, db, nil, log.Root(), nil, defaultBucketSize)
go tab.loop()
return tab, db
}
@@ -100,10 +100,10 @@ func intIP(i int) net.IP {
func fillBucket(tab *Table, n *node) (last *node) {
ld := enode.LogDist(tab.self().ID(), n.ID())
b := tab.bucket(n.ID())
-for len(b.entries) < bucketSize {
+for len(b.entries) < tab.bucketSize {
b.entries = append(b.entries, nodeAtDistance(tab.self().ID(), ld, intIP(ld)))
}
-return b.entries[bucketSize-1]
+return b.entries[tab.bucketSize-1]
}

// fillTable adds nodes the table to the end of their corresponding bucket
4 changes: 2 additions & 2 deletions p2p/discover/v4_lookup_test.go
@@ -58,8 +58,8 @@ func TestUDPv4_Lookup(t *testing.T) {
for _, e := range results {
t.Logf(" ld=%d, %x", enode.LogDist(lookupTestnet.target.id(), e.ID()), e.ID().Bytes())
}
-if len(results) != bucketSize {
-t.Errorf("wrong number of results: got %d, want %d", len(results), bucketSize)
+if len(results) != defaultBucketSize {
+t.Errorf("wrong number of results: got %d, want %d", len(results), defaultBucketSize)
}
checkLookupResults(t, lookupTestnet, results)
}
8 changes: 4 additions & 4 deletions p2p/discover/v4_udp.go
@@ -142,7 +142,7 @@ func ListenV4(c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) {
log: cfg.Log,
}

-tab, err := newMeteredTable(t, ln.Database(), cfg.Bootnodes, t.log, cfg.FilterFunction)
+tab, err := newMeteredTable(t, ln.Database(), cfg.Bootnodes, t.log, cfg.FilterFunction, cfg.DHTBucketSize)
if err != nil {
return nil, err
}
@@ -303,7 +303,7 @@ func (t *UDPv4) findnode(toid enode.ID, toaddr *net.UDPAddr, target v4wire.Pubke

// Add a matcher for 'neighbours' replies to the pending reply queue. The matcher is
// active until enough nodes have been received.
-nodes := make([]*node, 0, bucketSize)
+nodes := make([]*node, 0, defaultBucketSize)
nreceived := 0
rm := t.pending(toid, toaddr.IP, v4wire.NeighborsPacket, func(r v4wire.Packet) (matched bool, requestDone bool) {
reply := r.(*v4wire.Neighbors)
@@ -316,7 +316,7 @@ func (t *UDPv4) findnode(toid enode.ID, toaddr *net.UDPAddr, target v4wire.Pubke
}
nodes = append(nodes, n)
}
-return true, nreceived >= bucketSize
+return true, nreceived >= defaultBucketSize
})
t.send(toaddr, toid, &v4wire.Findnode{
Target: target,
@@ -721,7 +721,7 @@ func (t *UDPv4) handleFindnode(h *packetHandlerV4, from *net.UDPAddr, fromID eno

// Determine closest nodes.
target := enode.ID(crypto.Keccak256Hash(req.Target[:]))
-closest := t.tab.findnodeByID(target, bucketSize, true).entries
+closest := t.tab.findnodeByID(target, defaultBucketSize, true).entries

// Send neighbors in chunks with at most maxNeighbors per packet
// to stay below the packet size limit.
6 changes: 3 additions & 3 deletions p2p/discover/v4_udp_test.go
@@ -258,7 +258,7 @@ func TestUDPv4_findnode(t *testing.T) {
// take care not to overflow any bucket.
nodes := &nodesByDistance{target: testTarget.ID()}
live := make(map[enode.ID]bool)
-numCandidates := 2 * bucketSize
+numCandidates := 2 * defaultBucketSize
for i := 0; i < numCandidates; i++ {
key := newkey()
ip := net.IP{10, 13, 0, byte(i)}
@@ -278,12 +278,12 @@ func TestUDPv4_findnode(t *testing.T) {
test.table.db.UpdateLastPongReceived(remoteID, test.remoteaddr.IP, time.Now())

// check that closest neighbors are returned.
-expected := test.table.findnodeByID(testTarget.ID(), bucketSize, true)
+expected := test.table.findnodeByID(testTarget.ID(), defaultBucketSize, true)
test.packetIn(nil, &v4wire.Findnode{Target: testTarget, Expiration: futureExp})
waitNeighbors := func(want []*node) {
test.waitPacketOut(func(p *v4wire.Neighbors, to *net.UDPAddr, hash []byte) {
if len(p.Nodes) != len(want) {
-t.Errorf("wrong number of results: got %d, want %d", len(p.Nodes), bucketSize)
+t.Errorf("wrong number of results: got %d, want %d", len(p.Nodes), defaultBucketSize)
}
for i, n := range p.Nodes {
if n.ID.ID() != want[i].ID() {
2 changes: 1 addition & 1 deletion p2p/discover/v5_udp.go
@@ -164,7 +164,7 @@ func newUDPv5(conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) {
closeCtx: closeCtx,
cancelCloseCtx: cancelCloseCtx,
}
-tab, err := newMeteredTable(t, t.db, cfg.Bootnodes, cfg.Log, cfg.FilterFunction)
+tab, err := newMeteredTable(t, t.db, cfg.Bootnodes, cfg.Log, cfg.FilterFunction, cfg.DHTBucketSize)
if err != nil {
return nil, err
}
4 changes: 4 additions & 0 deletions p2p/server.go
@@ -168,6 +168,9 @@ type Config struct {
Logger log.Logger `toml:",omitempty"`

clock mclock.Clock
+
+// Size of each bucket in DHT
+DHTBucketSize int
}

// Server manages all peer connections.
@@ -630,6 +633,7 @@ func (srv *Server) setupDiscovery() error {
Unhandled: unhandled,
Log: srv.log,
FilterFunction: f,
+DHTBucketSize: srv.Config.DHTBucketSize,
}
ntab, err := discover.ListenV4(conn, srv.localnode, cfg)
if err != nil {
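At the server level, the option only needs to be set on p2p.Config; setupDiscovery forwards it into the discover.Config used when discovery starts. A minimal sketch, again assuming upstream go-ethereum package paths and abbreviated error handling:

package main

import (
	"github.com/ethereum/go-ethereum/crypto"
	"github.com/ethereum/go-ethereum/p2p"
)

func main() {
	key, _ := crypto.GenerateKey()
	srv := &p2p.Server{Config: p2p.Config{
		PrivateKey:    key,
		MaxPeers:      25,
		ListenAddr:    ":30303",
		DHTBucketSize: 32, // forwarded to discover.Config in setupDiscovery
	}}
	if err := srv.Start(); err != nil {
		panic(err)
	}
	defer srv.Stop()
}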
