From 95ea882c60083bd39a4cd5f41925f1ae2376befb Mon Sep 17 00:00:00 2001 From: "Jiahui (Jack) Zhang" Date: Wed, 29 Nov 2023 01:32:15 -0800 Subject: [PATCH] fix: add mutex protection for monitor metadata (#68) * add mutex protection to crons * use defer for unlocking --- crons.go | 24 ++++++++---------------- crons_monitor_data.go | 41 +++++++++++++++++++++++++++++++++++++++-- informer_crons.go | 20 ++++---------------- informer_jobs.go | 6 ------ watcher_pods.go | 14 +++----------- 5 files changed, 54 insertions(+), 51 deletions(-) diff --git a/crons.go b/crons.go index b44a748..a994b27 100644 --- a/crons.go +++ b/crons.go @@ -23,6 +23,9 @@ const ( EventHandlerDelete EventHandlerType = "DELETE" ) +var cronjobInformer cache.SharedIndexInformer +var jobInformer cache.SharedIndexInformer + // Starts the crons informer which has event handlers // adds to the crons monitor data struct used for sending // checkin events to Sentry @@ -41,12 +44,12 @@ func startCronsInformers(ctx context.Context, namespace string) error { ) // create the cronjob informer - cronjobInformer, err := createCronjobInformer(ctx, factory, namespace) + cronjobInformer, err = createCronjobInformer(ctx, factory, namespace) if err != nil { return err } // create the job informer - jobInformer, err := createJobInformer(ctx, factory, namespace) + jobInformer, err = createJobInformer(ctx, factory, namespace) if err != nil { return err } @@ -74,17 +77,6 @@ func startCronsInformers(ctx context.Context, namespace string) error { // checkin events during the start and end of a job (along with the exit status) func runSentryCronsCheckin(ctx context.Context, job *batchv1.Job, eventHandlerType EventHandlerType) error { - // Query the crons informer data - val := ctx.Value(CronsInformerDataKey{}) - if val == nil { - return errors.New("no crons informer data struct given") - } - var cronsInformerData *map[string]CronsMonitorData - var ok bool - if cronsInformerData, ok = val.(*map[string]CronsMonitorData); !ok { - return errors.New("cannot convert cronsInformerData value from context") - } - // Try to find the cronJob name that owns the job // in order to get the crons monitor data if len(job.OwnerReferences) == 0 { @@ -94,7 +86,7 @@ func runSentryCronsCheckin(ctx context.Context, job *batchv1.Job, eventHandlerTy if !*cronjobRef.Controller || cronjobRef.Kind != "CronJob" { return errors.New("job does not have cronjob reference") } - cronsMonitorData, ok := (*cronsInformerData)[cronjobRef.Name] + cronsMonitorData, ok := cronsMetaData.getCronsMonitorData(cronjobRef.Name) if !ok { return errors.New("cannot find cronJob data") } @@ -112,7 +104,7 @@ func runSentryCronsCheckin(ctx context.Context, job *batchv1.Job, eventHandlerTy } // sends the checkin event to sentry crons for when a job starts -func checkinJobStarting(ctx context.Context, job *batchv1.Job, cronsMonitorData CronsMonitorData) error { +func checkinJobStarting(ctx context.Context, job *batchv1.Job, cronsMonitorData *CronsMonitorData) error { logger := zerolog.Ctx(ctx) @@ -137,7 +129,7 @@ func checkinJobStarting(ctx context.Context, job *batchv1.Job, cronsMonitorData } // sends the checkin event to sentry crons for when a job ends -func checkinJobEnding(ctx context.Context, job *batchv1.Job, cronsMonitorData CronsMonitorData) error { +func checkinJobEnding(ctx context.Context, job *batchv1.Job, cronsMonitorData *CronsMonitorData) error { logger := zerolog.Ctx(ctx) // do not check in to exit if there are still active pods diff --git a/crons_monitor_data.go b/crons_monitor_data.go index 9c7659e..b7ae1bc 100644 --- a/crons_monitor_data.go +++ b/crons_monitor_data.go @@ -1,12 +1,12 @@ package main import ( + "sync" + "github.com/getsentry/sentry-go" batchv1 "k8s.io/api/batch/v1" ) -type CronsInformerDataKey struct{} - // Struct associated with a job type CronsJobData struct { CheckinId sentry.EventID @@ -25,6 +25,7 @@ func (j *CronsJobData) getCheckinId() sentry.EventID { // Struct associated with a cronJob type CronsMonitorData struct { + mutex sync.RWMutex MonitorSlug string monitorConfig *sentry.MonitorConfig JobDatas map[string]*CronsJobData @@ -44,6 +45,7 @@ func NewCronsMonitorData(monitorSlug string, schedule string, maxRunTime int64, } monitorSchedule := sentry.CrontabSchedule(schedule) return &CronsMonitorData{ + mutex: sync.RWMutex{}, MonitorSlug: monitorSlug, monitorConfig: &sentry.MonitorConfig{ Schedule: monitorSchedule, @@ -57,6 +59,41 @@ func NewCronsMonitorData(monitorSlug string, schedule string, maxRunTime int64, // Add a job to the crons monitor func (c *CronsMonitorData) addJob(job *batchv1.Job, checkinId sentry.EventID) error { + c.mutex.Lock() + defer c.mutex.Unlock() c.JobDatas[job.Name] = NewCronsJobData(checkinId) return nil } + +// wrapper struct over crons monitor map that +// handles syncrhonization +type CronsMetaData struct { + mutex *sync.RWMutex + cronsMonitorDataMap map[string]*CronsMonitorData +} + +func (c *CronsMetaData) addCronsMonitorData(cronjobName string, newCronsMonitorData *CronsMonitorData) { + c.mutex.Lock() + defer c.mutex.Unlock() + c.cronsMonitorDataMap[cronjobName] = newCronsMonitorData +} + +func (c *CronsMetaData) deleteCronsMonitorData(cronjobName string) { + c.mutex.Lock() + defer c.mutex.Unlock() + delete(c.cronsMonitorDataMap, cronjobName) +} + +func (c *CronsMetaData) getCronsMonitorData(cronjobName string) (*CronsMonitorData, bool) { + c.mutex.RLock() + defer c.mutex.RUnlock() + cronsMonitorData, ok := c.cronsMonitorDataMap[cronjobName] + return cronsMonitorData, ok +} + +func NewCronsMetaData() *CronsMetaData { + return &CronsMetaData{ + mutex: &sync.RWMutex{}, + cronsMonitorDataMap: make(map[string]*CronsMonitorData), + } +} diff --git a/informer_crons.go b/informer_crons.go index 4e0898b..2c13b88 100644 --- a/informer_crons.go +++ b/informer_crons.go @@ -2,7 +2,6 @@ package main import ( "context" - "errors" "github.com/rs/zerolog" batchv1 "k8s.io/api/batch/v1" @@ -17,17 +16,6 @@ func createCronjobInformer(ctx context.Context, factory informers.SharedInformer logger.Debug().Msgf("Starting cronJob informer\n") - val := ctx.Value(CronsInformerDataKey{}) - if val == nil { - return nil, errors.New("no crons informer data struct given") - } - - var cronsInformerData *map[string]CronsMonitorData - var ok bool - if cronsInformerData, ok = val.(*map[string]CronsMonitorData); !ok { - return nil, errors.New("cannot convert cronsInformerData value from context") - } - cronjobInformer := factory.Batch().V1().CronJobs().Informer() var handler cache.ResourceEventHandlerFuncs @@ -35,20 +23,20 @@ func createCronjobInformer(ctx context.Context, factory informers.SharedInformer handler.AddFunc = func(obj interface{}) { cronjob := obj.(*batchv1.CronJob) logger.Debug().Msgf("ADD: CronJob Added to Store: %s\n", cronjob.GetName()) - _, ok := (*cronsInformerData)[cronjob.Name] + _, ok := cronsMetaData.getCronsMonitorData(cronjob.Name) if ok { logger.Debug().Msgf("cronJob %s already exists in the crons informer data struct...\n", cronjob.Name) } else { - (*cronsInformerData)[cronjob.Name] = *NewCronsMonitorData(cronjob.Name, cronjob.Spec.Schedule, 5, 3, cronjob.Spec.JobTemplate.Spec.Completions) + cronsMetaData.addCronsMonitorData(cronjob.Name, NewCronsMonitorData(cronjob.Name, cronjob.Spec.Schedule, 5, 3, cronjob.Spec.JobTemplate.Spec.Completions)) } } handler.DeleteFunc = func(obj interface{}) { cronjob := obj.(*batchv1.CronJob) logger.Debug().Msgf("DELETE: CronJob deleted from Store: %s\n", cronjob.GetName()) - _, ok := (*cronsInformerData)[cronjob.Name] + _, ok := cronsMetaData.getCronsMonitorData(cronjob.Name) if ok { - delete((*cronsInformerData), cronjob.Name) + cronsMetaData.deleteCronsMonitorData(cronjob.Name) logger.Debug().Msgf("cronJob %s deleted from the crons informer data struct...\n", cronjob.Name) } else { logger.Debug().Msgf("cronJob %s not in the crons informer data struct...\n", cronjob.Name) diff --git a/informer_jobs.go b/informer_jobs.go index f8e4841..3dcab1f 100644 --- a/informer_jobs.go +++ b/informer_jobs.go @@ -2,7 +2,6 @@ package main import ( "context" - "errors" "github.com/rs/zerolog" batchv1 "k8s.io/api/batch/v1" @@ -17,11 +16,6 @@ func createJobInformer(ctx context.Context, factory informers.SharedInformerFact logger.Debug().Msgf("starting job informer\n") - val := ctx.Value(CronsInformerDataKey{}) - if val == nil { - return nil, errors.New("no crons informer data struct given") - } - jobInformer := factory.Batch().V1().Jobs().Informer() var handler cache.ResourceEventHandlerFuncs diff --git a/watcher_pods.go b/watcher_pods.go index da51831..b64dad4 100644 --- a/watcher_pods.go +++ b/watcher_pods.go @@ -19,6 +19,8 @@ import ( const podsWatcherName = "pods" +var cronsMetaData = NewCronsMetaData() + func handlePodTerminationEvent(ctx context.Context, containerStatus *v1.ContainerStatus, pod *v1.Pod, scope *sentry.Scope) *sentry.Event { logger := zerolog.Ctx(ctx) @@ -75,11 +77,6 @@ func handlePodWatchEvent(ctx context.Context, event *watch.Event) { eventObjectRaw := event.Object - // err := runSentryCronsCheckin(ctx, event) - // if err != nil { - // return - // } - if event.Type != watch.Modified { logger.Debug().Msgf("Skipping a pod watch event of type %s", event.Type) return @@ -186,18 +183,13 @@ func watchPodsInNamespaceForever(ctx context.Context, config *rest.Config, names ctx = setClientsetOnContext(ctx, clientset) - // create the informers to integrate with sentry crons + // Create the informers to integrate with sentry crons if isTruthy(os.Getenv("SENTRY_K8S_MONITOR_CRONJOBS")) { - cronsInformerData := make(map[string]CronsMonitorData) - ctx := context.WithValue(ctx, CronsInformerDataKey{}, &cronsInformerData) logger.Info().Msgf("Enabling CronJob monitoring") - go startCronsInformers(ctx, namespace) - } else { logger.Info().Msgf("CronJob monitoring is disabled") } - for { if err := watchPodsInNamespace(ctx, namespace); err != nil { logger.Error().Msgf("Error while watching pods %s: %s", where, err)