Skip to content

Commit

Permalink
PWX-37601: Using informer cache's event handling instead of native k8…
Browse files Browse the repository at this point in the history
…s watchers for pod monitoring and extender metrics. (#1795)

* PWX-37601: Using informer cache's event handling instead of native k8s watchers for pod monitoring and extender metrics.

* Generated the mock files.
  • Loading branch information
diptiranjanpx committed Jun 13, 2024
1 parent 5d6641f commit 0607b74
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 6 deletions.
27 changes: 27 additions & 0 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import (
"sync"

storkv1alpha1 "github.com/libopenstorage/stork/pkg/apis/stork/v1alpha1"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/client-go/rest"
clientCache "k8s.io/client-go/tools/cache"
controllercache "sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
Expand Down Expand Up @@ -36,6 +38,9 @@ type SharedInformerCache interface {

// ListTransformedPods lists the all the Pods from the cache after applying TransformFunc
ListTransformedPods() (*corev1.PodList, error)

// WatchPods registers the pod event handlers with the informer cache
WatchPods(fn func(object interface{})) error
}

type cache struct {
Expand Down Expand Up @@ -84,6 +89,8 @@ func CreateSharedInformerCache(mgr manager.Manager) error {
}
}

currPod.ObjectMeta.DeletionTimestamp = podResource.ObjectMeta.DeletionTimestamp

currPod.Spec.Containers = podResource.Spec.Containers
currPod.Spec.NodeName = podResource.Spec.NodeName

Expand Down Expand Up @@ -204,3 +211,23 @@ func (c *cache) ListTransformedPods() (*corev1.PodList, error) {
}
return podList, nil
}

// WatchPods uses handlers for different pod events with shared informers.
func (c *cache) WatchPods(fn func(object interface{})) error {
informer, err := c.controllerCache.GetInformer(context.Background(), &corev1.Pod{})
if err != nil {
logrus.WithError(err).Error("error getting the informer for pods")
return err
}

informer.AddEventHandler(clientCache.ResourceEventHandlerFuncs{
AddFunc: fn,
UpdateFunc: func(oldObj, newObj interface{}) {
// Only considering the new pod object
fn(newObj)
},
DeleteFunc: fn,
})

return nil
}
26 changes: 23 additions & 3 deletions pkg/extender/extender.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/libopenstorage/stork/drivers/volume"
storkcache "github.com/libopenstorage/stork/pkg/cache"
storklog "github.com/libopenstorage/stork/pkg/log"
restore "github.com/libopenstorage/stork/pkg/snapshot/controllers"
"github.com/portworx/sched-ops/k8s/core"
Expand Down Expand Up @@ -483,9 +484,28 @@ func (e *Extender) collectExtenderMetrics() error {
return nil
}

if err := core.Instance().WatchPods("", fn, metav1.ListOptions{}); err != nil {
log.Errorf("failed to watch pods due to: %v", err)
return err
podHandler := func(object interface{}) {
pod, ok := object.(*v1.Pod)
if !ok {
log.Errorf("invalid object type on pod watch from cache: %v", object)
} else {
fn(pod)
}
}

if storkcache.Instance() != nil {
log.Debugf("Shared informer cache has been initialized, using it for extender metrics.")
err := storkcache.Instance().WatchPods(podHandler)
if err != nil {
log.Errorf("failed to watch pods with informer cache for health monitoring, err: %v", err)
}
log.Errorf("failed to watch pods with informer cache for metrics, err: %v", err)
} else {
log.Warnf("Shared informer cache has not been initialized, using watch for extender metrics.")
if err := core.Instance().WatchPods("", fn, metav1.ListOptions{}); err != nil {
log.Errorf("failed to watch pods for metrics due to: %v", err)
return err
}
}
return nil
}
Expand Down
14 changes: 14 additions & 0 deletions pkg/mock/cache/cache.mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 21 additions & 3 deletions pkg/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,27 @@ func (m *Monitor) podMonitor() error {
return nil
}

if err := core.Instance().WatchPods("", fn, metav1.ListOptions{}); err != nil {
log.Errorf("failed to watch pods due to: %v", err)
return err
podHandler := func(object interface{}) {
pod, ok := object.(*v1.Pod)
if !ok {
log.Errorf("invalid object type on pod watch from cache: %v", object)
} else {
fn(pod)
}
}

if storkcache.Instance() != nil {
log.Debugf("Shared informer cache has been initialized, using it for pod monitor.")
err := storkcache.Instance().WatchPods(podHandler)
if err != nil {
log.Errorf("failed to watch pods with informer cache for health monitoring, err: %v", err)
}
} else {
log.Warnf("Shared informer cache has not been initialized, using watch for pod monitor.")
if err := core.Instance().WatchPods("", fn, metav1.ListOptions{}); err != nil {
log.Errorf("failed to watch pods for health monitoring due to: %v", err)
return err
}
}

return nil
Expand Down

0 comments on commit 0607b74

Please sign in to comment.