Skip to content

Commit

Permalink
rapide: first support for client driven downloads
Browse files Browse the repository at this point in the history
It lacks many features mainly backtracking and rescan.
  • Loading branch information
Jorropo committed Apr 14, 2023
1 parent c1184d1 commit 5f374bc
Show file tree
Hide file tree
Showing 3 changed files with 397 additions and 8 deletions.
262 changes: 262 additions & 0 deletions rapide/clientdriven.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
package rapide

import (
"io"
"sync"

"github.com/ipfs/boxo/blocks"
"github.com/ipfs/go-cid"
)

// optimize for 2MiB blocks at 10Gbit/s throughput and 200ms one way latency:
// 10Gbit/s * 200ms / 2MiB = 119.2; then round up to the nearest power of two
const targetParallelBlockDownloads = 128

type clientDrivenWorker struct {
impl ClientDrivenDownloader
dl *download
outErr *error
mu sync.Mutex

// len counts the number of non done nodes in the snake
len uint
head *snake
tail *snake

// TODO: add a dontGoThere map which tells you what part of the dag this node is not able to handle
}

func (d *download) startClientDrivenWorker(impl ClientDrivenDownloader, start *node, outErr *error) {
w := &clientDrivenWorker{
impl: impl,
dl: d,
outErr: outErr,
len: 1,
}

root := &snake{
worker: w,
node: start,
}

w.head = root
w.tail = root

impl.Download(CidCallbackPair{start.cid, root.callback})
}

// err must be called while holding w.mu.
func (w *clientDrivenWorker) err(err error) {
if err == io.EOF {
w.dl.workerFinished()
} else {
*w.outErr = err
w.dl.workerErrored()
}

toCancel := make([]cid.Cid, w.len)
for i, p := 0, w.head; p != nil; i, p = i+1, p.children {
if p.status != snakeTodo {
continue
}

p.status = snakeDone
toCancel[i] = p.node.cid
}
w.len = 0
w.impl.Cancel(toCancel...)
}

type snakeStatus uint8

const (
snakeTodo snakeStatus = iota
snakeDone
snakeDuped
)

type snake struct {
worker *clientDrivenWorker
parent *snake
children *snake
node *node
// level indicates how deep a node is in the tree
level uint
status snakeStatus
}

func (s *snake) callback(data []byte, err error) {
w := s.worker
w.mu.Lock()
defer w.mu.Unlock()
if s.status > snakeTodo {
// we already canceled this snake, do nothing
return
}
w.len--
s.status = snakeDone
n := s.node

if err != nil {
// TODO: handle ErrNotFound
goto Errr
}

n.mu.Lock()
if n.state == todo {
var block blocks.Block
block, err = blocks.NewBlockWithCid(data, n.cid)
if err != nil {
goto Errr
}
err = n.expand(w.dl, block)
n.mu.Lock()
if err != nil {
goto Errr
}

newBlocksWanted := uint(len(n.childrens))
if remainingSpace := targetParallelBlockDownloads - w.len; newBlocksWanted > remainingSpace {
newBlocksWanted = remainingSpace
}
w.len += newBlocksWanted
if newBlocksWanted != 0 {
downloads := make([]CidCallbackPair, newBlocksWanted)
// TODO: select blocks randomly within the children
left, right := s.dup()
for i := range downloads {
child := n.childrens[i]
child.mu.Lock()
child.workers++
child.mu.Unlock()
ns := &snake{
worker: s.worker,
node: child,
parent: left,
level: s.level + 1,
}
left.children, left = ns, ns
downloads[i] = CidCallbackPair{child.cid, ns.callback}
}
left.children, right.parent = right, left
s.update()
// TODO: if we havn't found enough blocks to download, try in other nodes of the snake or try backtracking from the head.
s.worker.impl.Download(downloads...)
} else {
s.update()
}
select {
case w.dl.out <- blocks.Is(block):
case <-w.dl.ctx.Done():
err = w.dl.ctx.Err()
goto Errr
}
return
}
// duplicated block
s.update()
return

Errr:
s.update()
w.err(err)
}

// update checks if this node should be removed from the snake and do so if needed. It will update the metric if needed.
// It must be called while holding s.worker.mu and s.node.mu, it will unlock s.node.mu.
func (s *snake) update() {
if s.status == snakeTodo {
s.node.mu.Unlock()
return
}
removeSelf := (s.parent == nil || s.parent.level <= s.level) && (s.children == nil || s.children.level <= s.level)
if !removeSelf {
s.node.mu.Unlock()
return
}
if s.parent != nil {
s.parent.children = s.children
} else {
s.worker.head = s.children
}
if s.children != nil {
s.children.parent = s.parent
} else {
s.worker.tail = s.parent
}
if s.status == snakeDone {
s.node.workers--
}
s.node.mu.Unlock()
if s.parent != nil {
s.parent.updateWithoutNodeLock(updateParent)
s.parent = nil
}
if s.children != nil {
s.children.updateWithoutNodeLock(updateChild)
s.children = nil
}
}

