Skip to content

Commit

Permalink
added handling of external changes to aws vpc peering resource
Browse files Browse the repository at this point in the history
  • Loading branch information
Bohdan Siryk authored and Bohdan Siryk committed Sep 19, 2023
1 parent 5ebf820 commit dd40ef0
Show file tree
Hide file tree
Showing 6 changed files with 179 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,14 @@ func (r *AWSSecurityGroupFirewallRuleReconciler) startFirewallRuleStatusJob(fire
func (r *AWSSecurityGroupFirewallRuleReconciler) newWatchStatusJob(firewallRule *v1beta1.AWSSecurityGroupFirewallRule) scheduler.Job {
l := log.Log.WithValues("component", "FirewallRuleStatusJob")
return func() error {
ctx := context.Background()
instaFirewallRuleStatus, err := r.API.GetFirewallRuleStatus(firewallRule.Status.ID, instaclustr.AWSSecurityGroupFirewallRuleEndpoint)
if err != nil {
if errors.Is(err, instaclustr.NotFound) {
l.Info("The resource has been deleted on Instaclustr, deleting resource in k8s...")
return r.Delete(ctx, firewallRule)
}

l.Error(err, "Cannot get AWS security group firewall rule status from Inst API", "firewall rule ID", firewallRule.Status.ID)
return err
}
Expand All @@ -299,13 +305,13 @@ func (r *AWSSecurityGroupFirewallRuleReconciler) newWatchStatusJob(firewallRule
"firewall rule status", firewallRule.Status)
patch := firewallRule.NewPatch()
firewallRule.Status.FirewallRuleStatus = *instaFirewallRuleStatus
err := r.Status().Patch(context.Background(), firewallRule, patch)
err := r.Status().Patch(ctx, firewallRule, patch)
if err != nil {
return err
}

if instaFirewallRuleStatus.Status == statusDELETED {
r.Scheduler.RemoveJob(firewallRule.GetJobID(scheduler.StatusChecker))
go r.Scheduler.RemoveJob(firewallRule.GetJobID(scheduler.StatusChecker))
}
}

Expand Down
140 changes: 125 additions & 15 deletions controllers/clusterresources/awsvpcpeering_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ package clusterresources
import (
"context"
"errors"

"github.com/go-logr/logr"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
"k8s.io/utils/strings/slices"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -194,7 +194,75 @@ func (r *AWSVPCPeeringReconciler) handleUpdatePeering(
aws *v1beta1.AWSVPCPeering,
l logr.Logger,
) reconcile.Result {
err := r.API.UpdatePeering(aws.Status.ID, instaclustr.AWSPeeringEndpoint, &aws.Spec)
instaAWSpeering, err := r.API.GetAWSVPCPeering(aws.Status.ID)
if err != nil {
l.Error(err, "Cannot get AWS VPC Peering from Instaclutr",
"AWS VPC Peering ID", aws.Status.ID,
)
r.EventRecorder.Eventf(aws, models.Warning, models.UpdateFailed,
"Cannot get AWS VPC Peering from Instaclutr. Reason: %v",
)

return models.ReconcileRequeue
}

if aws.Annotations[models.ExternalChangesAnnotation] == models.True {
if !slices.Equal(instaAWSpeering.PeerSubnets, aws.Spec.PeerSubnets) {
l.Info("The resource specification still differs from the Instaclustr resource specification, please reconcile it manually",
"AWS VPC ID", aws.Status.ID,
"k8s peerSubnets", aws.Spec.PeerSubnets,
"instaclustr peerSubnets", instaAWSpeering.PeerSubnets,
)
r.EventRecorder.Eventf(aws, models.Warning, models.UpdateFailed,
"The resource specification still differs from the Instaclustr resource specification, please reconcile it manually.",
)

return models.ExitReconcile
}

patch := aws.NewPatch()
delete(aws.Annotations, models.ExternalChangesAnnotation)
err = r.Patch(ctx, aws, patch)
if err != nil {
l.Error(err, "Cannot delete external changes annotation from the resource",
"AWS VPC Peering ID", aws.Status.ID,
)
r.EventRecorder.Eventf(
aws, models.Warning, models.PatchFailed,
"Deleting external changes annotation is failed. Reason: %v",
err,
)

return models.ReconcileRequeue
}

l.Info("External changes of the k8s resource specification was fixed",
"AWS VPC Peering ID", aws.Status.ID,
)
r.EventRecorder.Eventf(aws, models.Normal, models.ExternalChanges,
"External changes of the k8s resource specification was fixed",
)

return models.ExitReconcile
}

patch := aws.NewPatch()
aws.Annotations[models.UpdateQueuedAnnotation] = models.True
err = r.Patch(ctx, aws, patch)
if err != nil {
l.Error(err, "Cannot patch AWS VPC Peering resource metadata with update queued annotation",
"AWS VPC Peering ID", aws.Status.ID,
)
r.EventRecorder.Eventf(
aws, models.Warning, models.PatchFailed,
"Patching resource with update queued annotation is failed. Reason: %v",
err,
)

return models.ReconcileRequeue
}

err = r.API.UpdatePeering(aws.Status.ID, instaclustr.AWSPeeringEndpoint, &aws.Spec)
if err != nil {
l.Error(err, "cannot update AWS VPC Peering",
"AWS Account ID", aws.Spec.PeerAWSAccountID,
Expand All @@ -210,8 +278,9 @@ func (r *AWSVPCPeeringReconciler) handleUpdatePeering(
return models.ReconcileRequeue
}

patch := aws.NewPatch()
patch = aws.NewPatch()
aws.Annotations[models.ResourceStateAnnotation] = models.UpdatedEvent
delete(aws.Annotations, models.UpdateQueuedAnnotation)
err = r.Patch(ctx, aws, patch)
if err != nil {
l.Error(err, "cannot patch AWS VPC Peering resource metadata",
Expand Down Expand Up @@ -264,7 +333,6 @@ func (r *AWSVPCPeeringReconciler) handleDeletePeering(
}

if status != nil {
r.Scheduler.RemoveJob(aws.GetJobID(scheduler.StatusChecker))
err = r.API.DeletePeering(aws.Status.ID, instaclustr.AWSPeeringEndpoint)
if err != nil {
l.Error(err, "cannot update AWS VPC Peering resource status",
Expand All @@ -288,6 +356,8 @@ func (r *AWSVPCPeeringReconciler) handleDeletePeering(
return models.ReconcileRequeue
}

r.Scheduler.RemoveJob(aws.GetJobID(scheduler.StatusChecker))

patch := aws.NewPatch()
controllerutil.RemoveFinalizer(aws, models.DeletionFinalizer)
aws.Annotations[models.ResourceStateAnnotation] = models.DeletedEvent
Expand Down Expand Up @@ -316,11 +386,6 @@ func (r *AWSVPCPeeringReconciler) handleDeletePeering(
"AWS VPC Peering Status", aws.Status.PeeringStatus,
)

r.EventRecorder.Eventf(
aws, models.Normal, models.Deleted,
"Resource is deleted",
)

return models.ExitReconcile
}

Expand All @@ -338,30 +403,75 @@ func (r *AWSVPCPeeringReconciler) startAWSVPCPeeringStatusJob(awsPeering *v1beta
func (r *AWSVPCPeeringReconciler) newWatchStatusJob(awsPeering *v1beta1.AWSVPCPeering) scheduler.Job {
l := log.Log.WithValues("component", "AWSVPCPeeringStatusJob")
return func() error {
instaPeeringStatus, err := r.API.GetPeeringStatus(awsPeering.Status.ID, instaclustr.AWSPeeringEndpoint)
ctx := context.Background()

namespacedName := client.ObjectKeyFromObject(awsPeering)
err := r.Get(ctx, namespacedName, awsPeering)
if k8serrors.IsNotFound(err) {
l.Info("Resource is not found in the k8s cluster. Closing Instaclustr status sync.",
"namespaced name", namespacedName)

go r.Scheduler.RemoveJob(awsPeering.GetJobID(scheduler.StatusChecker))

return nil
}

instaAWSPeering, err := r.API.GetAWSVPCPeering(awsPeering.Status.ID)
if err != nil {
l.Error(err, "cannot get AWS VPC Peering Status from Inst API", "AWS VPC Peering ID", awsPeering.Status.ID)
if errors.Is(err, instaclustr.NotFound) {
l.Info("The resource has been deleted on Instaclustr, deleting resource in k8s...")
return r.Delete(ctx, awsPeering)
}

l.Error(err, "cannot get AWS VPC Peering StatusCode from Inst API", "AWS VPC Peering ID", awsPeering.Status.ID)
return err
}

if !arePeeringStatusesEqual(instaPeeringStatus, &awsPeering.Status.PeeringStatus) {
instaPeeringStatus := v1beta1.PeeringStatus{
ID: instaAWSPeering.ID,
StatusCode: instaAWSPeering.StatusCode,
}

if !arePeeringStatusesEqual(&instaPeeringStatus, &awsPeering.Status.PeeringStatus) {
l.Info("AWS VPC Peering status of k8s is different from Instaclustr. Reconcile statuses..",
"AWS VPC Peering Status from Inst API", instaPeeringStatus,
"AWS VPC Peering Status", awsPeering.Status)

patch := awsPeering.NewPatch()
awsPeering.Status.PeeringStatus = *instaPeeringStatus
err := r.Status().Patch(context.Background(), awsPeering, patch)
awsPeering.Status.PeeringStatus = instaPeeringStatus
err := r.Status().Patch(ctx, awsPeering, patch)
if err != nil {
return err
}
}

if awsPeering.Annotations[models.UpdateQueuedAnnotation] == "" &&
awsPeering.Annotations[models.ExternalChangesAnnotation] != models.True &&
!slices.Equal(instaAWSPeering.PeerSubnets, awsPeering.Spec.PeerSubnets) {
l.Info("The k8s resource specification doesn't match the specification of Instaclustr, please change it manually",
"k8s peerSubnets", instaAWSPeering.PeerSubnets,
"instaclutr peerSubnets", awsPeering.Spec.PeerSubnets,
)

patch := awsPeering.NewPatch()
awsPeering.Annotations[models.ExternalChangesAnnotation] = models.True

err = r.Patch(ctx, awsPeering, patch)
if err != nil {
l.Error(err, "Cannot patch the resource with external changes annotation")

return err
}

r.EventRecorder.Event(awsPeering, models.Warning, models.ExternalChanges,
"k8s spec doesn't match spec of Instaclutr, please change it manually",
)
}

return nil
}
}

// SetupWithManager sets up the controller with the Manager.
func (r *AWSVPCPeeringReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&v1beta1.AWSVPCPeering{}, builder.WithPredicates(predicate.Funcs{
Expand Down
30 changes: 30 additions & 0 deletions pkg/instaclustr/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2424,3 +2424,33 @@ func (c *Client) GetResizeOperationsByClusterDataCentreID(cdcID string) ([]*v1be

return resize.Operations, nil
}

func (c *Client) GetAWSVPCPeering(peerID string) (*models.AWSVPCPeering, error) {
url := c.serverHostname + AWSPeeringEndpoint + peerID
resp, err := c.DoRequest(url, http.MethodGet, nil)
if err != nil {
return nil, err
}
defer resp.Body.Close()

b, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}

if resp.StatusCode == http.StatusNotFound {
return nil, NotFound
}

if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("status code: %d, message: %s", resp.StatusCode, b)
}

var vpcPeering models.AWSVPCPeering
err = json.Unmarshal(b, &vpcPeering)
if err != nil {
return nil, err
}

return &vpcPeering, nil
}
1 change: 1 addition & 0 deletions pkg/instaclustr/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type API interface {
UpdateCluster(id, clusterEndpoint string, instaDCs any) error
DeleteCluster(id, clusterEndpoint string) error
GetPeeringStatus(peerID, peeringEndpoint string) (*clusterresourcesv1beta1.PeeringStatus, error)
GetAWSVPCPeering(peerID string) (*models.AWSVPCPeering, error)
UpdatePeering(peerID, peeringEndpoint string, peerSpec any) error
DeletePeering(peerID, peeringEndpoint string) error
CreatePeering(url string, peeringSpec any) (*clusterresourcesv1beta1.PeeringStatus, error)
Expand Down
4 changes: 4 additions & 0 deletions pkg/instaclustr/mock/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,3 +379,7 @@ func (c *mockClient) GetRedisUser(id string) (*models.RedisUser, error) {
func (c *mockClient) GetResizeOperationsByClusterDataCentreID(cdcID string) ([]*clustersv1beta1.ResizeOperation, error) {
panic("GetResizeOperationsByClusterDataCentreID: is not implemented")
}

func (c *mockClient) GetAWSVPCPeering(peerID string) (*models.AWSVPCPeering, error) {
panic("GetAWSVPCPeering: is not implemented")
}
11 changes: 11 additions & 0 deletions pkg/models/apiv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,14 @@ type ResizeSettings struct {
// Number of concurrent nodes to resize during a resize operation
Concurrency int `json:"concurrency,omitempty"`
}

type AWSVPCPeering struct {
ID string `json:"id"`
CDCID string `json:"cdcId"`
DataCentreVPCID string `json:"dataCentreVpcId"`
PeerAWSAccountID string `json:"peerAwsAccountId"`
PeerRegion string `json:"peerRegion"`
PeerSubnets []string `json:"peerSubnets"`
PeerVpcID string `json:"peerVpcId"`
StatusCode string `json:"statusCode"`
}

0 comments on commit dd40ef0

Please sign in to comment.