From 889e61c391902e112e12a99e951d15bd52a6f3f5 Mon Sep 17 00:00:00 2001 From: Eric Maciel Date: Thu, 19 Nov 2020 15:30:52 -0500 Subject: [PATCH 01/12] Bulk redis payload processing --- lib/config.js | 2 + lib/mongo/lib/dispatchers.js | 19 ++-- lib/processors/actions/requery.js | 30 ++---- lib/processors/default.js | 114 +++++++++++--------- lib/processors/direct.js | 118 +++++++++++--------- lib/processors/index.js | 2 +- lib/processors/limit-sort.js | 122 +++++++++++---------- lib/redis/RedisSubscriber.js | 6 +- lib/redis/RedisSubscriptionManager.js | 149 +++++++++++++------------- 9 files changed, 294 insertions(+), 268 deletions(-) diff --git a/lib/config.js b/lib/config.js index dbcf4765..5ce6f9f2 100644 --- a/lib/config.js +++ b/lib/config.js @@ -55,6 +55,8 @@ let Config = { }, }, }, + maxRedisEventsToProcess: 1000, + debounceInterval: 200, }; export default Config; diff --git a/lib/mongo/lib/dispatchers.js b/lib/mongo/lib/dispatchers.js index f49b1a72..43fa11a8 100644 --- a/lib/mongo/lib/dispatchers.js +++ b/lib/mongo/lib/dispatchers.js @@ -1,5 +1,4 @@ import { Meteor } from 'meteor/meteor'; -import { DDPServer } from 'meteor/ddp-server'; import { EJSON } from 'meteor/ejson'; import { Events, RedisPipe } from '../../constants'; import RedisSubscriptionManager from '../../redis/RedisSubscriptionManager'; @@ -8,20 +7,18 @@ import getDedicatedChannel from '../../utils/getDedicatedChannel'; import Config from '../../config'; import OptimisticInvocation from '../OptimisticInvocation'; -const dispatchEvents = function(optimistic, collectionName, channels, events) { +const dispatchEvents = function dispatchEventsFn(optimistic, collectionName, channels, events) { if (optimistic) { OptimisticInvocation.withValue(true, () => { + channels.forEach(channel => RedisSubscriptionManager.process(channel, events)); + events.forEach(event => { - const docId = event[RedisPipe.DOC]._id; - const dedicatedChannel = getDedicatedChannel( - collectionName, - docId + const docId = event[RedisPipe.DOC]._id; + const dedicatedChannel = getDedicatedChannel( + collectionName, + docId ); - RedisSubscriptionManager.process(dedicatedChannel, event); - - channels.forEach(channelName => { - RedisSubscriptionManager.process(channelName, event); - }); + RedisSubscriptionManager.process(dedicatedChannel, [event]); }); }); } diff --git a/lib/processors/actions/requery.js b/lib/processors/actions/requery.js index e23b311e..d57fcc85 100644 --- a/lib/processors/actions/requery.js +++ b/lib/processors/actions/requery.js @@ -1,15 +1,10 @@ -import { _ } from 'meteor/underscore'; -import { EJSON } from 'meteor/ejson'; -import { Events } from '../../constants'; import { MongoIDMap } from '../../cache/mongoIdMap'; /** * @param observableCollection - * @param newCommer - * @param event - * @param modifiedFields + * @param documentMap */ -export default function (observableCollection, newCommer, event, modifiedFields) { +export default function (observableCollection, documentMap) { const { store, selector, options } = observableCollection; const newStore = new MongoIDMap(); @@ -17,28 +12,21 @@ export default function (observableCollection, newCommer, event, modifiedFields) selector, { ...options, fields: { _id: 1 } }).fetch(); freshIds.forEach(doc => newStore.set(doc._id, doc)); - let added = false; store.compareWith(newStore, { leftOnly(docId) { observableCollection.remove(docId); }, + both(docId) { + if (documentMap[docId]) { + observableCollection.change(documentMap[docId]) + } + }, rightOnly(docId) { - if (newCommer && 
EJSON.equals(docId, newCommer._id)) { - added = true; - observableCollection.add(newCommer); + if (documentMap[docId]) { + observableCollection.add(documentMap[docId]); } else { observableCollection.addById(docId); } } }); - - // if we have an update, and we have a newcommer, that new commer may be inside the ids - // TODO: maybe refactor this in a separate action (?) - if (newCommer - && Events.UPDATE === event - && modifiedFields - && !added - && store.has(newCommer._id)) { - observableCollection.change(newCommer, modifiedFields); - } } diff --git a/lib/processors/default.js b/lib/processors/default.js index 7af80f73..40c53810 100644 --- a/lib/processors/default.js +++ b/lib/processors/default.js @@ -1,65 +1,79 @@ -import { Events } from '../constants'; +import { Meteor } from 'meteor/meteor'; +import Config from '../config'; +import RedisPipe, { Events } from '../constants'; +import requery from './actions/requery'; /** * @param observableCollection - * @param event * @param doc - * @param modifiedFields */ -export default function(observableCollection, event, doc, modifiedFields) { - switch (event) { - case Events.INSERT: - handleInsert(observableCollection, doc); - break; - case Events.UPDATE: - handleUpdate(observableCollection, doc, modifiedFields); - break; - case Events.REMOVE: - handleRemove(observableCollection, doc); - break; - default: - throw new Meteor.Error(`Invalid event specified: ${event}`); - } -} +const handleInsert = (observableCollection, doc) => { + if ( + !observableCollection.contains(doc._id) && + observableCollection.isEligible(doc) + ) { + observableCollection.add(doc); + } +}; /** - * @param observableCollection - * @param doc - */ -const handleInsert = function(observableCollection, doc) { - if ( - !observableCollection.contains(doc._id) && - observableCollection.isEligible(doc) - ) { - observableCollection.add(doc); - } +* @param observableCollection +* @param doc +* @param modifiedFields +*/ +const handleUpdate = (observableCollection, doc, modifiedFields) => { + if (observableCollection.isEligible(doc)) { + if (observableCollection.contains(doc._id)) { + observableCollection.change(doc, modifiedFields); + } else { + observableCollection.add(doc); + } + } else if (observableCollection.contains(doc._id)) { + observableCollection.remove(doc._id); + } }; /** - * @param observableCollection - * @param doc - * @param modifiedFields - */ -const handleUpdate = function(observableCollection, doc, modifiedFields) { - if (observableCollection.isEligible(doc)) { - if (observableCollection.contains(doc._id)) { - observableCollection.change(doc, modifiedFields); - } else { - observableCollection.add(doc); - } - } else { - if (observableCollection.contains(doc._id)) { - observableCollection.remove(doc._id); - } - } +* @param observableCollection +* @param doc +*/ +const handleRemove = (observableCollection, doc) => { + if (observableCollection.contains(doc._id)) { + observableCollection.remove(doc._id); + } }; /** - * @param observableCollection - * @param doc + * @param observableCollection + * @param events + * @param documentMap */ -const handleRemove = function(observableCollection, doc) { - if (observableCollection.contains(doc._id)) { - observableCollection.remove(doc._id); +export default function(observableCollection, events, documentMap) { + const needsRequery = events.length > Config.maxRedisEventsToProcess; + + if (needsRequery) { + requery(observableCollection, documentMap); + return; } -}; + + for (let i = 0; i < events.length; i++) { + const event = 
events[i]; + const docId = event[RedisPipe.DOC]._id; + const modifiedFields = event[RedisPipe.FIELDS]; + const doc = documentMap[docId]; + + switch (event[RedisPipe.EVENT]) { + case Events.INSERT: + handleInsert(observableCollection, doc); + break; + case Events.UPDATE: + handleUpdate(observableCollection, doc, modifiedFields); + break; + case Events.REMOVE: + handleRemove(observableCollection, doc); + break; + default: + throw new Meteor.Error(`Invalid event specified: ${event}`); + } + } +} diff --git a/lib/processors/direct.js b/lib/processors/direct.js index af54d8ac..ec62f451 100644 --- a/lib/processors/direct.js +++ b/lib/processors/direct.js @@ -1,32 +1,14 @@ -import { Events } from '../constants'; +import { Meteor } from 'meteor/meteor'; +import Config from '../config'; +import RedisPipe, { Events } from '../constants'; +import requery from './actions/requery'; -/** - * @param observableCollection - * @param event - * @param doc - * @param modifiedFields - */ -export default function(observableCollection, event, doc, modifiedFields) { - switch (event) { - case Events.UPDATE: - handleUpdate(observableCollection, doc, modifiedFields); - break; - case Events.REMOVE: - handleRemove(observableCollection, doc); - break; - case Events.INSERT: - handleInsert(observableCollection, doc); - break; - default: - throw new Meteor.Error(`Invalid event specified: ${event}`); - } -} /** * @param observableCollection * @param doc */ -const handleInsert = function(observableCollection, doc) { +const handleInsert = (observableCollection, doc) => { if ( !observableCollection.contains(doc._id) && observableCollection.isEligible(doc) @@ -36,38 +18,70 @@ const handleInsert = function(observableCollection, doc) { }; /** - * @param observableCollection - * @param doc - * @param modifiedFields - */ -const handleUpdate = function(observableCollection, doc, modifiedFields) { - const otherSelectors = observableCollection.__containsOtherSelectorsThanId; +* @param observableCollection +* @param doc +* @param modifiedFields +*/ +const handleUpdate = (observableCollection, doc, modifiedFields) => { + const otherSelectors = observableCollection.__containsOtherSelectorsThanId; - if (otherSelectors) { - if (observableCollection.isEligible(doc)) { - if (observableCollection.contains(doc._id)) { - observableCollection.change(doc, modifiedFields); - } else { - observableCollection.add(doc); - } - } else { - if (observableCollection.contains(doc._id)) { - observableCollection.remove(doc._id); - } - } - } else { - if (observableCollection.contains(doc._id)) { - observableCollection.change(doc, modifiedFields); - } else { - observableCollection.add(doc); - } - } + if (otherSelectors) { + if (observableCollection.isEligible(doc)) { + if (observableCollection.contains(doc._id)) { + observableCollection.change(doc, modifiedFields); + } else { + observableCollection.add(doc); + } + } else if (observableCollection.contains(doc._id)) { + observableCollection.remove(doc._id); + } + } else if (observableCollection.contains(doc._id)) { + observableCollection.change(doc, modifiedFields); + } else { + observableCollection.add(doc); + } }; /** - * @param observableCollection - * @param doc - */ -const handleRemove = function(observableCollection, doc) { +* @param observableCollection +* @param doc +*/ +const handleRemove = (observableCollection, doc) => { observableCollection.remove(doc._id); }; + + +/** + * @param observableCollection + * @param events + * @param documentMap + */ +export default function(observableCollection, events, 
documentMap) { + const needsRequery = events.length > Config.maxRedisEventsToProcess;; + + if (needsRequery) { + requery(observableCollection, documentMap); + return; + } + + for (let i = 0; i < events.length; i++) { + const event = events[i]; + const docId = event[RedisPipe.DOC]._id; + const modifiedFields = event[RedisPipe.FIELDS]; + const doc = documentMap[docId]; + + switch (event[RedisPipe.EVENT]) { + case Events.INSERT: + handleInsert(observableCollection, doc); + break; + case Events.UPDATE: + handleUpdate(observableCollection, doc, modifiedFields); + break; + case Events.REMOVE: + handleRemove(observableCollection, doc); + break; + default: + throw new Meteor.Error(`Invalid event specified: ${event}`); + } + } +} \ No newline at end of file diff --git a/lib/processors/index.js b/lib/processors/index.js index 34805acc..3f5494b9 100644 --- a/lib/processors/index.js +++ b/lib/processors/index.js @@ -14,7 +14,7 @@ const StrategyProcessorMap = { export { getStrategy } /** - * @param strategy + * @param strategy * @returns {*} */ export function getProcessor(strategy) { diff --git a/lib/processors/limit-sort.js b/lib/processors/limit-sort.js index a2e5af50..99f8deb2 100644 --- a/lib/processors/limit-sort.js +++ b/lib/processors/limit-sort.js @@ -1,84 +1,98 @@ -import { Events } from '../constants'; +import { Meteor } from 'meteor/meteor'; +import RedisPipe, { Events } from '../constants'; import { hasSortFields } from './lib/fieldsExist'; import requery from './actions/requery'; +import Config from '../config'; -/** - * @param observableCollection - * @param event - * @param doc - * @param modifiedFields - */ -export default function(observableCollection, event, doc, modifiedFields) { - switch (event) { - case Events.INSERT: - handleInsert(observableCollection, doc); - break; - case Events.UPDATE: - handleUpdate(observableCollection, doc, modifiedFields); - break; - case Events.REMOVE: - handleRemove(observableCollection, doc); - break; - default: - throw new Meteor.Error(`Invalid event specified: ${event}`); - } -} /** * @param observableCollection * @param doc */ -const handleInsert = function(observableCollection, doc) { +const handleInsert = (observableCollection, doc) => { if (observableCollection.isEligible(doc)) { - requery(observableCollection, doc); + return true; } + + return false; }; /** - * @param observableCollection - * @param doc - * @param modifiedFields - */ -const handleUpdate = function(observableCollection, doc, modifiedFields) { +* @param observableCollection +* @param doc +* @param modifiedFields +*/ +const handleUpdate = (observableCollection, doc, modifiedFields) => { if (observableCollection.contains(doc._id)) { if (observableCollection.isEligible(doc)) { if ( hasSortFields(observableCollection.options.sort, modifiedFields) ) { - requery( - observableCollection, - doc, - Events.UPDATE, - modifiedFields - ); - } else { - observableCollection.change(doc, modifiedFields); + return true; } + + observableCollection.change(doc, modifiedFields); } else { - requery(observableCollection); - } - } else { - if (observableCollection.isEligible(doc)) { - requery( - observableCollection, - doc, - Events.UPDATE, - modifiedFields - ); + return true; } + } else if (observableCollection.isEligible(doc)) { + return true; } + + return false; +}; + +/** +* @param observableCollection +* @param doc +*/ +const handleRemove = (observableCollection, doc) => { + if (observableCollection.contains(doc._id)) { + return true; + } else if (observableCollection.options.skip) { + return true; + 
} + + return false; }; /** * @param observableCollection + * @param event * @param doc + * @param modifiedFields */ -const handleRemove = function(observableCollection, doc) { - if (observableCollection.contains(doc._id)) { - requery(observableCollection, doc); - } else { - if (observableCollection.options.skip) { - requery(observableCollection, doc); +export default function(observableCollection, events, documentMap) { + let needsRequery = events.length > Config.maxRedisEventsToProcess;; + + if (!needsRequery) { + for (let i = 0; i < events.length; i++) { + const event = events[i]; + const docId = event[RedisPipe.DOC]._id; + const modifiedFields = event[RedisPipe.FIELDS]; + const doc = documentMap[docId]; + + switch (event[RedisPipe.EVENT]) { + case Events.INSERT: + needsRequery = handleInsert(observableCollection, doc); + break; + case Events.UPDATE: + needsRequery = handleUpdate(observableCollection, doc, modifiedFields); + break; + case Events.REMOVE: + needsRequery = handleRemove(observableCollection, doc); + break; + default: + throw new Meteor.Error(`Invalid event specified: ${event}`); + } + + if (needsRequery) { + break; + } } } -}; + + if (needsRequery) { + requery(observableCollection, documentMap); + } +} \ No newline at end of file diff --git a/lib/redis/RedisSubscriber.js b/lib/redis/RedisSubscriber.js index e2791ea7..09a90375 100644 --- a/lib/redis/RedisSubscriber.js +++ b/lib/redis/RedisSubscriber.js @@ -46,13 +46,13 @@ export default class RedisSubscriber { ); } } - + /** * @param args */ process(...args) { - this.processor.call(null, this.observableCollection, ...args); - } + this.processor.call(null, this.observableCollection, ...args); + } /** * @param event diff --git a/lib/redis/RedisSubscriptionManager.js b/lib/redis/RedisSubscriptionManager.js index 5ced6f27..2752882a 100644 --- a/lib/redis/RedisSubscriptionManager.js +++ b/lib/redis/RedisSubscriptionManager.js @@ -23,7 +23,7 @@ class RedisSubscriptionManager { * Returns all RedisSubscribers regardless of channel */ getAllRedisSubscribers() { - let redisSubscribers = []; + const redisSubscribers = []; for (channel in this.store) { this.store[channel].forEach(_redisSubscriber => redisSubscribers.push(_redisSubscriber) @@ -78,12 +78,22 @@ class RedisSubscriptionManager { initializeChannel(channel) { debug(`[RedisSubscriptionManager] Subscribing to channel: ${channel}`); + let redisEvents = []; // create the handler for this channel const self = this; - const handler = function(message) { + + // debounce redis events into 200 ms batches + const flushRedisEventsForChannel = _.debounce(() => { + const events = redisEvents.slice(); + redisEvents = []; self.queue.queueTask(() => { - self.process(channel, message, true); + this.process(channel, events, true); }); + }, Config.debounceInterval, false) + + const handler = (message) => { + redisEvents.push(message); + flushRedisEventsForChannel(); }; this.channelHandlers[channel] = handler; @@ -110,105 +120,92 @@ class RedisSubscriptionManager { /** * @param channel - * @param data - * @param [fromRedis=false] + * @param events + * @param fromRedis */ - process(channel, data, fromRedis) { - // messages from redis that contain our uid were handled - // optimistically, so we can drop them. 
- if (fromRedis && data[RedisPipe.UID] === this.uid) { - return; - } - + process(channel, events, fromRedis) { const subscribers = this.store[channel]; if (!subscribers) { return; } - let isSynthetic = data[RedisPipe.SYNTHETIC]; - - debug( - `[RedisSubscriptionManager] Received ${ - isSynthetic ? 'synthetic ' : '' - }event: "${data[RedisPipe.EVENT]}" to "${channel}"` - ); - - if (subscribers.length === 0) { - return; + // messages from redis that contain our uid were handled + // optimistically, so we can drop them. + let filteredEvents = events.filter(event => !event[RedisPipe.SYNTHETIC]); + const syntheticEvents = events.filter(event => event[RedisPipe.SYNTHETIC]); + if (fromRedis) { + filteredEvents = filteredEvents.filter(event => event[RedisPipe.UID] !== this.uid); } - if (!isSynthetic) { - const collection = subscribers[0].observableCollection.collection; - - let doc; - if (data[RedisPipe.EVENT] === Events.REMOVE) { - doc = data[RedisPipe.DOC]; - } else { - doc = this.getDoc(collection, subscribers, data); - } - // if by any chance it was deleted after it got dispatched - // doc will be undefined - if (!doc) { - return; - } + // TODO: HANDLE SYNTHETIC EVENTS + // determine the collection from the first observable collection + const collection = subscribers[0].observableCollection.collection; + const documentMap = this.getDocumentMapForEvents(collection, subscribers, filteredEvents); + if (filteredEvents.length) { subscribers.forEach(redisSubscriber => { try { - redisSubscriber.process( - data[RedisPipe.EVENT], - doc, - data[RedisPipe.FIELDS] - ); + redisSubscriber.process(filteredEvents, documentMap); } catch (e) { debug( `[RedisSubscriptionManager] Exception while processing event: ${e.toString()}` ); } }); - } else { - subscribers.forEach(redisSubscriber => { - try { - redisSubscriber.processSynthetic( - data[RedisPipe.EVENT], - data[RedisPipe.DOC], - data[RedisPipe.MODIFIER], - data[RedisPipe.MODIFIED_TOP_LEVEL_FIELDS] - ); - } catch (e) { - debug( - `[RedisSubscriptionManager] Exception while processing synthetic event: ${e.toString()}` - ); - } - }); + } + if (syntheticEvents.length) { + // TODO: process synthetic events in bulk + syntheticEvents.forEach(data => { + subscribers.forEach(redisSubscriber => { + try { + redisSubscriber.processSynthetic( + data[RedisPipe.EVENT], + data[RedisPipe.DOC], + data[RedisPipe.MODIFIER], + data[RedisPipe.MODIFIED_TOP_LEVEL_FIELDS] + ); + } catch (e) { + debug( + `[RedisSubscriptionManager] Exception while processing synthetic event: ${e.toString()}` + ); + } + }); + }) } } - /** - * @param collection - * @param subscribers - * @param data - */ - getDoc(collection, subscribers, data) { - const event = data[RedisPipe.EVENT]; - let doc = data[RedisPipe.DOC]; - - if (collection._redisOplog && !collection._redisOplog.protectAgainstRaceConditions) { - // If there's no protection against race conditions - // It means we have received the full doc in doc - - return doc; + getDocumentMapForEvents(collection, subscribers, events) { + const documentMap = {}; + const options = {}; + const fieldsOfInterest = getFieldsOfInterestFromAll(subscribers); + if (fieldsOfInterest !== true) { + options.fields = fieldsOfInterest; } - const fieldsOfInterest = getFieldsOfInterestFromAll(subscribers); + const docIdsToFetch = []; + events.forEach(event => { + const doc = event[RedisPipe.DOC]; + if (collection._redisOplog && !collection._redisOplog.protectAgainstRaceConditions) { + // If there's no protection against race conditions + // It means we have received the 
full doc in doc + documentMap[doc._id] = doc; + } + // no need to fetch full documents for remove event + else if (event[RedisPipe.EVENT] === Events.REMOVE) { + documentMap[doc._id] = doc; + } else { + docIdsToFetch.push(doc._id); + } + }); - if (fieldsOfInterest === true) { - doc = collection.findOne(doc._id); - } else { - doc = collection.findOne(doc._id, { fields: fieldsOfInterest }); + if (docIdsToFetch.length) { + collection.find({ _id: { $in: docIdsToFetch } }, options).fetch().forEach(doc => { + documentMap[doc._id] = doc; + }); } - return doc; + return documentMap; } } From a57fa530c7c4e9786f7016e0b0cc7820b156603e Mon Sep 17 00:00:00 2001 From: Eric Maciel Date: Thu, 19 Nov 2020 15:34:32 -0500 Subject: [PATCH 02/12] Code cleanup --- lib/processors/index.js | 2 +- lib/redis/RedisSubscriber.js | 6 +++--- lib/redis/RedisSubscriptionManager.js | 4 +--- 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/lib/processors/index.js b/lib/processors/index.js index 3f5494b9..34805acc 100644 --- a/lib/processors/index.js +++ b/lib/processors/index.js @@ -14,7 +14,7 @@ const StrategyProcessorMap = { export { getStrategy } /** - * @param strategy + * @param strategy * @returns {*} */ export function getProcessor(strategy) { diff --git a/lib/redis/RedisSubscriber.js b/lib/redis/RedisSubscriber.js index 09a90375..e2791ea7 100644 --- a/lib/redis/RedisSubscriber.js +++ b/lib/redis/RedisSubscriber.js @@ -46,13 +46,13 @@ export default class RedisSubscriber { ); } } - + /** * @param args */ process(...args) { - this.processor.call(null, this.observableCollection, ...args); - } + this.processor.call(null, this.observableCollection, ...args); + } /** * @param event diff --git a/lib/redis/RedisSubscriptionManager.js b/lib/redis/RedisSubscriptionManager.js index 2752882a..e44f0048 100644 --- a/lib/redis/RedisSubscriptionManager.js +++ b/lib/redis/RedisSubscriptionManager.js @@ -87,7 +87,7 @@ class RedisSubscriptionManager { const events = redisEvents.slice(); redisEvents = []; self.queue.queueTask(() => { - this.process(channel, events, true); + self.process(channel, events, true); }); }, Config.debounceInterval, false) @@ -137,8 +137,6 @@ class RedisSubscriptionManager { filteredEvents = filteredEvents.filter(event => event[RedisPipe.UID] !== this.uid); } - - // TODO: HANDLE SYNTHETIC EVENTS // determine the collection from the first observable collection const collection = subscribers[0].observableCollection.collection; const documentMap = this.getDocumentMapForEvents(collection, subscribers, filteredEvents); From 95fd06af1df190e36b5c84fce8412b42f32bf2bb Mon Sep 17 00:00:00 2001 From: Eric Maciel Date: Thu, 19 Nov 2020 17:58:45 -0500 Subject: [PATCH 03/12] Add comments and fix formatting --- lib/processors/actions/requery.js | 6 ++++ lib/processors/default.js | 36 +++++++++---------- lib/processors/index.js | 2 +- lib/redis/RedisSubscriber.js | 21 +++++++---- lib/redis/RedisSubscriptionManager.js | 51 ++++++++++++++++++--------- 5 files changed, 75 insertions(+), 41 deletions(-) diff --git a/lib/processors/actions/requery.js b/lib/processors/actions/requery.js index d57fcc85..27b77eca 100644 --- a/lib/processors/actions/requery.js +++ b/lib/processors/actions/requery.js @@ -13,14 +13,20 @@ export default function (observableCollection, documentMap) { freshIds.forEach(doc => newStore.set(doc._id, doc)); store.compareWith(newStore, { + // Any documents found only on the left store + // should be removed leftOnly(docId) { observableCollection.remove(docId); }, + // Any documents found in both and 
with documentMap entries + // have received redis updates indicating there are changes both(docId) { if (documentMap[docId]) { observableCollection.change(documentMap[docId]) } }, + // Any documents only present in the right store are newly + // added rightOnly(docId) { if (documentMap[docId]) { observableCollection.add(documentMap[docId]); diff --git a/lib/processors/default.js b/lib/processors/default.js index 40c53810..dbb75428 100644 --- a/lib/processors/default.js +++ b/lib/processors/default.js @@ -8,12 +8,12 @@ import requery from './actions/requery'; * @param doc */ const handleInsert = (observableCollection, doc) => { - if ( - !observableCollection.contains(doc._id) && - observableCollection.isEligible(doc) - ) { - observableCollection.add(doc); - } + if ( + !observableCollection.contains(doc._id) && + observableCollection.isEligible(doc) + ) { + observableCollection.add(doc); + } }; /** @@ -22,15 +22,15 @@ const handleInsert = (observableCollection, doc) => { * @param modifiedFields */ const handleUpdate = (observableCollection, doc, modifiedFields) => { - if (observableCollection.isEligible(doc)) { - if (observableCollection.contains(doc._id)) { - observableCollection.change(doc, modifiedFields); - } else { - observableCollection.add(doc); - } - } else if (observableCollection.contains(doc._id)) { - observableCollection.remove(doc._id); - } + if (observableCollection.isEligible(doc)) { + if (observableCollection.contains(doc._id)) { + observableCollection.change(doc, modifiedFields); + } else { + observableCollection.add(doc); + } + } else if (observableCollection.contains(doc._id)) { + observableCollection.remove(doc._id); + } }; /** @@ -38,9 +38,9 @@ const handleUpdate = (observableCollection, doc, modifiedFields) => { * @param doc */ const handleRemove = (observableCollection, doc) => { - if (observableCollection.contains(doc._id)) { - observableCollection.remove(doc._id); - } + if (observableCollection.contains(doc._id)) { + observableCollection.remove(doc._id); + } }; /** diff --git a/lib/processors/index.js b/lib/processors/index.js index 34805acc..22759bcd 100644 --- a/lib/processors/index.js +++ b/lib/processors/index.js @@ -14,7 +14,7 @@ const StrategyProcessorMap = { export { getStrategy } /** - * @param strategy + * @param {String} strategy * @returns {*} */ export function getProcessor(strategy) { diff --git a/lib/redis/RedisSubscriber.js b/lib/redis/RedisSubscriber.js index e2791ea7..2fc130c4 100644 --- a/lib/redis/RedisSubscriber.js +++ b/lib/redis/RedisSubscriber.js @@ -1,6 +1,6 @@ -import { Strategy } from '../constants'; +/* globals Kadira */ +import RedisPipe, { Strategy } from '../constants'; import { getProcessor } from '../processors'; -import { _ } from 'meteor/underscore'; import { Meteor } from 'meteor/meteor'; import extractIdsFromSelector from '../utils/extractIdsFromSelector'; import RedisSubscriptionManager from './RedisSubscriptionManager'; @@ -46,12 +46,21 @@ export default class RedisSubscriber { ); } } - + /** - * @param args + * @param events + * @param documentMap */ - process(...args) { - this.processor.call(null, this.observableCollection, ...args); + process(events, documentMap) { + this.processor.call(null, this.observableCollection, events, documentMap); + if (events.length && Kadira) { + events.forEach(event => { + const op = event[RedisPipe.EVENT]; + if (op) { + Kadira.models.pubsub.trackDocumentChanges(this.observableCollection.__driver__._ownerInfo, { op }); + } + }) + } } /** diff --git a/lib/redis/RedisSubscriptionManager.js 
b/lib/redis/RedisSubscriptionManager.js index e44f0048..5e03f389 100644 --- a/lib/redis/RedisSubscriptionManager.js +++ b/lib/redis/RedisSubscriptionManager.js @@ -79,10 +79,9 @@ class RedisSubscriptionManager { debug(`[RedisSubscriptionManager] Subscribing to channel: ${channel}`); let redisEvents = []; - // create the handler for this channel const self = this; - // debounce redis events into 200 ms batches + // debounce redis events so that they are processed in bulk const flushRedisEventsForChannel = _.debounce(() => { const events = redisEvents.slice(); redisEvents = []; @@ -90,7 +89,7 @@ class RedisSubscriptionManager { self.process(channel, events, true); }); }, Config.debounceInterval, false) - + // create the handler for this channel const handler = (message) => { redisEvents.push(message); flushRedisEventsForChannel(); @@ -119,9 +118,9 @@ class RedisSubscriptionManager { } /** - * @param channel - * @param events - * @param fromRedis + * @param channel + * @param events + * @param fromRedis */ process(channel, events, fromRedis) { const subscribers = this.store[channel]; @@ -129,18 +128,25 @@ class RedisSubscriptionManager { return; } - // messages from redis that contain our uid were handled - // optimistically, so we can drop them. - let filteredEvents = events.filter(event => !event[RedisPipe.SYNTHETIC]); - const syntheticEvents = events.filter(event => event[RedisPipe.SYNTHETIC]); - if (fromRedis) { - filteredEvents = filteredEvents.filter(event => event[RedisPipe.UID] !== this.uid); - } + const filteredEvents = []; + const syntheticEvents = []; + events.forEach(event => { + // Ignore any updates that have been processed optimistically + if (fromRedis && event[RedisPipe.UID] === this.uid) return; - // determine the collection from the first observable collection + const isSynthetic = !!event[RedisPipe.SYNTHETIC]; + if (isSynthetic) { + syntheticEvents.push(event); + } else { + filteredEvents.push(event); + } + }); + + // Determine the collection from the first observable collection const collection = subscribers[0].observableCollection.collection; const documentMap = this.getDocumentMapForEvents(collection, subscribers, filteredEvents); + // Process filtered events in bulk if (filteredEvents.length) { subscribers.forEach(redisSubscriber => { try { @@ -152,8 +158,10 @@ class RedisSubscriptionManager { } }); } + + // Individually process synthetic events + // TODO: process synthetic events in bulk if (syntheticEvents.length) { - // TODO: process synthetic events in bulk syntheticEvents.forEach(data => { subscribers.forEach(redisSubscriber => { try { @@ -173,9 +181,18 @@ class RedisSubscriptionManager { } } + /** + * Build a documentMap for the docIds in the redis events + * @param collection + * @param subscribers + * @param events + */ getDocumentMapForEvents(collection, subscribers, events) { const documentMap = {}; const options = {}; + + // Calculate fields of interest across all subscribers and add the + // appropriate field limiting if necessary const fieldsOfInterest = getFieldsOfInterestFromAll(subscribers); if (fieldsOfInterest !== true) { options.fields = fieldsOfInterest; @@ -189,7 +206,7 @@ class RedisSubscriptionManager { // It means we have received the full doc in doc documentMap[doc._id] = doc; } - // no need to fetch full documents for remove event + // no need to fetch full documents for the remove event else if (event[RedisPipe.EVENT] === Events.REMOVE) { documentMap[doc._id] = doc; } else { @@ -197,6 +214,8 @@ class RedisSubscriptionManager { } }); + // 
Execute a single bulk fetch for all docIds that need to be fetched and store them in + // the document map if (docIdsToFetch.length) { collection.find({ _id: { $in: docIdsToFetch } }, options).fetch().forEach(doc => { documentMap[doc._id] = doc; From dff86b8b92e2e687ba3bacce46e8012859606fbe Mon Sep 17 00:00:00 2001 From: Eric Maciel Date: Thu, 19 Nov 2020 18:02:25 -0500 Subject: [PATCH 04/12] Formatting cleanup --- lib/processors/index.js | 2 +- lib/redis/RedisSubscriber.js | 169 ++++++++++++-------------- lib/redis/RedisSubscriptionManager.js | 4 +- 3 files changed, 81 insertions(+), 94 deletions(-) diff --git a/lib/processors/index.js b/lib/processors/index.js index 22759bcd..309d05bb 100644 --- a/lib/processors/index.js +++ b/lib/processors/index.js @@ -14,7 +14,7 @@ const StrategyProcessorMap = { export { getStrategy } /** - * @param {String} strategy + * @param {String} strategy * @returns {*} */ export function getProcessor(strategy) { diff --git a/lib/redis/RedisSubscriber.js b/lib/redis/RedisSubscriber.js index 2fc130c4..0100cf8a 100644 --- a/lib/redis/RedisSubscriber.js +++ b/lib/redis/RedisSubscriber.js @@ -1,104 +1,91 @@ -/* globals Kadira */ -import RedisPipe, { Strategy } from '../constants'; -import { getProcessor } from '../processors'; -import { Meteor } from 'meteor/meteor'; -import extractIdsFromSelector from '../utils/extractIdsFromSelector'; -import RedisSubscriptionManager from './RedisSubscriptionManager'; -import syntheticProcessor from '../processors/synthetic'; -import getDedicatedChannel from '../utils/getDedicatedChannel'; +import RedisPipe, { Strategy } from "../constants"; +import { getProcessor } from "../processors"; +import { Meteor } from "meteor/meteor"; +import extractIdsFromSelector from "../utils/extractIdsFromSelector"; +import RedisSubscriptionManager from "./RedisSubscriptionManager"; +import syntheticProcessor from "../processors/synthetic"; +import getDedicatedChannel from "../utils/getDedicatedChannel"; export default class RedisSubscriber { - /** - * @param observableCollection - * @param strategy - */ - constructor(observableCollection, strategy) { - this.observableCollection = observableCollection; - this.strategy = strategy; - this.processor = getProcessor(strategy); + /** + * @param observableCollection + * @param strategy + */ + constructor(observableCollection, strategy) { + this.observableCollection = observableCollection; + this.strategy = strategy; + this.processor = getProcessor(strategy); - // We do this because we override the behavior of dedicated "_id" channels - this.channels = this.getChannels(this.observableCollection.channels); + // We do this because we override the behavior of dedicated "_id" channels + this.channels = this.getChannels(this.observableCollection.channels); - RedisSubscriptionManager.attach(this); - } + RedisSubscriptionManager.attach(this); + } - /** - * @param channels - * @returns {*} - */ - getChannels(channels) { - const collectionName = this.observableCollection.collectionName; + /** + * @param channels + * @returns {*} + */ + getChannels(channels) { + const collectionName = this.observableCollection.collectionName; - switch (this.strategy) { - case Strategy.DEFAULT: - case Strategy.LIMIT_SORT: - return channels; - case Strategy.DEDICATED_CHANNELS: - const ids = extractIdsFromSelector( - this.observableCollection.selector - ); + switch (this.strategy) { + case Strategy.DEFAULT: + case Strategy.LIMIT_SORT: + return channels; + case Strategy.DEDICATED_CHANNELS: + const ids = 
extractIdsFromSelector(this.observableCollection.selector); - return ids.map(id => getDedicatedChannel(collectionName, id)); - default: - throw new Meteor.Error( - `Strategy could not be found: ${this.strategy}` - ); - } - } - - /** - * @param events - * @param documentMap - */ - process(events, documentMap) { - this.processor.call(null, this.observableCollection, events, documentMap); - if (events.length && Kadira) { - events.forEach(event => { - const op = event[RedisPipe.EVENT]; - if (op) { - Kadira.models.pubsub.trackDocumentChanges(this.observableCollection.__driver__._ownerInfo, { op }); - } - }) - } + return ids.map((id) => getDedicatedChannel(collectionName, id)); + default: + throw new Meteor.Error(`Strategy could not be found: ${this.strategy}`); } + } - /** - * @param event - * @param doc - * @param modifier - * @param modifiedTopLevelFields - */ - processSynthetic(event, doc, modifier, modifiedTopLevelFields) { - syntheticProcessor( - this.observableCollection, - event, - doc, - modifier, - modifiedTopLevelFields - ); - } + /** + * @param events + * @param documentMap + */ + process(events, documentMap) { + this.processor.call(null, this.observableCollection, events, documentMap); + } - /** - * Detaches from RedisSubscriptionManager - */ - stop() { - try { - RedisSubscriptionManager.detach(this); - } catch (e) { - console.warn( - `[RedisSubscriber] Weird! There was an error while stopping the publication: `, - e - ); - } - } + /** + * @param event + * @param doc + * @param modifier + * @param modifiedTopLevelFields + */ + processSynthetic(event, doc, modifier, modifiedTopLevelFields) { + syntheticProcessor( + this.observableCollection, + event, + doc, + modifier, + modifiedTopLevelFields + ); + } - /** - * Retrieves the fields that are used for matching the validity of the document - * - * @returns {array} - */ - getFieldsOfInterest() { - return this.observableCollection.fieldsOfInterest; + /** + * Detaches from RedisSubscriptionManager + */ + stop() { + try { + RedisSubscriptionManager.detach(this); + } catch (e) { + console.warn( + `[RedisSubscriber] Weird! 
There was an error while stopping the publication: `, + e + ); } + } + + /** + * Retrieves the fields that are used for matching the validity of the document + * + * @returns {array} + */ + getFieldsOfInterest() { + return this.observableCollection.fieldsOfInterest; + } } diff --git a/lib/redis/RedisSubscriptionManager.js b/lib/redis/RedisSubscriptionManager.js index 5e03f389..45ad5691 100644 --- a/lib/redis/RedisSubscriptionManager.js +++ b/lib/redis/RedisSubscriptionManager.js @@ -118,9 +118,9 @@ class RedisSubscriptionManager { } /** - * @param channel + * @param channel * @param events - * @param fromRedis + * @param fromRedis */ process(channel, events, fromRedis) { const subscribers = this.store[channel]; From 9455d41067d5c117f39c9efb4c50a042591ed1c2 Mon Sep 17 00:00:00 2001 From: Eric Maciel Date: Fri, 20 Nov 2020 15:05:54 -0500 Subject: [PATCH 05/12] Additional code cleanup --- lib/processors/direct.js | 2 +- lib/processors/limit-sort.js | 2 +- lib/redis/RedisSubscriptionManager.js | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/processors/direct.js b/lib/processors/direct.js index ec62f451..012294e5 100644 --- a/lib/processors/direct.js +++ b/lib/processors/direct.js @@ -57,7 +57,7 @@ const handleRemove = (observableCollection, doc) => { * @param documentMap */ export default function(observableCollection, events, documentMap) { - const needsRequery = events.length > Config.maxRedisEventsToProcess;; + const needsRequery = events.length > Config.maxRedisEventsToProcess; if (needsRequery) { requery(observableCollection, documentMap); diff --git a/lib/processors/limit-sort.js b/lib/processors/limit-sort.js index 99f8deb2..6705827a 100644 --- a/lib/processors/limit-sort.js +++ b/lib/processors/limit-sort.js @@ -63,7 +63,7 @@ const handleRemove = (observableCollection, doc) => { * @param modifiedFields */ export default function(observableCollection, events, documentMap) { - let needsRequery = events.length > Config.maxRedisEventsToProcess;; + let needsRequery = events.length > Config.maxRedisEventsToProcess; if (!needsRequery) { for (let i = 0; i < events.length; i++) { diff --git a/lib/redis/RedisSubscriptionManager.js b/lib/redis/RedisSubscriptionManager.js index 45ad5691..db1fbdfe 100644 --- a/lib/redis/RedisSubscriptionManager.js +++ b/lib/redis/RedisSubscriptionManager.js @@ -217,7 +217,7 @@ class RedisSubscriptionManager { // Execute a single bulk fetch for all docIds that need to be fetched and store them in // the document map if (docIdsToFetch.length) { - collection.find({ _id: { $in: docIdsToFetch } }, options).fetch().forEach(doc => { + collection.find({ _id: { $in: docIdsToFetch } }, options).forEach(doc => { documentMap[doc._id] = doc; }); } From 3bfa275979f732a53487171433d19cfc5ff38e16 Mon Sep 17 00:00:00 2001 From: Eric Maciel Date: Fri, 20 Nov 2020 15:50:45 -0500 Subject: [PATCH 06/12] Use legacy mocha to avoid promise and done resolution errors --- package.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.js b/package.js index d447275b..375623fc 100644 --- a/package.js +++ b/package.js @@ -53,7 +53,7 @@ Package.onTest(function(api) { api.use('matb33:collection-hooks@0.8.4'); api.use('alanning:roles@1.2.16'); - api.use(['meteortesting:mocha']); + api.use(['meteortesting:mocha@1.0.0', 'meteortesting:mocha-core@1.0.1']); api.mainModule('testing/main.server.js', 'server'); api.addFiles('testing/publishComposite/boot.js', 'server'); From ca8b6037a150a4e8104fde5a4d192353fc57e829 Mon Sep 17 00:00:00 2001 From: Eric Maciel 
Date: Mon, 23 Nov 2020 09:39:06 -0500 Subject: [PATCH 07/12] Fix indentation --- lib/mongo/lib/dispatchers.js | 7 +- lib/processors/default.js | 22 ++--- lib/processors/direct.js | 80 +++++++++--------- lib/processors/limit-sort.js | 22 ++--- lib/redis/RedisSubscriber.js | 157 ++++++++++++++++++----------------- 5 files changed, 144 insertions(+), 144 deletions(-) diff --git a/lib/mongo/lib/dispatchers.js b/lib/mongo/lib/dispatchers.js index 43fa11a8..eed839d8 100644 --- a/lib/mongo/lib/dispatchers.js +++ b/lib/mongo/lib/dispatchers.js @@ -13,11 +13,8 @@ const dispatchEvents = function dispatchEventsFn(optimistic, collectionName, cha channels.forEach(channel => RedisSubscriptionManager.process(channel, events)); events.forEach(event => { - const docId = event[RedisPipe.DOC]._id; - const dedicatedChannel = getDedicatedChannel( - collectionName, - docId - ); + const docId = event[RedisPipe.DOC]._id; + const dedicatedChannel = getDedicatedChannel(collectionName, docId); RedisSubscriptionManager.process(dedicatedChannel, [event]); }); }); diff --git a/lib/processors/default.js b/lib/processors/default.js index dbb75428..4d9f5889 100644 --- a/lib/processors/default.js +++ b/lib/processors/default.js @@ -63,17 +63,17 @@ export default function(observableCollection, events, documentMap) { const doc = documentMap[docId]; switch (event[RedisPipe.EVENT]) { - case Events.INSERT: - handleInsert(observableCollection, doc); - break; - case Events.UPDATE: - handleUpdate(observableCollection, doc, modifiedFields); - break; - case Events.REMOVE: - handleRemove(observableCollection, doc); - break; - default: - throw new Meteor.Error(`Invalid event specified: ${event}`); + case Events.INSERT: + handleInsert(observableCollection, doc); + break; + case Events.UPDATE: + handleUpdate(observableCollection, doc, modifiedFields); + break; + case Events.REMOVE: + handleRemove(observableCollection, doc); + break; + default: + throw new Meteor.Error(`Invalid event specified: ${event}`); } } } diff --git a/lib/processors/direct.js b/lib/processors/direct.js index 012294e5..edc433df 100644 --- a/lib/processors/direct.js +++ b/lib/processors/direct.js @@ -23,23 +23,23 @@ const handleInsert = (observableCollection, doc) => { * @param modifiedFields */ const handleUpdate = (observableCollection, doc, modifiedFields) => { - const otherSelectors = observableCollection.__containsOtherSelectorsThanId; + const otherSelectors = observableCollection.__containsOtherSelectorsThanId; - if (otherSelectors) { - if (observableCollection.isEligible(doc)) { - if (observableCollection.contains(doc._id)) { - observableCollection.change(doc, modifiedFields); - } else { - observableCollection.add(doc); - } - } else if (observableCollection.contains(doc._id)) { - observableCollection.remove(doc._id); - } - } else if (observableCollection.contains(doc._id)) { - observableCollection.change(doc, modifiedFields); - } else { - observableCollection.add(doc); - } + if (otherSelectors) { + if (observableCollection.isEligible(doc)) { + if (observableCollection.contains(doc._id)) { + observableCollection.change(doc, modifiedFields); + } else { + observableCollection.add(doc); + } + } else if (observableCollection.contains(doc._id)) { + observableCollection.remove(doc._id); + } + } else if (observableCollection.contains(doc._id)) { + observableCollection.change(doc, modifiedFields); + } else { + observableCollection.add(doc); + } }; /** @@ -57,31 +57,31 @@ const handleRemove = (observableCollection, doc) => { * @param documentMap */ export default 
function(observableCollection, events, documentMap) { - const needsRequery = events.length > Config.maxRedisEventsToProcess; + const needsRequery = events.length > Config.maxRedisEventsToProcess; - if (needsRequery) { - requery(observableCollection, documentMap); - return; - } + if (needsRequery) { + requery(observableCollection, documentMap); + return; + } - for (let i = 0; i < events.length; i++) { - const event = events[i]; - const docId = event[RedisPipe.DOC]._id; - const modifiedFields = event[RedisPipe.FIELDS]; - const doc = documentMap[docId]; + for (let i = 0; i < events.length; i++) { + const event = events[i]; + const docId = event[RedisPipe.DOC]._id; + const modifiedFields = event[RedisPipe.FIELDS]; + const doc = documentMap[docId]; - switch (event[RedisPipe.EVENT]) { - case Events.INSERT: - handleInsert(observableCollection, doc); - break; - case Events.UPDATE: - handleUpdate(observableCollection, doc, modifiedFields); - break; - case Events.REMOVE: - handleRemove(observableCollection, doc); - break; - default: - throw new Meteor.Error(`Invalid event specified: ${event}`); - } - } + switch (event[RedisPipe.EVENT]) { + case Events.INSERT: + handleInsert(observableCollection, doc); + break; + case Events.UPDATE: + handleUpdate(observableCollection, doc, modifiedFields); + break; + case Events.REMOVE: + handleRemove(observableCollection, doc); + break; + default: + throw new Meteor.Error(`Invalid event specified: ${event}`); + } + } } \ No newline at end of file diff --git a/lib/processors/limit-sort.js b/lib/processors/limit-sort.js index 6705827a..da90e7fc 100644 --- a/lib/processors/limit-sort.js +++ b/lib/processors/limit-sort.js @@ -73,17 +73,17 @@ export default function(observableCollection, events, documentMap) { const doc = documentMap[docId]; switch (event[RedisPipe.EVENT]) { - case Events.INSERT: - needsRequery = handleInsert(observableCollection, doc); - break; - case Events.UPDATE: - needsRequery = handleUpdate(observableCollection, doc, modifiedFields); - break; - case Events.REMOVE: - needsRequery = handleRemove(observableCollection, doc); - break; - default: - throw new Meteor.Error(`Invalid event specified: ${event}`); + case Events.INSERT: + needsRequery = handleInsert(observableCollection, doc); + break; + case Events.UPDATE: + needsRequery = handleUpdate(observableCollection, doc, modifiedFields); + break; + case Events.REMOVE: + needsRequery = handleRemove(observableCollection, doc); + break; + default: + throw new Meteor.Error(`Invalid event specified: ${event}`); } if (needsRequery) { diff --git a/lib/redis/RedisSubscriber.js b/lib/redis/RedisSubscriber.js index 0100cf8a..8e10348f 100644 --- a/lib/redis/RedisSubscriber.js +++ b/lib/redis/RedisSubscriber.js @@ -1,91 +1,94 @@ -import RedisPipe, { Strategy } from "../constants"; -import { getProcessor } from "../processors"; -import { Meteor } from "meteor/meteor"; -import extractIdsFromSelector from "../utils/extractIdsFromSelector"; -import RedisSubscriptionManager from "./RedisSubscriptionManager"; -import syntheticProcessor from "../processors/synthetic"; -import getDedicatedChannel from "../utils/getDedicatedChannel"; +import { Strategy } from '../constants'; +import { getProcessor } from '../processors'; +import { Meteor } from 'meteor/meteor'; +import extractIdsFromSelector from '../utils/extractIdsFromSelector'; +import RedisSubscriptionManager from './RedisSubscriptionManager'; +import syntheticProcessor from '../processors/synthetic'; +import getDedicatedChannel from '../utils/getDedicatedChannel'; 
export default class RedisSubscriber { - /** - * @param observableCollection - * @param strategy - */ - constructor(observableCollection, strategy) { - this.observableCollection = observableCollection; - this.strategy = strategy; - this.processor = getProcessor(strategy); + /** + * @param observableCollection + * @param strategy + */ + constructor(observableCollection, strategy) { + this.observableCollection = observableCollection; + this.strategy = strategy; + this.processor = getProcessor(strategy); - // We do this because we override the behavior of dedicated "_id" channels - this.channels = this.getChannels(this.observableCollection.channels); + // We do this because we override the behavior of dedicated "_id" channels + this.channels = this.getChannels(this.observableCollection.channels); - RedisSubscriptionManager.attach(this); - } + RedisSubscriptionManager.attach(this); + } - /** - * @param channels - * @returns {*} - */ - getChannels(channels) { - const collectionName = this.observableCollection.collectionName; + /** + * @param channels + * @returns {*} + */ + getChannels(channels) { + const collectionName = this.observableCollection.collectionName; - switch (this.strategy) { - case Strategy.DEFAULT: - case Strategy.LIMIT_SORT: - return channels; - case Strategy.DEDICATED_CHANNELS: - const ids = extractIdsFromSelector(this.observableCollection.selector); + switch (this.strategy) { + case Strategy.DEFAULT: + case Strategy.LIMIT_SORT: + return channels; + case Strategy.DEDICATED_CHANNELS: + const ids = extractIdsFromSelector( + this.observableCollection.selector + ); - return ids.map((id) => getDedicatedChannel(collectionName, id)); - default: - throw new Meteor.Error(`Strategy could not be found: ${this.strategy}`); + return ids.map(id => getDedicatedChannel(collectionName, id)); + default: + throw new Meteor.Error( + `Strategy could not be found: ${this.strategy}` + ); + } } - } - /** - * @param events - * @param documentMap - */ - process(events, documentMap) { - this.processor.call(null, this.observableCollection, events, documentMap); - } + /** + * @param args + */ + process(...args) { + this.processor.call(null, this.observableCollection, ...args); + } - /** - * @param event - * @param doc - * @param modifier - * @param modifiedTopLevelFields - */ - processSynthetic(event, doc, modifier, modifiedTopLevelFields) { - syntheticProcessor( - this.observableCollection, - event, - doc, - modifier, - modifiedTopLevelFields - ); - } + /** + * @param event + * @param doc + * @param modifier + * @param modifiedTopLevelFields + */ + processSynthetic(event, doc, modifier, modifiedTopLevelFields) { + syntheticProcessor( + this.observableCollection, + event, + doc, + modifier, + modifiedTopLevelFields + ); + } - /** - * Detaches from RedisSubscriptionManager - */ - stop() { - try { - RedisSubscriptionManager.detach(this); - } catch (e) { - console.warn( - `[RedisSubscriber] Weird! There was an error while stopping the publication: `, - e - ); + /** + * Detaches from RedisSubscriptionManager + */ + stop() { + try { + RedisSubscriptionManager.detach(this); + } catch (e) { + console.warn( + `[RedisSubscriber] Weird! 
There was an error while stopping the publication: `, + e + ); + } } - } - /** - * Retrieves the fields that are used for matching the validity of the document - * - * @returns {array} - */ - getFieldsOfInterest() { - return this.observableCollection.fieldsOfInterest; - } + /** + * Retrieves the fields that are used for matching the validity of the document + * + * @returns {array} + */ + getFieldsOfInterest() { + return this.observableCollection.fieldsOfInterest; + } } From 887d35af0eda4782a4d34728edc2581134a3ed4b Mon Sep 17 00:00:00 2001 From: Eric Maciel Date: Wed, 2 Dec 2020 13:32:29 -0500 Subject: [PATCH 08/12] Add tests for batch processing config --- .npm/package/npm-shrinkwrap.json | 49 ++-------- CONTRIBUTING.md | 2 +- lib/config.js | 8 +- lib/mongo/lib/dispatchers.js | 5 +- lib/redis/RedisSubscriptionManager.js | 20 ++-- package.js | 2 +- testing/main.client.js | 1 + testing/payload_batching.client.js | 130 ++++++++++++++++++++++++++ 8 files changed, 161 insertions(+), 56 deletions(-) create mode 100644 testing/payload_batching.client.js diff --git a/.npm/package/npm-shrinkwrap.json b/.npm/package/npm-shrinkwrap.json index 9b02f5ba..e727ff6d 100644 --- a/.npm/package/npm-shrinkwrap.json +++ b/.npm/package/npm-shrinkwrap.json @@ -1,26 +1,6 @@ { "lockfileVersion": 1, "dependencies": { - "assertion-error": { - "version": "1.1.0", - "resolved": "https://registry.npmjs.org/assertion-error/-/assertion-error-1.1.0.tgz", - "integrity": "sha512-jgsaNduz+ndvGyFt3uSuWqvy4lCnIJiovtouQN5JZHOKCS2QuhEdbcQHFhVksz2N2U9hXJo8odG7ETyWlEeuDw==" - }, - "chai": { - "version": "4.2.0", - "resolved": "https://registry.npmjs.org/chai/-/chai-4.2.0.tgz", - "integrity": "sha512-XQU3bhBukrOsQCuwZndwGcCVQHyZi53fQ6Ys1Fym7E4olpIqqZZhhoFJoaKVvV17lWQoXYwgWN2nF5crA8J2jw==" - }, - "check-error": { - "version": "1.0.2", - "resolved": "https://registry.npmjs.org/check-error/-/check-error-1.0.2.tgz", - "integrity": "sha1-V00xLt2Iu13YkS6Sht1sCu1KrII=" - }, - "deep-eql": { - "version": "3.0.1", - "resolved": "https://registry.npmjs.org/deep-eql/-/deep-eql-3.0.1.tgz", - "integrity": "sha512-+QeIQyN5ZuO+3Uk5DYh6/1eKO0m0YmJFGNmFHGACpf1ClL1nmlV/p4gNgbl2pJGxgXb4faqo6UE+M5ACEMyVcw==" - }, "deep-extend": { "version": "0.5.0", "resolved": "https://registry.npmjs.org/deep-extend/-/deep-extend-0.5.0.tgz", @@ -31,20 +11,10 @@ "resolved": "https://registry.npmjs.org/double-ended-queue/-/double-ended-queue-2.1.0-0.tgz", "integrity": "sha1-ED01J/0xUo9AGIEwyEHv3XgmTlw=" }, - "get-func-name": { - "version": "2.0.0", - "resolved": "https://registry.npmjs.org/get-func-name/-/get-func-name-2.0.0.tgz", - "integrity": "sha1-6td0q+5y4gQJQzoGY2YCPdaIekE=" - }, - "lodash.clonedeep": { - "version": "4.5.0", - "resolved": "https://registry.npmjs.org/lodash.clonedeep/-/lodash.clonedeep-4.5.0.tgz", - "integrity": "sha1-4j8/nE+Pvd6HJSnBBxhXoIblzO8=" - }, - "pathval": { - "version": "1.1.0", - "resolved": "https://registry.npmjs.org/pathval/-/pathval-1.1.0.tgz", - "integrity": "sha1-uULm1L3mUwBe9rcTYd74cn0GReA=" + "lodash": { + "version": "4.17.15", + "resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.15.tgz", + "integrity": "sha512-8xOcRHvCjnocdS5cpwXQXVzmmh5e5+saE2QGoeQmbKmRS6J3VQppPOIt0MnmE+4xlZoumy0GPG0D0MVIQbNA1A==" }, "redis": { "version": "2.8.0", @@ -52,19 +22,14 @@ "integrity": "sha512-M1OkonEQwtRmZv4tEWF2VgpG0JWJ8Fv1PhlgT5+B+uNq2cA3Rt1Yt/ryoR+vQNOQcIEgdCdfH0jr3bDpihAw1A==" }, "redis-commands": { - "version": "1.5.0", - "resolved": "https://registry.npmjs.org/redis-commands/-/redis-commands-1.5.0.tgz", - "integrity": 
"sha512-6KxamqpZ468MeQC3bkWmCB1fp56XL64D4Kf0zJSwDZbVLLm7KFkoIcHrgRvQ+sk8dnhySs7+yBg94yIkAK7aJg==" + "version": "1.6.0", + "resolved": "https://registry.npmjs.org/redis-commands/-/redis-commands-1.6.0.tgz", + "integrity": "sha512-2jnZ0IkjZxvguITjFTrGiLyzQZcTvaw8DAaCXxZq/dsHXz7KfMQ3OUJy7Tz9vnRtZRVz6VRCPDvruvU8Ts44wQ==" }, "redis-parser": { "version": "2.6.0", "resolved": "https://registry.npmjs.org/redis-parser/-/redis-parser-2.6.0.tgz", "integrity": "sha1-Uu0J2srBCPGmMcB+m2mUHnoZUEs=" - }, - "type-detect": { - "version": "4.0.8", - "resolved": "https://registry.npmjs.org/type-detect/-/type-detect-4.0.8.tgz", - "integrity": "sha512-0fr/mIH1dlO+x7TlcMy+bIDqKPsw/70tVyeHW787goQjhmqaZe10uwLujubK9q9Lg6Fiho1KUKDYz0Z7k7g5/g==" } } } diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index a18b2a8b..970b2a70 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -30,7 +30,7 @@ It is also always helpful to have some context for your pull request. What was t ``` meteor create --release 1.8.1 --bare test cd test -meteor npm i --save puppeteer@1.18.1 simpl-schema +meteor npm i --save puppeteer@1.18.1 simpl-schema chai ``` ### Start Tests diff --git a/lib/config.js b/lib/config.js index 5ce6f9f2..7bd00be0 100644 --- a/lib/config.js +++ b/lib/config.js @@ -55,8 +55,12 @@ let Config = { }, }, }, - maxRedisEventsToProcess: 1000, - debounceInterval: 200, + // Max number of redis payloads we attempt to process before just triggering a requery + maxRedisEventsToProcess: 300, + // Debounce interval after which we flush the queued redis payloads + debounceInterval: 100, + // Maximum wait time for flushing queued redis payloads + maxDebounceWait: 3000 }; export default Config; diff --git a/lib/mongo/lib/dispatchers.js b/lib/mongo/lib/dispatchers.js index eed839d8..79de10c1 100644 --- a/lib/mongo/lib/dispatchers.js +++ b/lib/mongo/lib/dispatchers.js @@ -9,14 +9,13 @@ import OptimisticInvocation from '../OptimisticInvocation'; const dispatchEvents = function dispatchEventsFn(optimistic, collectionName, channels, events) { if (optimistic) { - OptimisticInvocation.withValue(true, () => { - channels.forEach(channel => RedisSubscriptionManager.process(channel, events)); - + OptimisticInvocation.withValue(true, () => { events.forEach(event => { const docId = event[RedisPipe.DOC]._id; const dedicatedChannel = getDedicatedChannel(collectionName, docId); RedisSubscriptionManager.process(dedicatedChannel, [event]); }); + channels.forEach(channel => RedisSubscriptionManager.process(channel, events)); }); } diff --git a/lib/redis/RedisSubscriptionManager.js b/lib/redis/RedisSubscriptionManager.js index db1fbdfe..73373f71 100644 --- a/lib/redis/RedisSubscriptionManager.js +++ b/lib/redis/RedisSubscriptionManager.js @@ -1,6 +1,7 @@ import { Meteor } from 'meteor/meteor'; import { Random } from 'meteor/random'; import { _ } from 'meteor/underscore'; +import debounce from 'lodash/debounce'; import debug from '../debug'; import { RedisPipe, Events } from '../constants'; import getFieldsOfInterestFromAll from './lib/getFieldsOfInterestFromAll'; @@ -82,13 +83,18 @@ class RedisSubscriptionManager { const self = this; // debounce redis events so that they are processed in bulk - const flushRedisEventsForChannel = _.debounce(() => { - const events = redisEvents.slice(); - redisEvents = []; - self.queue.queueTask(() => { - self.process(channel, events, true); - }); - }, Config.debounceInterval, false) + const flushRedisEventsForChannel = debounce( + () => { + const events = redisEvents.slice(); + redisEvents = []; + self.queue.queueTask(() 
=> { + self.process(channel, events, true); + }); + }, + Config.debounceInterval, + { maxWait: Config.maxDebounceWait, trailing: true, leading: false } + ); + // create the handler for this channel const handler = (message) => { redisEvents.push(message); diff --git a/package.js b/package.js index 375623fc..38b79c57 100644 --- a/package.js +++ b/package.js @@ -13,7 +13,7 @@ Package.describe({ Npm.depends({ redis: '2.8.0', 'deep-extend': '0.5.0', - 'lodash.clonedeep': '4.5.0' + 'lodash': '4.17.15' }); Package.onUse(function(api) { diff --git a/testing/main.client.js b/testing/main.client.js index cbab9d3f..66648add 100644 --- a/testing/main.client.js +++ b/testing/main.client.js @@ -5,6 +5,7 @@ import './synthetic_mutators'; import './client_side_mutators'; import './publishComposite/client.test'; import './optimistic-ui/client.test'; +import './payload_batching.client'; // import './server-autorun/client'; import './transformations/client'; import './publish-counts/client'; diff --git a/testing/payload_batching.client.js b/testing/payload_batching.client.js new file mode 100644 index 00000000..96dcb1e1 --- /dev/null +++ b/testing/payload_batching.client.js @@ -0,0 +1,130 @@ +import { Random } from 'meteor/random'; +import { assert } from 'chai'; +import {Collections, config} from './boot'; +import helperGenerator from './lib/helpers'; + +const Collection = Collections['Standard']; + +describe.only('Redis Payload Batching', function () { + const { + update, + createSync, + subscribe, + waitForHandleToBeReady + } = helperGenerator(config['Standard'].suffix); + + it('Ensure config debounce interval works as expected part 1', async function(done) { + const docId = Random.id(); + let handle = subscribe({ _id: docId }); + + await createSync({ _id: docId, value: -1 }); + + await waitForHandleToBeReady(handle); + + let changes = 0; + const expectedChanges = 1; + Collection.find({ _id: docId }).observeChanges({ + changed() { + changes += 1; + + // ensure we don't receive more updates than expected + if (changes === expectedChanges) { + setTimeout(() => { + if (changes === expectedChanges) done(); + else throw new Error('Too many changes') + }, 200) + } + }, + }) + + // kick off several updates + for (let i = 0; i < 10; i++) { + update( + { _id: docId }, + { + $set: { + value: i, + }, + }, + { optimistic: false, pushToRedis: true } + ); + } + }); + + it('Ensure config debounce interval works as expected part 2', async function(done) { + const docId = Random.id(); + let handle = subscribe({ _id: docId }); + + await createSync({ _id: docId, value: -1 }); + + await waitForHandleToBeReady(handle); + + let changes = 0; + const expectedChanges = 3; + Collection.find({ _id: docId }).observeChanges({ + changed() { + changes += 1; + + // ensure we receive the expected number of change events + if (changes === expectedChanges) { + done(); + } + }, + }) + + // kick off several updates + for (let i = 0; i < expectedChanges; i++) { + update( + { _id: docId }, + { + $set: { + value: i, + }, + }, + { optimistic: false, pushToRedis: true } + ); + // wait till new debounce interval + await new Promise(resolve => setTimeout(resolve, 200)) + } + }); + + it('Ensure config max wait works as expected', async function(done) { + const docId = Random.id(); + let handle = subscribe({ _id: docId }); + + await createSync({ _id: docId, value: -1 }); + + await waitForHandleToBeReady(handle); + + let changes = 0; + const expectedChanges = 2; + Collection.find({ _id: docId }).observeChanges({ + changed() { + changes += 1; + + // 
ensure we don't receive more updates than expected + if (changes === expectedChanges) { + setTimeout(() => { + if (changes === expectedChanges) done(); + else throw new Error('Too many changes') + }, 200) + } + }, + }) + + // kick off several updates + for (let i = 0; i < 101; i++) { + update( + { _id: docId }, + { + $set: { + value: i, + }, + }, + { optimistic: false, pushToRedis: true } + ); + // wait till new debounce interval + await new Promise(resolve => setTimeout(resolve, 30)) + } + }).timeout(5000); +}); \ No newline at end of file From 75dfd458870165d2aa9b98e0b28c273dc5e1742f Mon Sep 17 00:00:00 2001 From: Eric Maciel Date: Thu, 3 Dec 2020 13:15:28 -0500 Subject: [PATCH 09/12] Cleanup test file --- testing/payload_batching.client.js | 38 +++++++++++++++++++++--------- 1 file changed, 27 insertions(+), 11 deletions(-) diff --git a/testing/payload_batching.client.js b/testing/payload_batching.client.js index 96dcb1e1..e5fb00fa 100644 --- a/testing/payload_batching.client.js +++ b/testing/payload_batching.client.js @@ -2,10 +2,11 @@ import { Random } from 'meteor/random'; import { assert } from 'chai'; import {Collections, config} from './boot'; import helperGenerator from './lib/helpers'; +import Config from '../lib/config'; const Collection = Collections['Standard']; -describe.only('Redis Payload Batching', function () { +describe('Redis Payload Batching', function () { const { update, createSync, @@ -13,7 +14,7 @@ describe.only('Redis Payload Batching', function () { waitForHandleToBeReady } = helperGenerator(config['Standard'].suffix); - it('Ensure config debounce interval works as expected part 1', async function(done) { + it('Should process all the updates at once since we are not waiting more than the debounceInterval between updates', async function(done) { const docId = Random.id(); let handle = subscribe({ _id: docId }); @@ -51,16 +52,23 @@ describe.only('Redis Payload Batching', function () { } }); - it('Ensure config debounce interval works as expected part 2', async function(done) { + it('Should correctly process each update separately since we are waiting longer than the debounce interval between updates', async function(done) { const docId = Random.id(); let handle = subscribe({ _id: docId }); + // We wait twice the debounce interval to ensure that any payloads that were received + // and debounced by the server would have been processed + const sleepInterval = 2 * Config.debounceInterval; + // Execute multiple updates to confirm that those updates are not being batched + const numUpdates = 3; + // Since we are sleeping more than the debounce interval we expect our total number + // of changes received from the server to equal the number of updates + const expectedChanges = numUpdates; await createSync({ _id: docId, value: -1 }); await waitForHandleToBeReady(handle); let changes = 0; - const expectedChanges = 3; Collection.find({ _id: docId }).observeChanges({ changed() { changes += 1; @@ -73,7 +81,7 @@ describe.only('Redis Payload Batching', function () { }) // kick off several updates - for (let i = 0; i < expectedChanges; i++) { + for (let i = 0; i < numUpdates; i++) { update( { _id: docId }, { @@ -84,20 +92,28 @@ describe.only('Redis Payload Batching', function () { { optimistic: false, pushToRedis: true } ); // wait till new debounce interval - await new Promise(resolve => setTimeout(resolve, 200)) + await new Promise(resolve => setTimeout(resolve, SLEEP_INTERVAL)) } }); - it('Ensure config max wait works as expected', async function(done) { + it('Should 
correctly use maxWait to batch changes if we exceed the first debounce window', async function(done) { const docId = Random.id(); let handle = subscribe({ _id: docId }); - + // We set a short sleep interval here because we want to process more than one + // update in the same batch + const sleepInterval = 30; + // We execute 101 updates here so that the total execution time here is 30ms * 101 updates = 3030ms + // This ensures that our final update happens after the maxWait and should be processed in two batches + const numUpdates = 101; + // Since we should see our updates processed in two batches, we expect to receive only two changed events + // from the server + const expectedChanges = 2; + await createSync({ _id: docId, value: -1 }); await waitForHandleToBeReady(handle); let changes = 0; - const expectedChanges = 2; Collection.find({ _id: docId }).observeChanges({ changed() { changes += 1; @@ -113,7 +129,7 @@ describe.only('Redis Payload Batching', function () { }) // kick off several updates - for (let i = 0; i < 101; i++) { + for (let i = 0; i < numUpdates; i++) { update( { _id: docId }, { @@ -124,7 +140,7 @@ describe.only('Redis Payload Batching', function () { { optimistic: false, pushToRedis: true } ); // wait till new debounce interval - await new Promise(resolve => setTimeout(resolve, 30)) + await new Promise(resolve => setTimeout(resolve, sleepInterval)) } }).timeout(5000); }); \ No newline at end of file From f2224525c3664e85bf9901a7e23a9483004a7351 Mon Sep 17 00:00:00 2001 From: Eric Maciel Date: Thu, 3 Dec 2020 13:17:41 -0500 Subject: [PATCH 10/12] Remove unused assert --- testing/payload_batching.client.js | 1 - 1 file changed, 1 deletion(-) diff --git a/testing/payload_batching.client.js b/testing/payload_batching.client.js index e5fb00fa..b18fd9d8 100644 --- a/testing/payload_batching.client.js +++ b/testing/payload_batching.client.js @@ -1,5 +1,4 @@ import { Random } from 'meteor/random'; -import { assert } from 'chai'; import {Collections, config} from './boot'; import helperGenerator from './lib/helpers'; import Config from '../lib/config'; From b677e471d338e3da4a19b610a911f7eec49c1abb Mon Sep 17 00:00:00 2001 From: Eric Maciel Date: Thu, 3 Dec 2020 13:19:09 -0500 Subject: [PATCH 11/12] Add semi colons --- testing/payload_batching.client.js | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/testing/payload_batching.client.js b/testing/payload_batching.client.js index b18fd9d8..0ea78317 100644 --- a/testing/payload_batching.client.js +++ b/testing/payload_batching.client.js @@ -32,10 +32,10 @@ describe('Redis Payload Batching', function () { setTimeout(() => { if (changes === expectedChanges) done(); else throw new Error('Too many changes') - }, 200) + }, 200); } }, - }) + }); // kick off several updates for (let i = 0; i < 10; i++) { @@ -77,7 +77,7 @@ describe('Redis Payload Batching', function () { done(); } }, - }) + }); // kick off several updates for (let i = 0; i < numUpdates; i++) { @@ -91,7 +91,7 @@ describe('Redis Payload Batching', function () { { optimistic: false, pushToRedis: true } ); // wait till new debounce interval - await new Promise(resolve => setTimeout(resolve, SLEEP_INTERVAL)) + await new Promise(resolve => setTimeout(resolve, SLEEP_INTERVAL)); } }); @@ -122,7 +122,7 @@ describe('Redis Payload Batching', function () { setTimeout(() => { if (changes === expectedChanges) done(); else throw new Error('Too many changes') - }, 200) + }, 200); } }, }) @@ -139,7 +139,7 @@ describe('Redis Payload Batching', function () { { 
optimistic: false, pushToRedis: true } ); // wait till new debounce interval - await new Promise(resolve => setTimeout(resolve, sleepInterval)) + await new Promise(resolve => setTimeout(resolve, sleepInterval)); } }).timeout(5000); }); \ No newline at end of file From 2f2d5931e6f30671982e028f0ef5b7e274c07519 Mon Sep 17 00:00:00 2001 From: Eric Maciel Date: Thu, 3 Dec 2020 13:25:46 -0500 Subject: [PATCH 12/12] Fix test case constant --- testing/payload_batching.client.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testing/payload_batching.client.js b/testing/payload_batching.client.js index 0ea78317..29e366bb 100644 --- a/testing/payload_batching.client.js +++ b/testing/payload_batching.client.js @@ -91,7 +91,7 @@ describe('Redis Payload Batching', function () { { optimistic: false, pushToRedis: true } ); // wait till new debounce interval - await new Promise(resolve => setTimeout(resolve, SLEEP_INTERVAL)); + await new Promise(resolve => setTimeout(resolve, sleepInterval)); } });
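
For context on the batching mechanism these patches introduce: RedisSubscriptionManager queues incoming redis payloads per channel and flushes them with lodash's debounce, so bursts of writes are processed as one batch while a steady stream is still forced out every maxDebounceWait milliseconds. The sketch below is a minimal standalone illustration of that behaviour, not code from the patch series; the 100ms/3000ms values mirror the new defaults added to lib/config.js, but the event producer and the flush body are illustrative assumptions.

```js
// Minimal sketch of the debounce-with-maxWait batching used by RedisSubscriptionManager.
// Only lodash's debounce API is assumed; everything else here is illustrative.
const debounce = require('lodash/debounce');

let queued = [];

const flush = debounce(
    () => {
        const batch = queued.slice();
        queued = [];
        // In the package this is where the batch would be handed to the task queue
        // for processing; here we just report the batch size.
        console.log(`flushing ${batch.length} events`);
    },
    100,                                                // debounceInterval: flush after 100ms of silence
    { maxWait: 3000, leading: false, trailing: true }   // maxDebounceWait: never defer longer than 3s
);

// Ten events arriving 30ms apart are all coalesced into a single flush, because the
// queue never stays quiet for a full debounce interval. A stream that keeps arriving
// this fast would still be flushed at least once every maxWait milliseconds, which is
// what the "maxWait" test case above exercises with 101 updates at 30ms intervals.
for (let i = 0; i < 10; i++) {
    setTimeout(() => {
        queued.push({ event: 'u', i });
        flush();
    }, i * 30);
}
```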