From 4458d5fa80f452d2abf5bec997ad4466581c2c5e Mon Sep 17 00:00:00 2001 From: Soumya Ghosh Dastidar <44349253+gdsoumya@users.noreply.github.com> Date: Mon, 12 Feb 2024 00:02:17 +0530 Subject: [PATCH] fix: stop initializing deployment informer if dynamic sharding is disabled (#17097) * fix: stop initializing deployment informer if dynamic sharding is disabled Signed-off-by: Soumya Ghosh Dastidar * feat: updated sharding cache getter func Signed-off-by: Soumya Ghosh Dastidar --------- Signed-off-by: Soumya Ghosh Dastidar --- .../commands/argocd_application_controller.go | 38 +++++--- controller/appcontroller.go | 97 +++++++++++-------- controller/appcontroller_test.go | 1 + 3 files changed, 82 insertions(+), 54 deletions(-) diff --git a/cmd/argocd-application-controller/commands/argocd_application_controller.go b/cmd/argocd-application-controller/commands/argocd_application_controller.go index 0ff9fa33c8254..c38a2113e2b34 100644 --- a/cmd/argocd-application-controller/commands/argocd_application_controller.go +++ b/cmd/argocd-application-controller/commands/argocd_application_controller.go @@ -147,7 +147,8 @@ func NewCommand() *cobra.Command { appController.InvalidateProjectsCache() })) kubectl := kubeutil.NewKubectl() - clusterSharding := getClusterSharding(kubeClient, settingsMgr, shardingAlgorithm, enableDynamicClusterDistribution) + clusterSharding, err := getClusterSharding(kubeClient, settingsMgr, shardingAlgorithm, enableDynamicClusterDistribution) + errors.CheckError(err) appController, err = controller.NewApplicationController( namespace, settingsMgr, @@ -170,6 +171,7 @@ func NewCommand() *cobra.Command { applicationNamespaces, &workqueueRateLimit, serverSideDiff, + enableDynamicClusterDistribution, ) errors.CheckError(err) cacheutil.CollectMetrics(redisClient, appController.GetMetricsServer()) @@ -238,21 +240,29 @@ func NewCommand() *cobra.Command { return &command } -func getClusterSharding(kubeClient *kubernetes.Clientset, settingsMgr *settings.SettingsManager, shardingAlgorithm string, enableDynamicClusterDistribution bool) sharding.ClusterShardingCache { - var replicasCount int +func getClusterSharding(kubeClient *kubernetes.Clientset, settingsMgr *settings.SettingsManager, shardingAlgorithm string, enableDynamicClusterDistribution bool) (sharding.ClusterShardingCache, error) { + var ( + replicasCount int + ) // StatefulSet mode and Deployment mode uses different default values for shard number. defaultShardNumberValue := 0 - applicationControllerName := env.StringFromEnv(common.EnvAppControllerName, common.DefaultApplicationControllerName) - appControllerDeployment, err := kubeClient.AppsV1().Deployments(settingsMgr.GetNamespace()).Get(context.Background(), applicationControllerName, metav1.GetOptions{}) - // if the application controller deployment was not found, the Get() call returns an empty Deployment object. So, set the variable to nil explicitly - if err != nil && kubeerrors.IsNotFound(err) { - appControllerDeployment = nil - } + if enableDynamicClusterDistribution { + applicationControllerName := env.StringFromEnv(common.EnvAppControllerName, common.DefaultApplicationControllerName) + appControllerDeployment, err := kubeClient.AppsV1().Deployments(settingsMgr.GetNamespace()).Get(context.Background(), applicationControllerName, metav1.GetOptions{}) + + // if app controller deployment is not found when dynamic cluster distribution is enabled error out + if err != nil { + return nil, fmt.Errorf("(dymanic cluster distribution) failed to get app controller deployment: %v", err) + } + + if appControllerDeployment != nil && appControllerDeployment.Spec.Replicas != nil { + replicasCount = int(*appControllerDeployment.Spec.Replicas) + defaultShardNumberValue = -1 + } else { + return nil, fmt.Errorf("(dymanic cluster distribution) failed to get app controller deployment replica count") + } - if enableDynamicClusterDistribution && appControllerDeployment != nil && appControllerDeployment.Spec.Replicas != nil { - replicasCount = int(*appControllerDeployment.Spec.Replicas) - defaultShardNumberValue = -1 } else { replicasCount = env.ParseNumFromEnv(common.EnvControllerReplicas, 0, 0, math.MaxInt32) } @@ -260,7 +270,7 @@ func getClusterSharding(kubeClient *kubernetes.Clientset, settingsMgr *settings. if replicasCount > 1 { // check for shard mapping using configmap if application-controller is a deployment // else use existing logic to infer shard from pod name if application-controller is a statefulset - if enableDynamicClusterDistribution && appControllerDeployment != nil { + if enableDynamicClusterDistribution { var err error // retry 3 times if we find a conflict while updating shard mapping configMap. // If we still see conflicts after the retries, wait for next iteration of heartbeat process. @@ -288,5 +298,5 @@ func getClusterSharding(kubeClient *kubernetes.Clientset, settingsMgr *settings. log.Info("Processing all cluster shards") } db := db.NewDB(settingsMgr.GetNamespace(), settingsMgr, kubeClient) - return sharding.NewClusterSharding(db, shardNumber, replicasCount, shardingAlgorithm) + return sharding.NewClusterSharding(db, shardNumber, replicasCount, shardingAlgorithm), nil } diff --git a/controller/appcontroller.go b/controller/appcontroller.go index e6dee507caa2e..f038b770c29c4 100644 --- a/controller/appcontroller.go +++ b/controller/appcontroller.go @@ -113,7 +113,6 @@ type ApplicationController struct { appInformer cache.SharedIndexInformer appLister applisters.ApplicationLister projInformer cache.SharedIndexInformer - deploymentInformer informerv1.DeploymentInformer appStateManager AppStateManager stateCache statecache.LiveStateCache statusRefreshTimeout time.Duration @@ -130,6 +129,10 @@ type ApplicationController struct { clusterSharding sharding.ClusterShardingCache projByNameCache sync.Map applicationNamespaces []string + + // dynamicClusterDistributionEnabled if disabled deploymentInformer is never initialized + dynamicClusterDistributionEnabled bool + deploymentInformer informerv1.DeploymentInformer } // NewApplicationController creates new instance of ApplicationController. @@ -155,6 +158,7 @@ func NewApplicationController( applicationNamespaces []string, rateLimiterConfig *ratelimiter.AppControllerRateLimiterConfig, serverSideDiff bool, + dynamicClusterDistributionEnabled bool, ) (*ApplicationController, error) { log.Infof("appResyncPeriod=%v, appHardResyncPeriod=%v, appResyncJitter=%v", appResyncPeriod, appHardResyncPeriod, appResyncJitter) db := db.NewDB(namespace, settingsMgr, kubeClientset) @@ -163,28 +167,29 @@ func NewApplicationController( log.Info("Using default workqueue rate limiter config") } ctrl := ApplicationController{ - cache: argoCache, - namespace: namespace, - kubeClientset: kubeClientset, - kubectl: kubectl, - applicationClientset: applicationClientset, - repoClientset: repoClientset, - appRefreshQueue: workqueue.NewNamedRateLimitingQueue(ratelimiter.NewCustomAppControllerRateLimiter(rateLimiterConfig), "app_reconciliation_queue"), - appOperationQueue: workqueue.NewNamedRateLimitingQueue(ratelimiter.NewCustomAppControllerRateLimiter(rateLimiterConfig), "app_operation_processing_queue"), - projectRefreshQueue: workqueue.NewNamedRateLimitingQueue(ratelimiter.NewCustomAppControllerRateLimiter(rateLimiterConfig), "project_reconciliation_queue"), - appComparisonTypeRefreshQueue: workqueue.NewRateLimitingQueue(ratelimiter.NewCustomAppControllerRateLimiter(rateLimiterConfig)), - db: db, - statusRefreshTimeout: appResyncPeriod, - statusHardRefreshTimeout: appHardResyncPeriod, - statusRefreshJitter: appResyncJitter, - refreshRequestedApps: make(map[string]CompareWith), - refreshRequestedAppsMutex: &sync.Mutex{}, - auditLogger: argo.NewAuditLogger(namespace, kubeClientset, common.ApplicationController), - settingsMgr: settingsMgr, - selfHealTimeout: selfHealTimeout, - clusterSharding: clusterSharding, - projByNameCache: sync.Map{}, - applicationNamespaces: applicationNamespaces, + cache: argoCache, + namespace: namespace, + kubeClientset: kubeClientset, + kubectl: kubectl, + applicationClientset: applicationClientset, + repoClientset: repoClientset, + appRefreshQueue: workqueue.NewNamedRateLimitingQueue(ratelimiter.NewCustomAppControllerRateLimiter(rateLimiterConfig), "app_reconciliation_queue"), + appOperationQueue: workqueue.NewNamedRateLimitingQueue(ratelimiter.NewCustomAppControllerRateLimiter(rateLimiterConfig), "app_operation_processing_queue"), + projectRefreshQueue: workqueue.NewNamedRateLimitingQueue(ratelimiter.NewCustomAppControllerRateLimiter(rateLimiterConfig), "project_reconciliation_queue"), + appComparisonTypeRefreshQueue: workqueue.NewRateLimitingQueue(ratelimiter.NewCustomAppControllerRateLimiter(rateLimiterConfig)), + db: db, + statusRefreshTimeout: appResyncPeriod, + statusHardRefreshTimeout: appHardResyncPeriod, + statusRefreshJitter: appResyncJitter, + refreshRequestedApps: make(map[string]CompareWith), + refreshRequestedAppsMutex: &sync.Mutex{}, + auditLogger: argo.NewAuditLogger(namespace, kubeClientset, common.ApplicationController), + settingsMgr: settingsMgr, + selfHealTimeout: selfHealTimeout, + clusterSharding: clusterSharding, + projByNameCache: sync.Map{}, + applicationNamespaces: applicationNamespaces, + dynamicClusterDistributionEnabled: dynamicClusterDistributionEnabled, } if kubectlParallelismLimit > 0 { ctrl.kubectlSemaphore = semaphore.NewWeighted(kubectlParallelismLimit) @@ -227,25 +232,33 @@ func NewApplicationController( } factory := informers.NewSharedInformerFactoryWithOptions(ctrl.kubeClientset, defaultDeploymentInformerResyncDuration, informers.WithNamespace(settingsMgr.GetNamespace())) - deploymentInformer := factory.Apps().V1().Deployments() + + var deploymentInformer informerv1.DeploymentInformer + + // only initialize deployment informer if dynamic distribution is enabled + if dynamicClusterDistributionEnabled { + deploymentInformer = factory.Apps().V1().Deployments() + } readinessHealthCheck := func(r *http.Request) error { - applicationControllerName := env.StringFromEnv(common.EnvAppControllerName, common.DefaultApplicationControllerName) - appControllerDeployment, err := deploymentInformer.Lister().Deployments(settingsMgr.GetNamespace()).Get(applicationControllerName) - if err != nil { - if kubeerrors.IsNotFound(err) { - appControllerDeployment = nil - } else { - return fmt.Errorf("error retrieving Application Controller Deployment: %s", err) - } - } - if appControllerDeployment != nil { - if appControllerDeployment.Spec.Replicas != nil && int(*appControllerDeployment.Spec.Replicas) <= 0 { - return fmt.Errorf("application controller deployment replicas is not set or is less than 0, replicas: %d", appControllerDeployment.Spec.Replicas) + if dynamicClusterDistributionEnabled { + applicationControllerName := env.StringFromEnv(common.EnvAppControllerName, common.DefaultApplicationControllerName) + appControllerDeployment, err := deploymentInformer.Lister().Deployments(settingsMgr.GetNamespace()).Get(applicationControllerName) + if err != nil { + if kubeerrors.IsNotFound(err) { + appControllerDeployment = nil + } else { + return fmt.Errorf("error retrieving Application Controller Deployment: %s", err) + } } - shard := env.ParseNumFromEnv(common.EnvControllerShard, -1, -math.MaxInt32, math.MaxInt32) - if _, err := sharding.GetOrUpdateShardFromConfigMap(kubeClientset.(*kubernetes.Clientset), settingsMgr, int(*appControllerDeployment.Spec.Replicas), shard); err != nil { - return fmt.Errorf("error while updating the heartbeat for to the Shard Mapping ConfigMap: %s", err) + if appControllerDeployment != nil { + if appControllerDeployment.Spec.Replicas != nil && int(*appControllerDeployment.Spec.Replicas) <= 0 { + return fmt.Errorf("application controller deployment replicas is not set or is less than 0, replicas: %d", appControllerDeployment.Spec.Replicas) + } + shard := env.ParseNumFromEnv(common.EnvControllerShard, -1, -math.MaxInt32, math.MaxInt32) + if _, err := sharding.GetOrUpdateShardFromConfigMap(kubeClientset.(*kubernetes.Clientset), settingsMgr, int(*appControllerDeployment.Spec.Replicas), shard); err != nil { + return fmt.Errorf("error while updating the heartbeat for to the Shard Mapping ConfigMap: %s", err) + } } } return nil @@ -773,7 +786,11 @@ func (ctrl *ApplicationController) Run(ctx context.Context, statusProcessors int go ctrl.appInformer.Run(ctx.Done()) go ctrl.projInformer.Run(ctx.Done()) - go ctrl.deploymentInformer.Informer().Run(ctx.Done()) + + if ctrl.dynamicClusterDistributionEnabled { + // only start deployment informer if dynamic distribution is enabled + go ctrl.deploymentInformer.Informer().Run(ctx.Done()) + } clusters, err := ctrl.db.ListClusters(ctx) if err != nil { diff --git a/controller/appcontroller_test.go b/controller/appcontroller_test.go index 4162a9983e941..33a29bc5ca3f8 100644 --- a/controller/appcontroller_test.go +++ b/controller/appcontroller_test.go @@ -157,6 +157,7 @@ func newFakeController(data *fakeData, repoErr error) *ApplicationController { nil, false, + false, ) db := &dbmocks.ArgoDB{} db.On("GetApplicationControllerReplicas").Return(1)