Skip to content

Commit

Permalink
Merge pull request vmware-tanzu#7059 from Lyndon-Li/issue-fix-6663
Browse files Browse the repository at this point in the history
Issue 6663: changes for configurable data path concurrency
  • Loading branch information
qiuming-best authored Nov 9, 2023
2 parents 5f7e16b + db43200 commit 76e89f7
Show file tree
Hide file tree
Showing 15 changed files with 786 additions and 40 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/7059-Lyndon-Li
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add the implementation for design #6950, configurable data path concurrency
53 changes: 30 additions & 23 deletions design/node-agent-concurrency.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,15 @@ Therefore, in order to gain the optimized performance with the limited resources

We introduce a configMap named ```node-agent-configs``` for users to specify the node-agent related configurations. This configMap is not created by Velero, users should create it manually on demand. The configMap should be in the same namespace where Velero is installed. If multiple Velero instances are installed in different namespaces, there should be one configMap in each namespace which applies to node-agent in that namespace only.
Node-agent server checks these configurations at startup time and use it to initiate the related VGDP modules. Therefore, users could edit this configMap any time, but in order to make the changes effective, node-agent server needs to be restarted.
The ```node-agent-configs``` configMap may be used for other purpose of configuring node-agent in future, at present, there is only one kind of configuration as the data in the configMap, the name is ```data-path-concurrency```.
The ```node-agent-configs``` configMap may be used for other purpose of configuring node-agent in future, at present, there is only one kind of configuration as the data in the configMap, the name is ```dataPathConcurrency```.

The data structure for ```data-path-concurrency``` is as below:
The data structure for ```node-agent-configs``` is as below:
```go
type Configs struct {
// DataPathConcurrency is the config for data path concurrency per node.
DataPathConcurrency *DataPathConcurrency `json:"dataPathConcurrency,omitempty"`
}

type DataPathConcurrency struct {
// GlobalConfig specifies the concurrency number to all nodes for which per-node config is not specified
GlobalConfig int `json:"globalConfig,omitempty"`
Expand All @@ -50,7 +55,7 @@ type RuledConfigs struct {
```

### Global concurrent number
We allow users to specify a concurrent number that will be applied to all nodes if the per-node number is not specified. This number is set through ```globalConfig``` field in ```data-path-concurrency```.
We allow users to specify a concurrent number that will be applied to all nodes if the per-node number is not specified. This number is set through ```globalConfig``` field in ```dataPathConcurrency```.
The number starts from 1 which means there is no concurrency, only one instance of VGDP is allowed. There is no roof limit.
If this number is not specified or not valid, a hard-coded default value will be used, the value is set to 1.

Expand All @@ -62,7 +67,7 @@ We allow users to specify different concurrent number per node, for example, use
The range of Per-node concurrent number is the same with Global concurrent number.
Per-node concurrent number is preferable to Global concurrent number, so it will overwrite the Global concurrent number for that node.

Per-node concurrent number is implemented through ```perNodeConfig``` field in ```data-path-concurrency```.
Per-node concurrent number is implemented through ```perNodeConfig``` field in ```dataPathConcurrency```.

```perNodeConfig``` is a list of ```RuledConfigs``` each item of which matches one or more nodes by label selectors and specify the concurrent number for the matched nodes. This means, the nodes are identified by labels.

Expand All @@ -80,30 +85,32 @@ If one node falls into more than one rules, e.g., if node1 also has the label ``
A sample of the ```node-agent-configs``` configMap is as below:
```json
{
"globalConfig": 2,
"perNodeConfig": [
{
"nodeSelector": {
"matchLabels": {
"kubernetes.io/hostname": "node1"
}
},
"number": 3
},
{
"nodeSelector": {
"matchLabels": {
"beta.kubernetes.io/instance-type": "Standard_B4ms"
}
"dataPathConcurrency": {
"globalConfig": 2,
"perNodeConfig": [
{
"nodeSelector": {
"matchLabels": {
"kubernetes.io/hostname": "node1"
}
},
"number": 3
},
"number": 5
}
]
{
"nodeSelector": {
"matchLabels": {
"beta.kubernetes.io/instance-type": "Standard_B4ms"
}
},
"number": 5
}
]
}
}
```
To create the configMap, users need to save something like the above sample to a json file and then run below command:
```
kubectl create cm node-agent-configs -n velero --from-file=data-path-concurrency=<json file name>
kubectl create cm node-agent-configs -n velero --from-file=<json file name>
```

### Global data path manager
Expand Down
5 changes: 5 additions & 0 deletions pkg/builder/node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ func ForNode(name string) *NodeBuilder {
}
}

func (b *NodeBuilder) Labels(labels map[string]string) *NodeBuilder {
b.object.Labels = labels
return b
}

// Result returns the built Node.
func (b *NodeBuilder) Result() *corev1api.Node {
return b.object
Expand Down
5 changes: 5 additions & 0 deletions pkg/builder/pod_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,8 @@ func (b *PodBuilder) ContainerStatuses(containerStatuses ...*corev1api.Container
}
return b
}

