Skip to content

Commit

Permalink
node/object: Cache object policy application results for Put service
Browse files Browse the repository at this point in the history
Continues 5389a1e for `ObjectService`'s
`Put` server handler. It shares the cache with other object processors.

Refs #1803.

Signed-off-by: Leonard Lyubich <leonard@morphbits.io>
  • Loading branch information
cthulhu-rider committed Aug 23, 2024
1 parent 5076fe2 commit 729927e
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 22 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ Changelog for NeoFS Node
- `ObjectService.Put` server of in-container node places objects using new `ObjectService.Replicate` RPC (#2802)
- `ObjectService`'s `Search` and `Replicate` RPC handlers cache up to 1000 lists of container nodes (#2892)
- Default max_traceable_blocks Morph setting lowered to 17280 from 2102400 (#2897)
- `ObjectService`'s `Get`/`Head`/`GetRange` RPC handlers cache up to 10K lists of per-object sorted container nodes (#2896)
- `ObjectService`'s `Get`/`Head`/`GetRange`/`Put` RPC handlers cache up to 10K lists of per-object sorted container nodes (#2896, #2901)

### Updated
- neofs-contract dependency to 0.20.0 (#2872)
Expand Down
46 changes: 36 additions & 10 deletions cmd/neofs-node/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -786,23 +786,49 @@ func (c *cfg) GetContainerNodes(cnrID cid.ID) (putsvc.ContainerNodes, error) {
for i := range repCounts {
repCounts[i] = uint(policy.ReplicaNumberByIndex(i))

Check warning on line 787 in cmd/neofs-node/object.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/object.go#L785-L787

Added lines #L785 - L787 were not covered by tests
}
return containerNodesSorter{
return &containerNodesSorter{
policy: storagePolicyRes{
nodeSets: nodeSets,
repCounts: repCounts,
},
networkMap: networkMap,
networkMap: networkMap,
cnrID: cnrID,
curEpoch: curEpoch,
containerNodes: c.cfgObject.containerNodes,
}, nil

Check warning on line 798 in cmd/neofs-node/object.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/object.go#L789-L798

Added lines #L789 - L798 were not covered by tests
}

// implements [putsvc.ContainerNodes].
type containerNodesSorter struct {
policy storagePolicyRes
networkMap *netmapsdk.NetMap
}

func (x containerNodesSorter) Unsorted() [][]netmapsdk.NodeInfo { return x.policy.nodeSets }
func (x containerNodesSorter) PrimaryCounts() []uint { return x.policy.repCounts }
func (x containerNodesSorter) SortForObject(obj oid.ID) ([][]netmapsdk.NodeInfo, error) {
return x.networkMap.PlacementVectors(x.policy.nodeSets, obj)
policy storagePolicyRes
networkMap *netmapsdk.NetMap
cnrID cid.ID
curEpoch uint64
containerNodes *containerNodes
}

func (x *containerNodesSorter) Unsorted() [][]netmapsdk.NodeInfo { return x.policy.nodeSets }
func (x *containerNodesSorter) PrimaryCounts() []uint { return x.policy.repCounts }
func (x *containerNodesSorter) SortForObject(obj oid.ID) ([][]netmapsdk.NodeInfo, error) {
cacheKey := objectNodesCacheKey{epoch: x.curEpoch}
cacheKey.addr.SetContainer(x.cnrID)
cacheKey.addr.SetObject(obj)
res, ok := x.containerNodes.objCache.Get(cacheKey)
if ok {
return res.nodeSets, res.err

Check warning on line 818 in cmd/neofs-node/object.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/object.go#L810-L818

Added lines #L810 - L818 were not covered by tests
}
if x.networkMap == nil {
var err error
if x.networkMap, err = x.containerNodes.network.GetNetMapByEpoch(x.curEpoch); err != nil {

Check warning on line 822 in cmd/neofs-node/object.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/object.go#L820-L822

Added lines #L820 - L822 were not covered by tests
// non-persistent error => do not cache
return nil, fmt.Errorf("read network map by epoch: %w", err)

Check warning on line 824 in cmd/neofs-node/object.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/object.go#L824

Added line #L824 was not covered by tests
}
}
res.repCounts = x.policy.repCounts
res.nodeSets, res.err = x.containerNodes.sortContainerNodesFunc(*x.networkMap, x.policy.nodeSets, obj)
if res.err != nil {
res.err = fmt.Errorf("sort container nodes for object: %w", res.err)

Check warning on line 830 in cmd/neofs-node/object.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/object.go#L827-L830

Added lines #L827 - L830 were not covered by tests
}
x.containerNodes.objCache.Add(cacheKey, res)
return res.nodeSets, res.err

Check warning on line 833 in cmd/neofs-node/object.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/object.go#L832-L833

Added lines #L832 - L833 were not covered by tests
}
30 changes: 19 additions & 11 deletions cmd/neofs-node/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,17 +161,9 @@ func (x *containerNodes) getNodesForObject(addr oid.Address) ([][]netmapsdk.Node
if ok {
return res.nodeSets, res.repCounts, res.err
}
cnrRes, networkMap, err := (&containerPolicyContext{
id: addr.Container(),
containers: x.containers,
network: x.network,
getNodesFunc: x.getContainerNodesFunc,
}).applyToNetmap(curEpoch, x.cache)
if err != nil || cnrRes.err != nil {
if err == nil {
err = cnrRes.err // cached in x.cache, no need to store in x.objCache
}
return nil, nil, fmt.Errorf("select container nodes for current epoch #%d: %w", curEpoch, err)
cnrRes, networkMap, err := x.getForCurrentEpoch(curEpoch, addr.Container())
if err != nil {
return nil, nil, err
}
if networkMap == nil {
if networkMap, err = x.network.GetNetMapByEpoch(curEpoch); err != nil {
Expand All @@ -188,6 +180,22 @@ func (x *containerNodes) getNodesForObject(addr oid.Address) ([][]netmapsdk.Node
return res.nodeSets, res.repCounts, res.err
}

func (x *containerNodes) getForCurrentEpoch(curEpoch uint64, cnr cid.ID) (storagePolicyRes, *netmapsdk.NetMap, error) {
policy, networkMap, err := (&containerPolicyContext{
id: cnr,
containers: x.containers,
network: x.network,
getNodesFunc: x.getContainerNodesFunc,
}).applyToNetmap(curEpoch, x.cache)
if err != nil || policy.err != nil {
if err == nil {
err = policy.err // cached in x.cache, no need to store in x.objCache
}
return storagePolicyRes{}, nil, fmt.Errorf("select container nodes for current epoch #%d: %w", curEpoch, err)
}
return policy, networkMap, nil
}

// preserves context of storage policy processing for the particular container.
type containerPolicyContext struct {
// static
Expand Down

0 comments on commit 729927e

Please sign in to comment.