Skip to content

Commit

Permalink
issue-572, handling external deletion of clusters was implemented
Browse files Browse the repository at this point in the history
  • Loading branch information
Bohdan Siryk authored and Bohdan Siryk committed Sep 25, 2023
1 parent 3723b60 commit 47d9fd7
Show file tree
Hide file tree
Showing 13 changed files with 190 additions and 309 deletions.
4 changes: 4 additions & 0 deletions apis/clusters/v1beta1/cassandra_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,10 @@ func (c *CassandraSpec) validateResizeSettings(nodeNumber int) error {
return nil
}

func (c *Cassandra) SetState(state string) {
c.Status.State = state
}

func init() {
SchemeBuilder.Register(&Cassandra{}, &CassandraList{})
}
60 changes: 21 additions & 39 deletions controllers/clusters/cadence_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -790,45 +790,7 @@ func (r *CadenceReconciler) newWatchStatusJob(cadence *v1beta1.Cadence) schedule
iData, err := r.API.GetCadence(cadence.Status.ID)
if err != nil {
if errors.Is(err, instaclustr.NotFound) {
activeClusters, err := r.API.ListClusters()
if err != nil {
l.Error(err, "Cannot list account active clusters")
return err
}

if !isClusterActive(cadence.Status.ID, activeClusters) {
l.Info("Cluster is not found in Instaclustr. Deleting resource.",
"cluster ID", cadence.Status.ClusterStatus.ID,
"cluster name", cadence.Spec.Name,
)

patch := cadence.NewPatch()
cadence.Annotations[models.ClusterDeletionAnnotation] = ""
cadence.Annotations[models.ResourceStateAnnotation] = models.DeletingEvent
err = r.Patch(context.TODO(), cadence, patch)
if err != nil {
l.Error(err, "Cannot patch Cadence cluster resource",
"cluster ID", cadence.Status.ID,
"cluster name", cadence.Spec.Name,
"resource name", cadence.Name,
)

return err
}

err = r.Delete(context.TODO(), cadence)
if err != nil {
l.Error(err, "Cannot delete Cadence cluster resource",
"cluster ID", cadence.Status.ID,
"cluster name", cadence.Spec.Name,
"resource name", cadence.Name,
)

return err
}

return nil
}
return r.handleExternalDelete(context.Background(), cadence)
}

l.Error(err, "Cannot get Cadence cluster from the Instaclustr API",
Expand Down Expand Up @@ -1281,3 +1243,23 @@ func (r *CadenceReconciler) reconcileMaintenanceEvents(ctx context.Context, c *v

return nil
}

func (r *CadenceReconciler) handleExternalDelete(ctx context.Context, c *v1beta1.Cadence) error {
l := log.FromContext(ctx)

patch := c.NewPatch()
c.Status.State = models.DeletedStatus
err := r.Status().Patch(ctx, c, patch)
if err != nil {
return err
}

l.Info(msgInstaclustrResourceNotFound)
r.EventRecorder.Eventf(c, models.Warning, models.ExternalDeleted, msgInstaclustrResourceNotFound)

r.Scheduler.RemoveJob(c.GetJobID(scheduler.BackupsChecker))
r.Scheduler.RemoveJob(c.GetJobID(scheduler.UserCreator))
r.Scheduler.RemoveJob(c.GetJobID(scheduler.StatusChecker))

return nil
}
60 changes: 21 additions & 39 deletions controllers/clusters/cassandra_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -885,45 +885,7 @@ func (r *CassandraReconciler) newWatchStatusJob(cassandra *v1beta1.Cassandra) sc
iData, err := r.API.GetCassandra(cassandra.Status.ID)
if err != nil {
if errors.Is(err, instaclustr.NotFound) {
activeClusters, err := r.API.ListClusters()
if err != nil {
l.Error(err, "Cannot list account active clusters")
return err
}

if !isClusterActive(cassandra.Status.ID, activeClusters) {
l.Info("Cluster is not found in Instaclustr. Deleting resource.",
"cluster ID", cassandra.Status.ClusterStatus.ID,
"cluster name", cassandra.Spec.Name,
)

patch := cassandra.NewPatch()
cassandra.Annotations[models.ClusterDeletionAnnotation] = ""
cassandra.Annotations[models.ResourceStateAnnotation] = models.DeletingEvent
err = r.Patch(context.TODO(), cassandra, patch)
if err != nil {
l.Error(err, "Cannot patch Cassandra cluster resource",
"cluster ID", cassandra.Status.ID,
"cluster name", cassandra.Spec.Name,
"resource name", cassandra.Name,
)

return err
}

err = r.Delete(context.TODO(), cassandra)
if err != nil {
l.Error(err, "Cannot delete Cassandra cluster resource",
"cluster ID", cassandra.Status.ID,
"cluster name", cassandra.Spec.Name,
"resource name", cassandra.Name,
)

return err
}

return nil
}
return r.handleExternalDelete(context.Background(), cassandra)
}

l.Error(err, "Cannot get cluster from the Instaclustr API",
Expand Down Expand Up @@ -1262,6 +1224,26 @@ func (r *CassandraReconciler) reconcileMaintenanceEvents(ctx context.Context, c
return nil
}

func (r *CassandraReconciler) handleExternalDelete(ctx context.Context, cassandra *v1beta1.Cassandra) error {
l := log.FromContext(ctx)

patch := cassandra.NewPatch()
cassandra.Status.State = models.DeletedStatus
err := r.Status().Patch(ctx, cassandra, patch)
if err != nil {
return err
}

l.Info(msgInstaclustrResourceNotFound)
r.EventRecorder.Eventf(cassandra, models.Warning, models.ExternalDeleted, msgInstaclustrResourceNotFound)

r.Scheduler.RemoveJob(cassandra.GetJobID(scheduler.BackupsChecker))
r.Scheduler.RemoveJob(cassandra.GetJobID(scheduler.UserCreator))
r.Scheduler.RemoveJob(cassandra.GetJobID(scheduler.StatusChecker))

return nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *CassandraReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
Expand Down
35 changes: 35 additions & 0 deletions controllers/clusters/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ limitations under the License.
package clusters

import (
"context"
"encoding/json"
"fmt"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/log"
"sort"

"github.com/hashicorp/go-version"
Expand Down Expand Up @@ -182,6 +185,36 @@ func createSpecDifferenceMessage(k8sSpec, iSpec any) (string, error) {
return msg + specDifference, nil
}

type Object interface {
client.Object
SetState(state string)
NewPatch() client.Patch
}

// ResourceDeleted sets the state of the resource to models.ExternalDeleted
func ResourceDeleted(
ctx context.Context,
client client.Client,
recorder record.EventRecorder,
obj Object,
) error {
l := log.FromContext(ctx)

patch := obj.NewPatch()

obj.SetState(models.DeletedStatus)

err := client.Status().Patch(ctx, obj, patch)
if err != nil {
return err
}

l.Info(msgInstaclustrResourceNotFound)
recorder.Eventf(obj, models.Warning, models.ExternalDeleted, msgInstaclustrResourceNotFound)

return nil
}

var msgDeleteClusterWithTwoFactorDelete = "Please confirm cluster deletion via email or phone. " +
"If you have canceled a cluster deletion and want to put the cluster on deletion again, " +
"remove \"triggered\" from Instaclustr.com/clusterDeletion annotation."
Expand All @@ -191,3 +224,5 @@ var msgExternalChanges = "The k8s specification is different from Instaclustr Co
"so that it would corresponds to the data from Instaclustr."

var msgSpecStillNoMatch = "k8s resource specification still doesn't match with data on the Instaclustr Console. Double check the difference."

const msgInstaclustrResourceNotFound = "The resource is not found on Instaclustr, changing its state to deleted"
48 changes: 12 additions & 36 deletions controllers/clusters/kafka_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,7 @@ func (r *KafkaReconciler) newWatchStatusJob(kafka *v1beta1.Kafka) scheduler.Job
"namespaced name", namespacedName)
r.Scheduler.RemoveJob(kafka.GetJobID(scheduler.StatusChecker))
r.Scheduler.RemoveJob(kafka.GetJobID(scheduler.UserCreator))
r.Scheduler.RemoveJob(kafka.GetJobID(scheduler.BackupsChecker))
return nil
}
if err != nil {
Expand All @@ -688,7 +689,7 @@ func (r *KafkaReconciler) newWatchStatusJob(kafka *v1beta1.Kafka) scheduler.Job
iData, err := r.API.GetKafka(kafka.Status.ID)
if err != nil {
if errors.Is(err, instaclustr.NotFound) {
return r.handleDeleteFromInstaclustrUI(kafka, l)
return r.handleExternalDelete(context.Background(), kafka)
}

l.Error(err, "Cannot get cluster from the Instaclustr", "cluster ID", kafka.Status.ID)
Expand Down Expand Up @@ -868,47 +869,22 @@ func (r *KafkaReconciler) newUsersCreationJob(kafka *v1beta1.Kafka) scheduler.Jo
}
}

func (r *KafkaReconciler) handleDeleteFromInstaclustrUI(kafka *v1beta1.Kafka, l logr.Logger) error {
activeClusters, err := r.API.ListClusters()
if err != nil {
l.Error(err, "Cannot list account active clusters")
return err
}

if isClusterActive(kafka.Status.ID, activeClusters) {
l.Info("Kafka is not found in the Instaclustr but still exist in the Instaclustr list of active cluster",
"cluster ID", kafka.Status.ID,
"cluster name", kafka.Spec.Name,
"resource name", kafka.Name)

return nil
}

l.Info("Cluster is not found in Instaclustr. Deleting resource.",
"cluster ID", kafka.Status.ClusterStatus.ID,
"cluster name", kafka.Spec.Name)
func (r *KafkaReconciler) handleExternalDelete(ctx context.Context, kafka *v1beta1.Kafka) error {
l := log.FromContext(ctx)

patch := kafka.NewPatch()

kafka.Annotations[models.ClusterDeletionAnnotation] = ""
kafka.Annotations[models.ResourceStateAnnotation] = models.DeletingEvent
err = r.Patch(context.TODO(), kafka, patch)
kafka.Status.State = models.DeletedStatus
err := r.Status().Patch(ctx, kafka, patch)
if err != nil {
l.Error(err, "Cannot patch Kafka cluster resource",
"cluster ID", kafka.Status.ID,
"cluster name", kafka.Spec.Name,
"resource name", kafka.Name)
return err
}

err = r.Delete(context.TODO(), kafka)
if err != nil {
l.Error(err, "Cannot delete Kafka cluster resource",
"cluster ID", kafka.Status.ID,
"cluster name", kafka.Spec.Name,
"resource name", kafka.Name)
return err
}
l.Info(msgInstaclustrResourceNotFound)
r.EventRecorder.Eventf(kafka, models.Warning, models.ExternalDeleted, msgInstaclustrResourceNotFound)

r.Scheduler.RemoveJob(kafka.GetJobID(scheduler.BackupsChecker))
r.Scheduler.RemoveJob(kafka.GetJobID(scheduler.UserCreator))
r.Scheduler.RemoveJob(kafka.GetJobID(scheduler.StatusChecker))

return nil
}
Expand Down
60 changes: 21 additions & 39 deletions controllers/clusters/kafkaconnect_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,45 +468,7 @@ func (r *KafkaConnectReconciler) newWatchStatusJob(kc *v1beta1.KafkaConnect) sch
iData, err := r.API.GetKafkaConnect(kc.Status.ID)
if err != nil {
if errors.Is(err, instaclustr.NotFound) {
activeClusters, err := r.API.ListClusters()
if err != nil {
l.Error(err, "Cannot list account active clusters")
return err
}

if !isClusterActive(kc.Status.ID, activeClusters) {
l.Info("Cluster is not found in Instaclustr. Deleting resource.",
"cluster ID", kc.Status.ClusterStatus.ID,
"cluster name", kc.Spec.Name,
)

patch := kc.NewPatch()
kc.Annotations[models.ClusterDeletionAnnotation] = ""
kc.Annotations[models.ResourceStateAnnotation] = models.DeletingEvent
err = r.Patch(context.TODO(), kc, patch)
if err != nil {
l.Error(err, "Cannot patch KafkaConnect cluster resource",
"cluster ID", kc.Status.ID,
"cluster name", kc.Spec.Name,
"resource name", kc.Name,
)

return err
}

err = r.Delete(context.TODO(), kc)
if err != nil {
l.Error(err, "Cannot delete KafkaConnect cluster resource",
"cluster ID", kc.Status.ID,
"cluster name", kc.Spec.Name,
"resource name", kc.Name,
)

return err
}

return nil
}
return r.handleExternalDelete(context.Background(), kc)
}

l.Error(err, "Cannot get Kafka Connect from Instaclustr",
Expand Down Expand Up @@ -657,3 +619,23 @@ func (r *KafkaConnectReconciler) reconcileMaintenanceEvents(ctx context.Context,

return nil
}

func (r *KafkaConnectReconciler) handleExternalDelete(ctx context.Context, kc *v1beta1.KafkaConnect) error {
l := log.FromContext(ctx)

patch := kc.NewPatch()
kc.Status.State = models.DeletedStatus
err := r.Status().Patch(ctx, kc, patch)
if err != nil {
return err
}

l.Info(msgInstaclustrResourceNotFound)
r.EventRecorder.Eventf(kc, models.Warning, models.ExternalDeleted, msgInstaclustrResourceNotFound)

r.Scheduler.RemoveJob(kc.GetJobID(scheduler.BackupsChecker))
r.Scheduler.RemoveJob(kc.GetJobID(scheduler.UserCreator))
r.Scheduler.RemoveJob(kc.GetJobID(scheduler.StatusChecker))

return nil
}
Loading

0 comments on commit 47d9fd7

Please sign in to comment.