Skip to content

Commit

Permalink
Restrict scaling down if new memory size is less than assigned memory
Browse files Browse the repository at this point in the history
  • Loading branch information
powerfooI authored May 24, 2024
1 parent 875e41f commit 1c01490
Show file tree
Hide file tree
Showing 11 changed files with 256 additions and 19 deletions.
77 changes: 77 additions & 0 deletions api/v1alpha1/helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
Copyright (c) 2024 OceanBase
ob-operator is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
*/

package v1alpha1

import (
"context"

"github.com/go-logr/logr"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"

oceanbaseconst "github.com/oceanbase/ob-operator/internal/const/oceanbase"
clusterstatus "github.com/oceanbase/ob-operator/internal/const/status/obcluster"
"github.com/oceanbase/ob-operator/pkg/oceanbase-sdk/connector"
"github.com/oceanbase/ob-operator/pkg/oceanbase-sdk/operation"
)

// readPassword fetches the Secret called secretName from namespace and returns
// the bytes stored under its "password" key as a string.
//
// When the Secret cannot be retrieved, an empty string and the wrapped client
// error are returned. The presence of the "password" key is not checked: a
// Secret without it yields an empty string and a nil error.
func readPassword(c client.Client, namespace, secretName string) (string, error) {
	key := types.NamespacedName{Namespace: namespace, Name: secretName}
	var secret corev1.Secret
	if err := c.Get(context.Background(), key, &secret); err != nil {
		return "", errors.Wrapf(err, "Get password from secret %s failed", secretName)
	}
	return string(secret.Data["password"]), nil
}

