Skip to content

Commit

Permalink
Generate valid removal advertisements (#124)
Browse files Browse the repository at this point in the history
* Generate valid removal advertisements

Advertisements require valid metadata, even though metadata is not used for index removal.  This change creates a removal advertisement with valid, but empty, metadata.

Additional changes:
- Fix bad contextID creation in remove command
- Improve output of find, import, and remove commands
- Improve log messages: readable contextID, structured logging

Fixes issue #121

* Remove debug print
  • Loading branch information
gammazero authored Jan 6, 2022
1 parent b917992 commit 9c133b1
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 66 deletions.
7 changes: 4 additions & 3 deletions cmd/provider/find.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"encoding/base64"
"fmt"

httpfinderclient "github.com/filecoin-project/storetheindex/api/v0/finder/client/http"
Expand Down Expand Up @@ -52,12 +53,12 @@ func findCommand(cctx *cli.Context) error {

fmt.Println("Content providers:")
for i := range resp.MultihashResults {
fmt.Println(" Multihash:", resp.MultihashResults[i].Multihash.B58String(), "==>")
fmt.Println(" Multihash:", resp.MultihashResults[i].Multihash.B58String())
for _, pr := range resp.MultihashResults[i].ProviderResults {
fmt.Println(" Provider:", pr.Provider)
fmt.Println(" ContextID:", string(pr.ContextID))
fmt.Println(" ContextID:", base64.StdEncoding.EncodeToString(pr.ContextID))
fmt.Println(" Proto:", pr.Metadata.ProtocolID)
fmt.Println(" Metadata:", string(pr.Metadata.Data))
fmt.Println(" Metadata:", base64.StdEncoding.EncodeToString(pr.Metadata.Data))
}
}

Expand Down
9 changes: 4 additions & 5 deletions cmd/provider/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@ import (
"github.com/urfave/cli/v2"
)

// TODO: This should change to a code that indicates graphsync.
const providerProtocolID = 0x300001

var ImportCmd = &cli.Command{
Name: "import",
Aliases: []string{"i"},
Expand All @@ -35,7 +32,7 @@ var (
Action: doImportCar,
}
metadata = stiapi.Metadata{
ProtocolID: providerProtocolID,
ProtocolID: cardatatransfer.ContextIDCodec,
}
)

Expand Down Expand Up @@ -86,12 +83,14 @@ func doImportCar(cctx *cli.Context) error {
log.Infof("imported car successfully")
var res adminserver.ImportCarRes
if _, err := res.ReadFrom(resp.Body); err != nil {
return fmt.Errorf("received OK response from server but cannot decode response body. %v", err)
return fmt.Errorf("received ok response from server but cannot decode response body. %v", err)
}
var b bytes.Buffer
b.WriteString("Successfully imported CAR.\n")
b.WriteString("\t Advertisement ID: ")
b.WriteString(res.AdvId.String())
b.WriteString("\n\t Context ID: ")
b.WriteString(base64.StdEncoding.EncodeToString(importCarKey))
b.WriteString("\n")
_, err = cctx.App.Writer.Write(b.Bytes())
return err
Expand Down
36 changes: 18 additions & 18 deletions cmd/provider/remove.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,25 +44,24 @@ specified, they key is simply calculated as the SHA_256 hash of the given path.`
)

func beforeRemoveCar(cctx *cli.Context) error {
keyFlagSet := cctx.IsSet(keyFlag.Name)
carPathFlagSet := cctx.IsSet(carPathFlag.Name)

if keyFlagSet && carPathFlagSet {
return fmt.Errorf("only one of %s or %s must be set", keyFlag.Name, carPathFlag.Name)
}
if !keyFlagSet && !carPathFlagSet {
return fmt.Errorf("either %s or %s must be set", keyFlag.Name, carPathFlag.Name)
if !cctx.IsSet(keyFlag.Name) {
if !cctx.IsSet(optionalCarPathFlag.Name) {
return fmt.Errorf("either %s or %s must be set", keyFlag.Name, optionalCarPathFlag.Name)
}
h := sha256.New()
h.Write([]byte(optionalCarPathFlagValue))
removeCarKey = h.Sum(nil)
return nil
}

if keyFlagSet {
decoded, err := base64.StdEncoding.DecodeString(keyFlagValue)
if err != nil {
return errors.New("key is not a valid base64 encoded string")
}
removeCarKey = decoded
} else {
removeCarKey = sha256.New().Sum([]byte(carPathFlagValue))
if cctx.IsSet(optionalCarPathFlag.Name) {
return fmt.Errorf("only one of %s or %s must be set", keyFlag.Name, optionalCarPathFlag.Name)
}
decoded, err := base64.StdEncoding.DecodeString(keyFlagValue)
if err != nil {
return errors.New("key is not a valid base64 encoded string")
}
removeCarKey = decoded
return nil
}

Expand All @@ -79,15 +78,16 @@ func doRemoveCar(cctx *cli.Context) error {
return errFromHttpResp(resp)
}

log.Info("removed car successfully", "key", removeCarKey)
var res adminserver.RemoveCarRes
if _, err := res.ReadFrom(resp.Body); err != nil {
return fmt.Errorf("received OK response from server but cannot decode response body. %v", err)
return fmt.Errorf("received ok response from server but cannot decode response body. %v", err)
}
var b bytes.Buffer
b.WriteString("Successfully removed CAR.\n")
b.WriteString("\t Advertisement ID: ")
b.WriteString(res.AdvId.String())
b.WriteString("\n\t Context ID: ")
b.WriteString(base64.StdEncoding.EncodeToString(removeCarKey))
b.WriteString("\n")
_, err = cctx.App.Writer.Write(b.Bytes())
return err
Expand Down
65 changes: 38 additions & 27 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package engine

import (
"context"
"encoding/base64"
"errors"
"fmt"
"sync"
Expand All @@ -10,6 +11,7 @@ import (
"github.com/filecoin-project/go-legs"
"github.com/filecoin-project/go-legs/dtsync"
provider "github.com/filecoin-project/index-provider"
"github.com/filecoin-project/index-provider/cardatatransfer"
"github.com/filecoin-project/index-provider/config"
stiapi "github.com/filecoin-project/storetheindex/api/v0"
"github.com/filecoin-project/storetheindex/api/v0/ingest/schema"
Expand Down Expand Up @@ -104,7 +106,7 @@ func NewFromConfig(cfg config.Config, dt dt.Manager, host host.Host, ds datastor
log.Info("Starting new reference provider engine")
privKey, err := cfg.Identity.DecodePrivateKey("")
if err != nil {
log.Errorf("Error decoding private key from provider: %s", err)
log.Errorw("Error decoding private key from provider", "err", err)
return nil, err
}
return New(cfg.Ingest, privKey, dt, host, ds, cfg.ProviderServer.RetrievalMultiaddrs)
Expand All @@ -129,7 +131,7 @@ func (e *Engine) Start(ctx context.Context) error {

e.publisher, err = dtsync.NewPublisherFromExisting(e.dataTransfer, e.host, e.pubSubTopic, e.lsys)
if err != nil {
log.Errorf("Error initializing publisher in engine: %s", err)
log.Errorw("Error initializing publisher in engine:", "err", err)
return err
}
return nil
Expand All @@ -143,7 +145,7 @@ func (e *Engine) Start(ctx context.Context) error {
func (e *Engine) PublishLocal(ctx context.Context, adv schema.Advertisement) (cid.Cid, error) {
adLnk, err := schema.AdvertisementLink(e.lsys, adv)
if err != nil {
log.Errorf("Error generating advertisement link: %s", err)
log.Errorw("Error generating advertisement link", "err", err)
return cid.Undef, err
}

Expand All @@ -155,7 +157,7 @@ func (e *Engine) PublishLocal(ctx context.Context, adv schema.Advertisement) (ci
log.Infow("Storing advertisement locally", "cid", c.String())
err = e.putLatestAdv(c.Bytes())
if err != nil {
log.Errorf("Error storing latest advertisement in blockstore: %s", err)
log.Errorw("Error storing latest advertisement in blockstore", "err", err)
return cid.Undef, err
}
return c, nil
Expand All @@ -165,7 +167,7 @@ func (e *Engine) Publish(ctx context.Context, adv schema.Advertisement) (cid.Cid
// Store the advertisement locally.
c, err := e.PublishLocal(ctx, adv)
if err != nil {
log.Errorf("Failed to publish advertisement locally: %s", err)
log.Errorw("Failed to publish advertisement locally", "err", err)
return cid.Undef, err
}

Expand All @@ -182,14 +184,12 @@ func (e *Engine) RegisterCallback(cb provider.Callback) {
}

// NotifyPut logs the context ID at debug level and then delegates to
// publishAdvForIndex with the given contextID and metadata, returning the
// cid.Cid and error that call produces.
//
// NOTE(review): the final false argument presumably selects publication
// rather than removal (isRm) — confirm against publishAdvForIndex.
func (e *Engine) NotifyPut(ctx context.Context, contextID []byte, metadata stiapi.Metadata) (cid.Cid, error) {
// contextID is arbitrary bytes; converting it with string() may log
// unprintable characters.
log.Debugf("NotifyPut for context ID %s", string(contextID))
// The callback must have been registered for the linkSystem to know how to
// go from contextID to list of CIDs.
return e.publishAdvForIndex(ctx, contextID, metadata, false)
}

// NotifyRemove logs the context ID at debug level and then delegates to
// publishAdvForIndex with the given contextID and a zero-value
// stiapi.Metadata, returning the cid.Cid and error that call produces.
//
// NOTE(review): the final true argument presumably marks the advertisement
// as a removal (isRm) — confirm against publishAdvForIndex.
func (e *Engine) NotifyRemove(ctx context.Context, contextID []byte) (cid.Cid, error) {
// contextID is arbitrary bytes; converting it with string() may log
// unprintable characters.
log.Debugf("NotifyRemove for contextID %s", string(contextID))
return e.publishAdvForIndex(ctx, contextID, stiapi.Metadata{}, true)
}

Expand All @@ -199,7 +199,7 @@ func (e *Engine) Shutdown() error {
err = fmt.Errorf("error closing leg publisher: %w", err)
}
if cerr := e.cache.Close(); cerr != nil {
log.Errorf("Error closing link cache: %s", cerr)
log.Errorw("Error closing link cache", "err", cerr)
}
return err
}
Expand All @@ -208,19 +208,19 @@ func (e *Engine) GetAdv(ctx context.Context, c cid.Cid) (schema.Advertisement, e
log.Infow("Getting advertisement", "cid", c)
l, err := schema.LinkAdvFromCid(c).AsLink()
if err != nil {
log.Errorf("Error getting Advertisement link from its CID %q: %s", c, err)
log.Errorw("Error getting Advertisement link from its CID", "cid", c, "err", err)
return nil, err
}

lsys := e.vanillaLinkSystem()
n, err := lsys.Load(ipld.LinkContext{}, l, schema.Type.Advertisement)
if err != nil {
log.Errorf("Error loading advertisement from blockstore with vanilla lsys: %s", err)
log.Errorw("Error loading advertisement from blockstore with vanilla lsys", "err", err)
return nil, err
}
adv, ok := n.(schema.Advertisement)
if !ok {
log.Errorf("Stored IPLD node for cid %q is not advertisement", c)
log.Errorw("Stored IPLD node for cid is not advertisement", "cid", c)
return nil, errors.New("stored IPLD node not of advertisement type")
}
return adv, nil
Expand All @@ -230,12 +230,12 @@ func (e *Engine) GetLatestAdv(ctx context.Context) (cid.Cid, schema.Advertisemen
log.Info("Getting latest advertisement")
latestAdv, err := e.getLatestAdv()
if err != nil {
log.Errorf("Failed to fetch latest advertisement from blockstore: %s", err)
log.Errorw("Failed to fetch latest advertisement from blockstore", "err", err)
return cid.Undef, nil, err
}
ad, err := e.GetAdv(ctx, latestAdv)
if err != nil {
log.Errorf("Latest advertisement could not be retrieved from blockstore using its CID: %s", err)
log.Errorw("Latest advertisement could not be retrieved from blockstore by CID", "err", err)
return cid.Undef, nil, err
}
return latestAdv, ad, nil
Expand All @@ -251,10 +251,12 @@ func (e *Engine) publishAdvForIndex(ctx context.Context, contextID []byte, metad
return cid.Undef, provider.ErrNoCallback
}

log := log.With("contextID", base64.StdEncoding.EncodeToString(contextID))

c, err := e.getKeyCidMap(contextID)
if err != nil {
if err != datastore.ErrNotFound {
log.Errorf("Could not get mapping between contextID and CID of linked list (%s): %s", string(contextID), err)
log.Errorw("Could not get mapping between contextID and CID of linked list", "err", err)
return cid.Undef, err
}
}
Expand All @@ -274,7 +276,7 @@ func (e *Engine) publishAdvForIndex(ctx context.Context, contextID []byte, metad
// advertisement and used for ingestion.
lnk, err := e.generateChunks(mhIter)
if err != nil {
log.Errorf("Error generating link from list of CIDs for contextID (%s): %s", string(contextID), err)
log.Errorw("Error generating link from list of CIDs", "err", err)
return cid.Undef, err
}
cidsLnk = lnk.(cidlink.Link)
Expand All @@ -283,15 +285,15 @@ func (e *Engine) publishAdvForIndex(ctx context.Context, contextID []byte, metad
// list of Cids.
err = e.putKeyCidMap(contextID, cidsLnk.Cid)
if err != nil {
log.Errorf("Could not set mapping between contextID and CID of linked list (%s): %s", string(contextID), err)
log.Errorw("Could not set mapping between contextID and CID of linked list", "err", err)
return cid.Undef, err
}
} else {
// Lookup metadata for this contextID.
prevMetadata, err := e.getKeyMetadataMap(contextID)
if err != nil {
if err != datastore.ErrNotFound {
log.Errorf("Could not get metadata for existing chain: %s", err)
log.Errorw("Could not get metadata for existing chain", "err", err)
return cid.Undef, err
}
log.Warn("No metadata for existing chain, generating new advertisement")
Expand All @@ -308,7 +310,7 @@ func (e *Engine) publishAdvForIndex(ctx context.Context, contextID []byte, metad
}

if err = e.putKeyMetadataMap(contextID, metadata); err != nil {
log.Errorf("Could not set mapping between contextID (%s) and metadata: %s", string(contextID), err)
log.Errorw("Could not set mapping between contextID and metadata", "err", err)
return cid.Undef, err
}
} else {
Expand All @@ -317,32 +319,41 @@ func (e *Engine) publishAdvForIndex(ctx context.Context, contextID []byte, metad
}
log.Info("Generating removal list for advertisement")

// If we are removing, we already know the relationship key-cid of the
// list, so we can add it right away in the advertisement.
cidsLnk = cidlink.Link{Cid: c}
// And if we are removing it means we probably do not have the list of
// CIDs anymore, so we can remove the entry from the datastore.
err = e.deleteKeyCidMap(contextID)
if err != nil {
log.Errorf("Failed deleting Key-Cid map for contextID (%s): %s", string(contextID), err)
log.Errorw("Failed deleting Key-Cid map for contextID", "err", err)
return cid.Undef, err
}
err = e.deleteCidKeyMap(c)
if err != nil {
log.Errorf("Failed deleting Cid-Key map for lookup cid (%s): %s", c, err)
log.Errorw("Failed deleting Cid-Key map for lookup cid", "cid", c, "err", err)
return cid.Undef, err
}
err = e.deleteKeyMetadataMap(contextID)
if err != nil {
log.Errorf("Failed deleting Key-Metadata map for contextID (%s): %s", string(contextID), err)
log.Errorw("Failed deleting Key-Metadata map for contextID", "err", err)
return cid.Undef, err
}

// Provide the CID link to the content entries to give the indexer the
// option to remove all individual entries. If a contextID is given
// then the indexer will delete all content for that contextID and not
// need to retrieve the entries.
cidsLnk = cidlink.Link{Cid: c}

// The advertisement still requires a valid metadata even though it is
// not used for removal. Create a valid empty metadata.
metadata = stiapi.Metadata{
ProtocolID: cardatatransfer.ContextIDCodec, //providerProtocolID,
}
}

// Get the latest advertisement that was generated
latestAdvID, err := e.getLatestAdv()
if err != nil {
log.Errorf("Could not get latest advertisement: %s", err)
log.Errorw("Could not get latest advertisement", "err", err)
return cid.Undef, err
}
var previousLnk schema.Link_Advertisement
Expand All @@ -355,7 +366,7 @@ func (e *Engine) publishAdvForIndex(ctx context.Context, contextID []byte, metad
nb := schema.Type.Link_Advertisement.NewBuilder()
err = nb.AssignLink(cidlink.Link{Cid: latestAdvID})
if err != nil {
log.Errorf("Error generating link from latest advertisement: %s", err)
log.Errorw("Error generating link from latest advertisement", "err", err)
return cid.Undef, err
}
previousLnk = nb.Build().(schema.Link_Advertisement)
Expand All @@ -364,7 +375,7 @@ func (e *Engine) publishAdvForIndex(ctx context.Context, contextID []byte, metad
adv, err := schema.NewAdvertisement(e.privKey, previousLnk, cidsLnk,
contextID, metadata, isRm, e.host.ID().String(), e.addrs)
if err != nil {
log.Errorf("Error generating new advertisement: %s", err)
log.Errorw("Error generating new advertisement", "err", err)
return cid.Undef, err
}
return e.Publish(ctx, adv)
Expand Down
Loading

0 comments on commit 9c133b1

Please sign in to comment.