Skip to content

Commit

Permalink
Fixed incorrect data generation on the customer and date_dim tables
Browse files Browse the repository at this point in the history
  • Loading branch information
pdabre12 authored and Pratik Joseph Dabre committed Jun 28, 2024
1 parent 03214b7 commit 3f40e32
Show file tree
Hide file tree
Showing 51 changed files with 1,069 additions and 1,158 deletions.
2 changes: 1 addition & 1 deletion presto-native-execution/presto_cpp/main/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ add_executable(presto_server PrestoMain.cpp)
# "undefined reference to `vtable for velox::connector::tpch::TpchTableHandle`"
# TODO: Fix these errors.
target_link_libraries(presto_server presto_server_lib velox_hive_connector
velox_tpch_connector velox_tpcds_connector)
velox_tpch_connector presto_tpcds_connector)

if(PRESTO_ENABLE_REMOTE_FUNCTIONS)
add_library(presto_server_remote_function JsonSignatureParser.cpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ add_library(presto_tpcds_connector OBJECT tpcds/TpcdsConnector.cpp)

target_link_libraries(presto_tpcds_connector velox_connector tpcds_gen fmt::fmt)

if(${VELOX_ENABLE_TPCDS_CONNECTOR}) # make this change to a presto flag
add_subdirectory(tpcds)
endif()
# TODO: guard this with a Presto-specific flag instead of VELOX_ENABLE_TPCDS_CONNECTOR.
# if(${VELOX_ENABLE_TPCDS_CONNECTOR})
add_subdirectory(tpcds)
# endif()
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,7 @@ DSDGenIterator::DSDGenIterator(
double scaleFactor,
vector_size_t parallel,
vector_size_t child) {
auto dsdgenBackend = DSDGenBackendSingleton.try_get();

table_defs.resize(DBGEN_VERSION); // there are 24 TPC-DS tables

VELOX_CHECK_NOT_NULL(dsdgenBackend, "Unable to initialize dbgen's dbgunk.");
VELOX_CHECK_GE(scaleFactor, 0, "Tpch scale factor must be non-negative");
dsdgenCtx_.scaleFactor = scaleFactor;
InitializeDSDgen(scaleFactor, parallel, child, dsdgenCtx_);
Expand All @@ -73,14 +69,15 @@ DSDGenIterator::DSDGenIterator(
/// Builds the tpcds_table_def for `table_id` and stores it in `table_defs`.
///
/// The definition carries the table's name, the FL_CHILD / FL_SMALL flags and
/// first column number taken from the dsdgen tdef, the output column vectors,
/// and a pointer to this iterator's dsdgen context. genRow() later hands
/// `table_defs` to the table's row builder.
///
/// @param children Output column vectors for the table; copied into the stored
///        definition.
/// @param table_id Table number used both to look up the dsdgen tdef and as
///        the index into `table_defs`.
void DSDGenIterator::initializeTable(
    std::vector<VectorPtr> children,
    int table_id) {
  auto tdef = getSimpleTdefsByNumber(table_id, dsdgenCtx_);
  tpcds_table_def table_def;
  table_def.name = tdef->name;
  table_def.fl_child = tdef->flags & FL_CHILD ? 1 : 0;
  table_def.fl_small = tdef->flags & FL_SMALL ? 1 : 0;
  table_def.first_column = tdef->nFirstColumn;
  table_def.children = children;
  // The context pointer has to be set BEFORE the definition is copied into
  // table_defs. make_unique copies table_def, so assigning the pointer
  // afterwards would only modify the local and leave the stored definition
  // without a context.
  table_def.dsdGenContext = &dsdgenCtx_;
  table_defs[table_id] = std::make_unique<tpcds_table_def>(table_def);
}

std::vector<std::unique_ptr<tpcds_table_def>>& DSDGenIterator::getTableDefs() {
Expand All @@ -92,6 +89,9 @@ tpcds_builder_func DSDGenIterator::GetTDefFunctionByNumber(int table_id) {
return table_funcs->builder;
}

// Forwards `table_id` and `offset` to row_skip() together with this
// iterator's dsdgen context. The genTpcds* generators call this before
// initializeTable().
// NOTE(review): presumably this advances the table's generator state past the
// first `offset` rows so a batch can start mid-table -- confirm against
// row_skip().
void DSDGenIterator::initTableOffset(int32_t table_id, size_t offset) {
row_skip(table_id, offset, dsdgenCtx_);
}
void DSDGenIterator::genRow(int32_t table_id, size_t index) {
auto builder_func = GetTDefFunctionByNumber(table_id);
builder_func((void*)&table_defs, index, dsdgenCtx_);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class DSDGenIterator {

// Generate different types of records.
void genRow(int32_t table_id, size_t index);
void initTableOffset(int32_t table_id, size_t offset);
ds_key_t getRowCount(int32_t table_id);

tpcds_builder_func GetTDefFunctionByNumber(int table_id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,39 +476,15 @@ RowTypePtr getTableSchema(Table table) {
"s_zip",
"s_country",
"s_gmt_offset",
"s_tax_percentage",
"s_tax_precentage",
},
{
INTEGER(),
VARCHAR(),
DATE(),
DATE(),
INTEGER(),
VARCHAR(),
INTEGER(),
INTEGER(),
VARCHAR(),
VARCHAR(),
INTEGER(),
VARCHAR(),
VARCHAR(),
VARCHAR(),
INTEGER(),
VARCHAR(),
INTEGER(),
VARCHAR(),
INTEGER(),
VARCHAR(),
VARCHAR(),
VARCHAR(),
VARCHAR(),
VARCHAR(),
VARCHAR(),
VARCHAR(),
VARCHAR(),
// todo: change to DECIMAL(5, 2)
INTEGER(),
DECIMAL(5, 2),
INTEGER(), VARCHAR(), DATE(), DATE(), INTEGER(),
VARCHAR(), INTEGER(), INTEGER(), VARCHAR(), VARCHAR(),
INTEGER(), VARCHAR(), VARCHAR(), VARCHAR(), INTEGER(),
VARCHAR(), INTEGER(), VARCHAR(), VARCHAR(), VARCHAR(),
VARCHAR(), VARCHAR(), VARCHAR(), VARCHAR(), VARCHAR(),
VARCHAR(), VARCHAR(), DECIMAL(5, 2), DECIMAL(5, 2),
});
return type;
}
Expand Down Expand Up @@ -879,6 +855,7 @@ RowVectorPtr genTpcdsCallCenter(
offset);
auto children = allocateVectors(callCenterRowType, vectorSize, pool);
auto table_id = static_cast<int>(Table::TBL_CALL_CENTER);
dsdGenIterator.initTableOffset(table_id, offset);
dsdGenIterator.initializeTable(children, table_id);
auto& tableDef = dsdGenIterator.getTableDefs();
for (size_t i = 0; i < vectorSize; ++i) {
Expand Down Expand Up @@ -907,6 +884,7 @@ RowVectorPtr genTpcdsCatalogPage(
offset);
auto children = allocateVectors(catalogPageRowType, vectorSize, pool);
auto table_id = static_cast<int>(Table::TBL_CATALOG_PAGE);
dsdGenIterator.initTableOffset(table_id, offset);
dsdGenIterator.initializeTable(children, table_id);
auto& tableDef = dsdGenIterator.getTableDefs();
for (size_t i = 0; i < vectorSize; ++i) {
Expand Down Expand Up @@ -1032,6 +1010,7 @@ RowVectorPtr genTpcdsCustomer(
auto children = allocateVectors(customerRowType, vectorSize, pool);

auto table_id = static_cast<int>(Table::TBL_CUSTOMER);
dsdGenIterator.initTableOffset(table_id, offset);
dsdGenIterator.initializeTable(children, table_id);

auto& tableDef = dsdGenIterator.getTableDefs();
Expand Down Expand Up @@ -1126,7 +1105,7 @@ RowVectorPtr genTpcdsDateDim(
auto children = allocateVectors(dateDimRowType, vectorSize, pool);

auto table_id = static_cast<int>(Table::TBL_DATE_DIM);

dsdGenIterator.initTableOffset(table_id, offset);
dsdGenIterator.initializeTable(children, table_id);

auto& tableDef = dsdGenIterator.getTableDefs();
Expand Down Expand Up @@ -1377,7 +1356,7 @@ RowVectorPtr genTpcdsStore(
auto children = allocateVectors(storeRowType, vectorSize, pool);

auto table_id = static_cast<int>(Table::TBL_STORE);

dsdGenIterator.initTableOffset(table_id, offset);
dsdGenIterator.initializeTable(children, table_id);

auto& tableDef = dsdGenIterator.getTableDefs();
Expand Down Expand Up @@ -1421,7 +1400,8 @@ RowVectorPtr genTpcdsStoreReturns(

auto table_id = static_cast<int>(Table::TBL_STORE_SALES);
auto child_table_id = static_cast<int>(Table::TBL_STORE_RETURNS);


dsdGenIterator.initTableOffset(table_id, offset);
dsdGenIterator.initializeTable(children, table_id);
dsdGenIterator.initializeTable(childChildren, child_table_id);
auto& tableDef = dsdGenIterator.getTableDefs();
Expand Down Expand Up @@ -1471,7 +1451,7 @@ RowVectorPtr genTpcdsStoreSales(

auto table_id = static_cast<int>(Table::TBL_STORE_SALES);
auto child_table_id = static_cast<int>(Table::TBL_STORE_RETURNS);

dsdGenIterator.initTableOffset(table_id, offset);
dsdGenIterator.initializeTable(children, table_id);
dsdGenIterator.initializeTable(childChildren, child_table_id);
auto& tableDef = dsdGenIterator.getTableDefs();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,11 @@ int mk_address(ds_addr_t* pAddr, int nColumn, DSDGenContext& dsdGenContext) {
sprintf(pAddr->suite_num, "Suite %c", ((i >> 1) % 25) + 'A');
}

pTdef = getTdefsByNumber(getTableFromColumn(nColumn), dsdGenContext);
pTdef = getTdefsByNumber(getTableFromColumn(nColumn, dsdGenContext), dsdGenContext);

/* city is picked from a distribution which maps to large/medium/small */
if (pTdef->flags & FL_SMALL) {
i = (int)get_rowcount(getTableFromColumn(nColumn), dsdGenContext);
i = (int)get_rowcount(getTableFromColumn(nColumn, dsdGenContext), dsdGenContext);
genrand_integer(
&i,
DIST_UNIFORM,
Expand All @@ -125,7 +125,7 @@ int mk_address(ds_addr_t* pAddr, int nColumn, DSDGenContext& dsdGenContext) {
/* county is picked from a distribution, based on population and keys the
* rest */
if (pTdef->flags & FL_SMALL) {
i = (int)get_rowcount(getTableFromColumn(nColumn), dsdGenContext);
i = (int)get_rowcount(getTableFromColumn(nColumn, dsdGenContext), dsdGenContext);
genrand_integer(
&nRegion,
DIST_UNIFORM,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -395,10 +395,10 @@ int itodt(date_t* dest, int src) {
* Side Effects:
* TODO:
*/
static int doomsday[4] = {3, 2, 0, 5};
static int known[13] = {0, 3, 0, 0, 4, 9, 6, 11, 8, 5, 10, 7, 12};
int set_dow(date_t* d) {
static int last_year = -1, dday;
int doomsday[4] = {3, 2, 0, 5};
int known[13] = {0, 3, 0, 0, 4, 9, 6, 11, 8, 5, 10, 7, 12};
int last_year = -1, dday;
int res, q, r, s;

if (d->year != last_year) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,8 @@ ds_key_t web_join(int col, ds_key_t join_key, DSDGenContext& dsdGenContext) {
static date_t dSiteOpen, /* open/close dates for current web site */
dSiteClose;
int nTemp;
tdef* pWS = getSimpleTdefsByNumber(WEB_SITE);
tdef* pWP = getSimpleTdefsByNumber(WEB_PAGE);
tdef* pWS = getSimpleTdefsByNumber(WEB_SITE, dsdGenContext);
tdef* pWP = getSimpleTdefsByNumber(WEB_PAGE, dsdGenContext);

if (!dsdGenContext.web_join_init) {
strtodt(&dSiteClose, WEB_END_DATE);
Expand Down Expand Up @@ -424,13 +424,13 @@ ds_key_t mk_join(
int nYear, nFromTable = 0, nTableIndex = to_tbl;
tdef* pTdef;

nFromTable = getTableFromColumn(from_col);
nFromTable = getTableFromColumn(from_col, dsdGenContext);

/*
* if the table being joined to employs sparse keys, the join gets handled
* in sparse.c
*/
pTdef = getSimpleTdefsByNumber(to_tbl);
pTdef = getSimpleTdefsByNumber(to_tbl, dsdGenContext);
if (pTdef->flags & FL_SPARSE) {
if (pTdef->arSparseKeys == NULL)
initSparseKeys(to_tbl, dsdGenContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
#include "tdefs.h"

/*
* Routine: nullCheck(int nColumn)
* Routine: nullCheck(int nColumn, DSDGenContext& dsdGenContext)
* Purpose:
* Algorithm:
* Data Structures:
Expand All @@ -53,13 +53,13 @@
* Side Effects:
* TODO: None
*/
int nullCheck(int nColumn) {
static int nLastTable = 0;
int nullCheck(int nColumn, DSDGenContext& dsdGenContext) {
int nLastTable = 0;
tdef* pTdef;
ds_key_t kBitMask = 1;

nLastTable = getTableFromColumn(nColumn);
pTdef = getSimpleTdefsByNumber(nLastTable);
nLastTable = getTableFromColumn(nColumn, dsdGenContext);
pTdef = getSimpleTdefsByNumber(nLastTable, dsdGenContext);

kBitMask <<= nColumn - pTdef->nFirstColumn;

Expand All @@ -86,11 +86,11 @@ int nullCheck(int nColumn) {
void nullSet(ds_key_t* pDest, int nStream, DSDGenContext& dsdGenContext) {
int nThreshold;
ds_key_t kBitMap;
static int nLastTable = 0;
int nLastTable = 0;
tdef* pTdef;

nLastTable = getTableFromColumn(nStream);
pTdef = getSimpleTdefsByNumber(nLastTable);
nLastTable = getTableFromColumn(nStream, dsdGenContext);
pTdef = getSimpleTdefsByNumber(nLastTable, dsdGenContext);

/* burn the RNG calls */
genrand_integer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,10 @@ int checkSeeds(tdef* pTdef, DSDGenContext& dsdGenContext) {
int row_stop(int tbl, DSDGenContext& dsdGenContext) {
tdef* pTdef;

pTdef = getSimpleTdefsByNumber(tbl);
pTdef = getSimpleTdefsByNumber(tbl, dsdGenContext);
checkSeeds(pTdef, dsdGenContext);
if (pTdef->flags & FL_PARENT) {
pTdef = getSimpleTdefsByNumber(pTdef->nParam);
pTdef = getSimpleTdefsByNumber(pTdef->nParam, dsdGenContext);
checkSeeds(pTdef, dsdGenContext);
}
return (0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -446,11 +446,10 @@ int set_option(const char* name, const char* param) {
// switch (o->flags & TYPE_MASK) {
// case OPT_FLG:
// if ((param && (*param == 'Y' || *param == 'Y' || *param ==
//OPTION_START)) || (param == NULL)) { if (o->action) if (o->action((char
//*)o->name, NULL) < 0) usage((char *)o->name, "Cannot process option");
// set_flg(name);
// } else
// clr_flg(name);
// OPTION_START)) || (param == NULL)) { if (o->action)
// if (o->action((char
//*)o->name, NULL) < 0) usage((char
//*)o->name, "Cannot process option"); set_flg(name); } else clr_flg(name);
// res = 1;
// break;
// case OPT_INT:
Expand Down Expand Up @@ -483,7 +482,7 @@ int set_option(const char* name, const char* param) {
// break;
// default:
// fprintf(stderr, "Invalid option/type (%d/%s)\n", o->flags &
//TYPE_MASK, o->name); exit(0); break;
// TYPE_MASK, o->name); exit(0); break;
// }
//
// o->flags |= OPT_SET; /* marked as set */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ ds_key_t getIDCount(int nTable, DSDGenContext& dsdGenContext) {
kRowcount = get_rowcount(nTable, dsdGenContext);
if (nTable >= PSEUDO_TABLE_START)
return (kRowcount);
pTdef = getSimpleTdefsByNumber(nTable);
pTdef = getSimpleTdefsByNumber(nTable, dsdGenContext);
if (pTdef->flags & FL_TYPE_2) {
kUniqueCount = (kRowcount / 6) * 3;
switch (kRowcount % 6) {
Expand Down Expand Up @@ -337,7 +337,7 @@ ds_key_t get_rowcount(int table, DSDGenContext& dsdGenContext) {
/* now adjust for the multiplier */
nMultiplier = 1;
if (nTable < PSEUDO_TABLE_START) {
pTdef = getSimpleTdefsByNumber(nTable);
pTdef = getSimpleTdefsByNumber(nTable, dsdGenContext);
nMultiplier = (pTdef->flags & FL_TYPE_2) ? 2 : 1;
}
for (i = 1;
Expand Down Expand Up @@ -715,7 +715,7 @@ ds_key_t dateScaling(int nTable, ds_key_t jDate, DSDGenContext& dsdGenContext) {
date_t Date;
int nDateWeight = 1, nCalendarTotal, nDayWeight;
ds_key_t kRowCount = -1;
tdef* pTdef = getSimpleTdefsByNumber(nTable);
tdef* pTdef = getSimpleTdefsByNumber(nTable, dsdGenContext);

if (!dsdGenContext.dateScaling_init) {
pDistIndex = find_dist("calendar");
Expand Down Expand Up @@ -812,7 +812,7 @@ void setUpdateScaling(int nTable, DSDGenContext& dsdGenContext) {
int i, nBaseTable;
ds_key_t kNewRowcount = 0;

pTdef = getSimpleTdefsByNumber(nTable);
pTdef = getSimpleTdefsByNumber(nTable, dsdGenContext);
if (!(pTdef->flags & FL_SOURCE_DDL) || !(pTdef->flags & FL_DATE_BASED) ||
(pTdef->flags & FL_NOP))
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ int setSCDKeys(
dsdGenContext.setSCDKeys_init = 1;
}

nTableID = getTableFromColumn(nColumnID);
nTableID = getTableFromColumn(nColumnID, dsdGenContext);
nModulo = (int)(kIndex % 6);
switch (nModulo) {
case 1: /* 1 revision */
Expand Down
Loading

0 comments on commit 3f40e32

Please sign in to comment.