Skip to content

Commit

Permalink
feat(logger): use slog (#19)
Browse files Browse the repository at this point in the history
  • Loading branch information
janrnc authored Mar 3, 2024
1 parent 4847beb commit d78a4dd
Show file tree
Hide file tree
Showing 10 changed files with 68 additions and 152 deletions.
4 changes: 2 additions & 2 deletions .devcontainer/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
FROM golang:bookworm
FROM golang:1.21-bookworm

RUN curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.53.3
RUN curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.56.2
6 changes: 3 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v4
with:
go-version: '1.20'
go-version: '1.21'

- name: Test
run: go test -v -race -coverprofile=coverage.out -covermode=atomic ./...
Expand All @@ -39,11 +39,11 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v4
with:
go-version: '1.20'
go-version: '1.21'
cache: false

- name: Lint
uses: golangci/golangci-lint-action@v3
with:
version: v1.53
version: v1.56

13 changes: 7 additions & 6 deletions chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cron

import (
"fmt"
"log/slog"
"runtime"
"sync"
"time"
Expand Down Expand Up @@ -38,7 +39,7 @@ func (c Chain) Then(job func()) func() {
}

// Recover panics in wrapped jobs and log them with the provided logger.
func Recover(logger Logger) JobWrapper {
func Recover(logger *slog.Logger) JobWrapper {
return func(job func()) func() {
return func() {
defer func() {
Expand All @@ -50,7 +51,7 @@ func Recover(logger Logger) JobWrapper {
if !ok {
err = fmt.Errorf("%v", r)
}
logger.Error(err, "panic", "stack", "...\n"+string(buf))
logger.Error(err.Error(), "event", "panic", "stack", "...\n"+string(buf))
}
}()
job()
Expand All @@ -61,15 +62,15 @@ func Recover(logger Logger) JobWrapper {
// DelayIfStillRunning serializes jobs, delaying subsequent runs until the
// previous one is complete. Jobs running after a delay of more than a minute
// have the delay logged at Info.
func DelayIfStillRunning(logger Logger) JobWrapper {
func DelayIfStillRunning(logger *slog.Logger) JobWrapper {
return func(job func()) func() {
var mu sync.Mutex
return func() {
start := time.Now()
mu.Lock()
defer mu.Unlock()
if dur := time.Since(start); dur > time.Minute {
logger.Info("delay", "duration", dur)
logger.Info("job execution delayed", "event", "delay", "duration", dur)
}
job()
}
Expand All @@ -78,7 +79,7 @@ func DelayIfStillRunning(logger Logger) JobWrapper {

// SkipIfStillRunning skips an invocation of the job if a previous invocation is
// still running. It logs skips to the given logger at Info level.
func SkipIfStillRunning(logger Logger) JobWrapper {
func SkipIfStillRunning(logger *slog.Logger) JobWrapper {
return func(job func()) func() {
var ch = make(chan struct{}, 1)
ch <- struct{}{}
Expand All @@ -88,7 +89,7 @@ func SkipIfStillRunning(logger Logger) JobWrapper {
defer func() { ch <- v }()
job()
default:
logger.Info("skip")
logger.Info("job execution skipped", "event", "skip")
}
}
}
Expand Down
27 changes: 12 additions & 15 deletions chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package cron

import (
"io"
"log"
"log/slog"
"reflect"
"sync"
"testing"
"time"
)

var discardLogger = slog.New(slog.NewTextHandler(io.Discard, nil))

func appendingJob(slice *[]int, value int) func() {
var m sync.Mutex
return func() {
Expand Down Expand Up @@ -56,12 +58,7 @@ func TestChainRecover(t *testing.T) {
})

t.Run("Recovering JobWrapper recovers", func(t *testing.T) {
NewChain(Recover(PrintfLogger(log.New(io.Discard, "", 0)))).
Then(panickingJob)()
})

t.Run("composed with the *IfStillRunning wrappers", func(t *testing.T) {
NewChain(Recover(PrintfLogger(log.New(io.Discard, "", 0)))).
NewChain(Recover(discardLogger)).
Then(panickingJob)()
})
}
Expand Down Expand Up @@ -101,7 +98,7 @@ func TestChainDelayIfStillRunning(t *testing.T) {

t.Run("runs immediately", func(t *testing.T) {
var j countJob
wrappedJob := NewChain(DelayIfStillRunning(DiscardLogger)).Then(j.job())
wrappedJob := NewChain(DelayIfStillRunning(discardLogger)).Then(j.job())
go wrappedJob()
time.Sleep(2 * time.Millisecond) // Give the job 2ms to complete.
if c := j.Done(); c != 1 {
Expand All @@ -111,7 +108,7 @@ func TestChainDelayIfStillRunning(t *testing.T) {

t.Run("second run immediate if first done", func(t *testing.T) {
var j countJob
wrappedJob := NewChain(DelayIfStillRunning(DiscardLogger)).Then(j.job())
wrappedJob := NewChain(DelayIfStillRunning(discardLogger)).Then(j.job())
go func() {
go wrappedJob()
time.Sleep(time.Millisecond)
Expand All @@ -126,7 +123,7 @@ func TestChainDelayIfStillRunning(t *testing.T) {
t.Run("second run delayed if first not done", func(t *testing.T) {
var j countJob
j.delay = 10 * time.Millisecond
wrappedJob := NewChain(DelayIfStillRunning(DiscardLogger)).Then(j.job())
wrappedJob := NewChain(DelayIfStillRunning(discardLogger)).Then(j.job())
go func() {
go wrappedJob()
time.Sleep(time.Millisecond)
Expand Down Expand Up @@ -155,7 +152,7 @@ func TestChainSkipIfStillRunning(t *testing.T) {

t.Run("runs immediately", func(t *testing.T) {
var j countJob
wrappedJob := NewChain(SkipIfStillRunning(DiscardLogger)).Then(j.job())
wrappedJob := NewChain(SkipIfStillRunning(discardLogger)).Then(j.job())
go wrappedJob()
time.Sleep(2 * time.Millisecond) // Give the job 2ms to complete.
if c := j.Done(); c != 1 {
Expand All @@ -165,7 +162,7 @@ func TestChainSkipIfStillRunning(t *testing.T) {

t.Run("second run immediate if first done", func(t *testing.T) {
var j countJob
wrappedJob := NewChain(SkipIfStillRunning(DiscardLogger)).Then(j.job())
wrappedJob := NewChain(SkipIfStillRunning(discardLogger)).Then(j.job())
go func() {
go wrappedJob()
time.Sleep(time.Millisecond)
Expand All @@ -180,7 +177,7 @@ func TestChainSkipIfStillRunning(t *testing.T) {
t.Run("second run skipped if first not done", func(t *testing.T) {
var j countJob
j.delay = 10 * time.Millisecond
wrappedJob := NewChain(SkipIfStillRunning(DiscardLogger)).Then(j.job())
wrappedJob := NewChain(SkipIfStillRunning(discardLogger)).Then(j.job())
go func() {
go wrappedJob()
time.Sleep(time.Millisecond)
Expand All @@ -206,7 +203,7 @@ func TestChainSkipIfStillRunning(t *testing.T) {
t.Run("skip 10 jobs on rapid fire", func(t *testing.T) {
var j countJob
j.delay = 10 * time.Millisecond
wrappedJob := NewChain(SkipIfStillRunning(DiscardLogger)).Then(j.job())
wrappedJob := NewChain(SkipIfStillRunning(discardLogger)).Then(j.job())
for i := 0; i < 11; i++ {
go wrappedJob()
}
Expand All @@ -221,7 +218,7 @@ func TestChainSkipIfStillRunning(t *testing.T) {
var j1, j2 countJob
j1.delay = 10 * time.Millisecond
j2.delay = 10 * time.Millisecond
chain := NewChain(SkipIfStillRunning(DiscardLogger))
chain := NewChain(SkipIfStillRunning(discardLogger))
wrappedJob1 := chain.Then(j1.job())
wrappedJob2 := chain.Then(j2.job())
for i := 0; i < 11; i++ {
Expand Down
57 changes: 30 additions & 27 deletions cron.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package cron

import (
"container/heap"
"context"
"fmt"
"log/slog"
"sync"
"time"
"fmt"
"container/heap"
)

// Cron keeps track of any number of entries, invoking the associated func as
Expand All @@ -19,11 +20,11 @@ type Cron struct {
remove chan ID
snapshot chan chan []Entry
running bool
logger Logger
logger *slog.Logger
runningMu sync.Mutex
location *time.Location
parser ScheduleParser
next ID
next ID
jobWaiter sync.WaitGroup
}

Expand Down Expand Up @@ -58,24 +59,25 @@ type Entry struct {
// Prev is the last time this job was run, or the zero time if never.
Prev time.Time

job func()
job func()
logger *slog.Logger
}

// New returns a new Cron job runner, modified by the given options.
//
// Available Settings
//
// Time Zone
// Description: The time zone in which schedules are interpreted
// Default: time.Local
// Time Zone
// Description: The time zone in which schedules are interpreted
// Default: time.Local
//
// Parser
// Description: Parser converts cron spec strings into cron.Schedules.
// Default: Accepts this spec: https://en.wikipedia.org/wiki/Cron
// Parser
// Description: Parser converts cron spec strings into cron.Schedules.
// Default: Accepts this spec: https://en.wikipedia.org/wiki/Cron
//
// Chain
// Description: Wrap submitted jobs to customize behavior.
// Default: A chain that recovers panics and logs them to stderr.
// Chain
// Description: Wrap submitted jobs to customize behavior.
// Default: A chain that recovers panics and logs them to stderr.
//
// See "cron.With*" to modify the default behavior.
func New(opts ...Option) *Cron {
Expand All @@ -88,10 +90,10 @@ func New(opts ...Option) *Cron {
remove: make(chan ID),
running: false,
runningMu: sync.Mutex{},
logger: DefaultLogger,
logger: slog.Default(),
location: time.Local,
parser: standardParser,
next: 1,
next: 1,
}
for _, opt := range opts {
opt(c)
Expand All @@ -115,15 +117,16 @@ func (c *Cron) Add(spec string, cmd func()) (ID, error) {
func (c *Cron) Schedule(schedule Schedule, cmd func()) (ID, error) {
c.runningMu.Lock()
defer c.runningMu.Unlock()

if c.next == 0 {
return 0, fmt.Errorf("run out of available ids")
}

entry := &Entry{
ID: c.next,
Schedule: schedule,
job: c.chain.Then(cmd),
ID: c.next,
Schedule: schedule,
job: c.chain.Then(cmd),
logger: c.logger.With("id", c.next),
}
c.next++
if !c.running {
Expand Down Expand Up @@ -198,13 +201,13 @@ func (c *Cron) Run() {
// run the scheduler.. this is private just due to the need to synchronize
// access to the 'running' state variable.
func (c *Cron) run() {
c.logger.Info("start")
c.logger.Info("starting scheduler", "event", "start")

// Figure out the next activation times for each entry.
now := c.now()
for _, entry := range c.entries {
entry.Next = entry.Schedule.Next(now)
c.logger.Info("schedule", "now", now, "entry", entry.ID, "next", entry.Next)
entry.logger.Debug("next execution time computed", "event", "next", "now", now, "next", entry.Next)
}
heap.Init(&c.entries)

Expand All @@ -222,7 +225,7 @@ func (c *Cron) run() {
select {
case now = <-timer.C:
now = now.In(c.location)
c.logger.Info("wake", "now", now)
c.logger.Debug("scheduler woke up", "event", "wake", "now", now)

// Run every entry whose next time was less than now
for {
Expand All @@ -234,30 +237,29 @@ func (c *Cron) run() {
e.Prev = e.Next
e.Next = e.Schedule.Next(now)
heap.Push(&c.entries, e)
c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next)
e.logger.Info("starting job", "event", "run", "now", now, "next", e.Next)
}

case newEntry := <-c.add:
timer.Stop()
now = c.now()
newEntry.Next = newEntry.Schedule.Next(now)
heap.Push(&c.entries, newEntry)
c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next)
newEntry.logger.Info("added new entry", "event", "add", "now", now, "next", newEntry.Next)

case replyChan := <-c.snapshot:
replyChan <- c.entrySnapshot()
continue

case <-c.stop:
timer.Stop()
c.logger.Info("stop")
c.logger.Info("stopping scheduler", "event", "stop")
return

case id := <-c.remove:
timer.Stop()
now = c.now()
c.removeEntry(id)
c.logger.Info("removed", "entry", id)
}

break
Expand Down Expand Up @@ -308,6 +310,7 @@ func (c *Cron) entrySnapshot() []Entry {
func (c *Cron) removeEntry(id ID) {
for idx, e := range c.entries {
if e.ID == id {
e.logger.Info("removed entry", "event", "remove")
heap.Remove(&c.entries, idx)
return
}
Expand Down
10 changes: 5 additions & 5 deletions cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ package cron

import (
"bytes"
"container/heap"
"fmt"
"log"
"log/slog"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
"container/heap"
)

// Many tests schedule a job for every second, and then wait at most a second
Expand All @@ -35,8 +35,8 @@ func (sw *syncWriter) String() string {
return sw.wr.String()
}

func newBufLogger(sw *syncWriter) Logger {
return PrintfLogger(log.New(sw, "", log.LstdFlags))
func newBufLogger(sw *syncWriter) *slog.Logger {
return slog.New(slog.NewTextHandler(sw, nil))
}

func TestFuncPanicRecovery(t *testing.T) {
Expand Down Expand Up @@ -529,7 +529,7 @@ func TestJob(t *testing.T) {

cron.Start()
defer cron.Stop()

select {
case <-time.After(OneSecond):
t.FailNow()
Expand Down
Loading

0 comments on commit d78a4dd

Please sign in to comment.