func (b *PodBuilder) Phase(phase corev1api.PodPhase) *PodBuilder {
b.object.Status.Phase = phase
return b
}
80 changes: 76 additions & 4 deletions pkg/cmd/cli/nodeagent/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package nodeagent
import (
"context"
"fmt"
"math"
"net/http"
"os"
"strings"
Expand All @@ -32,6 +33,7 @@ import (
storagev1api "k8s.io/api/storage/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
Expand All @@ -53,7 +55,9 @@ import (
"github.com/vmware-tanzu/velero/pkg/cmd"
"github.com/vmware-tanzu/velero/pkg/cmd/util/signals"
"github.com/vmware-tanzu/velero/pkg/controller"
"github.com/vmware-tanzu/velero/pkg/datapath"
"github.com/vmware-tanzu/velero/pkg/metrics"
"github.com/vmware-tanzu/velero/pkg/nodeagent"
"github.com/vmware-tanzu/velero/pkg/repository"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
"github.com/vmware-tanzu/velero/pkg/util/logging"
Expand All @@ -73,6 +77,7 @@ const (

defaultResourceTimeout = 10 * time.Minute
defaultDataMoverPrepareTimeout = 30 * time.Minute
defaultDataPathConcurrentNum = 1
)

type nodeAgentServerConfig struct {
Expand Down Expand Up @@ -132,6 +137,7 @@ type nodeAgentServer struct {
config nodeAgentServerConfig
kubeClient kubernetes.Interface
csiSnapshotClient *snapshotv1client.Clientset
dataPathMgr *datapath.Manager
}

func newNodeAgentServer(logger logrus.FieldLogger, factory client.Factory, config nodeAgentServerConfig) (*nodeAgentServer, error) {
Expand Down Expand Up @@ -219,6 +225,10 @@ func newNodeAgentServer(logger logrus.FieldLogger, factory client.Factory, confi
if err != nil {
return nil, err
}

dataPathConcurrentNum := s.getDataPathConcurrentNum(defaultDataPathConcurrentNum)
s.dataPathMgr = datapath.NewManager(dataPathConcurrentNum)

return s, nil
}

Expand Down Expand Up @@ -263,24 +273,24 @@ func (s *nodeAgentServer) run() {

credentialGetter := &credentials.CredentialGetter{FromFile: credentialFileStore, FromSecret: credSecretStore}
repoEnsurer := repository.NewEnsurer(s.mgr.GetClient(), s.logger, s.config.resourceTimeout)
pvbReconciler := controller.NewPodVolumeBackupReconciler(s.mgr.GetClient(), repoEnsurer,
pvbReconciler := controller.NewPodVolumeBackupReconciler(s.mgr.GetClient(), s.dataPathMgr, repoEnsurer,
credentialGetter, s.nodeName, s.mgr.GetScheme(), s.metrics, s.logger)

if err := pvbReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.Fatal(err, "unable to create controller", "controller", controller.PodVolumeBackup)
}

if err = controller.NewPodVolumeRestoreReconciler(s.mgr.GetClient(), repoEnsurer, credentialGetter, s.logger).SetupWithManager(s.mgr); err != nil {
if err = controller.NewPodVolumeRestoreReconciler(s.mgr.GetClient(), s.dataPathMgr, repoEnsurer, credentialGetter, s.logger).SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the pod volume restore controller")
}

dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), s.dataPathMgr, repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
s.markDataUploadsCancel(dataUploadReconciler)
if err = dataUploadReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the data upload controller")
}

dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.kubeClient, repoEnsurer, credentialGetter, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.kubeClient, s.dataPathMgr, repoEnsurer, credentialGetter, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
s.markDataDownloadsCancel(dataDownloadReconciler)
if err = dataDownloadReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the data download controller")
Expand Down Expand Up @@ -478,3 +488,65 @@ func (s *nodeAgentServer) markInProgressPVRsFailed(client ctrlclient.Client) {
s.logger.WithField("podvolumerestore", pvr.GetName()).Warn(pvr.Status.Message)
}
}

var getConfigsFunc = nodeagent.GetConfigs

func (s *nodeAgentServer) getDataPathConcurrentNum(defaultNum int) int {
configs, err := getConfigsFunc(s.ctx, s.namespace, s.kubeClient)
if err != nil {
s.logger.WithError(err).Warn("Failed to get node agent configs")
return defaultNum
}

if configs == nil || configs.DataPathConcurrency == nil {
s.logger.Infof("Concurrency configs are not found, use the default number %v", defaultNum)
return defaultNum
}

globalNum := configs.DataPathConcurrency.GlobalConfig

if globalNum <= 0 {
s.logger.Warnf("Global number %v is invalid, use the default value %v", globalNum, defaultNum)
globalNum = defaultNum
}

if len(configs.DataPathConcurrency.PerNodeConfig) == 0 {
return globalNum
}

curNode, err := s.kubeClient.CoreV1().Nodes().Get(s.ctx, s.nodeName, metav1.GetOptions{})
if err != nil {
s.logger.WithError(err).Warnf("Failed to get node info for %s, use the global number %v", s.nodeName, globalNum)
return globalNum
}

concurrentNum := math.MaxInt32

for _, rule := range configs.DataPathConcurrency.PerNodeConfig {
selector, err := metav1.LabelSelectorAsSelector(&rule.NodeSelector)
if err != nil {
s.logger.WithError(err).Warnf("Failed to parse rule with label selector %s, skip it", rule.NodeSelector.String())
continue
}

if rule.Number <= 0 {
s.logger.Warnf("Rule with label selector %s is with an invalid number %v, skip it", rule.NodeSelector.String(), rule.Number)
continue
}

if selector.Matches(labels.Set(curNode.GetLabels())) {
if concurrentNum > rule.Number {
concurrentNum = rule.Number
}
}
}

if concurrentNum == math.MaxInt32 {
s.logger.Infof("Per node number for node %s is not found, use the global number %v", s.nodeName, globalNum)
concurrentNum = globalNum
} else {
s.logger.Infof("Use the per node number %v over global number %v for node %s", concurrentNum, globalNum, s.nodeName)
}

return concurrentNum
}
Loading

0 comments on commit 76e89f7

Please sign in to comment.