Skip to content

Commit

Permalink
use object caching through informer indexers
Browse files Browse the repository at this point in the history
  • Loading branch information
Jiahui-Zhang-20 committed Dec 18, 2023
1 parent d6d4fca commit 34fb57b
Show file tree
Hide file tree
Showing 10 changed files with 235 additions and 110 deletions.
55 changes: 1 addition & 54 deletions crons.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,10 @@ package main
import (
"context"
"errors"
"time"

"github.com/getsentry/sentry-go"
"github.com/rs/zerolog"
batchv1 "k8s.io/api/batch/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
)

type EventHandlerType string
Expand All @@ -20,56 +17,6 @@ const (
EventHandlerDelete EventHandlerType = "DELETE"
)

var cronjobInformer cache.SharedIndexInformer
var jobInformer cache.SharedIndexInformer

// Starts the crons informer which has event handlers
// adds to the crons monitor data struct used for sending
// checkin events to Sentry
func startCronsInformers(ctx context.Context, namespace string) error {

clientset, err := getClientsetFromContext(ctx)
if err != nil {
return errors.New("failed to get clientset")
}

// Create factory that will produce both the cronjob informer and job informer
factory := informers.NewSharedInformerFactoryWithOptions(
clientset,
5*time.Second,
informers.WithNamespace(namespace),
)

// Create the cronjob informer
cronjobInformer, err = createCronjobInformer(ctx, factory, namespace)
if err != nil {
return err
}
// Create the job informer
jobInformer, err = createJobInformer(ctx, factory, namespace)
if err != nil {
return err
}

// Channel to tell the factory to stop the informers
doneChan := make(chan struct{})
factory.Start(doneChan)

// Sync the cronjob informer cache
if ok := cache.WaitForCacheSync(doneChan, cronjobInformer.HasSynced); !ok {
return errors.New("cronjob informer failed to sync")
}
// Sync the job informer cache
if ok := cache.WaitForCacheSync(doneChan, jobInformer.HasSynced); !ok {
return errors.New("job informer failed to sync")
}

// Wait for the channel to be closed
<-doneChan

return nil
}

