Skip to content

Commit

Permalink
node/object: Cache object policy application results for Put service
Browse files Browse the repository at this point in the history
Continues 2f29338 for `ObjectService`'s
`Put` server handler. It shares the cache with other object processors.

Refs #1803.

Signed-off-by: Leonard Lyubich <leonard@morphbits.io>
  • Loading branch information
cthulhu-rider committed Aug 9, 2024
1 parent 89ac0d3 commit e6ea202
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 35 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Changelog for NeoFS Node
- `ObjectService.Put` server of in-container node places objects using new `ObjectService.Replicate` RPC (#2802)
- `ObjectService`'s `Put` and `Replicate` RPC handlers cache up to 1000 lists of container nodes (#2892)
- Default max_traceable_blocks Morph setting lowered to 17280 from 2102400 (#2897)
- `ObjectService`'s `Get`/`Head`/`GetRange` RPC handlers cache up to 10K lists of per-object sorted container nodes (#2896)
- `ObjectService`'s `Get`/`Head`/`GetRange`/`Put` RPC handlers cache up to 10K lists of per-object sorted container nodes (#2896, #2901)

### Removed

Expand Down
56 changes: 33 additions & 23 deletions cmd/neofs-node/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -746,44 +746,54 @@ func (c *cfg) GetNodesForObject(addr oid.Address) ([][]netmapsdk.NodeInfo, []uin
//
// GetContainerNodes implements [putsvc.NeoFSNetwork].
func (c *cfg) GetContainerNodes(cnrID cid.ID) (putsvc.ContainerNodes, error) {
cnr, err := c.cfgObject.containerNodes.containers.Get(cnrID)
if err != nil {
return nil, fmt.Errorf("read container by ID: %w", err)
}
curEpoch, err := c.cfgObject.containerNodes.network.Epoch()
if err != nil {
return nil, fmt.Errorf("read current NeoFS epoch: %w", err)
}
networkMap, err := c.cfgObject.containerNodes.network.GetNetMapByEpoch(curEpoch)
if err != nil {
return nil, fmt.Errorf("read network map at the current epoch #%d: %w", curEpoch, err)
}
policy := cnr.Value.PlacementPolicy()
nodeSets, err := networkMap.ContainerNodes(cnr.Value.PlacementPolicy(), cnrID)
policy, networkMap, err := c.cfgObject.containerNodes.getForCurrentEpoch(curEpoch, cnrID)
if err != nil {
return nil, fmt.Errorf("apply container storage policy to the network map at current epoch #%d: %w", curEpoch, err)
}
repCounts := make([]uint, policy.NumberOfReplicas())
for i := range repCounts {
repCounts[i] = uint(policy.ReplicaNumberByIndex(i))
return nil, err
}
return &containerNodesSorter{
policy: storagePolicyRes{
nodeSets: nodeSets,
repCounts: repCounts,
},
networkMap: networkMap,
policy: policy,
networkMap: networkMap,
cnrID: cnrID,
curEpoch: curEpoch,
containerNodes: c.cfgObject.containerNodes,
}, nil
}

// implements [putsvc.ContainerNodes].
type containerNodesSorter struct {
policy storagePolicyRes
networkMap *netmapsdk.NetMap
policy storagePolicyRes
networkMap *netmapsdk.NetMap
cnrID cid.ID
curEpoch uint64
containerNodes *containerNodes
}

func (x *containerNodesSorter) Unsorted() [][]netmapsdk.NodeInfo { return x.policy.nodeSets }
func (x *containerNodesSorter) PrimaryCounts() []uint { return x.policy.repCounts }
func (x *containerNodesSorter) SortForObject(obj oid.ID) ([][]netmapsdk.NodeInfo, error) {
return x.networkMap.PlacementVectors(x.policy.nodeSets, obj)
cacheKey := objectNodesCacheKey{epoch: x.curEpoch}
cacheKey.addr.SetContainer(x.cnrID)
cacheKey.addr.SetObject(obj)
res, ok := x.containerNodes.objCache.Get(cacheKey)
if ok {
return res.nodeSets, res.err
}
if x.networkMap == nil {
var err error
if x.networkMap, err = x.containerNodes.network.GetNetMapByEpoch(x.curEpoch); err != nil {
// non-persistent error => do not cache
return nil, fmt.Errorf("read network map by epoch: %w", err)
}
}
res.repCounts = x.policy.repCounts
res.nodeSets, res.err = x.containerNodes.sortContainerNodesFunc(*x.networkMap, x.policy.nodeSets, obj)
if res.err != nil {
res.err = fmt.Errorf("sort container nodes for object: %w", res.err)
}
x.containerNodes.objCache.Add(cacheKey, res)
return res.nodeSets, res.err
}
30 changes: 19 additions & 11 deletions cmd/neofs-node/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,17 +161,9 @@ func (x *containerNodes) getNodesForObject(addr oid.Address) ([][]netmapsdk.Node
if ok {
return res.nodeSets, res.repCounts, res.err
}
cnrRes, networkMap, err := (&containerPolicyContext{
id: addr.Container(),
containers: x.containers,
network: x.network,
getNodesFunc: x.getContainerNodesFunc,
}).applyToNetmap(curEpoch, x.cache)
if err != nil || cnrRes.err != nil {
if err == nil {
err = cnrRes.err // cached in x.cache, no need to store in x.objCache
}
return nil, nil, fmt.Errorf("select container nodes for current epoch #%d: %w", curEpoch, err)
cnrRes, networkMap, err := x.getForCurrentEpoch(curEpoch, addr.Container())
if err != nil {
return nil, nil, err
}
if networkMap == nil {
if networkMap, err = x.network.GetNetMapByEpoch(curEpoch); err != nil {
Expand All @@ -188,6 +180,22 @@ func (x *containerNodes) getNodesForObject(addr oid.Address) ([][]netmapsdk.Node
return res.nodeSets, res.repCounts, res.err
}

func (x *containerNodes) getForCurrentEpoch(curEpoch uint64, cnr cid.ID) (storagePolicyRes, *netmapsdk.NetMap, error) {
policy, networkMap, err := (&containerPolicyContext{
id: cnr,
containers: x.containers,
network: x.network,
getNodesFunc: x.getContainerNodesFunc,
}).applyToNetmap(curEpoch, x.cache)
if err != nil || policy.err != nil {
if err == nil {
err = policy.err // cached in x.cache, no need to store in x.objCache
}
return storagePolicyRes{}, nil, fmt.Errorf("select container nodes for current epoch #%d: %w", curEpoch, err)
}
return policy, networkMap, nil
}

// preserves context of storage policy processing for the particular container.
type containerPolicyContext struct {
// static
Expand Down

0 comments on commit e6ea202

Please sign in to comment.