Skip to content

Commit

Permalink
Include optional Ipni-Cid-Schema-Type HTTP header
Browse files Browse the repository at this point in the history
Same as #221 applied to Release-0.5.x branch.

- update go-libp2p to latest
  • Loading branch information
gammazero committed Sep 5, 2024
1 parent ec53518 commit 3ad5ce3
Show file tree
Hide file tree
Showing 11 changed files with 285 additions and 15 deletions.
7 changes: 4 additions & 3 deletions dagsync/announce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestAnnounceReplace(t *testing.T) {
}

sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, dagsync.RecvAnnounce(),
dagsync.BlockHook(blockHook))
dagsync.BlockHook(blockHook), dagsync.WithCidSchemaHint(false))
require.NoError(t, err)
defer sub.Close()

Expand Down Expand Up @@ -377,7 +377,8 @@ func TestPublisherRejectsPeer(t *testing.T) {
dstHost.Peerstore().AddAddrs(srcHost.ID(), srcHost.Addrs(), time.Hour)
dstLnkS := test.MkLinkSystem(dstStore)

sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, dagsync.RecvAnnounce(announce.WithTopic(topics[1])))
sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic,
dagsync.RecvAnnounce(announce.WithTopic(topics[1])))
require.NoError(t, err)
defer sub.Close()

Expand Down Expand Up @@ -457,7 +458,7 @@ func initPubSub(t *testing.T, srcStore, dstStore datastore.Batching, allowPeer f
dstHost.Peerstore().AddAddrs(srcHost.ID(), srcHost.Addrs(), time.Hour)
dstLnkS := test.MkLinkSystem(dstStore)

sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic,
sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, dagsync.WithCidSchemaHint(false),
dagsync.RecvAnnounce(announce.WithTopic(topics[1]), announce.WithAllowPeer(allowPeer)))
require.NoError(t, err)

Expand Down
71 changes: 71 additions & 0 deletions dagsync/ipnisync/cid_schema_hint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package ipnisync

import (
"context"
"errors"
)

const (
// CidSchemaHeader is the HTTP header used as an optional hint about the
// type of data requested by a CID.
CidSchemaHeader = "Ipni-Cid-Schema-Type"
// CidSchemaAdvertisement is a value for the CidSchemaHeader specifying
// advertiesement data is being requested. Referrs to Advertisement in
// https://github.com/ipni/go-libipni/blob/main/ingest/schema/schema.ipldsch
CidSchemaAdvertisement = "Advertisement"
// CidSchemaEntries is a value for the CidSchemaHeader specifying
// advertisement entries (multihash chunks) data is being requested.
// Referrs to Entry chunk in
// https://github.com/ipni/go-libipni/blob/main/ingest/schema/schema.ipldsch
CidSchemaEntryChunk = "EntryChunk"
)

var ErrUnknownCidSchema = errors.New("unknown cid schema type value")

// cidSchemaTypeKey is the type used for the key of CidSchemaHeader when set as
// a context value.
type cidSchemaTypeCtxKey string

// cidSchemaCtxKey is used to get the key used to store or extract the cid
// schema value in a context.
const cidSchemaCtxKey cidSchemaTypeCtxKey = CidSchemaHeader

// CidSchemaFromCtx extracts the CID schema name from the context. If the
// scheam value is not set, then returns "". If the schema value is set, but is
// not recognized, then ErrUnknownCidSchema is returned along with the value.
//
// Returning unrecognized values with an error allows consumers to retrieved
// newer values that are not recognized by an older version of this library.
func CidSchemaFromCtx(ctx context.Context) (string, error) {
cidSchemaType, ok := ctx.Value(cidSchemaCtxKey).(string)
if !ok {
return "", nil
}

var err error
switch cidSchemaType {
case CidSchemaAdvertisement, CidSchemaEntryChunk:
default:
err = ErrUnknownCidSchema
}
return cidSchemaType, err
}

