Skip to content

Commit

Permalink
fix: add mutex protection for monitor metadata (#68)
Browse files Browse the repository at this point in the history
* add mutex protection to crons

* use defer for unlocking
  • Loading branch information
Jiahui-Zhang-20 committed Nov 29, 2023
1 parent 64552f6 commit 95ea882
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 51 deletions.
24 changes: 8 additions & 16 deletions crons.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ const (
EventHandlerDelete EventHandlerType = "DELETE"
)

// Package-level handles to the shared informers for CronJobs and Jobs.
// Both are assigned by startCronsInformers.
var (
	cronjobInformer cache.SharedIndexInformer
	jobInformer     cache.SharedIndexInformer
)

// Starts the crons informers, whose event handlers populate
// the crons monitor data struct used for sending
// checkin events to Sentry
Expand All @@ -41,12 +44,12 @@ func startCronsInformers(ctx context.Context, namespace string) error {
)

// create the cronjob informer
cronjobInformer, err := createCronjobInformer(ctx, factory, namespace)
cronjobInformer, err = createCronjobInformer(ctx, factory, namespace)
if err != nil {
return err
}
// create the job informer
jobInformer, err := createJobInformer(ctx, factory, namespace)
jobInformer, err = createJobInformer(ctx, factory, namespace)
if err != nil {
return err
}
Expand Down Expand Up @@ -74,17 +77,6 @@ func startCronsInformers(ctx context.Context, namespace string) error {
// checkin events during the start and end of a job (along with the exit status)
func runSentryCronsCheckin(ctx context.Context, job *batchv1.Job, eventHandlerType EventHandlerType) error {

// Query the crons informer data
val := ctx.Value(CronsInformerDataKey{})
if val == nil {
return errors.New("no crons informer data struct given")
}
var cronsInformerData *map[string]CronsMonitorData
var ok bool
if cronsInformerData, ok = val.(*map[string]CronsMonitorData); !ok {
return errors.New("cannot convert cronsInformerData value from context")
}

// Try to find the cronJob name that owns the job
// in order to get the crons monitor data
if len(job.OwnerReferences) == 0 {
Expand All @@ -94,7 +86,7 @@ func runSentryCronsCheckin(ctx context.Context, job *batchv1.Job, eventHandlerTy
if !*cronjobRef.Controller || cronjobRef.Kind != "CronJob" {
return errors.New("job does not have cronjob reference")
}
cronsMonitorData, ok := (*cronsInformerData)[cronjobRef.Name]
cronsMonitorData, ok := cronsMetaData.getCronsMonitorData(cronjobRef.Name)
if !ok {
return errors.New("cannot find cronJob data")
}
Expand All @@ -112,7 +104,7 @@ func runSentryCronsCheckin(ctx context.Context, job *batchv1.Job, eventHandlerTy
}

// sends the checkin event to sentry crons for when a job starts
func checkinJobStarting(ctx context.Context, job *batchv1.Job, cronsMonitorData CronsMonitorData) error {
func checkinJobStarting(ctx context.Context, job *batchv1.Job, cronsMonitorData *CronsMonitorData) error {

logger := zerolog.Ctx(ctx)

Expand All @@ -137,7 +129,7 @@ func checkinJobStarting(ctx context.Context, job *batchv1.Job, cronsMonitorData
}

// sends the checkin event to sentry crons for when a job ends
func checkinJobEnding(ctx context.Context, job *batchv1.Job, cronsMonitorData CronsMonitorData) error {
func checkinJobEnding(ctx context.Context, job *batchv1.Job, cronsMonitorData *CronsMonitorData) error {

logger := zerolog.Ctx(ctx)
// do not check in to exit if there are still active pods
Expand Down
41 changes: 39 additions & 2 deletions crons_monitor_data.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package main

import (
"sync"

"github.com/getsentry/sentry-go"
batchv1 "k8s.io/api/batch/v1"
)

type CronsInformerDataKey struct{}

// Struct associated with a job
type CronsJobData struct {
CheckinId sentry.EventID
Expand All @@ -25,6 +25,7 @@ func (j *CronsJobData) getCheckinId() sentry.EventID {

// Struct associated with a cronJob
type CronsMonitorData struct {
mutex sync.RWMutex
MonitorSlug string
monitorConfig *sentry.MonitorConfig
JobDatas map[string]*CronsJobData
Expand All @@ -44,6 +45,7 @@ func NewCronsMonitorData(monitorSlug string, schedule string, maxRunTime int64,
}
monitorSchedule := sentry.CrontabSchedule(schedule)
return &CronsMonitorData{
mutex: sync.RWMutex{},
MonitorSlug: monitorSlug,
monitorConfig: &sentry.MonitorConfig{
Schedule: monitorSchedule,
Expand All @@ -57,6 +59,41 @@ func NewCronsMonitorData(monitorSlug string, schedule string, maxRunTime int64,

// addJob records a job under this cronJob's monitor, keyed by the job's name,
// storing a new CronsJobData built from the given check-in ID. An existing
// entry with the same job name is overwritten.
//
// The write is performed while holding the monitor's write lock. The JobDatas
// map is allocated on first use, so a CronsMonitorData whose map was never
// initialized does not panic on insert. The returned error is always nil.
func (c *CronsMonitorData) addJob(job *batchv1.Job, checkinId sentry.EventID) error {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	if c.JobDatas == nil {
		// Assigning into a nil map panics; allocate lazily under the lock.
		c.JobDatas = make(map[string]*CronsJobData)
	}
	c.JobDatas[job.Name] = NewCronsJobData(checkinId)
	return nil
}

// CronsMetaData wraps the map from cronJob name to that cronJob's monitor
// data and handles synchronization: every method below holds the lock for
// the duration of its map operation.
//
// The zero value is ready to use. The mutex is stored by value (a zero
// sync.RWMutex is an unlocked mutex) and the map is allocated on the first
// insert. Because it contains a mutex, a CronsMetaData must not be copied
// after first use; always pass it by pointer.
type CronsMetaData struct {
	mutex               sync.RWMutex
	cronsMonitorDataMap map[string]*CronsMonitorData
}

// addCronsMonitorData stores newCronsMonitorData under cronjobName, replacing
// any entry already registered under that name. It takes the write lock.
func (c *CronsMetaData) addCronsMonitorData(cronjobName string, newCronsMonitorData *CronsMonitorData) {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	if c.cronsMonitorDataMap == nil {
		// Assigning into a nil map panics; allocate lazily under the lock.
		c.cronsMonitorDataMap = make(map[string]*CronsMonitorData)
	}
	c.cronsMonitorDataMap[cronjobName] = newCronsMonitorData
}

// deleteCronsMonitorData removes the entry for cronjobName. It takes the
// write lock and is a no-op when no such entry exists.
func (c *CronsMetaData) deleteCronsMonitorData(cronjobName string) {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	delete(c.cronsMonitorDataMap, cronjobName)
}

// getCronsMonitorData returns the monitor data stored under cronjobName and
// reports whether an entry was present. It takes only the read lock. The
// returned pointer is the stored value itself, not a copy.
func (c *CronsMetaData) getCronsMonitorData(cronjobName string) (*CronsMonitorData, bool) {
	c.mutex.RLock()
	defer c.mutex.RUnlock()
	cronsMonitorData, ok := c.cronsMonitorDataMap[cronjobName]
	return cronsMonitorData, ok
}

// NewCronsMetaData returns an empty CronsMetaData with its map preallocated.
func NewCronsMetaData() *CronsMetaData {
	return &CronsMetaData{
		cronsMonitorDataMap: make(map[string]*CronsMonitorData),
	}
}
20 changes: 4 additions & 16 deletions informer_crons.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"context"
"errors"

"github.com/rs/zerolog"
batchv1 "k8s.io/api/batch/v1"
Expand All @@ -17,38 +16,27 @@ func createCronjobInformer(ctx context.Context, factory informers.SharedInformer

logger.Debug().Msgf("Starting cronJob informer\n")

val := ctx.Value(CronsInformerDataKey{})
if val == nil {
return nil, errors.New("no crons informer data struct given")
}

var cronsInformerData *map[string]CronsMonitorData
var ok bool
if cronsInformerData, ok = val.(*map[string]CronsMonitorData); !ok {
return nil, errors.New("cannot convert cronsInformerData value from context")
}

cronjobInformer := factory.Batch().V1().CronJobs().Informer()

var handler cache.ResourceEventHandlerFuncs

handler.AddFunc = func(obj interface{}) {
cronjob := obj.(*batchv1.CronJob)
logger.Debug().Msgf("ADD: CronJob Added to Store: %s\n", cronjob.GetName())
_, ok := (*cronsInformerData)[cronjob.Name]
_, ok := cronsMetaData.getCronsMonitorData(cronjob.Name)
if ok {
logger.Debug().Msgf("cronJob %s already exists in the crons informer data struct...\n", cronjob.Name)
} else {
(*cronsInformerData)[cronjob.Name] = *NewCronsMonitorData(cronjob.Name, cronjob.Spec.Schedule, 5, 3, cronjob.Spec.JobTemplate.Spec.Completions)
cronsMetaData.addCronsMonitorData(cronjob.Name, NewCronsMonitorData(cronjob.Name, cronjob.Spec.Schedule, 5, 3, cronjob.Spec.JobTemplate.Spec.Completions))
}
}

handler.DeleteFunc = func(obj interface{}) {
cronjob := obj.(*batchv1.CronJob)
logger.Debug().Msgf("DELETE: CronJob deleted from Store: %s\n", cronjob.GetName())
_, ok := (*cronsInformerData)[cronjob.Name]
_, ok := cronsMetaData.getCronsMonitorData(cronjob.Name)
if ok {
delete((*cronsInformerData), cronjob.Name)
cronsMetaData.deleteCronsMonitorData(cronjob.Name)
logger.Debug().Msgf("cronJob %s deleted from the crons informer data struct...\n", cronjob.Name)
} else {
logger.Debug().Msgf("cronJob %s not in the crons informer data struct...\n", cronjob.Name)
Expand Down
6 changes: 0 additions & 6 deletions informer_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"context"
"errors"

"github.com/rs/zerolog"
batchv1 "k8s.io/api/batch/v1"
Expand All @@ -17,11 +16,6 @@ func createJobInformer(ctx context.Context, factory informers.SharedInformerFact

logger.Debug().Msgf("starting job informer\n")

val := ctx.Value(CronsInformerDataKey{})
if val == nil {
return nil, errors.New("no crons informer data struct given")
}

jobInformer := factory.Batch().V1().Jobs().Informer()

var handler cache.ResourceEventHandlerFuncs
Expand Down
14 changes: 3 additions & 11 deletions watcher_pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (

const podsWatcherName = "pods"

// cronsMetaData is the package-level store of crons monitor data, looked up
// by cronJob name. It is created once, at package initialization, through
// NewCronsMetaData.
var cronsMetaData = NewCronsMetaData()

func handlePodTerminationEvent(ctx context.Context, containerStatus *v1.ContainerStatus, pod *v1.Pod, scope *sentry.Scope) *sentry.Event {
logger := zerolog.Ctx(ctx)

Expand Down Expand Up @@ -75,11 +77,6 @@ func handlePodWatchEvent(ctx context.Context, event *watch.Event) {

eventObjectRaw := event.Object

// err := runSentryCronsCheckin(ctx, event)
// if err != nil {
// return
// }

if event.Type != watch.Modified {
logger.Debug().Msgf("Skipping a pod watch event of type %s", event.Type)
return
Expand Down Expand Up @@ -186,18 +183,13 @@ func watchPodsInNamespaceForever(ctx context.Context, config *rest.Config, names

ctx = setClientsetOnContext(ctx, clientset)

// create the informers to integrate with sentry crons
// Create the informers to integrate with sentry crons
if isTruthy(os.Getenv("SENTRY_K8S_MONITOR_CRONJOBS")) {
cronsInformerData := make(map[string]CronsMonitorData)
ctx := context.WithValue(ctx, CronsInformerDataKey{}, &cronsInformerData)
logger.Info().Msgf("Enabling CronJob monitoring")

go startCronsInformers(ctx, namespace)

} else {
logger.Info().Msgf("CronJob monitoring is disabled")
}

for {
if err := watchPodsInNamespace(ctx, namespace); err != nil {
logger.Error().Msgf("Error while watching pods %s: %s", where, err)
Expand Down

0 comments on commit 95ea882

Please sign in to comment.