From 729927e1ee61d5ebdc293fd363cfb4f1eaba415d Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Fri, 9 Aug 2024 20:06:40 +0400 Subject: [PATCH] node/object: Cache object policy application results for Put service Continues 5389a1e269a9541f1d5795771782d23fd686ee58 for `ObjectService`'s `Put` server handler. It shares the cache with other object processors. Refs #1803. Signed-off-by: Leonard Lyubich --- CHANGELOG.md | 2 +- cmd/neofs-node/object.go | 46 +++++++++++++++++++++++++++++++--------- cmd/neofs-node/policy.go | 30 ++++++++++++++++---------- 3 files changed, 56 insertions(+), 22 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0bf0697479..3e0b8688f7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,7 +36,7 @@ Changelog for NeoFS Node - `ObjectService.Put` server of in-container node places objects using new `ObjectService.Replicate` RPC (#2802) - `ObjectService`'s `Search` and `Replicate` RPC handlers cache up to 1000 lists of container nodes (#2892) - Default max_traceable_blocks Morph setting lowered to 17280 from 2102400 (#2897) -- `ObjectService`'s `Get`/`Head`/`GetRange` RPC handlers cache up to 10K lists of per-object sorted container nodes (#2896) +- `ObjectService`'s `Get`/`Head`/`GetRange`/`Put` RPC handlers cache up to 10K lists of per-object sorted container nodes (#2896, #2901) ### Updated - neofs-contract dependency to 0.20.0 (#2872) diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index c2cf0df341..701d23b77c 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -786,23 +786,49 @@ func (c *cfg) GetContainerNodes(cnrID cid.ID) (putsvc.ContainerNodes, error) { for i := range repCounts { repCounts[i] = uint(policy.ReplicaNumberByIndex(i)) } - return containerNodesSorter{ + return &containerNodesSorter{ policy: storagePolicyRes{ nodeSets: nodeSets, repCounts: repCounts, }, - networkMap: networkMap, + networkMap: networkMap, + cnrID: cnrID, + curEpoch: curEpoch, + containerNodes: c.cfgObject.containerNodes, }, nil } // implements [putsvc.ContainerNodes]. type containerNodesSorter struct { - policy storagePolicyRes - networkMap *netmapsdk.NetMap -} - -func (x containerNodesSorter) Unsorted() [][]netmapsdk.NodeInfo { return x.policy.nodeSets } -func (x containerNodesSorter) PrimaryCounts() []uint { return x.policy.repCounts } -func (x containerNodesSorter) SortForObject(obj oid.ID) ([][]netmapsdk.NodeInfo, error) { - return x.networkMap.PlacementVectors(x.policy.nodeSets, obj) + policy storagePolicyRes + networkMap *netmapsdk.NetMap + cnrID cid.ID + curEpoch uint64 + containerNodes *containerNodes +} + +func (x *containerNodesSorter) Unsorted() [][]netmapsdk.NodeInfo { return x.policy.nodeSets } +func (x *containerNodesSorter) PrimaryCounts() []uint { return x.policy.repCounts } +func (x *containerNodesSorter) SortForObject(obj oid.ID) ([][]netmapsdk.NodeInfo, error) { + cacheKey := objectNodesCacheKey{epoch: x.curEpoch} + cacheKey.addr.SetContainer(x.cnrID) + cacheKey.addr.SetObject(obj) + res, ok := x.containerNodes.objCache.Get(cacheKey) + if ok { + return res.nodeSets, res.err + } + if x.networkMap == nil { + var err error + if x.networkMap, err = x.containerNodes.network.GetNetMapByEpoch(x.curEpoch); err != nil { + // non-persistent error => do not cache + return nil, fmt.Errorf("read network map by epoch: %w", err) + } + } + res.repCounts = x.policy.repCounts + res.nodeSets, res.err = x.containerNodes.sortContainerNodesFunc(*x.networkMap, x.policy.nodeSets, obj) + if res.err != nil { + res.err = fmt.Errorf("sort container nodes for object: %w", res.err) + } + x.containerNodes.objCache.Add(cacheKey, res) + return res.nodeSets, res.err } diff --git a/cmd/neofs-node/policy.go b/cmd/neofs-node/policy.go index a98d0a43d0..27ce63e53a 100644 --- a/cmd/neofs-node/policy.go +++ b/cmd/neofs-node/policy.go @@ -161,17 +161,9 @@ func (x *containerNodes) getNodesForObject(addr oid.Address) ([][]netmapsdk.Node if ok { return res.nodeSets, res.repCounts, res.err } - cnrRes, networkMap, err := (&containerPolicyContext{ - id: addr.Container(), - containers: x.containers, - network: x.network, - getNodesFunc: x.getContainerNodesFunc, - }).applyToNetmap(curEpoch, x.cache) - if err != nil || cnrRes.err != nil { - if err == nil { - err = cnrRes.err // cached in x.cache, no need to store in x.objCache - } - return nil, nil, fmt.Errorf("select container nodes for current epoch #%d: %w", curEpoch, err) + cnrRes, networkMap, err := x.getForCurrentEpoch(curEpoch, addr.Container()) + if err != nil { + return nil, nil, err } if networkMap == nil { if networkMap, err = x.network.GetNetMapByEpoch(curEpoch); err != nil { @@ -188,6 +180,22 @@ func (x *containerNodes) getNodesForObject(addr oid.Address) ([][]netmapsdk.Node return res.nodeSets, res.repCounts, res.err } +func (x *containerNodes) getForCurrentEpoch(curEpoch uint64, cnr cid.ID) (storagePolicyRes, *netmapsdk.NetMap, error) { + policy, networkMap, err := (&containerPolicyContext{ + id: cnr, + containers: x.containers, + network: x.network, + getNodesFunc: x.getContainerNodesFunc, + }).applyToNetmap(curEpoch, x.cache) + if err != nil || policy.err != nil { + if err == nil { + err = policy.err // cached in x.cache, no need to store in x.objCache + } + return storagePolicyRes{}, nil, fmt.Errorf("select container nodes for current epoch #%d: %w", curEpoch, err) + } + return policy, networkMap, nil +} + // preserves context of storage policy processing for the particular container. type containerPolicyContext struct { // static