type updateDirection uint8

const (
_ updateDirection = iota
updateParent
updateChild
)

// updateWithoutNodeLock is like update but it doesn't require to hold s.node.mu.
// It must be called while holding s.worker.mu.
func (s *snake) updateWithoutNodeLock(direction updateDirection) {
if s.status == snakeTodo {
return
}
removeSelf := (s.parent == nil || s.parent.level <= s.level) && (s.children == nil || s.children.level <= s.level)
if !removeSelf {
return
}
if s.parent != nil {
s.parent.children = s.children
} else {
s.worker.head = s.children
}
if s.children != nil {
s.children.parent = s.parent
} else {
s.worker.tail = s.parent
}
if s.status == snakeDone {
s.node.mu.Lock()
s.node.workers--
s.node.mu.Unlock()
}
switch direction {
case updateParent:
if s.parent != nil {
s.parent.updateWithoutNodeLock(direction)
}
case updateChild:
if s.children != nil {
s.children.updateWithoutNodeLock(direction)
}
default:
panic("unreachable")
}
s.parent, s.children = nil, nil
}

// dup duplicate this node of the snake and return left and right pointer.
// This allows to insert children nodes inbetween.
func (s *snake) dup() (*snake, *snake) {
if s.status != snakeDone {
panic("trying to dup an not done snake")
}
scopy := *s
s2 := &scopy
if s.children != nil {
s.children.parent = s2
}
s2.parent, s.children = s, s2
return s, s2
}
96 changes: 96 additions & 0 deletions rapide/clientdriven_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package rapide_test

import (
"context"
"fmt"
"sync"
"testing"
"time"

. "github.com/ipfs/boxo/rapide"
"github.com/ipfs/go-cid"
)

type mockClientDrivenDownloder struct {
bs *mockBlockstore
mu sync.Mutex
m map[cid.Cid]context.CancelFunc
}

func (m *mockClientDrivenDownloder) Download(ccs ...CidCallbackPair) {
m.mu.Lock()
defer m.mu.Unlock()

for _, d := range ccs {
ctx, cancel := context.WithCancel(context.Background())
cid := d.Cid
cb := d.Callback
m.m[cid] = cancel
go func() {
b, err := m.bs.GetBlock(ctx, cid)
cb(b.RawData(), err)
m.mu.Lock()
delete(m.m, cid)
m.mu.Unlock()
}()
}
}

func (m *mockClientDrivenDownloder) Cancel(cids ...cid.Cid) {
m.mu.Lock()
defer m.mu.Unlock()

for _, c := range cids {
cancel, ok := m.m[c]
if !ok {
continue
}
cancel()
}
}

func TestClientDrivenDownloader(t *testing.T) {
for _, tc := range [...]struct {
delay time.Duration
runners uint
width uint
depth uint
}{
{0, 1, 2, 2},
{time.Nanosecond, 1, 2, 2},
{time.Microsecond, 1, 2, 2},
{time.Millisecond, 1, 2, 2},
} {
t.Run(fmt.Sprintf("%v %v %v %v", tc.delay, tc.runners, tc.width, tc.depth), func(t *testing.T) {
bs := &mockBlockstore{
t: t,
delay: tc.delay,
}
var i uint64
root := bs.makeDag(tc.width, tc.depth, &i)

clients := make([]ClientDrivenDownloader, tc.runners)
for i := tc.runners; i != 0; {
i--
clients[i] = &mockClientDrivenDownloder{bs: bs, m: make(map[cid.Cid]context.CancelFunc)}
}

seen := make(map[cid.Cid]struct{})
for b := range (&Client{ClientDrivenDownloaders: clients}).Get(context.Background(), root, bs) {
block, err := b.Get()
if err != nil {
t.Fatalf("got error from rapide: %s", err)
}
c := block.Cid()
if _, ok := bs.m[c]; !ok {
t.Fatalf("got cid not in blockstore %s", c)
}
seen[c] = struct{}{}
}

if len(seen) != len(bs.m) {
t.Fatalf("seen less blocks than in blockstore: expected %d; got %d", len(bs.m), len(seen))
}
})
}
}
Loading

0 comments on commit 5f374bc

Please sign in to comment.