Skip to content

Commit

Permalink
perf: use min heap for entries
Browse files Browse the repository at this point in the history
  • Loading branch information
janrnc committed Jan 5, 2024
1 parent abf1acd commit 7171f48
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 37 deletions.
46 changes: 13 additions & 33 deletions cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,17 @@ package cron

import (
"context"
"sort"
"sync"
"time"
"fmt"
"container/heap"
)

// Cron keeps track of any number of entries, invoking the associated func as
// specified by the schedule. It may be started, stopped, and the entries may
// be inspected while running.
type Cron struct {
entries []*Entry
entries entryHeap
chain Chain
stop chan struct{}
add chan *Entry
Expand Down Expand Up @@ -61,25 +61,6 @@ type Entry struct {
job func()
}

// byTime is a wrapper for sorting the entry array by time
// (with zero time at the end).
type byTime []*Entry

func (s byTime) Len() int { return len(s) }
func (s byTime) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s byTime) Less(i, j int) bool {
// Two zero times should return false.
// Otherwise, zero is "greater" than any other time.
// (To sort it at the end of the list.)
if s[i].Next.IsZero() {
return false
}
if s[j].Next.IsZero() {
return true
}
return s[i].Next.Before(s[j].Next)
}

// New returns a new Cron job runner, modified by the given options.
//
// Available Settings
Expand All @@ -99,7 +80,7 @@ func (s byTime) Less(i, j int) bool {
// See "cron.With*" to modify the default behavior.
func New(opts ...Option) *Cron {
c := &Cron{
entries: nil,
entries: entryHeap{},
chain: NewChain(),
add: make(chan *Entry),
stop: make(chan struct{}),
Expand Down Expand Up @@ -225,11 +206,9 @@ func (c *Cron) run() {
entry.Next = entry.Schedule.Next(now)
c.logger.Info("schedule", "now", now, "entry", entry.ID, "next", entry.Next)
}
heap.Init(&c.entries)

for {
// Determine the next entry to run.
sort.Sort(byTime(c.entries))

var timer *time.Timer
if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
// If there are no entries yet, just sleep - it still handles new entries
Expand All @@ -246,21 +225,23 @@ func (c *Cron) run() {
c.logger.Info("wake", "now", now)

// Run every entry whose next time was less than now
for _, e := range c.entries {
if e.Next.After(now) || e.Next.IsZero() {
for {
if c.entries[0].Next.After(now) || c.entries[0].Next.IsZero() {
break
}
e := heap.Pop(&c.entries).(*Entry)
c.startJob(e.job)
e.Prev = e.Next
e.Next = e.Schedule.Next(now)
heap.Push(&c.entries, e)
c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next)
}

case newEntry := <-c.add:
timer.Stop()
now = c.now()
newEntry.Next = newEntry.Schedule.Next(now)
c.entries = append(c.entries, newEntry)
heap.Push(&c.entries, newEntry)
c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next)

case replyChan := <-c.snapshot:
Expand Down Expand Up @@ -325,11 +306,10 @@ func (c *Cron) entrySnapshot() []Entry {
}

func (c *Cron) removeEntry(id ID) {
var entries []*Entry
for _, e := range c.entries {
if e.ID != id {
entries = append(entries, e)
for idx, e := range c.entries {
if e.ID == id {
heap.Remove(&c.entries, idx)
return
}
}
c.entries = entries
}
10 changes: 6 additions & 4 deletions cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sync/atomic"
"testing"
"time"
"container/heap"
)

// Many tests schedule a job for every second, and then wait at most a second
Expand Down Expand Up @@ -528,18 +529,19 @@ func TestJob(t *testing.T) {

cron.Start()
defer cron.Stop()

select {
case <-time.After(OneSecond):
t.FailNow()
case <-wait(wg):
}

// Ensure the entries are in the right order.
cron.Stop()
expecteds := []ID{job2, job4, job5, job1, job3, job0}

var actuals []ID
for _, entry := range cron.Entries() {
actuals := []ID{}
for len(cron.entries) > 0 {
entry := heap.Pop(&cron.entries).(*Entry)
actuals = append(actuals, entry.ID)
}

Expand Down
30 changes: 30 additions & 0 deletions entry_heap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package cron

type entryHeap []*Entry

func (h entryHeap) Len() int { return len(h) }
func (h entryHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h entryHeap) Less(i, j int) bool {
// Two zero times should return false.
// Otherwise, zero is "greater" than any other time.
// (To sort it at the end of the list.)
if h[i].Next.IsZero() {
return false
}
if h[j].Next.IsZero() {
return true
}
return h[i].Next.Before(h[j].Next)
}

func (h *entryHeap) Push(x any) {
*h = append(*h, x.(*Entry))
}

func (h *entryHeap) Pop() any {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}

0 comments on commit 7171f48

Please sign in to comment.