Skip to content

Commit

Permalink
Add PDB to EtcdCluster spec and implement validation webhhoks
Browse files Browse the repository at this point in the history
  • Loading branch information
sircthulhu committed Mar 28, 2024
1 parent fee8e09 commit fd56efc
Show file tree
Hide file tree
Showing 5 changed files with 342 additions and 19 deletions.
41 changes: 39 additions & 2 deletions api/v1alpha1/etcdcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ limitations under the License.
package v1alpha1

import (
"math"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
)

const defaultEtcdImage = "quay.io/coreos/etcd:v3.5.12"
Expand All @@ -31,8 +34,9 @@ type EtcdClusterSpec struct {
// +kubebuilder:validation:Minimum:=0
Replicas *int32 `json:"replicas,omitempty"`
// PodSpec defines the desired state of PodSpec for etcd members. If not specified, default values will be used.
PodSpec PodSpec `json:"podSpec,omitempty"`
Storage StorageSpec `json:"storage"`
PodSpec PodSpec `json:"podSpec,omitempty"`
PodDisruptionBudget EmbeddedPodDisruptionBudget `json:"podDisruptionBudget,omitempty"`
Storage StorageSpec `json:"storage"`
}

const (
Expand Down Expand Up @@ -66,6 +70,11 @@ type EtcdCluster struct {
Status EtcdClusterStatus `json:"status,omitempty"`
}

// calculateQuorumSize returns minimum quorum size for current number of replicas
func (r *EtcdCluster) calculateQuorumSize() int {
return int(math.Ceil(float64(*r.Spec.Replicas) / 2.))
}

// +kubebuilder:object:root=true

// EtcdClusterList contains a list of EtcdCluster
Expand Down Expand Up @@ -192,6 +201,34 @@ type EmbeddedPersistentVolumeClaim struct {
Status corev1.PersistentVolumeClaimStatus `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"`
}

// EmbeddedPodDisruptionBudget describes PDB resource for etcd cluster members
type EmbeddedPodDisruptionBudget struct {
metav1.TypeMeta `json:",inline"`
// EmbeddedMetadata contains metadata relevant to an EmbeddedResource.
// +optional
EmbeddedObjectMetadata `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`
// If Enabled, PDB resource will be deployed for cluster members
//+optional
Enabled bool `json:"enabled,omitempty"`
// Spec defines the desired characteristics of a PDB.
// More info: https://kubernetes.io/docs/concepts/workloads/pods/disruptions/#pod-disruption-budgets
//+optional
Spec PodDisruptionBudgetSpec `json:"spec"`
}

type PodDisruptionBudgetSpec struct {
// MinAvailable describes minimum ready replicas. If both are empty, controller will implicitly
// calculate MaxUnavailable based on number of replicas
// Mutually exclusive with MaxUnavailable.
// +optional
MinAvailable intstr.IntOrString `json:"minAvailable,omitempty"`
// MinAvailable describes maximum not ready replicas. If both are empty, controller will implicitly
// calculate MaxUnavailable based on number of replicas
// Mutually exclusive with MinAvailable
// +optional
MaxUnavailable intstr.IntOrString `json:"maxUnavailable,omitempty"`
}

func init() {
SchemeBuilder.Register(&EtcdCluster{}, &EtcdClusterList{})
}
79 changes: 78 additions & 1 deletion api/v1alpha1/etcdcluster_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,14 @@ var _ webhook.Validator = &EtcdCluster{}
// ValidateCreate implements webhook.Validator so a webhook will be registered for the type
func (r *EtcdCluster) ValidateCreate() (admission.Warnings, error) {
etcdclusterlog.Info("validate create", "name", r.Name)
return nil, nil
warnings, err := r.validatePdb()
if err != nil {
return nil, errors.NewInvalid(
schema.GroupKind{Group: GroupVersion.Group, Kind: "EtcdCluster"},
r.Name, err)
}

return warnings, nil
}

// ValidateUpdate implements webhook.Validator so a webhook will be registered for the type
Expand All @@ -91,6 +98,14 @@ func (r *EtcdCluster) ValidateUpdate(old runtime.Object) (admission.Warnings, er
)
}

