Skip to content

Commit

Permalink
Merge pull request #366 from kube-tarian/feature/cluster_claim
Browse files Browse the repository at this point in the history
Crospplane provider update workflow added
  • Loading branch information
vramk23 committed Jan 14, 2024
2 parents 29ae1ac + 1d63403 commit 40e5917
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 26 deletions.
33 changes: 20 additions & 13 deletions capten/agent/internal/capten-store/managed_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,39 @@ import (
)

const (
insertManagedCluster = "INSERT INTO %s.ManagedClusters(id, cluster_name, cluster_endpoint, cluster_deploy_status, app_deploy_status, last_update_time) VALUES (?,?,?,?,?,?)"
insertManagedClusterId = "INSERT INTO %s.ManagedClusters(id) VALUES (?)"
updateManagedClusterById = "UPDATE %s.ManagedClusters SET %s WHERE id=?"
deleteManagedClusterById = "DELETE FROM %s.ManagedClusters WHERE id= ?"
selectAllManagedClusters = "SELECT id, cluster_name, cluster_endpoint, cluster_deploy_status, app_deploy_status, last_update_time FROM %s.ManagedClusters"
selectAllManagedClustersByLabels = "SELECT id, cluster_name, cluster_endpoint, cluster_deploy_status, app_deploy_status, last_update_time FROM %s.ManagedClusters WHERE %s"
selectGetManagedClusterById = "SELECT id, cluster_name, cluster_endpoint, cluster_deploy_status, app_deploy_status, last_update_time FROM %s.ManagedClusters WHERE id=%s;"
insertManagedCluster = "INSERT INTO %s.ManagedClusters(id, cluster_name, cluster_endpoint, cluster_deploy_status, app_deploy_status, last_update_time) VALUES (?,?,?,?,?,?)"
insertManagedClusterId = "INSERT INTO %s.ManagedClusters(id) VALUES (?)"
updateManagedClusterById = "UPDATE %s.ManagedClusters SET %s WHERE id=?"
deleteManagedClusterById = "DELETE FROM %s.ManagedClusters WHERE id= ?"
selectAllManagedClusters = "SELECT id, cluster_name, cluster_endpoint, cluster_deploy_status, app_deploy_status, last_update_time FROM %s.ManagedClusters"
selectAllManagedClustersByLabels = "SELECT id, cluster_name, cluster_endpoint, cluster_deploy_status, app_deploy_status, last_update_time FROM %s.ManagedClusters WHERE %s"
selectGetManagedClusterById = "SELECT id, cluster_name, cluster_endpoint, cluster_deploy_status, app_deploy_status, last_update_time FROM %s.ManagedClusters WHERE id=%s;"
selectGetManagedClusterByClusterName = "SELECT id, cluster_name, cluster_endpoint, cluster_deploy_status, app_deploy_status, last_update_time FROM %s.ManagedClusters WHERE cluster_name=%s ALLOW FILTERING;"
)

func (a *Store) UpsertManagedCluster(config *captenpluginspb.ManagedCluster) error {
config.LastUpdateTime = time.Now().Format(time.RFC3339)
batch := a.client.Session().NewBatch(gocql.LoggedBatch)
batch.Query(fmt.Sprintf(insertManagedCluster, a.keyspace), config.Id, config.ClusterName, config.ClusterEndpoint, config.ClusterDeployStatus, config.AppDeployStatus, config.LastUpdateTime)
err := a.client.Session().ExecuteBatch(batch)

query := fmt.Sprintf(selectGetManagedClusterByClusterName, a.keyspace, config.ClusterName)
clusters, err := a.executeManagedClustersSelectQuery(query)
if err != nil {
batch.Query(fmt.Sprintf(insertManagedCluster, a.keyspace), config.Id, config.ClusterName, config.ClusterEndpoint, config.ClusterDeployStatus, config.AppDeployStatus, config.LastUpdateTime)
} else if len(clusters) > 0 {
updatePlaceholders, values := formUpdateKvPairsForManagedCluster(config)
if updatePlaceholders == "" {
return err
return fmt.Errorf("empty values found")
}
query := fmt.Sprintf(updateManagedClusterById, a.keyspace, updatePlaceholders)
args := append(values, config.Id)
batch = a.client.Session().NewBatch(gocql.LoggedBatch)
batch.Query(query, args...)
err = a.client.Session().ExecuteBatch(batch)
}
return err

if err := a.client.Session().ExecuteBatch(batch); err != nil {
return err
}

return nil
}

