Skip to content

Commit

Permalink
configurable data path concurrency: all in one json
Browse files Browse the repository at this point in the history
Signed-off-by: Lyndon-Li <lyonghui@vmware.com>
  • Loading branch information
Lyndon-Li committed Nov 8, 2023
1 parent c638ca5 commit db43200
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 46 deletions.
53 changes: 30 additions & 23 deletions design/node-agent-concurrency.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,15 @@ Therefore, in order to gain the optimized performance with the limited resources

We introduce a configMap named ```node-agent-configs``` for users to specify the node-agent related configurations. This configMap is not created by Velero, users should create it manually on demand. The configMap should be in the same namespace where Velero is installed. If multiple Velero instances are installed in different namespaces, there should be one configMap in each namespace which applies to node-agent in that namespace only.
Node-agent server checks these configurations at startup time and use it to initiate the related VGDP modules. Therefore, users could edit this configMap any time, but in order to make the changes effective, node-agent server needs to be restarted.
The ```node-agent-configs``` configMap may be used for other purpose of configuring node-agent in future, at present, there is only one kind of configuration as the data in the configMap, the name is ```data-path-concurrency```.
The ```node-agent-configs``` configMap may be used for other purpose of configuring node-agent in future, at present, there is only one kind of configuration as the data in the configMap, the name is ```dataPathConcurrency```.

The data structure for ```data-path-concurrency``` is as below:
The data structure for ```node-agent-configs``` is as below:
```go
type Configs struct {
// DataPathConcurrency is the config for data path concurrency per node.
DataPathConcurrency *DataPathConcurrency `json:"dataPathConcurrency,omitempty"`
}

type DataPathConcurrency struct {
// GlobalConfig specifies the concurrency number to all nodes for which per-node config is not specified
GlobalConfig int `json:"globalConfig,omitempty"`
Expand All @@ -50,7 +55,7 @@ type RuledConfigs struct {
```

### Global concurrent number
We allow users to specify a concurrent number that will be applied to all nodes if the per-node number is not specified. This number is set through ```globalConfig``` field in ```data-path-concurrency```.
We allow users to specify a concurrent number that will be applied to all nodes if the per-node number is not specified. This number is set through ```globalConfig``` field in ```dataPathConcurrency```.
The number starts from 1 which means there is no concurrency, only one instance of VGDP is allowed. There is no roof limit.
If this number is not specified or not valid, a hard-coded default value will be used, the value is set to 1.

Expand All @@ -62,7 +67,7 @@ We allow users to specify different concurrent number per node, for example, use
The range of Per-node concurrent number is the same with Global concurrent number.
Per-node concurrent number is preferable to Global concurrent number, so it will overwrite the Global concurrent number for that node.

Per-node concurrent number is implemented through ```perNodeConfig``` field in ```data-path-concurrency```.
Per-node concurrent number is implemented through ```perNodeConfig``` field in ```dataPathConcurrency```.

```perNodeConfig``` is a list of ```RuledConfigs``` each item of which matches one or more nodes by label selectors and specify the concurrent number for the matched nodes. This means, the nodes are identified by labels.

Expand All @@ -80,30 +85,32 @@ If one node falls into more than one rules, e.g., if node1 also has the label ``
A sample of the ```node-agent-configs``` configMap is as below:
```json
{
"globalConfig": 2,
"perNodeConfig": [
{
"nodeSelector": {
"matchLabels": {
"kubernetes.io/hostname": "node1"
}
},
"number": 3
},
{
"nodeSelector": {
"matchLabels": {
"beta.kubernetes.io/instance-type": "Standard_B4ms"
}
"dataPathConcurrency": {
"globalConfig": 2,
"perNodeConfig": [
{
"nodeSelector": {
"matchLabels": {
"kubernetes.io/hostname": "node1"
}
},
"number": 3
},
"number": 5
}
]
{
"nodeSelector": {
"matchLabels": {
"beta.kubernetes.io/instance-type": "Standard_B4ms"
}
},
"number": 5
}
]
}
}
```
To create the configMap, users need to save something like the above sample to a json file and then run below command:
```
kubectl create cm node-agent-configs -n velero --from-file=data-path-concurrency=<json file name>
kubectl create cm node-agent-configs -n velero --from-file=<json file name>
```

### Global data path manager
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/cli/nodeagent/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ func (s *nodeAgentServer) getDataPathConcurrentNum(defaultNum int) int {
}

