Skip to content

Commit

Permalink
booster bitswap MVP executable (#707)
Browse files Browse the repository at this point in the history
* feat(booster-bitswap): booster bitswap MVP untested

* refactor(booster-bitswap): use API for fetching blocks

* fix(deps): update deps to compile

* feat(booster-bitswap): makefile & fixes

add commands to build booster-bitswap, and very a round tripped successful fetch from
booster-bitswap

* refactor: clean up unused vars etc

* fix: booster-bitsawp - check error when creating libp2p key

* refactor(node): avoid FreeAndUnsealed method

Co-authored-by: Dirk McCormick <dirkmdev@gmail.com>
Co-authored-by: Anton Evangelatov <anton.evangelatov@gmail.com>
  • Loading branch information
3 people committed Sep 13, 2022
1 parent 847ccc9 commit 85b7358
Show file tree
Hide file tree
Showing 18 changed files with 681 additions and 94 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
/boostd
/devnet
/booster-http
/booster-bitswap
/docgen-md
/docgen-openrpc
extern/filecoin-ffi/rust/target
Expand Down
6 changes: 6 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ booster-http: $(BUILD_DEPS)
.PHONY: booster-http
BINS+=booster-http

booster-bitswap: $(BUILD_DEPS)
rm -f booster-bitswap
$(GOCC) build $(GOFLAGS) -o booster-bitswap ./cmd/booster-bitswap
.PHONY: booster-bitswap
BINS+=booster-bitswap

devnet: $(BUILD_DEPS)
rm -f devnet
$(GOCC) build $(GOFLAGS) -o devnet ./cmd/devnet
Expand Down
5 changes: 5 additions & 0 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ type Boost interface {
BoostDagstorePiecesContainingMultihash(ctx context.Context, mh multihash.Multihash) ([]cid.Cid, error) //perm:read
BoostDagstoreListShards(ctx context.Context) ([]DagstoreShardInfo, error) //perm:read

// MethodGroup: Blockstore
BlockstoreGet(ctx context.Context, c cid.Cid) ([]byte, error) //perm:read
BlockstoreHas(ctx context.Context, c cid.Cid) (bool, error) //perm:read
BlockstoreGetSize(ctx context.Context, c cid.Cid) (int, error) //perm:read

// RuntimeSubsystems returns the subsystems that are enabled
// in this instance.
RuntimeSubsystems(ctx context.Context) (lapi.MinerSubsystems, error) //perm:read
Expand Down
39 changes: 39 additions & 0 deletions api/proxy_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified build/openrpc/boost.json.gz
Binary file not shown.
42 changes: 42 additions & 0 deletions cmd/booster-bitswap/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package main

import (
"os"

"github.com/filecoin-project/boost/build"
cliutil "github.com/filecoin-project/boost/cli/util"
logging "github.com/ipfs/go-log/v2"
"github.com/urfave/cli/v2"
)

var log = logging.Logger("booster")

func main() {
app := &cli.App{
Name: "booster-bitswap",
Usage: "Bitswap endpoint for retrieval from Filecoin",
EnableBashCompletion: true,
Version: build.UserVersion(),
Flags: []cli.Flag{
cliutil.FlagVeryVerbose,
},
Commands: []*cli.Command{
runCmd,
},
}
app.Setup()

if err := app.Run(os.Args); err != nil {
os.Stderr.WriteString("Error: " + err.Error() + "\n")
}
}

func before(cctx *cli.Context) error {
_ = logging.SetLogLevel("booster", "INFO")

if cliutil.IsVeryVerbose {
_ = logging.SetLogLevel("booster", "DEBUG")
}

return nil
}
72 changes: 72 additions & 0 deletions cmd/booster-bitswap/remoteblockstore/remoteblockstore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package remoteblockstore

import (
"context"
"errors"

blocks "github.com/ipfs/go-block-format"
logging "github.com/ipfs/go-log/v2"

"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
)

var log = logging.Logger("remote-blockstore")

var _ blockstore.Blockstore = (*RemoteBlockstore)(nil)

type RemoteBlockstoreAPI interface {
BlockstoreGet(ctx context.Context, c cid.Cid) ([]byte, error)
BlockstoreHas(ctx context.Context, c cid.Cid) (bool, error)
BlockstoreGetSize(ctx context.Context, c cid.Cid) (int, error)
}

// RemoteBlockstore is a read-only blockstore over all cids across all pieces on a provider.
type RemoteBlockstore struct {
api RemoteBlockstoreAPI
}

func NewRemoteBlockstore(api RemoteBlockstoreAPI) blockstore.Blockstore {
return &RemoteBlockstore{
api: api,
}
}

func (ro *RemoteBlockstore) Get(ctx context.Context, c cid.Cid) (b blocks.Block, err error) {
log.Debugw("Get", "cid", c)
data, err := ro.api.BlockstoreGet(ctx, c)
log.Debugw("Get response", "cid", c, "error", err)
if err != nil {
return nil, err
}
return blocks.NewBlockWithCid(data, c)
}

func (ro *RemoteBlockstore) Has(ctx context.Context, c cid.Cid) (bool, error) {
log.Debugw("Has", "cid", c)
has, err := ro.api.BlockstoreHas(ctx, c)
log.Debugw("Has response", "cid", c, "has", has, "error", err)
return has, err
}

func (ro *RemoteBlockstore) GetSize(ctx context.Context, c cid.Cid) (int, error) {
log.Debugw("GetSize", "cid", c)
size, err := ro.api.BlockstoreGetSize(ctx, c)
log.Debugw("GetSize response", "cid", c, "size", size, "error", err)
return size, err
}

// --- UNSUPPORTED BLOCKSTORE METHODS -------
func (ro *RemoteBlockstore) DeleteBlock(context.Context, cid.Cid) error {
return errors.New("unsupported operation DeleteBlock")
}
func (ro *RemoteBlockstore) HashOnRead(_ bool) {}
func (ro *RemoteBlockstore) Put(context.Context, blocks.Block) error {
return errors.New("unsupported operation Put")
}
func (ro *RemoteBlockstore) PutMany(context.Context, []blocks.Block) error {
return errors.New("unsupported operation PutMany")
}
func (ro *RemoteBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return nil, errors.New("unsupported operation AllKeysChan")
}
102 changes: 102 additions & 0 deletions cmd/booster-bitswap/run.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package main

import (
"context"
"fmt"
"net/http"
_ "net/http/pprof"
"strings"

"github.com/filecoin-project/boost/api"
bclient "github.com/filecoin-project/boost/api/client"
cliutil "github.com/filecoin-project/boost/cli/util"
"github.com/filecoin-project/boost/cmd/booster-bitswap/remoteblockstore"
"github.com/filecoin-project/go-jsonrpc"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/urfave/cli/v2"
)

var runCmd = &cli.Command{
Name: "run",
Usage: "Start a booster-bitswap process",
Before: before,
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "pprof",
Usage: "run pprof web server on localhost:6070",
},
&cli.UintFlag{
Name: "port",
Usage: "the port to listen for bitswap requests on",
Value: 8888,
},
&cli.StringFlag{
Name: "api-boost",
Usage: "the endpoint for the boost API",
Required: true,
},
},
Action: func(cctx *cli.Context) error {
if cctx.Bool("pprof") {
go func() {
err := http.ListenAndServe("localhost:6070", nil)
if err != nil {
log.Error(err)
}
}()
}

// Connect to the Boost API
ctx := lcli.ReqContext(cctx)
boostAPIInfo := cctx.String("api-boost")
bapi, bcloser, err := getBoostAPI(ctx, boostAPIInfo)
if err != nil {
return fmt.Errorf("getting boost API: %w", err)
}
defer bcloser()

remoteStore := remoteblockstore.NewRemoteBlockstore(bapi)
// Create the server API
port := cctx.Int("port")
server := NewBitswapServer(port, remoteStore)

// Start the server
log.Infof("Starting booster-bitswap node on port %d", port)
err = server.Start(ctx)
if err != nil {
return err
}
// Monitor for shutdown.
<-ctx.Done()

log.Info("Shutting down...")

err = server.Stop()
if err != nil {
return err
}
log.Info("Graceful shutdown successful")

// Sync all loggers.
_ = log.Sync() //nolint:errcheck

return nil
},
}

