Skip to content

Commit

Permalink
Merge pull request #3686 from nspcc-dev/fix-mode
Browse files Browse the repository at this point in the history
cli: fix `skip-blocks-uploading` mode
  • Loading branch information
AnnaShaleva authored Nov 15, 2024
2 parents b97d0b2 + 5f6284d commit 4a96bd1
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 17 deletions.
3 changes: 2 additions & 1 deletion cli/util/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func NewCommands() []*cli.Command {
return nil
},
},
options.Debug,
}, options.RPC...)
uploadBinFlags = append(uploadBinFlags, options.Wallet...)
return []*cli.Command{
Expand Down Expand Up @@ -183,7 +184,7 @@ func NewCommands() []*cli.Command {
{
Name: "upload-bin",
Usage: "Fetch blocks from RPC node and upload them to the NeoFS container",
UsageText: "neo-go util upload-bin --fs-rpc-endpoint <address1>[,<address2>[...]] --container <cid> --block-attribute block --index-attribute index --rpc-endpoint <node> [--timeout <time>] --wallet <wallet> [--wallet-config <config>] [--address <address>] [--workers <num>] [--searchers <num>] [--index-file-size <size>] [--skip-blocks-uploading] [--retries <num>]",
UsageText: "neo-go util upload-bin --fs-rpc-endpoint <address1>[,<address2>[...]] --container <cid> --block-attribute block --index-attribute index --rpc-endpoint <node> [--timeout <time>] --wallet <wallet> [--wallet-config <config>] [--address <address>] [--workers <num>] [--searchers <num>] [--index-file-size <size>] [--skip-blocks-uploading] [--retries <num>] [--debug]",
Action: uploadBin,
Flags: uploadBinFlags,
},
Expand Down
48 changes: 33 additions & 15 deletions cli/util/upload_bin.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func uploadBin(ctx *cli.Context) error {
numWorkers := ctx.Int("workers")
maxParallelSearches := ctx.Int("searchers")
maxRetries := int(ctx.Uint("retries"))
debug := ctx.Bool("debug")
acc, _, err := options.GetAccFromContext(ctx)
if err != nil {
return cli.Exit(fmt.Sprintf("failed to load account: %v", err), 1)
Expand Down Expand Up @@ -142,13 +143,14 @@ func uploadBin(ctx *cli.Context) error {
fmt.Fprintln(ctx.App.Writer, "First block of latest incomplete batch uploaded to NeoFS container:", oldestMissingBlockIndex)

if !ctx.Bool("skip-blocks-uploading") {
err = uploadBlocks(ctx, p, rpc, signer, containerID, acc, attr, oldestMissingBlockIndex, uint(currentBlockHeight), homomorphicHashingDisabled, numWorkers, maxRetries)
err = uploadBlocks(ctx, p, rpc, signer, containerID, acc, attr, oldestMissingBlockIndex, uint(currentBlockHeight), homomorphicHashingDisabled, numWorkers, maxRetries, debug)
if err != nil {
return cli.Exit(fmt.Errorf("failed to upload blocks: %w", err), 1)
}
oldestMissingBlockIndex = int(currentBlockHeight) + 1
}

err = uploadIndexFiles(ctx, p, containerID, acc, signer, uint(oldestMissingBlockIndex), attr, homomorphicHashingDisabled, maxParallelSearches, maxRetries)
err = uploadIndexFiles(ctx, p, containerID, acc, signer, uint(oldestMissingBlockIndex), attr, homomorphicHashingDisabled, maxParallelSearches, maxRetries, debug)
if err != nil {
return cli.Exit(fmt.Errorf("failed to upload index files: %w", err), 1)
}
Expand Down Expand Up @@ -241,7 +243,7 @@ func fetchLatestMissingBlockIndex(ctx context.Context, p *pool.Pool, containerID
}

// uploadBlocks uploads the blocks to the container using the pool.
func uploadBlocks(ctx *cli.Context, p *pool.Pool, rpc *rpcclient.Client, signer user.Signer, containerID cid.ID, acc *wallet.Account, attr string, oldestMissingBlockIndex int, currentBlockHeight uint, homomorphicHashingDisabled bool, numWorkers, maxRetries int) error {
func uploadBlocks(ctx *cli.Context, p *pool.Pool, rpc *rpcclient.Client, signer user.Signer, containerID cid.ID, acc *wallet.Account, attr string, oldestMissingBlockIndex int, currentBlockHeight uint, homomorphicHashingDisabled bool, numWorkers, maxRetries int, debug bool) error {
if oldestMissingBlockIndex > int(currentBlockHeight) {
fmt.Fprintf(ctx.App.Writer, "No new blocks to upload. Need to upload starting from %d, current height %d\n", oldestMissingBlockIndex, currentBlockHeight)
return nil
Expand Down Expand Up @@ -295,7 +297,14 @@ func uploadBlocks(ctx *cli.Context, p *pool.Pool, rpc *rpcclient.Client, signer

objBytes := bw.Bytes()
errRetr := retry(func() error {
return uploadObj(ctx.Context, p, signer, acc.PrivateKey().GetScriptHash(), containerID, objBytes, attrs, homomorphicHashingDisabled)
resOid, errUpload := uploadObj(ctx.Context, p, signer, acc.PrivateKey().GetScriptHash(), containerID, objBytes, attrs, homomorphicHashingDisabled)
if errUpload != nil {
return errUpload
}
if debug {
fmt.Fprintf(ctx.App.Writer, "Uploaded block %d with object ID: %s\n", blockIndex, resOid.String())
}
return errUpload
}, maxRetries)
if errRetr != nil {
select {
Expand Down Expand Up @@ -325,7 +334,7 @@ func uploadBlocks(ctx *cli.Context, p *pool.Pool, rpc *rpcclient.Client, signer
}

// uploadIndexFiles uploads missing index files to the container.
func uploadIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, account *wallet.Account, signer user.Signer, oldestMissingBlockIndex uint, blockAttributeKey string, homomorphicHashingDisabled bool, maxParallelSearches, maxRetries int) error {
func uploadIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, account *wallet.Account, signer user.Signer, oldestMissingBlockIndex uint, blockAttributeKey string, homomorphicHashingDisabled bool, maxParallelSearches, maxRetries int, debug bool) error {
attributeKey := ctx.String("index-attribute")
indexFileSize := ctx.Uint("index-file-size")
fmt.Fprintln(ctx.App.Writer, "Uploading index files...")
Expand Down Expand Up @@ -460,7 +469,14 @@ func uploadIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun
*object.NewAttribute("IndexSize", strconv.Itoa(int(indexFileSize))),
}
err := retry(func() error {
return uploadObj(ctx.Context, p, signer, account.PrivateKey().GetScriptHash(), containerID, buffer, attrs, homomorphicHashingDisabled)
resOid, errUpload := uploadObj(ctx.Context, p, signer, account.PrivateKey().GetScriptHash(), containerID, buffer, attrs, homomorphicHashingDisabled)
if errUpload != nil {
return errUpload
}
if debug {
fmt.Fprintf(ctx.App.Writer, "Uploaded idex file %d with object ID: %s\n", i, resOid.String())
}
return errUpload
}, maxRetries)
if err != nil {
select {
Expand Down Expand Up @@ -541,14 +557,15 @@ func searchObjects(ctx context.Context, p *pool.Pool, containerID cid.ID, accoun
}

// uploadObj uploads object to the container using provided settings.
func uploadObj(ctx context.Context, p *pool.Pool, signer user.Signer, owner util.Uint160, containerID cid.ID, objData []byte, attrs []object.Attribute, homomorphicHashingDisabled bool) error {
func uploadObj(ctx context.Context, p *pool.Pool, signer user.Signer, owner util.Uint160, containerID cid.ID, objData []byte, attrs []object.Attribute, homomorphicHashingDisabled bool) (oid.ID, error) {
var (
ownerID user.ID
hdr object.Object
chSHA256 checksum.Checksum
chHomomorphic checksum.Checksum
v = new(version.Version)
prmObjectPutInit client.PrmObjectPutInit
resOID = oid.ID{}
)

ownerID.SetScriptHash(owner)
Expand All @@ -569,31 +586,32 @@ func uploadObj(ctx context.Context, p *pool.Pool, signer user.Signer, owner util

err := hdr.SetIDWithSignature(signer)
if err != nil {
return err
return resOID, err
}
err = hdr.CheckHeaderVerificationFields()
if err != nil {
return err
return resOID, err
}

writer, err := p.ObjectPutInit(ctx, hdr, signer, prmObjectPutInit)
if err != nil {
return fmt.Errorf("failed to initiate object upload: %w", err)
return resOID, fmt.Errorf("failed to initiate object upload: %w", err)
}
_, err = writer.Write(objData)
if err != nil {
_ = writer.Close()
return fmt.Errorf("failed to write object data: %w", err)
return resOID, fmt.Errorf("failed to write object data: %w", err)
}
err = writer.Close()
if err != nil {
return fmt.Errorf("failed to close object writer: %w", err)
return resOID, fmt.Errorf("failed to close object writer: %w", err)
}
res := writer.GetResult()
if res.StoredObjectID().Equals(oid.ID{}) {
return fmt.Errorf("object ID is empty")
resOID = res.StoredObjectID()
if resOID.Equals(oid.ID{}) {
return resOID, fmt.Errorf("object ID is empty")
}
return nil
return resOID, nil
}

func getBlockIndex(header object.Object, attribute string) (int, error) {
Expand Down
3 changes: 2 additions & 1 deletion docs/neofs-blockstorage.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ NAME:
neo-go util upload-bin - Fetch blocks from RPC node and upload them to the NeoFS container
USAGE:
neo-go util upload-bin --fs-rpc-endpoint <address1>[,<address2>[...]] --container <cid> --block-attribute block --index-attribute index --rpc-endpoint <node> [--timeout <time>] --wallet <wallet> [--wallet-config <config>] [--address <address>] [--workers <num>] [--searchers <num>] [--index-file-size <size>] [--skip-blocks-uploading] [--retries <num>]
neo-go util upload-bin --fs-rpc-endpoint <address1>[,<address2>[...]] --container <cid> --block-attribute block --index-attribute index --rpc-endpoint <node> [--timeout <time>] --wallet <wallet> [--wallet-config <config>] [--address <address>] [--workers <num>] [--searchers <num>] [--index-file-size <size>] [--skip-blocks-uploading] [--retries <num>] [--debug]
OPTIONS:
--fs-rpc-endpoint value, --fsr value [ --fs-rpc-endpoint value, --fsr value ] List of NeoFS storage node RPC addresses (comma-separated or multiple --fs-rpc-endpoint flags)
Expand All @@ -100,6 +100,7 @@ OPTIONS:
--searchers value Number of concurrent searches for blocks (default: 20)
--skip-blocks-uploading Skip blocks uploading and upload only index files (default: false)
--retries value Maximum number of Neo/NeoFS node request retries (default: 5)
--debug, -d Enable debug logging (LOTS of output, overrides configuration) (default: false)
--rpc-endpoint value, -r value RPC node address
--timeout value, -s value Timeout for the operation (default: 10s)
--wallet value, -w value Wallet to use to get the key for transaction signing; conflicts with --wallet-config flag
Expand Down

0 comments on commit 4a96bd1

Please sign in to comment.