-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcluster.go
177 lines (155 loc) · 4.28 KB
/
cluster.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
package raft
import (
"encoding/gob"
"encoding/json"
"fmt"
"github.com/Mathew-Estafanous/memlist"
"io"
"log"
"os"
"strconv"
"sync"
"time"
)
// Cluster keeps track of all other nodes and their addresses.
//
// NOTE(review): an earlier version of this comment said the Cluster also holds
// agreed upon constants such as heart beat time and election timeout; no such
// accessors are declared on this interface — confirm where those live.
type Cluster interface {
// GetNode returns the node registered under id, or an error when there is none.
GetNode(id uint64) (Node, error)
// AllNodes returns the known nodes keyed by their ID.
AllNodes() map[uint64]Node
// Quorum returns how many nodes make up a majority of the cluster.
Quorum() int
}
// Node describes a single raft member of the Cluster.
type Node struct {
	// An ID that uniquely identifies the raft in the Cluster.
	ID uint64 `json:"id"`

	// Address of the node, that other rafts can contact.
	Addr string `json:"addr"`
}

// StaticCluster is a static definition of all members of the cluster. As such new members
// cannot be dynamically discovered. All members must be known from the start.
type StaticCluster struct {
	// mu is held by the methods of StaticCluster while they read or modify Nodes.
	mu sync.Mutex

	// Nodes holds all the nodes within the raft Cluster. Key is a raft id.
	Nodes map[uint64]Node

	logger *log.Logger
}

