-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
353 lines (313 loc) · 15.6 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
/* Copyright 2019 ScaleOut Software, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
'use strict';
const TimeWindow = require('./timeWindow');
/**
* User-supplied callback that extracts a timestamp from an element in a time-ordered array of events.
*
* @callback timestampSelector
* @param {any} elem - Timestamped element in a time-ordered array of events.
* @returns {number} The time that the elem argument occurred, represented as milliseconds elapsed since January 1, 1970 00:00:00 UTC.
*/
/**
 * Transforms an ordered array into an array of sliding windows. The source array must be sorted chronologically.
 *
 * A window is created every `every` milliseconds, beginning at `start` and continuing while the
 * window's start time is earlier than `end`. Each window spans at most `windowDuration`
 * milliseconds and is truncated so that it never extends past `end`. Consequently, if `start`
 * equals `end` (for example, a single-element array with automatic start/end times) no windows
 * are produced.
 *
 * @param {Array} sourceArray - Array of time-ordered elements to transform.
 * @param {timestampSelector} timestampSelector - Function to extract a timestamp from elements in the source array.
 * @param {number} windowDuration - Duration of each time window in milliseconds; must be greater than zero. This is a maximum value that will be shortened for the last window(s) in the returned sequence.
 * @param {number} every - The period of time, in milliseconds, between the start of each sliding window. Must be an integer greater than zero.
 * @param {number} [start] - Start time (inclusive) of the first sliding window, expressed as milliseconds elapsed since January 1, 1970 00:00:00 UTC. If null/undefined, the timestamp of the array's first element is used.
 * @param {number} [end] - End time (exclusive) for the last sliding window(s), expressed as milliseconds elapsed since January 1, 1970 00:00:00 UTC. If null/undefined, the timestamp of the array's last element is used, and the end time of the final window is inclusive.
 * @returns {TimeWindow[]} An array of TimeWindow instances (empty if the source array is empty).
 * @throws {TypeError} If sourceArray is not an array, timestampSelector is not a function, windowDuration is not a positive number, every is not an integer, start/end are not integers, or start occurs after end.
 * @throws {RangeError} If every is not greater than zero.
 */
