diff --git a/Common/node/connector/dataStore.js b/Common/node/connector/dataStore.js deleted file mode 100644 index 83d2a2be4..000000000 --- a/Common/node/connector/dataStore.js +++ /dev/null @@ -1,138 +0,0 @@ -/* -* -* Copyright (C) 2011, The Locker Project -* All rights reserved. -* -* Please see the LICENSE file for more information. -* -*/ - -var IJOD = require('ijod').IJOD; -var lstate = require('lstate'); - -var ijodFiles = {}; -var mongo; -var mongoID = 'id'; - -exports.init = function(mongoid, _mongo) { - mongo = _mongo; - mongoID = mongoid; - for (var i in mongo.collections) { - // MAYBETODO: do .count() for each and reset lstate.set("field",val) here? - if (!ijodFiles[i]) { - ijodFiles[i] = new IJOD(i); - } - } -} - -exports.addCollection = function(name) { - if(!mongo.collections[name]) - mongo.addCollection(name); -} - -function now() { - return Date.now(); -} - -// arguments: type should match up to one of the mongo collection fields -// object will be the object to persist -// options is optional, but if it exists, available options are: strip + timestamp -// strip is an array of properties to strip off before persisting the object. -// options = {strip: ['person','checkins']}, for example -// timeStamp will be the timestamp stored w/ the record if it exists, otherwise, just use now. -// -exports.addObject = function(type, object, options, callback) { - lstate.up(type); - var timeStamp = now(); - if (arguments.length == 3) callback = options; - if (typeof options == 'object') { - for (var i in options['strip']) { - object[options['strip'][i]].delete - } - if (options['timeStamp']) { - timeStamp = options['timeStamp']; - } - } - ijodFiles[type].addRecord(timeStamp, object, function(err) { - if (err) - callback(err); - setCurrent(type, object, callback); - }) -} - -// same deal, except no strip option, just timestamp is available currently -exports.removeObject = function(type, id, options, callback) { - lstate.down(type); - var timeStamp = now(); - if (arguments.length == 3) callback = options; - if (typeof options == 'object') { - if (options['timeStamp']) { - timeStamp = options['timeStamp']; - } - } - var record = {deleted: timeStamp}; - record[mongoID] = id; - ijodFiles[type].addRecord(timeStamp, record, function(err) { - if (err) - callback(err); - removeCurrent(type, id, callback); - }) -} - - -// mongos -function getMongo(type, id, callback) { - var m = mongo.collections[type]; - if(!m) - callback(new Error('invalid type:' + type), null); - else if(!(id && (typeof id === 'string' || typeof id === 'number'))) - callback(new Error('bad id:' + id), null); - else - return m; -} - -exports.queryCurrent = function(type, query, options) { - query = query || {}; - options = options || {}; - var m = mongo.collections[type]; - if(!m) - callback(new Error('invalid type:' + type), null); - else - return m.find(query, options); -} - -exports.getAllCurrent = function(type, callback, options) { - options = options || {}; - var m = mongo.collections[type]; - if(!m) - callback(new Error('invalid type:' + type), null); - else - m.find({}, options).toArray(callback); -} - -exports.getCurrent = function(type, id, callback) { - var m = getMongo(type, id, callback); - if(m) { - var query = {}; - query[mongoID] = id; - m.findOne(query, callback); - } -} - -function setCurrent(type, object, callback) { - var m = getMongo(type, object[mongoID], callback); - if(m) { - var query = {}; - query[mongoID] = object[mongoID]; - m.update(query, object, {upsert:true, safe:true}, callback); - } -} - 
-function removeCurrent(type, id, callback) { - var m = getMongo(type, id, callback); - if(m) { - var query = {}; - query[mongoID] = id; - m.remove(query, callback); - } -} diff --git a/Common/node/ijod.js b/Common/node/ijod.js index 295a34763..6abda10e9 100644 --- a/Common/node/ijod.js +++ b/Common/node/ijod.js @@ -12,25 +12,334 @@ * Indexed JSON On Disk */ -var fs = require('fs'), - lconfig = require(__dirname + '/lconfig'), - path = require('path'), - lfs = require(__dirname + '/lfs'); - -function IJOD(name, dir) { - this.name = name; - this.dataFileName = name + '.json'; - if (dir) { - this.dataFile = fs.openSync(path.join(lconfig.lockerDir, lconfig.me, dir, this.dataFileName), 'a'); - } else { - this.dataFile = fs.openSync(this.dataFileName, 'a'); +var fs = require('fs'); +var path = require('path'); +var logger = require('logger'); // used by reqCurrent/reqID below +var deepCompare = require('./deepCompare'); +var sqlite = require('sqlite-fts'); +var zlib = require("compress-buffer"); +var lutil = require("lutil"); +var async = require("async"); +var mmh3 = require("murmurhash3"); + +function IJOD(arg) { + if(!arg || !arg.name) throw new Error("invalid args"); + var self = this; + this.transactionItems = null; + self.name = arg.name; + self.gzname = arg.name + '.json.gz'; + self.dbname = arg.name + '.db'; +}; +IJOD.prototype.open = function(callback) { + var self = this; + try { + self.fda = fs.openSync(self.gzname, 'a'); + self.fdr = fs.openSync(self.gzname, 'r'); + var stat = fs.fstatSync(self.fdr); + self.len = stat.size; + } catch(E) { + return callback(E); + } + self.db = new sqlite.Database(); + self.db.open(self.dbname, function (err) { + if(err) return callback(err); + self.db.executeScript("CREATE TABLE IF NOT EXISTS ijod (id TEXT PRIMARY KEY, at INTEGER, len INTEGER, hash TEXT);",function (err) { + if(err) return callback(err); + callback(null, self); + }); + }); }; +IJOD.prototype.close = function(callback) { + if (this.closed) { + console.error("Double close in IJOD"); + var E = new Error("double close"); + console.log(E.stack); + return callback(); + } + this.closed = true; + fs.closeSync(this.fda); + fs.closeSync(this.fdr); + this.db.close(callback); +} +exports.IJOD = IJOD; + +IJOD.prototype.startAddTransaction = function(cbDone) { + if (this.transactionItems) return cbDone(); + this.transactionItems = []; + console.log("****************************** BEGIN TRANSACTION in normal " + this.name); + this.db.execute("BEGIN TRANSACTION", function(error, rows) { cbDone(); }); +}; + +IJOD.prototype.commitAddTransaction = function(cbDone) { + if (!this.transactionItems || this.transactionItems.length == 0) return cbDone(); + //console.log("Committing %d items", this.transactionItems.length); + var totalSize = this.transactionItems.reduce(function(prev, cur, idx, arr) { return prev + arr[idx].length; }, 0); + var writeBuffer = new Buffer(totalSize); + var idx = 0; + var self = this; + lutil.forEachSeries(self.transactionItems, function(item, cb) { + item.copy(writeBuffer, idx); + idx += item.length; + cb(); + }, function(err) { + fs.write(self.fda, writeBuffer, 0, writeBuffer.length, null, function(err, written, buffer) { + // We end the transaction + writeBuffer = null; + self.transactionItems = null; + if (err) { + console.error("Error writing to IJOD: %s", err); + } else if (written != totalSize) { + console.error("Only %d written of %d bytes to IJOD", written, totalSize); + } + console.log("****************************** COMMIT TRANSACTION in normal " + self.name); + self.db.execute("COMMIT TRANSACTION", function(error, rows) { cbDone();
}); + }); + }); +}; + +/// Abort a pending add transaction +/** +* Any pending write chunks are destroyed and the database transaction is rolled back. +* This is safe to call without a transaction started. +*/ +IJOD.prototype.abortAddTransaction = function(cbDone) { + if (!this.transactionItems) return cbDone(); + this.transactionItems = null; + this.db.execute("ROLLBACK TRANSACTION", function(error, rows) { cbDone(); }); +}; + +// takes arg of at least an id and data, callback(err) when done +IJOD.prototype.addData = function(arg, callback) { + if(!arg || !arg.id) return callback("invalid arg"); + arg.id = arg.id.toString(); // safety w/ numbers + var tmpJson = JSON.stringify(arg); + var hash = mmh3.murmur32HexSync(tmpJson); + if(!arg.at) arg.at = Date.now(); + var self = this; + this.startAddTransaction(function() { + var tmpJson = JSON.stringify(arg); + var gzdata = zlib.compress(new Buffer(tmpJson+"\n")); + self.transactionItems.push(gzdata); + var at = self.len; + self.len += gzdata.length; + self.db.execute("REPLACE INTO ijod VALUES (?, ?, ?, ?)", [arg.id, at, self.len-at, hash], callback); + }); +} + +// adds a deleted record to the ijod and removes from index +IJOD.prototype.delData = function(arg, callback) { + if(!arg || !arg.id) return callback("invalid arg"); + arg.id = arg.id.toString(); // safety w/ numbers + if(!arg.at) arg.at = Date.now(); + arg.type = "delete"; + var self = this; + var gzdata = zlib.compress(new Buffer(JSON.stringify(arg)+"\n")); + fs.write(self.fda, gzdata, 0, gzdata.length, null, function(err, written, buffer) { + if (err) { + return callback(err); } + + var at = self.len; + self.len += gzdata.length; + self.db.execute("DELETE FROM ijod WHERE id = ?", [arg.id], callback); + }); } -exports.IJOD = IJOD; +// this only calls callback(err, rawstring) once! +IJOD.prototype.getOne = function(arg, callback) { + if(!arg || !arg.id) return callback("invalid arg"); + arg.id = arg.id.toString(); // safety w/ numbers + var self = this; + var did = false; + self.db.query("SELECT at,len FROM ijod WHERE id = ? LIMIT 1", [arg.id], function(err, row){ + if(did) return; // only call the callback once + did = true; + if(err) return callback(err); + if(!row) return callback(); + var buf = new Buffer(row.len); + fs.readSync(self.fdr, buf, 0, row.len, row.at); + var data = zlib.uncompress(buf); + return callback(err, arg.raw ? data : stripper(data)); + }); +} + +// will call callback(err, rawstring) continuously until rawstring==undefined +IJOD.prototype.getAll = function(arg, callback) { + if(!arg) return callback("invalid arg"); + var params = []; + var sql = "SELECT at,len FROM ijod "; + if(arg.limit) + { + sql += " LIMIT ?"; + params.push(parseInt(arg.limit)); + } + if(arg.offset) + { + sql += " OFFSET ?"; + params.push(parseInt(arg.offset)); + } + var self = this; + self.db.query(sql, params, function(err, row){ + if(err) return callback(err); + if(!row) return callback(); + var buf = new Buffer(row.len); + fs.readSync(self.fdr, buf, 0, row.len, row.at); + var data = zlib.uncompress(buf); + return callback(err, arg.raw ?
data : stripper(data)); + }); +} + +// takes a new object and checks first if it exists +// callback(err, new|same|update) +IJOD.prototype.smartAdd = function(arg, callback) { + if(!arg || !arg.id) return callback("invalid arg"); + arg.id = arg.id.toString(); // safety w/ numbers + var self = this; + var start = Date.now(); + self.getOne(arg, function(err, existing){ + //console.log("getOne in %d", (Date.now() - start)); + if(err) return callback(err); + // first check if it's new + if(!existing) + { + self.addData(arg, function(err){ + if(err) return callback(err); + callback(null, "new"); + }); + return; + } + try { var obj = JSON.parse(existing); } catch(E){ return callback(E); } + delete obj.at; // make sure not to compare any timestamps + delete arg.at; + // they're identical, do nothing + if (deepCompare(arg, obj)) return callback(null, 'same'); + // it's different, save an update + self.addData(arg, function(err){ + if(err) return callback(err); + callback(null, "update"); + }) + }); +} + +IJOD.prototype.batchSmartAdd = function(entries, callback) { + //console.log("Batch smart add %d entries", entries.length); + var t = Date.now(); + var self = this; + // TODO: We have 5 attempts to write and then we blow up, we need something smarter to do with this data + self.batchAttempt = self.batchAttempt || 0; + if (++self.batchAttempt > 5) { + self.batchAttempt = 0; + console.error("Out of attempts trying to batchSmartAdd, blowing up"); + throw new Error("Unable to batchSmartAdd"); + } + + function handleError(msg) { + console.error("Batch smart add error: %s", msg); + console.trace(); + self.abortAddTransaction(function() { + setTimeout(function() { + self.batchSmartAdd(entries, callback); + }, 500); + }); + } + + var script = ["CREATE TEMP TABLE IF NOT EXISTS batchSmartAdd (id TEXT)", "DELETE FROM batchSmartAdd", "BEGIN TRANSACTION"].join(";") + ";"; + console.log("****************************** BEGIN TRANSACTION in batch " + this.name); + self.db.executeScript(script, function(error, rows) { + if (error) return handleError(error); + self.db.prepare("INSERT INTO batchSmartAdd VALUES (?)", function(error, stmt) { + if (error) return handleError(error); + async.forEachSeries(entries, function(entry, cb) { + if (!entry) return cb(); + stmt.bind(1, entry.id, function(err) { + if (err) return cb(err); + stmt.step(function(error, row) { + if (error) return cb(error); + stmt.reset(); + cb(); + }); + }); + }, function(error) { + if (error) return handleError(error); + console.log("****************************** COMMIT TRANSACTION in batch " + this.name); + self.db.execute("COMMIT TRANSACTION", function(error, rows) { + if (error) return handleError(error); + stmt.finalize(function(error) { + self.db.execute("SELECT id,hash FROM ijod WHERE ijod.id IN (SELECT id FROM batchSmartAdd)", function(error, rows) { + if (error) return handleError(error); + var knownIds = {}; + rows = rows || []; + rows.forEach(function(row) { + knownIds[row.id] = row.hash; + }); + self.startAddTransaction(function() { + async.forEachSeries(entries, function(entry, cb) { + if (!entry) return cb(); + if (knownIds[entry.id]) { + // See if we need to update + entry.id = entry.id.toString(); // safety w/ numbers + var hash = mmh3.murmur32HexSync(JSON.stringify(entry)); + // If the id and hashes match it's the same! 
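+ // (a matching stored hash means the record is unchanged, so the rewrite can be skipped)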
+ if (hash == knownIds[entry.id]) { + return cb(); + } + } + self.addData(entry, cb); + }, function(error) { + if (error) return handleError(error); + self.batchAttempt = 0; + self.commitAddTransaction(callback); + //console.log("Batch done: %d", (Date.now() - t)); + }); + }) + }); + }); + }); + }); + }); + }); +}; + +// utilities to respond to a web request, shared between synclets and push +IJOD.prototype.reqCurrent = function(req, res) +{ + var streaming = (req.query['stream'] == "true"); + var options = {}; + if(req.query['limit']) options.limit = parseInt(req.query['limit']); + if(req.query['offset']) options.offset = parseInt(req.query['offset']); + + var ctype = streaming ? "application/jsonstream" : "application/json"; + res.writeHead(200, {'content-type' : ctype}); + var first = true; + this.getAll(options, function(err, item){ + if(err) logger.error(err); + if(item == null) + { // all done + if(!streaming) res.write("]"); + return res.end() + } + if(streaming) return res.write(item+'\n'); + if(first) + { + first = false; + return res.write('['+item); + } + res.write(','+item); + }); + +} +IJOD.prototype.reqID = function(req, res) +{ + this.getOne({id:req.params.id}, function(err, item) { + if (err) logger.error(err); + if (!item) return res.send("not found",404); + res.writeHead(200, {'content-type' : 'application/json'}); + res.end(item); + }); +} -IJOD.prototype.addRecord = function(timeStamp, record, callback) { - var str = JSON.stringify({timeStamp:timeStamp, data:record}) + '\n'; - var b = new Buffer(str); - fs.write(this.dataFile, b, 0, b.length, this.fileLength, callback); +// make a string and return only the interior data object! +function stripper(buf) +{ + var s = buf.toString("utf8"); + return s.slice(s.indexOf('{',1),s.lastIndexOf('}',s.length-3)+1); // -3 accounts for }\n } diff --git a/Common/node/ldatastore.js b/Common/node/ldatastore.js deleted file mode 100644 index 0c0f3e9c8..000000000 --- a/Common/node/ldatastore.js +++ /dev/null @@ -1,190 +0,0 @@ -/* -* -* Copyright (C) 2011, The Locker Project -* All rights reserved. -* -* Please see the LICENSE file for more information. -* -*/ -var IJOD = require('ijod').IJOD - , lconfig = require('lconfig') - , logger = require('logger') - , lmongo = require('lmongo') - , ijodFiles = {} - , deepCompare = require('deepCompare') - , mongo = {} - , colls = {} - , mongoIDs = {} - ; - -exports.init = function(owner, callback) { - if (mongo[owner]) return callback(); - lmongo.init(owner, [], function(_mongo) { - mongo[owner] = _mongo; - colls[owner] = mongo[owner].collections[owner]; - callback(); - }); -} - -exports.addCollection = function(owner, name, dir, id) { - mongoIDs[dir + "_" + name] = id; - if(!colls[owner][dir + "_" + name]) { - mongo[owner].addCollection(owner, dir + "_" + name); - var ndx = {}; - ndx[id] = true; - colls[owner][dir + "_" + name].ensureIndex(ndx,{unique:true},function() {}); - } - if(!ijodFiles[dir + "_" + name]) - ijodFiles[dir + "_" + name] = new IJOD(name, dir); -} - -// arguments: type should match up to one of the mongo collection fields -// object will be the object to persist -// options is optional, but if it exists, available options are: strip + timestamp -// strip is an array of properties to strip off before persisting the object. -// options = {strip: ['person','checkins']}, for example -// timeStamp will be the timestamp stored w/ the record if it exists, otherwise, just use now. 
-// -exports.addObject = function(owner, type, object, options, callback) { - var timeStamp = Date.now(); - if (arguments.length == 3) callback = options; - if (typeof options == 'object') { - for (var i in options['strip']) { - var key = options['strip'][i]; - delete object[key]; - } - if (options['timeStamp']) { - timeStamp = options['timeStamp']; - } - } - setCurrent(owner, type, object, function(err, newType, doc) { - if (newType === 'same') return callback(err, newType, doc); - ijodFiles[type].addRecord(timeStamp, object, function(err) { - callback(err, newType, doc); - }); - }); -} - -// same deal, except no strip option, just timestamp is available currently -exports.removeObject = function(owner, type, id, options, callback) { - var timeStamp = Date.now(); - if (arguments.length == 4) callback = options; - if (typeof options == 'object') { - if (options['timeStamp']) { - timeStamp = options['timeStamp']; - } - } - var record = {deleted: timeStamp}; - record[mongoIDs[type]] = id; - ijodFiles[type].addRecord(timeStamp, record, function(err) { - if (err) return callback(err); - removeCurrent(owner, type, id, callback); - }) -} - -exports.queryCurrent = function(owner, type, query, options, callback) { - try { - query = query || {}; - options = options || {}; - var m = getMongo(owner, type, callback); - m.find(query, options).toArray(callback); - } catch(err) { - return callback(err); - } -} - -exports.getAllCurrent = function(owner, type, callback, options) { - try { - options = options || {}; - var m = getMongo(owner, type, callback); - m.find({}, options).toArray(callback); - } catch(err) { - return callback(err); - } -} - -exports.getEachCurrent = function(owner, type, callback, options) { - try { - options = options || {}; - var m = getMongo(owner, type, callback); - m.find({}, options).each(callback); - } catch(err) { - return callback(err); - } -} - -exports.getCurrent = function(owner, type, id, callback) { - try { - if (!(id && (typeof id === 'string' || typeof id === 'number'))) return callback(new Error('bad id:' + id), null); - var m = getMongo(owner, type); - var query = {_id: mongo[owner].db.bson_serializer.ObjectID(id)}; - m.findOne(query, callback); - } catch(err) { - return callback(err); - } -} - -exports.getCurrentId = function(owner, type, id, callback) { - try { - if (!(id && (typeof id === 'string' || typeof id === 'number'))) return callback(new Error('bad id:' + id), null); - var m = getMongo(owner, type); - var query = {"id":parseInt(id)}; - m.findOne(query, callback); - } catch(err) { - return callback(err); - } -} - - -function setCurrent(owner, type, object, callback) { - if (type && object && callback && object[mongoIDs[type]]) { - try { - var m = getMongo(owner, type, callback); - var query = {}; - query[mongoIDs[type]] = object[mongoIDs[type]]; - m.findAndModify(query, [], object, {upsert:true, safe:true}, function(err, doc) { - if (deepCompare(doc, {})) { - m.findOne(query, function(err, newDoc) { - callback(err, 'new', newDoc); - }); - } else { - var id = doc._id; - delete doc._id; - if (deepCompare(doc, object)) { - callback(err, 'same', doc); - } else { - doc._id = id; - callback(err, 'update', doc); - } - } - }); - } catch(err) { - return callback(err); - } - } else { - logger.error('failed to set current in ldatastore'); - logger.error(type) - logger.error(object) - logger.error(callback); - } -} - -function removeCurrent(owner, type, id, callback) { - try { - var m = getMongo(owner, type); - var query = {}; - query[mongoIDs[type]] = id; - 
m.remove(query, callback); - } catch(err) { - return callback(err); - } -} - -function getMongo(owner, type) { - var m = colls[owner][type]; - if(!m) { - mongo[owner].addCollection(owner, type); - m = colls[owner][type]; - } - return m; -} diff --git a/Common/node/lpushmanager.js b/Common/node/lpushmanager.js index 07d0fd583..2ece6e494 100644 --- a/Common/node/lpushmanager.js +++ b/Common/node/lpushmanager.js @@ -1,37 +1,17 @@ var fs = require('fs') , path = require('path') , lconfig = require("lconfig") - , ldatastore = require('ldatastore') - , datastore = {} + , logger = require("logger") , async = require('async') , datasets = {} , levents = require('levents') , lutil = require('lutil') + , IJOD = require('ijod').IJOD , config = {} ; -// this works, but feels like it should be a cleaner abstraction layer on top of the datastore instead of this garbage -datastore.init = function(callback) { - ldatastore.init('push', callback); -} - -datastore.addCollection = function(dataset) { - ldatastore.addCollection('push', dataset, 'push', 'id'); -} - -datastore.removeObject = function(dataset, id, ts, callback) { - if (typeof(ts) === 'function') { - ldatastore.removeObject('push', 'push_' + dataset, id, {timeStamp: Date.now()}, ts); - } else { - ldatastore.removeObject('push', 'push_' + dataset, id, ts, callback); - } -} - -datastore.addObject = function(dataset, obj, ts, callback) { - ldatastore.addObject('push', 'push_' + dataset, obj, ts, callback); -} - +config.ijods = {}; config.datasets = {}; module.exports.datasets = config.datasets; @@ -45,27 +25,42 @@ module.exports.init = function () { } module.exports.acceptData = function(dataset, response, callback) { - datastore.init(function() { - var deletedIDs = {}; - if (response.config) { - if (config[dataset]) { - deletedIDs = compareIDs(config[dataset], response.config); - } else { - config[dataset] = {}; - } - lutil.extend(true, config[dataset], response.config); - lutil.atomicWriteFileSync(path.join(lconfig.lockerDir, lconfig.me, "push", 'push_config.json'), - JSON.stringify(config, null, 4)); - } - if (typeof(response.data) === 'string') { - return callback('data is in a wacked out format'); - } - if (dataset.length === 0) { - return callback(); + var deletedIDs = {}; + if (response.config) { + if (config[dataset]) { + deletedIDs = compareIDs(config[dataset], response.config); + } else { + config[dataset] = {}; } - processData(deletedIDs, response.data, dataset, callback); + lutil.extend(true, config[dataset], response.config); + lutil.atomicWriteFileSync(path.join(lconfig.lockerDir, lconfig.me, "push", 'push_config.json'), + JSON.stringify(config, null, 4)); + } + if (typeof(response.data) === 'string') { + return callback('data is in a wacked out format'); + } + if (dataset.length === 0) { + return callback(); + } + processData(deletedIDs, response.data, dataset, callback); +} + +// simple async friendly wrapper +function getIJOD(dataset, create, callback) { + if(config.ijods[dataset]) return callback(config.ijods[dataset]); + var name = path.join(lconfig.lockerDir, lconfig.me, "push", dataset); + // only load if one exists or create flag is set + fs.stat(name+".db", function(err, stat){ + if(!stat && !create) return callback(); + var ij = new IJOD({name:name}) + config.ijods[dataset] = ij; + ij.open(function(err) { + if(err) logger.error(err); + return callback(ij); + }); }); } +module.exports.getIJOD = getIJOD; // copy copy copy function compareIDs (originalConfig, newConfig) { @@ -90,35 +85,36 @@ function processData (deleteIDs, data, 
dataset, callback) { lutil.atomicWriteFileSync(path.join(lconfig.lockerDir, lconfig.me, "push", 'push_config.json'), JSON.stringify(config, null, 4)); } - datastore.addCollection(dataset); - - if (deleteIDs && deleteIDs.length > 0 && data) { - addData(dataset, data, function(err) { - if(err) { - callback(err); - } else { - deleteData(dataset, deleteIDs, callback); - } - }); - } else if (data && data.length > 0) { - addData(dataset, data, callback); - } else if (deleteIDs && deleteIDs.length > 0) { - deleteData(dataset, deleteIDs, callback); - } else { - callback(); - } + // TODO: Explicitly close + getIJOD(dataset, true, function(ijod){ + if (deleteIDs && deleteIDs.length > 0 && data) { + addData(dataset, data, ijod, function(err) { + if(err) { + callback(err); + } else { + deleteData(dataset, deleteIDs, ijod, callback); + } + }); + } else if (data && data.length > 0) { + addData(dataset, data, ijod, callback); + } else if (deleteIDs && deleteIDs.length > 0) { + deleteData(dataset, deleteIDs, ijod, callback); + } else { + callback(); + } + }); } -function deleteData (dataset, deleteIds, callback) { +function deleteData (dataset, deleteIds, ijod, callback) { var q = async.queue(function(id, cb) { levents.fireEvent(lutil.idrNew(dataset, 'push', id), 'delete'); - datastore.removeObject(dataset, id, cb); + ijod.delData({id:id}, cb); }, 5); deleteIds.forEach(q.push); q.drain = callback; } -function addData (dataset, data, callback) { +function addData (dataset, data, ijod, callback) { var errs = []; var q = async.queue(function(item, cb) { var object = (item.obj) ? item : {obj: item}; @@ -129,24 +125,29 @@ function addData (dataset, data, callback) { } if (object.type === 'delete') { levents.fireEvent(lutil.idrNew(dataset, 'push', object.obj.id), 'delete'); - datastore.removeObject(dataset, object.obj["id"], {timeStamp: object.timestamp}, cb); + ijod.delData({id:object.obj["id"]}, cb); } else { - datastore.addObject(dataset, object.obj, {timeStamp: object.timestamp}, function(err, type, doc) { - if (type === 'same') return cb(); - levents.fireEvent(lutil.idrNew(dataset, 'push', object.obj.id), type, doc); + var arg = {id:object.obj.id, data:object.obj}; + if(object.timestamp) arg.at = object.timestamp; + ijod.smartAdd(arg, function(err, type) { + if(err) logger.error(err); + if (!type || type === 'same') return cb(); + levents.fireEvent(lutil.idrNew(dataset, 'push', object.obj.id), type, object.obj); return cb(); }); } } else { cb(); } - }, 5); - data.forEach(function(d){ q.push(d, errs.push); }); // hehe fun + }, 1); + ijod.startAddTransaction(function(err) { + data.forEach(function(d){ q.push(d, errs.push); }); // hehe fun + }); q.drain = function() { if (errs.length > 0) { - callback(errs); + ijod.abortAddTransaction(function() { callback(errs); }); } else { - callback(); + ijod.commitAddTransaction(callback); } }; } diff --git a/Common/node/lservicemanager.js b/Common/node/lservicemanager.js index eafeadb02..6e4161aed 100644 --- a/Common/node/lservicemanager.js +++ b/Common/node/lservicemanager.js @@ -25,19 +25,18 @@ var stats; var serviceMap = { }; // All of the immediately addressable services in the system var shuttingDown = null; -var syncletManager, registry; +var registry; var lockerPortNext = parseInt("1" + lconfig.lockerPort, 10); /** * Scans the Me directory for instaled services */ -exports.init = function (sman, reg, callback) { +exports.init = function (reg, callback) { logger = require('logger'); logger.info('lservicemanager lockerPortNext = ' + lockerPortNext); stats = new 
dispatcher(lconfig.stats); - syncletManager = sman; registry = reg; var dirs = fs.readdirSync(lconfig.me); for (var i = 0; i < dirs.length; i++) { @@ -220,12 +219,6 @@ exports.mapReload = function(id) { levents.addListener(ev[0], js.id, ev[1], batching); } } - // start em up if they're ready - if (js.synclets && js.auth && js.authed) { - for (var j = 0; j < js.synclets.length; j++) { - syncletManager.scheduleRun(js, js.synclets[j]); - } - } exports.mapDirty(js.id); }; diff --git a/Common/node/lsyncmanager.js b/Common/node/lsyncmanager.js index bcb46be4c..3f3678a44 100644 --- a/Common/node/lsyncmanager.js +++ b/Common/node/lsyncmanager.js @@ -2,8 +2,7 @@ var fs = require('fs') , path = require('path') , lconfig = require("lconfig") , spawn = require('child_process').spawn - , ldatastore = require('ldatastore') - , datastore = {} + , IJOD = require('ijod').IJOD , async = require('async') , url = require('url') , lutil = require('lutil') @@ -15,85 +14,113 @@ var fs = require('fs') , dispatcher = require('./instrument.js').StatsdDispatcher , stats = new dispatcher(lconfig.stats); +var PAGING_TIMING = 2000; // 2s gap in paging + var runningContexts = {}; // Map of a synclet to a running context -function LockerInterface(synclet, info) { - EventEmitter.call(this); - this.synclet = synclet; - this.info = info; - this.srcdir = path.join(lconfig.lockerDir, info.srcdir); - this.workingDirectory = path.join(lconfig.lockerDir, lconfig.me, info.id); - this.processing = false; // If we're processing events - this.events = []; +function SyncletManager() +{ + this.scheduled = {}; + this.offlineMode = false; } -util.inherits(LockerInterface, EventEmitter); -LockerInterface.prototype.error = function(message) { - logger.error("Error from synclet " + this.synclet.name + "/" + this.info.id + ": " + message); +/// Schedule a synclet to run +/** +* timeToRun is optional. In this case the next run time is calculated +* based on normal frequency and tolerance methods. 
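+* (Per the scheduling code below: a missed nextRun fires immediately; otherwise nextRun lands frequency seconds out, fuzzed within a 50% window so synclets spread apart.)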
+*/ +SyncletManager.prototype.schedule = function(connectorInfo, syncletInfo, timeToRun) { + var key = this.getKey(connectorInfo, syncletInfo); + // Let's get back to a clean slate on this synclet + if (this.scheduled[key]) { + logger.debug(key + " was already scheduled."); + this.cleanup(connectorInfo, syncletInfo); + } + syncletInfo.status = "waiting"; + if (!syncletInfo.frequency) return; + + // In offline mode things may only be run directly with runAndReschedule + if (this.offlineMode) return; + + // validation check + if(syncletInfo.nextRun && typeof syncletInfo.nextRun != "number") delete syncletInfo.nextRun; + + if (timeToRun === undefined) { + // had a schedule and missed it, run it now + if(syncletInfo.nextRun && syncletInfo.nextRun <= Date.now()) { + logger.verbose("scheduling " + key + " to run immediately (missed)"); + timeToRun = 0; + } else if (!syncletInfo.nextRun) { + // if not scheduled yet, schedule it to run in the future + var milliFreq = parseInt(syncletInfo.frequency) * 1000; + syncletInfo.nextRun = parseInt(Date.now() + milliFreq + (((Math.random() - 0.5) * 0.5) * milliFreq)); // 50% fuzz added or subtracted + timeToRun = syncletInfo.nextRun - Date.now(); + } + } + + logger.verbose("scheduling " + key + " (freq " + syncletInfo.frequency + "s) to run in " + (timeToRun / 1000) + "s"); + var self = this; + this.scheduled[key] = setTimeout(function() { + self.runAndReschedule(connectorInfo, syncletInfo); + }, timeToRun); }; -// Fire an event from the synclet -LockerInterface.prototype.event = function(action, lockerType, obj) { - this.events.push({action:action, lockerType:lockerType, obj:obj}); - this.emit("event"); - this.processEvents(); +/// Remove the synclet from scheduled and clean up all other state, does not reset it to run again +SyncletManager.prototype.cleanup = function(connectorInfo, syncletInfo) { + var key = this.getKey(connectorInfo, syncletInfo); + if (this.scheduled[key]) { + clearTimeout(this.scheduled[key]); // remove any existing timer + delete this.scheduled[key]; + } }; -LockerInterface.prototype.processEvents = function() { - if (this.processing) return; - // Process the events we have - this.processing = true; +/// Run the synclet and then attempt to reschedule it +SyncletManager.prototype.runAndReschedule = function(connectorInfo, syncletInfo, callback) { + //TODO: Ensure that we only run one per connector at a time. + delete syncletInfo.nextRun; // We're going to run and get a new time, so reset + this.cleanup(connectorInfo, syncletInfo); var self = this; - var curEvents = this.events; - this.events = []; - async.forEachSeries(curEvents, function(event, cb) { - processData([], self.info, self.synclet, event.lockerType, [event], cb); - }, function(error) { - self.processing = false; - if (self.events.length === 0) { - self.emit("drain"); - } else { - process.nextTick(function() { - self.processEvents(); - }); + // Tolerance isn't done yet, we'll come back + if (!this.checkToleranceReady(connectorInfo, syncletInfo)) { + return self.schedule(connectorInfo, syncletInfo); + } + executeSynclet(connectorInfo, syncletInfo, function(error) { + var nextRunTime = undefined; + if (connectorInfo.config && connectorInfo.config.nextRun < 0) { + // This wants to page so we'll schedule it for another run in just a short gap.
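+ // (a connector signals paging by leaving a negative nextRun in its config; the old scheduleRun used nextRun === -1 the same way)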
+ nextRunTime = PAGING_TIMING; + } + // Make sure we reschedule this before we return anything else + self.schedule(connectorInfo, syncletInfo, nextRunTime); + if (callback) return callback(error); }); }; -// Signals that the synclet context is complete and may be cleaned up -LockerInterface.prototype.end = function() { - if (this.events.length > 0) { - this.once("drain", function() { - this.emit("end"); - }); - } else { - this.emit("end"); +/// Return true if the tolerance is ready for us to actually run +SyncletManager.prototype.checkToleranceReady = function(connectorInfo, syncletInfo) { + // Make sure the baseline is there + if (syncletInfo.tolMax === undefined) { + syncletInfo.tolAt = 0; + syncletInfo.tolMax = 0; } + // if we can have tolerance, try again later + if(syncletInfo.tolAt > 0) { + syncletInfo.tolAt--; + logger.verbose("tolerance now at " + syncletInfo.tolAt + " synclet " + syncletInfo.name + " for " + connectorInfo.id); + return false; + } + return true; }; - -// this works, but feels like it should be a cleaner abstraction layer on top of the datastore instead of this garbage -datastore.init = function (callback) { - ldatastore.init('synclets', callback); -}; - -datastore.addCollection = function (collectionKey, id, mongoId) { - ldatastore.addCollection('synclets', collectionKey, id, mongoId); -}; - -datastore.removeObject = function (collectionKey, id, ts, callback) { - if (typeof(ts) === 'function') { - ldatastore.removeObject('synclets', collectionKey, id, {timeStamp: Date.now()}, ts); - } else { - ldatastore.removeObject('synclets', collectionKey, id, ts, callback); - } +// This trivial helper function just makes sure we're consistent and we can change it easily +SyncletManager.prototype.getKey = function(connectorInfo, syncletInfo) { + return connectorInfo.id + "-" + syncletInfo.name; }; -datastore.addObject = function (collectionKey, obj, ts, callback) { - ldatastore.addObject('synclets', collectionKey, obj, ts, callback); -}; +var syncletManager = new SyncletManager(); +var ijods = {}; // core syncmanager init function, need to talk to serviceManager var serviceManager; exports.init = function (sman, callback) { serviceManager = sman; - datastore.init(callback); + callback(); }; var executeable = true; @@ -101,87 +128,65 @@ exports.setExecuteable = function (e) { executeable = e; }; +// Run a connector or a specific synclet right now exports.syncNow = function (serviceId, syncletId, post, callback) { - if(typeof syncletId == "function") - { - callback = syncletId; - syncletId = false; + if(typeof syncletId == "function") { + callback = syncletId; + syncletId = false; + } + + var js = serviceManager.map(serviceId); + if (!js || !js.synclets) return callback("no synclets like that installed"); + async.forEachSeries(js.synclets, function (synclet, cb) { + if (!synclet) { + logger.error("Unknown synclet info in syncNow"); + return cb(); } - var js = serviceManager.map(serviceId); - if (!js || !js.synclets) return callback("no synclets like that installed"); - async.forEachSeries(js.synclets, function (synclet, cb) { - if (!synclet) { - logger.error("Unknown synclet info in syncNow"); - cb(); - } - if(syncletId && synclet.name != syncletId) return cb(); - if(post) - { - if(!Array.isArray(synclet.posts)) synclet.posts = []; - synclet.posts.push(post); - } - executeSynclet(js, synclet, cb, true); - }, callback); + // If they requested a specific synclet we'll skip everything but that one + if(syncletId && synclet.name != syncletId) return cb(); + if(post) {
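+ // queue the posted data on the synclet so the run triggered below picks it up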
+ if(!Array.isArray(synclet.posts)) synclet.posts = []; + synclet.posts.push(post); + } + syncletManager.runAndReschedule(js, synclet, cb); + }, callback); }; // run all synclets that have a tolerance and reset them exports.flushTolerance = function(callback, force) { - var map = serviceManager.map(); - async.forEach(Object.keys(map), function(service, cb){ // do all services in parallel - if(!map[service].synclets) return cb(); - async.forEachSeries(map[service].synclets, function(synclet, cb2) { // do each synclet in series - if (!force && (!synclet.tolAt || synclet.tolAt === 0)) return cb2(); - synclet.tolAt = 0; - executeSynclet(map[service], synclet, cb2); - }, cb); - }, callback); + // TODO: This now has an implied force, is that correct, why wouldn't you want this to force? + var map = serviceManager.map(); + async.forEach(Object.keys(map), function(service, cb){ // do all services in parallel + // We only want services with synclets + if(!map[service].synclets) return cb(); + async.forEachSeries(map[service].synclets, function(synclet, cb2) { // do each synclet in series + synclet.tolAt = 0; + syncletManager.runAndReschedule(map[service], synclet, cb2); + }, cb); + }, callback); }; /** * Add a timeout to run a synclet +* DEPRECATED: use syncletManager.schedule directly */ -var scheduled = {}; exports.scheduleRun = function(info, synclet) { - synclet.status = "waiting"; - if (!synclet.frequency) return; - - var key = info.id + "-" + synclet.name; - if(scheduled[key]) clearTimeout(scheduled[key]); // remove any existing timer - - // run from a clean state - var force = false; - function run() { - delete scheduled[key]; - executeSynclet(info, synclet, function(){}, force); - } - - // the synclet is paging and needs to run again immediately, forcefully - if(info.config && info.config.nextRun === -1) - { - force = true; - delete info.config.nextRun; - logger.verbose("scheduling "+key+" to run immediately (paging)"); - return setTimeout(run, 2000); - } - - // validation check - if(synclet.nextRun && typeof synclet.nextRun != "number") delete synclet.nextRun; - - // had a schedule and missed it, run it now - if(synclet.nextRun && synclet.nextRun <= Date.now()) { - logger.verbose("scheduling "+key+" to run immediately (missed)"); - return process.nextTick(run); - } + logger.warn("Deprecated use of scheduleRun"); + console.trace(); + syncletManager.schedule(info, synclet); +}; - // if not scheduled yet, schedule it to run in the future - if(!synclet.nextRun) - { - var milliFreq = parseInt(synclet.frequency) * 1000; - synclet.nextRun = parseInt(Date.now() + milliFreq + (((Math.random() - 0.5) * 0.5) * milliFreq)); // 50% fuzz added or subtracted - } - var timeout = synclet.nextRun - Date.now(); - logger.verbose("scheduling "+key+" (freq "+synclet.frequency+") to run in "+(timeout/1000)+"s"); - scheduled[key] = setTimeout(run, timeout); +// Find all the synclets and get their initial scheduling done +exports.scheduleAll = function(callback) { + var map = serviceManager.map(); + async.forEach(Object.keys(map), function(service, cb){ // do all services in parallel + // We only want services with synclets + if(!map[service].synclets) return cb(); + async.forEach(map[service].synclets, function(synclet, cb2) { // do each synclet in parallel + syncletManager.schedule(map[service], synclet); + cb2(); + }, cb); + }, callback); }; function localError(base, err) { @@ -191,38 +196,18 @@ /** * Executes a synclet */ -function executeSynclet(info, synclet, callback, force) {
+function executeSynclet(info, synclet, callback) { if(!callback) callback = function(err){ if(err) return logger.error(err); logger.debug("finished processing "+synclet.name); }; if (synclet.status === 'running') return callback('already running'); - delete synclet.nextRun; // cancel any schedule if(!info.auth || !info.authed) return callback("no auth info for "+info.id); - // we're put on hold from running any for some reason, re-schedule them - // this is a workaround for making synclets available in the map separate from scheduling them which could be done better - if (!force && !executeable) { - setTimeout(function() { - executeSynclet(info, synclet, callback); - }, 1000); - return; - } - if(!synclet.tolMax) { - synclet.tolAt = 0; - synclet.tolMax = 0; - } - // if we can have tolerance, try again later - if(!force && synclet.tolAt > 0) { - synclet.tolAt--; - logger.verbose("tolerance now at "+synclet.tolAt+" synclet "+synclet.name+" for "+info.id); - exports.scheduleRun(info, synclet); - return callback(); - } // if another synclet is running, come back a little later, don't overlap! if (info.status == 'running' || runningContexts[info.id + "/" + synclet.name]) { logger.verbose("delaying "+synclet.name); setTimeout(function() { - executeSynclet(info, synclet, callback, force); + executeSynclet(info, synclet, callback); }, 10000); return; } @@ -231,6 +216,24 @@ function executeSynclet(info, synclet, callback, force) { var tstart = Date.now(); stats.increment('synclet.' + info.id + '.' + synclet.name + '.start'); stats.increment('synclet.' + info.id + '.' + synclet.name + '.running'); + // This is super handy for testing. + /* + var fname = path.join("tmp", info.id + "-" + synclet.name + ".json"); + if (path.existsSync(fname)) { + var resp = JSON.parse(fs.readFileSync(fname)); + var startTime = Date.now(); + console.log("Start response processing for %s", (info.id + "/" + synclet.name)); + processResponse({}, info, synclet, resp, function(processErr) { + process.stdin.on("data", function(data) { + console.log(data.toString()); + }); + console.log("Response Processing Done for %s in %d", (info.id + "/" + synclet.name), (Date.now() - startTime)); + info.status = 'waiting'; + callback(processErr); + }); + return; + } + */ if (info.vm || synclet.vm) { // Go ahead and create a context immediately so we get it listed as @@ -280,7 +283,7 @@ function executeSynclet(info, synclet, callback, force) { var deleteIDs = compareIDs(info.config, response.config); info.auth = lutil.extend(true, info.auth, response.auth); // for refresh tokens and profiles info.config = lutil.extend(true, info.config, response.config); - exports.scheduleRun(info, synclet); + //exports.scheduleRun(info, synclet); serviceManager.mapDirty(info.id); // save out to disk processResponse(deleteIDs, info, synclet, response, function(processErr) { info.status = 'waiting'; @@ -349,7 +352,7 @@ function executeSynclet(info, synclet, callback, force) { var deleteIDs = compareIDs(info.config, response.config); info.auth = lutil.extend(true, info.auth, response.auth); // for refresh tokens and profiles info.config = lutil.extend(true, info.config, response.config); - exports.scheduleRun(info, synclet); + //exports.scheduleRun(info, synclet); serviceManager.mapDirty(info.id); // save out to disk processResponse(deleteIDs, info, synclet, response, function(err){ info.status = 'waiting'; @@ -388,42 +391,73 @@ function compareIDs (originalConfig, newConfig) { } function processResponse(deleteIDs, info, synclet, response, callback) { - 
datastore.init(function() { - synclet.status = 'waiting'; + synclet.status = 'waiting'; - var dataKeys = []; - if (typeof(response.data) === 'string') { - return callback('bad data from synclet'); - } - for (var i in response.data) { - if(!Array.isArray(response.data[i])) continue; - dataKeys.push(i); - } - for (var i in deleteIDs) { - if (!dataKeys[i]) dataKeys.push(i); + var dataKeys = []; + if (typeof(response.data) === 'string') { + return callback('bad data from synclet'); + } + for (var i in response.data) { + if(!Array.isArray(response.data[i])) continue; + dataKeys.push(i); + } + for (var i in deleteIDs) { + if (!dataKeys[i]) dataKeys.push(i); + } + synclet.deleted = synclet.added = synclet.updated = 0; + async.forEach(dataKeys, function(key, cb) { processData(deleteIDs[key], info, synclet, key, response.data[key], cb); }, function(err){ + if(err) logger.error("err processing data: "+err); + // here we roughly compromise a multiplier up or down based on the threshold being met + var threshold = synclet.threshold || lconfig.tolerance.threshold; + var total = synclet.deleted + synclet.added + synclet.updated; + if (total < threshold) + { + if(synclet.tolMax < lconfig.tolerance.maxstep) synclet.tolMax++; // max 10x scheduled + synclet.tolAt = synclet.tolMax; + } else { + if(synclet.tolMax > 0) synclet.tolMax--; + synclet.tolAt = synclet.tolMax; } - synclet.deleted = synclet.added = synclet.updated = 0; - async.forEach(dataKeys, function(key, cb) { processData(deleteIDs[key], info, synclet, key, response.data[key], cb); }, function(err){ - if(err) logger.error("err processing data: "+err); - // here we roughly compromise a multiplier up or down based on the threshold being met - var threshold = synclet.threshold || lconfig.tolerance.threshold; - var total = synclet.deleted + synclet.added + synclet.updated; - if (total < threshold) { - if(synclet.tolMax < lconfig.tolerance.maxstep) synclet.tolMax++; // max 10x scheduled - synclet.tolAt = synclet.tolMax; - } else { - if(synclet.tolMax > 0) synclet.tolMax--; - synclet.tolAt = synclet.tolMax; - } - stats.increment('synclet.' + info.id + '.' + synclet.name + '.added', synclet.added); - stats.increment('synclet.' + info.id + '.' + synclet.name + '.updated', synclet.updated); - stats.increment('synclet.' + info.id + '.' + synclet.name + '.deleted', synclet.deleted); - stats.increment('synclet.' + info.id + '.' + synclet.name + '.length', dataKeys.reduce(function(prev, cur, idx, arr) { console.log(cur); return prev + response.data[cur].length; }, 0)); - logger.info("total of "+synclet.added+" added, "+synclet.updated+" updated, "+synclet.deleted+" deleted, and threshold "+threshold+" so setting tolerance to "+synclet.tolMax); - callback(err); + stats.increment('synclet.' + info.id + '.' + synclet.name + '.added', synclet.added); + stats.increment('synclet.' + info.id + '.' + synclet.name + '.updated', synclet.updated); + stats.increment('synclet.' + info.id + '.' + synclet.name + '.deleted', synclet.deleted); + stats.increment('synclet.' + info.id + '.' 
+ synclet.name + '.length', dataKeys.reduce(function(prev, cur, idx, arr) { return prev + response.data[cur].length; }, 0)); + logger.info("total of "+synclet.added+"+"+synclet.updated+"+"+synclet.deleted+" and threshold "+threshold+" so setting tolerance to "+synclet.tolMax); + callback(err); + }); +} + +// simple async friendly wrapper +function getIJOD(id, key, create, callback) { + var name = path.join(lconfig.lockerDir, lconfig.me, id, key); + //console.log("Open IJOD %s", name); + if(ijods[name]) return callback(ijods[name]); + // only load if one exists or create flag is set + fs.stat(name+".db", function(err, stat){ + if(!stat && !create) return callback(); + var ij = new IJOD({name:name}) + ijods[name] = ij; + ij.open(function(err){ + if(err) logger.error(err); + return callback(ij); }); }); } +exports.getIJOD = getIJOD; + +function closeIJOD(id, key, callback) { + var name = path.join(lconfig.lockerDir, lconfig.me, id, key); + //console.log("Close IJOD %s", name); + if (ijods[name]) { + ijods[name].close(function(error) { + delete ijods[name]; + callback(); + }); + } else { + callback(); + } +} +exports.closeIJOD = closeIJOD; function processData (deleteIDs, info, synclet, key, data, callback) { // this extra (handy) log breaks the synclet tests somehow?? @@ -443,29 +477,39 @@ function processData (deleteIDs, info, synclet, key, data, callback) { else if (info.mongoId) mongoId = info.mongoId[key + 's'] || info.mongoId[key] || 'id'; else mongoId = 'id'; - datastore.addCollection(key, info.id, mongoId); - - if (deleteIDs && deleteIDs.length > 0 && data) { - addData(collection, mongoId, data, info, synclet, idr, function(err) { - if (err) return callback(err); - deleteData(collection, mongoId, deleteIDs, info, synclet, idr, callback); + getIJOD(info.id, key, true, function(ij){ + function finish(err) { + closeIJOD(info.id, key, function() { + return callback(err); }); - } else if (data && data.length > 0) { - addData(collection, mongoId, data, info, synclet, idr, callback); - } else if (deleteIDs && deleteIDs.length > 0) { - deleteData(collection, mongoId, deleteIDs, info, synclet, idr, callback); - } else { - callback(); - } + } + if (deleteIDs && deleteIDs.length > 0 && data) { + addData(collection, mongoId, data, info, synclet, idr, ij, function(err) { + if(err) { + finish(err); + } else { + deleteData(collection, mongoId, deleteIDs, info, synclet, idr, ij, finish); + } + }); + } else if (data && data.length > 0) { + addData(collection, mongoId, data, info, synclet, idr, ij, function(err) { + finish(); + }); + } else if (deleteIDs && deleteIDs.length > 0) { + deleteData(collection, mongoId, deleteIDs, info, synclet, idr, ij, finish); + } else { + finish(); + } + }); } -function deleteData (collection, mongoId, deleteIds, info, synclet, idr, callback) { +function deleteData (collection, mongoId, deleteIds, info, synclet, idr, ij, callback) { var q = async.queue(function(id, cb) { var r = url.parse(idr); r.hash = id.toString(); levents.fireEvent(url.format(r), 'delete'); synclet.deleted++; - datastore.removeObject(collection, id, {timeStamp: Date.now()}, cb); + ij.delData({id:id}, cb); }, 5); // debug stuff var oldProcess = q.process; @@ -481,56 +525,46 @@ function deleteData (collection, mongoId, deleteIds, info, synclet, idr, callbac q.drain = callback; } -function addData (collection, mongoId, data, info, synclet, idr, callback) { - var errs = []; - var q = async.queue(function(item, cb) { - var object = (item.obj) ? 
item : {obj: item}; - if (object.obj) { - if(object.obj[mongoId] === null || object.obj[mongoId] === undefined) { - localError(info.title + ' ' + url.format(idr), "missing primary key (" + mongoId + ") value: "+JSON.stringify(object.obj)); - errs.push({"message":"no value for primary key", "obj": object.obj}); - return cb(); - } - var r = url.parse(idr); - r.hash = object.obj[mongoId].toString(); - if (object.type === 'delete') { - levents.fireEvent(url.format(r), 'delete'); - synclet.deleted++; - datastore.removeObject(collection, object.obj[mongoId], {timeStamp: object.timestamp}, cb); - } else { - var source = r.pathname.substring(1); - var options = {timeStamp: object.timestamp}; - if(info.strip && info.strip[source]) options.strip = info.strip[source]; - datastore.addObject(collection, object.obj, options, function(err, type, doc) { - if (type === 'same') return cb(); - if (type === 'new') synclet.added++; - if (type === 'update') synclet.updated++; - levents.fireEvent(url.format(r), type, doc); - return cb(); - }); - } - } else { - cb(); - } - }, 5); - // debug stuff - var oldProcess = q.process; - q.process = function() { - var task = q.tasks[0]; - try { - oldProcess(); - } catch (err) { - console.error('ERROR: caught error while processing q on task ', task); +function addData (collection, mongoId, data, info, synclet, idr, ij, callback) { + var errs = []; + // Take out the deletes + var deletes = data.filter(function(item) { + var object = (item.obj) ? item : {obj: item}; + if (object.obj && object.type === "delete") { + return true; + } + return false; + }); + // process the deletes first + async.forEachSeries(deletes, function(item, cb) { + var object = (item.obj) ? item : {obj: item}; + var r = url.parse(idr); + r.hash = object.obj[mongoId].toString(); + levents.fireEvent(url.format(r), 'delete'); + synclet.deleted++; + ij.delData({id:object.obj[mongoId]}, cb); + }, function(err) { + // Now we'll batch process the rest as adds + var entries = data.filter(function(item) { + var object = (item.obj) ? item : {obj: item}; + if (object.obj && object.type === "delete") { + return false; } + return true; + }); + entries = entries.map(function(item) { + var object = (item.obj) ? item : {obj: item}; + return {id:object.obj[mongoId], data:object.obj}; + }); + ij.batchSmartAdd(entries, function() { + // TODO: Return some stats from batch add for added and updated + entries.forEach(function(item) { + var r = url.parse(idr); + r.hash = item.id.toString(); + levents.fireEvent(url.format(r), "new", item.data); + }); + callback(); + }); + }); } diff --git a/Common/node/lutil.js b/Common/node/lutil.js index 6e47930b6..d0b8019a7 100644 --- a/Common/node/lutil.js +++ b/Common/node/lutil.js @@ -230,6 +230,23 @@ exports.streamFromUrl = function(url, cbEach, cbDone) { }; +/// An async forEachSeries +/** +* The async implementation can explode the stack, this version will not.
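+*
+* A minimal usage sketch (hypothetical values):
+*   lutil.forEachSeries([1, 2, 3], function(n, cb) {
+*     console.log(n);
+*     cb();
+*   }, function(err) {
+*     if (err) console.error(err);
+*   });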
+*/ +exports.forEachSeries = function(items, cbEach, cbDone) { + function runOne(idx) { + idx = idx || 0; + if (idx >= items.length) return cbDone(); + cbEach(items[idx], function(err) { + if (err) return cbDone(err) + process.nextTick(function() { + runOne(idx + 1); + }); + }); + } + runOne(); +} /* * @sourceUrl - URL of the avatar you want to fetch * @rawfile - where you want the raw downloaded data stored diff --git a/Common/node/synclet/dataaccess.js b/Common/node/synclet/dataaccess.js deleted file mode 100644 index 71c727f9e..000000000 --- a/Common/node/synclet/dataaccess.js +++ /dev/null @@ -1,103 +0,0 @@ -var dataStore = require('../ldatastore') - , fs = require('fs') - , path = require('path') - , lconfig = require('../lconfig') - , logger = require('../logger') - , lfs = require('../lfs') - ; - -module.exports = function(app) { - // In adherence with the contact/* provider API - // Returns a list of the current set of friends or followers - app.get('/synclets/:syncletId/getCurrent/:type', function(req, res) { - dataStore.init('synclets', function() { - var type = req.params.type; - var options = {}; - if(req.query['limit']) options.limit = parseInt(req.query['limit']); - if(req.query['offset']) options.skip = parseInt(req.query['offset']); - - if(req.query['stream'] == "true") - { - res.writeHead(200, {'content-type' : 'application/jsonstream'}); - dataStore.getEachCurrent('synclets', req.params.syncletId + "_" + req.params.type, function(err, object) { - if (err) logger.error(err); // only useful here for logging really - if (!object) return res.end(); - res.write(JSON.stringify(object)+'\n'); - }, options); - }else{ - // we need to cut it off somewhere as building the objects for 10ks/100ks+ result sets in ram is too much - if(!options.limit) options.limit = 20; - if(options.limit > 1000) options.limit = 1000; - dataStore.getAllCurrent('synclets', req.params.syncletId + "_" + req.params.type, function(err, objects) { - if (err) { - res.writeHead(500, {'content-type' : 'application/json'}); - res.end('{error : ' + err + '}') - } else { - res.send(objects); - } - }, options); - } - }); - }); - - // deprecated! 
2012-01-12 by jer, pretty sure nothing uses this - app.get('/synclets/:syncletId/get_profile', function(req, res) { - lfs.readObjectFromFile(path.join(lconfig.lockerDir, lconfig.me, req.params.syncletId, 'profile.json'), function(userInfo) { - res.writeHead(200, {"Content-Type":"application/json"}); - res.end(JSON.stringify(userInfo)); - }); - }); - - app.get('/synclets/:syncletId/getPhoto/:id', function(req, res) { - var id = req.param('id'); - dataStore.init('synclets', function() { - fs.readdir(path.join(lconfig.lockerDir, lconfig.me, req.params.syncletId, 'photos'), function(err, files) { - var file; - for (var i = 0; files && i < files.length; i++) { - if (files[i].match('^' + id + '\\.[a-zA-Z0-9]+')) { - file = files[i]; - break; - } - } - if (file) { - var stream = fs.createReadStream(path.join(lconfig.lockerDir, lconfig.me, req.params.syncletId, 'photos', file)); - var head = false; - stream.on('data', function(chunk) { - if(!head) { - head = true; - res.writeHead(200, {'Content-Disposition': 'attachment; filename=' + file}); - } - res.write(chunk, "binary"); - }); - stream.on('error', function() { - res.writeHead(404); - res.end(); - }); - stream.on('end', function() { - res.end(); - }); - } else { - res.writeHead(404); - res.end(); - } - }); - }); - }); - - app.get('/synclets/:syncletId/:type/id/:id', function(req, res) { - dataStore.init('synclets', function() { - dataStore.getCurrent('synclets', req.params.syncletId + "_" + req.params.type, req.params.id, function(err, doc) { - if (err) { - logger.error(err); - res.end(); - } else if (doc) { - res.writeHead(200, {'content-type' : 'application/json'}); - res.end(JSON.stringify(doc)); - } else { - res.writeHead(404); - res.end(); - } - }); - }); - }); -} diff --git a/Ops/checkCurrent.js b/Ops/checkCurrent.js new file mode 100644 index 000000000..8ae8708f0 --- /dev/null +++ b/Ops/checkCurrent.js @@ -0,0 +1,44 @@ +var request = require("request"); +var async = require("async"); +var fs = require("fs"); +var path = require("path"); + +var connectors = { + "twitter":["friends", "timeline", "tweets"], + "facebook":["home", "photos", "friends"], + "github":["repos", "users"] +}; + +var host = process.argv.length > 2 ? process.argv[2] : "localhost"; +console.log("Gathering from %s", host); + +function runIt() { + async.forEachSeries(Object.keys(connectors), function(key, cb) { + async.forEachSeries(connectors[key], function(synclet, syncletCb) { + var totalData = ""; + var fname = path.join("verification", key + "-" + synclet + ((process.argv.length > 3 && process.argv[3] == "ijod") ? 
"-ijod" : "") + ".json"); + console.log("Opening %s", fname); + try { + if (fs.statSync(fname)) fs.unlinkSync(fname); + } catch(E) { + } + var req = request({url:("http://" + host + ":8042/synclets/" + key + "/getCurrent/" + synclet + "?stream=true")}); + req.pipe(fs.createWriteStream(fname)); + req.on("end", function() { + console.log("Run for %s/%s done", key, synclet); + syncletCb(); + }); + }, function(syncletErr) { + console.log("Connector %s done", key); + cb(); + }); + }, function(err) { + console.log("Connectors done"); + }); +} + +fs.stat("verification", function(err, stat) { + if (err) fs.mkdirSync("verification"); + runIt(); +}); + diff --git a/Ops/mongo2ijod.js b/Ops/mongo2ijod.js new file mode 100644 index 000000000..6501fa39d --- /dev/null +++ b/Ops/mongo2ijod.js @@ -0,0 +1,161 @@ +var sys = require("util"); +var fs = require("fs"); +var lconfig = require(__dirname +"/../Common/node/lconfig"); +var IJOD = require(__dirname+"/../Common/node/ijod").IJOD; +var async = require("async"); +var spawn = require('child_process').spawn; +var lutil = require("lutil"); + +var mongodb = require("mongodb"); +var mongo; + +if (exports) { + exports.run = function(cb) { + connect(cb); + } +} + +if(require.main === module) { + lconfig.load(__dirname+"/../Config/config.json"); + bootMongo() +} else { + console.log("Running mongo2ijod"); +} + +function connect(cb){ + mongo = new mongodb.Db('locker', new mongodb.Server("127.0.0.1", 27018, {})); + mongo.open(function(err, p_client) { + if(err) return console.error(err); + mongo.collectionNames(function(err, names){ + scan(names, lconfig.lockerDir + '/' + lconfig.me, function(err){ + console.error("done"); + mongo.close(); + cb(err); + }); + }); +// mongo.collection(name, setup); + }) + +} + +function scan(names, dir, callback) { + console.error("scanning "+dir); + var files = fs.readdirSync(dir); + lutil.forEachSeries(files, function(file, cb){ + var fullPath = dir + '/' + file; + var stats = fs.statSync(fullPath); + // Skip other files + if(!stats.isDirectory()) return cb(); + fs.stat(fullPath+"/me.json",function(err,stats){ + // No me.json skip + if (err) return cb(); + if(!stats || !stats.isFile()) return cb(); + var me = JSON.parse(fs.readFileSync(fullPath+"/me.json")); + // Skip non service directories + if(!me) return cb(); + lutil.forEachSeries(names, function(nameo, cb2){ + var name = nameo.name; + var pfix = "locker.asynclets_"+me.id+"_"; + // Skip invalid synclets + if(name.indexOf(pfix) == -1) return cb2(); + var dname = name.substr(pfix.length); + console.error(name); + var ij = new IJOD({name:fullPath+"/"+dname}); + ij.open(function(err) { + if(err) { + console.error(err); + return cb2(err); + } + var id = "id"; + if(me.mongoId && me.mongoId[dname]) id = me.mongoId[dname]; + if(me.mongoId && me.mongoId[dname+"s"]) id = me.mongoId[dname+"s"]; + mongo.collection(name.substr(7), function(err, coll){ + if(err) { + console.error(err); + return cb2(err); + } + eacher(coll, id, ij, function(err) { + if (err) return cb2(err); + ij.close(cb2); + }); + }) + }); + }, cb); + }); + },callback); +} + + + +function eacher(collection, id, ij, callback) { + // Locate all the entries using find + var count = 0; + var at = Date.now(); + var cursor = collection.find(); + ij.startAddTransaction(function() { + // Setting this to true just to start + var item = true; + async.until( + function() { return item == undefined;}, + function(stepCb) { + cursor.nextObject(function(err, cur) { + item = cur; + if (!item) { + return stepCb(); + } + if (!item[id]) { + 
console.error("can't find "+id+" in "+JSON.stringify(item)); + return stepCb("Could not find " + id); + } + ++count; + ij.smartAdd({id:item[id], data:item}, function(addError) { + if (addError) { + console.error("Adding to ijod error: " + addError); + console.error(addError.stack); + return callback(addError); + } + stepCb(); + }); + }); + }, + function(err) { + if (err) { + return callback(err); + } + ij.commitAddTransaction(callback); + }); + }); +}; + + +function bootMongo() +{ + var mongoProcess = spawn('mongod', ['--dbpath', lconfig.lockerDir + '/' + lconfig.me + '/' + lconfig.mongo.dataDir, + '--port', lconfig.mongo.port]); + mongoProcess.stderr.on('data', function(data) { + console.error('mongod err: ' + data); + }); + + var mongoOutput = ""; + + // watch for mongo startup + var callback = function(data) { + mongoOutput += data.toString(); + console.error(mongoOutput); + if(mongoOutput.match(/ waiting for connections on port/g)) { + mongoProcess.stdout.removeListener('data', callback); + connect(function() { + mongoProcess.kill(); + }); + } + }; + mongoProcess.stdout.on('data', callback); + + process.on("uncaughtException", function(E) { + console.error(E); + mongoProcess.kill(); + process.exit(1); + }); +} + + diff --git a/Ops/webservice-push.js b/Ops/webservice-push.js index fe2ff7a80..760a9dc2d 100644 --- a/Ops/webservice-push.js +++ b/Ops/webservice-push.js @@ -1,5 +1,4 @@ var pushManager = require(__dirname + '/../Common/node/lpushmanager') - , dataStore = require(__dirname + '/../Common/node/ldatastore') , logger = require(__dirname + '/../Common/node/logger'); ; @@ -27,36 +26,25 @@ module.exports = function(locker) { }); }); - // copy pasta from the synclet code, these should be utilizing some generic stuff instead locker.get('/push/:dataset/getCurrent', function(req, res) { - dataStore.init("push", function() { - var type = req.params.type; - var options = {}; - if(req.query['limit']) options.limit = parseInt(req.query['limit']); - if(req.query['offset']) options.skip = parseInt(req.query['offset']); - - dataStore.getAllCurrent("push", "push_" + req.params.dataset, function(err, objects) { - if (err) { - res.send({error : err}, 500); - } else { - res.send(objects, 200); - } - }, options); + console.log("push current for %s", req.params.dataset); + pushManager.getIJOD(req.params.dataset, false, function(ijod) { + if(!ijod) return res.send("not found",404); + ijod.reqCurrent(req, res); }); }); locker.get('/push/:dataset/:id', function(req, res) { - dataStore.init("push", function() { - dataStore.getCurrentId("push", "push_" + req.params.dataset, req.params.id, function(err, doc) { - if (err) { - logger.error(err); - res.end(); - } else if (doc) { - res.send(doc); - } else { - res.send('', 404); - } - }); + pushManager.getIJOD(req.params.dataset, false, function(ijod) { + if(!ijod) return res.send("not found",404); + ijod.reqID(req, res); + }); + }); + + locker.get('/push/:dataset/id/:id', function(req, res) { + pushManager.getIJOD(req.params.dataset, false, function(ijod) { + if(!ijod) return res.send("not found",404); + ijod.reqID(req, res); }); }); }; diff --git a/Ops/webservice-synclets.js b/Ops/webservice-synclets.js new file mode 100644 index 000000000..f8bfda6ea --- /dev/null +++ b/Ops/webservice-synclets.js @@ -0,0 +1,111 @@ +var syncManager = require('lsyncmanager'); +var fs = require('fs'); +var path = require('path'); +var lconfig = require('lconfig'); +var logger = require('logger'); +var lfs = require('lfs'); + +module.exports = function(locker) { + // get all the 
diff --git a/Ops/webservice-push.js b/Ops/webservice-push.js
index fe2ff7a80..760a9dc2d 100644
--- a/Ops/webservice-push.js
+++ b/Ops/webservice-push.js
@@ -1,5 +1,4 @@
 var pushManager = require(__dirname + '/../Common/node/lpushmanager')
-  , dataStore = require(__dirname + '/../Common/node/ldatastore')
   , logger = require(__dirname + '/../Common/node/logger');
 ;
@@ -27,36 +26,25 @@ module.exports = function(locker) {
     });
   });
 
-  // copy pasta from the synclet code, these should be utilizing some generic stuff instead
   locker.get('/push/:dataset/getCurrent', function(req, res) {
-    dataStore.init("push", function() {
-      var type = req.params.type;
-      var options = {};
-      if(req.query['limit']) options.limit = parseInt(req.query['limit']);
-      if(req.query['offset']) options.skip = parseInt(req.query['offset']);
-
-      dataStore.getAllCurrent("push", "push_" + req.params.dataset, function(err, objects) {
-        if (err) {
-          res.send({error : err}, 500);
-        } else {
-          res.send(objects, 200);
-        }
-      }, options);
+    console.log("push current for %s", req.params.dataset);
+    pushManager.getIJOD(req.params.dataset, false, function(ijod) {
+      if(!ijod) return res.send("not found",404);
+      ijod.reqCurrent(req, res);
     });
   });
 
   locker.get('/push/:dataset/:id', function(req, res) {
-    dataStore.init("push", function() {
-      dataStore.getCurrentId("push", "push_" + req.params.dataset, req.params.id, function(err, doc) {
-        if (err) {
-          logger.error(err);
-          res.end();
-        } else if (doc) {
-          res.send(doc);
-        } else {
-          res.send('', 404);
-        }
-      });
+    pushManager.getIJOD(req.params.dataset, false, function(ijod) {
+      if(!ijod) return res.send("not found",404);
+      ijod.reqID(req, res);
+    });
+  });
+
+  locker.get('/push/:dataset/id/:id', function(req, res) {
+    pushManager.getIJOD(req.params.dataset, false, function(ijod) {
+      if(!ijod) return res.send("not found",404);
+      ijod.reqID(req, res);
     });
   });
 };
diff --git a/Ops/webservice-synclets.js b/Ops/webservice-synclets.js
new file mode 100644
index 000000000..f8bfda6ea
--- /dev/null
+++ b/Ops/webservice-synclets.js
@@ -0,0 +1,111 @@
+var syncManager = require('lsyncmanager');
+var fs = require('fs');
+var path = require('path');
+var lconfig = require('lconfig');
+var logger = require('logger');
+var lfs = require('lfs');
+
+module.exports = function(locker) {
+  // get all the information about synclets
+  locker.get('/synclets', function(req, res) {
+    res.writeHead(200, {
+      'Content-Type': 'text/javascript',
+      "Access-Control-Allow-Origin" : "*"
+    });
+    var synclets = JSON.parse(JSON.stringify(syncManager.synclets()));
+    for(var s in synclets.installed) {
+      delete synclets.installed[s].config;
+      delete synclets.installed[s].auth;
+    }
+    res.end(JSON.stringify(synclets));
+  });
+
+  // given a bunch of json describing a synclet, make a home for it on disk and add it to our map
+  locker.post('/synclets/install', function(req, res) {
+    if (!req.body.hasOwnProperty("srcdir")) {
+      res.writeHead(400);
+      res.end("{}");
+      return;
+    }
+    var metaData = syncManager.install(req.body);
+    if (!metaData) {
+      res.writeHead(404);
+      res.end("{}");
+      return;
+    }
+    res.writeHead(200, {
+      'Content-Type': 'application/json'
+    });
+    res.end(JSON.stringify(metaData));
+  });
+
+  locker.get('/synclets/:id/run', function(req, res) {
+    syncManager.syncNow(req.params.id, req.query.id, false, function() {
+      res.send(true);
+    });
+  });
+
+  // not sure /post is the right base here but needed it for easy bodyparser flag
+  locker.post('/post/:id/:synclet', function(req, res) {
+    syncManager.syncNow(req.params.id, req.params.synclet, req.body, function() {
+      res.send(true);
+    });
+  });
+
+  // Returns a list of the current set of friends or followers
+  locker.get('/synclets/:syncletId/getCurrent/:type', function(req, res) {
+    syncManager.getIJOD(req.params.syncletId, req.params.type, false, function(ijod) {
+      if(!ijod) return res.send("not found",404);
+      ijod.reqCurrent(req, res);
+    });
+  });
+
+  locker.get('/synclets/:syncletId/:type/id/:id', function(req, res) {
+    syncManager.getIJOD(req.params.syncletId, req.params.type, false, function(ijod) {
+      if(!ijod) return res.send("not found",404);
+      ijod.reqID(req, res);
+    });
+  });
+
+  locker.get('/synclets/:syncletId/get_profile', function(req, res) {
+    lfs.readObjectFromFile(path.join(lconfig.lockerDir, lconfig.me, req.params.syncletId, 'profile.json'), function(userInfo) {
+      res.writeHead(200, {"Content-Type":"application/json"});
+      res.end(JSON.stringify(userInfo));
+    });
+  });
+
+  locker.get('/synclets/:syncletId/getPhoto/:id', function(req, res) {
+    var id = req.param('id');
+    fs.readdir(path.join(lconfig.lockerDir, lconfig.me, req.params.syncletId, 'photos'), function(err, files) {
+      var file;
+      // guard against a readdir error leaving files undefined (the deleted
+      // dataaccess.js version had this check; restored here)
+      for (var i = 0; files && i < files.length; i++) {
+        if (files[i].match('^' + id + '\\.[a-zA-Z0-9]+')) {
+          file = files[i];
+          break;
+        }
+      }
+      if (file) {
+        var stream = fs.createReadStream(path.join(lconfig.lockerDir, lconfig.me, req.params.syncletId, 'photos', file));
+        var head = false;
+        stream.on('data', function(chunk) {
+          if(!head) {
+            head = true;
+            res.writeHead(200, {'Content-Disposition': 'attachment; filename=' + file});
+          }
+          res.write(chunk, "binary");
+        });
+        stream.on('error', function() {
+          res.writeHead(404);
+          res.end();
+        });
+        stream.on('end', function() {
+          res.end();
+        });
+      } else {
+        res.writeHead(404);
+        res.end();
+      }
+    });
+  });
+};
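Because the getCurrent routes above stream newline-delimited JSON when called with stream=true, a consumer has to split on newlines rather than JSON.parse the whole body. A minimal client sketch using the request module already in this repo (host and port assumed from the tests below):

    var request = require("request");
    var buf = "";
    request.get("http://localhost:8043/push/testing/getCurrent?stream=true")
      .on("data", function(chunk) {
        buf += chunk.toString();
        var lines = buf.split("\n");
        buf = lines.pop(); // keep any partial trailing line in the buffer
        lines.forEach(function(line) {
          if (line.length) console.log(JSON.parse(line));
        });
      })
      .on("end", function() {
        if (buf.length) console.log(JSON.parse(buf));
      });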
res.send("Something went wrong.", 500); }); +require("./webservice-synclets")(locker); require('./webservice-push')(locker); diff --git a/lockerd.js b/lockerd.js index 08a76177c..8f6bbbdd4 100644 --- a/lockerd.js +++ b/lockerd.js @@ -32,7 +32,8 @@ var async = require('async'); var util = require('util'); var lutil = require('lutil'); var carrier = require('carrier'); -require('graceful-fs'); +// TODO: Reconsider enabling this, but realistically we should be managing this ourselves. +// require('graceful-fs'); // This lconfig stuff has to come before any other locker modules are loaded!! @@ -139,20 +140,23 @@ function finishStartup() { pushManager.init(); // ordering sensitive, as synclet manager is inert during init, servicemanager's init will call into syncletmanager + // Dear lord this massive waterfall is so scary syncManager.init(serviceManager, function() { - registry.init(serviceManager, syncManager, lconfig, lcrypto, function() { - serviceManager.init(syncManager, registry, function() { // this may trigger synclets to start! - runMigrations("postServices", function() { - // start web server (so we can all start talking) - var webservice = require(__dirname + "/Ops/webservice.js"); - webservice.startService(lconfig.lockerPort, lconfig.lockerListenIP, function(locker) { - if (lconfig.airbrakeKey) locker.initAirbrake(lconfig.airbrakeKey); - registry.app(locker); // add it's endpoints - postStartup(); - }); - }); + registry.init(serviceManager, syncManager, lconfig, lcrypto, function() { + serviceManager.init(registry, function() { // this may trigger synclets to start! + syncManager.scheduleAll(function() { + runMigrations("postServices", function() { + // start web server (so we can all start talking) + var webservice = require(__dirname + "/Ops/webservice.js"); + webservice.startService(lconfig.lockerPort, lconfig.lockerListenIP, function(locker) { + if (lconfig.airbrakeKey) locker.initAirbrake(lconfig.airbrakeKey); + registry.app(locker); // add it's endpoints + postStartup(); + }); }); + }); }); + }); }); var lockerPortNext = "1"+lconfig.lockerPort; lockerPortNext++; diff --git a/migrations/0000000000008.js b/migrations/0000000000008.js new file mode 100644 index 000000000..104c57f90 --- /dev/null +++ b/migrations/0000000000008.js @@ -0,0 +1,15 @@ +/***************************** +* Converts mongo synclet data to ijod +*/ +var request = require("request"); +var logger = require("logger"); +var path = require("path"); +var lconfig = require("lconfig"); +var mongo2ijod = require(path.join(lconfig.lockerDir, "Ops", "mongo2ijod.js")); + +module.exports.preServices = function(config, callback) { + mongo2ijod.run(function(err) { + if (err) logger.error(err); + callback(err ? 
diff --git a/package.json b/package.json
index 003a8fb9e..5a80b61f1 100644
--- a/package.json
+++ b/package.json
@@ -40,9 +40,10 @@
     "eyes": "=0.1.6",
     "knox": "=0.0.9",
     "readabilitySAX": "=0.2.2",
+    "compress-buffer": "=0.5.1",
     "mongodb": "=0.9.8-1",
     "winston": "=0.5.2",
-    "sqlite-fts": "=0.0.8",
+    "sqlite-fts": "=0.0.9",
     "socket.io": "=0.8.5",
     "socket.io-client": "=0.8.4",
     "jade": "= 0.20.0",
@@ -55,9 +56,10 @@
     "ini": "=1.0.1",
     "uglify-js": "=1.1.1",
     "connect-form": "0.2.1",
-    "imagemagick": "https://github.com/lpatters/node-imagemagick/tarball/master",
+    "imagemagick": "=0.1.2",
     "moment": "=1.3.0",
     "mkdirp": "0.3.0",
+    "murmurhash3": "",
     "hashish": "=0.0.4",
diff --git a/tests/ijod-test-local.js b/tests/ijod-test-local.js
index ac6a5f922..ce2f6d369 100644
--- a/tests/ijod-test-local.js
+++ b/tests/ijod-test-local.js
@@ -10,30 +10,40 @@
 var vows = require("vows");
 var assert = require("assert");
 var fs = require('fs');
+var path = require('path');
 var IJOD = require("../Common/node/ijod.js").IJOD;
-var lfs = require("../Common/node/lfs.js");
 var lconfig = require('../Common/node/lconfig.js');
 
+lconfig.load("Config/config.json");
+
 var myIJOD;
 
 var suite = vows.describe("IJOD Module");
 
-var events = [{data:{'id':42, 'name':'Thing 1'}, 'timeStamp':10},
-              {data:{'id':4242, 'name':'Thing 2'}, 'timeStamp':100},
-              {data:{'id':424242, 'name':'Sally'}, 'timeStamp':1000}];
+var events = [{data:{'id':42, 'name':'Thing 1'}, 'id':"10"},
+              {data:{'id':4242, 'name':'Thing 2'}, 'id':"100"},
+              {data:{'id':424242, 'name':'Sally'}, 'id':"1000"}];
 
 var errs = [];
+var item;
+var items = 0;
 
 suite.addBatch({
   'Can add records to the IJOD': {
     topic: function() {
-      myIJOD = new IJOD(lconfig.me + '/ijodtest');
       var self = this;
-      myIJOD.addRecord(events[0].timeStamp, events[0].data, function(err) {
+      console.error(lconfig.me);
+      myIJOD = new IJOD({name:"Data/ijodtest", dir:path.join(lconfig.lockerDir, lconfig.me)}, function(err){
         if(err) errs.push(err);
-        myIJOD.addRecord(events[1].timeStamp, events[1].data, function(err) {
+        myIJOD.addData(events[0], function(err) {
           if(err) errs.push(err);
-          myIJOD.addRecord(events[2].timeStamp, events[2].data, self.callback);
+          myIJOD.addData(events[1], function(err) {
+            if(err) errs.push(err);
+            myIJOD.addData(events[2], function(err){
+              if(err) errs.push(err);
+              self.callback();
+            });
+          });
        });
      });
    },
@@ -41,5 +51,37 @@
     "successfully" : function() {
       assert.equal(errs.length, 0);
     }
   }
+}).addBatch({
+  'Can get one record': {
+    topic: function() {
+      var self = this;
+      errs = [];
+      myIJOD.getOne({id:"10"}, function(err, i) {
+        if(err) errs.push(err);
+        item = i;
+        self.callback();
+      });
+    },
+    "successfully" : function(item) {
+      assert.equal(errs.length, 0);
+      assert.include(item, "Thing 1");
+    }
+  }
+}).addBatch({
+  'Can get all records': {
+    topic: function() {
+      var self = this;
+      errs = [];
+      myIJOD.getAll({limit:3}, function(err, i) {
+        if(err) errs.push(err);
+        if(!i) return self.callback();
+        items++;
+      });
+    },
+    "successfully" : function(item) {
+      assert.equal(errs.length, 0);
+      assert.equal(items, 3);
+    }
+  }
 });
 
 suite.export(module);
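The updated test encodes the IJOD read conventions worth calling out: getOne returns a single raw record by id, while getAll invokes its callback once per item and then a final time with no item to signal completion. Outside the vows harness, that looks roughly like this (names taken from the test above; error handling abbreviated):

    myIJOD.getAll({limit: 10}, function(err, item) {
      if (err) return console.error(err);
      if (!item) return console.log("end of results"); // no item means the set is done
      console.log("got", item);
    });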
{"data": [ { "obj" : {"id" : 1}, "type" : "delete" } ]}; lconfig.load("Config/config.json"); var pushManager = require(__dirname + "/../Common/node/lpushmanager.js"); -var lmongo = require(__dirname + '/../Common/node/lmongo'); var levents = require("levents"); var realFireEvent = levents.fireEvent; @@ -69,31 +67,16 @@ vows.describe("Push Manager").addBatch({ "generates events" : function() { assert.equal(eventCount, 2); assert.equal(events[1].action, 'new'); - assert.notEqual(events[0].data._id, undefined); - assert.notEqual(events[1].data._id, undefined) assert.equal(events[0].data.id, 500); assert.equal(events[1].data.id, 1); events = []; }, - "and generates mongo data" : { - topic: function() { - var self = this; - lmongo.init('push', ['push_testing'], function(theMongo, theColls) { - mongo = theMongo; - colls = theColls; - colls.push_testing.count(self.callback); - }); - }, - "successfully" : function(err, count) { - assert.equal(count, 2); - } - }, "and writes out IJOD stuff" : { topic: function() { - fs.readFile(lconfig.me + "/push/testing.json", this.callback); + fs.readFile(lconfig.me + "/push/testing.json.gz", this.callback); }, "successfully" : function(err, data) { - assert.equal(data.toString(), '{"timeStamp":1312325283581,"data":{"id":500,"someData":"BAM"}}\n{"timeStamp":1312325283582,"data":{"id":1,"someData":"datas"}}\n'); + assert.notEqual(data, undefined); } } } @@ -101,13 +84,14 @@ vows.describe("Push Manager").addBatch({ }).addBatch({ "Querying the data API returns the data" : { topic: function() { - request.get({uri : "http://localhost:8043/push/testing/getCurrent"}, this.callback) + request.get({uri : "http://localhost:8043/push/testing/getCurrent?stream=true"}, this.callback) }, "from testSync" : function(err, resp, body) { - var data = JSON.parse(body); - obj = data[0]; - assert.equal(data[0].id, 500); - assert.equal(data[0].someData, 'BAM'); + var parts = body.split("\n"); + var data = JSON.parse(parts[0]); + obj = data; + assert.equal(data.id, 500); + assert.equal(data.someData, 'BAM'); } } }).addBatch({ @@ -136,15 +120,10 @@ vows.describe("Push Manager").addBatch({ topic: function() { var self = this; events = []; - pushManager.acceptData('testing', dataSets[2], function() { - colls.push_testing.count(self.callback); - }); - //request.post({uri : "http://localhost:8043/push/testing", json: dataSets[2]}, function() { - //colls.push_testing.count(self.callback); - //}); + pushManager.acceptData('testing', dataSets[2], self.callback) }, - "it will generate a delete event and remove the row from mongo" : function(err, count) { - assert.equal(count, 1); + "it handles it" : function(err) { + assert.equal(err, null); assert.equal(events.length, 1); assert.equal(events[0].action, 'delete'); assert.equal(events[0].idr, "testing://push/#500"); @@ -168,18 +147,6 @@ vows.describe("Push Manager").addBatch({ assert.deepEqual(JSON.parse(data), {}); } } -}).addBatch({ - "rows can be deleted by posting delete commands" : { - topic: function() { - var self = this; - pushManager.acceptData('testing', dataSets[4], function() { - colls.push_testing.count(self.callback); - }); - }, - "as well" : function(err, count) { - assert.equal(count, 0); - } - } }).addBatch({ "invalid dataset names are" : { topic: function() { diff --git a/tests/lsyncmanager-test-local.js b/tests/lsyncmanager-test-local.js index 092f4e6d6..1090f8490 100644 --- a/tests/lsyncmanager-test-local.js +++ b/tests/lsyncmanager-test-local.js @@ -15,7 +15,6 @@ var vows = require("vows") , assert = require("assert") , 
lconfig = require("lconfig") , fs = require('fs') - , mongo , allEvents = {} , request = require('request') , _id @@ -28,6 +27,112 @@ var syncManager = require("lsyncmanager.js"); var start; vows.describe("Synclet Manager").addBatch({ + "has a map of the available synclets" : function() { + lconfig.tolerance.threshold=0; // disable + levents.fireEvent = function(idr, action, data) { + allEvents[idr] = {action:action, data:data}; + } + assert.include(syncManager, "synclets"); + assert.include(syncManager.synclets(), "available"); + assert.include(syncManager.synclets(), "installed"); + }, + "Installed services" : { + "are found" : { + topic:syncManager.findInstalled(), + "and testSynclet exists": function() { + assert.include(syncManager.synclets().installed, "testSynclet"); + assert.isTrue(syncManager.isInstalled("testSynclet")); + }, + "and has status" : { + topic: syncManager.status('testSynclet'), + "frequency is 1200s" : function(topic) { + assert.equal(topic.synclets[0].frequency, 1200); + }, + "status is waiting" : function(topic) { + assert.equal(topic.status, 'waiting'); + }, + "finishedOnce is not true" : function(topic) { + assert.equal(topic.finishedOnce, undefined); + }, + "next run is about 120 seconds from now" : function(topic) { + // when runing as part of the full suite, this test fails because it gets back a time that's been time zoned + // i have no idea what's causing that, so this test is commented until that's sorted out + // + // 'Tue Aug 02 2011 17:12:10 GMT-0700 (PDT)120' + // Wed, 03 Aug 2011 00:12:15 GMT + // ✗ next run is about 120 seconds from now + // » expected true, got false // lsyncmanager-test-local.js:47 + // console.dir(new Date()); + // assert.isTrue(topic.nextRun > new Date() + 110); + // assert.isTrue(topic.nextRun < new Date() + 130); + }, + "which will return info about what will be synced" : function(topic) { + assert.equal(topic.info, 'Syncs test data!'); + } + }, + "manifest data is properly surfaced in the providers call" : function() { + assert.equal(syncManager.providers(['contact/twitter'])[0].title, 'Twitter'); + } + } + } +}).addBatch({ + // this will all be handled by the auth manager later, but this will have to do for now + // + "Installed services have hacky auth pieces added to them" : { + topic: syncManager.synclets().available, + "facebook worked" : function(topic) { + for (var i = 0; i < topic.length; i++) { + if (topic[i].provider === 'facebook') { + assert.equal(topic[i].authurl, "https://graph.facebook.com/oauth/authorize?client_id=fb-appkey&response_type=code&redirect_uri=http://localhost:8043/auth/facebook/auth&scope=email,offline_access,read_stream,user_photos,friends_photos,publish_stream,user_photo_video_tags"); + } + } + }, + "twitter worked" : function(topic) { + for (var i = 0; i < topic.length; i++) { + if (topic[i].provider === 'twitter') { + assert.equal(topic[i].authurl, "http://localhost:8043/auth/twitter/auth"); + } + } + }, + "github worked" : function(topic) { + for (var i = 0; i < topic.length; i++) { + if (topic[i].provider === 'github') { + assert.equal(topic[i].authurl, "https://github.com/login/oauth/authorize?client_id=gh-appkey&response_type=code&redirect_uri=http://localhost:8043/auth/github/auth"); + } + } + }, + "gcontacts worked" : function(topic) { + for (var i = 0; i < topic.length; i++) { + if (topic[i].provider === 'gcontacts') { + assert.equal(topic[i].authurl, 
"https://accounts.google.com/o/oauth2/auth?client_id=gc-appkey&redirect_uri=http://localhost:8043/auth/gcontacts/auth&scope=https://www.google.com/m8/feeds/&response_type=code"); + } + } + }, + "foursquare worked" : function(topic) { + for (var i = 0; i < topic.length; i++) { + if (topic[i].provider === 'foursquare') { + assert.equal(topic[i].authurl, "https://foursquare.com/oauth2/authenticate?client_id=4sq-appkey&response_type=code&redirect_uri=http://localhost:8043/auth/foursquare/auth"); + } + } + }, + "flickr worked" : function(topic) { + for (var i = 0; i < topic.length; i++) { + if (topic[i].provider === 'flickr') { + assert.equal(topic[i].authurl, "http://localhost:8043/auth/flickr/auth"); + } + } + } + }, + "Installed services can be executed immediately rather than waiting for next run" : { + topic:function() { + allEvents = {}; + syncManager.syncNow("testSynclet", this.callback); + }, + "successfully" : function(err, status) { + assert.isNull(err); + } + } +}).addBatch({ "Installed services can be executed immediately rather than waiting for next run" : { topic:function() { start = Date.now() - 1; @@ -38,21 +143,55 @@ vows.describe("Synclet Manager").addBatch({ } } }).addBatch({ + "and also generates " : { + topic: function() { + var self = this; + syncManager.getIJOD("testSynclet","testSync", false, function(ijod){ self.callback(null, ijod)}); + }, + "data in the namespaced collection" : function(err, ijod) { + assert.notEqual(ijod, undefined); + } + }, "and after running writes out IJOD stuff" : { topic: function() { - fs.readFile(lconfig.me + "/testSynclet/testSync.json", this.callback); + fs.readFile(lconfig.me + "/testSynclet/testSync.json.gz", this.callback); }, "successfully" : function(err, data) { - assert.equal(data.toString(), '{"timeStamp":1312325283581,"data":{"notId":500,"someData":"BAM"}}\n{"timeStamp":1312325283582,"data":{"notId":1,"someData":"datas"}}\n'); + assert.notEqual(data, undefined); } }, "into both" : { topic: function() { - fs.readFile(lconfig.me + "/testSynclet/dataStore.json", this.callback); + fs.readFile(lconfig.me + "/testSynclet/dataStore.json.gz", this.callback); }, "files": function(err, data) { - assert.equal(data.toString(), '{"timeStamp":1312325283583,"data":{"id":5,"notId":5,"random":"data"}}\n'); + assert.notEqual(data, undefined); } + }, + "and after generating " : { + topic: allEvents, + "correct number of events" : function(topic) { + assert.notEqual(Object.keys(allEvents).length, 0); + }, + "with correct data" : function(topic) { + /* + assert.equal(events[0].fromService, 'synclet/testSynclet'); + assert.equal(events[1].fromService, 'synclet/testSynclet'); + assert.equal(events[2].fromService, 'synclet/testSynclet'); + */ + assert.equal(allEvents["testsync://testsynclet/testSync?id=testSynclet#500"].action, 'new'); + assert.notEqual(allEvents["testsync://testsynclet/testSync?id=testSynclet#500"].data, undefined); + assert.notEqual(allEvents["testsync://testsynclet/testSync?id=testSynclet#1"].data, undefined) + assert.equal(allEvents["testsync://testsynclet/testSync?id=testSynclet#500"].data.notId, 500); + assert.equal(allEvents["testsync://testsynclet/testSync?id=testSynclet#1"].data.notId, 1); + }, + "correct types of events": function(topic) { + assert.equal(allEvents["datastore://testsynclet/dataStore?id=testSynclet#5"].action, 'new'); + assert.equal(allEvents["datastore://testsynclet/dataStore?id=testSynclet#5"].data.random, 'data'); + } + }, + "and set the finishedOnce property to true" : function(err, status) { + 
+    assert.equal(syncManager.synclets().installed.testSynclet.finishedOnce, true);
   }
 }).addBatch({
   "Querying the data API returns the data" : {
@@ -61,7 +200,7 @@
     },
     "from testSync" : function(err, resp, body) {
       var data = JSON.parse(body);
-      _id = data[0]._id;
+      _id = data[0].notId;
       obj = data[0];
       assert.equal(data[0].notId, 500);
       assert.equal(data[0].someData, 'BAM');
@@ -87,4 +226,73 @@
       assert.equal(Object.keys(allEvents).length, 0);
     }
   }
+}).addBatch({
+  "Running testSynclet again" : {
+    topic: function() {
+      syncManager.syncNow("testSynclet", this.callback);
+    },
+    "with no value for 'notId'" : function(arg1, arg2, arg3) {
+      assert.equal(arg1[0].message, 'no value for primary key');
+    }
+  }
+}).addBatch({
+  "Available services" : {
+    "gathered from the filesystem" : {
+      topic:syncManager.scanDirectory("Tests"),
+      "found a service": function() {
+        assert.ok(syncManager.synclets().available.length > 0);
+      },
+      "and can be installed" : {
+        topic:syncManager.install({srcdir:"Tests/testSynclet","auth" : {"consumerKey":"daKey","consumerSecret":"daPassword"}}),
+        "by giving a valid install instance" : function(svcMetaInfo) {
+          assert.include(svcMetaInfo, "synclets");
+        },
+        "and by service map says it is installed" : function(svcMetaInfo) {
+          assert.isTrue(syncManager.isInstalled(svcMetaInfo.id));
+        },
+        "and by creating a valid service instance directory" : function(svcMetaInfo) {
+          statInfo = fs.statSync(lconfig.me + "/" + svcMetaInfo.id);
+        },
+        "and by adding valid auth info" : function(svcMetaInfo) {
+          assert.deepEqual(svcMetaInfo.auth, {"consumerKey":"daKey","consumerSecret":"daPassword"});
+        },
+        "and passes along the icon": function(svcMetaInfo) {
+          assert.notEqual(svcMetaInfo.icon, undefined);
+        }
+      },
+      "and can be installed a second time" : {
+        topic:syncManager.install({srcdir:"Tests/testSynclet"}),
+        "by giving a valid install instance" : function(svcMetaInfo) {
+          assert.include(svcMetaInfo, "id");
+        },
+        "and by service map says it is installed" : function(svcMetaInfo) {
+          assert.isTrue(syncManager.isInstalled(svcMetaInfo.id));
+        },
+        "and by creating a valid service instance directory" : function(svcMetaInfo) {
+          statInfo = fs.statSync(lconfig.me + "/" + svcMetaInfo.id);
+        },
+        "and passes along the icon": function(svcMetaInfo) {
+          assert.notEqual(svcMetaInfo.icon, undefined);
+        }
+      }
+    }
+  },
+  "Migrates services that need it during the install" : {
+    topic: [],
+    "changing their version" : function(topic) {
+      assert.include(syncManager.synclets().installed, "migration-test2");
+      assert.isTrue(syncManager.isInstalled("migration-test2"));
+      assert.notEqual(syncManager.synclets().installed['migration-test2'], undefined);
+      assert.notEqual(syncManager.synclets().installed['migration-test2'].version, undefined);
+      assert.equal(syncManager.synclets().installed['migration-test2'].version, 1308079085972);
+    },
+    "and running the migration successfully" : function(topic) {
+      var me = JSON.parse(fs.readFileSync(process.cwd() + "/" + lconfig.me + "/migration-test/me.json", 'ascii'));
+      assert.notEqual(me.mongoCollections, undefined);
+      assert.equal(me.mongoCollections[0], 'new_collection');
+    }
+  },
+  teardown : function() {
+    levents.fireEvent = realFireEvent;
+  }
 }).export(module);