Merge pull request #318 from kube-tarian/fix-crossplane-res-sync
refactor crossplane resources sync jobs
vramk23 committed Nov 12, 2023
2 parents ce6c6e1 + 1a3db53 commit 32bf560
Showing 16 changed files with 437 additions and 427 deletions.
62 changes: 26 additions & 36 deletions capten/agent/cmd/agent/main.go
@@ -9,14 +9,13 @@ import (

"google.golang.org/grpc"

"github.com/robfig/cron/v3"

"github.com/intelops/go-common/logging"
"github.com/kube-tarian/kad/capten/agent/pkg/agent"
captenstore "github.com/kube-tarian/kad/capten/agent/pkg/capten-store"
"github.com/kube-tarian/kad/capten/agent/pkg/config"
"github.com/kube-tarian/kad/capten/agent/pkg/job"
"github.com/kube-tarian/kad/capten/agent/pkg/pb/agentpb"
"github.com/kube-tarian/kad/capten/agent/pkg/pb/captenpluginspb"
captenagentsync "github.com/kube-tarian/kad/capten/agent/pkg/sync"
"github.com/kube-tarian/kad/capten/agent/pkg/util"
dbinit "github.com/kube-tarian/kad/capten/common-pkg/cassandra/db-init"
dbmigrate "github.com/kube-tarian/kad/capten/common-pkg/cassandra/db-migrate"
@@ -41,7 +40,14 @@ func main() {
log.Fatalf("%v", err)
}

s, err := agent.NewAgent(log, cfg)
as, err := captenstore.NewStore(log)
if err != nil {
// ignoring store failure until DB user creation is working
// return nil, err
log.Errorf("failed to initialize store, %v", err)
}

s, err := agent.NewAgent(log, cfg, as)
if err != nil {
log.Fatalf("Agent initialization failed, %v", err)
}
@@ -71,15 +77,13 @@ func main() {
}
}()

cronjob, err := initializeCronJobs(cfg.CronInterval)
jobScheduler, err := initializeJobScheduler(cfg, as)
if err != nil {
log.Fatalf("Failed to create cron job: %v", err)
}

cronjob.Start()
defer cronjob.Stop()

log.Info("syncing clusterClaim started successfully...")
jobScheduler.Start()
defer jobScheduler.Stop()

signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
@@ -104,33 +108,19 @@ func configureDB() error {
return nil
}

func initializeCronJobs(crontInterval string) (*cron.Cron, error) {
cronJob := cron.New(cron.WithChain(cron.SkipIfStillRunning(cron.DefaultLogger), cron.Recover(cron.DefaultLogger)))

fetch, err := captenagentsync.NewFetch()
if err != nil {
log.Errorf("Failed to initialize the sync: %v", err)
return nil, err
}

_, jobErr := cronJob.AddJob(fmt.Sprintf(StrInterval, crontInterval), fetch)
if jobErr != nil {
log.Errorf("Failed to add cronJob for sync clusterClaim: %v", jobErr)
return nil, jobErr
}

fetchCrossPlaneProviders, err := captenagentsync.NewFetchCrossPlaneProviders()
if err != nil {
log.Errorf("Failed to initialize the sync: %v", err)
return nil, err
}

_, crossPlaneJobErr := cronJob.AddJob(fmt.Sprintf(StrInterval, crontInterval), fetchCrossPlaneProviders)
if jobErr != nil {
log.Errorf("Failed to add cronJob for sync clusterClaim: %v", crossPlaneJobErr)
return nil, jobErr
func initializeJobScheduler(cfg *config.SericeConfig, as *captenstore.Store) (*job.Scheduler, error) {
s := job.NewScheduler(log)
if cfg.CrossplaneSyncJobEnabled {
cs, err := job.NewCrossplaneResourcesSync(log, cfg.CrossplaneSyncJobInterval, as)
if err != nil {
log.Fatal("failed to init crossplane resources sync job", err)
}
err = s.AddJob("crossplane-resources-synch", cs)
if err != nil {
log.Fatal("failed to add crossplane resources sync job", err)
}
}

log.Info("successfully initialized the cron")
return cronJob, nil
log.Info("successfully initialized job scheduler")
return s, nil
}
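
The job package referenced above (job.NewScheduler, Scheduler.AddJob, Start, Stop) is introduced elsewhere in this pull request and is not shown in this excerpt. Below is a minimal sketch of the scheduler contract main.go appears to rely on, reusing the robfig/cron chain options the removed initializeCronJobs set up; the Job interface, method names, and logging calls are assumptions, not the committed implementation.

package job

import (
	"github.com/intelops/go-common/logging"
	"github.com/robfig/cron/v3"
)

// Job is what the scheduler expects to register: a cron-compatible Run
// method plus the cron spec the job wants to run on (assumed interface).
type Job interface {
	Run()
	CronSpec() string
}

// Scheduler wraps robfig/cron, skipping a tick if the previous run is
// still in progress and recovering from panics inside jobs.
type Scheduler struct {
	log  logging.Logger
	cron *cron.Cron
}

func NewScheduler(log logging.Logger) *Scheduler {
	return &Scheduler{
		log: log,
		cron: cron.New(cron.WithChain(
			cron.SkipIfStillRunning(cron.DefaultLogger),
			cron.Recover(cron.DefaultLogger))),
	}
}

// AddJob registers a named job under its own cron spec.
func (s *Scheduler) AddJob(name string, job Job) error {
	if _, err := s.cron.AddJob(job.CronSpec(), job); err != nil {
		return err
	}
	s.log.Infof("%s job added with spec %s", name, job.CronSpec())
	return nil
}

func (s *Scheduler) Start() { s.cron.Start() }
func (s *Scheduler) Stop()  { s.cron.Stop() }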
9 changes: 1 addition & 8 deletions capten/agent/pkg/agent/agent.go
@@ -23,7 +23,7 @@ type Agent struct {
createPr bool
}

func NewAgent(log logging.Logger, cfg *config.SericeConfig) (*Agent, error) {
func NewAgent(log logging.Logger, cfg *config.SericeConfig, as *captenstore.Store) (*Agent, error) {
var tc *temporalclient.Client
var err error

@@ -32,13 +32,6 @@ func NewAgent(log logging.Logger, cfg *config.SericeConfig) (*Agent, error) {
return nil, err
}

as, err := captenstore.NewStore(log)
if err != nil {
// ignoring store failure until DB user creation working
// return nil, err
log.Errorf("failed to initialize store, %v", err)
}

agent := &Agent{
tc: tc,
as: as,
2 changes: 1 addition & 1 deletion capten/agent/pkg/agent/agent_cluster_apps_test.go
@@ -27,7 +27,7 @@ func TestAgentTestSuite(t *testing.T) {
t.Fatal(err)
}*/

agent, err := NewAgent(agentSuite.logger, &config.SericeConfig{})
agent, err := NewAgent(agentSuite.logger, &config.SericeConfig{}, nil)
if err != nil {
t.Fatal(err)
}
11 changes: 6 additions & 5 deletions capten/agent/pkg/config/config.go
@@ -5,11 +5,12 @@ import (
)

type SericeConfig struct {
Host string `envconfig:"HOST" default:"0.0.0.0"`
Port int `envconfig:"PORT" default:"9091"`
Mode string `envconfig:"MODE" default:"production"`
AuthEnabled bool `envconfig:"AUTH_ENABLED" default:"false"`
CronInterval string `envconfig:"CRON_INTERNAL" default:"60"`
Host string `envconfig:"HOST" default:"0.0.0.0"`
Port int `envconfig:"PORT" default:"9091"`
Mode string `envconfig:"MODE" default:"production"`
AuthEnabled bool `envconfig:"AUTH_ENABLED" default:"false"`
CrossplaneSyncJobEnabled bool `envconfig:"CROSSPLANE_SYNC_JOB_ENABLED" default:"true"`
CrossplaneSyncJobInterval string `envconfig:"CROSSPLANE_SYNC_JOB_INTERVAL" default:"@every 1m"`
}

func GetServiceConfig() (*SericeConfig, error) {
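
Note that the interval moves from the old bare CRON_INTERNAL value, which initializeCronJobs substituted into StrInterval, to a complete robfig/cron spec. A quick, hedged check that the new "@every 1m" default parses under robfig/cron's standard parser, assuming the new job scheduler still hands the interval to that library:

package main

import (
	"fmt"
	"time"

	"github.com/robfig/cron/v3"
)

func main() {
	// "@every 1m" is a cron descriptor, not a plain seconds count,
	// so it must parse as a full schedule.
	sched, err := cron.ParseStandard("@every 1m")
	if err != nil {
		panic(err)
	}
	fmt.Println(sched.Next(time.Now())) // next tick, one minute from now
}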
155 changes: 155 additions & 0 deletions capten/agent/pkg/crossplane/crossplane_cluster_claims.go
@@ -0,0 +1,155 @@
package crossplane

import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/google/uuid"
"github.com/intelops/go-common/credentials"
"github.com/intelops/go-common/logging"
captenstore "github.com/kube-tarian/kad/capten/agent/pkg/capten-store"

"github.com/kube-tarian/kad/capten/agent/pkg/pb/captenpluginspb"

"github.com/kube-tarian/kad/capten/agent/pkg/model"
"github.com/kube-tarian/kad/capten/common-pkg/k8s"
"k8s.io/apimachinery/pkg/runtime/schema"
)

var (
statusType = "Ready"
statusValue = "True"
clusterName = "%s-%s"
clusterSecretName = "%s-cluster"
kubeConfig = "kubeconfig"
k8sEndpoint = "endpoint"
k8sClusterCA = "clusterCA"
managedClusterEntityName = "managedcluster"
)

type ClusterClaimSyncHandler struct {
log logging.Logger
dbStore *captenstore.Store
clusters map[string]*captenpluginspb.ManagedCluster
}

func NewClusterClaimSyncHandler(log logging.Logger, dbStore *captenstore.Store) (*ClusterClaimSyncHandler, error) {
return &ClusterClaimSyncHandler{log: log, dbStore: dbStore, clusters: map[string]*captenpluginspb.ManagedCluster{}}, nil
}

func (h *ClusterClaimSyncHandler) Sync() error {
h.log.Debug("started to sync cluster-claims resources")

k8sclient, err := k8s.NewK8SClient(h.log)
if err != nil {
return fmt.Errorf("failed to initalize k8s client: %v", err)
}

objList, err := k8sclient.DynamicClient.ListAllNamespaceResource(context.TODO(),
schema.GroupVersionResource{Group: "prodready.cluster", Version: "v1alpha1", Resource: "clusterclaims"})
if err != nil {
return fmt.Errorf("failed to list cluster claim resources, %v", err)
}

clusterClaimByte, err := json.Marshal(objList)
if err != nil {
return fmt.Errorf("failed to marshal cluster claim resources, %v", err)
}

var clObj model.ClusterClaimList
err = json.Unmarshal(clusterClaimByte, &clObj)
if err != nil {
return fmt.Errorf("failed to unmarshal cluster claim resources, %v", err)
}

if err = h.updateManagedClusters(k8sclient, clObj.Items); err != nil {
return fmt.Errorf("failed to update clusters in DB, %v", err)
}
h.log.Info("cluster-claims resources synched")
return nil
}

func (h *ClusterClaimSyncHandler) updateManagedClusters(k8sClient *k8s.K8SClient, clObj []model.ClusterClaim) error {
credAdmin, err := credentials.NewCredentialAdmin(context.TODO())
if err != nil {
return err
}

clusters, err := h.getManagedClusters()
if err != nil {
return fmt.Errorf("failed to get managed clusters from DB, %v", err)
}

for _, obj := range clObj {
for _, status := range obj.Status.Conditions {
if status.Type != statusType {
continue
}

if status.Status != statusValue {
h.log.Info("%s in namespace %s, status is %s, so skiping update to db.",
obj.Spec.Id, obj.Metadata.Namespace, status.Status)
continue
}

secretName := fmt.Sprintf(clusterSecretName, obj.Spec.Id)
resp, err := k8sClient.GetSecretData(obj.Metadata.Namespace, secretName)
if err != nil {
h.log.Info("failed to get secret %s in namespace %s, %v",
secretName, obj.Metadata.Namespace, err)
continue
}

clusterEndpoint := resp.Data[k8sEndpoint]

managedCluster := &captenpluginspb.ManagedCluster{}
clusterObj, ok := h.clusters[clusterEndpoint]
if !ok {
managedCluster.Id = uuid.New().String()
} else {
h.log.Info("found existing Id: %s, updating the latest information ", clusterObj.Id)
managedCluster.Id = clusterObj.Id
}

managedCluster.ClusterName = fmt.Sprintf(clusterName, obj.Spec.Id, obj.Metadata.Namespace)
managedCluster.ClusterDeployStatus = status.Status
managedCluster.ClusterEndpoint = clusterEndpoint

clusterDetails := map[string]string{}
clusterDetails[kubeConfig] = resp.Data[kubeConfig]
clusterDetails[k8sClusterCA] = resp.Data[k8sClusterCA]

err = credAdmin.PutCredential(context.TODO(), credentials.GenericCredentialType, managedClusterEntityName, managedCluster.Id, clusterDetails)

if err != nil {
h.log.Audit("security", "storecred", "failed", "system", "failed to store crendential for %s", managedCluster.Id)
h.log.Errorf("failed to store credential for %s, %v", managedCluster.Id, err)
continue
}

managedCluster.LastUpdateTime = time.Now().Format(time.RFC3339)
err = h.dbStore.UpsertManagedCluster(managedCluster)
if err != nil {
h.log.Info("failed to update information to db, %v", err)
continue
}
clusters[clusterEndpoint] = managedCluster
}
}
return nil
}

func (h *ClusterClaimSyncHandler) getManagedClusters() (map[string]*captenpluginspb.ManagedCluster, error) {
clusters, err := h.dbStore.GetManagedClusters()
if err != nil {
return nil, err
}

clusterEndpointMap := map[string]*captenpluginspb.ManagedCluster{}
for _, cluster := range clusters {
clusterEndpointMap[cluster.ClusterEndpoint] = cluster
}
return clusterEndpointMap, nil
}
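
The job wrapper that main.go registers through job.NewCrossplaneResourcesSync is also part of this pull request but outside this excerpt. The following sketch shows how such a job could tie the scheduler to handlers like ClusterClaimSyncHandler above, matching the Job interface assumed in the earlier scheduler sketch; the struct layout and field names are assumptions, and the committed job likely also drives the provider sync, which is not shown here.

package job

import (
	"github.com/intelops/go-common/logging"
	captenstore "github.com/kube-tarian/kad/capten/agent/pkg/capten-store"
	"github.com/kube-tarian/kad/capten/agent/pkg/crossplane"
)

// CrossplaneResourcesSync periodically reconciles crossplane resources
// such as cluster claims into the capten store.
type CrossplaneResourcesSync struct {
	log       logging.Logger
	frequency string
	claims    *crossplane.ClusterClaimSyncHandler
}

func NewCrossplaneResourcesSync(log logging.Logger, frequency string, dbStore *captenstore.Store) (*CrossplaneResourcesSync, error) {
	claims, err := crossplane.NewClusterClaimSyncHandler(log, dbStore)
	if err != nil {
		return nil, err
	}
	return &CrossplaneResourcesSync{log: log, frequency: frequency, claims: claims}, nil
}

// CronSpec is read by the scheduler at registration time, so the
// CROSSPLANE_SYNC_JOB_INTERVAL value ends up as this job's cron spec.
func (s *CrossplaneResourcesSync) CronSpec() string { return s.frequency }

// Run is invoked on every cron tick.
func (s *CrossplaneResourcesSync) Run() {
	if err := s.claims.Sync(); err != nil {
		s.log.Errorf("failed to sync cluster claim resources, %v", err)
	}
}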