Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add event and status.phase to control resource create check #312

Merged
merged 4 commits into from
Apr 17, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 10 additions & 15 deletions shardingsphere-operator/api/v1alpha1/shardingsphere_chaos_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ type ShardingSphereChaos struct {
type ShardingSphereChaosSpec struct {
InjectJob JobSpec `json:"injectJob,omitempty"`
EmbedChaos `json:",inline"`
//todo
//Verify batchV1Beta1.JobTemplateSpec `json:"Verify,omitempty"`
}

// JobSpec Specifies the config of job to create
Expand Down Expand Up @@ -77,24 +75,21 @@ const (
UnKnown ChaosCondition = "UnKnown"
moomman marked this conversation as resolved.
Show resolved Hide resolved
)

// Jobschedule Show current job progress
type Jobschedule string

const (
JobCreating Jobschedule = "JobCreating"
JobFailed Jobschedule = "JobFailed"
JobFinish Jobschedule = "JobFinish"
)

// ShardingSphereChaosStatus defines the actual state of ShardingSphereChaos
type ShardingSphereChaosStatus struct {
ChaosCondition ChaosCondition `json:"chaosCondition"`
//todo
//InjectStatus Jobschedule `json:"InjectStatus"`
//todo
//VerifyStatus Jobschedule `json:"VerifyStatus"`
Phase Phase `json:"phase"`
}

type Phase string

var (
PhaseBeforeExperiment Phase = "before experiment"
moomman marked this conversation as resolved.
Show resolved Hide resolved
PhaseAfterExperiment Phase = "after experiment"
PhaseInChaos Phase = "inject chaos"
PhaseRecoveredChaos Phase = "recover chaos"
)

// PodChaosAction Specify the action type of pod Chaos
type PodChaosAction string

Expand Down
2 changes: 1 addition & 1 deletion shardingsphere-operator/build/tools/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ ENV ZOOKEEPER_DOWNLOAD_URL https://dlcdn.apache.org/zookeeper/zookeeper-3.7.1/ap
ENV ZOOKEEPER_DIR /app/zookeeper
WORKDIR /app
RUN mkdir -p "/app/start" && chmod -R 777 /app/start
CMD ["tail -f /dev/null"]
ENTRYPOINT ["sh","-c"]
moomman marked this conversation as resolved.
Show resolved Hide resolved
CMD ["tail -f /dev/null"]
RUN set -eux; \
\
apt-get update; \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ var featureGatesHandlers = map[string]FeatureGateHandler{
Chaos: chaos.NewChaos(mgr.GetClient()),
Job: job.NewJob(mgr.GetClient()),
ConfigMap: configmap.NewConfigMap(mgr.GetClient()),
Events: mgr.GetEventRecorderFor("shardingsphere-chaos-controller"),
}).SetupWithManager(mgr); err != nil {
logger.Error(err, "unable to create controller", "controller", "ShardingSphereChaos")
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ package controllers

import (
"context"
"fmt"
"time"

"k8s.io/client-go/tools/record"

"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/configmap"
v1 "k8s.io/api/core/v1"

Expand All @@ -40,7 +43,9 @@ import (

const (
ShardingSphereChaosControllerName = "shardingsphere-chaos-controller"
ssChaosDefaultEnqueueTime = 5 * time.Second
ssChaosDefaultEnqueueTime = 10 * time.Second
defaultCreatedMessage = " is created successfully"
moomman marked this conversation as resolved.
Show resolved Hide resolved
defaultUpdateMessage = "new changes updated"
)

// ShardingSphereChaosReconciler is a controller for the ShardingSphereChaos
Expand All @@ -51,44 +56,75 @@ type ShardingSphereChaosReconciler struct { //
Chaos chaos.Chaos
Job job.Job
ConfigMap configmap.ConfigMap
Events record.EventRecorder
}

// Reconcile handles main function of this controller
func (r *ShardingSphereChaosReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := r.Log.WithValues(ShardingSphereChaosControllerName, req.NamespacedName)

var ssChaos sschaosv1alpha1.ShardingSphereChaos
if err := r.Get(ctx, req.NamespacedName, &ssChaos); err != nil {
logger.Error(err, "unable to fetch ShardingSphereChaos source")
return ctrl.Result{}, client.IgnoreNotFound(err)
ssChaos, err := r.getRuntimeSSChaos(ctx, req.NamespacedName)
if err != nil {
return ctrl.Result{}, err
}

if !ssChaos.ObjectMeta.DeletionTimestamp.IsZero() {
return ctrl.Result{}, nil
}

logger.Info("start reconcile chaos")
if err := r.reconcileChaos(ctx, &ssChaos); err != nil {
if err := r.reconcileChaos(ctx, ssChaos); err != nil {
if err == reconcile.ErrChangedSpec {
errHandle := r.handleChaosChange(ctx, req.NamespacedName)
return ctrl.Result{}, errHandle
}
logger.Error(err, " unable to reconcile chaos")
r.Events.Event(ssChaos, "Warning", "chaos err", err.Error())
return ctrl.Result{}, err
}
if err := r.reconcileConfigMap(ctx, &ssChaos); err != nil {
if err := r.reconcileConfigMap(ctx, ssChaos); err != nil {
logger.Error(err, "unable to reconcile configmap")
r.Events.Event(ssChaos, "Warning", "configmap err", err.Error())
return ctrl.Result{}, err
}
if err := r.reconcileJob(ctx, &ssChaos); err != nil {
if err := r.reconcileJob(ctx, ssChaos); err != nil {
logger.Error(err, "unable to reconcile job")
r.Events.Event(ssChaos, "Warning", "job err", err.Error())
return ctrl.Result{}, err
}
if err := r.reconcileStatus(ctx, &ssChaos); err != nil {
if err := r.reconcileStatus(ctx, req.NamespacedName); err != nil {
r.Events.Event(ssChaos, "Warning", "update status error", err.Error())
logger.Error(err, "failed to update status")
}

return ctrl.Result{RequeueAfter: ssChaosDefaultEnqueueTime}, nil
}

func (r *ShardingSphereChaosReconciler) handleChaosChange(ctx context.Context, name types.NamespacedName) error {

ssChaos, err := r.getRuntimeSSChaos(ctx, name)
if err != nil {
return err
}
if ssChaos.Status.Phase != sschaosv1alpha1.PhaseBeforeExperiment {
ssChaos.Status.Phase = sschaosv1alpha1.PhaseAfterExperiment
if err := r.Status().Update(ctx, ssChaos); err != nil {
return err
}
}
return nil
}

func (r *ShardingSphereChaosReconciler) getRuntimeSSChaos(ctx context.Context, name types.NamespacedName) (*sschaosv1alpha1.ShardingSphereChaos, error) {
var rt = &sschaosv1alpha1.ShardingSphereChaos{}
err := r.Get(ctx, name, rt)
return rt, client.IgnoreNotFound(err)
}

func (r *ShardingSphereChaosReconciler) reconcileChaos(ctx context.Context, ssChao *sschaosv1alpha1.ShardingSphereChaos) error {
logger := r.Log.WithValues("reconcile chaos", ssChao.Name)
if ssChao.Status.Phase == sschaosv1alpha1.PhaseBeforeExperiment || ssChao.Status.Phase == "" {
return nil
}
namespaceName := types.NamespacedName{Namespace: ssChao.Namespace, Name: ssChao.Name}
if ssChao.Spec.EmbedChaos.PodChaos != nil {
chao, isExist, err := r.getPodChaosByNamespacedName(ctx, namespaceName)
Expand Down Expand Up @@ -133,49 +169,92 @@ func (r *ShardingSphereChaosReconciler) reconcileConfigMap(ctx context.Context,
func (r *ShardingSphereChaosReconciler) reconcileJob(ctx context.Context, ssChaos *sschaosv1alpha1.ShardingSphereChaos) error {
logger := r.Log.WithValues("reconcile job", ssChaos.Name)
namespaceName := types.NamespacedName{Namespace: ssChaos.Namespace, Name: ssChaos.Name}

rJob, isExist, err := r.getJobByNamespacedName(ctx, namespaceName)
if err != nil {
logger.Error(err, "get job err")
return err
}
//todo:update InjectRequirement by chaos status
var nowInjectRequirement reconcile.InjectRequirement
if ssChaos.Status.Phase == "" || ssChaos.Status.Phase == sschaosv1alpha1.PhaseBeforeExperiment || ssChaos.Status.Phase == sschaosv1alpha1.PhaseAfterExperiment {
nowInjectRequirement = reconcile.Experimental
}
if ssChaos.Status.Phase == sschaosv1alpha1.PhaseInChaos || ssChaos.Status.Phase == sschaosv1alpha1.PhaseRecoveredChaos {
nowInjectRequirement = reconcile.Pressure
}
if isExist {
return r.updateJob(ctx, reconcile.Experimental, ssChaos, rJob)
return r.updateJob(ctx, nowInjectRequirement, ssChaos, rJob)
}

return r.createJob(ctx, reconcile.Experimental, ssChaos)
return r.createJob(ctx, nowInjectRequirement, ssChaos)
}

func (r *ShardingSphereChaosReconciler) reconcileStatus(ctx context.Context, ssChaos *sschaosv1alpha1.ShardingSphereChaos) error {
var (
chaoCondition sschaosv1alpha1.ChaosCondition
namespacedName = types.NamespacedName{
Namespace: ssChaos.Namespace,
Name: ssChaos.Name,
func (r *ShardingSphereChaosReconciler) reconcileStatus(ctx context.Context, namespacedName types.NamespacedName) error {
ssChaos, err := r.getRuntimeSSChaos(ctx, namespacedName)
if err != nil {
return err
}
if ssChaos.Status.Phase == "" {
ssChaos.Status.Phase = sschaosv1alpha1.PhaseBeforeExperiment
}
rJob := &batchV1.Job{}
if err := r.Get(ctx, namespacedName, rJob); err != nil {
return err
}

if ssChaos.Status.Phase == sschaosv1alpha1.PhaseBeforeExperiment && rJob.Status.Succeeded == 1 {
ssChaos.Status.Phase = sschaosv1alpha1.PhaseAfterExperiment
}

if err := r.updatePhaseStart(ctx, ssChaos); err != nil {
return err
}

rt, err := r.getRuntimeSSChaos(ctx, namespacedName)
if err != nil {
return err
}
rt.Status = ssChaos.Status
return r.Status().Update(ctx, rt)
}

func (r *ShardingSphereChaosReconciler) updatePhaseStart(ctx context.Context, ssChaos *sschaosv1alpha1.ShardingSphereChaos) error {
if ssChaos.Status.Phase != sschaosv1alpha1.PhaseBeforeExperiment {
if err := r.updateChaosCondition(ctx, ssChaos); err != nil {
return err
}
)

if ssChaos.Status.ChaosCondition == sschaosv1alpha1.AllInjected && ssChaos.Status.Phase == sschaosv1alpha1.PhaseAfterExperiment {
ssChaos.Status.Phase = sschaosv1alpha1.PhaseInChaos
}

if ssChaos.Status.ChaosCondition == sschaosv1alpha1.AllRecovered && ssChaos.Status.Phase == sschaosv1alpha1.PhaseInChaos {
ssChaos.Status.Phase = sschaosv1alpha1.PhaseRecoveredChaos
}
}

return nil
}

func (r *ShardingSphereChaosReconciler) updateChaosCondition(ctx context.Context, ssChaos *sschaosv1alpha1.ShardingSphereChaos) error {
namespacedName := types.NamespacedName{
Namespace: ssChaos.Namespace,
Name: ssChaos.Name,
}
if ssChaos.Spec.EmbedChaos.PodChaos != nil {
chao, err := r.Chaos.GetPodChaosByNamespacedName(ctx, namespacedName)
if err != nil {
return err
}
chaoCondition = r.Chaos.ConvertChaosStatus(ctx, ssChaos, chao)
ssChaos.Status.ChaosCondition = r.Chaos.ConvertChaosStatus(ctx, ssChaos, chao)
} else if ssChaos.Spec.EmbedChaos.NetworkChaos != nil {
chao, err := r.Chaos.GetNetworkChaosByNamespacedName(ctx, namespacedName)
if err != nil {
return err
}
chaoCondition = r.Chaos.ConvertChaosStatus(ctx, ssChaos, chao)
ssChaos.Status.ChaosCondition = r.Chaos.ConvertChaosStatus(ctx, ssChaos, chao)
}

var rt sschaosv1alpha1.ShardingSphereChaos
if err := r.Get(ctx, namespacedName, &rt); err != nil {
return err
}
ssChaos.Status.ChaosCondition = chaoCondition
rt.Status = ssChaos.Status
return r.Status().Update(ctx, &rt)
return nil
}

func (r *ShardingSphereChaosReconciler) getNetworkChaosByNamespacedName(ctx context.Context, namespacedName types.NamespacedName) (reconcile.NetworkChaos, bool, error) {
Expand Down Expand Up @@ -277,31 +356,60 @@ func (r *ShardingSphereChaosReconciler) createJob(ctx context.Context, requireme
if err == nil && apierrors.IsAlreadyExists(err) {
return nil
}

return err
}

func (r *ShardingSphereChaosReconciler) updatePodChaos(ctx context.Context, chao *sschaosv1alpha1.ShardingSphereChaos, podChaos reconcile.PodChaos) error {
return r.Chaos.UpdatePodChaos(ctx, chao, podChaos)
err := r.Chaos.UpdatePodChaos(ctx, chao, podChaos)
if err != nil {
if err == reconcile.ErrNotChanged {
return nil
}
return err
}
r.Events.Event(chao, "Normal", "applied", fmt.Sprintf("podChaos %s", defaultUpdateMessage))
return reconcile.ErrChangedSpec
}

func (r *ShardingSphereChaosReconciler) CreatePodChaos(ctx context.Context, chao *sschaosv1alpha1.ShardingSphereChaos) error {
podChaos, err := r.Chaos.NewPodChaos(chao)
if err != nil {
return err
}
return r.Chaos.CreatePodChaos(ctx, podChaos)
err = r.Chaos.CreatePodChaos(ctx, podChaos)
if err != nil {
return err
}
fmt.Println("phase", chao.Status.Phase)
r.Events.Event(chao, "Normal", "created", fmt.Sprintf("podChaos %s", defaultCreatedMessage))
return nil
}

func (r *ShardingSphereChaosReconciler) updateNetWorkChaos(ctx context.Context, chao *sschaosv1alpha1.ShardingSphereChaos, netWorkChaos reconcile.NetworkChaos) error {
return r.Chaos.UpdateNetworkChaos(ctx, chao, netWorkChaos)
err := r.Chaos.UpdateNetworkChaos(ctx, chao, netWorkChaos)
if err != nil {
if err == reconcile.ErrNotChanged {
return nil
}
return err
}
r.Events.Event(chao, "Normal", "applied", fmt.Sprintf("networkChaos %s", defaultUpdateMessage))
return reconcile.ErrChangedSpec
}

func (r *ShardingSphereChaosReconciler) CreateNetworkChaos(ctx context.Context, chao *sschaosv1alpha1.ShardingSphereChaos) error {
networkChaos, err := r.Chaos.NewNetworkPodChaos(chao)
if err != nil {
return err
}
return r.Chaos.CreateNetworkChaos(ctx, networkChaos)
err = r.Chaos.CreateNetworkChaos(ctx, networkChaos)
if err != nil {
return err
}

r.Events.Event(chao, "Normal", "created", fmt.Sprintf("networkChaos %s", defaultCreatedMessage))
return nil
}

// SetupWithManager sets up the controller with the Manager.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ const (
)

var (
ErrConvert = errors.New("can not convert chaos interface to specify struct")
ErrConvert = errors.New("can not convert chaos interface to specify struct")
ErrNotChanged = errors.New("object not changed")
ErrChangedSpec = errors.New("change spec")
)

type chaosMeshHandler struct {
Expand Down Expand Up @@ -302,7 +304,7 @@ func (c *chaosMeshHandler) UpdateNetworkChaos(ctx context.Context, ssChaos *v1al
}
isEqual := reflect.DeepEqual(reExp.Spec, reCur.Spec)
if isEqual {
return nil
return ErrNotChanged
}

if err := c.r.Create(ctx, reCur); err != nil {
Expand Down Expand Up @@ -331,7 +333,7 @@ func (c *chaosMeshHandler) UpdatePodChaos(ctx context.Context, ssChaos *v1alpha1
}
isEqual := reflect.DeepEqual(reExp.Spec, reCur.Spec)
if isEqual {
return nil
return ErrNotChanged
}

if err := c.r.Delete(ctx, reCur); err != nil {
Expand Down
Loading