-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfsm.go
87 lines (74 loc) · 1.84 KB
/
fsm.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package raft
// FSM (finite state machine) defines an interface that must be implemented by
// the client to receive commands sent by the raft Cluster.
//
// Within this file the methods are only invoked from runFSM, which forwards
// each returned error to the task that requested the operation.
type FSM interface {
// Apply is invoked when a log has been successfully committed and should
// then be applied to the state of the FSM. data is the command payload of
// that log. The returned error is reported back to the submitter of the
// command.
Apply(data []byte) error
// Snapshot creates a byte slice representation of all the data required
// to represent the current state of the machine. The bytes are handed back
// to whoever requested the snapshot, together with the returned error.
Snapshot() ([]byte, error)
// Restore resets the entire state of the FSM to a starting state described
// by cmd.
// NOTE(review): cmd is presumably a byte slice previously produced by
// Snapshot — confirm against the code that creates fsmRestore tasks.
Restore(cmd []byte) error
}
// runFSM consumes tasks from r.fsmCh and dispatches each one to the matching
// method of r.fsm: fsmUpdate goes to Apply, fsmSnapshot to Snapshot and
// fsmRestore to Restore. The error returned by the FSM is passed to the task's
// respond method. Values of any other type are skipped. The loop ends when
// ranging over r.fsmCh ends.
func (r *Raft) runFSM() {
	for task := range r.fsmCh {
		switch req := task.(type) {
		case *fsmUpdate:
			req.respond(r.fsm.Apply(req.cmd))
		case *fsmSnapshot:
			// Store the snapshot bytes on the task before responding.
			var err error
			req.state, err = r.fsm.Snapshot()
			req.respond(err)
		case *fsmRestore:
			req.respond(r.fsm.Restore(req.cmd))
		}
	}
}
// fsmUpdate is the task runFSM handles by calling FSM.Apply. The error
// returned by Apply is delivered through the embedded errorTask.
type fsmUpdate struct {
// errorTask provides respond/Error for reporting the outcome of Apply.
errorTask
// cmd is the payload passed to FSM.Apply.
cmd []byte
}
// fsmSnapshot is the task runFSM handles by calling FSM.Snapshot. The error
// returned by Snapshot is delivered through the embedded errorTask.
type fsmSnapshot struct {
// errorTask provides respond/Error for reporting the outcome of Snapshot.
errorTask
// state is set by runFSM to the byte slice returned by FSM.Snapshot; it is
// assigned before the task is responded to.
state []byte
}
// fsmRestore is the task runFSM handles by calling FSM.Restore. The error
// returned by Restore is delivered through the embedded errorTask.
type fsmRestore struct {
// errorTask provides respond/Error for reporting the outcome of Restore.
errorTask
// cmd is the payload passed to FSM.Restore.
cmd []byte
}
// Task represents an operation that has been sent to the raft Cluster. Every task
// represents a future operation that returns when all operations have been applied
// to the other raft replicas.
type Task interface {
// Error is a blocking operation that waits until the task has finished
// before returning the result of the task.
//
// A non-nil error is returned if the task failed to be committed.
Error() error
}
// errorTask carries the result of a task from respond to Error. It is embedded
// in the fsm task types declared in this file.
type errorTask struct {
// errCh delivers the task's result: respond sends the result on it and then
// closes it, and Error receives from it. When errCh is nil, respond does
// nothing.
errCh chan error
// err caches the value Error received from errCh. Once it is non-nil,
// Error returns it without receiving again and respond becomes a no-op.
err error
}
// respond publishes err as the result of the task by sending it on errCh and
// then closing the channel. It does nothing when errCh is nil or when a
// non-nil error is already stored in l.err.
func (l *errorTask) respond(err error) {
	if l.errCh == nil || l.err != nil {
		return
	}
	l.errCh <- err
	close(l.errCh)
}
// Error returns the result of the task. While no non-nil error is cached in
// l.err it receives from errCh, which blocks until a value is available or the
// channel is closed, and stores what it received. A cached non-nil error is
// returned directly without receiving from the channel again.
func (l *errorTask) Error() error {
	if l.err == nil {
		// Nothing cached yet: receive the result (nil once the channel has
		// been closed and drained).
		l.err = <-l.errCh
	}
	return l.err
}