Skip to content

Commit

Permalink
refactor dsn code
Browse files Browse the repository at this point in the history
  • Loading branch information
Jiahui-Zhang-20 committed Dec 11, 2023
1 parent 062583e commit c17f4ef
Show file tree
Hide file tree
Showing 10 changed files with 72 additions and 135 deletions.
28 changes: 6 additions & 22 deletions crons.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func runSentryCronsCheckin(ctx context.Context, job *batchv1.Job, eventHandlerTy
return errors.New("cannot get hub from context")
}

// to avoid concurrency issue
// To avoid concurrency issue
hub = hub.Clone()

// Try to find the cronJob name that owns the job
Expand All @@ -101,30 +101,14 @@ func runSentryCronsCheckin(ctx context.Context, job *batchv1.Job, eventHandlerTy
}

hub.WithScope(func(scope *sentry.Scope) {
altDsn, err := searchDsn(ctx, job.ObjectMeta)
if err != nil {
return
}

// if we did find an alternative DSN
if altDsn != "" {
// attempt to retrieve the corresponding client
client, _ := dsnData.GetClient(altDsn)

if client == nil {
newOptions := hub.Client().Options()
newOptions.Dsn = altDsn
client, err = dsnData.AddClient(newOptions)
if err != nil {
return
}
}

// bind the alternative client to the top layer
// If DSN annotation provided, we bind a new client with that DSN
client, ok := dsnData.GetClientFromObject(ctx, &job.ObjectMeta, hub.Clone().Client().Options())
if ok {
hub.BindClient(client)
}

// pass clone hub down with context
// Pass clone hub down with context
ctx = sentry.SetHubOnContext(ctx, hub)
// The job just begun so check in to start
if job.Status.Active == 0 && job.Status.Succeeded == 0 && job.Status.Failed == 0 {
Expand All @@ -134,7 +118,7 @@ func runSentryCronsCheckin(ctx context.Context, job *batchv1.Job, eventHandlerTy
return
} else if job.Status.Failed > 0 || job.Status.Succeeded > 0 {
checkinJobEnding(ctx, job, cronsMonitorData)
return // finished
return // Finished
}
})

Expand Down
2 changes: 1 addition & 1 deletion k8s/errors/cronjob-basic-error.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ metadata:
labels:
type: test-pod
"annotations": {
"dsn" : "https://474d9da00094c5e39d6800c01f3aeff6@o4506191942320128.ingest.sentry.io/4506363396816896",
"k8s.sentry.io/dsn" : "https://474d9da00094c5e39d6800c01f3aeff6@o4506191942320128.ingest.sentry.io/4506363396816896",
}
spec:
schedule: "* * * * *"
Expand Down
2 changes: 1 addition & 1 deletion k8s/errors/cronjob-basic-success.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ metadata:
labels:
type: test-pod
"annotations": {
"dsn" : "https://474d9da00094c5e39d6800c01f3aeff6@o4506191942320128.ingest.sentry.io/4506363396816896",
"k8s.sentry.io/dsn" : "https://474d9da00094c5e39d6800c01f3aeff6@o4506191942320128.ingest.sentry.io/4506363396816896",
}
spec:
schedule: "* * * * *"
Expand Down
2 changes: 1 addition & 1 deletion k8s/errors/cronjob-late-maybe-error.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ metadata:
labels:
type: test-pod
"annotations": {
"dsn" : "https://c6a5dd95a40ab7e4e34a3af43c14f848@o4506191942320128.ingest.sentry.io/4506363401601024",
"k8s.sentry.io/dsn" : "https://c6a5dd95a40ab7e4e34a3af43c14f848@o4506191942320128.ingest.sentry.io/4506363401601024",
}
spec:
schedule: "* * * * *"
Expand Down
2 changes: 1 addition & 1 deletion k8s/errors/cronjob-late-success.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ metadata:
labels:
type: test-pod
"annotations": {
"dsn" : "https://c6a5dd95a40ab7e4e34a3af43c14f848@o4506191942320128.ingest.sentry.io/4506363401601024",
"k8s.sentry.io/dsn" : "https://c6a5dd95a40ab7e4e34a3af43c14f848@o4506191942320128.ingest.sentry.io/4506363401601024",
}
spec:
schedule: "* * * * *"
Expand Down
2 changes: 1 addition & 1 deletion k8s/errors/deployment-create-container-error.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ metadata:
run: deployment-create-container-error
type: test-pod
"annotations": {
"dsn" : "https://474d9da00094c5e39d6800c01f3aeff6@o4506191942320128.ingest.sentry.io/4506363396816896",
"k8s.sentry.io/dsn" : "https://474d9da00094c5e39d6800c01f3aeff6@o4506191942320128.ingest.sentry.io/4506363396816896",
}
spec:
replicas: 2
Expand Down
2 changes: 1 addition & 1 deletion k8s/errors/pod-outofmemory.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ metadata:
type: test-pod
name: pod-outofmemory
"annotations": {
"dsn" : "https://474d9da00094c5e39d6800c01f3aeff6@o4506191942320128.ingest.sentry.io/4506363396816896",
"k8s.sentry.io/dsn" : "https://474d9da00094c5e39d6800c01f3aeff6@o4506191942320128.ingest.sentry.io/4506363396816896",
}
spec:
containers:
Expand Down
109 changes: 52 additions & 57 deletions sentry_dsn_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,30 +9,31 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var DSN = "dsn"
var DSNAnnotation = "k8s.sentry.io/dsn"

// map from Sentry DSN to Client
type DsnData struct {
type DsnClientMapping struct {
mutex sync.RWMutex
clientMap map[string]*sentry.Client
}

func NewDsnData() *DsnData {
return &DsnData{
func NewDsnData() *DsnClientMapping {
return &DsnClientMapping{
mutex: sync.RWMutex{},
clientMap: make(map[string]*sentry.Client),
}
}

// return client if added successfully
func (d *DsnData) AddClient(options sentry.ClientOptions) (*sentry.Client, error) {
// (also returns client if already exists)
func (d *DsnClientMapping) AddClientToMap(options sentry.ClientOptions) (*sentry.Client, error) {
d.mutex.Lock()
defer d.mutex.Unlock()

// check if we already encountered this dsn
existingClient, ok := d.clientMap[options.Dsn]
if ok {
return existingClient, errors.New("a client with the given dsn already exists")
return existingClient, nil
}

// create a new client for the dsn
Expand All @@ -52,28 +53,49 @@ func (d *DsnData) AddClient(options sentry.ClientOptions) (*sentry.Client, error
}

// retrieve a client with given dsn
func (d *DsnData) GetClient(dsn string) (*sentry.Client, error) {
func (d *DsnClientMapping) GetClientFromMap(dsn string) (*sentry.Client, bool) {
d.mutex.RLock()
defer d.mutex.RUnlock()

// check if we have this dsn
existingClient, ok := d.clientMap[dsn]
if ok {
return existingClient, nil
return existingClient, true
} else {
return nil, errors.New("a client with given DSN does not exist")
return nil, false
}
}

// recursive function to find if there is a DSN annotation
func searchDsn(ctx context.Context, object metav1.ObjectMeta) (string, error) {
func (d *DsnClientMapping) GetClientFromObject(ctx context.Context, objectMeta *metav1.ObjectMeta, clientOptions sentry.ClientOptions) (*sentry.Client, bool) {

clientset, err := getClientsetFromContext(ctx)
// find DSN annotation from the object
altDsn, err := searchDsn(ctx, objectMeta)
if err != nil {
return "", err
return nil, false
}

// if we did find an alternative DSN
if altDsn != "" {
// attempt to retrieve the corresponding client
client, _ := dsnData.GetClientFromMap(altDsn)
if client == nil {
// create new client
clientOptions.Dsn = altDsn
client, err = dsnData.AddClientToMap(clientOptions)
if err != nil {
return nil, false
}
}
return client, true
} else {
return nil, false
}
}

// recursive function to find if there is a DSN annotation
func searchDsn(ctx context.Context, object *metav1.ObjectMeta) (string, error) {

dsn, ok := object.Annotations[DSN]
dsn, ok := object.Annotations[DSNAnnotation]
if ok {
return dsn, nil
}
Expand All @@ -83,40 +105,13 @@ func searchDsn(ctx context.Context, object metav1.ObjectMeta) (string, error) {
}

owningRef := object.OwnerReferences[0]
switch kind := owningRef.Kind; kind {
case "Pod":
parentPod, err := clientset.CoreV1().Pods(object.Namespace).Get(context.Background(), owningRef.Name, metav1.GetOptions{})
if err != nil {
return "", err
}
return searchDsn(ctx, parentPod.ObjectMeta)
case "ReplicaSet":
parentReplicaSet, err := clientset.AppsV1().ReplicaSets(object.Namespace).Get(context.Background(), owningRef.Name, metav1.GetOptions{})
if err != nil {
return "", err
}
return searchDsn(ctx, parentReplicaSet.ObjectMeta)
case "Deployment":
parentDeployment, err := clientset.AppsV1().Deployments(object.Namespace).Get(context.Background(), owningRef.Name, metav1.GetOptions{})
if err != nil {
return "", err
}
return searchDsn(ctx, parentDeployment.ObjectMeta)
case "Job":
parentJob, err := clientset.BatchV1().Jobs(object.Namespace).Get(context.Background(), owningRef.Name, metav1.GetOptions{})
if err != nil {
return "", err
}
return searchDsn(ctx, parentJob.ObjectMeta)
case "CronJob":
parentCronjob, err := clientset.BatchV1().CronJobs(object.Namespace).Get(context.Background(), owningRef.Name, metav1.GetOptions{})
if err != nil {
return "", err
}
return searchDsn(ctx, parentCronjob.ObjectMeta)
default:
return "", errors.New("unsupported object kind encountered")
owningObjectMeta, err := findObjectMeta(ctx, owningRef.Kind, object.Namespace, owningRef.Name)

if err != nil {
return "", err
}

return searchDsn(ctx, owningObjectMeta)
}

func findObjectMeta(ctx context.Context, kind string, namespace string, name string) (*metav1.ObjectMeta, error) {
Expand All @@ -128,35 +123,35 @@ func findObjectMeta(ctx context.Context, kind string, namespace string, name str

switch kind {
case "Pod":
parentPod, err := clientset.CoreV1().Pods(namespace).Get(context.Background(), name, metav1.GetOptions{})
pod, err := clientset.CoreV1().Pods(namespace).Get(context.Background(), name, metav1.GetOptions{})
if err != nil {
return nil, err
}
return &parentPod.ObjectMeta, nil
return &pod.ObjectMeta, nil
case "ReplicaSet":
parentReplicaSet, err := clientset.AppsV1().ReplicaSets(namespace).Get(context.Background(), name, metav1.GetOptions{})
replicaSet, err := clientset.AppsV1().ReplicaSets(namespace).Get(context.Background(), name, metav1.GetOptions{})
if err != nil {
return nil, err
}
return &parentReplicaSet.ObjectMeta, nil
return &replicaSet.ObjectMeta, nil
case "Deployment":
parentDeployment, err := clientset.AppsV1().Deployments(namespace).Get(context.Background(), name, metav1.GetOptions{})
deployment, err := clientset.AppsV1().Deployments(namespace).Get(context.Background(), name, metav1.GetOptions{})
if err != nil {
return nil, err
}
return &parentDeployment.ObjectMeta, nil
return &deployment.ObjectMeta, nil
case "Job":
parentJob, err := clientset.BatchV1().Jobs(namespace).Get(context.Background(), name, metav1.GetOptions{})
job, err := clientset.BatchV1().Jobs(namespace).Get(context.Background(), name, metav1.GetOptions{})
if err != nil {
return nil, err
}
return &parentJob.ObjectMeta, nil
return &job.ObjectMeta, nil
case "CronJob":
parentCronjob, err := clientset.BatchV1().CronJobs(namespace).Get(context.Background(), name, metav1.GetOptions{})
cronjob, err := clientset.BatchV1().CronJobs(namespace).Get(context.Background(), name, metav1.GetOptions{})
if err != nil {
return nil, err
}
return &parentCronjob.ObjectMeta, nil
return &cronjob.ObjectMeta, nil
default:
return nil, errors.New("unsupported object kind encountered")
}
Expand Down
27 changes: 3 additions & 24 deletions watcher_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,30 +150,9 @@ func handleWatchEvent(ctx context.Context, event *watch.Event, cutoffTime metav1
return
}

// Search for alternative DSN in the annotations
altDsn, err := searchDsn(ctx, *objectMeta)
if err != nil {
return
}

// If we did find an alternative DSN
if altDsn != "" {
// Attempt to retrieve the corresponding client
client, err := dsnData.GetClient(altDsn)
if err != nil {
return
}

if client == nil {
newOptions := hub.Client().Options()
newOptions.Dsn = altDsn
client, err = dsnData.AddClient(newOptions)
if err != nil {
return
}
}

// Bind the alternative client to the top layer
// if DSN annotation provided, we bind a new client with that DSN
client, ok := dsnData.GetClientFromObject(ctx, objectMeta, hub.Clone().Client().Options())
if ok {
hub.BindClient(client)
}

Expand Down
31 changes: 5 additions & 26 deletions watcher_pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func handlePodWatchEvent(ctx context.Context, event *watch.Event) {
logger.Error().Msgf("Cannot get Sentry hub from context")
return
}
// to avoid concurrency issue
// To avoid concurrency issue
hub = hub.Clone()

containerStatuses := podObject.Status.ContainerStatuses
Expand All @@ -117,34 +117,13 @@ func handlePodWatchEvent(ctx context.Context, event *watch.Event) {
}
hub.WithScope(func(scope *sentry.Scope) {

// search for alternative DSN in the annotations
altDsn, err := searchDsn(ctx, podObject.ObjectMeta)
if err != nil {
return
}

// if we did find an alternative DSN
if altDsn != "" {
// attempt to retrieve the corresponding client
client, err := dsnData.GetClient(altDsn)
if err != nil {
return
}

if client == nil {
newOptions := hub.Client().Options()
newOptions.Dsn = altDsn
client, err = dsnData.AddClient(newOptions)
if err != nil {
return
}
}

// bind the alternative client to the top layer
// If DSN annotation provided, we bind a new client with that DSN
client, ok := dsnData.GetClientFromObject(ctx, &podObject.ObjectMeta, hub.Clone().Client().Options())
if ok {
hub.BindClient(client)
}

// pass down clone context
// Pass down clone context
ctx = sentry.SetHubOnContext(ctx, hub)
setWatcherTag(scope, podsWatcherName)
sentryEvent := handlePodTerminationEvent(ctx, &status, podObject, scope)
Expand Down

0 comments on commit c17f4ef

Please sign in to comment.