// CtxWithCidSchema creates a derived context that has the specified value for
// the CID schema type.
//
// Setting an unrecognized value, even when an error is retruned, allows
// producers to set context values that are not recognized by an older version
// of this library.
func CtxWithCidSchema(ctx context.Context, cidSchemaType string) (context.Context, error) {
if cidSchemaType == "" {
return ctx, nil
}
var err error
switch cidSchemaType {
case CidSchemaAdvertisement, CidSchemaEntryChunk:
default:
err = ErrUnknownCidSchema
}
return context.WithValue(ctx, cidSchemaCtxKey, cidSchemaType), err
}
46 changes: 46 additions & 0 deletions dagsync/ipnisync/cid_schema_hint_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package ipnisync_test

import (
"context"
"testing"

"github.com/ipni/go-libipni/dagsync/ipnisync"
"github.com/stretchr/testify/require"
)

func TestCtxWithCidSchema(t *testing.T) {
ctxOrig := context.Background()
ctx, err := ipnisync.CtxWithCidSchema(ctxOrig, "")
require.NoError(t, err)
require.Equal(t, ctxOrig, ctx)

ctx, err = ipnisync.CtxWithCidSchema(ctxOrig, ipnisync.CidSchemaAdvertisement)
require.NoError(t, err)
require.NotEqual(t, ctxOrig, ctx)

value, err := ipnisync.CidSchemaFromCtx(ctx)
require.NoError(t, err)
require.Equal(t, ipnisync.CidSchemaAdvertisement, value)

ctx, err = ipnisync.CtxWithCidSchema(ctx, ipnisync.CidSchemaEntryChunk)
require.NoError(t, err)
value, err = ipnisync.CidSchemaFromCtx(ctx)
require.NoError(t, err)
require.Equal(t, ipnisync.CidSchemaEntryChunk, value)

value, err = ipnisync.CidSchemaFromCtx(ctxOrig)
require.NoError(t, err)
require.Empty(t, value)

const unknownVal = "unknown"

// Setting unknown value returns error as well as context with value.
ctx, err = ipnisync.CtxWithCidSchema(ctxOrig, unknownVal)
require.ErrorIs(t, err, ipnisync.ErrUnknownCidSchema)
require.NotNil(t, ctxOrig, ctx)

// Getting unknown value returns error as well as value.
value, err = ipnisync.CidSchemaFromCtx(ctx)
require.ErrorIs(t, err, ipnisync.ErrUnknownCidSchema)
require.Equal(t, unknownVal, value)
}
32 changes: 31 additions & 1 deletion dagsync/ipnisync/publisher.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ipnisync

