Skip to content

Commit

Permalink
Fix hanging jobs on upgrade (kyma-project#14412)
Browse files Browse the repository at this point in the history
* fix

* improve fn timing

* tweaks

* more changes

* refactor

* refactor

* fix imports

* update tests

* update tests #2

* update tests kyma-project#3

* update tests kyma-project#4

* fix test

* fix format

* bump images

* change error backoff to 5min

* fix logging

Co-authored-by: Damian Badura <damian.badura@sap.com>
  • Loading branch information
m00g3n and dbadura authored May 27, 2022
1 parent 19b21a7 commit 31f0ca6
Show file tree
Hide file tree
Showing 11 changed files with 142 additions and 50 deletions.
21 changes: 10 additions & 11 deletions components/function-controller/cmd/manager/main.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package main

import (
"errors"
"fmt"
"os"
"time"

"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/webhook"

ctrl "sigs.k8s.io/controller-runtime"

Expand All @@ -30,7 +30,6 @@ import (
ctrlzap "sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/source"
"sigs.k8s.io/controller-runtime/pkg/webhook"
// +kubebuilder:scaffold:imports
)

Expand Down Expand Up @@ -78,7 +77,7 @@ func main() {
}

atomicLevel := zap.NewAtomicLevelAt(logLevel)
zapLogger := ctrlzap.NewRaw(ctrlzap.UseDevMode(true), ctrlzap.Level(&atomicLevel))
zapLogger := ctrlzap.NewRaw(ctrlzap.Level(&atomicLevel))
ctrl.SetLogger(zapr.NewLogger(zapLogger))

setupLog.Info("Generating Kubernetes client config")
Expand Down Expand Up @@ -129,7 +128,7 @@ func main() {
os.Exit(1)
}

fnRecon := serverless.NewFunction(resourceClient, zap.NewNop().Sugar(), config.Function, git.NewGit2Go(), mgr.GetEventRecorderFor(serverlessv1alpha1.FunctionControllerValue), prometheusCollector, healthCh)
fnRecon := serverless.NewFunction(resourceClient, zapLogger.Sugar(), config.Function, git.NewGit2Go(), mgr.GetEventRecorderFor(serverlessv1alpha1.FunctionControllerValue), prometheusCollector, healthCh)
fnCtrl, err := fnRecon.SetupWithManager(mgr)
if err != nil {
setupLog.Error(err, "unable to create Function controller")
Expand All @@ -142,37 +141,37 @@ func main() {
os.Exit(1)
}

if err := k8s.NewConfigMap(mgr.GetClient(), zap.NewNop().Sugar(), config.Kubernetes, configMapSvc).
if err := k8s.NewConfigMap(mgr.GetClient(), zapLogger.Sugar(), config.Kubernetes, configMapSvc).
SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create ConfigMap controller")
os.Exit(1)
}

if err := k8s.NewNamespace(mgr.GetClient(), zap.NewNop().Sugar(), config.Kubernetes, configMapSvc, secretSvc, serviceAccountSvc, roleSvc, roleBindingSvc).
if err := k8s.NewNamespace(mgr.GetClient(), zapLogger.Sugar(), config.Kubernetes, configMapSvc, secretSvc, serviceAccountSvc, roleSvc, roleBindingSvc).
SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create Namespace controller")
os.Exit(1)
}

if err := k8s.NewSecret(mgr.GetClient(), zap.NewNop().Sugar(), config.Kubernetes, secretSvc).
if err := k8s.NewSecret(mgr.GetClient(), zapLogger.Sugar(), config.Kubernetes, secretSvc).
SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create Secret controller")
os.Exit(1)
}

if err := k8s.NewServiceAccount(mgr.GetClient(), zap.NewNop().Sugar(), config.Kubernetes, serviceAccountSvc).
if err := k8s.NewServiceAccount(mgr.GetClient(), zapLogger.Sugar(), config.Kubernetes, serviceAccountSvc).
SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create ServiceAccount controller")
os.Exit(1)
}