// getSysClient opens an operation manager connection to one of the observers
// that belong to obcluster.
//
// Observers are selected by the LabelRefOBCluster label inside the cluster's
// namespace and are tried in list order. The first observer for which
// operation.GetOceanbaseOperationManager succeeds is returned, with its Logger
// field set to logger. The credentials used depend on obcluster.Status.Status:
//   - New: root user, empty password, no database;
//   - Bootstrapped: root user, empty password, default database;
//   - any other status: userName with the password stored in secretName.
//
// The password secret is read up front regardless of the cluster status, so a
// missing secret fails the call even for New/Bootstrapped clusters.
//
// An error is returned when the observers cannot be listed, when none exist,
// when the password cannot be read, or when no observer accepts a connection.
// In the last case the error wraps the failure reported for the last observer
// that was tried, so the caller can see why connecting failed.
func getSysClient(c client.Client, logger *logr.Logger, obcluster *OBCluster, userName, tenantName, secretName string) (*operation.OceanbaseOperationManager, error) {
	observerList := &OBServerList{}
	err := c.List(context.Background(), observerList, client.MatchingLabels{
		oceanbaseconst.LabelRefOBCluster: obcluster.Name,
	}, client.InNamespace(obcluster.Namespace))
	if err != nil {
		return nil, errors.Wrap(err, "Get observer list")
	}
	if len(observerList.Items) == 0 {
		return nil, errors.Errorf("No observer belongs to cluster %s", obcluster.Name)
	}

	password, err := readPassword(c, obcluster.Namespace, secretName)
	if err != nil {
		return nil, errors.Wrapf(err, "Read password to get oceanbase operation manager of cluster %s", obcluster.Name)
	}

	// lastErr keeps the most recent connection failure so that it is not lost
	// when every observer turns out to be unreachable.
	var lastErr error
	// Index into the slice instead of ranging by value to avoid copying each
	// OBServer object.
	for i := range observerList.Items {
		address := observerList.Items[i].Status.GetConnectAddr()

		var s *connector.OceanBaseDataSource
		switch obcluster.Status.Status {
		case clusterstatus.New:
			s = connector.NewOceanBaseDataSource(address, oceanbaseconst.SqlPort, oceanbaseconst.RootUser, tenantName, "", "")
		case clusterstatus.Bootstrapped:
			s = connector.NewOceanBaseDataSource(address, oceanbaseconst.SqlPort, oceanbaseconst.RootUser, tenantName, "", oceanbaseconst.DefaultDatabase)
		default:
			s = connector.NewOceanBaseDataSource(address, oceanbaseconst.SqlPort, userName, tenantName, password, oceanbaseconst.DefaultDatabase)
		}

		// if err is nil, db connection is already checked available
		sysClient, err := operation.GetOceanbaseOperationManager(s)
		if err != nil {
			lastErr = err
			continue
		}
		if sysClient != nil {
			sysClient.Logger = logger
			return sysClient, nil
		}
	}

	if lastErr != nil {
		return nil, errors.Wrapf(lastErr, "Can not get oceanbase operation manager of obcluster %s after checked all servers", obcluster.Name)
	}
	return nil, errors.Errorf("Can not get oceanbase operation manager of obcluster %s after checked all servers", obcluster.Name)
}
3 changes: 3 additions & 0 deletions api/v1alpha1/obcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ type OBClusterStatus struct {
//+kubebuilder:subresource:status
//+kubebuilder:printcolumn:name="Status",type="string",JSONPath=".status.status"
//+kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
//+kubebuilder:printcolumn:name="Tasks",type="string",JSONPath=".status.operationContext.tasks",priority=1
//+kubebuilder:printcolumn:name="Task",type="string",JSONPath=".status.operationContext.task",priority=1
//+kubebuilder:printcolumn:name="TaskIdx",type="string",JSONPath=".status.operationContext.idx",priority=1

// OBCluster is the Schema for the obclusters API
type OBCluster struct {
Expand Down
66 changes: 52 additions & 14 deletions api/v1alpha1/obcluster_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
apitypes "github.com/oceanbase/ob-operator/api/types"
obcfg "github.com/oceanbase/ob-operator/internal/config/operator"
oceanbaseconst "github.com/oceanbase/ob-operator/internal/const/oceanbase"
clusterstatus "github.com/oceanbase/ob-operator/internal/const/status/obcluster"
)

// log is for logging in this package.
Expand All @@ -50,8 +51,6 @@ func (r *OBCluster) SetupWebhookWithManager(mgr ctrl.Manager) error {
Complete()
}

// TODO(user): EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!

//+kubebuilder:webhook:path=/mutate-oceanbase-oceanbase-com-v1alpha1-obcluster,mutating=true,failurePolicy=fail,sideEffects=None,groups=oceanbase.oceanbase.com,resources=obclusters,verbs=create;update,versions=v1alpha1,name=mobcluster.kb.io,admissionReviewVersions=v1

var _ webhook.Defaulter = &OBCluster{}
Expand Down Expand Up @@ -186,6 +185,42 @@ func (r *OBCluster) ValidateUpdate(old runtime.Object) (admission.Warnings, erro
} else if !oldCluster.SupportStaticIP() && (oldResource.Cpu != newResource.Cpu || oldResource.Memory != newResource.Memory) {
return nil, errors.New("forbid to modify cpu or memory quota of non-static-ip cluster")
}
if newResource.Memory.Cmp(oldResource.Memory) < 0 || newResource.Cpu.Cmp(oldResource.Cpu) < 0 {
if r.Status.Status != clusterstatus.Running {
return nil, errors.New("forbid to shrink memory size of non-running cluster")
}
conn, err := getSysClient(clt, &obclusterlog, r, oceanbaseconst.OperatorUser, oceanbaseconst.SysTenant, r.Spec.UserSecrets.Operator)
if err != nil {
return nil, err
}
defer conn.Close()
gvservers, err := conn.ListGVServers(context.Background())
if err != nil {
return nil, err
}
var maxAssignedCPU int64
var maxAssignedMemory int64
var memoryLimitPercent float64
for _, gvserver := range gvservers {
if gvserver.MemAssigned > maxAssignedMemory {
if oldResource.Memory.Value() < gvserver.MemoryLimit {
memoryLimitPercent = 0.9
} else {
memoryLimitPercent = float64(gvserver.MemoryLimit) / oldResource.Memory.AsApproximateFloat64()
}
maxAssignedMemory = gvserver.MemAssigned
}
if gvserver.CPUAssigned > maxAssignedCPU {
maxAssignedCPU = gvserver.CPUAssigned
}
}
if newResource.Memory.AsApproximateFloat64()*memoryLimitPercent < float64(maxAssignedMemory) {
return nil, errors.New("Assigned memory is larger than new memory size")
}
if oldResource.Cpu.Value() > 16 && newResource.Cpu.AsApproximateFloat64() < float64(maxAssignedCPU) {
return nil, errors.New("Assigned CPU is larger than new CPU size")
}
}
if r.Spec.BackupVolume == nil && oldCluster.Spec.BackupVolume != nil {
return nil, errors.New("forbid to remove backup volume")
}
Expand All @@ -195,27 +230,30 @@ func (r *OBCluster) ValidateUpdate(old runtime.Object) (admission.Warnings, erro
err = errors.New("forbid to add backup volume to non-static-ip cluster")
}
}
if r.Spec.OBServerTemplate.Storage.DataStorage.Size.Cmp(oldCluster.Spec.OBServerTemplate.Storage.DataStorage.Size) > 0 {
err = errors.Join(err, r.validateStorageClassAllowExpansion(r.Spec.OBServerTemplate.Storage.DataStorage.StorageClass))

newStorage := r.Spec.OBServerTemplate.Storage
oldStorage := oldCluster.Spec.OBServerTemplate.Storage
if newStorage.DataStorage.Size.Cmp(oldStorage.DataStorage.Size) > 0 {
err = errors.Join(err, r.validateStorageClassAllowExpansion(newStorage.DataStorage.StorageClass))
}
if r.Spec.OBServerTemplate.Storage.LogStorage.Size.Cmp(oldCluster.Spec.OBServerTemplate.Storage.LogStorage.Size) > 0 {
err = errors.Join(err, r.validateStorageClassAllowExpansion(r.Spec.OBServerTemplate.Storage.LogStorage.StorageClass))
if newStorage.LogStorage.Size.Cmp(oldStorage.LogStorage.Size) > 0 {
err = errors.Join(err, r.validateStorageClassAllowExpansion(newStorage.LogStorage.StorageClass))
}
if r.Spec.OBServerTemplate.Storage.RedoLogStorage.Size.Cmp(oldCluster.Spec.OBServerTemplate.Storage.RedoLogStorage.Size) > 0 {
err = errors.Join(err, r.validateStorageClassAllowExpansion(r.Spec.OBServerTemplate.Storage.RedoLogStorage.StorageClass))
if newStorage.RedoLogStorage.Size.Cmp(oldStorage.RedoLogStorage.Size) > 0 {
err = errors.Join(err, r.validateStorageClassAllowExpansion(newStorage.RedoLogStorage.StorageClass))
}
if err != nil {
return nil, err
}

if r.Spec.OBServerTemplate.Storage.DataStorage.Size.Cmp(oldCluster.Spec.OBServerTemplate.Storage.DataStorage.Size) < 0 {
err = errors.Join(err, field.Invalid(field.NewPath("spec").Child("observer").Child("storage").Child("dataStorage").Child("size"), r.Spec.OBServerTemplate.Storage.DataStorage.Size.String(), "forbid to shrink data storage size"))
if newStorage.DataStorage.Size.Cmp(oldStorage.DataStorage.Size) < 0 {
err = errors.Join(err, field.Invalid(field.NewPath("spec").Child("observer").Child("storage").Child("dataStorage").Child("size"), newStorage.DataStorage.Size.String(), "forbid to shrink data storage size"))
}
if r.Spec.OBServerTemplate.Storage.LogStorage.Size.Cmp(oldCluster.Spec.OBServerTemplate.Storage.LogStorage.Size) < 0 {
err = errors.Join(err, field.Invalid(field.NewPath("spec").Child("observer").Child("storage").Child("logStorage").Child("size"), r.Spec.OBServerTemplate.Storage.LogStorage.Size.String(), "forbid to shrink log storage size"))
if newStorage.LogStorage.Size.Cmp(oldStorage.LogStorage.Size) < 0 {
err = errors.Join(err, field.Invalid(field.NewPath("spec").Child("observer").Child("storage").Child("logStorage").Child("size"), newStorage.LogStorage.Size.String(), "forbid to shrink log storage size"))
}
if r.Spec.OBServerTemplate.Storage.RedoLogStorage.Size.Cmp(oldCluster.Spec.OBServerTemplate.Storage.RedoLogStorage.Size) < 0 {
err = errors.Join(err, field.Invalid(field.NewPath("spec").Child("observer").Child("storage").Child("redoLogStorage").Child("size"), r.Spec.OBServerTemplate.Storage.RedoLogStorage.Size.String(), "forbid to shrink redo log storage size"))
if newStorage.RedoLogStorage.Size.Cmp(oldStorage.RedoLogStorage.Size) < 0 {
err = errors.Join(err, field.Invalid(field.NewPath("spec").Child("observer").Child("storage").Child("redoLogStorage").Child("size"), newStorage.RedoLogStorage.Size.String(), "forbid to shrink redo log storage size"))
}
if err != nil {
return nil, err
Expand Down
2 changes: 2 additions & 0 deletions api/v1alpha1/obparameter_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type OBParameterStatus struct {
//+kubebuilder:printcolumn:name="ClusterName",type="string",JSONPath=".spec.clusterName"
//+kubebuilder:printcolumn:name="Status",type="string",JSONPath=".status.status"
//+kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
//+kubebuilder:printcolumn:name="Key",type="string",JSONPath=".spec.parameter.name"
//+kubebuilder:printcolumn:name="WantedValue",type="string",JSONPath=".spec.parameter.value"

// OBParameter is the Schema for the obparameters API
type OBParameter struct {
Expand Down
12 changes: 12 additions & 0 deletions config/crd/bases/oceanbase.oceanbase.com_obclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,18 @@ spec:
- jsonPath: .metadata.creationTimestamp
name: Age
type: date
- jsonPath: .status.operationContext.tasks
name: Tasks
priority: 1
type: string
- jsonPath: .status.operationContext.task
name: Task
priority: 1
type: string
- jsonPath: .status.operationContext.idx
name: TaskIdx
priority: 1
type: string
name: v1alpha1
schema:
openAPIV3Schema:
Expand Down
6 changes: 6 additions & 0 deletions config/crd/bases/oceanbase.oceanbase.com_obparameters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ spec:
- jsonPath: .metadata.creationTimestamp
name: Age
type: date
- jsonPath: .spec.parameter.name
name: Key
type: string
- jsonPath: .spec.parameter.value
name: WantedValue
type: string
name: v1alpha1
schema:
openAPIV3Schema:
Expand Down
4 changes: 2 additions & 2 deletions config/manager/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
images:
- name: controller
newName: oceanbase/ob-operator
newTag: 2.2.1
newName: oceanbasedev/ob-operator
newTag: 2.2.2-alpha.2
1 change: 1 addition & 0 deletions internal/resource/obcluster/names.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ const (
tExpandPVC ttypes.TaskName = "expand pvc"
tMountBackupVolume ttypes.TaskName = "mount backup volume"
tCheckEnvironment ttypes.TaskName = "check environment"
tAdjustParameters ttypes.TaskName = "adjust parameters"
tAnnotateOBCluster ttypes.TaskName = "annotate obcluster"
tRollingUpdateOBZones ttypes.TaskName = "rolling update observers"
)
8 changes: 5 additions & 3 deletions internal/resource/obcluster/obcluster_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ ob-operator is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS,
WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED,
INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
*/
Expand Down Expand Up @@ -162,8 +164,8 @@ func genScaleUpOBZonesFlow(_ *OBClusterManager) *tasktypes.TaskFlow {
OperationContext: &tasktypes.OperationContext{
Name: fScaleUpOBZones,
Tasks: []tasktypes.TaskName{
tAdjustParameters,
tScaleUpOBZones,
tWaitOBZoneRunning,
},
TargetStatus: clusterstatus.Running,
},
Expand Down
95 changes: 95 additions & 0 deletions internal/resource/obcluster/obcluster_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -1116,6 +1116,101 @@ func AnnotateOBCluster(m *OBClusterManager) tasktypes.TaskError {
}
}
}
return nil
}

// AdjustParameters updates the "memory_limit" and "cpu_count" entries in the
// OBCluster's Spec.Parameters so that they follow the resources requested in
// Spec.OBServerTemplate.Resource, and patches the OBCluster with the result.
//
// The resource spec of the first listed OBZone is used as the "old" value
// (NOTE(review): this presumably relies on zones not having been updated yet
// when the task runs — confirm against the task flow).
//
// memory_limit is only touched when the memory quantity changed. Its target is
// the new memory size scaled by the ratio that the observers' current memory
// limit has to the old memory size (0.9 when the current limit exceeds the old
// size), but never below the largest memory amount already assigned on any
// server. The value is written in whole mebibytes with an "M" suffix.
//
// cpu_count is only touched when the cpu quantity changed. It is set to the
// new cpu value as an integer, with a floor of 16.
//
// Returns a wrapped error when the operation manager cannot be obtained, the
// server or zone listings fail, no zone exists, or the patch is rejected.
func AdjustParameters(m *OBClusterManager) tasktypes.TaskError {
	conn, err := m.getOceanbaseOperationManager()
	if err != nil {
		return errors.Wrap(err, "Get operation manager")
	}
	gvservers, err := conn.ListGVServers(m.Ctx)
	if err != nil {
		return errors.Wrap(err, "List gv servers")
	}
	zones, err := m.listOBZones()
	if err != nil {
		return errors.Wrap(err, "List obzones")
	}
	if len(zones.Items) == 0 {
		return errors.New("No obzone found")
	}

	oldResource := zones.Items[0].Spec.OBServerTemplate.Resource
	newResource := m.OBCluster.Spec.OBServerTemplate.Resource

	// Ratio applied when the observers' limit cannot be related to the old
	// memory size. It is also the starting value, so that an empty server list
	// (or servers with nothing assigned) does not collapse the target to "0M".
	const defaultMemoryLimitPercent = 0.9

	memoryLimitPercent := defaultMemoryLimitPercent
	var maxAssignedMem int64
	for _, gvserver := range gvservers {
		if gvserver.MemAssigned <= maxAssignedMem {
			continue
		}
		// The ratio is taken from the server with the most assigned memory.
		maxAssignedMem = gvserver.MemAssigned
		if gvserver.MemoryLimit > oldResource.Memory.Value() {
			memoryLimitPercent = defaultMemoryLimitPercent
		} else {
			memoryLimitPercent = float64(gvserver.MemoryLimit) / oldResource.Memory.AsApproximateFloat64()
		}
	}

	specMem := newResource.Memory.AsApproximateFloat64()
	specMemoryLimit := int64(specMem * memoryLimitPercent)

	// Never go below what is already assigned on a server.
	targetMemoryLimit := max(specMemoryLimit, maxAssignedMem)
	m.Logger.V(oceanbaseconst.LogLevelDebug).
		Info("Adjust memory limit",
			"maxAssignedMem", maxAssignedMem,
			"specMem", specMem,
			"targetMemoryLimit", targetMemoryLimit,
			"percent", memoryLimitPercent,
		)

	copiedCluster := m.OBCluster.DeepCopy()

	// setParameter overwrites the first parameter called name, or appends a
	// new entry when the cluster spec does not list it yet.
	setParameter := func(name, value string) {
		for i := range copiedCluster.Spec.Parameters {
			if copiedCluster.Spec.Parameters[i].Name == name {
				copiedCluster.Spec.Parameters[i].Value = value
				return
			}
		}
		copiedCluster.Spec.Parameters = append(copiedCluster.Spec.Parameters, apitypes.Parameter{
			Name:  name,
			Value: value,
		})
	}

	if newResource.Memory.Cmp(oldResource.Memory) != 0 {
		// >>20 converts bytes to mebibytes.
		setParameter("memory_limit", fmt.Sprintf("%dM", targetMemoryLimit>>20))
	}

	if oldResource.Cpu.Cmp(newResource.Cpu) != 0 {
		// Use the integer value rather than Quantity.String(): the latter can
		// render as "16500m" or "1k", which is not a plain integer count.
		targetCPUCount := max(newResource.Cpu.Value(), 16)
		setParameter("cpu_count", fmt.Sprintf("%d", targetCPUCount))
	}

	err = m.Client.Patch(m.Ctx, copiedCluster, client.MergeFrom(m.OBCluster))
	if err != nil {
		m.Logger.V(oceanbaseconst.LogLevelDebug).Info("Patch obcluster", "obcluster", m.OBCluster.Name)
		return errors.Wrap(err, "Patch obcluster")
	}
	return nil
}
1 change: 1 addition & 0 deletions internal/resource/obcluster/obcluster_task_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 1c01490

Please sign in to comment.