Skip to content

Commit

Permalink
review changes
Browse files Browse the repository at this point in the history
  • Loading branch information
gammazero committed Sep 4, 2024
1 parent 57beee1 commit bbbeb76
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 12 deletions.
4 changes: 2 additions & 2 deletions dagsync/announce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestAnnounceReplace(t *testing.T) {
}

sub, err := dagsync.NewSubscriber(dstHost, dstLnkS, dagsync.RecvAnnounce(testTopic),
dagsync.BlockHook(blockHook))
dagsync.BlockHook(blockHook), dagsync.WithCidSchemaHint(false))
require.NoError(t, err)
defer sub.Close()

Expand Down Expand Up @@ -377,7 +377,7 @@ func initPubSub(t *testing.T, srcStore, dstStore datastore.Batching, allowPeer f
dstHost.Peerstore().AddAddrs(srcHost.ID(), srcHost.Addrs(), time.Hour)
dstLnkS := test.MkLinkSystem(dstStore)

sub, err := dagsync.NewSubscriber(dstHost, dstLnkS,
sub, err := dagsync.NewSubscriber(dstHost, dstLnkS, dagsync.WithCidSchemaHint(false),
dagsync.RecvAnnounce(testTopic, announce.WithTopic(topics[1]), announce.WithAllowPeer(allowPeer)))
require.NoError(t, err)

Expand Down
20 changes: 19 additions & 1 deletion dagsync/ipnisync/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ import (
"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/codec/dagjson"
ipldmodel "github.com/ipld/go-ipld-prime/datamodel"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
headschema "github.com/ipni/go-libipni/dagsync/ipnisync/head"
"github.com/ipni/go-libipni/ingest/schema"
"github.com/ipni/go-libipni/maurl"
ic "github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -226,13 +228,29 @@ func (p *Publisher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ipldCtx := ipld.LinkContext{}
reqType := r.Header.Get(CidSchemaHeader)
if reqType != "" {
log.Debug("sync request has cid schema type hint", "hint", reqType)
ipldCtx.Ctx, err = CtxWithCidSchema(ipldCtx.Ctx, reqType)
if err != nil {
// Log warning about unknown cid schema type, but continue on since
// the linksystem might recognize it.
log.Warnw(err.Error(), "value", reqType)
}
}

item, err := p.lsys.Load(ipldCtx, cidlink.Link{Cid: c}, basicnode.Prototype.Any)
var ipldProto ipldmodel.NodePrototype

switch reqType {
case CidSchemaAdvertisement:
ipldProto = schema.AdvertisementPrototype
case CidSchemaEntryChunk:
ipldProto = schema.EntryChunkPrototype
default:
ipldProto = basicnode.Prototype.Any
}

//ipldProto = basicnode.Prototype.Any

item, err := p.lsys.Load(ipldCtx, cidlink.Link{Cid: c}, ipldProto)
if err != nil {
if errors.Is(err, ipld.ErrNotExists{}) {
http.Error(w, "cid not found", http.StatusNotFound)
Expand Down
9 changes: 9 additions & 0 deletions dagsync/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type config struct {
firstSyncDepth int64
segDepthLimit int64

cidSchemaHint bool
strictAdsSelSeq bool

httpTimeout time.Duration
Expand All @@ -66,6 +67,7 @@ func getOpts(opts []Option) (config, error) {
httpTimeout: defaultHttpTimeout,
idleHandlerTTL: defaultIdleHandlerTTL,
segDepthLimit: defaultSegDepthLimit,
cidSchemaHint: true,
strictAdsSelSeq: true,
}

Expand Down Expand Up @@ -339,3 +341,10 @@ func MakeGeneralBlockHook(prevAdCid func(adCid cid.Cid) (cid.Cid, error)) BlockH
}
}
}

func WithCidSchemaHint(enable bool) Option {
return func(c *config) error {
c.cidSchemaHint = enable
return nil
}
}
31 changes: 22 additions & 9 deletions dagsync/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ type Subscriber struct {
// specifies the selection sequence itself.
adsSelectorSeq ipld.Node

// cidSchemaHint enables sending the cid schema type hint as
// an HTTP header in sync requests.
cidSchemaHint bool

// selectorOne selects one multihash entries or HAMT block.
selectorOne ipld.Node
// selectorAll selects all multihash HAMT blocks.
Expand Down Expand Up @@ -236,6 +240,8 @@ func NewSubscriber(host host.Host, lsys ipld.LinkSystem, options ...Option) (*Su
ssb.ExploreFields(func(efsb builder.ExploreFieldsSpecBuilder) {
efsb.Insert("Next", ssb.ExploreRecursiveEdge()) // Next field in EntryChunk
})).Node(),

cidSchemaHint: opts.cidSchemaHint,
}

if opts.strictAdsSelSeq {
Expand All @@ -244,6 +250,7 @@ func NewSubscriber(host host.Host, lsys ipld.LinkSystem, options ...Option) (*Su
}).Node()
} else {
s.adsSelectorSeq = ssb.ExploreAll(ssb.ExploreRecursiveEdge()).Node()
s.cidSchemaHint = false
}

if opts.hasRcvr {
Expand Down Expand Up @@ -488,9 +495,11 @@ func (s *Subscriber) SyncAdChain(ctx context.Context, peerInfo peer.AddrInfo, op

sel := ExploreRecursiveWithStopNode(depthLimit, s.adsSelectorSeq, stopLnk)

ctx, err = ipnisync.CtxWithCidSchema(ctx, ipnisync.CidSchemaAdvertisement)
if err != nil {
panic(err.Error())
if s.cidSchemaHint {
ctx, err = ipnisync.CtxWithCidSchema(ctx, ipnisync.CidSchemaAdvertisement)
if err != nil {
panic(err.Error())
}
}
syncCount, err := hnd.handle(ctx, nextCid, sel, syncer, opts.blockHook, segdl, stopAtCid)
if err != nil {
Expand Down Expand Up @@ -575,9 +584,11 @@ func (s *Subscriber) syncEntries(ctx context.Context, peerInfo peer.AddrInfo, en

log.Debugw("Start entries sync", "peer", peerInfo.ID, "cid", entCid)

ctx, err = ipnisync.CtxWithCidSchema(ctx, ipnisync.CidSchemaEntryChunk)
if err != nil {
panic(err.Error())
if s.cidSchemaHint {
ctx, err = ipnisync.CtxWithCidSchema(ctx, ipnisync.CidSchemaEntryChunk)
if err != nil {
panic(err.Error())
}
}
_, err = hnd.handle(ctx, entCid, sel, syncer, bh, segdl, cid.Undef)
if err != nil {
Expand Down Expand Up @@ -880,9 +891,11 @@ func (h *handler) asyncSyncAdChain(ctx context.Context) {
return
}

ctx, err = ipnisync.CtxWithCidSchema(ctx, ipnisync.CidSchemaAdvertisement)
if err != nil {
panic(err.Error())
if h.subscriber.cidSchemaHint {
ctx, err = ipnisync.CtxWithCidSchema(ctx, ipnisync.CidSchemaAdvertisement)
if err != nil {
panic(err.Error())
}
}
sel := ExploreRecursiveWithStopNode(adsDepthLimit, h.subscriber.adsSelectorSeq, latestSyncLink)
syncCount, err := h.handle(ctx, nextCid, sel, syncer, h.subscriber.generalBlockHook, h.subscriber.segDepthLimit, stopAtCid)
Expand Down

0 comments on commit bbbeb76

Please sign in to comment.