-
Notifications
You must be signed in to change notification settings - Fork 1
/
Column.h
480 lines (438 loc) · 13.3 KB
/
Column.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
/*
* Column.h
*
* Created on: Sep 23, 2016
* Author: duclv
*/
#include <vector>
#ifndef COLUMN_H_
#define COLUMN_H_
#include <math.h>
#include <iostream>
#include <string>
#include <vector>
#include <map>
#include <stdexcept>
#include <algorithm>
#include <cstdint>
#include "ColumnBase.h"
#include "Dictionary.h"
#include "PackedArray.h"
namespace std {
template<typename T>
class Column : public ColumnBase {
private:
// value vector for column
vector<size_t>* vecValue;
// bit packing array
PackedArray* packed;
// dictionary vector for column
Dictionary<T>* dictionary;
// bulk insert ?
bool bulkInsert = false;
// DATA SPACE
struct data_column {
bool versionFlag;
size_t encodedValueIdx; // index to encodedValue in vecValue vector
uint64_t csn;
};
vector<data_column>* dataColumn;
// VERSION SPACE
vector<size_t>* versionVecValue;
struct version_column {
size_t encodedValueIdx; // index to encodedValue in versionVecValue vector
uint64_t csn;
version_column* next;
};
vector<version_column>* versionColumn;
Dictionary<T>* deltaSpace;
map<size_t, size_t>* hashtable;
public:
Column() {
dictionary = new Dictionary<T>();
vecValue = new vector<size_t>();
packed = new PackedArray();
dataColumn = new vector<data_column>();
versionVecValue = new vector<size_t>();
versionColumn = new vector<version_column>();
deltaSpace = new Dictionary<T>();
hashtable = new map<size_t, size_t>();
}
virtual ~Column() {
delete vecValue;
delete dictionary;
PackedArray_destroy(packed);
delete dataColumn;
delete versionVecValue;
delete versionColumn;
delete deltaSpace;
delete hashtable;
}
vector<size_t>* getVecValue() {
if (vecValue == NULL) {
vecValue = new vector<size_t>();
}
vecValue->clear();
for (int i = 0; i < packed->count; i++) {
vecValue->push_back(PackedArray_get(packed, i));
}
return vecValue;
}
size_t vecValueSize() {
return packed->count;
}
size_t numOfRows() {
return packed->count;
}
size_t vecValueAt(size_t index) {
if (index < 0 || index >= packed->count) {
return -1; // indicate no result
}
return PackedArray_get(packed, index);
}
void updateVecValueAt(size_t index, size_t value) {
if (index < 0 || index >= packed->count) {
return; // no update
}
// update by bit backing
PackedArray_set(packed, index, value);
}
void printVecValue(int row) {
vecValue = getVecValue();
for (size_t i = 0; i < (*vecValue).size() && i < row; i++) {
cout << "vecValue[" << i << "] = " << (*vecValue)[i] << "\n";
}
}
void bitPackingVecValue() {
// #bit to represent encode dictionary value
size_t numOfBit = (size_t) ceil(log2((double) dictionary->size()));
// init bit packing array
packed = PackedArray_create(numOfBit, vecValue->size());
for (size_t i = 0; i < vecValue->size(); i++) {
size_t value = vecValue->at(i);
PackedArray_set(packed, i, value);
}
// free vecValue
vecValue->resize(0);
}
vector<size_t>* unpackingVecValue() {
if (vecValue == NULL) {
vecValue = new vector<size_t>();
}
vecValue->clear();
for (int i = 0; i < packed->count; i++) {
vecValue->push_back(PackedArray_get(packed, i));
}
return vecValue;
}
Dictionary<T>* getDictionary() {
if (dictionary == NULL) {
dictionary = new Dictionary<T>();
}
return dictionary;
}
// Update new value for dictionary
void updateDictionary(T& value, bool sorted = true, bool bulkInsert = true, uint64_t csn = 0) {
this->bulkInsert = bulkInsert;
dictionary->addNewElement(value, vecValue, sorted, bulkInsert);
if (!bulkInsert) {
// build dataColumn vector
data_column data;
data.encodedValueIdx = vecValue->size() - 1;
data.csn = csn;
data.versionFlag = false;
dataColumn->push_back(data);
}
}
bool isBulkInsert() {
return bulkInsert;
}
// bulk insert -> update vecValue after building entire dictionary
void bulkBuildVecVector(uint64_t csn = 0) {
vecValue->resize(0);
// sort dictionary
dictionary->sort();
dictionary->setSorted(true);
// get bulkVecValue vector
vector<T>* bulkVecValue = dictionary->getBulkVecValue();
if (bulkVecValue != NULL) {
for (size_t i = 0; i < bulkVecValue->size(); i++) {
// find position of valueId in dictionary
vector<size_t> result;
dictionary->search(bulkVecValue->at(i), ColumnBase::equalOp, result);
size_t pos = result[0];
if (pos != -1) vecValue->push_back(pos);
}
// build dataColumn vector
dataColumn->resize(0);
for (size_t i = 0; i < vecValue->size(); i++) {
data_column data;
data.encodedValueIdx = i;
data.csn = csn;
data.versionFlag = false;
dataColumn->push_back(data);
}
}
bulkVecValue->resize(0);
}
void createInvertedIndex() {
cout << "Creating inverted index for column: " << this->getName() << endl;
if (dictionary != NULL)
dictionary->buildInvertedIndex();
}
bool selection(T& searchValue, ColumnBase::OP_TYPE q_where_op,
vector<bool>* q_resultRid, bool initResultRid = true) {
// init q_resultRid to all true
if (initResultRid)
for (size_t i = 0; i < numOfRows(); i++) {
q_resultRid->push_back(true);
}
vector<size_t> result;
this->getDictionary()->search(searchValue, q_where_op, result);
// find rowId with appropriate dictionary position
for (size_t rowId = 0; !result.empty() && rowId < this->vecValueSize(); rowId++) {
size_t dictPosition = this->vecValueAt(rowId);
if ((q_where_op != ColumnBase::containOp && dictPosition >= result.front() && dictPosition <= result.back())
|| (q_where_op == ColumnBase::containOp && binary_search(result.begin(), result.end(), dictPosition))) {
// do nothing, keep q_resultRid true
}
else {
// update to false -> not in result
q_resultRid->at(rowId) = false;
}
}
return true;
}
vector<T> projection(vector<bool>* q_resultRid, size_t limit, size_t& limitCount) {
vector<T> outputs; // output result
limitCount = 0; // reset limit count
for (size_t rid = 0; rid < q_resultRid->size(); rid++) {
if (q_resultRid->at(rid)) {
size_t encodeValue = this->vecValueAt(rid);
T* a = this->getDictionary()->lookup(encodeValue);
outputs.push_back(*a);
if (++limitCount >= limit) break;
}
}
return outputs;
}
vector<T> projection(vector<int>* q_resultRid, size_t limit, size_t& limitCount) {
vector<T> outputs; // output result
limitCount = 0; // reset limit count
for (size_t i = 0; i < q_resultRid->size(); i++) {
size_t encodeValue = this->vecValueAt(q_resultRid->at(i));
T* a = this->getDictionary()->lookup(encodeValue);
outputs.push_back(*a);
if (++limitCount >= limit) break;
}
return outputs;
}
// Build hashmap of valueId based on selected row ids
void buildHashmap(map<size_t, vector<size_t>>& hashmap, vector<bool>* vecRowId) {
hashmap.clear();
for (size_t rowId = 0; rowId < vecRowId->size(); rowId++) {
// get valueId from bit packing if row id is selected
// then build hashmap
if (vecRowId->at(rowId)) {
size_t valueId = vecValueAt(rowId);
hashmap[valueId].push_back(rowId);
}
}
}
// Return vector of matching row ids
vector<size_t> probe(map<size_t, vector<size_t>>* hashmap, size_t probedValue) {
if (hashmap != NULL) {
try {
return hashmap->at(probedValue);
} catch (exception& e) {
// empty vector
return vector<size_t>();
}
}
return vector<size_t>();
}
// DATA SPACE
void insertDataVecValue(T&value, uint64_t csn) {
// uncompress vecValue vector from bit packing
vecValue = unpackingVecValue();
// add new value to dictionary
bool sorted = this->getType() == ColumnBase::intType;
dictionary->addNewElement(value, vecValue, sorted, false);
// get index of new insert value to vecValue vector
size_t newInsertVecValueIdx = vecValue->size() - 1;
// bit packing vecValue again
bitPackingVecValue();
// create new data space value
data_column newData;
newData.encodedValueIdx = newInsertVecValueIdx;
newData.csn = csn;
newData.versionFlag = false;
dataColumn->push_back(newData);
}
uint64_t getCSN(size_t rid) {
try {
return dataColumn->at(rid).csn;
} catch (out_of_range& e) {
return 0;
}
}
void setCSN(size_t rid) {
try {
data_column data = dataColumn->at(rid);
data.csn = UINT64_MAX;
dataColumn->at(rid) = data;
} catch (out_of_range& e) {
// nothing
}
}
// VERSION SPACE
void addVersionVecValue(T& value, uint64_t csn, size_t rid) {
// set maximum csn so that another transaction cannot update
this->setCSN(rid);
// add to delta space and version vector (start from last dictionary position)
bool sorted = dictionary->getSorted();
deltaSpace->addNewElement(value, versionVecValue, sorted, false);
// get index of encodedValue in versionVecValue
size_t encodedValueIdx = versionVecValue->size() - 1;
// create new version
version_column newVersion;
newVersion.encodedValueIdx = encodedValueIdx;
newVersion.next = NULL;
newVersion.csn = csn;
// check previous version on hash table
int preVersionIdx = -1;
try {
preVersionIdx = hashtable->at(rid);
} catch (out_of_range& e) {
// not existed on hash table, keep -1
preVersionIdx = -1;
}
if (preVersionIdx >= 0) {
version_column preVersion = versionColumn->at(preVersionIdx);
// point the next pointer of new created version to previous version
newVersion.next = &preVersion;
// replace the previous version on Version space vector by new version
versionColumn->at(preVersionIdx) = newVersion;
} else {
// add new version to Version space vector
versionColumn->push_back(newVersion);
// create a new entry for rid on Hash table
(*hashtable)[rid] = versionColumn->size() - 1;
}
// update version_flag & new csn on DATA space
data_column dataValue = dataColumn->at(rid);
dataValue.versionFlag = true;
dataValue.csn = csn + 200;
dataColumn->at(rid) = dataValue;
}
vector<T> projectionWithVersion(vector<bool>* q_resultRid, uint64_t txTs,
size_t limit, size_t& limitCount) {
vector<T> outputs; // output result
limitCount = 0; // reset limit count
for (size_t rid = 0; rid < q_resultRid->size(); rid++) {
if (q_resultRid->at(rid)) {
// get data at rid
data_column data = dataColumn->at(rid);
// if has no version -> get value from data space
if (!data.versionFlag) {
// check txTs >= CSN
if (txTs >= data.csn) {
size_t vecValueIdx = data.encodedValueIdx;
size_t dictIdx = this->vecValueAt(vecValueIdx);
T* a = this->getDictionary()->lookup(dictIdx);
if (a != NULL) {
outputs.push_back(*a);
}
}
}
// has version-> get value from version space
else {
// get version index from hashtable
int versionIdx = -1;
try {
versionIdx = hashtable->at(rid);
} catch (exception& e) {
// not existed rid in hashtable, do nothing
}
// get version from Version space
if (versionIdx >= 0 && versionIdx < versionColumn->size()) {
version_column versionData = versionColumn->at(
versionIdx);
// traverse all versions from newest to oldest to find version with CSN <= txTs
while (versionData.csn > txTs
&& versionData.next != NULL) {
versionData = *versionData.next;
}
if (txTs >= versionData.csn) {
size_t versionVecValueIdx = versionData.encodedValueIdx;
size_t dictIdx = this->versionVecValue->at(
versionVecValueIdx);
// lookup in Delta space
T* a = deltaSpace->lookup(dictIdx);
if (a != NULL) {
outputs.push_back(*a);
}
}
}
}
// get maximum limitCount result
if (++limitCount >= limit)
break;
}
}
return outputs;
}
void updateVersionSpace2DataSpace(size_t rid) {
try {
// get latest version of rid from hash table
size_t versionIdx = hashtable->at(rid);
version_column lastestVersion = versionColumn->at(versionIdx);
size_t encodedValue = versionVecValue->at(lastestVersion.encodedValueIdx);
// get dictionary value from delta space
T* a = deltaSpace->lookup(encodedValue);
// update this value into column's dictionary
vecValue = unpackingVecValue();
size_t newEncodedValue = dictionary->addNewElement(*a, vecValue, dictionary->getSorted(), false);
vecValue->pop_back(); // remove last item
bitPackingVecValue();
// get data at rid
data_column dataAtRid = dataColumn->at(rid);
// update vecValue and data at rid
updateVecValueAt(rid, newEncodedValue);
dataAtRid.csn = lastestVersion.csn; // lastest update csn
dataAtRid.encodedValueIdx = rid; // index to vecValue
dataAtRid.versionFlag = true;
dataColumn->at(rid) = dataAtRid;
} catch (exception& e) {
//do nothing
}
}
void removeOldVersion(size_t rid, uint64_t txStartTs) {
try {
// get latest version of rid from hashtable
size_t versionIdx = hashtable->at(rid);
version_column version = versionColumn->at(versionIdx);
// check if history version has csn < txStartTs then remove
version_column* curVersion = &version;
while (version.csn >= txStartTs && version.next != NULL) {
curVersion = &version;
version = *version.next;
}
// remove old version
if (version.csn < txStartTs) {
vector<version_column*> vecOld;
while (version.next != NULL) {
vecOld.push_back(version.next);
}
for (version_column* old : vecOld) {
delete old;
}
curVersion->next = NULL;
}
} catch (exception& e) {
//do nothing
}
}
};
} /* namespace std */
#endif /* COLUMN_H_ */