Skip to content

Commit

Permalink
node/object/put: exchange meta signatures during replication
Browse files Browse the repository at this point in the history
Initial replication requires nodes to sign object's main meta information and
respond with it. Meta information is not sent on wire and treated as a fixed
ordered NEO's map. Signatures are verified, not stored/send anywhere yet.
It follows recent API extension: nspcc-dev/neofs-api#299.
Closes #2876.

Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
  • Loading branch information
carpawell committed Sep 6, 2024
1 parent 66adff2 commit a30127e
Show file tree
Hide file tree
Showing 13 changed files with 304 additions and 40 deletions.
3 changes: 2 additions & 1 deletion cmd/neofs-node/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
truststorage "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/storage"
"github.com/nspcc-dev/neofs-sdk-go/client"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
neofsecdsa "github.com/nspcc-dev/neofs-sdk-go/crypto/ecdsa"
eaclSDK "github.com/nspcc-dev/neofs-sdk-go/eacl"
netmapsdk "github.com/nspcc-dev/neofs-sdk-go/netmap"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
Expand Down Expand Up @@ -350,7 +351,7 @@ func initObjectService(c *cfg) {
firstSvc = objectService.NewMetricCollector(signSvc, c.metricsCollector)
}

server := objectTransportGRPC.New(firstSvc, objNode)
server := objectTransportGRPC.New(firstSvc, objNode, neofsecdsa.SignerRFC6979(c.shared.basics.key.PrivateKey))

Check warning on line 354 in cmd/neofs-node/object.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/object.go#L354

Added line #L354 was not covered by tests