// AllNodes returns a snapshot of every node in the cluster, keyed by node ID.
//
// The returned map is a copy made while holding the lock. Handing out the
// internal map would let callers range over it after the lock is released,
// racing with any concurrent insert or delete. Changes made to the returned
// map do not affect the cluster.
func (c *StaticCluster) AllNodes() map[uint64]Node {
	c.mu.Lock()
	defer c.mu.Unlock()

	snapshot := make(map[uint64]Node, len(c.Nodes))
	for id, n := range c.Nodes {
		snapshot[id] = n
	}
	return snapshot
}
// NewCluster returns a StaticCluster that has no nodes registered yet. Its
// logger writes to standard output with the "[Cluster]" prefix.
func NewCluster() *StaticCluster {
	cl := new(StaticCluster)
	cl.Nodes = map[uint64]Node{}
	cl.logger = log.New(os.Stdout, "[Cluster]", log.LstdFlags)
	return cl
}
// NewClusterWithConfig creates a StaticCluster and fills it with the nodes
// described by conf. The configuration is decoded as JSON directly into the
// node map, so it is expected to be a JSON object whose keys are node IDs and
// whose values are Node objects.
//
// It returns the decoding error, unmodified, when conf cannot be decoded.
func NewClusterWithConfig(conf io.Reader) (*StaticCluster, error) {
	cl := NewCluster()
	if err := json.NewDecoder(conf).Decode(&cl.Nodes); err != nil {
		return nil, err
	}

	// A JSON `null` document decodes without error but resets the map to nil;
	// restore an empty map so that later inserts do not panic.
	if cl.Nodes == nil {
		cl.Nodes = make(map[uint64]Node)
	}
	return cl, nil
}
// GetNode looks up the node stored under id while holding the cluster lock.
// It returns a zero Node and an error when no node is stored under that id.
func (c *StaticCluster) GetNode(id uint64) (Node, error) {
	c.mu.Lock()
	defer c.mu.Unlock()

	if found, exists := c.Nodes[id]; exists {
		return found, nil
	}
	return Node{}, fmt.Errorf("couldn't find a node with id %v", id)
}
// Quorum returns the number of nodes that form a majority of the cluster,
// i.e. half of the current node count (rounded down) plus one.
//
// The node count is read under the cluster lock, like every other accessor of
// Nodes, so it does not race with concurrent additions or removals.
func (c *StaticCluster) Quorum() int {
	c.mu.Lock()
	defer c.mu.Unlock()

	return len(c.Nodes)/2 + 1
}
// addNode registers n under its ID while holding the cluster lock. It returns
// an error, and leaves the cluster untouched, when that ID is already taken.
func (c *StaticCluster) addNode(n Node) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	_, taken := c.Nodes[n.ID]
	if taken {
		return fmt.Errorf("[Cluster] A node with ID: %d is already registered", n.ID)
	}

	c.logger.Printf("Added a new node with ID: %d and Address: %v", n.ID, n.Addr)
	c.Nodes[n.ID] = n
	return nil
}
// removeNode deletes the node stored under id while holding the cluster lock
// and returns the removed node. It returns a zero Node and an error when no
// node is stored under that id.
func (c *StaticCluster) removeNode(id uint64) (Node, error) {
	c.mu.Lock()
	defer c.mu.Unlock()

	n, ok := c.Nodes[id]
	if !ok {
		return Node{}, fmt.Errorf("[Cluster] A node with ID: %d is not registered", id)
	}

	c.logger.Printf("Removed a node with ID: %d and Address: %v", n.ID, n.Addr)
	// Delete by the key that was looked up rather than by n.ID: entries decoded
	// from a config file may carry an ID field that differs from their map key.
	delete(c.Nodes, id)
	return n, nil
}
// DynamicCluster combines a StaticCluster node table with a memlist member.
// Its OnMembershipChange method adds and removes nodes from the table as
// membership events are reported.
type DynamicCluster struct {
// cl is the node table that GetNode, AllNodes and Quorum delegate to.
cl *StaticCluster
// member is the memlist member created by NewDynamicCluster; Join and Leave act on it.
member *memlist.Member
logger *log.Logger
// NOTE(review): mu is not used by any method visible in this file — confirm whether it is still needed.
mu sync.Mutex
}
// NewDynamicCluster creates a DynamicCluster whose memlist member is bound to
// port and carries raftNode as its metadata. The new cluster is installed as
// the member's event listener, and raftNode is registered in the node table
// once the member has been created.
//
// It returns an error if the memlist member cannot be created or if raftNode
// cannot be added to the node table.
func NewDynamicCluster(port uint16, raftNode Node) (*DynamicCluster, error) {
	// Node travels as memlist metadata, so gob needs to know its concrete type.
	gob.Register(Node{})

	dc := &DynamicCluster{
		cl:     NewCluster(),
		logger: log.New(os.Stdout, fmt.Sprintf("[Dynamic Cluster :%d]", port), log.LstdFlags),
	}

	cfg := memlist.DefaultLocalConfig()
	cfg.Name = "M#" + strconv.Itoa(int(port))
	cfg.BindPort = port
	cfg.EventListener = dc
	cfg.MetaData = raftNode

	m, err := memlist.Create(cfg)
	if err != nil {
		return nil, err
	}
	dc.member = m

	if err := dc.cl.addNode(raftNode); err != nil {
		return nil, err
	}
	return dc, nil
}
// GetNode returns the node registered under id by delegating to the
// underlying StaticCluster; the error is the one that lookup reports.
func (c *DynamicCluster) GetNode(id uint64) (Node, error) {
return c.cl.GetNode(id)
}
// AllNodes returns the node map reported by the underlying StaticCluster.
func (c *DynamicCluster) AllNodes() map[uint64]Node {
return c.cl.AllNodes()
}
// Quorum returns the quorum size computed by the underlying StaticCluster.
func (c *DynamicCluster) Quorum() int {
return c.cl.Quorum()
}
// OnMembershipChange updates the node table in response to a memlist
// membership event. NewDynamicCluster installs the cluster as the memlist
// event listener, which is how this method gets invoked.
//
// The peer's Data is expected to hold a Node. An Alive peer is added to the
// table; a Left or Dead peer is removed from it. Peers in any other state are
// ignored. Failures are logged rather than returned because the signature has
// no error result.
func (c *DynamicCluster) OnMembershipChange(peer memlist.Node) {
	node, ok := peer.Data.(Node)
	if !ok {
		c.logger.Printf("Failed to get member node Data: %v", peer.Data)
		// Without valid metadata there is no node to act on; continuing would
		// add or remove the zero-value Node (ID 0, empty address).
		return
	}

	switch peer.State {
	case memlist.Alive:
		if err := c.cl.addNode(node); err != nil {
			c.logger.Printf("Failed to add node: %v", err)
		}
	case memlist.Left, memlist.Dead:
		removed, err := c.cl.removeNode(node.ID)
		if err != nil {
			c.logger.Printf("Failed to remove node: %v", err)
			return
		}
		c.logger.Printf("Removed a node with ID: %d and Address: %v", removed.ID, removed.Addr)
	}
}
// Join will initiate the joining process of the raft node to the cluster by
// forwarding otherAddr to the memlist member's Join and returning its error.
// NOTE(review): otherAddr is presumably the address of a member that is already
// part of the cluster — confirm against the memlist documentation.
func (c *DynamicCluster) Join(otherAddr string) error {
return c.member.Join(otherAddr)
}
// Leave forwards timeout to the memlist member's Leave and returns its error.
// NOTE(review): the exact meaning of timeout is defined by memlist — confirm
// against its documentation.
func (c *DynamicCluster) Leave(timeout time.Duration) error {
return c.member.Leave(timeout)
}