From 963aba7e55b60b5bf41cd197329cdf71736a51cd Mon Sep 17 00:00:00 2001 From: anil-sarodh Date: Wed, 3 Jan 2024 19:42:40 +0530 Subject: [PATCH 1/8] Crospplane provider update workflow added --- .../internal/crossplane/package_providers.go | 59 ++++++++++++++++++- .../internal/job/crossplane_resources_sync.go | 6 +- .../internal/crossplane/activity.go | 11 ++++ capten/model/crossplane_types.go | 11 ++-- 4 files changed, 78 insertions(+), 9 deletions(-) diff --git a/capten/agent/internal/crossplane/package_providers.go b/capten/agent/internal/crossplane/package_providers.go index 33f815e5..7a305882 100644 --- a/capten/agent/internal/crossplane/package_providers.go +++ b/capten/agent/internal/crossplane/package_providers.go @@ -8,6 +8,8 @@ import ( "github.com/intelops/go-common/logging" captenstore "github.com/kube-tarian/kad/capten/agent/internal/capten-store" + "github.com/kube-tarian/kad/capten/agent/internal/temporalclient" + "github.com/kube-tarian/kad/capten/agent/internal/workers" "github.com/kube-tarian/kad/capten/agent/internal/pb/captenpluginspb" "github.com/kube-tarian/kad/capten/common-pkg/k8s" @@ -22,15 +24,25 @@ var ( type ProvidersSyncHandler struct { log logging.Logger + tc *temporalclient.Client dbStore *captenstore.Store } -func NewProvidersSyncHandler(log logging.Logger, dbStore *captenstore.Store) *ProvidersSyncHandler { - return &ProvidersSyncHandler{log: log, dbStore: dbStore} +func NewProvidersSyncHandler(log logging.Logger, dbStore *captenstore.Store) (*ProvidersSyncHandler, error) { + tc, err := temporalclient.NewClient(log) + if err != nil { + return nil, err + } + + return &ProvidersSyncHandler{log: log, dbStore: dbStore, tc: tc}, nil } func registerK8SProviderWatcher(log logging.Logger, dbStore *captenstore.Store, dynamicClient dynamic.Interface) error { - return k8s.RegisterDynamicInformers(NewProvidersSyncHandler(log, dbStore), dynamicClient, pgvk) + provider, err := NewProvidersSyncHandler(log, dbStore) + if err != nil { + return err + } + return k8s.RegisterDynamicInformers(provider, dynamicClient, pgvk) } func getProviderObj(obj any) (*model.Provider, error) { @@ -161,7 +173,48 @@ func (h *ProvidersSyncHandler) updateCrossplaneProvider(k8sProviders []model.Pro continue } h.log.Infof("updated the crossplane provider %s", k8sProvider.Name) + + err = h.triggerProviderUpdate(provider.ProviderName, provider) + if err != nil { + return fmt.Errorf("failed to trigger crossplane provider update workflow, %v", err) + } + + h.log.Infof("triggered crossplane provider update workflow for provider %s", provider.ProviderName) } } return nil } + +func (h *ProvidersSyncHandler) triggerProviderUpdate(clusterName string, provider model.CrossplaneProvider) error { + wd := workers.NewConfig(h.tc, h.log) + + proj, err := h.dbStore.GetCrossplaneProject() + if err != nil { + return err + } + ci := model.CrossplaneClusterUpdate{RepoURL: proj.GitProjectUrl, GitProjectId: proj.GitProjectId, + ManagedClusterName: clusterName, ManagedClusterId: provider.Id} + + wkfId, err := wd.SendAsyncEvent(context.TODO(), &model.ConfigureParameters{Resource: model.CrossPlaneResource, Action: model.CrossPlaneProviderUpdate}, ci) + if err != nil { + return fmt.Errorf("failed to send event to crossplane provider update workflow to configure %s, %v", provider.ProviderName, err) + } + + h.log.Infof("Crossplane provider update %s config workflow %s created", provider.ProviderName, wkfId) + + go h.monitorProviderUpdateWorkflow(&provider, wkfId) + + return nil +} + +func (h *ProvidersSyncHandler) monitorProviderUpdateWorkflow(provider *model.CrossplaneProvider, wkfId string) { + // during system reboot start monitoring, add it in map or somewhere. + wd := workers.NewConfig(h.tc, h.log) + _, err := wd.GetWorkflowInformation(context.TODO(), wkfId) + if err != nil { + h.log.Errorf("failed to send crossplane provider update event to workflow to configure %s, %v", provider.ProviderName, err) + return + } + + h.log.Infof("Crossplane provider update %s config workflow %s completed", provider.ProviderName, wkfId) +} diff --git a/capten/agent/internal/job/crossplane_resources_sync.go b/capten/agent/internal/job/crossplane_resources_sync.go index b072b2e3..e253edbf 100644 --- a/capten/agent/internal/job/crossplane_resources_sync.go +++ b/capten/agent/internal/job/crossplane_resources_sync.go @@ -19,12 +19,16 @@ func NewCrossplaneResourcesSync(log logging.Logger, frequency string, dbStore *c if err != nil { return nil, err } + providerObj, err := crossplane.NewProvidersSyncHandler(log, dbStore) + if err != nil { + return nil, err + } return &CrossplaneResourcesSync{ log: log, frequency: frequency, dbStore: dbStore, clusterHandler: ccObj, - providerHandler: crossplane.NewProvidersSyncHandler(log, dbStore), + providerHandler: providerObj, }, nil } diff --git a/capten/config-worker/internal/crossplane/activity.go b/capten/config-worker/internal/crossplane/activity.go index f5e6063a..b1dadf45 100644 --- a/capten/config-worker/internal/crossplane/activity.go +++ b/capten/config-worker/internal/crossplane/activity.go @@ -71,6 +71,17 @@ func processConfigurationActivity(ctx context.Context, params model.ConfigurePar return status, fmt.Errorf("failed to configure crossplane project for %s", model.CrossPlaneClusterUpdate) } return status, nil + + case model.CrossPlaneProviderUpdate: + reqLocal := &model.CrossplaneClusterUpdate{} + if err := json.Unmarshal(payload, reqLocal); err != nil { + logger.Errorf("failed to unmarshall the crossplane provider update req for %s, %v", model.CrossPlaneProviderUpdate, err) + return string(model.WorkFlowStatusFailed), fmt.Errorf("failed to unmarshall the crossplane provider update req for %s", model.CrossPlaneProviderUpdate) + } + + logger.Info("Crossplane Provider update event triggered") + + return "", nil default: return string(model.WorkFlowStatusFailed), fmt.Errorf("invalid crossplane action") } diff --git a/capten/model/crossplane_types.go b/capten/model/crossplane_types.go index a7447bd7..b5cbb697 100644 --- a/capten/model/crossplane_types.go +++ b/capten/model/crossplane_types.go @@ -9,11 +9,12 @@ import ( ) const ( - providerNamePrefix = "provider" - CrossPlaneResource = "crossplane" - CrossPlaneClusterUpdate = "crossplane-cluster-update" - CrossPlaneProjectSync = "crossplane-project-sync" - CrossPlaneProjectDelete = "crossplane-project-delete" + providerNamePrefix = "provider" + CrossPlaneResource = "crossplane" + CrossPlaneClusterUpdate = "crossplane-cluster-update" + CrossPlaneProjectSync = "crossplane-project-sync" + CrossPlaneProjectDelete = "crossplane-project-delete" + CrossPlaneProviderUpdate = "crossplane-provider-update" ) type CrossplaneProviderStatus string From db6b2359b7b2f238085a119f1414dbb74932a5e7 Mon Sep 17 00:00:00 2001 From: anil-sarodh Date: Wed, 3 Jan 2024 21:44:30 +0530 Subject: [PATCH 2/8] Cluster valaues are getting removed --- .../internal/crossplane/config_cluster_updates.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/capten/config-worker/internal/crossplane/config_cluster_updates.go b/capten/config-worker/internal/crossplane/config_cluster_updates.go index 7679b39c..d0d09814 100644 --- a/capten/config-worker/internal/crossplane/config_cluster_updates.go +++ b/capten/config-worker/internal/crossplane/config_cluster_updates.go @@ -249,13 +249,14 @@ func removeClusterValues(valuesFileName, clusterName string) error { clusters = *clusterConfig.Clusters } + newclusters := []Cluster{} for _, cluster := range clusters { if cluster.Name != clusterName { - clusters = append(clusters, cluster) + newclusters = append(newclusters, cluster) } } - clusterConfig.Clusters = &clusters + clusterConfig.Clusters = &newclusters jsonBytes, err := json.Marshal(clusterConfig) if err != nil { return err @@ -265,6 +266,8 @@ func removeClusterValues(valuesFileName, clusterName string) error { if err != nil { return err } + fmt.Println("Removed Values") + fmt.Println(string(yamlBytes)) err = os.WriteFile(valuesFileName, yamlBytes, os.ModeAppend) return err From ea3ff0a56ad399cfe5156986214a711c3568957d Mon Sep 17 00:00:00 2001 From: anil-sarodh Date: Tue, 9 Jan 2024 00:08:46 +0530 Subject: [PATCH 3/8] Minor changes --- capten/agent/internal/crossplane/cluster_claims.go | 3 +++ .../internal/crossplane/config_cluster_updates.go | 2 -- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/capten/agent/internal/crossplane/cluster_claims.go b/capten/agent/internal/crossplane/cluster_claims.go index e77825f2..69034330 100644 --- a/capten/agent/internal/crossplane/cluster_claims.go +++ b/capten/agent/internal/crossplane/cluster_claims.go @@ -364,16 +364,19 @@ func (h *ClusterClaimSyncHandler) monitorCrossplaneWorkflow(managedCluster *capt h.log.Errorf("failed to send event to workflow to configure %s, %v", managedCluster.ClusterEndpoint, err) return } + h.log.Infof("Successfuly removed the %s app config from clusters", managedCluster.ClusterName) if err := h.dbStore.DeleteManagedClusterById(managedCluster.Id); err != nil { h.log.Errorf("failed to delete managed cluster from DB, %v", err) return } + h.log.Infof("Successfuly deleted managed cluster record for %s. cluster Id - %s", managedCluster.ClusterName, managedCluster.Id) if err = h.deleteManagedClusterCredential(context.TODO(), managedCluster.Id); err != nil { h.log.Errorf("failed to delete credential for %s, %v", managedCluster.Id, err) return } + h.log.Infof("Successfuly deleted managed cluster credential for %s. cluster Id - %s", managedCluster.ClusterName, managedCluster.Id) h.log.Infof("Crossplane project delete %s config workflow %s completed", managedCluster.ClusterEndpoint, wkfId) } diff --git a/capten/config-worker/internal/crossplane/config_cluster_updates.go b/capten/config-worker/internal/crossplane/config_cluster_updates.go index d0d09814..26b4a26f 100644 --- a/capten/config-worker/internal/crossplane/config_cluster_updates.go +++ b/capten/config-worker/internal/crossplane/config_cluster_updates.go @@ -266,8 +266,6 @@ func removeClusterValues(valuesFileName, clusterName string) error { if err != nil { return err } - fmt.Println("Removed Values") - fmt.Println(string(yamlBytes)) err = os.WriteFile(valuesFileName, yamlBytes, os.ModeAppend) return err From 1b0ae9d0ee679dee5575d8a9fbf64daf180980ad Mon Sep 17 00:00:00 2001 From: anil-sarodh Date: Tue, 9 Jan 2024 23:08:32 +0530 Subject: [PATCH 4/8] minor changes --- .../internal/crossplane/config_cluster_updates.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/capten/config-worker/internal/crossplane/config_cluster_updates.go b/capten/config-worker/internal/crossplane/config_cluster_updates.go index 26b4a26f..31503ffd 100644 --- a/capten/config-worker/internal/crossplane/config_cluster_updates.go +++ b/capten/config-worker/internal/crossplane/config_cluster_updates.go @@ -256,6 +256,8 @@ func removeClusterValues(valuesFileName, clusterName string) error { } } + fmt.Printf("newclusters \n %+v \n", newclusters) + clusterConfig.Clusters = &newclusters jsonBytes, err := json.Marshal(clusterConfig) if err != nil { @@ -267,6 +269,9 @@ func removeClusterValues(valuesFileName, clusterName string) error { return err } + fmt.Println("final file") + fmt.Printf("%s \n", string(yamlBytes)) + err = os.WriteFile(valuesFileName, yamlBytes, os.ModeAppend) return err } From a4a17a1182d60131680b06b3aadefd826d8389d3 Mon Sep 17 00:00:00 2001 From: anil-sarodh Date: Tue, 9 Jan 2024 23:47:05 +0530 Subject: [PATCH 5/8] minor changes --- .../internal/crossplane/config_cluster_updates.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/capten/config-worker/internal/crossplane/config_cluster_updates.go b/capten/config-worker/internal/crossplane/config_cluster_updates.go index 31503ffd..9b9347c2 100644 --- a/capten/config-worker/internal/crossplane/config_cluster_updates.go +++ b/capten/config-worker/internal/crossplane/config_cluster_updates.go @@ -217,6 +217,11 @@ func (cp *CrossPlaneApp) configureClusterDelete(ctx context.Context, req *model. return string(agentmodel.WorkFlowStatusFailed), errors.WithMessage(err, "failed to remove cluster folder") } + err = cp.helper.AddFilesToRepo([]string{clusterValuesFile}) + if err != nil { + return string(agentmodel.WorkFlowStatusFailed), errors.WithMessage(err, "failed to add git repo") + } + err = cp.helper.CommitRepoChanges() if err != nil { return string(agentmodel.WorkFlowStatusFailed), errors.WithMessage(err, "failed to commit git repo") From e6daafe56cb7e79e8973aa8fa252ee7462b859aa Mon Sep 17 00:00:00 2001 From: anil-sarodh Date: Wed, 10 Jan 2024 00:22:10 +0530 Subject: [PATCH 6/8] minor changes --- .../config-worker/internal/crossplane/config_cluster_updates.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/capten/config-worker/internal/crossplane/config_cluster_updates.go b/capten/config-worker/internal/crossplane/config_cluster_updates.go index 9b9347c2..14373d12 100644 --- a/capten/config-worker/internal/crossplane/config_cluster_updates.go +++ b/capten/config-worker/internal/crossplane/config_cluster_updates.go @@ -217,7 +217,7 @@ func (cp *CrossPlaneApp) configureClusterDelete(ctx context.Context, req *model. return string(agentmodel.WorkFlowStatusFailed), errors.WithMessage(err, "failed to remove cluster folder") } - err = cp.helper.AddFilesToRepo([]string{clusterValuesFile}) + err = cp.helper.AddFilesToRepo([]string{"."}) if err != nil { return string(agentmodel.WorkFlowStatusFailed), errors.WithMessage(err, "failed to add git repo") } From 8b2ca33c800ad49a585f7ba437f0bdcaa450ab2a Mon Sep 17 00:00:00 2001 From: anil-sarodh Date: Wed, 10 Jan 2024 00:40:02 +0530 Subject: [PATCH 7/8] minor changes --- capten/common-pkg/plugins/git/git.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/capten/common-pkg/plugins/git/git.go b/capten/common-pkg/plugins/git/git.go index 0ca6cbe8..54fa44cb 100644 --- a/capten/common-pkg/plugins/git/git.go +++ b/capten/common-pkg/plugins/git/git.go @@ -49,6 +49,12 @@ func (g *GitClient) Add(path string) error { return err } + s, err := w.Status() + if err != nil { + fmt.Println("Error => " + err.Error()) + } + fmt.Printf("Status = \n %+v \n", s) + _, err = w.Add(path) if err != nil { return err From 9d60fb2e9292110429a9126a55aba70fd463489a Mon Sep 17 00:00:00 2001 From: anil-sarodh Date: Wed, 10 Jan 2024 21:30:52 +0530 Subject: [PATCH 8/8] minor changes --- .../internal/capten-store/managed_cluster.go | 33 +++++++++++-------- .../internal/crossplane/cluster_claims.go | 4 +-- capten/common-pkg/plugins/git/git.go | 6 ---- .../crossplane/config_cluster_updates.go | 8 +---- 4 files changed, 23 insertions(+), 28 deletions(-) diff --git a/capten/agent/internal/capten-store/managed_cluster.go b/capten/agent/internal/capten-store/managed_cluster.go index 577dff39..60c3efa7 100644 --- a/capten/agent/internal/capten-store/managed_cluster.go +++ b/capten/agent/internal/capten-store/managed_cluster.go @@ -11,32 +11,39 @@ import ( ) const ( - insertManagedCluster = "INSERT INTO %s.ManagedClusters(id, cluster_name, cluster_endpoint, cluster_deploy_status, app_deploy_status, last_update_time) VALUES (?,?,?,?,?,?)" - insertManagedClusterId = "INSERT INTO %s.ManagedClusters(id) VALUES (?)" - updateManagedClusterById = "UPDATE %s.ManagedClusters SET %s WHERE id=?" - deleteManagedClusterById = "DELETE FROM %s.ManagedClusters WHERE id= ?" - selectAllManagedClusters = "SELECT id, cluster_name, cluster_endpoint, cluster_deploy_status, app_deploy_status, last_update_time FROM %s.ManagedClusters" - selectAllManagedClustersByLabels = "SELECT id, cluster_name, cluster_endpoint, cluster_deploy_status, app_deploy_status, last_update_time FROM %s.ManagedClusters WHERE %s" - selectGetManagedClusterById = "SELECT id, cluster_name, cluster_endpoint, cluster_deploy_status, app_deploy_status, last_update_time FROM %s.ManagedClusters WHERE id=%s;" + insertManagedCluster = "INSERT INTO %s.ManagedClusters(id, cluster_name, cluster_endpoint, cluster_deploy_status, app_deploy_status, last_update_time) VALUES (?,?,?,?,?,?)" + insertManagedClusterId = "INSERT INTO %s.ManagedClusters(id) VALUES (?)" + updateManagedClusterById = "UPDATE %s.ManagedClusters SET %s WHERE id=?" + deleteManagedClusterById = "DELETE FROM %s.ManagedClusters WHERE id= ?" + selectAllManagedClusters = "SELECT id, cluster_name, cluster_endpoint, cluster_deploy_status, app_deploy_status, last_update_time FROM %s.ManagedClusters" + selectAllManagedClustersByLabels = "SELECT id, cluster_name, cluster_endpoint, cluster_deploy_status, app_deploy_status, last_update_time FROM %s.ManagedClusters WHERE %s" + selectGetManagedClusterById = "SELECT id, cluster_name, cluster_endpoint, cluster_deploy_status, app_deploy_status, last_update_time FROM %s.ManagedClusters WHERE id=%s;" + selectGetManagedClusterByClusterName = "SELECT id, cluster_name, cluster_endpoint, cluster_deploy_status, app_deploy_status, last_update_time FROM %s.ManagedClusters WHERE cluster_name=%s ALLOW FILTERING;" ) func (a *Store) UpsertManagedCluster(config *captenpluginspb.ManagedCluster) error { config.LastUpdateTime = time.Now().Format(time.RFC3339) batch := a.client.Session().NewBatch(gocql.LoggedBatch) - batch.Query(fmt.Sprintf(insertManagedCluster, a.keyspace), config.Id, config.ClusterName, config.ClusterEndpoint, config.ClusterDeployStatus, config.AppDeployStatus, config.LastUpdateTime) - err := a.client.Session().ExecuteBatch(batch) + + query := fmt.Sprintf(selectGetManagedClusterByClusterName, a.keyspace, config.ClusterName) + clusters, err := a.executeManagedClustersSelectQuery(query) if err != nil { + batch.Query(fmt.Sprintf(insertManagedCluster, a.keyspace), config.Id, config.ClusterName, config.ClusterEndpoint, config.ClusterDeployStatus, config.AppDeployStatus, config.LastUpdateTime) + } else if len(clusters) > 0 { updatePlaceholders, values := formUpdateKvPairsForManagedCluster(config) if updatePlaceholders == "" { - return err + return fmt.Errorf("empty values found") } query := fmt.Sprintf(updateManagedClusterById, a.keyspace, updatePlaceholders) args := append(values, config.Id) - batch = a.client.Session().NewBatch(gocql.LoggedBatch) batch.Query(query, args...) - err = a.client.Session().ExecuteBatch(batch) } - return err + + if err := a.client.Session().ExecuteBatch(batch); err != nil { + return err + } + + return nil } func (a *Store) DeleteManagedClusterById(id string) error { diff --git a/capten/agent/internal/crossplane/cluster_claims.go b/capten/agent/internal/crossplane/cluster_claims.go index 69034330..79246257 100644 --- a/capten/agent/internal/crossplane/cluster_claims.go +++ b/capten/agent/internal/crossplane/cluster_claims.go @@ -344,10 +344,10 @@ func (h *ClusterClaimSyncHandler) triggerClusterDelete(clusterName string, manag return fmt.Errorf("failed to send event to workflow to configure %s, %v", managedCluster.ClusterEndpoint, err) } - h.log.Infof("Crossplane project delete %s config workflow %s created", managedCluster.ClusterEndpoint, wkfId) - go h.monitorCrossplaneWorkflow(managedCluster, wkfId) + h.log.Infof("Crossplane project delete %s config workflow %s created", managedCluster.ClusterEndpoint, wkfId) + return nil } diff --git a/capten/common-pkg/plugins/git/git.go b/capten/common-pkg/plugins/git/git.go index 54fa44cb..0ca6cbe8 100644 --- a/capten/common-pkg/plugins/git/git.go +++ b/capten/common-pkg/plugins/git/git.go @@ -49,12 +49,6 @@ func (g *GitClient) Add(path string) error { return err } - s, err := w.Status() - if err != nil { - fmt.Println("Error => " + err.Error()) - } - fmt.Printf("Status = \n %+v \n", s) - _, err = w.Add(path) if err != nil { return err diff --git a/capten/config-worker/internal/crossplane/config_cluster_updates.go b/capten/config-worker/internal/crossplane/config_cluster_updates.go index 14373d12..4da34c6e 100644 --- a/capten/config-worker/internal/crossplane/config_cluster_updates.go +++ b/capten/config-worker/internal/crossplane/config_cluster_updates.go @@ -217,8 +217,7 @@ func (cp *CrossPlaneApp) configureClusterDelete(ctx context.Context, req *model. return string(agentmodel.WorkFlowStatusFailed), errors.WithMessage(err, "failed to remove cluster folder") } - err = cp.helper.AddFilesToRepo([]string{"."}) - if err != nil { + if err := cp.helper.AddFilesToRepo([]string{"."}); err != nil { return string(agentmodel.WorkFlowStatusFailed), errors.WithMessage(err, "failed to add git repo") } @@ -261,8 +260,6 @@ func removeClusterValues(valuesFileName, clusterName string) error { } } - fmt.Printf("newclusters \n %+v \n", newclusters) - clusterConfig.Clusters = &newclusters jsonBytes, err := json.Marshal(clusterConfig) if err != nil { @@ -274,9 +271,6 @@ func removeClusterValues(valuesFileName, clusterName string) error { return err } - fmt.Println("final file") - fmt.Printf("%s \n", string(yamlBytes)) - err = os.WriteFile(valuesFileName, yamlBytes, os.ModeAppend) return err }