-
Notifications
You must be signed in to change notification settings - Fork 0
/
lock.js
147 lines (122 loc) · 3.99 KB
/
lock.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
const debug = require('debug')('Lock');
const md5 = require('./md5');
const pwait = require('pwait');
const Cache = require('./cache');
/**
 * Named locks stored in a shared Cache instance.
 *
 * Two flavours are offered:
 *  - `all(name, task)`  : every caller eventually runs its task, one at a time.
 *  - `race(name, task)` : only the caller that wins the lock runs its task;
 *                         the others return without executing.
 *
 * Locks are taken with `cache.set(key, value, 'PX', timeout, 'NX')`. The
 * argument order mirrors Redis `SET key value PX ms NX`; presumably Cache
 * forwards it to Redis, so the key expires by itself after `timeout` ms
 * (TODO confirm against ./cache).
 */
class Lock {
  /**
   * @param {Cache} cacheInstance - Cache used to store the lock keys.
   * @param {object} [options]
   * @param {string} [options.keyPrefix='Lock'] - Prefix for every key.
   * @param {number} [options.checkLockDelay=50] - Delay in ms between two lock checks.
   * @param {number} [options.defaultTimeout=10000] - Lock timeout in ms used when none is given.
   * @throws {Error} When `cacheInstance` is not a Cache instance.
   */
  constructor(cacheInstance, options) {
    // The parentheses matter: `!x instanceof Cache` negates `x` first and then
    // tests a boolean, which is always false. `instanceof` is also false for
    // null/undefined, so this single test covers the missing-argument case.
    if (!(cacheInstance instanceof Cache)) {
      throw new Error('Lock need first parameter to be Cache instance');
    }
    this.cache = cacheInstance;
    this.options = Object.assign({
      // prefix for every key
      keyPrefix: 'Lock',
      // the check delay in ms
      checkLockDelay: 50,
      // default timeout
      defaultTimeout: 10000 // 10 seconds
    }, options || {});
    debug('new instance with options', this.options);
  }

  /**
   * Build the cache key for a lock name.
   *
   * @param {*} key - Strings and numbers are used verbatim; anything else is
   *   replaced by the md5 of its JSON representation.
   * @returns {string} `<keyPrefix>:<key>`
   */
  getKey(key) {
    if (typeof key !== 'string' && typeof key !== 'number') {
      key = md5(JSON.stringify(key));
    }
    return `${this.options.keyPrefix}:${key}`;
  }

  /**
   * Acquire the "all" lock, polling every `checkLockDelay` ms until the
   * cache accepts the key. There is no upper bound on the waiting time other
   * than the holder's key disappearing.
   *
   * @param {*} lockName
   * @param {number} timeout - Lock lifetime in ms.
   * @returns {Promise<{lockTimeoutValue: number, delayed: boolean}>}
   *   `lockTimeoutValue` is the local timestamp (ms) just after which the key
   *   is expected to have expired; `delayed` is true when at least one wait
   *   was needed.
   */
  async getAllLock(lockName, timeout) {
    debug(`getting all lock for ${lockName}`);
    const key = `${this.getKey(lockName)}:all`;
    let lockTimeoutValue = 0;
    let delayed = false;
    while (true) {
      // Recomputed on every attempt so it matches the TTL of the successful set.
      lockTimeoutValue = Date.now() + timeout + 1;
      const acquired = await this.cache.set(key, lockTimeoutValue, 'PX', timeout, 'NX');
      if (acquired) break;
      debug(`locked, wait ${this.options.checkLockDelay}ms for ${lockName}`);
      delayed = true;
      await pwait(this.options.checkLockDelay);
    }
    if (delayed) debug(`unlocked for ${lockName}`);
    return { lockTimeoutValue, delayed };
  }

  /**
   * Try to acquire the "race" lock once.
   *
   * - Lock obtained: returns immediately with `delayed` and `ignored` false.
   * - Lock held elsewhere and `ignore` is truthy: returns immediately with
   *   `ignored: true`.
   * - Lock held elsewhere otherwise: polls until the key is gone, then returns
   *   with `delayed: true`. The lock is NOT taken in that case.
   *
   * @param {*} lockName
   * @param {number} timeout - Lock lifetime in ms.
   * @param {boolean} [ignore] - Do not wait for the current holder.
   * @returns {Promise<{lockTimeoutValue: number, delayed: boolean, ignored: boolean}>}
   */
  async getRaceLock(lockName, timeout, ignore) {
    debug(`getting race lock for ${lockName}`);
    const key = `${this.getKey(lockName)}:race`;
    const lockTimeoutValue = Date.now() + timeout + 1;
    const acquired = await this.cache.set(key, lockTimeoutValue, 'PX', timeout, 'NX');
    if (acquired) {
      debug(`${lockName} not locked`);
      return { lockTimeoutValue, delayed: false, ignored: false };
    }
    if (ignore) {
      debug(`ignore race lock for ${lockName}`);
      return { lockTimeoutValue, delayed: false, ignored: true };
    }
    while (true) {
      debug(`race locked, wait ${this.options.checkLockDelay}ms for ${lockName}`);
      await pwait(this.options.checkLockDelay);
      // `== null` on purpose: accept both null and undefined as "key is gone"
      // so a cache that reports misses as undefined cannot cause an endless loop.
      if ((await this.cache.get(key)) == null) break;
    }
    debug(`race unlocked for ${lockName}`);
    return { lockTimeoutValue, delayed: true, ignored: false };
  }

