Skip to content

Commit

Permalink
refactor code and update readme
Browse files Browse the repository at this point in the history
  • Loading branch information
amitiwary999 committed Aug 31, 2024
1 parent 30bf8df commit c6ff4c1
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 79 deletions.
22 changes: 18 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# TaskScheduler

This is use to complete task(function) in parllel to complete task fast. Each task consist of two things function that need to perform and metaId that use to fetch meta from the database using the metaId. Meta is use in the function to perform task. Each task(function) may or may not need parameters.
This can use to peform tasks, with option to retry if failed and also peform task after some delay, in parallel to complete task quickly. Each task consist of two things function that need to perform and metaId that use to fetch meta from the database using the metaId. Meta is use in the function to perform task. Each task(function) may or may not need parameters.

Use go get github.com/amitiwary999/task-scheduler to fetch this module in your code.

Expand All @@ -9,8 +8,23 @@ import this library and then init the task scheduler
```
import ("github.com/amitiwary999/task-scheduler/scheduler")
tsk := scheduler.NewTaskScheduler(doneChannel, postgresUrl, poolLimit, workerCount, taskQueueLimit)
go tsk.StartScheduler()
tconf := &scheduler.TaskConfig{
MaxTaskWorker: 10,
TaskQueueSize: 10000,
Done: done,
RetryTimeDuration: time.Duration(5 * time.Second),
FuncGenerator: generateFunc,
}
tsk := scheduler.NewTaskScheduler(tconf)
/**
storage use to save the task so that we can fetch fail task later to retry again. Scheduler need StorageClient interface so that it can perform the storage operation. So before InitScheduler call make sure to get a StorageClient interface. We provide the function to create postgres db client. Will add support for the other storage later.
*/
dbClient, err := storage.NewPostgresClient(os.Getenv("POSTGRES_URL"), int16(poolLimit), "jobdetail")
tsk.InitScheduler(dbClient)
/**
After init scheduler successfully, Task can be submit. Each task has Meta of task and TaskFn function to perform the task.
*/
meta := model.TaskMeta{
MetaId: id,
Delay: intValue(after how much delay task need to perform, optional)
Expand Down
7 changes: 2 additions & 5 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

model "github.com/amitiwary999/task-scheduler/model"
scheduler "github.com/amitiwary999/task-scheduler/scheduler"

storage "github.com/amitiwary999/task-scheduler/storage"
"github.com/joho/godotenv"
)

Expand Down Expand Up @@ -40,17 +40,14 @@ func main() {
gracefulShutdown := make(chan os.Signal, 1)
signal.Notify(gracefulShutdown, syscall.SIGINT, syscall.SIGTERM)
tconf := &scheduler.TaskConfig{
PostgUrl: os.Getenv("POSTGRES_URL"),
PoolLimit: int16(poolLimit),
MaxTaskWorker: 10,
TaskQueueSize: 10000,
JobTableName: "jobdetail",
Done: done,
RetryTimeDuration: time.Duration(5 * time.Second),
FuncGenerator: generateFunc,
}
tsk := scheduler.NewTaskScheduler(tconf)
postgClient, err := tsk.InitStorage()
postgClient, err := storage.NewPostgresClient(os.Getenv("POSTGRES_URL"), int16(poolLimit), "jobdetail")
if err != nil {
return
}
Expand Down
62 changes: 27 additions & 35 deletions cmd/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"errors"
"fmt"
"math/rand"
"os"
"strconv"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -60,44 +58,38 @@ func TestTaskScheduler(t *testing.T) {
if err != nil {
t.Errorf("error load env %v\n", err)
}
poolLimit, err := strconv.Atoi(os.Getenv("POSTGRES_POOL_LIMIT"))

done := make(chan int)
tconf := &scheduler.TaskConfig{
MaxTaskWorker: 10,
TaskQueueSize: 10000,
Done: done,
RetryTimeDuration: time.Duration(3 * time.Second),
FuncGenerator: testGenerateFunc,
}
tsk := scheduler.NewTaskScheduler(tconf)
postgClient := dbStruct{}
tsk.InitScheduler(postgClient)
if err != nil {
t.Errorf("error in the string conversion pool limit %v", err)
} else {
done := make(chan int)
tconf := &scheduler.TaskConfig{
PostgUrl: os.Getenv("POSTGRES_URL"),
PoolLimit: int16(poolLimit),
MaxTaskWorker: 10,
TaskQueueSize: 10000,
JobTableName: "jobdetail",
Done: done,
RetryTimeDuration: time.Duration(3 * time.Second),
FuncGenerator: testGenerateFunc,
return
}
for i := 0; i < 10; i++ {
id := fmt.Sprintf("task_%v", i)
meta := &model.TaskMeta{
MetaId: id,
}
tsk := scheduler.NewTaskScheduler(tconf)
postgClient := dbStruct{}
tsk.InitScheduler(postgClient)
if err != nil {
return
if id == "task_6" {
meta.Retry = 5
}
for i := 0; i < 10; i++ {
id := fmt.Sprintf("task_%v", i)
meta := &model.TaskMeta{
MetaId: id,
}
if id == "task_6" {
meta.Retry = 5
}
mdlTsk := model.Task{
Meta: meta,
}
err := tsk.AddNewTask(mdlTsk)
if err == nil {
wg.Add(1)
}
mdlTsk := model.Task{
Meta: meta,
}
err := tsk.AddNewTask(mdlTsk)
if err == nil {
wg.Add(1)
}
}

wg.Wait()
if successC == 9 && failC == 1 {
t.Log("successfully completed the tasks")
Expand Down
14 changes: 7 additions & 7 deletions manager/task-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,17 @@ import (
)

type TaskManager struct {
postgClient util.PostgClient
storageClient util.StorageClient
taskActor *TaskActor
done chan int
priorityQueue PriorityQueue
retryTimeDuration time.Duration
funcGenerator func() func(*model.TaskMeta) error
}

func InitManager(postgClient util.PostgClient, taskActor *TaskActor, retryTime time.Duration, funcGenerator func() func(*model.TaskMeta) error, done chan int) *TaskManager {
func InitManager(storageClient util.StorageClient, taskActor *TaskActor, retryTime time.Duration, funcGenerator func() func(*model.TaskMeta) error, done chan int) *TaskManager {
return &TaskManager{
postgClient: postgClient,
storageClient: storageClient,
taskActor: taskActor,
funcGenerator: funcGenerator,
retryTimeDuration: retryTime,
Expand All @@ -40,7 +40,7 @@ func (tm *TaskManager) AddNewTask(task model.Task) error {
if task.Meta.Delay > 0 {
task.Meta.ExecutionTime = time.Now().Unix() + int64(task.Meta.Delay)*60
}
id, err := tm.postgClient.SaveTask(task.Meta)
id, err := tm.storageClient.SaveTask(task.Meta)
if err != nil {
fmt.Printf("failed to save the task %v\n", err)
return err
Expand Down Expand Up @@ -70,7 +70,7 @@ func (tm *TaskManager) assignTask(idTask string, meta *model.TaskMeta) {
taskStatus = util.JOB_DETAIL_STATUS_FAILED
}
}
tm.postgClient.UpdateTaskStatus(idTask, taskStatus, *meta)
tm.storageClient.UpdateTaskStatus(idTask, taskStatus, *meta)
}
tsk := model.ActorTask{
Meta: meta,
Expand Down Expand Up @@ -108,7 +108,7 @@ func (tm *TaskManager) retryFailedTask() {
ticker.Stop()
return
case <-ticker.C:
tsks, err := tm.postgClient.GetFailTask()
tsks, err := tm.storageClient.GetFailTask()
if err == nil {
for _, tsk := range tsks {
go tm.assignTask(tsk.Id, tsk.Meta)
Expand All @@ -121,7 +121,7 @@ func (tm *TaskManager) retryFailedTask() {
}

func (tm *TaskManager) loadPendingTask() {
tsks, err := tm.postgClient.GetPendingTask()
tsks, err := tm.storageClient.GetPendingTask()
if err == nil {
for _, tsk := range tsks {
go tm.assignTask(tsk.Id, tsk.Meta)
Expand Down
29 changes: 2 additions & 27 deletions scheduler/TaskSchedulerConfig.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,14 @@
package scheduler

import (
"fmt"
"time"

manager "github.com/amitiwary999/task-scheduler/manager"
model "github.com/amitiwary999/task-scheduler/model"
storage "github.com/amitiwary999/task-scheduler/storage"
"github.com/amitiwary999/task-scheduler/util"
)

type TaskConfig struct {
PostgUrl string
PoolLimit int16
JobTableName string
MaxTaskWorker uint16
TaskQueueSize uint16
RetryTimeDuration time.Duration
Expand All @@ -22,11 +17,8 @@ type TaskConfig struct {
}

type TaskScheduler struct {
postgUrl string
poolLimit int16
maxTaskWorker uint16
taskQueueSize uint16
jobTableName string
retryTimeDuration time.Duration
funcGenerator func() func(*model.TaskMeta) error
done chan int
Expand All @@ -36,33 +28,16 @@ type TaskScheduler struct {
func NewTaskScheduler(tconf *TaskConfig) *TaskScheduler {
return &TaskScheduler{
done: tconf.Done,
postgUrl: tconf.PostgUrl,
poolLimit: tconf.PoolLimit,
maxTaskWorker: tconf.MaxTaskWorker,
taskQueueSize: tconf.TaskQueueSize,
jobTableName: tconf.JobTableName,
funcGenerator: tconf.FuncGenerator,
retryTimeDuration: tconf.RetryTimeDuration,
}
}

func (t *TaskScheduler) InitStorage() (util.PostgClient, error) {
postgClient, error := storage.NewPostgresClient(t.postgUrl, t.poolLimit, t.jobTableName)
if error != nil {
fmt.Printf("postgres cient failed %v\n", error)
return nil, error
}
err := postgClient.CreateJobTable()
if err != nil {
fmt.Printf("failed to create the table to save job details %v \n", err)
return nil, err
}
return postgClient, nil
}

func (t *TaskScheduler) InitScheduler(postgClient util.PostgClient) {
func (t *TaskScheduler) InitScheduler(storageClient util.StorageClient) {
ta := manager.NewTaskActor(t.maxTaskWorker, t.done, t.taskQueueSize)
taskM := manager.InitManager(postgClient, ta, t.retryTimeDuration, t.funcGenerator, t.done)
taskM := manager.InitManager(storageClient, ta, t.retryTimeDuration, t.funcGenerator, t.done)
t.taskM = taskM
taskM.StartManager()
}
Expand Down
2 changes: 1 addition & 1 deletion util/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type SupabaseClient interface {
GetPendingTask() ([]byte, error)
}

type PostgClient interface {
type StorageClient interface {
SaveTask(meta *model.TaskMeta) (string, error)
UpdateTaskStatus(id, status string, meta model.TaskMeta) error
GetPendingTask() ([]model.PendingTask, error)
Expand Down

0 comments on commit c6ff4c1

Please sign in to comment.