diff --git a/capten/agent/internal/capten-store/managed_cluster.go b/capten/agent/internal/capten-store/managed_cluster.go index 577dff39..60c3efa7 100644 --- a/capten/agent/internal/capten-store/managed_cluster.go +++ b/capten/agent/internal/capten-store/managed_cluster.go @@ -11,32 +11,39 @@ import ( ) const ( - insertManagedCluster = "INSERT INTO %s.ManagedClusters(id, cluster_name, cluster_endpoint, cluster_deploy_status, app_deploy_status, last_update_time) VALUES (?,?,?,?,?,?)" - insertManagedClusterId = "INSERT INTO %s.ManagedClusters(id) VALUES (?)" - updateManagedClusterById = "UPDATE %s.ManagedClusters SET %s WHERE id=?" - deleteManagedClusterById = "DELETE FROM %s.ManagedClusters WHERE id= ?" - selectAllManagedClusters = "SELECT id, cluster_name, cluster_endpoint, cluster_deploy_status, app_deploy_status, last_update_time FROM %s.ManagedClusters" - selectAllManagedClustersByLabels = "SELECT id, cluster_name, cluster_endpoint, cluster_deploy_status, app_deploy_status, last_update_time FROM %s.ManagedClusters WHERE %s" - selectGetManagedClusterById = "SELECT id, cluster_name, cluster_endpoint, cluster_deploy_status, app_deploy_status, last_update_time FROM %s.ManagedClusters WHERE id=%s;" + insertManagedCluster = "INSERT INTO %s.ManagedClusters(id, cluster_name, cluster_endpoint, cluster_deploy_status, app_deploy_status, last_update_time) VALUES (?,?,?,?,?,?)" + insertManagedClusterId = "INSERT INTO %s.ManagedClusters(id) VALUES (?)" + updateManagedClusterById = "UPDATE %s.ManagedClusters SET %s WHERE id=?" + deleteManagedClusterById = "DELETE FROM %s.ManagedClusters WHERE id= ?" + selectAllManagedClusters = "SELECT id, cluster_name, cluster_endpoint, cluster_deploy_status, app_deploy_status, last_update_time FROM %s.ManagedClusters" + selectAllManagedClustersByLabels = "SELECT id, cluster_name, cluster_endpoint, cluster_deploy_status, app_deploy_status, last_update_time FROM %s.ManagedClusters WHERE %s" + selectGetManagedClusterById = "SELECT id, cluster_name, cluster_endpoint, cluster_deploy_status, app_deploy_status, last_update_time FROM %s.ManagedClusters WHERE id=%s;" + selectGetManagedClusterByClusterName = "SELECT id, cluster_name, cluster_endpoint, cluster_deploy_status, app_deploy_status, last_update_time FROM %s.ManagedClusters WHERE cluster_name=%s ALLOW FILTERING;" ) func (a *Store) UpsertManagedCluster(config *captenpluginspb.ManagedCluster) error { config.LastUpdateTime = time.Now().Format(time.RFC3339) batch := a.client.Session().NewBatch(gocql.LoggedBatch) - batch.Query(fmt.Sprintf(insertManagedCluster, a.keyspace), config.Id, config.ClusterName, config.ClusterEndpoint, config.ClusterDeployStatus, config.AppDeployStatus, config.LastUpdateTime) - err := a.client.Session().ExecuteBatch(batch) + + query := fmt.Sprintf(selectGetManagedClusterByClusterName, a.keyspace, config.ClusterName) + clusters, err := a.executeManagedClustersSelectQuery(query) if err != nil { + batch.Query(fmt.Sprintf(insertManagedCluster, a.keyspace), config.Id, config.ClusterName, config.ClusterEndpoint, config.ClusterDeployStatus, config.AppDeployStatus, config.LastUpdateTime) + } else if len(clusters) > 0 { updatePlaceholders, values := formUpdateKvPairsForManagedCluster(config) if updatePlaceholders == "" { - return err + return fmt.Errorf("empty values found") } query := fmt.Sprintf(updateManagedClusterById, a.keyspace, updatePlaceholders) args := append(values, config.Id) - batch = a.client.Session().NewBatch(gocql.LoggedBatch) batch.Query(query, args...) - err = a.client.Session().ExecuteBatch(batch) } - return err + + if err := a.client.Session().ExecuteBatch(batch); err != nil { + return err + } + + return nil } func (a *Store) DeleteManagedClusterById(id string) error { diff --git a/capten/agent/internal/crossplane/cluster_claims.go b/capten/agent/internal/crossplane/cluster_claims.go index e77825f2..79246257 100644 --- a/capten/agent/internal/crossplane/cluster_claims.go +++ b/capten/agent/internal/crossplane/cluster_claims.go @@ -344,10 +344,10 @@ func (h *ClusterClaimSyncHandler) triggerClusterDelete(clusterName string, manag return fmt.Errorf("failed to send event to workflow to configure %s, %v", managedCluster.ClusterEndpoint, err) } - h.log.Infof("Crossplane project delete %s config workflow %s created", managedCluster.ClusterEndpoint, wkfId) - go h.monitorCrossplaneWorkflow(managedCluster, wkfId) + h.log.Infof("Crossplane project delete %s config workflow %s created", managedCluster.ClusterEndpoint, wkfId) + return nil } @@ -364,16 +364,19 @@ func (h *ClusterClaimSyncHandler) monitorCrossplaneWorkflow(managedCluster *capt h.log.Errorf("failed to send event to workflow to configure %s, %v", managedCluster.ClusterEndpoint, err) return } + h.log.Infof("Successfuly removed the %s app config from clusters", managedCluster.ClusterName) if err := h.dbStore.DeleteManagedClusterById(managedCluster.Id); err != nil { h.log.Errorf("failed to delete managed cluster from DB, %v", err) return } + h.log.Infof("Successfuly deleted managed cluster record for %s. cluster Id - %s", managedCluster.ClusterName, managedCluster.Id) if err = h.deleteManagedClusterCredential(context.TODO(), managedCluster.Id); err != nil { h.log.Errorf("failed to delete credential for %s, %v", managedCluster.Id, err) return } + h.log.Infof("Successfuly deleted managed cluster credential for %s. cluster Id - %s", managedCluster.ClusterName, managedCluster.Id) h.log.Infof("Crossplane project delete %s config workflow %s completed", managedCluster.ClusterEndpoint, wkfId) } diff --git a/capten/agent/internal/crossplane/package_providers.go b/capten/agent/internal/crossplane/package_providers.go index 33f815e5..7a305882 100644 --- a/capten/agent/internal/crossplane/package_providers.go +++ b/capten/agent/internal/crossplane/package_providers.go @@ -8,6 +8,8 @@ import ( "github.com/intelops/go-common/logging" captenstore "github.com/kube-tarian/kad/capten/agent/internal/capten-store" + "github.com/kube-tarian/kad/capten/agent/internal/temporalclient" + "github.com/kube-tarian/kad/capten/agent/internal/workers" "github.com/kube-tarian/kad/capten/agent/internal/pb/captenpluginspb" "github.com/kube-tarian/kad/capten/common-pkg/k8s" @@ -22,15 +24,25 @@ var ( type ProvidersSyncHandler struct { log logging.Logger + tc *temporalclient.Client dbStore *captenstore.Store } -func NewProvidersSyncHandler(log logging.Logger, dbStore *captenstore.Store) *ProvidersSyncHandler { - return &ProvidersSyncHandler{log: log, dbStore: dbStore} +func NewProvidersSyncHandler(log logging.Logger, dbStore *captenstore.Store) (*ProvidersSyncHandler, error) { + tc, err := temporalclient.NewClient(log) + if err != nil { + return nil, err + } + + return &ProvidersSyncHandler{log: log, dbStore: dbStore, tc: tc}, nil } func registerK8SProviderWatcher(log logging.Logger, dbStore *captenstore.Store, dynamicClient dynamic.Interface) error { - return k8s.RegisterDynamicInformers(NewProvidersSyncHandler(log, dbStore), dynamicClient, pgvk) + provider, err := NewProvidersSyncHandler(log, dbStore) + if err != nil { + return err + } + return k8s.RegisterDynamicInformers(provider, dynamicClient, pgvk) } func getProviderObj(obj any) (*model.Provider, error) { @@ -161,7 +173,48 @@ func (h *ProvidersSyncHandler) updateCrossplaneProvider(k8sProviders []model.Pro continue } h.log.Infof("updated the crossplane provider %s", k8sProvider.Name) + + err = h.triggerProviderUpdate(provider.ProviderName, provider) + if err != nil { + return fmt.Errorf("failed to trigger crossplane provider update workflow, %v", err) + } + + h.log.Infof("triggered crossplane provider update workflow for provider %s", provider.ProviderName) } } return nil } + +func (h *ProvidersSyncHandler) triggerProviderUpdate(clusterName string, provider model.CrossplaneProvider) error { + wd := workers.NewConfig(h.tc, h.log) + + proj, err := h.dbStore.GetCrossplaneProject() + if err != nil { + return err + } + ci := model.CrossplaneClusterUpdate{RepoURL: proj.GitProjectUrl, GitProjectId: proj.GitProjectId, + ManagedClusterName: clusterName, ManagedClusterId: provider.Id} + + wkfId, err := wd.SendAsyncEvent(context.TODO(), &model.ConfigureParameters{Resource: model.CrossPlaneResource, Action: model.CrossPlaneProviderUpdate}, ci) + if err != nil { + return fmt.Errorf("failed to send event to crossplane provider update workflow to configure %s, %v", provider.ProviderName, err) + } + + h.log.Infof("Crossplane provider update %s config workflow %s created", provider.ProviderName, wkfId) + + go h.monitorProviderUpdateWorkflow(&provider, wkfId) + + return nil +} + +func (h *ProvidersSyncHandler) monitorProviderUpdateWorkflow(provider *model.CrossplaneProvider, wkfId string) { + // during system reboot start monitoring, add it in map or somewhere. + wd := workers.NewConfig(h.tc, h.log) + _, err := wd.GetWorkflowInformation(context.TODO(), wkfId) + if err != nil { + h.log.Errorf("failed to send crossplane provider update event to workflow to configure %s, %v", provider.ProviderName, err) + return + } + + h.log.Infof("Crossplane provider update %s config workflow %s completed", provider.ProviderName, wkfId) +} diff --git a/capten/agent/internal/job/crossplane_resources_sync.go b/capten/agent/internal/job/crossplane_resources_sync.go index b072b2e3..e253edbf 100644 --- a/capten/agent/internal/job/crossplane_resources_sync.go +++ b/capten/agent/internal/job/crossplane_resources_sync.go @@ -19,12 +19,16 @@ func NewCrossplaneResourcesSync(log logging.Logger, frequency string, dbStore *c if err != nil { return nil, err } + providerObj, err := crossplane.NewProvidersSyncHandler(log, dbStore) + if err != nil { + return nil, err + } return &CrossplaneResourcesSync{ log: log, frequency: frequency, dbStore: dbStore, clusterHandler: ccObj, - providerHandler: crossplane.NewProvidersSyncHandler(log, dbStore), + providerHandler: providerObj, }, nil } diff --git a/capten/config-worker/internal/crossplane/activity.go b/capten/config-worker/internal/crossplane/activity.go index f5e6063a..b1dadf45 100644 --- a/capten/config-worker/internal/crossplane/activity.go +++ b/capten/config-worker/internal/crossplane/activity.go @@ -71,6 +71,17 @@ func processConfigurationActivity(ctx context.Context, params model.ConfigurePar return status, fmt.Errorf("failed to configure crossplane project for %s", model.CrossPlaneClusterUpdate) } return status, nil + + case model.CrossPlaneProviderUpdate: + reqLocal := &model.CrossplaneClusterUpdate{} + if err := json.Unmarshal(payload, reqLocal); err != nil { + logger.Errorf("failed to unmarshall the crossplane provider update req for %s, %v", model.CrossPlaneProviderUpdate, err) + return string(model.WorkFlowStatusFailed), fmt.Errorf("failed to unmarshall the crossplane provider update req for %s", model.CrossPlaneProviderUpdate) + } + + logger.Info("Crossplane Provider update event triggered") + + return "", nil default: return string(model.WorkFlowStatusFailed), fmt.Errorf("invalid crossplane action") } diff --git a/capten/config-worker/internal/crossplane/config_cluster_updates.go b/capten/config-worker/internal/crossplane/config_cluster_updates.go index 7679b39c..4da34c6e 100644 --- a/capten/config-worker/internal/crossplane/config_cluster_updates.go +++ b/capten/config-worker/internal/crossplane/config_cluster_updates.go @@ -217,6 +217,10 @@ func (cp *CrossPlaneApp) configureClusterDelete(ctx context.Context, req *model. return string(agentmodel.WorkFlowStatusFailed), errors.WithMessage(err, "failed to remove cluster folder") } + if err := cp.helper.AddFilesToRepo([]string{"."}); err != nil { + return string(agentmodel.WorkFlowStatusFailed), errors.WithMessage(err, "failed to add git repo") + } + err = cp.helper.CommitRepoChanges() if err != nil { return string(agentmodel.WorkFlowStatusFailed), errors.WithMessage(err, "failed to commit git repo") @@ -249,13 +253,14 @@ func removeClusterValues(valuesFileName, clusterName string) error { clusters = *clusterConfig.Clusters } + newclusters := []Cluster{} for _, cluster := range clusters { if cluster.Name != clusterName { - clusters = append(clusters, cluster) + newclusters = append(newclusters, cluster) } } - clusterConfig.Clusters = &clusters + clusterConfig.Clusters = &newclusters jsonBytes, err := json.Marshal(clusterConfig) if err != nil { return err diff --git a/capten/model/crossplane_types.go b/capten/model/crossplane_types.go index a7447bd7..b5cbb697 100644 --- a/capten/model/crossplane_types.go +++ b/capten/model/crossplane_types.go @@ -9,11 +9,12 @@ import ( ) const ( - providerNamePrefix = "provider" - CrossPlaneResource = "crossplane" - CrossPlaneClusterUpdate = "crossplane-cluster-update" - CrossPlaneProjectSync = "crossplane-project-sync" - CrossPlaneProjectDelete = "crossplane-project-delete" + providerNamePrefix = "provider" + CrossPlaneResource = "crossplane" + CrossPlaneClusterUpdate = "crossplane-cluster-update" + CrossPlaneProjectSync = "crossplane-project-sync" + CrossPlaneProjectDelete = "crossplane-project-delete" + CrossPlaneProviderUpdate = "crossplane-provider-update" ) type CrossplaneProviderStatus string