feature: allow object manifest to define custom DSN (#72)
* allow custom dsn in manifest

* fix events being sent to the wrong projects

* clients inherit old options with only dsn changed

* change transport mock to remove last event

* refactor dsn code

* fix event watcher to pass unit test

* perform minor code cleanup

* add flag for custom dsn

* add minor changes to sentry dsn data
Jiahui-Zhang-20 authored Dec 13, 2023
1 parent 00ba82f commit da92dca
Showing 13 changed files with 247 additions and 17 deletions.
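
For orientation, the per-object DSN pattern that this commit applies in crons.go, watcher_events.go, and watcher_pods.go boils down to: clone the hub, look up (or lazily create) a client keyed by the object's k8s.sentry.io/dsn annotation, bind it, and capture inside an isolated scope. The sketch below is not part of the diff; captureForObject is a hypothetical helper name, and it assumes the package-level dsnClientMapping plus the sentry-go and apimachinery imports used elsewhere in the repository.

func captureForObject(ctx context.Context, objectMeta *metav1.ObjectMeta, message string) {
    hub := sentry.GetHubFromContext(ctx)
    if hub == nil {
        return
    }
    // Clone so that binding a per-object client cannot race with other goroutines
    hub = hub.Clone()
    hub.WithScope(func(scope *sentry.Scope) {
        // If the object (or one of its owners) carries the DSN annotation,
        // bind a client that reports to that project instead of the default one
        if client, ok := dsnClientMapping.GetClientFromObject(ctx, objectMeta, hub.Client().Options()); ok {
            hub.BindClient(client)
        }
        hub.CaptureMessage(message)
    })
}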
1 change: 1 addition & 0 deletions .env.example
@@ -7,3 +7,4 @@ SENTRY_K8S_CLUSTER_CONFIG_TYPE=""
SENTRY_K8S_KUBECONFIG_PATH=""
SENTRY_K8S_LOG_LEVEL=""
SENTRY_K8S_MONITOR_CRONJOBS=""
SENTRY_K8S_CUSTOM_DSNS=""
54 changes: 42 additions & 12 deletions crons.go
@@ -78,6 +78,14 @@ func startCronsInformers(ctx context.Context, namespace string) error {
// or if the job exited
func runSentryCronsCheckin(ctx context.Context, job *batchv1.Job, eventHandlerType EventHandlerType) error {

hub := sentry.GetHubFromContext(ctx)
if hub == nil {
return errors.New("cannot get hub from context")
}

// To avoid concurrency issues
hub = hub.Clone()

// Try to find the cronJob name that owns the job
// in order to get the crons monitor data
if len(job.OwnerReferences) == 0 {
@@ -92,16 +100,28 @@ func runSentryCronsCheckin(ctx context.Context, job *batchv1.Job, eventHandlerType EventHandlerType) error {
return errors.New("cannot find cronJob data")
}

- // The job just begun so check in to start
- if job.Status.Active == 0 && job.Status.Succeeded == 0 && job.Status.Failed == 0 {
- // Add the job to the cronJob informer data
- checkinJobStarting(ctx, job, cronsMonitorData)
- } else if job.Status.Active > 0 {
- return nil
- } else if job.Status.Failed > 0 || job.Status.Succeeded > 0 {
- checkinJobEnding(ctx, job, cronsMonitorData)
- return nil // finished
- }
+ hub.WithScope(func(scope *sentry.Scope) {
+
+ // If a DSN annotation is provided, we bind a new client with that DSN
+ client, ok := dsnClientMapping.GetClientFromObject(ctx, &job.ObjectMeta, hub.Client().Options())
+ if ok {
+ hub.BindClient(client)
+ }
+
+ // Pass the cloned hub down with the context
+ ctx = sentry.SetHubOnContext(ctx, hub)
+ // The job has just begun, so check in to start
+ if job.Status.Active == 0 && job.Status.Succeeded == 0 && job.Status.Failed == 0 {
+ // Add the job to the cronJob informer data
+ checkinJobStarting(ctx, job, cronsMonitorData)
+ } else if job.Status.Active > 0 {
+ return
+ } else if job.Status.Failed > 0 || job.Status.Succeeded > 0 {
+ checkinJobEnding(ctx, job, cronsMonitorData)
+ return // Finished
+ }
+ })

return nil
}

@@ -110,6 +130,11 @@ func checkinJobStarting(ctx context.Context, job *batchv1.Job, cronsMonitorData *CronsMonitorData) error {

logger := zerolog.Ctx(ctx)

hub := sentry.GetHubFromContext(ctx)
if hub == nil {
return errors.New("cannot get hub from context")
}

// Check if job already added to jobData slice
_, ok := cronsMonitorData.JobDatas[job.Name]
if ok {
@@ -118,7 +143,7 @@ func checkinJobStarting(ctx context.Context, job *batchv1.Job, cronsMonitorData *CronsMonitorData) error {
logger.Debug().Msgf("Checking in at start of job: %s\n", job.Name)

// All containers running in the pod
- checkinId := sentry.CaptureCheckIn(
+ checkinId := hub.CaptureCheckIn(
&sentry.CheckIn{
MonitorSlug: cronsMonitorData.MonitorSlug,
Status: sentry.CheckInStatusInProgress,
@@ -133,6 +158,11 @@ func checkinJobStarting(ctx context.Context, job *batchv1.Job, cronsMonitorData *CronsMonitorData) error {
// Sends the checkin event to sentry crons for when a job ends
func checkinJobEnding(ctx context.Context, job *batchv1.Job, cronsMonitorData *CronsMonitorData) error {

hub := sentry.GetHubFromContext(ctx)
if hub == nil {
return errors.New("cannot get hub from context")
}

logger := zerolog.Ctx(ctx)

// Check desired number of pods have succeeded
@@ -157,7 +187,7 @@ func checkinJobEnding(ctx context.Context, job *batchv1.Job, cronsMonitorData *CronsMonitorData) error {
}

logger.Trace().Msgf("checking in at end of job: %s\n", job.Name)
- sentry.CaptureCheckIn(
+ hub.CaptureCheckIn(
&sentry.CheckIn{
ID: jobData.getCheckinId(),
MonitorSlug: cronsMonitorData.MonitorSlug,
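
Because the hub may now be bound to a custom-DSN client, the check-ins above land in the project named by the annotation. A minimal sketch of the start/finish pair that checkinJobStarting and checkinJobEnding implement, assuming sentry-go's CaptureCheckIn; "my-cronjob" is a placeholder slug and the monitor config is left nil for brevity:

// Check in when the job starts
checkinId := hub.CaptureCheckIn(
    &sentry.CheckIn{
        MonitorSlug: "my-cronjob",
        Status:      sentry.CheckInStatusInProgress,
    },
    nil, // monitor config omitted in this sketch
)

// ... the job runs ...

// Close the same check-in when the job finishes
if checkinId != nil {
    hub.CaptureCheckIn(
        &sentry.CheckIn{
            ID:          *checkinId,
            MonitorSlug: "my-cronjob",
            Status:      sentry.CheckInStatusOK,
        },
        nil,
    )
}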
2 changes: 2 additions & 0 deletions k8s/errors/cronjob-basic-error.yaml
@@ -4,6 +4,8 @@ metadata:
name: cronjob-basic-error
labels:
type: test-pod
annotations:
k8s.sentry.io/dsn: "https://474d9da00094c5e39d6800c01f3aeff6@o4506191942320128.ingest.sentry.io/4506363396816896"
spec:
schedule: "* * * * *"
jobTemplate:
2 changes: 2 additions & 0 deletions k8s/errors/cronjob-basic-success.yaml
@@ -4,6 +4,8 @@ metadata:
name: cronjob-basic-success
labels:
type: test-pod
annotations:
k8s.sentry.io/dsn: "https://474d9da00094c5e39d6800c01f3aeff6@o4506191942320128.ingest.sentry.io/4506363396816896"
spec:
schedule: "* * * * *"
jobTemplate:
2 changes: 2 additions & 0 deletions k8s/errors/cronjob-late-maybe-error.yaml
@@ -4,6 +4,8 @@ metadata:
name: cronjob-late-maybe-error
labels:
type: test-pod
annotations:
k8s.sentry.io/dsn: "https://c6a5dd95a40ab7e4e34a3af43c14f848@o4506191942320128.ingest.sentry.io/4506363401601024"
spec:
schedule: "* * * * *"
jobTemplate:
2 changes: 2 additions & 0 deletions k8s/errors/cronjob-late-success.yaml
@@ -4,6 +4,8 @@ metadata:
name: cronjob-late-success
labels:
type: test-pod
annotations:
k8s.sentry.io/dsn: "https://c6a5dd95a40ab7e4e34a3af43c14f848@o4506191942320128.ingest.sentry.io/4506363401601024"
spec:
schedule: "* * * * *"
jobTemplate:
2 changes: 2 additions & 0 deletions k8s/errors/deployment-create-container-error.yaml
@@ -5,6 +5,8 @@ metadata:
labels:
run: deployment-create-container-error
type: test-pod
annotations:
k8s.sentry.io/dsn: "https://474d9da00094c5e39d6800c01f3aeff6@o4506191942320128.ingest.sentry.io/4506363396816896"
spec:
replicas: 2
selector:
2 changes: 2 additions & 0 deletions k8s/errors/pod-outofmemory.yaml
@@ -6,6 +6,8 @@ metadata:
run: pod-outofmemory
type: test-pod
name: pod-outofmemory
annotations:
k8s.sentry.io/dsn: "https://474d9da00094c5e39d6800c01f3aeff6@o4506191942320128.ingest.sentry.io/4506363396816896"
spec:
containers:
- image: python:3.10-alpine
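
The manifests above set the annotation statically; the same annotation can also be applied to a live object, since searchDsn only inspects ObjectMeta. A sketch using client-go, with placeholder names and Sentry's documented example DSN; clientset is assumed to be an initialized kubernetes.Interface:

pod, err := clientset.CoreV1().Pods("default").Get(ctx, "pod-outofmemory", metav1.GetOptions{})
if err != nil {
    return err
}
if pod.Annotations == nil {
    pod.Annotations = map[string]string{}
}
pod.Annotations["k8s.sentry.io/dsn"] = "https://examplePublicKey@o0.ingest.sentry.io/0"
_, err = clientset.CoreV1().Pods("default").Update(ctx, pod, metav1.UpdateOptions{})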
6 changes: 2 additions & 4 deletions mocks_test.go
@@ -8,17 +8,15 @@ import (
)

type TransportMock struct {
- mu sync.Mutex
- events []*sentry.Event
- lastEvent *sentry.Event
+ mu sync.Mutex
+ events []*sentry.Event
}

func (t *TransportMock) Configure(options sentry.ClientOptions) {}
func (t *TransportMock) SendEvent(event *sentry.Event) {
t.mu.Lock()
defer t.mu.Unlock()
t.events = append(t.events, event)
- t.lastEvent = event
}
func (t *TransportMock) Flush(timeout time.Duration) bool {
return true
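
With lastEvent removed, tests read captured events from the events slice instead. A hypothetical convenience helper, not part of the commit, could look like this:

// lastSentEvent returns the most recently captured event, or nil if none were sent.
func (t *TransportMock) lastSentEvent() *sentry.Event {
    t.mu.Lock()
    defer t.mu.Unlock()
    if len(t.events) == 0 {
        return nil
    }
    return t.events[len(t.events)-1]
}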
162 changes: 162 additions & 0 deletions sentry_dsn_data.go
@@ -0,0 +1,162 @@
package main

import (
"context"
"errors"
"os"
"sync"

"github.com/getsentry/sentry-go"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var DSNAnnotation = "k8s.sentry.io/dsn"

var dsnClientMapping = NewDsnClientMapping()

// Map from Sentry DSN to Client
type DsnClientMapping struct {
mutex sync.RWMutex
clientMap map[string]*sentry.Client
customDsnFlag bool
}

func NewDsnClientMapping() *DsnClientMapping {
return &DsnClientMapping{
mutex: sync.RWMutex{},
clientMap: make(map[string]*sentry.Client),
customDsnFlag: isTruthy(os.Getenv("SENTRY_K8S_CUSTOM_DSNS")),
}
}

// Return client if added successfully
// (also returns client if already exists)
func (d *DsnClientMapping) AddClientToMap(options sentry.ClientOptions) (*sentry.Client, error) {
d.mutex.Lock()
defer d.mutex.Unlock()

// Create a new client for the DSN; even if a client
// already exists for this DSN, it will be
// re-initialized with a fresh client
newClient, err := sentry.NewClient(
sentry.ClientOptions{
Dsn: options.Dsn,
Debug: true,
AttachStacktrace: true,
},
)
if err != nil {
return nil, err
}
d.clientMap[options.Dsn] = newClient
return newClient, nil
}

// Retrieve a client with given dsn
func (d *DsnClientMapping) GetClientFromMap(dsn string) (*sentry.Client, bool) {
d.mutex.RLock()
defer d.mutex.RUnlock()

// Check if we have this dsn
existingClient, ok := d.clientMap[dsn]
return existingClient, ok
}

func (d *DsnClientMapping) GetClientFromObject(ctx context.Context, objectMeta *metav1.ObjectMeta, clientOptions sentry.ClientOptions) (*sentry.Client, bool) {

// If the custom DSN flag is set to false
// then avoid searching for the custom DSN
// or adding an alternative client and instead
// just return nil as the client
if !d.customDsnFlag {
return nil, false
}

// Find DSN annotation from the object
altDsn, err := searchDsn(ctx, objectMeta)
if err != nil {
return nil, false
}

// If we did find an alternative DSN
if altDsn != "" {
// Attempt to retrieve the corresponding client
client, _ := dsnClientMapping.GetClientFromMap(altDsn)
if client == nil {
// create new client
clientOptions.Dsn = altDsn
client, err = dsnClientMapping.AddClientToMap(clientOptions)
if err != nil {
return nil, false
}
}
return client, true
} else {
return nil, false
}
}

// Recursive function to find if there is a DSN annotation
func searchDsn(ctx context.Context, object *metav1.ObjectMeta) (string, error) {

dsn, ok := object.Annotations[DSNAnnotation]
if ok {
return dsn, nil
}

if len(object.OwnerReferences) == 0 {
return "", nil
}

owningRef := object.OwnerReferences[0]
owningObjectMeta, ok := findObjectMeta(ctx, owningRef.Kind, object.Namespace, owningRef.Name)

if !ok {
return "", errors.New("the DSN cannot be found")
}

return searchDsn(ctx, owningObjectMeta)
}

func findObjectMeta(ctx context.Context, kind string, namespace string, name string) (*metav1.ObjectMeta, bool) {

clientset, err := getClientsetFromContext(ctx)
if err != nil {
return nil, false
}

switch kind {
case "Pod":
pod, err := clientset.CoreV1().Pods(namespace).Get(context.Background(), name, metav1.GetOptions{})
if err != nil {
return nil, false
}
return &pod.ObjectMeta, true
case "ReplicaSet":
replicaSet, err := clientset.AppsV1().ReplicaSets(namespace).Get(context.Background(), name, metav1.GetOptions{})
if err != nil {
return nil, false
}
return &replicaSet.ObjectMeta, true
case "Deployment":
deployment, err := clientset.AppsV1().Deployments(namespace).Get(context.Background(), name, metav1.GetOptions{})
if err != nil {
return nil, false
}
return &deployment.ObjectMeta, true
case "Job":
job, err := clientset.BatchV1().Jobs(namespace).Get(context.Background(), name, metav1.GetOptions{})
if err != nil {
return nil, false
}
return &job.ObjectMeta, true
case "CronJob":
cronjob, err := clientset.BatchV1().CronJobs(namespace).Get(context.Background(), name, metav1.GetOptions{})
if err != nil {
return nil, false
}
return &cronjob.ObjectMeta, true
default:
return nil, false
}
}
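
searchDsn walks the owner chain, so a Pod created by a Job that a CronJob owns inherits the CronJob's annotation, and a Pod behind a ReplicaSet inherits the Deployment's. A small illustrative wrapper; resolveDsn is a hypothetical name and assumes this package's context:

// resolveDsn returns the custom DSN for an object, or "" when the default
// client should be used instead.
func resolveDsn(ctx context.Context, obj *metav1.ObjectMeta) string {
    dsn, err := searchDsn(ctx, obj) // checks obj itself, then its owners recursively
    if err != nil || dsn == "" {
        return ""
    }
    return dsn
}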
16 changes: 16 additions & 0 deletions watcher_events.go
@@ -139,7 +139,23 @@ func handleWatchEvent(ctx context.Context, event *watch.Event, cutoffTime metav1
logger.Error().Msgf("Cannot get Sentry hub from context")
return
}

// To avoid concurrency issues
hub = hub.Clone()
hub.WithScope(func(scope *sentry.Scope) {

// Find the object meta that the event is about
objectMeta, ok := findObjectMeta(ctx, eventObject.InvolvedObject.Kind, eventObject.InvolvedObject.Namespace, eventObject.InvolvedObject.Name)
if ok {
// If a DSN annotation is provided, we bind a new client with that DSN
client, ok := dsnClientMapping.GetClientFromObject(ctx, objectMeta, hub.Client().Options())
if ok {
hub.BindClient(client)
}
}

// Pass the cloned hub down with the context
ctx = sentry.SetHubOnContext(ctx, hub)
setWatcherTag(scope, eventsWatcherName)
sentryEvent := handleGeneralEvent(ctx, eventObject, scope)
if sentryEvent != nil {
1 change: 1 addition & 0 deletions watcher_events_test.go
@@ -101,6 +101,7 @@ func TestHandleWatchEvent(t *testing.T) {

// The Sentry event message should match that of the event message
expectedMsg := "Fake Message: TestHandleWatchEvent"

if events[0].Message != expectedMsg {
t.Errorf("received %s, wanted %s", events[0].Message, expectedMsg)
}
12 changes: 11 additions & 1 deletion watcher_pods.go
@@ -103,6 +103,8 @@ func handlePodWatchEvent(ctx context.Context, event *watch.Event) {
logger.Error().Msgf("Cannot get Sentry hub from context")
return
}
// To avoid concurrency issues
hub = hub.Clone()

containerStatuses := podObject.Status.ContainerStatuses
logger.Trace().Msgf("Container statuses: %#v\n", containerStatuses)
@@ -112,8 +114,16 @@
// Ignore non-Terminated statuses
continue
}

hub.WithScope(func(scope *sentry.Scope) {

// If a DSN annotation is provided, we bind a new client with that DSN
client, ok := dsnClientMapping.GetClientFromObject(ctx, &podObject.ObjectMeta, hub.Client().Options())
if ok {
hub.BindClient(client)
}

// Pass the cloned hub down with the context
ctx = sentry.SetHubOnContext(ctx, hub)
setWatcherTag(scope, podsWatcherName)
sentryEvent := handlePodTerminationEvent(ctx, &status, podObject, scope)
if sentryEvent != nil {
