-
Notifications
You must be signed in to change notification settings - Fork 1
/
lock.go
178 lines (149 loc) · 3.54 KB
/
lock.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
package dynalock
import (
"context"
"time"
"github.com/aws/aws-sdk-go/service/dynamodb"
)
// DefaultLockBackOff is the interval between attempts when an initial lock
// attempt is unsuccessful and the caller keeps polling for the lock.
var DefaultLockBackOff = 3 * time.Second
// dynamodbLock holds the state for a single lock backed by one DynamoDB
// record, identified by key.
type dynamodbLock struct {
	// ddb is the store used for the AtomicPut / AtomicDelete calls.
	ddb *Dynalock
	// last is the pair returned by the most recent successful AtomicPut; it is
	// passed as the "previous" value on the next write and on delete, and is
	// reset to nil after a successful Unlock.
	last *KVPair
	// renewCh, when it yields a value (or is closed), stops the background
	// holdLock loop.
	renewCh chan struct{}
	// renewEnable controls whether a successful tryLock starts the background
	// holdLock goroutine.
	renewEnable bool
	// tryLockPollingEnable controls whether Lock / LockWithContext keep
	// retrying after a failed first attempt or return ErrLockAcquireFailed.
	tryLockPollingEnable bool
	// unlockCh is sent to by Unlock and received from by holdLock to stop it.
	unlockCh chan struct{}
	// key is the key of the record being locked.
	key string
	// value is the attribute value written with every put.
	value *dynamodb.AttributeValue
	// ttl is the TTL written with every put; holdLock renews every ttl / 3.
	ttl time.Duration
}
// Lock attempts to acquire the lock on the DynamoDB record.
//
// One attempt is made immediately. If it does not succeed and polling is
// enabled, the call BLOCKS and retries once every DefaultLockBackOff until the
// lock is acquired, tryLock reports an unexpected error, or stopChan yields a
// value or is closed.
//
// On success the returned channel is the one handed to the background
// holdLock goroutine (started only when renewals are enabled), which closes it
// when it stops holding the lock.
//
// Errors:
//   - ErrLockAcquireFailed if the first attempt fails and polling is disabled.
//   - ErrLockAcquireCancelled if stopChan fires while polling.
//   - any other error returned by tryLock.
func (l *dynamodbLock) Lock(stopChan chan struct{}) (<-chan struct{}, error) {
	lockHeld := make(chan struct{})

	success, err := l.tryLock(lockHeld, stopChan)
	if err != nil {
		return nil, err
	}
	if success {
		return lockHeld, nil
	}

	// if polling is disabled then report the failure straight away
	if !l.tryLockPollingEnable {
		return nil, ErrLockAcquireFailed
	}

	// FIXME: This really needs a jitter for backoff
	ticker := time.NewTicker(DefaultLockBackOff)
	// Stop the ticker on every return path so its resources are released once
	// polling ends.
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			success, err := l.tryLock(lockHeld, stopChan)
			if err != nil {
				return nil, err
			}
			if success {
				return lockHeld, nil
			}
		case <-stopChan:
			return nil, ErrLockAcquireCancelled
		}
	}
}
// LockWithContext attempts to acquire the lock on the DynamoDB record, using
// ctx for cancellation.
//
// One attempt is made immediately. If it does not succeed and polling is
// enabled, the call BLOCKS and retries once every DefaultLockBackOff until the
// lock is acquired, tryLock reports an unexpected error, or ctx is done.
//
// On success the returned channel is the one handed to the background
// holdLock goroutine (started only when renewals are enabled), which closes it
// when it stops holding the lock. ctx.Done() is also passed to that goroutine
// as its stop channel.
//
// Errors:
//   - ErrLockAcquireFailed if the first attempt fails and polling is disabled.
//   - ErrLockAcquireCancelled if ctx is done while polling.
//   - any other error returned by tryLock.
func (l *dynamodbLock) LockWithContext(ctx context.Context) (<-chan struct{}, error) {
	lockHeld := make(chan struct{})

	success, err := l.tryLock(lockHeld, ctx.Done())
	if err != nil {
		return nil, err
	}
	if success {
		return lockHeld, nil
	}

	// if polling is disabled then report the failure straight away
	if !l.tryLockPollingEnable {
		return nil, ErrLockAcquireFailed
	}

	// FIXME: This really needs a jitter for backoff
	ticker := time.NewTicker(DefaultLockBackOff)
	// Stop the ticker on every return path so its resources are released once
	// polling ends.
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			success, err := l.tryLock(lockHeld, ctx.Done())
			if err != nil {
				return nil, err
			}
			if success {
				return lockHeld, nil
			}
		case <-ctx.Done():
			return nil, ErrLockAcquireCancelled
		}
	}
}
// Unlock signals the background hold loop to stop and then performs an atomic
// DELETE of the DynamoDB record, conditioned on the last pair this lock wrote.
// The stored pair is cleared only when the delete succeeds; a delete error is
// returned unchanged.
func (l *dynamodbLock) Unlock() error {
	// NOTE(review): this send blocks until something receives from unlockCh.
	// The only receiver visible in this file is holdLock, which runs only when
	// renewals are enabled and may already have exited - confirm unlockCh is
	// buffered or that Unlock is never called without a running holdLock.
	l.unlockCh <- struct{}{}

	if _, err := l.ddb.AtomicDelete(l.key, l.last); err != nil {
		return err
	}

	l.last = nil
	return nil
}
// tryLock makes a single attempt to write the lock record with an atomic put
// conditioned on the last pair this lock wrote.
//
// It returns (true, nil) when the put succeeds; in that case the returned pair
// is remembered and, if renewals are enabled, holdLock is started in the
// background with lockHeld and stopChan. ErrKeyNotFound, ErrKeyModified and
// ErrKeyExists are treated as "lock not acquired" and reported as (false, nil).
// Any other error is returned as-is.
func (l *dynamodbLock) tryLock(lockHeld chan struct{}, stopChan <-chan struct{}) (bool, error) {
	acquired, current, err := l.ddb.AtomicPut(
		l.key,
		WriteWithPreviousKV(l.last),
		WriteWithAttributeValue(l.value),
		WriteWithTTL(l.ttl),
	)

	switch err {
	case nil:
		// no error, inspect the result below
	case ErrKeyNotFound, ErrKeyModified, ErrKeyExists:
		// someone else holds the record, or it changed underneath us
		return false, nil
	default:
		return false, err
	}

	if !acquired {
		return false, nil
	}

	l.last = current

	// if renewals are enabled then keep holding the lock in the background
	if l.renewEnable {
		go l.holdLock(lockHeld, stopChan)
	}

	return true, nil
}
// holdLock keeps the lock alive by re-writing the record every ttl / 3 until
// it is told to stop or a renewal fails. It is intended to run in its own
// goroutine.
//
// The loop ends when a renewal put returns an error, or when renewCh,
// unlockCh or stopChan yields. lockHeld is closed on exit.
func (l *dynamodbLock) holdLock(lockHeld chan struct{}, stopChan <-chan struct{}) {
	defer close(lockHeld)

	// may need a floor of 1 second set
	heartbeat := time.NewTicker(l.ttl / 3)
	defer heartbeat.Stop()

	for {
		select {
		case <-heartbeat.C:
			// renew: write the record again, conditioned on the pair we last wrote
			_, renewed, err := l.ddb.AtomicPut(
				l.key,
				WriteWithPreviousKV(l.last),
				WriteWithAttributeValue(l.value),
				WriteWithTTL(l.ttl),
			)
			if err != nil {
				return
			}
			l.last = renewed
		case <-l.renewCh:
			return
		case <-l.unlockCh:
			return
		case <-stopChan:
			return
		}
	}
}