Skip to content

Commit

Permalink
[REFACTOR] rule group set controller
Browse files Browse the repository at this point in the history
Signed-off-by: Nicolas Takashi <nicolas.takashi@coralogix.com>
  • Loading branch information
nicolastakashi committed Jul 26, 2024
1 parent 8feeed6 commit d4d0f36
Show file tree
Hide file tree
Showing 2 changed files with 278 additions and 376 deletions.
242 changes: 78 additions & 164 deletions controllers/alphacontrollers/recordingrulegroupset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ package alphacontrollers

import (
"context"
"fmt"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
Expand All @@ -28,7 +30,6 @@ import (
"github.com/coralogix/coralogix-operator/controllers/clientset"
rrg "github.com/coralogix/coralogix-operator/controllers/clientset/grpc/recording-rules-groups/v2"

"github.com/golang/protobuf/jsonpb"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -48,25 +49,16 @@ type RecordingRuleGroupSetReconciler struct {
//+kubebuilder:rbac:groups=coralogix.com,resources=recordingrulegroupsets/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=coralogix.com,resources=recordingrulegroupsets/finalizers,verbs=update

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
// the RecordingRuleGroupSet object against the actual cluster state, and then
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.14.4/pkg/reconcile
func (r *RecordingRuleGroupSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := log.FromContext(ctx)
jsm := &jsonpb.Marshaler{
EmitDefaults: true,
}
rRGClient := r.CoralogixClientSet.RecordingRuleGroups()
log := log.FromContext(ctx).WithValues(
"recordingRuleGroupSet", req.NamespacedName.Name,
"namespace", req.NamespacedName.Namespace,
)

// recordingRuleClient := r.CoralogixClientSet.RecordingRuleGroups()

//Get ruleGroupSetRD
ruleGroupSetCRD := &coralogixv1alpha1.RecordingRuleGroupSet{}
if err := r.Client.Get(ctx, req.NamespacedName, ruleGroupSetCRD); err != nil {
recordingRuleGroupSet := &coralogixv1alpha1.RecordingRuleGroupSet{}
if err := r.Client.Get(ctx, req.NamespacedName, recordingRuleGroupSet); err != nil {
if errors.IsNotFound(err) {
// Request object not found, could have been deleted after reconcile request.
// Owned objects are automatically garbage collected. For additional cleanup logic use finalizers.
Expand All @@ -77,179 +69,101 @@ func (r *RecordingRuleGroupSetReconciler) Reconcile(ctx context.Context, req ctr
return ctrl.Result{RequeueAfter: defaultErrRequeuePeriod}, err
}

// examine DeletionTimestamp to determine if object is under deletion
if ruleGroupSetCRD.ObjectMeta.DeletionTimestamp.IsZero() {
// The object is not being deleted, so if it does not have our finalizer,
// then lets add the finalizer and update the object. This is equivalent
// registering our finalizer.
if !controllerutil.ContainsFinalizer(ruleGroupSetCRD, recordingRuleGroupSetFinalizerName) {
controllerutil.AddFinalizer(ruleGroupSetCRD, recordingRuleGroupSetFinalizerName)
if err := r.Update(ctx, ruleGroupSetCRD); err != nil {
log.Error(err, "Received an error while Updating a RecordingRuleGroupSet", "recordingRuleGroup Name", ruleGroupSetCRD.Name)
return ctrl.Result{}, err
}
if ptr.Deref(recordingRuleGroupSet.Status.ID, "") == "" {
if err := r.create(ctx, recordingRuleGroupSet); err != nil {
log.Error(err, "Failed to create RecordingRuleGroupSet", "error", err)
return ctrl.Result{RequeueAfter: defaultErrRequeuePeriod}, err
}
} else {
// The object is being deleted
if controllerutil.ContainsFinalizer(ruleGroupSetCRD, recordingRuleGroupSetFinalizerName) {
// our finalizer is present, so lets handle any external dependency
if ruleGroupSetCRD.Status.ID == nil {
controllerutil.RemoveFinalizer(ruleGroupSetCRD, recordingRuleGroupSetFinalizerName)
err := r.Update(ctx, ruleGroupSetCRD)
log.Error(err, "Received an error while Updating a RecordingRuleGroupSet", "recordingRuleGroup Name", ruleGroupSetCRD.Name)
return ctrl.Result{}, err
}

id := *ruleGroupSetCRD.Status.ID
deleteRRGReq := &rrg.DeleteRuleGroupSet{Id: id}
log.V(1).Info("Deleting RecordingRuleGroupSet", "recordingRuleGroup ID", id)
if _, err := rRGClient.DeleteRecordingRuleGroupSet(ctx, deleteRRGReq); err != nil {
// if fail to delete the external dependency here, return with error
// so that it can be retried unless it is deleted manually.
log.Error(err, "Received an error while Deleting a RecordingRuleGroupSet", "recordingRuleGroup ID", id)
if status.Code(err) == codes.NotFound {
controllerutil.RemoveFinalizer(ruleGroupSetCRD, recordingRuleGroupSetFinalizerName)
err := r.Update(ctx, ruleGroupSetCRD)
return ctrl.Result{}, err
}
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}

log.V(1).Info("RecordingRuleGroupSet was deleted", "RecordingRuleGroupSet ID", id)
// remove our finalizer from the list and update it.
controllerutil.RemoveFinalizer(ruleGroupSetCRD, recordingRuleGroupSetFinalizerName)
if err := r.Update(ctx, ruleGroupSetCRD); err != nil {
return ctrl.Result{}, err
}
if !recordingRuleGroupSet.ObjectMeta.DeletionTimestamp.IsZero() {
if err := r.delete(ctx, recordingRuleGroupSet); err != nil {
log.Error(err, "Failed to delete RecordingRuleGroupSet", "error", err)
return ctrl.Result{RequeueAfter: defaultErrRequeuePeriod}, err
}

// Stop reconciliation as the item is being deleted
return ctrl.Result{}, nil
}

var notFount bool
var err error
var actualState coralogixv1alpha1.RecordingRuleGroupSetStatus
if id := ruleGroupSetCRD.Status.ID; id == nil {
log.V(1).Info("RecordingRuleGroupSet wasn't created in Coralogix backend")
notFount = true
} else if getRuleGroupSetResp, err := rRGClient.GetRecordingRuleGroupSet(ctx, &rrg.FetchRuleGroupSet{Id: *id}); status.Code(err) == codes.NotFound {
log.V(1).Info("RecordingRuleGroupSet doesn't exist in Coralogix backend")
notFount = true
} else if err == nil {
actualState = flattenRecordingRuleGroupSet(getRuleGroupSetResp)
if err := r.update(ctx, recordingRuleGroupSet); err != nil {
log.Error(err, "Failed to update RecordingRuleGroupSet", "error", err)
return ctrl.Result{RequeueAfter: defaultErrRequeuePeriod}, err
}

if notFount {
groups := ruleGroupSetCRD.Spec.ExtractRecordingRuleGroups()
createRuleGroupReq := &rrg.CreateRuleGroupSet{Groups: groups}
jstr, _ := jsm.MarshalToString(createRuleGroupReq)
log.V(1).Info("Creating RecordingRuleGroupSet", "RecordingRuleGroupSet", jstr)
if createRRGResp, err := rRGClient.CreateRecordingRuleGroupSet(ctx, createRuleGroupReq); err == nil {
jstr, _ := jsm.MarshalToString(createRRGResp)
log.V(1).Info("RecordingRuleGroupSet was created", "RecordingRuleGroupSet", jstr)

//To avoid a situation of the operator falling between the creation of the ruleGroup in coralogix and being saved in the cluster (something that would cause it to be created again and again), its id will be saved ASAP.
id := createRRGResp.Id
ruleGroupSetCRD.Status = coralogixv1alpha1.RecordingRuleGroupSetStatus{ID: &id}
if err := r.Status().Update(ctx, ruleGroupSetCRD); err != nil {
log.Error(err, "Error on updating RecordingRuleGroupSet status", "Name", ruleGroupSetCRD.Name, "Namespace", ruleGroupSetCRD.Namespace)
return ctrl.Result{RequeueAfter: defaultErrRequeuePeriod}, err
}
return ctrl.Result{}, nil
}

getRuleGroupReq := &rrg.FetchRuleGroupSet{Id: createRRGResp.Id}
var getRRGResp *rrg.OutRuleGroupSet
if getRRGResp, err = rRGClient.GetRecordingRuleGroupSet(ctx, getRuleGroupReq); err != nil || ruleGroupSetCRD == nil {
log.Error(err, "Received an error while getting a RecordingRuleGroupSet", "RecordingRuleGroupSet", createRuleGroupReq)
return ctrl.Result{RequeueAfter: defaultErrRequeuePeriod}, err
}
ruleGroupSetCRD.Status = flattenRecordingRuleGroupSet(getRRGResp)
if err := r.Status().Update(ctx, ruleGroupSetCRD); err != nil {
log.V(1).Error(err, "updating crd")
}
return ctrl.Result{RequeueAfter: defaultRequeuePeriod}, nil
} else {
log.Error(err, "Received an error while creating a RecordingRuleGroupSet", "recordingRuleGroupSet", createRuleGroupReq)
return ctrl.Result{RequeueAfter: defaultErrRequeuePeriod}, err
}
} else if err != nil {
log.Error(err, "Received an error while reading a RecordingRuleGroupSet", "recordingRuleGroupSet ID", *ruleGroupSetCRD.Status.ID)
return ctrl.Result{RequeueAfter: defaultErrRequeuePeriod}, err
func (r *RecordingRuleGroupSetReconciler) create(ctx context.Context, recordingRuleGroupSet *coralogixv1alpha1.RecordingRuleGroupSet) error {
response, err := r.CoralogixClientSet.
RecordingRuleGroups().
CreateRecordingRuleGroupSet(ctx, &rrg.CreateRuleGroupSet{
Groups: recordingRuleGroupSet.Spec.ExtractRecordingRuleGroups(),
})

if err != nil {
return fmt.Errorf("failed to create recording rule groupSet: %w", err)
}

if equal, diff := ruleGroupSetCRD.Spec.DeepEqual(actualState); !equal {
log.V(1).Info("Find diffs between spec and the actual state", "Diff", diff)
id := *ruleGroupSetCRD.Status.ID
groups := ruleGroupSetCRD.Spec.ExtractRecordingRuleGroups()
updateRRGReq := &rrg.UpdateRuleGroupSet{Id: id, Groups: groups}
if updateRRGResp, err := rRGClient.UpdateRecordingRuleGroupSet(ctx, updateRRGReq); err != nil {
log.Error(err, "Received an error while updating a RecordingRuleGroupSet", "recordingRuleGroup", updateRRGReq)
return ctrl.Result{RequeueAfter: defaultErrRequeuePeriod}, err
} else {
jstr, _ := jsm.MarshalToString(updateRRGResp)
log.V(1).Info("RecordingRuleGroupSet was updated on backend", "recordingRuleGroup", jstr)
var getRuleGroupResp *rrg.OutRuleGroupSet
if getRuleGroupResp, err = rRGClient.GetRecordingRuleGroupSet(ctx, &rrg.FetchRuleGroupSet{Id: *ruleGroupSetCRD.Status.ID}); err != nil {
log.Error(err, "Received an error while updating a RecordingRuleGroupSet", "recordingRuleGroup", updateRRGReq)
return ctrl.Result{RequeueAfter: defaultErrRequeuePeriod}, err
}
recordingRuleGroupSet.Status.ID = ptr.To(response.Id)

r.Client.Get(ctx, req.NamespacedName, ruleGroupSetCRD)
ruleGroupSetCRD.Status = flattenRecordingRuleGroupSet(getRuleGroupResp)
if err := r.Status().Update(ctx, ruleGroupSetCRD); err != nil {
log.V(1).Error(err, "Error on updating RuleGroupSet crd")
return ctrl.Result{RequeueAfter: defaultErrRequeuePeriod}, err
}
}
if err := r.Status().Update(ctx, recordingRuleGroupSet); err != nil {
return fmt.Errorf("failed to update recording rule groupSet status: %w", err)
}

if !controllerutil.ContainsFinalizer(recordingRuleGroupSet, recordingRuleGroupSetFinalizerName) {
controllerutil.AddFinalizer(recordingRuleGroupSet, recordingRuleGroupSetFinalizerName)
}

return ctrl.Result{RequeueAfter: defaultRequeuePeriod}, nil
if err := r.Client.Update(ctx, recordingRuleGroupSet); err != nil {
return fmt.Errorf("failed to update recording rule groupSet: %w", err)
}

return nil
}

func flattenRecordingRuleGroupSet(set *rrg.OutRuleGroupSet) coralogixv1alpha1.RecordingRuleGroupSetStatus {
id := new(string)
*id = set.Id
func (r *RecordingRuleGroupSetReconciler) update(ctx context.Context, recordingRuleGroupSet *coralogixv1alpha1.RecordingRuleGroupSet) error {
remoteRecordingRule, err := r.CoralogixClientSet.RecordingRuleGroups().GetRecordingRuleGroupSet(ctx, &rrg.FetchRuleGroupSet{
Id: *recordingRuleGroupSet.Status.ID,
})

groups := make([]coralogixv1alpha1.RecordingRuleGroup, 0, len(set.Groups))
for _, ruleGroup := range set.Groups {
rg := flattenRecordingRuleGroup(ruleGroup)
groups = append(groups, rg)
if err != nil {
if status.Code(err) == codes.NotFound {
recordingRuleGroupSet.Status.ID = nil
if err := r.Status().Update(ctx, recordingRuleGroupSet); err != nil {
return fmt.Errorf("failed to update recording rule groupSet status: %w", err)
}
return err
}
return fmt.Errorf("failed to get recording rule groupSet: %w", err)
}

return coralogixv1alpha1.RecordingRuleGroupSetStatus{
ID: id,
Groups: groups,
if _, err := r.CoralogixClientSet.
RecordingRuleGroups().
UpdateRecordingRuleGroupSet(ctx, &rrg.UpdateRuleGroupSet{
Id: remoteRecordingRule.Id,
Groups: recordingRuleGroupSet.Spec.ExtractRecordingRuleGroups(),
}); err != nil {
return fmt.Errorf("failed to update recording rule groupSet: %w", err)
}

return nil
}

func flattenRecordingRuleGroup(rRG *rrg.OutRuleGroup) coralogixv1alpha1.RecordingRuleGroup {
interval := int32(*rRG.Interval)
limit := int64(*rRG.Limit)
rules := flattenRecordingRules(rRG.Rules)
func (r *RecordingRuleGroupSetReconciler) delete(ctx context.Context, recordingRuleGroupSet *coralogixv1alpha1.RecordingRuleGroupSet) error {
_, err := r.CoralogixClientSet.RecordingRuleGroups().DeleteRecordingRuleGroupSet(ctx, &rrg.DeleteRuleGroupSet{
Id: *recordingRuleGroupSet.Status.ID,
})

return coralogixv1alpha1.RecordingRuleGroup{
Name: rRG.Name,
IntervalSeconds: interval,
Limit: limit,
Rules: rules,
if err != nil && status.Code(err) != codes.NotFound {
return fmt.Errorf("failed to delete recording rule groupSet: %w", err)
}
}

func flattenRecordingRules(rules []*rrg.OutRule) []coralogixv1alpha1.RecordingRule {
result := make([]coralogixv1alpha1.RecordingRule, 0, len(rules))
for _, r := range rules {
rule := flattenRecordingRule(r)
result = append(result, rule)
controllerutil.RemoveFinalizer(recordingRuleGroupSet, alertFinalizerName)
if err = r.Update(ctx, recordingRuleGroupSet); err != nil {
return fmt.Errorf("failed to remove finalizer from recording rule groupSet: %w", err)
}
return result
}

func flattenRecordingRule(rule *rrg.OutRule) coralogixv1alpha1.RecordingRule {
return coralogixv1alpha1.RecordingRule{
Record: rule.Record,
Expr: rule.Expr,
Labels: rule.Labels,
}
return nil
}

func (r *RecordingRuleGroupSetReconciler) SetupWithManager(mgr ctrl.Manager) error {
Expand Down
Loading

0 comments on commit d4d0f36

Please sign in to comment.