-
Notifications
You must be signed in to change notification settings - Fork 0
/
rvn.js
82 lines (82 loc) · 2.74 KB
/
rvn.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
// Input: ZMQ
const zmq = require("zeromq")
const mingo = require("mingo")
const rvndb_code = require("rvndb-code")
const jq = require("bigjq")
const defaults = { host: "127.0.0.1", port: 28769 }
/**
 * Evaluates one connection's query against a single mempool transaction and,
 * on a match, pushes the (optionally jq-transformed) result to that connection.
 *
 * @param {object} connection - Pool entry; this function reads `connection.query`
 *   and calls `connection.res.sseSend(payload)`.
 * @param {object} tx - Parsed transaction taken from the "mempool" ZMQ message.
 * @returns {Promise<void>} Settles once the connection has been handled.
 */
async function pushMempoolTx(connection, tx) {
  const encoded = rvndb_code.encode(connection.query)
  const types = encoded.q.db
  // A query without a `db` list receives everything; otherwise it must list "u".
  if (types && types.indexOf("u") < 0) return
  const filter = new mingo.Query(encoded.q.find)
  if (!filter.test(tx)) return
  const decoded = rvndb_code.decode(tx)
  let result
  try {
    if (encoded.r && encoded.r.f) {
      result = await jq.run(encoded.r.f, [decoded])
    } else {
      result = [decoded]
    }
  } catch (e) {
    console.log("Error", e)
    // Nothing usable to deliver: skip the event rather than emit `data: undefined`.
    return
  }
  connection.res.sseSend({ type: "mempool", data: result })
}

/**
 * Evaluates one connection's query against every transaction of a block and
 * pushes a single "block" event carrying the matching, transformed transactions.
 * Transactions whose jq transform throws are logged and left out of the event.
 *
 * @param {object} connection - Pool entry; this function reads `connection.query`
 *   and calls `connection.res.sseSend(payload)`.
 * @param {object} block - Parsed "block" ZMQ message; `block.txs` is the list of
 *   transactions and `block.i` is forwarded as the event's `index`.
 * @returns {Promise<void>} Settles once the connection has been handled.
 */
async function pushBlock(connection, block) {
  const encoded = rvndb_code.encode(connection.query)
  const types = encoded.q.db
  // A query without a `db` list receives everything; otherwise it must list "c".
  if (types && types.indexOf("c") < 0) return
  const filter = new mingo.Query(encoded.q.find)
  const matching = block.txs.filter((tx) => filter.test(tx))
  const transformed = []
  for (const tx of matching) {
    const decoded = rvndb_code.decode(tx)
    try {
      if (encoded.r && encoded.r.f) {
        transformed.push(await jq.run(encoded.r.f, [decoded]))
      } else {
        transformed.push(decoded)
      }
    } catch (e) {
      console.log("Error", e)
    }
  }
  connection.res.sseSend({ type: "block", index: block.i, data: transformed })
}

/**
 * Connects a ZMQ "sub" socket, subscribes to the "mempool" and "block" topics,
 * and fans every incoming message out to the connections in
 * `config.connections.pool`.
 *
 * @param {object} config
 * @param {string} [config.host] - ZMQ host; falls back to `defaults.host`.
 * @param {number} [config.port] - ZMQ port; falls back to `defaults.port`.
 * @param {object} config.connections - Holder whose `pool` property maps a key
 *   to a connection exposing `query` and `res.sseSend`.
 */
const init = function(config) {
  const sock = zmq.socket("sub")
  const host = config.host || defaults.host
  const port = config.port || defaults.port
  const connections = config.connections
  // Topic name -> per-connection handler; any other topic is ignored.
  const handlers = { mempool: pushMempoolTx, block: pushBlock }

  sock.connect(`tcp://${host}:${port}`)
  sock.subscribe("mempool")
  sock.subscribe("block")
  sock.on("message", async function(topic, message) {
    // The emitter never awaits this handler, so anything that escapes would be
    // an unhandled rejection. Every failure is therefore caught and logged here.
    try {
      const push = handlers[topic.toString()]
      if (!push) return
      const payload = JSON.parse(message.toString())
      await Promise.all(Object.keys(connections.pool).map(async function(key) {
        try {
          await push(connections.pool[key], payload)
        } catch (e) {
          // One broken connection or query must not affect the others.
          console.log("Error", e)
        }
      }))
    } catch (e) {
      console.log("Error", e)
    }
  })
}
// Public API of this module: the ZMQ listener bootstrapper.
module.exports = { init }