Skip to content

Commit

Permalink
feat: handle retrieval queries for unindexed identity payload CIDs
Browse files Browse the repository at this point in the history
There are valid cases where a CAR may have an identity CID as its root that is
not represented as a 'block' within the CAR body and therefore isn't indexed
by the dagstore. In this case, we decode the block embedded in the identity CID
and treat the query as a query for the pieces that contain every link within
that block (i.e. the intersection of the piece sets found for each link).

Ref: filecoin-project/boost#715
  • Loading branch information
rvagg committed Aug 24, 2022
1 parent 38f2cb3 commit 6154030
Showing 1 changed file with 68 additions and 2 deletions.
70 changes: 68 additions & 2 deletions retrievalmarket/impl/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ import (
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/traversal"
"github.com/multiformats/go-multihash"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
Expand Down Expand Up @@ -341,7 +345,7 @@ func (p *Provider) HandleQueryStream(stream rmnet.RetrievalQueryStream) {
pieceInfo, isUnsealed, err := p.getPieceInfoFromCid(ctx, query.PayloadCID, pieceCID)
if err != nil {
log.Errorf("Retrieval query: getPieceInfoFromCid: %s", err)
if !xerrors.Is(err, retrievalmarket.ErrNotFound) {
if !errors.Is(err, retrievalmarket.ErrNotFound) {
answer.Status = retrievalmarket.QueryResponseError
answer.Message = fmt.Sprintf("failed to fetch piece to retrieve from: %s", err)
} else {
Expand Down Expand Up @@ -397,7 +401,15 @@ func (p *Provider) getPieceInfoFromCid(ctx context.Context, payloadCID, clientPi
// Get all pieces that contain the target block
piecesWithTargetBlock, err := p.dagStore.GetPiecesContainingBlock(payloadCID)
if err != nil {
return piecestore.PieceInfoUndefined, false, xerrors.Errorf("getting pieces for cid %s: %w", payloadCID, err)
// this payloadCID may be an identity CID that's in the root of a CAR but
// not recorded in the index
piecesWithTargetBlock, err := p.getCommonPiecesFromIdentityCidLinks(ctx, payloadCID)
if err != nil {
return piecestore.PieceInfoUndefined, false, err
}
if len(piecesWithTargetBlock) == 0 {
return piecestore.PieceInfoUndefined, false, fmt.Errorf("getting pieces for cid %s: %w", payloadCID, err)
}
}

// For each piece that contains the target block
Expand Down Expand Up @@ -448,6 +460,60 @@ func (p *Provider) getPieceInfoFromCid(ctx context.Context, payloadCID, clientPi
return piecestore.PieceInfoUndefined, false, xerrors.Errorf("could not locate piece: %w", lastErr)
}

// getCommonPiecesFromIdentityCidLinks inspects payloadCID and, if it uses an
// identity multihash, decodes the block embedded in the multihash digest and
// returns the pieces that the dagstore reports as containing every link found
// in that decoded block (the intersection of the per-link piece lists).
//
// Results:
//   - (nil, nil) when payloadCID does not use an identity multihash.
//   - an empty result and a nil error when the decoded block contains no
//     links, or when no single piece contains all of the links.
//   - a non-nil error when no decoder can be chosen for the CID's codec, the
//     multihash or the embedded block cannot be decoded, a link is not a
//     cidlink.Link, or the dagstore lookup for a link fails or yields no
//     pieces.
//
// ctx is currently unused by this method.
func (p *Provider) getCommonPiecesFromIdentityCidLinks(ctx context.Context, payloadCID cid.Cid) ([]cid.Cid, error) {
	if payloadCID.Prefix().MhType != multihash.IDENTITY {
		return nil, nil
	}

	// Decode the identity multihash, if possible (i.e. it's valid and we have
	// the right codec loaded).
	decoder, err := cidlink.DefaultLinkSystem().DecoderChooser(cidlink.Link{Cid: payloadCID})
	if err != nil {
		return nil, fmt.Errorf("choosing decoder for identity CID %s: %w", payloadCID, err)
	}
	mh, err := multihash.Decode(payloadCID.Hash())
	if err != nil {
		return nil, fmt.Errorf("decoding identity CID multihash %s: %w", payloadCID, err)
	}
	// For an identity multihash the digest is the block's bytes themselves.
	node, err := ipld.Decode(mh.Digest, decoder)
	if err != nil {
		return nil, fmt.Errorf("decoding identity CID %s: %w", payloadCID, err)
	}
	links, err := traversal.SelectLinks(node)
	if err != nil {
		return nil, fmt.Errorf("collecting links from identity CID %s: %w", payloadCID, err)
	}

	var pieces []cid.Cid
	// For each link, query the dagstore for pieces that contain it.
	for i, lnk := range links {
		// Guard the assertion: an unexpected Link implementation must surface
		// as an error rather than a panic in the query handler.
		cl, ok := lnk.(cidlink.Link)
		if !ok {
			return nil, fmt.Errorf("unexpected link type %T in identity CID %s", lnk, payloadCID)
		}
		linkCid := cl.Cid

		piecesWithThisCid, err := p.dagStore.GetPiecesContainingBlock(linkCid)
		if err != nil {
			return nil, fmt.Errorf("getting pieces for identity CID sub-link %s: %w", linkCid, err)
		}
		if len(piecesWithThisCid) == 0 {
			// No error to wrap here; using %w with a nil error would render
			// as "%!w(<nil>)".
			return nil, fmt.Errorf("getting pieces for identity CID sub-link %s: no pieces found", linkCid)
		}

		if i == 0 {
			// Seed the candidate set with the pieces for the first link.
			pieces = append(pieces, piecesWithThisCid...)
			continue
		}

		// After the first link, keep only the pieces that also contain this
		// link (order follows piecesWithThisCid).
		intersection := make([]cid.Cid, 0, len(piecesWithThisCid))
		for _, candidate := range piecesWithThisCid {
			for _, known := range pieces {
				if candidate.Equals(known) {
					intersection = append(intersection, candidate)
					break
				}
			}
		}
		pieces = intersection

		// Nothing in common so far; further links cannot add pieces back.
		if len(pieces) == 0 {
			break
		}
	}
	return pieces, nil
}
func (p *Provider) pieceInUnsealedSector(ctx context.Context, pieceInfo piecestore.PieceInfo) bool {
for _, di := range pieceInfo.Deals {
isUnsealed, err := p.sa.IsUnsealed(ctx, di.SectorID, di.Offset.Unpadded(), di.Length.Unpadded())
Expand Down

0 comments on commit 6154030

Please sign in to comment.