func (a *Store) DeleteManagedClusterById(id string) error {
Expand Down
7 changes: 5 additions & 2 deletions capten/agent/internal/crossplane/cluster_claims.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,10 +344,10 @@ func (h *ClusterClaimSyncHandler) triggerClusterDelete(clusterName string, manag
return fmt.Errorf("failed to send event to workflow to configure %s, %v", managedCluster.ClusterEndpoint, err)
}

h.log.Infof("Crossplane project delete %s config workflow %s created", managedCluster.ClusterEndpoint, wkfId)

go h.monitorCrossplaneWorkflow(managedCluster, wkfId)

h.log.Infof("Crossplane project delete %s config workflow %s created", managedCluster.ClusterEndpoint, wkfId)

return nil
}

Expand All @@ -364,16 +364,19 @@ func (h *ClusterClaimSyncHandler) monitorCrossplaneWorkflow(managedCluster *capt
h.log.Errorf("failed to send event to workflow to configure %s, %v", managedCluster.ClusterEndpoint, err)
return
}
h.log.Infof("Successfuly removed the %s app config from clusters", managedCluster.ClusterName)

if err := h.dbStore.DeleteManagedClusterById(managedCluster.Id); err != nil {
h.log.Errorf("failed to delete managed cluster from DB, %v", err)
return
}
h.log.Infof("Successfuly deleted managed cluster record for %s. cluster Id - %s", managedCluster.ClusterName, managedCluster.Id)

if err = h.deleteManagedClusterCredential(context.TODO(), managedCluster.Id); err != nil {
h.log.Errorf("failed to delete credential for %s, %v", managedCluster.Id, err)
return
}
h.log.Infof("Successfuly deleted managed cluster credential for %s. cluster Id - %s", managedCluster.ClusterName, managedCluster.Id)

