-
Notifications
You must be signed in to change notification settings - Fork 21
/
chains.go
157 lines (132 loc) · 3.72 KB
/
chains.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package tasqueue
import (
"context"
"fmt"
"github.com/google/uuid"
"github.com/vmihailenco/msgpack/v5"
)
// ChainMeta contains fields related to a chain job.
type ChainMeta struct {
ID string
// Status of the overall chain
Status string
// ID of the current job part of chain
JobID string
// List of IDs of completed jobs
PrevJobs []string
}
// ChainMessage is a wrapper over Chain, containing meta info such as status, id.
// A ChainMessage is stored in the results store.
type ChainMessage struct {
ChainMeta
}
type Chain struct {
Jobs []Job
Opts ChainOpts
}
type ChainOpts struct {
// Optional ID passed by client. If empty, Tasqueue generates it.
ID string
}
// NewChain() accepts a list of Tasks and creates a chain by setting the
// onSuccess task of i'th task to (i+1)'th task, hence forming a "chain".
// It returns the first task (essentially the first node of the linked list), which can be queued normally.
func NewChain(j []Job, opts ChainOpts) (Chain, error) {
if len(j) < 2 {
return Chain{}, fmt.Errorf("minimum 2 tasks required to form chain")
}
// Set the on success tasks as the i+1 task,
// hence forming a "chain" of tasks.
for i := 0; i < len(j)-1; i++ {
j[i].OnSuccess = append(j[i].OnSuccess, &j[i+1])
}
return Chain{Jobs: j, Opts: opts}, nil
}
// message() converts a group into a group message, ready to be enqueued/stored.
func (c *Chain) message() ChainMessage {
if c.Opts.ID == "" {
c.Opts.ID = uuid.NewString()
}
return ChainMessage{
ChainMeta: ChainMeta{
ID: c.Opts.ID,
Status: StatusProcessing,
},
}
}
func (s *Server) EnqueueChain(ctx context.Context, c Chain) (string, error) {
msg := c.message()
root := c.Jobs[0]
jobID, err := s.Enqueue(ctx, root)
if err != nil {
return "", err
}
msg.JobID = jobID
if err := s.setChainMessage(ctx, msg); err != nil {
return "", err
}
return msg.ID, nil
}
func (s *Server) GetChain(ctx context.Context, id string) (ChainMessage, error) {
c, err := s.getChainMessage(ctx, id)
if err != nil {
return ChainMessage{}, err
}
if c.Status == StatusDone || c.Status == StatusFailed {
return c, nil
}
// Fetch the current job, to check its status
currJob, err := s.GetJob(ctx, c.JobID)
if err != nil {
return ChainMessage{}, nil
}
checkJobs:
switch currJob.Status {
//If the current job failed, add it to previous jobs list
// Set the chain status to failed
case StatusFailed:
c.PrevJobs = append(c.PrevJobs, currJob.ID)
c.Status = StatusFailed
// If the current job status is an intermediatery status
// Set the chain status as processing.
case StatusStarted, StatusProcessing, StatusRetrying:
c.Status = StatusProcessing
// If the current job status is done, check the next job id.
// If there is no next job id, the chain is complete, set overall status
// to success. Otherwise update the current job and perform all the above checks.
case StatusDone:
c.PrevJobs = append(c.PrevJobs, currJob.ID)
if len(currJob.OnSuccessIDs) == 0 {
c.Status = StatusDone
} else {
currJob, err = s.GetJob(ctx, currJob.OnSuccessIDs[0])
if err != nil {
return ChainMessage{}, nil
}
goto checkJobs
}
}
if err = s.setChainMessage(ctx, c); err != nil {
return ChainMessage{}, nil
}
return c, nil
}
const chainPrefix = "chain:msg:"
func (s *Server) setChainMessage(ctx context.Context, c ChainMessage) error {
b, err := msgpack.Marshal(c)
if err != nil {
return err
}
return s.results.Set(ctx, chainPrefix+c.ID, b)
}
func (s *Server) getChainMessage(ctx context.Context, id string) (ChainMessage, error) {
b, err := s.GetResult(ctx, chainPrefix+id)
if err != nil {
return ChainMessage{}, err
}
var c ChainMessage
if err := msgpack.Unmarshal(b, &c); err != nil {
return ChainMessage{}, err
}
return c, nil
}