diff --git a/rapide/rapide.go b/rapide/rapide.go index f0860a055..976d8b5a1 100644 --- a/rapide/rapide.go +++ b/rapide/rapide.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + mrand "math/rand" "sync" "sync/atomic" @@ -42,8 +43,9 @@ func (c *Client) Get(ctx context.Context, root cid.Cid, traversal ipsl.Traversal errors: make([]error, len(c.ServerDrivenDownloaders)), } + seedRand := mrand.New(mrand.NewSource(mrand.Int63())) for i, sdd := range c.ServerDrivenDownloaders { - d.startServerDrivenWorker(ctx, sdd, &d.root, &d.errors[i]) + d.startServerDrivenWorker(ctx, sdd, &d.root, &d.errors[i], seedRand.Int63()^int64(i)) } return out diff --git a/rapide/serverdriven.go b/rapide/serverdriven.go index aa962ba7e..9719e5f6a 100644 --- a/rapide/serverdriven.go +++ b/rapide/serverdriven.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io" + mrand "math/rand" "github.com/ipfs/go-cid" "github.com/ipfs/go-libipfs/blocks" @@ -15,22 +16,24 @@ type serverDrivenWorker struct { impl ServerDrivenDownloader download *download outErr *error - - current *node - - tasks map[cid.Cid]*node + current *node + tasks map[cid.Cid]*node + rand mrand.Rand // TODO: add a dontGoThere map which tells you what part of the dag this node is not able to handle } -func (d *download) startServerDrivenWorker(ctx context.Context, impl ServerDrivenDownloader, root *node, outErr *error) { - go (&serverDrivenWorker{ +func (d *download) startServerDrivenWorker(ctx context.Context, impl ServerDrivenDownloader, root *node, outErr *error, seed int64) { + w := &serverDrivenWorker{ impl: impl, download: d, outErr: outErr, current: root, tasks: make(map[cid.Cid]*node), - }).work(ctx) + rand: *mrand.New(mrand.NewSource(seed)), + } + + go w.work(ctx) } func (w *serverDrivenWorker) work(ctx context.Context) { @@ -175,21 +178,12 @@ func (w *serverDrivenWorker) findWork() (cid.Cid, ipsl.Traversal, bool) { case done: // first search in it's childs if it has something we could run c.workers += 1 - var minWorkers uint + var minWorkers, luck uint var min *node for _, child := range c.childrens { // we run a minimum search, we want the node that have the least amount of workers currently // TODO: filter childs in the dontGoThere map - child.mu.Lock() - switch { - case min == nil: - minWorkers = child.workers - min = child - case child.workers < minWorkers: - minWorkers = child.workers - min = child - } - child.mu.Unlock() + minWorkers, min, luck = w.compareChildWithMinimums(child, minWorkers, min, luck) } if min != nil { c.mu.Unlock() @@ -197,8 +191,7 @@ func (w *serverDrivenWorker) findWork() (cid.Cid, ipsl.Traversal, bool) { continue } - // this node is fully completed, backtracking - // TODO: add c in dontGoThere (we failed to select any child) + // this node is fully completed, do backtracking c.workers -= 1 new := c.parent c.mu.Unlock() @@ -211,6 +204,35 @@ func (w *serverDrivenWorker) findWork() (cid.Cid, ipsl.Traversal, bool) { } } +func (w *serverDrivenWorker) compareChildWithMinimums(child *node, minWorkers uint, min *node, luck uint) (uint, *node, uint) { + child.mu.Lock() + switch { + case min == nil: + minWorkers = child.workers + min = child + case child.workers < minWorkers: + minWorkers = child.workers + min = child + luck = 0 + case child.workers == minWorkers: + // if scores are identical randomly select other nodes to randomly distribute where downloads are placed + if luck == 0 { + // lazy initialisation of luck, this allows to creating a random value when better values exists back to back + luck = uint(w.rand.Int()) + } + newLuck := uint(w.rand.Int()) + if newLuck >= luck { + break + } + minWorkers = child.workers + min = child + luck = newLuck + } + child.mu.Unlock() + + return minWorkers, min, luck +} + // resetCurrentChildsNodeWorkState updates the state of the current node to longer count towards it. func (w *serverDrivenWorker) resetCurrentChildsNodeWorkState() { c := w.current