Skip to content

Commit

Permalink
Add Unregister method to Scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
hibiken committed Mar 11, 2021
1 parent f618f5b commit de28c1e
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 3 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- `Unregister` method is added to `Scheduler` to remove a registered entry.

## [0.15.0] - 2021-01-31

**IMPORTATNT**: All `Inspector` related code are moved to subpackage "github.com/hibiken/asynq/inspeq"
Expand All @@ -15,7 +19,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- `Inspector` related code are moved to subpackage "github.com/hibken/asynq/inspeq".
- `RedisConnOpt` interface has changed slightly. If you have been passing `RedisClientOpt`, `RedisFailoverClientOpt`, or `RedisClusterClientOpt` as a pointer,
update your code to pass as a value.
update your code to pass as a value.
- `ErrorMsg` field in `RetryTask` and `ArchivedTask` was renamed to `LastError`.

### Added
Expand Down
20 changes: 19 additions & 1 deletion scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ type Scheduler struct {
done chan struct{}
wg sync.WaitGroup
errHandler func(task *Task, opts []Option, err error)
// idmap maps Scheduler's entry ID to cron.EntryID
// to avoid using cron.EntryID as the public API of
// the Scheduler.
idmap map[string]cron.EntryID
}

// NewScheduler returns a new Scheduler instance given the redis connection option.
Expand Down Expand Up @@ -65,6 +69,7 @@ func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
location: loc,
done: make(chan struct{}),
errHandler: opts.EnqueueErrorHandler,
idmap: make(map[string]cron.EntryID),
}
}

Expand Down Expand Up @@ -145,12 +150,25 @@ func (s *Scheduler) Register(cronspec string, task *Task, opts ...Option) (entry
logger: s.logger,
errHandler: s.errHandler,
}
if _, err = s.cron.AddJob(cronspec, job); err != nil {
cronID, err := s.cron.AddJob(cronspec, job)
if err != nil {
return "", err
}
s.idmap[job.id.String()] = cronID
return job.id.String(), nil
}

// Unregister removes a registered entry by entry ID.
// Unregister returns a non-nil error if no entries were found for the given entryID.
func (s *Scheduler) Unregister(entryID string) error {
cronID, ok := s.idmap[entryID]
if !ok {
return fmt.Errorf("asynq: no scheduler entry found")
}
s.cron.Remove(cronID)
return nil
}

// Run starts the scheduler until an os signal to exit the program is received.
// It returns an error if scheduler is already running or has been stopped.
func (s *Scheduler) Run() error {
Expand Down
46 changes: 45 additions & 1 deletion scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/hibiken/asynq/internal/base"
)

func TestScheduler(t *testing.T) {
func TestSchedulerRegister(t *testing.T) {
tests := []struct {
cronspec string
task *Task
Expand Down Expand Up @@ -116,3 +116,47 @@ func TestSchedulerWhenRedisDown(t *testing.T) {
}
mu.Unlock()
}

func TestSchedulerUnregister(t *testing.T) {
tests := []struct {
cronspec string
task *Task
opts []Option
wait time.Duration
queue string
}{
{
cronspec: "@every 3s",
task: NewTask("task1", nil),
opts: []Option{MaxRetry(10)},
wait: 10 * time.Second,
queue: "default",
},
}

r := setup(t)

for _, tc := range tests {
scheduler := NewScheduler(getRedisConnOpt(t), nil)
entryID, err := scheduler.Register(tc.cronspec, tc.task, tc.opts...)
if err != nil {
t.Fatal(err)
}
if err := scheduler.Unregister(entryID); err != nil {
t.Fatal(err)
}

if err := scheduler.Start(); err != nil {
t.Fatal(err)
}
time.Sleep(tc.wait)
if err := scheduler.Stop(); err != nil {
t.Fatal(err)
}

got := asynqtest.GetPendingMessages(t, r, tc.queue)
if len(got) != 0 {
t.Errorf("%d tasks were enqueued, want zero", len(got))
}
}
}

0 comments on commit de28c1e

Please sign in to comment.