import (
"context"
"errors"
"fmt"
"net/http"
Expand All @@ -12,9 +13,11 @@ import (
"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/codec/dagjson"
"github.com/ipld/go-ipld-prime/datamodel"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
headschema "github.com/ipni/go-libipni/dagsync/ipnisync/head"
"github.com/ipni/go-libipni/ingest/schema"
"github.com/ipni/go-libipni/maurl"
ic "github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
Expand All @@ -23,6 +26,10 @@ import (
)

// Publisher serves an advertisement chain over HTTP.
//
// If the publisher receives a request that contains a valid CidSchemaHeader
// header, then the ipld.Context passed to the lsys Load function contains a
// context that has that header's value retrievable with CidSchemaFromCtx.
type Publisher struct {
lsys ipld.LinkSystem
handlerPath string
Expand Down Expand Up @@ -218,7 +225,30 @@ func (p *Publisher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, "invalid request: not a cid", http.StatusBadRequest)
return
}
item, err := p.lsys.Load(ipld.LinkContext{}, cidlink.Link{Cid: c}, basicnode.Prototype.Any)

ipldCtx := ipld.LinkContext{}
reqType := r.Header.Get(CidSchemaHeader)
if reqType != "" {
log.Debug("sync request has cid schema type hint", "hint", reqType)
ipldCtx.Ctx, err = CtxWithCidSchema(context.Background(), reqType)
if err != nil {
// Log warning about unknown cid schema type, but continue on since
// the linksystem might recognize it.
log.Warnw(err.Error(), "value", reqType)
}
}

var ipldProto datamodel.NodePrototype
switch reqType {
case CidSchemaAdvertisement:
ipldProto = schema.AdvertisementPrototype
case CidSchemaEntryChunk:
ipldProto = schema.EntryChunkPrototype
default:
ipldProto = basicnode.Prototype.Any
}

item, err := p.lsys.Load(ipldCtx, cidlink.Link{Cid: c}, ipldProto)
if err != nil {
if errors.Is(err, ipld.ErrNotExists{}) {
http.Error(w, "cid not found", http.StatusNotFound)
Expand Down
35 changes: 29 additions & 6 deletions dagsync/ipnisync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/ipld/go-ipld-prime/traversal"
"github.com/ipld/go-ipld-prime/traversal/selector"
headschema "github.com/ipni/go-libipni/dagsync/ipnisync/head"
"github.com/ipni/go-libipni/ingest/schema"
"github.com/ipni/go-libipni/maurl"
"github.com/ipni/go-libipni/mautil"
"github.com/libp2p/go-libp2p/core/network"
Expand Down Expand Up @@ -242,7 +243,23 @@ func (s *Syncer) Sync(ctx context.Context, nextCid cid.Cid, sel ipld.Node) error
return fmt.Errorf("failed to compile selector: %w", err)
}

cids, err := s.walkFetch(ctx, nextCid, xsel)
// Check for valid cid schema type if set.
reqType, err := CidSchemaFromCtx(ctx)
if err != nil {
return err
}

var ipldProto datamodel.NodePrototype
switch reqType {
case CidSchemaAdvertisement:
ipldProto = schema.AdvertisementPrototype
case CidSchemaEntryChunk:
ipldProto = schema.EntryChunkPrototype
default:
ipldProto = basicnode.Prototype.Any
}

cids, err := s.walkFetch(ctx, nextCid, xsel, ipldProto)
if err != nil {
return fmt.Errorf("failed to traverse requested dag: %w", err)
}
Expand All @@ -268,7 +285,7 @@ func (s *Syncer) Sync(ctx context.Context, nextCid cid.Cid, sel ipld.Node) error
// walkFetch is run by a traversal of the selector. For each block that the
// selector walks over, walkFetch will look to see if it can find it in the
// local data store. If it cannot, it will then go and get it over HTTP.
func (s *Syncer) walkFetch(ctx context.Context, rootCid cid.Cid, sel selector.Selector) ([]cid.Cid, error) {
func (s *Syncer) walkFetch(ctx context.Context, rootCid cid.Cid, sel selector.Selector, ipldProto datamodel.NodePrototype) ([]cid.Cid, error) {
// Track the order of cids seen during traversal so that the block hook
// function gets called in the same order.
var traversalOrder []cid.Cid
Expand All @@ -279,7 +296,7 @@ func (s *Syncer) walkFetch(ctx context.Context, rootCid cid.Cid, sel selector.Se
getMissingLs.StorageReadOpener = func(lc ipld.LinkContext, l ipld.Link) (io.Reader, error) {
c := l.(cidlink.Link).Cid
// fetchBlock checks if the node is already present in storage.
err := s.fetchBlock(ctx, c)
err := s.fetchBlock(ctx, c, ipldProto)
if err != nil {
return nil, fmt.Errorf("failed to fetch block for cid %s: %w", c, err)
}
Expand All @@ -301,7 +318,7 @@ func (s *Syncer) walkFetch(ctx context.Context, rootCid cid.Cid, sel selector.Se
}

// get the direct node.
rootNode, err := getMissingLs.Load(ipld.LinkContext{Ctx: ctx}, cidlink.Link{Cid: rootCid}, basicnode.Prototype.Any)
rootNode, err := getMissingLs.Load(ipld.LinkContext{Ctx: ctx}, cidlink.Link{Cid: rootCid}, ipldProto)
if err != nil {
return nil, fmt.Errorf("failed to load node for root cid %s: %w", rootCid, err)
}
Expand All @@ -323,6 +340,12 @@ retry:
return err
}

// Error already checked in Sync.
reqType, _ := CidSchemaFromCtx(ctx)
if reqType != "" {
req.Header.Set(CidSchemaHeader, reqType)
}

resp, err := s.client.Do(req)
if err != nil {
if len(s.urls) != 0 {
Expand Down Expand Up @@ -378,8 +401,8 @@ retry:
}

// fetchBlock fetches an item into the datastore at c if not locally available.
func (s *Syncer) fetchBlock(ctx context.Context, c cid.Cid) error {
n, err := s.sync.lsys.Load(ipld.LinkContext{Ctx: ctx}, cidlink.Link{Cid: c}, basicnode.Prototype.Any)
func (s *Syncer) fetchBlock(ctx context.Context, c cid.Cid, ipldProto datamodel.NodePrototype) error {
n, err := s.sync.lsys.Load(ipld.LinkContext{Ctx: ctx}, cidlink.Link{Cid: c}, ipldProto)
// node is already present.
if n != nil && err == nil {
return nil
Expand Down
61 changes: 61 additions & 0 deletions dagsync/ipnisync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,3 +230,64 @@ func TestIPNIsync_NotFoundReturnsContentNotFoundErr(t *testing.T) {
require.NotNil(t, err)
require.Contains(t, err.Error(), "content not found")
}

func TestRequestTypeHint(t *testing.T) {
pubPrK, _, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, rand.Reader)
require.NoError(t, err)
pubID, err := peer.IDFromPrivateKey(pubPrK)
require.NoError(t, err)

var lastReqTypeHint string

// Instantiate a dagsync publisher.
publs := cidlink.DefaultLinkSystem()

publs.StorageReadOpener = func(lnkCtx linking.LinkContext, lnk datamodel.Link) (io.Reader, error) {
if lnkCtx.Ctx != nil {
hint, err := ipnisync.CidSchemaFromCtx(lnkCtx.Ctx)
require.NoError(t, err)
require.NotEmpty(t, hint)
lastReqTypeHint = hint
} else {
lastReqTypeHint = ""
}

require.NotEmpty(t, lastReqTypeHint, "missing expected context value")
return nil, ipld.ErrNotExists{}
}

pub, err := ipnisync.NewPublisher(publs, pubPrK, ipnisync.WithHTTPListenAddrs("0.0.0.0:0"))
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, pub.Close()) })

ls := cidlink.DefaultLinkSystem()
store := &memstore.Store{}
ls.SetWriteStorage(store)
ls.SetReadStorage(store)

sync := ipnisync.NewSync(ls, nil)
pubInfo := peer.AddrInfo{
ID: pubID,
Addrs: pub.Addrs(),
}
syncer, err := sync.NewSyncer(pubInfo)
require.NoError(t, err)

testCid, err := cid.Decode(sampleNFTStorageCid)
require.NoError(t, err)

ctx, err := ipnisync.CtxWithCidSchema(context.Background(), ipnisync.CidSchemaAdvertisement)
require.NoError(t, err)
_ = syncer.Sync(ctx, testCid, selectorparse.CommonSelector_MatchPoint)
require.Equal(t, ipnisync.CidSchemaAdvertisement, lastReqTypeHint)

ctx, err = ipnisync.CtxWithCidSchema(context.Background(), ipnisync.CidSchemaEntryChunk)
require.NoError(t, err)
_ = syncer.Sync(ctx, testCid, selectorparse.CommonSelector_MatchPoint)
require.Equal(t, ipnisync.CidSchemaEntryChunk, lastReqTypeHint)

ctx, err = ipnisync.CtxWithCidSchema(context.Background(), "bad")
require.ErrorIs(t, err, ipnisync.ErrUnknownCidSchema)
err = syncer.Sync(ctx, testCid, selectorparse.CommonSelector_MatchPoint)
require.ErrorIs(t, err, ipnisync.ErrUnknownCidSchema)
}
9 changes: 9 additions & 0 deletions dagsync/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type config struct {
gsMaxInRequests uint64
gsMaxOutRequests uint64

cidSchemaHint bool
strictAdsSelSeq bool

httpTimeout time.Duration
Expand All @@ -73,6 +74,7 @@ func getOpts(opts []Option) (config, error) {
segDepthLimit: defaultSegDepthLimit,
gsMaxInRequests: defaultGsMaxInRequests,
gsMaxOutRequests: defaultGsMaxOutRequests,
cidSchemaHint: true,
strictAdsSelSeq: true,
}

Expand Down Expand Up @@ -355,3 +357,10 @@ func MakeGeneralBlockHook(prevAdCid func(adCid cid.Cid) (cid.Cid, error)) BlockH
}
}
}

func WithCidSchemaHint(enable bool) Option {
return func(c *config) error {
c.cidSchemaHint = enable
return nil
}
}
Loading

0 comments on commit 3ad5ce3

Please sign in to comment.