-
Notifications
You must be signed in to change notification settings - Fork 1
/
ioFogClient.js
579 lines (544 loc) · 15.6 KB
/
ioFogClient.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
/*
* *******************************************************************************
* Copyright (c) 2018 Edgeworx, Inc.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
* *******************************************************************************
*/
'use strict'
/*
* Eclipse ioFog: Node.js SDK
*
* ioFogClient lib that mimics all requests to ioFog's Local API
*/
const exec = require('child_process').exec
const request = require('request')
const WebSocket = require('ws')
exports.ioMessageUtil = require('./lib/ioMessageUtil')
exports.byteUtils = require('./lib/byteUtils')
const OPCODE_PING = 0x9
const OPCODE_PONG = 0xA
const OPCODE_ACK = 0xB
const OPCODE_CONTROL_SIGNAL = 0xC
const OPCODE_MSG = 0xD
const OPCODE_RECEIPT = 0xE
let ELEMENT_ID = 'NOT_DEFINED' // publisher's ID
let SSL = false
let host = 'iofog'
let port = 54321
let wsConnectMessageTimeoutAttempts = 0
let wsConnectControlTimeoutAttempts = 0
const wsConnectAttemptsLimit = 5
const wsConnectTimeout = 1000
let wsMessage
let wsControl
require('console-stamp')(
console,
{
colors: {
stamp: 'yellow',
label: 'white',
metadata: 'green'
}
}
)
/**
* Sets custom host and port for connection (if no argument is specified will use the default values).
*
* @param <String> host - host' string name
* @param <Number> port - port's number
* @param <String> containerId - container's ID
* @param <Function> mainCb - main function to perform when all set up and checks are done
*/
exports.init = function (pHost, pPort, containerId, mainCb) {
const options = processArgs(process.argv)
if (options['--id']) {
ELEMENT_ID = options['--id']
}
if (process.env.SELFNAME) {
ELEMENT_ID = process.env.SELFNAME
}
if (process.env.SSL) {
SSL = true
}
if (!(!pHost || !pHost.trim())) {
host = pHost
}
if (!(!pPort || pPort <= 0)) {
port = pPort
}
if (!(!containerId || !containerId.trim())) {
ELEMENT_ID = containerId
}
exec('ping -c 3 ' + host, function checkHost (error, stdout, stderr) {
if (stderr || error) {
if (stderr) {
console.log('STERR :\n', stderr)
}
if (error) {
console.log('ERROR :\n', error)
}
console.warn('Host: \'' + host + '\' is not reachable. Changing to \'127.0.0.1\'')
host = '127.0.0.1'
}
mainCb()
})
}
/**
* Utility function to create ioMessage object
*
* @param <Object> opts - object to initialize ioMessage
* <String> tag
* <String> groupid
* <Integer> sequencenumber
* <Integer> sequencetotal
* <Byte> priority
* <String> authid
* <String> authgroup
* <Long> chainposition
* <String> hash
* <String> previoushash
* <String> nonce
* <Integer> difficultytarget
* <String> infotype
* <String> infoformat
* <Buffer> contextdata
* <Buffer> contentdata
*
* @returns <Object> ioMessage
*/
exports.ioMessage = function (opts) {
if (opts) {
opts.publisherId = ELEMENT_ID
} else {
opts = { publisherId: ELEMENT_ID }
}
return module.exports.ioMessageUtil.ioMessage(opts)
}
/**
* Posts new ioMessage to ioFog via Local API REST call
*
* @param <Object> ioMsg - ioMessage object to send
* @param <Object> cb - object with callback functions (onError, onBadRequest, onMessageReceipt)
*/
exports.sendNewMessage = function (ioMsg, cb) {
ioMsg.publisher = ELEMENT_ID
makeHttpRequest(
cb,
'/v2/messages/new',
module.exports.ioMessageUtil.toJSON(ioMsg, true),
function postNewMsg (body) {
if (body.id && body.timestamp) {
cb.onMessageReceipt(body.id, body.timestamp)
}
}
)
}
/**
* Gets all unread messages for container via Local API REST call
*
* @param <Object> cb - object with callback functions (onError, onBadRequest, onMessages)
*/
exports.getNextMessages = function (cb) {
makeHttpRequest(
cb,
'/v2/messages/next',
{
id: ELEMENT_ID
},
function getNextMsgs (body) {
if (body.messages) {
cb.onMessages(module.exports.ioMessageUtil.parseMessages(body.messages))
}
}
)
}
/**
* Gets all messages from specified publishers within time-frame (only publishers that the container is allowed to access)
*
* @param <Date> startdate - start date (timestamp) of a time-frame
* @param <Date> enddate - end date (timestamp) of a time-frame
* @param <Array> publishers - array of publishers to get messages
* @param <Object> cb - object with callback functions (onError, onBadRequest, onMessagesQuery)
*/
exports.getMessagesByQuery = function (startdate, enddate, publishers, cb) {
if (Array.isArray(publishers)) {
makeHttpRequest(
cb,
'/v2/messages/query',
{
id: ELEMENT_ID,
timeframestart: startdate,
timeframeend: enddate,
publishers: publishers
},
function getQueryMsgs (body) {
if (body.messages) {
cb.onMessagesQuery(body.timeframestart, body.timeframeend, module.exports.ioMessageUtil.parseMessages(body.messages))
}
}
)
} else {
console.error('getMessagesByQuery: Publishers input is not array!')
}
}
/**
* Gets new configurations for the container
*
* @param <Object> cb - object with callback functions (onError, onBadRequest, onNewConfig)
*/
exports.getConfig = function (cb) {
makeHttpRequest(
cb,
'/v2/config/get',
{
id: ELEMENT_ID
},
function getNewConfig (body) {
if (body.config) {
let configJSON = {}
try {
configJSON = JSON.parse(body.config)
} catch (error) {
console.error('There was an error parsing config to JSON: ', error)
}
cb.onNewConfig(configJSON)
}
}
)
}
/**
* Opens WebSocket Control connection to ioFog
*
* @param <Object> cb - object with callback functions (onError, onNewConfigSignal)
*/
exports.wsControlConnection = function (cb) {
openWSConnection(
cb,
'/v2/control/socket/id/',
function wsHandleControlData (data, flags) {
if (module.exports.byteUtils.isBinary(data) && data.length > 0) {
const opcode = data[0]
if (opcode === OPCODE_CONTROL_SIGNAL) {
cb.onNewConfigSignal()
sendAck(wsControl)
}
}
}
)
}
/**
* Opens WebSocket Message connection to ioFog
*
* @param <Function> onOpenSocketCb - function that will be triggered when connection is opened (call wsSendMessage in this function)
* @param <Object> cb - object with callback functions (onError, onMessages, onMessageReceipt)
*/
exports.wsMessageConnection = function (onOpenSocketCb, cb) {
openWSConnection(
cb,
'/v2/message/socket/id/',
function wsHandleMessageData (data, flags) {
if (module.exports.byteUtils.isBinary(data) && data.length) {
const opcode = data[0]; let pos
if (opcode === OPCODE_MSG) {
pos = 1
const msgLength = data.readUIntBE(pos, 4)
pos += 4
const bytes = data.slice(pos, msgLength + pos)
const msg = module.exports.ioMessageUtil.ioMessageFromBuffer(bytes)
cb.onMessages([msg])
sendAck(wsMessage)
} else if (opcode === OPCODE_RECEIPT) {
let size = data[1]
pos = 3
let messageId = ''
if (size) {
messageId = data.slice(pos, pos + size).toString('utf-8')
pos += size
}
size = data[2]
let timestamp = 0
if (size) {
timestamp = data.readUIntBE(pos, size)
}
cb.onMessageReceipt(messageId, timestamp)
sendAck(wsMessage)
}
}
},
onOpenSocketCb
)
}
/**
* Closes WebSocket Control connection if it's opened.
*
* @param <Function> cb - Function called after ws closed successfully
*/
exports.wsCloseControlConnection = function (cb) {
if (wsControl) {
wsControl.on('close', cb)
wsControl.close(1000)
}
setGlobalWS('/v2/control/socket/id/', null)
}
/**
* Closes WebSocket Message connection if it's opened.
*
* @param <Function> cb - Function called after ws closed successfully
*/
exports.wsCloseMessageConnection = function (cb) {
if (wsMessage) {
wsMessage.on('close', cb)
wsMessage.close(1000)
}
setGlobalWS('/v2/message/socket/id/', null)
}
/**
* Sends ioMessage to ioFog via WebSocket Message connection if it's opened.
*
* @param <Object> ioMsg - ioMessage object to send
*/
exports.wsSendMessage = function (ioMsg) {
if (!wsMessage || wsMessage.readyState !== WebSocket.OPEN) {
console.error('wsSendMessage: socket is not open.')
return
}
ioMsg.publisher = ELEMENT_ID
const msgBuffer = module.exports.ioMessageUtil.ioMsgBuffer(ioMsg)
const opCodeBuffer = Buffer.from([OPCODE_MSG])
const lengthBuffer = Buffer.from(module.exports.byteUtils.intToBytes(msgBuffer.length))
const resultBuffer = Buffer.concat([opCodeBuffer, lengthBuffer, msgBuffer], opCodeBuffer.length + lengthBuffer.length + msgBuffer.length)
wsMessage.send(resultBuffer, { binary: true, mask: true })
}
/**
* Utility function sends ACKNOWLEDGE response to ioFog
**/
function sendAck (ws) {
const buffer = Buffer.alloc(1)
buffer[0] = OPCODE_ACK
if (ws && ws.readyState === WebSocket.OPEN) {
ws.send(buffer, { binary: true, mask: true })
} else {
console.warn('Unable to send ACKNOWLEDGE: WS connection isn\'t open. ')
}
}
/**
* Not used - Utility function sends PING to ioFog
**/
// function sendPing (ws) {
// const buffer = Buffer.alloc(1)
// buffer[0] = OPCODE_PING
// if (ws && ws.readyState === WebSocket.OPEN) {
// ws.ping(buffer, true)
// } else {
// console.warn('Unable to send PING: WS connection isn\'t open. ')
// }
// }
/**
* Utility function sends PONG to ioFog
**/
function sendPong (ws) {
const buffer = Buffer.alloc(1)
buffer[0] = OPCODE_PONG
if (ws && ws.readyState === WebSocket.OPEN) {
ws.pong(buffer, true)
} else {
console.warn('Unable to send PONG: WS connection isn\'t open. ')
}
}
/**
* Utility function returns HTTP/HTTPS protocol for url based on settings.
*
* @returns <String> - HTTP/HTTPS protocol for url
*/
function getHttpProtocol () {
if (SSL) {
return 'https'
} else {
return 'http'
}
}
/**
* Utility function returns WS/WSS protocol for url based on settings.
*
* @returns <String> - WS/WSS protocol for url
*/
function getWSProtocol () {
if (SSL) {
return 'wss'
} else {
return 'ws'
}
}
/**
* Utility function to build url based protocol, relative url and settings.
*
* @param <String> protocol - HTTP or WS
* @param <String> url - relative path for URL
* @returns <String> - endpoint URL
*/
exports.getURL = function (protocol, url) {
return protocol + '://' + host + ':' + port + url
}
/**
* Utility function that makes HTTP/HTTPS post request to endpoint URL.
* Sends specified JSON.
*
* @param <Object> listenerCb - <Object> that contains listener callbacks (onError, onBadRequest)
* @param <String> relativeUrl - relative URL
* @param <Object> json - JSON <Object> to send
* @param <Function> onResponseCb - callback to process response body
*/
function makeHttpRequest (listenerCb, relativeUrl, json, onResponseCb) {
const endpoint = exports.getURL(getHttpProtocol(), relativeUrl)
request.post(
{
url: endpoint,
headers: {
'Content-Type': 'application/json'
},
json: json
},
function handleHttpResponse (err, resp, body) {
if (err) {
return listenerCb.onError(err)
}
if (resp && resp.statusCode === 400) {
return listenerCb.onBadRequest(body)
}
onResponseCb(body)
}
)
}
/**
* Utility function that opens WS/WSS connection to specified URL.
*
* @param <Object> listenerCb - <Object> that contains listener callback (onError)
* @param <String> relativeUrl - relative URL
* @param <Function> onDataCb - callback function that will be triggered when message is received from ioFog
* @param <Function> onOpenSocketCb - function that will be triggered when connection is opened (call wsSendMessage in this function)
*/
function openWSConnection (listenerCb, relativeUrl, onDataCb, onOpenSocketCb) {
const endpoint = exports.getURL(getWSProtocol(), relativeUrl + ELEMENT_ID)
// let pingFlag
const ws = new WebSocket(
endpoint,
{
protocolVersion: 13
}
)
ws.on(
'message',
onDataCb
)
ws.on(
'error',
function handleWsError (error) {
listenerCb.onError(error)
if (error && error.code === 'ECONNREFUSED') {
wsReconnect(relativeUrl, ws, listenerCb, onDataCb, onOpenSocketCb)
}
}
)
ws.on(
'ping',
function wsPing (data, flags) {
if (module.exports.byteUtils.isBinary(data) && data.length === 1 && data[0] === OPCODE_PING) {
sendPong(ws)
}
}
)
ws.on(
'pong',
function wsPong (data, flags) {
if (module.exports.byteUtils.isBinary(data) && data.length === 1 && data[0] === OPCODE_PONG) {
// pingFlag = null
}
}
)
ws.on('close', function wsClose (code, message) {
// code : 1006 - ioFog crashed, 1000 - CloseWebFrame from ioFog,
if (code === 1000) { return }
wsReconnect(relativeUrl, ws, listenerCb, onDataCb, onOpenSocketCb)
})
ws.on(
'open',
function wsOnOpen () {
switch (relativeUrl) {
case '/v2/control/socket/id/':
wsConnectControlTimeoutAttempts = 0
break
case '/v2/message/socket/id/':
wsConnectMessageTimeoutAttempts = 0
break
default:
console.warn('No global socket defined.')
}
if (onOpenSocketCb) {
onOpenSocketCb(module.exports)
}
}
)
setGlobalWS(relativeUrl, ws)
}
function setGlobalWS (relativeUrl, ws) {
switch (relativeUrl) {
case '/v2/control/socket/id/':
wsControl = ws
break
case '/v2/message/socket/id/':
wsMessage = ws
break
default:
console.warn('No global socket defined.')
}
}
/**
* Utility function to process start options
*
* @param args - array of start options
*/
function processArgs (args) {
args.shift()
args.shift()
const options = {}
args.forEach(function handleForEach (arg) {
if (arg.indexOf('=')) {
const pieces = arg.split('=')
options[pieces[0]] = pieces[1]
}
})
return options
}
/**
* Utility function to reconnect to ioFog via WebSocket.
*
* @param relativeUrl - array of start options
* @param <WebSocket> ws - webSocket that needs to be destroyed
* @param <Object> listenerCb - <Object> that contains listener callback (onError)
* @param <Function> onDataCb - callback function that will be triggered when message is received from ioFog
* @param <Function> onOpenSocketCb - function that will be triggered when connection is opened (call wsSendMessage in this function)
*/
function wsReconnect (relativeUrl, ws, listenerCb, onDataCb, onOpenSocketCb) {
console.info('Reconnecting to ioFog via socket.')
let timeout = 0
if (wsConnectControlTimeoutAttempts < wsConnectAttemptsLimit && relativeUrl === '/v2/control/socket/id/') {
timeout = wsConnectTimeout * Math.pow(2, wsConnectControlTimeoutAttempts)
wsConnectControlTimeoutAttempts++
} else if (wsConnectMessageTimeoutAttempts < wsConnectAttemptsLimit && relativeUrl === '/v2/message/socket/id/') {
timeout = wsConnectTimeout * Math.pow(2, wsConnectMessageTimeoutAttempts)
wsConnectMessageTimeoutAttempts++
} else {
timeout = wsConnectTimeout * Math.pow(2, wsConnectAttemptsLimit - 1)
}
ws = null
setGlobalWS(relativeUrl, ws)
setTimeout(
function wsReconnect () {
openWSConnection(listenerCb, relativeUrl, onDataCb, onOpenSocketCb)
}, timeout)
}