Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: delayed detection of job ending for crons #69

Merged
merged 6 commits on Dec 5, 2023 (source and target branch names were lost in extraction)
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 29 additions & 24 deletions crons.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,45 +36,46 @@ func startCronsInformers(ctx context.Context, namespace string) error {
return errors.New("failed to get clientset")
}

// create factory that will produce both the cronjob informer and job informer
// Create factory that will produce both the cronjob informer and job informer
factory := informers.NewSharedInformerFactoryWithOptions(
clientset,
5*time.Second,
informers.WithNamespace(namespace),
)

// create the cronjob informer
// Create the cronjob informer
cronjobInformer, err = createCronjobInformer(ctx, factory, namespace)
if err != nil {
return err
}
// create the job informer
// Create the job informer
jobInformer, err = createJobInformer(ctx, factory, namespace)
if err != nil {
return err
}

// channel to tell the factory to stop the informers
// Channel to tell the factory to stop the informers
doneChan := make(chan struct{})
factory.Start(doneChan)

// sync the cronjob informer cache
// Sync the cronjob informer cache
if ok := cache.WaitForCacheSync(doneChan, cronjobInformer.HasSynced); !ok {
return errors.New("cronjob informer failed to sync")
}
// sync the job informer cache
// Sync the job informer cache
if ok := cache.WaitForCacheSync(doneChan, jobInformer.HasSynced); !ok {
return errors.New("job informer failed to sync")
}

// wait for the channel to be closed
// Wait for the channel to be closed
<-doneChan

return nil
}

// Starts the jobs informer with event handlers that trigger
// checkin events during the start and end of a job (along with the exit status)
// Captures sentry crons checkin event if appropriate
// by checking the job status to determine if the job just created pod (job starting)
// or if the job exited
func runSentryCronsCheckin(ctx context.Context, job *batchv1.Job, eventHandlerType EventHandlerType) error {

// Try to find the cronJob name that owns the job
Expand All @@ -91,19 +92,20 @@ func runSentryCronsCheckin(ctx context.Context, job *batchv1.Job, eventHandlerTy
return errors.New("cannot find cronJob data")
Jiahui-Zhang-20 marked this conversation as resolved.
Show resolved Hide resolved
}

// capture checkin event called for by informer handler
if eventHandlerType == EventHandlerAdd {
// The job just begun so check in to start
if job.Status.Active == 0 && job.Status.Succeeded == 0 && job.Status.Failed == 0 {
Jiahui-Zhang-20 marked this conversation as resolved.
Show resolved Hide resolved
// Add the job to the cronJob informer data
checkinJobStarting(ctx, job, cronsMonitorData)
} else if eventHandlerType == EventHandlerUpdate || eventHandlerType == EventHandlerDelete {
// Delete pod from the cronJob informer data
} else if job.Status.Active > 0 {
return nil
} else if job.Status.Failed > 0 || job.Status.Succeeded > 0 {
checkinJobEnding(ctx, job, cronsMonitorData)
return nil // finished
}

return nil
}

// sends the checkin event to sentry crons for when a job starts
// Sends the checkin event to sentry crons for when a job starts
func checkinJobStarting(ctx context.Context, job *batchv1.Job, cronsMonitorData *CronsMonitorData) error {

logger := zerolog.Ctx(ctx)
Expand All @@ -128,21 +130,24 @@ func checkinJobStarting(ctx context.Context, job *batchv1.Job, cronsMonitorData
return nil
}

// sends the checkin event to sentry crons for when a job ends
// Sends the checkin event to sentry crons for when a job ends
func checkinJobEnding(ctx context.Context, job *batchv1.Job, cronsMonitorData *CronsMonitorData) error {

logger := zerolog.Ctx(ctx)
// do not check in to exit if there are still active pods
if job.Status.Active > 0 {
return nil
}

// Check desired number of pods have succeeded
var jobStatus sentry.CheckInStatus
if job.Status.Succeeded >= cronsMonitorData.requiredCompletions {
jobStatus = sentry.CheckInStatusOK

if job.Status.Conditions == nil {
return nil
} else {
jobStatus = sentry.CheckInStatusError
if job.Status.Conditions[0].Type == "Complete" {
jobStatus = sentry.CheckInStatusOK
} else if job.Status.Conditions[0].Type == "Failed" {
jobStatus = sentry.CheckInStatusError
} else {
return nil
}
}

// Get job data to retrieve the checkin ID
Expand All @@ -163,7 +168,7 @@ func checkinJobEnding(ctx context.Context, job *batchv1.Job, cronsMonitorData *C
return nil
}

// adds to the sentry events whenever it is associated with a cronjob
// Adds to the sentry events whenever it is associated with a cronjob
// so the sentry event contains the corresponding slug monitor, cronjob name, timestamp of when the cronjob began, and
// the k8s cronjob metadata
func runCronsDataHandler(ctx context.Context, scope *sentry.Scope, pod *v1.Pod, sentryEvent *sentry.Event) (bool, error) {
Expand Down
6 changes: 2 additions & 4 deletions crons_monitor_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type CronsMonitorData struct {
}

// Constructor for cronsMonitorData
func NewCronsMonitorData(monitorSlug string, schedule string, maxRunTime int64, checkinMargin int64, completions *int32) *CronsMonitorData {
func NewCronsMonitorData(monitorSlug string, schedule string, completions *int32) *CronsMonitorData {

// Get required number of pods to complete
var requiredCompletions int32
Expand All @@ -48,9 +48,7 @@ func NewCronsMonitorData(monitorSlug string, schedule string, maxRunTime int64,
mutex: sync.RWMutex{},
MonitorSlug: monitorSlug,
monitorConfig: &sentry.MonitorConfig{
Schedule: monitorSchedule,
MaxRuntime: maxRunTime,
CheckInMargin: checkinMargin,
Schedule: monitorSchedule,
},
JobDatas: make(map[string]*CronsJobData),
requiredCompletions: requiredCompletions,
Expand Down
2 changes: 1 addition & 1 deletion informer_crons.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func createCronjobInformer(ctx context.Context, factory informers.SharedInformer
if ok {
logger.Debug().Msgf("cronJob %s already exists in the crons informer data struct...\n", cronjob.Name)
} else {
cronsMetaData.addCronsMonitorData(cronjob.Name, NewCronsMonitorData(cronjob.Name, cronjob.Spec.Schedule, 5, 3, cronjob.Spec.JobTemplate.Spec.Completions))
cronsMetaData.addCronsMonitorData(cronjob.Name, NewCronsMonitorData(cronjob.Name, cronjob.Spec.Schedule, cronjob.Spec.JobTemplate.Spec.Completions))
}
}

Expand Down