-
Notifications
You must be signed in to change notification settings - Fork 0
/
crontab.go
303 lines (260 loc) · 6.98 KB
/
crontab.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
package xz_crontab
import (
"context"
"errors"
"fmt"
"github.com/gorhill/cronexpr"
"log"
"runtime/debug"
"sync"
"time"
)
// 任务调度计划表
type JobSchedulerPlan struct {
Job *Job
Expr *cronexpr.Expression // 解析好的cronnxpr 表达式
NextTime time.Time
NextTimeN []time.Time
}
type Job struct {
Name string // 任务名
Par string // 额外参数
CronExpr string // cron 表达式
IsOpen bool
IsSkip bool // 如果为true 忽视重复 false 默认只会开启一个
Callback func(par ...interface{}) (err error)
Once bool // true 常驻只执行一次
ShowNextN uint // 显示几个下次执行的时间,默认为1个
}
// 执行的结果
type JobResult struct {
Name string
Err error // 错误
StartTing time.Time
EndTime time.Time
}
type Scheduler struct {
jobPlanTable map[string]*JobSchedulerPlan // 执行计划表
jobPlanTableInit map[string]*Job // 只会执行一次的脚本
is_stop bool
sync.RWMutex
ctx context.Context
cancel context.CancelFunc
nextCh chan string
}
var g_jobexecuting map[string]string
var g_JobResult_chan chan *JobResult
func init() {
//log.SetFlags(log.Lshortfile | log.LstdFlags)
}
func InitCrontab(jobs []Job) *Scheduler {
g_jobexecuting = make(map[string]string)
g_JobResult_chan = make(chan *JobResult, 100)
model := &Scheduler{
jobPlanTable: make(map[string]*JobSchedulerPlan),
jobPlanTableInit: make(map[string]*Job),
is_stop: false,
nextCh: make(chan string, 100),
}
model.ctx, model.cancel = context.WithCancel(context.Background())
for _, job := range jobs {
if job.IsOpen {
if job.Once == false {
model.jobPlanTable[job.Name], _ = buildSchedulerPlan(job)
} else {
model.once(job)
}
}
}
go model.SchedulerLoop()
return model
}
func (c *Scheduler) NextChGet() chan string {
return c.nextCh
}
func (c *Scheduler) once(job Job) {
go func() {
defer func() {
if r := recover(); r != nil {
log.Println(errors.New("灾难错误"), r, string(debug.Stack()))
}
}()
job.Callback(job.Name, job.Par, c.ctx)
}()
}
// 构建任务执行计划
func buildSchedulerPlan(job Job) (jobSchedulerPlan *JobSchedulerPlan, err error) {
var (
expr *cronexpr.Expression
)
if expr, err = cronexpr.Parse(job.CronExpr); err != nil {
fmt.Println(err, "解析错误了")
return
}
nowT := time.Now()
nextNow := expr.Next(nowT)
now := time.Now()
if nextNow.Before(now) {
err = errors.New("时间过期了")
return
}
var nextN []time.Time
if job.ShowNextN > 0 {
nextN = expr.NextN(now, job.ShowNextN)
}
jobSchedulerPlan = &JobSchedulerPlan{
Job: &job,
Expr: expr,
NextTime: nextNow,
NextTimeN: nextN,
}
return
}
// 调度协程
func (scheduler *Scheduler) SchedulerLoop() {
defer func() {
if err := recover(); err != nil {
fmt.Println(err)
}
}()
// 定时任务
var (
schedulerAfter time.Duration
schedulerTimer *time.Timer
)
// 计算调度的时间
schedulerAfter = scheduler.TrySchedule(true)
// 调度延时器
schedulerTimer = time.NewTimer(schedulerAfter)
// 调度延迟事件
for {
select {
case <-schedulerTimer.C:
case result := <-g_JobResult_chan:
dealResult(result)
}
// 重新调度一次任务
schedulerAfter = scheduler.TrySchedule(false)
// 重置任务定时器
schedulerTimer.Reset(schedulerAfter)
}
}
// 尝试遍历所有任务
func (scheduler *Scheduler) TrySchedule(isInit bool) (schedulerAfter time.Duration) {
var (
//jobPlan *JobSchedulerPlan
now time.Time
nearTime *time.Time
)
// 没有任务睡一s
if len(scheduler.jobPlanTable) == 0 {
schedulerAfter = 1 * time.Second
return
}
now = time.Now()
for key, jobPlan := range scheduler.jobPlanTable {
if jobPlan.NextTime.Unix() < 0 {
// 过期的删除
log.Println("我删除了", jobPlan.Job.Name)
delete(scheduler.jobPlanTable, key)
continue
}
timeLayout := "2006-01-02 15:04:05" //转化所需模板
if isInit { // 不需要下次某一个脚本触发的时候吧所有发过来了,只要发当次新的就行
if len(jobPlan.NextTimeN) > 0 {
for i, timan := range jobPlan.NextTimeN {
select {
case scheduler.nextCh <- fmt.Sprintf("%s,初始化下次执行的时间第:%d次,%s", jobPlan.Job.Name, i+1, timan.Format(timeLayout)):
default:
}
}
} else {
datetime := jobPlan.NextTime.Format(timeLayout)
select {
case scheduler.nextCh <- fmt.Sprintf("%s,初始化下次执行的时间第:1次,%s", jobPlan.Job.Name, datetime):
default:
}
}
}
if jobPlan.NextTime.Before(now) || jobPlan.NextTime.Equal(now) {
if scheduler.getStop() {
goto LOOP
}
// 执行的任务可能运行很久, 1分钟会调度60次,但是只能执行1次, 防止并发!
if jobPlan.Job.IsSkip == false {
// 如果任务正在执行,跳过本次调度
if _, jobExecuting := g_jobexecuting[jobPlan.Job.Name]; jobExecuting {
log.Printf("尚未退出,跳过执行:%s", jobPlan.Job.Name)
goto LOOP
}
}
if jobPlan.Job.Callback == nil {
goto LOOP
}
// 保存执行状态
g_jobexecuting[jobPlan.Job.Name] = jobPlan.Job.Name
go func(jobPlan *JobSchedulerPlan) { // go 需要传值进来
defer func() {
if r := recover(); r != nil {
log.Println(errors.New("灾难错误"), r, string(debug.Stack()))
}
}()
startTing := time.Now()
err := jobPlan.Job.Callback(jobPlan.Job.Name, jobPlan.Job.Par, jobPlan.NextTime.Format(timeLayout))
if err != nil {
log.Println(jobPlan.Job.Name, err)
}
endTime := time.Now()
pushg_JobResult_chan(jobPlan.Job.Name, startTing, endTime, err)
}(jobPlan)
LOOP:
// 更新下次执行时间
jobPlan.NextTime = jobPlan.Expr.Next(now)
if !isInit { // 运行后更新下一次时间
datetime := jobPlan.NextTime.Format(timeLayout)
select {
case scheduler.nextCh <- fmt.Sprintf("%s,下次执行的时间:%s", jobPlan.Job.Name, datetime):
default:
}
}
}
if nearTime == nil || jobPlan.NextTime.Before(*nearTime) {
nearTime = &jobPlan.NextTime
}
}
// 睡眠多少时间
schedulerAfter = (*nearTime).Sub(now)
//log.Println("schedulerAfter",schedulerAfter)
return
}
func pushg_JobResult_chan(name string, startTime, endTime time.Time, err error) {
g_JobResult_chan <- &JobResult{
Name: name,
StartTing: startTime,
EndTime: endTime,
Err: err,
}
}
func dealResult(result *JobResult) {
delete(g_jobexecuting, result.Name)
//log.Println("执行时间删除",result.EndTime.Unix() - result.EndTime.Unix(),result.Name)
}
// 关闭脚本
func (scheduler *Scheduler) getStop() bool {
scheduler.RLock()
defer scheduler.RUnlock()
return scheduler.is_stop
}
// 关闭脚本
func (scheduler *Scheduler) Stop() {
scheduler.Lock()
defer scheduler.Unlock()
scheduler.is_stop = true
// 取消一次性脚本
scheduler.cancel()
}
func (scheduler *Scheduler) Start() {
scheduler.Lock()
defer scheduler.Unlock()
scheduler.is_stop = false
}