diff --git a/client/index.js b/client/index.js index 46ea9b0..34f1448 100644 --- a/client/index.js +++ b/client/index.js @@ -136,17 +136,17 @@ module.exports = class Client extends Emitter{ // let bulk = this.db.collection(this.name).initializeUnorderedBulkOp(); // for(let i =0;i { - // logClient.silly('Inserted jobs. Here in callback.'); - // if(err) logClient.error(err); - // self.dbTasks.pop(); - // callback(); + // logClient.silly('Inserted jobs. Here in callback.'); + // if(err) logClient.error(err); + // self.dbTasks.pop(); + // callback(); // }); }); } @@ -164,7 +164,7 @@ module.exports = class Client extends Emitter{ _toJob(task){ let opt = typeof task.opt === 'string' ? {uri:task.opt} : task.opt; - let priority = task.hasOwnProperty('priority')? task.priority : (opt.hasOwnProperty('priority')?opt.priority:DEFAULT_PRIORITY); + let priority = Object.prototype.hasOwnProperty.call(task,'priority')? task.priority : (Object.prototype.hasOwnProperty.call(opt,'priority')?opt.priority:DEFAULT_PRIORITY); if(priority !== DEFAULT_PRIORITY){ opt.priority = priority; @@ -310,7 +310,7 @@ module.exports = class Client extends Emitter{ _goOne(job){ let argv = job.opt; let fn = job.next; - let priority = this._getPriority(job.hasOwnProperty('priority')?job.priority:DEFAULT_PRIORITY); + let priority = this._getPriority(Object.prototype.hasOwnProperty.call(job,'priority')?job.priority:DEFAULT_PRIORITY); let objJob = this.gearmanClient.submitJob(this.name+'_'+fn, JSON.stringify(argv),{background: false, priority:priority}); objJob.jobVO = job; diff --git a/package.json b/package.json index 7d137c6..89b57dc 100644 --- a/package.json +++ b/package.json @@ -1,10 +1,10 @@ { "name": "floodesh", - "version": "0.8.18", + "version": "0.8.19", "description": "Floodesh is a distributed web spider/crawler written with Nodejs.", "main": "index.js", "scripts": { - "hint": "eslint lib/*.js worker/*.js client/*.js", + "hint": "eslint lib/*.js worker/*.js client/*.js tests/*.js", "test": "./node_modules/mocha/bin/mocha --timeout 10000 tests/test-*.js" }, "repository": { @@ -29,10 +29,10 @@ }, "dependencies": { "commander": "^2.9.0", - "floodesh-lib": "^1.1.1", + "floodesh-lib": "^1.1.3", "gearman-node-bda": "^0.10.0", "mongodb": "^2.1.4", - "ramda": "^0.26.1", + "ramda": "^0.27.0", "seenreq": "^2.0.0", "winston": "^2.1.1" }, diff --git a/tests/config.js b/tests/config.js index b983669..fc29146 100644 --- a/tests/config.js +++ b/tests/config.js @@ -1,16 +1,16 @@ module.exports = { - production:{ - gearman:{ - servers:[{"host":"127.0.0.1"}] + production:{ + gearman:{ + servers:[{'host':'127.0.0.1'}] + }, + retry:3, + mongodb:'mongodb://10.252.25.62:27017/gearman' }, - retry:3, - mongodb:"mongodb://10.252.25.62:27017/gearman" - }, - development:{ - gearman:{ - servers:[{"host":"192.168.98.116"}] - }, - retry:3, - mongodb:"mongodb://192.168.98.116:27017/gearman" - } -} + development:{ + gearman:{ + servers:[{'host':'192.168.98.116'}] + }, + retry:3, + mongodb:'mongodb://192.168.98.116:27017/gearman' + } +}; diff --git a/tests/test-basic.js b/tests/test-basic.js index e40fe83..38e7c0f 100644 --- a/tests/test-basic.js +++ b/tests/test-basic.js @@ -1,109 +1,104 @@ -"use strict" +'use strict'; -const path = require('path') +const path = require('path'); process.chdir(path.join(process.cwd(),'tests')); -const should = require('should') -const sinon = require('sinon') -require('should-sinon') -const App = require('./lib/client.js') -const bottleneck = require('mof-bottleneck') -const request = require('mof-request') -const cheerio = require('mof-cheerio') -const iconv = require('mof-iconv') -const co = require('co') +const should = require('should'); +const sinon = require('sinon'); +require('should-sinon'); +const App = require('./lib/client.js'); +const bottleneck = require('mof-bottleneck'); +const request = require('mof-request'); +const cheerio = require('mof-cheerio'); +const iconv = require('mof-iconv'); +const co = require('co'); describe('Test worker in floodesh', ()=>{ - const Worker = require('../worker'); - let worker; - beforeEach(() => { - - }); - - it('should load config file', ()=>{ - worker = new Worker(); - should.exist(worker.config.logBaseDir); - should.exist(worker.config.retry); - should.exist(worker.config.gearman); - should.exist(worker.config.logger); - should.exist(worker.config.mongodb); - worker.exit(); - }); - - it("should new a worker", ()=> { - worker = new Worker(); - worker.should.be.an.instanceOf(Worker); - }); - - it('should retry when time out', done=>{ - worker = new Worker(); - let getError = ()=>{ - let e = new Error(); - e.code = 'ETIMEDOUT'; - return e; - }; - - worker.use((ctx, next)=>{ - should.exist(ctx.opt); - console.log("middleware: %j", ctx.opt); - should.exist(ctx.opt.retries); - - return next(); + const Worker = require('../worker'); + let worker; + beforeEach(() => { + }); - worker.use(co.wrap(request())); - // worker.use((ctx, next)=>{ - // let e = getError(); - // throw e; - // return next(); - // }); - - worker.on('complete', ctx=> { - process.nextTick(()=>{ - should.exist(ctx); + + it('should load config file', ()=>{ + worker = new Worker(); + should.exist(worker.config.logBaseDir); + should.exist(worker.config.retry); + should.exist(worker.config.gearman); + should.exist(worker.config.logger); + should.exist(worker.config.mongodb); worker.exit(); - done(); - }); }); - - let job = {workComplete:function(){},reportException:function(){}}; - let ctx = {opt:{uri:"http://www.baidu.com"}, app:{},request:{}, response:{},resourceList:{},job:job}; - worker.jobs.add(job); - worker.emit("error",getError(),ctx); - }); - it('should emit complete even if one of middlewares do not call next', done=>{ - worker = new Worker(); - let number=[]; - worker.use((ctx, next)=>{ - number.push(2); - return next(); + it('should new a worker', ()=> { + worker = new Worker(); + worker.should.be.an.instanceOf(Worker); }); - worker.use((ctx, next)=>{ - should.exist(ctx); - should.exist(next); + it('should retry when time out', done=>{ + worker = new Worker(); + let getError = ()=>{ + let e = new Error(); + e.code = 'ETIMEDOUT'; + return e; + }; + + worker.use((ctx, next)=>{ + should.exist(ctx.opt); + console.log('middleware: %j', ctx.opt); + should.exist(ctx.opt.retries); + + return next(); + }); + worker.use(co.wrap(request())); + + worker.on('complete', ctx=> { + process.nextTick(()=>{ + should.exist(ctx); + worker.exit(); + done(); + }); + }); + + let job = {workComplete:function(){},reportError:function(){},reportWarning:function(){}}; + let ctx = {opt:{uri:'http://www.baidu.com'}, app:{},request:{}, response:{},resourceList:{},job:job}; + worker.jobs.add(job); + worker.emit('error',getError(),ctx); }); - worker.use((ctx, next)=>{ - number.push(3); - return next(); - }); - - worker.on('complete',ctx=>{ - number.should.eql([1,2]); - process.nextTick(()=>{ - should.exist(ctx); - worker.exit(); - done(); - }); - }); + it('should emit complete even if one of middlewares do not call next', done=>{ + worker = new Worker(); + let number=[]; + worker.use((ctx, next)=>{ + number.push(2); + return next(); + }); + + worker.use((ctx, next)=>{ + should.exist(ctx); + should.exist(next); + }); + + worker.use((ctx, next)=>{ + number.push(3); + return next(); + }); + + worker.on('complete',ctx=>{ + number.should.eql([1,2]); + process.nextTick(()=>{ + should.exist(ctx); + worker.exit(); + done(); + }); + }); - let ctx = worker.enqueue({uri:"http://www.baidu.com"}); - number.push(1); + let ctx = worker.enqueue({uri:'http://www.baidu.com'}); + number.push(1); - ctx.func='home'; - ctx.job = {workComplete:function(){}}; - worker.jobs.add(ctx.job); - }); + ctx.func='home'; + ctx.job = {workComplete:function(){}}; + worker.jobs.add(ctx.job); + }); }); diff --git a/tests/test-client.js b/tests/test-client.js index 0840517..5951dc8 100644 --- a/tests/test-client.js +++ b/tests/test-client.js @@ -1,94 +1,94 @@ -"use strict" +'use strict'; -const should = require('should') -const sinon = require('sinon') -require('should-sinon') -const App = require('./lib/client.js') -const Status = require('./lib/status.js') +const should = require('should'); +const sinon = require('sinon'); +require('should-sinon'); +const App = require('./lib/client.js'); +const Status = require('./lib/status.js'); describe('Test client in floodesh', ()=>{ - let app; - beforeEach(() => { - app = new App(); - }); + let app; + beforeEach(() => { + app = new App(); + }); - it('should load config file', ()=>{ - should.exist(app.config.logBaseDir); - should.exist(app.config.gearman); - should.exist(app.config.logger); - should.exist(app.config.mongodb); - }); + it('should load config file', ()=>{ + should.exist(app.config.logBaseDir); + should.exist(app.config.gearman); + should.exist(app.config.logger); + should.exist(app.config.mongodb); + }); - it('should use seed in client',()=>{ - app._init = sinon.spy(); - app.start(); - app._init.should.be.calledOnce(); - app.seed.length.should.be.equal(2); - }); - - it('should load seed file if no specified seed ',()=>{ - delete app.seed; - app._init = sinon.spy(); - app.start(); - app._init.should.be.calledOnce(); - app.seed.length.should.be.equal(20); - }); + it('should use seed in client',()=>{ + app._init = sinon.spy(); + app.start(); + app._init.should.be.calledOnce(); + app.seed.length.should.be.equal(2); + }); + + it('should load seed file if no specified seed ',()=>{ + delete app.seed; + app._init = sinon.spy(); + app.start(); + app._init.should.be.calledOnce(); + app.seed.length.should.be.equal(20); + }); - it('should fire up init event ',(done)=>{ - app.seed=[]; - app.start(); - app.onInit = sinon.spy(); - app.on('init',()=>{ - app.onInit.should.be.calledOnce(); - done(); + it('should fire up init event ',(done)=>{ + app.seed=[]; + app.start(); + app.onInit = sinon.spy(); + app.on('init',()=>{ + app.onInit.should.be.calledOnce(); + done(); + }); }); - }); - it('should fire up init event ',(done)=>{ - app.seed=[]; - app.start(); - app.onInit = sinon.spy(); - app.on('init',()=>{ - app.onInit.should.be.calledOnce(); - done(); + it('should fire up init event ',(done)=>{ + app.seed=[]; + app.start(); + app.onInit = sinon.spy(); + app.on('init',()=>{ + app.onInit.should.be.calledOnce(); + done(); + }); }); - }); - - it('should retry if task failed ',(done)=>{ - app.start(); - app.removeAllListeners("ready"); - let job = app._toJob({opt:'http://www.baidu.com/?q=1', next:'home'}); - job.status = Status.failed; - job.fetchCount = 1; - app.on('ready',() => { - app.db.collection(app.name).insert(job, ()=>{ - app._dequeue((err, result) => { - should.not.exists(err); - result.value.opt.uri.should.eql('http://www.baidu.com/?q=1'); - done(); - }); - }); - }); - }); + it('should retry if task failed ',(done)=>{ + app.start(); + app.removeAllListeners('ready'); + let job = app._toJob({opt:'http://www.baidu.com/?q=1', next:'home'}); + job.status = Status.failed; + job.fetchCount = 1; + + app.on('ready',() => { + app.db.collection(app.name).insert(job, ()=>{ + app._dequeue((err, result) => { + should.not.exists(err); + result.value.opt.uri.should.eql('http://www.baidu.com/?q=1'); + done(); + }); + }); + }); + }); - it('should not dequeue if fetchCount is larger than1 and status is failed ',(done)=>{ - app.start(); - app.removeAllListeners("ready"); - let job = app._toJob({opt:'http://www.baidu.com/?q=1', next:'home'}); - job.status = Status.failed; - job.fetchCount = 2; + it('should not dequeue if fetchCount is larger than1 and status is failed ',(done)=>{ + app.start(); + app.removeAllListeners('ready'); + let job = app._toJob({opt:'http://www.baidu.com/?q=1', next:'home'}); + job.status = Status.failed; + job.fetchCount = 2; + + app.on('ready',() => { + app.db.collection(app.name).insert(job, ()=>{ + app._dequeue((err, result) => { + should.not.exists(err); + should.not.exist(result.value); + done(); + }); + }); + }); + }); - app.on('ready',() => { - app.db.collection(app.name).insert(job, ()=>{ - app._dequeue((err, result) => { - should.not.exists(err); - should.not.exist(result.value); - done(); - }); - }); - }); - }); - }); diff --git a/tests/test-worker.js b/tests/test-worker.js index fd7ab81..129f899 100644 --- a/tests/test-worker.js +++ b/tests/test-worker.js @@ -1,128 +1,128 @@ -"use strict" +'use strict'; -const should = require('should') -const sinon = require('sinon') -require('should-sinon') -const Worker = require('../worker') -const Core = require('floodesh-lib') -const events = require('events') +const should = require('should'); +const sinon = require('sinon'); +require('should-sinon'); +const Worker = require('../worker'); +const Core = require('floodesh-lib'); +const events = require('events'); describe('Worker',function(){ - let w; - beforeEach( () => { - w = new Worker(); - }); - - describe('#constructor', () => { - it('should return default instance of Worker and Core', () => { - w.should.be.an.instanceOf(Worker); - w.should.be.an.instanceOf(Core); - }); - - it('should call _init in constructor()', () =>{ - should.exist(w._w); - }); - }); - - describe('#_init', () =>{ + let w; beforeEach( () => { - Worker.prototype._hookSIG= sinon.spy(); - w = new Worker(); - }); - - it('should listen on customized functions', () =>{ - should.exists(w._w.functions.testapp_home); + w = new Worker(); }); - it('should call _hookSIG() once', ()=>{ - w._hookSIG.should.be.calledOnce(); - }); - - it("should subscribe events of `Core`",()=>{ - events.EventEmitter.listenerCount(w,"complete").should.equal(1); - events.EventEmitter.listenerCount(w,"error").should.equal(1); - events.EventEmitter.listenerCount(w,"exit").should.equal(1); + describe('#constructor', () => { + it('should return default instance of Worker and Core', () => { + w.should.be.an.instanceOf(Worker); + w.should.be.an.instanceOf(Core); + }); + + it('should call _init in constructor()', () =>{ + should.exist(w._w); + }); }); - }); + + describe('#_init', () =>{ + beforeEach( () => { + Worker.prototype._hookSIG= sinon.spy(); + w = new Worker(); + }); - describe('#_onJob', () => { - it('should return a function',()=>{ - let func = w._onJob("abc"); - func.should.be.an.instanceof(Function); - }) - }); + it('should listen on customized functions', () =>{ + should.exists(w._w.functions.testapp_home); + }); + + it('should call _hookSIG() once', ()=>{ + w._hookSIG.should.be.calledOnce(); + }); - describe('#_back',()=>{ - let ctx = {job:{}}; - beforeEach(()=>{ - ctx.job.sendWorkData = sinon.spy(); - ctx.job.workComplete = sinon.spy(); - ctx.dataSet = new Map(); - ctx.tasks = [{opt:{uri:"http://www.baidu.com/"},fn:"ffn"}]; - }); - - it("shouldn't call sendWorkData if ctx has empty dataSet",()=>{ - w._back(ctx); - ctx.job.sendWorkData.should.have.callCount(0); - }); - - it('should call sendWorkData if ctx has dataSet', ()=>{ - ctx.dataSet.set('data',"ABCDEFGHIJKLMNOPQRSTUVWXYZ"); - w._back(ctx); - ctx.job.sendWorkData.should.be.calledOnce(); - let argv = ctx.job.sendWorkData.getCall(0).args[0]; - JSON.parse(argv)[0][0].should.equal('data'); + it('should subscribe events of `Core`',()=>{ + events.EventEmitter.listenerCount(w,'complete').should.equal(1); + events.EventEmitter.listenerCount(w,'error').should.equal(1); + events.EventEmitter.listenerCount(w,'exit').should.equal(1); + }); }); - it('should call workComplete',()=>{ - w._back(ctx); - ctx.job.workComplete.should.be.calledOnce(); + describe('#_onJob', () => { + it('should return a function',()=>{ + let func = w._onJob('abc'); + func.should.be.an.instanceof(Function); + }); }); - }); - describe('#_parseJobArgv',()=>{ - let ctx = {job:{}}; - it('should parse payload in job',()=>{ - let payload = {uri:"http://www.baidu",method:"GET"}; - let buf = new Buffer(JSON.stringify(payload)); - ctx.job = {payload:buf}; - let rt = w._parseJobArgv(ctx.job); - - should.exists(rt); - rt.method.should.equal(payload.method); - rt.uri.should.equal(payload.uri); + describe('#_back',()=>{ + let ctx = {job:{}}; + beforeEach(()=>{ + ctx.job.sendWorkData = sinon.spy(); + ctx.job.workComplete = sinon.spy(); + ctx.dataSet = new Map(); + ctx.tasks = [{opt:{uri:'http://www.baidu.com/'},fn:'ffn'}]; + }); + + it('shouldn\'t call sendWorkData if ctx has empty dataSet',()=>{ + w._back(ctx); + ctx.job.sendWorkData.should.have.callCount(0); + }); + + it('should call sendWorkData if ctx has dataSet', ()=>{ + ctx.dataSet.set('data','ABCDEFGHIJKLMNOPQRSTUVWXYZ'); + w._back(ctx); + ctx.job.sendWorkData.should.be.calledOnce(); + let argv = ctx.job.sendWorkData.getCall(0).args[0]; + JSON.parse(argv)[0][0].should.equal('data'); + }); + + it('should call workComplete',()=>{ + w._back(ctx); + ctx.job.workComplete.should.be.calledOnce(); + }); }); - }); - - describe('#_onError',()=>{ - let ctx = {resourceList:{}}; - it('should send error to server',()=>{ - ctx.job = {}; - ctx.job.reportError = sinon.spy(); - ctx.job.reportWarning = sinon.spy(); - w._finally = sinon.spy(); - let e = new Error("this is an error for testing"); - w._onError(e,ctx); - ctx.job.reportWarning.should.be.calledOnce(); - ctx.job.reportError.should.be.calledOnce(); - w._finally.should.be.calledOnce(); - }) - }); - describe('#_hookSIG', ()=>{ - let ctx = {}; - beforeEach(()=>{ - ctx.job = {resetAbilities:function(){},close:function(){}}; + describe('#_parseJobArgv',()=>{ + let ctx = {job:{}}; + it('should parse payload in job',()=>{ + let payload = {uri:'http://www.baidu',method:'GET'}; + let buf = new Buffer(JSON.stringify(payload)); + ctx.job = {payload:buf}; + let rt = w._parseJobArgv(ctx.job); + + should.exists(rt); + rt.method.should.equal(payload.method); + rt.uri.should.equal(payload.uri); + }); }); - it('should listen on process SIGINT signal',()=>{ - events.EventEmitter.listenerCount(process,'SIGINT').should.equal(8); + describe('#_onError',()=>{ + let ctx = {resourceList:{}}; + it('should send error to server',()=>{ + ctx.job = {}; + ctx.job.reportError = sinon.spy(); + ctx.job.reportWarning = sinon.spy(); + w._finally = sinon.spy(); + let e = new Error('this is an error for testing'); + w._onError(e,ctx); + ctx.job.reportWarning.should.be.calledOnce(); + ctx.job.reportError.should.be.calledOnce(); + w._finally.should.be.calledOnce(); + }); }); - - it('should set readyToExit to true when receive INT signal',()=>{ - w.exit(); - w.readyToExit.should.be.true(); + + describe('#_hookSIG', ()=>{ + let ctx = {}; + beforeEach(()=>{ + ctx.job = {resetAbilities:function(){},close:function(){}}; + }); + + it('should listen on process SIGINT signal',()=>{ + events.EventEmitter.listenerCount(process,'SIGINT').should.equal(8); + }); + + it('should set readyToExit to true when receive INT signal',()=>{ + w.exit(); + w.readyToExit.should.be.true(); + }); }); - }); }); diff --git a/worker/index.js b/worker/index.js index db9d862..42719fd 100644 --- a/worker/index.js +++ b/worker/index.js @@ -196,13 +196,19 @@ module.exports = class Worker extends Core { /* default erorr handler for `Worker` * should send message to server to end current job. * + * log content: if e is an instance of `Error`, the first line is formatted as ${className}: ${message}, and is followed by a series of stack frames. otherwise just e itself which implicitly call e.toString() */ _onError(e, ctx){ if(e instanceof Error){ - this.logger.error(e.stack,ctx.opt); + this.logger.error(e.stack, ctx.opt); }else{ this.logger.error(e, ctx.opt); } + + if(!e){ + this.logger.warning(`expect an error object or string message, got ${typeof e}`); + e = '';//to avoid exception when read property `code` from null or undefined. + } // to release resources that is occupied, for bottleneck, database, tcp connection etc. Object.keys(ctx.resourceList).forEach(resource=>{