-
Notifications
You must be signed in to change notification settings - Fork 0
/
net.go
110 lines (97 loc) · 2.08 KB
/
net.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package vault
import (
"github.com/pkg/errors"
"math/rand"
"fmt"
"os"
)
// netImpl is the Net implementation handed out by SimpleNet. Each operation
// asks the wrapped Warehouse for its current node list via All().
type netImpl struct {
// wr supplies the nodes that chunks are written to, read from, listed on
// and deleted from.
wr Warehouse
}
// SimpleNet returns a Net whose operations are carried out on the nodes
// provided by wr.
func SimpleNet(wr Warehouse) Net {
	impl := &netImpl{wr: wr}
	return impl
}
// Put stores data under chunkUID on up to redundancy nodes, chosen at random
// without repetition from the warehouse's node list.
//
// A node counts towards redundancy only when its Put returns nil; per the
// node contract noted by the original author, putting a chunk that already
// exists must return an error, so nodes that already hold it are skipped.
//
// Put returns nil as soon as at least one node accepted the chunk, even if
// fewer than redundancy copies could be placed. It returns an error when no
// node accepted it (including when there are no nodes or redundancy <= 0).
func (ni *netImpl) Put(chunkUID string, data []byte, redundancy int) error {
	// Work on a private copy: the swap-remove below overwrites slice
	// elements, and writing into the slice returned by All() could scramble
	// the warehouse's own node list if it shares the backing array.
	all := ni.wr.All()
	nodes := append(all[:0:0], all...)

	stored := 0
	for stored < redundancy && len(nodes) > 0 {
		j := rand.Intn(len(nodes))
		node := nodes[j]

		// Swap-remove the picked node so it is never tried twice.
		last := len(nodes) - 1
		nodes[j] = nodes[last]
		nodes = nodes[:last]

		if err := node.Put(chunkUID, data); err == nil {
			stored++
		}
	}

	if stored == 0 {
		return errors.Errorf("put %v: no suitable hosts", chunkUID)
	}
	return nil
}
// Get fetches the chunk stored under chunkUID. Nodes from the warehouse are
// tried in random order, each at most once, and the data from the first node
// whose Get returns no error is returned.
//
// It returns an error when every node failed or there are no nodes.
func (ni *netImpl) Get(chunkUID string) ([]byte, error) {
	// Work on a private copy: the swap-remove below overwrites slice
	// elements, and writing into the slice returned by All() could scramble
	// the warehouse's own node list if it shares the backing array.
	all := ni.wr.All()
	nodes := append(all[:0:0], all...)

	for len(nodes) > 0 {
		j := rand.Intn(len(nodes))
		data, err := nodes[j].Get(chunkUID)
		if err == nil {
			return data, nil
		}

		// This node could not serve the chunk; swap-remove it and retry
		// with the remaining candidates.
		last := len(nodes) - 1
		nodes[j] = nodes[last]
		nodes = nodes[:last]
	}
	return nil, errors.Errorf("get %v: no suitable hosts", chunkUID)
}
// Sync re-replicates chunks that are stored on fewer than redundancy nodes.
//
// It first counts, per chunk, how many nodes list it (nodes whose List fails
// are reported and skipped). Each under-replicated chunk is then fetched with
// Get and written with Put, asking for the missing number of copies.
//
// The returned count is the number of chunks for which the Put succeeded.
// Per-chunk failures are reported on stderr and do not abort the run; the
// returned error is always nil.
func (ni *netImpl) Sync(redundancy int) (int, error) {
	// Build the replica count for every chunk the nodes report.
	replicas := map[string]int{}
	for _, node := range ni.wr.All() {
		chunks, err := node.List()
		if err != nil {
			fmt.Fprintln(os.Stderr, "failed get index", err)
			continue
		}
		for _, chunk := range chunks {
			replicas[chunk]++
		}
	}

	// Redistribute chunks with less redundancy than expected.
	var synced int
	for chunk, count := range replicas {
		if count >= redundancy {
			continue
		}
		data, err := ni.Get(chunk)
		if err != nil {
			fmt.Fprintln(os.Stderr, "failed get chunk", chunk, err)
			continue
		}
		if err := ni.Put(chunk, data, redundancy-count); err != nil {
			fmt.Fprintln(os.Stderr, "failed redistribute chunk", chunk, err)
			// A failed redistribution must not be counted as synced.
			continue
		}
		synced++
	}
	return synced, nil
}
// Del asks every node of the warehouse to delete the chunk stored under
// chunkUID. All nodes are visited regardless of individual failures.
//
// It returns nil when at least one node deleted the chunk. Otherwise it
// returns the error from the last node that failed, which is nil when the
// warehouse has no nodes.
func (ni *netImpl) Del(chunkUID string) error {
	var (
		deleted int
		failure error
	)
	for _, n := range ni.wr.All() {
		if err := n.Del(chunkUID); err != nil {
			failure = err
			continue
		}
		deleted++
	}
	if deleted > 0 {
		return nil
	}
	return failure
}