-
Notifications
You must be signed in to change notification settings - Fork 1
/
index.js
195 lines (167 loc) · 5.36 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
'use strict';
var access = require('safe-access')
var pull = require('pull-stream')
var pl = require('pull-level')
var merge = require('lodash/object/merge')
var compact = require('lodash/array/compact')
var peek = require('level-peek')
var stringify = require('stable-stringify')
var tc = require('type-check').typeCheck;
var bytewise = require('bytewise');
module.exports = {
read: read,
readOne: makeReadOne(read),
makeReadOne: makeReadOne,
write: write,
writeOne: makeWriteOne(write),
makeWriteOne: makeWriteOne,
resolveIndexDocs: resolveIndexDocs,
addIndexDocs: addIndexDocs,
makeIndexDoc: makeIndexDoc,
makeRange: makeRange
}
// settings = {
// db: JS,
// index_defs: JSON,
// level_opts: JSON
// }
function esc (value) {
// Don't stringify null or undefined
if (value === null || value === undefined) {
return value
} else {
return stringify(value)
}
}
// Returns a source stream containing all the documents selected by a query
function read (settings, query) {
if(!tc('{ createIfMissing: Boolean, ... }', settings.db.options)) {
throw new Error('settings.db is not supposed to be ' + settings.db)
}
// Make a range from the query
var range = makeRange(query, settings.level_opts)
var deferred = pull.defer()
if (query.peek) {
// Use level-peek to get first or last
peek[query.peek](settings.db, range, function (err, key, value) {
if (err) { throw err }
deferred.resolve(
pull(
// If document exists, put into stream, if not, send empty stream
key ? pull.values([{ key: key, value: value }]) : pull.empty(),
resolveIndexDocs(settings.db)
)
)
})
} else {
deferred.resolve(
pull(
pl.read(settings.db, range),
resolveIndexDocs(settings.db)
)
)
}
return deferred
}
// Takes a function returning a source stream and returns a function readOne
// which reads one item from a stream and returns it with a callback syntax.
function makeReadOne (read) {
return function readOne (settings, query, callback) {
pull(
read(settings, query),
pull.collect(function (err, arr) {
callback(err, arr[0])
})
)
}
}
// Returns a sink stream writing the documents passed in as well as their
// corresponding index_defs.
function write (settings, callback) {
if(!tc('{ createIfMissing: Boolean, ... }', settings.db.options)) {
throw new Error('settings.db is not supposed to be ' + settings.db)
}
return pull(
addIndexDocs(settings.index_defs),
pl.write(settings.db, settings.level_opts, callback)
)
}
// Takes a function returning a sink stream and returns a function writeOne
// which takes a document and writes it to the stream
function makeWriteOne (write) {
return function writeOne (settings, doc, callback) {
pull(
pull.values([doc]),
write(settings, callback)
)
}
}
// Returns a through stream which takes index documents and resolves them to
// actual documents
function resolveIndexDocs (db) {
return pull.asyncMap(function (data, callback) {
db.get(data.value, function (err, value) {
callback(null, value && { key: data.value, value: value })
})
})
}
// Returns a through stream which injects index docs corresponding to each doc
// in the input stream
function addIndexDocs (index_defs) {
if (!tc('[String|[String]]', index_defs)) {
throw new Error('index_defs is not supposed to be ' + index_defs)
}
return pull(
pull.map(function (doc) {
var batch = Object.keys(index_defs).map(function (key) {
return makeIndexDoc(doc, index_defs[key])
})
doc.type = 'put'
batch.push(doc)
return batch
}),
pull.flatten()
)
}
// Returns an index document generated from doc and index_def
function makeIndexDoc (doc, index_def) {
if (!Array.isArray(index_def)) { index_def = [ index_def ] }
// Assemble index key from index definition
var index_key = index_def.reduce(function (acc, keypath) {
var index_prop = esc(access(doc.value, keypath))
acc.push(index_prop)
return acc
}, [])
var index_doc = {
key: bytewise.encode(['i', index_def.join(',')].concat(index_key).concat([doc.key])),
value: doc.key,
type: 'put'
}
return index_doc
}
// Generate a range that retreives the documents requested by the query
function makeRange (query, level_opts) {
// Avoid having to write queries with redundant array notation
if (!Array.isArray(query.k)) { query.k = [ query.k ] }
if (!Array.isArray(query.v)) { query.v = [ query.v ] }
// Gathers values in query value field, generating gte - lte
var acc = query.v.reduce(function (acc, item) {
// Avoid having to write queries with redundant array notation
if (!Array.isArray(item)) { item = [ item ] }
// Push bottom of range (first array element) into gte
acc.gte.push(esc(item[0]))
// If it is not a range, use same value for lte, if it is use top of range
acc.lte.push(esc(item.length > 1 ? item[1] : item[0]))
return acc
}, { gte: [], lte: [] })
// Eliminate null values
var lte = compact(acc.lte)
var gte = compact(acc.gte)
var range = {
gte: bytewise.encode(['i', query.k.join(',')].concat(gte).concat([null])),
lte: bytewise.encode(['i', query.k.join(',')].concat(lte).concat([undefined]))
}
if (query.reverse) { range.reverse = true }
range = merge(level_opts || {}, range)
return range
}