Skip to content

Commit

Permalink
Add function to create general dagsync blockhook function (#144)
Browse files Browse the repository at this point in the history
* Add function to create general dagsync blockhook function

A general blockhook function is commonly used by clients of this library when segmented syncs are enabled.

* Ad and entries deserialization functions
  • Loading branch information
gammazero authored Jan 10, 2024
1 parent 5e8b864 commit 5250e76
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 0 deletions.
32 changes: 32 additions & 0 deletions dagsync/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,3 +310,35 @@ func ScopedBlockHook(hook BlockHookFunc) SyncOption {
sc.blockHook = hook
}
}

// MakeGeneralBlockHook creates a block hook function that sets the next sync
// action based on whether the specified advertisement has a previous
// advertisement in the chain..
//
// Use this when segmented sync is enabled and no other blockhook is defined.
//
// The supplied prevAdCid function takes the CID of the current advertisement
// and returns the CID of the previous advertisement in the chain. This would
// typically be done my loading the specified advertisement from the
// ipld.LinkSystem and getting the previous CID.
func MakeGeneralBlockHook(prevAdCid func(adCid cid.Cid) (cid.Cid, error)) BlockHookFunc {
return func(_ peer.ID, adCid cid.Cid, actions SegmentSyncActions) {
// The only kind of block we should get by loading CIDs here should be
// Advertisement.
//
// Because:
// - The default subscription selector only selects advertisements.
// - Entries are synced with an explicit selector separate from
// advertisement syncs and should use dagsync.ScopedBlockHook to
// override this hook and decode chunks instead.
//
// Therefore, we only attempt to load advertisements here and signal
// failure if the load fails.
prevCid, err := prevAdCid(adCid)
if err != nil {
actions.FailSync(err)
} else {
actions.SetNextSyncCid(prevCid)
}
}
}
61 changes: 61 additions & 0 deletions ingest/schema/types.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,21 @@
package schema

import (
"bytes"
"errors"
"fmt"
"io"

"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/multicodec"
"github.com/ipld/go-ipld-prime/node/bindnode"
"github.com/multiformats/go-multihash"

// Import so these codecs get registered.
_ "github.com/ipld/go-ipld-prime/codec/dagcbor"
_ "github.com/ipld/go-ipld-prime/codec/dagjson"
)

type (
Expand Down Expand Up @@ -82,6 +91,15 @@ func UnwrapAdvertisement(node ipld.Node) (*Advertisement, error) {
return ad, nil
}

// Return the Advertisement's previous CID, or cid.Undef if there is no
// previous CID.
func (a Advertisement) PreviousCid() cid.Cid {
if a.PreviousID == nil {
return cid.Undef
}
return a.PreviousID.(cidlink.Link).Cid
}

func (a Advertisement) Validate() error {
if len(a.ContextID) > MaxContextIDLen {
return errors.New("context id too long")
Expand All @@ -94,6 +112,49 @@ func (a Advertisement) Validate() error {
return nil
}

// BytesToAdvertisement deserializes an Advertisement from a buffer. It does
// not check that the given CID matches the data, as this should have been done
// when the data was acquired.
func BytesToAdvertisement(adCid cid.Cid, data []byte) (Advertisement, error) {
adNode, err := decodeIPLDNode(adCid.Prefix().Codec, bytes.NewBuffer(data), AdvertisementPrototype)
if err != nil {
return Advertisement{}, err
}
ad, err := UnwrapAdvertisement(adNode)
if err != nil {
return Advertisement{}, err
}
return *ad, nil
}

// BytesToEntryChunk deserializes an EntryChunk from a buffer. It does not
// check that the given CID matches the data, as this should have been done
// when the data was acquired.
func BytesToEntryChunk(entCid cid.Cid, data []byte) (EntryChunk, error) {
entNode, err := decodeIPLDNode(entCid.Prefix().Codec, bytes.NewBuffer(data), EntryChunkPrototype)
if err != nil {
return EntryChunk{}, err
}
ent, err := UnwrapEntryChunk(entNode)
if err != nil {
return EntryChunk{}, err
}
return *ent, nil
}

// decodeIPLDNode decodes an ipld.Node from bytes read from an io.Reader.
func decodeIPLDNode(codec uint64, r io.Reader, prototype ipld.NodePrototype) (ipld.Node, error) {
nb := prototype.NewBuilder()
decoder, err := multicodec.LookupDecoder(codec)
if err != nil {
return nil, err
}
if err = decoder(nb, r); err != nil {
return nil, err
}
return nb.Build(), nil
}

// UnwrapEntryChunk unwraps the given node as an entry chunk.
//
// Note that the node is reassigned to EntryChunkPrototype if its prototype is different.
Expand Down

0 comments on commit 5250e76

Please sign in to comment.