if err := k8s.NewRole(mgr.GetClient(), zap.NewNop().Sugar(), config.Kubernetes, roleSvc).
if err := k8s.NewRole(mgr.GetClient(), zapLogger.Sugar(), config.Kubernetes, roleSvc).
SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create Role controller")
os.Exit(1)
}

if err := k8s.NewRoleBinding(mgr.GetClient(), zap.NewNop().Sugar(), config.Kubernetes, roleBindingSvc).
if err := k8s.NewRoleBinding(mgr.GetClient(), zapLogger.Sugar(), config.Kubernetes, roleBindingSvc).
SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create RoleBinding controller")
os.Exit(1)
Expand Down Expand Up @@ -209,6 +208,6 @@ func toZapLogLevel(level string) (zapcore.Level, error) {
case "error":
return zapcore.ErrorLevel, nil
default:
return 0, errors.New(fmt.Sprintf("Desired log level: %s not exist", level))
return 0, fmt.Errorf(fmt.Sprintf("Desired log level: %s not exist", level))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import (
"context"
"fmt"

ctrl "sigs.k8s.io/controller-runtime"

serverlessv1alpha1 "github.com/kyma-project/kyma/components/function-controller/pkg/apis/serverless/v1alpha1"
"gopkg.in/yaml.v2"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apilabels "k8s.io/apimachinery/pkg/labels"
ctrl "sigs.k8s.io/controller-runtime"
)

const (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"errors"
"fmt"
"reflect"
"runtime"
"strings"
"time"

Expand Down Expand Up @@ -61,11 +63,19 @@ loop:
m.err = ctx.Err()
break loop
default:
m.log.With("stateFn", runtime.FuncForPC(reflect.ValueOf(m.fn).Pointer()).Name()).
Info("next state")

m.fn = m.fn(ctx, m, &state)

}
}

m.log.With("requeueAfter", m.result.RequeueAfter).
With("requeue", m.result.Requeue).
With("err", m.err).
Info("reconciliation result")

return m.result, m.err
}

