Skip to content

Commit

Permalink
feat: new high-level concurrent dynamic tracker
Browse files Browse the repository at this point in the history
Signed-off-by: Ilya Lesikov <ilya@lesikov.com>
  • Loading branch information
ilya-lesikov committed Dec 15, 2023
1 parent 6f6030f commit 5721c3e
Show file tree
Hide file tree
Showing 22 changed files with 2,135 additions and 0 deletions.
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ go 1.20

require (
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d
github.com/dominikbraun/graph v0.23.0
github.com/fluxcd/flagger v1.29.0
github.com/gookit/color v1.5.2
github.com/samber/lo v1.38.1
github.com/spf13/cobra v1.6.1
github.com/werf/logboek v0.5.5
golang.org/x/crypto v0.7.0
Expand Down Expand Up @@ -53,6 +55,7 @@ require (
github.com/xlab/treeprint v1.1.0 // indirect
github.com/xo/terminfo v0.0.0-20210125001918-ca9a967f8778 // indirect
go.starlark.net v0.0.0-20200306205701-8dd3e2ee1dd5 // indirect
golang.org/x/exp v0.0.0-20221126150942-6ab00d035af9 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/oauth2 v0.5.0 // indirect
golang.org/x/sys v0.6.0 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
github.com/dominikbraun/graph v0.23.0 h1:TdZB4pPqCLFxYhdyMFb1TBdFxp8XLcJfTTBQucVPgCo=
github.com/dominikbraun/graph v0.23.0/go.mod h1:yOjYyogZLY1LSG9E33JWZJiq5k83Qy2C6POAuiViluc=
github.com/emicklei/go-restful/v3 v3.10.1 h1:rc42Y5YTp7Am7CS630D7JmhRjq4UlEUuEKfrDac4bSQ=
github.com/emicklei/go-restful/v3 v3.10.1/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
Expand Down Expand Up @@ -134,6 +136,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM=
github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA=
github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spf13/cobra v1.6.1 h1:o94oiPyS4KD1mPy2fmcYYHHfCxLqYjJOhGsCHFZtEzA=
Expand Down Expand Up @@ -172,6 +176,8 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.7.0 h1:AvwMYaRytfdeVt3u6mLaxYtErKYjxA2OXjJ1HHq6t3A=
golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20221126150942-6ab00d035af9 h1:yZNXmy+j/JpX19vZkVktWqAo7Gny4PBWYYK3zskGpx4=
golang.org/x/exp v0.0.0-20221126150942-6ab00d035af9/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
Expand Down
3 changes: 3 additions & 0 deletions pkg/tracker/pod/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
type PodStatus struct {
corev1.PodStatus

Name string

StatusGeneration uint64

StatusIndicator *indicators.StringEqualConditionIndicator
Expand All @@ -35,6 +37,7 @@ func NewPodStatus(pod *corev1.Pod, statusGeneration uint64, trackedContainers []
Age: utils.TranslateTimestampSince(pod.CreationTimestamp),
StatusIndicator: &indicators.StringEqualConditionIndicator{},
StatusGeneration: statusGeneration,
Name: pod.Name,
}

for _, cond := range pod.Status.Conditions {
Expand Down
110 changes: 110 additions & 0 deletions pkg/trackers/dyntracker/dynamic_absence_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package dyntracker

import (
"context"
"fmt"
"io"
"time"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"

"github.com/werf/kubedog/pkg/trackers/dyntracker/statestore"
"github.com/werf/kubedog/pkg/trackers/dyntracker/util"
)

type DynamicAbsenceTracker struct {
taskState *util.Concurrent[*statestore.AbsenceTaskState]
dynamicClient dynamic.Interface
mapper meta.ResettableRESTMapper

timeout time.Duration
pollPeriod time.Duration
}

func NewDynamicAbsenceTracker(
taskState *util.Concurrent[*statestore.AbsenceTaskState],
dynamicClient dynamic.Interface,
mapper meta.ResettableRESTMapper,
opts DynamicAbsenceTrackerOptions,
) *DynamicAbsenceTracker {
timeout := opts.Timeout
var pollPeriod time.Duration
if opts.PollPeriod != 0 {
pollPeriod = opts.PollPeriod
} else {
pollPeriod = 1 * time.Second
}

return &DynamicAbsenceTracker{
taskState: taskState,
dynamicClient: dynamicClient,
mapper: mapper,
timeout: timeout,
pollPeriod: pollPeriod,
}
}

type DynamicAbsenceTrackerOptions struct {
Timeout time.Duration
PollPeriod time.Duration
}

func (t *DynamicAbsenceTracker) Track(ctx context.Context) error {
var (
name string
namespace string
groupVersionKind schema.GroupVersionKind
)
t.taskState.RTransaction(func(ts *statestore.AbsenceTaskState) {
name = ts.Name()
namespace = ts.Namespace()
groupVersionKind = ts.GroupVersionKind()
})

namespaced, err := util.IsNamespaced(groupVersionKind, t.mapper)
if err != nil {
return fmt.Errorf("check if namespaced: %w", err)
}

gvr, err := util.GVRFromGVK(groupVersionKind, t.mapper)
if err != nil {
return fmt.Errorf("get GroupVersionResource: %w", err)
}

var resourceClient dynamic.ResourceInterface
if namespaced {
resourceClient = t.dynamicClient.Resource(gvr).Namespace(namespace)
} else {
resourceClient = t.dynamicClient.Resource(gvr)
}

resourceHumanID, err := util.ResourceHumanID(name, namespace, groupVersionKind, t.mapper)
if err != nil {
return fmt.Errorf("get resource human ID: %w", err)
}

if err := wait.PollImmediate(t.pollPeriod, t.timeout, func() (bool, error) {
if _, err := resourceClient.Get(ctx, name, metav1.GetOptions{}); err != nil {
if apierrors.IsResourceExpired(err) || apierrors.IsGone(err) || err == io.EOF || err == io.ErrUnexpectedEOF {
return false, nil
}

if apierrors.IsNotFound(err) {
return true, nil
}

return false, fmt.Errorf("get resource %q: %w", resourceHumanID, err)
}

return false, nil
}); err != nil {
return fmt.Errorf("poll resource %q: %w", resourceHumanID, err)
}

return nil
}
112 changes: 112 additions & 0 deletions pkg/trackers/dyntracker/dynamic_presence_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package dyntracker

import (
"context"
"fmt"
"io"
"time"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"

"github.com/werf/kubedog/pkg/trackers/dyntracker/statestore"
"github.com/werf/kubedog/pkg/trackers/dyntracker/util"
)

type DynamicPresenceTracker struct {
taskState *util.Concurrent[*statestore.PresenceTaskState]
dynamicClient dynamic.Interface
mapper meta.ResettableRESTMapper

timeout time.Duration
pollPeriod time.Duration
}

func NewDynamicPresenceTracker(
taskState *util.Concurrent[*statestore.PresenceTaskState],
dynamicClient dynamic.Interface,
mapper meta.ResettableRESTMapper,
opts DynamicPresenceTrackerOptions,
) *DynamicPresenceTracker {
var timeout time.Duration
if opts.Timeout != 0 {
timeout = opts.Timeout
} else {
timeout = 5 * time.Minute
}

var pollPeriod time.Duration
if opts.PollPeriod != 0 {
pollPeriod = opts.PollPeriod
} else {
pollPeriod = 1 * time.Second
}

return &DynamicPresenceTracker{
taskState: taskState,
dynamicClient: dynamicClient,
mapper: mapper,
timeout: timeout,
pollPeriod: pollPeriod,
}
}

type DynamicPresenceTrackerOptions struct {
Timeout time.Duration
PollPeriod time.Duration
}

func (t *DynamicPresenceTracker) Track(ctx context.Context) error {
var (
name string
namespace string
groupVersionKind schema.GroupVersionKind
)
t.taskState.RTransaction(func(ts *statestore.PresenceTaskState) {
name = ts.Name()
namespace = ts.Namespace()
groupVersionKind = ts.GroupVersionKind()
})

namespaced, err := util.IsNamespaced(groupVersionKind, t.mapper)
if err != nil {
return fmt.Errorf("check if namespaced: %w", err)
}

gvr, err := util.GVRFromGVK(groupVersionKind, t.mapper)
if err != nil {
return fmt.Errorf("get GroupVersionResource: %w", err)
}

var resourceClient dynamic.ResourceInterface
if namespaced {
resourceClient = t.dynamicClient.Resource(gvr).Namespace(namespace)
} else {
resourceClient = t.dynamicClient.Resource(gvr)
}

resourceHumanID, err := util.ResourceHumanID(name, namespace, groupVersionKind, t.mapper)
if err != nil {
return fmt.Errorf("get resource human ID: %w", err)
}

if err := wait.PollImmediate(t.pollPeriod, t.timeout, func() (bool, error) {
if _, err := resourceClient.Get(ctx, name, metav1.GetOptions{}); err != nil {
if apierrors.IsResourceExpired(err) || apierrors.IsGone(err) || err == io.EOF || err == io.ErrUnexpectedEOF || apierrors.IsNotFound(err) {
return false, nil
}

return false, fmt.Errorf("get resource %q: %w", resourceHumanID, err)
}

return true, nil
}); err != nil {
return fmt.Errorf("poll resource %q: %w", resourceHumanID, err)
}

return nil
}
Loading

0 comments on commit 5721c3e

Please sign in to comment.