Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the scheduler option HeartbeatInterval #956

Merged
merged 2 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions internal/rdb/rdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -1437,7 +1437,7 @@ func (r *RDB) ClearServerState(host string, pid int, serverID string) error {

// KEYS[1] -> asynq:schedulers:{<schedulerID>}
// ARGV[1] -> TTL in seconds
// ARGV[2:] -> schedler entries
// ARGV[2:] -> scheduler entries
var writeSchedulerEntriesCmd = redis.NewScript(`
redis.call("DEL", KEYS[1])
for i = 2, #ARGV do
Expand Down Expand Up @@ -1468,10 +1468,10 @@ func (r *RDB) WriteSchedulerEntries(schedulerID string, entries []*base.Schedule
}

// ClearSchedulerEntries deletes scheduler entries data from redis.
func (r *RDB) ClearSchedulerEntries(scheduelrID string) error {
func (r *RDB) ClearSchedulerEntries(schedulerID string) error {
var op errors.Op = "rdb.ClearSchedulerEntries"
ctx := context.Background()
key := base.SchedulerEntriesKey(scheduelrID)
key := base.SchedulerEntriesKey(schedulerID)
if err := r.client.ZRem(ctx, base.AllSchedulers, key).Err(); err != nil {
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "zrem", Err: err})
}
Expand Down
66 changes: 42 additions & 24 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,17 @@ type Scheduler struct {

state *serverState

logger *log.Logger
client *Client
rdb *rdb.RDB
cron *cron.Cron
location *time.Location
done chan struct{}
wg sync.WaitGroup
preEnqueueFunc func(task *Task, opts []Option)
postEnqueueFunc func(info *TaskInfo, err error)
errHandler func(task *Task, opts []Option, err error)
heartbeatInterval time.Duration
logger *log.Logger
client *Client
rdb *rdb.RDB
cron *cron.Cron
location *time.Location
done chan struct{}
wg sync.WaitGroup
preEnqueueFunc func(task *Task, opts []Option)
postEnqueueFunc func(info *TaskInfo, err error)
errHandler func(task *Task, opts []Option, err error)

// guards idmap
mu sync.Mutex
Expand All @@ -48,6 +49,8 @@ type Scheduler struct {
sharedConnection bool
}

const defaultHeartbeatInterval = 10 * time.Second

// NewScheduler returns a new Scheduler instance given the redis connection option.
// The parameter opts is optional, defaults will be used if opts is set to nil
func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
Expand All @@ -68,6 +71,11 @@ func NewSchedulerFromRedisClient(c redis.UniversalClient, opts *SchedulerOpts) *
opts = &SchedulerOpts{}
}

heartbeatInterval := opts.HeartbeatInterval
if heartbeatInterval <= 0 {
heartbeatInterval = defaultHeartbeatInterval
}

logger := log.NewLogger(opts.Logger)
loglevel := opts.LogLevel
if loglevel == level_unspecified {
Expand All @@ -81,18 +89,19 @@ func NewSchedulerFromRedisClient(c redis.UniversalClient, opts *SchedulerOpts) *
}

return &Scheduler{
id: generateSchedulerID(),
state: &serverState{value: srvStateNew},
logger: logger,
client: NewClientFromRedisClient(c),
rdb: rdb.NewRDB(c),
cron: cron.New(cron.WithLocation(loc)),
location: loc,
done: make(chan struct{}),
preEnqueueFunc: opts.PreEnqueueFunc,
postEnqueueFunc: opts.PostEnqueueFunc,
errHandler: opts.EnqueueErrorHandler,
idmap: make(map[string]cron.EntryID),
id: generateSchedulerID(),
state: &serverState{value: srvStateNew},
heartbeatInterval: heartbeatInterval,
logger: logger,
client: NewClientFromRedisClient(c),
rdb: rdb.NewRDB(c),
cron: cron.New(cron.WithLocation(loc)),
location: loc,
done: make(chan struct{}),
preEnqueueFunc: opts.PreEnqueueFunc,
postEnqueueFunc: opts.PostEnqueueFunc,
errHandler: opts.EnqueueErrorHandler,
idmap: make(map[string]cron.EntryID),
}
}

Expand All @@ -106,6 +115,15 @@ func generateSchedulerID() string {

// SchedulerOpts specifies scheduler options.
type SchedulerOpts struct {
// HeartbeatInterval specifies the interval between scheduler heartbeats.
//
// If unset, zero or a negative value, the interval is set to 10 second.
//
// Note: Setting this value too low may add significant load to redis.
//
// By default, HeartbeatInterval is set to 10 seconds.
HeartbeatInterval time.Duration

// Logger specifies the logger used by the scheduler instance.
//
// If unset, the default logger is used.
Expand Down Expand Up @@ -284,7 +302,7 @@ func (s *Scheduler) Shutdown() {

func (s *Scheduler) runHeartbeater() {
defer s.wg.Done()
ticker := time.NewTicker(5 * time.Second)
ticker := time.NewTicker(s.heartbeatInterval)
for {
select {
case <-s.done:
Expand Down Expand Up @@ -317,7 +335,7 @@ func (s *Scheduler) beat() {
entries = append(entries, e)
}
s.logger.Debugf("Writing entries %v", entries)
if err := s.rdb.WriteSchedulerEntries(s.id, entries, 5*time.Second); err != nil {
if err := s.rdb.WriteSchedulerEntries(s.id, entries, s.heartbeatInterval); err != nil {
Copy link
Collaborator

@kamikazechaser kamikazechaser Nov 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the original issue:

scheduler entries are prone to expire when redis is slow to respond

Even though this just affects the inspector, there is a small chance that the SchedulerEntries may be empty because they expired before the next beat ran. I would propose it to be such that redisKey ttl > beat tick to reduce the chance of the entries being empty at any given time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Absolutely, I did not want to tackle this in the same PR, but I think we should have ttl = s.heartbeatInterval * 2.

I don't have strong opinions on this, what option do you prefer:

  • ttl = s.heartbeatInterval * 2
  • ttl = s.heartbeatInterval + time.Second
  • Or a second option

Copy link
Collaborator

@kamikazechaser kamikazechaser Nov 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ttl = s.heartbeatInterval * 2

This sounds good.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

s.logger.Warnf("Scheduler could not write heartbeat data: %v", err)
}
}
Expand Down
Loading