From 81a501c80e470c9666af2cb39a81d23910fe0be0 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Wed, 24 Aug 2022 17:32:33 +1000 Subject: [PATCH] feat: handle retrieval queries for unindexed identity payload CIDs There are valid cases where a CAR may have an identity CID as its root that is not represented as a 'block' within the CAR body and therefore isn't indexed by the dagstore. In this case, we inspect the identity CID content and treat the query as a query for the intersection of all of the links within the block. Ref: https://github.com/filecoin-project/boost/pull/715 --- retrievalmarket/impl/provider.go | 70 +++++++++++++++++++++++++++++++- 1 file changed, 68 insertions(+), 2 deletions(-) diff --git a/retrievalmarket/impl/provider.go b/retrievalmarket/impl/provider.go index 1042abc2..dcf58c6d 100644 --- a/retrievalmarket/impl/provider.go +++ b/retrievalmarket/impl/provider.go @@ -10,6 +10,10 @@ import ( "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" + "github.com/ipld/go-ipld-prime" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/ipld/go-ipld-prime/traversal" + "github.com/multiformats/go-multihash" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" @@ -341,7 +345,7 @@ func (p *Provider) HandleQueryStream(stream rmnet.RetrievalQueryStream) { pieceInfo, isUnsealed, err := p.getPieceInfoFromCid(ctx, query.PayloadCID, pieceCID) if err != nil { log.Errorf("Retrieval query: getPieceInfoFromCid: %s", err) - if !xerrors.Is(err, retrievalmarket.ErrNotFound) { + if !errors.Is(err, retrievalmarket.ErrNotFound) { answer.Status = retrievalmarket.QueryResponseError answer.Message = fmt.Sprintf("failed to fetch piece to retrieve from: %s", err) } else { @@ -397,7 +401,15 @@ func (p *Provider) getPieceInfoFromCid(ctx context.Context, payloadCID, clientPi // Get all pieces that contain the target block piecesWithTargetBlock, err := p.dagStore.GetPiecesContainingBlock(payloadCID) if err != nil { - return piecestore.PieceInfoUndefined, false, xerrors.Errorf("getting pieces for cid %s: %w", payloadCID, err) + // this payloadCID may be an identity CID that's in the root of a CAR but + // not recorded in the index + piecesWithTargetBlock, err := p.getCommonPiecesFromIdentityCidLinks(ctx, payloadCID) + if err != nil { + return piecestore.PieceInfoUndefined, false, err + } + if len(piecesWithTargetBlock) == 0 { + return piecestore.PieceInfoUndefined, false, fmt.Errorf("getting pieces for cid %s: %w", payloadCID, err) + } } // For each piece that contains the target block @@ -448,6 +460,60 @@ func (p *Provider) getPieceInfoFromCid(ctx context.Context, payloadCID, clientPi return piecestore.PieceInfoUndefined, false, xerrors.Errorf("could not locate piece: %w", lastErr) } +// getCommonPiecesFromIdentityCidLinks will inspect a payloadCID and if it has an identity multihash, +// will determine which pieces contain all of the links within the decoded identity multihash block +func (p *Provider) getCommonPiecesFromIdentityCidLinks(ctx context.Context, payloadCID cid.Cid) ([]cid.Cid, error) { + if payloadCID.Prefix().MhType != multihash.IDENTITY { + return nil, nil + } + + // decode the identity multihash, if possible (i.e. it's valid and we have the right codec loaded) + decoder, err := cidlink.DefaultLinkSystem().DecoderChooser(cidlink.Link{Cid: payloadCID}) + if err != nil { + return nil, fmt.Errorf("choosing decoder for identity CID %s: %w", payloadCID, err) + } + mh, err := multihash.Decode(payloadCID.Hash()) + if err != nil { + return nil, fmt.Errorf("decoding identity CID multihash %s: %w", payloadCID, err) + } + node, err := ipld.Decode(mh.Digest, decoder) + if err != nil { + return nil, fmt.Errorf("decoding identity CID %s: %w", payloadCID, err) + } + links, err := traversal.SelectLinks(node) + if err != nil { + return nil, fmt.Errorf("collecting links from identity CID %s: %w", payloadCID, err) + } + + pieces := make([]cid.Cid, 0) + // for each link, query the dagstore for pieces that contain it + for i, link_ := range links { + link := link_.(cidlink.Link).Cid + piecesWithThisCid, err := p.dagStore.GetPiecesContainingBlock(link) + if len(piecesWithThisCid) == 0 { + return nil, fmt.Errorf("getting pieces for identity CID sub-link %s: %w", link, err) + } + if i == 0 { + pieces = append(pieces, piecesWithThisCid...) + } else { + // after the first, find the intersection between these pieces and the previous ones + intersection := make([]cid.Cid, 0) + for _, cj := range piecesWithThisCid { + for _, ck := range pieces { + if cj.Equals(ck) { + intersection = append(intersection, cj) + break + } + } + } + pieces = intersection + } + if len(pieces) == 0 { + break + } + } + return pieces, nil +} func (p *Provider) pieceInUnsealedSector(ctx context.Context, pieceInfo piecestore.PieceInfo) bool { for _, di := range pieceInfo.Deals { isUnsealed, err := p.sa.IsUnsealed(ctx, di.SectorID, di.Offset.Unpadded(), di.Length.Unpadded())