  /**
   * Run `task` under the "all" lock: concurrent callers are serialised and
   * each one runs its own task.
   *
   * Signatures: `all(lockName, task)` or `all(lockName, timeout, task)`.
   *
   * @param {*} lockName
   * @param {number|Function} [timeout] - Lock lifetime in ms; defaults to
   *   `options.defaultTimeout` when omitted, null or undefined.
   * @param {function(boolean): Promise<*>} task - Receives `delayed` (whether
   *   the caller had to wait for the lock).
   * @returns {Promise<*>} Whatever `task` resolves to.
   * @throws {Error} When `task` is not a function; any error thrown by `task`
   *   is logged and rethrown after the lock has been released.
   */
  async all(lockName, timeout, task) {
    // if (!lockName) throw new Error('need lockName');
    if (!task && typeof timeout === 'function') {
      task = timeout;
      timeout = this.options.defaultTimeout;
    }
    if (typeof task !== 'function') throw new Error('task should be function returns Promise');
    // An explicit null/undefined timeout would otherwise yield a NaN expiry.
    if (timeout == null) timeout = this.options.defaultTimeout;

    const { lockTimeoutValue, delayed } = await this.getAllLock(lockName, timeout);
    let err = null;
    let result;
    debug(`executing task for ${lockName}`);
    try {
      result = await task(delayed);
    } catch (_err) {
      console.error(`Lock: task throws error for ${lockName}: `, _err);
      err = _err;
    }
    debug(`task executed for ${lockName}`);
    // Only delete while our own key should still be alive; past that point the
    // key may already belong to another holder.
    if (lockTimeoutValue > Date.now()) {
      debug(`unlocking ${lockName}`);
      await this.cache.del(`${this.getKey(lockName)}:all`);
    }
    if (err) throw err;
    return result;
  }

  /**
   * Run `task` only if this caller wins the "race" lock.
   *
   * Signatures: `race(lockName, task[, ignore])` or
   * `race(lockName, timeout, task[, ignore])`.
   *
   * @param {*} lockName
   * @param {number|Function} [timeout] - Lock lifetime in ms; defaults to
   *   `options.defaultTimeout` when omitted, null or undefined.
   * @param {function(boolean): Promise<*>} task - Called with `false` (the
   *   winner never waited).
   * @param {boolean} [ignore] - When the lock is held elsewhere, return at
   *   once instead of waiting for it to be released.
   * @returns {Promise<{executed: boolean, result: *}>} `executed` is false and
   *   `result` is null when another caller held the lock.
   * @throws {Error} When `task` is not a function; any error thrown by `task`
   *   is rethrown after the lock has been released (it is logged unless it
   *   carries a truthy `cacheable` property).
   */
  async race(lockName, timeout, task, ignore) {
    // if (!lockName) throw new Error('need lockName');
    if (typeof timeout === 'function') {
      ignore = task;
      task = timeout;
      timeout = this.options.defaultTimeout;
    }
    if (typeof task !== 'function') throw new Error('task should be function returns Promise');
    // An explicit null/undefined timeout would otherwise yield a NaN expiry.
    if (timeout == null) timeout = this.options.defaultTimeout;

    const { lockTimeoutValue, delayed, ignored } = await this.getRaceLock(lockName, timeout, ignore);
    if (ignored || delayed) {
      // Somebody else held the lock: this caller lost the race.
      return {
        executed: false,
        result: null
      };
    }
    let err = null;
    let result;
    debug(`executing race task for ${lockName}`);
    try {
      result = await task(delayed);
    } catch (_err) {
      if (!_err || !_err.cacheable) console.error(`Lock: task throws error for ${lockName}: `, _err);
      err = _err;
    }
    debug(`task executed for ${lockName}`);
    // Only delete while our own key should still be alive; past that point the
    // key may already belong to another holder.
    if (lockTimeoutValue > Date.now()) {
      debug(`unlocking ${lockName}`);
      await this.cache.del(`${this.getKey(lockName)}:race`);
    }
    if (err) throw err;
    return {
      executed: true,
      result
    };
  }
}
module.exports = Lock;