diff --git a/.env.example b/.env.example
index 9b81918..271e532 100644
--- a/.env.example
+++ b/.env.example
@@ -7,3 +7,4 @@ SENTRY_K8S_CLUSTER_CONFIG_TYPE=""
 SENTRY_K8S_KUBECONFIG_PATH=""
 SENTRY_K8S_LOG_LEVEL=""
 SENTRY_K8S_MONITOR_CRONJOBS=""
+SENTRY_K8S_CUSTOM_DSNS=""
diff --git a/crons.go b/crons.go
index 7bb8eed..06a2986 100644
--- a/crons.go
+++ b/crons.go
@@ -78,6 +78,14 @@ func startCronsInformers(ctx context.Context, namespace string) error {
 // or if the job exited
 func runSentryCronsCheckin(ctx context.Context, job *batchv1.Job, eventHandlerType EventHandlerType) error {
 
+	hub := sentry.GetHubFromContext(ctx)
+	if hub == nil {
+		return errors.New("cannot get hub from context")
+	}
+
+	// To avoid concurrency issue
+	hub = hub.Clone()
+
 	// Try to find the cronJob name that owns the job
 	// in order to get the crons monitor data
 	if len(job.OwnerReferences) == 0 {
@@ -92,16 +100,28 @@ func runSentryCronsCheckin(ctx context.Context, job *batchv1.Job, eventHandlerTy
 		return errors.New("cannot find cronJob data")
 	}
 
-	// The job just begun so check in to start
-	if job.Status.Active == 0 && job.Status.Succeeded == 0 && job.Status.Failed == 0 {
-		// Add the job to the cronJob informer data
-		checkinJobStarting(ctx, job, cronsMonitorData)
-	} else if job.Status.Active > 0 {
-		return nil
-	} else if job.Status.Failed > 0 || job.Status.Succeeded > 0 {
-		checkinJobEnding(ctx, job, cronsMonitorData)
-		return nil // finished
-	}
+	hub.WithScope(func(scope *sentry.Scope) {
+
+		// If DSN annotation provided, we bind a new client with that DSN
+		client, ok := dsnClientMapping.GetClientFromObject(ctx, &job.ObjectMeta, hub.Client().Options())
+		if ok {
+			hub.BindClient(client)
+		}
+
+		// Pass clone hub down with context
+		ctx = sentry.SetHubOnContext(ctx, hub)
+		// The job just begun so check in to start
+		if job.Status.Active == 0 && job.Status.Succeeded == 0 && job.Status.Failed == 0 {
+			// Add the job to the cronJob informer data
+			checkinJobStarting(ctx, job, cronsMonitorData)
+		} else if job.Status.Active > 0 {
+			return
+		} else if job.Status.Failed > 0 || job.Status.Succeeded > 0 {
+			checkinJobEnding(ctx, job, cronsMonitorData)
+			return // Finished
+		}
+	})
+
 	return nil
 }
 
@@ -110,6 +130,11 @@ func checkinJobStarting(ctx context.Context, job *batchv1.Job, cronsMonitorData
 
 	logger := zerolog.Ctx(ctx)
 
+	hub := sentry.GetHubFromContext(ctx)
+	if hub == nil {
+		return errors.New("cannot get hub from context")
+	}
+
 	// Check if job already added to jobData slice
 	_, ok := cronsMonitorData.JobDatas[job.Name]
 	if ok {
@@ -118,7 +143,7 @@ func checkinJobStarting(ctx context.Context, job *batchv1.Job, cronsMonitorData
 	logger.Debug().Msgf("Checking in at start of job: %s\n", job.Name)
 
 	// All containers running in the pod
-	checkinId := sentry.CaptureCheckIn(
+	checkinId := hub.CaptureCheckIn(
 		&sentry.CheckIn{
 			MonitorSlug: cronsMonitorData.MonitorSlug,
 			Status:      sentry.CheckInStatusInProgress,
@@ -133,6 +158,11 @@ func checkinJobStarting(ctx context.Context, job *batchv1.Job, cronsMonitorData
 // Sends the checkin event to sentry crons for when a job ends
 func checkinJobEnding(ctx context.Context, job *batchv1.Job, cronsMonitorData *CronsMonitorData) error {
 
+	hub := sentry.GetHubFromContext(ctx)
+	if hub == nil {
+		return errors.New("cannot get hub from context")
+	}
+
 	logger := zerolog.Ctx(ctx)
 
 	// Check desired number of pods have succeeded
@@ -157,7 +187,7 @@ func checkinJobEnding(ctx context.Context, job *batchv1.Job, cronsMonitorData *C
 	}
 
 	logger.Trace().Msgf("checking in at end of job: %s\n", job.Name)
-	sentry.CaptureCheckIn(
+	hub.CaptureCheckIn(
 		&sentry.CheckIn{
 			ID:          jobData.getCheckinId(),
 			MonitorSlug: cronsMonitorData.MonitorSlug,
diff --git a/k8s/errors/cronjob-basic-error.yaml b/k8s/errors/cronjob-basic-error.yaml
index 22cc423..eb30b25 100644
--- a/k8s/errors/cronjob-basic-error.yaml
+++ b/k8s/errors/cronjob-basic-error.yaml
@@ -4,6 +4,8 @@ metadata:
   name: cronjob-basic-error
   labels:
     type: test-pod
+  annotations:
+    k8s.sentry.io/dsn: "https://474d9da00094c5e39d6800c01f3aeff6@o4506191942320128.ingest.sentry.io/4506363396816896"
 spec:
   schedule: "* * * * *"
   jobTemplate:
diff --git a/k8s/errors/cronjob-basic-success.yaml b/k8s/errors/cronjob-basic-success.yaml
index 2292483..ace025e 100644
--- a/k8s/errors/cronjob-basic-success.yaml
+++ b/k8s/errors/cronjob-basic-success.yaml
@@ -4,6 +4,8 @@ metadata:
   name: cronjob-basic-success
   labels:
     type: test-pod
+  annotations:
+    k8s.sentry.io/dsn: "https://474d9da00094c5e39d6800c01f3aeff6@o4506191942320128.ingest.sentry.io/4506363396816896"
 spec:
   schedule: "* * * * *"
   jobTemplate:
diff --git a/k8s/errors/cronjob-late-maybe-error.yaml b/k8s/errors/cronjob-late-maybe-error.yaml
index ccb9a22..f944921 100644
--- a/k8s/errors/cronjob-late-maybe-error.yaml
+++ b/k8s/errors/cronjob-late-maybe-error.yaml
@@ -4,6 +4,8 @@ metadata:
   name: cronjob-late-maybe-error
   labels:
     type: test-pod
+  annotations:
+    k8s.sentry.io/dsn: "https://c6a5dd95a40ab7e4e34a3af43c14f848@o4506191942320128.ingest.sentry.io/4506363401601024"
 spec:
   schedule: "* * * * *"
   jobTemplate:
diff --git a/k8s/errors/cronjob-late-success.yaml b/k8s/errors/cronjob-late-success.yaml
index 19e07b9..e1b9144 100644
--- a/k8s/errors/cronjob-late-success.yaml
+++ b/k8s/errors/cronjob-late-success.yaml
@@ -4,6 +4,8 @@ metadata:
   name: cronjob-late-success
   labels:
     type: test-pod
+  annotations:
+    k8s.sentry.io/dsn: "https://c6a5dd95a40ab7e4e34a3af43c14f848@o4506191942320128.ingest.sentry.io/4506363401601024"
 spec:
   schedule: "* * * * *"
   jobTemplate:
diff --git a/k8s/errors/deployment-create-container-error.yaml b/k8s/errors/deployment-create-container-error.yaml
index 003fbb1..0e88576 100644
--- a/k8s/errors/deployment-create-container-error.yaml
+++ b/k8s/errors/deployment-create-container-error.yaml
@@ -5,6 +5,8 @@ metadata:
   labels:
     run: deployment-create-container-error
     type: test-pod
+  annotations:
+    k8s.sentry.io/dsn: "https://474d9da00094c5e39d6800c01f3aeff6@o4506191942320128.ingest.sentry.io/4506363396816896"
 spec:
   replicas: 2
   selector:
diff --git a/k8s/errors/pod-outofmemory.yaml b/k8s/errors/pod-outofmemory.yaml
index 3f9a991..997aab8 100644
--- a/k8s/errors/pod-outofmemory.yaml
+++ b/k8s/errors/pod-outofmemory.yaml
@@ -6,6 +6,8 @@ metadata:
     run: pod-outofmemory
     type: test-pod
   name: pod-outofmemory
+  annotations:
+    k8s.sentry.io/dsn: "https://474d9da00094c5e39d6800c01f3aeff6@o4506191942320128.ingest.sentry.io/4506363396816896"
 spec:
   containers:
   - image: python:3.10-alpine
diff --git a/mocks_test.go b/mocks_test.go
index 655d05e..6177dac 100644
--- a/mocks_test.go
+++ b/mocks_test.go
@@ -8,9 +8,8 @@ import (
 )
 
 type TransportMock struct {
-	mu        sync.Mutex
-	events    []*sentry.Event
-	lastEvent *sentry.Event
+	mu     sync.Mutex
+	events []*sentry.Event
 }
 
 func (t *TransportMock) Configure(options sentry.ClientOptions) {}
@@ -18,7 +17,6 @@ func (t *TransportMock) SendEvent(event *sentry.Event) {
 	t.mu.Lock()
 	defer t.mu.Unlock()
 	t.events = append(t.events, event)
-	t.lastEvent = event
 }
 func (t *TransportMock) Flush(timeout time.Duration) bool {
 	return true
diff --git a/sentry_dsn_data.go b/sentry_dsn_data.go
new file mode 100644
index 0000000..f944fdb
--- /dev/null
+++ b/sentry_dsn_data.go
@@ -0,0 +1,162 @@
+package main
+
+import (
+	"context"
+	"errors"
+	"os"
+	"sync"
+
+	"github.com/getsentry/sentry-go"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+var DSNAnnotation = "k8s.sentry.io/dsn"
+
+var dsnClientMapping = NewDsnClientMapping()
+
+// Map from Sentry DSN to Client
+type DsnClientMapping struct {
+	mutex         sync.RWMutex
+	clientMap     map[string]*sentry.Client
+	customDsnFlag bool
+}
+
+func NewDsnClientMapping() *DsnClientMapping {
+	return &DsnClientMapping{
+		mutex:         sync.RWMutex{},
+		clientMap:     make(map[string]*sentry.Client),
+		customDsnFlag: isTruthy(os.Getenv("SENTRY_K8S_CUSTOM_DSNS")),
+	}
+}
+
+// Return client if added successfully
+// (also returns client if already exists)
+func (d *DsnClientMapping) AddClientToMap(options sentry.ClientOptions) (*sentry.Client, error) {
+	d.mutex.Lock()
+	defer d.mutex.Unlock()
+
+	// Create a new client for the dsn
+	// even if client already exists, it
+	// will be re-initialized with a new client
+	newClient, err := sentry.NewClient(
+		sentry.ClientOptions{
+			Dsn:              options.Dsn,
+			Debug:            true,
+			AttachStacktrace: true,
+		},
+	)
+	if err != nil {
+		return nil, err
+	}
+	d.clientMap[options.Dsn] = newClient
+	return newClient, nil
+}
+
+// Retrieve a client with given dsn
+func (d *DsnClientMapping) GetClientFromMap(dsn string) (*sentry.Client, bool) {
+	d.mutex.RLock()
+	defer d.mutex.RUnlock()
+
+	// Check if we have this dsn
+	existingClient, ok := d.clientMap[dsn]
+	return existingClient, ok
+}
+
+func (d *DsnClientMapping) GetClientFromObject(ctx context.Context, objectMeta *metav1.ObjectMeta, clientOptions sentry.ClientOptions) (*sentry.Client, bool) {
+
+	// If the custom DSN flag is set to false
+	// then avoid searching for the custom DSN
+	// or adding an alternative client and instead
+	// just return nil as the client
+	if !d.customDsnFlag {
+		return nil, false
+	}
+
+	// Find DSN annotation from the object
+	altDsn, err := searchDsn(ctx, objectMeta)
+	if err != nil {
+		return nil, false
+	}
+
+	// If we did find an alternative DSN
+	if altDsn != "" {
+		// Attempt to retrieve the corresponding client
+		client, _ := dsnClientMapping.GetClientFromMap(altDsn)
+		if client == nil {
+			// create new client
+			clientOptions.Dsn = altDsn
+			client, err = dsnClientMapping.AddClientToMap(clientOptions)
+			if err != nil {
+				return nil, false
+			}
+		}
+		return client, true
+	} else {
+		return nil, false
+	}
+}
+
+// Recursive function to find if there is a DSN annotation
+func searchDsn(ctx context.Context, object *metav1.ObjectMeta) (string, error) {
+
+	dsn, ok := object.Annotations[DSNAnnotation]
+	if ok {
+		return dsn, nil
+	}
+
+	if len(object.OwnerReferences) == 0 {
+		return "", nil
+	}
+
+	owningRef := object.OwnerReferences[0]
+	owningObjectMeta, ok := findObjectMeta(ctx, owningRef.Kind, object.Namespace, owningRef.Name)
+
+	if !ok {
+		return "", errors.New("the DSN cannot be found")
+	}
+
+	return searchDsn(ctx, owningObjectMeta)
+}
+
+func findObjectMeta(ctx context.Context, kind string, namespace string, name string) (*metav1.ObjectMeta, bool) {
+
+	clientset, err := getClientsetFromContext(ctx)
+	if err != nil {
+		return nil, false
+	}
+
+	switch kind {
+	case "Pod":
+		pod, err := clientset.CoreV1().Pods(namespace).Get(context.Background(), name, metav1.GetOptions{})
+		if err != nil {
+			return nil, false
+		}
+		return &pod.ObjectMeta, true
+	case "ReplicaSet":
+		replicaSet, err := clientset.AppsV1().ReplicaSets(namespace).Get(context.Background(), name, metav1.GetOptions{})
+		if err != nil {
+			return nil, false
+		}
+		return &replicaSet.ObjectMeta, true
+	case "Deployment":
+		deployment, err := clientset.AppsV1().Deployments(namespace).Get(context.Background(), name, metav1.GetOptions{})
+		if err != nil {
+			return nil, false
+		}
+		return &deployment.ObjectMeta, true
+	case "Job":
+		job, err := clientset.BatchV1().Jobs(namespace).Get(context.Background(), name, metav1.GetOptions{})
+		if err != nil {
+			return nil, false
+		}
+		return &job.ObjectMeta, true
+	case "CronJob":
+		cronjob, err := clientset.BatchV1().CronJobs(namespace).Get(context.Background(), name, metav1.GetOptions{})
+		if err != nil {
+			return nil, false
+		}
+		return &cronjob.ObjectMeta, true
+	default:
+		return nil, false
+	}
+}
diff --git a/watcher_events.go b/watcher_events.go
index 1e53dae..8294265 100644
--- a/watcher_events.go
+++ b/watcher_events.go
@@ -139,7 +139,23 @@ func handleWatchEvent(ctx context.Context, event *watch.Event, cutoffTime metav1
 		logger.Error().Msgf("Cannot get Sentry hub from context")
 		return
 	}
+
+	// To avoid concurrency issue
+	hub = hub.Clone()
 	hub.WithScope(func(scope *sentry.Scope) {
+
+		// Find the object meta that the event is about
+		objectMeta, ok := findObjectMeta(ctx, eventObject.InvolvedObject.Kind, eventObject.InvolvedObject.Namespace, eventObject.InvolvedObject.Name)
+		if ok {
+			// if DSN annotation provided, we bind a new client with that DSN
+			client, ok := dsnClientMapping.GetClientFromObject(ctx, objectMeta, hub.Client().Options())
+			if ok {
+				hub.BindClient(client)
+			}
+		}
+
+		// Pass down clone context
+		ctx = sentry.SetHubOnContext(ctx, hub)
 		setWatcherTag(scope, eventsWatcherName)
 		sentryEvent := handleGeneralEvent(ctx, eventObject, scope)
 		if sentryEvent != nil {
diff --git a/watcher_events_test.go b/watcher_events_test.go
index 4448bea..32d1a8b 100644
--- a/watcher_events_test.go
+++ b/watcher_events_test.go
@@ -101,6 +101,7 @@ func TestHandleWatchEvent(t *testing.T) {
 
 	// The Sentry event message should match that of the event message
 	expectedMsg := "Fake Message: TestHandleWatchEvent"
+
 	if events[0].Message != expectedMsg {
 		t.Errorf("received %s, wanted %s", events[0].Message, expectedMsg)
 	}
diff --git a/watcher_pods.go b/watcher_pods.go
index b64dad4..98c69f2 100644
--- a/watcher_pods.go
+++ b/watcher_pods.go
@@ -103,6 +103,8 @@ func handlePodWatchEvent(ctx context.Context, event *watch.Event) {
 		logger.Error().Msgf("Cannot get Sentry hub from context")
 		return
 	}
+	// To avoid concurrency issue
+	hub = hub.Clone()
 
 	containerStatuses := podObject.Status.ContainerStatuses
 	logger.Trace().Msgf("Container statuses: %#v\n", containerStatuses)
@@ -112,8 +114,16 @@ func handlePodWatchEvent(ctx context.Context, event *watch.Event) {
 			// Ignore non-Terminated statuses
 			continue
 		}
-		hub.WithScope(func(scope *sentry.Scope) {
+		hub.WithScope(func(scope *sentry.Scope) {
+			// If DSN annotation provided, we bind a new client with that DSN
+			client, ok := dsnClientMapping.GetClientFromObject(ctx, &podObject.ObjectMeta, hub.Client().Options())
+			if ok {
+				hub.BindClient(client)
+			}
+
+			// Pass down clone context
+			ctx = sentry.SetHubOnContext(ctx, hub)
 			setWatcherTag(scope, podsWatcherName)
 			sentryEvent := handlePodTerminationEvent(ctx, &status, podObject, scope)
 			if sentryEvent != nil {
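
Note (illustrative, not part of the patch): the sketch below shows one way the new DsnClientMapping could be exercised from inside this package. It assumes SENTRY_K8S_CUSTOM_DSNS was set to a truthy value before startup (the flag is captured once when dsnClientMapping is initialized), and both DSNs are placeholders; exampleCustomDsnLookup is a hypothetical helper, not code from the diff.

// Illustrative sketch only — not part of the diff above. Assumes the same
// package as sentry_dsn_data.go and a truthy SENTRY_K8S_CUSTOM_DSNS at startup.
package main

import (
	"context"
	"fmt"

	"github.com/getsentry/sentry-go"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func exampleCustomDsnLookup(ctx context.Context) {
	// An object annotated with a project-specific DSN (placeholder value).
	annotated := &metav1.ObjectMeta{
		Name:        "cronjob-basic-error",
		Annotations: map[string]string{DSNAnnotation: "https://publicKey@o0.ingest.sentry.io/0"},
	}

	// An object with no annotation and no owner references: searchDsn finds
	// nothing, so the watcher keeps using the default hub client.
	plain := &metav1.ObjectMeta{Name: "pod-without-annotation"}

	// Options of the default client, as passed by the watchers via hub.Client().Options().
	defaults := sentry.ClientOptions{Dsn: "https://publicKey@o0.ingest.sentry.io/1"}

	if client, ok := dsnClientMapping.GetClientFromObject(ctx, annotated, defaults); ok {
		fmt.Println("binding custom client for DSN:", client.Options().Dsn)
	}
	if _, ok := dsnClientMapping.GetClientFromObject(ctx, plain, defaults); !ok {
		fmt.Println("no custom DSN found; keeping the default client")
	}
}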
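
A second sketch, also illustrative and using only public sentry-go APIs, of the clone-then-bind pattern the watchers adopt in this diff: clone the shared hub per event so that binding a per-object client never races with other goroutines, and keep tags scoped to that single event. processOneEvent is a hypothetical stand-in for the handlers changed above, the DSN is a placeholder, and the function assumes the same package plus the "errors" import.

// Illustrative sketch only — not part of the diff above.
func processOneEvent(ctx context.Context) error {
	hub := sentry.GetHubFromContext(ctx)
	if hub == nil {
		return errors.New("cannot get hub from context")
	}
	// Clone so that BindClient below only affects this event's hub.
	hub = hub.Clone()

	hub.WithScope(func(scope *sentry.Scope) {
		if client, err := sentry.NewClient(sentry.ClientOptions{
			Dsn: "https://publicKey@o0.ingest.sentry.io/0",
		}); err == nil {
			hub.BindClient(client)
		}
		scope.SetTag("watcher_name", "example")
		hub.CaptureMessage("event routed via a per-object DSN")
	})
	return nil
}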