Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/bulk redis payload processing #368

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
49 changes: 7 additions & 42 deletions .npm/package/npm-shrinkwrap.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ It is also always helpful to have some context for your pull request. What was t
```
meteor create --release 1.8.1 --bare test
cd test
meteor npm i --save puppeteer@1.18.1 simpl-schema
meteor npm i --save puppeteer@1.18.1 simpl-schema chai
```

### Start Tests
Expand Down
6 changes: 6 additions & 0 deletions lib/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ let Config = {
},
},
},
// Max number of redis payloads we attempt to process before just triggering a requery
maxRedisEventsToProcess: 300,
// Debounce interval after which we flush the queued redis payloads
debounceInterval: 100,
// Maximum wait time for flushing queued redis payloads
maxDebounceWait: 3000
};

export default Config;
17 changes: 5 additions & 12 deletions lib/mongo/lib/dispatchers.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { Meteor } from 'meteor/meteor';
import { DDPServer } from 'meteor/ddp-server';
import { EJSON } from 'meteor/ejson';
import { Events, RedisPipe } from '../../constants';
import RedisSubscriptionManager from '../../redis/RedisSubscriptionManager';
Expand All @@ -8,21 +7,15 @@ import getDedicatedChannel from '../../utils/getDedicatedChannel';
import Config from '../../config';
import OptimisticInvocation from '../OptimisticInvocation';

const dispatchEvents = function(optimistic, collectionName, channels, events) {
const dispatchEvents = function dispatchEventsFn(optimistic, collectionName, channels, events) {
if (optimistic) {
OptimisticInvocation.withValue(true, () => {
OptimisticInvocation.withValue(true, () => {
events.forEach(event => {
const docId = event[RedisPipe.DOC]._id;
const dedicatedChannel = getDedicatedChannel(
collectionName,
docId
);
RedisSubscriptionManager.process(dedicatedChannel, event);

channels.forEach(channelName => {
RedisSubscriptionManager.process(channelName, event);
});
const dedicatedChannel = getDedicatedChannel(collectionName, docId);
RedisSubscriptionManager.process(dedicatedChannel, [event]);
});
channels.forEach(channel => RedisSubscriptionManager.process(channel, events));
});
}

Expand Down
36 changes: 15 additions & 21 deletions lib/processors/actions/requery.js
Original file line number Diff line number Diff line change
@@ -1,44 +1,38 @@
import { _ } from 'meteor/underscore';
import { EJSON } from 'meteor/ejson';
import { Events } from '../../constants';
import { MongoIDMap } from '../../cache/mongoIdMap';

/**
* @param observableCollection
* @param newCommer
* @param event
* @param modifiedFields
* @param documentMap
*/
export default function (observableCollection, newCommer, event, modifiedFields) {
export default function (observableCollection, documentMap) {
const { store, selector, options } = observableCollection;

const newStore = new MongoIDMap();
const freshIds = observableCollection.collection.find(
selector, { ...options, fields: { _id: 1 } }).fetch();
freshIds.forEach(doc => newStore.set(doc._id, doc));

let added = false;
store.compareWith(newStore, {
// Any documents found only on the left store
// should be removed
leftOnly(docId) {
observableCollection.remove(docId);
},
// Any documents found in both and with documentMap entries
// have received redis updates indicating there are changes
both(docId) {
if (documentMap[docId]) {
observableCollection.change(documentMap[docId])
}
},
// Any documents only present in the right store are newly
// added
rightOnly(docId) {
if (newCommer && EJSON.equals(docId, newCommer._id)) {
added = true;
observableCollection.add(newCommer);
if (documentMap[docId]) {
observableCollection.add(documentMap[docId]);
} else {
observableCollection.addById(docId);
}
}
});

// if we have an update, and we have a newcommer, that new commer may be inside the ids
// TODO: maybe refactor this in a separate action (?)
if (newCommer
&& Events.UPDATE === event
&& modifiedFields
&& !added
&& store.has(newCommer._id)) {
observableCollection.change(newCommer, modifiedFields);
}
}
88 changes: 51 additions & 37 deletions lib/processors/default.js
Original file line number Diff line number Diff line change
@@ -1,32 +1,13 @@
import { Events } from '../constants';
import { Meteor } from 'meteor/meteor';
import Config from '../config';
import RedisPipe, { Events } from '../constants';
import requery from './actions/requery';

/**
* @param observableCollection
* @param event
* @param doc
* @param modifiedFields
*/
export default function(observableCollection, event, doc, modifiedFields) {
switch (event) {
case Events.INSERT:
handleInsert(observableCollection, doc);
break;
case Events.UPDATE:
handleUpdate(observableCollection, doc, modifiedFields);
break;
case Events.REMOVE:
handleRemove(observableCollection, doc);
break;
default:
throw new Meteor.Error(`Invalid event specified: ${event}`);
}
}

/**
* @param observableCollection
* @param doc
*/
const handleInsert = function(observableCollection, doc) {
const handleInsert = (observableCollection, doc) => {
if (
!observableCollection.contains(doc._id) &&
observableCollection.isEligible(doc)
Expand All @@ -36,30 +17,63 @@ const handleInsert = function(observableCollection, doc) {
};

/**
* @param observableCollection
* @param doc
* @param modifiedFields
*/
const handleUpdate = function(observableCollection, doc, modifiedFields) {
* @param observableCollection
* @param doc
* @param modifiedFields
*/
const handleUpdate = (observableCollection, doc, modifiedFields) => {
if (observableCollection.isEligible(doc)) {
if (observableCollection.contains(doc._id)) {
observableCollection.change(doc, modifiedFields);
} else {
observableCollection.add(doc);
}
} else {
if (observableCollection.contains(doc._id)) {
observableCollection.remove(doc._id);
}
} else if (observableCollection.contains(doc._id)) {
observableCollection.remove(doc._id);
}
};

/**
* @param observableCollection
* @param doc
*/
const handleRemove = function(observableCollection, doc) {
* @param observableCollection
* @param doc
*/
const handleRemove = (observableCollection, doc) => {
if (observableCollection.contains(doc._id)) {
observableCollection.remove(doc._id);
}
};

/**
* @param observableCollection
* @param events
* @param documentMap
*/
export default function(observableCollection, events, documentMap) {
const needsRequery = events.length > Config.maxRedisEventsToProcess;

if (needsRequery) {
requery(observableCollection, documentMap);
return;
}

for (let i = 0; i < events.length; i++) {
const event = events[i];
const docId = event[RedisPipe.DOC]._id;
const modifiedFields = event[RedisPipe.FIELDS];
const doc = documentMap[docId];

switch (event[RedisPipe.EVENT]) {
case Events.INSERT:
handleInsert(observableCollection, doc);
break;
case Events.UPDATE:
handleUpdate(observableCollection, doc, modifiedFields);
break;
case Events.REMOVE:
handleRemove(observableCollection, doc);
break;
default:
throw new Meteor.Error(`Invalid event specified: ${event}`);
}
}
}
Loading