Skip to content

Commit

Permalink
bitswap client (#856)
Browse files Browse the repository at this point in the history
* booster bitswap MVP executable (#707)

* feat(booster-bitswap): booster bitswap MVP untested

* refactor(booster-bitswap): use API for fetching blocks

* fix(deps): update deps to compile

* feat(booster-bitswap): makefile & fixes

add commands to build booster-bitswap, and very a round tripped successful fetch from
booster-bitswap

* refactor: clean up unused vars etc

* fix: booster-bitsawp - check error when creating libp2p key

* refactor(node): avoid FreeAndUnsealed method

Co-authored-by: Dirk McCormick <dirkmdev@gmail.com>
Co-authored-by: Anton Evangelatov <anton.evangelatov@gmail.com>

* booster-bitswap devnet and tracing (#796)

* return ipld ErrNotFound from remote blockstore interface (#798)

* fix: return ipld ErrNotFound from remote blockstore interface

* test: add more tests for ipld ErrNotFound

* test: comment out part of TestDummydealOnline that is flaky due to a bug in latest lotus (#802)

* fix normaliseError nil ptr dereference (#803)

* feat: shard selector (#807)

* LoadBalancer for bitswap (and later, more of libp2p) (#786)

* feat(loadbalancer): add message types

* feat(messages): add utility functions

* feat(loadbalancer): initial load balancer impl

implementation of the load balancer node itself

* feat(loadbalancer): add service node

implements code for running a service node

* feat(loadbalancer): integrate into boost and booster-bitswap

* Update loadbalancer/loadbalancer.go

Co-authored-by: Rod Vagg <rod@vagg.org>

* Update loadbalancer/servicenode.go

Co-authored-by: Rod Vagg <rod@vagg.org>

* Update loadbalancer/servicenode.go

Co-authored-by: Rod Vagg <rod@vagg.org>

* Update loadbalancer/messages/messages.ipldsch

Co-authored-by: Rod Vagg <rod@vagg.org>

* Update loadbalancer/messages/messages.ipldsch

Co-authored-by: Rod Vagg <rod@vagg.org>

* refactor(loadbalancer): remove routing protocol

remove the routing protocol, instead relying on a set config. also remove forwarding response for
inbound requests

* fix(loadbalancer): update tests

* refactor(loadbalancer): integrate simplified load balancer

removed pub keys to minimize network traffic, added api's to configure and update bitswap peer id,
added auto config of bitswap peer id in booster-bitswap

* docs(gen): regenerate api docs

* chore(lint): fix lint errors

* fix(loadbalancer): minor bridgestream fix

* Update loadbalancer/servicenode.go

Co-authored-by: dirkmc <dirkmdev@gmail.com>

* refactor(protocolproxy): address PR comments

renames, reconfigured architecture, etc

* refactor(make init print out peer id): remove apis and transparent peer id setting. have init print

Co-authored-by: Rod Vagg <rod@vagg.org>
Co-authored-by: dirkmc <dirkmdev@gmail.com>

* Add block filter via BadBits (#825)

* feat(booster-bitswap): add block filter via BadBits

* refactor(booster-bitswap): use bitswap blockfilter for filtering

* feat(blockfilter): only update when list is modified

* feat(blockFilter): add on disk caching

* Update cmd/booster-bitswap/blockfilter/blockfilter.go

Co-authored-by: dirkmc <dirkmdev@gmail.com>

* fix(blockfilter): minor PR fixups

Co-authored-by: dirkmc <dirkmdev@gmail.com>

* Libp2p 0.22 upgrade (#837)

* chore(deps): upgrade to Lotus RC & libp2p v0.22

* chore(deps): update go to 1.18

* ci(circle): update circle to go 1.18

* style(imports): fix imports

* fix(build): update ffi

* fix(lint): fix deprecated strings.Title method

* fix(mod): mod tidy

* Protocol Proxy cleanup (#836)

* refactor(booster-bitswap): minor UI fixes for booster-bitswap UI

* Update cmd/booster-bitswap/init.go

Co-authored-by: dirkmc <dirkmdev@gmail.com>

Co-authored-by: dirkmc <dirkmdev@gmail.com>

* feat: update to dagstore v0.5.5 (#849)

* feat: bitswap client

* feat: bitswap client - output car file

* refactor: bitswap client - remove tracing

* feat: debug logs

* fix: write blocks to blockstore

* fix: duration output

* fix: duration output for block received

* feat: add pprof to bitswap client

* feat: protocol proxy logging

* feat: bitswap client - check host supports bitswap protocol

* feat: listen for bitswap requests locally as well as through forwarding protocol

Co-authored-by: Hannah Howard <hannah@hannahhoward.net>
Co-authored-by: Anton Evangelatov <anton.evangelatov@gmail.com>
Co-authored-by: Rod Vagg <rod@vagg.org>
  • Loading branch information
4 people authored Oct 4, 2022
1 parent 6e0ac5c commit f0726d2
Show file tree
Hide file tree
Showing 8 changed files with 273 additions and 31 deletions.
21 changes: 21 additions & 0 deletions cmd/booster-bitswap/bitswap/bitswap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package bitswap

import (
"github.com/ipfs/go-bitswap/network"
"github.com/libp2p/go-libp2p/core/protocol"
)

var Protocols = []protocol.ID{
network.ProtocolBitswap,
network.ProtocolBitswapNoVers,
network.ProtocolBitswapOneOne,
network.ProtocolBitswapOneZero,
}

var ProtocolStrings = []string{}

func init() {
for _, p := range Protocols {
ProtocolStrings = append(ProtocolStrings, string(p))
}
}
220 changes: 220 additions & 0 deletions cmd/booster-bitswap/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
package main

import (
"context"
"crypto/rand"
"fmt"
"net/http"
_ "net/http/pprof"
"sort"
"sync/atomic"
"time"

"github.com/filecoin-project/boost/cmd/booster-bitswap/bitswap"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/ipfs/go-bitswap/client"
bsnetwork "github.com/ipfs/go-bitswap/network"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
nilrouting "github.com/ipfs/go-ipfs-routing/none"
ipldlegacy "github.com/ipfs/go-ipld-legacy"
"github.com/ipld/go-car/v2/blockstore"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/muxer/mplex"
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
quic "github.com/libp2p/go-libp2p/p2p/transport/quic"
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
"github.com/urfave/cli/v2"
"golang.org/x/sync/errgroup"
)

var fetchCmd = &cli.Command{
Name: "fetch",
Usage: "fetch <multiaddr> <root cid> <output car path>",
Description: "Fetch all blocks in the DAG under the given root cid from the bitswap node at multiaddr",
Before: before,
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "pprof",
Usage: "run pprof web server on localhost:6071",
},
&cli.IntFlag{
Name: "concurrency",
Usage: "concurrent request limit - 0 means unlimited",
Value: 10,
},
},
Action: func(cctx *cli.Context) error {
if cctx.Bool("pprof") {
go func() {
err := http.ListenAndServe("localhost:6071", nil)
if err != nil {
log.Error(err)
}
}()
}

if cctx.Args().Len() != 3 {
return fmt.Errorf("usage: fetch <multiaddr> <root cid> <output car path>")
}

addrInfoStr := cctx.Args().Get(0)
serverAddrInfo, err := peer.AddrInfoFromString(addrInfoStr)
if err != nil {
return fmt.Errorf("parsing server multiaddr %s: %w", addrInfoStr, err)
}

rootCidStr := cctx.Args().Get(1)
rootCid, err := cid.Parse(rootCidStr)
if err != nil {
return fmt.Errorf("parsing cid %s: %w", rootCidStr, err)
}

outputCarPath := cctx.Args().Get(2)

ctx := lcli.ReqContext(cctx)

// setup libp2p host
log.Infow("generating libp2p key")
privKey, _, err := crypto.GenerateECDSAKeyPair(rand.Reader)
if err != nil {
return err
}

host, err := libp2p.New(
libp2p.Transport(tcp.NewTCPTransport),
libp2p.Transport(quic.NewTransport),
libp2p.Muxer("/mplex/6.7.0", mplex.DefaultTransport),
libp2p.Muxer("/yamux/1.0.0", yamux.DefaultTransport),
libp2p.Identity(privKey),
libp2p.ResourceManager(network.NullResourceManager),
)
if err != nil {
return err
}

// Create a bitswap client
nilRouter, err := nilrouting.ConstructNilRouting(ctx, nil, nil, nil)
if err != nil {
return err
}
net := bsnetwork.NewFromIpfsHost(host, nilRouter)
bs, err := blockstore.OpenReadWrite(outputCarPath, []cid.Cid{rootCid}, blockstore.UseWholeCIDs(true))
if err != nil {
return fmt.Errorf("creating blockstore at %s: %w", outputCarPath, err)
}

ctx, cancel := context.WithCancel(ctx)
defer cancel()
brn := &blockReceiver{bs: bs, ctx: ctx, cancel: cancel}
bsClient := client.New(ctx, net, bs, client.WithBlockReceivedNotifier(brn))
defer bsClient.Close()
net.Start(bsClient)

// Connect to host
connectStart := time.Now()
log.Infow("connecting to server", "server", serverAddrInfo.String())
err = host.Connect(ctx, *serverAddrInfo)
if err != nil {
return fmt.Errorf("connecting to %s: %w", serverAddrInfo, err)
}
log.Debugw("connected to server", "duration", time.Since(connectStart).String())

// Check host's libp2p protocols
protos, err := host.Peerstore().GetProtocols(serverAddrInfo.ID)
if err != nil {
return fmt.Errorf("getting protocols from peer store for %s: %w", serverAddrInfo.ID, err)
}
sort.Slice(protos, func(i, j int) bool {
return protos[i] < protos[j]
})
log.Debugw("host libp2p protocols", "protocols", protos)
p, err := host.Peerstore().FirstSupportedProtocol(serverAddrInfo.ID, bitswap.ProtocolStrings...)
if err != nil {
return fmt.Errorf("getting first supported protocol from peer store for %s: %w", serverAddrInfo.ID, err)
}
if p == "" {
return fmt.Errorf("host %s does not support any know bitswap protocols: %s", serverAddrInfo.ID, bitswap.ProtocolStrings)
}

var throttle chan struct{}
concurrency := cctx.Int("concurrency")
if concurrency > 0 {
throttle = make(chan struct{}, concurrency)
}

// Fetch all blocks under the root cid
log.Infow("fetch", "cid", rootCid, "concurrency", concurrency)
start := time.Now()
count, size, err := getBlocks(ctx, bsClient, rootCid, throttle)
if err != nil {
return fmt.Errorf("getting blocks: %w", err)
}

log.Infow("fetch complete", "count", count, "size", size, "duration", time.Since(start).String())
log.Debug("finalizing")
finalizeStart := time.Now()
defer func() { log.Infow("finalize complete", "duration", time.Since(finalizeStart).String()) }()
return bs.Finalize()
},
}

func getBlocks(ctx context.Context, bsClient *client.Client, c cid.Cid, throttle chan struct{}) (uint64, uint64, error) {
if throttle != nil {
throttle <- struct{}{}
}
// Get the block
start := time.Now()
blk, err := bsClient.GetBlock(ctx, c)
if throttle != nil {
<-throttle
}
if err != nil {
return 0, 0, err
}

var size = uint64(len(blk.RawData()))
log.Debugw("receive", "cid", c, "size", size, "duration", time.Since(start).String())

// Read the links from the block to child nodes in the DAG
var count = uint64(1)
nd, err := ipldlegacy.DecodeNode(ctx, blk)
if err != nil {
return 0, 0, fmt.Errorf("decoding node %s: %w", c, err)
}

var eg errgroup.Group
lnks := nd.Links()
for _, l := range lnks {
l := l
// Launch a go routine to fetch the blocks underneath each link
eg.Go(func() error {
cnt, sz, err := getBlocks(ctx, bsClient, l.Cid, throttle)
if err != nil {
return err
}
atomic.AddUint64(&count, cnt)
atomic.AddUint64(&size, sz)
return nil
})
}

return count, size, eg.Wait()
}

type blockReceiver struct {
bs *blockstore.ReadWrite
ctx context.Context
cancel context.CancelFunc
}

func (b blockReceiver) ReceivedBlocks(id peer.ID, blks []blocks.Block) {
err := b.bs.PutMany(b.ctx, blks)
if err != nil {
log.Errorw("failed to write blocks to blockstore: %s", err)
b.cancel()
}
}
1 change: 1 addition & 0 deletions cmd/booster-bitswap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func main() {
Commands: []*cli.Command{
initCmd,
runCmd,
fetchCmd,
},
}
app.Setup()
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ require (
github.com/ipfs/go-ipfs-files v0.1.1
github.com/ipfs/go-ipfs-routing v0.2.1
github.com/ipfs/go-ipld-format v0.4.0
github.com/ipfs/go-ipld-legacy v0.1.1
github.com/ipfs/go-log/v2 v2.5.1
github.com/ipfs/go-merkledag v0.6.0
github.com/ipfs/go-metrics-interface v0.0.1
Expand Down Expand Up @@ -217,7 +218,6 @@ require (
github.com/ipfs/go-ipfs-pq v0.0.2 // indirect
github.com/ipfs/go-ipfs-util v0.0.2 // indirect
github.com/ipfs/go-ipld-cbor v0.0.6 // indirect
github.com/ipfs/go-ipld-legacy v0.1.1 // indirect
github.com/ipfs/go-ipns v0.2.0 // indirect
github.com/ipfs/go-log v1.0.5 // indirect
github.com/ipfs/go-path v0.3.0 // indirect
Expand Down
3 changes: 2 additions & 1 deletion node/impl/boost_legacy.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"time"

"github.com/filecoin-project/dagstore/shard"
"github.com/multiformats/go-multihash"

"github.com/filecoin-project/go-address"
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-fil-markets/piecestore"
Expand All @@ -18,7 +20,6 @@ import (
"github.com/filecoin-project/lotus/chain/types"
"github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multihash"
)

func (sm *BoostAPI) MarketListDataTransfers(ctx context.Context) ([]lapi.DataTransferChannel, error) {
Expand Down
15 changes: 4 additions & 11 deletions node/modules/retrieval.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,18 @@ import (
"context"
"fmt"

"github.com/filecoin-project/boost/cmd/booster-bitswap/bitswap"
"github.com/filecoin-project/boost/node/config"
"github.com/filecoin-project/boost/protocolproxy"
"github.com/filecoin-project/boost/retrievalmarket/lp2pimpl"
"github.com/filecoin-project/boost/retrievalmarket/types"
"github.com/ipfs/go-bitswap/network"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/multiformats/go-multiaddr"
"go.uber.org/fx"
)

var bitswapProtocols = []protocol.ID{
network.ProtocolBitswap,
network.ProtocolBitswapNoVers,
network.ProtocolBitswapOneOne,
network.ProtocolBitswapOneZero,
}

func NewTransportsListener(cfg *config.Boost) func(h host.Host) (*lp2pimpl.TransportsListener, error) {
return func(h host.Host) (*lp2pimpl.TransportsListener, error) {
protos := []types.Protocol{}
Expand Down Expand Up @@ -84,7 +77,7 @@ func NewProtocolProxy(cfg *config.Boost) func(h host.Host) (*protocolproxy.Proto
if err != nil {
return nil, err
}
peerConfig[bsPeerID] = bitswapProtocols
peerConfig[bsPeerID] = bitswap.Protocols
}
return protocolproxy.NewProtocolProxy(h, peerConfig)
}
Expand All @@ -93,12 +86,12 @@ func NewProtocolProxy(cfg *config.Boost) func(h host.Host) (*protocolproxy.Proto
func HandleProtocolProxy(lc fx.Lifecycle, pp *protocolproxy.ProtocolProxy) {
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
log.Info("starting load balancer")
log.Info("starting libp2p protocol proxy")
pp.Start(ctx)
return nil
},
OnStop: func(context.Context) error {
log.Info("stopping load balancer")
log.Info("stopping libp2p protocol proxy")
pp.Close()
return nil
},
Expand Down
Loading

0 comments on commit f0726d2

Please sign in to comment.