diff --git a/action.go b/action.go index 093de15..044a839 100644 --- a/action.go +++ b/action.go @@ -6,12 +6,12 @@ import "time" // // It allows applying a logic based // on the task's output. -type Action func(scheduler *Scheduler, task *Task) +type Action func(scheduler *Scheduler, task Tasker) // ThenAdd adds the given tasks to the scheduler // when the action is run. -func (action Action) ThenAdd(tasks ...*Task) Action { - return action.Then(func(scheduler *Scheduler, _ *Task) { +func (action Action) ThenAdd(tasks ...Tasker) Action { + return action.Then(func(scheduler *Scheduler, _ Tasker) { scheduler.Add(tasks...) }) } @@ -20,7 +20,7 @@ func (action Action) ThenAdd(tasks ...*Task) Action { // is run. The callback supports a specific action callback that // gets the current scheduler and the executed task. func (action Action) Then(callback Action) Action { - return func(scheduler *Scheduler, task *Task) { + return func(scheduler *Scheduler, task Tasker) { if action != nil { action(scheduler, task) } @@ -43,7 +43,7 @@ func Continue() Action { // after the duration exceeds but rather that the // task will be in the scheduler again after that time. func Retry(duration time.Duration) Action { - return func(scheduler *Scheduler, task *Task) { + return func(scheduler *Scheduler, task Tasker) { scheduler.Remove(task) go func() { @@ -55,7 +55,7 @@ func Retry(duration time.Duration) Action { // Done removes the task from the scheduler func Done() Action { - return func(scheduler *Scheduler, task *Task) { + return func(scheduler *Scheduler, task Tasker) { scheduler.Remove(task) } } diff --git a/go.mod b/go.mod index 298ac06..4494893 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,3 @@ module github.com/studiolambda/yotei -go 1.22.0 +go 1.23.0 diff --git a/scheduler.go b/scheduler.go index 1bba804..b030b06 100644 --- a/scheduler.go +++ b/scheduler.go @@ -3,9 +3,12 @@ package yotei import ( "context" "fmt" + "io" + "iter" "log/slog" "math/rand" "runtime" + "slices" "sync" ) @@ -25,10 +28,21 @@ type Scheduler struct { // WorkersNumCPUs uses the number of CPU cores of the computer. // as the number of workers. -var WorkersNumCPUs uint64 = 0 +var ( + // SingleWorker fires a scheduler with just one worker. + SingleWorker uint64 = 1 -// DefaultLogger is the default logger for the yotei scheduler. -var DefaultLogger *slog.Logger = nil + // NumCPUsWorkers fires a scheduler with [runtime.NumCPU] workers. + NumCPUsWorkers uint64 = uint64(runtime.NumCPU()) +) + +var ( + // DefaultLogger is the default logger for the yotei scheduler. + DefaultLogger *slog.Logger = slog.Default() + + // SilentLogger is a silent logger for the yotei scheduler. + SilentLogger *slog.Logger = slog.New(slog.NewTextHandler(io.Discard, nil)) +) // NewScheduler creates a new scheduler with the given workers and logger. // @@ -45,11 +59,11 @@ var DefaultLogger *slog.Logger = nil // ) func NewScheduler(workers uint64, logger *slog.Logger) *Scheduler { if workers == 0 { - workers = uint64(runtime.NumCPU()) + workers = NumCPUsWorkers } if logger == nil { - logger = slog.Default() + logger = DefaultLogger } return &Scheduler{ @@ -60,7 +74,7 @@ func NewScheduler(workers uint64, logger *slog.Logger) *Scheduler { // Add appends a task into the scheduler. If the task // was already in the scheduler it will ignore it. -func (scheduler *Scheduler) Add(tasks ...*Task) { +func (scheduler *Scheduler) Add(tasks ...Tasker) { scheduler.mutex.Lock() defer scheduler.mutex.Unlock() @@ -77,7 +91,7 @@ func (scheduler *Scheduler) Add(tasks ...*Task) { // Has returns true if the given task is currently // in the scheduler. -func (scheduler *Scheduler) Has(task *Task) bool { +func (scheduler *Scheduler) Has(task Tasker) bool { scheduler.mutex.Lock() defer scheduler.mutex.Unlock() @@ -91,7 +105,7 @@ func (scheduler *Scheduler) Has(task *Task) bool { } // Remove deletes the given tasks from the scheduler. -func (scheduler *Scheduler) Remove(tasks ...*Task) { +func (scheduler *Scheduler) Remove(tasks ...Tasker) { scheduler.mutex.Lock() defer scheduler.mutex.Unlock() @@ -104,7 +118,7 @@ func (scheduler *Scheduler) Remove(tasks ...*Task) { } } -func (scheduler *Scheduler) next() *Task { +func (scheduler *Scheduler) next() Tasker { if !scheduler.mutex.TryLock() { return nil } @@ -136,13 +150,13 @@ func (scheduler *Scheduler) next() *Task { return nil } -func (scheduler *Scheduler) handle(ctx context.Context, task *Task) { +func (scheduler *Scheduler) handle(ctx context.Context, task Tasker) { if action := task.Handle(ctx); action != nil { action(scheduler, task) } } -func (scheduler *Scheduler) handleTask(task *Task) { +func (scheduler *Scheduler) handleTasker(task Tasker) { defer func() { if !task.IsConcurrent() { task.Unlock() @@ -172,7 +186,7 @@ func (scheduler *Scheduler) worker() { return default: if task := scheduler.next(); task != nil { - scheduler.handleTask(task) + scheduler.handleTasker(task) continue } } @@ -249,12 +263,16 @@ func (scheduler *Scheduler) IsRunning() bool { return scheduler.ctx != nil } -// Tasks returns the current scheduler tasks. -func (scheduler *Scheduler) Tasks() Tasks { +// Snapshot returns the current scheduler tasks as an iterator. +// +// When creating the iterator, the actual tasks are freezed +// from the moment this function called to ensure it +// is concurrent safe. +func (scheduler *Scheduler) Snapshot() iter.Seq[Tasker] { scheduler.mutex.Lock() defer scheduler.mutex.Unlock() - return scheduler.tasks + return slices.Values(slices.Clone(scheduler.tasks)) } // String returns a string representation of a scheduler. diff --git a/scheduler_test.go b/scheduler_test.go index 289984c..90db3d4 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -23,8 +23,8 @@ func (counter *CounterHandler) Count() uint64 { func TestThreeTasks(t *testing.T) { scheduler := yotei.NewScheduler( - yotei.WorkersNumCPUs, - yotei.DefaultLogger, + yotei.NumCPUsWorkers, + yotei.SilentLogger, ) counter1 := &CounterHandler{} @@ -74,8 +74,8 @@ func TestThreeTasks(t *testing.T) { func TestItDoesNotRunLockedTasks(t *testing.T) { scheduler := yotei.NewScheduler( - 12, - yotei.DefaultLogger, + yotei.NumCPUsWorkers, + yotei.SilentLogger, ) counter1 := &CounterHandler{} @@ -113,8 +113,8 @@ func TestItDoesNotRunLockedTasks(t *testing.T) { func TestSequence(t *testing.T) { scheduler := yotei.NewScheduler( - yotei.WorkersNumCPUs, - yotei.DefaultLogger, + yotei.NumCPUsWorkers, + yotei.SilentLogger, ) calls := make([]string, 0) diff --git a/task.go b/task.go index f0ac278..d60d369 100644 --- a/task.go +++ b/task.go @@ -7,37 +7,18 @@ import ( "time" ) -// Task is the the executionable action in the [Scheduler]. +// Task is a sequential task executionable in the [yotei.Scheduler]. // -// A task must: -// - Be handlable -// - Have a weight -// - Have a duration -// - Be sequential or concurrent -// - Have an id -// -// Use [NewTask] to create a new task. +// Use [NewTask] to create a new sequential task. type Task struct { handler Handler weight atomic.Uint64 duration atomic.Int64 locked atomic.Bool concurrent atomic.Bool - id atomic.Value } -// A list of actionable tasks -type Tasks []*Task - -// Determines that the task can take unlimited duration. -var DurationUnlimited time.Duration = 0 - -// NewTask creates a new task with the given handler. -// -// By default, a task: -// - Has a weight of 1 -// - Has unlimited duration -// - Is aequential +// NewTask creates a new sequential task with the given handler. func NewTask(handler Handler) *Task { if handler == nil { panic("no task handler defined. please ensure the task handler is not nil") @@ -50,21 +31,10 @@ func NewTask(handler Handler) *Task { task.weight.Store(1) task.duration.Store(int64(DurationUnlimited)) task.concurrent.Store(false) - task.id.Store("") - - return task -} - -func (task *Task) Identified(id string) *Task { - task.id.Store(id) return task } -func (task *Task) ID() string { - return task.id.Load().(string) -} - func (task *Task) Lock() { task.locked.Store(true) } @@ -125,51 +95,3 @@ func (task *Task) String() string { task.IsLocked(), ) } - -// Weight returns the sum of all he -// weights of all the tasks in the list. -func (tasks Tasks) Weight() uint64 { - total := uint64(0) - - for _, task := range tasks { - total += task.Weight() - } - - return total -} - -// Unlocked returns the tasks that are unlocked -func (tasks Tasks) Unlocked() Tasks { - unlocked := make(Tasks, 0) - - for _, task := range tasks { - if !task.IsLocked() { - unlocked = append(unlocked, task) - } - } - - return unlocked -} - -// Locked returns the tasks that are locked -func (tasks Tasks) Locked() Tasks { - locked := make(Tasks, 0) - - for _, task := range tasks { - if task.IsLocked() { - locked = append(locked, task) - } - } - - return locked -} - -func (tasks Tasks) FindByID(id string) (*Task, bool) { - for _, task := range tasks { - if task.ID() == id { - return task, true - } - } - - return nil, false -} diff --git a/tasker.go b/tasker.go new file mode 100644 index 0000000..5dee349 --- /dev/null +++ b/tasker.go @@ -0,0 +1,59 @@ +package yotei + +import ( + "time" +) + +type Tasker interface { + Handler + Duration() time.Duration + Weight() uint64 + Lock() + Unlock() + IsLocked() bool + IsConcurrent() bool +} + +// A list of actionable tasks +type Tasks []Tasker + +// Determines that the task can take unlimited duration. +var DurationUnlimited time.Duration = 0 + +// Weight returns the sum of all he +// weights of all the tasks in the list. +func (tasks Tasks) Weight() uint64 { + total := uint64(0) + + for _, task := range tasks { + total += task.Weight() + } + + return total +} + +// Unlocked returns the tasks that are unlocked +func (tasks Tasks) Unlocked() Tasks { + unlocked := make(Tasks, 0) + + for _, task := range tasks { + if !task.IsLocked() { + unlocked = append(unlocked, task) + } + } + + return unlocked +} + +// Locked returns the tasks that are locked +func (tasks Tasks) Locked() Tasks { + locked := make(Tasks, 0) + + for _, task := range tasks { + if task.IsLocked() { + locked = append(locked, task) + } + } + + return locked +}