diff --git a/lib/config.js b/lib/config.js index dbcf4765..7bd00be0 100644 --- a/lib/config.js +++ b/lib/config.js @@ -55,6 +55,12 @@ let Config = { }, }, }, + // Max number of redis payloads we attempt to process before just triggering a requery + maxRedisEventsToProcess: 300, + // Debounce interval after which we flush the queued redis payloads + debounceInterval: 100, + // Maximum wait time for flushing queued redis payloads + maxDebounceWait: 3000 }; export default Config; diff --git a/lib/mongo/lib/dispatchers.js b/lib/mongo/lib/dispatchers.js index f49b1a72..79de10c1 100644 --- a/lib/mongo/lib/dispatchers.js +++ b/lib/mongo/lib/dispatchers.js @@ -1,5 +1,4 @@ import { Meteor } from 'meteor/meteor'; -import { DDPServer } from 'meteor/ddp-server'; import { EJSON } from 'meteor/ejson'; import { Events, RedisPipe } from '../../constants'; import RedisSubscriptionManager from '../../redis/RedisSubscriptionManager'; @@ -8,21 +7,15 @@ import getDedicatedChannel from '../../utils/getDedicatedChannel'; import Config from '../../config'; import OptimisticInvocation from '../OptimisticInvocation'; -const dispatchEvents = function(optimistic, collectionName, channels, events) { +const dispatchEvents = function dispatchEventsFn(optimistic, collectionName, channels, events) { if (optimistic) { - OptimisticInvocation.withValue(true, () => { + OptimisticInvocation.withValue(true, () => { events.forEach(event => { const docId = event[RedisPipe.DOC]._id; - const dedicatedChannel = getDedicatedChannel( - collectionName, - docId - ); - RedisSubscriptionManager.process(dedicatedChannel, event); - - channels.forEach(channelName => { - RedisSubscriptionManager.process(channelName, event); - }); + const dedicatedChannel = getDedicatedChannel(collectionName, docId); + RedisSubscriptionManager.process(dedicatedChannel, [event]); }); + channels.forEach(channel => RedisSubscriptionManager.process(channel, events)); }); } diff --git a/lib/processors/actions/requery.js b/lib/processors/actions/requery.js index 78b00f25..28e9cf28 100644 --- a/lib/processors/actions/requery.js +++ b/lib/processors/actions/requery.js @@ -4,11 +4,9 @@ import { MongoIDMap } from '../../cache/mongoIdMap'; /** * @param observableCollection - * @param newCommer - * @param event - * @param modifiedFields + * @param documentMap */ -export default function (observableCollection, newCommer, event, modifiedFields) { +export default function (observableCollection, documentMap) { const { store, selector, options } = observableCollection; const newStore = new MongoIDMap(); @@ -16,28 +14,27 @@ export default function (observableCollection, newCommer, event, modifiedFields) selector, { ...options, fields: { _id: 1 } }).fetch(); freshIds.forEach(doc => newStore.set(doc._id, doc)); - let added = false; store.compareWith(newStore, { + // Any documents found only on the left store + // should be removed leftOnly(docId) { observableCollection.remove(docId); }, + // Any documents found in both and with documentMap entries + // have received redis updates indicating there are changes + both(docId) { + if (documentMap[docId]) { + observableCollection.change(documentMap[docId]) + } + }, + // Any documents only present in the right store are newly + // added rightOnly(docId) { - if (newCommer && EJSON.equals(docId, newCommer._id)) { - added = true; - observableCollection.add(newCommer); + if (documentMap[docId]) { + observableCollection.add(documentMap[docId]); } else { observableCollection.addById(docId); } } }); - - // if we have an update, and we have a newcommer, that new commer may be inside the ids - // TODO: maybe refactor this in a separate action (?) - if (newCommer - && Events.UPDATE === event - && modifiedFields - && !added - && store.has(newCommer._id)) { - observableCollection.change(newCommer, modifiedFields); - } } diff --git a/lib/processors/default.js b/lib/processors/default.js index 7af80f73..4d9f5889 100644 --- a/lib/processors/default.js +++ b/lib/processors/default.js @@ -1,32 +1,13 @@ -import { Events } from '../constants'; +import { Meteor } from 'meteor/meteor'; +import Config from '../config'; +import RedisPipe, { Events } from '../constants'; +import requery from './actions/requery'; /** * @param observableCollection - * @param event * @param doc - * @param modifiedFields */ -export default function(observableCollection, event, doc, modifiedFields) { - switch (event) { - case Events.INSERT: - handleInsert(observableCollection, doc); - break; - case Events.UPDATE: - handleUpdate(observableCollection, doc, modifiedFields); - break; - case Events.REMOVE: - handleRemove(observableCollection, doc); - break; - default: - throw new Meteor.Error(`Invalid event specified: ${event}`); - } -} - -/** - * @param observableCollection - * @param doc - */ -const handleInsert = function(observableCollection, doc) { +const handleInsert = (observableCollection, doc) => { if ( !observableCollection.contains(doc._id) && observableCollection.isEligible(doc) @@ -36,30 +17,63 @@ const handleInsert = function(observableCollection, doc) { }; /** - * @param observableCollection - * @param doc - * @param modifiedFields - */ -const handleUpdate = function(observableCollection, doc, modifiedFields) { +* @param observableCollection +* @param doc +* @param modifiedFields +*/ +const handleUpdate = (observableCollection, doc, modifiedFields) => { if (observableCollection.isEligible(doc)) { if (observableCollection.contains(doc._id)) { observableCollection.change(doc, modifiedFields); } else { observableCollection.add(doc); } - } else { - if (observableCollection.contains(doc._id)) { - observableCollection.remove(doc._id); - } + } else if (observableCollection.contains(doc._id)) { + observableCollection.remove(doc._id); } }; /** - * @param observableCollection - * @param doc - */ -const handleRemove = function(observableCollection, doc) { +* @param observableCollection +* @param doc +*/ +const handleRemove = (observableCollection, doc) => { if (observableCollection.contains(doc._id)) { observableCollection.remove(doc._id); } }; + +/** + * @param observableCollection + * @param events + * @param documentMap + */ +export default function(observableCollection, events, documentMap) { + const needsRequery = events.length > Config.maxRedisEventsToProcess; + + if (needsRequery) { + requery(observableCollection, documentMap); + return; + } + + for (let i = 0; i < events.length; i++) { + const event = events[i]; + const docId = event[RedisPipe.DOC]._id; + const modifiedFields = event[RedisPipe.FIELDS]; + const doc = documentMap[docId]; + + switch (event[RedisPipe.EVENT]) { + case Events.INSERT: + handleInsert(observableCollection, doc); + break; + case Events.UPDATE: + handleUpdate(observableCollection, doc, modifiedFields); + break; + case Events.REMOVE: + handleRemove(observableCollection, doc); + break; + default: + throw new Meteor.Error(`Invalid event specified: ${event}`); + } + } +} diff --git a/lib/processors/direct.js b/lib/processors/direct.js index af54d8ac..edc433df 100644 --- a/lib/processors/direct.js +++ b/lib/processors/direct.js @@ -1,32 +1,14 @@ -import { Events } from '../constants'; +import { Meteor } from 'meteor/meteor'; +import Config from '../config'; +import RedisPipe, { Events } from '../constants'; +import requery from './actions/requery'; -/** - * @param observableCollection - * @param event - * @param doc - * @param modifiedFields - */ -export default function(observableCollection, event, doc, modifiedFields) { - switch (event) { - case Events.UPDATE: - handleUpdate(observableCollection, doc, modifiedFields); - break; - case Events.REMOVE: - handleRemove(observableCollection, doc); - break; - case Events.INSERT: - handleInsert(observableCollection, doc); - break; - default: - throw new Meteor.Error(`Invalid event specified: ${event}`); - } -} /** * @param observableCollection * @param doc */ -const handleInsert = function(observableCollection, doc) { +const handleInsert = (observableCollection, doc) => { if ( !observableCollection.contains(doc._id) && observableCollection.isEligible(doc) @@ -36,11 +18,11 @@ const handleInsert = function(observableCollection, doc) { }; /** - * @param observableCollection - * @param doc - * @param modifiedFields - */ -const handleUpdate = function(observableCollection, doc, modifiedFields) { +* @param observableCollection +* @param doc +* @param modifiedFields +*/ +const handleUpdate = (observableCollection, doc, modifiedFields) => { const otherSelectors = observableCollection.__containsOtherSelectorsThanId; if (otherSelectors) { @@ -50,24 +32,56 @@ const handleUpdate = function(observableCollection, doc, modifiedFields) { } else { observableCollection.add(doc); } - } else { - if (observableCollection.contains(doc._id)) { + } else if (observableCollection.contains(doc._id)) { observableCollection.remove(doc._id); - } } + } else if (observableCollection.contains(doc._id)) { + observableCollection.change(doc, modifiedFields); } else { - if (observableCollection.contains(doc._id)) { - observableCollection.change(doc, modifiedFields); - } else { - observableCollection.add(doc); - } + observableCollection.add(doc); } }; /** - * @param observableCollection - * @param doc - */ -const handleRemove = function(observableCollection, doc) { +* @param observableCollection +* @param doc +*/ +const handleRemove = (observableCollection, doc) => { observableCollection.remove(doc._id); }; + + +/** + * @param observableCollection + * @param events + * @param documentMap + */ +export default function(observableCollection, events, documentMap) { + const needsRequery = events.length > Config.maxRedisEventsToProcess; + + if (needsRequery) { + requery(observableCollection, documentMap); + return; + } + + for (let i = 0; i < events.length; i++) { + const event = events[i]; + const docId = event[RedisPipe.DOC]._id; + const modifiedFields = event[RedisPipe.FIELDS]; + const doc = documentMap[docId]; + + switch (event[RedisPipe.EVENT]) { + case Events.INSERT: + handleInsert(observableCollection, doc); + break; + case Events.UPDATE: + handleUpdate(observableCollection, doc, modifiedFields); + break; + case Events.REMOVE: + handleRemove(observableCollection, doc); + break; + default: + throw new Meteor.Error(`Invalid event specified: ${event}`); + } + } +} \ No newline at end of file diff --git a/lib/processors/index.js b/lib/processors/index.js index 34805acc..309d05bb 100644 --- a/lib/processors/index.js +++ b/lib/processors/index.js @@ -14,7 +14,7 @@ const StrategyProcessorMap = { export { getStrategy } /** - * @param strategy + * @param {String} strategy * @returns {*} */ export function getProcessor(strategy) { diff --git a/lib/processors/limit-sort.js b/lib/processors/limit-sort.js index a2e5af50..da90e7fc 100644 --- a/lib/processors/limit-sort.js +++ b/lib/processors/limit-sort.js @@ -1,84 +1,98 @@ -import { Events } from '../constants'; +import { Meteor } from 'meteor/meteor'; +import RedisPipe, { Events } from '../constants'; import { hasSortFields } from './lib/fieldsExist'; import requery from './actions/requery'; +import Config from '../config'; -/** - * @param observableCollection - * @param event - * @param doc - * @param modifiedFields - */ -export default function(observableCollection, event, doc, modifiedFields) { - switch (event) { - case Events.INSERT: - handleInsert(observableCollection, doc); - break; - case Events.UPDATE: - handleUpdate(observableCollection, doc, modifiedFields); - break; - case Events.REMOVE: - handleRemove(observableCollection, doc); - break; - default: - throw new Meteor.Error(`Invalid event specified: ${event}`); - } -} /** * @param observableCollection * @param doc */ -const handleInsert = function(observableCollection, doc) { +const handleInsert = (observableCollection, doc) => { if (observableCollection.isEligible(doc)) { - requery(observableCollection, doc); + return true; } + + return false; }; /** - * @param observableCollection - * @param doc - * @param modifiedFields - */ -const handleUpdate = function(observableCollection, doc, modifiedFields) { +* @param observableCollection +* @param doc +* @param modifiedFields +*/ +const handleUpdate = (observableCollection, doc, modifiedFields) => { if (observableCollection.contains(doc._id)) { if (observableCollection.isEligible(doc)) { if ( hasSortFields(observableCollection.options.sort, modifiedFields) ) { - requery( - observableCollection, - doc, - Events.UPDATE, - modifiedFields - ); - } else { - observableCollection.change(doc, modifiedFields); + return true; } + + observableCollection.change(doc, modifiedFields); } else { - requery(observableCollection); - } - } else { - if (observableCollection.isEligible(doc)) { - requery( - observableCollection, - doc, - Events.UPDATE, - modifiedFields - ); + return true; } + } else if (observableCollection.isEligible(doc)) { + return true; } + + return false; +}; + +/** +* @param observableCollection +* @param doc +*/ +const handleRemove = (observableCollection, doc) => { + if (observableCollection.contains(doc._id)) { + return true; + } else if (observableCollection.options.skip) { + return true; + } + + return false; }; /** * @param observableCollection + * @param event * @param doc + * @param modifiedFields */ -const handleRemove = function(observableCollection, doc) { - if (observableCollection.contains(doc._id)) { - requery(observableCollection, doc); - } else { - if (observableCollection.options.skip) { - requery(observableCollection, doc); +export default function(observableCollection, events, documentMap) { + let needsRequery = events.length > Config.maxRedisEventsToProcess; + + if (!needsRequery) { + for (let i = 0; i < events.length; i++) { + const event = events[i]; + const docId = event[RedisPipe.DOC]._id; + const modifiedFields = event[RedisPipe.FIELDS]; + const doc = documentMap[docId]; + + switch (event[RedisPipe.EVENT]) { + case Events.INSERT: + needsRequery = handleInsert(observableCollection, doc); + break; + case Events.UPDATE: + needsRequery = handleUpdate(observableCollection, doc, modifiedFields); + break; + case Events.REMOVE: + needsRequery = handleRemove(observableCollection, doc); + break; + default: + throw new Meteor.Error(`Invalid event specified: ${event}`); + } + + if (needsRequery) { + break; + } } } -}; + + if (needsRequery) { + requery(observableCollection, documentMap); + } +} \ No newline at end of file diff --git a/lib/redis/RedisSubscriptionManager.js b/lib/redis/RedisSubscriptionManager.js index 08aa1131..5d875550 100644 --- a/lib/redis/RedisSubscriptionManager.js +++ b/lib/redis/RedisSubscriptionManager.js @@ -1,6 +1,7 @@ import { Meteor } from 'meteor/meteor'; import { Random } from 'meteor/random'; import { _ } from 'meteor/underscore'; +import debounce from 'lodash/debounce'; import debug from '../debug'; import { RedisPipe, Events } from '../constants'; import getFieldsOfInterestFromAll from './lib/getFieldsOfInterestFromAll'; @@ -78,12 +79,26 @@ class RedisSubscriptionManager { initializeChannel(channel) { debug(`[RedisSubscriptionManager] Subscribing to channel: ${channel}`); - // create the handler for this channel + let redisEvents = []; const self = this; - const handler = function(message) { - self.queue.queueTask(() => { - self.process(channel, message, true); - }); + + // debounce redis events so that they are processed in bulk + const flushRedisEventsForChannel = debounce( + () => { + const events = redisEvents.slice(); + redisEvents = []; + self.queue.queueTask(() => { + self.process(channel, events, true); + }); + }, + Config.debounceInterval, + { maxWait: Config.maxDebounceWait, trailing: true, leading: false } + ); + + // create the handler for this channel + const handler = (message) => { + redisEvents.push(message); + flushRedisEventsForChannel(); }; this.channelHandlers[channel] = handler; @@ -110,105 +125,110 @@ class RedisSubscriptionManager { /** * @param channel - * @param data - * @param [fromRedis=false] + * @param events + * @param fromRedis */ - process(channel, data, fromRedis) { - // messages from redis that contain our uid were handled - // optimistically, so we can drop them. - if (fromRedis && data[RedisPipe.UID] === this.uid) { - return; - } - + process(channel, events, fromRedis) { const subscribers = this.store[channel]; if (!subscribers) { return; } - let isSynthetic = data[RedisPipe.SYNTHETIC]; + const filteredEvents = []; + const syntheticEvents = []; + events.forEach(event => { + // Ignore any updates that have been processed optimistically + if (fromRedis && event[RedisPipe.UID] === this.uid) return; - debug( - `[RedisSubscriptionManager] Received ${ - isSynthetic ? 'synthetic ' : '' - }event: "${data[RedisPipe.EVENT]}" to "${channel}"` - ); - - if (subscribers.length === 0) { - return; - } - - if (!isSynthetic) { - const collection = subscribers[0].observableCollection.collection; - - let doc; - if (data[RedisPipe.EVENT] === Events.REMOVE) { - doc = data[RedisPipe.DOC]; + const isSynthetic = !!event[RedisPipe.SYNTHETIC]; + if (isSynthetic) { + syntheticEvents.push(event); } else { - doc = this.getDoc(collection, subscribers, data); + filteredEvents.push(event); } + }); - // if by any chance it was deleted after it got dispatched - // doc will be undefined - if (!doc) { - return; - } + // Determine the collection from the first observable collection + const collection = subscribers[0].observableCollection.collection; + const documentMap = this.getDocumentMapForEvents(collection, subscribers, filteredEvents); + // Process filtered events in bulk + if (filteredEvents.length) { subscribers.forEach(redisSubscriber => { try { - redisSubscriber.process( - data[RedisPipe.EVENT], - doc, - data[RedisPipe.FIELDS] - ); + redisSubscriber.process(filteredEvents, documentMap); } catch (e) { debug( `[RedisSubscriptionManager] Exception while processing event: ${e.toString()}` ); } }); - } else { - subscribers.forEach(redisSubscriber => { - try { - redisSubscriber.processSynthetic( - data[RedisPipe.EVENT], - data[RedisPipe.DOC], - data[RedisPipe.MODIFIER], - data[RedisPipe.MODIFIED_TOP_LEVEL_FIELDS] - ); - } catch (e) { - debug( - `[RedisSubscriptionManager] Exception while processing synthetic event: ${e.toString()}` - ); - } - }); + } + + // Individually process synthetic events + // TODO: process synthetic events in bulk + if (syntheticEvents.length) { + syntheticEvents.forEach(data => { + subscribers.forEach(redisSubscriber => { + try { + redisSubscriber.processSynthetic( + data[RedisPipe.EVENT], + data[RedisPipe.DOC], + data[RedisPipe.MODIFIER], + data[RedisPipe.MODIFIED_TOP_LEVEL_FIELDS] + ); + } catch (e) { + debug( + `[RedisSubscriptionManager] Exception while processing synthetic event: ${e.toString()}` + ); + } + }); + }) } } /** - * @param collection - * @param subscribers - * @param data + * Build a documentMap for the docIds in the redis events + * @param collection + * @param subscribers + * @param events */ - getDoc(collection, subscribers, data) { - const event = data[RedisPipe.EVENT]; - let doc = data[RedisPipe.DOC]; - - if (collection._redisOplog && !collection._redisOplog.protectAgainstRaceConditions) { - // If there's no protection against race conditions - // It means we have received the full doc in doc + getDocumentMapForEvents(collection, subscribers, events) { + const documentMap = {}; + const options = {}; - return doc; + // Calculate fields of interest across all subscribers and add the + // appropriate field limiting if necessary + const fieldsOfInterest = getFieldsOfInterestFromAll(subscribers); + if (fieldsOfInterest !== true) { + options.fields = fieldsOfInterest; } - const fieldsOfInterest = getFieldsOfInterestFromAll(subscribers); + const docIdsToFetch = []; + events.forEach(event => { + const doc = event[RedisPipe.DOC]; + if (collection._redisOplog && !collection._redisOplog.protectAgainstRaceConditions) { + // If there's no protection against race conditions + // It means we have received the full doc in doc + documentMap[doc._id] = doc; + } + // no need to fetch full documents for the remove event + else if (event[RedisPipe.EVENT] === Events.REMOVE) { + documentMap[doc._id] = doc; + } else { + docIdsToFetch.push(doc._id); + } + }); - if (fieldsOfInterest === true) { - doc = collection.findOne(doc._id); - } else { - doc = collection.findOne(doc._id, { fields: fieldsOfInterest }); + // Execute a single bulk fetch for all docIds that need to be fetched and store them in + // the document map + if (docIdsToFetch.length) { + collection.find({ _id: { $in: docIdsToFetch } }, options).forEach(doc => { + documentMap[doc._id] = doc; + }); } - return doc; + return documentMap; } } diff --git a/package.js b/package.js index a5ac9cc0..724bb6a9 100644 --- a/package.js +++ b/package.js @@ -53,7 +53,7 @@ Package.onTest(function(api) { api.use('matb33:collection-hooks@1.1.2'); api.use('alanning:roles@3.5.1'); - api.use(['meteortesting:mocha']); + api.use(['meteortesting:mocha@1.0.0', 'meteortesting:mocha-core@1.0.1']); api.mainModule('testing/main.server.js', 'server'); api.addFiles('testing/publishComposite/boot.js', 'server'); diff --git a/testing/main.client.js b/testing/main.client.js index 7ce6b9d0..28848469 100644 --- a/testing/main.client.js +++ b/testing/main.client.js @@ -5,6 +5,7 @@ import './synthetic_mutators'; import './client_side_mutators'; import './publishComposite/client.test'; import './optimistic-ui/client.test'; +import './payload_batching.client'; // import './server-autorun/client'; import './transformations/client'; import './publish-counts/client'; diff --git a/testing/payload_batching.client.js b/testing/payload_batching.client.js new file mode 100644 index 00000000..29e366bb --- /dev/null +++ b/testing/payload_batching.client.js @@ -0,0 +1,145 @@ +import { Random } from 'meteor/random'; +import {Collections, config} from './boot'; +import helperGenerator from './lib/helpers'; +import Config from '../lib/config'; + +const Collection = Collections['Standard']; + +describe('Redis Payload Batching', function () { + const { + update, + createSync, + subscribe, + waitForHandleToBeReady + } = helperGenerator(config['Standard'].suffix); + + it('Should process all the updates at once since we are not waiting more than the debounceInterval between updates', async function(done) { + const docId = Random.id(); + let handle = subscribe({ _id: docId }); + + await createSync({ _id: docId, value: -1 }); + + await waitForHandleToBeReady(handle); + + let changes = 0; + const expectedChanges = 1; + Collection.find({ _id: docId }).observeChanges({ + changed() { + changes += 1; + + // ensure we don't receive more updates than expected + if (changes === expectedChanges) { + setTimeout(() => { + if (changes === expectedChanges) done(); + else throw new Error('Too many changes') + }, 200); + } + }, + }); + + // kick off several updates + for (let i = 0; i < 10; i++) { + update( + { _id: docId }, + { + $set: { + value: i, + }, + }, + { optimistic: false, pushToRedis: true } + ); + } + }); + + it('Should correctly process each update separately since we are waiting longer than the debounce interval between updates', async function(done) { + const docId = Random.id(); + let handle = subscribe({ _id: docId }); + // We wait twice the debounce interval to ensure that any payloads that were received + // and debounced by the server would have been processed + const sleepInterval = 2 * Config.debounceInterval; + // Execute multiple updates to confirm that those updates are not being batched + const numUpdates = 3; + // Since we are sleeping more than the debounce interval we expect our total number + // of changes received from the server to equal the number of updates + const expectedChanges = numUpdates; + + await createSync({ _id: docId, value: -1 }); + + await waitForHandleToBeReady(handle); + + let changes = 0; + Collection.find({ _id: docId }).observeChanges({ + changed() { + changes += 1; + + // ensure we receive the expected number of change events + if (changes === expectedChanges) { + done(); + } + }, + }); + + // kick off several updates + for (let i = 0; i < numUpdates; i++) { + update( + { _id: docId }, + { + $set: { + value: i, + }, + }, + { optimistic: false, pushToRedis: true } + ); + // wait till new debounce interval + await new Promise(resolve => setTimeout(resolve, sleepInterval)); + } + }); + + it('Should correctly use maxWait to batch changes if we exceed the first debounce window', async function(done) { + const docId = Random.id(); + let handle = subscribe({ _id: docId }); + // We set a short sleep interval here because we want to process more than one + // update in the same batch + const sleepInterval = 30; + // We execute 101 updates here so that the total execution time here is 30ms * 101 updates = 3030ms + // This ensures that our final update happens after the maxWait and should be processed in two batches + const numUpdates = 101; + // Since we should see our updates processed in two batches, we expect to receive only two changed events + // from the server + const expectedChanges = 2; + + await createSync({ _id: docId, value: -1 }); + + await waitForHandleToBeReady(handle); + + let changes = 0; + Collection.find({ _id: docId }).observeChanges({ + changed() { + changes += 1; + + // ensure we don't receive more updates than expected + if (changes === expectedChanges) { + setTimeout(() => { + if (changes === expectedChanges) done(); + else throw new Error('Too many changes') + }, 200); + } + }, + }) + + // kick off several updates + for (let i = 0; i < numUpdates; i++) { + update( + { _id: docId }, + { + $set: { + value: i, + }, + }, + { optimistic: false, pushToRedis: true } + ); + // wait till new debounce interval + await new Promise(resolve => setTimeout(resolve, sleepInterval)); + } + }).timeout(5000); +}); \ No newline at end of file