diff --git a/constants.go b/constants.go new file mode 100644 index 0000000..9377891 --- /dev/null +++ b/constants.go @@ -0,0 +1,9 @@ +package main + +const ( + KindPod string = "Pod" + KindJob string = "Job" + KindCronjob string = "CronJob" + KindReplicaset string = "ReplicaSet" + KindDeployment string = "Deployment" +) diff --git a/crons.go b/crons.go index 06a2986..a3b13c9 100644 --- a/crons.go +++ b/crons.go @@ -3,16 +3,10 @@ package main import ( "context" "errors" - "fmt" - "time" "github.com/getsentry/sentry-go" "github.com/rs/zerolog" batchv1 "k8s.io/api/batch/v1" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/informers" - "k8s.io/client-go/tools/cache" ) type EventHandlerType string @@ -23,56 +17,6 @@ const ( EventHandlerDelete EventHandlerType = "DELETE" ) -var cronjobInformer cache.SharedIndexInformer -var jobInformer cache.SharedIndexInformer - -// Starts the crons informer which has event handlers -// adds to the crons monitor data struct used for sending -// checkin events to Sentry -func startCronsInformers(ctx context.Context, namespace string) error { - - clientset, err := getClientsetFromContext(ctx) - if err != nil { - return errors.New("failed to get clientset") - } - - // Create factory that will produce both the cronjob informer and job informer - factory := informers.NewSharedInformerFactoryWithOptions( - clientset, - 5*time.Second, - informers.WithNamespace(namespace), - ) - - // Create the cronjob informer - cronjobInformer, err = createCronjobInformer(ctx, factory, namespace) - if err != nil { - return err - } - // Create the job informer - jobInformer, err = createJobInformer(ctx, factory, namespace) - if err != nil { - return err - } - - // Channel to tell the factory to stop the informers - doneChan := make(chan struct{}) - factory.Start(doneChan) - - // Sync the cronjob informer cache - if ok := cache.WaitForCacheSync(doneChan, cronjobInformer.HasSynced); !ok { - return errors.New("cronjob informer failed to sync") - } - // Sync the job informer cache - if ok := cache.WaitForCacheSync(doneChan, jobInformer.HasSynced); !ok { - return errors.New("job informer failed to sync") - } - - // Wait for the channel to be closed - <-doneChan - - return nil -} - // Captures sentry crons checkin event if appropriate // by checking the job status to determine if the job just created pod (job starting) // or if the job exited @@ -92,7 +36,7 @@ func runSentryCronsCheckin(ctx context.Context, job *batchv1.Job, eventHandlerTy return errors.New("job does not have cronjob reference") } cronjobRef := job.OwnerReferences[0] - if !*cronjobRef.Controller || cronjobRef.Kind != "CronJob" { + if !*cronjobRef.Controller || cronjobRef.Kind != KindCronjob { return errors.New("job does not have cronjob reference") } cronsMonitorData, ok := cronsMetaData.getCronsMonitorData(cronjobRef.Name) @@ -197,87 +141,3 @@ func checkinJobEnding(ctx context.Context, job *batchv1.Job, cronsMonitorData *C ) return nil } - -// Adds to the sentry events whenever it is associated with a cronjob -// so the sentry event contains the corresponding slug monitor, cronjob name, timestamp of when the cronjob began, and -// the k8s cronjob metadata -func runCronsDataHandler(ctx context.Context, scope *sentry.Scope, pod *v1.Pod, sentryEvent *sentry.Event) (bool, error) { - - // get owningCronJob if exists - owningCronJob, err := getOwningCronJob(ctx, pod) - if err != nil { - return false, err - } - - // pod not part of a cronjob - if owningCronJob == nil { - return false, nil - } - - scope.SetContext("Monitor", sentry.Context{ - "Slug": owningCronJob.Name, - }) - - sentryEvent.Fingerprint = append(sentryEvent.Fingerprint, owningCronJob.Kind, owningCronJob.Name) - - setTagIfNotEmpty(scope, "cronjob_name", owningCronJob.Name) - - // add breadcrumb with cronJob timestamps - scope.AddBreadcrumb(&sentry.Breadcrumb{ - Message: fmt.Sprintf("Created cronjob %s", owningCronJob.Name), - Level: sentry.LevelInfo, - Timestamp: owningCronJob.CreationTimestamp.Time, - }, breadcrumbLimit) - - metadataJson, err := prettyJson(owningCronJob.ObjectMeta) - - if err == nil { - scope.SetContext("Cronjob", sentry.Context{ - "Metadata": metadataJson, - }) - } else { - return false, err - } - - return true, nil -} - -// returns the cronjob that is the grandparent of a pod if exists -// but returns nil is no cronjob is found -func getOwningCronJob(ctx context.Context, pod *v1.Pod) (*batchv1.CronJob, error) { - - clientset, err := getClientsetFromContext(ctx) - if err != nil { - return nil, err - } - - namespace := pod.Namespace - - // first attempt to group events by cronJobs - var owningCronJob *batchv1.CronJob = nil - - // check if the pod corresponds to a cronJob - for _, podRef := range pod.ObjectMeta.OwnerReferences { - // check the pod has a job as an owner - if !*podRef.Controller || podRef.Kind != "Job" { - continue - } - // find the owning job - owningJob, err := clientset.BatchV1().Jobs(namespace).Get(context.Background(), podRef.Name, metav1.GetOptions{}) - if err != nil { - continue - } - // check if owning job is owned by a cronJob - for _, jobRef := range owningJob.ObjectMeta.OwnerReferences { - if !*jobRef.Controller || jobRef.Kind != "CronJob" { - continue - } - owningCronJob, err = clientset.BatchV1().CronJobs(namespace).Get(context.Background(), jobRef.Name, metav1.GetOptions{}) - if err != nil { - continue - } - } - } - - return owningCronJob, nil -} diff --git a/enhancers.go b/enhancers.go index 2664423..f7620d2 100644 --- a/enhancers.go +++ b/enhancers.go @@ -2,29 +2,338 @@ package main import ( "context" + "errors" "fmt" "github.com/getsentry/sentry-go" + appsv1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -func runEnhancers(ctx context.Context, objectRef *v1.ObjectReference, cachedObject interface{}, scope *sentry.Scope, sentryEvent *sentry.Event) { - involvedObject := fmt.Sprintf("%s/%s", objectRef.Kind, objectRef.Name) +const breadcrumbLimit = 20 + +func runEnhancers(ctx context.Context, eventObject *v1.Event, kind string, object metav1.Object, scope *sentry.Scope, sentryEvent *sentry.Event) error { + + involvedObject := fmt.Sprintf("%s/%s", kind, object.GetName()) ctx, logger := getLoggerWithTag(ctx, "object", involvedObject) + logger.Debug().Msgf("Running the enhancer") var err error - logger.Debug().Msgf("Running enhancers...") - // First, run the basic (common) enhancer - runCommonEnhancer(ctx, scope, sentryEvent) + // First, run the common enhancer + err = runCommonEnhancer(ctx, scope, sentryEvent) + if err != nil { + return err + } + // If an event object is provided, we call the event enhancer + if eventObject != nil { + err = eventEnhancer(ctx, scope, eventObject, sentryEvent) + if err != nil { + return err + } + } + + logger.Trace().Msgf("Current fingerprint: %v", sentryEvent.Fingerprint) + // Enhance message with object name + message := sentryEvent.Message + sentryEvent.Message = fmt.Sprintf("%s: %s", object.GetName(), sentryEvent.Message) + + // Adjust fingerprint. + // If there's already a non-empty fingerprint set, we assume that it was set by + // another enhancer, so we don't touch it. + if len(sentryEvent.Fingerprint) == 0 { + sentryEvent.Fingerprint = []string{message} + } + logger.Trace().Msgf("Fingerprint after adjustment: %v", sentryEvent.Fingerprint) + + // Find the root owners and their corresponding object kinds + rootOwners, err := findRootOwners(ctx, &KindObjectPair{ + kind: kind, + object: object, + }) + if err != nil { + return err + } + + // Call the specific enhancer for the object + callObjectEnhancer(ctx, scope, &KindObjectPair{ + kind, + object, + }, sentryEvent) - // Then, run kind-specific enhancers - switch objectRef.Kind { - case "Pod": - err = runPodEnhancer(ctx, objectRef, cachedObject, scope, sentryEvent) + // Call specific enhancers for all root owners + // (there most likely is just one root owner) + for _, rootOwner := range rootOwners { + callObjectEnhancer(ctx, scope, &rootOwner, sentryEvent) + if err != nil { + return err + } } + return nil +} + +type KindObjectPair struct { + kind string + object metav1.Object +} + +func findRootOwners(ctx context.Context, kindObjPair *KindObjectPair) ([]KindObjectPair, error) { + // use DFS to find the leaves of the owner references graph + rootOwners, err := ownerRefDFS(ctx, kindObjPair) if err != nil { - logger.Error().Msgf("Error running an enhancer: %v", err) + return nil, err } + + // if the object has no owner references + if rootOwners[0].object.GetUID() == kindObjPair.object.GetUID() { + return []KindObjectPair{}, nil + } + + return rootOwners, nil + +} + +// this function finds performs DFS to find the leaves the owner references graph +func ownerRefDFS(ctx context.Context, kindObjPair *KindObjectPair) ([]KindObjectPair, error) { + + parents := kindObjPair.object.GetOwnerReferences() + // the owners slice to be returned + rootOwners := []KindObjectPair{} + + // base case: the object has no parents + if len(parents) == 0 { + rootOwners = append(rootOwners, *kindObjPair) + return rootOwners, nil + } + + // recursive case: the object has parents to explore + for _, parent := range parents { + parentObj, ok := findObject(ctx, parent.Kind, kindObjPair.object.GetNamespace(), parent.Name) + if !ok { + return nil, errors.New("error attempting to find root owneres") + } + partialOwners, err := ownerRefDFS(ctx, &KindObjectPair{ + kind: parent.Kind, + object: parentObj, + }) + if err != nil { + return nil, err + } + if partialOwners != nil { + rootOwners = append(rootOwners, partialOwners...) + } + } + return rootOwners, nil +} + +func callObjectEnhancer(ctx context.Context, scope *sentry.Scope, kindObjectPair *KindObjectPair, sentryEvent *sentry.Event) error { + + var err error = nil + switch kindObjectPair.kind { + case KindPod: + err = podEnhancer(ctx, scope, kindObjectPair.object, sentryEvent) + case KindReplicaset: + err = replicaSetEnhancer(ctx, scope, kindObjectPair.object, sentryEvent) + case KindDeployment: + err = deploymentEnhancer(ctx, scope, kindObjectPair.object, sentryEvent) + case KindJob: + err = jobEnhancer(ctx, scope, kindObjectPair.object, sentryEvent) + case KindCronjob: + err = cronjobEnhancer(ctx, scope, kindObjectPair.object, sentryEvent) + default: + sentryEvent.Fingerprint = append(sentryEvent.Fingerprint, kindObjectPair.object.GetName()) + } + return err +} + +func eventEnhancer(ctx context.Context, scope *sentry.Scope, object metav1.Object, sentryEvent *sentry.Event) error { + eventObj, ok := object.(*v1.Event) + if !ok { + return errors.New("failed to cast object to event object") + } + + // The involved object is likely very similar + // to the involved object's metadata which will + // be included when the the object's enhancer + // eventually gets triggered + scope.RemoveExtra("Involved Object") + + // Add related events as breadcrumbs + objEvents := filterEventsFromBuffer(eventObj.Namespace, "Event", eventObj.Name) + for _, objEvent := range objEvents { + breadcrumbLevel := sentry.LevelInfo + if objEvent.Type == v1.EventTypeWarning { + breadcrumbLevel = sentry.LevelWarning + } + + scope.AddBreadcrumb(&sentry.Breadcrumb{ + Message: objEvent.Message, + Level: breadcrumbLevel, + Timestamp: objEvent.LastTimestamp.Time, + }, breadcrumbLimit) + } + + return nil +} + +func podEnhancer(ctx context.Context, scope *sentry.Scope, object metav1.Object, sentryEvent *sentry.Event) error { + podObj, ok := object.(*v1.Pod) + if !ok { + return errors.New("failed to cast object to Pod object") + } + + // Add node name as tag + nodeName := podObj.Spec.NodeName + setTagIfNotEmpty(scope, "node_name", nodeName) + + // Add the cronjob to the fingerprint + sentryEvent.Fingerprint = append(sentryEvent.Fingerprint, KindPod, podObj.Name) + + // Add the cronjob to the tag + setTagIfNotEmpty(scope, "pod_name", object.GetName()) + podObj.ManagedFields = []metav1.ManagedFieldsEntry{} + metadataJson, err := prettyJson(podObj.ObjectMeta) + if err == nil { + scope.SetContext(KindPod, sentry.Context{ + "Metadata": metadataJson, + }) + } + + // Add breadcrumb with cronjob timestamps + scope.AddBreadcrumb(&sentry.Breadcrumb{ + Message: fmt.Sprintf("Created pod %s", object.GetName()), + Level: sentry.LevelInfo, + Timestamp: object.GetCreationTimestamp().Time, + }, breadcrumbLimit) + + addPodLogLinkToGKEContext(ctx, scope, podObj.Name, podObj.Namespace) + + return nil +} + +func jobEnhancer(ctx context.Context, scope *sentry.Scope, object metav1.Object, sentryEvent *sentry.Event) error { + + jobObj, ok := object.(*batchv1.Job) + if !ok { + return errors.New("failed to cast object to Job object") + } + + // Add the cronjob to the fingerprint + sentryEvent.Fingerprint = append(sentryEvent.Fingerprint, KindJob, jobObj.Name) + + // Add the cronjob to the tag + setTagIfNotEmpty(scope, "job_name", object.GetName()) + jobObj.ManagedFields = []metav1.ManagedFieldsEntry{} + metadataJson, err := prettyJson(jobObj.ObjectMeta) + if err == nil { + scope.SetContext(KindJob, sentry.Context{ + "Metadata": metadataJson, + }) + } + + // Add breadcrumb with cronjob timestamps + scope.AddBreadcrumb(&sentry.Breadcrumb{ + Message: fmt.Sprintf("Created job %s", object.GetName()), + Level: sentry.LevelInfo, + Timestamp: object.GetCreationTimestamp().Time, + }, breadcrumbLimit) + + return nil +} + +func cronjobEnhancer(ctx context.Context, scope *sentry.Scope, object metav1.Object, sentryEvent *sentry.Event) error { + + cronjobObj, ok := object.(*batchv1.CronJob) + if !ok { + return errors.New("failed to cast object to CronJob object") + } + + // Set the context for corresponding slug monitor + scope.SetContext("Monitor", sentry.Context{ + "Slug": object.GetName(), + }) + + // Add the cronjob to the fingerprint + sentryEvent.Fingerprint = append(sentryEvent.Fingerprint, KindCronjob, cronjobObj.Name) + + // Add the cronjob to the tag + setTagIfNotEmpty(scope, "cronjob_name", object.GetName()) + cronjobObj.ManagedFields = []metav1.ManagedFieldsEntry{} + metadataJson, err := prettyJson(cronjobObj.ObjectMeta) + if err == nil { + scope.SetContext(KindCronjob, sentry.Context{ + "Metadata": metadataJson, + }) + } + + // Add breadcrumb with cronjob timestamps + scope.AddBreadcrumb(&sentry.Breadcrumb{ + Message: fmt.Sprintf("Created cronjob %s", object.GetName()), + Level: sentry.LevelInfo, + Timestamp: object.GetCreationTimestamp().Time, + }, breadcrumbLimit) + + return nil +} + +func replicaSetEnhancer(ctx context.Context, scope *sentry.Scope, object metav1.Object, sentryEvent *sentry.Event) error { + + replicasetObj, ok := object.(*appsv1.ReplicaSet) + if !ok { + return errors.New("failed to cast object to ReplicaSet object") + } + + // Add the cronjob to the fingerprint + sentryEvent.Fingerprint = append(sentryEvent.Fingerprint, KindReplicaset, replicasetObj.Name) + + // Add the cronjob to the tag + setTagIfNotEmpty(scope, "replicaset_name", object.GetName()) + replicasetObj.ManagedFields = []metav1.ManagedFieldsEntry{} + metadataJson, err := prettyJson(replicasetObj.ObjectMeta) + if err == nil { + scope.SetContext(KindReplicaset, sentry.Context{ + "Metadata": metadataJson, + }) + } + + // Add breadcrumb with cronjob timestamps + scope.AddBreadcrumb(&sentry.Breadcrumb{ + Message: fmt.Sprintf("Created replicaset %s", object.GetName()), + Level: sentry.LevelInfo, + Timestamp: object.GetCreationTimestamp().Time, + }, breadcrumbLimit) + + return nil +} + +func deploymentEnhancer(ctx context.Context, scope *sentry.Scope, object metav1.Object, sentryEvent *sentry.Event) error { + + deploymentObj, ok := object.(*appsv1.Deployment) + if !ok { + return errors.New("failed to cast object to Deployment object") + } + // Add the cronjob to the fingerprint + sentryEvent.Fingerprint = append(sentryEvent.Fingerprint, KindDeployment, deploymentObj.Name) + + // Add the cronjob to the tag + setTagIfNotEmpty(scope, "deployment_name", object.GetName()) + deploymentObj.ManagedFields = []metav1.ManagedFieldsEntry{} + metadataJson, err := prettyJson(deploymentObj.ObjectMeta) + if err == nil { + scope.SetContext(KindDeployment, sentry.Context{ + "Metadata": metadataJson, + }) + } + + // Add breadcrumb with cronjob timestamps + scope.AddBreadcrumb(&sentry.Breadcrumb{ + Message: fmt.Sprintf("Created deployment %s", object.GetName()), + Level: sentry.LevelInfo, + Timestamp: object.GetCreationTimestamp().Time, + }, breadcrumbLimit) + + return nil } diff --git a/enhancers_pod.go b/enhancers_pod.go deleted file mode 100644 index 0b7b154..0000000 --- a/enhancers_pod.go +++ /dev/null @@ -1,116 +0,0 @@ -package main - -import ( - "context" - "fmt" - - "github.com/getsentry/sentry-go" - "github.com/rs/zerolog" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" -) - -const breadcrumbLimit = 20 - -func runPodEnhancer(ctx context.Context, podMeta *v1.ObjectReference, cachedObject interface{}, scope *sentry.Scope, sentryEvent *sentry.Event) error { - logger := zerolog.Ctx(ctx) - - logger.Debug().Msgf("Running the pod enhancer") - - clientset, err := getClientsetFromContext(ctx) - if err != nil { - return err - } - - namespace := podMeta.Namespace - podName := podMeta.Name - opts := metav1.GetOptions{} - - cachedPod, _ := cachedObject.(*v1.Pod) - var pod *v1.Pod - if cachedPod == nil { - logger.Debug().Msgf("Fetching pod data") - // FIXME: this can probably be cached if we use NewSharedInformerFactory - pod, err = clientset.CoreV1().Pods(namespace).Get(context.Background(), podName, opts) - if err != nil { - return err - } - } else { - logger.Debug().Msgf("Reusing the available pod object") - pod = cachedPod - } - - // Clean-up the object - pod.ManagedFields = []metav1.ManagedFieldsEntry{} - - nodeName := pod.Spec.NodeName - setTagIfNotEmpty(scope, "node_name", nodeName) - - metadataJson, err := prettyJson(pod.ObjectMeta) - if err == nil { - scope.SetContext("Pod", sentry.Context{ - "Metadata": metadataJson, - }) - } - - // The data will be mostly duplicated in "Pod Metadata" - scope.RemoveExtra("Involved Object") - - // Add related events as breadcrumbs - podEvents := filterEventsFromBuffer(namespace, "Pod", podName) - for _, podEvent := range podEvents { - breadcrumbLevel := sentry.LevelInfo - if podEvent.Type == v1.EventTypeWarning { - breadcrumbLevel = sentry.LevelWarning - } - - scope.AddBreadcrumb(&sentry.Breadcrumb{ - Message: podEvent.Message, - Level: breadcrumbLevel, - Timestamp: podEvent.LastTimestamp.Time, - }, breadcrumbLimit) - } - - message := sentryEvent.Message - sentryEvent.Message = fmt.Sprintf("%s: %s", podName, sentryEvent.Message) - - logger.Trace().Msgf("Current fingerprint: %v", sentryEvent.Fingerprint) - - // Adjust fingerprint. - // If there's already a non-empty fingerprint set, we assume that it was set by - // another enhancer, so we don't touch it. - if len(sentryEvent.Fingerprint) == 0 { - sentryEvent.Fingerprint = []string{message} - } - - // Using finger print to group events together - // The pod is not owned by a higher resource - if len(pod.OwnerReferences) == 0 { - // Standalone pod => most probably it has a unique name - sentryEvent.Fingerprint = append(sentryEvent.Fingerprint, podName) - // The pod is owned by a higher resource - } else { - // Check if pod is part of cronJob (as grandchild workload resource) - - // check if a cronJob - var ok bool - if pod.OwnerReferences[0].Kind == "Job" { - ok, err = runCronsDataHandler(ctx, scope, pod, sentryEvent) - if err != nil { - return err - } - } - - // The job is not owned by a cronJob - if !ok { - owner := pod.OwnerReferences[0] - sentryEvent.Fingerprint = append(sentryEvent.Fingerprint, owner.Kind, owner.Name) - } - } - - logger.Trace().Msgf("Fingerprint after adjustment: %v", sentryEvent.Fingerprint) - - addPodLogLinkToGKEContext(ctx, scope, podName, namespace) - - return nil -} diff --git a/enhancers_pod_test.go b/enhancers_test.go similarity index 90% rename from enhancers_pod_test.go rename to enhancers_test.go index fec2cc4..7af41ad 100644 --- a/enhancers_pod_test.go +++ b/enhancers_test.go @@ -10,7 +10,7 @@ import ( "k8s.io/client-go/kubernetes/fake" ) -func TestRunPodEnhancer(t *testing.T) { +func TestRunEnhancers(t *testing.T) { // Create empty context ctx := context.Background() @@ -45,10 +45,6 @@ func TestRunPodEnhancer(t *testing.T) { t.Fatalf("error injecting pod add: %v", err) } ctx = setClientsetOnContext(ctx, fakeClientset) - objRef := &corev1.ObjectReference{ - Name: "TestRunPodEnhancerPod", - Namespace: "TestRunPodEnhancerNamespace", - } // Create empty scope scope := sentry.NewScope() @@ -57,9 +53,9 @@ func TestRunPodEnhancer(t *testing.T) { // Add event message event.Message = "This event is for TestRunPodEnhancer" // Call pod enhancer to modify scope and event - err = runPodEnhancer(ctx, objRef, nil, scope, event) + err = runEnhancers(ctx, nil, KindPod, podObj, scope, event) if err != nil { - t.Errorf("pod enhancer returned an error: %v", err) + t.Errorf("runEnhancers returned an error: %v", err) } // Apply the scope to the event diff --git a/informer_crons.go b/informer_cronjobs.go similarity index 81% rename from informer_crons.go rename to informer_cronjobs.go index 7575dbf..f53ab64 100644 --- a/informer_crons.go +++ b/informer_cronjobs.go @@ -2,6 +2,7 @@ package main import ( "context" + "os" "github.com/rs/zerolog" batchv1 "k8s.io/api/batch/v1" @@ -14,7 +15,7 @@ func createCronjobInformer(ctx context.Context, factory informers.SharedInformer logger := zerolog.Ctx(ctx) - logger.Debug().Msgf("Starting cronJob informer\n") + logger.Debug().Msgf("Starting cronjob informer\n") cronjobInformer := factory.Batch().V1().CronJobs().Informer() @@ -43,7 +44,13 @@ func createCronjobInformer(ctx context.Context, factory informers.SharedInformer } } - cronjobInformer.AddEventHandler(handler) + // Check if cronjob monitoring is enabled + if isTruthy(os.Getenv("SENTRY_K8S_MONITOR_CRONJOBS")) { + logger.Info().Msgf("Add cronjob informer handlers for cronjob monitoring") + cronjobInformer.AddEventHandler(handler) + } else { + logger.Info().Msgf("Cronjob monitoring is disabled") + } return cronjobInformer, nil } diff --git a/informer_deployments.go b/informer_deployments.go new file mode 100644 index 0000000..fc02bf3 --- /dev/null +++ b/informer_deployments.go @@ -0,0 +1,20 @@ +package main + +import ( + "context" + + "github.com/rs/zerolog" + "k8s.io/client-go/informers" + "k8s.io/client-go/tools/cache" +) + +func createDeploymentInformer(ctx context.Context, factory informers.SharedInformerFactory, namespace string) (cache.SharedIndexInformer, error) { + + logger := zerolog.Ctx(ctx) + + logger.Debug().Msgf("starting deployment informer\n") + + jobInformer := factory.Apps().V1().Deployments().Informer() + + return jobInformer, nil +} diff --git a/informer_jobs.go b/informer_jobs.go index 3dcab1f..bc59070 100644 --- a/informer_jobs.go +++ b/informer_jobs.go @@ -2,6 +2,7 @@ package main import ( "context" + "os" "github.com/rs/zerolog" batchv1 "k8s.io/api/batch/v1" @@ -50,7 +51,13 @@ func createJobInformer(ctx context.Context, factory informers.SharedInformerFact } } - jobInformer.AddEventHandler(handler) + // Check if cronjob monitoring is enabled + if isTruthy(os.Getenv("SENTRY_K8S_MONITOR_CRONJOBS")) { + logger.Info().Msgf("Add job informer handlers for cronjob monitoring") + jobInformer.AddEventHandler(handler) + } else { + logger.Info().Msgf("Cronjob monitoring is disabled") + } return jobInformer, nil } diff --git a/informer_replicasets.go b/informer_replicasets.go new file mode 100644 index 0000000..da73545 --- /dev/null +++ b/informer_replicasets.go @@ -0,0 +1,21 @@ +package main + +import ( + "context" + + "github.com/rs/zerolog" + + "k8s.io/client-go/informers" + "k8s.io/client-go/tools/cache" +) + +func createReplicasetInformer(ctx context.Context, factory informers.SharedInformerFactory, namespace string) (cache.SharedIndexInformer, error) { + + logger := zerolog.Ctx(ctx) + + logger.Debug().Msgf("starting replicaset informer\n") + + jobInformer := factory.Apps().V1().ReplicaSets().Informer() + + return jobInformer, nil +} diff --git a/informers.go b/informers.go new file mode 100644 index 0000000..d060d9d --- /dev/null +++ b/informers.go @@ -0,0 +1,80 @@ +package main + +import ( + "context" + "errors" + "time" + + "k8s.io/client-go/informers" + "k8s.io/client-go/tools/cache" +) + +var cronjobInformer cache.SharedIndexInformer +var jobInformer cache.SharedIndexInformer +var replicasetInformer cache.SharedIndexInformer +var deploymentInformer cache.SharedIndexInformer + +// Starts all informers (jobs, cronjobs, replicasets, deployments) +// if we opt into cronjob, attach the job/cronjob event handlers +// and add to the crons monitor data struct for Sentry Crons +func startInformers(ctx context.Context, namespace string) error { + + clientset, err := getClientsetFromContext(ctx) + if err != nil { + return errors.New("failed to get clientset") + } + + // Create factory that will produce both the cronjob informer and job informer + factory := informers.NewSharedInformerFactoryWithOptions( + clientset, + 5*time.Second, + informers.WithNamespace(namespace), + ) + + // Create the job informer + jobInformer, err = createJobInformer(ctx, factory, namespace) + if err != nil { + return err + } + // Create the cronjob informer + cronjobInformer, err = createCronjobInformer(ctx, factory, namespace) + if err != nil { + return err + } + // Create the replicaset informer + replicasetInformer, err = createReplicasetInformer(ctx, factory, namespace) + if err != nil { + return err + } + // Create the deployment informer + deploymentInformer, err = createDeploymentInformer(ctx, factory, namespace) + if err != nil { + return err + } + + // Channel to tell the factory to stop the informers + doneChan := make(chan struct{}) + factory.Start(doneChan) + + // Sync the cronjob informer cache + if ok := cache.WaitForCacheSync(doneChan, cronjobInformer.HasSynced); !ok { + return errors.New("cronjob informer failed to sync") + } + // Sync the job informer cache + if ok := cache.WaitForCacheSync(doneChan, jobInformer.HasSynced); !ok { + return errors.New("job informer failed to sync") + } + // Sync the replicaset informer cache + if ok := cache.WaitForCacheSync(doneChan, replicasetInformer.HasSynced); !ok { + return errors.New("replicaset informer failed to sync") + } + // Sync the deployment informer cache + if ok := cache.WaitForCacheSync(doneChan, deploymentInformer.HasSynced); !ok { + return errors.New("deployment informer failed to sync") + } + + // Wait for the channel to be closed + <-doneChan + + return nil +} diff --git a/k8s/errors/delete_test_pods.sh b/k8s/errors/delete_test_pods.sh index 6fac52a..07b6e0d 100755 --- a/k8s/errors/delete_test_pods.sh +++ b/k8s/errors/delete_test_pods.sh @@ -2,6 +2,7 @@ set -euxo pipefail kubectl delete deployment -l type=test-pod -A +kubectl delete replicaset -l type=test-pod -A kubectl delete cronjob -l type=test-pod -A kubectl delete job -l type=test-pod -A kubectl delete pod -l type=test-pod -A diff --git a/k8s/errors/job-basic-error.yaml b/k8s/errors/job-basic-error.yaml new file mode 100644 index 0000000..30b8899 --- /dev/null +++ b/k8s/errors/job-basic-error.yaml @@ -0,0 +1,20 @@ +apiVersion: batch/v1 +kind: Job +metadata: + name: job-basic-error + labels: + type: test-pod + annotations: + k8s.sentry.io/dsn: "https://474d9da00094c5e39d6800c01f3aeff6@o4506191942320128.ingest.sentry.io/4506363396816896" +spec: + template: + spec: + containers: + - name: hello + image: busybox:1.28 + imagePullPolicy: IfNotPresent + command: + - /bin/sh + - -c + - sleep 1; invalid_command + restartPolicy: Never \ No newline at end of file diff --git a/k8s/errors/replicaset-basic-error.yaml b/k8s/errors/replicaset-basic-error.yaml new file mode 100644 index 0000000..8105083 --- /dev/null +++ b/k8s/errors/replicaset-basic-error.yaml @@ -0,0 +1,27 @@ +apiVersion: apps/v1 +kind: ReplicaSet +metadata: + name: replicaset-basic-error + labels: + run: replicaset-basic-error + type: test-pod + annotations: + k8s.sentry.io/dsn: "https://474d9da00094c5e39d6800c01f3aeff6@o4506191942320128.ingest.sentry.io/4506363396816896" +spec: + replicas: 3 + selector: + matchLabels: + tier: replicaset-basic-error + template: + metadata: + labels: + tier: replicaset-basic-error + spec: + containers: + - name: hello + image: busybox:1.28 + imagePullPolicy: IfNotPresent + command: + - /bin/sh + - -c + - sleep 1; invalid_command diff --git a/sentry_dsn_data.go b/sentry_dsn_data.go index f944fdb..d8c4502 100644 --- a/sentry_dsn_data.go +++ b/sentry_dsn_data.go @@ -7,11 +7,12 @@ import ( "sync" "github.com/getsentry/sentry-go" + v1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) var DSNAnnotation = "k8s.sentry.io/dsn" - var dsnClientMapping = NewDsnClientMapping() // Map from Sentry DSN to Client @@ -62,7 +63,7 @@ func (d *DsnClientMapping) GetClientFromMap(dsn string) (*sentry.Client, bool) { return existingClient, ok } -func (d *DsnClientMapping) GetClientFromObject(ctx context.Context, objectMeta *metav1.ObjectMeta, clientOptions sentry.ClientOptions) (*sentry.Client, bool) { +func (d *DsnClientMapping) GetClientFromObject(ctx context.Context, object metav1.Object, clientOptions sentry.ClientOptions) (*sentry.Client, bool) { // If the custom DSN flag is set to false // then avoid searching for the custom DSN @@ -73,7 +74,7 @@ func (d *DsnClientMapping) GetClientFromObject(ctx context.Context, objectMeta * } // Find DSN annotation from the object - altDsn, err := searchDsn(ctx, objectMeta) + altDsn, err := searchDsn(ctx, object) if err != nil { return nil, false } @@ -97,28 +98,28 @@ func (d *DsnClientMapping) GetClientFromObject(ctx context.Context, objectMeta * } // Recursive function to find if there is a DSN annotation -func searchDsn(ctx context.Context, object *metav1.ObjectMeta) (string, error) { +func searchDsn(ctx context.Context, obj metav1.Object) (string, error) { - dsn, ok := object.Annotations[DSNAnnotation] + dsn, ok := obj.GetAnnotations()[DSNAnnotation] if ok { return dsn, nil } - if len(object.OwnerReferences) == 0 { + if len(obj.GetOwnerReferences()) == 0 { return "", nil } - owningRef := object.OwnerReferences[0] - owningObjectMeta, ok := findObjectMeta(ctx, owningRef.Kind, object.Namespace, owningRef.Name) + owningRef := obj.GetOwnerReferences()[0] + owningObject, ok := findObject(ctx, owningRef.Kind, obj.GetNamespace(), owningRef.Name) if !ok { return "", errors.New("the DSN cannot be found") } - return searchDsn(ctx, owningObjectMeta) + return searchDsn(ctx, owningObject) } -func findObjectMeta(ctx context.Context, kind string, namespace string, name string) (*metav1.ObjectMeta, bool) { +func findObject(ctx context.Context, kind string, namespace string, name string) (metav1.Object, bool) { clientset, err := getClientsetFromContext(ctx) if err != nil { @@ -126,36 +127,81 @@ func findObjectMeta(ctx context.Context, kind string, namespace string, name str } switch kind { - case "Pod": + case KindPod: pod, err := clientset.CoreV1().Pods(namespace).Get(context.Background(), name, metav1.GetOptions{}) if err != nil { return nil, false } - return &pod.ObjectMeta, true - case "ReplicaSet": - replicaSet, err := clientset.AppsV1().ReplicaSets(namespace).Get(context.Background(), name, metav1.GetOptions{}) - if err != nil { - return nil, false + return pod, true + case KindReplicaset: + var replicaSet *v1.ReplicaSet + // Check if the replicaset is available in the indexer first + if replicasetInformer != nil { + obj, ok, err := replicasetInformer.GetIndexer().GetByKey(namespace + "/" + name) + if ok && err == nil { + replicaSet = obj.(*v1.ReplicaSet) + } } - return &replicaSet.ObjectMeta, true - case "Deployment": - deployment, err := clientset.AppsV1().Deployments(namespace).Get(context.Background(), name, metav1.GetOptions{}) - if err != nil { - return nil, false + if replicaSet == nil { + // Query replicaset with kubernetes API + replicaSet, err = clientset.AppsV1().ReplicaSets(namespace).Get(context.Background(), name, metav1.GetOptions{}) + if err != nil { + return nil, false + } } - return &deployment.ObjectMeta, true - case "Job": - job, err := clientset.BatchV1().Jobs(namespace).Get(context.Background(), name, metav1.GetOptions{}) - if err != nil { - return nil, false + return replicaSet, true + case KindDeployment: + var deployment *v1.Deployment + // Check if the deployment is available in the indexer first + if deploymentInformer != nil { + obj, ok, err := deploymentInformer.GetIndexer().GetByKey(namespace + "/" + name) + if ok && err == nil { + deployment = obj.(*v1.Deployment) + + } } - return &job.ObjectMeta, true - case "CronJob": - cronjob, err := clientset.BatchV1().CronJobs(namespace).Get(context.Background(), name, metav1.GetOptions{}) - if err != nil { - return nil, false + if deployment == nil { + // Query deployment with kubernetes API + deployment, err = clientset.AppsV1().Deployments(namespace).Get(context.Background(), name, metav1.GetOptions{}) + if err != nil { + return nil, false + } + } + return deployment, true + case KindJob: + var job *batchv1.Job + // Check if the job is available in the indexer first + if jobInformer != nil { + obj, ok, err := jobInformer.GetIndexer().GetByKey(namespace + "/" + name) + if ok && err == nil { + job = obj.(*batchv1.Job) + } + } + if job == nil { + // Query job with kubernetes API + job, err = clientset.BatchV1().Jobs(namespace).Get(context.Background(), name, metav1.GetOptions{}) + if err != nil { + return nil, false + } + } + return job, true + case KindCronjob: + var cronjob *batchv1.CronJob + // Check if the cronjob is available in the indexer first + if cronjobInformer != nil { + obj, ok, err := cronjobInformer.GetIndexer().GetByKey(namespace + "/" + name) + if ok && err == nil { + cronjob = obj.(*batchv1.CronJob) + } + } + if cronjob == nil { + // Query cronjob with kubernetes API + cronjob, err = clientset.BatchV1().CronJobs(namespace).Get(context.Background(), name, metav1.GetOptions{}) + if err != nil { + return nil, false + } } - return &cronjob.ObjectMeta, true + return cronjob, true default: return nil, false } diff --git a/watcher_events.go b/watcher_events.go index 8294265..bce21ae 100644 --- a/watcher_events.go +++ b/watcher_events.go @@ -75,12 +75,16 @@ func handleGeneralEvent(ctx context.Context, eventObject *v1.Event, scope *sentr func buildSentryEventFromGeneralEvent(ctx context.Context, event *v1.Event, scope *sentry.Scope) *sentry.Event { sentryEvent := &sentry.Event{Message: event.Message, Level: sentry.LevelError} - objectRef := &v1.ObjectReference{ - Kind: event.InvolvedObject.Kind, - Name: event.InvolvedObject.Name, - Namespace: event.InvolvedObject.Namespace, + + involvedObj, ok := findObject(ctx, event.InvolvedObject.Kind, event.InvolvedObject.Namespace, event.InvolvedObject.Name) + + // cannot find event + if !ok { + return sentryEvent } - runEnhancers(ctx, objectRef, nil, scope, sentryEvent) + + // run enhancers with the involved object + runEnhancers(ctx, event, event.InvolvedObject.Kind, involvedObj, scope, sentryEvent) return sentryEvent } @@ -145,10 +149,10 @@ func handleWatchEvent(ctx context.Context, event *watch.Event, cutoffTime metav1 hub.WithScope(func(scope *sentry.Scope) { // Find the object meta that the event is about - objectMeta, ok := findObjectMeta(ctx, eventObject.InvolvedObject.Kind, eventObject.InvolvedObject.Namespace, eventObject.InvolvedObject.Name) + object, ok := findObject(ctx, eventObject.InvolvedObject.Kind, eventObject.InvolvedObject.Namespace, eventObject.InvolvedObject.Name) if ok { // if DSN annotation provided, we bind a new client with that DSN - client, ok := dsnClientMapping.GetClientFromObject(ctx, objectMeta, hub.Client().Options()) + client, ok := dsnClientMapping.GetClientFromObject(ctx, object, hub.Client().Options()) if ok { hub.BindClient(client) } diff --git a/watcher_pods.go b/watcher_pods.go index 98c69f2..f750b66 100644 --- a/watcher_pods.go +++ b/watcher_pods.go @@ -3,7 +3,6 @@ package main import ( "context" "fmt" - "os" "time" "github.com/getsentry/sentry-go" @@ -33,7 +32,7 @@ func handlePodTerminationEvent(ctx context.Context, containerStatus *v1.Containe } setTagIfNotEmpty(scope, "reason", state.Reason) - setTagIfNotEmpty(scope, "kind", pod.Kind) + setTagIfNotEmpty(scope, "kind", KindPod) setTagIfNotEmpty(scope, "object_uid", string(pod.UID)) setTagIfNotEmpty(scope, "namespace", pod.Namespace) setTagIfNotEmpty(scope, "pod_name", pod.Name) @@ -63,12 +62,7 @@ func handlePodTerminationEvent(ctx context.Context, containerStatus *v1.Containe func buildSentryEventFromPodTerminationEvent(ctx context.Context, pod *v1.Pod, message string, scope *sentry.Scope) *sentry.Event { sentryEvent := &sentry.Event{Message: message, Level: sentry.LevelError} - objectRef := &v1.ObjectReference{ - Kind: "Pod", - Name: pod.Name, - Namespace: pod.Namespace, - } - runEnhancers(ctx, objectRef, pod, scope, sentryEvent) + runEnhancers(ctx, nil, KindPod, pod, scope, sentryEvent) return sentryEvent } @@ -193,13 +187,10 @@ func watchPodsInNamespaceForever(ctx context.Context, config *rest.Config, names ctx = setClientsetOnContext(ctx, clientset) - // Create the informers to integrate with sentry crons - if isTruthy(os.Getenv("SENTRY_K8S_MONITOR_CRONJOBS")) { - logger.Info().Msgf("Enabling CronJob monitoring") - go startCronsInformers(ctx, namespace) - } else { - logger.Info().Msgf("CronJob monitoring is disabled") - } + // Start the informers for Sentry event capturing + // and caching with the indexers + go startInformers(ctx, namespace) + for { if err := watchPodsInNamespace(ctx, namespace); err != nil { logger.Error().Msgf("Error while watching pods %s: %s", where, err) diff --git a/watcher_pods_test.go b/watcher_pods_test.go index 5671d17..98cfebc 100644 --- a/watcher_pods_test.go +++ b/watcher_pods_test.go @@ -79,7 +79,7 @@ func TestHandlePodWatchEvent(t *testing.T) { } // the Sentry event message should match that of the container status - expectedMsg := "Fake Message: TestHandlePodWatchEvent" + expectedMsg := "TestHandlePodWatchEventPod: Fake Message: TestHandlePodWatchEvent" if events[0].Message != expectedMsg { t.Errorf("received %s, wanted %s", events[0].Message, expectedMsg) }