Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RAPIDE & IPSL: first proof of concept #1

Draft
wants to merge 41 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
1e8e000
ipsl: compiler and mesh for ipsl structure
Jorropo Jan 18, 2023
f6c9996
ipsl: add scope test
Jorropo Jan 18, 2023
6a63973
ipsl: remove type field from SomeNode and use reflection on Node instead
Jorropo Jan 18, 2023
1884a90
ipsl: add empty builtin
Jorropo Jan 25, 2023
57506e3
ipsl/helpers: add SyncDFS helper
Jorropo Jan 25, 2023
205c783
ipsl/unixfs: add Everything node
Jorropo Jan 25, 2023
ddb1659
ipsl: add a comment to the compiler tests
Jorropo Jan 26, 2023
cfc7683
ipsl/helpers: test ipsl.All node in SyncDFS test
Jorropo Jan 26, 2023
eeff501
ipsl: use blocks interface
Jorropo Feb 3, 2023
fa62495
ipsl: return UnexpectedEOF with broken nodes
Jorropo Jan 27, 2023
3888323
ipsl: add support for string literal
Jorropo Jan 27, 2023
30b9a4e
ipsl: add None support
Jorropo Jan 27, 2023
9e8378d
ipsl: add support for load-builtin-scope
Jorropo Jan 27, 2023
6017588
ipsl: rename compileNodeWithoutClosure to compileNextNodeWithoutTermi…
Jorropo Jan 30, 2023
de4c8a0
ipsl: add unixfs scope object
Jorropo Jan 30, 2023
3e88f45
ipsl: make CompileEmpty and CompileAll private
Jorropo Jan 31, 2023
816882f
blocks: add BlockIterator
Jorropo Jan 31, 2023
a8a8e67
blocks: add BlockOrError
Jorropo Jan 31, 2023
2d65ba1
ipsl: make all run the child traversals
Jorropo Jan 31, 2023
1c48da4
rapide: initial commit
Jorropo Jan 31, 2023
a8d2fde
rapide: fix termination logic
Jorropo Jan 31, 2023
8b16656
rapide/gateway: change User-Agent to RAPIDE
Jorropo Jan 31, 2023
a0e2170
rapide: refactor serverDrivenWork.work to not be hot garbage
Jorropo Jan 31, 2023
646cb0f
rapide: fully remove workers from the tree when they fail
Jorropo Jan 31, 2023
478da8a
rapide: gc traversal objects in expand
Jorropo Jan 31, 2023
054c641
rapide: return worker errors when all workers died
Jorropo Jan 31, 2023
4b5090b
rapide/gateway: implement fmt.Stringer with PathName
Jorropo Jan 31, 2023
f83f676
rapide: disable eager check for len(w.tasks) == 0 in the server worker
Jorropo Jan 31, 2023
9e04022
rapide: randomly distribute equivalent walks
Jorropo Feb 1, 2023
6a5a03d
rapide: add make use of io.Closer for BlockIterators
Jorropo Feb 1, 2023
960ddf5
rapide: fix leaking worker count when we reset after duplicated block
Jorropo Feb 2, 2023
9eb4ff4
rapide: add tests and fix race issues
Jorropo Feb 2, 2023
fcacbe9
ipsl: remove the compiler and language
Jorropo Feb 3, 2023
f442ca1
rapide: fix: make unexpected blocks a retriable error
Jorropo Feb 3, 2023
758929c
examples/rapido: add example on how to use rapide
Jorropo Feb 3, 2023
55ce02d
rapide: remove useless FIXME
Jorropo Feb 5, 2023
751763d
rapide&ipsl: bring up to speed with the Über migration
Jorropo Mar 23, 2023
c1184d1
rapide: remove random source bookkeeping and use mrand.Int
Jorropo Mar 23, 2023
5f374bc
rapide: first support for client driven downloads
Jorropo Mar 7, 2023
66fd3c8
rapide: make the client code cleaner with happy left
Jorropo Apr 14, 2023
33c59f9
rapide: run tests in parallel
Jorropo Apr 14, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions blocks/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ type Block interface {
Loggable() map[string]interface{}
}