for _, srv := range c.cfgGRPC.servers {
objectGRPC.RegisterObjectServiceServer(srv, server)
Expand Down
36 changes: 31 additions & 5 deletions cmd/neofs-node/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ import (
"fmt"

objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc"
"github.com/nspcc-dev/neofs-api-go/v2/refs"
rawclient "github.com/nspcc-dev/neofs-api-go/v2/rpc/client"
"github.com/nspcc-dev/neofs-api-go/v2/rpc/common"
"github.com/nspcc-dev/neofs-api-go/v2/rpc/grpc"
"github.com/nspcc-dev/neofs-api-go/v2/rpc/message"
"github.com/nspcc-dev/neofs-api-go/v2/status"
coreclient "github.com/nspcc-dev/neofs-node/pkg/core/client"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto"
)

type transport struct {
Expand All @@ -20,27 +22,31 @@ type transport struct {

// SendReplicationRequestToNode connects to described node and sends prepared
// replication request message to it.
func (x *transport) SendReplicationRequestToNode(ctx context.Context, req []byte, node coreclient.NodeInfo) error {
func (x *transport) SendReplicationRequestToNode(ctx context.Context, req []byte, node coreclient.NodeInfo) (*neofscrypto.Signature, error) {

Check warning on line 25 in cmd/neofs-node/transport.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/transport.go#L25

Added line #L25 was not covered by tests
c, err := x.clients.Get(node)
if err != nil {
return fmt.Errorf("connect to remote node: %w", err)
return nil, fmt.Errorf("connect to remote node: %w", err)

Check warning on line 28 in cmd/neofs-node/transport.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/transport.go#L28

Added line #L28 was not covered by tests
}

return c.ExecRaw(func(c *rawclient.Client) error {
var resp replicateResponse
err = c.ExecRaw(func(c *rawclient.Client) error {

Check warning on line 32 in cmd/neofs-node/transport.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/transport.go#L31-L32

Added lines #L31 - L32 were not covered by tests
// this will be changed during NeoFS API Go deprecation. Code most likely be
// placed in SDK
m := common.CallMethodInfo{Service: "neo.fs.v2.object.ObjectService", Name: "Replicate"}
var resp replicateResponse
err = rawclient.SendUnary(c, m, rawclient.BinaryMessage(req), &resp,
rawclient.WithContext(ctx), rawclient.AllowBinarySendingOnly())
if err != nil {
return fmt.Errorf("API transport (service=%s,op=%s): %w", m.Service, m.Name, err)
}
return resp.err
})
return resp.sig, err

Check warning on line 43 in cmd/neofs-node/transport.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/transport.go#L43

Added line #L43 was not covered by tests
}

type replicateResponse struct{ err error }
type replicateResponse struct {
sig *neofscrypto.Signature
err error
}

func (x replicateResponse) ToGRPCMessage() grpc.Message { return new(objectGRPC.ReplicateResponse) }

Expand All @@ -60,6 +66,26 @@ func (x *replicateResponse) FromGRPCMessage(gm grpc.Message) error {
}

x.err = apistatus.ErrorFromV2(st)
if x.err != nil {
return nil

Check warning on line 70 in cmd/neofs-node/transport.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/transport.go#L69-L70

Added lines #L69 - L70 were not covered by tests
}

sig := m.GetObjectSignature()
if sig == nil {
return nil

Check warning on line 75 in cmd/neofs-node/transport.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/transport.go#L73-L75

Added lines #L73 - L75 were not covered by tests
}

sigV2 := new(refs.Signature)
err := sigV2.Unmarshal(sig)
if err != nil {
return fmt.Errorf("decoding signature from proto message: %w", err)

Check warning on line 81 in cmd/neofs-node/transport.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/transport.go#L78-L81

Added lines #L78 - L81 were not covered by tests
}

x.sig = new(neofscrypto.Signature)
err = x.sig.ReadFromV2(*sigV2)
if err != nil {
return fmt.Errorf("invalid signature: %w", err)

Check warning on line 87 in cmd/neofs-node/transport.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/transport.go#L84-L87

Added lines #L84 - L87 were not covered by tests
}

return nil
}
68 changes: 68 additions & 0 deletions pkg/core/object/replicate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package object

import (
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
)

const (
validInterval = 10 // in epoches
currentVersion = 6 // it is also a number of fields

cidKey = "cid"
oidKey = "oid"
sizeKey = "size"
deletedKey = "deleted"
lockedKey = "locked"
validUntilKey = "validuntil"
)

// MetaInfo uses NEO's map (strict order) serialized format as a raw
// representation of object's meta information.
//
// This (ordered) format is used (keys are strings):
//
// "cid": _raw_ container ID (32 bytes)
// "oid": _raw_ object ID (32 bytes)
// "size": payload size
// "deleted": array of _raw_ object IDs
// "locked": array of _raw_ object IDs
// "validuntil": last valid epoch number for meta information
//
// Last valid epoch is object's creation epoch + 10.
func MetaInfo(cID cid.ID, oID oid.ID, pSize uint64, deleted []oid.ID, locked []oid.ID, createdAt uint64) []byte {
kvs := make([]stackitem.MapElement, 0, currentVersion)
kvs = append(kvs, kv(cidKey, cID[:]))
kvs = append(kvs, kv(oidKey, oID[:]))
kvs = append(kvs, kv(sizeKey, pSize))
kvs = append(kvs, oidsKV(deletedKey, deleted))
kvs = append(kvs, oidsKV(lockedKey, locked))
kvs = append(kvs, kv(validUntilKey, createdAt+validInterval))

result, err := stackitem.Serialize(stackitem.NewMapWithValue(kvs))
if err != nil {
// all the errors in the stackitem relate only cases when it is
// impossible to use serialized values (too many values, unsupported
// types, etc.), unexpected errors at all
panic(err)

Check warning on line 48 in pkg/core/object/replicate.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/object/replicate.go#L48

Added line #L48 was not covered by tests
}

return result
}

func kv(k string, value any) stackitem.MapElement {
return stackitem.MapElement{
Key: stackitem.Make(k),
Value: stackitem.Make(value),
}
}

func oidsKV(fieldKey string, oIDs []oid.ID) stackitem.MapElement {
res := make([]stackitem.Item, 0, len(oIDs))
for _, oID := range oIDs {
res = append(res, stackitem.NewByteArray(oID[:]))
}

return kv(fieldKey, res)
}
65 changes: 65 additions & 0 deletions pkg/core/object/replicate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package object

import (
"math/big"
"math/rand/v2"
"testing"

"github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test"
"github.com/stretchr/testify/require"
)

func TestMetaInfo(t *testing.T) {
oID := oidtest.ID()
cID := cidtest.ID()
size := rand.Uint64()
deleted := oidtest.IDs(10)
locked := oidtest.IDs(10)
validUntil := rand.Uint64()

raw := MetaInfo(cID, oID, size, deleted, locked, validUntil)
item, err := stackitem.Deserialize(raw)
require.NoError(t, err)

require.Equal(t, stackitem.MapT, item.Type())
mm, ok := item.Value().([]stackitem.MapElement)
require.True(t, ok)

require.Len(t, mm, currentVersion)

require.Equal(t, cidKey, string(mm[0].Key.Value().([]byte)))
require.Equal(t, cID[:], mm[0].Value.Value().([]byte))

require.Equal(t, oidKey, string(mm[1].Key.Value().([]byte)))
require.Equal(t, oID[:], mm[1].Value.Value().([]byte))

require.Equal(t, sizeKey, string(mm[2].Key.Value().([]byte)))
require.Equal(t, size, mm[2].Value.Value().(*big.Int).Uint64())

require.Equal(t, deletedKey, string(mm[3].Key.Value().([]byte)))
require.Equal(t, deleted, stackItemToOIDs(t, mm[3].Value))

require.Equal(t, lockedKey, string(mm[4].Key.Value().([]byte)))
require.Equal(t, locked, stackItemToOIDs(t, mm[4].Value))

require.Equal(t, validUntilKey, string(mm[5].Key.Value().([]byte)))
require.Equal(t, validUntil+validInterval, mm[5].Value.Value().(*big.Int).Uint64())
}

func stackItemToOIDs(t *testing.T, value stackitem.Item) []oid.ID {
value, ok := value.(*stackitem.Array)
require.True(t, ok)

vv := value.Value().([]stackitem.Item)
res := make([]oid.ID, 0, len(vv))

for _, v := range vv {
raw := v.Value().([]byte)
res = append(res, oid.ID(raw))
}

return res
}
55 changes: 54 additions & 1 deletion pkg/network/transport/object/grpc/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ import (
refsv2 "github.com/nspcc-dev/neofs-api-go/v2/refs"
refs "github.com/nspcc-dev/neofs-api-go/v2/refs/grpc"
status "github.com/nspcc-dev/neofs-api-go/v2/status/grpc"
objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto"
neofsecdsa "github.com/nspcc-dev/neofs-sdk-go/crypto/ecdsa"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
)

// Replicate serves neo.fs.v2.object.ObjectService/Replicate RPC.
Expand Down Expand Up @@ -178,7 +180,20 @@ func (s *Server) Replicate(_ context.Context, req *objectGRPC.ReplicateRequest)
}}, nil
}

return new(objectGRPC.ReplicateResponse), nil
resp := new(objectGRPC.ReplicateResponse)
if req.GetSignObject() {
sigRaw, err := s.metaInfoSignature(*obj)
if err != nil {
return &objectGRPC.ReplicateResponse{Status: &status.Status{
Code: codeInternal,
Message: fmt.Sprintf("failed to sign object meta information: %v", err),
}}, nil

Check warning on line 190 in pkg/network/transport/object/grpc/replication.go

View check run for this annotation

Codecov / codecov/patch

pkg/network/transport/object/grpc/replication.go#L185-L190

Added lines #L185 - L190 were not covered by tests
}

resp.ObjectSignature = sigRaw

Check warning on line 193 in pkg/network/transport/object/grpc/replication.go

View check run for this annotation

Codecov / codecov/patch

pkg/network/transport/object/grpc/replication.go#L193

Added line #L193 was not covered by tests
}

return resp, nil
}

func objectFromMessage(gMsg *objectGRPC.Object) (*object.Object, error) {
Expand All @@ -190,3 +205,41 @@ func objectFromMessage(gMsg *objectGRPC.Object) (*object.Object, error) {

return object.NewFromV2(&msg), nil
}

func (s *Server) metaInfoSignature(o object.Object) ([]byte, error) {
var deleted []oid.ID
var locked []oid.ID
switch o.Type() {
case object.TypeTombstone:
var t object.Tombstone
err := t.Unmarshal(o.Payload())
if err != nil {
return nil, fmt.Errorf("reading tombstoned objects: %w", err)

Check warning on line 217 in pkg/network/transport/object/grpc/replication.go

View check run for this annotation

Codecov / codecov/patch

pkg/network/transport/object/grpc/replication.go#L209-L217

Added lines #L209 - L217 were not covered by tests
}

deleted = t.Members()
case object.TypeLock:
var l object.Lock
err := l.Unmarshal(o.Payload())
if err != nil {
return nil, fmt.Errorf("reading locked objects: %w", err)

Check warning on line 225 in pkg/network/transport/object/grpc/replication.go

View check run for this annotation

Codecov / codecov/patch

pkg/network/transport/object/grpc/replication.go#L220-L225

Added lines #L220 - L225 were not covered by tests
}

locked = make([]oid.ID, l.NumberOfMembers())
l.ReadMembers(locked)
default:

Check warning on line 230 in pkg/network/transport/object/grpc/replication.go

View check run for this annotation

Codecov / codecov/patch

pkg/network/transport/object/grpc/replication.go#L228-L230

Added lines #L228 - L230 were not covered by tests
}

metaInfo := objectcore.MetaInfo(o.GetContainerID(), o.GetID(), o.PayloadSize(), deleted, locked, o.CreationEpoch())

Check warning on line 233 in pkg/network/transport/object/grpc/replication.go

View check run for this annotation

Codecov / codecov/patch

pkg/network/transport/object/grpc/replication.go#L233

Added line #L233 was not covered by tests

var sig neofscrypto.Signature
err := sig.Calculate(s.signer, metaInfo)
if err != nil {
return nil, fmt.Errorf("signature failure: %w", err)

Check warning on line 238 in pkg/network/transport/object/grpc/replication.go

View check run for this annotation

Codecov / codecov/patch

pkg/network/transport/object/grpc/replication.go#L235-L238

Added lines #L235 - L238 were not covered by tests
}

sigV2 := new(refsv2.Signature)
sig.WriteToV2(sigV2)

Check warning on line 242 in pkg/network/transport/object/grpc/replication.go

View check run for this annotation

Codecov / codecov/patch

pkg/network/transport/object/grpc/replication.go#L241-L242

Added lines #L241 - L242 were not covered by tests

return sigV2.StableMarshal(nil), nil

Check warning on line 244 in pkg/network/transport/object/grpc/replication.go

View check run for this annotation

Codecov / codecov/patch

pkg/network/transport/object/grpc/replication.go#L244

Added line #L244 was not covered by tests
}
13 changes: 7 additions & 6 deletions pkg/network/transport/object/grpc/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ func anyValidRequest(tb testing.TB, signer neofscrypto.Signer, cnr cid.ID, objID
Key: neofscrypto.PublicKeyBytes(signer.Public()),
Sign: sig,
},
SignObject: false,
}

switch signer.Scheme() {
Expand All @@ -160,7 +161,7 @@ func anyValidRequest(tb testing.TB, signer neofscrypto.Signer, cnr cid.ID, objID
func TestServer_Replicate(t *testing.T) {
var noCallNode noCallTestNode
var noCallObjSvc noCallObjectService
noCallSrv := New(noCallObjSvc, &noCallNode)
noCallSrv := New(noCallObjSvc, &noCallNode, neofscryptotest.Signer())
clientSigner := neofscryptotest.Signer()
clientPubKey := neofscrypto.PublicKeyBytes(clientSigner.Public())
serverPubKey := neofscrypto.PublicKeyBytes(neofscryptotest.Signer().Public())
Expand Down Expand Up @@ -324,7 +325,7 @@ func TestServer_Replicate(t *testing.T) {

t.Run("apply storage policy failure", func(t *testing.T) {
node := newTestNode(t, serverPubKey, clientPubKey, cnr, req.Object)
srv := New(noCallObjSvc, node)
srv := New(noCallObjSvc, node, neofscryptotest.Signer())

node.cnrErr = errors.New("any error")

Expand All @@ -336,7 +337,7 @@ func TestServer_Replicate(t *testing.T) {

t.Run("client or server mismatches object's storage policy", func(t *testing.T) {
node := newTestNode(t, serverPubKey, clientPubKey, cnr, req.Object)
srv := New(noCallObjSvc, node)
srv := New(noCallObjSvc, node, neofscryptotest.Signer())

node.serverOutsideCnr = true
node.clientOutsideCnr = true
Expand All @@ -356,7 +357,7 @@ func TestServer_Replicate(t *testing.T) {

t.Run("local storage failure", func(t *testing.T) {
node := newTestNode(t, serverPubKey, clientPubKey, cnr, req.Object)
srv := New(noCallObjSvc, node)
srv := New(noCallObjSvc, node, neofscryptotest.Signer())

node.storeErr = errors.New("any error")

Expand All @@ -368,7 +369,7 @@ func TestServer_Replicate(t *testing.T) {

t.Run("OK", func(t *testing.T) {
node := newTestNode(t, serverPubKey, clientPubKey, cnr, req.Object)
srv := New(noCallObjSvc, node)
srv := New(noCallObjSvc, node, neofscryptotest.Signer())

resp, err := srv.Replicate(context.Background(), req)
require.NoError(t, err)
Expand All @@ -395,7 +396,7 @@ func BenchmarkServer_Replicate(b *testing.B) {
ctx := context.Background()
var node nopNode

srv := New(nil, node)
srv := New(nil, node, neofscryptotest.Signer())

for _, tc := range []struct {
name string
Expand Down
Loading

0 comments on commit a30127e

Please sign in to comment.