From 1a3db53000cb00a828a425eabd28b20381b06645 Mon Sep 17 00:00:00 2001
From: Venkatreddy KP
Date: Mon, 13 Nov 2023 00:09:31 +0530
Subject: [PATCH] refactor crossplane resources sync jobs

---
 capten/agent/cmd/agent/main.go                |  62 +++----
 capten/agent/pkg/agent/agent.go               |   9 +-
 .../pkg/agent/agent_cluster_apps_test.go      |   2 +-
 capten/agent/pkg/config/config.go             |  11 +-
 .../crossplane/crossplane_cluster_claims.go   | 155 ++++++++++++++++
 .../pkg/crossplane/crossplane_providers.go    | 104 +++++++++++
 .../pkg/job/crossplane_resources_sync.go      |  30 +++
 capten/agent/pkg/job/schduler.go              |  87 +++++++++
 capten/agent/pkg/sync/cluster_cliam.go        | 174 ------------------
 capten/agent/pkg/sync/crossplane_providers.go | 124 -------------
 .../agent/pkg/util/synch_cassandra_secret.go  |   2 +-
 capten/common-pkg/k8s/client.go               |  44 ++---
 capten/common-pkg/k8s/model.go                |  19 +-
 capten/common-pkg/plugins/argocd/client.go    |   4 +-
 .../fetcher/{fetcher.go => credential.go}     |  20 +-
 .../worker-framework/event_processor.go       |  17 --
 16 files changed, 437 insertions(+), 427 deletions(-)
 create mode 100644 capten/agent/pkg/crossplane/crossplane_cluster_claims.go
 create mode 100644 capten/agent/pkg/crossplane/crossplane_providers.go
 create mode 100644 capten/agent/pkg/job/crossplane_resources_sync.go
 create mode 100644 capten/agent/pkg/job/schduler.go
 delete mode 100644 capten/agent/pkg/sync/cluster_cliam.go
 delete mode 100644 capten/agent/pkg/sync/crossplane_providers.go
 rename capten/common-pkg/plugins/fetcher/{fetcher.go => credential.go} (80%)

diff --git a/capten/agent/cmd/agent/main.go b/capten/agent/cmd/agent/main.go
index af978c1d..469f97dc 100644
--- a/capten/agent/cmd/agent/main.go
+++ b/capten/agent/cmd/agent/main.go
@@ -9,14 +9,13 @@ import (
 
     "google.golang.org/grpc"
 
-    "github.com/robfig/cron/v3"
-
     "github.com/intelops/go-common/logging"
     "github.com/kube-tarian/kad/capten/agent/pkg/agent"
+    captenstore "github.com/kube-tarian/kad/capten/agent/pkg/capten-store"
     "github.com/kube-tarian/kad/capten/agent/pkg/config"
+    "github.com/kube-tarian/kad/capten/agent/pkg/job"
     "github.com/kube-tarian/kad/capten/agent/pkg/pb/agentpb"
     "github.com/kube-tarian/kad/capten/agent/pkg/pb/captenpluginspb"
-    captenagentsync "github.com/kube-tarian/kad/capten/agent/pkg/sync"
     "github.com/kube-tarian/kad/capten/agent/pkg/util"
     dbinit "github.com/kube-tarian/kad/capten/common-pkg/cassandra/db-init"
     dbmigrate "github.com/kube-tarian/kad/capten/common-pkg/cassandra/db-migrate"
@@ -41,7 +40,14 @@ func main() {
         log.Fatalf("%v", err)
     }
 
-    s, err := agent.NewAgent(log, cfg)
+    as, err := captenstore.NewStore(log)
+    if err != nil {
+        // ignoring store failure until DB user creation working
+        // return nil, err
+        log.Errorf("failed to initialize store, %v", err)
+    }
+
+    s, err := agent.NewAgent(log, cfg, as)
     if err != nil {
         log.Fatalf("Agent initialization failed, %v", err)
     }
@@ -71,15 +77,13 @@ func main() {
         }
     }()
 
-    cronjob, err := initializeCronJobs(cfg.CronInterval)
+    jobScheduler, err := initializeJobScheduler(cfg, as)
     if err != nil {
         log.Fatalf("Failed to create cron job: %v", err)
     }
 
-    cronjob.Start()
-    defer cronjob.Stop()
-
-    log.Info("syncing clusterClaim started successfully...")
+    jobScheduler.Start()
+    defer jobScheduler.Stop()
 
     signals := make(chan os.Signal, 1)
     signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
@@ -104,33 +108,19 @@ func configureDB() error {
     return nil
 }
 
-func initializeCronJobs(crontInterval string) (*cron.Cron, error) {
-    cronJob := cron.New(cron.WithChain(cron.SkipIfStillRunning(cron.DefaultLogger), cron.Recover(cron.DefaultLogger)))
-
-    fetch, err := captenagentsync.NewFetch()
-    if err != nil {
-        log.Errorf("Failed to initialize the sync: %v", err)
-        return nil, err
-    }
-
-    _, jobErr := cronJob.AddJob(fmt.Sprintf(StrInterval, crontInterval), fetch)
-    if jobErr != nil {
-        log.Errorf("Failed to add cronJob for sync clusterClaim: %v", jobErr)
-        return nil, jobErr
-    }
-
-    fetchCrossPlaneProviders, err := captenagentsync.NewFetchCrossPlaneProviders()
-    if err != nil {
-        log.Errorf("Failed to initialize the sync: %v", err)
-        return nil, err
-    }
-
-    _, crossPlaneJobErr := cronJob.AddJob(fmt.Sprintf(StrInterval, crontInterval), fetchCrossPlaneProviders)
-    if jobErr != nil {
-        log.Errorf("Failed to add cronJob for sync clusterClaim: %v", crossPlaneJobErr)
-        return nil, jobErr
+func initializeJobScheduler(cfg *config.SericeConfig, as *captenstore.Store) (*job.Scheduler, error) {
+    s := job.NewScheduler(log)
+    if cfg.CrossplaneSyncJobEnabled {
+        cs, err := job.NewCrossplaneResourcesSync(log, cfg.CrossplaneSyncJobInterval, as)
+        if err != nil {
+            log.Fatal("failed to init crossplane resources sync job", err)
+        }
+        err = s.AddJob("crossplane-resources-synch", cs)
+        if err != nil {
+            log.Fatal("failed to add crossplane resources sync job", err)
+        }
     }
 
-    log.Info("successfully initialized the cron")
-    return cronJob, nil
+    log.Info("successfully initialized job scheduler")
+    return s, nil
 }
diff --git a/capten/agent/pkg/agent/agent.go b/capten/agent/pkg/agent/agent.go
index c1c26947..f4b098ae 100644
--- a/capten/agent/pkg/agent/agent.go
+++ b/capten/agent/pkg/agent/agent.go
@@ -23,7 +23,7 @@ type Agent struct {
     createPr bool
 }
 
-func NewAgent(log logging.Logger, cfg *config.SericeConfig) (*Agent, error) {
+func NewAgent(log logging.Logger, cfg *config.SericeConfig, as *captenstore.Store) (*Agent, error) {
     var tc *temporalclient.Client
     var err error
 
@@ -32,13 +32,6 @@ func NewAgent(log logging.Logger, cfg *config.SericeConfig) (*Agent, error) {
         return nil, err
     }
 
-    as, err := captenstore.NewStore(log)
-    if err != nil {
-        // ignoring store failure until DB user creation working
-        // return nil, err
-        log.Errorf("failed to initialize store, %v", err)
-    }
-
     agent := &Agent{
         tc: tc,
         as: as,
diff --git a/capten/agent/pkg/agent/agent_cluster_apps_test.go b/capten/agent/pkg/agent/agent_cluster_apps_test.go
index 4ec3a140..88a9339a 100644
--- a/capten/agent/pkg/agent/agent_cluster_apps_test.go
+++ b/capten/agent/pkg/agent/agent_cluster_apps_test.go
@@ -27,7 +27,7 @@ func TestAgentTestSuite(t *testing.T) {
         t.Fatal(err)
     }*/
 
-    agent, err := NewAgent(agentSuite.logger, &config.SericeConfig{})
+    agent, err := NewAgent(agentSuite.logger, &config.SericeConfig{}, nil)
     if err != nil {
         t.Fatal(err)
     }
diff --git a/capten/agent/pkg/config/config.go b/capten/agent/pkg/config/config.go
index 7e29970c..e4058a3b 100644
--- a/capten/agent/pkg/config/config.go
+++ b/capten/agent/pkg/config/config.go
@@ -5,11 +5,12 @@ import (
 )
 
 type SericeConfig struct {
-    Host         string `envconfig:"HOST" default:"0.0.0.0"`
-    Port         int    `envconfig:"PORT" default:"9091"`
-    Mode         string `envconfig:"MODE" default:"production"`
-    AuthEnabled  bool   `envconfig:"AUTH_ENABLED" default:"false"`
-    CronInterval string `envconfig:"CRON_INTERNAL" default:"60"`
+    Host                      string `envconfig:"HOST" default:"0.0.0.0"`
+    Port                      int    `envconfig:"PORT" default:"9091"`
+    Mode                      string `envconfig:"MODE" default:"production"`
+    AuthEnabled               bool   `envconfig:"AUTH_ENABLED" default:"false"`
+    CrossplaneSyncJobEnabled  bool   `envconfig:"CROSSPLANE_SYNC_JOB_ENABLED" default:"true"`
+    CrossplaneSyncJobInterval string `envconfig:"CROSSPLANE_SYNC_JOB_INTERVAL" default:"@every 1m"`
 }
 
 func GetServiceConfig() (*SericeConfig, error) {
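The two new settings replace CronInterval (whose old env key, CRON_INTERNAL, appears to have been a misspelling of CRON_INTERVAL): a feature flag plus a schedule string that is handed to robfig/cron unmodified, so both descriptor specs such as "@every 1m" and five-field cron expressions are accepted. A minimal sketch of how the values resolve, assuming the kelseyhightower/envconfig package implied by the struct tags (the local syncConfig type is illustrative only):

package main

import (
	"fmt"
	"os"

	"github.com/kelseyhightower/envconfig"
)

// syncConfig mirrors the two fields this patch adds to SericeConfig.
type syncConfig struct {
	CrossplaneSyncJobEnabled  bool   `envconfig:"CROSSPLANE_SYNC_JOB_ENABLED" default:"true"`
	CrossplaneSyncJobInterval string `envconfig:"CROSSPLANE_SYNC_JOB_INTERVAL" default:"@every 1m"`
}

func main() {
	// A five-field cron expression is equally valid as the interval.
	os.Setenv("CROSSPLANE_SYNC_JOB_INTERVAL", "*/5 * * * *")

	var cfg syncConfig
	if err := envconfig.Process("", &cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.CrossplaneSyncJobEnabled, cfg.CrossplaneSyncJobInterval)
}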
diff --git a/capten/agent/pkg/crossplane/crossplane_cluster_claims.go b/capten/agent/pkg/crossplane/crossplane_cluster_claims.go
new file mode 100644
index 00000000..fa046a96
--- /dev/null
+++ b/capten/agent/pkg/crossplane/crossplane_cluster_claims.go
@@ -0,0 +1,155 @@
+package crossplane
+
+import (
+    "context"
+    "encoding/json"
+    "fmt"
+    "time"
+
+    "github.com/google/uuid"
+    "github.com/intelops/go-common/credentials"
+    "github.com/intelops/go-common/logging"
+    captenstore "github.com/kube-tarian/kad/capten/agent/pkg/capten-store"
+
+    "github.com/kube-tarian/kad/capten/agent/pkg/pb/captenpluginspb"
+
+    "github.com/kube-tarian/kad/capten/agent/pkg/model"
+    "github.com/kube-tarian/kad/capten/common-pkg/k8s"
+    "k8s.io/apimachinery/pkg/runtime/schema"
+)
+
+var (
+    statusType               = "Ready"
+    statusValue              = "True"
+    clusterName              = "%s-%s"
+    clusterSecretName        = "%s-cluster"
+    kubeConfig               = "kubeconfig"
+    k8sEndpoint              = "endpoint"
+    k8sClusterCA             = "clusterCA"
+    managedClusterEntityName = "managedcluster"
+)
+
+type ClusterClaimSyncHandler struct {
+    log      logging.Logger
+    dbStore  *captenstore.Store
+    clusters map[string]*captenpluginspb.ManagedCluster
+}
+
+func NewClusterClaimSyncHandler(log logging.Logger, dbStore *captenstore.Store) (*ClusterClaimSyncHandler, error) {
+    return &ClusterClaimSyncHandler{log: log, dbStore: dbStore, clusters: map[string]*captenpluginspb.ManagedCluster{}}, nil
+}
+
+func (h *ClusterClaimSyncHandler) Sync() error {
+    h.log.Debug("started to sync cluster-claims resources")
+
+    k8sclient, err := k8s.NewK8SClient(h.log)
+    if err != nil {
+        return fmt.Errorf("failed to initialize k8s client: %v", err)
+    }
+
+    objList, err := k8sclient.DynamicClient.ListAllNamespaceResource(context.TODO(),
+        schema.GroupVersionResource{Group: "prodready.cluster", Version: "v1alpha1", Resource: "clusterclaims"})
+    if err != nil {
+        return fmt.Errorf("failed to list cluster claim resources, %v", err)
+    }
+
+    clusterClaimByte, err := json.Marshal(objList)
+    if err != nil {
+        return fmt.Errorf("failed to marshal cluster claim resources, %v", err)
+    }
+
+    var clObj model.ClusterClaimList
+    err = json.Unmarshal(clusterClaimByte, &clObj)
+    if err != nil {
+        return fmt.Errorf("failed to unmarshal cluster claim resources, %v", err)
+    }
+
+    if err = h.updateManagedClusters(k8sclient, clObj.Items); err != nil {
+        return fmt.Errorf("failed to update clusters in DB, %v", err)
+    }
+    h.log.Info("cluster-claims resources synced")
+    return nil
+}
+
+func (h *ClusterClaimSyncHandler) updateManagedClusters(k8sClient *k8s.K8SClient, clObj []model.ClusterClaim) error {
+    credAdmin, err := credentials.NewCredentialAdmin(context.TODO())
+    if err != nil {
+        return err
+    }
+
+    clusters, err := h.getManagedClusters()
+    if err != nil {
+        return fmt.Errorf("failed to get managed clusters from DB, %v", err)
+    }
+
+    for _, obj := range clObj {
+        for _, status := range obj.Status.Conditions {
+            if status.Type != statusType {
+                continue
+            }
+
+            if status.Status != statusValue {
+                h.log.Infof("%s in namespace %s, status is %s, so skipping update to db",
+                    obj.Spec.Id, obj.Metadata.Namespace, status.Status)
+                continue
+            }
+
+            secretName := fmt.Sprintf(clusterSecretName, obj.Spec.Id)
+            resp, err := k8sClient.GetSecretData(obj.Metadata.Namespace, secretName)
+            if err != nil {
+                h.log.Infof("failed to get secret %s in namespace %s, %v",
+                    secretName, obj.Metadata.Namespace, err)
+                continue
+            }
+
+            clusterEndpoint := resp.Data[k8sEndpoint]
+
+            managedCluster := &captenpluginspb.ManagedCluster{}
+            clusterObj, ok := clusters[clusterEndpoint]
+            if !ok {
+                managedCluster.Id = uuid.New().String()
+            } else {
+                h.log.Infof("found existing Id: %s, updating with the latest information", clusterObj.Id)
+                managedCluster.Id = clusterObj.Id
+            }
+
+            managedCluster.ClusterName = fmt.Sprintf(clusterName, obj.Spec.Id, obj.Metadata.Namespace)
+            managedCluster.ClusterDeployStatus = status.Status
+            managedCluster.ClusterEndpoint = clusterEndpoint
+
+            clusterDetails := map[string]string{}
+            clusterDetails[kubeConfig] = resp.Data[kubeConfig]
+            clusterDetails[k8sClusterCA] = resp.Data[k8sClusterCA]
+
+            err = credAdmin.PutCredential(context.TODO(), credentials.GenericCredentialType, managedClusterEntityName, managedCluster.Id, clusterDetails)
+
+            if err != nil {
+                h.log.Audit("security", "storecred", "failed", "system", "failed to store credential for %s", managedCluster.Id)
+                h.log.Errorf("failed to store credential for %s, %v", managedCluster.Id, err)
+                continue
+            }
+
+            managedCluster.LastUpdateTime = time.Now().Format(time.RFC3339)
+            err = h.dbStore.UpsertManagedCluster(managedCluster)
+            if err != nil {
+                h.log.Infof("failed to update information to db, %v", err)
+                continue
+            }
+            clusters[clusterEndpoint] = managedCluster
+        }
+    }
+    return nil
+}
+
+func (h *ClusterClaimSyncHandler) getManagedClusters() (map[string]*captenpluginspb.ManagedCluster, error) {
+    clusters, err := h.dbStore.GetManagedClusters()
+    if err != nil {
+        return nil, err
+    }
+
+    clusterEndpointMap := map[string]*captenpluginspb.ManagedCluster{}
+    for _, cluster := range clusters {
+        clusterEndpointMap[cluster.ClusterEndpoint] = cluster
+    }
+    return clusterEndpointMap, nil
+}
diff --git a/capten/agent/pkg/crossplane/crossplane_providers.go b/capten/agent/pkg/crossplane/crossplane_providers.go
new file mode 100644
index 00000000..593f2620
--- /dev/null
+++ b/capten/agent/pkg/crossplane/crossplane_providers.go
@@ -0,0 +1,104 @@
+package crossplane
+
+import (
+    "context"
+    "encoding/json"
+    "fmt"
+
+    "github.com/intelops/go-common/credentials"
+    "github.com/intelops/go-common/logging"
+    captenstore "github.com/kube-tarian/kad/capten/agent/pkg/capten-store"
+
+    "github.com/kube-tarian/kad/capten/agent/pkg/model"
+    "github.com/kube-tarian/kad/capten/agent/pkg/pb/captenpluginspb"
+    "github.com/kube-tarian/kad/capten/common-pkg/k8s"
+    "k8s.io/apimachinery/pkg/runtime/schema"
+)
+
+const (
+    providerNamePrefix = "provider-"
+)
+
+type ProvidersSyncHandler struct {
+    log     logging.Logger
+    client  *k8s.K8SClient
+    dbStore *captenstore.Store
+    creds   credentials.CredentialAdmin
+}
+
+func NewProvidersSyncHandler(log logging.Logger, dbStore *captenstore.Store) (*ProvidersSyncHandler, error) {
+    return &ProvidersSyncHandler{log: log, dbStore: dbStore}, nil
+}
+
+func (h *ProvidersSyncHandler) Sync() error {
+    h.log.Debug("started to sync CrossplaneProvider resources")
+
+    k8sclient, err := k8s.NewK8SClient(h.log)
+    if err != nil {
+        return fmt.Errorf("failed to initialize k8s client: %v", err)
+    }
+
+    objList, err := k8sclient.DynamicClient.ListAllNamespaceResource(context.TODO(),
+        schema.GroupVersionResource{Group: "pkg.crossplane.io", Version: "v1", Resource: "providers"})
+    if err != nil {
+        return fmt.Errorf("failed to fetch providers resources, %v", err)
+    }
+
+    providers, err := json.Marshal(objList)
+    if err != nil {
+        return fmt.Errorf("failed to marshal the data, err: %v", err)
+    }
+
+    var providerObj model.ProviderList
+    err = json.Unmarshal(providers, &providerObj)
+    if err != nil {
+        return fmt.Errorf("failed to unmarshal the data, err: %v", err)
+    }
+
+    if err = h.updateCrossplaneProvider(providerObj.Items); err != nil {
+        return fmt.Errorf("failed to update providers in DB, %v", err)
+    }
+    h.log.Debug("Crossplane Provider resources synced")
+    return nil
+}
+
+func (h *ProvidersSyncHandler) updateCrossplaneProvider(clObj []model.Provider) error {
+    prvList, err := h.dbStore.GetCrossplaneProviders()
+    if err != nil {
+        return fmt.Errorf("failed to get Crossplane Providers, %v", err)
+    }
+
+    prvMap := make(map[string]*captenpluginspb.CrossplaneProvider)
+    for _, prov := range prvList {
+        prvMap[providerNamePrefix+prov.ProviderName] = prov
+    }
+
+    for _, obj := range clObj {
+        for _, status := range obj.Status.Conditions {
+            if status.Type != model.TypeHealthy {
+                continue
+            }
+
+            prvObj, ok := prvMap[obj.Name]
+            if !ok {
+                h.log.Infof("Provider name %s is not found in the db, skipping the update", obj.Name)
+                continue
+            }
+
+            provider := model.CrossplaneProvider{
+                Id:              prvObj.Id,
+                Status:          string(status.Type),
+                CloudType:       prvObj.CloudType,
+                CloudProviderId: prvObj.CloudProviderId,
+                ProviderName:    prvObj.ProviderName,
+            }
+
+            if err := h.dbStore.UpdateCrossplaneProvider(&provider); err != nil {
+                h.log.Errorf("failed to update provider %s details in db, %v", prvObj.ProviderName, err)
+                continue
+            }
+            h.log.Infof("successfully updated the details for %s", prvObj.ProviderName)
+        }
+    }
+    return nil
+}
diff --git a/capten/agent/pkg/job/crossplane_resources_sync.go b/capten/agent/pkg/job/crossplane_resources_sync.go
new file mode 100644
index 00000000..533f6da5
--- /dev/null
+++ b/capten/agent/pkg/job/crossplane_resources_sync.go
@@ -0,0 +1,30 @@
+package job
+
+import (
+    "github.com/intelops/go-common/logging"
+    captenstore "github.com/kube-tarian/kad/capten/agent/pkg/capten-store"
+)
+
+type CrossplaneResourcesSync struct {
+    dbStore   *captenstore.Store
+    log       logging.Logger
+    frequency string
+}
+
+func NewCrossplaneResourcesSync(log logging.Logger, frequency string, dbStore *captenstore.Store) (*CrossplaneResourcesSync, error) {
+    return &CrossplaneResourcesSync{
+        log:       log,
+        frequency: frequency,
+        dbStore:   dbStore,
+    }, nil
+}
+
+func (v *CrossplaneResourcesSync) CronSpec() string {
+    return v.frequency
+}
+
+func (v *CrossplaneResourcesSync) Run() {
+    v.log.Debug("started crossplane resource sync job")
+
+    v.log.Debug("crossplane resource sync job completed")
+}
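As added here, Run only emits the two debug lines; the actual wiring to the handlers introduced in pkg/crossplane is not part of this patch. A hypothetical completion, assuming Run is meant to construct and invoke both handlers on every tick and that the crossplane package is imported as "github.com/kube-tarian/kad/capten/agent/pkg/crossplane":

func (v *CrossplaneResourcesSync) Run() {
	v.log.Debug("started crossplane resource sync job")

	// Hypothetical wiring: run the cluster-claim sync, then the provider sync.
	if handler, err := crossplane.NewClusterClaimSyncHandler(v.log, v.dbStore); err == nil {
		if err := handler.Sync(); err != nil {
			v.log.Errorf("failed to sync cluster claim resources, %v", err)
		}
	}

	if handler, err := crossplane.NewProvidersSyncHandler(v.log, v.dbStore); err == nil {
		if err := handler.Sync(); err != nil {
			v.log.Errorf("failed to sync provider resources, %v", err)
		}
	}

	v.log.Debug("crossplane resource sync job completed")
}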
diff --git a/capten/agent/pkg/job/schduler.go b/capten/agent/pkg/job/schduler.go
new file mode 100644
index 00000000..442667b5
--- /dev/null
+++ b/capten/agent/pkg/job/schduler.go
@@ -0,0 +1,87 @@
+package job
+
+import (
+    "sync"
+
+    "github.com/intelops/go-common/logging"
+    "github.com/pkg/errors"
+    "github.com/robfig/cron/v3"
+)
+
+type jobHandler interface {
+    CronSpec() string
+    Run()
+}
+
+type Scheduler struct {
+    log       logging.Logger
+    jobs      map[string]jobHandler
+    cronIDs   map[string]cron.EntryID
+    c         *cron.Cron
+    cronMutex *sync.Mutex
+}
+
+func NewScheduler(log logging.Logger) *Scheduler {
+    clog := cron.VerbosePrintfLogger(log.(logging.StdLogger))
+    return &Scheduler{
+        log:       log,
+        c:         cron.New(cron.WithChain(cron.SkipIfStillRunning(clog), cron.Recover(clog))),
+        jobs:      map[string]jobHandler{},
+        cronIDs:   map[string]cron.EntryID{},
+        cronMutex: &sync.Mutex{},
+    }
+}
+
+func (t *Scheduler) AddJob(jobName string, job jobHandler) error {
+    t.cronMutex.Lock()
+    defer t.cronMutex.Unlock()
+    _, ok := t.cronIDs[jobName]
+    if ok {
+        return errors.Errorf("%s job already exists", jobName)
+    }
+    spec := job.CronSpec()
+    if spec == "" {
+        return errors.Errorf("%s job has no cron spec", jobName)
+    }
+    entryID, err := t.c.AddJob(spec, job)
+    if err != nil {
+        return errors.WithMessagef(err, "%s job cron spec not valid", jobName)
+    }
+
+    t.jobs[jobName] = job
+    t.cronIDs[jobName] = entryID
+    t.log.Infof("%s job added with cron '%s'", jobName, spec)
+    return nil
+}
+
+// RemoveJob ...
+func (t *Scheduler) RemoveJob(jobName string) error {
+    t.cronMutex.Lock()
+    defer t.cronMutex.Unlock()
+    entryID, ok := t.cronIDs[jobName]
+    if !ok {
+        return errors.Errorf("%s job does not exist", jobName)
+    }
+
+    t.c.Remove(entryID)
+    delete(t.jobs, jobName)
+    delete(t.cronIDs, jobName)
+    t.log.Infof("%s job removed", jobName)
+    return nil
+}
+
+func (t *Scheduler) Start() {
+    t.c.Start()
+    t.log.Infof("Job scheduler started")
+}
+
+func (t *Scheduler) Stop() {
+    t.c.Stop()
+    t.log.Infof("Job scheduler stopped")
+}
+
+func (t *Scheduler) GetJobs() map[string]jobHandler {
+    t.cronMutex.Lock()
+    defer t.cronMutex.Unlock()
+    return t.jobs
+}
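Any type with CronSpec() string and Run() satisfies the unexported jobHandler interface, so new periodic jobs can be registered without touching the scheduler. A self-contained usage sketch (the heartbeatJob type is illustrative only; it assumes, as this patch does, that logging.NewLogger() satisfies logging.StdLogger):

package main

import (
	"fmt"
	"time"

	"github.com/intelops/go-common/logging"
	"github.com/kube-tarian/kad/capten/agent/pkg/job"
)

// heartbeatJob is a hypothetical job: CronSpec supplies the schedule,
// Run does the periodic work.
type heartbeatJob struct{}

func (heartbeatJob) CronSpec() string { return "@every 30s" }
func (heartbeatJob) Run()             { fmt.Println("tick", time.Now().UTC()) }

func main() {
	log := logging.NewLogger()
	s := job.NewScheduler(log)
	if err := s.AddJob("heartbeat", heartbeatJob{}); err != nil {
		log.Fatal("failed to add job", err)
	}
	s.Start()
	defer s.Stop()
	time.Sleep(2 * time.Minute) // let a few ticks fire
}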
diff --git a/capten/agent/pkg/sync/cluster_cliam.go b/capten/agent/pkg/sync/cluster_cliam.go
deleted file mode 100644
index 92eaf1ba..00000000
--- a/capten/agent/pkg/sync/cluster_cliam.go
+++ /dev/null
@@ -1,174 +0,0 @@
-package sync
-
-import (
-    "context"
-    "encoding/json"
-    "fmt"
-    "time"
-
-    "github.com/google/uuid"
-    "github.com/intelops/go-common/credentials"
-    "github.com/intelops/go-common/logging"
-    captenstore "github.com/kube-tarian/kad/capten/agent/pkg/capten-store"
-
-    pb "github.com/kube-tarian/kad/capten/agent/pkg/pb/captenpluginspb"
-
-    "github.com/kube-tarian/kad/capten/agent/pkg/model"
-    "github.com/kube-tarian/kad/capten/common-pkg/k8s"
-    "k8s.io/apimachinery/pkg/runtime/schema"
-)
-
-var (
-    statusType               = "Ready"
-    statusValue              = "True"
-    clusterName              = "%s-%s"
-    clusterSecretName        = "%s-cluster"
-    kubeConfig               = "kubeconfig"
-    k8sEndpoint              = "endpoint"
-    k8sClusterCA             = "clusterCA"
-    managedClusterEntityName = "managedcluster"
-)
-
-type Fetch struct {
-    log         logging.Logger
-    client      *k8s.K8SClient
-    db          *captenstore.Store
-    creds       credentials.CredentialAdmin
-    avlClusters map[string]*pb.ManagedCluster
-}
-
-func NewFetch() (*Fetch, error) {
-    log := logging.NewLogger()
-    db, err := captenstore.NewStore(log)
-    if err != nil {
-        // ignoring store failure until DB user creation working
-        // return nil, err
-        log.Errorf("failed to initialize store, %v", err)
-    }
-
-    k8sclient, err := k8s.NewK8SClient(log)
-    if err != nil {
-        return nil, fmt.Errorf("failed to initalize k8s client: %v", err)
-    }
-
-    credAdmin, err := credentials.NewCredentialAdmin(context.TODO())
-    if err != nil {
-        log.Audit("security", "storecred", "failed", "system", "failed to intialize credentials client")
-        return nil, err
-    }
-
-    avlClusters, err := getManagedClusterEndpointMap(db)
-    if err != nil {
-        return nil, fmt.Errorf("failed to execute getManagedClusterEndpointMap, err: %v", err)
-    }
-
-    return &Fetch{log: log, client: k8sclient, db: db, creds: credAdmin, avlClusters: avlClusters}, nil
-}
-
-// Run ...
-func (fetch *Fetch) Run() {
-    fetch.log.Info("started to sync cluster-claims resources")
-
-    objList, err := fetch.client.DynamicClient.ListAllNamespaceResource(context.TODO(), schema.GroupVersionResource{Group: "prodready.cluster", Version: "v1alpha1", Resource: "clusterclaims"})
-    if err != nil {
-        fetch.log.Error("Failed to fetch all the resource, err:", err)
-
-        return
-    }
-
-    clusterClaimByte, err := json.Marshal(objList)
-    if err != nil {
-        fetch.log.Error("Failed to marshall the data, err:", err)
-
-        return
-    }
-
-    var clObj model.ClusterClaimList
-    err = json.Unmarshal(clusterClaimByte, &clObj)
-    if err != nil {
-        fetch.log.Error("Failed to un-marshall the data, err:", err)
-
-        return
-    }
-
-    fetch.UpdateClusterDetails(clObj.Items)
-
-    fetch.log.Info("succesfully sync-ed cluster-claims resources")
-}
-
-func (fetch *Fetch) UpdateClusterDetails(clObj []model.ClusterClaim) {
-    for _, obj := range clObj {
-        for _, status := range obj.Status.Conditions {
-            if status.Type != statusType {
-                continue
-            }
-
-            if status.Status != statusValue {
-                fetch.log.Info("%s in namespace %s, status is %s, so skiping update to db.",
-                    obj.Spec.Id, obj.Metadata.Namespace, status.Status)
-                continue
-            }
-
-            // get the cluster endpoint and kubeconfig file from the secrets
-            req := &k8s.SecretDetailsRequest{Namespace: obj.Metadata.Namespace,
-                SecretName: fmt.Sprintf(clusterSecretName, obj.Spec.Id)}
-            resp, err := fetch.client.FetchSecretDetails(req)
-            if err != nil {
-                fetch.log.Info("%s in namespace %s, failed to get secret: %v",
-                    req.SecretName, req.Namespace, err)
-                continue
-            }
-
-            clusterEndpoint := resp.Data[k8sEndpoint]
-
-            managedCluster := &pb.ManagedCluster{}
-            clusterObj, ok := fetch.avlClusters[clusterEndpoint]
-            if !ok {
-                managedCluster.Id = uuid.New().String()
-            } else {
-                fetch.log.Info("found existing Id: %s, updating the latest information ", clusterObj.Id)
-                managedCluster.Id = clusterObj.Id
-            }
-
-            managedCluster.ClusterName = fmt.Sprintf(clusterName, obj.Spec.Id, obj.Metadata.Namespace)
-            managedCluster.ClusterDeployStatus = status.Status
-            managedCluster.ClusterEndpoint = clusterEndpoint
-
-            clusterDetails := map[string]string{}
-            clusterDetails[kubeConfig] = resp.Data[kubeConfig]
-            clusterDetails[k8sClusterCA] = resp.Data[k8sClusterCA]
-
-            err = fetch.creds.PutCredential(context.TODO(), credentials.GenericCredentialType, managedClusterEntityName, managedCluster.Id, clusterDetails)
-
-            if err != nil {
-                fetch.log.Audit("security", "storecred", "failed", "system", "failed to store crendential for %s", managedCluster.Id)
-                fetch.log.Errorf("failed to store credential for %s, %v", managedCluster.Id, err)
-                continue
-            }
-
-            managedCluster.LastUpdateTime = time.Now().Format(time.RFC3339)
-            err = fetch.db.UpsertManagedCluster(managedCluster)
-            if err != nil {
-                fetch.log.Info("failed to update information to db: %v", err)
-                continue
-            }
-
-            fetch.avlClusters[clusterEndpoint] = managedCluster
-        }
-
-    }
-}
-
-func getManagedClusterEndpointMap(db *captenstore.Store) (map[string]*pb.ManagedCluster, error) {
-    clusters, err := db.GetManagedClusters()
-    if err != nil {
-        return nil, fmt.Errorf("failed to get the managed cluster information from db: %v", err)
-    }
-
-    clusterEndpointMap := map[string]*pb.ManagedCluster{}
-    for _, cluster := range clusters {
-        clusterEndpointMap[cluster.ClusterEndpoint] = cluster
-    }
-
-    return clusterEndpointMap, nil
-}
diff --git a/capten/agent/pkg/sync/crossplane_providers.go b/capten/agent/pkg/sync/crossplane_providers.go
deleted file mode 100644
index 2172b1cc..00000000
--- a/capten/agent/pkg/sync/crossplane_providers.go
+++ /dev/null
@@ -1,124 +0,0 @@
-package sync
-
-import (
-    "context"
-    "encoding/json"
-    "fmt"
-
-    "github.com/intelops/go-common/credentials"
-    "github.com/intelops/go-common/logging"
-    captenstore "github.com/kube-tarian/kad/capten/agent/pkg/capten-store"
-
-    "github.com/kube-tarian/kad/capten/agent/pkg/model"
-    "github.com/kube-tarian/kad/capten/agent/pkg/pb/captenpluginspb"
-    "github.com/kube-tarian/kad/capten/common-pkg/k8s"
-    "k8s.io/apimachinery/pkg/runtime/schema"
-)
-
-const (
-    providerNamePrefix = "provider-"
-)
-
-type FetchCrossPlaneProviders struct {
-    log    logging.Logger
-    client *k8s.K8SClient
-    db     *captenstore.Store
-    creds  credentials.CredentialAdmin
-}
-
-func NewFetchCrossPlaneProviders() (*FetchCrossPlaneProviders, error) {
-    log := logging.NewLogger()
-    db, err := captenstore.NewStore(log)
-    if err != nil {
-        // ignoring store failure until DB user creation working
-        // return nil, err
-        log.Errorf("failed to initialize store, %v", err)
-    }
-
-    k8sclient, err := k8s.NewK8SClient(log)
-    if err != nil {
-        return nil, fmt.Errorf("failed to initalize k8s client: %v", err)
-    }
-
-    credAdmin, err := credentials.NewCredentialAdmin(context.TODO())
-    if err != nil {
-        log.Audit("security", "storecred", "failed", "system", "failed to intialize credentials client")
-        return nil, err
-    }
-
-    return &FetchCrossPlaneProviders{log: log, client: k8sclient, db: db, creds: credAdmin}, nil
-}
-
-func (fetch *FetchCrossPlaneProviders) Run() {
-    fetch.log.Info("started to sync CrossplaneProvider resources")
-
-    objList, err := fetch.client.DynamicClient.ListAllNamespaceResource(context.TODO(), schema.GroupVersionResource{Group: "pkg.crossplane.io", Version: "v1", Resource: "providers"})
-    if err != nil {
-        fetch.log.Error("Failed to fetch all the resource, err:", err)
-
-        return
-    }
-
-    providers, err := json.Marshal(objList)
-    if err != nil {
-        fetch.log.Error("Failed to marshall the data, err:", err)
-
-        return
-    }
-
-    var providerObj model.ProviderList
-    err = json.Unmarshal(providers, &providerObj)
-    if err != nil {
-        fetch.log.Error("Failed to un-marshall the data, err:", err)
-
-        return
-    }
-
-    fetch.UpdateCrossplaneProvider(providerObj.Items)
-
-    fetch.log.Info("succesfully sync-ed CrossplaneProvider resources")
-}
-
-func (fetch *FetchCrossPlaneProviders) UpdateCrossplaneProvider(clObj []model.Provider) {
-    prvList, err := fetch.db.GetCrossplaneProviders()
-    if err != nil {
-        fetch.log.Error("Failed to GetCrossplaneProviders, err:", err)
-
-        return
-    }
-
-    prvMap := make(map[string]*captenpluginspb.CrossplaneProvider)
-    for _, prov := range prvList {
-        prvMap[providerNamePrefix+prov.ProviderName] = prov
-    }
-
-    for _, obj := range clObj {
-        for _, status := range obj.Status.Conditions {
-            if status.Type != model.TypeHealthy {
-                continue
-            }
-
-            prvObj, ok := prvMap[obj.Name]
-            if !ok {
-                fetch.log.Infof("Provider name %s is not found in the db, skipping the update", obj.Name)
-                continue
-            }
-
-            provider := model.CrossplaneProvider{
-                Id:              prvObj.Id,
-                Status:          string(status.Type),
-                CloudType:       prvObj.CloudType,
-                CloudProviderId: prvObj.CloudProviderId,
-                ProviderName:    prvObj.ProviderName,
-            }
-
-            if err := fetch.db.UpdateCrossplaneProvider(&provider); err != nil {
-                fetch.log.Errorf("failed to update provider %s details in db, err: ", prvObj.ProviderName, err)
-                continue
-            }
-
-            fetch.log.Infof("successfully updated the details for %s", prvObj.ProviderName)
-
-        }
-    }
-}
diff --git a/capten/agent/pkg/util/synch_cassandra_secret.go b/capten/agent/pkg/util/synch_cassandra_secret.go
index 63ec06f5..55a2f0ae 100644
--- a/capten/agent/pkg/util/synch_cassandra_secret.go
+++ b/capten/agent/pkg/util/synch_cassandra_secret.go
@@ -28,7 +28,7 @@ func SyncCassandraAdminSecret(log logging.Logger) error {
         return err
     }
 
-    res, err := k8sClient.FetchSecretDetails(&k8s.SecretDetailsRequest{Namespace: conf.Namespace, SecretName: conf.SecretName})
+    res, err := k8sClient.GetSecretData(conf.Namespace, conf.SecretName)
     if err != nil {
         return err
     }
diff --git a/capten/common-pkg/k8s/client.go b/capten/common-pkg/k8s/client.go
index bca7a1de..a815313e 100644
--- a/capten/common-pkg/k8s/client.go
+++ b/capten/common-pkg/k8s/client.go
@@ -33,14 +33,12 @@ func NewK8SClient(log logging.Logger) (*K8SClient, error) {
     // creates the clientset
     clientset, err := kubernetes.NewForConfig(config)
     if err != nil {
-        log.Errorf("Initialize kubernetes client failed: %v", err)
-        return nil, err
+        return nil, fmt.Errorf("initialize kubernetes client failed: %v", err)
     }
 
     dcClient, err := dynamic.NewForConfig(config)
     if err != nil {
-        log.Errorf("failed to initialize dynamic client failed: %v", err)
-        return nil, err
+        return nil, fmt.Errorf("failed to initialize dynamic client: %v", err)
     }
 
     return &K8SClient{
@@ -53,23 +51,22 @@ func GetK8SConfig(log logging.Logger) (*rest.Config, error) {
     conf, err := FetchConfiguration()
     if err != nil {
-        log.Errorf("Fetch configuration failed: %v", err)
-        return nil, err
+        return nil, fmt.Errorf("fetch configuration failed: %v", err)
     }
+
     var k8sConfig *rest.Config
     if conf.KubeconfigPath == "" {
         // creates the in-cluster config
         k8sConfig, err = rest.InClusterConfig()
         if err != nil {
-            log.Errorf("Fetch in-cluster configuration failed: %v", err)
-            return nil, err
+            return nil, fmt.Errorf("fetch in-cluster configuration failed: %v", err)
         }
     } else {
         // use the current context in kubeconfig
         k8sConfig, err = clientcmd.BuildConfigFromFlags("", conf.KubeconfigPath)
         if err != nil {
-            log.Errorf("Fetch in-cluster configuration from absolute path %s failed: %v", conf.KubeconfigPath, err)
-            return nil, err
+            return nil, fmt.Errorf("fetch configuration from kubeconfig path %s failed: %v", conf.KubeconfigPath, err)
+        }
     }
     return k8sConfig, nil
@@ -78,10 +75,8 @@ func (k *K8SClient) ListPods(namespace string) ([]corev1.Pod, error) {
     pods, err := k.Clientset.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{})
     if err != nil {
-        k.log.Errorf("List pods failed, %v", err)
         return nil, err
     }
-    fmt.Printf("There are %d pods in the cluster\n", len(pods.Items))
     return pods.Items, nil
 }
 
@@ -91,29 +86,25 @@ func FetchConfiguration() (*Configuration, error) {
     return cfg, err
 }
 
-func (k *K8SClient) FetchSecretDetails(req *SecretDetailsRequest) (*SecretDetailsResponse, error) {
-    secret, err := k.Clientset.CoreV1().Secrets(req.Namespace).Get(context.TODO(), req.SecretName, metav1.GetOptions{})
+func (k *K8SClient) GetSecretData(namespace, secretName string) (*SecretData, error) {
+    secret, err := k.Clientset.CoreV1().Secrets(namespace).Get(context.TODO(), secretName, metav1.GetOptions{})
     if err != nil {
-        k.log.Errorf("Fetching secret %s failed, %v", req.SecretName, err)
         return nil, err
     }
 
-    // Convert data value from []bytes to string
     data := make(map[string]string, len(secret.Data))
     for k, v := range secret.Data {
         data[k] = string(v)
     }
 
-    return &SecretDetailsResponse{
-        Namespace: req.Namespace,
-        Data:      data,
+    return &SecretData{
+        Data: data,
     }, nil
 }
 
-func (k *K8SClient) FetchServiceDetails(req *ServiceDetailsRequest) (*ServiceDetailsResponse, error) {
-    service, err := k.Clientset.CoreV1().Services(req.Namespace).Get(context.TODO(), req.ServiceName, metav1.GetOptions{})
+func (k *K8SClient) GetServiceData(namespace, serviceName string) (*ServiceData, error) {
+    service, err := k.Clientset.CoreV1().Services(namespace).Get(context.TODO(), serviceName, metav1.GetOptions{})
     if err != nil {
-        k.log.Errorf("Fetching service %s details failed, %v", req.ServiceName, err)
         return nil, err
     }
 
@@ -123,11 +114,8 @@ func (k *K8SClient) FetchServiceDetails(req *ServiceDetailsRequest) (*ServiceDetailsResponse, error) {
         ports = append(ports, v.Port)
     }
 
-    return &ServiceDetailsResponse{
-        Namespace: req.Namespace,
-        ServiceDetails: ServiceDetails{
-            Name:  service.Name,
-            Ports: ports,
-        },
+    return &ServiceData{
+        Name:  service.Name,
+        Ports: ports,
     }, nil
 }
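The new accessors drop the request/response wrapper structs: callers pass namespace and name directly, and errors are returned instead of being logged inside the client. A usage sketch of the reworked API (the namespace and object names are illustrative; the secret name matches the one used elsewhere in this patch):

package main

import (
	"fmt"

	"github.com/intelops/go-common/logging"
	"github.com/kube-tarian/kad/capten/common-pkg/k8s"
)

func main() {
	log := logging.NewLogger()
	client, err := k8s.NewK8SClient(log)
	if err != nil {
		log.Fatal("failed to create k8s client", err)
	}

	// Read a secret and a service with the simplified signatures.
	secret, err := client.GetSecretData("argo-cd", "argocd-initial-admin-secret")
	if err != nil {
		log.Fatal("failed to read secret", err)
	}

	svc, err := client.GetServiceData("argo-cd", "argo-cd-server") // illustrative service name
	if err != nil {
		log.Fatal("failed to read service", err)
	}
	fmt.Printf("service %s ports %v, password length %d\n",
		svc.Name, svc.Ports, len(secret.Data["password"]))
}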
diff --git a/capten/common-pkg/k8s/model.go b/capten/common-pkg/k8s/model.go
index 752255ed..c864b7a3 100644
--- a/capten/common-pkg/k8s/model.go
+++ b/capten/common-pkg/k8s/model.go
@@ -1,26 +1,11 @@
 package k8s
 
-type SecretDetailsRequest struct {
-    Namespace  string
-    SecretName string
-}
-
-type SecretDetailsResponse struct {
+type SecretData struct {
     Namespace string
     Data      map[string]string
 }
 
-type ServiceDetailsRequest struct {
-    Namespace   string
-    ServiceName string
-}
-
-type ServiceDetails struct {
+type ServiceData struct {
     Name  string
     Ports []int32
 }
-
-type ServiceDetailsResponse struct {
-    Namespace string
-    ServiceDetails
-}
diff --git a/capten/common-pkg/plugins/argocd/client.go b/capten/common-pkg/plugins/argocd/client.go
index 9e30b547..f03f4e9d 100644
--- a/capten/common-pkg/plugins/argocd/client.go
+++ b/capten/common-pkg/plugins/argocd/client.go
@@ -36,10 +36,12 @@ func NewClient(logger logging.Logger) (*ArgoCDClient, error) {
     if err != nil {
         return nil, err
     }
-    res, err := k8sClient.FetchSecretDetails(&k8s.SecretDetailsRequest{Namespace: "argo-cd", SecretName: "argocd-initial-admin-secret"})
+
+    res, err := k8sClient.GetSecretData("argo-cd", "argocd-initial-admin-secret")
     if err != nil {
         return nil, err
     }
+
     password := res.Data["password"]
     if len(password) == 0 {
         return nil, fmt.Errorf("credentials not found in the secret")
diff --git a/capten/common-pkg/plugins/fetcher/fetcher.go b/capten/common-pkg/plugins/fetcher/credential.go
similarity index 80%
rename from capten/common-pkg/plugins/fetcher/fetcher.go
rename to capten/common-pkg/plugins/fetcher/credential.go
index adfb3164..09d8d640 100644
--- a/capten/common-pkg/plugins/fetcher/fetcher.go
+++ b/capten/common-pkg/plugins/fetcher/credential.go
@@ -7,8 +7,8 @@
 import (
     "fmt"
 
-    "github.com/kube-tarian/kad/capten/common-pkg/k8s"
     "github.com/intelops/go-common/logging"
+    "github.com/kube-tarian/kad/capten/common-pkg/k8s"
 )
 
 type CredentialFetcher struct {
@@ -17,15 +17,12 @@ type CredentialFetcher struct {
 }
 
 func NewCredentialFetcher(log logging.Logger) (*CredentialFetcher, error) {
-    // Initialize kubernetes client
     k8sClient, err := k8s.NewK8SClient(log)
     if err != nil {
         log.Errorf("K8S client initialization failed: %v", err)
         return nil, fmt.Errorf("k8 client initialization failed, %v", err)
     }
 
-    // TODO: Initialze Cassandra client
-
     return &CredentialFetcher{
         k8sClient: k8sClient,
         log:       log,
@@ -53,27 +50,20 @@ func (c *CredentialFetcher) FetchPluginDetails(req *PluginRequest) (*PluginResponse, error) {
 }
 
 func (c *CredentialFetcher) FetchArgoCDDetails(namespace, releaseName string) (*PluginResponse, error) {
-    service, err := c.k8sClient.FetchServiceDetails(&k8s.ServiceDetailsRequest{
-        Namespace:   namespace,
-        ServiceName: releaseName,
-    })
+    service, err := c.k8sClient.GetServiceData(namespace, releaseName)
     if err != nil {
-        c.log.Errorf("Fetching plugin credentials failed: %v", err)
-        return nil, err
+        return nil, fmt.Errorf("fetching plugin credentials failed: %v", err)
     }
 
     // Depending on the service port details isSSLEnabled can be prepared. For now it is set to false default scenario
     isSSLEnabled := false
 
-    credentialDetails, err := c.k8sClient.FetchSecretDetails(&k8s.SecretDetailsRequest{
-        Namespace:  namespace,
-        SecretName: "argocd-initial-admin-secret",
-    })
+    credentialDetails, err := c.k8sClient.GetSecretData(namespace, "argocd-initial-admin-secret")
     if err != nil {
         c.log.Errorf("Fetching plugin credentials failed: %v", err)
         return nil, err
     }
 
     return &PluginResponse{
-        ServiceURL:   fmt.Sprintf("%s.%s.svc.cluster.local", service.Name, service.Namespace),
+        ServiceURL:   fmt.Sprintf("%s.%s.svc.cluster.local", service.Name, namespace),
         IsSSLEnabled: isSSLEnabled,
         Username:     "admin", // admin user is not available in secret
         Password:     credentialDetails.Data["password"],
diff --git a/capten/common-pkg/worker-framework/event_processor.go b/capten/common-pkg/worker-framework/event_processor.go
index 381cedb7..cc64f327 100644
--- a/capten/common-pkg/worker-framework/event_processor.go
+++ b/capten/common-pkg/worker-framework/event_processor.go
@@ -43,23 +43,6 @@ type ConfigureCICD interface {
     GetDefaultBranchName() (string, error)
 }
 
-type ConfigurationWorker interface {
-    // ConfigurationActivities(payload interface{}) (json.RawMessage, error)
-
-    ClusterAdd(payload interface{}) (json.RawMessage, error)
-    ClusterDelete(payload interface{}) (json.RawMessage, error)
-
-    RepositoryAdd(payload interface{}) (json.RawMessage, error)
-    RepositoryDelete(payload interface{}) (json.RawMessage, error)
-
-    ProjectAdd(payload interface{}) (json.RawMessage, error)
-    ProjectDelete(payload interface{}) (json.RawMessage, error)
-
-    // ConfgiureTarget(payload interface{}) (json.RawMessage, error)
-    // SetTarget(payload interface{}) (json.RawMessage, error)
-    // SetDefaultTarget(payload interface{}) (json.RawMessage, error)
-}
-
 type Action interface {
     GetStatus()
 }
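Since ServiceData no longer carries the namespace, FetchArgoCDDetails now builds the in-cluster service URL from the caller-supplied namespace. A hypothetical call site, assuming the fetcher package import path used elsewhere in this patch (namespace and release name are illustrative):

package main

import (
	"github.com/intelops/go-common/logging"
	"github.com/kube-tarian/kad/capten/common-pkg/plugins/fetcher"
)

func main() {
	log := logging.NewLogger()
	f, err := fetcher.NewCredentialFetcher(log)
	if err != nil {
		log.Fatal("failed to create credential fetcher", err)
	}

	// Resolve the ArgoCD endpoint and admin credentials for a release.
	details, err := f.FetchArgoCDDetails("argo-cd", "argo-cd-server")
	if err != nil {
		log.Fatal("failed to fetch argo-cd details", err)
	}
	log.Infof("argo-cd reachable at %s (ssl: %v, user: %s)",
		details.ServiceURL, details.IsSSLEnabled, details.Username)
}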