Skip to content

Commit

Permalink
refactor(examples): proxy gateway to use exchange
Browse files Browse the repository at this point in the history
  • Loading branch information
aschmahmann authored Mar 31, 2023
1 parent 2835752 commit b1046dd
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 62 deletions.
104 changes: 49 additions & 55 deletions examples/gateway/proxy/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,47 +2,38 @@ package main

import (
"context"
"errors"
"fmt"
"io"
"net/http"
"net/url"

blockstore "github.com/ipfs/boxo/blockstore"
"github.com/ipfs/go-block-format"
"github.com/ipfs/boxo/exchange"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
)

var (
errNotImplemented = errors.New("not implemented")
)

type proxyStore struct {
type proxyExchange struct {
httpClient *http.Client
gatewayURL string
validate bool
}

func newProxyStore(gatewayURL string, client *http.Client) blockstore.Blockstore {
func newProxyExchange(gatewayURL string, client *http.Client) exchange.Interface {
if client == nil {
client = http.DefaultClient
}

return &proxyStore{
return &proxyExchange{
gatewayURL: gatewayURL,
httpClient: client,
// Enables block validation by default. Important since we are
// proxying block requests to an untrusted gateway.
validate: true,
}
}

func (ps *proxyStore) fetch(ctx context.Context, c cid.Cid) (blocks.Block, error) {
u, err := url.Parse(fmt.Sprintf("%s/ipfs/%s?format=raw", ps.gatewayURL, c))
func (e *proxyExchange) fetch(ctx context.Context, c cid.Cid) (blocks.Block, error) {
u, err := url.Parse(fmt.Sprintf("%s/ipfs/%s?format=raw", e.gatewayURL, c))
if err != nil {
return nil, err
}
resp, err := ps.httpClient.Do(&http.Request{
resp, err := e.httpClient.Do(&http.Request{
Method: http.MethodGet,
URL: u,
Header: http.Header{
Expand All @@ -63,57 +54,60 @@ func (ps *proxyStore) fetch(ctx context.Context, c cid.Cid) (blocks.Block, error
return nil, err
}

if ps.validate {
nc, err := c.Prefix().Sum(rb)
if err != nil {
return nil, blocks.ErrWrongHash
}
if !nc.Equals(c) {
fmt.Printf("got %s vs %s\n", nc, c)
return nil, blocks.ErrWrongHash
}
}
return blocks.NewBlockWithCid(rb, c)
}

func (ps *proxyStore) Has(ctx context.Context, c cid.Cid) (bool, error) {
blk, err := ps.fetch(ctx, c)
// Validate incoming blocks
// This is important since we are proxying block requests to an untrusted gateway.
nc, err := c.Prefix().Sum(rb)
if err != nil {
return false, err
return nil, blocks.ErrWrongHash
}
if !nc.Equals(c) {
fmt.Printf("got %s vs %s\n", nc, c)
return nil, blocks.ErrWrongHash
}
return blk != nil, nil

return blocks.NewBlockWithCid(rb, c)
}

func (ps *proxyStore) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) {
blk, err := ps.fetch(ctx, c)
func (e *proxyExchange) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) {
blk, err := e.fetch(ctx, c)
if err != nil {
return nil, err
}
return blk, nil
}

func (ps *proxyStore) GetSize(ctx context.Context, c cid.Cid) (int, error) {
blk, err := ps.fetch(ctx, c)
if err != nil {
return 0, err
}
return len(blk.RawData()), nil
}
func (e *proxyExchange) GetBlocks(ctx context.Context, cids []cid.Cid) (<-chan blocks.Block, error) {
ch := make(chan blocks.Block)

// Note: this implementation of GetBlocks does not make use of worker pools or parallelism
// However, production implementations generally will, and an advanced
// version of this can be found in https://github.com/ipfs/bifrost-gateway/
go func() {
defer close(ch)
for _, c := range cids {
blk, err := e.fetch(ctx, c)
if err != nil {
return
}
select {
case ch <- blk:
case <-ctx.Done():
return
}
}
}()

func (ps *proxyStore) HashOnRead(enabled bool) {
ps.validate = enabled
return ch, nil
}

func (c *proxyStore) Put(context.Context, blocks.Block) error {
return errNotImplemented
func (e *proxyExchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error {
// Note: while not required this function could be used optimistically to prevent fetching
// of data that the client has retrieved already even though a Get call is in progress.
return nil
}

func (c *proxyStore) PutMany(context.Context, []blocks.Block) error {
return errNotImplemented
}
func (c *proxyStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return nil, errNotImplemented
}
func (c *proxyStore) DeleteBlock(context.Context, cid.Cid) error {
return errNotImplemented
func (e *proxyExchange) Close() error {
// Note: while nothing is strictly required to happen here it would be reasonable to close
// existing connections and prevent new operations from starting.
return nil
}
15 changes: 11 additions & 4 deletions examples/gateway/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,26 @@ import (
"strconv"

"github.com/ipfs/boxo/blockservice"
"github.com/ipfs/boxo/blockstore"
"github.com/ipfs/boxo/examples/gateway/common"
offline "github.com/ipfs/boxo/exchange/offline"
"github.com/ipfs/boxo/gateway"
"github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
)

func main() {
gatewayUrlPtr := flag.String("g", "", "gateway to proxy to")
port := flag.Int("p", 8040, "port to run this gateway from")
flag.Parse()

// Sets up the block store, which will proxy the block requests to the given gateway.
blockStore := newProxyStore(*gatewayUrlPtr, nil)
blockService := blockservice.New(blockStore, offline.Exchange(blockStore))
// Sets up a blockstore to hold the blocks we request from the gateway
// Note: in a production environment you would likely want to choose a more efficient datastore implementation
// as well as one that has a way of pruning storage so as not to hold data in memory indefinitely.
blockStore := blockstore.NewBlockstore(dssync.MutexWrap(datastore.NewMapDatastore()))

// Sets up the exchange, which will proxy the block requests to the given gateway.
e := newProxyExchange(*gatewayUrlPtr, nil)
blockService := blockservice.New(blockStore, e)

// Sets up the routing system, which will proxy the IPNS routing requests to the given gateway.
routing := newProxyRouting(*gatewayUrlPtr, nil)
Expand Down
9 changes: 6 additions & 3 deletions examples/gateway/proxy/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import (
"testing"

"github.com/ipfs/boxo/blockservice"
"github.com/ipfs/boxo/blockstore"
"github.com/ipfs/boxo/examples/gateway/common"
offline "github.com/ipfs/boxo/exchange/offline"
"github.com/ipfs/boxo/gateway"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
"github.com/stretchr/testify/assert"
)

Expand All @@ -19,8 +21,9 @@ const (
)

func newProxyGateway(t *testing.T, rs *httptest.Server) *httptest.Server {
blockStore := newProxyStore(rs.URL, nil)
blockService := blockservice.New(blockStore, offline.Exchange(blockStore))
blockStore := blockstore.NewBlockstore(dssync.MutexWrap(datastore.NewMapDatastore()))
exch := newProxyExchange(rs.URL, nil)
blockService := blockservice.New(blockStore, exch)
routing := newProxyRouting(rs.URL, nil)

gw, err := gateway.NewBlocksGateway(blockService, gateway.WithValueStore(routing))
Expand Down

0 comments on commit b1046dd

Please sign in to comment.