Skip to content

Commit

Permalink
refactoring enhancer code
Browse files Browse the repository at this point in the history
  • Loading branch information
Jiahui-Zhang-20 committed Dec 19, 2023
1 parent 34fb57b commit 1e27d0f
Show file tree
Hide file tree
Showing 10 changed files with 138 additions and 115 deletions.
9 changes: 9 additions & 0 deletions constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package main

const (
KindPod string = "Pod"
KindJob string = "Job"
KindCronjob string = "CronJob"
KindReplicaset string = "ReplicaSet"
KindDeployment string = "Deployment"
)
2 changes: 1 addition & 1 deletion crons.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func runSentryCronsCheckin(ctx context.Context, job *batchv1.Job, eventHandlerTy
return errors.New("job does not have cronjob reference")
}
cronjobRef := job.OwnerReferences[0]
if !*cronjobRef.Controller || cronjobRef.Kind != CRONJOB {
if !*cronjobRef.Controller || cronjobRef.Kind != KindCronjob {
return errors.New("job does not have cronjob reference")
}
cronsMonitorData, ok := cronsMetaData.getCronsMonitorData(cronjobRef.Name)
Expand Down
57 changes: 35 additions & 22 deletions enhancers.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,6 @@ func runEnhancers(ctx context.Context, eventObject *v1.Event, kind string, objec
// Call specific enhancers for all root owners
// (there most likely is just one root owner)
for _, rootOwner := range rootOwners {
// We already called an enhancer for the object
// so we avoid calling it again
if rootOwner.object.GetName() == object.GetName() {
continue
}
callObjectEnhancer(ctx, scope, &rootOwner, sentryEvent)
if err != nil {
return err
Expand All @@ -86,6 +81,24 @@ type KindObjectPair struct {

func findRootOwners(ctx context.Context, kindObjPair *KindObjectPair) ([]KindObjectPair, error) {

// use DFS to find the leaves of the owner references graph
rootOwners, err := ownerRefDFS(ctx, kindObjPair)
if err != nil {
return nil, err
}

// if the object has no owner references
if rootOwners[0].object.GetUID() == kindObjPair.object.GetUID() {
return []KindObjectPair{}, nil
}

return rootOwners, nil

}

// this function finds performs DFS to find the leaves the owner references graph
func ownerRefDFS(ctx context.Context, kindObjPair *KindObjectPair) ([]KindObjectPair, error) {

parents := kindObjPair.object.GetOwnerReferences()
// the owners slice to be returned
rootOwners := []KindObjectPair{}
Expand All @@ -102,7 +115,7 @@ func findRootOwners(ctx context.Context, kindObjPair *KindObjectPair) ([]KindObj
if !ok {
return nil, errors.New("error attempting to find root owneres")
}
partialOwners, err := findRootOwners(ctx, &KindObjectPair{
partialOwners, err := ownerRefDFS(ctx, &KindObjectPair{
kind: parent.Kind,
object: parentObj,
})
Expand All @@ -120,15 +133,15 @@ func callObjectEnhancer(ctx context.Context, scope *sentry.Scope, kindObjectPair

var err error = nil
switch kindObjectPair.kind {
case POD:
case KindPod:
err = podEnhancer(ctx, scope, kindObjectPair.object, sentryEvent)
case REPLICASET:
case KindReplicaset:
err = replicaSetEnhancer(ctx, scope, kindObjectPair.object, sentryEvent)
case DEPLOYMENT:
case KindDeployment:
err = deploymentEnhancer(ctx, scope, kindObjectPair.object, sentryEvent)
case JOB:
case KindJob:
err = jobEnhancer(ctx, scope, kindObjectPair.object, sentryEvent)
case CRONJOB:
case KindCronjob:
err = cronjobEnhancer(ctx, scope, kindObjectPair.object, sentryEvent)
default:
sentryEvent.Fingerprint = append(sentryEvent.Fingerprint, kindObjectPair.object.GetName())
Expand All @@ -142,7 +155,7 @@ func eventEnhancer(ctx context.Context, scope *sentry.Scope, object metav1.Objec
return errors.New("failed to cast object to event object")
}

// The involved object is likely very simillar
// The involved object is likely very similar
// to the involved object's metadata which will
// be included when the the object's enhancer
// eventually gets triggered
Expand Down Expand Up @@ -177,14 +190,14 @@ func podEnhancer(ctx context.Context, scope *sentry.Scope, object metav1.Object,
setTagIfNotEmpty(scope, "node_name", nodeName)

// Add the cronjob to the fingerprint
sentryEvent.Fingerprint = append(sentryEvent.Fingerprint, POD, podObj.Name)
sentryEvent.Fingerprint = append(sentryEvent.Fingerprint, KindPod, podObj.Name)

// Add the cronjob to the tag
setTagIfNotEmpty(scope, "pod_name", object.GetName())
podObj.ManagedFields = []metav1.ManagedFieldsEntry{}
metadataJson, err := prettyJson(podObj.ObjectMeta)
if err == nil {
scope.SetContext(POD, sentry.Context{
scope.SetContext(KindPod, sentry.Context{
"Metadata": metadataJson,
})
}
Expand All @@ -209,14 +222,14 @@ func jobEnhancer(ctx context.Context, scope *sentry.Scope, object metav1.Object,
}

// Add the cronjob to the fingerprint
sentryEvent.Fingerprint = append(sentryEvent.Fingerprint, JOB, jobObj.Name)
sentryEvent.Fingerprint = append(sentryEvent.Fingerprint, KindJob, jobObj.Name)

// Add the cronjob to the tag
setTagIfNotEmpty(scope, "job_name", object.GetName())
jobObj.ManagedFields = []metav1.ManagedFieldsEntry{}
metadataJson, err := prettyJson(jobObj.ObjectMeta)
if err == nil {
scope.SetContext(JOB, sentry.Context{
scope.SetContext(KindJob, sentry.Context{
"Metadata": metadataJson,
})
}
Expand Down Expand Up @@ -244,14 +257,14 @@ func cronjobEnhancer(ctx context.Context, scope *sentry.Scope, object metav1.Obj
})

// Add the cronjob to the fingerprint
sentryEvent.Fingerprint = append(sentryEvent.Fingerprint, CRONJOB, cronjobObj.Name)
sentryEvent.Fingerprint = append(sentryEvent.Fingerprint, KindCronjob, cronjobObj.Name)

// Add the cronjob to the tag
setTagIfNotEmpty(scope, "cronjob_name", object.GetName())
cronjobObj.ManagedFields = []metav1.ManagedFieldsEntry{}
metadataJson, err := prettyJson(cronjobObj.ObjectMeta)
if err == nil {
scope.SetContext(CRONJOB, sentry.Context{
scope.SetContext(KindCronjob, sentry.Context{
"Metadata": metadataJson,
})
}
Expand All @@ -274,14 +287,14 @@ func replicaSetEnhancer(ctx context.Context, scope *sentry.Scope, object metav1.
}

// Add the cronjob to the fingerprint
sentryEvent.Fingerprint = append(sentryEvent.Fingerprint, REPLICASET, replicasetObj.Name)
sentryEvent.Fingerprint = append(sentryEvent.Fingerprint, KindReplicaset, replicasetObj.Name)

// Add the cronjob to the tag
setTagIfNotEmpty(scope, "replicaset_name", object.GetName())
replicasetObj.ManagedFields = []metav1.ManagedFieldsEntry{}
metadataJson, err := prettyJson(replicasetObj.ObjectMeta)
if err == nil {
scope.SetContext(REPLICASET, sentry.Context{
scope.SetContext(KindReplicaset, sentry.Context{
"Metadata": metadataJson,
})
}
Expand All @@ -303,14 +316,14 @@ func deploymentEnhancer(ctx context.Context, scope *sentry.Scope, object metav1.
return errors.New("failed to cast object to Deployment object")
}
// Add the cronjob to the fingerprint
sentryEvent.Fingerprint = append(sentryEvent.Fingerprint, DEPLOYMENT, deploymentObj.Name)
sentryEvent.Fingerprint = append(sentryEvent.Fingerprint, KindDeployment, deploymentObj.Name)

// Add the cronjob to the tag
setTagIfNotEmpty(scope, "deployment_name", object.GetName())
deploymentObj.ManagedFields = []metav1.ManagedFieldsEntry{}
metadataJson, err := prettyJson(deploymentObj.ObjectMeta)
if err == nil {
scope.SetContext(DEPLOYMENT, sentry.Context{
scope.SetContext(KindDeployment, sentry.Context{
"Metadata": metadataJson,
})
}
Expand Down
2 changes: 1 addition & 1 deletion enhancers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestRunEnhancers(t *testing.T) {
// Add event message
event.Message = "This event is for TestRunPodEnhancer"
// Call pod enhancer to modify scope and event
err = runEnhancers(ctx, nil, POD, podObj, scope, event)
err = runEnhancers(ctx, nil, "Pod", podObj, scope, event)
if err != nil {
t.Errorf("runEnhancers returned an error: %v", err)
}
Expand Down
80 changes: 80 additions & 0 deletions informers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package main

import (
"context"
"errors"
"time"

"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
)

var cronjobInformer cache.SharedIndexInformer
var jobInformer cache.SharedIndexInformer
var replicasetInformer cache.SharedIndexInformer
var deploymentInformer cache.SharedIndexInformer

// Starts all informers (jobs, cronjobs, replicasets, deployments)
// if we opt into cronjob, attach the job/cronjob event handlers
// and add to the crons monitor data struct for Sentry Crons
func startInformers(ctx context.Context, namespace string) error {

clientset, err := getClientsetFromContext(ctx)
if err != nil {
return errors.New("failed to get clientset")
}

// Create factory that will produce both the cronjob informer and job informer
factory := informers.NewSharedInformerFactoryWithOptions(
clientset,
5*time.Second,
informers.WithNamespace(namespace),
)

// Create the job informer
jobInformer, err = createJobInformer(ctx, factory, namespace)
if err != nil {
return err
}
// Create the cronjob informer
cronjobInformer, err = createCronjobInformer(ctx, factory, namespace)
if err != nil {
return err
}
// Create the replicaset informer
replicasetInformer, err = createReplicasetInformer(ctx, factory, namespace)
if err != nil {
return err
}
// Create the deployment informer
deploymentInformer, err = createDeploymentInformer(ctx, factory, namespace)
if err != nil {
return err
}

// Channel to tell the factory to stop the informers
doneChan := make(chan struct{})
factory.Start(doneChan)

// Sync the cronjob informer cache
if ok := cache.WaitForCacheSync(doneChan, cronjobInformer.HasSynced); !ok {
return errors.New("cronjob informer failed to sync")
}
// Sync the job informer cache
if ok := cache.WaitForCacheSync(doneChan, jobInformer.HasSynced); !ok {
return errors.New("job informer failed to sync")
}
// Sync the replicaset informer cache
if ok := cache.WaitForCacheSync(doneChan, replicasetInformer.HasSynced); !ok {
return errors.New("replicaset informer failed to sync")
}
// Sync the deployment informer cache
if ok := cache.WaitForCacheSync(doneChan, deploymentInformer.HasSynced); !ok {
return errors.New("deployment informer failed to sync")
}

// Wait for the channel to be closed
<-doneChan

return nil
}
1 change: 1 addition & 0 deletions k8s/errors/delete_test_pods.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
set -euxo pipefail

kubectl delete deployment -l type=test-pod -A
kubectl delete replicaset -l type=test-pod -A
kubectl delete cronjob -l type=test-pod -A
kubectl delete job -l type=test-pod -A
kubectl delete pod -l type=test-pod -A
3 changes: 2 additions & 1 deletion k8s/errors/replicaset-basic-error.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ kind: ReplicaSet
metadata:
name: replicaset-basic-error
labels:
app: test-pod
run: replicaset-basic-error
type: test-pod
annotations:
k8s.sentry.io/dsn: "https://474d9da00094c5e39d6800c01f3aeff6@o4506191942320128.ingest.sentry.io/4506363396816896"
spec:
Expand Down
19 changes: 5 additions & 14 deletions sentry_dsn_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,6 @@ import (
)

var DSNAnnotation = "k8s.sentry.io/dsn"

const (
POD string = "Pod"
JOB string = "Job"
CRONJOB string = "CronJob"
REPLICASET string = "ReplicaSet"
DEPLOYMENT string = "Deployment"
)

var dsnClientMapping = NewDsnClientMapping()

// Map from Sentry DSN to Client
Expand Down Expand Up @@ -136,13 +127,13 @@ func findObject(ctx context.Context, kind string, namespace string, name string)
}

switch kind {
case POD:
case KindPod:
pod, err := clientset.CoreV1().Pods(namespace).Get(context.Background(), name, metav1.GetOptions{})
if err != nil {
return nil, false
}
return pod, true
case REPLICASET:
case KindReplicaset:
var replicaSet *v1.ReplicaSet
// Check if the replicaset is available in the indexer first
if replicasetInformer != nil {
Expand All @@ -159,7 +150,7 @@ func findObject(ctx context.Context, kind string, namespace string, name string)
}
}
return replicaSet, true
case DEPLOYMENT:
case KindDeployment:
var deployment *v1.Deployment
// Check if the deployment is available in the indexer first
if deploymentInformer != nil {
Expand All @@ -177,7 +168,7 @@ func findObject(ctx context.Context, kind string, namespace string, name string)
}
}
return deployment, true
case JOB:
case KindJob:
var job *batchv1.Job
// Check if the job is available in the indexer first
if jobInformer != nil {
Expand All @@ -194,7 +185,7 @@ func findObject(ctx context.Context, kind string, namespace string, name string)
}
}
return job, true
case CRONJOB:
case KindCronjob:
var cronjob *batchv1.CronJob
// Check if the cronjob is available in the indexer first
if cronjobInformer != nil {
Expand Down
4 changes: 2 additions & 2 deletions watcher_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestHandleWatchEvent(t *testing.T) {
},
InvolvedObject: corev1.ObjectReference{
APIVersion: "batch/v1",
Kind: CRONJOB,
Kind: "CronJob",
Name: "cronjob-basic-success",
Namespace: "TestHandleWatchEventNamespace",
ResourceVersion: "33547",
Expand Down Expand Up @@ -114,7 +114,7 @@ func TestHandleWatchEvent(t *testing.T) {
"watcher_name": "events",
"event_type": "Warning",
"reason": "Fake Reason: TestHandlePodWatchEvent",
"kind": CRONJOB,
"kind": "CronJob",
"object_uid": "f825de34-6728-474a-a28f-8318de23acc1",
"namespace": "TestHandleWatchEventNamespace"}

Expand Down
Loading

0 comments on commit 1e27d0f

Please sign in to comment.