From b1046dd6d5758962a13889eb61044c6f5dd68d90 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Fri, 31 Mar 2023 09:02:18 -0400 Subject: [PATCH] refactor(examples): proxy gateway to use exchange --- examples/gateway/proxy/blockstore.go | 104 +++++++++++++-------------- examples/gateway/proxy/main.go | 15 ++-- examples/gateway/proxy/main_test.go | 9 ++- 3 files changed, 66 insertions(+), 62 deletions(-) diff --git a/examples/gateway/proxy/blockstore.go b/examples/gateway/proxy/blockstore.go index 4394aa4e0..a8508a758 100644 --- a/examples/gateway/proxy/blockstore.go +++ b/examples/gateway/proxy/blockstore.go @@ -2,47 +2,38 @@ package main import ( "context" - "errors" "fmt" "io" "net/http" "net/url" - blockstore "github.com/ipfs/boxo/blockstore" - "github.com/ipfs/go-block-format" + "github.com/ipfs/boxo/exchange" + blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" ) -var ( - errNotImplemented = errors.New("not implemented") -) - -type proxyStore struct { +type proxyExchange struct { httpClient *http.Client gatewayURL string - validate bool } -func newProxyStore(gatewayURL string, client *http.Client) blockstore.Blockstore { +func newProxyExchange(gatewayURL string, client *http.Client) exchange.Interface { if client == nil { client = http.DefaultClient } - return &proxyStore{ + return &proxyExchange{ gatewayURL: gatewayURL, httpClient: client, - // Enables block validation by default. Important since we are - // proxying block requests to an untrusted gateway. - validate: true, } } -func (ps *proxyStore) fetch(ctx context.Context, c cid.Cid) (blocks.Block, error) { - u, err := url.Parse(fmt.Sprintf("%s/ipfs/%s?format=raw", ps.gatewayURL, c)) +func (e *proxyExchange) fetch(ctx context.Context, c cid.Cid) (blocks.Block, error) { + u, err := url.Parse(fmt.Sprintf("%s/ipfs/%s?format=raw", e.gatewayURL, c)) if err != nil { return nil, err } - resp, err := ps.httpClient.Do(&http.Request{ + resp, err := e.httpClient.Do(&http.Request{ Method: http.MethodGet, URL: u, Header: http.Header{ @@ -63,57 +54,60 @@ func (ps *proxyStore) fetch(ctx context.Context, c cid.Cid) (blocks.Block, error return nil, err } - if ps.validate { - nc, err := c.Prefix().Sum(rb) - if err != nil { - return nil, blocks.ErrWrongHash - } - if !nc.Equals(c) { - fmt.Printf("got %s vs %s\n", nc, c) - return nil, blocks.ErrWrongHash - } - } - return blocks.NewBlockWithCid(rb, c) -} - -func (ps *proxyStore) Has(ctx context.Context, c cid.Cid) (bool, error) { - blk, err := ps.fetch(ctx, c) + // Validate incoming blocks + // This is important since we are proxying block requests to an untrusted gateway. + nc, err := c.Prefix().Sum(rb) if err != nil { - return false, err + return nil, blocks.ErrWrongHash + } + if !nc.Equals(c) { + fmt.Printf("got %s vs %s\n", nc, c) + return nil, blocks.ErrWrongHash } - return blk != nil, nil + + return blocks.NewBlockWithCid(rb, c) } -func (ps *proxyStore) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) { - blk, err := ps.fetch(ctx, c) +func (e *proxyExchange) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) { + blk, err := e.fetch(ctx, c) if err != nil { return nil, err } return blk, nil } -func (ps *proxyStore) GetSize(ctx context.Context, c cid.Cid) (int, error) { - blk, err := ps.fetch(ctx, c) - if err != nil { - return 0, err - } - return len(blk.RawData()), nil -} +func (e *proxyExchange) GetBlocks(ctx context.Context, cids []cid.Cid) (<-chan blocks.Block, error) { + ch := make(chan blocks.Block) + + // Note: this implementation of GetBlocks does not make use of worker pools or parallelism + // However, production implementations generally will, and an advanced + // version of this can be found in https://github.com/ipfs/bifrost-gateway/ + go func() { + defer close(ch) + for _, c := range cids { + blk, err := e.fetch(ctx, c) + if err != nil { + return + } + select { + case ch <- blk: + case <-ctx.Done(): + return + } + } + }() -func (ps *proxyStore) HashOnRead(enabled bool) { - ps.validate = enabled + return ch, nil } -func (c *proxyStore) Put(context.Context, blocks.Block) error { - return errNotImplemented +func (e *proxyExchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error { + // Note: while not required this function could be used optimistically to prevent fetching + // of data that the client has retrieved already even though a Get call is in progress. + return nil } -func (c *proxyStore) PutMany(context.Context, []blocks.Block) error { - return errNotImplemented -} -func (c *proxyStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { - return nil, errNotImplemented -} -func (c *proxyStore) DeleteBlock(context.Context, cid.Cid) error { - return errNotImplemented +func (e *proxyExchange) Close() error { + // Note: while nothing is strictly required to happen here it would be reasonable to close + // existing connections and prevent new operations from starting. + return nil } diff --git a/examples/gateway/proxy/main.go b/examples/gateway/proxy/main.go index a6beeed52..0760c05fc 100644 --- a/examples/gateway/proxy/main.go +++ b/examples/gateway/proxy/main.go @@ -7,9 +7,11 @@ import ( "strconv" "github.com/ipfs/boxo/blockservice" + "github.com/ipfs/boxo/blockstore" "github.com/ipfs/boxo/examples/gateway/common" - offline "github.com/ipfs/boxo/exchange/offline" "github.com/ipfs/boxo/gateway" + "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" ) func main() { @@ -17,9 +19,14 @@ func main() { port := flag.Int("p", 8040, "port to run this gateway from") flag.Parse() - // Sets up the block store, which will proxy the block requests to the given gateway. - blockStore := newProxyStore(*gatewayUrlPtr, nil) - blockService := blockservice.New(blockStore, offline.Exchange(blockStore)) + // Sets up a blockstore to hold the blocks we request from the gateway + // Note: in a production environment you would likely want to choose a more efficient datastore implementation + // as well as one that has a way of pruning storage so as not to hold data in memory indefinitely. + blockStore := blockstore.NewBlockstore(dssync.MutexWrap(datastore.NewMapDatastore())) + + // Sets up the exchange, which will proxy the block requests to the given gateway. + e := newProxyExchange(*gatewayUrlPtr, nil) + blockService := blockservice.New(blockStore, e) // Sets up the routing system, which will proxy the IPNS routing requests to the given gateway. routing := newProxyRouting(*gatewayUrlPtr, nil) diff --git a/examples/gateway/proxy/main_test.go b/examples/gateway/proxy/main_test.go index 3e3accc9b..a8f27ce86 100644 --- a/examples/gateway/proxy/main_test.go +++ b/examples/gateway/proxy/main_test.go @@ -7,10 +7,12 @@ import ( "testing" "github.com/ipfs/boxo/blockservice" + "github.com/ipfs/boxo/blockstore" "github.com/ipfs/boxo/examples/gateway/common" - offline "github.com/ipfs/boxo/exchange/offline" "github.com/ipfs/boxo/gateway" blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" "github.com/stretchr/testify/assert" ) @@ -19,8 +21,9 @@ const ( ) func newProxyGateway(t *testing.T, rs *httptest.Server) *httptest.Server { - blockStore := newProxyStore(rs.URL, nil) - blockService := blockservice.New(blockStore, offline.Exchange(blockStore)) + blockStore := blockstore.NewBlockstore(dssync.MutexWrap(datastore.NewMapDatastore())) + exch := newProxyExchange(rs.URL, nil) + blockService := blockservice.New(blockStore, exch) routing := newProxyRouting(rs.URL, nil) gw, err := gateway.NewBlocksGateway(blockService, gateway.WithValueStore(routing))