Expand Down Expand Up @@ -114,8 +124,8 @@ func buildStateFnGenericUpdateStatus(condition serverlessv1alpha1.Condition, rep

if !equalFunctionStatus(currentFunction.Status, s.instance.Status) {

if r.err = r.client.Status().Update(ctx, currentFunction); r.err != nil {
r.err = fmt.Errorf("while updating function status: %w", r.err)
if err := r.client.Status().Update(ctx, currentFunction); err != nil {
r.log.Warnf("while updating function status: %s", err)
}

r.statsCollector.UpdateReconcileStats(&s.instance, condition)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package serverless

import (
"context"
"reflect"
"time"

"go.uber.org/zap"
Expand All @@ -10,14 +11,15 @@ import (
"golang.org/x/time/rate"
appsv1 "k8s.io/api/apps/v1"
autoscalingv1 "k8s.io/api/autoscaling/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"

"github.com/kyma-project/kyma/components/function-controller/internal/git"
"github.com/kyma-project/kyma/components/function-controller/internal/resource"
Expand Down Expand Up @@ -64,7 +66,6 @@ func (r *FunctionReconciler) SetupWithManager(mgr ctrl.Manager) (controller.Cont
Named("function-controller").
For(&serverlessv1alpha1.Function{}).
Owns(&corev1.ConfigMap{}).
Owns(&batchv1.Job{}).
Owns(&appsv1.Deployment{}).
Owns(&corev1.Service{}).
Owns(&autoscalingv1.HorizontalPodAutoscaler{}).
Expand All @@ -76,6 +77,37 @@ func (r *FunctionReconciler) SetupWithManager(mgr ctrl.Manager) (controller.Cont
),
MaxConcurrentReconciles: 1, // Build job scheduling mechanism requires this parameter to be set to 1. The mechanism is based on getting active and stateless jobs, concurrent reconciles makes it non deterministic . Value 1 removes data races while fetching list of jobs. https://github.com/kyma-project/kyma/issues/10037
}).
WithEventFilter(predicate.Funcs{
UpdateFunc: func(event event.UpdateEvent) bool {
r.Log.Debug("old", event.ObjectOld.GetName())
r.Log.Debug("new:", event.ObjectNew.GetName())

oldFn, ok := event.ObjectOld.(*serverlessv1alpha1.Function)
if !ok {
v := reflect.ValueOf(oldFn)
r.Log.Debug("Can't cast:", v.Type())
return true
}

if oldFn == nil {
return false
}

newFn, ok := event.ObjectNew.(*serverlessv1alpha1.Function)
if !ok {
v := reflect.ValueOf(newFn)
r.Log.Debug("Can't cast:", v.Type())
return true
}
if newFn == nil {
return false
}

equalStasus := equalFunctionStatus(oldFn.Status, newFn.Status)
r.Log.Debug("Statuses are equal: ", equalStasus)

return equalStasus
}}).
Build(r)
}

Expand Down Expand Up @@ -136,6 +168,10 @@ func (r *FunctionReconciler) Reconcile(ctx context.Context, request ctrl.Request
operator: r.gitOperator,
}

stateReconciler.result = ctrl.Result{
RequeueAfter: time.Second * 1,
}

return stateReconciler.reconcile(ctx, instance)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func assertSuccessfulFunctionBuild(t *testing.T, resourceClient resource.Client,
result, err := reconciler.Reconcile(ctx, request)
g.Expect(err).To(gomega.BeNil())
g.Expect(result.Requeue).To(gomega.BeFalse())
g.Expect(result.RequeueAfter).To(gomega.Equal(time.Second * 0))
g.Expect(result.RequeueAfter).To(gomega.Equal(time.Second * 1))

function := &serverlessv1alpha1.Function{}
g.Expect(resourceClient.Get(context.TODO(), request.NamespacedName, function)).To(gomega.Succeed())
Expand All @@ -54,7 +54,7 @@ func assertSuccessfulFunctionBuild(t *testing.T, resourceClient resource.Client,
result, err = reconciler.Reconcile(ctx, request)
g.Expect(err).To(gomega.BeNil())
g.Expect(result.Requeue).To(gomega.BeFalse())
g.Expect(result.RequeueAfter).To(gomega.Equal(time.Second * 0))
g.Expect(result.RequeueAfter).To(gomega.Equal(time.Second * 1))

function = &serverlessv1alpha1.Function{}
g.Expect(resourceClient.Get(context.TODO(), request.NamespacedName, function)).To(gomega.Succeed())
Expand All @@ -77,7 +77,7 @@ func assertSuccessfulFunctionBuild(t *testing.T, resourceClient resource.Client,
result, err = reconciler.Reconcile(ctx, request)
g.Expect(err).To(gomega.BeNil())
g.Expect(result.Requeue).To(gomega.BeFalse())
g.Expect(result.RequeueAfter).To(gomega.Equal(time.Second * 0))
g.Expect(result.RequeueAfter).To(gomega.Equal(time.Second * 1))

function = &serverlessv1alpha1.Function{}
g.Expect(resourceClient.Get(context.TODO(), request.NamespacedName, function)).To(gomega.Succeed())
Expand All @@ -98,7 +98,7 @@ func assertSuccessfulFunctionDeployment(t *testing.T, resourceClient resource.Cl
result, err := reconciler.Reconcile(ctx, request)
g.Expect(err).To(gomega.BeNil())
g.Expect(result.Requeue).To(gomega.BeFalse())
g.Expect(result.RequeueAfter).To(gomega.Equal(time.Second * 0))
g.Expect(result.RequeueAfter).To(gomega.Equal(time.Second * 1))

function := &serverlessv1alpha1.Function{}
g.Expect(resourceClient.Get(context.TODO(), request.NamespacedName, function)).To(gomega.Succeed())
Expand Down Expand Up @@ -134,7 +134,7 @@ func assertSuccessfulFunctionDeployment(t *testing.T, resourceClient resource.Cl
result, err = reconciler.Reconcile(ctx, request)
g.Expect(err).To(gomega.BeNil())
g.Expect(result.Requeue).To(gomega.BeFalse())
g.Expect(result.RequeueAfter).To(gomega.Equal(time.Duration(0)))
g.Expect(result.RequeueAfter).To(gomega.Equal(time.Second * 1))

function = &serverlessv1alpha1.Function{}
g.Expect(resourceClient.Get(context.TODO(), request.NamespacedName, function)).To(gomega.Succeed())
Expand Down Expand Up @@ -171,7 +171,7 @@ func assertSuccessfulFunctionDeployment(t *testing.T, resourceClient resource.Cl
result, err = reconciler.Reconcile(ctx, request)
g.Expect(err).To(gomega.BeNil())
g.Expect(result.Requeue).To(gomega.BeFalse())
g.Expect(result.RequeueAfter).To(gomega.Equal(time.Duration(0)))
g.Expect(result.RequeueAfter).To(gomega.Equal(time.Second * 1))

function = &serverlessv1alpha1.Function{}
g.Expect(resourceClient.Get(context.TODO(), request.NamespacedName, function)).To(gomega.Succeed())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
corev1 "k8s.io/api/core/v1"
)

var beOKReconcileResult = recResultMatcher(false, 0)
var beOKReconcileResult = recResultMatcher(false, time.Second*1)
var beFinishedReconcileResult = recResultMatcher(false, time.Minute*5)

func recResultMatcher(requeue bool, requeueAfter time.Duration) gtypes.GomegaMatcher {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestFunctionReconciler_Reconcile(t *testing.T) {
result, err := reconciler.Reconcile(ctx, request)
g.Expect(err).To(gomega.BeNil())
g.Expect(result.Requeue).To(gomega.BeFalse())
g.Expect(result.RequeueAfter).To(gomega.Equal(time.Second * 0))
g.Expect(result.RequeueAfter).To(gomega.Equal(time.Second * 1))

function := &serverlessv1alpha1.Function{}
g.Expect(resourceClient.Get(context.TODO(), request.NamespacedName, function)).To(gomega.Succeed())
Expand Down Expand Up @@ -108,7 +108,7 @@ func TestFunctionReconciler_Reconcile(t *testing.T) {
result, err = reconciler.Reconcile(ctx, request)
g.Expect(err).To(gomega.BeNil())
g.Expect(result.Requeue).To(gomega.BeFalse())
g.Expect(result.RequeueAfter).To(gomega.Equal(time.Duration(0)))
g.Expect(result.RequeueAfter).To(gomega.Equal(time.Second * 1))

function = &serverlessv1alpha1.Function{}
g.Expect(resourceClient.Get(context.TODO(), request.NamespacedName, function)).To(gomega.Succeed())
Expand All @@ -129,7 +129,7 @@ func TestFunctionReconciler_Reconcile(t *testing.T) {
result, err = reconciler.Reconcile(ctx, request)
g.Expect(err).To(gomega.BeNil())
g.Expect(result.Requeue).To(gomega.BeFalse())
g.Expect(result.RequeueAfter).To(gomega.Equal(time.Duration(0)))
g.Expect(result.RequeueAfter).To(gomega.Equal(time.Second * 1))

function = &serverlessv1alpha1.Function{}
g.Expect(resourceClient.Get(context.TODO(), request.NamespacedName, function)).To(gomega.Succeed())
Expand Down Expand Up @@ -1007,7 +1007,7 @@ func TestFunctionReconciler_Reconcile(t *testing.T) {
result, err := reconciler.Reconcile(ctx, request)
g.Expect(err).To(gomega.BeNil())
g.Expect(result.Requeue).To(gomega.BeFalse())
g.Expect(result.RequeueAfter).To(gomega.Equal(time.Second * 0))
g.Expect(result.RequeueAfter).To(gomega.Equal(time.Second * 1))

function := &serverlessv1alpha1.Function{}
g.Expect(resourceClient.Get(context.TODO(), request.NamespacedName, function)).To(gomega.Succeed())
Expand Down Expand Up @@ -1042,7 +1042,7 @@ func TestFunctionReconciler_Reconcile(t *testing.T) {
result, err = reconciler.Reconcile(ctx, request)
g.Expect(err).To(gomega.BeNil())
g.Expect(result.Requeue).To(gomega.BeFalse())
g.Expect(result.RequeueAfter).To(gomega.Equal(time.Second * 0))
g.Expect(result.RequeueAfter).To(gomega.Equal(time.Second * 1))

g.Expect(resourceClient.Get(context.TODO(), request.NamespacedName, function)).To(gomega.Succeed())
g.Expect(function.Status.Conditions).To(gomega.HaveLen(conditionLen))
Expand All @@ -1061,7 +1061,7 @@ func TestFunctionReconciler_Reconcile(t *testing.T) {
result, err = reconciler.Reconcile(ctx, request)
g.Expect(err).To(gomega.BeNil())
g.Expect(result.Requeue).To(gomega.BeFalse())
g.Expect(result.RequeueAfter).To(gomega.Equal(time.Second * 0))
g.Expect(result.RequeueAfter).To(gomega.Equal(time.Second * 1))

hpaList := &autoscalingv1.HorizontalPodAutoscalerList{}
err = reconciler.client.ListByLabel(context.TODO(), function.GetNamespace(), fnLabels, hpaList)
Expand Down Expand Up @@ -1090,7 +1090,7 @@ func TestFunctionReconciler_Reconcile(t *testing.T) {
result, err = reconciler.Reconcile(ctx, request)
g.Expect(err).To(gomega.BeNil())
g.Expect(result.Requeue).To(gomega.BeFalse())
g.Expect(result.RequeueAfter).To(gomega.Equal(time.Duration(0)))
g.Expect(result.RequeueAfter).To(gomega.Equal(time.Second * 1))

g.Expect(resourceClient.Get(context.TODO(), request.NamespacedName, function)).To(gomega.Succeed())
g.Expect(function.Status.Conditions).To(gomega.HaveLen(conditionLen))
Expand Down Expand Up @@ -1211,7 +1211,7 @@ func TestFunctionReconciler_Reconcile(t *testing.T) {
result, err := reconciler.Reconcile(ctx, request)
g.Expect(err).To(gomega.BeNil())
g.Expect(result.Requeue).To(gomega.BeFalse())
g.Expect(result.RequeueAfter).To(gomega.Equal(time.Duration(0)))
g.Expect(result.RequeueAfter).To(gomega.Equal(time.Second * 1))

function = &serverlessv1alpha1.Function{}
g.Expect(resourceClient.Get(context.TODO(), request.NamespacedName, function)).To(gomega.Succeed())
Expand All @@ -1226,7 +1226,7 @@ func TestFunctionReconciler_Reconcile(t *testing.T) {
result, err = reconciler.Reconcile(ctx, request)
g.Expect(err).To(gomega.BeNil())
g.Expect(result.Requeue).To(gomega.BeFalse())
g.Expect(result.RequeueAfter).To(gomega.Equal(time.Duration(0)))
g.Expect(result.RequeueAfter).To(gomega.Equal(time.Second * 1))

function = &serverlessv1alpha1.Function{}
g.Expect(function.Spec.RuntimeImageOverride).To(gomega.Equal(""))
Expand Down
Loading

0 comments on commit 31f0ca6

Please sign in to comment.