function toSlidingWindows(sourceArray, timestampSelector, windowDuration, every, start, end) {
    if (!Array.isArray(sourceArray)) {
        throw new TypeError('sourceArray must be an Array instance');
    }
    if (typeof timestampSelector !== 'function') {
        throw new TypeError(timestampSelector + ' is not a function');
    }
    // A missing/NaN duration would silently produce windows whose end time is NaN.
    if (typeof windowDuration !== 'number' || Number.isNaN(windowDuration) || windowDuration <= 0) {
        throw new TypeError('The "windowDuration" argument must be a positive number representing a duration in milliseconds.');
    }
    if (every == null || !Number.isInteger(every)) {
        throw new TypeError('The "every" argument must be an integer representing a duration in milliseconds.');
    }
    // A zero or negative period would never advance the window start, so the
    // window-creation loop below would never terminate.
    if (every <= 0) {
        throw new RangeError('The "every" argument must be greater than zero.');
    }
    if (sourceArray.length === 0) {
        return [];
    }

    // Automatically deduce start/end times if none are provided:
    const rangeStart = (start == null) ? timestampSelector(sourceArray[0]) : start;

    // When the end time is deduced from the last item's timestamp, the final
    // window's end is made inclusive--otherwise the last item wouldn't be included.
    const autoEndTime = (end == null);
    const rangeEnd = autoEndTime ? timestampSelector(sourceArray[sourceArray.length - 1]) : end;

    if (!Number.isInteger(rangeStart) || !Number.isInteger(rangeEnd)) {
        throw new TypeError('start and end time arguments must be integers (typically milliseconds elapsed since January 1, 1970 00:00:00 UTC.');
    }
    if (rangeStart > rangeEnd) {
        throw new TypeError('start time occurs after end time.');
    }

    // Create (empty) windows. The TimeWindow's sourceIndex & length members are left
    // undefined here; they are populated by the pass through the source array below.
    const windows = [];
    for (let winStart = rangeStart; winStart < rangeEnd; winStart += every) {
        // Truncate the window so it never extends beyond the overall end time.
        const winEnd = Math.min(winStart + windowDuration, rangeEnd);

        // Only the final window generated (the one after which no further window
        // would start before the end time) gets an inclusive end, and only when
        // the end time was picked automatically.
        const isLastWindow = (winStart + every) >= rangeEnd;
        const isEndInclusive = autoEndTime && isLastWindow;

        windows.push(new TimeWindow(sourceArray, winStart, winEnd, isEndInclusive));
    }

    // Calculate each window's offset and length in the source array. We memoize
    // the prior window's starting point so we don't have to traverse the entire
    // array to find each window's starting point.
    const elementCount = sourceArray.length;
    let startingIndexMemo = 0;
    for (const win of windows) {
        // Skip elements that occur before the window's start time.
        let firstIndex = startingIndexMemo;
        while (firstIndex < elementCount && timestampSelector(sourceArray[firstIndex]) < win.start) {
            firstIndex++;
        }
        startingIndexMemo = firstIndex;

        // Advance until an element falls outside of the window's end time
        // (or the source array is exhausted).
        let endIndex = firstIndex;
        while (endIndex < elementCount) {
            const timestamp = timestampSelector(sourceArray[endIndex]);
            const isPastEnd = win.isEndInclusive ? (timestamp > win.end) : (timestamp >= win.end);
            if (isPastEnd) {
                break;
            }
            endIndex++;
        }

        // Always populated, even for windows that contain no elements
        // (sourceIndex === sourceArray.length and length === 0 when every
        // element precedes the window).
        win.sourceIndex = firstIndex;
        win.length = endIndex - firstIndex;
    }
    return windows;
}
/**
 * Transforms an ordered array into an array of non-overlapped, fixed-time windows. The source array must be sorted chronologically.
 *
 * This is a convenience wrapper that delegates to toSlidingWindows, using the window duration
 * as the period between window starts so that consecutive windows do not overlap.
 *
 * @param {Array} sourceArray - Array of time-ordered elements to transform.
 * @param {timestampSelector} timestampSelector - Function to extract a timestamp from elements in the source array.
 * @param {number} windowDuration - Duration of each time window in milliseconds. This is a maximum value that may be shortened for the last window in the returned sequence.
 * @param {number} [start] - Start time (inclusive) of the first tumbling window, expressed as milliseconds elapsed since January 1, 1970 00:00:00 UTC. Forwarded unchanged to toSlidingWindows.
 * @param {number} [end] - End time for the last tumbling window, expressed as milliseconds elapsed since January 1, 1970 00:00:00 UTC. Forwarded unchanged to toSlidingWindows.
 * @returns {TimeWindow[]} The result of toSlidingWindows: an array of TimeWindow instances.
 */
function toTumblingWindows(sourceArray, timestampSelector, windowDuration, start, end) {
    // A tumbling window is a sliding window whose period equals its duration.
    const period = windowDuration;
    return toSlidingWindows(
        sourceArray,
        timestampSelector,
        windowDuration,
        period,
        start,
        end
    );
}
/**
 * Transforms an ordered array into an array of session windows. The source array must be sorted chronologically.
 *
 * Consecutive elements belong to the same session as long as the gap between an element and
 * its predecessor does not exceed idleThreshold. Each returned window starts at the timestamp of
 * the session's first element and ends (inclusively) at the timestamp of its last element.
 *
 * @param {Array} sourceArray - Array of time-ordered elements to transform.
 * @param {timestampSelector} timestampSelector - Function to extract a timestamp from elements in the source array.
 * @param {number} idleThreshold - max allowed time gap between elements before a new session is started, in milliseconds.
 * @returns {TimeWindow[]} An array of TimeWindow instances (empty if the source array is empty).
 * @throws {TypeError} If sourceArray is not an array, timestampSelector is not a function, or idleThreshold is not a positive integer.
 */
