Skip to content

Commit

Permalink
node: Forward GETRANGEHASH requests
Browse files Browse the repository at this point in the history
Do it as it is already done for GET, HEAD, GETRANGE. In the case of a container
node that does not have an object locally, the node spawns GETRANGE request in
order to hash it. That is not allowed operation in the NeoFS. Even with nspcc-dev#1884,
GET may fail because the node may not be a container part. Moreover, attached
bearer token is not allowed for the node's key usage so that is another way to
get unexpected results. Forwarding requests is the only sane fix for the nspcc-dev#2541.
The code smells but this is not this commit's responsibility: it is hard to fix
that bug nicely without a get service refactor.
Closes nspcc-dev#2541.

Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
  • Loading branch information
carpawell committed Oct 23, 2023
1 parent fe69589 commit 2f4977c
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ Changelog for NeoFS Node

### Fixed
- `neofs-cli netmap netinfo` documentation (#2555)
- `GETRANGEHASH` to a node without an object produced `GETRANGE` or `GET` requests (#2541)

### Changed
- FSTree storage now uses more efficient and safe temporary files under Linux (#2566)
Expand Down
7 changes: 7 additions & 0 deletions pkg/services/object/get/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,8 +356,15 @@ func (exec execCtx) isForwardingEnabled() bool {
return exec.prm.forwarder != nil
}

// isRangeForwardingEnabled returns true if common execution
// parameters has GETRANGEHASH request forwarding closure set.
func (exec execCtx) isRangeForwardingEnabled() bool {
return exec.prm.rangeForwarder != nil
}

// disableForwarding removes request forwarding closure from common
// parameters, so it won't be inherited in new execution contexts.
func (exec *execCtx) disableForwarding() {
exec.prm.SetRequestForwarder(nil)
exec.prm.SetRangeRequestForwarder(nil)
}
7 changes: 7 additions & 0 deletions pkg/services/object/get/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ func (s *Service) GetRangeHash(ctx context.Context, prm RangeHashPrm) (*RangeHas
return nil, err
}

if rngPrm.forwardedRangeHashResponse != nil {
// forwarder request case; no need to collect the other
// parts, the whole response has already been received
hashes = rngPrm.forwardedRangeHashResponse
break
}

hashes = append(hashes, h.Sum(nil))
}

Expand Down
13 changes: 11 additions & 2 deletions pkg/services/object/get/prm.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package getsvc

import (
"context"
"crypto/ecdsa"
"errors"
"hash"
Expand All @@ -21,6 +22,8 @@ type RangePrm struct {
commonPrm

rng *object.Range

forwardedRangeHashResponse [][]byte
}

var (
Expand Down Expand Up @@ -58,7 +61,8 @@ type RangeHashPrm struct {
salt []byte
}

type RequestForwarder func(coreclient.NodeInfo, coreclient.MultiAddressClient) (*object.Object, error)
type RequestForwarder func(context.Context, coreclient.NodeInfo, coreclient.MultiAddressClient) (*object.Object, error)
type RangeRequestForwarder func(context.Context, coreclient.NodeInfo, coreclient.MultiAddressClient) ([][]byte, error)

// HeadPrm groups parameters of Head service call.
type HeadPrm struct {
Expand All @@ -74,7 +78,8 @@ type commonPrm struct {

raw bool

forwarder RequestForwarder
forwarder RequestForwarder
rangeForwarder RangeRequestForwarder

// signerKey is a cached key that should be used for spawned
// requests (if any), could be nil if incoming request handling
Expand Down Expand Up @@ -141,6 +146,10 @@ func (p *commonPrm) SetRequestForwarder(f RequestForwarder) {
p.forwarder = f
}

func (p *commonPrm) SetRangeRequestForwarder(f RangeRequestForwarder) {
p.rangeForwarder = f
}

// WithAddress sets object address to be read.
func (p *commonPrm) WithAddress(addr oid.Address) {
p.addr = addr
Expand Down
4 changes: 3 additions & 1 deletion pkg/services/object/get/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ func (exec *execCtx) processNode(info client.NodeInfo) bool {

// both object and err are nil only if the original
// request was forwarded to another node and the object
// has already been streamed to the requesting party
// has already been streamed to the requesting party,
// or it is a GETRANGEHASH forwarded request whose
// response is not an object
if obj != nil {
exec.collectedObject = obj
exec.writeCollectedObject()
Expand Down
7 changes: 6 additions & 1 deletion pkg/services/object/get/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (c *clientCacheWrapper) get(info coreclient.NodeInfo) (getClient, error) {

func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*object.Object, error) {
if exec.isForwardingEnabled() {
return exec.prm.forwarder(info, c.client)
return exec.prm.forwarder(exec.ctx, info, c.client)
}

key, err := exec.key()
Expand Down Expand Up @@ -123,6 +123,11 @@ func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*obj
// we don't specify payload writer because we accumulate
// the object locally (even huge).
if rng := exec.ctxRange(); rng != nil {
if exec.isRangeForwardingEnabled() {
exec.prm.forwardedRangeHashResponse, err = exec.prm.rangeForwarder(exec.ctx, info, c.client)
return nil, err
}

var prm internalclient.PayloadRangePrm

prm.SetContext(exec.context())
Expand Down
71 changes: 64 additions & 7 deletions pkg/services/object/get/v2/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package getsvc

import (
"context"
"crypto/ecdsa"
"crypto/sha256"
"errors"
"fmt"
Expand Down Expand Up @@ -70,7 +71,7 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre
var onceHeaderSending sync.Once
var globalProgress int

p.SetRequestForwarder(groupAddressRequestForwarder(func(addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) {
p.SetRequestForwarder(groupAddressRequestForwarder(func(ctx context.Context, addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) {
var err error

key, err := s.keyStorage.GetKey(nil)
Expand Down Expand Up @@ -245,7 +246,7 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get
return nil, err
}

p.SetRequestForwarder(groupAddressRequestForwarder(func(addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) {
p.SetRequestForwarder(groupAddressRequestForwarder(func(ctx context.Context, addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) {
var err error

// once compose and resign forwarding request
Expand Down Expand Up @@ -405,6 +406,62 @@ func (s *Service) toHashRangePrm(req *objectV2.GetRangeHashRequest) (*getsvc.Ran
})
}

if !commonPrm.LocalOnly() {
var onceResign sync.Once
var key *ecdsa.PrivateKey

key, err = s.keyStorage.GetKey(nil)
if err != nil {
return nil, err
}

p.SetRangeRequestForwarder(groupAddressRequestForwarder(func(ctx context.Context, addr network.Address, c client.MultiAddressClient, pubkey []byte) ([][]byte, error) {
meta := req.GetMetaHeader()

// once compose and resign forwarding request
onceResign.Do(func() {
// compose meta header of the local server
metaHdr := new(session.RequestMetaHeader)
metaHdr.SetTTL(meta.GetTTL() - 1)
// TODO: #1165 think how to set the other fields
metaHdr.SetOrigin(meta)
writeCurrentVersion(metaHdr)

req.SetMetaHeader(metaHdr)

err = signature.SignServiceMessage(key, req)
})
if err != nil {
return nil, err
}

var resp *objectV2.GetRangeHashResponse
err = c.RawForAddress(addr, func(cli *rpcclient.Client) error {
resp, err = rpc.HashObjectRange(cli, req, rpcclient.WithContext(ctx))
return err
})
if err != nil {
return nil, fmt.Errorf("GetRangeHash rpc failure: %w", err)
}

// verify response key
if err = internal.VerifyResponseKeyV2(pubkey, resp); err != nil {
return nil, err
}

// verify response structure
if err := signature.VerifyServiceMessage(resp); err != nil {
return nil, fmt.Errorf("could not verify %T: %w", resp, err)
}

if err = checkStatus(resp.GetMetaHeader().GetStatus()); err != nil {
return nil, err
}

return resp.GetBody().GetHashList(), nil
}))
}

return p, nil
}

Expand Down Expand Up @@ -459,7 +516,7 @@ func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp
if !commonPrm.LocalOnly() {
var onceResign sync.Once

p.SetRequestForwarder(groupAddressRequestForwarder(func(addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) {
p.SetRequestForwarder(groupAddressRequestForwarder(func(ctx context.Context, addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) {
var err error

key, err := s.keyStorage.GetKey(nil)
Expand Down Expand Up @@ -654,11 +711,11 @@ func toShortObjectHeader(hdr *object.Object) objectV2.GetHeaderPart {
return sh
}

func groupAddressRequestForwarder(f func(network.Address, client.MultiAddressClient, []byte) (*object.Object, error)) getsvc.RequestForwarder {
return func(info client.NodeInfo, c client.MultiAddressClient) (*object.Object, error) {
func groupAddressRequestForwarder[V any](f func(context.Context, network.Address, client.MultiAddressClient, []byte) (V, error)) func(context.Context, client.NodeInfo, client.MultiAddressClient) (V, error) {
return func(ctx context.Context, info client.NodeInfo, c client.MultiAddressClient) (V, error) {
var (
firstErr error
res *object.Object
res V

key = info.PublicKey()
)
Expand All @@ -676,7 +733,7 @@ func groupAddressRequestForwarder(f func(network.Address, client.MultiAddressCli
// would be nice to log otherwise
}()

res, err = f(addr, c, key)
res, err = f(ctx, addr, c, key)

return
})
Expand Down

0 comments on commit 2f4977c

Please sign in to comment.