diff --git a/controllers/clusterresources/awssecuritygroupfirewallrule_controller.go b/controllers/clusterresources/awssecuritygroupfirewallrule_controller.go index db1b40186..8d84b2bc6 100644 --- a/controllers/clusterresources/awssecuritygroupfirewallrule_controller.go +++ b/controllers/clusterresources/awssecuritygroupfirewallrule_controller.go @@ -287,8 +287,14 @@ func (r *AWSSecurityGroupFirewallRuleReconciler) startFirewallRuleStatusJob(fire func (r *AWSSecurityGroupFirewallRuleReconciler) newWatchStatusJob(firewallRule *v1beta1.AWSSecurityGroupFirewallRule) scheduler.Job { l := log.Log.WithValues("component", "FirewallRuleStatusJob") return func() error { + ctx := context.Background() instaFirewallRuleStatus, err := r.API.GetFirewallRuleStatus(firewallRule.Status.ID, instaclustr.AWSSecurityGroupFirewallRuleEndpoint) if err != nil { + if errors.Is(err, instaclustr.NotFound) { + l.Info("The resource has been deleted on Instaclustr, deleting resource in k8s...") + return r.Delete(ctx, firewallRule) + } + l.Error(err, "Cannot get AWS security group firewall rule status from Inst API", "firewall rule ID", firewallRule.Status.ID) return err } @@ -299,13 +305,13 @@ func (r *AWSSecurityGroupFirewallRuleReconciler) newWatchStatusJob(firewallRule "firewall rule status", firewallRule.Status) patch := firewallRule.NewPatch() firewallRule.Status.FirewallRuleStatus = *instaFirewallRuleStatus - err := r.Status().Patch(context.Background(), firewallRule, patch) + err := r.Status().Patch(ctx, firewallRule, patch) if err != nil { return err } if instaFirewallRuleStatus.Status == statusDELETED { - r.Scheduler.RemoveJob(firewallRule.GetJobID(scheduler.StatusChecker)) + go r.Scheduler.RemoveJob(firewallRule.GetJobID(scheduler.StatusChecker)) } } diff --git a/controllers/clusterresources/awsvpcpeering_controller.go b/controllers/clusterresources/awsvpcpeering_controller.go index 8e50f107e..c97051969 100644 --- a/controllers/clusterresources/awsvpcpeering_controller.go +++ b/controllers/clusterresources/awsvpcpeering_controller.go @@ -24,6 +24,7 @@ import ( k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/record" + "k8s.io/utils/strings/slices" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" @@ -194,7 +195,59 @@ func (r *AWSVPCPeeringReconciler) handleUpdatePeering( aws *v1beta1.AWSVPCPeering, l logr.Logger, ) reconcile.Result { - err := r.API.UpdatePeering(aws.Status.ID, instaclustr.AWSPeeringEndpoint, &aws.Spec) + instaAWSPeering, err := r.API.GetAWSVPCPeering(aws.Status.ID) + if err != nil { + l.Error(err, "Cannot get AWS VPC Peering from Instaclutr", + "AWS VPC Peering ID", aws.Status.ID, + ) + r.EventRecorder.Eventf(aws, models.Warning, models.UpdateFailed, + "Cannot get AWS VPC Peering from Instaclutr. Reason: %v", + ) + + return models.ReconcileRequeue + } + + if aws.Annotations[models.ExternalChangesAnnotation] == models.True { + if !slices.Equal(instaAWSPeering.PeerSubnets, aws.Spec.PeerSubnets) { + l.Info("The resource specification still differs from the Instaclustr resource specification, please reconcile it manually", + "AWS VPC ID", aws.Status.ID, + "k8s peerSubnets", aws.Spec.PeerSubnets, + "instaclustr peerSubnets", instaAWSPeering.PeerSubnets, + ) + r.EventRecorder.Eventf(aws, models.Warning, models.UpdateFailed, + "The resource specification still differs from the Instaclustr resource specification, please reconcile it manually.", + ) + + return models.ExitReconcile + } + + patch := aws.NewPatch() + delete(aws.Annotations, models.ExternalChangesAnnotation) + err = r.Patch(ctx, aws, patch) + if err != nil { + l.Error(err, "Cannot delete external changes annotation from the resource", + "AWS VPC Peering ID", aws.Status.ID, + ) + r.EventRecorder.Eventf( + aws, models.Warning, models.PatchFailed, + "Deleting external changes annotation is failed. Reason: %v", + err, + ) + + return models.ReconcileRequeue + } + + l.Info("External changes of the k8s resource specification was fixed", + "AWS VPC Peering ID", aws.Status.ID, + ) + r.EventRecorder.Eventf(aws, models.Normal, models.ExternalChanges, + "External changes of the k8s resource specification was fixed", + ) + + return models.ExitReconcile + } + + err = r.API.UpdatePeering(aws.Status.ID, instaclustr.AWSPeeringEndpoint, &aws.Spec) if err != nil { l.Error(err, "cannot update AWS VPC Peering", "AWS Account ID", aws.Spec.PeerAWSAccountID, @@ -264,7 +317,6 @@ func (r *AWSVPCPeeringReconciler) handleDeletePeering( } if status != nil { - r.Scheduler.RemoveJob(aws.GetJobID(scheduler.StatusChecker)) err = r.API.DeletePeering(aws.Status.ID, instaclustr.AWSPeeringEndpoint) if err != nil { l.Error(err, "cannot update AWS VPC Peering resource status", @@ -288,6 +340,8 @@ func (r *AWSVPCPeeringReconciler) handleDeletePeering( return models.ReconcileRequeue } + r.Scheduler.RemoveJob(aws.GetJobID(scheduler.StatusChecker)) + patch := aws.NewPatch() controllerutil.RemoveFinalizer(aws, models.DeletionFinalizer) aws.Annotations[models.ResourceStateAnnotation] = models.DeletedEvent @@ -316,11 +370,6 @@ func (r *AWSVPCPeeringReconciler) handleDeletePeering( "AWS VPC Peering Status", aws.Status.PeeringStatus, ) - r.EventRecorder.Eventf( - aws, models.Normal, models.Deleted, - "Resource is deleted", - ) - return models.ExitReconcile } @@ -338,30 +387,75 @@ func (r *AWSVPCPeeringReconciler) startAWSVPCPeeringStatusJob(awsPeering *v1beta func (r *AWSVPCPeeringReconciler) newWatchStatusJob(awsPeering *v1beta1.AWSVPCPeering) scheduler.Job { l := log.Log.WithValues("component", "AWSVPCPeeringStatusJob") return func() error { - instaPeeringStatus, err := r.API.GetPeeringStatus(awsPeering.Status.ID, instaclustr.AWSPeeringEndpoint) + ctx := context.Background() + + namespacedName := client.ObjectKeyFromObject(awsPeering) + err := r.Get(ctx, namespacedName, awsPeering) + if k8serrors.IsNotFound(err) { + l.Info("Resource is not found in the k8s cluster. Closing Instaclustr status sync.", + "namespaced name", namespacedName) + + go r.Scheduler.RemoveJob(awsPeering.GetJobID(scheduler.StatusChecker)) + + return nil + } + + instaAWSPeering, err := r.API.GetAWSVPCPeering(awsPeering.Status.ID) if err != nil { - l.Error(err, "cannot get AWS VPC Peering Status from Inst API", "AWS VPC Peering ID", awsPeering.Status.ID) + if errors.Is(err, instaclustr.NotFound) { + l.Info("The resource has been deleted on Instaclustr, deleting resource in k8s...") + return r.Delete(ctx, awsPeering) + } + + l.Error(err, "cannot get AWS VPC Peering StatusCode from Inst API", "AWS VPC Peering ID", awsPeering.Status.ID) return err } - if !arePeeringStatusesEqual(instaPeeringStatus, &awsPeering.Status.PeeringStatus) { + instaPeeringStatus := v1beta1.PeeringStatus{ + ID: instaAWSPeering.ID, + StatusCode: instaAWSPeering.StatusCode, + } + + if !arePeeringStatusesEqual(&instaPeeringStatus, &awsPeering.Status.PeeringStatus) { l.Info("AWS VPC Peering status of k8s is different from Instaclustr. Reconcile statuses..", "AWS VPC Peering Status from Inst API", instaPeeringStatus, "AWS VPC Peering Status", awsPeering.Status) patch := awsPeering.NewPatch() - awsPeering.Status.PeeringStatus = *instaPeeringStatus - err := r.Status().Patch(context.Background(), awsPeering, patch) + awsPeering.Status.PeeringStatus = instaPeeringStatus + err := r.Status().Patch(ctx, awsPeering, patch) if err != nil { return err } } + if awsPeering.Annotations[models.ResourceStateAnnotation] != models.UpdatingEvent && + awsPeering.Annotations[models.ExternalChangesAnnotation] != models.True && + !slices.Equal(instaAWSPeering.PeerSubnets, awsPeering.Spec.PeerSubnets) { + l.Info("The k8s resource specification doesn't match the specification of Instaclustr, please change it manually", + "k8s peerSubnets", instaAWSPeering.PeerSubnets, + "instaclutr peerSubnets", awsPeering.Spec.PeerSubnets, + ) + + patch := awsPeering.NewPatch() + awsPeering.Annotations[models.ExternalChangesAnnotation] = models.True + + err = r.Patch(ctx, awsPeering, patch) + if err != nil { + l.Error(err, "Cannot patch the resource with external changes annotation") + + return err + } + + r.EventRecorder.Event(awsPeering, models.Warning, models.ExternalChanges, + "k8s spec doesn't match spec of Instaclutr, please change it manually", + ) + } + return nil } } -// SetupWithManager sets up the controller with the Manager. func (r *AWSVPCPeeringReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&v1beta1.AWSVPCPeering{}, builder.WithPredicates(predicate.Funcs{ diff --git a/pkg/instaclustr/client.go b/pkg/instaclustr/client.go index b0f530b00..4df1a7e2a 100644 --- a/pkg/instaclustr/client.go +++ b/pkg/instaclustr/client.go @@ -2265,3 +2265,33 @@ func (c *Client) GetResizeOperationsByClusterDataCentreID(cdcID string) ([]*v1be return resize.Operations, nil } + +func (c *Client) GetAWSVPCPeering(peerID string) (*models.AWSVPCPeering, error) { + url := c.serverHostname + AWSPeeringEndpoint + peerID + resp, err := c.DoRequest(url, http.MethodGet, nil) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + b, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + if resp.StatusCode == http.StatusNotFound { + return nil, NotFound + } + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("status code: %d, message: %s", resp.StatusCode, b) + } + + var vpcPeering models.AWSVPCPeering + err = json.Unmarshal(b, &vpcPeering) + if err != nil { + return nil, err + } + + return &vpcPeering, nil +} diff --git a/pkg/instaclustr/interfaces.go b/pkg/instaclustr/interfaces.go index 96c636eb2..ad65e0764 100644 --- a/pkg/instaclustr/interfaces.go +++ b/pkg/instaclustr/interfaces.go @@ -34,6 +34,7 @@ type API interface { UpdateCluster(id, clusterEndpoint string, instaDCs any) error DeleteCluster(id, clusterEndpoint string) error GetPeeringStatus(peerID, peeringEndpoint string) (*clusterresourcesv1beta1.PeeringStatus, error) + GetAWSVPCPeering(peerID string) (*models.AWSVPCPeering, error) UpdatePeering(peerID, peeringEndpoint string, peerSpec any) error DeletePeering(peerID, peeringEndpoint string) error CreatePeering(url string, peeringSpec any) (*clusterresourcesv1beta1.PeeringStatus, error) diff --git a/pkg/instaclustr/mock/client.go b/pkg/instaclustr/mock/client.go index 903b79a77..2497d62bc 100644 --- a/pkg/instaclustr/mock/client.go +++ b/pkg/instaclustr/mock/client.go @@ -367,3 +367,7 @@ func (c *mockClient) GetRedisUser(id string) (*models.RedisUser, error) { func (c *mockClient) GetResizeOperationsByClusterDataCentreID(cdcID string) ([]*clustersv1beta1.ResizeOperation, error) { panic("GetResizeOperationsByClusterDataCentreID: is not implemented") } + +func (c *mockClient) GetAWSVPCPeering(peerID string) (*models.AWSVPCPeering, error) { + panic("GetAWSVPCPeering: is not implemented") +} diff --git a/pkg/models/apiv2.go b/pkg/models/apiv2.go index dfcc52010..548e9b1a6 100644 --- a/pkg/models/apiv2.go +++ b/pkg/models/apiv2.go @@ -192,3 +192,14 @@ func (cb *ClusterBackup) GetBackupEvents(clusterKind string) map[int]*BackupEven return instBackupEvents } + +type AWSVPCPeering struct { + ID string `json:"id"` + CDCID string `json:"cdcId"` + DataCentreVPCID string `json:"dataCentreVpcId"` + PeerAWSAccountID string `json:"peerAwsAccountId"` + PeerRegion string `json:"peerRegion"` + PeerSubnets []string `json:"peerSubnets"` + PeerVpcID string `json:"peerVpcId"` + StatusCode string `json:"statusCode"` +}