Skip to content

Commit

Permalink
fix/GETRANGEHASH to a node without an object (#2557)
Browse files Browse the repository at this point in the history
Closes #2541.
  • Loading branch information
roman-khimov authored Sep 21, 2023
2 parents 4fdec62 + f5f9b71 commit 9207437
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ Changelog for NeoFS Node

### Fixed
- `neofs-cli netmap netinfo` documentation (#2555)
- `GETRANGEHASH` to a node without an object produced `GETRANGE` or `GET` requests (#2541)

### Changed
- FSTree storage now uses more efficient and safe temporary files under Linux (#2566)
Expand Down
7 changes: 7 additions & 0 deletions pkg/services/object/get/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,8 +356,15 @@ func (exec execCtx) isForwardingEnabled() bool {
return exec.prm.forwarder != nil
}

// isRangeForwardingEnabled returns true if common execution
// parameters has GETRANGEHASH request forwarding closure set.
func (exec execCtx) isRangeForwardingEnabled() bool {
return exec.prm.rangeForwarder != nil
}

// disableForwarding removes request forwarding closure from common
// parameters, so it won't be inherited in new execution contexts.
func (exec *execCtx) disableForwarding() {
exec.prm.SetRequestForwarder(nil)
exec.prm.SetRangeRequestForwarder(nil)
}
7 changes: 7 additions & 0 deletions pkg/services/object/get/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ func (s *Service) GetRangeHash(ctx context.Context, prm RangeHashPrm) (*RangeHas
return nil, err
}

if rngPrm.forwardedRangeHashResponse != nil {
// forwarder request case; no need to collect the other
// parts, the whole response has already been received
hashes = rngPrm.forwardedRangeHashResponse
break
}

hashes = append(hashes, h.Sum(nil))
}

Expand Down
13 changes: 11 additions & 2 deletions pkg/services/object/get/prm.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package getsvc

import (
"context"
"crypto/ecdsa"
"errors"
"hash"
Expand All @@ -21,6 +22,8 @@ type RangePrm struct {
commonPrm

rng *object.Range

forwardedRangeHashResponse [][]byte
}

var (
Expand Down Expand Up @@ -58,7 +61,8 @@ type RangeHashPrm struct {
salt []byte
}

type RequestForwarder func(coreclient.NodeInfo, coreclient.MultiAddressClient) (*object.Object, error)
type RequestForwarder func(context.Context, coreclient.NodeInfo, coreclient.MultiAddressClient) (*object.Object, error)
type RangeRequestForwarder func(context.Context, coreclient.NodeInfo, coreclient.MultiAddressClient) ([][]byte, error)

// HeadPrm groups parameters of Head service call.
type HeadPrm struct {
Expand All @@ -74,7 +78,8 @@ type commonPrm struct {

raw bool

forwarder RequestForwarder
forwarder RequestForwarder
rangeForwarder RangeRequestForwarder

// signerKey is a cached key that should be used for spawned
// requests (if any), could be nil if incoming request handling
Expand Down Expand Up @@ -141,6 +146,10 @@ func (p *commonPrm) SetRequestForwarder(f RequestForwarder) {
p.forwarder = f
}

func (p *commonPrm) SetRangeRequestForwarder(f RangeRequestForwarder) {
p.rangeForwarder = f
}

// WithAddress sets object address to be read.
func (p *commonPrm) WithAddress(addr oid.Address) {
p.addr = addr
Expand Down
4 changes: 3 additions & 1 deletion pkg/services/object/get/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ func (exec *execCtx) processNode(info client.NodeInfo) bool {

// both object and err are nil only if the original
// request was forwarded to another node and the object
// has already been streamed to the requesting party
// has already been streamed to the requesting party,
// or it is a GETRANGEHASH forwarded request whose
// response is not an object
if obj != nil {
exec.collectedObject = obj
exec.writeCollectedObject()
Expand Down
7 changes: 6 additions & 1 deletion pkg/services/object/get/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (c *clientCacheWrapper) get(info coreclient.NodeInfo) (getClient, error) {

func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*object.Object, error) {
if exec.isForwardingEnabled() {
return exec.prm.forwarder(info, c.client)
return exec.prm.forwarder(exec.ctx, info, c.client)
}

key, err := exec.key()
Expand Down Expand Up @@ -123,6 +123,11 @@ func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*obj
// we don't specify payload writer because we accumulate
// the object locally (even huge).
if rng := exec.ctxRange(); rng != nil {
if exec.isRangeForwardingEnabled() {
exec.prm.forwardedRangeHashResponse, err = exec.prm.rangeForwarder(exec.ctx, info, c.client)
return nil, err
}

var prm internalclient.PayloadRangePrm

prm.SetContext(exec.context())
Expand Down
77 changes: 67 additions & 10 deletions pkg/services/object/get/v2/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package getsvc

import (
"context"
"crypto/ecdsa"
"crypto/sha256"
"errors"
"fmt"
Expand Down Expand Up @@ -70,7 +71,7 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre
var onceHeaderSending sync.Once
var globalProgress int

p.SetRequestForwarder(groupAddressRequestForwarder(func(addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) {
p.SetRequestForwarder(groupAddressRequestForwarder(func(ctx context.Context, addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) {
var err error

key, err := s.keyStorage.GetKey(nil)
Expand Down Expand Up @@ -102,7 +103,7 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre
// open stream
var getStream *rpc.GetResponseReader
err = c.RawForAddress(addr, func(cli *rpcclient.Client) error {
getStream, err = rpc.GetObject(cli, req, rpcclient.WithContext(stream.Context()))
getStream, err = rpc.GetObject(cli, req, rpcclient.WithContext(ctx))
return err
})
if err != nil {
Expand Down Expand Up @@ -245,7 +246,7 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get
return nil, err
}

p.SetRequestForwarder(groupAddressRequestForwarder(func(addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) {
p.SetRequestForwarder(groupAddressRequestForwarder(func(ctx context.Context, addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) {
var err error

// once compose and resign forwarding request
Expand All @@ -272,7 +273,7 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get
// open stream
var rangeStream *rpc.ObjectRangeResponseReader
err = c.RawForAddress(addr, func(cli *rpcclient.Client) error {
rangeStream, err = rpc.GetObjectRange(cli, req, rpcclient.WithContext(stream.Context()))
rangeStream, err = rpc.GetObjectRange(cli, req, rpcclient.WithContext(ctx))
return err
})
if err != nil {
Expand Down Expand Up @@ -405,6 +406,62 @@ func (s *Service) toHashRangePrm(req *objectV2.GetRangeHashRequest) (*getsvc.Ran
})
}

if !commonPrm.LocalOnly() {
var onceResign sync.Once
var key *ecdsa.PrivateKey

key, err = s.keyStorage.GetKey(nil)
if err != nil {
return nil, err
}

p.SetRangeRequestForwarder(groupAddressRequestForwarder(func(ctx context.Context, addr network.Address, c client.MultiAddressClient, pubkey []byte) ([][]byte, error) {
meta := req.GetMetaHeader()

// once compose and resign forwarding request
onceResign.Do(func() {
// compose meta header of the local server
metaHdr := new(session.RequestMetaHeader)
metaHdr.SetTTL(meta.GetTTL() - 1)
// TODO: #1165 think how to set the other fields
metaHdr.SetOrigin(meta)
writeCurrentVersion(metaHdr)

req.SetMetaHeader(metaHdr)

err = signature.SignServiceMessage(key, req)
})
if err != nil {
return nil, err
}

var resp *objectV2.GetRangeHashResponse
err = c.RawForAddress(addr, func(cli *rpcclient.Client) error {
resp, err = rpc.HashObjectRange(cli, req, rpcclient.WithContext(ctx))
return err
})
if err != nil {
return nil, fmt.Errorf("GetRangeHash rpc failure: %w", err)
}

// verify response key
if err = internal.VerifyResponseKeyV2(pubkey, resp); err != nil {
return nil, err
}

// verify response structure
if err := signature.VerifyServiceMessage(resp); err != nil {
return nil, fmt.Errorf("could not verify %T: %w", resp, err)
}

if err = checkStatus(resp.GetMetaHeader().GetStatus()); err != nil {
return nil, err
}

return resp.GetBody().GetHashList(), nil
}))
}

return p, nil
}

Expand All @@ -424,7 +481,7 @@ func (w *headResponseWriter) WriteHeader(hdr *object.Object) error {
return nil
}

func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp *objectV2.HeadResponse) (*getsvc.HeadPrm, error) {
func (s *Service) toHeadPrm(_ context.Context, req *objectV2.HeadRequest, resp *objectV2.HeadResponse) (*getsvc.HeadPrm, error) {
body := req.GetBody()

addrV2 := body.GetAddress()
Expand Down Expand Up @@ -459,7 +516,7 @@ func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp
if !commonPrm.LocalOnly() {
var onceResign sync.Once

p.SetRequestForwarder(groupAddressRequestForwarder(func(addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) {
p.SetRequestForwarder(groupAddressRequestForwarder(func(ctx context.Context, addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) {
var err error

key, err := s.keyStorage.GetKey(nil)
Expand Down Expand Up @@ -654,11 +711,11 @@ func toShortObjectHeader(hdr *object.Object) objectV2.GetHeaderPart {
return sh
}

func groupAddressRequestForwarder(f func(network.Address, client.MultiAddressClient, []byte) (*object.Object, error)) getsvc.RequestForwarder {
return func(info client.NodeInfo, c client.MultiAddressClient) (*object.Object, error) {
func groupAddressRequestForwarder[V any](f func(context.Context, network.Address, client.MultiAddressClient, []byte) (V, error)) func(context.Context, client.NodeInfo, client.MultiAddressClient) (V, error) {
return func(ctx context.Context, info client.NodeInfo, c client.MultiAddressClient) (V, error) {
var (
firstErr error
res *object.Object
res V

key = info.PublicKey()
)
Expand All @@ -676,7 +733,7 @@ func groupAddressRequestForwarder(f func(network.Address, client.MultiAddressCli
// would be nice to log otherwise
}()

res, err = f(addr, c, key)
res, err = f(ctx, addr, c, key)

return
})
Expand Down

0 comments on commit 9207437

Please sign in to comment.