pdbWarnings, pdbErr := r.validatePdb()
if pdbErr != nil {
allErrors = append(allErrors, pdbErr...)
}
if len(pdbWarnings) > 0 {
warnings = append(warnings, pdbWarnings...)
}

if len(allErrors) > 0 {
err := errors.NewInvalid(
schema.GroupKind{Group: GroupVersion.Group, Kind: "EtcdCluster"},
Expand All @@ -106,3 +121,65 @@ func (r *EtcdCluster) ValidateDelete() (admission.Warnings, error) {
etcdclusterlog.Info("validate delete", "name", r.Name)
return nil, nil
}

// validatePdb validates PDB fields
func (r *EtcdCluster) validatePdb() (admission.Warnings, field.ErrorList) {
if !r.Spec.PodDisruptionBudget.Enabled {
return nil, nil
}
pdb := r.Spec.PodDisruptionBudget
var warnings admission.Warnings
var allErrors field.ErrorList
if pdb.Spec.MinAvailable.IntValue() != 0 && pdb.Spec.MaxUnavailable.IntValue() != 0 {
allErrors = append(allErrors, field.Invalid(
field.NewPath("spec", "podDisruptionBudget", "minAvailable"),
pdb.Spec.MinAvailable.IntValue(),
"minAvailable is mutually exclusive with maxUnavailable"),
)
}
minQuorumSize := r.calculateQuorumSize()
if pdb.Spec.MinAvailable.IntValue() != 0 {
if pdb.Spec.MinAvailable.IntValue() < 0 {
allErrors = append(allErrors, field.Invalid(
field.NewPath("spec", "podDisruptionBudget", "minAvailable"),
pdb.Spec.MinAvailable.IntValue(),
"value cannot be less than zero"),
)
}
if pdb.Spec.MinAvailable.IntValue() > int(*r.Spec.Replicas) {
allErrors = append(allErrors, field.Invalid(
field.NewPath("spec", "podDisruptionBudget", "minAvailable"),
pdb.Spec.MinAvailable.IntValue(),
"value cannot be larger than number of replicas"),
)
}
if pdb.Spec.MinAvailable.IntValue() < minQuorumSize {
warnings = append(warnings, "current number of spec.podDisruptionBudget.minAvailable can lead to loss of quorum")
}
}
if pdb.Spec.MaxUnavailable.IntValue() != 0 {
if pdb.Spec.MaxUnavailable.IntValue() < 0 {
allErrors = append(allErrors, field.Invalid(
field.NewPath("spec", "podDisruptionBudget", "maxUnavailable"),
pdb.Spec.MaxUnavailable.IntValue(),
"value cannot be less than zero"),
)
}
if pdb.Spec.MaxUnavailable.IntValue() > int(*r.Spec.Replicas) {
allErrors = append(allErrors, field.Invalid(
field.NewPath("spec", "podDisruptionBudget", "maxUnavailable"),
pdb.Spec.MaxUnavailable.IntValue(),
"value cannot be larger than number of replicas"),
)
}
if int(*r.Spec.Replicas)-pdb.Spec.MaxUnavailable.IntValue() < minQuorumSize {
warnings = append(warnings, "current number of spec.podDisruptionBudget.maxUnavailable can lead to loss of quorum")
}
}

if len(allErrors) > 0 {
return nil, allErrors
}

return warnings, nil
}
125 changes: 109 additions & 16 deletions api/v1alpha1/etcdcluster_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ package v1alpha1

import (
. "github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/utils/ptr"
)

Expand All @@ -31,13 +32,14 @@ var _ = Describe("EtcdCluster Webhook", func() {
It("Should fill in the default value if a required field is empty", func() {
etcdCluster := &EtcdCluster{}
etcdCluster.Default()
gomega.Expect(etcdCluster.Spec.PodSpec.Image).To(gomega.Equal(defaultEtcdImage))
gomega.Expect(etcdCluster.Spec.Replicas).To(gomega.BeNil(), "User should have an opportunity to create cluster with 0 replicas")
gomega.Expect(etcdCluster.Spec.Storage.EmptyDir).To(gomega.BeNil())
Expect(etcdCluster.Spec.PodSpec.Image).To(Equal(defaultEtcdImage))
Expect(etcdCluster.Spec.Replicas).To(BeNil(), "User should have an opportunity to create cluster with 0 replicas")
Expect(etcdCluster.Spec.Storage.EmptyDir).To(BeNil())
storage := etcdCluster.Spec.Storage.VolumeClaimTemplate.Spec.Resources.Requests.Storage()
if gomega.Expect(storage).NotTo(gomega.BeNil()) {
gomega.Expect(*storage).To(gomega.Equal(resource.MustParse("4Gi")))
if Expect(storage).NotTo(BeNil()) {
Expect(*storage).To(Equal(resource.MustParse("4Gi")))
}
Expect(etcdCluster.Spec.PodDisruptionBudget.Enabled).To(BeFalse())
})

It("Should not override fields with default values if not empty", func() {
Expand All @@ -47,6 +49,12 @@ var _ = Describe("EtcdCluster Webhook", func() {
PodSpec: PodSpec{
Image: "myregistry.local/etcd:v1.1.1",
},
PodDisruptionBudget: EmbeddedPodDisruptionBudget{
Enabled: true,
Spec: PodDisruptionBudgetSpec{
MaxUnavailable: intstr.FromInt32(int32(2)),
},
},
Storage: StorageSpec{
VolumeClaimTemplate: EmbeddedPersistentVolumeClaim{
Spec: corev1.PersistentVolumeClaimSpec{
Expand All @@ -63,12 +71,14 @@ var _ = Describe("EtcdCluster Webhook", func() {
},
}
etcdCluster.Default()
gomega.Expect(*etcdCluster.Spec.Replicas).To(gomega.Equal(int32(5)))
gomega.Expect(etcdCluster.Spec.PodSpec.Image).To(gomega.Equal("myregistry.local/etcd:v1.1.1"))
gomega.Expect(etcdCluster.Spec.Storage.EmptyDir).To(gomega.BeNil())
Expect(*etcdCluster.Spec.Replicas).To(Equal(int32(5)))
Expect(etcdCluster.Spec.PodDisruptionBudget.Enabled).To(BeTrue())
Expect(etcdCluster.Spec.PodDisruptionBudget.Spec.MaxUnavailable.IntValue()).To(Equal(2))
Expect(etcdCluster.Spec.PodSpec.Image).To(Equal("myregistry.local/etcd:v1.1.1"))
Expect(etcdCluster.Spec.Storage.EmptyDir).To(BeNil())
storage := etcdCluster.Spec.Storage.VolumeClaimTemplate.Spec.Resources.Requests.Storage()
if gomega.Expect(storage).NotTo(gomega.BeNil()) {
gomega.Expect(*storage).To(gomega.Equal(resource.MustParse("10Gi")))
if Expect(storage).NotTo(BeNil()) {
Expect(*storage).To(Equal(resource.MustParse("10Gi")))
}
})
})
Expand All @@ -81,8 +91,91 @@ var _ = Describe("EtcdCluster Webhook", func() {
},
}
w, err := etcdCluster.ValidateCreate()
gomega.Expect(err).To(gomega.Succeed())
gomega.Expect(w).To(gomega.BeEmpty())
Expect(err).To(Succeed())
Expect(w).To(BeEmpty())
})
It("Should admit enabled empty PDB", func() {
etcdCluster := &EtcdCluster{
Spec: EtcdClusterSpec{
Replicas: ptr.To(int32(1)),
PodDisruptionBudget: EmbeddedPodDisruptionBudget{Enabled: true},
},
}
w, err := etcdCluster.ValidateCreate()
Expect(err).To(Succeed())
Expect(w).To(BeEmpty())
})
It("Should reject if negative spec.podDisruptionBudget.minAvailable", func() {
etcdCluster := &EtcdCluster{
Spec: EtcdClusterSpec{
Replicas: ptr.To(int32(1)),
PodDisruptionBudget: EmbeddedPodDisruptionBudget{
Enabled: true,
Spec: PodDisruptionBudgetSpec{
MinAvailable: intstr.FromInt32(int32(-1)),
},
},
},
}
_, err := etcdCluster.ValidateCreate()
if Expect(err).To(HaveOccurred()) {
statusErr := err.(*errors.StatusError)
Expect(statusErr.ErrStatus.Message).To(ContainSubstring("value cannot be less than zero"))
}
})
It("Should reject if negative spec.podDisruptionBudget.maxUnavailable", func() {
etcdCluster := &EtcdCluster{
Spec: EtcdClusterSpec{
Replicas: ptr.To(int32(1)),
PodDisruptionBudget: EmbeddedPodDisruptionBudget{
Enabled: true,
Spec: PodDisruptionBudgetSpec{
MaxUnavailable: intstr.FromInt32(int32(-1)),
},
},
},
}
_, err := etcdCluster.ValidateCreate()
if Expect(err).To(HaveOccurred()) {
statusErr := err.(*errors.StatusError)
Expect(statusErr.ErrStatus.Message).To(ContainSubstring("maxUnavailable: Invalid value: -1: value cannot be less than zero"))
}
})
It("Should reject if min available field larger than replicas", func() {
etcdCluster := &EtcdCluster{
Spec: EtcdClusterSpec{
Replicas: ptr.To(int32(1)),
PodDisruptionBudget: EmbeddedPodDisruptionBudget{
Enabled: true,
Spec: PodDisruptionBudgetSpec{
MinAvailable: intstr.FromInt32(int32(2)),
},
},
},
}
_, err := etcdCluster.ValidateCreate()
if Expect(err).To(HaveOccurred()) {
statusErr := err.(*errors.StatusError)
Expect(statusErr.ErrStatus.Message).To(ContainSubstring("minAvailable: Invalid value: 2: value cannot be larger than number of replicas"))
}
})
It("Should reject if max unavailable field larger than replicas", func() {
etcdCluster := &EtcdCluster{
Spec: EtcdClusterSpec{
Replicas: ptr.To(int32(1)),
PodDisruptionBudget: EmbeddedPodDisruptionBudget{
Enabled: true,
Spec: PodDisruptionBudgetSpec{
MaxUnavailable: intstr.FromInt32(int32(2)),
},
},
},
}
_, err := etcdCluster.ValidateCreate()
if Expect(err).To(HaveOccurred()) {
statusErr := err.(*errors.StatusError)
Expect(statusErr.ErrStatus.Message).To(ContainSubstring("maxUnavailable: Invalid value: 2: value cannot be larger than number of replicas"))
}
})
})

Expand All @@ -101,9 +194,9 @@ var _ = Describe("EtcdCluster Webhook", func() {
},
}
_, err := etcdCluster.ValidateUpdate(oldCluster)
if gomega.Expect(err).To(gomega.HaveOccurred()) {
if Expect(err).To(HaveOccurred()) {
statusErr := err.(*errors.StatusError)
gomega.Expect(statusErr.ErrStatus.Message).To(gomega.ContainSubstring("field is immutable"))
Expect(statusErr.ErrStatus.Message).To(ContainSubstring("field is immutable"))
}
})

Expand All @@ -121,7 +214,7 @@ var _ = Describe("EtcdCluster Webhook", func() {
},
}
_, err := etcdCluster.ValidateUpdate(oldCluster)
gomega.Expect(err).To(gomega.Succeed())
Expect(err).To(Succeed())
})
})
})
Loading

0 comments on commit fd56efc

Please sign in to comment.