From e4e93b2712c40a6d7e31663a39a751e19330c805 Mon Sep 17 00:00:00 2001 From: "Eugene R." Date: Tue, 24 Sep 2024 18:16:29 +0300 Subject: [PATCH] refactor(scheduler): optimize mutex lock acquisition (#138) --- quartz/scheduler.go | 69 +++++++++++++++++++++++++++------------------ 1 file changed, 42 insertions(+), 27 deletions(-) diff --git a/quartz/scheduler.go b/quartz/scheduler.go index 1b161cb..1c30d7d 100644 --- a/quartz/scheduler.go +++ b/quartz/scheduler.go @@ -67,7 +67,7 @@ type Scheduler interface { Stop() } -// StdScheduler implements the quartz.Scheduler interface. +// StdScheduler implements the [Scheduler] interface. type StdScheduler struct { mtx sync.Mutex wg sync.WaitGroup @@ -134,7 +134,7 @@ func NewStdScheduler() Scheduler { }, nil, nil) } -// NewStdSchedulerWithOptions returns a new StdScheduler configured as specified. +// NewStdSchedulerWithOptions returns a new [StdScheduler] configured as specified. // // A custom [JobQueue] implementation may be provided to manage scheduled jobs. // This is useful when distributed mode is required, so that jobs can be stored @@ -169,9 +169,6 @@ func (sched *StdScheduler) ScheduleJob( jobDetail *JobDetail, trigger Trigger, ) error { - sched.queueMtx.Lock() - defer sched.queueMtx.Unlock() - if jobDetail == nil { return illegalArgumentError("jobDetail is nil") } @@ -184,6 +181,7 @@ func (sched *StdScheduler) ScheduleJob( if trigger == nil { return illegalArgumentError("trigger is nil") } + nextRunTime := int64(math.MaxInt64) var err error if !jobDetail.opts.Suspended { @@ -197,8 +195,11 @@ func (sched *StdScheduler) ScheduleJob( trigger: trigger, priority: nextRunTime, } - err = sched.queue.Push(toSchedule) - if err == nil { + + sched.queueMtx.Lock() + defer sched.queueMtx.Unlock() + + if err = sched.queue.Push(toSchedule); err == nil { logger.Debugf("Successfully added job %s.", jobDetail.jobKey) if sched.started { sched.Reset() @@ -224,7 +225,7 @@ func (sched *StdScheduler) Start(ctx context.Context) { sched.wg.Add(1) go sched.startExecutionLoop(ctx) - // starts worker pool when WorkerLimit is greater than 0 + // starts worker pool if configured sched.startWorkers(ctx) sched.started = true @@ -259,6 +260,7 @@ func (sched *StdScheduler) GetJobKeys(matchers ...Matcher[ScheduledJob]) ([]*Job if err != nil { return nil, err } + keys := make([]*JobKey, 0, len(scheduledJobs)) for _, scheduled := range scheduledJobs { keys = append(keys, scheduled.JobDetail().jobKey) @@ -268,23 +270,25 @@ func (sched *StdScheduler) GetJobKeys(matchers ...Matcher[ScheduledJob]) ([]*Job // GetScheduledJob returns the ScheduledJob with the specified key. func (sched *StdScheduler) GetScheduledJob(jobKey *JobKey) (ScheduledJob, error) { - sched.queueMtx.Lock() - defer sched.queueMtx.Unlock() - if jobKey == nil { return nil, illegalArgumentError("jobKey is nil") } + + sched.queueMtx.Lock() + defer sched.queueMtx.Unlock() + return sched.queue.Get(jobKey) } // DeleteJob removes the Job with the specified key if present. func (sched *StdScheduler) DeleteJob(jobKey *JobKey) error { - sched.queueMtx.Lock() - defer sched.queueMtx.Unlock() - if jobKey == nil { return illegalArgumentError("jobKey is nil") } + + sched.queueMtx.Lock() + defer sched.queueMtx.Unlock() + _, err := sched.queue.Remove(jobKey) if err == nil { logger.Debugf("Successfully deleted job %s.", jobKey) @@ -298,12 +302,13 @@ func (sched *StdScheduler) DeleteJob(jobKey *JobKey) error { // PauseJob suspends the job with the specified key from being // executed by the scheduler. func (sched *StdScheduler) PauseJob(jobKey *JobKey) error { - sched.queueMtx.Lock() - defer sched.queueMtx.Unlock() - if jobKey == nil { return illegalArgumentError("jobKey is nil") } + + sched.queueMtx.Lock() + defer sched.queueMtx.Unlock() + job, err := sched.queue.Get(jobKey) if err != nil { return err @@ -311,6 +316,7 @@ func (sched *StdScheduler) PauseJob(jobKey *JobKey) error { if job.JobDetail().opts.Suspended { return illegalStateError(fmt.Sprintf("job %s is suspended", jobKey)) } + job, err = sched.queue.Remove(jobKey) if err == nil { job.JobDetail().opts.Suspended = true @@ -319,8 +325,7 @@ func (sched *StdScheduler) PauseJob(jobKey *JobKey) error { trigger: job.Trigger(), priority: int64(math.MaxInt64), } - err = sched.queue.Push(paused) - if err == nil { + if err = sched.queue.Push(paused); err == nil { logger.Debugf("Successfully paused job %s.", jobKey) if sched.started { sched.Reset() @@ -332,12 +337,13 @@ func (sched *StdScheduler) PauseJob(jobKey *JobKey) error { // ResumeJob restarts the suspended job with the specified key. func (sched *StdScheduler) ResumeJob(jobKey *JobKey) error { - sched.queueMtx.Lock() - defer sched.queueMtx.Unlock() - if jobKey == nil { return illegalArgumentError("jobKey is nil") } + + sched.queueMtx.Lock() + defer sched.queueMtx.Unlock() + job, err := sched.queue.Get(jobKey) if err != nil { return err @@ -345,10 +351,12 @@ func (sched *StdScheduler) ResumeJob(jobKey *JobKey) error { if !job.JobDetail().opts.Suspended { return illegalStateError(fmt.Sprintf("job %s is active", jobKey)) } + job, err = sched.queue.Remove(jobKey) if err == nil { job.JobDetail().opts.Suspended = false - nextRunTime, err := job.Trigger().NextFireTime(NowNano()) + var nextRunTime int64 + nextRunTime, err = job.Trigger().NextFireTime(NowNano()) if err != nil { return err } @@ -357,8 +365,7 @@ func (sched *StdScheduler) ResumeJob(jobKey *JobKey) error { trigger: job.Trigger(), priority: nextRunTime, } - err = sched.queue.Push(resumed) - if err == nil { + if err = sched.queue.Push(resumed); err == nil { logger.Debugf("Successfully resumed job %s.", jobKey) if sched.started { sched.Reset() @@ -401,7 +408,7 @@ func (sched *StdScheduler) Stop() { func (sched *StdScheduler) startExecutionLoop(ctx context.Context) { defer sched.wg.Done() - maxTimerDuration := time.Duration(1<<63 - 1) + const maxTimerDuration = time.Duration(1<<63 - 1) timer := time.NewTimer(maxTimerDuration) for { queueSize, err := sched.queue.Size() @@ -463,6 +470,7 @@ func (sched *StdScheduler) calculateNextTick() time.Duration { logger.Errorf("Failed to calculate next tick: %s", err) return sched.opts.RetryInterval } + nextRunTime := scheduledJob.NextRunTime() now := NowNano() if nextRunTime > now { @@ -536,6 +544,7 @@ func (sched *StdScheduler) validateJob(job ScheduledJob) (bool, func() (int64, e if job.JobDetail().opts.Suspended { return false, func() (int64, error) { return math.MaxInt64, nil } } + now := NowNano() if job.NextRunTime() < now-sched.opts.OutdatedThreshold.Nanoseconds() { duration := time.Duration(now - job.NextRunTime()) @@ -549,7 +558,10 @@ func (sched *StdScheduler) validateJob(job ScheduledJob) (bool, func() (int64, e logger.Debugf("Job %s is not due to run yet.", job.JobDetail().jobKey) return false, func() (int64, error) { return job.NextRunTime(), nil } } - return true, func() (int64, error) { return job.Trigger().NextFireTime(job.NextRunTime()) } + + return true, func() (int64, error) { + return job.Trigger().NextFireTime(job.NextRunTime()) + } } func (sched *StdScheduler) fetchAndReschedule() (ScheduledJob, bool) { @@ -566,14 +578,17 @@ func (sched *StdScheduler) fetchAndReschedule() (ScheduledJob, bool) { } return nil, false } + // validate the job valid, nextRunTimeExtractor := sched.validateJob(job) + // calculate next run time for the job nextRunTime, err := nextRunTimeExtractor() if err != nil { logger.Infof("Job %s exited the execution loop: %s.", job.JobDetail().jobKey, err) return job, valid } + // reschedule the job toSchedule := &scheduledJob{ job: job.JobDetail(),