From 5721c3ed54d4bd26a53743e3e6028bd85015ad6a Mon Sep 17 00:00:00 2001 From: Ilya Lesikov Date: Fri, 15 Dec 2023 20:47:02 +0300 Subject: [PATCH] feat: new high-level concurrent dynamic tracker Signed-off-by: Ilya Lesikov --- go.mod | 3 + go.sum | 6 + pkg/tracker/pod/status.go | 3 + .../dyntracker/dynamic_absence_tracker.go | 110 ++ .../dyntracker/dynamic_presence_tracker.go | 112 ++ .../dyntracker/dynamic_readiness_tracker.go | 1130 +++++++++++++++++ pkg/trackers/dyntracker/logstore/log_line.go | 8 + pkg/trackers/dyntracker/logstore/log_store.go | 21 + .../dyntracker/logstore/resource_logs.go | 58 + .../statestore/absence_task_state.go | 85 ++ .../dyntracker/statestore/attribute.go | 12 + .../dyntracker/statestore/conditions.go | 7 + pkg/trackers/dyntracker/statestore/error.go | 8 + pkg/trackers/dyntracker/statestore/event.go | 8 + .../statestore/presence_task_state.go | 86 ++ .../statestore/readiness_task_state.go | 220 ++++ .../dyntracker/statestore/resource_state.go | 105 ++ .../dyntracker/statestore/resource_status.go | 10 + .../dyntracker/statestore/task_status.go | 25 + .../dyntracker/statestore/task_store.go | 39 + pkg/trackers/dyntracker/util/concurrency.go | 36 + pkg/trackers/dyntracker/util/resource.go | 43 + 22 files changed, 2135 insertions(+) create mode 100644 pkg/trackers/dyntracker/dynamic_absence_tracker.go create mode 100644 pkg/trackers/dyntracker/dynamic_presence_tracker.go create mode 100644 pkg/trackers/dyntracker/dynamic_readiness_tracker.go create mode 100644 pkg/trackers/dyntracker/logstore/log_line.go create mode 100644 pkg/trackers/dyntracker/logstore/log_store.go create mode 100644 pkg/trackers/dyntracker/logstore/resource_logs.go create mode 100644 pkg/trackers/dyntracker/statestore/absence_task_state.go create mode 100644 pkg/trackers/dyntracker/statestore/attribute.go create mode 100644 pkg/trackers/dyntracker/statestore/conditions.go create mode 100644 pkg/trackers/dyntracker/statestore/error.go create mode 100644 pkg/trackers/dyntracker/statestore/event.go create mode 100644 pkg/trackers/dyntracker/statestore/presence_task_state.go create mode 100644 pkg/trackers/dyntracker/statestore/readiness_task_state.go create mode 100644 pkg/trackers/dyntracker/statestore/resource_state.go create mode 100644 pkg/trackers/dyntracker/statestore/resource_status.go create mode 100644 pkg/trackers/dyntracker/statestore/task_status.go create mode 100644 pkg/trackers/dyntracker/statestore/task_store.go create mode 100644 pkg/trackers/dyntracker/util/concurrency.go create mode 100644 pkg/trackers/dyntracker/util/resource.go diff --git a/go.mod b/go.mod index 6df9d1c..4c79d60 100644 --- a/go.mod +++ b/go.mod @@ -4,8 +4,10 @@ go 1.20 require ( github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d + github.com/dominikbraun/graph v0.23.0 github.com/fluxcd/flagger v1.29.0 github.com/gookit/color v1.5.2 + github.com/samber/lo v1.38.1 github.com/spf13/cobra v1.6.1 github.com/werf/logboek v0.5.5 golang.org/x/crypto v0.7.0 @@ -53,6 +55,7 @@ require ( github.com/xlab/treeprint v1.1.0 // indirect github.com/xo/terminfo v0.0.0-20210125001918-ca9a967f8778 // indirect go.starlark.net v0.0.0-20200306205701-8dd3e2ee1dd5 // indirect + golang.org/x/exp v0.0.0-20221126150942-6ab00d035af9 // indirect golang.org/x/net v0.8.0 // indirect golang.org/x/oauth2 v0.5.0 // indirect golang.org/x/sys v0.6.0 // indirect diff --git a/go.sum b/go.sum index 1174986..3dbe0aa 100644 --- a/go.sum +++ b/go.sum @@ -23,6 +23,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= +github.com/dominikbraun/graph v0.23.0 h1:TdZB4pPqCLFxYhdyMFb1TBdFxp8XLcJfTTBQucVPgCo= +github.com/dominikbraun/graph v0.23.0/go.mod h1:yOjYyogZLY1LSG9E33JWZJiq5k83Qy2C6POAuiViluc= github.com/emicklei/go-restful/v3 v3.10.1 h1:rc42Y5YTp7Am7CS630D7JmhRjq4UlEUuEKfrDac4bSQ= github.com/emicklei/go-restful/v3 v3.10.1/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -134,6 +136,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM= +github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA= github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/cobra v1.6.1 h1:o94oiPyS4KD1mPy2fmcYYHHfCxLqYjJOhGsCHFZtEzA= @@ -172,6 +176,8 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.7.0 h1:AvwMYaRytfdeVt3u6mLaxYtErKYjxA2OXjJ1HHq6t3A= golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/exp v0.0.0-20221126150942-6ab00d035af9 h1:yZNXmy+j/JpX19vZkVktWqAo7Gny4PBWYYK3zskGpx4= +golang.org/x/exp v0.0.0-20221126150942-6ab00d035af9/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= diff --git a/pkg/tracker/pod/status.go b/pkg/tracker/pod/status.go index 0c719e5..d14bd13 100644 --- a/pkg/tracker/pod/status.go +++ b/pkg/tracker/pod/status.go @@ -12,6 +12,8 @@ import ( type PodStatus struct { corev1.PodStatus + Name string + StatusGeneration uint64 StatusIndicator *indicators.StringEqualConditionIndicator @@ -35,6 +37,7 @@ func NewPodStatus(pod *corev1.Pod, statusGeneration uint64, trackedContainers [] Age: utils.TranslateTimestampSince(pod.CreationTimestamp), StatusIndicator: &indicators.StringEqualConditionIndicator{}, StatusGeneration: statusGeneration, + Name: pod.Name, } for _, cond := range pod.Status.Conditions { diff --git a/pkg/trackers/dyntracker/dynamic_absence_tracker.go b/pkg/trackers/dyntracker/dynamic_absence_tracker.go new file mode 100644 index 0000000..6793d99 --- /dev/null +++ b/pkg/trackers/dyntracker/dynamic_absence_tracker.go @@ -0,0 +1,110 @@ +package dyntracker + +import ( + "context" + "fmt" + "io" + "time" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/dynamic" + + "github.com/werf/kubedog/pkg/trackers/dyntracker/statestore" + "github.com/werf/kubedog/pkg/trackers/dyntracker/util" +) + +type DynamicAbsenceTracker struct { + taskState *util.Concurrent[*statestore.AbsenceTaskState] + dynamicClient dynamic.Interface + mapper meta.ResettableRESTMapper + + timeout time.Duration + pollPeriod time.Duration +} + +func NewDynamicAbsenceTracker( + taskState *util.Concurrent[*statestore.AbsenceTaskState], + dynamicClient dynamic.Interface, + mapper meta.ResettableRESTMapper, + opts DynamicAbsenceTrackerOptions, +) *DynamicAbsenceTracker { + timeout := opts.Timeout + var pollPeriod time.Duration + if opts.PollPeriod != 0 { + pollPeriod = opts.PollPeriod + } else { + pollPeriod = 1 * time.Second + } + + return &DynamicAbsenceTracker{ + taskState: taskState, + dynamicClient: dynamicClient, + mapper: mapper, + timeout: timeout, + pollPeriod: pollPeriod, + } +} + +type DynamicAbsenceTrackerOptions struct { + Timeout time.Duration + PollPeriod time.Duration +} + +func (t *DynamicAbsenceTracker) Track(ctx context.Context) error { + var ( + name string + namespace string + groupVersionKind schema.GroupVersionKind + ) + t.taskState.RTransaction(func(ts *statestore.AbsenceTaskState) { + name = ts.Name() + namespace = ts.Namespace() + groupVersionKind = ts.GroupVersionKind() + }) + + namespaced, err := util.IsNamespaced(groupVersionKind, t.mapper) + if err != nil { + return fmt.Errorf("check if namespaced: %w", err) + } + + gvr, err := util.GVRFromGVK(groupVersionKind, t.mapper) + if err != nil { + return fmt.Errorf("get GroupVersionResource: %w", err) + } + + var resourceClient dynamic.ResourceInterface + if namespaced { + resourceClient = t.dynamicClient.Resource(gvr).Namespace(namespace) + } else { + resourceClient = t.dynamicClient.Resource(gvr) + } + + resourceHumanID, err := util.ResourceHumanID(name, namespace, groupVersionKind, t.mapper) + if err != nil { + return fmt.Errorf("get resource human ID: %w", err) + } + + if err := wait.PollImmediate(t.pollPeriod, t.timeout, func() (bool, error) { + if _, err := resourceClient.Get(ctx, name, metav1.GetOptions{}); err != nil { + if apierrors.IsResourceExpired(err) || apierrors.IsGone(err) || err == io.EOF || err == io.ErrUnexpectedEOF { + return false, nil + } + + if apierrors.IsNotFound(err) { + return true, nil + } + + return false, fmt.Errorf("get resource %q: %w", resourceHumanID, err) + } + + return false, nil + }); err != nil { + return fmt.Errorf("poll resource %q: %w", resourceHumanID, err) + } + + return nil +} diff --git a/pkg/trackers/dyntracker/dynamic_presence_tracker.go b/pkg/trackers/dyntracker/dynamic_presence_tracker.go new file mode 100644 index 0000000..598785c --- /dev/null +++ b/pkg/trackers/dyntracker/dynamic_presence_tracker.go @@ -0,0 +1,112 @@ +package dyntracker + +import ( + "context" + "fmt" + "io" + "time" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/dynamic" + + "github.com/werf/kubedog/pkg/trackers/dyntracker/statestore" + "github.com/werf/kubedog/pkg/trackers/dyntracker/util" +) + +type DynamicPresenceTracker struct { + taskState *util.Concurrent[*statestore.PresenceTaskState] + dynamicClient dynamic.Interface + mapper meta.ResettableRESTMapper + + timeout time.Duration + pollPeriod time.Duration +} + +func NewDynamicPresenceTracker( + taskState *util.Concurrent[*statestore.PresenceTaskState], + dynamicClient dynamic.Interface, + mapper meta.ResettableRESTMapper, + opts DynamicPresenceTrackerOptions, +) *DynamicPresenceTracker { + var timeout time.Duration + if opts.Timeout != 0 { + timeout = opts.Timeout + } else { + timeout = 5 * time.Minute + } + + var pollPeriod time.Duration + if opts.PollPeriod != 0 { + pollPeriod = opts.PollPeriod + } else { + pollPeriod = 1 * time.Second + } + + return &DynamicPresenceTracker{ + taskState: taskState, + dynamicClient: dynamicClient, + mapper: mapper, + timeout: timeout, + pollPeriod: pollPeriod, + } +} + +type DynamicPresenceTrackerOptions struct { + Timeout time.Duration + PollPeriod time.Duration +} + +func (t *DynamicPresenceTracker) Track(ctx context.Context) error { + var ( + name string + namespace string + groupVersionKind schema.GroupVersionKind + ) + t.taskState.RTransaction(func(ts *statestore.PresenceTaskState) { + name = ts.Name() + namespace = ts.Namespace() + groupVersionKind = ts.GroupVersionKind() + }) + + namespaced, err := util.IsNamespaced(groupVersionKind, t.mapper) + if err != nil { + return fmt.Errorf("check if namespaced: %w", err) + } + + gvr, err := util.GVRFromGVK(groupVersionKind, t.mapper) + if err != nil { + return fmt.Errorf("get GroupVersionResource: %w", err) + } + + var resourceClient dynamic.ResourceInterface + if namespaced { + resourceClient = t.dynamicClient.Resource(gvr).Namespace(namespace) + } else { + resourceClient = t.dynamicClient.Resource(gvr) + } + + resourceHumanID, err := util.ResourceHumanID(name, namespace, groupVersionKind, t.mapper) + if err != nil { + return fmt.Errorf("get resource human ID: %w", err) + } + + if err := wait.PollImmediate(t.pollPeriod, t.timeout, func() (bool, error) { + if _, err := resourceClient.Get(ctx, name, metav1.GetOptions{}); err != nil { + if apierrors.IsResourceExpired(err) || apierrors.IsGone(err) || err == io.EOF || err == io.ErrUnexpectedEOF || apierrors.IsNotFound(err) { + return false, nil + } + + return false, fmt.Errorf("get resource %q: %w", resourceHumanID, err) + } + + return true, nil + }); err != nil { + return fmt.Errorf("poll resource %q: %w", resourceHumanID, err) + } + + return nil +} diff --git a/pkg/trackers/dyntracker/dynamic_readiness_tracker.go b/pkg/trackers/dyntracker/dynamic_readiness_tracker.go new file mode 100644 index 0000000..6fc7d4d --- /dev/null +++ b/pkg/trackers/dyntracker/dynamic_readiness_tracker.go @@ -0,0 +1,1130 @@ +package dyntracker + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/samber/lo" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/discovery" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/kubernetes" + watchtools "k8s.io/client-go/tools/watch" + + commontracker "github.com/werf/kubedog/pkg/tracker" + "github.com/werf/kubedog/pkg/tracker/canary" + "github.com/werf/kubedog/pkg/tracker/daemonset" + "github.com/werf/kubedog/pkg/tracker/deployment" + "github.com/werf/kubedog/pkg/tracker/generic" + "github.com/werf/kubedog/pkg/tracker/job" + "github.com/werf/kubedog/pkg/tracker/pod" + "github.com/werf/kubedog/pkg/tracker/replicaset" + "github.com/werf/kubedog/pkg/tracker/resid" + "github.com/werf/kubedog/pkg/tracker/statefulset" + "github.com/werf/kubedog/pkg/trackers/dyntracker/logstore" + "github.com/werf/kubedog/pkg/trackers/dyntracker/statestore" + "github.com/werf/kubedog/pkg/trackers/dyntracker/util" +) + +type DynamicReadinessTracker struct { + taskState *util.Concurrent[*statestore.ReadinessTaskState] + logStore *util.Concurrent[*logstore.LogStore] + tracker any + + timeout time.Duration + noActivityTimeout time.Duration +} + +func NewDynamicReadinessTracker( + ctx context.Context, + taskState *util.Concurrent[*statestore.ReadinessTaskState], + logStore *util.Concurrent[*logstore.LogStore], + staticClient kubernetes.Interface, + dynamicClient dynamic.Interface, + discoveryClient discovery.CachedDiscoveryInterface, + mapper meta.ResettableRESTMapper, + opts DynamicReadinessTrackerOptions, +) *DynamicReadinessTracker { + timeout := opts.Timeout + logsFromTime := opts.LogsFromTime + + var noActivityTimeout time.Duration + if opts.NoActivityTimeout != 0 { + noActivityTimeout = opts.NoActivityTimeout + } else { + noActivityTimeout = 4 * time.Minute + } + + ignoreReadinessProbeFailsByContainerName := make(map[string]time.Duration) + if opts.IgnoreReadinessProbeFailsByContainerName != nil { + ignoreReadinessProbeFailsByContainerName = opts.IgnoreReadinessProbeFailsByContainerName + } + + var ( + resourceName string + resourceNamespace string + resourceGVK schema.GroupVersionKind + ) + taskState.RTransaction(func(ts *statestore.ReadinessTaskState) { + resourceName = ts.Name() + resourceNamespace = ts.Namespace() + resourceGVK = ts.GroupVersionKind() + }) + + var tracker any + switch resourceGVK.GroupKind() { + case schema.GroupKind{Group: "apps", Kind: "Deployment"}, schema.GroupKind{Group: "extensions", Kind: "Deployment"}: + tracker = deployment.NewTracker(resourceName, resourceNamespace, staticClient, commontracker.Options{ + ParentContext: ctx, + Timeout: timeout, + LogsFromTime: logsFromTime, + IgnoreReadinessProbeFailsByContainerName: ignoreReadinessProbeFailsByContainerName, + }) + case schema.GroupKind{Group: "apps", Kind: "DaemonSet"}, schema.GroupKind{Group: "extensions", Kind: "DaemonSet"}: + tracker = daemonset.NewTracker(resourceName, resourceNamespace, staticClient, commontracker.Options{ + ParentContext: ctx, + Timeout: timeout, + LogsFromTime: logsFromTime, + IgnoreReadinessProbeFailsByContainerName: ignoreReadinessProbeFailsByContainerName, + }) + case schema.GroupKind{Group: "flagger.app", Kind: "Canary"}: + tracker = canary.NewTracker(resourceName, resourceNamespace, staticClient, commontracker.Options{ + ParentContext: ctx, + Timeout: timeout, + LogsFromTime: logsFromTime, + IgnoreReadinessProbeFailsByContainerName: ignoreReadinessProbeFailsByContainerName, + }) + case schema.GroupKind{Group: "apps", Kind: "StatefulSet"}: + tracker = statefulset.NewTracker(resourceName, resourceNamespace, staticClient, commontracker.Options{ + ParentContext: ctx, + Timeout: timeout, + LogsFromTime: logsFromTime, + IgnoreReadinessProbeFailsByContainerName: ignoreReadinessProbeFailsByContainerName, + }) + case schema.GroupKind{Group: "batch", Kind: "Job"}: + tracker = job.NewTracker(resourceName, resourceNamespace, staticClient, commontracker.Options{ + ParentContext: ctx, + Timeout: timeout, + LogsFromTime: logsFromTime, + IgnoreReadinessProbeFailsByContainerName: ignoreReadinessProbeFailsByContainerName, + }) + default: + resid := resid.NewResourceID(resourceName, resourceGVK, resid.NewResourceIDOptions{ + Namespace: resourceNamespace, + }) + + tracker = generic.NewTracker(resid, staticClient, dynamicClient, discoveryClient, mapper) + } + + return &DynamicReadinessTracker{ + taskState: taskState, + logStore: logStore, + tracker: tracker, + timeout: timeout, + noActivityTimeout: noActivityTimeout, + } +} + +type DynamicReadinessTrackerOptions struct { + Timeout time.Duration + NoActivityTimeout time.Duration + LogsFromTime time.Time + IgnoreReadinessProbeFailsByContainerName map[string]time.Duration +} + +func (t *DynamicReadinessTracker) Track(ctx context.Context) error { + switch tracker := t.tracker.(type) { + case *deployment.Tracker: + if err := t.trackDeployment(ctx, tracker); err != nil { + return fmt.Errorf("track deployment: %w", err) + } + case *statefulset.Tracker: + if err := t.trackStatefulSet(ctx, tracker); err != nil { + return fmt.Errorf("track statefulset: %w", err) + } + case *daemonset.Tracker: + if err := t.trackDaemonSet(ctx, tracker); err != nil { + return fmt.Errorf("track daemonset: %w", err) + } + case *job.Tracker: + if err := t.trackJob(ctx, tracker); err != nil { + return fmt.Errorf("track job: %w", err) + } + case *canary.Tracker: + if err := t.trackCanary(ctx, tracker); err != nil { + return fmt.Errorf("track canary: %w", err) + } + case *generic.Tracker: + if err := t.trackGeneric(ctx, tracker); err != nil { + return fmt.Errorf("track generic: %w", err) + } + default: + panic("unexpected tracker") + } + + return nil +} + +func (t *DynamicReadinessTracker) trackDeployment(ctx context.Context, tracker *deployment.Tracker) error { + trackCtx, trackCtxCancelFn := watchtools.ContextWithOptionalTimeout(ctx, t.timeout) + defer trackCtxCancelFn() + + trackErrCh := make(chan error) + go func() { + trackErrCh <- tracker.Track(trackCtx) + }() + + for { + select { + case err := <-trackErrCh: + if err != nil && !errors.Is(err, commontracker.ErrStopTrack) { + return fmt.Errorf("track: %w", err) + } + + return nil + case status := <-tracker.Added: + var ( + abort bool + abortErr error + ) + t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { + t.addMissingPodsStatesFromDeploymentStatus(&status, ts) + t.handleDeploymentStatus(&status, ts) + abort, abortErr = t.handleTaskStateStatus(ts) + }) + + if abort { + return abortErr + } + case status := <-tracker.Ready: + var ( + abort bool + abortErr error + ) + t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { + t.addMissingPodsStatesFromDeploymentStatus(&status, ts) + t.handleDeploymentStatus(&status, ts) + abort, abortErr = t.handleTaskStateStatus(ts) + }) + + if abort { + return abortErr + } + case report := <-tracker.AddedReplicaSet: + var ( + abort bool + abortErr error + ) + t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { + t.addMissingPodsStatesFromDeploymentStatus(&report.DeploymentStatus, ts) + t.handleDeploymentStatus(&report.DeploymentStatus, ts) + abort, abortErr = t.handleTaskStateStatus(ts) + }) + + if abort { + return abortErr + } + case report := <-tracker.AddedPod: + var ( + abort bool + abortErr error + ) + t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { + t.addMissingPodsStatesFromDeploymentStatus(&report.DeploymentStatus, ts) + t.addMissingPodsStatesFromDeploymentPodAddedReport(&report, ts) + t.handleDeploymentStatus(&report.DeploymentStatus, ts) + abort, abortErr = t.handleTaskStateStatus(ts) + }) + + if abort { + return abortErr + } + case status := <-tracker.Failed: + var ( + abort bool + abortErr error + ) + t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { + t.addMissingPodsStatesFromDeploymentStatus(&status, ts) + t.handleDeploymentStatus(&status, ts) + abort, abortErr = t.handleTaskStateStatus(ts) + }) + + if abort { + return abortErr + } + case status := <-tracker.Status: + var ( + abort bool + abortErr error + ) + t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { + t.addMissingPodsStatesFromDeploymentStatus(&status, ts) + t.handleDeploymentStatus(&status, ts) + abort, abortErr = t.handleTaskStateStatus(ts) + }) + + if abort { + return abortErr + } + case logChunk := <-tracker.PodLogChunk: + t.taskState.RTransaction(func(ts *statestore.ReadinessTaskState) { + t.logStore.RWTransaction(func(ls *logstore.LogStore) { + t.handleReplicaSetPodLogChunk(logChunk, ls, ts) + }) + }) + case msg := <-tracker.EventMsg: + t.taskState.RTransaction(func(ts *statestore.ReadinessTaskState) { + t.handleEventMessage(msg, ts, time.Now()) + }) + case report := <-tracker.PodError: + var ( + abort bool + abortErr error + ) + t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { + t.addMissingPodsStatesFromDeploymentStatus(&report.DeploymentStatus, ts) + t.handleDeploymentStatus(&report.DeploymentStatus, ts) + t.handleReplicaSetPodError(&report.ReplicaSetPodError, ts) + abort, abortErr = t.handleTaskStateStatus(ts) + }) + + if abort { + return abortErr + } + } + } +} + +func (t *DynamicReadinessTracker) trackStatefulSet(ctx context.Context, tracker *statefulset.Tracker) error { + trackCtx, trackCtxCancelFn := watchtools.ContextWithOptionalTimeout(ctx, t.timeout) + defer trackCtxCancelFn() + + trackErrCh := make(chan error) + go func() { + trackErrCh <- tracker.Track(trackCtx) + }() + + for { + select { + case err := <-trackErrCh: + if err != nil && !errors.Is(err, commontracker.ErrStopTrack) { + return fmt.Errorf("track: %w", err) + } + + return nil + case status := <-tracker.Added: + var ( + abort bool + abortErr error + ) + t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { + t.addMissingPodsStatesFromStatefulSetStatus(&status, ts) + t.handleStatefulSetStatus(&status, ts) + abort, abortErr = t.handleTaskStateStatus(ts) + }) + + if abort { + return abortErr + } + case status := <-tracker.Ready: + var ( + abort bool + abortErr error + ) + t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { + t.addMissingPodsStatesFromStatefulSetStatus(&status, ts) + t.handleStatefulSetStatus(&status, ts) + abort, abortErr = t.handleTaskStateStatus(ts) + }) + + if abort { + return abortErr + } + case report := <-tracker.AddedPod: + var ( + abort bool + abortErr error + ) + t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { + t.addMissingPodsStatesFromStatefulSetStatus(&report.StatefulSetStatus, ts) + t.addMissingPodsStatesFromStatefulSetPodAddedReport(&report, ts) + t.handleStatefulSetStatus(&report.StatefulSetStatus, ts) + abort, abortErr = t.handleTaskStateStatus(ts) + }) + + if abort { + return abortErr + } + case status := <-tracker.Failed: + var ( + abort bool + abortErr error + ) + t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { + t.addMissingPodsStatesFromStatefulSetStatus(&status, ts) + t.handleStatefulSetStatus(&status, ts) + abort, abortErr = t.handleTaskStateStatus(ts) + }) + + if abort { + return abortErr + } + case status := <-tracker.Status: + var ( + abort bool + abortErr error + ) + t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { + t.addMissingPodsStatesFromStatefulSetStatus(&status, ts) + t.handleStatefulSetStatus(&status, ts) + abort, abortErr = t.handleTaskStateStatus(ts) + }) + + if abort { + return abortErr + } + case logChunk := <-tracker.PodLogChunk: + t.taskState.RTransaction(func(ts *statestore.ReadinessTaskState) { + t.logStore.RWTransaction(func(ls *logstore.LogStore) { + t.handleReplicaSetPodLogChunk(logChunk, ls, ts) + }) + }) + case msg := <-tracker.EventMsg: + t.taskState.RTransaction(func(ts *statestore.ReadinessTaskState) { + t.handleEventMessage(msg, ts, time.Now()) + }) + case report := <-tracker.PodError: + var ( + abort bool + abortErr error + ) + t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { + t.addMissingPodsStatesFromStatefulSetStatus(&report.StatefulSetStatus, ts) + t.handleStatefulSetStatus(&report.StatefulSetStatus, ts) + t.handleReplicaSetPodError(&report.ReplicaSetPodError, ts) + abort, abortErr = t.handleTaskStateStatus(ts) + }) + + if abort { + return abortErr + } + } + } +} + +func (t *DynamicReadinessTracker) trackDaemonSet(ctx context.Context, tracker *daemonset.Tracker) error { + trackCtx, trackCtxCancelFn := watchtools.ContextWithOptionalTimeout(ctx, t.timeout) + defer trackCtxCancelFn() + + trackErrCh := make(chan error) + go func() { + trackErrCh <- tracker.Track(trackCtx) + }() + + for { + select { + case err := <-trackErrCh: + if err != nil && !errors.Is(err, commontracker.ErrStopTrack) { + return fmt.Errorf("track: %w", err) + } + + return nil + case status := <-tracker.Added: + var ( + abort bool + abortErr error + ) + t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { + t.addMissingPodsStatesFromDaemonSetStatus(&status, ts) + t.handleDaemonSetStatus(&status, ts) + abort, abortErr = t.handleTaskStateStatus(ts) + }) + + if abort { + return abortErr + } + case status := <-tracker.Ready: + var ( + abort bool + abortErr error + ) + t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { + t.addMissingPodsStatesFromDaemonSetStatus(&status, ts) + t.handleDaemonSetStatus(&status, ts) + abort, abortErr = t.handleTaskStateStatus(ts) + }) + + if abort { + return abortErr + } + case report := <-tracker.AddedPod: + var ( + abort bool + abortErr error + ) + t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { + t.addMissingPodsStatesFromDaemonSetStatus(&report.DaemonSetStatus, ts) + t.addMissingPodsStatesFromDaemonSetPodAddedReport(&report, ts) + t.handleDaemonSetStatus(&report.DaemonSetStatus, ts) + abort, abortErr = t.handleTaskStateStatus(ts) + }) + + if abort { + return abortErr + } + case status := <-tracker.Failed: + var ( + abort bool + abortErr error + ) + t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { + t.addMissingPodsStatesFromDaemonSetStatus(&status, ts) + t.handleDaemonSetStatus(&status, ts) + abort, abortErr = t.handleTaskStateStatus(ts) + }) + + if abort { + return abortErr + } + case status := <-tracker.Status: + var ( + abort bool + abortErr error + ) + t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { + t.addMissingPodsStatesFromDaemonSetStatus(&status, ts) + t.handleDaemonSetStatus(&status, ts) + abort, abortErr = t.handleTaskStateStatus(ts) + }) + + if abort { + return abortErr + } + case logChunk := <-tracker.PodLogChunk: + t.taskState.RTransaction(func(ts *statestore.ReadinessTaskState) { + t.logStore.RWTransaction(func(ls *logstore.LogStore) { + t.handleReplicaSetPodLogChunk(logChunk, ls, ts) + }) + }) + case msg := <-tracker.EventMsg: + t.taskState.RTransaction(func(ts *statestore.ReadinessTaskState) { + t.handleEventMessage(msg, ts, time.Now()) + }) + case report := <-tracker.PodError: + var ( + abort bool + abortErr error + ) + t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { + t.addMissingPodsStatesFromDaemonSetStatus(&report.DaemonSetStatus, ts) + t.handleDaemonSetStatus(&report.DaemonSetStatus, ts) + t.handleReplicaSetPodError(&report.PodError, ts) + abort, abortErr = t.handleTaskStateStatus(ts) + }) + + if abort { + return abortErr + } + } + } +} + +func (t *DynamicReadinessTracker) trackJob(ctx context.Context, tracker *job.Tracker) error { + trackCtx, trackCtxCancelFn := watchtools.ContextWithOptionalTimeout(ctx, t.timeout) + defer trackCtxCancelFn() + + trackErrCh := make(chan error) + go func() { + trackErrCh <- tracker.Track(trackCtx) + }() + + for { + select { + case err := <-trackErrCh: + if err != nil && !errors.Is(err, commontracker.ErrStopTrack) { + return fmt.Errorf("track: %w", err) + } + + return nil + case status := <-tracker.Added: + var ( + abort bool + abortErr error + ) + t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { + t.addMissingPodsStatesFromJobStatus(&status, ts) + t.handleJobStatus(&status, ts) + abort, abortErr = t.handleTaskStateStatus(ts) + }) + + if abort { + return abortErr + } + case status := <-tracker.Succeeded: + var ( + abort bool + abortErr error + ) + t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { + t.addMissingPodsStatesFromJobStatus(&status, ts) + t.handleJobStatus(&status, ts) + abort, abortErr = t.handleTaskStateStatus(ts) + }) + + if abort { + return abortErr + } + case report := <-tracker.AddedPod: + var ( + abort bool + abortErr error + ) + t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { + t.addMissingPodsStatesFromJobStatus(&report.JobStatus, ts) + t.addMissingPodsStatesFromJobPodAddedReport(&report, ts) + t.handleJobStatus(&report.JobStatus, ts) + abort, abortErr = t.handleTaskStateStatus(ts) + }) + + if abort { + return abortErr + } + case status := <-tracker.Failed: + var ( + abort bool + abortErr error + ) + t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { + t.addMissingPodsStatesFromJobStatus(&status, ts) + t.handleJobStatus(&status, ts) + abort, abortErr = t.handleTaskStateStatus(ts) + }) + + if abort { + return abortErr + } + case status := <-tracker.Status: + var ( + abort bool + abortErr error + ) + t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { + t.addMissingPodsStatesFromJobStatus(&status, ts) + t.handleJobStatus(&status, ts) + abort, abortErr = t.handleTaskStateStatus(ts) + }) + + if abort { + return abortErr + } + case logChunk := <-tracker.PodLogChunk: + t.taskState.RTransaction(func(ts *statestore.ReadinessTaskState) { + t.logStore.RWTransaction(func(ls *logstore.LogStore) { + t.handlePodLogChunk(logChunk, ls, ts) + }) + }) + case msg := <-tracker.EventMsg: + t.taskState.RTransaction(func(ts *statestore.ReadinessTaskState) { + t.handleEventMessage(msg, ts, time.Now()) + }) + case report := <-tracker.PodError: + var ( + abort bool + abortErr error + ) + t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { + t.addMissingPodsStatesFromJobStatus(&report.JobStatus, ts) + t.handleJobStatus(&report.JobStatus, ts) + t.handlePodError(&report.PodError, ts) + abort, abortErr = t.handleTaskStateStatus(ts) + }) + + if abort { + return abortErr + } + } + } +} + +func (t *DynamicReadinessTracker) trackCanary(ctx context.Context, tracker *canary.Tracker) error { + trackCtx, trackCtxCancelFn := watchtools.ContextWithOptionalTimeout(ctx, t.timeout) + defer trackCtxCancelFn() + + trackErrCh := make(chan error) + go func() { + trackErrCh <- tracker.Track(trackCtx) + }() + + for { + select { + case err := <-trackErrCh: + if err != nil && !errors.Is(err, commontracker.ErrStopTrack) { + return fmt.Errorf("track: %w", err) + } + + return nil + case status := <-tracker.Added: + var ( + abort bool + abortErr error + ) + t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { + t.handleCanaryStatus(&status, ts) + abort, abortErr = t.handleTaskStateStatus(ts) + }) + + if abort { + return abortErr + } + case status := <-tracker.Succeeded: + var ( + abort bool + abortErr error + ) + t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { + t.handleCanaryStatus(&status, ts) + abort, abortErr = t.handleTaskStateStatus(ts) + }) + + if abort { + return abortErr + } + case status := <-tracker.Failed: + var ( + abort bool + abortErr error + ) + t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { + t.handleCanaryStatus(&status, ts) + abort, abortErr = t.handleTaskStateStatus(ts) + }) + + if abort { + return abortErr + } + case status := <-tracker.Status: + var ( + abort bool + abortErr error + ) + t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { + t.handleCanaryStatus(&status, ts) + abort, abortErr = t.handleTaskStateStatus(ts) + }) + + if abort { + return abortErr + } + case msg := <-tracker.EventMsg: + t.taskState.RTransaction(func(ts *statestore.ReadinessTaskState) { + t.handleEventMessage(msg, ts, time.Now()) + }) + } + } +} + +func (t *DynamicReadinessTracker) trackGeneric(ctx context.Context, tracker *generic.Tracker) error { + trackCtx, trackCtxCancelFn := watchtools.ContextWithOptionalTimeout(ctx, t.timeout) + defer trackCtxCancelFn() + + addedCh := make(chan *generic.ResourceStatus) + succeededCh := make(chan *generic.ResourceStatus) + failedCh := make(chan *generic.ResourceStatus) + regularCh := make(chan *generic.ResourceStatus, 100) + eventCh := make(chan *corev1.Event) + + trackErrCh := make(chan error) + go func() { + trackErrCh <- tracker.Track(trackCtx, t.noActivityTimeout, addedCh, succeededCh, failedCh, regularCh, eventCh) + }() + + for { + select { + case err := <-trackErrCh: + if err != nil && !errors.Is(err, commontracker.ErrStopTrack) { + return fmt.Errorf("track: %w", err) + } + + return nil + case status := <-addedCh: + var ( + abort bool + abortErr error + ) + t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { + t.handleGenericResourceStatus(status, ts) + abort, abortErr = t.handleTaskStateStatus(ts) + }) + + if abort { + return abortErr + } + case status := <-succeededCh: + var ( + abort bool + abortErr error + ) + t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { + t.handleGenericResourceStatus(status, ts) + abort, abortErr = t.handleTaskStateStatus(ts) + }) + + if abort { + return abortErr + } + case status := <-failedCh: + var ( + abort bool + abortErr error + ) + t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { + t.handleGenericResourceStatus(status, ts) + abort, abortErr = t.handleTaskStateStatus(ts) + }) + + if abort { + return abortErr + } + case status := <-regularCh: + var ( + abort bool + abortErr error + ) + t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { + t.handleGenericResourceStatus(status, ts) + abort, abortErr = t.handleTaskStateStatus(ts) + }) + + if abort { + return abortErr + } + case event := <-eventCh: + t.taskState.RTransaction(func(ts *statestore.ReadinessTaskState) { + t.handleEventMessage(event.Message, ts, event.EventTime.Time) + }) + } + } +} + +func (t *DynamicReadinessTracker) addMissingPodsStatesFromDeploymentStatus(status *deployment.DeploymentStatus, taskState *statestore.ReadinessTaskState) { + pods := lo.PickBy(status.Pods, func(_ string, podStatus pod.PodStatus) bool { + return lo.Contains(status.NewPodsNames, podStatus.Name) + }) + + for _, pod := range pods { + taskState.AddResourceState(pod.Name, taskState.Namespace(), podGvk) + + taskState.AddDependency(taskState.Name(), taskState.Namespace(), taskState.GroupVersionKind(), pod.Name, taskState.Namespace(), podGvk) + } +} + +func (t *DynamicReadinessTracker) addMissingPodsStatesFromStatefulSetStatus(status *statefulset.StatefulSetStatus, taskState *statestore.ReadinessTaskState) { + pods := lo.PickBy(status.Pods, func(_ string, podStatus pod.PodStatus) bool { + return lo.Contains(status.NewPodsNames, podStatus.Name) + }) + + for _, pod := range pods { + taskState.AddResourceState(pod.Name, taskState.Namespace(), podGvk) + + taskState.AddDependency(taskState.Name(), taskState.Namespace(), taskState.GroupVersionKind(), pod.Name, taskState.Namespace(), podGvk) + } +} + +func (t *DynamicReadinessTracker) addMissingPodsStatesFromDaemonSetStatus(status *daemonset.DaemonSetStatus, taskState *statestore.ReadinessTaskState) { + pods := lo.PickBy(status.Pods, func(_ string, podStatus pod.PodStatus) bool { + return lo.Contains(status.NewPodsNames, podStatus.Name) + }) + + for _, pod := range pods { + taskState.AddResourceState(pod.Name, taskState.Namespace(), podGvk) + + taskState.AddDependency(taskState.Name(), taskState.Namespace(), taskState.GroupVersionKind(), pod.Name, taskState.Namespace(), podGvk) + } +} + +func (t *DynamicReadinessTracker) addMissingPodsStatesFromJobStatus(status *job.JobStatus, taskState *statestore.ReadinessTaskState) { + for _, pod := range status.Pods { + taskState.AddResourceState(pod.Name, taskState.Namespace(), podGvk) + + taskState.AddDependency(taskState.Name(), taskState.Namespace(), taskState.GroupVersionKind(), pod.Name, taskState.Namespace(), podGvk) + } +} + +func (t *DynamicReadinessTracker) addMissingPodsStatesFromDeploymentPodAddedReport(report *deployment.PodAddedReport, taskState *statestore.ReadinessTaskState) { + if !report.ReplicaSetPod.ReplicaSet.IsNew { + return + } + + taskState.AddResourceState(report.ReplicaSetPod.Name, taskState.Namespace(), podGvk) + + taskState.AddDependency(taskState.Name(), taskState.Namespace(), taskState.GroupVersionKind(), report.ReplicaSetPod.Name, taskState.Namespace(), podGvk) +} + +func (t *DynamicReadinessTracker) addMissingPodsStatesFromStatefulSetPodAddedReport(report *statefulset.PodAddedReport, taskState *statestore.ReadinessTaskState) { + if !report.ReplicaSetPod.ReplicaSet.IsNew { + return + } + + taskState.AddResourceState(report.ReplicaSetPod.Name, taskState.Namespace(), podGvk) + + taskState.AddDependency(taskState.Name(), taskState.Namespace(), taskState.GroupVersionKind(), report.ReplicaSetPod.Name, taskState.Namespace(), podGvk) +} + +func (t *DynamicReadinessTracker) addMissingPodsStatesFromDaemonSetPodAddedReport(report *daemonset.PodAddedReport, taskState *statestore.ReadinessTaskState) { + if !report.Pod.ReplicaSet.IsNew { + return + } + + taskState.AddResourceState(report.Pod.Name, taskState.Namespace(), podGvk) + + taskState.AddDependency(taskState.Name(), taskState.Namespace(), taskState.GroupVersionKind(), report.Pod.Name, taskState.Namespace(), podGvk) +} + +func (t *DynamicReadinessTracker) addMissingPodsStatesFromJobPodAddedReport(report *job.PodAddedReport, taskState *statestore.ReadinessTaskState) { + taskState.AddResourceState(report.PodName, taskState.Namespace(), podGvk) + + taskState.AddDependency(taskState.Name(), taskState.Namespace(), taskState.GroupVersionKind(), report.PodName, taskState.Namespace(), podGvk) +} + +func (t *DynamicReadinessTracker) handleDeploymentStatus(status *deployment.DeploymentStatus, taskState *statestore.ReadinessTaskState) { + if status.ReplicasIndicator != nil { + replicasAttr := statestore.Attribute[int]{ + Value: int(status.ReplicasIndicator.TargetValue), + Internal: true, + } + + taskState.ResourceState(taskState.Name(), taskState.Namespace(), taskState.GroupVersionKind()).RWTransaction(func(rs *statestore.ResourceState) { + rs.SetAttribute(statestore.AttributeNameRequiredReplicas, replicasAttr) + }) + } + + if status.IsFailed { + taskState.ResourceState(taskState.Name(), taskState.Namespace(), taskState.GroupVersionKind()).RWTransaction(func(rs *statestore.ResourceState) { + rs.AddError(errors.New(status.FailedReason), "", time.Now()) + }) + + return + } + + if status.IsReady { + for _, state := range taskState.ResourceStates() { + state.RWTransaction(func(rs *statestore.ResourceState) { + rs.SetStatus(statestore.ResourceStatusReady) + }) + } + } +} + +func (t *DynamicReadinessTracker) handleStatefulSetStatus(status *statefulset.StatefulSetStatus, taskState *statestore.ReadinessTaskState) { + if status.ReplicasIndicator != nil { + replicasAttr := statestore.Attribute[int]{ + Value: int(status.ReplicasIndicator.TargetValue), + Internal: true, + } + + taskState.ResourceState(taskState.Name(), taskState.Namespace(), taskState.GroupVersionKind()).RWTransaction(func(rs *statestore.ResourceState) { + rs.SetAttribute(statestore.AttributeNameRequiredReplicas, replicasAttr) + }) + } + + if status.IsFailed { + taskState.ResourceState(taskState.Name(), taskState.Namespace(), taskState.GroupVersionKind()).RWTransaction(func(rs *statestore.ResourceState) { + rs.AddError(errors.New(status.FailedReason), "", time.Now()) + }) + + return + } + + if status.IsReady { + for _, state := range taskState.ResourceStates() { + state.RWTransaction(func(rs *statestore.ResourceState) { + rs.SetStatus(statestore.ResourceStatusReady) + }) + } + } +} + +func (t *DynamicReadinessTracker) handleDaemonSetStatus(status *daemonset.DaemonSetStatus, taskState *statestore.ReadinessTaskState) { + if status.ReplicasIndicator != nil { + replicasAttr := statestore.Attribute[int]{ + Value: int(status.ReplicasIndicator.TargetValue), + Internal: true, + } + + taskState.ResourceState(taskState.Name(), taskState.Namespace(), taskState.GroupVersionKind()).RWTransaction(func(rs *statestore.ResourceState) { + rs.SetAttribute(statestore.AttributeNameRequiredReplicas, replicasAttr) + }) + } + + if status.IsFailed { + taskState.ResourceState(taskState.Name(), taskState.Namespace(), taskState.GroupVersionKind()).RWTransaction(func(rs *statestore.ResourceState) { + rs.AddError(errors.New(status.FailedReason), "", time.Now()) + }) + + return + } + + if status.IsReady { + for _, state := range taskState.ResourceStates() { + state.RWTransaction(func(rs *statestore.ResourceState) { + rs.SetStatus(statestore.ResourceStatusReady) + }) + } + } +} + +func (t *DynamicReadinessTracker) handleJobStatus(status *job.JobStatus, taskState *statestore.ReadinessTaskState) { + taskState.ResourceState(taskState.Name(), taskState.Namespace(), taskState.GroupVersionKind()).RWTransaction(func(rs *statestore.ResourceState) { + rs.SetAttribute(statestore.AttributeNameRequiredReplicas, 1) + }) + + if status.IsFailed { + taskState.ResourceState(taskState.Name(), taskState.Namespace(), taskState.GroupVersionKind()).RWTransaction(func(rs *statestore.ResourceState) { + rs.AddError(errors.New(status.FailedReason), "", time.Now()) + }) + + return + } + + if status.IsSucceeded { + for _, state := range taskState.ResourceStates() { + state.RWTransaction(func(rs *statestore.ResourceState) { + rs.SetStatus(statestore.ResourceStatusReady) + }) + } + } +} + +func (t *DynamicReadinessTracker) handleCanaryStatus(status *canary.CanaryStatus, taskState *statestore.ReadinessTaskState) { + if status.IsFailed { + taskState.ResourceState(taskState.Name(), taskState.Namespace(), taskState.GroupVersionKind()).RWTransaction(func(rs *statestore.ResourceState) { + rs.AddError(errors.New(status.FailedReason), "", time.Now()) + }) + + return + } + + if status.IsSucceeded { + for _, state := range taskState.ResourceStates() { + state.RWTransaction(func(rs *statestore.ResourceState) { + rs.SetStatus(statestore.ResourceStatusReady) + }) + } + } +} + +func (t *DynamicReadinessTracker) handleGenericResourceStatus(status *generic.ResourceStatus, taskState *statestore.ReadinessTaskState) { + if status.IsFailed() { + taskState.ResourceState(taskState.Name(), taskState.Namespace(), taskState.GroupVersionKind()).RWTransaction(func(rs *statestore.ResourceState) { + rs.AddError(errors.New(status.FailureReason()), "", time.Now()) + }) + + return + } + + if status.IsReady() { + for _, state := range taskState.ResourceStates() { + state.RWTransaction(func(rs *statestore.ResourceState) { + rs.SetStatus(statestore.ResourceStatusReady) + }) + } + + return + } + + if status.IsDeleted() { + for _, state := range taskState.ResourceStates() { + state.RWTransaction(func(rs *statestore.ResourceState) { + rs.SetStatus(statestore.ResourceStatusDeleted) + }) + } + } +} + +func (t *DynamicReadinessTracker) handleReplicaSetPodError(podError *replicaset.ReplicaSetPodError, taskState *statestore.ReadinessTaskState) { + if !podError.ReplicaSet.IsNew { + return + } + + t.handlePodError(&podError.PodError, taskState) +} + +func (t *DynamicReadinessTracker) handlePodError(podError *pod.PodError, taskState *statestore.ReadinessTaskState) { + podState := taskState.ResourceState(podError.PodName, taskState.Namespace(), podGvk) + + podState.RWTransaction(func(rs *statestore.ResourceState) { + rs.AddError(errors.New(podError.Message), podError.ContainerName, time.Now()) + }) +} + +func (t *DynamicReadinessTracker) handleReplicaSetPodLogChunk(logChunk *replicaset.ReplicaSetPodLogChunk, logStore *logstore.LogStore, taskState *statestore.ReadinessTaskState) { + if !logChunk.ReplicaSet.IsNew { + return + } + + t.handlePodLogChunk(logChunk.PodLogChunk, logStore, taskState) +} + +func (t *DynamicReadinessTracker) handlePodLogChunk(logChunk *pod.PodLogChunk, logStore *logstore.LogStore, taskState *statestore.ReadinessTaskState) { + namespace := taskState.Namespace() + + var resourceLogs *util.Concurrent[*logstore.ResourceLogs] + if foundResourceLogs, found := lo.Find(logStore.ResourcesLogs(), func(crl *util.Concurrent[*logstore.ResourceLogs]) bool { + var found bool + crl.RTransaction(func(rl *logstore.ResourceLogs) { + found = rl.Name() == logChunk.PodName && rl.Namespace() == namespace && rl.GroupVersionKind() == podGvk + }) + + return found + }); found { + resourceLogs = foundResourceLogs + } else { + resourceLogs = util.NewConcurrent( + logstore.NewResourceLogs(logChunk.PodName, namespace, podGvk), + ) + logStore.AddResourceLogs(resourceLogs) + } + + for _, line := range logChunk.LogLines { + resourceLogs.RWTransaction(func(rl *logstore.ResourceLogs) { + rl.AddLogLine(line.Message, "container/"+logChunk.ContainerName, lo.Must(time.Parse(time.RFC3339, line.Timestamp))) + }) + } +} + +func (t *DynamicReadinessTracker) handleEventMessage(msg string, taskState *statestore.ReadinessTaskState, timestamp time.Time) { + resourceState := taskState.ResourceState(taskState.Name(), taskState.Namespace(), taskState.GroupVersionKind()) + + resourceState.RWTransaction(func(rs *statestore.ResourceState) { + rs.AddEvent(msg, timestamp) + }) +} + +func (t *DynamicReadinessTracker) handleTaskStateStatus(taskState *statestore.ReadinessTaskState) (abort bool, abortErr error) { + taskStateStatus := taskState.Status() + switch taskStateStatus { + case statestore.ReadinessTaskStatusProgressing: + case statestore.ReadinessTaskStatusReady: + abort = true + return + case statestore.ReadinessTaskStatusFailed: + abort = true + abortErr = fmt.Errorf("waiting for readiness failed") + return + default: + panic("unexpected status") + } + + return abort, abortErr +} + +var podGvk = schema.GroupVersionKind{ + Group: "", + Version: "v1", + Kind: "Pod", +} diff --git a/pkg/trackers/dyntracker/logstore/log_line.go b/pkg/trackers/dyntracker/logstore/log_line.go new file mode 100644 index 0000000..7fa6cc6 --- /dev/null +++ b/pkg/trackers/dyntracker/logstore/log_line.go @@ -0,0 +1,8 @@ +package logstore + +import "time" + +type LogLine struct { + Time time.Time + Line string +} diff --git a/pkg/trackers/dyntracker/logstore/log_store.go b/pkg/trackers/dyntracker/logstore/log_store.go new file mode 100644 index 0000000..50bd50e --- /dev/null +++ b/pkg/trackers/dyntracker/logstore/log_store.go @@ -0,0 +1,21 @@ +package logstore + +import ( + "github.com/werf/kubedog/pkg/trackers/dyntracker/util" +) + +type LogStore struct { + resourcesLogs []*util.Concurrent[*ResourceLogs] +} + +func NewLogStore() *LogStore { + return &LogStore{} +} + +func (s *LogStore) AddResourceLogs(resourceLogs *util.Concurrent[*ResourceLogs]) { + s.resourcesLogs = append(s.resourcesLogs, resourceLogs) +} + +func (s *LogStore) ResourcesLogs() []*util.Concurrent[*ResourceLogs] { + return append([]*util.Concurrent[*ResourceLogs]{}, s.resourcesLogs...) +} diff --git a/pkg/trackers/dyntracker/logstore/resource_logs.go b/pkg/trackers/dyntracker/logstore/resource_logs.go new file mode 100644 index 0000000..87582bb --- /dev/null +++ b/pkg/trackers/dyntracker/logstore/resource_logs.go @@ -0,0 +1,58 @@ +package logstore + +import ( + "time" + + "k8s.io/apimachinery/pkg/runtime/schema" +) + +type ResourceLogs struct { + name string + namespace string + groupVersionKind schema.GroupVersionKind + logs map[string][]*LogLine +} + +func NewResourceLogs(name, namespace string, groupVersionKind schema.GroupVersionKind) *ResourceLogs { + return &ResourceLogs{ + name: name, + namespace: namespace, + groupVersionKind: groupVersionKind, + logs: make(map[string][]*LogLine), + } +} + +func (s *ResourceLogs) Name() string { + return s.name +} + +func (s *ResourceLogs) Namespace() string { + return s.namespace +} + +func (s *ResourceLogs) GroupVersionKind() schema.GroupVersionKind { + return s.groupVersionKind +} + +func (s *ResourceLogs) AddLogLine(line, source string, timestamp time.Time) { + l := &LogLine{ + Time: timestamp, + Line: line, + } + + if _, ok := s.logs[source]; !ok { + s.logs[source] = []*LogLine{} + } + + s.logs[source] = append(s.logs[source], l) +} + +func (s *ResourceLogs) LogLines() map[string][]*LogLine { + result := make(map[string][]*LogLine) + + for source, logs := range s.logs { + result[source] = append([]*LogLine{}, logs...) + } + + return result +} diff --git a/pkg/trackers/dyntracker/statestore/absence_task_state.go b/pkg/trackers/dyntracker/statestore/absence_task_state.go new file mode 100644 index 0000000..60cd8a1 --- /dev/null +++ b/pkg/trackers/dyntracker/statestore/absence_task_state.go @@ -0,0 +1,85 @@ +package statestore + +import ( + "k8s.io/apimachinery/pkg/runtime/schema" + + "github.com/werf/kubedog/pkg/trackers/dyntracker/util" +) + +type AbsenceTaskState struct { + name string + namespace string + groupVersionKind schema.GroupVersionKind + + absentConditions []AbsenceTaskConditionFn + failureConditions []AbsenceTaskConditionFn + + resourceState *util.Concurrent[*ResourceState] +} + +func NewAbsenceTaskState(name, namespace string, groupVersionKind schema.GroupVersionKind, opts AbsenceTaskStateOptions) *AbsenceTaskState { + resourceState := util.NewConcurrent(NewResourceState(name, namespace, groupVersionKind)) + + absentConditions := initAbsenceTaskStateAbsentConditions() + failureConditions := []AbsenceTaskConditionFn{} + + return &AbsenceTaskState{ + name: name, + namespace: namespace, + groupVersionKind: groupVersionKind, + absentConditions: absentConditions, + failureConditions: failureConditions, + resourceState: resourceState, + } +} + +type AbsenceTaskStateOptions struct{} + +func (s *AbsenceTaskState) Name() string { + return s.name +} + +func (s *AbsenceTaskState) Namespace() string { + return s.namespace +} + +func (s *AbsenceTaskState) GroupVersionKind() schema.GroupVersionKind { + return s.groupVersionKind +} + +func (s *AbsenceTaskState) ResourceState() *util.Concurrent[*ResourceState] { + return s.resourceState +} + +func (s *AbsenceTaskState) Status() AbsenceTaskStatus { + for _, failureCondition := range s.failureConditions { + if failureCondition(s) { + return AbsenceTaskStatusFailed + } + } + + for _, absentCondition := range s.absentConditions { + if !absentCondition(s) { + return AbsenceTaskStatusProgressing + } + } + + return AbsenceTaskStatusAbsent +} + +func initAbsenceTaskStateAbsentConditions() []AbsenceTaskConditionFn { + var absentConditions []AbsenceTaskConditionFn + + absentConditions = append(absentConditions, func(taskState *AbsenceTaskState) bool { + var absent bool + taskState.resourceState.RTransaction(func(rs *ResourceState) { + if rs.Status() == ResourceStatusDeleted { + absent = true + } + }) + + return absent + }) + + return absentConditions +} diff --git a/pkg/trackers/dyntracker/statestore/attribute.go b/pkg/trackers/dyntracker/statestore/attribute.go new file mode 100644 index 0000000..799a76b --- /dev/null +++ b/pkg/trackers/dyntracker/statestore/attribute.go @@ -0,0 +1,12 @@ +package statestore + +const ( + AttributeNameRequiredReplicas = "RequiredReplicas" +) + +type Attributer interface{} + +type Attribute[T any] struct { + Value T + Internal bool +} diff --git a/pkg/trackers/dyntracker/statestore/conditions.go b/pkg/trackers/dyntracker/statestore/conditions.go new file mode 100644 index 0000000..0c8c757 --- /dev/null +++ b/pkg/trackers/dyntracker/statestore/conditions.go @@ -0,0 +1,7 @@ +package statestore + +type ReadinessTaskConditionFn func(taskState *ReadinessTaskState) bool + +type PresenceTaskConditionFn func(taskState *PresenceTaskState) bool + +type AbsenceTaskConditionFn func(taskState *AbsenceTaskState) bool diff --git a/pkg/trackers/dyntracker/statestore/error.go b/pkg/trackers/dyntracker/statestore/error.go new file mode 100644 index 0000000..ec35ee0 --- /dev/null +++ b/pkg/trackers/dyntracker/statestore/error.go @@ -0,0 +1,8 @@ +package statestore + +import "time" + +type Error struct { + Time time.Time + Err error +} diff --git a/pkg/trackers/dyntracker/statestore/event.go b/pkg/trackers/dyntracker/statestore/event.go new file mode 100644 index 0000000..42f0edc --- /dev/null +++ b/pkg/trackers/dyntracker/statestore/event.go @@ -0,0 +1,8 @@ +package statestore + +import "time" + +type Event struct { + Time time.Time + Message string +} diff --git a/pkg/trackers/dyntracker/statestore/presence_task_state.go b/pkg/trackers/dyntracker/statestore/presence_task_state.go new file mode 100644 index 0000000..fad952e --- /dev/null +++ b/pkg/trackers/dyntracker/statestore/presence_task_state.go @@ -0,0 +1,86 @@ +package statestore + +import ( + "k8s.io/apimachinery/pkg/runtime/schema" + + "github.com/werf/kubedog/pkg/trackers/dyntracker/util" +) + +type PresenceTaskState struct { + name string + namespace string + groupVersionKind schema.GroupVersionKind + + presentConditions []PresenceTaskConditionFn + failureConditions []PresenceTaskConditionFn + + resourceState *util.Concurrent[*ResourceState] +} + +func NewPresenceTaskState(name, namespace string, groupVersionKind schema.GroupVersionKind, opts PresenceTaskStateOptions) *PresenceTaskState { + resourceState := util.NewConcurrent(NewResourceState(name, namespace, groupVersionKind)) + + presentConditions := initPresenceTaskStatePresentConditions() + failureConditions := []PresenceTaskConditionFn{} + + return &PresenceTaskState{ + name: name, + namespace: namespace, + groupVersionKind: groupVersionKind, + presentConditions: presentConditions, + failureConditions: failureConditions, + resourceState: resourceState, + } +} + +type PresenceTaskStateOptions struct{} + +func (s *PresenceTaskState) Name() string { + return s.name +} + +func (s *PresenceTaskState) Namespace() string { + return s.namespace +} + +func (s *PresenceTaskState) GroupVersionKind() schema.GroupVersionKind { + return s.groupVersionKind +} + +func (s *PresenceTaskState) ResourceState() *util.Concurrent[*ResourceState] { + return s.resourceState +} + +func (s *PresenceTaskState) Status() PresenceTaskStatus { + for _, failureCondition := range s.failureConditions { + if failureCondition(s) { + return PresenceTaskStatusFailed + } + } + + for _, presentCondition := range s.presentConditions { + if !presentCondition(s) { + return PresenceTaskStatusProgressing + } + } + + return PresenceTaskStatusPresent +} + +func initPresenceTaskStatePresentConditions() []PresenceTaskConditionFn { + var presentConditions []PresenceTaskConditionFn + + presentConditions = append(presentConditions, func(taskState *PresenceTaskState) bool { + var present bool + taskState.resourceState.RTransaction(func(rs *ResourceState) { + switch rs.Status() { + case ResourceStatusCreated, ResourceStatusReady: + present = true + } + }) + + return present + }) + + return presentConditions +} diff --git a/pkg/trackers/dyntracker/statestore/readiness_task_state.go b/pkg/trackers/dyntracker/statestore/readiness_task_state.go new file mode 100644 index 0000000..7f36ca5 --- /dev/null +++ b/pkg/trackers/dyntracker/statestore/readiness_task_state.go @@ -0,0 +1,220 @@ +package statestore + +import ( + "errors" + + domigraph "github.com/dominikbraun/graph" + "github.com/samber/lo" + "k8s.io/apimachinery/pkg/runtime/schema" + + "github.com/werf/kubedog/pkg/trackers/dyntracker/util" + "github.com/werf/kubedog/pkg/trackers/rollout/multitrack" +) + +type ReadinessTaskState struct { + name string + namespace string + groupVersionKind schema.GroupVersionKind + + readyConditions []ReadinessTaskConditionFn + failureConditions []ReadinessTaskConditionFn + + resourceStatesTree domigraph.Graph[string, *util.Concurrent[*ResourceState]] +} + +func NewReadinessTaskState(name, namespace string, groupVersionKind schema.GroupVersionKind, opts ReadinessTaskStateOptions) *ReadinessTaskState { + totalAllowFailuresCount := opts.TotalAllowFailuresCount + + var failMode multitrack.FailMode + if opts.FailMode != "" { + failMode = opts.FailMode + } else { + failMode = multitrack.FailWholeDeployProcessImmediately + } + + resourceStatesTree := domigraph.New[string, *util.Concurrent[*ResourceState]](func(s *util.Concurrent[*ResourceState]) string { + var id string + s.RTransaction(func(state *ResourceState) { + id = state.ID() + }) + return id + }, domigraph.PreventCycles(), domigraph.Directed(), domigraph.Rooted()) + + failureConditions := initReadinessTaskStateFailureConditions(failMode, totalAllowFailuresCount) + readyConditions := initReadinessTaskStateReadyConditions() + + rootResourceState := util.NewConcurrent(NewResourceState(name, namespace, groupVersionKind)) + resourceStatesTree.AddVertex(rootResourceState) + + return &ReadinessTaskState{ + name: name, + namespace: namespace, + groupVersionKind: groupVersionKind, + failureConditions: failureConditions, + readyConditions: readyConditions, + resourceStatesTree: resourceStatesTree, + } +} + +type ReadinessTaskStateOptions struct { + FailMode multitrack.FailMode + TotalAllowFailuresCount int +} + +func (s *ReadinessTaskState) Name() string { + return s.name +} + +func (s *ReadinessTaskState) Namespace() string { + return s.namespace +} + +func (s *ReadinessTaskState) GroupVersionKind() schema.GroupVersionKind { + return s.groupVersionKind +} + +func (s *ReadinessTaskState) AddResourceState(name, namespace string, groupVersionKind schema.GroupVersionKind) { + if _, err := s.resourceStatesTree.Vertex(util.ResourceID(name, namespace, groupVersionKind)); err != nil { + if !errors.Is(err, domigraph.ErrVertexNotFound) { + panic(err) + } + } else { + return + } + + resourceState := util.NewConcurrent(NewResourceState(name, namespace, groupVersionKind)) + lo.Must0(s.resourceStatesTree.AddVertex(resourceState)) +} + +func (s *ReadinessTaskState) AddDependency(fromName, fromNamespace string, fromGroupVersionKind schema.GroupVersionKind, toName, toNamespace string, toGroupVersionKind schema.GroupVersionKind) { + if err := s.resourceStatesTree.AddEdge(util.ResourceID(fromName, fromNamespace, fromGroupVersionKind), util.ResourceID(toName, toNamespace, toGroupVersionKind)); err != nil { + if !errors.Is(err, domigraph.ErrEdgeAlreadyExists) { + panic(err) + } + } +} + +func (s *ReadinessTaskState) ResourceState(name, namespace string, groupVersionKind schema.GroupVersionKind) *util.Concurrent[*ResourceState] { + return lo.Must1(s.resourceStatesTree.Vertex(util.ResourceID(name, namespace, groupVersionKind))) +} + +func (s *ReadinessTaskState) ResourceStates() []*util.Concurrent[*ResourceState] { + var states []*util.Concurrent[*ResourceState] + + lo.Must0(domigraph.BFS(s.resourceStatesTree, util.ResourceID(s.name, s.namespace, s.groupVersionKind), func(id string) bool { + state := lo.Must(s.resourceStatesTree.Vertex(id)) + states = append(states, state) + return false + })) + + return states +} + +func (s *ReadinessTaskState) TraverseResourceStates(fromName, fromNamespace string, fromGroupVersionKind schema.GroupVersionKind) []*util.Concurrent[*ResourceState] { + var states []*util.Concurrent[*ResourceState] + + lo.Must0(domigraph.BFS(s.resourceStatesTree, util.ResourceID(fromName, fromNamespace, fromGroupVersionKind), func(id string) bool { + state := lo.Must(s.resourceStatesTree.Vertex(id)) + states = append(states, state) + return false + })) + + return states +} + +func (s *ReadinessTaskState) Status() ReadinessTaskStatus { + for _, failureCondition := range s.failureConditions { + if failureCondition(s) { + return ReadinessTaskStatusFailed + } + } + + for _, readyCondition := range s.readyConditions { + if !readyCondition(s) { + return ReadinessTaskStatusProgressing + } + } + + return ReadinessTaskStatusReady +} + +func initReadinessTaskStateReadyConditions() []ReadinessTaskConditionFn { + var readyConditions []ReadinessTaskConditionFn + + readyConditions = append(readyConditions, func(taskState *ReadinessTaskState) bool { + resourcesReadyRequired := 1 + taskState.ResourceState(taskState.name, taskState.namespace, taskState.groupVersionKind).RTransaction(func(s *ResourceState) { + attrs := s.Attributes() + if attr, found := attrs[AttributeNameRequiredReplicas]; found { + resourcesReadyRequired += attr.(Attribute[int]).Value + } + }) + + var resourcesReady int + lo.Must0(domigraph.BFS(taskState.resourceStatesTree, util.ResourceID(taskState.name, taskState.namespace, taskState.groupVersionKind), func(id string) bool { + state := lo.Must(taskState.resourceStatesTree.Vertex(id)) + + state.RTransaction(func(s *ResourceState) { + if s.Status() == ResourceStatusReady { + resourcesReady++ + } + }) + + return false + })) + + return resourcesReady >= resourcesReadyRequired + }) + + return readyConditions +} + +func initReadinessTaskStateFailureConditions(failMode multitrack.FailMode, totalAllowFailuresCount int) []ReadinessTaskConditionFn { + var failureConditions []ReadinessTaskConditionFn + + failureConditions = append(failureConditions, func(taskState *ReadinessTaskState) bool { + var failed bool + lo.Must0(domigraph.BFS(taskState.resourceStatesTree, util.ResourceID(taskState.name, taskState.namespace, taskState.groupVersionKind), func(id string) bool { + state := lo.Must(taskState.resourceStatesTree.Vertex(id)) + state.RTransaction(func(s *ResourceState) { + if s.Status() == ResourceStatusFailed { + failed = true + } + }) + + if failed { + return true + } + + return false + })) + + return failed + }) + + if failMode != multitrack.IgnoreAndContinueDeployProcess { + maxErrors := lo.Max([]int{totalAllowFailuresCount + 1}) + + failureConditions = append(failureConditions, func(taskState *ReadinessTaskState) bool { + var totalErrsCount int + lo.Must0(domigraph.BFS(taskState.resourceStatesTree, util.ResourceID(taskState.name, taskState.namespace, taskState.groupVersionKind), func(id string) bool { + state := lo.Must(taskState.resourceStatesTree.Vertex(id)) + + var errors map[string][]*Error + state.RTransaction(func(s *ResourceState) { + errors = s.Errors() + }) + + for _, errs := range errors { + totalErrsCount += len(errs) + } + + return false + })) + + return totalErrsCount > maxErrors + }) + } + + return failureConditions +} diff --git a/pkg/trackers/dyntracker/statestore/resource_state.go b/pkg/trackers/dyntracker/statestore/resource_state.go new file mode 100644 index 0000000..40ecdf6 --- /dev/null +++ b/pkg/trackers/dyntracker/statestore/resource_state.go @@ -0,0 +1,105 @@ +package statestore + +import ( + "time" + + "k8s.io/apimachinery/pkg/runtime/schema" + + "github.com/werf/kubedog/pkg/trackers/dyntracker/util" +) + +type ResourceState struct { + name string + namespace string + groupVersionKind schema.GroupVersionKind + + status ResourceStatus + attributes map[string]Attributer + events []*Event + errors map[string][]*Error +} + +func NewResourceState(name, namespace string, groupVersionKind schema.GroupVersionKind) *ResourceState { + return &ResourceState{ + name: name, + namespace: namespace, + groupVersionKind: groupVersionKind, + status: ResourceStatusCreated, + attributes: make(map[string]Attributer), + errors: make(map[string][]*Error), + } +} + +func (s *ResourceState) Name() string { + return s.name +} + +func (s *ResourceState) Namespace() string { + return s.namespace +} + +func (s *ResourceState) GroupVersionKind() schema.GroupVersionKind { + return s.groupVersionKind +} + +func (s *ResourceState) SetStatus(status ResourceStatus) { + s.status = status +} + +func (s *ResourceState) Status() ResourceStatus { + return s.status +} + +func (s *ResourceState) AddError(err error, source string, timestamp time.Time) { + e := &Error{ + Time: timestamp, + Err: err, + } + + if _, ok := s.errors[source]; !ok { + s.errors[source] = []*Error{} + } + + s.errors[source] = append(s.errors[source], e) +} + +func (s *ResourceState) Errors() map[string][]*Error { + result := make(map[string][]*Error) + + for source, errors := range s.errors { + result[source] = append([]*Error{}, errors...) + } + + return result +} + +func (s *ResourceState) AddEvent(message string, timestamp time.Time) { + e := &Event{ + Time: timestamp, + Message: message, + } + + s.events = append(s.events, e) +} + +func (s *ResourceState) Events() []*Event { + return append([]*Event{}, s.events...) +} + +func (s *ResourceState) SetAttribute(name string, attr Attributer) { + s.attributes[name] = attr +} + +func (s *ResourceState) Attributes() map[string]Attributer { + result := make(map[string]Attributer) + + for name, attr := range s.attributes { + result[name] = attr + } + + return result +} + +func (s *ResourceState) ID() string { + return util.ResourceID(s.name, s.namespace, s.groupVersionKind) +} diff --git a/pkg/trackers/dyntracker/statestore/resource_status.go b/pkg/trackers/dyntracker/statestore/resource_status.go new file mode 100644 index 0000000..178b6ec --- /dev/null +++ b/pkg/trackers/dyntracker/statestore/resource_status.go @@ -0,0 +1,10 @@ +package statestore + +type ResourceStatus string + +const ( + ResourceStatusReady ResourceStatus = "ready" + ResourceStatusCreated ResourceStatus = "created" + ResourceStatusDeleted ResourceStatus = "deleted" + ResourceStatusFailed ResourceStatus = "failed" +) diff --git a/pkg/trackers/dyntracker/statestore/task_status.go b/pkg/trackers/dyntracker/statestore/task_status.go new file mode 100644 index 0000000..0494e1d --- /dev/null +++ b/pkg/trackers/dyntracker/statestore/task_status.go @@ -0,0 +1,25 @@ +package statestore + +type ReadinessTaskStatus string + +const ( + ReadinessTaskStatusProgressing ReadinessTaskStatus = "progressing" + ReadinessTaskStatusReady ReadinessTaskStatus = "ready" + ReadinessTaskStatusFailed ReadinessTaskStatus = "failed" +) + +type PresenceTaskStatus string + +const ( + PresenceTaskStatusProgressing PresenceTaskStatus = "progressing" + PresenceTaskStatusPresent PresenceTaskStatus = "present" + PresenceTaskStatusFailed PresenceTaskStatus = "failed" +) + +type AbsenceTaskStatus string + +const ( + AbsenceTaskStatusProgressing AbsenceTaskStatus = "progressing" + AbsenceTaskStatusAbsent AbsenceTaskStatus = "absent" + AbsenceTaskStatusFailed AbsenceTaskStatus = "failed" +) diff --git a/pkg/trackers/dyntracker/statestore/task_store.go b/pkg/trackers/dyntracker/statestore/task_store.go new file mode 100644 index 0000000..1753856 --- /dev/null +++ b/pkg/trackers/dyntracker/statestore/task_store.go @@ -0,0 +1,39 @@ +package statestore + +import ( + "github.com/werf/kubedog/pkg/trackers/dyntracker/util" +) + +type TaskStore struct { + readinessTasks []*util.Concurrent[*ReadinessTaskState] + presenceTasks []*util.Concurrent[*PresenceTaskState] + absenceTasks []*util.Concurrent[*AbsenceTaskState] +} + +func NewTaskStore() *TaskStore { + return &TaskStore{} +} + +func (s *TaskStore) AddReadinessTaskState(task *util.Concurrent[*ReadinessTaskState]) { + s.readinessTasks = append(s.readinessTasks, task) +} + +func (s *TaskStore) AddPresenceTaskState(task *util.Concurrent[*PresenceTaskState]) { + s.presenceTasks = append(s.presenceTasks, task) +} + +func (s *TaskStore) AddAbsenceTaskState(task *util.Concurrent[*AbsenceTaskState]) { + s.absenceTasks = append(s.absenceTasks, task) +} + +func (s *TaskStore) ReadinessTasksStates() []*util.Concurrent[*ReadinessTaskState] { + return append([]*util.Concurrent[*ReadinessTaskState]{}, s.readinessTasks...) +} + +func (s *TaskStore) PresenceTasksStates() []*util.Concurrent[*PresenceTaskState] { + return append([]*util.Concurrent[*PresenceTaskState]{}, s.presenceTasks...) +} + +func (s *TaskStore) AbsenceTasksStates() []*util.Concurrent[*AbsenceTaskState] { + return append([]*util.Concurrent[*AbsenceTaskState]{}, s.absenceTasks...) +} diff --git a/pkg/trackers/dyntracker/util/concurrency.go b/pkg/trackers/dyntracker/util/concurrency.go new file mode 100644 index 0000000..c34e4b2 --- /dev/null +++ b/pkg/trackers/dyntracker/util/concurrency.go @@ -0,0 +1,36 @@ +package util + +import "sync" + +type Concurrent[T any] struct { + lock *sync.RWMutex + object T +} + +func NewConcurrent[T any](obj T) *Concurrent[T] { + return &Concurrent[T]{ + lock: &sync.RWMutex{}, + object: obj, + } +} + +func NewConcurrentWithLock[T any](obj T, lock *sync.RWMutex) *Concurrent[T] { + return &Concurrent[T]{ + lock: lock, + object: obj, + } +} + +func (c *Concurrent[T]) RWTransaction(f func(object T)) { + c.lock.Lock() + defer c.lock.Unlock() + + f(c.object) +} + +func (c *Concurrent[T]) RTransaction(f func(object T)) { + c.lock.RLock() + defer c.lock.RUnlock() + + f(c.object) +} diff --git a/pkg/trackers/dyntracker/util/resource.go b/pkg/trackers/dyntracker/util/resource.go new file mode 100644 index 0000000..c44df3c --- /dev/null +++ b/pkg/trackers/dyntracker/util/resource.go @@ -0,0 +1,43 @@ +package util + +import ( + "fmt" + + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +func ResourceID(name, namespace string, groupVersionKind schema.GroupVersionKind) string { + return fmt.Sprintf("%s:%s:%s:%s", namespace, groupVersionKind.Group, groupVersionKind.Kind, name) +} + +func IsNamespaced(groupVersionKind schema.GroupVersionKind, mapper meta.ResettableRESTMapper) (namespaced bool, err error) { + mapping, err := mapper.RESTMapping(groupVersionKind.GroupKind(), groupVersionKind.Version) + if err != nil { + return false, fmt.Errorf("get resource mapping for %q: %w", groupVersionKind.String(), err) + } + + return mapping.Scope == meta.RESTScopeNamespace, nil +} + +func GVRFromGVK(groupVersionKind schema.GroupVersionKind, mapper meta.ResettableRESTMapper) (schema.GroupVersionResource, error) { + mapping, err := mapper.RESTMapping(groupVersionKind.GroupKind(), groupVersionKind.Version) + if err != nil { + return schema.GroupVersionResource{}, fmt.Errorf("get resource mapping for %q: %w", groupVersionKind.String(), err) + } + + return mapping.Resource, nil +} + +func ResourceHumanID(name, namespace string, groupVersionKind schema.GroupVersionKind, mapper meta.ResettableRESTMapper) (string, error) { + namespaced, err := IsNamespaced(groupVersionKind, mapper) + if err != nil { + return "", fmt.Errorf("check if namespaced: %w", err) + } + + if namespaced && namespace != "" { + return fmt.Sprintf("%s/%s/%s", namespace, groupVersionKind.Kind, name), nil + } else { + return fmt.Sprintf("%s/%s", groupVersionKind.Kind, name), nil + } +}