if configs == nil || configs.DataPathConcurrency == nil {
s.logger.Infof("Node agent configs are not found, use the default number %v", defaultNum)
s.logger.Infof("Concurrency configs are not found, use the default number %v", defaultNum)
return defaultNum
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/cmd/cli/nodeagent/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,15 +161,15 @@ func Test_getDataPathConcurrentNum(t *testing.T) {
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return nil, nil
},
expectLog: fmt.Sprintf("Node agent configs are not found, use the default number %v", defaultNum),
expectLog: fmt.Sprintf("Concurrency configs are not found, use the default number %v", defaultNum),
expectNum: defaultNum,
},
{
name: "configs cm's data path concurrency is nil",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return &nodeagent.Configs{}, nil
},
expectLog: fmt.Sprintf("Node agent configs are not found, use the default number %v", defaultNum),
expectLog: fmt.Sprintf("Concurrency configs are not found, use the default number %v", defaultNum),
expectNum: defaultNum,
},
{
Expand Down
16 changes: 8 additions & 8 deletions pkg/nodeagent/node_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type RuledConfigs struct {

type Configs struct {
// DataPathConcurrency is the config for data path concurrency per node.
DataPathConcurrency *DataPathConcurrency
DataPathConcurrency *DataPathConcurrency `json:"dataPathConcurrency,omitempty"`
}

// IsRunning checks if the node agent daemonset is running properly. If not, return the error found
Expand Down Expand Up @@ -128,16 +128,16 @@ func GetConfigs(ctx context.Context, namespace string, kubeClient kubernetes.Int
return nil, errors.Errorf("data is not available in config map %s", configName)
}

jsonString, exist := cm.Data[dataPathConConfigName]
if !exist {
return nil, nil
jsonString := ""
for _, v := range cm.Data {
jsonString = v
}

concurrencyConfigs := &DataPathConcurrency{}
err = json.Unmarshal([]byte(jsonString), concurrencyConfigs)
configs := &Configs{}
err = json.Unmarshal([]byte(jsonString), configs)
if err != nil {
return nil, errors.Wrapf(err, "error to unmarshall data path concurrency configs from %s", configName)
return nil, errors.Wrapf(err, "error to unmarshall configs from %s", configName)
}

return &Configs{DataPathConcurrency: concurrencyConfigs}, nil
return configs, nil
}
29 changes: 17 additions & 12 deletions pkg/nodeagent/node_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,16 +242,16 @@ func TestGetPodSpec(t *testing.T) {

func TestGetConfigs(t *testing.T) {
cm := builder.ForConfigMap("fake-ns", "node-agent-configs").Result()
cmWithInvalidData := builder.ForConfigMap("fake-ns", "node-agent-configs").Data("fake-key", "fake-value").Result()
cmWithInvalidDataFormat := builder.ForConfigMap("fake-ns", "node-agent-configs").Data("data-path-concurrency", "wrong").Result()
cmWithValidData := builder.ForConfigMap("fake-ns", "node-agent-configs").Data("data-path-concurrency", "{\"globalConfig\": 5}").Result()
cmWithInvalidDataFormat := builder.ForConfigMap("fake-ns", "node-agent-configs").Data("fake-key", "wrong").Result()
cmWithoutCocurrentData := builder.ForConfigMap("fake-ns", "node-agent-configs").Data("fake-key", "{\"someothers\":{\"someother\": 10}}").Result()
cmWithValidData := builder.ForConfigMap("fake-ns", "node-agent-configs").Data("fake-key", "{\"dataPathConcurrency\":{\"globalConfig\": 5}}").Result()

tests := []struct {
name string
kubeClientObj []runtime.Object
namespace string
kubeReactors []reactor
expectResult *DataPathConcurrency
expectResult *Configs
expectErr string
}{
{
Expand Down Expand Up @@ -281,28 +281,31 @@ func TestGetConfigs(t *testing.T) {
expectErr: "data is not available in config map node-agent-configs",
},
{
name: "cm's data is not found",
name: "cm's data is with invalid format",
namespace: "fake-ns",
kubeClientObj: []runtime.Object{
cmWithInvalidData,
cmWithInvalidDataFormat,
},
expectErr: "error to unmarshall configs from node-agent-configs: invalid character 'w' looking for beginning of value",
},
{
name: "cm's data is with invalid format",
name: "concurrency configs are not found",
namespace: "fake-ns",
kubeClientObj: []runtime.Object{
cmWithInvalidDataFormat,
cmWithoutCocurrentData,
},
expectErr: "error to unmarshall data path concurrency configs from node-agent-configs: invalid character 'w' looking for beginning of value",
expectResult: &Configs{nil},
},
{
name: "success",
namespace: "fake-ns",
kubeClientObj: []runtime.Object{
cmWithValidData,
},
expectResult: &DataPathConcurrency{
GlobalConfig: 5,
expectResult: &Configs{
DataPathConcurrency: &DataPathConcurrency{
GlobalConfig: 5,
},
},
},
}
Expand All @@ -321,8 +324,10 @@ func TestGetConfigs(t *testing.T) {

if test.expectResult == nil {
assert.Nil(t, result)
} else if test.expectResult.DataPathConcurrency == nil {
assert.Nil(t, result.DataPathConcurrency)
} else {
assert.Equal(t, *test.expectResult, *result.DataPathConcurrency)
assert.Equal(t, *test.expectResult.DataPathConcurrency, *result.DataPathConcurrency)
}
} else {
assert.EqualError(t, err, test.expectErr)
Expand Down

0 comments on commit db43200

Please sign in to comment.