Skip to content

Commit

Permalink
refactor(scheduler): optimize mutex lock acquisition (#138)
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn authored Sep 24, 2024
1 parent 93c6a6c commit e4e93b2
Showing 1 changed file with 42 additions and 27 deletions.
69 changes: 42 additions & 27 deletions quartz/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type Scheduler interface {
Stop()
}

// StdScheduler implements the quartz.Scheduler interface.
// StdScheduler implements the [Scheduler] interface.
type StdScheduler struct {
mtx sync.Mutex
wg sync.WaitGroup
Expand Down Expand Up @@ -134,7 +134,7 @@ func NewStdScheduler() Scheduler {
}, nil, nil)
}

// NewStdSchedulerWithOptions returns a new StdScheduler configured as specified.
// NewStdSchedulerWithOptions returns a new [StdScheduler] configured as specified.
//
// A custom [JobQueue] implementation may be provided to manage scheduled jobs.
// This is useful when distributed mode is required, so that jobs can be stored
Expand Down Expand Up @@ -169,9 +169,6 @@ func (sched *StdScheduler) ScheduleJob(
jobDetail *JobDetail,
trigger Trigger,
) error {
sched.queueMtx.Lock()
defer sched.queueMtx.Unlock()

if jobDetail == nil {
return illegalArgumentError("jobDetail is nil")
}
Expand All @@ -184,6 +181,7 @@ func (sched *StdScheduler) ScheduleJob(
if trigger == nil {
return illegalArgumentError("trigger is nil")
}

nextRunTime := int64(math.MaxInt64)
var err error
if !jobDetail.opts.Suspended {
Expand All @@ -197,8 +195,11 @@ func (sched *StdScheduler) ScheduleJob(
trigger: trigger,
priority: nextRunTime,
}
err = sched.queue.Push(toSchedule)
if err == nil {

sched.queueMtx.Lock()
defer sched.queueMtx.Unlock()

if err = sched.queue.Push(toSchedule); err == nil {
logger.Debugf("Successfully added job %s.", jobDetail.jobKey)
if sched.started {
sched.Reset()
Expand All @@ -224,7 +225,7 @@ func (sched *StdScheduler) Start(ctx context.Context) {
sched.wg.Add(1)
go sched.startExecutionLoop(ctx)

// starts worker pool when WorkerLimit is greater than 0
// starts worker pool if configured
sched.startWorkers(ctx)

sched.started = true
Expand Down Expand Up @@ -259,6 +260,7 @@ func (sched *StdScheduler) GetJobKeys(matchers ...Matcher[ScheduledJob]) ([]*Job
if err != nil {
return nil, err
}

keys := make([]*JobKey, 0, len(scheduledJobs))
for _, scheduled := range scheduledJobs {
keys = append(keys, scheduled.JobDetail().jobKey)
Expand All @@ -268,23 +270,25 @@ func (sched *StdScheduler) GetJobKeys(matchers ...Matcher[ScheduledJob]) ([]*Job

// GetScheduledJob returns the ScheduledJob with the specified key.
func (sched *StdScheduler) GetScheduledJob(jobKey *JobKey) (ScheduledJob, error) {
sched.queueMtx.Lock()
defer sched.queueMtx.Unlock()

if jobKey == nil {
return nil, illegalArgumentError("jobKey is nil")
}

sched.queueMtx.Lock()
defer sched.queueMtx.Unlock()

return sched.queue.Get(jobKey)
}

// DeleteJob removes the Job with the specified key if present.
func (sched *StdScheduler) DeleteJob(jobKey *JobKey) error {
sched.queueMtx.Lock()
defer sched.queueMtx.Unlock()

if jobKey == nil {
return illegalArgumentError("jobKey is nil")
}

sched.queueMtx.Lock()
defer sched.queueMtx.Unlock()

_, err := sched.queue.Remove(jobKey)
if err == nil {
logger.Debugf("Successfully deleted job %s.", jobKey)
Expand All @@ -298,19 +302,21 @@ func (sched *StdScheduler) DeleteJob(jobKey *JobKey) error {
// PauseJob suspends the job with the specified key from being
// executed by the scheduler.
func (sched *StdScheduler) PauseJob(jobKey *JobKey) error {
sched.queueMtx.Lock()
defer sched.queueMtx.Unlock()

if jobKey == nil {
return illegalArgumentError("jobKey is nil")
}

sched.queueMtx.Lock()
defer sched.queueMtx.Unlock()

job, err := sched.queue.Get(jobKey)
if err != nil {
return err
}
if job.JobDetail().opts.Suspended {
return illegalStateError(fmt.Sprintf("job %s is suspended", jobKey))
}

job, err = sched.queue.Remove(jobKey)
if err == nil {
job.JobDetail().opts.Suspended = true
Expand All @@ -319,8 +325,7 @@ func (sched *StdScheduler) PauseJob(jobKey *JobKey) error {
trigger: job.Trigger(),
priority: int64(math.MaxInt64),
}
err = sched.queue.Push(paused)
if err == nil {
if err = sched.queue.Push(paused); err == nil {
logger.Debugf("Successfully paused job %s.", jobKey)
if sched.started {
sched.Reset()
Expand All @@ -332,23 +337,26 @@ func (sched *StdScheduler) PauseJob(jobKey *JobKey) error {

// ResumeJob restarts the suspended job with the specified key.
func (sched *StdScheduler) ResumeJob(jobKey *JobKey) error {
sched.queueMtx.Lock()
defer sched.queueMtx.Unlock()

if jobKey == nil {
return illegalArgumentError("jobKey is nil")
}

sched.queueMtx.Lock()
defer sched.queueMtx.Unlock()

job, err := sched.queue.Get(jobKey)
if err != nil {
return err
}
if !job.JobDetail().opts.Suspended {
return illegalStateError(fmt.Sprintf("job %s is active", jobKey))
}

job, err = sched.queue.Remove(jobKey)
if err == nil {
job.JobDetail().opts.Suspended = false
nextRunTime, err := job.Trigger().NextFireTime(NowNano())
var nextRunTime int64
nextRunTime, err = job.Trigger().NextFireTime(NowNano())
if err != nil {
return err
}
Expand All @@ -357,8 +365,7 @@ func (sched *StdScheduler) ResumeJob(jobKey *JobKey) error {
trigger: job.Trigger(),
priority: nextRunTime,
}
err = sched.queue.Push(resumed)
if err == nil {
if err = sched.queue.Push(resumed); err == nil {
logger.Debugf("Successfully resumed job %s.", jobKey)
if sched.started {
sched.Reset()
Expand Down Expand Up @@ -401,7 +408,7 @@ func (sched *StdScheduler) Stop() {

func (sched *StdScheduler) startExecutionLoop(ctx context.Context) {
defer sched.wg.Done()
maxTimerDuration := time.Duration(1<<63 - 1)
const maxTimerDuration = time.Duration(1<<63 - 1)
timer := time.NewTimer(maxTimerDuration)
for {
queueSize, err := sched.queue.Size()
Expand Down Expand Up @@ -463,6 +470,7 @@ func (sched *StdScheduler) calculateNextTick() time.Duration {
logger.Errorf("Failed to calculate next tick: %s", err)
return sched.opts.RetryInterval
}

nextRunTime := scheduledJob.NextRunTime()
now := NowNano()
if nextRunTime > now {
Expand Down Expand Up @@ -536,6 +544,7 @@ func (sched *StdScheduler) validateJob(job ScheduledJob) (bool, func() (int64, e
if job.JobDetail().opts.Suspended {
return false, func() (int64, error) { return math.MaxInt64, nil }
}

now := NowNano()
if job.NextRunTime() < now-sched.opts.OutdatedThreshold.Nanoseconds() {
duration := time.Duration(now - job.NextRunTime())
Expand All @@ -549,7 +558,10 @@ func (sched *StdScheduler) validateJob(job ScheduledJob) (bool, func() (int64, e
logger.Debugf("Job %s is not due to run yet.", job.JobDetail().jobKey)
return false, func() (int64, error) { return job.NextRunTime(), nil }
}
return true, func() (int64, error) { return job.Trigger().NextFireTime(job.NextRunTime()) }

return true, func() (int64, error) {
return job.Trigger().NextFireTime(job.NextRunTime())
}
}

func (sched *StdScheduler) fetchAndReschedule() (ScheduledJob, bool) {
Expand All @@ -566,14 +578,17 @@ func (sched *StdScheduler) fetchAndReschedule() (ScheduledJob, bool) {
}
return nil, false
}

// validate the job
valid, nextRunTimeExtractor := sched.validateJob(job)

// calculate next run time for the job
nextRunTime, err := nextRunTimeExtractor()
if err != nil {
logger.Infof("Job %s exited the execution loop: %s.", job.JobDetail().jobKey, err)
return job, valid
}

// reschedule the job
toSchedule := &scheduledJob{
job: job.JobDetail(),
Expand Down

0 comments on commit e4e93b2

Please sign in to comment.