From 5f374bc75c109afa5a9c641de99f5826f39cd46e Mon Sep 17 00:00:00 2001
From: Jorropo
Date: Tue, 7 Mar 2023 18:03:21 +0100
Subject: [PATCH] rapide: first support for client driven downloads

It lacks many features mainly backtracking and rescan.
---
 rapide/clientdriven.go      | 262 ++++++++++++++++++++++++++++++++++++
 rapide/clientdriven_test.go |  96 +++++++++++++
 rapide/rapide.go            |  47 +++++--
 3 files changed, 397 insertions(+), 8 deletions(-)
 create mode 100644 rapide/clientdriven.go
 create mode 100644 rapide/clientdriven_test.go

diff --git a/rapide/clientdriven.go b/rapide/clientdriven.go
new file mode 100644
index 000000000..1119fca28
--- /dev/null
+++ b/rapide/clientdriven.go
@@ -0,0 +1,262 @@
+package rapide
+
+import (
+	"io"
+	"sync"
+
+	"github.com/ipfs/boxo/blocks"
+	"github.com/ipfs/go-cid"
+)
+
+// Optimize for 2MiB blocks at 10Gbit/s throughput and 200ms one-way latency:
+// 10Gbit/s * 200ms / 2MiB = 119.2; then round up to the nearest power of two.
+const targetParallelBlockDownloads = 128
+
+type clientDrivenWorker struct {
+	impl   ClientDrivenDownloader // downloader the Download/Cancel requests are sent to
+	dl     *download              // download this worker reports completion and errors to
+	outErr *error                 // where err stores a non-EOF error
+	mu     sync.Mutex             // locked by snake.callback for its whole duration
+
+	// len counts the number of not yet done nodes in the snake.
+	len  uint
+	head *snake // first element of the snake (the one with no parent)
+	tail *snake // last element of the snake (the one with no children)
+
+	// TODO: add a dontGoThere map which tells you what part of the dag this node is not able to handle
+}
+
+func (d *download) startClientDrivenWorker(impl ClientDrivenDownloader, start *node, outErr *error) {
+	w := &clientDrivenWorker{
+		impl:   impl,
+		dl:     d,
+		outErr: outErr,
+		len:    1, // only the root element created below is pending
+	}
+
+	root := &snake{
+		worker: w,
+		node:   start,
+	}
+
+	w.head = root // the snake starts as a single element, both head and tail
+	w.tail = root
+
+	impl.Download(CidCallbackPair{start.cid, root.callback}) // root.callback requests the children once the block arrives
+}
+
+// err must be called while holding w.mu.
+func (w *clientDrivenWorker) err(err error) {
+	if err == io.EOF { // io.EOF means the worker finished normally, anything else is a failure
+		w.dl.workerFinished()
+	} else {
+		*w.outErr = err
+		w.dl.workerErrored()
+	}
+
+	toCancel := make([]cid.Cid, 0, w.len) // appended to, so skipped elements leave no holes and cannot overflow it
+	for p := w.head; p != nil; p = p.children {
+		if p.status != snakeTodo {
+			continue
+		}
+
+		p.status = snakeDone // a late callback for this element returns early
+		toCancel = append(toCancel, p.node.cid)
+	}
+	w.len = 0
+	w.impl.Cancel(toCancel...)
+}
+
+type snakeStatus uint8
+
+const (
+	snakeTodo  snakeStatus = iota // block requested, callback not processed yet
+	snakeDone                     // callback processed, or canceled by err
+	snakeDuped                    // NOTE(review): never assigned in the visible code; presumably meant for copies made by dup — confirm
+)
+
+type snake struct {
+	worker   *clientDrivenWorker // worker owning the list this element is part of
+	parent   *snake              // previous element of the list (towards worker.head)
+	children *snake              // next element of the list (towards worker.tail)
+	node     *node               // dag node whose block this element tracks
+	// level indicates how deep a node is in the tree
+	level  uint
+	status snakeStatus
+}
+
+func (s *snake) callback(data []byte, err error) {
+	w := s.worker
+	w.mu.Lock()
+	defer w.mu.Unlock()
+	if s.status > snakeTodo {
+		// we already canceled this snake, do nothing
+		return
+	}
+	w.len--
+	s.status = snakeDone
+	n := s.node
+
+	n.mu.Lock() // taken before any goto Errr: s.update() there always unlocks n.mu
+	if err != nil {
+		// TODO: handle ErrNotFound
+		goto Errr
+	}
+
+	if n.state == todo {
+		var block blocks.Block
+		block, err = blocks.NewBlockWithCid(data, n.cid)
+		if err != nil {
+			goto Errr
+		}
+		err = n.expand(w.dl, block)
+		n.mu.Lock() // expand released n.mu; take it again for s.update()
+		if err != nil {
+			goto Errr
+		}
+
+		newBlocksWanted := uint(len(n.childrens))
+		if remainingSpace := targetParallelBlockDownloads - w.len; newBlocksWanted > remainingSpace {
+			newBlocksWanted = remainingSpace
+		}
+		w.len += newBlocksWanted
+		if newBlocksWanted != 0 {
+			downloads := make([]CidCallbackPair, newBlocksWanted)
+			// TODO: select blocks randomly within the children
+			left, right := s.dup() // the new elements are linked in between left and right
+			for i := range downloads {
+				child := n.childrens[i]
+				child.mu.Lock()
+				child.workers++
+				child.mu.Unlock()
+				ns := &snake{
+					worker: s.worker,
+					node:   child,
+					parent: left,
+					level:  s.level + 1,
+				}
+				left.children, left = ns, ns
+				downloads[i] = CidCallbackPair{child.cid, ns.callback}
+			}
+			left.children, right.parent = right, left
+			s.update()
+			// TODO: if we haven't found enough blocks to download, try in other nodes of the snake or try backtracking from the head.
+			s.worker.impl.Download(downloads...)
+		} else {
+			s.update()
+		}
+		select {
+		case w.dl.out <- blocks.Is(block):
+		case <-w.dl.ctx.Done():
+			// s.update() already ran and released n.mu, so report directly instead of goto Errr.
+			w.err(w.dl.ctx.Err())
+		}
+		return
+	}
+	// duplicated block
+	s.update()
+	return
+
+Errr:
+	s.update()
+	w.err(err)
+}
+
+// update checks if this node should be removed from the snake and does so if needed. It will update the metric if needed.
+// It must be called while holding s.worker.mu and s.node.mu, it will unlock s.node.mu.
+func (s *snake) update() {
+	if s.status == snakeTodo {
+		s.node.mu.Unlock()
+		return
+	}
+	removeSelf := (s.parent == nil || s.parent.level <= s.level) && (s.children == nil || s.children.level <= s.level)
+	if !removeSelf {
+		s.node.mu.Unlock()
+		return
+	}
+	if s.parent != nil {
+		s.parent.children = s.children
+	} else {
+		s.worker.head = s.children // s was the head
+	}
+	if s.children != nil {
+		s.children.parent = s.parent
+	} else {
+		s.worker.tail = s.parent // s was the tail
+	}
+	if s.status == snakeDone {
+		s.node.workers-- // this element no longer counts as a worker on the node
+	}
+	s.node.mu.Unlock()
+	if s.parent != nil {
+		s.parent.updateWithoutNodeLock(updateParent) // the old neighbours may now be removable too
+		s.parent = nil
+	}
+	if s.children != nil {
+		s.children.updateWithoutNodeLock(updateChild)
+		s.children = nil
+	}
+}
+
+type updateDirection uint8
+
+const (
+	_ updateDirection = iota
+	updateParent // keep walking towards s.parent
+	updateChild  // keep walking towards s.children
+)
+
+// updateWithoutNodeLock is like update but it doesn't require holding s.node.mu.
+// It must be called while holding s.worker.mu.
+func (s *snake) updateWithoutNodeLock(direction updateDirection) {
+	if s.status == snakeTodo {
+		return
+	}
+	removeSelf := (s.parent == nil || s.parent.level <= s.level) && (s.children == nil || s.children.level <= s.level)
+	if !removeSelf {
+		return
+	}
+	if s.parent != nil {
+		s.parent.children = s.children
+	} else {
+		s.worker.head = s.children // s was the head
+	}
+	if s.children != nil {
+		s.children.parent = s.parent
+	} else {
+		s.worker.tail = s.parent // s was the tail
+	}
+	if s.status == snakeDone {
+		s.node.mu.Lock() // unlike update, the caller does not hold s.node.mu
+		s.node.workers--
+		s.node.mu.Unlock()
+	}
+	switch direction { // keep going in one direction only
+	case updateParent:
+		if s.parent != nil {
+			s.parent.updateWithoutNodeLock(direction)
+		}
+	case updateChild:
+		if s.children != nil {
+			s.children.updateWithoutNodeLock(direction)
+		}
+	default:
+		panic("unreachable")
+	}
+	s.parent, s.children = nil, nil // detach s once the neighbour has been processed
+}
+
+// dup duplicates this node of the snake and returns the left and right pointers.
+// This allows inserting children nodes in between.
+func (s *snake) dup() (*snake, *snake) {
+	if s.status != snakeDone {
+		panic("trying to dup an not done snake")
+	}
+	scopy := *s // NOTE(review): the copy also has status snakeDone, so update may decrement node.workers for both s and the copy; snakeDuped is never assigned — confirm
+	s2 := &scopy
+	if s.children != nil {
+		s.children.parent = s2
+	}
+	s2.parent, s.children = s, s2 // s2 is linked right after s
+	return s, s2
+}
diff --git a/rapide/clientdriven_test.go b/rapide/clientdriven_test.go
new file mode 100644
index 000000000..9b6b039c9
--- /dev/null
+++ b/rapide/clientdriven_test.go
@@ -0,0 +1,96 @@
+package rapide_test
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"testing"
+	"time"
+
+	. "github.com/ipfs/boxo/rapide"
+	"github.com/ipfs/go-cid"
+)
+
+type mockClientDrivenDownloder struct {
+	bs *mockBlockstore
+	mu sync.Mutex
+	m  map[cid.Cid]context.CancelFunc // cancel funcs of in-flight downloads by cid; accessed under mu
+}
+
+func (m *mockClientDrivenDownloder) Download(ccs ...CidCallbackPair) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	for _, d := range ccs {
+		ctx, cancel := context.WithCancel(context.Background())
+		cid := d.Cid
+		cb := d.Callback
+		m.m[cid] = cancel // lets Cancel abort this fetch
+		go func() {
+			b, err := m.bs.GetBlock(ctx, cid)
+			cb(b.RawData(), err) // NOTE(review): b is dereferenced even when err != nil; safe only if GetBlock never returns a nil block — confirm
+			m.mu.Lock()
+			delete(m.m, cid)
+			m.mu.Unlock()
+		}()
+	}
+}
+
+func (m *mockClientDrivenDownloder) Cancel(cids ...cid.Cid) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	for _, c := range cids {
+		cancel, ok := m.m[c]
+		if !ok {
+			continue // no download in flight for this cid
+		}
+		cancel()
+	}
+}
+
+func TestClientDrivenDownloader(t *testing.T) {
+	for _, tc := range [...]struct {
+		delay   time.Duration
+		runners uint
+		width   uint
+		depth   uint
+	}{
+		{0, 1, 2, 2},
+		{time.Nanosecond, 1, 2, 2},
+		{time.Microsecond, 1, 2, 2},
+		{time.Millisecond, 1, 2, 2},
+	} {
+		t.Run(fmt.Sprintf("%v %v %v %v", tc.delay, tc.runners, tc.width, tc.depth), func(t *testing.T) {
+			bs := &mockBlockstore{
+				t:     t,
+				delay: tc.delay,
+			}
+			var i uint64
+			root := bs.makeDag(tc.width, tc.depth, &i)
+
+			clients := make([]ClientDrivenDownloader, tc.runners)
+			for i := tc.runners; i != 0; { // fills clients from the last index down to 0
+				i--
+				clients[i] = &mockClientDrivenDownloder{bs: bs, m: make(map[cid.Cid]context.CancelFunc)}
+			}
+
+			seen := make(map[cid.Cid]struct{})
+			for b := range (&Client{ClientDrivenDownloaders: clients}).Get(context.Background(), root, bs) {
+				block, err := b.Get()
+				if err != nil {
+					t.Fatalf("got error from rapide: %s", err)
+				}
+				c := block.Cid()
+				if _, ok := bs.m[c]; !ok {
+					t.Fatalf("got cid not in blockstore %s", c)
+				}
+				seen[c] = struct{}{}
+			}
+
+			if len(seen) != len(bs.m) { // every block of the blockstore must have been yielded
+				t.Fatalf("seen less blocks than in blockstore: expected %d; got %d", len(bs.m), len(seen))
+			}
+		})
+	}
+}
diff --git a/rapide/rapide.go b/rapide/rapide.go
index 6d76a9254..dc967b689 100644
--- a/rapide/rapide.go
+++ b/rapide/rapide.go
@@ -28,30 +28,62 @@ type ClosableBlockIterator interface {
 	blocks.BlockIterator
 }
 