func getBoostAPI(ctx context.Context, ai string) (api.Boost, jsonrpc.ClientCloser, error) {
ai = strings.TrimPrefix(strings.TrimSpace(ai), "BOOST_API_INFO=")
info := cliutil.ParseApiInfo(ai)
addr, err := info.DialArgs("v0")
if err != nil {
return nil, nil, fmt.Errorf("could not get DialArgs: %w", err)
}

log.Infof("Using boost API at %s", addr)
api, closer, err := bclient.NewBoostRPCV0(ctx, addr, info.AuthHeader())
if err != nil {
return nil, nil, fmt.Errorf("creating full node service API: %w", err)
}

return api, closer, nil
}
75 changes: 75 additions & 0 deletions cmd/booster-bitswap/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package main

import (
"context"
"crypto/rand"
"fmt"

bsnetwork "github.com/ipfs/go-bitswap/network"
"github.com/ipfs/go-bitswap/server"
blockstore "github.com/ipfs/go-ipfs-blockstore"
nilrouting "github.com/ipfs/go-ipfs-routing/none"
"github.com/libp2p/go-libp2p"
crypto "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p/p2p/muxer/mplex"
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
quic "github.com/libp2p/go-libp2p/p2p/transport/quic"
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
)

type BitswapServer struct {
port int
remoteStore blockstore.Blockstore

ctx context.Context
cancel context.CancelFunc
server *server.Server
}

func NewBitswapServer(port int, remoteStore blockstore.Blockstore) *BitswapServer {
return &BitswapServer{port: port, remoteStore: remoteStore}
}

func (s *BitswapServer) Start(ctx context.Context) error {
s.ctx, s.cancel = context.WithCancel(ctx)
// setup libp2p host
privKey, _, err := crypto.GenerateECDSAKeyPair(rand.Reader)
if err != nil {
return err
}

host, err := libp2p.New(
libp2p.ListenAddrStrings(
fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", s.port),
fmt.Sprintf("/ip4/0.0.0.0/udp/%d/quic", s.port),
),
libp2p.Transport(tcp.NewTCPTransport),
libp2p.Transport(quic.NewTransport),
libp2p.Muxer("/mplex/6.7.0", mplex.DefaultTransport),
libp2p.Muxer("/yamux/1.0.0", yamux.DefaultTransport),
libp2p.Identity(privKey),
libp2p.ResourceManager(network.NullResourceManager),
)
if err != nil {
return err
}

// start a bitswap session on the provider
nilRouter, err := nilrouting.ConstructNilRouting(ctx, nil, nil, nil)
if err != nil {
return err
}
bsopts := []server.Option{server.MaxOutstandingBytesPerPeer(1 << 20)}
net := bsnetwork.NewFromIpfsHost(host, nilRouter)
s.server = server.New(ctx, net, s.remoteStore, bsopts...)
net.Start(s.server)

log.Infow("bitswap server running", "multiaddrs", host.Addrs(), "peerId", host.ID())
return nil
}

func (s *BitswapServer) Stop() error {
s.cancel()
return s.server.Close()
}
Loading

0 comments on commit 85b7358

Please sign in to comment.