function toSessionWindows(sourceArray, timestampSelector, idleThreshold) {
    if (!Array.isArray(sourceArray)) {
        throw new TypeError('sourceArray must be an Array instance');
    }
    if (typeof timestampSelector !== 'function') {
        throw new TypeError(timestampSelector + ' is not a function');
    }
    if (!Number.isInteger(idleThreshold) || idleThreshold <= 0) {
        throw new TypeError('The "idleThreshold" argument must be a positive integer representing a duration in milliseconds.');
    }

    const total = sourceArray.length;
    if (total === 0) {
        return [];
    }

    const sessions = [];
    let sessionStartIndex = 0;
    let sessionStartTime = timestampSelector(sourceArray[0]);
    let lastEventTime = sessionStartTime;

    // Emits the session that begins at sessionStartIndex and stops just before stopIndex.
    const closeSession = (stopIndex) => {
        sessions.push(new TimeWindow(
            sourceArray,
            sessionStartTime,
            lastEventTime,
            true,
            sessionStartIndex,
            stopIndex - sessionStartIndex
        ));
    };

    for (let index = 1; index < total; index++) {
        const eventTime = timestampSelector(sourceArray[index]);
        const idleGap = eventTime - lastEventTime;
        if (idleGap > idleThreshold) {
            // Idle threshold exceeded: finish the current session and begin a new one at this element.
            closeSession(index);
            sessionStartTime = eventTime;
            sessionStartIndex = index;
        }
        // Either way, this element is now the most recent event of the (possibly new) session.
        lastEventTime = eventTime;
    }

    // The trailing session is still open once the array has been consumed.
    closeSession(total);
    return sessions;
}
/**
 * Adds one or more elements to a time-ordered array of items, inserting them in chronological order.
 * The destination array is modified in place. A new element is placed after any existing
 * elements that have the same timestamp.
 *
 * @param {Array} arr - destination array for new element(s).
 * @param {timestampSelector} timestampSelector - function to extract a timestamp from an element.
 * @param {...any} values - value(s) to add to the array.
 * @throws {TypeError} If arr is not an array or timestampSelector is not a function.
 * @throws {Error} If timestampSelector returns null/undefined for one of the new values.
 */
function addToOrdered(arr, timestampSelector, ...values) {
    if (!Array.isArray(arr)) {
        throw new TypeError('arr must be an Array instance');
    }
    if (typeof timestampSelector !== 'function') {
        throw new TypeError(timestampSelector + ' is not a function');
    }

    for (const value of values) {
        const newTime = timestampSelector(value);
        if (newTime == null) {
            throw new Error('timestampSelector returned null/undefined.');
        }

        // Incoming values are expected to be the newest events, so the insertion
        // point is normally at or near the tail. Walking backwards from the end
        // avoids re-sorting the whole array and is close to O(1) when that
        // expectation holds.
        let slot = arr.length;
        while (slot > 0 && newTime < timestampSelector(arr[slot - 1])) {
            slot--;
        }
        arr.splice(slot, 0, value);
    }
}
/**
 * Adds one or more elements to a time-ordered array of items, inserting them in chronological order,
 * then trims the front of the array so that it holds no more than maxArraySize elements. The
 * elements removed are the ones at the lowest indexes (the oldest, given a time-ordered array).
 *
 * @param {Array} arr - destination array for new element(s); modified in place.
 * @param {number} maxArraySize - max allowed size of destination array before eviction begins.
 * @param {timestampSelector} timestampSelector - function to extract a timestamp from an element.
 * @param {...any} values - value(s) to add to the array.
 * @throws {RangeError} If maxArraySize is not a positive integer.
 */
function addToOrderedAndEvictOldest(arr, maxArraySize, timestampSelector, ...values) {
    const isValidLimit = Number.isInteger(maxArraySize) && maxArraySize > 0;
    if (!isValidLimit) {
        throw new RangeError('maxArraySize must be a positive integer.');
    }

    addToOrdered(arr, timestampSelector, ...values);

    // Number of elements by which the array now exceeds its allowed size.
    const overflow = arr.length - maxArraySize;
    if (overflow <= 0) {
        return;
    }
    removeFirstItems(arr, overflow);
}
/**
 * Adds one or more elements to a time-ordered array of items, inserting them in chronological order,
 * then removes the leading run of elements whose timestamp is earlier than startTime. Eviction
 * stops at the first element whose timestamp is not earlier than startTime.
 *
 * @param {Array} arr - destination array for new element(s); modified in place.
 * @param {Date|number} startTime - start time (inclusive) of the first allowed element in the destination array.
 * @param {timestampSelector} timestampSelector - function to extract a timestamp from an element.
 * @param {...any} values - value(s) to add to the array.
 */