+type ClientDrivenDownloader interface {
+	// Download must be asynchronous. It schedules blocks to be downloaded and
+	// callbacks to be called when either it failed or succeeded.
+	// Clients need to call back when they have blocks or errors.
+	// In the callback either []byte != nil and error == nil or error != nil and []byte == nil.
+	// When a callback for a cid is called the CID is canceled regardless of the success.
+	// The callback is expected to be really fast and should be called synchronously.
+	// All callbacks are threadsafe and may be called concurrently. But consumers
+	// must not go out of their way to call them concurrently. Just do whatever
+	// your underlying impl already does.
+	Download(...CidCallbackPair)
+
+	// Cancel must be asynchronous.
+	// It indicates that we are no longer interested in some blocks.
+	// The callbacks are still allowed to be called again but that is really not advised.
+	// It would be nice if consumers freed the callbacks in the short term after this is called.
+	Cancel(...cid.Cid)
+}
+
+type CidCallbackPair struct {
+	Cid      cid.Cid              // block to download
+	Callback ClientDrivenCallback // called with the block bytes or the error for Cid
+}
+
+type ClientDrivenCallback = func([]byte, error) // receives the block data or the download error
+
 // A Client is a collection of routers and protocols that can be used to do requests.
 type Client struct {
 	ServerDrivenDownloaders []ServerDrivenDownloader
+	ClientDrivenDownloaders []ClientDrivenDownloader
 }
 
 func (c *Client) Get(ctx context.Context, root cid.Cid, traversal ipsl.Traversal) <-chan blocks.BlockOrError {
+	totalWorkers := uint(len(c.ServerDrivenDownloaders) + len(c.ClientDrivenDownloaders))
 	ctx, cancel := context.WithCancel(ctx)
 	out := make(chan blocks.BlockOrError)
 	d := &download{
 		out:    out,
 		ctx:    ctx,
 		cancel: cancel,
-		done:   uint64(len(c.ServerDrivenDownloaders)),
+		done:   uint64(totalWorkers),
 		root: node{
 			state:     todo,
-			workers:   uint(len(c.ServerDrivenDownloaders)),
+			workers:   totalWorkers,
 			cid:       root,
 			traversal: traversal,
 		},
-		errors: make([]error, len(c.ServerDrivenDownloaders)),
+		errors: make([]error, totalWorkers),
+	}
+
+	for i, cdd := range c.ClientDrivenDownloaders {
+		d.startClientDrivenWorker(cdd, &d.root, &d.errors[i]) // client driven workers use the first error slots
 	}
 
 	for i, sdd := range c.ServerDrivenDownloaders {
-		d.startServerDrivenWorker(ctx, sdd, &d.root, &d.errors[i])
+		d.startServerDrivenWorker(ctx, sdd, &d.root, &d.errors[len(c.ClientDrivenDownloaders)+i]) // server driven slots come after the client driven ones
 	}
 
 	return out
@@ -118,8 +150,8 @@ type node struct {
 	state nodeState
 }
 
-// expand will run the Traversal and create childrens, it must be called while holding n.mu.Mutex.
-// it will unlock n.mu.Mutex
+// expand will run the Traversal and create childrens, it must be called while holding n.mu.
+// it will unlock n.mu.
 func (n *node) expand(d *download, b blocks.Block) error {
 	if n.state != todo {
 		panic(fmt.Sprintf("expanding a node that is not todo: %d", n.state))
@@ -179,8 +211,7 @@ func (n *node) expand(d *download, b blocks.Block) error {
 	return nil
 }
 
-// n.state - notStarted = the number of runners
-type nodeState uint
+type nodeState uint8
 
 const (
 	_ nodeState = iota