h.log.Infof("Crossplane project delete %s config workflow %s completed", managedCluster.ClusterEndpoint, wkfId)
}
Expand Down
59 changes: 56 additions & 3 deletions capten/agent/internal/crossplane/package_providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (

"github.com/intelops/go-common/logging"
captenstore "github.com/kube-tarian/kad/capten/agent/internal/capten-store"
"github.com/kube-tarian/kad/capten/agent/internal/temporalclient"
"github.com/kube-tarian/kad/capten/agent/internal/workers"

"github.com/kube-tarian/kad/capten/agent/internal/pb/captenpluginspb"
"github.com/kube-tarian/kad/capten/common-pkg/k8s"
Expand All @@ -22,15 +24,25 @@ var (

type ProvidersSyncHandler struct {
log logging.Logger
tc *temporalclient.Client
dbStore *captenstore.Store
}

func NewProvidersSyncHandler(log logging.Logger, dbStore *captenstore.Store) *ProvidersSyncHandler {
return &ProvidersSyncHandler{log: log, dbStore: dbStore}
func NewProvidersSyncHandler(log logging.Logger, dbStore *captenstore.Store) (*ProvidersSyncHandler, error) {
tc, err := temporalclient.NewClient(log)
if err != nil {
return nil, err
}

return &ProvidersSyncHandler{log: log, dbStore: dbStore, tc: tc}, nil
}

func registerK8SProviderWatcher(log logging.Logger, dbStore *captenstore.Store, dynamicClient dynamic.Interface) error {
return k8s.RegisterDynamicInformers(NewProvidersSyncHandler(log, dbStore), dynamicClient, pgvk)
provider, err := NewProvidersSyncHandler(log, dbStore)
if err != nil {
return err
}
return k8s.RegisterDynamicInformers(provider, dynamicClient, pgvk)
}

func getProviderObj(obj any) (*model.Provider, error) {
Expand Down Expand Up @@ -161,7 +173,48 @@ func (h *ProvidersSyncHandler) updateCrossplaneProvider(k8sProviders []model.Pro
continue
}
h.log.Infof("updated the crossplane provider %s", k8sProvider.Name)

err = h.triggerProviderUpdate(provider.ProviderName, provider)
if err != nil {
return fmt.Errorf("failed to trigger crossplane provider update workflow, %v", err)
}

h.log.Infof("triggered crossplane provider update workflow for provider %s", provider.ProviderName)
}
}
return nil
}

func (h *ProvidersSyncHandler) triggerProviderUpdate(clusterName string, provider model.CrossplaneProvider) error {
wd := workers.NewConfig(h.tc, h.log)

proj, err := h.dbStore.GetCrossplaneProject()
if err != nil {
return err
}
ci := model.CrossplaneClusterUpdate{RepoURL: proj.GitProjectUrl, GitProjectId: proj.GitProjectId,
ManagedClusterName: clusterName, ManagedClusterId: provider.Id}

wkfId, err := wd.SendAsyncEvent(context.TODO(), &model.ConfigureParameters{Resource: model.CrossPlaneResource, Action: model.CrossPlaneProviderUpdate}, ci)
if err != nil {
return fmt.Errorf("failed to send event to crossplane provider update workflow to configure %s, %v", provider.ProviderName, err)
}

h.log.Infof("Crossplane provider update %s config workflow %s created", provider.ProviderName, wkfId)

go h.monitorProviderUpdateWorkflow(&provider, wkfId)

return nil
}

func (h *ProvidersSyncHandler) monitorProviderUpdateWorkflow(provider *model.CrossplaneProvider, wkfId string) {
// during system reboot start monitoring, add it in map or somewhere.
wd := workers.NewConfig(h.tc, h.log)
_, err := wd.GetWorkflowInformation(context.TODO(), wkfId)
if err != nil {
h.log.Errorf("failed to send crossplane provider update event to workflow to configure %s, %v", provider.ProviderName, err)
return
}

h.log.Infof("Crossplane provider update %s config workflow %s completed", provider.ProviderName, wkfId)
}
6 changes: 5 additions & 1 deletion capten/agent/internal/job/crossplane_resources_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@ func NewCrossplaneResourcesSync(log logging.Logger, frequency string, dbStore *c
if err != nil {
return nil, err
}
providerObj, err := crossplane.NewProvidersSyncHandler(log, dbStore)
if err != nil {
return nil, err
}
return &CrossplaneResourcesSync{
log: log,
frequency: frequency,
dbStore: dbStore,
clusterHandler: ccObj,
providerHandler: crossplane.NewProvidersSyncHandler(log, dbStore),
providerHandler: providerObj,
}, nil
}

Expand Down
11 changes: 11 additions & 0 deletions capten/config-worker/internal/crossplane/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,17 @@ func processConfigurationActivity(ctx context.Context, params model.ConfigurePar
return status, fmt.Errorf("failed to configure crossplane project for %s", model.CrossPlaneClusterUpdate)
}
return status, nil

case model.CrossPlaneProviderUpdate:
reqLocal := &model.CrossplaneClusterUpdate{}
if err := json.Unmarshal(payload, reqLocal); err != nil {
logger.Errorf("failed to unmarshall the crossplane provider update req for %s, %v", model.CrossPlaneProviderUpdate, err)
return string(model.WorkFlowStatusFailed), fmt.Errorf("failed to unmarshall the crossplane provider update req for %s", model.CrossPlaneProviderUpdate)
}

logger.Info("Crossplane Provider update event triggered")

return "", nil
default:
return string(model.WorkFlowStatusFailed), fmt.Errorf("invalid crossplane action")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,10 @@ func (cp *CrossPlaneApp) configureClusterDelete(ctx context.Context, req *model.
return string(agentmodel.WorkFlowStatusFailed), errors.WithMessage(err, "failed to remove cluster folder")
}

if err := cp.helper.AddFilesToRepo([]string{"."}); err != nil {
return string(agentmodel.WorkFlowStatusFailed), errors.WithMessage(err, "failed to add git repo")
}

err = cp.helper.CommitRepoChanges()
if err != nil {
return string(agentmodel.WorkFlowStatusFailed), errors.WithMessage(err, "failed to commit git repo")
Expand Down Expand Up @@ -249,13 +253,14 @@ func removeClusterValues(valuesFileName, clusterName string) error {
clusters = *clusterConfig.Clusters
}

newclusters := []Cluster{}
for _, cluster := range clusters {
if cluster.Name != clusterName {
clusters = append(clusters, cluster)
newclusters = append(newclusters, cluster)
}
}

clusterConfig.Clusters = &clusters
clusterConfig.Clusters = &newclusters
jsonBytes, err := json.Marshal(clusterConfig)
if err != nil {
return err
Expand Down
11 changes: 6 additions & 5 deletions capten/model/crossplane_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ import (
)

const (
providerNamePrefix = "provider"
CrossPlaneResource = "crossplane"
CrossPlaneClusterUpdate = "crossplane-cluster-update"
CrossPlaneProjectSync = "crossplane-project-sync"
CrossPlaneProjectDelete = "crossplane-project-delete"
providerNamePrefix = "provider"
CrossPlaneResource = "crossplane"
CrossPlaneClusterUpdate = "crossplane-cluster-update"
CrossPlaneProjectSync = "crossplane-project-sync"
CrossPlaneProjectDelete = "crossplane-project-delete"
CrossPlaneProviderUpdate = "crossplane-provider-update"
)

type CrossplaneProviderStatus string
Expand Down

0 comments on commit 40e5917

Please sign in to comment.