// Captures sentry crons checkin event if appropriate
// by checking the job status to determine if the job just created pod (job starting)
// or if the job exited
Expand All @@ -89,7 +36,7 @@ func runSentryCronsCheckin(ctx context.Context, job *batchv1.Job, eventHandlerTy
return errors.New("job does not have cronjob reference")
}
cronjobRef := job.OwnerReferences[0]
if !*cronjobRef.Controller || cronjobRef.Kind != "CronJob" {
if !*cronjobRef.Controller || cronjobRef.Kind != CRONJOB {
return errors.New("job does not have cronjob reference")
}
cronsMonitorData, ok := cronsMetaData.getCronsMonitorData(cronjobRef.Name)
Expand Down
46 changes: 23 additions & 23 deletions enhancers.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,16 +120,16 @@ func callObjectEnhancer(ctx context.Context, scope *sentry.Scope, kindObjectPair

var err error = nil
switch kindObjectPair.kind {
case "Pod":
case POD:
err = podEnhancer(ctx, scope, kindObjectPair.object, sentryEvent)
case "ReplicaSet":
err = replicaSetPodEnhancer(ctx, scope, kindObjectPair.object, sentryEvent)
case "Deployment":
err = deploymentPodEnhancer(ctx, scope, kindObjectPair.object, sentryEvent)
case "Job":
err = jobPodEnhancer(ctx, scope, kindObjectPair.object, sentryEvent)
case "CronJob":
err = cronjobPodEnhancer(ctx, scope, kindObjectPair.object, sentryEvent)
case REPLICASET:
err = replicaSetEnhancer(ctx, scope, kindObjectPair.object, sentryEvent)
case DEPLOYMENT:
err = deploymentEnhancer(ctx, scope, kindObjectPair.object, sentryEvent)
case JOB:
err = jobEnhancer(ctx, scope, kindObjectPair.object, sentryEvent)
case CRONJOB:
err = cronjobEnhancer(ctx, scope, kindObjectPair.object, sentryEvent)
default:
sentryEvent.Fingerprint = append(sentryEvent.Fingerprint, kindObjectPair.object.GetName())
}
Expand Down Expand Up @@ -177,14 +177,14 @@ func podEnhancer(ctx context.Context, scope *sentry.Scope, object metav1.Object,
setTagIfNotEmpty(scope, "node_name", nodeName)

// Add the cronjob to the fingerprint
sentryEvent.Fingerprint = append(sentryEvent.Fingerprint, "pod", podObj.Name)
sentryEvent.Fingerprint = append(sentryEvent.Fingerprint, POD, podObj.Name)

// Add the cronjob to the tag
setTagIfNotEmpty(scope, "pod_name", object.GetName())
podObj.ManagedFields = []metav1.ManagedFieldsEntry{}
metadataJson, err := prettyJson(podObj.ObjectMeta)
if err == nil {
scope.SetContext("pod", sentry.Context{
scope.SetContext(POD, sentry.Context{
"Metadata": metadataJson,
})
}
Expand All @@ -201,22 +201,22 @@ func podEnhancer(ctx context.Context, scope *sentry.Scope, object metav1.Object,
return nil
}

func jobPodEnhancer(ctx context.Context, scope *sentry.Scope, object metav1.Object, sentryEvent *sentry.Event) error {
func jobEnhancer(ctx context.Context, scope *sentry.Scope, object metav1.Object, sentryEvent *sentry.Event) error {

jobObj, ok := object.(*batchv1.Job)
if !ok {
return errors.New("failed to cast object to Job object")
}

// Add the cronjob to the fingerprint
sentryEvent.Fingerprint = append(sentryEvent.Fingerprint, "job", jobObj.Name)
sentryEvent.Fingerprint = append(sentryEvent.Fingerprint, JOB, jobObj.Name)

// Add the cronjob to the tag
setTagIfNotEmpty(scope, "job_name", object.GetName())
jobObj.ManagedFields = []metav1.ManagedFieldsEntry{}
metadataJson, err := prettyJson(jobObj.ObjectMeta)
if err == nil {
scope.SetContext("job", sentry.Context{
scope.SetContext(JOB, sentry.Context{
"Metadata": metadataJson,
})
}
Expand All @@ -231,7 +231,7 @@ func jobPodEnhancer(ctx context.Context, scope *sentry.Scope, object metav1.Obje
return nil
}

func cronjobPodEnhancer(ctx context.Context, scope *sentry.Scope, object metav1.Object, sentryEvent *sentry.Event) error {
func cronjobEnhancer(ctx context.Context, scope *sentry.Scope, object metav1.Object, sentryEvent *sentry.Event) error {

cronjobObj, ok := object.(*batchv1.CronJob)
if !ok {
Expand All @@ -244,14 +244,14 @@ func cronjobPodEnhancer(ctx context.Context, scope *sentry.Scope, object metav1.
})

// Add the cronjob to the fingerprint
sentryEvent.Fingerprint = append(sentryEvent.Fingerprint, "cronjob", cronjobObj.Name)
sentryEvent.Fingerprint = append(sentryEvent.Fingerprint, CRONJOB, cronjobObj.Name)

// Add the cronjob to the tag
setTagIfNotEmpty(scope, "cronjob_name", object.GetName())
cronjobObj.ManagedFields = []metav1.ManagedFieldsEntry{}
metadataJson, err := prettyJson(cronjobObj.ObjectMeta)
if err == nil {
scope.SetContext("cronjob", sentry.Context{
scope.SetContext(CRONJOB, sentry.Context{
"Metadata": metadataJson,
})
}
Expand All @@ -266,22 +266,22 @@ func cronjobPodEnhancer(ctx context.Context, scope *sentry.Scope, object metav1.
return nil
}

func replicaSetPodEnhancer(ctx context.Context, scope *sentry.Scope, object metav1.Object, sentryEvent *sentry.Event) error {
func replicaSetEnhancer(ctx context.Context, scope *sentry.Scope, object metav1.Object, sentryEvent *sentry.Event) error {

replicasetObj, ok := object.(*appsv1.ReplicaSet)
if !ok {
return errors.New("failed to cast object to ReplicaSet object")
}

// Add the cronjob to the fingerprint
sentryEvent.Fingerprint = append(sentryEvent.Fingerprint, "replicaset", replicasetObj.Name)
sentryEvent.Fingerprint = append(sentryEvent.Fingerprint, REPLICASET, replicasetObj.Name)

// Add the cronjob to the tag
setTagIfNotEmpty(scope, "replicaset_name", object.GetName())
replicasetObj.ManagedFields = []metav1.ManagedFieldsEntry{}
metadataJson, err := prettyJson(replicasetObj.ObjectMeta)
if err == nil {
scope.SetContext("replicaset", sentry.Context{
scope.SetContext(REPLICASET, sentry.Context{
"Metadata": metadataJson,
})
}
Expand All @@ -296,21 +296,21 @@ func replicaSetPodEnhancer(ctx context.Context, scope *sentry.Scope, object meta
return nil
}

func deploymentPodEnhancer(ctx context.Context, scope *sentry.Scope, object metav1.Object, sentryEvent *sentry.Event) error {
func deploymentEnhancer(ctx context.Context, scope *sentry.Scope, object metav1.Object, sentryEvent *sentry.Event) error {

deploymentObj, ok := object.(*appsv1.Deployment)
if !ok {
return errors.New("failed to cast object to Deployment object")
}
// Add the cronjob to the fingerprint
sentryEvent.Fingerprint = append(sentryEvent.Fingerprint, "deployment", deploymentObj.Name)
sentryEvent.Fingerprint = append(sentryEvent.Fingerprint, DEPLOYMENT, deploymentObj.Name)

// Add the cronjob to the tag
setTagIfNotEmpty(scope, "deployment_name", object.GetName())
deploymentObj.ManagedFields = []metav1.ManagedFieldsEntry{}
metadataJson, err := prettyJson(deploymentObj.ObjectMeta)
if err == nil {
scope.SetContext("deployment", sentry.Context{
scope.SetContext(DEPLOYMENT, sentry.Context{
"Metadata": metadataJson,
})
}
Expand Down
2 changes: 1 addition & 1 deletion enhancers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestRunEnhancers(t *testing.T) {
// Add event message
event.Message = "This event is for TestRunPodEnhancer"
// Call pod enhancer to modify scope and event
err = runEnhancers(ctx, nil, "Pod", podObj, scope, event)
err = runEnhancers(ctx, nil, POD, podObj, scope, event)
if err != nil {
t.Errorf("runEnhancers returned an error: %v", err)
}
Expand Down
11 changes: 9 additions & 2 deletions informer_crons.go → informer_cronjobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"os"

"github.com/rs/zerolog"
batchv1 "k8s.io/api/batch/v1"
Expand All @@ -14,7 +15,7 @@ func createCronjobInformer(ctx context.Context, factory informers.SharedInformer

logger := zerolog.Ctx(ctx)

logger.Debug().Msgf("Starting cronJob informer\n")
logger.Debug().Msgf("Starting cronjob informer\n")

cronjobInformer := factory.Batch().V1().CronJobs().Informer()

Expand Down Expand Up @@ -43,7 +44,13 @@ func createCronjobInformer(ctx context.Context, factory informers.SharedInformer
}
}

cronjobInformer.AddEventHandler(handler)
// Check if cronjob monitoring is enabled
if isTruthy(os.Getenv("SENTRY_K8S_MONITOR_CRONJOBS")) {
logger.Info().Msgf("Add cronjob informer handlers for cronjob monitoring")
cronjobInformer.AddEventHandler(handler)
} else {
logger.Info().Msgf("Cronjob monitoring is disabled")
}

return cronjobInformer, nil
}
20 changes: 20 additions & 0 deletions informer_deployments.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package main

import (
"context"

"github.com/rs/zerolog"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
)

func createDeploymentInformer(ctx context.Context, factory informers.SharedInformerFactory, namespace string) (cache.SharedIndexInformer, error) {

logger := zerolog.Ctx(ctx)

logger.Debug().Msgf("starting deployment informer\n")

jobInformer := factory.Apps().V1().Deployments().Informer()

return jobInformer, nil
}
9 changes: 8 additions & 1 deletion informer_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"os"

"github.com/rs/zerolog"
batchv1 "k8s.io/api/batch/v1"
Expand Down Expand Up @@ -50,7 +51,13 @@ func createJobInformer(ctx context.Context, factory informers.SharedInformerFact
}
}

jobInformer.AddEventHandler(handler)
// Check if cronjob monitoring is enabled
if isTruthy(os.Getenv("SENTRY_K8S_MONITOR_CRONJOBS")) {
logger.Info().Msgf("Add job informer handlers for cronjob monitoring")
jobInformer.AddEventHandler(handler)
} else {
logger.Info().Msgf("Cronjob monitoring is disabled")
}

return jobInformer, nil
}
21 changes: 21 additions & 0 deletions informer_replicasets.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package main

import (
"context"

"github.com/rs/zerolog"

"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
)

func createReplicasetInformer(ctx context.Context, factory informers.SharedInformerFactory, namespace string) (cache.SharedIndexInformer, error) {

logger := zerolog.Ctx(ctx)

logger.Debug().Msgf("starting replicaset informer\n")

jobInformer := factory.Apps().V1().ReplicaSets().Informer()

return jobInformer, nil
}
Loading

0 comments on commit 34fb57b

Please sign in to comment.