From 3ff0d926330766ed3b46e7e38a6881b7e72c7b16 Mon Sep 17 00:00:00 2001 From: Bohdan Siryk Date: Thu, 28 Sep 2023 17:02:24 +0300 Subject: [PATCH] issue-572, handling of the resource external deletion was added --- controllers/clusters/cadence_controller.go | 83 ++++------ controllers/clusters/cassandra_controller.go | 136 +++++++--------- .../clusters/cassandra_controller_test.go | 52 ++++++ controllers/clusters/helpers.go | 5 +- controllers/clusters/kafka_controller.go | 101 +++++------- controllers/clusters/kafka_controller_test.go | 52 +++++- .../clusters/kafkaconnect_controller.go | 122 +++++++------- .../clusters/kafkaconnect_controller_test.go | 67 +++++++- controllers/clusters/opensearch_controller.go | 131 +++++++-------- .../clusters/opensearch_controller_test.go | 49 ++++++ controllers/clusters/postgresql_controller.go | 144 ++++++++--------- .../clusters/postgresql_controller_test.go | 49 ++++++ controllers/clusters/redis_controller.go | 150 ++++++++---------- controllers/clusters/redis_controller_test.go | 49 ++++++ controllers/clusters/suite_test.go | 4 +- controllers/clusters/zookeeper_controller.go | 105 +++++------- .../clusters/zookeeper_controller_test.go | 65 ++++++++ pkg/instaclustr/errors.go | 1 + pkg/instaclustr/mock/server/go/api.go | 1 + ...pache_cassandra_provisioning_v2_service.go | 32 ++-- ...e_kafka_connect_provisioning_v2_service.go | 32 ++-- ...pi_apache_kafka_provisioning_v2_service.go | 32 ++-- .../mock/server/go/api_bundle_user.go | 19 +++ .../mock/server/go/api_bundle_user_service.go | 13 ++ ...api_open_search_provisioning_v2_service.go | 31 ++-- pkg/models/apiv1.go | 1 - pkg/models/apiv2.go | 2 + pkg/models/operator.go | 1 + 28 files changed, 901 insertions(+), 628 deletions(-) diff --git a/controllers/clusters/cadence_controller.go b/controllers/clusters/cadence_controller.go index af59a3668..5945367ec 100644 --- a/controllers/clusters/cadence_controller.go +++ b/controllers/clusters/cadence_controller.go @@ -222,20 +222,22 @@ func (r *CadenceReconciler) HandleCreateCluster( "Cluster creation request is sent. Cluster ID: %s", id) } - err := r.startClusterStatusJob(cadence) - if err != nil { - logger.Error(err, "Cannot start cluster status job", - "cadence cluster ID", cadence.Status.ID, - ) + if cadence.Status.State != models.DeletedStatus { + err := r.startClusterStatusJob(cadence) + if err != nil { + logger.Error(err, "Cannot start cluster status job", + "cadence cluster ID", cadence.Status.ID, + ) - r.EventRecorder.Eventf(cadence, models.Warning, models.CreationFailed, - "Cluster status check job is failed. Reason: %v", err) + r.EventRecorder.Eventf(cadence, models.Warning, models.CreationFailed, + "Cluster status check job is failed. Reason: %v", err) - return models.ReconcileRequeue - } + return models.ReconcileRequeue + } - r.EventRecorder.Event(cadence, models.Normal, models.Created, - "Cluster status check job is started") + r.EventRecorder.Event(cadence, models.Normal, models.Created, + "Cluster status check job is started") + } return models.ExitReconcile } @@ -790,45 +792,7 @@ func (r *CadenceReconciler) newWatchStatusJob(cadence *v1beta1.Cadence) schedule iData, err := r.API.GetCadence(cadence.Status.ID) if err != nil { if errors.Is(err, instaclustr.NotFound) { - activeClusters, err := r.API.ListClusters() - if err != nil { - l.Error(err, "Cannot list account active clusters") - return err - } - - if !isClusterActive(cadence.Status.ID, activeClusters) { - l.Info("Cluster is not found in Instaclustr. Deleting resource.", - "cluster ID", cadence.Status.ClusterStatus.ID, - "cluster name", cadence.Spec.Name, - ) - - patch := cadence.NewPatch() - cadence.Annotations[models.ClusterDeletionAnnotation] = "" - cadence.Annotations[models.ResourceStateAnnotation] = models.DeletingEvent - err = r.Patch(context.TODO(), cadence, patch) - if err != nil { - l.Error(err, "Cannot patch Cadence cluster resource", - "cluster ID", cadence.Status.ID, - "cluster name", cadence.Spec.Name, - "resource name", cadence.Name, - ) - - return err - } - - err = r.Delete(context.TODO(), cadence) - if err != nil { - l.Error(err, "Cannot delete Cadence cluster resource", - "cluster ID", cadence.Status.ID, - "cluster name", cadence.Spec.Name, - "resource name", cadence.Name, - ) - - return err - } - - return nil - } + return r.handleExternalDelete(context.Background(), cadence) } l.Error(err, "Cannot get Cadence cluster from the Instaclustr API", @@ -1281,3 +1245,22 @@ func (r *CadenceReconciler) reconcileMaintenanceEvents(ctx context.Context, c *v return nil } + +func (r *CadenceReconciler) handleExternalDelete(ctx context.Context, c *v1beta1.Cadence) error { + l := log.FromContext(ctx) + + patch := c.NewPatch() + c.Status.State = models.DeletedStatus + err := r.Status().Patch(ctx, c, patch) + if err != nil { + return err + } + + l.Info(instaclustr.MsgInstaclustrResourceNotFound) + r.EventRecorder.Eventf(c, models.Warning, models.ExternalDeleted, instaclustr.MsgInstaclustrResourceNotFound) + + r.Scheduler.RemoveJob(c.GetJobID(scheduler.BackupsChecker)) + r.Scheduler.RemoveJob(c.GetJobID(scheduler.StatusChecker)) + + return nil +} diff --git a/controllers/clusters/cassandra_controller.go b/controllers/clusters/cassandra_controller.go index 3368b66a6..8de997e52 100644 --- a/controllers/clusters/cassandra_controller.go +++ b/controllers/clusters/cassandra_controller.go @@ -221,54 +221,56 @@ func (r *CassandraReconciler) handleCreateCluster( ) } - err = r.startClusterStatusJob(cassandra) - if err != nil { - l.Error(err, "Cannot start cluster status job", - "cassandra cluster ID", cassandra.Status.ID) + if cassandra.Status.State != models.DeletedStatus { + err = r.startClusterStatusJob(cassandra) + if err != nil { + l.Error(err, "Cannot start cluster status job", + "cassandra cluster ID", cassandra.Status.ID) + + r.EventRecorder.Eventf( + cassandra, models.Warning, models.CreationFailed, + "Cluster status check job is failed. Reason: %v", + err, + ) + return models.ReconcileRequeue + } r.EventRecorder.Eventf( - cassandra, models.Warning, models.CreationFailed, - "Cluster status check job is failed. Reason: %v", - err, + cassandra, models.Normal, models.Created, + "Cluster status check job is started", ) - return models.ReconcileRequeue - } - r.EventRecorder.Eventf( - cassandra, models.Normal, models.Created, - "Cluster status check job is started", - ) + err = r.startClusterBackupsJob(cassandra) + if err != nil { + l.Error(err, "Cannot start cluster backups check job", + "cluster ID", cassandra.Status.ID, + ) - err = r.startClusterBackupsJob(cassandra) - if err != nil { - l.Error(err, "Cannot start cluster backups check job", - "cluster ID", cassandra.Status.ID, - ) + r.EventRecorder.Eventf( + cassandra, models.Warning, models.CreationFailed, + "Cluster backups check job is failed. Reason: %v", + err, + ) + return models.ReconcileRequeue + } r.EventRecorder.Eventf( - cassandra, models.Warning, models.CreationFailed, - "Cluster backups check job is failed. Reason: %v", - err, + cassandra, models.Normal, models.Created, + "Cluster backups check job is started", ) - return models.ReconcileRequeue - } - r.EventRecorder.Eventf( - cassandra, models.Normal, models.Created, - "Cluster backups check job is started", - ) + if cassandra.Spec.UserRefs != nil { + err = r.startUsersCreationJob(cassandra) + if err != nil { + l.Error(err, "Failed to start user creation job") + r.EventRecorder.Eventf(cassandra, models.Warning, models.CreationFailed, + "User creation job is failed. Reason: %v", err) + return models.ReconcileRequeue + } - if cassandra.Spec.UserRefs != nil { - err = r.startUsersCreationJob(cassandra) - if err != nil { - l.Error(err, "Failed to start user creation job") - r.EventRecorder.Eventf(cassandra, models.Warning, models.CreationFailed, - "User creation job is failed. Reason: %v", err) - return models.ReconcileRequeue + r.EventRecorder.Event(cassandra, models.Normal, models.Created, + "Cluster user creation job is started") } - - r.EventRecorder.Event(cassandra, models.Normal, models.Created, - "Cluster user creation job is started") } return models.ExitReconcile @@ -885,45 +887,7 @@ func (r *CassandraReconciler) newWatchStatusJob(cassandra *v1beta1.Cassandra) sc iData, err := r.API.GetCassandra(cassandra.Status.ID) if err != nil { if errors.Is(err, instaclustr.NotFound) { - activeClusters, err := r.API.ListClusters() - if err != nil { - l.Error(err, "Cannot list account active clusters") - return err - } - - if !isClusterActive(cassandra.Status.ID, activeClusters) { - l.Info("Cluster is not found in Instaclustr. Deleting resource.", - "cluster ID", cassandra.Status.ClusterStatus.ID, - "cluster name", cassandra.Spec.Name, - ) - - patch := cassandra.NewPatch() - cassandra.Annotations[models.ClusterDeletionAnnotation] = "" - cassandra.Annotations[models.ResourceStateAnnotation] = models.DeletingEvent - err = r.Patch(context.TODO(), cassandra, patch) - if err != nil { - l.Error(err, "Cannot patch Cassandra cluster resource", - "cluster ID", cassandra.Status.ID, - "cluster name", cassandra.Spec.Name, - "resource name", cassandra.Name, - ) - - return err - } - - err = r.Delete(context.TODO(), cassandra) - if err != nil { - l.Error(err, "Cannot delete Cassandra cluster resource", - "cluster ID", cassandra.Status.ID, - "cluster name", cassandra.Spec.Name, - "resource name", cassandra.Name, - ) - - return err - } - - return nil - } + return r.handleExternalDelete(context.Background(), cassandra) } l.Error(err, "Cannot get cluster from the Instaclustr API", @@ -1262,6 +1226,26 @@ func (r *CassandraReconciler) reconcileMaintenanceEvents(ctx context.Context, c return nil } +func (r *CassandraReconciler) handleExternalDelete(ctx context.Context, c *v1beta1.Cassandra) error { + l := log.FromContext(ctx) + + patch := c.NewPatch() + c.Status.State = models.DeletedStatus + err := r.Status().Patch(ctx, c, patch) + if err != nil { + return err + } + + l.Info(instaclustr.MsgInstaclustrResourceNotFound) + r.EventRecorder.Eventf(c, models.Warning, models.ExternalDeleted, instaclustr.MsgInstaclustrResourceNotFound) + + r.Scheduler.RemoveJob(c.GetJobID(scheduler.BackupsChecker)) + r.Scheduler.RemoveJob(c.GetJobID(scheduler.UserCreator)) + r.Scheduler.RemoveJob(c.GetJobID(scheduler.StatusChecker)) + + return nil +} + // SetupWithManager sets up the controller with the Manager. func (r *CassandraReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). diff --git a/controllers/clusters/cassandra_controller_test.go b/controllers/clusters/cassandra_controller_test.go index 151235359..8256f24ed 100644 --- a/controllers/clusters/cassandra_controller_test.go +++ b/controllers/clusters/cassandra_controller_test.go @@ -18,6 +18,7 @@ package clusters import ( "context" + "fmt" "os" . "github.com/onsi/ginkgo/v2" @@ -28,7 +29,9 @@ import ( "github.com/instaclustr/operator/apis/clusters/v1beta1" "github.com/instaclustr/operator/controllers/tests" + "github.com/instaclustr/operator/pkg/instaclustr" openapi "github.com/instaclustr/operator/pkg/instaclustr/mock/server/go" + "github.com/instaclustr/operator/pkg/models" ) const newCassandraNodeSize = "CAS-DEV-t4g.small-30" @@ -106,4 +109,53 @@ var _ = Describe("Cassandra Controller", func() { }, timeout, interval).Should(BeTrue()) }) }) + + When("Deleting the Cassandra resource by avoiding operator", func() { + It("should try to get the cluster details and receive StatusNotFound", func() { + cassandraManifest2 := cassandraManifest.DeepCopy() + cassandraManifest2.Name += "-test-external-delete" + cassandraManifest2.ResourceVersion = "" + + cassandra2 := v1beta1.Cassandra{} + cassandra2NamespacedName := types.NamespacedName{ + Namespace: cassandraManifest2.Namespace, + Name: cassandraManifest2.Name, + } + + Expect(k8sClient.Create(ctx, cassandraManifest2)).Should(Succeed()) + + doneCh := tests.NewChannelWithTimeout(timeout) + + Eventually(func() bool { + if err := k8sClient.Get(ctx, cassandra2NamespacedName, &cassandra2); err != nil { + return false + } + + if cassandra2.Status.State != models.RunningStatus { + return false + } + + doneCh <- struct{}{} + + return true + }, timeout, interval).Should(BeTrue()) + + <-doneCh + + By("testing by deleting the cluster by Instaclustr API client") + Expect(instaClient.DeleteCluster(cassandra2.Status.ID, instaclustr.CassandraEndpoint)).Should(Succeed()) + + Eventually(func() bool { + err := k8sClient.Get(ctx, cassandra2NamespacedName, &cassandra2) + if err != nil { + return false + } + + fmt.Println("cassandra2.Status.State", cassandra2.Status.State) + + return cassandra2.Status.State == models.DeletedStatus + }, timeout, interval).Should(BeTrue()) + }) + }) + }) diff --git a/controllers/clusters/helpers.go b/controllers/clusters/helpers.go index 806f71dfb..ebb3bd69a 100644 --- a/controllers/clusters/helpers.go +++ b/controllers/clusters/helpers.go @@ -22,11 +22,10 @@ import ( "sort" "github.com/hashicorp/go-version" - "k8s.io/utils/strings/slices" - "sigs.k8s.io/controller-runtime/pkg/client" - "github.com/instaclustr/operator/apis/clusters/v1beta1" "github.com/instaclustr/operator/pkg/models" + "k8s.io/utils/strings/slices" + "sigs.k8s.io/controller-runtime/pkg/client" ) // confirmDeletion confirms if resource is deleting and set appropriate annotation. diff --git a/controllers/clusters/kafka_controller.go b/controllers/clusters/kafka_controller.go index c480ac2e2..e1eca1ce2 100644 --- a/controllers/clusters/kafka_controller.go +++ b/controllers/clusters/kafka_controller.go @@ -151,37 +151,44 @@ func (r *KafkaReconciler) handleCreateCluster(ctx context.Context, kafka *v1beta ) return models.ReconcileRequeue } - } - err = r.startClusterStatusJob(kafka) - if err != nil { - l.Error(err, "Cannot start cluster status job", - "cluster ID", kafka.Status.ID) - r.EventRecorder.Eventf( - kafka, models.Warning, models.CreationFailed, - "Cluster status check job creation is failed. Reason: %v", - err, + l.Info("Cluster has been created", + "cluster ID", kafka.Status.ID, ) - return models.ReconcileRequeue } - r.EventRecorder.Eventf( - kafka, models.Normal, models.Created, - "Cluster status check job is started", - ) - - l.Info("Cluster has been created", - "cluster ID", kafka.Status.ID) - - if kafka.Spec.UserRefs != nil { - err = r.startUsersCreationJob(kafka) + if kafka.Status.State != models.DeletedStatus { + err = r.startClusterStatusJob(kafka) if err != nil { - l.Error(err, "Failed to start user creation job") - r.EventRecorder.Eventf(kafka, models.Warning, models.CreationFailed, - "User creation job is failed. Reason: %v", err, + l.Error(err, "Cannot start cluster status job", + "cluster ID", kafka.Status.ID) + r.EventRecorder.Eventf( + kafka, models.Warning, models.CreationFailed, + "Cluster status check job creation is failed. Reason: %v", + err, ) return models.ReconcileRequeue } + + r.EventRecorder.Eventf( + kafka, models.Normal, models.Created, + "Cluster status check job is started", + ) + + if kafka.Spec.UserRefs != nil { + err = r.startUsersCreationJob(kafka) + if err != nil { + l.Error(err, "Failed to start user creation job") + r.EventRecorder.Eventf(kafka, models.Warning, models.CreationFailed, + "User creation job is failed. Reason: %v", err, + ) + return models.ReconcileRequeue + } + + r.EventRecorder.Event(kafka, models.Normal, models.Created, + "Cluster user creation job is started", + ) + } } return models.ExitReconcile @@ -677,6 +684,7 @@ func (r *KafkaReconciler) newWatchStatusJob(kafka *v1beta1.Kafka) scheduler.Job "namespaced name", namespacedName) r.Scheduler.RemoveJob(kafka.GetJobID(scheduler.StatusChecker)) r.Scheduler.RemoveJob(kafka.GetJobID(scheduler.UserCreator)) + r.Scheduler.RemoveJob(kafka.GetJobID(scheduler.BackupsChecker)) return nil } if err != nil { @@ -688,7 +696,7 @@ func (r *KafkaReconciler) newWatchStatusJob(kafka *v1beta1.Kafka) scheduler.Job iData, err := r.API.GetKafka(kafka.Status.ID) if err != nil { if errors.Is(err, instaclustr.NotFound) { - return r.handleDeleteFromInstaclustrUI(kafka, l) + return r.handleExternalDelete(context.Background(), kafka) } l.Error(err, "Cannot get cluster from the Instaclustr", "cluster ID", kafka.Status.ID) @@ -868,47 +876,22 @@ func (r *KafkaReconciler) newUsersCreationJob(kafka *v1beta1.Kafka) scheduler.Jo } } -func (r *KafkaReconciler) handleDeleteFromInstaclustrUI(kafka *v1beta1.Kafka, l logr.Logger) error { - activeClusters, err := r.API.ListClusters() - if err != nil { - l.Error(err, "Cannot list account active clusters") - return err - } - - if isClusterActive(kafka.Status.ID, activeClusters) { - l.Info("Kafka is not found in the Instaclustr but still exist in the Instaclustr list of active cluster", - "cluster ID", kafka.Status.ID, - "cluster name", kafka.Spec.Name, - "resource name", kafka.Name) - - return nil - } - - l.Info("Cluster is not found in Instaclustr. Deleting resource.", - "cluster ID", kafka.Status.ClusterStatus.ID, - "cluster name", kafka.Spec.Name) +func (r *KafkaReconciler) handleExternalDelete(ctx context.Context, kafka *v1beta1.Kafka) error { + l := log.FromContext(ctx) patch := kafka.NewPatch() - - kafka.Annotations[models.ClusterDeletionAnnotation] = "" - kafka.Annotations[models.ResourceStateAnnotation] = models.DeletingEvent - err = r.Patch(context.TODO(), kafka, patch) + kafka.Status.State = models.DeletedStatus + err := r.Status().Patch(ctx, kafka, patch) if err != nil { - l.Error(err, "Cannot patch Kafka cluster resource", - "cluster ID", kafka.Status.ID, - "cluster name", kafka.Spec.Name, - "resource name", kafka.Name) return err } - err = r.Delete(context.TODO(), kafka) - if err != nil { - l.Error(err, "Cannot delete Kafka cluster resource", - "cluster ID", kafka.Status.ID, - "cluster name", kafka.Spec.Name, - "resource name", kafka.Name) - return err - } + l.Info(instaclustr.MsgInstaclustrResourceNotFound) + r.EventRecorder.Eventf(kafka, models.Warning, models.ExternalDeleted, instaclustr.MsgInstaclustrResourceNotFound) + + r.Scheduler.RemoveJob(kafka.GetJobID(scheduler.BackupsChecker)) + r.Scheduler.RemoveJob(kafka.GetJobID(scheduler.UserCreator)) + r.Scheduler.RemoveJob(kafka.GetJobID(scheduler.StatusChecker)) return nil } diff --git a/controllers/clusters/kafka_controller_test.go b/controllers/clusters/kafka_controller_test.go index dae764f44..2e533126c 100644 --- a/controllers/clusters/kafka_controller_test.go +++ b/controllers/clusters/kafka_controller_test.go @@ -28,7 +28,8 @@ import ( "github.com/instaclustr/operator/apis/clusters/v1beta1" "github.com/instaclustr/operator/controllers/tests" - openapi "github.com/instaclustr/operator/pkg/instaclustr/mock/server/go" + "github.com/instaclustr/operator/pkg/instaclustr" + "github.com/instaclustr/operator/pkg/models" ) const newKafkaNodeSize = "KFK-DEV-t4g.medium-80" @@ -58,7 +59,7 @@ var _ = Describe("Kafka Controller", func() { return false } - return kafka.Status.ID == openapi.CreatedID + return kafka.Status.ID != "" }).Should(BeTrue()) <-done @@ -104,4 +105,51 @@ var _ = Describe("Kafka Controller", func() { }, timeout, interval).Should(BeTrue()) }) }) + + When("Deleting the Kafka Connect resource by avoiding operator", func() { + It("should try to get the cluster details and receive StatusNotFound", func() { + kafkaManifest2 := kafkaManifest.DeepCopy() + kafkaManifest2.Name += "-test-external-delete" + kafkaManifest2.ResourceVersion = "" + + kafka2 := v1beta1.Kafka{} + kafkaConnect2NamespacedName := types.NamespacedName{ + Namespace: kafkaManifest2.Namespace, + Name: kafkaManifest2.Name, + } + + Expect(k8sClient.Create(ctx, kafkaManifest2)).Should(Succeed()) + + doneCh := tests.NewChannelWithTimeout(timeout) + + Eventually(func() bool { + if err := k8sClient.Get(ctx, kafkaConnect2NamespacedName, &kafka2); err != nil { + return false + } + + if kafka2.Status.State != models.RunningStatus { + return false + } + + doneCh <- struct{}{} + + return true + }, timeout, interval).Should(BeTrue()) + + <-doneCh + + By("testing by deleting the cluster by Instaclustr API client") + Expect(instaClient.DeleteCluster(kafka2.Status.ID, instaclustr.KafkaEndpoint)).Should(Succeed()) + + Eventually(func() bool { + err := k8sClient.Get(ctx, kafkaConnect2NamespacedName, &kafka2) + if err != nil { + return false + } + + return kafka2.Status.State == models.DeletedStatus + }, timeout, interval).Should(BeTrue()) + }) + }) + }) diff --git a/controllers/clusters/kafkaconnect_controller.go b/controllers/clusters/kafkaconnect_controller.go index c8df36ee2..ef36cb776 100644 --- a/controllers/clusters/kafkaconnect_controller.go +++ b/controllers/clusters/kafkaconnect_controller.go @@ -19,7 +19,6 @@ package clusters import ( "context" "errors" - "github.com/go-logr/logr" k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" @@ -138,40 +137,43 @@ func (r *KafkaConnectReconciler) handleCreateCluster(ctx context.Context, kc *v1 ) return models.ReconcileRequeue } - } - err := r.startClusterStatusJob(kc) - if err != nil { - l.Error(err, "Cannot start cluster status job", "cluster ID", kc.Status.ID) - r.EventRecorder.Eventf( - kc, models.Warning, models.CreationFailed, - "Cluster status check job is failed. Reason: %v", - err, + err = r.createDefaultSecret(ctx, kc, l) + if err != nil { + l.Error(err, "Cannot create default secret for Kafka Connect", + "cluster name", kc.Spec.Name, + "clusterID", kc.Status.ID, + ) + r.EventRecorder.Eventf( + kc, models.Warning, models.CreationFailed, + "Default user secret creation on the Instaclustr is failed. Reason: %v", + err, + ) + + return models.ReconcileRequeue + } + + l.Info("Kafka Connect cluster has been created", + "cluster ID", kc.Status.ID, ) - return models.ReconcileRequeue } - l.Info("Kafka Connect cluster has been created", - "cluster ID", kc.Status.ID) - - r.EventRecorder.Eventf( - kc, models.Normal, models.Created, - "Cluster status check job is started", - ) + if kc.Status.State != models.DeletedStatus { + err := r.startClusterStatusJob(kc) + if err != nil { + l.Error(err, "Cannot start cluster status job", "cluster ID", kc.Status.ID) + r.EventRecorder.Eventf( + kc, models.Warning, models.CreationFailed, + "Cluster status check job is failed. Reason: %v", + err, + ) + return models.ReconcileRequeue + } - err = r.createDefaultSecret(ctx, kc, l) - if err != nil { - l.Error(err, "Cannot create default secret for Kafka Connect", - "cluster name", kc.Spec.Name, - "clusterID", kc.Status.ID, - ) r.EventRecorder.Eventf( - kc, models.Warning, models.CreationFailed, - "Default user secret creation on the Instaclustr is failed. Reason: %v", - err, + kc, models.Normal, models.Created, + "Cluster status check job is started", ) - - return models.ReconcileRequeue } return models.ExitReconcile @@ -434,6 +436,11 @@ func (r *KafkaConnectReconciler) createDefaultSecret(ctx context.Context, kc *v1 return err } + l.Info("Default secret was created", + "secret name", secret.Name, + "secret namespace", secret.Namespace, + ) + return nil } @@ -468,45 +475,7 @@ func (r *KafkaConnectReconciler) newWatchStatusJob(kc *v1beta1.KafkaConnect) sch iData, err := r.API.GetKafkaConnect(kc.Status.ID) if err != nil { if errors.Is(err, instaclustr.NotFound) { - activeClusters, err := r.API.ListClusters() - if err != nil { - l.Error(err, "Cannot list account active clusters") - return err - } - - if !isClusterActive(kc.Status.ID, activeClusters) { - l.Info("Cluster is not found in Instaclustr. Deleting resource.", - "cluster ID", kc.Status.ClusterStatus.ID, - "cluster name", kc.Spec.Name, - ) - - patch := kc.NewPatch() - kc.Annotations[models.ClusterDeletionAnnotation] = "" - kc.Annotations[models.ResourceStateAnnotation] = models.DeletingEvent - err = r.Patch(context.TODO(), kc, patch) - if err != nil { - l.Error(err, "Cannot patch KafkaConnect cluster resource", - "cluster ID", kc.Status.ID, - "cluster name", kc.Spec.Name, - "resource name", kc.Name, - ) - - return err - } - - err = r.Delete(context.TODO(), kc) - if err != nil { - l.Error(err, "Cannot delete KafkaConnect cluster resource", - "cluster ID", kc.Status.ID, - "cluster name", kc.Spec.Name, - "resource name", kc.Name, - ) - - return err - } - - return nil - } + return r.handleExternalDelete(context.Background(), kc) } l.Error(err, "Cannot get Kafka Connect from Instaclustr", @@ -657,3 +626,22 @@ func (r *KafkaConnectReconciler) reconcileMaintenanceEvents(ctx context.Context, return nil } + +func (r *KafkaConnectReconciler) handleExternalDelete(ctx context.Context, kc *v1beta1.KafkaConnect) error { + l := log.FromContext(ctx) + + patch := kc.NewPatch() + kc.Status.State = models.DeletedStatus + err := r.Status().Patch(ctx, kc, patch) + if err != nil { + return err + } + + l.Info(instaclustr.MsgInstaclustrResourceNotFound) + r.EventRecorder.Eventf(kc, models.Warning, models.ExternalDeleted, instaclustr.MsgInstaclustrResourceNotFound) + + r.Scheduler.RemoveJob(kc.GetJobID(scheduler.BackupsChecker)) + r.Scheduler.RemoveJob(kc.GetJobID(scheduler.StatusChecker)) + + return nil +} diff --git a/controllers/clusters/kafkaconnect_controller_test.go b/controllers/clusters/kafkaconnect_controller_test.go index a4fa8d406..92e20478b 100644 --- a/controllers/clusters/kafkaconnect_controller_test.go +++ b/controllers/clusters/kafkaconnect_controller_test.go @@ -18,6 +18,8 @@ package clusters import ( "context" + "github.com/instaclustr/operator/pkg/instaclustr" + "github.com/instaclustr/operator/pkg/models" "os" . "github.com/onsi/ginkgo/v2" @@ -28,7 +30,6 @@ import ( "github.com/instaclustr/operator/apis/clusters/v1beta1" "github.com/instaclustr/operator/controllers/tests" - openapi "github.com/instaclustr/operator/pkg/instaclustr/mock/server/go" ) const newKafkaConnectNodeNumbers = 6 @@ -58,10 +59,26 @@ var _ = Describe("Kafka Connect Controller", func() { return false } - return kafkaConnect.Status.ID == openapi.CreatedID + return kafkaConnect.Status.ID != "" }).Should(BeTrue()) <-done + + By("creating secret with the default user credentials") + secret := kafkaConnect.NewDefaultUserSecret("", "") + secretNamespacedName := types.NamespacedName{ + Namespace: secret.Namespace, + Name: secret.Name, + } + + Eventually(func() bool { + if err := k8sClient.Get(ctx, secretNamespacedName, secret); err != nil { + return false + } + + return string(secret.Data[models.Username]) == kafkaConnect.Status.ID+"_username" && + string(secret.Data[models.Password]) == kafkaConnect.Status.ID+"_password" + }).Should(BeTrue()) }) }) @@ -104,4 +121,50 @@ var _ = Describe("Kafka Connect Controller", func() { }, timeout, interval).Should(BeTrue()) }) }) + + When("Deleting the Kafka Connect resource by avoiding operator", func() { + It("should try to get the cluster details and receive StatusNotFound", func() { + kafkaConnectManifest2 := kafkaConnectManifest.DeepCopy() + kafkaConnectManifest2.Name += "-test-external-delete" + kafkaConnectManifest2.ResourceVersion = "" + + kafkaConnect2 := v1beta1.KafkaConnect{} + kafkaConnect2NamespacedName := types.NamespacedName{ + Namespace: kafkaConnectManifest2.Namespace, + Name: kafkaConnectManifest2.Name, + } + + Expect(k8sClient.Create(ctx, kafkaConnectManifest2)).Should(Succeed()) + + doneCh := tests.NewChannelWithTimeout(timeout) + + Eventually(func() bool { + if err := k8sClient.Get(ctx, kafkaConnect2NamespacedName, &kafkaConnect2); err != nil { + return false + } + + if kafkaConnect2.Status.State != models.RunningStatus { + return false + } + + doneCh <- struct{}{} + + return true + }, timeout, interval).Should(BeTrue()) + + <-doneCh + + By("testing by deleting the cluster by Instaclustr API client") + Expect(instaClient.DeleteCluster(kafkaConnect2.Status.ID, instaclustr.KafkaConnectEndpoint)).Should(Succeed()) + + Eventually(func() bool { + err := k8sClient.Get(ctx, kafkaConnect2NamespacedName, &kafkaConnect2) + if err != nil { + return false + } + + return kafkaConnect2.Status.State == models.DeletedStatus + }, timeout, interval).Should(BeTrue()) + }) + }) }) diff --git a/controllers/clusters/opensearch_controller.go b/controllers/clusters/opensearch_controller.go index 75ae84c1a..6c9448f12 100644 --- a/controllers/clusters/opensearch_controller.go +++ b/controllers/clusters/opensearch_controller.go @@ -192,53 +192,56 @@ func (r *OpenSearchReconciler) HandleCreateCluster( return models.ReconcileRequeue } - } - - err = r.startClusterStatusJob(o) - if err != nil { - logger.Error(err, "Cannot start OpenSearch cluster status job", - "cluster ID", o.Status.ID) - r.EventRecorder.Eventf(o, models.Warning, models.CreationFailed, - "Cluster status check job is failed. Reason: %v", err) - - return models.ReconcileRequeue + logger.Info( + "OpenSearch resource has been created", + "cluster name", o.Name, "cluster ID", o.Status.ID, + "api version", o.APIVersion, "namespace", o.Namespace, + ) } - r.EventRecorder.Event(o, models.Normal, models.Created, - "Cluster status check job is started") - - err = r.startClusterBackupsJob(o) - if err != nil { - logger.Error(err, "Cannot start OpenSearch cluster backups check job", - "cluster ID", o.Status.ID) - - r.EventRecorder.Eventf(o, models.Warning, models.CreationFailed, - "Cluster backups check job is failed. Reason: %v", err) + if o.Status.State != models.DeletedStatus { + err = r.startClusterStatusJob(o) + if err != nil { + logger.Error(err, "Cannot start OpenSearch cluster status job", + "cluster ID", o.Status.ID) - return models.ReconcileRequeue - } + r.EventRecorder.Eventf(o, models.Warning, models.CreationFailed, + "Cluster status check job is failed. Reason: %v", err) - r.EventRecorder.Event(o, models.Normal, models.Created, - "Cluster backups check job is started") + return models.ReconcileRequeue + } - logger.Info( - "OpenSearch resource has been created", - "cluster name", o.Name, "cluster ID", o.Status.ID, - "api version", o.APIVersion, "namespace", o.Namespace) + r.EventRecorder.Event(o, models.Normal, models.Created, + "Cluster status check job is started") - if o.Spec.UserRefs != nil { - err = r.startUsersCreationJob(o) + err = r.startClusterBackupsJob(o) if err != nil { - logger.Error(err, "Failed to start user creation job") + logger.Error(err, "Cannot start OpenSearch cluster backups check job", + "cluster ID", o.Status.ID) + r.EventRecorder.Eventf(o, models.Warning, models.CreationFailed, - "User creation job is failed. Reason: %v", err, - ) + "Cluster backups check job is failed. Reason: %v", err) + return models.ReconcileRequeue } r.EventRecorder.Event(o, models.Normal, models.Created, - "Cluster user creation job is started") + "Cluster backups check job is started") + + if o.Spec.UserRefs != nil { + err = r.startUsersCreationJob(o) + if err != nil { + logger.Error(err, "Failed to start user creation job") + r.EventRecorder.Eventf(o, models.Warning, models.CreationFailed, + "User creation job is failed. Reason: %v", err, + ) + return models.ReconcileRequeue + } + + r.EventRecorder.Event(o, models.Normal, models.Created, + "Cluster user creation job is started") + } } return models.ExitReconcile @@ -597,45 +600,7 @@ func (r *OpenSearchReconciler) newWatchStatusJob(o *v1beta1.OpenSearch) schedule iData, err := r.API.GetOpenSearch(o.Status.ID) if err != nil { if errors.Is(err, instaclustr.NotFound) { - activeClusters, err := r.API.ListClusters() - if err != nil { - l.Error(err, "Cannot list account active clusters") - return err - } - - if !isClusterActive(o.Status.ID, activeClusters) { - l.Info("Cluster is not found in Instaclustr. Deleting resource.", - "cluster ID", o.Status.ClusterStatus.ID, - "cluster name", o.Spec.Name, - ) - - patch := o.NewPatch() - o.Annotations[models.ClusterDeletionAnnotation] = "" - o.Annotations[models.ResourceStateAnnotation] = models.DeletingEvent - err = r.Patch(context.TODO(), o, patch) - if err != nil { - l.Error(err, "Cannot patch OpenSearch cluster resource", - "cluster ID", o.Status.ID, - "cluster name", o.Spec.Name, - "resource name", o.Name, - ) - - return err - } - - err = r.Delete(context.TODO(), o) - if err != nil { - l.Error(err, "Cannot delete OpenSearch cluster resource", - "cluster ID", o.Status.ID, - "cluster name", o.Spec.Name, - "resource name", o.Name, - ) - - return err - } - - return nil - } + return r.handleExternalDelete(context.Background(), o) } l.Error(err, "Cannot get OpenSearch cluster from the Instaclustr API", @@ -1248,3 +1213,23 @@ func (r *OpenSearchReconciler) reconcileMaintenanceEvents(ctx context.Context, o return nil } + +func (r *OpenSearchReconciler) handleExternalDelete(ctx context.Context, o *v1beta1.OpenSearch) error { + l := log.FromContext(ctx) + + patch := o.NewPatch() + o.Status.State = models.DeletedStatus + err := r.Status().Patch(ctx, o, patch) + if err != nil { + return err + } + + l.Info(instaclustr.MsgInstaclustrResourceNotFound) + r.EventRecorder.Eventf(o, models.Warning, models.ExternalDeleted, instaclustr.MsgInstaclustrResourceNotFound) + + r.Scheduler.RemoveJob(o.GetJobID(scheduler.BackupsChecker)) + r.Scheduler.RemoveJob(o.GetJobID(scheduler.UserCreator)) + r.Scheduler.RemoveJob(o.GetJobID(scheduler.StatusChecker)) + + return nil +} diff --git a/controllers/clusters/opensearch_controller_test.go b/controllers/clusters/opensearch_controller_test.go index 6d5ee1c09..7b8b84b72 100644 --- a/controllers/clusters/opensearch_controller_test.go +++ b/controllers/clusters/opensearch_controller_test.go @@ -28,7 +28,9 @@ import ( "github.com/instaclustr/operator/apis/clusters/v1beta1" "github.com/instaclustr/operator/controllers/tests" + "github.com/instaclustr/operator/pkg/instaclustr" openapi "github.com/instaclustr/operator/pkg/instaclustr/mock/server/go" + "github.com/instaclustr/operator/pkg/models" ) const newOpenSearchNodeSize = "SRH-DEV-t4g.small-30" @@ -102,4 +104,51 @@ var _ = Describe("OpenSearch Controller", func() { }, timeout, interval).Should(BeTrue()) }) }) + + When("Deleting the Kafka Connect resource by avoiding operator", func() { + It("should try to get the cluster details and receive StatusNotFound", func() { + openSearchManifest2 := openSearchManifest.DeepCopy() + openSearchManifest2.Name += "-test-external-delete" + openSearchManifest2.ResourceVersion = "" + + openSearch2 := v1beta1.OpenSearch{} + openSearch2NamespacedName := types.NamespacedName{ + Namespace: openSearchManifest2.Namespace, + Name: openSearchManifest2.Name, + } + + Expect(k8sClient.Create(ctx, openSearchManifest2)).Should(Succeed()) + + doneCh := tests.NewChannelWithTimeout(timeout) + + Eventually(func() bool { + if err := k8sClient.Get(ctx, openSearch2NamespacedName, &openSearch2); err != nil { + return false + } + + if openSearch2.Status.State != models.RunningStatus { + return false + } + + doneCh <- struct{}{} + + return true + }, timeout, interval).Should(BeTrue()) + + <-doneCh + + By("testing by deleting the cluster by Instaclutr API client") + Expect(instaClient.DeleteCluster(openSearch2.Status.ID, instaclustr.OpenSearchEndpoint)).Should(Succeed()) + + Eventually(func() bool { + err := k8sClient.Get(ctx, openSearch2NamespacedName, &openSearch2) + if err != nil { + return false + } + + return openSearch2.Status.State == models.DeletedStatus + }, timeout, interval).Should(BeTrue()) + }) + }) + }) diff --git a/controllers/clusters/postgresql_controller.go b/controllers/clusters/postgresql_controller.go index 406f69f2d..2fe50fe47 100644 --- a/controllers/clusters/postgresql_controller.go +++ b/controllers/clusters/postgresql_controller.go @@ -227,43 +227,58 @@ func (r *PostgreSQLReconciler) handleCreateCluster( return models.ReconcileRequeue } - err = r.startClusterStatusJob(pg) - if err != nil { - logger.Error(err, "Cannot start PostgreSQL cluster status check job", - "cluster ID", pg.Status.ID, - ) + if pg.Status.State != models.DeletedStatus { + err = r.startClusterStatusJob(pg) + if err != nil { + logger.Error(err, "Cannot start PostgreSQL cluster status check job", + "cluster ID", pg.Status.ID, + ) + + r.EventRecorder.Eventf( + pg, models.Warning, models.CreationFailed, + "Cluster status check job is failed. Reason: %v", + err, + ) + return models.ReconcileRequeue + } r.EventRecorder.Eventf( - pg, models.Warning, models.CreationFailed, - "Cluster status check job is failed. Reason: %v", - err, + pg, models.Normal, models.Created, + "Cluster status check job is started", ) - return models.ReconcileRequeue - } - r.EventRecorder.Eventf( - pg, models.Normal, models.Created, - "Cluster status check job is started", - ) + err = r.startClusterBackupsJob(pg) + if err != nil { + logger.Error(err, "Cannot start PostgreSQL cluster backups check job", + "cluster ID", pg.Status.ID, + ) - err = r.startClusterBackupsJob(pg) - if err != nil { - logger.Error(err, "Cannot start PostgreSQL cluster backups check job", - "cluster ID", pg.Status.ID, - ) + r.EventRecorder.Eventf( + pg, models.Warning, models.CreationFailed, + "Cluster backups check job is failed. Reason: %v", + err, + ) + return models.ReconcileRequeue + } r.EventRecorder.Eventf( - pg, models.Warning, models.CreationFailed, - "Cluster backups check job is failed. Reason: %v", - err, + pg, models.Normal, models.Created, + "Cluster backups check job is started", ) - return models.ReconcileRequeue - } - r.EventRecorder.Eventf( - pg, models.Normal, models.Created, - "Cluster backups check job is started", - ) + if pg.Spec.UserRefs != nil { + err = r.startUsersCreationJob(pg) + if err != nil { + logger.Error(err, "Failed to start user PostreSQL creation job") + r.EventRecorder.Eventf(pg, models.Warning, models.CreationFailed, + "User creation job is failed. Reason: %v", err) + return models.ReconcileRequeue + } + + r.EventRecorder.Event(pg, models.Normal, models.Created, + "Cluster user creation job is started") + } + } err = r.createDefaultPassword(ctx, pg, logger) if err != nil { @@ -281,19 +296,6 @@ func (r *PostgreSQLReconciler) handleCreateCluster( return models.ReconcileRequeue } - if pg.Spec.UserRefs != nil { - err = r.startUsersCreationJob(pg) - if err != nil { - logger.Error(err, "Failed to start user PostreSQL creation job") - r.EventRecorder.Eventf(pg, models.Warning, models.CreationFailed, - "User creation job is failed. Reason: %v", err) - return models.ReconcileRequeue - } - - r.EventRecorder.Event(pg, models.Normal, models.Created, - "Cluster user creation job is started") - } - return models.ExitReconcile } @@ -964,45 +966,7 @@ func (r *PostgreSQLReconciler) newWatchStatusJob(pg *v1beta1.PostgreSQL) schedul instPGData, err := r.API.GetPostgreSQL(pg.Status.ID) if err != nil { if errors.Is(err, instaclustr.NotFound) { - activeClusters, err := r.API.ListClusters() - if err != nil { - l.Error(err, "Cannot list account active clusters") - return err - } - - if !isClusterActive(pg.Status.ID, activeClusters) { - l.Info("Cluster is not found in Instaclustr. Deleting resource.", - "cluster ID", pg.Status.ClusterStatus.ID, - "cluster name", pg.Spec.Name, - ) - - patch := pg.NewPatch() - pg.Annotations[models.ClusterDeletionAnnotation] = "" - pg.Annotations[models.ResourceStateAnnotation] = models.DeletingEvent - err = r.Patch(context.TODO(), pg, patch) - if err != nil { - l.Error(err, "Cannot patch PostgreSQL cluster resource", - "cluster ID", pg.Status.ID, - "cluster name", pg.Spec.Name, - "resource name", pg.Name, - ) - - return err - } - - err = r.Delete(context.TODO(), pg) - if err != nil { - l.Error(err, "Cannot delete PostgreSQL cluster resource", - "cluster ID", pg.Status.ID, - "cluster name", pg.Spec.Name, - "resource name", pg.Name, - ) - - return err - } - - return nil - } + return r.handleExternalDelete(context.Background(), pg) } l.Error(err, "Cannot get PostgreSQL cluster status", @@ -1637,3 +1601,23 @@ func (r *PostgreSQLReconciler) reconcileMaintenanceEvents(ctx context.Context, p return nil } + +func (r *PostgreSQLReconciler) handleExternalDelete(ctx context.Context, pg *v1beta1.PostgreSQL) error { + l := log.FromContext(ctx) + + patch := pg.NewPatch() + pg.Status.State = models.DeletedStatus + err := r.Status().Patch(ctx, pg, patch) + if err != nil { + return err + } + + l.Info(instaclustr.MsgInstaclustrResourceNotFound) + r.EventRecorder.Eventf(pg, models.Warning, models.ExternalDeleted, instaclustr.MsgInstaclustrResourceNotFound) + + r.Scheduler.RemoveJob(pg.GetJobID(scheduler.BackupsChecker)) + r.Scheduler.RemoveJob(pg.GetJobID(scheduler.UserCreator)) + r.Scheduler.RemoveJob(pg.GetJobID(scheduler.StatusChecker)) + + return nil +} diff --git a/controllers/clusters/postgresql_controller_test.go b/controllers/clusters/postgresql_controller_test.go index 5501f141a..8aa7c84ee 100644 --- a/controllers/clusters/postgresql_controller_test.go +++ b/controllers/clusters/postgresql_controller_test.go @@ -28,7 +28,9 @@ import ( "github.com/instaclustr/operator/apis/clusters/v1beta1" "github.com/instaclustr/operator/controllers/tests" + "github.com/instaclustr/operator/pkg/instaclustr" openapi "github.com/instaclustr/operator/pkg/instaclustr/mock/server/go" + "github.com/instaclustr/operator/pkg/models" ) const newPostgreSQLNodeSize = "PGS-DEV-t4g.medium-30" @@ -104,4 +106,51 @@ var _ = Describe("PostgreSQL Controller", func() { }, timeout, interval).Should(BeTrue()) }) }) + + When("Deleting the Kafka Connect resource by avoiding operator", func() { + It("should try to get the cluster details and receive StatusNotFound", func() { + postgresqlManifest2 := postgresqlManifest.DeepCopy() + postgresqlManifest2.Name += "-test-external-delete" + postgresqlManifest2.ResourceVersion = "" + + pg2 := v1beta1.PostgreSQL{} + pg2NamespacedName := types.NamespacedName{ + Namespace: postgresqlManifest2.Namespace, + Name: postgresqlManifest2.Name, + } + + Expect(k8sClient.Create(ctx, postgresqlManifest2)).Should(Succeed()) + + doneCh := tests.NewChannelWithTimeout(timeout) + + Eventually(func() bool { + if err := k8sClient.Get(ctx, pg2NamespacedName, &pg2); err != nil { + return false + } + + if pg2.Status.State != models.RunningStatus { + return false + } + + doneCh <- struct{}{} + + return true + }, timeout, interval).Should(BeTrue()) + + <-doneCh + + By("testing by deleting the cluster by Instaclutr API client") + Expect(instaClient.DeleteCluster(pg2.Status.ID, instaclustr.PGSQLEndpoint)).Should(Succeed()) + + Eventually(func() bool { + err := k8sClient.Get(ctx, pg2NamespacedName, &pg2) + if err != nil { + return false + } + + return pg2.Status.State == models.DeletedStatus + }, timeout, interval).Should(BeTrue()) + }) + }) + }) diff --git a/controllers/clusters/redis_controller.go b/controllers/clusters/redis_controller.go index b8101bd3b..79bc0b4c2 100644 --- a/controllers/clusters/redis_controller.go +++ b/controllers/clusters/redis_controller.go @@ -208,43 +208,60 @@ func (r *RedisReconciler) handleCreateCluster( return models.ReconcileRequeue } - err = r.startClusterStatusJob(redis) - if err != nil { - logger.Error(err, "Cannot start cluster status job", - "redis cluster ID", redis.Status.ID, - ) + if redis.Status.State != models.DeletedStatus { + err = r.startClusterStatusJob(redis) + if err != nil { + logger.Error(err, "Cannot start cluster status job", + "redis cluster ID", redis.Status.ID, + ) + + r.EventRecorder.Eventf( + redis, models.Warning, models.CreationFailed, + "Cluster status job creation is failed. Reason: %v", + err, + ) + return models.ReconcileRequeue + } r.EventRecorder.Eventf( - redis, models.Warning, models.CreationFailed, - "Cluster status job creation is failed. Reason: %v", - err, + redis, models.Normal, models.Created, + "Cluster status check job is started", ) - return models.ReconcileRequeue - } - r.EventRecorder.Eventf( - redis, models.Normal, models.Created, - "Cluster status check job is started", - ) + err = r.startClusterBackupsJob(redis) + if err != nil { + logger.Error(err, "Cannot start Redis cluster backups check job", + "cluster ID", redis.Status.ID, + ) - err = r.startClusterBackupsJob(redis) - if err != nil { - logger.Error(err, "Cannot start Redis cluster backups check job", - "cluster ID", redis.Status.ID, - ) + r.EventRecorder.Eventf( + redis, models.Warning, models.CreationFailed, + "Cluster backups job creation is failed. Reason: %v", + err, + ) + return models.ReconcileRequeue + } r.EventRecorder.Eventf( - redis, models.Warning, models.CreationFailed, - "Cluster backups job creation is failed. Reason: %v", - err, + redis, models.Normal, models.Created, + "Cluster backups check job is started", ) - return models.ReconcileRequeue - } - r.EventRecorder.Eventf( - redis, models.Normal, models.Created, - "Cluster backups check job is started", - ) + if redis.Spec.UserRefs != nil { + err = r.startUsersCreationJob(redis) + + if err != nil { + logger.Error(err, "Failed to start user creation job") + r.EventRecorder.Eventf(redis, models.Warning, models.CreationFailed, + "User creation job is failed. Reason: %v", err, + ) + return models.ReconcileRequeue + } + + r.EventRecorder.Event(redis, models.Normal, models.Created, + "Cluster user creation job is started") + } + } logger.Info( "Redis resource has been created", @@ -255,23 +272,6 @@ func (r *RedisReconciler) handleCreateCluster( "namespace", redis.Namespace, ) - // Adding users is allowed when the cluster is running. - - if redis.Spec.UserRefs != nil { - err = r.startUsersCreationJob(redis) - - if err != nil { - logger.Error(err, "Failed to start user creation job") - r.EventRecorder.Eventf(redis, models.Warning, models.CreationFailed, - "User creation job is failed. Reason: %v", err, - ) - return models.ReconcileRequeue - } - - r.EventRecorder.Event(redis, models.Normal, models.Created, - "Cluster user creation job is started") - } - return models.ExitReconcile } @@ -896,45 +896,7 @@ func (r *RedisReconciler) newWatchStatusJob(redis *v1beta1.Redis) scheduler.Job iData, err := r.API.GetRedis(redis.Status.ID) if err != nil { if errors.Is(err, instaclustr.NotFound) { - activeClusters, err := r.API.ListClusters() - if err != nil { - l.Error(err, "Cannot list account active clusters") - return err - } - - if !isClusterActive(redis.Status.ID, activeClusters) { - l.Info("Cluster is not found in Instaclustr. Deleting resource.", - "cluster ID", redis.Status.ClusterStatus.ID, - "cluster name", redis.Spec.Name, - ) - - patch := redis.NewPatch() - redis.Annotations[models.ClusterDeletionAnnotation] = "" - redis.Annotations[models.ResourceStateAnnotation] = models.DeletingEvent - err = r.Patch(context.TODO(), redis, patch) - if err != nil { - l.Error(err, "Cannot patch Redis cluster resource", - "cluster ID", redis.Status.ID, - "cluster name", redis.Spec.Name, - "resource name", redis.Name, - ) - - return err - } - - err = r.Delete(context.TODO(), redis) - if err != nil { - l.Error(err, "Cannot delete Redis cluster resource", - "cluster ID", redis.Status.ID, - "cluster name", redis.Spec.Name, - "resource name", redis.Name, - ) - - return err - } - - return nil - } + return r.handleExternalDelete(context.Background(), redis) } l.Error(err, "Cannot get Redis cluster status from Instaclustr", @@ -1284,3 +1246,23 @@ func (r *RedisReconciler) reconcileMaintenanceEvents(ctx context.Context, redis return nil } + +func (r *RedisReconciler) handleExternalDelete(ctx context.Context, redis *v1beta1.Redis) error { + l := log.FromContext(ctx) + + patch := redis.NewPatch() + redis.Status.State = models.DeletedStatus + err := r.Status().Patch(ctx, redis, patch) + if err != nil { + return err + } + + l.Info(instaclustr.MsgInstaclustrResourceNotFound) + r.EventRecorder.Eventf(redis, models.Warning, models.ExternalDeleted, instaclustr.MsgInstaclustrResourceNotFound) + + r.Scheduler.RemoveJob(redis.GetJobID(scheduler.BackupsChecker)) + r.Scheduler.RemoveJob(redis.GetJobID(scheduler.UserCreator)) + r.Scheduler.RemoveJob(redis.GetJobID(scheduler.StatusChecker)) + + return nil +} diff --git a/controllers/clusters/redis_controller_test.go b/controllers/clusters/redis_controller_test.go index 7a4bfac32..397c2bbb3 100644 --- a/controllers/clusters/redis_controller_test.go +++ b/controllers/clusters/redis_controller_test.go @@ -28,7 +28,9 @@ import ( "github.com/instaclustr/operator/apis/clusters/v1beta1" "github.com/instaclustr/operator/controllers/tests" + "github.com/instaclustr/operator/pkg/instaclustr" openapi "github.com/instaclustr/operator/pkg/instaclustr/mock/server/go" + "github.com/instaclustr/operator/pkg/models" ) const newRedisNodeSize = "RDS-DEV-t4g.medium-80" @@ -103,4 +105,51 @@ var _ = Describe("Redis Controller", func() { }, timeout, interval).Should(BeTrue()) }) }) + + When("Deleting the Kafka Connect resource by avoiding operator", func() { + It("should try to get the cluster details and receive StatusNotFound", func() { + redisManifest2 := redisManifest.DeepCopy() + redisManifest2.Name += "-test-external-delete" + redisManifest2.ResourceVersion = "" + + redis2 := v1beta1.Redis{} + redis2NamespacedName := types.NamespacedName{ + Namespace: redisManifest2.Namespace, + Name: redisManifest2.Name, + } + + Expect(k8sClient.Create(ctx, redisManifest2)).Should(Succeed()) + + doneCh := tests.NewChannelWithTimeout(timeout) + + Eventually(func() bool { + if err := k8sClient.Get(ctx, redis2NamespacedName, &redis2); err != nil { + return false + } + + if redis2.Status.State != models.RunningStatus { + return false + } + + doneCh <- struct{}{} + + return true + }, timeout, interval).Should(BeTrue()) + + <-doneCh + + By("testing by deleting the cluster by Instaclustr API client") + Expect(instaClient.DeleteCluster(redis2.Status.ID, instaclustr.RedisEndpoint)).Should(Succeed()) + + Eventually(func() bool { + err := k8sClient.Get(ctx, redis2NamespacedName, &redis2) + if err != nil { + return false + } + + return redis2.Status.State == models.DeletedStatus + }, timeout, interval).Should(BeTrue()) + }) + }) + }) diff --git a/controllers/clusters/suite_test.go b/controllers/clusters/suite_test.go index c62f5cd55..b87d78a35 100644 --- a/controllers/clusters/suite_test.go +++ b/controllers/clusters/suite_test.go @@ -53,6 +53,8 @@ var ( timeout = time.Millisecond * 1000 interval = time.Millisecond * 10 + + instaClient = instaclustr.NewClient("test", "test", "http://localhost:8082", time.Second*10) ) func TestAPIs(t *testing.T) { @@ -77,8 +79,6 @@ var _ = BeforeSuite(func() { Expect(err).NotTo(HaveOccurred()) Expect(cfg).NotTo(BeNil()) - instaClient := instaclustr.NewClient("test", "test", "http://localhost:8082", time.Second*10) - err = v1beta1.AddToScheme(scheme.Scheme) Expect(err).NotTo(HaveOccurred()) diff --git a/controllers/clusters/zookeeper_controller.go b/controllers/clusters/zookeeper_controller.go index b832054ca..16c002009 100644 --- a/controllers/clusters/zookeeper_controller.go +++ b/controllers/clusters/zookeeper_controller.go @@ -153,40 +153,42 @@ func (r *ZookeeperReconciler) handleCreateCluster( ) return models.ReconcileRequeue } - } - err = r.startClusterStatusJob(zook) - if err != nil { - l.Error(err, "Cannot start cluster status job", - "zookeeper cluster ID", zook.Status.ID) - r.EventRecorder.Eventf( - zook, models.Warning, models.CreationFailed, - "Cluster status check job creation is failed. Reason: %v", - err, - ) - return models.ReconcileRequeue - } + l.Info("Zookeeper cluster has been created", "cluster ID", zook.Status.ID) - r.EventRecorder.Eventf( - zook, models.Normal, models.Created, - "Cluster status check job is started", - ) + err = r.createDefaultSecret(ctx, zook, l) + if err != nil { + l.Error(err, "Cannot create default secret for Zookeeper cluster", + "cluster name", zook.Spec.Name, + "clusterID", zook.Status.ID, + ) + r.EventRecorder.Eventf( + zook, models.Warning, models.CreationFailed, + "Default user secret creation on the Instaclustr is failed. Reason: %v", + err, + ) - l.Info("Zookeeper cluster has been created", "cluster ID", zook.Status.ID) + return models.ReconcileRequeue + } + } + + if zook.Status.State != models.DeletedStatus { + err = r.startClusterStatusJob(zook) + if err != nil { + l.Error(err, "Cannot start cluster status job", + "zookeeper cluster ID", zook.Status.ID) + r.EventRecorder.Eventf( + zook, models.Warning, models.CreationFailed, + "Cluster status check job creation is failed. Reason: %v", + err, + ) + return models.ReconcileRequeue + } - err = r.createDefaultSecret(ctx, zook, l) - if err != nil { - l.Error(err, "Cannot create default secret for Zookeeper cluster", - "cluster name", zook.Spec.Name, - "clusterID", zook.Status.ID, - ) r.EventRecorder.Eventf( - zook, models.Warning, models.CreationFailed, - "Default user secret creation on the Instaclustr is failed. Reason: %v", - err, + zook, models.Normal, models.Created, + "Cluster status check job is started", ) - - return models.ReconcileRequeue } return models.ExitReconcile @@ -422,7 +424,7 @@ func (r *ZookeeperReconciler) newWatchStatusJob(zook *v1beta1.Zookeeper) schedul iData, err := r.API.GetZookeeper(zook.Status.ID) if err != nil { if errors.Is(err, instaclustr.NotFound) { - return r.handleDeleteFromInstaclustrUI(zook, l) + return r.handleExternalDelete(context.Background(), zook) } l.Error(err, "Cannot get Zookeeper cluster status from Instaclustr", @@ -507,50 +509,21 @@ func (r *ZookeeperReconciler) newWatchStatusJob(zook *v1beta1.Zookeeper) schedul } } -func (r *ZookeeperReconciler) handleDeleteFromInstaclustrUI(zook *v1beta1.Zookeeper, l logr.Logger) error { - activeClusters, err := r.API.ListClusters() - if err != nil { - l.Error(err, "Cannot list account active clusters") - return err - } - - if isClusterActive(zook.Status.ID, activeClusters) { - l.Info("Zookeeper is not found in the Instaclustr but still exist in the Instaclustr list of active cluster", - "cluster ID", zook.Status.ID, - "cluster name", zook.Spec.Name, - "resource name", zook.Name) - - return nil - } - - l.Info("Cluster is not found in Instaclustr. Deleting resource.", - "cluster ID", zook.Status.ID, - "cluster name", zook.Spec.Name) +func (r *ZookeeperReconciler) handleExternalDelete(ctx context.Context, zook *v1beta1.Zookeeper) error { + l := log.FromContext(ctx) patch := zook.NewPatch() - zook.Annotations[models.ClusterDeletionAnnotation] = "" - zook.Annotations[models.ResourceStateAnnotation] = models.DeletingEvent - err = r.Patch(context.TODO(), zook, patch) + zook.Status.State = models.DeletedStatus + err := r.Status().Patch(ctx, zook, patch) if err != nil { - l.Error(err, "Cannot patch Zookeeper cluster resource", - "cluster ID", zook.Status.ID, - "cluster name", zook.Spec.Name, - "resource name", zook.Name, - ) - return err } - err = r.Delete(context.TODO(), zook) - if err != nil { - l.Error(err, "Cannot delete Zookeeper cluster resource", - "cluster ID", zook.Status.ID, - "cluster name", zook.Spec.Name, - "resource name", zook.Name, - ) + l.Info(instaclustr.MsgInstaclustrResourceNotFound) + r.EventRecorder.Eventf(zook, models.Warning, models.ExternalDeleted, instaclustr.MsgInstaclustrResourceNotFound) - return err - } + r.Scheduler.RemoveJob(zook.GetJobID(scheduler.BackupsChecker)) + r.Scheduler.RemoveJob(zook.GetJobID(scheduler.StatusChecker)) return nil } diff --git a/controllers/clusters/zookeeper_controller_test.go b/controllers/clusters/zookeeper_controller_test.go index b51e65490..24cf7554e 100644 --- a/controllers/clusters/zookeeper_controller_test.go +++ b/controllers/clusters/zookeeper_controller_test.go @@ -28,7 +28,9 @@ import ( "github.com/instaclustr/operator/apis/clusters/v1beta1" "github.com/instaclustr/operator/controllers/tests" + "github.com/instaclustr/operator/pkg/instaclustr" openapi "github.com/instaclustr/operator/pkg/instaclustr/mock/server/go" + "github.com/instaclustr/operator/pkg/models" ) const zookeeperNodeSizeFromYAML = "zookeeper-developer-t3.small-20" @@ -62,6 +64,22 @@ var _ = Describe("Zookeeper Controller", func() { }).Should(BeTrue()) <-done + + By("creating secret with the default user credentials") + secret := zookeeper.NewDefaultUserSecret("", "") + secretNamespacedName := types.NamespacedName{ + Namespace: secret.Namespace, + Name: secret.Name, + } + + Eventually(func() bool { + if err := k8sClient.Get(ctx, secretNamespacedName, secret); err != nil { + return false + } + + return string(secret.Data[models.Username]) == zookeeper.Status.ID+"_username" && + string(secret.Data[models.Password]) == zookeeper.Status.ID+"_password" + }).Should(BeTrue()) }) When("zookeeper is created, the status job get new data from the InstAPI.", func() { @@ -98,4 +116,51 @@ var _ = Describe("Zookeeper Controller", func() { }) }) }) + + When("Deleting the Kafka Connect resource by avoiding operator", func() { + It("should try to get the cluster details and receive StatusNotFound", func() { + zookeeperManifest2 := zookeeperManifest.DeepCopy() + zookeeperManifest2.Name += "-test-external-delete" + zookeeperManifest2.ResourceVersion = "" + + zookeeper2 := v1beta1.Zookeeper{} + kafkaConnect2NamespacedName := types.NamespacedName{ + Namespace: zookeeperManifest2.Namespace, + Name: zookeeperManifest2.Name, + } + + Expect(k8sClient.Create(ctx, zookeeperManifest2)).Should(Succeed()) + + doneCh := tests.NewChannelWithTimeout(timeout) + + Eventually(func() bool { + if err := k8sClient.Get(ctx, kafkaConnect2NamespacedName, &zookeeper2); err != nil { + return false + } + + if zookeeper2.Status.State != models.RunningStatus { + return false + } + + doneCh <- struct{}{} + + return true + }, timeout, interval).Should(BeTrue()) + + <-doneCh + + By("testing by deleting the cluster by Instaclustr API client") + Expect(instaClient.DeleteCluster(zookeeper2.Status.ID, instaclustr.ZookeeperEndpoint)).Should(Succeed()) + + Eventually(func() bool { + err := k8sClient.Get(ctx, kafkaConnect2NamespacedName, &zookeeper2) + if err != nil { + return false + } + + return zookeeper2.Status.State == models.DeletedStatus + }, timeout, interval).Should(BeTrue()) + }) + }) + }) diff --git a/pkg/instaclustr/errors.go b/pkg/instaclustr/errors.go index bcb895eda..96d9ffebe 100644 --- a/pkg/instaclustr/errors.go +++ b/pkg/instaclustr/errors.go @@ -25,3 +25,4 @@ var ( ) const MsgDeleteUser = "If you want to delete the user, remove it from the clusters specifications first" +const MsgInstaclustrResourceNotFound = "The resource is not found on Instaclustr, changing its state to deleted" diff --git a/pkg/instaclustr/mock/server/go/api.go b/pkg/instaclustr/mock/server/go/api.go index fd084d4ae..c2e9d0264 100644 --- a/pkg/instaclustr/mock/server/go/api.go +++ b/pkg/instaclustr/mock/server/go/api.go @@ -548,6 +548,7 @@ type AzureVnetPeerV2APIServicer interface { type BundleUserAPIServicer interface { CreateUser(context.Context, string, string, BundleUserCreateRequest) (ImplResponse, error) DeleteUser(context.Context, string, string, BundleUserDeleteRequest) (ImplResponse, error) + GetDefaultCreds(ctx context.Context, clusterID string) (ImplResponse, error) } // CadenceProvisioningV2APIServicer defines the api actions for the CadenceProvisioningV2API service diff --git a/pkg/instaclustr/mock/server/go/api_apache_cassandra_provisioning_v2_service.go b/pkg/instaclustr/mock/server/go/api_apache_cassandra_provisioning_v2_service.go index e3c60b26f..522880524 100644 --- a/pkg/instaclustr/mock/server/go/api_apache_cassandra_provisioning_v2_service.go +++ b/pkg/instaclustr/mock/server/go/api_apache_cassandra_provisioning_v2_service.go @@ -19,12 +19,12 @@ import ( // This service should implement the business logic for every endpoint for the ApacheCassandraProvisioningV2API API. // Include any external packages or services that will be required by this service. type ApacheCassandraProvisioningV2APIService struct { - MockCassandraClusters []*CassandraClusterV2 + clusters map[string]*CassandraClusterV2 } // NewApacheCassandraProvisioningV2APIService creates a default api service func NewApacheCassandraProvisioningV2APIService() ApacheCassandraProvisioningV2APIServicer { - return &ApacheCassandraProvisioningV2APIService{} + return &ApacheCassandraProvisioningV2APIService{clusters: map[string]*CassandraClusterV2{}} } // ClusterManagementV2DataSourcesApplicationsCassandraClustersV2ClusterIdListBackupsV2Get - List recent cluster backup events. @@ -64,12 +64,12 @@ func (s *ApacheCassandraProvisioningV2APIService) ClusterManagementV2OperationsA func (s *ApacheCassandraProvisioningV2APIService) ClusterManagementV2ResourcesApplicationsCassandraClustersV2ClusterIdDelete(ctx context.Context, clusterId string) (ImplResponse, error) { // TODO - update ClusterManagementV2ResourcesApplicationsCassandraClustersV2ClusterIdDelete with the required logic for this service method. // Add api_apache_cassandra_provisioning_v2_service.go to the .openapi-generator-ignore to avoid overwriting this service implementation when updating open api generation. - c := s.getCluster(clusterId) - if c == nil { + _, exists := s.clusters[clusterId] + if !exists { return Response(http.StatusNotFound, nil), nil } - c = nil + delete(s.clusters, clusterId) return Response(http.StatusNoContent, nil), nil } @@ -79,8 +79,8 @@ func (s *ApacheCassandraProvisioningV2APIService) ClusterManagementV2ResourcesAp // TODO - update ClusterManagementV2ResourcesApplicationsCassandraClustersV2ClusterIdGet with the required logic for this service method. // Add api_apache_cassandra_provisioning_v2_service.go to the .openapi-generator-ignore to avoid overwriting this service implementation when updating open api generation. - c := s.getCluster(clusterId) - if c == nil { + c, exists := s.clusters[clusterId] + if !exists { return Response(http.StatusNotFound, nil), nil } @@ -94,9 +94,9 @@ func (s *ApacheCassandraProvisioningV2APIService) ClusterManagementV2ResourcesAp // TODO - update ClusterManagementV2ResourcesApplicationsCassandraClustersV2ClusterIdPut with the required logic for this service method. // Add api_apache_cassandra_provisioning_v2_service.go to the .openapi-generator-ignore to avoid overwriting this service implementation when updating open api generation. - c := s.getCluster(clusterId) - if c == nil { - return Response(404, nil), nil + c, exists := s.clusters[clusterId] + if !exists { + return Response(http.StatusNotFound, nil), nil } newNode := []NodeDetailsV2{{ @@ -121,17 +121,7 @@ func (s *ApacheCassandraProvisioningV2APIService) ClusterManagementV2ResourcesAp newCassandra.Id = cassandraClusterV2.Name + CreatedID newCassandra.DataCentres[0].Nodes = []NodeDetailsV2{{NodeSize: cassandraClusterV2.DataCentres[0].NodeSize}} - s.MockCassandraClusters = append(s.MockCassandraClusters, newCassandra) + s.clusters[newCassandra.Id] = newCassandra return Response(http.StatusAccepted, newCassandra), nil } - -func (s *ApacheCassandraProvisioningV2APIService) getCluster(clusterID string) *CassandraClusterV2 { - for _, c := range s.MockCassandraClusters { - if c.Id == clusterID { - return c - } - } - - return nil -} diff --git a/pkg/instaclustr/mock/server/go/api_apache_kafka_connect_provisioning_v2_service.go b/pkg/instaclustr/mock/server/go/api_apache_kafka_connect_provisioning_v2_service.go index 96f261b30..47b9b11c7 100644 --- a/pkg/instaclustr/mock/server/go/api_apache_kafka_connect_provisioning_v2_service.go +++ b/pkg/instaclustr/mock/server/go/api_apache_kafka_connect_provisioning_v2_service.go @@ -13,18 +13,20 @@ import ( "context" "errors" "net/http" + + "github.com/google/uuid" ) // ApacheKafkaConnectProvisioningV2APIService is a service that implements the logic for the ApacheKafkaConnectProvisioningV2APIServicer // This service should implement the business logic for every endpoint for the ApacheKafkaConnectProvisioningV2API API. // Include any external packages or services that will be required by this service. type ApacheKafkaConnectProvisioningV2APIService struct { - MockKafkaConnectCluster *KafkaConnectClusterV2 + clusters map[string]*KafkaConnectClusterV2 } // NewApacheKafkaConnectProvisioningV2APIService creates a default api service func NewApacheKafkaConnectProvisioningV2APIService() ApacheKafkaConnectProvisioningV2APIServicer { - return &ApacheKafkaConnectProvisioningV2APIService{} + return &ApacheKafkaConnectProvisioningV2APIService{clusters: map[string]*KafkaConnectClusterV2{}} } // ClusterManagementV2OperationsApplicationsKafkaConnectClustersV2ClusterIdSyncCustomKafkaConnectorsV2Put - Update the custom connectors of a Kafka Connect cluster. @@ -43,7 +45,7 @@ func (s *ApacheKafkaConnectProvisioningV2APIService) ClusterManagementV2Resource // TODO - update ClusterManagementV2ResourcesApplicationsKafkaConnectClustersV2ClusterIdDelete with the required logic for this service method. // Add api_apache_kafka_connect_provisioning_v2_service.go to the .openapi-generator-ignore to avoid overwriting this service implementation when updating open api generation. - s.MockKafkaConnectCluster = nil + delete(s.clusters, clusterId) return Response(204, nil), nil } @@ -52,13 +54,14 @@ func (s *ApacheKafkaConnectProvisioningV2APIService) ClusterManagementV2Resource // TODO - update ClusterManagementV2ResourcesApplicationsKafkaConnectClustersV2ClusterIdGet with the required logic for this service method. // Add api_apache_kafka_connect_provisioning_v2_service.go to the .openapi-generator-ignore to avoid overwriting this service implementation when updating open api generation. - if s.MockKafkaConnectCluster != nil { - s.MockKafkaConnectCluster.Status = RUNNING - } else { + cluster, exists := s.clusters[clusterId] + if !exists { return Response(404, nil), nil } - return Response(200, s.MockKafkaConnectCluster), nil + cluster.Status = RUNNING + + return Response(200, cluster), nil } // ClusterManagementV2ResourcesApplicationsKafkaConnectClustersV2ClusterIdPut - Update a Kafka connect cluster @@ -66,7 +69,12 @@ func (s *ApacheKafkaConnectProvisioningV2APIService) ClusterManagementV2Resource // TODO - update ClusterManagementV2ResourcesApplicationsKafkaConnectClustersV2ClusterIdPut with the required logic for this service method. // Add api_apache_kafka_connect_provisioning_v2_service.go to the .openapi-generator-ignore to avoid overwriting this service implementation when updating open api generation. - s.MockKafkaConnectCluster.DataCentres[0].NumberOfNodes = kafkaConnectClusterUpdateV2.DataCentres[0].NumberOfNodes + cluster, exists := s.clusters[clusterId] + if !exists { + return Response(404, nil), nil + } + + cluster.DataCentres[0].NumberOfNodes = kafkaConnectClusterUpdateV2.DataCentres[0].NumberOfNodes return Response(202, nil), nil @@ -79,7 +87,9 @@ func (s *ApacheKafkaConnectProvisioningV2APIService) ClusterManagementV2Resource // TODO - update ClusterManagementV2ResourcesApplicationsKafkaConnectClustersV2Post with the required logic for this service method. // Add api_apache_kafka_connect_provisioning_v2_service.go to the .openapi-generator-ignore to avoid overwriting this service implementation when updating open api generation. - s.MockKafkaConnectCluster = &kafkaConnectClusterV2 - s.MockKafkaConnectCluster.Id = CreatedID - return Response(202, s.MockKafkaConnectCluster), nil + id := uuid.New().String() + kafkaConnectClusterV2.Id = id + s.clusters[id] = &kafkaConnectClusterV2 + + return Response(202, kafkaConnectClusterV2), nil } diff --git a/pkg/instaclustr/mock/server/go/api_apache_kafka_provisioning_v2_service.go b/pkg/instaclustr/mock/server/go/api_apache_kafka_provisioning_v2_service.go index f114c585c..45c52d65a 100644 --- a/pkg/instaclustr/mock/server/go/api_apache_kafka_provisioning_v2_service.go +++ b/pkg/instaclustr/mock/server/go/api_apache_kafka_provisioning_v2_service.go @@ -11,18 +11,20 @@ package openapi import ( "context" + + "github.com/google/uuid" ) // ApacheKafkaProvisioningV2APIService is a service that implements the logic for the ApacheKafkaProvisioningV2APIServicer // This service should implement the business logic for every endpoint for the ApacheKafkaProvisioningV2API API. // Include any external packages or services that will be required by this service. type ApacheKafkaProvisioningV2APIService struct { - MockKafkaCluster *KafkaClusterV2 + clusters map[string]*KafkaClusterV2 } // NewApacheKafkaProvisioningV2APIService creates a default api service func NewApacheKafkaProvisioningV2APIService() ApacheKafkaProvisioningV2APIServicer { - return &ApacheKafkaProvisioningV2APIService{} + return &ApacheKafkaProvisioningV2APIService{clusters: map[string]*KafkaClusterV2{}} } // ClusterManagementV2ResourcesApplicationsKafkaClustersV2ClusterIdDelete - Delete cluster @@ -30,7 +32,7 @@ func (s *ApacheKafkaProvisioningV2APIService) ClusterManagementV2ResourcesApplic // TODO - update ClusterManagementV2ResourcesApplicationsKafkaClustersV2ClusterIdDelete with the required logic for this service method. // Add api_apache_kafka_provisioning_v2_service.go to the .openapi-generator-ignore to avoid overwriting this service implementation when updating open api generation. - s.MockKafkaCluster = nil + delete(s.clusters, clusterId) return Response(204, nil), nil } @@ -39,13 +41,14 @@ func (s *ApacheKafkaProvisioningV2APIService) ClusterManagementV2ResourcesApplic // TODO - update ClusterManagementV2ResourcesApplicationsKafkaClustersV2ClusterIdGet with the required logic for this service method. // Add api_apache_kafka_provisioning_v2_service.go to the .openapi-generator-ignore to avoid overwriting this service implementation when updating open api generation. - if s.MockKafkaCluster != nil { - s.MockKafkaCluster.Status = RUNNING - } else { + cluster, exists := s.clusters[clusterId] + if !exists { return Response(404, nil), nil } - return Response(200, s.MockKafkaCluster), nil + cluster.Status = RUNNING + + return Response(200, cluster), nil } // ClusterManagementV2ResourcesApplicationsKafkaClustersV2ClusterIdPut - Update Kafka Cluster Details @@ -53,6 +56,11 @@ func (s *ApacheKafkaProvisioningV2APIService) ClusterManagementV2ResourcesApplic // TODO - update ClusterManagementV2ResourcesApplicationsKafkaClustersV2ClusterIdPut with the required logic for this service method. // Add api_apache_kafka_provisioning_v2_service.go to the .openapi-generator-ignore to avoid overwriting this service implementation when updating open api generation. + cluster, exists := s.clusters[clusterId] + if !exists { + return Response(404, nil), nil + } + newNode := []NodeDetailsV2{{ NodeRoles: []NodeRolesV2{"KAFKA_BROKER", "KAFKA_ZOOKEEPER"}, Rack: "us-east-1a", @@ -60,7 +68,7 @@ func (s *ApacheKafkaProvisioningV2APIService) ClusterManagementV2ResourcesApplic PublicAddress: "54.146.160.89", }} - s.MockKafkaCluster.DataCentres[0].Nodes = newNode + cluster.DataCentres[0].Nodes = newNode return Response(202, nil), nil @@ -73,7 +81,9 @@ func (s *ApacheKafkaProvisioningV2APIService) ClusterManagementV2ResourcesApplic // TODO - update ClusterManagementV2ResourcesApplicationsKafkaClustersV2Post with the required logic for this service method. // Add api_apache_kafka_provisioning_v2_service.go to the .openapi-generator-ignore to avoid overwriting this service implementation when updating open api generation. - s.MockKafkaCluster = &kafkaClusterV2 - s.MockKafkaCluster.Id = CreatedID - return Response(202, s.MockKafkaCluster), nil + id := uuid.New().String() + kafkaClusterV2.Id = id + s.clusters[id] = &kafkaClusterV2 + + return Response(202, kafkaClusterV2), nil } diff --git a/pkg/instaclustr/mock/server/go/api_bundle_user.go b/pkg/instaclustr/mock/server/go/api_bundle_user.go index a2c4386e4..e731ea2c8 100644 --- a/pkg/instaclustr/mock/server/go/api_bundle_user.go +++ b/pkg/instaclustr/mock/server/go/api_bundle_user.go @@ -60,6 +60,11 @@ func (c *BundleUserAPIController) Routes() Routes { "/provisioning/v1/{clusterId}/{bundle}/users", c.DeleteUser, }, + "GetDefaultCreds": Route{ + strings.ToUpper("Get"), + "/provisioning/v1/{clusterId}", + c.GetDefaultCreds, + }, } } @@ -122,3 +127,17 @@ func (c *BundleUserAPIController) DeleteUser(w http.ResponseWriter, r *http.Requ // If no error, encode the body and the result code EncodeJSONResponse(result.Body, &result.Code, w) } + +func (c *BundleUserAPIController) GetDefaultCreds(w http.ResponseWriter, r *http.Request) { + params := mux.Vars(r) + + clusterID := params["clusterId"] + + result, err := c.service.GetDefaultCreds(r.Context(), clusterID) + if err != nil { + c.errorHandler(w, r, err, &result) + return + } + + EncodeJSONResponse(result.Body, &result.Code, w) +} diff --git a/pkg/instaclustr/mock/server/go/api_bundle_user_service.go b/pkg/instaclustr/mock/server/go/api_bundle_user_service.go index 26429e276..f80832994 100644 --- a/pkg/instaclustr/mock/server/go/api_bundle_user_service.go +++ b/pkg/instaclustr/mock/server/go/api_bundle_user_service.go @@ -11,6 +11,7 @@ package openapi import ( "context" + "net/http" ) // BundleUserAPIService is a service that implements the logic for the BundleUserAPIServicer @@ -69,3 +70,15 @@ func (s *BundleUserAPIService) DeleteUser(ctx context.Context, clusterId string, return Response(200, GenericResponse{}), nil } + +func (s *BundleUserAPIService) GetDefaultCreds(ctx context.Context, clusterID string) (ImplResponse, error) { + creds := &struct { + Username string `json:"username"` + InstaclustrUserPassword string `json:"instaclustrUserPassword"` + }{ + Username: clusterID + "_username", + InstaclustrUserPassword: clusterID + "_password", + } + + return Response(http.StatusAccepted, creds), nil +} diff --git a/pkg/instaclustr/mock/server/go/api_open_search_provisioning_v2_service.go b/pkg/instaclustr/mock/server/go/api_open_search_provisioning_v2_service.go index 06b2cd328..c8cfa315d 100644 --- a/pkg/instaclustr/mock/server/go/api_open_search_provisioning_v2_service.go +++ b/pkg/instaclustr/mock/server/go/api_open_search_provisioning_v2_service.go @@ -19,12 +19,12 @@ import ( // This service should implement the business logic for every endpoint for the OpenSearchProvisioningV2API API. // Include any external packages or services that will be required by this service. type OpenSearchProvisioningV2APIService struct { - MockOpenSearchCluster []*OpenSearchClusterV2 + clusters map[string]*OpenSearchClusterV2 } // NewOpenSearchProvisioningV2APIService creates a default api service func NewOpenSearchProvisioningV2APIService() OpenSearchProvisioningV2APIServicer { - return &OpenSearchProvisioningV2APIService{} + return &OpenSearchProvisioningV2APIService{clusters: map[string]*OpenSearchClusterV2{}} } // ClusterManagementV2DataSourcesApplicationsOpensearchClustersV2ClusterIdListBackupsV2Get - List recent cluster backup events. @@ -65,12 +65,13 @@ func (s *OpenSearchProvisioningV2APIService) ClusterManagementV2ResourcesApplica // TODO - update ClusterManagementV2ResourcesApplicationsOpensearchClustersV2ClusterIdDelete with the required logic for this service method. // Add api_open_search_provisioning_v2_service.go to the .openapi-generator-ignore to avoid overwriting this service implementation when updating open api generation. - o := s.getCluster(clusterId) - if o == nil { + _, exists := s.clusters[clusterId] + if !exists { return Response(http.StatusNotFound, nil), nil } - o = nil + delete(s.clusters, clusterId) + return Response(http.StatusNoContent, nil), nil } @@ -79,8 +80,8 @@ func (s *OpenSearchProvisioningV2APIService) ClusterManagementV2ResourcesApplica // TODO - update ClusterManagementV2ResourcesApplicationsOpensearchClustersV2ClusterIdGet with the required logic for this service method. // Add api_open_search_provisioning_v2_service.go to the .openapi-generator-ignore to avoid overwriting this service implementation when updating open api generation. - o := s.getCluster(clusterId) - if o == nil { + o, exists := s.clusters[clusterId] + if !exists { return Response(http.StatusNotFound, nil), nil } @@ -94,8 +95,8 @@ func (s *OpenSearchProvisioningV2APIService) ClusterManagementV2ResourcesApplica // TODO - update ClusterManagementV2ResourcesApplicationsOpensearchClustersV2ClusterIdPut with the required logic for this service method. // Add api_open_search_provisioning_v2_service.go to the .openapi-generator-ignore to avoid overwriting this service implementation when updating open api generation. - o := s.getCluster(clusterId) - if o == nil { + o, exists := s.clusters[clusterId] + if !exists { return Response(http.StatusNotFound, nil), nil } @@ -119,17 +120,7 @@ func (s *OpenSearchProvisioningV2APIService) ClusterManagementV2ResourcesApplica newOpenSearch = &openSearchClusterV2 newOpenSearch.Id = openSearchClusterV2.Name + CreatedID - s.MockOpenSearchCluster = append(s.MockOpenSearchCluster, newOpenSearch) + s.clusters[newOpenSearch.Id] = newOpenSearch return Response(202, newOpenSearch), nil } - -func (s *OpenSearchProvisioningV2APIService) getCluster(clusterID string) *OpenSearchClusterV2 { - for _, o := range s.MockOpenSearchCluster { - if o.Id == clusterID { - return o - } - } - - return nil -} diff --git a/pkg/models/apiv1.go b/pkg/models/apiv1.go index 0e94b3a7a..e5c8fef03 100644 --- a/pkg/models/apiv1.go +++ b/pkg/models/apiv1.go @@ -18,7 +18,6 @@ package models const ( RunningStatus = "RUNNING" - Disabled = "DISABLED" ) type ClusterProviderV1 struct { diff --git a/pkg/models/apiv2.go b/pkg/models/apiv2.go index 548e9b1a6..a2e11dfa1 100644 --- a/pkg/models/apiv2.go +++ b/pkg/models/apiv2.go @@ -20,6 +20,8 @@ const ( NoOperation = "NO_OPERATION" OperationInProgress = "OPERATION_IN_PROGRESS" + DeletedStatus = "DELETED" + DefaultAccountName = "INSTACLUSTR" AWSVPC = "AWS_VPC" diff --git a/pkg/models/operator.go b/pkg/models/operator.go index 6051c62c6..f30df8360 100644 --- a/pkg/models/operator.go +++ b/pkg/models/operator.go @@ -139,6 +139,7 @@ const ( DeletionStarted = "DeletionStarted" DeletionFailed = "DeletionFailed" Deleted = "Deleted" + ExternalDeleted = "ExternalDeleted" ) const (