diff --git a/CHANGELOG.md b/CHANGELOG.md index 438ab4ee..dfd45cac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- `Unregister` method is added to `Scheduler` to remove a registered entry. + ## [0.15.0] - 2021-01-31 **IMPORTATNT**: All `Inspector` related code are moved to subpackage "github.com/hibiken/asynq/inspeq" @@ -15,7 +19,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `Inspector` related code are moved to subpackage "github.com/hibken/asynq/inspeq". - `RedisConnOpt` interface has changed slightly. If you have been passing `RedisClientOpt`, `RedisFailoverClientOpt`, or `RedisClusterClientOpt` as a pointer, - update your code to pass as a value. + update your code to pass as a value. - `ErrorMsg` field in `RetryTask` and `ArchivedTask` was renamed to `LastError`. ### Added diff --git a/scheduler.go b/scheduler.go index 5c4dbf61..8daa5077 100644 --- a/scheduler.go +++ b/scheduler.go @@ -30,6 +30,10 @@ type Scheduler struct { done chan struct{} wg sync.WaitGroup errHandler func(task *Task, opts []Option, err error) + // idmap maps Scheduler's entry ID to cron.EntryID + // to avoid using cron.EntryID as the public API of + // the Scheduler. + idmap map[string]cron.EntryID } // NewScheduler returns a new Scheduler instance given the redis connection option. @@ -65,6 +69,7 @@ func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler { location: loc, done: make(chan struct{}), errHandler: opts.EnqueueErrorHandler, + idmap: make(map[string]cron.EntryID), } } @@ -145,12 +150,25 @@ func (s *Scheduler) Register(cronspec string, task *Task, opts ...Option) (entry logger: s.logger, errHandler: s.errHandler, } - if _, err = s.cron.AddJob(cronspec, job); err != nil { + cronID, err := s.cron.AddJob(cronspec, job) + if err != nil { return "", err } + s.idmap[job.id.String()] = cronID return job.id.String(), nil } +// Unregister removes a registered entry by entry ID. +// Unregister returns a non-nil error if no entries were found for the given entryID. +func (s *Scheduler) Unregister(entryID string) error { + cronID, ok := s.idmap[entryID] + if !ok { + return fmt.Errorf("asynq: no scheduler entry found") + } + s.cron.Remove(cronID) + return nil +} + // Run starts the scheduler until an os signal to exit the program is received. // It returns an error if scheduler is already running or has been stopped. func (s *Scheduler) Run() error { diff --git a/scheduler_test.go b/scheduler_test.go index 47905af8..20339cae 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -14,7 +14,7 @@ import ( "github.com/hibiken/asynq/internal/base" ) -func TestScheduler(t *testing.T) { +func TestSchedulerRegister(t *testing.T) { tests := []struct { cronspec string task *Task @@ -116,3 +116,47 @@ func TestSchedulerWhenRedisDown(t *testing.T) { } mu.Unlock() } + +func TestSchedulerUnregister(t *testing.T) { + tests := []struct { + cronspec string + task *Task + opts []Option + wait time.Duration + queue string + }{ + { + cronspec: "@every 3s", + task: NewTask("task1", nil), + opts: []Option{MaxRetry(10)}, + wait: 10 * time.Second, + queue: "default", + }, + } + + r := setup(t) + + for _, tc := range tests { + scheduler := NewScheduler(getRedisConnOpt(t), nil) + entryID, err := scheduler.Register(tc.cronspec, tc.task, tc.opts...) + if err != nil { + t.Fatal(err) + } + if err := scheduler.Unregister(entryID); err != nil { + t.Fatal(err) + } + + if err := scheduler.Start(); err != nil { + t.Fatal(err) + } + time.Sleep(tc.wait) + if err := scheduler.Stop(); err != nil { + t.Fatal(err) + } + + got := asynqtest.GetPendingMessages(t, r, tc.queue) + if len(got) != 0 { + t.Errorf("%d tasks were enqueued, want zero", len(got)) + } + } +}