Skip to content

Commit

Permalink
feat(core): tasker interface
Browse files Browse the repository at this point in the history
  • Loading branch information
ConsoleTVs committed Aug 29, 2024
1 parent b5728d6 commit 15b2e36
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 109 deletions.
12 changes: 6 additions & 6 deletions action.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import "time"
//
// It allows applying a logic based
// on the task's output.
type Action func(scheduler *Scheduler, task *Task)
type Action func(scheduler *Scheduler, task Tasker)

// ThenAdd adds the given tasks to the scheduler
// when the action is run.
func (action Action) ThenAdd(tasks ...*Task) Action {
return action.Then(func(scheduler *Scheduler, _ *Task) {
func (action Action) ThenAdd(tasks ...Tasker) Action {
return action.Then(func(scheduler *Scheduler, _ Tasker) {
scheduler.Add(tasks...)
})
}
Expand All @@ -20,7 +20,7 @@ func (action Action) ThenAdd(tasks ...*Task) Action {
// is run. The callback supports a specific action callback that
// gets the current scheduler and the executed task.
func (action Action) Then(callback Action) Action {
return func(scheduler *Scheduler, task *Task) {
return func(scheduler *Scheduler, task Tasker) {
if action != nil {
action(scheduler, task)
}
Expand All @@ -43,7 +43,7 @@ func Continue() Action {
// after the duration exceeds but rather that the
// task will be in the scheduler again after that time.
func Retry(duration time.Duration) Action {
return func(scheduler *Scheduler, task *Task) {
return func(scheduler *Scheduler, task Tasker) {
scheduler.Remove(task)

go func() {
Expand All @@ -55,7 +55,7 @@ func Retry(duration time.Duration) Action {

// Done removes the task from the scheduler
func Done() Action {
return func(scheduler *Scheduler, task *Task) {
return func(scheduler *Scheduler, task Tasker) {
scheduler.Remove(task)
}
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module github.com/studiolambda/yotei

go 1.22.0
go 1.23.0
48 changes: 33 additions & 15 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ package yotei
import (
"context"
"fmt"
"io"
"iter"
"log/slog"
"math/rand"
"runtime"
"slices"
"sync"
)

Expand All @@ -25,10 +28,21 @@ type Scheduler struct {

// WorkersNumCPUs uses the number of CPU cores of the computer.
// as the number of workers.
var WorkersNumCPUs uint64 = 0
var (
// SingleWorker fires a scheduler with just one worker.
SingleWorker uint64 = 1

// DefaultLogger is the default logger for the yotei scheduler.
var DefaultLogger *slog.Logger = nil
// NumCPUsWorkers fires a scheduler with [runtime.NumCPU] workers.
NumCPUsWorkers uint64 = uint64(runtime.NumCPU())
)

var (
// DefaultLogger is the default logger for the yotei scheduler.
DefaultLogger *slog.Logger = slog.Default()

// SilentLogger is a silent logger for the yotei scheduler.
SilentLogger *slog.Logger = slog.New(slog.NewTextHandler(io.Discard, nil))
)

// NewScheduler creates a new scheduler with the given workers and logger.
//
Expand All @@ -45,11 +59,11 @@ var DefaultLogger *slog.Logger = nil
// )
func NewScheduler(workers uint64, logger *slog.Logger) *Scheduler {
if workers == 0 {
workers = uint64(runtime.NumCPU())
workers = NumCPUsWorkers
}

if logger == nil {
logger = slog.Default()
logger = DefaultLogger
}

return &Scheduler{
Expand All @@ -60,7 +74,7 @@ func NewScheduler(workers uint64, logger *slog.Logger) *Scheduler {

// Add appends a task into the scheduler. If the task
// was already in the scheduler it will ignore it.
func (scheduler *Scheduler) Add(tasks ...*Task) {
func (scheduler *Scheduler) Add(tasks ...Tasker) {
scheduler.mutex.Lock()
defer scheduler.mutex.Unlock()

Expand All @@ -77,7 +91,7 @@ func (scheduler *Scheduler) Add(tasks ...*Task) {

// Has returns true if the given task is currently
// in the scheduler.
func (scheduler *Scheduler) Has(task *Task) bool {
func (scheduler *Scheduler) Has(task Tasker) bool {
scheduler.mutex.Lock()
defer scheduler.mutex.Unlock()

Expand All @@ -91,7 +105,7 @@ func (scheduler *Scheduler) Has(task *Task) bool {
}

// Remove deletes the given tasks from the scheduler.
func (scheduler *Scheduler) Remove(tasks ...*Task) {
func (scheduler *Scheduler) Remove(tasks ...Tasker) {
scheduler.mutex.Lock()
defer scheduler.mutex.Unlock()

Expand All @@ -104,7 +118,7 @@ func (scheduler *Scheduler) Remove(tasks ...*Task) {
}
}

func (scheduler *Scheduler) next() *Task {
func (scheduler *Scheduler) next() Tasker {
if !scheduler.mutex.TryLock() {
return nil
}
Expand Down Expand Up @@ -136,13 +150,13 @@ func (scheduler *Scheduler) next() *Task {
return nil
}

func (scheduler *Scheduler) handle(ctx context.Context, task *Task) {
func (scheduler *Scheduler) handle(ctx context.Context, task Tasker) {
if action := task.Handle(ctx); action != nil {
action(scheduler, task)
}
}

func (scheduler *Scheduler) handleTask(task *Task) {
func (scheduler *Scheduler) handleTasker(task Tasker) {
defer func() {
if !task.IsConcurrent() {
task.Unlock()
Expand Down Expand Up @@ -172,7 +186,7 @@ func (scheduler *Scheduler) worker() {
return
default:
if task := scheduler.next(); task != nil {
scheduler.handleTask(task)
scheduler.handleTasker(task)
continue
}
}
Expand Down Expand Up @@ -249,12 +263,16 @@ func (scheduler *Scheduler) IsRunning() bool {
return scheduler.ctx != nil
}

// Tasks returns the current scheduler tasks.
func (scheduler *Scheduler) Tasks() Tasks {
// Snapshot returns the current scheduler tasks as an iterator.
//
// When creating the iterator, the actual tasks are freezed
// from the moment this function called to ensure it
// is concurrent safe.
func (scheduler *Scheduler) Snapshot() iter.Seq[Tasker] {
scheduler.mutex.Lock()
defer scheduler.mutex.Unlock()

return scheduler.tasks
return slices.Values(slices.Clone(scheduler.tasks))
}

// String returns a string representation of a scheduler.
Expand Down
12 changes: 6 additions & 6 deletions scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ func (counter *CounterHandler) Count() uint64 {

func TestThreeTasks(t *testing.T) {
scheduler := yotei.NewScheduler(
yotei.WorkersNumCPUs,
yotei.DefaultLogger,
yotei.NumCPUsWorkers,
yotei.SilentLogger,
)

counter1 := &CounterHandler{}
Expand Down Expand Up @@ -74,8 +74,8 @@ func TestThreeTasks(t *testing.T) {

func TestItDoesNotRunLockedTasks(t *testing.T) {
scheduler := yotei.NewScheduler(
12,
yotei.DefaultLogger,
yotei.NumCPUsWorkers,
yotei.SilentLogger,
)

counter1 := &CounterHandler{}
Expand Down Expand Up @@ -113,8 +113,8 @@ func TestItDoesNotRunLockedTasks(t *testing.T) {

func TestSequence(t *testing.T) {
scheduler := yotei.NewScheduler(
yotei.WorkersNumCPUs,
yotei.DefaultLogger,
yotei.NumCPUsWorkers,
yotei.SilentLogger,
)

calls := make([]string, 0)
Expand Down
84 changes: 3 additions & 81 deletions task.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,37 +7,18 @@ import (
"time"
)

// Task is the the executionable action in the [Scheduler].
// Task is a sequential task executionable in the [yotei.Scheduler].
//
// A task must:
// - Be handlable
// - Have a weight
// - Have a duration
// - Be sequential or concurrent
// - Have an id
//
// Use [NewTask] to create a new task.
// Use [NewTask] to create a new sequential task.
type Task struct {
handler Handler
weight atomic.Uint64
duration atomic.Int64
locked atomic.Bool
concurrent atomic.Bool
id atomic.Value
}

// A list of actionable tasks
type Tasks []*Task

// Determines that the task can take unlimited duration.
var DurationUnlimited time.Duration = 0

// NewTask creates a new task with the given handler.
//
// By default, a task:
// - Has a weight of 1
// - Has unlimited duration
// - Is aequential
// NewTask creates a new sequential task with the given handler.
func NewTask(handler Handler) *Task {
if handler == nil {
panic("no task handler defined. please ensure the task handler is not nil")
Expand All @@ -50,21 +31,10 @@ func NewTask(handler Handler) *Task {
task.weight.Store(1)
task.duration.Store(int64(DurationUnlimited))
task.concurrent.Store(false)
task.id.Store("")

return task
}

func (task *Task) Identified(id string) *Task {
task.id.Store(id)

return task
}

func (task *Task) ID() string {
return task.id.Load().(string)
}

func (task *Task) Lock() {
task.locked.Store(true)
}
Expand Down Expand Up @@ -125,51 +95,3 @@ func (task *Task) String() string {
task.IsLocked(),
)
}

// Weight returns the sum of all he
// weights of all the tasks in the list.
func (tasks Tasks) Weight() uint64 {
total := uint64(0)

for _, task := range tasks {
total += task.Weight()
}

return total
}

// Unlocked returns the tasks that are unlocked
func (tasks Tasks) Unlocked() Tasks {
unlocked := make(Tasks, 0)

for _, task := range tasks {
if !task.IsLocked() {
unlocked = append(unlocked, task)
}
}

return unlocked
}

// Locked returns the tasks that are locked
func (tasks Tasks) Locked() Tasks {
locked := make(Tasks, 0)

for _, task := range tasks {
if task.IsLocked() {
locked = append(locked, task)
}
}

return locked
}

func (tasks Tasks) FindByID(id string) (*Task, bool) {
for _, task := range tasks {
if task.ID() == id {
return task, true
}
}

return nil, false
}
59 changes: 59 additions & 0 deletions tasker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package yotei

import (
"time"
)

type Tasker interface {
Handler
Duration() time.Duration
Weight() uint64
Lock()
Unlock()
IsLocked() bool
IsConcurrent() bool
}

// A list of actionable tasks
type Tasks []Tasker

// Determines that the task can take unlimited duration.
var DurationUnlimited time.Duration = 0

// Weight returns the sum of all he
// weights of all the tasks in the list.
func (tasks Tasks) Weight() uint64 {
total := uint64(0)

for _, task := range tasks {
total += task.Weight()
}

return total
}

// Unlocked returns the tasks that are unlocked
func (tasks Tasks) Unlocked() Tasks {
unlocked := make(Tasks, 0)

for _, task := range tasks {
if !task.IsLocked() {
unlocked = append(unlocked, task)
}
}

return unlocked
}

// Locked returns the tasks that are locked
func (tasks Tasks) Locked() Tasks {
locked := make(Tasks, 0)

for _, task := range tasks {
if task.IsLocked() {
locked = append(locked, task)
}
}

return locked
}

0 comments on commit 15b2e36

Please sign in to comment.