diff --git a/CHANGELOG.md b/CHANGELOG.md index 8bde550f92..fd71713ac2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ Changelog for NeoFS Node ### Fixed ### Changed +- `ObjectService`'s `Put` RPC handler caches up to 10K lists of per-object sorted container nodes (#2901) ### Removed diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index ea52fbf9fc..701d23b77c 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -256,14 +256,13 @@ func initObjectService(c *cfg) { searchsvcV2.WithKeyStorage(keyStorage), ) - sPut := putsvc.NewService(&transport{clients: putConstructor}, + sPut := putsvc.NewService(&transport{clients: putConstructor}, c, putsvc.WithKeyStorage(keyStorage), putsvc.WithClientConstructor(putConstructor), putsvc.WithMaxSizeSource(newCachedMaxObjectSizeSource(c)), putsvc.WithObjectStorage(storageEngine{engine: ls}), putsvc.WithContainerSource(c.cfgObject.cnrSource), putsvc.WithNetworkMapSource(c.netMapSource), - putsvc.WithNetmapKeys(c), putsvc.WithNetworkState(c.cfgNetmap.state), putsvc.WithWorkerPools(c.cfgObject.pool.putRemote, c.cfgObject.pool.putLocal), putsvc.WithLogger(c.log), @@ -758,3 +757,78 @@ func (n netmapSourceWithNodes) ServerInContainer(cID cid.ID) (bool, error) { return serverInContainer, nil } + +// GetContainerNodes reads storage policy of the referenced container from the +// underlying container storage, reads current network map from the underlying +// storage, applies the storage policy to it, gathers storage nodes matching the +// policy and returns sort interface. +// +// GetContainerNodes implements [putsvc.NeoFSNetwork]. +func (c *cfg) GetContainerNodes(cnrID cid.ID) (putsvc.ContainerNodes, error) { + cnr, err := c.cfgObject.containerNodes.containers.Get(cnrID) + if err != nil { + return nil, fmt.Errorf("read container by ID: %w", err) + } + curEpoch, err := c.cfgObject.containerNodes.network.Epoch() + if err != nil { + return nil, fmt.Errorf("read current NeoFS epoch: %w", err) + } + networkMap, err := c.cfgObject.containerNodes.network.GetNetMapByEpoch(curEpoch) + if err != nil { + return nil, fmt.Errorf("read network map at the current epoch #%d: %w", curEpoch, err) + } + policy := cnr.Value.PlacementPolicy() + nodeSets, err := networkMap.ContainerNodes(cnr.Value.PlacementPolicy(), cnrID) + if err != nil { + return nil, fmt.Errorf("apply container storage policy to the network map at current epoch #%d: %w", curEpoch, err) + } + repCounts := make([]uint, policy.NumberOfReplicas()) + for i := range repCounts { + repCounts[i] = uint(policy.ReplicaNumberByIndex(i)) + } + return &containerNodesSorter{ + policy: storagePolicyRes{ + nodeSets: nodeSets, + repCounts: repCounts, + }, + networkMap: networkMap, + cnrID: cnrID, + curEpoch: curEpoch, + containerNodes: c.cfgObject.containerNodes, + }, nil +} + +// implements [putsvc.ContainerNodes]. 
+type containerNodesSorter struct { + policy storagePolicyRes + networkMap *netmapsdk.NetMap + cnrID cid.ID + curEpoch uint64 + containerNodes *containerNodes +} + +func (x *containerNodesSorter) Unsorted() [][]netmapsdk.NodeInfo { return x.policy.nodeSets } +func (x *containerNodesSorter) PrimaryCounts() []uint { return x.policy.repCounts } +func (x *containerNodesSorter) SortForObject(obj oid.ID) ([][]netmapsdk.NodeInfo, error) { + cacheKey := objectNodesCacheKey{epoch: x.curEpoch} + cacheKey.addr.SetContainer(x.cnrID) + cacheKey.addr.SetObject(obj) + res, ok := x.containerNodes.objCache.Get(cacheKey) + if ok { + return res.nodeSets, res.err + } + if x.networkMap == nil { + var err error + if x.networkMap, err = x.containerNodes.network.GetNetMapByEpoch(x.curEpoch); err != nil { + // non-persistent error => do not cache + return nil, fmt.Errorf("read network map by epoch: %w", err) + } + } + res.repCounts = x.policy.repCounts + res.nodeSets, res.err = x.containerNodes.sortContainerNodesFunc(*x.networkMap, x.policy.nodeSets, obj) + if res.err != nil { + res.err = fmt.Errorf("sort container nodes for object: %w", res.err) + } + x.containerNodes.objCache.Add(cacheKey, res) + return res.nodeSets, res.err +} diff --git a/cmd/neofs-node/policy.go b/cmd/neofs-node/policy.go index a98d0a43d0..27ce63e53a 100644 --- a/cmd/neofs-node/policy.go +++ b/cmd/neofs-node/policy.go @@ -161,17 +161,9 @@ func (x *containerNodes) getNodesForObject(addr oid.Address) ([][]netmapsdk.Node if ok { return res.nodeSets, res.repCounts, res.err } - cnrRes, networkMap, err := (&containerPolicyContext{ - id: addr.Container(), - containers: x.containers, - network: x.network, - getNodesFunc: x.getContainerNodesFunc, - }).applyToNetmap(curEpoch, x.cache) - if err != nil || cnrRes.err != nil { - if err == nil { - err = cnrRes.err // cached in x.cache, no need to store in x.objCache - } - return nil, nil, fmt.Errorf("select container nodes for current epoch #%d: %w", curEpoch, err) + cnrRes, networkMap, err := x.getForCurrentEpoch(curEpoch, addr.Container()) + if err != nil { + return nil, nil, err } if networkMap == nil { if networkMap, err = x.network.GetNetMapByEpoch(curEpoch); err != nil { @@ -188,6 +180,22 @@ func (x *containerNodes) getNodesForObject(addr oid.Address) ([][]netmapsdk.Node return res.nodeSets, res.repCounts, res.err } +func (x *containerNodes) getForCurrentEpoch(curEpoch uint64, cnr cid.ID) (storagePolicyRes, *netmapsdk.NetMap, error) { + policy, networkMap, err := (&containerPolicyContext{ + id: cnr, + containers: x.containers, + network: x.network, + getNodesFunc: x.getContainerNodesFunc, + }).applyToNetmap(curEpoch, x.cache) + if err != nil || policy.err != nil { + if err == nil { + err = policy.err // cached in x.cache, no need to store in x.objCache + } + return storagePolicyRes{}, nil, fmt.Errorf("select container nodes for current epoch #%d: %w", curEpoch, err) + } + return policy, networkMap, nil +} + // preserves context of storage policy processing for the particular container. 
type containerPolicyContext struct { // static diff --git a/pkg/services/object/put/distibuted_test.go b/pkg/services/object/put/distibuted_test.go new file mode 100644 index 0000000000..c5613840ca --- /dev/null +++ b/pkg/services/object/put/distibuted_test.go @@ -0,0 +1,494 @@ +package putsvc + +import ( + "bytes" + "errors" + "fmt" + "strconv" + "sync" + "testing" + + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + "github.com/nspcc-dev/neofs-sdk-go/netmap" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +func allocNodes(n []uint) [][]netmap.NodeInfo { + res := make([][]netmap.NodeInfo, len(n)) + for i := range res { + res[i] = make([]netmap.NodeInfo, n[i]) + for j := range res[i] { + res[i][j].SetPublicKey([]byte(fmt.Sprintf("pub_%d_%d", i, j))) + res[i][j].SetNetworkEndpoints( + "localhost:"+strconv.Itoa(1e4+i*100+2*j), + "localhost:"+strconv.Itoa(1e4+i*100+2*j+1), + ) + res[i][j].SetExternalAddresses( + "external:"+strconv.Itoa(1e4+i*100+2*j), + "external:"+strconv.Itoa(1e4+i*100+2*j+1), + ) + } + } + return res +} + +type testContainerNodes struct { + objID oid.ID + + sortErr error + cnrNodes [][]netmap.NodeInfo + + primCounts []uint +} + +func (x testContainerNodes) Unsorted() [][]netmap.NodeInfo { return x.cnrNodes } + +func (x testContainerNodes) SortForObject(obj oid.ID) ([][]netmap.NodeInfo, error) { + if x.objID != obj { + return nil, errors.New("[test] unexpected object ID") + } + return x.cnrNodes, x.sortErr +} + +func (x testContainerNodes) PrimaryCounts() []uint { return x.primCounts } + +type testNetwork struct { + localPubKey []byte +} + +func (x testNetwork) IsLocalNodePublicKey(pk []byte) bool { return bytes.Equal(x.localPubKey, pk) } + +func (x testNetwork) GetContainerNodes(cid.ID) (ContainerNodes, error) { panic("unimplemented") } + +type testWorkerPool struct { + nCalls int + nFail int + err error +} + +func (x *testWorkerPool) Release() {} +func (x *testWorkerPool) Submit(f func()) error { + x.nCalls++ + if x.err != nil && (x.nFail == 0 || x.nCalls == x.nFail) { + return x.err + } + go f() + return nil +} + +func TestIterateNodesForObject(t *testing.T) { + // nodes: [A B C] [D C E F G] [B H I J] + // policy: [2 3 2] + // C is local. 
B, E and H fail, all others succeed + // + // expected order of candidates: [A B C] [D E F] [H I J] + objID := oidtest.ID() + cnrNodes := allocNodes([]uint{3, 5, 4}) + cnrNodes[2][0].SetPublicKey(cnrNodes[0][1].PublicKey()) + cnrNodes[1][1].SetPublicKey(cnrNodes[0][2].PublicKey()) + var lwp, rwp testWorkerPool + iter := placementIterator{ + log: zap.NewNop(), + neoFSNet: testNetwork{ + localPubKey: cnrNodes[0][2].PublicKey(), + }, + remotePool: &rwp, + localPool: &lwp, + containerNodes: testContainerNodes{ + objID: objID, + cnrNodes: cnrNodes, + primCounts: []uint{2, 3, 2}, + }, + } + var handlerMtx sync.Mutex + var handlerCalls []nodeDesc + err := iter.iterateNodesForObject(objID, func(node nodeDesc) error { + handlerMtx.Lock() + handlerCalls = append(handlerCalls, node) + handlerMtx.Unlock() + if bytes.Equal(node.info.PublicKey(), cnrNodes[0][1].PublicKey()) || + bytes.Equal(node.info.PublicKey(), cnrNodes[1][2].PublicKey()) || + bytes.Equal(node.info.PublicKey(), cnrNodes[2][1].PublicKey()) { + return errors.New("any node error") + } + return nil + }) + require.NoError(t, err) + require.Len(t, handlerCalls, 9) + withLocal := false + for _, node := range handlerCalls[:3] { + if !withLocal { + withLocal = node.local + } + if node.local { + continue + } + require.Contains(t, [][]byte{ + cnrNodes[0][0].PublicKey(), cnrNodes[0][1].PublicKey(), + }, node.info.PublicKey()) + require.Len(t, node.info.AddressGroup(), 2) + var expNetAddrs, expNetAddrsExt []string + if bytes.Equal(node.info.PublicKey(), cnrNodes[0][0].PublicKey()) { + expNetAddrs = []string{"localhost:10000", "localhost:10001"} + expNetAddrsExt = []string{"external:10000", "external:10001"} + } else { + expNetAddrs = []string{"localhost:10002", "localhost:10003"} + expNetAddrsExt = []string{"external:10002", "external:10003"} + } + require.ElementsMatch(t, expNetAddrs, []string{node.info.AddressGroup()[0].URIAddr(), node.info.AddressGroup()[1].URIAddr()}) + require.ElementsMatch(t, expNetAddrsExt, []string{node.info.ExternalAddressGroup()[0].URIAddr(), node.info.ExternalAddressGroup()[1].URIAddr()}) + } + require.True(t, withLocal) + for _, node := range handlerCalls[3:6] { + require.False(t, node.local) + require.Contains(t, [][]byte{ + cnrNodes[1][0].PublicKey(), cnrNodes[1][2].PublicKey(), cnrNodes[1][3].PublicKey(), + }, node.info.PublicKey()) + require.Len(t, node.info.AddressGroup(), 2) + var expNetAddrs, expNetAddrsExt []string + switch key := node.info.PublicKey(); { + case bytes.Equal(key, cnrNodes[1][0].PublicKey()): + expNetAddrs = []string{"localhost:10100", "localhost:10101"} + expNetAddrsExt = []string{"external:10100", "external:10101"} + case bytes.Equal(key, cnrNodes[1][2].PublicKey()): + expNetAddrs = []string{"localhost:10104", "localhost:10105"} + expNetAddrsExt = []string{"external:10104", "external:10105"} + case bytes.Equal(key, cnrNodes[1][3].PublicKey()): + expNetAddrs = []string{"localhost:10106", "localhost:10107"} + expNetAddrsExt = []string{"external:10106", "external:10107"} + } + require.ElementsMatch(t, expNetAddrs, []string{node.info.AddressGroup()[0].URIAddr(), node.info.AddressGroup()[1].URIAddr()}) + require.ElementsMatch(t, expNetAddrsExt, []string{node.info.ExternalAddressGroup()[0].URIAddr(), node.info.ExternalAddressGroup()[1].URIAddr()}) + } + for _, node := range handlerCalls[6:] { + require.False(t, node.local) + require.Contains(t, [][]byte{ + cnrNodes[2][1].PublicKey(), cnrNodes[2][2].PublicKey(), cnrNodes[2][3].PublicKey(), + }, node.info.PublicKey()) + require.Len(t, 
node.info.AddressGroup(), 2) + var expNetAddrs, expNetAddrsExt []string + switch key := node.info.PublicKey(); { + case bytes.Equal(key, cnrNodes[2][1].PublicKey()): + expNetAddrs = []string{"localhost:10202", "localhost:10203"} + expNetAddrsExt = []string{"external:10202", "external:10203"} + case bytes.Equal(key, cnrNodes[2][2].PublicKey()): + expNetAddrs = []string{"localhost:10204", "localhost:10205"} + expNetAddrsExt = []string{"external:10204", "external:10205"} + case bytes.Equal(key, cnrNodes[2][3].PublicKey()): + expNetAddrs = []string{"localhost:10206", "localhost:10207"} + expNetAddrsExt = []string{"external:10206", "external:10207"} + } + require.ElementsMatch(t, expNetAddrs, []string{node.info.AddressGroup()[0].URIAddr(), node.info.AddressGroup()[1].URIAddr()}) + require.ElementsMatch(t, expNetAddrsExt, []string{node.info.ExternalAddressGroup()[0].URIAddr(), node.info.ExternalAddressGroup()[1].URIAddr()}) + } + + t.Run("local only", func(t *testing.T) { + objID := oidtest.ID() + cnrNodes := allocNodes([]uint{2, 3, 1}) + var lwp, rwp testWorkerPool + iter := placementIterator{ + log: zap.NewNop(), + neoFSNet: testNetwork{ + localPubKey: cnrNodes[1][1].PublicKey(), + }, + remotePool: &rwp, + localPool: &lwp, + containerNodes: testContainerNodes{ + cnrNodes: cnrNodes, + }, + localOnly: true, + localNodePos: [2]int{1, 1}, + broadcast: true, + } + var handlerMtx sync.Mutex + var handlerCalls []nodeDesc + err := iter.iterateNodesForObject(objID, func(node nodeDesc) error { + handlerMtx.Lock() + handlerCalls = append(handlerCalls, node) + handlerMtx.Unlock() + return nil + }) + require.NoError(t, err) + require.Len(t, handlerCalls, 1) + require.True(t, handlerCalls[0].local) + require.EqualValues(t, 1, lwp.nCalls) + require.Zero(t, rwp.nCalls) + }) + t.Run("linear num of replicas", func(t *testing.T) { + // nodes: [A B C] [D B E] [F G] + // policy: [2 1 2] + // B fails, all others succeed + // linear num: 4 + // + // expected order of candidates: [A B C D E] + objID := oidtest.ID() + cnrNodes := allocNodes([]uint{3, 3, 2}) + cnrNodes[1][1].SetPublicKey(cnrNodes[0][1].PublicKey()) + iter := placementIterator{ + log: zap.NewNop(), + neoFSNet: new(testNetwork), + remotePool: new(testWorkerPool), + localPool: new(testWorkerPool), + containerNodes: testContainerNodes{ + objID: objID, + cnrNodes: cnrNodes, + primCounts: []uint{2, 1, 2}, + }, + linearReplNum: 4, + } + var handlerMtx sync.Mutex + var handlerCalls [][]byte + err := iter.iterateNodesForObject(objID, func(node nodeDesc) error { + handlerMtx.Lock() + handlerCalls = append(handlerCalls, node.info.PublicKey()) + handlerMtx.Unlock() + if bytes.Equal(node.info.PublicKey(), cnrNodes[0][1].PublicKey()) { + return errors.New("any node error") + } + return nil + }) + require.NoError(t, err) + require.Len(t, handlerCalls, 5) + require.ElementsMatch(t, [][]byte{ + cnrNodes[0][0].PublicKey(), + cnrNodes[0][1].PublicKey(), + cnrNodes[0][2].PublicKey(), + cnrNodes[1][0].PublicKey(), + cnrNodes[1][2].PublicKey(), + }, handlerCalls) + }) + t.Run("broadcast", func(t *testing.T) { + // nodes: [A B] [C D B] [E F] + // policy: [1 1 1] + // D fails, all others succeed + // + // expected order candidates: [A C E B D F] + objID := oidtest.ID() + cnrNodes := allocNodes([]uint{2, 3, 2}) + cnrNodes[1][2].SetPublicKey(cnrNodes[0][1].PublicKey()) + iter := placementIterator{ + log: zap.NewNop(), + neoFSNet: new(testNetwork), + remotePool: new(testWorkerPool), + localPool: new(testWorkerPool), + containerNodes: testContainerNodes{ + objID: objID, + 
cnrNodes: cnrNodes, + primCounts: []uint{1, 1, 1}, + }, + broadcast: true, + } + var handlerMtx sync.Mutex + var handlerCalls [][]byte + err := iter.iterateNodesForObject(objID, func(node nodeDesc) error { + handlerMtx.Lock() + handlerCalls = append(handlerCalls, node.info.PublicKey()) + handlerMtx.Unlock() + if bytes.Equal(node.info.PublicKey(), cnrNodes[1][1].PublicKey()) { + return errors.New("any node error") + } + return nil + }) + require.NoError(t, err) + require.Len(t, handlerCalls, 6) + require.ElementsMatch(t, [][]byte{ + cnrNodes[0][0].PublicKey(), + cnrNodes[1][0].PublicKey(), + cnrNodes[2][0].PublicKey(), + }, handlerCalls[:3]) + for _, key := range handlerCalls[3:] { + require.Contains(t, [][]byte{ + cnrNodes[0][1].PublicKey(), + cnrNodes[1][1].PublicKey(), + cnrNodes[2][1].PublicKey(), + }, key) + } + }) + t.Run("sort nodes for object failure", func(t *testing.T) { + objID := oidtest.ID() + iter := placementIterator{ + log: zap.NewNop(), + containerNodes: testContainerNodes{ + objID: objID, + sortErr: errors.New("any sort error"), + }, + } + err := iter.iterateNodesForObject(objID, func(nodeDesc) error { + t.Fatal("must not be called") + return nil + }) + require.EqualError(t, err, "sort container nodes for the object: any sort error") + }) + t.Run("worker pool failure", func(t *testing.T) { + // nodes: [A B] [C D E] [F] + // policy: [2 2 1] + // C fails network op, worker pool fails for E + objID := oidtest.ID() + replCounts := []uint{2, 3, 1} + cnrNodes := allocNodes(replCounts) + wp := testWorkerPool{ + nFail: 5, + err: errors.New("any worker pool error"), + } + iter := placementIterator{ + log: zap.NewNop(), + neoFSNet: new(testNetwork), + localPool: &wp, + remotePool: &wp, + containerNodes: testContainerNodes{ + objID: objID, + cnrNodes: cnrNodes, + primCounts: []uint{2, 2, 1}, + }, + } + var handlerMtx sync.Mutex + var handlerCalls [][]byte + err := iter.iterateNodesForObject(objID, func(node nodeDesc) error { + handlerMtx.Lock() + handlerCalls = append(handlerCalls, node.info.PublicKey()) + handlerMtx.Unlock() + if bytes.Equal(node.info.PublicKey(), cnrNodes[1][0].PublicKey()) { + return errors.New("any node error") + } + return nil + }) + require.Len(t, handlerCalls, 4) + require.ElementsMatch(t, handlerCalls[:2], [][]byte{ + cnrNodes[0][0].PublicKey(), cnrNodes[0][1].PublicKey(), + }) + require.ElementsMatch(t, handlerCalls[2:], [][]byte{ + cnrNodes[1][0].PublicKey(), cnrNodes[1][1].PublicKey(), + }) + require.EqualError(t, err, "incomplete object PUT by placement: "+ + "submit next job to save an object to the worker pool: any worker pool error "+ + "(last node error: any node error)") + }) + t.Run("not enough nodes a priori", func(t *testing.T) { + // nodes: [A B] [C D E] [F] + // policy: [2 4 1] + // + // after 1st list is finished, 2nd list has not enough nodes, so service should + // not try to access them + // + // this test is a bit synthetic. Actually, selection of container nodes must + // fail if there is not enough nodes. 
But for complete coverage, detection of + // such cases is also tested + objID := oidtest.ID() + cnrNodes := allocNodes([]uint{2, 3, 1}) + var wp testWorkerPool + iter := placementIterator{ + log: zap.NewNop(), + neoFSNet: new(testNetwork), + remotePool: &wp, + containerNodes: testContainerNodes{ + objID: objID, + cnrNodes: cnrNodes, + primCounts: []uint{2, 4, 1}, + }, + } + var handlerMtx sync.Mutex + var handlerCalls [][]byte + err := iter.iterateNodesForObject(objID, func(node nodeDesc) error { + handlerMtx.Lock() + handlerCalls = append(handlerCalls, node.info.PublicKey()) + handlerMtx.Unlock() + if bytes.Equal(node.info.PublicKey(), cnrNodes[1][0].PublicKey()) { + return errors.New("any node error") + } + return nil + }) + require.EqualError(t, err, "incomplete object PUT by placement: "+ + "number of replicas cannot be met for list #1: 4 required, 3 nodes remaining") + // assert that we processed the 1st list and did not even try to access + // nodes from the 2nd one + require.Len(t, handlerCalls, 2) + require.ElementsMatch(t, handlerCalls, [][]byte{ + cnrNodes[0][0].PublicKey(), cnrNodes[0][1].PublicKey(), + }) + }) + t.Run("not enough nodes due to decoding network endpoints failure", func(t *testing.T) { + // nodes: [A B] [C D E] [F] + // policy: [2 2 1] + // node C fails network op, E has incorrect network endpoints. The service should try it but fail + objID := oidtest.ID() + cnrNodes := allocNodes([]uint{2, 3, 1}) + cnrNodes[1][2].SetNetworkEndpoints("definitely invalid network address") + var wp testWorkerPool + iter := placementIterator{ + log: zap.NewNop(), + neoFSNet: new(testNetwork), + remotePool: &wp, + containerNodes: testContainerNodes{ + objID: objID, + cnrNodes: cnrNodes, + primCounts: []uint{2, 2, 1}, + }, + } + var handlerMtx sync.Mutex + var handlerCalls [][]byte + err := iter.iterateNodesForObject(objID, func(node nodeDesc) error { + handlerMtx.Lock() + handlerCalls = append(handlerCalls, node.info.PublicKey()) + handlerMtx.Unlock() + if bytes.Equal(node.info.PublicKey(), cnrNodes[1][0].PublicKey()) { + return errors.New("any node error") + } + return nil + }) + require.ErrorContains(t, err, "incomplete object PUT by placement: "+ + "number of replicas cannot be met for list #1: 1 required, 0 nodes remaining "+ + "(last node error: failed to decode network addresses:") + // assert that the 1st list was fully processed and the 2nd one failed partway; + // the 3rd list was never reached + require.Len(t, handlerCalls, 4) + require.ElementsMatch(t, handlerCalls[:2], [][]byte{ + cnrNodes[0][0].PublicKey(), cnrNodes[0][1].PublicKey(), + }) + require.ElementsMatch(t, handlerCalls[2:], [][]byte{ + cnrNodes[1][0].PublicKey(), cnrNodes[1][1].PublicKey(), + }) + }) + t.Run("not enough nodes left after RPC failures", func(t *testing.T) { + // nodes: [A B] [C D E] [F] + // policy: [2 2 1] + // C and D fail network op + objID := oidtest.ID() + cnrNodes := allocNodes([]uint{2, 3, 1}) + var wp testWorkerPool + iter := placementIterator{ + log: zap.NewNop(), + neoFSNet: new(testNetwork), + remotePool: &wp, + containerNodes: testContainerNodes{ + objID: objID, + cnrNodes: cnrNodes, + primCounts: []uint{2, 2, 1}, + }, + } + var handlerMtx sync.Mutex + var handlerCalls [][]byte + err := iter.iterateNodesForObject(objID, func(node nodeDesc) error { + handlerMtx.Lock() + handlerCalls = append(handlerCalls, node.info.PublicKey()) + handlerMtx.Unlock() + if bytes.Equal(node.info.PublicKey(), cnrNodes[1][0].PublicKey()) || + bytes.Equal(node.info.PublicKey(), cnrNodes[1][1].PublicKey()) { + return errors.New("any 
node error") + } + return nil + }) + require.EqualError(t, err, "incomplete object PUT by placement: "+ + "number of replicas cannot be met for list #1: 2 required, 1 nodes remaining "+ + "(last node error: any node error)") + require.Len(t, handlerCalls, 4) + require.ElementsMatch(t, handlerCalls[:2], [][]byte{ + cnrNodes[0][0].PublicKey(), cnrNodes[0][1].PublicKey(), + }) + require.ElementsMatch(t, handlerCalls[2:], [][]byte{ + cnrNodes[1][0].PublicKey(), cnrNodes[1][1].PublicKey(), + }) + }) +} diff --git a/pkg/services/object/put/distributed.go b/pkg/services/object/put/distributed.go index 972ea28c2f..5d6dd29469 100644 --- a/pkg/services/object/put/distributed.go +++ b/pkg/services/object/put/distributed.go @@ -6,11 +6,13 @@ import ( "sync" "sync/atomic" + "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/nspcc-dev/neofs-node/pkg/network" svcutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util" - "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" "github.com/nspcc-dev/neofs-node/pkg/util" neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" + "github.com/nspcc-dev/neofs-sdk-go/netmap" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "go.uber.org/zap" @@ -22,15 +24,11 @@ type preparedObjectTarget interface { } type distributedTarget struct { - traversalState traversal - traverser *placement.Traverser - - remotePool, localPool util.WorkerPool + placementIterator placementIterator obj *objectSDK.Object objMeta object.ContentMeta - localOnly bool localNodeInContainer bool localNodeSigner neofscrypto.Signer // - object if localOnly @@ -40,72 +38,15 @@ type distributedTarget struct { nodeTargetInitializer func(nodeDesc) preparedObjectTarget - isLocalKey func([]byte) bool - relay func(nodeDesc) error fmt *object.FormatValidator - - log *zap.Logger -} - -// parameters and state of container traversal. -type traversal struct { - opts []placement.Option - - // need of additional broadcast after the object is saved - extraBroadcastEnabled bool - - // mtx protects mExclude map. - mtx sync.RWMutex - - // container nodes which was processed during the primary object placement - mExclude map[string]struct{} -} - -// updates traversal parameters after the primary placement finish and -// returns true if additional container broadcast is needed. -func (x *traversal) submitPrimaryPlacementFinish() bool { - if x.extraBroadcastEnabled { - // do not track success during container broadcast (best-effort) - x.opts = append(x.opts, placement.WithoutSuccessTracking()) - - // avoid 2nd broadcast - x.extraBroadcastEnabled = false - - return true - } - - return false -} - -// marks the container node as processed during the primary object placement. -func (x *traversal) submitProcessed(n placement.Node) { - if x.extraBroadcastEnabled { - key := string(n.PublicKey()) - - x.mtx.Lock() - if x.mExclude == nil { - x.mExclude = make(map[string]struct{}, 1) - } - - x.mExclude[key] = struct{}{} - x.mtx.Unlock() - } -} - -// checks if specified node was processed during the primary object placement. -func (x *traversal) processed(n placement.Node) bool { - x.mtx.RLock() - _, ok := x.mExclude[string(n.PublicKey())] - x.mtx.RUnlock() - return ok } type nodeDesc struct { local bool - info placement.Node + info client.NodeInfo } // errIncompletePut is returned if processing on a container fails. 
@@ -131,7 +72,7 @@ func (t *distributedTarget) WriteHeader(hdr *objectSDK.Object) error { if t.localNodeInContainer { var err error - if t.localOnly { + if t.placementIterator.localOnly { t.encodedObject, err = encodeObjectWithoutPayload(*hdr, int(payloadLen)) } else { t.encodedObject, err = encodeReplicateRequestWithoutPayload(t.localNodeSigner, *hdr, int(payloadLen)) @@ -169,18 +110,9 @@ func (t *distributedTarget) Close() (oid.ID, error) { tombOrLink := t.obj.Type() == objectSDK.TypeLink || t.obj.Type() == objectSDK.TypeTombstone - if len(t.obj.Children()) > 0 || tombOrLink { + if !t.placementIterator.broadcast && len(t.obj.Children()) > 0 || tombOrLink { // enabling extra broadcast for linking and tomb objects - t.traversalState.extraBroadcastEnabled = true - } - - id, _ := t.obj.ID() - t.traversalState.opts = append(t.traversalState.opts, placement.ForObject(id)) - var err error - - t.traverser, err = placement.NewTraverser(t.traversalState.opts...) - if err != nil { - return oid.ID{}, fmt.Errorf("(%T) could not create object placement traverser: %w", t, err) + t.placementIterator.broadcast = true } // v2 split link object and tombstone validations are expensive routines @@ -188,12 +120,14 @@ func (t *distributedTarget) Close() (oid.ID, error) { // another node is responsible for the validation and may decline it, // does not matter what this node thinks about it if !tombOrLink || t.localNodeInContainer { + var err error if t.objMeta, err = t.fmt.ValidateContent(t.obj); err != nil { return oid.ID{}, fmt.Errorf("(%T) could not validate payload content: %w", t, err) } } - return t.iteratePlacement(t.sendObject) + id, _ := t.obj.ID() + return id, t.placementIterator.iterateNodesForObject(id, t.sendObject) } func (t *distributedTarget) sendObject(node nodeDesc) error { @@ -211,95 +145,247 @@ func (t *distributedTarget) sendObject(node nodeDesc) error { return nil } -func (t *distributedTarget) iteratePlacement(f func(nodeDesc) error) (oid.ID, error) { - id, _ := t.obj.ID() - var resErr atomic.Value +type errNotEnoughNodes struct { + listIndex int + required uint + left uint +} -loop: - for { - addrs := t.traverser.Next() - if len(addrs) == 0 { - break - } +func (x errNotEnoughNodes) Error() string { + return fmt.Sprintf("number of replicas cannot be met for list #%d: %d required, %d nodes remaining", + x.listIndex, x.required, x.left) +} - wg := new(sync.WaitGroup) +type placementIterator struct { + log *zap.Logger + neoFSNet NeoFSNetwork + localPool util.WorkerPool + remotePool util.WorkerPool + /* request-dependent */ + containerNodes ContainerNodes + localOnly bool + localNodePos [2]int // in containerNodeSets. 
Undefined when localOnly is false + // when non-zero, this setting simplifies the object's storage policy + // requirements to a fixed number of object replicas to be retained + linearReplNum uint + // whether to perform an additional best-effort attempt to send the object replica to + // all reserve nodes of the container + broadcast bool +} - for i := range addrs { - if t.traversalState.processed(addrs[i]) { - // it can happen only during additional container broadcast +func (x placementIterator) iterateNodesForObject(obj oid.ID, f func(nodeDesc) error) error { + var err error + var nodeLists [][]netmap.NodeInfo + var replCounts []uint + if x.localOnly { + // TODO: although this particular case fits correctly into the general approach, + // much less work would be needed + nn := x.containerNodes.Unsorted() + nodeLists = [][]netmap.NodeInfo{{nn[x.localNodePos[0]][x.localNodePos[1]]}} + replCounts = []uint{1} + } else { + if nodeLists, err = x.containerNodes.SortForObject(obj); err != nil { + return fmt.Errorf("sort container nodes for the object: %w", err) + } + if x.linearReplNum > 0 { + var n int + for i := range nodeLists { + n += len(nodeLists[i]) + } + ns := make([]netmap.NodeInfo, 0, n) + for i := range nodeLists { + ns = append(ns, nodeLists[i]...) + } + nodeLists = [][]netmap.NodeInfo{ns} + replCounts = []uint{x.linearReplNum} + } else { + replCounts = x.containerNodes.PrimaryCounts() + } + } + var processedNodesMtx sync.RWMutex + var nextNodeGroupKeys []string + var wg sync.WaitGroup + var lastRespErr atomic.Value + nodesCounters := make([]struct{ stored, processed uint }, len(nodeLists)) + nodeResults := make(map[string]struct { + convertErr error + desc nodeDesc + succeeded bool + }) + // TODO: processing node lists in ascending size can potentially reduce failure + // latency and volume of "unfinished" data to be garbage-collected. Also, after + // the failure of any of the nodes, the ability to comply with the policy + // requirements may be lost. + for i := range nodeLists { + listInd := i + for { + replRem := replCounts[listInd] - nodesCounters[listInd].stored + if replRem == 0 { + break + } + listLen := uint(len(nodeLists[listInd])) + if listLen-nodesCounters[listInd].processed < replRem { + err = errNotEnoughNodes{listIndex: listInd, required: replRem, left: listLen - nodesCounters[listInd].processed} + if e, _ := lastRespErr.Load().(error); e != nil { + err = fmt.Errorf("%w (last node error: %w)", err, e) + } + return errIncompletePut{singleErr: err} + } + if uint(cap(nextNodeGroupKeys)) < replRem { + nextNodeGroupKeys = make([]string, 0, replRem) + } else { + nextNodeGroupKeys = nextNodeGroupKeys[:0] + } + for ; nodesCounters[listInd].processed < listLen && uint(len(nextNodeGroupKeys)) < replRem; nodesCounters[listInd].processed++ { + j := nodesCounters[listInd].processed + pk := nodeLists[listInd][j].PublicKey() + pks := string(pk) + processedNodesMtx.RLock() + nr, ok := nodeResults[pks] + processedNodesMtx.RUnlock() + if ok { + if nr.succeeded { // in some previous list + nodesCounters[listInd].stored++ + replRem-- + } + continue + } + if nr.desc.local = x.neoFSNet.IsLocalNodePublicKey(pk); !nr.desc.local { + nr.desc.info, nr.convertErr = x.convertNodeInfo(nodeLists[listInd][j]) + } + processedNodesMtx.Lock() + nodeResults[pks] = nr + processedNodesMtx.Unlock() + if nr.convertErr == nil { + nextNodeGroupKeys = append(nextNodeGroupKeys, pks) + continue + } + // critical error that may ultimately block the storage service.
Normally it + // should not appear because entry into the network map under strict control + x.log.Error("failed to decode network endpoints of the storage node from the network map, skip the node", + zap.String("public key", netmap.StringifyPublicKey(nodeLists[listInd][j])), zap.Error(nr.convertErr)) + if listLen-nodesCounters[listInd].processed-1 < replRem { // -1 includes current node failure + err = fmt.Errorf("%w (last node error: failed to decode network addresses: %w)", + errNotEnoughNodes{listIndex: listInd, required: replRem, left: listLen - nodesCounters[listInd].processed - 1}, + nr.convertErr) + return errIncompletePut{singleErr: err} + } + // continue to try the best to save required number of replicas + } + for j := range nextNodeGroupKeys { + var workerPool util.WorkerPool + pks := nextNodeGroupKeys[j] + processedNodesMtx.RLock() + nr := nodeResults[pks] + processedNodesMtx.RUnlock() + if nr.desc.local { + workerPool = x.localPool + } else { + workerPool = x.remotePool + } + wg.Add(1) + if err := workerPool.Submit(func() { + defer wg.Done() + err := f(nr.desc) + processedNodesMtx.Lock() + if nr.succeeded = err == nil; nr.succeeded { + nodesCounters[listInd].stored++ + } + nodeResults[pks] = nr + processedNodesMtx.Unlock() + if err != nil { + lastRespErr.Store(err) + svcutil.LogServiceError(x.log, "PUT", nr.desc.info.AddressGroup(), err) + return + } + }); err != nil { + svcutil.LogWorkerPoolError(x.log, "PUT", err) + err = fmt.Errorf("submit next job to save an object to the worker pool: %w", err) + if e, _ := lastRespErr.Load().(error); e != nil { + err = fmt.Errorf("%w (last node error: %w)", err, e) + } + return errIncompletePut{singleErr: err} + } + } + wg.Wait() + } + } + if !x.broadcast { + return nil + } + // TODO: since main part of the operation has already been completed, and + // additional broadcast does not affect the result, server should immediately + // send the response +broadcast: + for i := range nodeLists { + for j := range nodeLists[i] { + pk := nodeLists[i][j].PublicKey() + pks := string(pk) + processedNodesMtx.RLock() + nr, ok := nodeResults[pks] + processedNodesMtx.RUnlock() + if ok { continue } - - wg.Add(1) - - addr := addrs[i] - - isLocal := t.isLocalKey(addr.PublicKey()) - + if nr.desc.local = x.neoFSNet.IsLocalNodePublicKey(pk); !nr.desc.local { + nr.desc.info, nr.convertErr = x.convertNodeInfo(nodeLists[i][j]) + } + processedNodesMtx.Lock() + nodeResults[pks] = nr + processedNodesMtx.Unlock() + if nr.convertErr != nil { + // critical error that may ultimately block the storage service. Normally it + // should not appear because entry into the network map under strict control + x.log.Error("failed to decode network endpoints of the storage node from the network map, skip the node", + zap.String("public key", netmap.StringifyPublicKey(nodeLists[i][j])), zap.Error(nr.convertErr)) + continue // to send as many replicas as possible + } var workerPool util.WorkerPool - - if isLocal { - workerPool = t.localPool + if nr.desc.local { + workerPool = x.localPool } else { - workerPool = t.remotePool + workerPool = x.remotePool } - + wg.Add(1) if err := workerPool.Submit(func() { defer wg.Done() - - err := f(nodeDesc{local: isLocal, info: addr}) - - // mark the container node as processed in order to exclude it - // in subsequent container broadcast. Note that we don't - // process this node during broadcast if primary placement - // on it failed. 
- t.traversalState.submitProcessed(addr) - + err := f(nr.desc) + processedNodesMtx.Lock() + // no need to update result details, just cache + nodeResults[pks] = nr + processedNodesMtx.Unlock() if err != nil { - resErr.Store(err) - svcutil.LogServiceError(t.log, "PUT", addr.Addresses(), err) + svcutil.LogServiceError(x.log, "PUT (extra broadcast)", nr.desc.info.AddressGroup(), err) return } - - t.traverser.SubmitSuccess() }); err != nil { wg.Done() - - svcutil.LogWorkerPoolError(t.log, "PUT", err) - - break loop + svcutil.LogWorkerPoolError(x.log, "PUT (extra broadcast)", err) + break broadcast } } - - wg.Wait() } + wg.Wait() + return nil +} - if !t.traverser.Success() { - var err errIncompletePut - - err.singleErr, _ = resErr.Load().(error) - - return oid.ID{}, err +func (x placementIterator) convertNodeInfo(nodeInfo netmap.NodeInfo) (client.NodeInfo, error) { + var res client.NodeInfo + var endpoints network.AddressGroup + if err := endpoints.FromIterator(network.NodeEndpointsIterator(nodeInfo)); err != nil { + return res, err } - - // perform additional container broadcast if needed - if t.traversalState.submitPrimaryPlacementFinish() { - // reset traversal progress - var err error - t.traverser, err = placement.NewTraverser(t.traversalState.opts...) - if err != nil { - return oid.ID{}, fmt.Errorf("(%T) could not create object placement traverser: %w", t, err) - } - - _, err = t.iteratePlacement(f) - if err != nil { - t.log.Error("additional container broadcast failure", - zap.Error(err), - ) - - // we don't fail primary operation because of broadcast failure + if ext := nodeInfo.ExternalAddresses(); len(ext) > 0 { + var externalEndpoints network.AddressGroup + if err := externalEndpoints.FromStringSlice(ext); err != nil { + // less critical since the main ones must work, but also important + x.log.Warn("failed to decode external network endpoints of the storage node from the network map, ignore them", + zap.String("public key", netmap.StringifyPublicKey(nodeInfo)), zap.Strings("endpoints", ext), zap.Error(err)) + } else { + res.SetExternalAddressGroup(externalEndpoints) } } - - return id, nil + res.SetAddressGroup(endpoints) + res.SetPublicKey(nodeInfo.PublicKey()) + return res, nil } diff --git a/pkg/services/object/put/prm.go b/pkg/services/object/put/prm.go index 229f894e0c..a4081b321d 100644 --- a/pkg/services/object/put/prm.go +++ b/pkg/services/object/put/prm.go @@ -3,7 +3,6 @@ package putsvc import ( "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" - "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" containerSDK "github.com/nspcc-dev/neofs-sdk-go/container" neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" "github.com/nspcc-dev/neofs-sdk-go/object" @@ -16,13 +15,13 @@ type PutInitPrm struct { cnr containerSDK.Container - traverseOpts []placement.Option - copiesNumber uint32 relay func(client.NodeInfo, client.MultiAddressClient) error + containerNodes ContainerNodes localNodeInContainer bool + localNodePos [2]int // in containerNodeSets. 
Undefined when localNodeInContainer is false localNodeSigner neofscrypto.Signer } diff --git a/pkg/services/object/put/service.go b/pkg/services/object/put/service.go index b777182a57..04ae1a82ec 100644 --- a/pkg/services/object/put/service.go +++ b/pkg/services/object/put/service.go @@ -9,6 +9,9 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/core/object" objutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util" "github.com/nspcc-dev/neofs-node/pkg/util" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + netmapsdk "github.com/nspcc-dev/neofs-sdk-go/netmap" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "go.uber.org/zap" ) @@ -23,6 +26,7 @@ type MaxSizeSource interface { type Service struct { *cfg transport Transport + neoFSNet NeoFSNetwork } type Option func(*cfg) @@ -38,6 +42,41 @@ type ClientConstructor interface { Get(client.NodeInfo) (client.MultiAddressClient, error) } +// ContainerNodes provides access to storage nodes matching the storage policy of +// a particular container. +type ContainerNodes interface { + // Unsorted returns the unsorted descriptor sets corresponding to the storage nodes + // matching the container's storage policy. Nodes are identified by their + // public keys and can be repeated in different sets. + // + // Callers of Unsorted must not modify the resulting slices or their elements. + Unsorted() [][]netmapsdk.NodeInfo + // SortForObject sorts container nodes for the referenced object's storage. + // + // Callers of SortForObject must not modify the resulting slices or their elements. + SortForObject(oid.ID) ([][]netmapsdk.NodeInfo, error) + // PrimaryCounts returns the number (N) of primary object holders for each sorted + // list (L) so that: + // - the size of each L >= N; + // - the first N nodes of each L are primary data holders while the others (if any) + // are backup. + PrimaryCounts() []uint +} + +// NeoFSNetwork provides access to the NeoFS network to get information +// necessary for the [Service] to work. +type NeoFSNetwork interface { + // GetContainerNodes selects storage nodes matching the storage policy of the + // referenced container at the current moment and provides the [ContainerNodes] interface. + // + // Returns [apistatus.ContainerNotFound] if the requested container is missing from + // the network. + GetContainerNodes(cid.ID) (ContainerNodes, error) + // IsLocalNodePublicKey checks whether the given binary-encoded public key is + // assigned in the network map to a local storage node providing [Service].
+ IsLocalNodePublicKey([]byte) bool +} + type cfg struct { keyStorage *objutil.KeyStorage @@ -51,8 +90,6 @@ type cfg struct { remotePool, localPool util.WorkerPool - netmapKeys netmap.AnnouncedKeys - fmtValidator *object.FormatValidator fmtValidatorOpts []object.FormatValidatorOption @@ -72,7 +109,7 @@ func defaultCfg() *cfg { } } -func NewService(transport Transport, opts ...Option) *Service { +func NewService(transport Transport, neoFSNet NeoFSNetwork, opts ...Option) *Service { c := defaultCfg() for i := range opts { @@ -84,6 +121,7 @@ func NewService(transport Transport, opts ...Option) *Service { return &Service{ cfg: c, transport: transport, + neoFSNet: neoFSNet, } } @@ -92,6 +130,7 @@ func (p *Service) Put(ctx context.Context) (*Streamer, error) { cfg: p.cfg, ctx: ctx, transport: p.transport, + neoFSNet: p.neoFSNet, }, nil } @@ -133,12 +172,6 @@ func WithWorkerPools(remote, local util.WorkerPool) Option { } } -func WithNetmapKeys(v netmap.AnnouncedKeys) Option { - return func(c *cfg) { - c.netmapKeys = v - } -} - func WithNetworkState(v netmap.State) Option { return func(c *cfg) { c.networkState = v diff --git a/pkg/services/object/put/streamer.go b/pkg/services/object/put/streamer.go index 29b84b6913..7c96e27919 100644 --- a/pkg/services/object/put/streamer.go +++ b/pkg/services/object/put/streamer.go @@ -6,10 +6,8 @@ import ( "fmt" "github.com/nspcc-dev/neofs-node/pkg/core/client" - "github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/services/object/internal" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" - "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" neofsecdsa "github.com/nspcc-dev/neofs-sdk-go/crypto/ecdsa" "github.com/nspcc-dev/neofs-sdk-go/object" "github.com/nspcc-dev/neofs-sdk-go/user" @@ -27,6 +25,7 @@ type Streamer struct { maxPayloadSz uint64 // network config transport Transport + neoFSNet NeoFSNetwork } var errNotInit = errors.New("stream not initialized") @@ -148,12 +147,6 @@ func (p *Streamer) preparePrm(prm *PutInitPrm) error { return fmt.Errorf("get local node's private key: %w", err) } - // get latest network map - nm, err := netmap.GetLatestNetworkMap(p.netMapSrc) - if err != nil { - return fmt.Errorf("(%T) could not get latest network map: %w", p, err) - } - idCnr, ok := prm.hdr.ContainerID() if !ok { return errors.New("missing container ID") @@ -167,48 +160,27 @@ func (p *Streamer) preparePrm(prm *PutInitPrm) error { prm.cnr = cnrInfo.Value - // add common options - prm.traverseOpts = append(prm.traverseOpts, - // set processing container - placement.ForContainer(prm.cnr), - ) - - if id, ok := prm.hdr.ID(); ok { - prm.traverseOpts = append(prm.traverseOpts, - // set identifier of the processing object - placement.ForObject(id), - ) - } - - prm.traverseOpts = append(prm.traverseOpts, placement.WithCopiesNumber(prm.copiesNumber)) - - // create placement builder from network map - builder := placement.NewNetworkMapBuilder(nm) - - if prm.common.LocalOnly() { - // restrict success count to 1 stored copy (to local storage) - prm.traverseOpts = append(prm.traverseOpts, placement.SuccessAfter(1)) - - // use local-only placement builder - builder = util.NewLocalPlacement(builder, p.netmapKeys) - } - - nodeSets, err := builder.BuildPlacement(idCnr, nil, cnrInfo.Value.PlacementPolicy()) + prm.containerNodes, err = p.neoFSNet.GetContainerNodes(idCnr) if err != nil { - return fmt.Errorf("apply container's storage policy to current network map: %w", err) + return fmt.Errorf("select storage nodes for 
the container: %w", err) } + cnrNodes := prm.containerNodes.Unsorted() nextSet: - for i := range nodeSets { - for j := range nodeSets[i] { - prm.localNodeInContainer = p.netmapKeys.IsLocalKey(nodeSets[i][j].PublicKey()) + for i := range cnrNodes { + for j := range cnrNodes[i] { + prm.localNodeInContainer = p.neoFSNet.IsLocalNodePublicKey(cnrNodes[i][j].PublicKey()) if prm.localNodeInContainer { + if prm.copiesNumber > 0 { + return errors.New("storage of multiple object replicas is requested for a local operation") + } + prm.localNodePos[0], prm.localNodePos[1] = i, j break nextSet } } } - - // set placement builder - prm.traverseOpts = append(prm.traverseOpts, placement.UseBuilder(builder)) + if !prm.localNodeInContainer && prm.common.LocalOnly() { + return errors.New("local operation on the node not compliant with the container storage policy") + } prm.localNodeSigner = (*neofsecdsa.Signer)(localNodeKey) @@ -219,16 +191,12 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) internal.Target { var relay func(nodeDesc) error if p.relay != nil { relay = func(node nodeDesc) error { - var info client.NodeInfo - - client.NodeInfoFromNetmapElement(&info, node.info) - - c, err := p.clientConstructor.Get(info) + c, err := p.clientConstructor.Get(node.info) if err != nil { - return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err) + return fmt.Errorf("could not create SDK client %s: %w", node.info.AddressGroup(), err) } - return p.relay(info, c) + return p.relay(node.info, c) } } @@ -239,13 +207,17 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) internal.Target { withBroadcast := !localOnly && (typ == object.TypeTombstone || typ == object.TypeLock) return &distributedTarget{ - traversalState: traversal{ - opts: prm.traverseOpts, - - extraBroadcastEnabled: withBroadcast, + placementIterator: placementIterator{ + log: p.log, + neoFSNet: p.neoFSNet, + localPool: p.localPool, + remotePool: p.remotePool, + containerNodes: prm.containerNodes, + localOnly: localOnly, + localNodePos: prm.localNodePos, + linearReplNum: uint(prm.copiesNumber), + broadcast: withBroadcast, }, - remotePool: p.remotePool, - localPool: p.localPool, nodeTargetInitializer: func(node nodeDesc) preparedObjectTarget { if node.local { return &localTarget{ @@ -257,21 +229,15 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) internal.Target { ctx: p.ctx, keyStorage: p.keyStorage, commonPrm: prm.common, + nodeInfo: node.info, clientConstructor: p.clientConstructor, transport: p.transport, } - client.NodeInfoFromNetmapElement(&rt.nodeInfo, node.info) - return rt }, - relay: relay, - fmt: p.fmtValidator, - log: p.log, - - isLocalKey: p.netmapKeys.IsLocalKey, - - localOnly: localOnly, + relay: relay, + fmt: p.fmtValidator, localNodeInContainer: prm.localNodeInContainer, localNodeSigner: prm.localNodeSigner, }