type BlockIterator interface {
Next() (Block, error)
}

// A BasicBlock is a singular block of data in ipfs. It implements the Block
// interface.
type BasicBlock struct {
Expand Down Expand Up @@ -80,3 +84,20 @@ func (b *BasicBlock) Loggable() map[string]interface{} {
"block": b.Cid().String(),
}
}

// BlockOrError is either an error or a block.
type BlockOrError struct {
block Block
err error
}

func Is(b Block) BlockOrError {
return BlockOrError{block: b}
}
func IsNot(err error) BlockOrError {
return BlockOrError{err: err}
}

func (boe BlockOrError) Get() (Block, error) {
return boe.block, boe.err
}
4 changes: 4 additions & 0 deletions docs/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,7 @@

# HTTP Gateway
gateway/ @lidel @hacdias

# IPSL
ipsl/* @Jorropo
rapide/* @Jorropo
11 changes: 11 additions & 0 deletions examples/rapido/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# RAPIDO

RAPIDO is an example CLI which download an unordered `.car` for some unixfs data from multiple gateways using [`rapide`](../../rapide).

This code is not maintained up to "production ready" standars, this is an example demonstrating how to use RAPIDE.

## Usage

```console
$ rapido -gw https://ipfs.io/ipfs/ -gw https://strn.pl/ipfs/ QmT2EHPdRvUDxiuZBbYg5ZHy1f8L6MY1HxD45zHycLobMJ | pv > 2023-02-03.ipfs.io.car
```
83 changes: 83 additions & 0 deletions examples/rapido/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package main

import (
"context"
"flag"
"fmt"
"os"
"strings"

"github.com/ipfs/boxo/ipld/car"
"github.com/ipfs/boxo/ipld/car/util"
"github.com/ipfs/boxo/ipsl/unixfs"
"github.com/ipfs/boxo/rapide"
"github.com/ipfs/boxo/rapide/gateway"
"github.com/ipfs/go-cid"
)

type stringFlagSlice []string

func (sfs *stringFlagSlice) String() string {
return strings.Join(*sfs, ",")
}

func (sfs *stringFlagSlice) Set(s string) error {
*sfs = append(*sfs, s)
return nil
}

func main() {
err := mainRet()
if err != nil {
os.Stderr.WriteString(err.Error())
os.Exit(1)
}
os.Exit(0)
}

func mainRet() error {
// Argument Parsing
var gateways stringFlagSlice
flag.Var(&gateways, "gw", `Set once to add a gateway, setable multiple times to use multiple gateways. Format expected is "-gw https://ipfs.io/ipfs/" for example.`)
flag.Parse()

cidStrs := flag.Args()
if len(cidStrs) != 1 {
return fmt.Errorf("expected one CID as positional argument; got %d", len(cidStrs))
}

root, err := cid.Decode(cidStrs[0])
if err != nil {
return fmt.Errorf("decoding CID: %w", err)
}

// Setup header for the output car
err = car.WriteHeader(&car.CarHeader{
Roots: []cid.Cid{root},
Version: 1,
}, os.Stdout)
if err != nil {
return fmt.Errorf("writing car header: %w", err)
}

// configure rapide
downloaders := make([]rapide.ServerDrivenDownloader, len(gateways)) // create a slice holding our multiple gateways
for i, g := range gateways {
downloaders[i] = gateway.Gateway{PathName: g} // create a gateway protocol implementation
}
client := rapide.Client{ServerDrivenDownloaders: downloaders}

// do request and iterate over the resulting blocks, rapide.(*Client).Get returns a channel
for maybeBlock := range client.Get(context.Background(), root, unixfs.Everything()) {
block, err := maybeBlock.Get() // block or error ?
if err != nil {
return fmt.Errorf("downloading: %w", err)
}
err = util.LdWrite(os.Stdout, block.Cid().Bytes(), block.RawData()) // write to the output car
if err != nil {
return fmt.Errorf("writing to output car: %w", err)
}
}

return nil
}
51 changes: 51 additions & 0 deletions ipsl/helpers/helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// helpers package have some utility functions that do recursive traversals on ipsl traversal objects.
// This aims to be easy to use. You are not meant to use this in all cases, calling .Traverse on Traversal
// objects gives more control. And can allow you to write faster code.
package helpers

import (
"context"
"errors"
"fmt"

"github.com/ipfs/boxo/blocks"
"github.com/ipfs/boxo/blockservice"
"github.com/ipfs/boxo/ipsl"
"github.com/ipfs/go-cid"
)

var ErrDepthLimitReached = errors.New("safety depth limit reached")

// SyncDFS perform a synchronous recursive depth-first-search.
// It will return [ErrDepthLimitReached] when the safetyDepthLimit is reached.
// It will wrap errors returned by the call back, so use [errors.Is] to test them.
func SyncDFS(ctx context.Context, c cid.Cid, t ipsl.Traversal, getter blockservice.BlockGetter, safetyDepthLimit uint, callBack func(blocks.Block) error) error {
if safetyDepthLimit == 0 {
return ErrDepthLimitReached
}
safetyDepthLimit--

block, err := getter.GetBlock(ctx, c)
if err != nil {
return fmt.Errorf("GetBlock: %w", err)
}

err = callBack(block)
if err != nil {
return fmt.Errorf("callBack: %w", err)
}

pairs, err := t.Traverse(block)
if err != nil {
return fmt.Errorf("Traversal.Traverse: %w", err)
}

for _, p := range pairs {
err := SyncDFS(ctx, p.Cid, p.Traversal, getter, safetyDepthLimit, callBack)
if err != nil {
return err
}
}

return nil
}
156 changes: 156 additions & 0 deletions ipsl/helpers/helpers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package helpers_test

import (
"bytes"
"context"
"fmt"
"testing"

"github.com/ipfs/go-cid"
"github.com/ipfs/boxo/blocks"
"github.com/ipfs/boxo/ipsl"
. "github.com/ipfs/boxo/ipsl/helpers"
"github.com/multiformats/go-multihash"
"golang.org/x/exp/slices"
)

type mockTraversal struct {
t *testing.T
expectedCid cid.Cid
expectedData []byte
results []ipsl.CidTraversalPair
}

func (n mockTraversal) Traverse(b blocks.Block) ([]ipsl.CidTraversalPair, error) {
var bad bool
if data := b.RawData(); !bytes.Equal(data, n.expectedData) {
n.t.Errorf("got wrong bytes in Traverse: expected %#v; got %#v", n.expectedData, data)
bad = true
}
if c := b.Cid(); !c.Equals(n.expectedCid) {
n.t.Errorf("got wrong cid: expected %v; got %v", n.expectedCid, c)
bad = true
}
if bad {
return []ipsl.CidTraversalPair{}, nil
}

return n.results, nil
}

type mockByteBlockGetter map[cid.Cid][]byte

func (g mockByteBlockGetter) GetBlock(_ context.Context, c cid.Cid) (blocks.Block, error) {
b, ok := g[c]
if !ok {
panic(fmt.Sprintf("missing block requested %v", c))
}
return blocks.NewBlockWithCid(b, c)
}

func (g mockByteBlockGetter) GetBlocks(ctx context.Context, cids []cid.Cid) <-chan blocks.Block {
r := make(chan blocks.Block, len(cids))
defer close(r)

for _, c := range cids {
b, err := g.GetBlock(ctx, c)
if err != nil {
continue
}
r <- b
}

return r
}

func TestSyncDFS(t *testing.T) {
ctx := context.Background()

pref := cid.Prefix{
Version: 1,
Codec: cid.Raw,
MhType: multihash.SHA2_256,
MhLength: 32,
}

// root1 -> {leaf1, root2 -> {leaf2, leaf3}, leaf4}

root1 := []byte("root1")
root1Cid, err := pref.Sum(root1)
if err != nil {
t.Fatalf("hashing: %v", err)
}
leaf1 := []byte("leaf1")
leaf1Cid, err := pref.Sum(leaf1)
if err != nil {
t.Fatalf("hashing: %v", err)
}
root2 := []byte("root2")
root2Cid, err := pref.Sum(root2)
if err != nil {
t.Fatalf("hashing: %v", err)
}
leaf2 := []byte("leaf2")
leaf2Cid, err := pref.Sum(leaf2)
if err != nil {
t.Fatalf("hashing: %v", err)
}
leaf3 := []byte("leaf3")
leaf3Cid, err := pref.Sum(leaf3)
if err != nil {
t.Fatalf("hashing: %v", err)
}
leaf4 := []byte("leaf4")
leaf4Cid, err := pref.Sum(leaf4)
if err != nil {
t.Fatalf("hashing: %v", err)
}

getter := mockByteBlockGetter{
root1Cid: root1,
leaf1Cid: leaf1,
root2Cid: root2,
leaf2Cid: leaf2,
leaf3Cid: leaf3,
leaf4Cid: leaf4,
}

traversal := mockTraversal{t, root1Cid, root1, []ipsl.CidTraversalPair{
{Cid: leaf1Cid, Traversal: mockTraversal{t, leaf1Cid, leaf1, nil}},
{Cid: root2Cid, Traversal: mockTraversal{t, root2Cid, root2, []ipsl.CidTraversalPair{
{Cid: leaf2Cid, Traversal: ipsl.All(
mockTraversal{t, leaf2Cid, leaf2, nil},
mockTraversal{t, leaf2Cid, leaf2, nil},
)},
{Cid: leaf3Cid, Traversal: mockTraversal{t, leaf3Cid, leaf3, nil}},
}}},
{Cid: leaf4Cid, Traversal: mockTraversal{t, leaf4Cid, leaf4, nil}},
}}

var result []cid.Cid
err = SyncDFS(ctx, root1Cid, traversal, getter, 10, func(b blocks.Block) error {
c := b.Cid()
if realBytes, data := getter[c], b.RawData(); !bytes.Equal(data, realBytes) {
t.Errorf("got wrong bytes in callBack: expected %#v; got %#v", realBytes, data)
}

result = append(result, c)

return nil
})
if err != nil {
t.Fatalf("SyncDFS: %s", err)
}

expectedOrder := []cid.Cid{
root1Cid,
leaf1Cid,
root2Cid,
leaf2Cid,
leaf3Cid,
leaf4Cid,
}
if !slices.Equal(result, expectedOrder) {
t.Errorf("bad traversal order: expected: %v; got %v", expectedOrder, result)
}
}
49 changes: 49 additions & 0 deletions ipsl/ipsl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package ipsl

import (
"github.com/ipfs/boxo/blocks"
"github.com/ipfs/go-cid"
)

type CidTraversalPair struct {
Cid cid.Cid
Traversal Traversal
}

type Traversal interface {
// Traverse must never be called with bytes not matching the cid.
// The bytes must never be modified by the implementations.
Traverse(blocks.Block) ([]CidTraversalPair, error)
}

// An AllNode traverse all the traversals with the same cid it is given to.
type AllNode struct {
Traversals []Traversal
}

func All(traversals ...Traversal) Traversal {
return AllNode{traversals}
}

func (n AllNode) Traverse(b blocks.Block) ([]CidTraversalPair, error) {
var results []CidTraversalPair
for _, t := range n.Traversals {
r, err := t.Traverse(b)
if err != nil {
return nil, err
}
results = append(results, r...)
}
return results, nil
}

// EmptyTraversal is a traversal that always returns nothing
type EmptyTraversal struct{}

func Empty() Traversal {
return EmptyTraversal{}
}

func (c EmptyTraversal) Traverse(_ blocks.Block) ([]CidTraversalPair, error) {
return []CidTraversalPair{}, nil
}
Binary file added ipsl/unixfs/testdata/small-tree.car
Binary file not shown.
Loading