Skip to content

Commit

Permalink
feat(job)!: improve standard job implementations (#140)
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn authored Sep 25, 2024
1 parent ed9e582 commit 8db8288
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 112 deletions.
6 changes: 4 additions & 2 deletions examples/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ func sampleJobs(ctx context.Context, wg *sync.WaitGroup) {
return
}
curlJob := job.NewCurlJob(request)
functionJob := job.NewFunctionJobWithDesc("42", func(_ context.Context) (int, error) { return 42, nil })
functionJob := job.NewFunctionJobWithDesc(
func(_ context.Context) (int, error) { return 42, nil },
"42")

shellJobDetail := quartz.NewJobDetail(shellJob, quartz.NewJobKey("shellJob"))
curlJobDetail := quartz.NewJobDetail(curlJob, quartz.NewJobKey("curlJob"))
Expand All @@ -114,7 +116,7 @@ func sampleJobs(ctx context.Context, wg *sync.WaitGroup) {
} else {
fmt.Println(string(response))
}
fmt.Printf("Function job result: %v\n", *functionJob.Result())
fmt.Printf("Function job result: %v\n", functionJob.Result())

time.Sleep(time.Second * 2)
sched.Stop()
Expand Down
70 changes: 37 additions & 33 deletions job/curl_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,64 +12,67 @@ import (
"github.com/reugn/go-quartz/quartz"
)

// CurlJob represents a cURL command Job, implements the quartz.Job interface.
// cURL is a command-line tool for getting or sending data including files
// using URL syntax.
// CurlJob represents a job that can be used to schedule HTTP requests.
// It implements the [quartz.Job] interface.
type CurlJob struct {
sync.Mutex
httpClient HTTPHandler
request *http.Request
response *http.Response
jobStatus Status
mtx sync.Mutex
httpClient HTTPHandler
request *http.Request
response *http.Response
jobStatus Status

once sync.Once
description string
callback func(context.Context, *CurlJob)
}

var _ quartz.Job = (*CurlJob)(nil)

// HTTPHandler sends an HTTP request and returns an HTTP response,
// following policy (such as redirects, cookies, auth) as configured
// on the implementing HTTP client.
// HTTPHandler sends an HTTP request and returns an HTTP response, following
// policy (such as redirects, cookies, auth) as configured on the implementing
// HTTP client.
type HTTPHandler interface {
Do(req *http.Request) (*http.Response, error)
}

// CurlJobOptions represents optional parameters for constructing a CurlJob.
// CurlJobOptions represents optional parameters for constructing a [CurlJob].
type CurlJobOptions struct {
HTTPClient HTTPHandler
Callback func(context.Context, *CurlJob)
}

// NewCurlJob returns a new CurlJob using the default HTTP client.
// NewCurlJob returns a new [CurlJob] using the default HTTP client.
func NewCurlJob(request *http.Request) *CurlJob {
return NewCurlJobWithOptions(request, CurlJobOptions{HTTPClient: http.DefaultClient})
}

// NewCurlJobWithOptions returns a new CurlJob configured with CurlJobOptions.
// NewCurlJobWithOptions returns a new [CurlJob] configured with [CurlJobOptions].
func NewCurlJobWithOptions(request *http.Request, opts CurlJobOptions) *CurlJob {
if opts.HTTPClient == nil {
opts.HTTPClient = http.DefaultClient
}
return &CurlJob{
httpClient: opts.HTTPClient,
request: request,
jobStatus: StatusNA,
description: formatRequest(request),
callback: opts.Callback,
httpClient: opts.HTTPClient,
request: request,
jobStatus: StatusNA,
callback: opts.Callback,
}
}

// Description returns the description of the CurlJob.
func (cu *CurlJob) Description() string {
cu.once.Do(func() {
cu.description = formatRequest(cu.request)
})
return fmt.Sprintf("CurlJob%s%s", quartz.Sep, cu.description)
}

// DumpResponse returns the response of the job in its HTTP/1.x wire
// representation.
// If body is true, DumpResponse also returns the body.
func (cu *CurlJob) DumpResponse(body bool) ([]byte, error) {
cu.Lock()
defer cu.Unlock()
cu.mtx.Lock()
defer cu.mtx.Unlock()
if cu.response != nil {
return httputil.DumpResponse(cu.response, body)
}
Expand All @@ -78,42 +81,43 @@ func (cu *CurlJob) DumpResponse(body bool) ([]byte, error) {

// JobStatus returns the status of the CurlJob.
func (cu *CurlJob) JobStatus() Status {
cu.Lock()
defer cu.Unlock()
cu.mtx.Lock()
defer cu.mtx.Unlock()
return cu.jobStatus
}

func formatRequest(r *http.Request) string {
var request []string
url := fmt.Sprintf("%v %v %v", r.Method, r.URL, r.Proto)
request = append(request, url)
var sb strings.Builder
_, _ = fmt.Fprintf(&sb, "%v %v %v", r.Method, r.URL, r.Proto)
for name, headers := range r.Header {
for _, h := range headers {
request = append(request, fmt.Sprintf("%v: %v", name, h))
_, _ = fmt.Fprintf(&sb, "\n%v: %v", name, h)
}
}
if r.ContentLength > 0 {
request = append(request, fmt.Sprintf("Content Length: %d", r.ContentLength))
_, _ = fmt.Fprintf(&sb, "\nContent Length: %d", r.ContentLength)
}
return strings.Join(request, "\n")
return sb.String()
}

// Execute is called by a Scheduler when the Trigger associated with this job fires.
func (cu *CurlJob) Execute(ctx context.Context) error {
cu.Lock()
cu.mtx.Lock()
cu.request = cu.request.WithContext(ctx)
var err error
cu.response, err = cu.httpClient.Do(cu.request)

if err == nil && cu.response.StatusCode >= 200 && cu.response.StatusCode < 400 {
// update job status based on HTTP response code
if cu.response != nil && cu.response.StatusCode >= http.StatusOK &&
cu.response.StatusCode < http.StatusBadRequest {
cu.jobStatus = StatusOK
} else {
cu.jobStatus = StatusFailure
}
cu.Unlock()
cu.mtx.Unlock()

if cu.callback != nil {
cu.callback(ctx, cu)
}
return nil
return err
}
74 changes: 37 additions & 37 deletions job/function_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,81 +8,81 @@ import (
"github.com/reugn/go-quartz/quartz"
)

// Function represents an argument-less function which returns
// a generic type R and a possible error.
// Function represents a function which takes a [context.Context] as its
// only argument and returns a generic type R and a possible error.
type Function[R any] func(context.Context) (R, error)

// FunctionJob represents a Job that invokes the passed Function,
// implements the quartz.Job interface.
// FunctionJob represents a Job that invokes the passed [Function],
// implements the [quartz.Job] interface.
type FunctionJob[R any] struct {
sync.RWMutex
function *Function[R]
desc string
result *R
err error
jobStatus Status
mtx sync.RWMutex
function Function[R]
description string
result R
err error
jobStatus Status
}

var _ quartz.Job = (*FunctionJob[any])(nil)

// NewFunctionJob returns a new FunctionJob without an explicit description.
// NewFunctionJob returns a new [FunctionJob] with a generated description.
func NewFunctionJob[R any](function Function[R]) *FunctionJob[R] {
return &FunctionJob[R]{
function: &function,
desc: fmt.Sprintf("FunctionJob%s%p", quartz.Sep, &function),
jobStatus: StatusNA,
}
return NewFunctionJobWithDesc(
function,
fmt.Sprintf("FunctionJob%s%p", quartz.Sep, &function),
)
}

// NewFunctionJobWithDesc returns a new FunctionJob with an explicit description.
func NewFunctionJobWithDesc[R any](desc string, function Function[R]) *FunctionJob[R] {
// NewFunctionJobWithDesc returns a new [FunctionJob] with the specified
// description.
func NewFunctionJobWithDesc[R any](function Function[R],
description string) *FunctionJob[R] {
return &FunctionJob[R]{
function: &function,
desc: desc,
jobStatus: StatusNA,
function: function,
description: description,
jobStatus: StatusNA,
}
}

// Description returns the description of the FunctionJob.
func (f *FunctionJob[R]) Description() string {
return f.desc
return f.description
}

// Execute is called by a Scheduler when the Trigger associated with this job fires.
// It invokes the held function, setting the results in Result and Error members.
// It invokes the held function, setting the results in result and error members.
func (f *FunctionJob[R]) Execute(ctx context.Context) error {
result, err := (*f.function)(ctx)
f.Lock()
result, err := f.function(ctx)
f.mtx.Lock()
if err != nil {
var zero R
f.jobStatus = StatusFailure
f.result = nil
f.err = err
f.result, f.err = zero, err
} else {
f.jobStatus = StatusOK
f.result = &result
f.err = nil
f.result, f.err = result, nil
}
f.Unlock()
f.mtx.Unlock()
return err
}

// Result returns the result of the FunctionJob.
func (f *FunctionJob[R]) Result() *R {
f.RLock()
defer f.RUnlock()
func (f *FunctionJob[R]) Result() R {
f.mtx.RLock()
defer f.mtx.RUnlock()
return f.result
}

// Error returns the error of the FunctionJob.
func (f *FunctionJob[R]) Error() error {
f.RLock()
defer f.RUnlock()
f.mtx.RLock()
defer f.mtx.RUnlock()
return f.err
}

// JobStatus returns the status of the FunctionJob.
func (f *FunctionJob[R]) JobStatus() Status {
f.RLock()
defer f.RUnlock()
f.mtx.RLock()
defer f.mtx.RUnlock()
return f.jobStatus
}
31 changes: 18 additions & 13 deletions job/function_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,28 @@ func TestFunctionJob(t *testing.T) {
return "fired1", nil
})

funcJob2 := job.NewFunctionJob(func(_ context.Context) (int, error) {
funcJob2 := job.NewFunctionJob(func(_ context.Context) (*int, error) {
atomic.AddInt32(&n, 2)
return 42, nil
result := 42
return &result, nil
})

sched := quartz.NewStdScheduler()
sched.Start(ctx)
assert.IsNil(t, sched.ScheduleJob(quartz.NewJobDetail(funcJob1, quartz.NewJobKey("funcJob1")),

assert.IsNil(t, sched.ScheduleJob(quartz.NewJobDetail(funcJob1,
quartz.NewJobKey("funcJob1")),
quartz.NewRunOnceTrigger(time.Millisecond*300)))
assert.IsNil(t, sched.ScheduleJob(quartz.NewJobDetail(funcJob2, quartz.NewJobKey("funcJob2")),
assert.IsNil(t, sched.ScheduleJob(quartz.NewJobDetail(funcJob2,
quartz.NewJobKey("funcJob2")),
quartz.NewRunOnceTrigger(time.Millisecond*800)))

time.Sleep(time.Second)
assert.IsNil(t, sched.Clear())
sched.Stop()

assert.Equal(t, funcJob1.JobStatus(), job.StatusOK)
assert.NotEqual(t, funcJob1.Result(), nil)
assert.Equal(t, *funcJob1.Result(), "fired1")
assert.Equal(t, funcJob1.Result(), "fired1")

assert.Equal(t, funcJob2.JobStatus(), job.StatusOK)
assert.NotEqual(t, funcJob2.Result(), nil)
Expand All @@ -47,22 +51,22 @@ func TestFunctionJob(t *testing.T) {
assert.Equal(t, int(atomic.LoadInt32(&n)), 6)
}

func TestNewFunctionJobWithDesc(t *testing.T) {
func TestNewFunctionJob_WithDesc(t *testing.T) {
jobDesc := "test job"

funcJob1 := job.NewFunctionJobWithDesc(jobDesc, func(_ context.Context) (string, error) {
funcJob1 := job.NewFunctionJobWithDesc(func(_ context.Context) (string, error) {
return "fired1", nil
})
}, jobDesc)

funcJob2 := job.NewFunctionJobWithDesc(jobDesc, func(_ context.Context) (string, error) {
funcJob2 := job.NewFunctionJobWithDesc(func(_ context.Context) (string, error) {
return "fired2", nil
})
}, jobDesc)

assert.Equal(t, funcJob1.Description(), jobDesc)
assert.Equal(t, funcJob2.Description(), jobDesc)
}

func TestFunctionJobRespectsContext(t *testing.T) {
func TestFunctionJob_RespectsContext(t *testing.T) {
var n int
funcJob2 := job.NewFunctionJob(func(ctx context.Context) (bool, error) {
timer := time.NewTimer(time.Hour)
Expand All @@ -79,6 +83,7 @@ func TestFunctionJobRespectsContext(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

sig := make(chan struct{})
go func() { defer close(sig); _ = funcJob2.Execute(ctx) }()

Expand All @@ -92,5 +97,5 @@ func TestFunctionJobRespectsContext(t *testing.T) {
t.Fatal("job side effect should have reflected cancelation:", n)
}
assert.ErrorIs(t, funcJob2.Error(), context.Canceled)
assert.IsNil(t, funcJob2.Result())
assert.Equal(t, funcJob2.Result(), false)
}
4 changes: 2 additions & 2 deletions job/job_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ const (
// StatusNA is the initial Job status.
StatusNA Status = iota

// StatusOK indicates the Job completed successfully.
// StatusOK indicates that the Job completed successfully.
StatusOK

// StatusFailure indicates the Job failed.
// StatusFailure indicates that the Job failed.
StatusFailure
)
Loading

0 comments on commit 8db8288

Please sign in to comment.