function addToOrderedAndEvictBefore(arr, startTime, timestampSelector, ...values) {
    addToOrdered(arr, timestampSelector, ...values);

    // Locate the first element to keep; everything in front of it is stale.
    const firstKeptIndex = arr.findIndex((elem) => !(timestampSelector(elem) < startTime));

    // No element to keep means the entire array predates startTime.
    const staleCount = (firstKeptIndex === -1) ? arr.length : firstKeptIndex;
    if (staleCount > 0) {
        removeFirstItems(arr, staleCount);
    }
}
/**
 * Adds one or more elements to a time-ordered array of items, inserting them in chronological order,
 * then splits the array into sessions (via toSessionWindows) and, if there are more than
 * maxSessionCount sessions, removes every element that precedes the first session to be kept.
 *
 * @param {Array} arr - destination array for new element(s); modified in place.
 * @param {number} maxSessionCount - max allowed number of sessions.
 * @param {timestampSelector} timestampSelector - function to extract a timestamp from an element.
 * @param {number} idleThreshold - max allowed time gap between elements before a new session is started, in milliseconds.
 * @param {...any} values - value(s) to add to the array.
 * @throws {RangeError} If maxSessionCount is not a positive integer.
 * @throws {TypeError} If idleThreshold is not a positive integer.
 */
function addToOrderedAndEvictSessions(arr, maxSessionCount, timestampSelector, idleThreshold, ...values) {
    // Validate both limits up front so the array is not modified when an argument is bad.
    const isValidSessionLimit = Number.isInteger(maxSessionCount) && maxSessionCount >= 1;
    if (!isValidSessionLimit) {
        throw new RangeError('maxSessionCount must be a positive integer.');
    }
    const isValidThreshold = Number.isInteger(idleThreshold) && idleThreshold > 0;
    if (!isValidThreshold) {
        throw new TypeError('The "idleThreshold" argument must be a positive integer representing a duration in milliseconds.');
    }

    addToOrdered(arr, timestampSelector, ...values);

    const sessions = toSessionWindows(arr, timestampSelector, idleThreshold);
    const excessSessions = sessions.length - maxSessionCount;
    if (excessSessions <= 0) {
        return;
    }

    // The first session to keep begins at this offset in the array, which is
    // therefore also the number of leading elements to evict.
    const { sourceIndex: firstKeptIndex } = sessions[excessSessions];
    removeFirstItems(arr, firstKeptIndex);
}
/**
 * Perform in-place removal of the first N elements in an array. Passing a count of zero
 * leaves the array unchanged.
 *
 * @param {Array} arr - array to be modified.
 * @param {number} count - Number of elements to remove from the front of the array.
 * @throws {TypeError} If arr is not an array.
 * @throws {RangeError} If count is not an integer, is negative, or exceeds the array's length.
 */
function removeFirstItems(arr, count) {
    if (!Array.isArray(arr)) {
        throw new TypeError('arr must be an Array instance');
    }
    if (!Number.isInteger(count) || count < 0) {
        throw new RangeError('count must be a positive integer.');
    }
    if (count > arr.length) {
        throw new RangeError('count cannot be larger than array\'s length');
    }

    if (count === 0) {
        return;
    }
    if (count === 1) {
        arr.shift();
        return;
    }

    // Rather than paying for shift() once per removed element, slide the
    // surviving elements to the front in a single pass...
    const remaining = arr.length - count;
    for (let dest = 0; dest < remaining; dest++) {
        arr[dest] = arr[dest + count];
    }
    // ...and then drop the now-duplicated tail by shrinking the array's length.
    arr.length = remaining;
}
module.exports = {
toSlidingWindows: toSlidingWindows,
toTumblingWindows: toTumblingWindows,
toSessionWindows: toSessionWindows,
addToOrdered: addToOrdered,
addToOrderedAndEvictOldest: addToOrderedAndEvictOldest,
addToOrderedAndEvictBefore: addToOrderedAndEvictBefore,
addToOrderedAndEvictSessions: addToOrderedAndEvictSessions,
removeFirstItems: removeFirstItems
};