diff --git a/presto-docs/src/main/sphinx/presto-cpp.rst b/presto-docs/src/main/sphinx/presto-cpp.rst index 0f9ed62a12ced..3e7b5d5fd8903 100644 --- a/presto-docs/src/main/sphinx/presto-cpp.rst +++ b/presto-docs/src/main/sphinx/presto-cpp.rst @@ -49,4 +49,6 @@ Only specific connectors are supported in the Presto C++ evaluation engine. * Iceberg connector supports both V1 and V2 tables, including tables with delete files. -* TPCH connector, with ``tpch.naming=standard`` catalog property. \ No newline at end of file +* TPCH connector, with ``tpch.naming=standard`` catalog property. + +* TPCDS connector. \ No newline at end of file diff --git a/presto-native-execution/etc/catalog/tpcds.properties b/presto-native-execution/etc/catalog/tpcds.properties new file mode 100644 index 0000000000000..ba8147db14c71 --- /dev/null +++ b/presto-native-execution/etc/catalog/tpcds.properties @@ -0,0 +1 @@ +connector.name=tpcds diff --git a/presto-native-execution/presto_cpp/main/CMakeLists.txt b/presto-native-execution/presto_cpp/main/CMakeLists.txt index 14d288ebca647..9e4edd17360fc 100644 --- a/presto-native-execution/presto_cpp/main/CMakeLists.txt +++ b/presto-native-execution/presto_cpp/main/CMakeLists.txt @@ -14,6 +14,7 @@ add_subdirectory(types) add_subdirectory(http) add_subdirectory(common) add_subdirectory(thrift) +add_subdirectory(connectors) add_library( presto_server_lib @@ -93,7 +94,7 @@ add_executable(presto_server PrestoMain.cpp) # "undefined reference to `vtable for velox::connector::tpch::TpchTableHandle`" # TODO: Fix these errors. target_link_libraries(presto_server presto_server_lib velox_hive_connector - velox_tpch_connector) + velox_tpch_connector presto_tpcds_connector) if(PRESTO_ENABLE_REMOTE_FUNCTIONS) add_library(presto_server_remote_function JsonSignatureParser.cpp diff --git a/presto-native-execution/presto_cpp/main/PrestoServer.cpp b/presto-native-execution/presto_cpp/main/PrestoServer.cpp index 5ee52935cc224..38dcdebf01f19 100644 --- a/presto-native-execution/presto_cpp/main/PrestoServer.cpp +++ b/presto-native-execution/presto_cpp/main/PrestoServer.cpp @@ -267,6 +267,8 @@ void PrestoServer::run() { std::make_unique("system")); registerPrestoToVeloxConnector( std::make_unique("$system@system")); + registerPrestoToVeloxConnector( + std::make_unique("tpcds")); initializeVeloxMemory(); initializeThreadPools(); diff --git a/presto-native-execution/presto_cpp/main/connectors/CMakeLists.txt b/presto-native-execution/presto_cpp/main/connectors/CMakeLists.txt new file mode 100644 index 0000000000000..46f0608b8e9b1 --- /dev/null +++ b/presto-native-execution/presto_cpp/main/connectors/CMakeLists.txt @@ -0,0 +1,15 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +add_subdirectory(tpcds) diff --git a/presto-native-execution/presto_cpp/main/connectors/tpcds/CMakeLists.txt b/presto-native-execution/presto_cpp/main/connectors/tpcds/CMakeLists.txt new file mode 100644 index 0000000000000..d6738c90eb836 --- /dev/null +++ b/presto-native-execution/presto_cpp/main/connectors/tpcds/CMakeLists.txt @@ -0,0 +1,51 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cmake_policy(SET CMP0079 NEW) + +project(TPCDS) + +add_library(presto_tpcds_connector OBJECT TpcdsConnector.cpp) +target_link_libraries(presto_tpcds_connector velox_connector tpcds_gen fmt::fmt) + +# Without this hack, there are multiple link errors similar to the one below +# only on GCC. "undefined reference to `vtable for +# velox::connector::tpcds::TpcdsTableHandle`. TODO: Fix this hack. +target_link_libraries(velox_exec_test_lib presto_tpcds_connector) + +if(CMAKE_CXX_COMPILER_ID MATCHES "Clang") + add_compile_options(-Wno-deprecated-declarations -Wno-writable-strings + -Wno-missing-field-initializers) +endif() + +# This stringop-overflow warning seems to have lots of false positives and has +# been the source of a lot of compiler bug reports (e.g. +# https://gcc.gnu.org/bugzilla/show_bug.cgi?id=99578), which causes +# parquet-amalgamation.cpp to fail to compile. For now, we disable this warning +# on the affected compiler (GCC). +if(CMAKE_CXX_COMPILER_ID MATCHES "GNU") + add_compile_options(-Wno-stringop-overflow -Wno-write-strings) +endif() + +# Add subdirectories +add_subdirectory(${CMAKE_SOURCE_DIR}/presto_cpp/external/dsdgen/dsdgen-c build) + +add_library(append_info OBJECT utils/append_info-c.cpp) +target_link_libraries(append_info velox_vector_test_lib Folly::folly xsimd) +target_link_libraries(dsdgen_c append_info) + +add_library(tpcds_gen TpcdsGen.cpp DSDGenIterator.cpp) +target_include_directories(tpcds_gen PUBLIC dsdgen/include) +target_link_libraries(tpcds_gen velox_memory velox_vector dsdgen_c append_info + fmt::fmt) diff --git a/presto-native-execution/presto_cpp/main/connectors/tpcds/DSDGenIterator.cpp b/presto-native-execution/presto_cpp/main/connectors/tpcds/DSDGenIterator.cpp new file mode 100644 index 0000000000000..ce6242772a0b3 --- /dev/null +++ b/presto-native-execution/presto_cpp/main/connectors/tpcds/DSDGenIterator.cpp @@ -0,0 +1,96 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "presto_cpp/main/connectors/tpcds/DSDGenIterator.h" +#include "presto_cpp/external/dsdgen/include/dsdgen-c/dist.h" +#include "presto_cpp/external/dsdgen/include/dsdgen-c/genrand.h" +#include "presto_cpp/external/dsdgen/include/dsdgen-c/parallel.h" +#include "presto_cpp/external/dsdgen/include/dsdgen-c/params.h" +#include "presto_cpp/external/dsdgen/include/dsdgen-c/scaling.h" +#include "presto_cpp/external/dsdgen/include/dsdgen-c/tdefs.h" + +using namespace facebook::velox; +namespace facebook::presto::connector::tpcds { + +void InitializeDSDgen( + double scale, + vector_size_t parallel, + vector_size_t child, + DSDGenContext& dsdGenContext) { + dsdGenContext.Reset(); + resetCountCount(); + + std::string scaleStr = std::to_string(scale); + set_str("SCALE", scaleStr.c_str(), dsdGenContext); + std::string parallelStr = std::to_string(parallel); + set_str("PARALLEL", parallelStr.c_str(), dsdGenContext); + std::string childStr = std::to_string(child); + set_str("CHILD", childStr.c_str(), dsdGenContext); + + init_rand(dsdGenContext); // no random numbers without this +} + +std::string getQuery(int query) { + if (query <= 0 || query > TPCDS_QUERIES_COUNT) { + throw std::exception(); + } + return TPCDS_QUERIES[query - 1]; +} + +DSDGenIterator::DSDGenIterator( + double scaleFactor, + vector_size_t parallel, + vector_size_t child) { + table_defs_.resize(DBGEN_VERSION); // there are 24 TPC-DS tables + VELOX_CHECK_GE(scaleFactor, 0.0, "Tpcds scale factor must be non-negative"); + InitializeDSDgen(scaleFactor, parallel, child, dsdgenCtx_); +} + +void DSDGenIterator::initializeTable( + const std::vector& children, + int table_id) { + auto tdef = getSimpleTdefsByNumber(table_id, dsdgenCtx_); + tpcds_table_def table_def; + table_def.name = tdef->name; + table_def.fl_child = tdef->flags & FL_CHILD ? 1 : 0; + table_def.fl_small = tdef->flags & FL_SMALL ? 1 : 0; + table_def.first_column = tdef->nFirstColumn; + table_def.children = children; + table_def.dsdGenContext = &dsdgenCtx_; + table_defs_[table_id] = std::make_unique(table_def); +} + +std::vector>& DSDGenIterator::getTableDefs() { + return table_defs_; +}; + +tpcds_builder_func DSDGenIterator::GetTDefFunctionByNumber(int table_id) { + auto table_funcs = getTdefFunctionsByNumber(table_id); + return table_funcs->builder; +} + +void DSDGenIterator::initTableOffset(int32_t table_id, size_t offset) { + row_skip(table_id, offset, dsdgenCtx_); +} +void DSDGenIterator::genRow(int32_t table_id, size_t index) { + auto builder_func = GetTDefFunctionByNumber(table_id); + builder_func((void*)&table_defs_, index, dsdgenCtx_); + row_stop(table_id, dsdgenCtx_); +} + +int64_t DSDGenIterator::getRowCount(int32_t table) { + return get_rowcount(table, dsdgenCtx_); +} + +} // namespace facebook::presto::connector::tpcds diff --git a/presto-native-execution/presto_cpp/main/connectors/tpcds/DSDGenIterator.h b/presto-native-execution/presto_cpp/main/connectors/tpcds/DSDGenIterator.h new file mode 100644 index 0000000000000..6d5aad6cc312b --- /dev/null +++ b/presto-native-execution/presto_cpp/main/connectors/tpcds/DSDGenIterator.h @@ -0,0 +1,68 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "presto_cpp/external/dsdgen/include/dsdgen-c/dist.h" +#include "presto_cpp/external/dsdgen/include/tpcds_constants.hpp" +#include "presto_cpp/main/connectors/tpcds/utils/append_info-c.hpp" + +using namespace facebook::velox; +namespace facebook::presto::connector::tpcds { + +typedef int64_t ds_key_t; + +typedef int (*tpcds_builder_func)(void*, ds_key_t, DSDGenContext& dsdgenCtx); + +void InitializeDSDgen( + double scale, + vector_size_t parallel, + vector_size_t child, + DSDGenContext& dsdGenContext); + +std::string getQuery(int query); + +/// This class exposes a thread-safe and reproducible iterator over TPC-DS +/// synthetically generated data, backed by DSDGEN. +class DSDGenIterator { + public: + explicit DSDGenIterator( + double scaleFactor, + vector_size_t parallel, + vector_size_t child); + + void initializeTable(const std::vector& children, int table); + + std::vector>& getTableDefs(); + + // Before generating records using the gen*() functions below, call the + // appropriate init*() function to correctly initialize the seed given the + // offset to be generated. + void initTableOffset(int32_t table_id, size_t offset); + + // Generate different types of records. + void genRow(int32_t table_id, size_t index); + + ds_key_t getRowCount(int32_t table_id); + + tpcds_builder_func GetTDefFunctionByNumber(int table_id); + + protected: + DSDGenContext dsdgenCtx_; + std::vector> table_defs_; +}; + +} // namespace facebook::presto::connector::tpcds diff --git a/presto-native-execution/presto_cpp/main/connectors/tpcds/TpcdsConnector.cpp b/presto-native-execution/presto_cpp/main/connectors/tpcds/TpcdsConnector.cpp new file mode 100644 index 0000000000000..6fc9e6d601849 --- /dev/null +++ b/presto-native-execution/presto_cpp/main/connectors/tpcds/TpcdsConnector.cpp @@ -0,0 +1,140 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "presto_cpp/main/connectors/tpcds/TpcdsConnector.h" +#include "presto_cpp/main/connectors/tpcds/DSDGenIterator.h" +#include "presto_cpp/main/connectors/tpcds/TpcdsGen.h" + +using namespace ::facebook::velox; +namespace facebook::presto::connector::tpcds { + +using facebook::presto::connector::tpcds::Table; + +std::string TpcdsTableHandle::toString() const { + return fmt::format( + "table: {}, scale factor: {}", toTableName(table_), scaleFactor_); +} + +TpcdsDataSource::TpcdsDataSource( + const std::shared_ptr& outputType, + const std::shared_ptr& tableHandle, + const std::unordered_map< + std::string, + std::shared_ptr>& columnHandles, + velox::memory::MemoryPool* FOLLY_NONNULL pool) + : pool_(pool) { + auto tpcdsTableHandle = + std::dynamic_pointer_cast(tableHandle); + VELOX_CHECK_NOT_NULL( + tpcdsTableHandle, "TableHandle must be an instance of TpcdsTableHandle"); + tpcdsTable_ = tpcdsTableHandle->getTable(); + scaleFactor_ = tpcdsTableHandle->getScaleFactor(); + DSDGenIterator dsdGenIterator(scaleFactor_, 1, 1); + tpcdsTableRowCount_ = + dsdGenIterator.getRowCount(static_cast(tpcdsTable_)); + + auto tpcdsTableSchema = getTableSchema(tpcdsTableHandle->getTable()); + VELOX_CHECK_NOT_NULL(tpcdsTableSchema, "TpcdsSchema can't be null."); + + outputColumnMappings_.reserve(outputType->size()); + + for (const auto& outputName : outputType->names()) { + auto it = columnHandles.find(outputName); + VELOX_CHECK( + it != columnHandles.end(), + "ColumnHandle is missing for output column '{}' on table '{}'", + outputName, + toTableName(tpcdsTable_)); + + auto handle = std::dynamic_pointer_cast(it->second); + VELOX_CHECK_NOT_NULL( + handle, + "ColumnHandle must be an instance of TpcdsColumnHandle " + "for '{}' on table '{}'", + handle->name(), + toTableName(tpcdsTable_)); + + auto idx = tpcdsTableSchema->getChildIdxIfExists(handle->name()); + VELOX_CHECK( + idx != std::nullopt, + "Column '{}' not found on TPC-DS table '{}'.", + handle->name(), + toTableName(tpcdsTable_)); + outputColumnMappings_.emplace_back(*idx); + } + outputType_ = outputType; +} + +RowVectorPtr TpcdsDataSource::projectOutputColumns(RowVectorPtr inputVector) { + std::vector children; + children.reserve(outputColumnMappings_.size()); + + for (const auto channel : outputColumnMappings_) { + children.emplace_back(inputVector->childAt(channel)); + } + + return std::make_shared( + pool_, + outputType_, + BufferPtr(), + inputVector->size(), + std::move(children)); +} + +void TpcdsDataSource::addSplit(std::shared_ptr split) { + VELOX_CHECK_EQ( + currentSplit_, + nullptr, + "Previous split has not been processed yet. Call next() to process the split."); + currentSplit_ = std::dynamic_pointer_cast(split); + VELOX_CHECK(currentSplit_, "Wrong type of split for TpcdsDataSource."); + + size_t partSize = std::ceil( + (double)tpcdsTableRowCount_ / (double)currentSplit_->totalParts_); + + splitOffset_ = partSize * currentSplit_->partNumber_; + splitEnd_ = splitOffset_ + partSize; +} + +std::optional TpcdsDataSource::next( + uint64_t size, + velox::ContinueFuture& /*future*/) { + VELOX_CHECK_NOT_NULL( + currentSplit_, "No split to process. Call addSplit() first."); + + size_t maxRows = std::min(size, (splitEnd_ - splitOffset_)); + vector_size_t parallel = currentSplit_->totalParts_; + vector_size_t child = currentSplit_->partNumber_; + auto outputVector = genTpcdsData( + tpcdsTable_, maxRows, splitOffset_, pool_, scaleFactor_, parallel, child); + + // If the split is exhausted. + if (!outputVector || outputVector->size() == 0) { + currentSplit_ = nullptr; + return nullptr; + } + + // splitOffset needs to advance based on maxRows passed to getTpcdsData(), and + // not the actual number of returned rows in the output vector, as they are + // not the same for lineitem. + splitOffset_ += maxRows; + completedRows_ += outputVector->size(); + completedBytes_ += outputVector->retainedSize(); + + return projectOutputColumns(outputVector); +} + +VELOX_REGISTER_CONNECTOR_FACTORY(std::make_shared()) + +} // namespace facebook::presto::connector::tpcds diff --git a/presto-native-execution/presto_cpp/main/connectors/tpcds/TpcdsConnector.h b/presto-native-execution/presto_cpp/main/connectors/tpcds/TpcdsConnector.h new file mode 100644 index 0000000000000..659faa76b387c --- /dev/null +++ b/presto-native-execution/presto_cpp/main/connectors/tpcds/TpcdsConnector.h @@ -0,0 +1,181 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "presto_cpp/main/connectors/tpcds/TpcdsConnectorSplit.h" +#include "presto_cpp/main/connectors/tpcds/TpcdsGen.h" +#include "velox/connectors/Connector.h" + +using namespace facebook::velox; +using namespace facebook::velox::connector; + +namespace facebook::presto::connector::tpcds { + +class TpcdsConnector; + +// TPC-DS column handle only needs the column name (all columns are generated in +// the same way). +class TpcdsColumnHandle : public velox::connector::ColumnHandle { + public: + explicit TpcdsColumnHandle(const std::string& name) : name_(name) {} + + const std::string& name() const { + return name_; + } + + private: + const std::string name_; +}; + +// TPC-DS table handle uses the underlying enum to describe the target table. +class TpcdsTableHandle : public ConnectorTableHandle { + public: + explicit TpcdsTableHandle( + std::string connectorId, + tpcds::Table table, + double scaleFactor = 1.0) + : ConnectorTableHandle(std::move(connectorId)), + table_(table), + scaleFactor_(scaleFactor) { + VELOX_CHECK_GE(scaleFactor, 0.0, "Tpcds scale factor must be non-negative"); + } + + ~TpcdsTableHandle() override {} + + std::string toString() const override; + + tpcds::Table getTable() const { + return table_; + } + + double getScaleFactor() const { + return scaleFactor_; + } + + private: + const tpcds::Table table_; + double scaleFactor_; +}; + +class TpcdsDataSource : public velox::connector::DataSource { + public: + TpcdsDataSource( + const std::shared_ptr& outputType, + const std::shared_ptr& + tableHandle, + const std::unordered_map< + std::string, + std::shared_ptr>& columnHandles, + velox::memory::MemoryPool* FOLLY_NONNULL pool); + + void addSplit(std::shared_ptr split) override; + + void addDynamicFilter( + column_index_t /*outputChannel*/, + const std::shared_ptr& /*filter*/) override { + VELOX_NYI("Dynamic filters not supported by TpcdsConnector."); + } + + std::optional next(uint64_t size, velox::ContinueFuture& future) + override; + + uint64_t getCompletedRows() override { + return completedRows_; + } + + uint64_t getCompletedBytes() override { + return completedBytes_; + } + + std::unordered_map runtimeStats() override { + // TODO: Which stats do we want to expose here? + return {}; + } + + private: + RowVectorPtr projectOutputColumns(RowVectorPtr vector); + + tpcds::Table tpcdsTable_; + double scaleFactor_{1.0}; + size_t tpcdsTableRowCount_{0}; + RowTypePtr outputType_; + + // Mapping between output columns and their indices (column_index_t) in the + // dbgen generated datasets. + std::vector outputColumnMappings_; + + std::shared_ptr currentSplit_; + + // First (splitOffset_) and last (splitEnd_) row number that should be + // generated by this split. + uint64_t splitOffset_{0}; + uint64_t splitEnd_{0}; + + size_t completedRows_{0}; + size_t completedBytes_{0}; + + memory::MemoryPool* FOLLY_NONNULL pool_; +}; + +class TpcdsConnector final : public velox::connector::Connector { + public: + TpcdsConnector( + const std::string& id, + std::shared_ptr config, + folly::Executor* FOLLY_NULLABLE /*executor*/) + : Connector(id) {} + + std::unique_ptr createDataSource( + const std::shared_ptr& outputType, + const std::shared_ptr& tableHandle, + const std::unordered_map< + std::string, + std::shared_ptr>& columnHandles, + ConnectorQueryCtx* FOLLY_NONNULL connectorQueryCtx) override final { + return std::make_unique( + outputType, + tableHandle, + columnHandles, + connectorQueryCtx->memoryPool()); + } + + std::unique_ptr createDataSink( + RowTypePtr /*inputType*/, + std::shared_ptr< + ConnectorInsertTableHandle> /*connectorInsertTableHandle*/, + ConnectorQueryCtx* /*connectorQueryCtx*/, + CommitStrategy /*commitStrategy*/) override final { + VELOX_NYI("TpcdsConnector does not support data sink."); + } +}; + +class TpcdsConnectorFactory : public ConnectorFactory { + public: + static constexpr const char* kTpcdsConnectorName{"tpcds"}; + + TpcdsConnectorFactory() : ConnectorFactory(kTpcdsConnectorName) {} + + explicit TpcdsConnectorFactory(const char* connectorName) + : ConnectorFactory(connectorName) {} + + std::shared_ptr newConnector( + const std::string& id, + std::shared_ptr config, + folly::Executor* executor = nullptr) override { + return std::make_shared(id, config, executor); + } +}; + +} // namespace facebook::presto::connector::tpcds diff --git a/presto-native-execution/presto_cpp/main/connectors/tpcds/TpcdsConnectorSplit.h b/presto-native-execution/presto_cpp/main/connectors/tpcds/TpcdsConnectorSplit.h new file mode 100644 index 0000000000000..055f8d3569fa9 --- /dev/null +++ b/presto-native-execution/presto_cpp/main/connectors/tpcds/TpcdsConnectorSplit.h @@ -0,0 +1,66 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include "velox/connectors/Connector.h" + +using namespace facebook::velox; + +namespace facebook::presto::connector::tpcds { + +struct TpcdsConnectorSplit : public velox::connector::ConnectorSplit { + explicit TpcdsConnectorSplit( + const std::string& connectorId, + const vector_size_t totalParts = 1, + const vector_size_t partNumber = 0) + : velox::connector::ConnectorSplit(connectorId), + totalParts_(totalParts), + partNumber_(partNumber) { + VELOX_CHECK_GE(totalParts, 1, "totalParts must be >= 1"); + VELOX_CHECK_GT(totalParts, partNumber, "totalParts must be > partNumber"); + } + + // In how many parts the generated TPC-DS table will be segmented, roughly + // `rowCount / totalParts` + const vector_size_t totalParts_{1}; + + // Which of these parts will be read by this split. + const vector_size_t partNumber_{0}; +}; + +} // namespace facebook::presto::connector::tpcds + +template <> +struct fmt::formatter + : formatter { + auto format( + facebook::presto::connector::tpcds::TpcdsConnectorSplit s, + format_context& ctx) { + return formatter::format(s.toString(), ctx); + } +}; + +template <> +struct fmt::formatter< + std::shared_ptr> + : formatter { + auto format( + std::shared_ptr + s, + format_context& ctx) { + return formatter::format(s->toString(), ctx); + } +}; diff --git a/presto-native-execution/presto_cpp/main/connectors/tpcds/TpcdsGen.cpp b/presto-native-execution/presto_cpp/main/connectors/tpcds/TpcdsGen.cpp new file mode 100644 index 0000000000000..43a93f06597d4 --- /dev/null +++ b/presto-native-execution/presto_cpp/main/connectors/tpcds/TpcdsGen.cpp @@ -0,0 +1,975 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "presto_cpp/main/connectors/tpcds/TpcdsGen.h" +#include "presto_cpp/main/connectors/tpcds/DSDGenIterator.h" + +#define CALL_CENTER 0 +#define DBGEN_VERSION 24 + +using namespace facebook::velox; +namespace facebook::presto::connector::tpcds { + +namespace { +size_t getVectorSize(size_t rowCount, size_t maxRows, size_t offset) { + if (offset >= rowCount) { + return 0; + } + return std::min(rowCount - offset, maxRows); +} + +std::vector allocateVectors( + const RowTypePtr& type, + size_t vectorSize, + memory::MemoryPool* pool) { + std::vector vector; + vector.reserve(type->size()); + + for (const auto& childType : type->children()) { + vector.emplace_back(BaseVector::create(childType, vectorSize, pool)); + } + return vector; +} + +RowVectorPtr genTpcdsRowVector( + int32_t tableId, + memory::MemoryPool* pool, + RowTypePtr rowType, + std::vector>& tableDef) { + auto rowCount = tableDef[tableId]->rowIndex; + for (auto& child : tableDef[tableId]->children) { + child->resize(rowCount); + } + + return std::make_shared( + pool, + rowType, + BufferPtr(nullptr), + rowCount, + std::move(tableDef[tableId]->children)); +} + +RowVectorPtr genTpcdsTableData( + Table table, + memory::MemoryPool* pool, + size_t maxRows, + size_t offset, + double scaleFactor, + vector_size_t parallel, + vector_size_t child) { + // Create schema and allocate vectors. + auto rowType = getTableSchema(table); + auto table_id = static_cast(table); + DSDGenIterator dsdGenIterator(scaleFactor, parallel, child); + size_t vectorSize = + getVectorSize(dsdGenIterator.getRowCount(table_id), maxRows, offset); + auto children = allocateVectors(rowType, vectorSize, pool); + dsdGenIterator.initTableOffset(table_id, offset); + dsdGenIterator.initializeTable(children, table_id); + auto& tableDef = dsdGenIterator.getTableDefs(); + for (size_t i = 0; i < vectorSize; ++i) { + dsdGenIterator.genRow(table_id, i + offset + 1); + } + return std::make_shared( + pool, + rowType, + BufferPtr(nullptr), + vectorSize, + std::move(tableDef[table_id]->children)); +} + +RowVectorPtr genTpcdsParentAndChildTable( + memory::MemoryPool* pool, + size_t maxRows, + size_t offset, + double scaleFactor, + vector_size_t parallel, + vector_size_t child, + Table parentTable, + Table childTable, + bool generateChildTable) { + // Whenever a call to generate a table which is either marked as a + // parent or a child is requested, both the child and parent tables + // need to be populated. + + auto parentTableRowType = getTableSchema(parentTable); + DSDGenIterator dsdGenIterator(scaleFactor, parallel, child); + size_t parentTableVectorSize = getVectorSize( + dsdGenIterator.getRowCount(static_cast(parentTable)), + maxRows, + offset); + + size_t parentTableUpperBound = parentTableVectorSize * 16; + auto children = + allocateVectors(parentTableRowType, parentTableUpperBound, pool); + + auto childTableRowType = getTableSchema(childTable); + auto childChildren = + allocateVectors(childTableRowType, parentTableUpperBound, pool); + + auto parentTableId = static_cast(parentTable); + auto childTableId = static_cast(childTable); + + dsdGenIterator.initTableOffset(parentTableId, offset); + dsdGenIterator.initializeTable(children, parentTableId); + dsdGenIterator.initializeTable(childChildren, childTableId); + auto& tableDef = dsdGenIterator.getTableDefs(); + for (size_t i = 0; i < parentTableVectorSize; ++i) { + dsdGenIterator.genRow(parentTableId, i + offset + 1); + } + if (generateChildTable) { + return genTpcdsRowVector(childTableId, pool, childTableRowType, tableDef); + } else { + return genTpcdsRowVector(parentTableId, pool, parentTableRowType, tableDef); + } +} +} // namespace + +static RowTypePtr callCenterRowType = ROW( + {"cc_call_center_sk", "cc_call_center_id", + "cc_rec_start_date", "cc_rec_end_date", + "cc_closed_date_sk", "cc_open_date_sk", + "cc_name", "cc_class", + "cc_employees", "cc_sq_ft", + "cc_hours", "cc_manager", + "cc_mkt_id", "cc_mkt_class", + "cc_mkt_desc", "cc_market_manager", + "cc_division", "cc_division_name", + "cc_company", "cc_company_name", + "cc_street_number", "cc_street_name", + "cc_street_type", "cc_suite_number", + "cc_city", "cc_county", + "cc_state", "cc_zip", + "cc_country", "cc_gmt_offset", + "cc_tax_percentage"}, + {BIGINT(), VARCHAR(), DATE(), DATE(), INTEGER(), INTEGER(), + VARCHAR(), VARCHAR(), INTEGER(), INTEGER(), VARCHAR(), VARCHAR(), + INTEGER(), VARCHAR(), VARCHAR(), VARCHAR(), INTEGER(), VARCHAR(), + INTEGER(), VARCHAR(), VARCHAR(), VARCHAR(), VARCHAR(), VARCHAR(), + VARCHAR(), VARCHAR(), VARCHAR(), VARCHAR(), VARCHAR(), DECIMAL(5, 2), + DECIMAL(5, 2)}); + +static RowTypePtr catalogPageRowType = + ROW({"cp_catalog_page_sk", + "cp_catalog_page_id", + "cp_start_date_sk", + "cp_end_date_sk", + "cp_department", + "cp_catalog_number", + "cp_catalog_page_number", + "cp_description", + "cp_type"}, + {BIGINT(), + VARCHAR(), + INTEGER(), + INTEGER(), + VARCHAR(), + INTEGER(), + INTEGER(), + VARCHAR(), + VARCHAR()}); + +static RowTypePtr catalogReturnsRowType = ROW( + {"cr_returned_date_sk", + "cr_returned_time_sk", + "cr_item_sk", + "cr_refunded_customer_sk", + "cr_refunded_cdemo_sk", + "cr_refunded_hdemo_sk", + "cr_refunded_addr_sk", + "cr_returning_customer_sk", + "cr_returning_cdemo_sk", + "cr_returning_hdemo_sk", + "cr_returning_addr_sk", + "cr_call_center_sk", + "cr_catalog_page_sk", + "cr_ship_mode_sk", + "cr_warehouse_sk", + "cr_reason_sk", + "cr_order_number", + "cr_return_quantity", + "cr_return_amount", + "cr_return_tax", + "cr_return_amt_inc_tax", + "cr_fee", + "cr_return_ship_cost", + "cr_refunded_cash", + "cr_reversed_charge", + "cr_store_credit", + "cr_net_loss"}, + {BIGINT(), BIGINT(), BIGINT(), BIGINT(), BIGINT(), + BIGINT(), BIGINT(), BIGINT(), BIGINT(), BIGINT(), + BIGINT(), BIGINT(), BIGINT(), BIGINT(), BIGINT(), + BIGINT(), BIGINT(), INTEGER(), DECIMAL(7, 2), DECIMAL(7, 2), + DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), + DECIMAL(7, 2), DECIMAL(7, 2)}); + +static RowTypePtr catalogSalesRowType = ROW( + {"cs_sold_date_sk", + "cs_sold_time_sk", + "cs_ship_date_sk", + "cs_bill_customer_sk", + "cs_bill_cdemo_sk", + "cs_bill_hdemo_sk", + "cs_bill_addr_sk", + "cs_ship_customer_sk", + "cs_ship_cdemo_sk", + "cs_ship_hdemo_sk", + "cs_ship_addr_sk", + "cs_call_center_sk", + "cs_catalog_page_sk", + "cs_ship_mode_sk", + "cs_warehouse_sk", + "cs_item_sk", + "cs_promo_sk", + "cs_order_number", + "cs_quantity", + "cs_wholesale_cost", + "cs_list_price", + "cs_sales_price", + "cs_ext_discount_amt", + "cs_ext_sales_price", + "cs_ext_wholesale_cost", + "cs_ext_list_price", + "cs_ext_tax", + "cs_coupon_amt", + "cs_ext_ship_cost", + "cs_net_paid", + "cs_net_paid_inc_tax", + "cs_net_paid_inc_ship", + "cs_net_paid_inc_ship_tax", + "cs_net_profit"}, + {BIGINT(), BIGINT(), BIGINT(), BIGINT(), BIGINT(), + BIGINT(), BIGINT(), BIGINT(), BIGINT(), BIGINT(), + BIGINT(), BIGINT(), BIGINT(), BIGINT(), BIGINT(), + BIGINT(), BIGINT(), BIGINT(), INTEGER(), DECIMAL(7, 2), + DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), + DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), + DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2)}); + +static RowTypePtr customerRowType = ROW( + { + "c_customer_sk", + "c_customer_id", + "c_current_cdemo_sk", + "c_current_hdemo_sk", + "c_current_addr_sk", + "c_first_shipto_date_sk", + "c_first_sales_date_sk", + "c_salutation", + "c_first_name", + "c_last_name", + "c_preferred_cust_flag", + "c_birth_day", + "c_birth_month", + "c_birth_year", + "c_birth_country", + "c_login", + "c_email_address", + "c_last_review_date_sk", + }, + { + BIGINT(), + VARCHAR(), + BIGINT(), + BIGINT(), + BIGINT(), + BIGINT(), + BIGINT(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + INTEGER(), + INTEGER(), + INTEGER(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + BIGINT(), + }); + +static RowTypePtr customerAddressRowType = + ROW({"ca_address_sk", + "ca_address_id", + "ca_street_number", + "ca_street_name", + "ca_street_type", + "ca_suite_number", + "ca_city", + "ca_county", + "ca_state", + "ca_zip", + "ca_country", + "ca_gmt_offset", + "ca_location_type"}, + {BIGINT(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + DECIMAL(5, 2), + VARCHAR()}); + +static RowTypePtr customerDemographicsRowType = + ROW({"cd_demo_sk", + "cd_gender", + "cd_marital_status", + "cd_education_status", + "cd_purchase_estimate", + "cd_credit_rating", + "cd_dep_count", + "cd_dep_employed_count", + "cd_dep_college_count"}, + {BIGINT(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + INTEGER(), + VARCHAR(), + INTEGER(), + INTEGER(), + INTEGER()}); + +static RowTypePtr dateDimRowType = ROW( + { + "d_date_sk", + "d_date_id", + "d_date", + "d_month_seq", + "d_week_seq", + "d_quarter_seq", + "d_year", + "d_dow", + "d_moy", + "d_dom", + "d_qoy", + "d_fy_year", + "d_fy_quarter_seq", + "d_fy_week_seq", + "d_day_name", + "d_quarter_name", + "d_holiday", + "d_weekend", + "d_following_holiday", + "d_first_dom", + "d_last_dom", + "d_same_day_ly", + "d_same_day_lq", + "d_current_day", + "d_current_week", + "d_current_month", + "d_current_quarter", + "d_current_year", + }, + { + BIGINT(), VARCHAR(), DATE(), INTEGER(), INTEGER(), INTEGER(), + INTEGER(), INTEGER(), INTEGER(), INTEGER(), INTEGER(), INTEGER(), + INTEGER(), INTEGER(), VARCHAR(), VARCHAR(), VARCHAR(), VARCHAR(), + VARCHAR(), INTEGER(), INTEGER(), INTEGER(), INTEGER(), VARCHAR(), + VARCHAR(), VARCHAR(), VARCHAR(), VARCHAR(), + }); + +static RowTypePtr householdDemographicsRowType = + ROW({"hd_demo_sk", + "hd_income_band_sk", + "hd_buy_potential", + "hd_dep_count", + "hd_vehicle_count"}, + {BIGINT(), BIGINT(), VARCHAR(), INTEGER(), INTEGER()}); + +static RowTypePtr incomeBandRowType = + ROW({"ib_income_band_sk", "ib_lower_bound", "ib_upper_bound"}, + {BIGINT(), INTEGER(), INTEGER()}); + +static RowTypePtr inventoryRowType = ROW( + {"inv_date_sk", "inv_item_sk", "inv_warehouse_sk", "inv_quantity_on_hand"}, + {BIGINT(), BIGINT(), BIGINT(), INTEGER()}); + +static RowTypePtr itemRowType = ROW( + {"i_item_sk", "i_item_id", "i_rec_start_date", "i_rec_end_date", + "i_item_desc", "i_current_price", "i_wholesale_cost", "i_brand_id", + "i_brand", "i_class_id", "i_class", "i_category_id", + "i_category", "i_manufact_id", "i_manufact", "i_size", + "i_formulation", "i_color", "i_units", "i_container", + "i_manager_id", "i_product_name"}, + {BIGINT(), VARCHAR(), DATE(), DATE(), VARCHAR(), DECIMAL(7, 2), + DECIMAL(7, 2), INTEGER(), VARCHAR(), INTEGER(), VARCHAR(), INTEGER(), + VARCHAR(), INTEGER(), VARCHAR(), VARCHAR(), VARCHAR(), VARCHAR(), + VARCHAR(), VARCHAR(), INTEGER(), VARCHAR()}); + +static RowTypePtr promotionRowType = + ROW({"p_promo_sk", + "p_promo_id", + "p_start_date_sk", + "p_end_date_sk", + "p_item_sk", + "p_cost", + "p_response_targe", + "p_promo_name", + "p_channel_dmail", + "p_channel_email", + "p_channel_catalog", + "p_channel_tv", + "p_channel_radio", + "p_channel_press", + "p_channel_event", + "p_channel_demo", + "p_channel_details", + "p_purpose", + "p_discount_active"}, + {BIGINT(), + VARCHAR(), + BIGINT(), + BIGINT(), + BIGINT(), + DECIMAL(15, 2), + INTEGER(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + VARCHAR()}); + +static RowTypePtr reasonRowType = + ROW({"r_reason_sk", "r_reason_id", "r_reason_desc"}, + {BIGINT(), VARCHAR(), VARCHAR()}); + +static RowTypePtr shipModeRowType = + ROW({"sm_ship_mode_sk", + "sm_ship_mode_id", + "sm_type", + "sm_code", + "sm_carrier", + "sm_contract"}, + {BIGINT(), VARCHAR(), VARCHAR(), VARCHAR(), VARCHAR(), VARCHAR()}); + +static RowTypePtr storeRowType = ROW( + { + "s_store_sk", + "s_store_id", + "s_rec_start_date", + "s_rec_end_date", + "s_closed_date_sk", + "s_store_name", + "s_number_employees", + "s_floor_space", + "s_hours", + "s_manager", + "s_market_id", + "s_geography_class", + "s_market_desc", + "s_market_manager", + "s_division_id", + "s_division_name", + "s_company_id", + "s_company_name", + "s_street_number", + "s_street_name", + "s_street_type", + "s_suite_number", + "s_city", + "s_county", + "s_state", + "s_zip", + "s_country", + "s_gmt_offset", + "s_tax_precentage", + }, + { + BIGINT(), VARCHAR(), DATE(), DATE(), BIGINT(), + VARCHAR(), INTEGER(), INTEGER(), VARCHAR(), VARCHAR(), + INTEGER(), VARCHAR(), VARCHAR(), VARCHAR(), INTEGER(), + VARCHAR(), INTEGER(), VARCHAR(), VARCHAR(), VARCHAR(), + VARCHAR(), VARCHAR(), VARCHAR(), VARCHAR(), VARCHAR(), + VARCHAR(), VARCHAR(), DECIMAL(5, 2), DECIMAL(5, 2), + }); + +static RowTypePtr storeReturnsRowType = + ROW({"sr_returned_date_sk", + "sr_return_time_sk", + "sr_item_sk", + "sr_customer_sk", + "sr_cdemo_sk", + "sr_hdemo_sk", + "sr_addr_sk", + "sr_store_sk", + "sr_reason_sk", + "sr_ticket_number", + "sr_return_quantity", + "sr_return_amt", + "sr_return_tax", + "sr_return_amt_inc_tax", + "sr_fee", + "sr_return_ship_cost", + "sr_refunded_cash", + "sr_reversed_charge", + "sr_store_credit", + "sr_net_loss"}, + {BIGINT(), BIGINT(), BIGINT(), BIGINT(), + BIGINT(), BIGINT(), BIGINT(), BIGINT(), + BIGINT(), BIGINT(), INTEGER(), DECIMAL(7, 2), + DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), + DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2)}); + +static RowTypePtr storeSalesRowType = ROW( + {"ss_sold_date_sk", "ss_sold_time_sk", "ss_item_sk", + "ss_customer_sk", "ss_cdemo_sk", "ss_hdemo_sk", + "ss_addr_sk", "ss_store_sk", "ss_promo_sk", + "ss_ticket_number", "ss_quantity", "ss_wholesale_cost", + "ss_list_price", "ss_sales_price", "ss_ext_discount_amt", + "ss_ext_sales_price", "ss_ext_wholesale_cost", "ss_ext_list_price", + "ss_ext_tax", "ss_coupon_amt", "ss_net_paid", + "ss_net_paid_inc_tax", "ss_net_profit"}, + {BIGINT(), BIGINT(), BIGINT(), BIGINT(), BIGINT(), + BIGINT(), BIGINT(), BIGINT(), BIGINT(), BIGINT(), + INTEGER(), DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), + DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), + DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2)}); + +static RowTypePtr timeDimRowType = + ROW({"t_time_sk", + "t_time_id", + "t_time", + "t_hour", + "t_minute", + "t_second", + "t_am_pm", + "t_shift", + "t_sub_shift", + "t_meal_time"}, + {BIGINT(), + VARCHAR(), + INTEGER(), + INTEGER(), + INTEGER(), + INTEGER(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + VARCHAR()}); + +static RowTypePtr warehouseRowType = + ROW({"w_warehouse_sk", + "w_warehouse_id", + "w_warehouse_name", + "w_warehouse_sq_ft", + "w_street_number", + "w_street_name", + "w_street_type", + "w_suite_number", + "w_city", + "w_county", + "w_state", + "w_zip", + "w_country", + "w_gmt_offset"}, + {BIGINT(), + VARCHAR(), + VARCHAR(), + INTEGER(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + DECIMAL(5, 2)}); + +static RowTypePtr webPageRowType = + ROW({"wp_web_page_sk", + "wp_web_page_id", + "wp_rec_start_date", + "wp_rec_end_date", + "wp_creation_date_sk", + "wp_access_date_sk", + "wp_autogen_flag", + "wp_customer_sk", + "wp_url", + "wp_type", + "wp_char_count", + "wp_link_count", + "wp_image_count", + "wp_max_ad_count"}, + {BIGINT(), + VARCHAR(), + DATE(), + DATE(), + BIGINT(), + BIGINT(), + VARCHAR(), + BIGINT(), + VARCHAR(), + VARCHAR(), + INTEGER(), + INTEGER(), + INTEGER(), + INTEGER()}); + +static RowTypePtr webReturnsRowType = ROW( + {"wr_returned_date_sk", + "wr_returned_time_sk", + "wr_item_sk", + "wr_refunded_customer_sk", + "wr_refunded_cdemo_sk", + "wr_refunded_hdemo_sk", + "wr_refunded_addr_sk", + "wr_returning_customer_sk", + "wr_returning_cdemo_sk", + "wr_returning_hdemo_sk", + "wr_returning_addr_sk", + "wr_web_page_sk", + "wr_reason_sk", + "wr_order_number", + "wr_return_quantity", + "wr_return_amt", + "wr_return_tax", + "wr_return_amt_inc_tax", + "wr_fee", + "wr_return_ship_cost", + "wr_refunded_cash", + "wr_reversed_charge", + "wr_account_credit", + "wr_net_loss"}, + {BIGINT(), BIGINT(), BIGINT(), BIGINT(), BIGINT(), + BIGINT(), BIGINT(), BIGINT(), BIGINT(), BIGINT(), + BIGINT(), BIGINT(), BIGINT(), BIGINT(), INTEGER(), + DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), + DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2)}); + +static RowTypePtr webSalesRowType = ROW( + {"ws_sold_date_sk", + "ws_sold_time_sk", + "ws_ship_date_sk", + "ws_item_sk", + "ws_bill_customer_sk", + "ws_bill_cdemo_sk", + "ws_bill_hdemo_sk", + "ws_bill_addr_sk", + "ws_ship_customer_sk", + "ws_ship_cdemo_sk", + "ws_ship_hdemo_sk", + "ws_ship_addr_sk", + "ws_web_page_sk", + "ws_web_site_sk", + "ws_ship_mode_sk", + "ws_warehouse_sk", + "ws_promo_sk", + "ws_order_number", + "ws_quantity", + "ws_wholesale_cost", + "ws_list_price", + "ws_sales_price", + "ws_ext_discount_amt", + "ws_ext_sales_price", + "ws_ext_wholesale_cost", + "ws_ext_list_price", + "ws_ext_tax", + "ws_coupon_amt", + "ws_ext_ship_cost", + "ws_net_paid", + "ws_net_paid_inc_tax", + "ws_net_paid_inc_ship", + "ws_net_paid_inc_ship_tax", + "ws_net_profit"}, + {BIGINT(), BIGINT(), BIGINT(), BIGINT(), BIGINT(), + BIGINT(), BIGINT(), BIGINT(), BIGINT(), BIGINT(), + BIGINT(), BIGINT(), BIGINT(), BIGINT(), BIGINT(), + BIGINT(), BIGINT(), BIGINT(), INTEGER(), DECIMAL(7, 2), + DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), + DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), + DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2)}); + +static RowTypePtr websiteRowType = ROW( + {"web_site_sk", "web_site_id", "web_rec_start_date", + "web_rec_end_date", "web_name", "web_open_date_sk", + "web_close_date_sk", "web_class", "web_manager", + "web_mkt_id", "web_mkt_class", "web_mkt_desc", + "web_market_manager", "web_company_id", "web_company_name", + "web_street_number", "web_street_name", "web_street_type", + "web_suite_number", "web_city", "web_county", + "web_state", "web_zip", "web_country", + "web_gmt_offset", "web_tax_percentage"}, + {BIGINT(), VARCHAR(), DATE(), DATE(), VARCHAR(), BIGINT(), + BIGINT(), VARCHAR(), VARCHAR(), INTEGER(), VARCHAR(), VARCHAR(), + VARCHAR(), INTEGER(), VARCHAR(), VARCHAR(), VARCHAR(), VARCHAR(), + VARCHAR(), VARCHAR(), VARCHAR(), VARCHAR(), VARCHAR(), VARCHAR(), + DECIMAL(5, 2), DECIMAL(5, 2)}); + +RowTypePtr getTableSchema(Table table) { + switch (table) { + case Table::TBL_CALL_CENTER: + return callCenterRowType; + case Table::TBL_CATALOG_PAGE: + return catalogPageRowType; + case Table::TBL_CATALOG_RETURNS: + return catalogReturnsRowType; + case Table::TBL_CATALOG_SALES: + return catalogSalesRowType; + case Table::TBL_CUSTOMER: + return customerRowType; + case Table::TBL_CUSTOMER_ADDRESS: + return customerAddressRowType; + case Table::TBL_CUSTOMER_DEMOGRAPHICS: + return customerDemographicsRowType; + case Table::TBL_DATE_DIM: + return dateDimRowType; + case Table::TBL_HOUSEHOLD_DEMOGRAPHICS: + return householdDemographicsRowType; + case Table::TBL_INCOME_BAND: + return incomeBandRowType; + case Table::TBL_INVENTORY: + return inventoryRowType; + case Table::TBL_ITEM: + return itemRowType; + case Table::TBL_PROMOTION: + return promotionRowType; + case Table::TBL_REASON: + return reasonRowType; + case Table::TBL_SHIP_MODE: + return shipModeRowType; + case Table::TBL_STORE: + return storeRowType; + case Table::TBL_STORE_RETURNS: + return storeReturnsRowType; + case Table::TBL_STORE_SALES: + return storeSalesRowType; + case Table::TBL_TIME_DIM: + return timeDimRowType; + case Table::TBL_WAREHOUSE: + return warehouseRowType; + case Table::TBL_WEB_PAGE: + return webPageRowType; + case Table::TBL_WEB_RETURNS: + return webReturnsRowType; + case Table::TBL_WEB_SALES: + return webSalesRowType; + case Table::TBL_WEB_SITE: + return websiteRowType; + default: + VELOX_UNREACHABLE(); + } +} + +std::string toTableName(Table table) { + switch (table) { + case Table::TBL_CALL_CENTER: + return "call_center"; + case Table::TBL_CATALOG_PAGE: + return "catalog_page"; + case Table::TBL_CATALOG_RETURNS: + return "catalog_returns"; + case Table::TBL_CATALOG_SALES: + return "catalog_sales"; + case Table::TBL_CUSTOMER: + return "customer"; + case Table::TBL_CUSTOMER_ADDRESS: + return "customer_address"; + case Table::TBL_CUSTOMER_DEMOGRAPHICS: + return "customer_demographics"; + case Table::TBL_DATE_DIM: + return "date_dim"; + case Table::TBL_HOUSEHOLD_DEMOGRAPHICS: + return "household_demographics"; + case Table::TBL_INCOME_BAND: + return "income_band"; + case Table::TBL_INVENTORY: + return "inventory"; + case Table::TBL_ITEM: + return "item"; + case Table::TBL_PROMOTION: + return "promotion"; + case Table::TBL_REASON: + return "reason"; + case Table::TBL_SHIP_MODE: + return "ship_mode"; + case Table::TBL_STORE: + return "store"; + case Table::TBL_STORE_RETURNS: + return "store_returns"; + case Table::TBL_STORE_SALES: + return "store_sales"; + case Table::TBL_TIME_DIM: + return "time_dim"; + case Table::TBL_WAREHOUSE: + return "warehouse"; + case Table::TBL_WEB_PAGE: + return "web_page"; + case Table::TBL_WEB_RETURNS: + return "web_returns"; + case Table::TBL_WEB_SALES: + return "web_sales"; + case Table::TBL_WEB_SITE: + return "web_site"; + default: + VELOX_UNREACHABLE(); + } +} + +TypePtr resolveTpcdsColumn(Table table, const std::string& columnName) { + return getTableSchema(table)->findChild(columnName); +} + +Table fromTableName(const std::string_view& tableName) { + static std::unordered_map map{ + {"call_center", Table::TBL_CALL_CENTER}, + {"catalog_page", Table::TBL_CATALOG_PAGE}, + {"catalog_returns", Table::TBL_CATALOG_RETURNS}, + {"catalog_sales", Table::TBL_CATALOG_SALES}, + {"customer", Table::TBL_CUSTOMER}, + {"customer_address", Table::TBL_CUSTOMER_ADDRESS}, + {"customer_demographics", Table::TBL_CUSTOMER_DEMOGRAPHICS}, + {"date_dim", Table::TBL_DATE_DIM}, + {"household_demographics", Table::TBL_HOUSEHOLD_DEMOGRAPHICS}, + {"income_band", Table::TBL_INCOME_BAND}, + {"inventory", Table::TBL_INVENTORY}, + {"item", Table::TBL_ITEM}, + {"promotion", Table::TBL_PROMOTION}, + {"reason", Table::TBL_REASON}, + {"ship_mode", Table::TBL_SHIP_MODE}, + {"store", Table::TBL_STORE}, + {"store_returns", Table::TBL_STORE_RETURNS}, + {"store_sales", Table::TBL_STORE_SALES}, + {"time_dim", Table::TBL_TIME_DIM}, + {"warehouse", Table::TBL_WAREHOUSE}, + {"web_page", Table::TBL_WEB_PAGE}, + {"web_returns", Table::TBL_WEB_RETURNS}, + {"web_sales", Table::TBL_WEB_SALES}, + {"web_site", Table::TBL_WEB_SITE}, + }; + + auto it = map.find(tableName); + if (it != map.end()) { + return it->second; + } + throw std::invalid_argument( + fmt::format("Invalid TPC-DS table name: '{}'", tableName)); +} + +RowVectorPtr genTpcdsData( + Table table, + size_t maxRows, + size_t offset, + memory::MemoryPool* pool, + double scaleFactor, + vector_size_t parallel, + vector_size_t child) { + switch (table) { + case Table::TBL_CATALOG_RETURNS: + return genTpcdsParentAndChildTable( + pool, + maxRows, + offset, + scaleFactor, + parallel, + child, + Table::TBL_CATALOG_SALES, + Table::TBL_CATALOG_RETURNS, + true); + case Table::TBL_CATALOG_SALES: + return genTpcdsParentAndChildTable( + pool, + maxRows, + offset, + scaleFactor, + parallel, + child, + Table::TBL_CATALOG_SALES, + Table::TBL_CATALOG_RETURNS, + false); + case Table::TBL_WEB_RETURNS: + return genTpcdsParentAndChildTable( + pool, + maxRows, + offset, + scaleFactor, + parallel, + child, + Table::TBL_WEB_SALES, + Table::TBL_WEB_RETURNS, + true); + case Table::TBL_WEB_SALES: + return genTpcdsParentAndChildTable( + pool, + maxRows, + offset, + scaleFactor, + parallel, + child, + Table::TBL_WEB_SALES, + Table::TBL_WEB_RETURNS, + false); + case Table::TBL_STORE_RETURNS: + return genTpcdsParentAndChildTable( + pool, + maxRows, + offset, + scaleFactor, + parallel, + child, + Table::TBL_STORE_SALES, + Table::TBL_STORE_RETURNS, + true); + case Table::TBL_STORE_SALES: + return genTpcdsParentAndChildTable( + pool, + maxRows, + offset, + scaleFactor, + parallel, + child, + Table::TBL_STORE_SALES, + Table::TBL_STORE_RETURNS, + false); + case Table::TBL_CALL_CENTER: + case Table::TBL_CATALOG_PAGE: + case Table::TBL_CUSTOMER: + case Table::TBL_CUSTOMER_ADDRESS: + case Table::TBL_CUSTOMER_DEMOGRAPHICS: + case Table::TBL_DATE_DIM: + case Table::TBL_HOUSEHOLD_DEMOGRAPHICS: + case Table::TBL_INCOME_BAND: + case Table::TBL_INVENTORY: + case Table::TBL_ITEM: + case Table::TBL_PROMOTION: + case Table::TBL_REASON: + case Table::TBL_SHIP_MODE: + case Table::TBL_STORE: + case Table::TBL_TIME_DIM: + case Table::TBL_WAREHOUSE: + case Table::TBL_WEB_PAGE: + case Table::TBL_WEB_SITE: + return genTpcdsTableData( + table, pool, maxRows, offset, scaleFactor, parallel, child); + default: + VELOX_UNREACHABLE(); + } +} +} // namespace facebook::presto::connector::tpcds diff --git a/presto-native-execution/presto_cpp/main/connectors/tpcds/TpcdsGen.h b/presto-native-execution/presto_cpp/main/connectors/tpcds/TpcdsGen.h new file mode 100644 index 0000000000000..373b907bee34b --- /dev/null +++ b/presto-native-execution/presto_cpp/main/connectors/tpcds/TpcdsGen.h @@ -0,0 +1,99 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/common/memory/Memory.h" +#include "velox/vector/ComplexVector.h" + +using namespace facebook::velox; + +namespace facebook::presto::connector::tpcds { + +/// This file uses TPC-DS DSDGEN to generate data encoded using Velox Vectors. + +enum class Table : uint8_t { + TBL_CALL_CENTER, + TBL_CATALOG_PAGE, + TBL_CATALOG_RETURNS, + TBL_CATALOG_SALES, + TBL_CUSTOMER, + TBL_CUSTOMER_ADDRESS, + TBL_CUSTOMER_DEMOGRAPHICS, + TBL_DATE_DIM, + TBL_HOUSEHOLD_DEMOGRAPHICS, + TBL_INCOME_BAND, + TBL_INVENTORY, + TBL_ITEM, + TBL_PROMOTION, + TBL_REASON, + TBL_SHIP_MODE, + TBL_STORE, + TBL_STORE_RETURNS, + TBL_STORE_SALES, + TBL_TIME_DIM, + TBL_WAREHOUSE, + TBL_WEB_PAGE, + TBL_WEB_RETURNS, + TBL_WEB_SALES, + TBL_WEB_SITE +}; + +static constexpr auto tables = { + tpcds::Table::TBL_CALL_CENTER, + tpcds::Table::TBL_CATALOG_PAGE, + tpcds::Table::TBL_CATALOG_RETURNS, + tpcds::Table::TBL_CATALOG_SALES, + tpcds::Table::TBL_CUSTOMER, + tpcds::Table::TBL_CUSTOMER_ADDRESS, + tpcds::Table::TBL_CUSTOMER_DEMOGRAPHICS, + tpcds::Table::TBL_DATE_DIM, + tpcds::Table::TBL_HOUSEHOLD_DEMOGRAPHICS, + tpcds::Table::TBL_INCOME_BAND, + tpcds::Table::TBL_INVENTORY, + tpcds::Table::TBL_ITEM, + tpcds::Table::TBL_PROMOTION, + tpcds::Table::TBL_REASON, + tpcds::Table::TBL_SHIP_MODE, + tpcds::Table::TBL_STORE, + tpcds::Table::TBL_STORE_RETURNS, + tpcds::Table::TBL_STORE_SALES, + tpcds::Table::TBL_TIME_DIM, + tpcds::Table::TBL_WAREHOUSE, + tpcds::Table::TBL_WEB_PAGE, + tpcds::Table::TBL_WEB_RETURNS, + tpcds::Table::TBL_WEB_SALES, + tpcds::Table::TBL_WEB_SITE}; + +// Returns table name as a string. +std::string toTableName(Table table); + +/// Returns the schema (RowType) for a particular TPC-DS table. +RowTypePtr getTableSchema(Table table); + +/// Returns the type of a particular table:column pair. Throws if `columnName` +/// does not exist in `table`. +TypePtr resolveTpcdsColumn(Table table, const std::string& columnName); + +Table fromTableName(const std::string_view& tableName); + +RowVectorPtr genTpcdsData( + Table table, + size_t maxRows, + size_t offset, + memory::MemoryPool* pool, + double scaleFactor, + vector_size_t parallel, + vector_size_t child); +} // namespace facebook::presto::connector::tpcds diff --git a/presto-native-execution/presto_cpp/main/connectors/tpcds/utils/append_info-c.cpp b/presto-native-execution/presto_cpp/main/connectors/tpcds/utils/append_info-c.cpp new file mode 100644 index 0000000000000..b542be208ada0 --- /dev/null +++ b/presto-native-execution/presto_cpp/main/connectors/tpcds/utils/append_info-c.cpp @@ -0,0 +1,159 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "append_info-c.hpp" + +#include +#include +#include "presto_cpp/external/dsdgen/include/dsdgen-c/append_info.h" +#include "presto_cpp/external/dsdgen/include/dsdgen-c/config.h" +#include "presto_cpp/external/dsdgen/include/dsdgen-c/date.h" +#include "presto_cpp/external/dsdgen/include/dsdgen-c/nulls.h" +#include "presto_cpp/external/dsdgen/include/dsdgen-c/porting.h" +#include "velox/vector/FlatVector.h" + +using namespace facebook::presto::connector::tpcds; +append_info* append_info_get(void* info_list, int table_id) { + auto& append_vector = + *((std::vector>*)info_list); + return (append_info*)append_vector[table_id].get(); +} + +bool facebook::presto::connector::tpcds::tpcds_table_def::IsNull( + int32_t column) { + return nullCheck(column, *dsdGenContext); +} + +void append_row_start(append_info info) { + auto append_info = (tpcds_table_def*)info; +} + +void append_row_end(append_info info) { + auto append_info = (tpcds_table_def*)info; + append_info->colIndex %= append_info->children.size(); + append_info->rowIndex++; +} + +void append_varchar( + int32_t column, + append_info info, + const char* value, + bool fillEmptyStringAsNull) { + auto append_info = (tpcds_table_def*)info; + if (((append_info->IsNull(column)) || (!value) || (*value == '\0')) && + (fillEmptyStringAsNull)) { + append_info->children[append_info->colIndex]->setNull( + append_info->rowIndex, true); + } else { + append_info->children[append_info->colIndex] + ->asFlatVector() + ->set(append_info->rowIndex, value); + } + append_info->colIndex++; +} + +void append_varchar( + int32_t column, + append_info info, + std::string value, + bool fillEmptyStringAsNull) { + append_varchar(column, info, value.data(), fillEmptyStringAsNull); +} + +void append_key(int32_t column, append_info info, int64_t value) { + auto append_info = (tpcds_table_def*)info; + if (append_info->IsNull(column) || value < 0) { + append_info->children[append_info->colIndex]->setNull( + append_info->rowIndex, true); + } else { + append_info->children[append_info->colIndex]->asFlatVector()->set( + append_info->rowIndex, value); + } + append_info->colIndex++; +} + +void append_integer(int32_t column, append_info info, int32_t value) { + auto append_info = (tpcds_table_def*)info; + if (append_info->IsNull(column)) { + append_info->children[append_info->colIndex]->setNull( + append_info->rowIndex, true); + } else { + append_info->children[append_info->colIndex]->asFlatVector()->set( + append_info->rowIndex, value); + } + append_info->colIndex++; +} + +void append_boolean(int32_t column, append_info info, int32_t value) { + auto append_info = (tpcds_table_def*)info; + if (append_info->IsNull(column)) { + append_info->children[append_info->colIndex]->setNull( + append_info->rowIndex, true); + } else { + append_info->children[append_info->colIndex]->asFlatVector()->set( + append_info->rowIndex, value != 0); + } + append_info->colIndex++; +} + +// value is a Julian date +// FIXME: direct int conversion, offsets should be constant +void append_date(int32_t column, append_info info, int64_t value) { + auto append_info = (tpcds_table_def*)info; + if (append_info->IsNull(column) || value < 0) { + append_info->children[append_info->colIndex]->setNull( + append_info->rowIndex, true); + } else { + date_t dTemp; + jtodt(&dTemp, (int)value); + auto stringDate = + fmt::format("{}-{}-{}", dTemp.year, dTemp.month, dTemp.day); + auto date = DATE()->toDays(stringDate); + append_info->children[append_info->colIndex]->asFlatVector()->set( + append_info->rowIndex, date); + } + append_info->colIndex++; +} + +void append_decimal(int32_t column, append_info info, decimal_t* val) { + auto append_info = (tpcds_table_def*)info; + if (append_info->IsNull(column)) { + append_info->children[append_info->colIndex]->setNull( + append_info->rowIndex, true); + } else { + auto type = append_info->children[append_info->colIndex]->type(); + if (type->isShortDecimal()) { + append_info->children[append_info->colIndex] + ->asFlatVector() + ->set(append_info->rowIndex, val->number); + } else { + append_info->children[append_info->colIndex] + ->asFlatVector() + ->set(append_info->rowIndex, val->number); + } + } + append_info->colIndex++; +} + +void append_integer_decimal(int32_t column, append_info info, int32_t value) { + auto append_info = (tpcds_table_def*)info; + if (append_info->IsNull(column)) { + append_info->children[append_info->colIndex]->setNull( + append_info->rowIndex, true); + } else { + append_info->children[append_info->colIndex]->asFlatVector()->set( + append_info->rowIndex, (int64_t)value * 100); + } + append_info->colIndex++; +} diff --git a/presto-native-execution/presto_cpp/main/connectors/tpcds/utils/append_info-c.hpp b/presto-native-execution/presto_cpp/main/connectors/tpcds/utils/append_info-c.hpp new file mode 100644 index 0000000000000..1593f23780030 --- /dev/null +++ b/presto-native-execution/presto_cpp/main/connectors/tpcds/utils/append_info-c.hpp @@ -0,0 +1,37 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "append_info-c.hpp" +#include +#include "presto_cpp/external/dsdgen/include/dsdgen-c/dist.h" +#include "velox/vector/BaseVector.h" +#include "velox/vector/ComplexVector.h" + +using namespace facebook::velox; +namespace facebook::presto::connector::tpcds { + +struct tpcds_table_def { + const char *name; + int fl_small; + int fl_child; + int first_column; + int colIndex = 0; + int rowIndex = 0; + DSDGenContext* dsdGenContext; + std::vector children; + bool IsNull(int32_t column); +}; +} // namespace facebook::presto::connector::tpcds \ No newline at end of file diff --git a/presto-native-execution/presto_cpp/main/tests/CMakeLists.txt b/presto-native-execution/presto_cpp/main/tests/CMakeLists.txt index 9635db335915b..2d8e2ea9b0df5 100644 --- a/presto-native-execution/presto_cpp/main/tests/CMakeLists.txt +++ b/presto-native-execution/presto_cpp/main/tests/CMakeLists.txt @@ -38,6 +38,7 @@ target_link_libraries( $ $ velox_hive_connector + presto_tpcds_connector velox_tpch_connector velox_presto_serializer velox_functions_prestosql diff --git a/presto-native-execution/presto_cpp/main/types/CMakeLists.txt b/presto-native-execution/presto_cpp/main/types/CMakeLists.txt index ecdd1bb30b7ff..1dd3da3f76a1d 100644 --- a/presto-native-execution/presto_cpp/main/types/CMakeLists.txt +++ b/presto-native-execution/presto_cpp/main/types/CMakeLists.txt @@ -22,7 +22,7 @@ add_dependencies(presto_types presto_operators presto_type_converter velox_type velox_type_fbhive velox_dwio_dwrf_proto) target_link_libraries(presto_types presto_type_converter velox_type_fbhive - velox_hive_partition_function velox_tpch_gen) + velox_hive_partition_function velox_tpch_gen tpcds_gen) set_property(TARGET presto_types PROPERTY JOB_POOL_LINK presto_link_job_pool) diff --git a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxConnector.cpp b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxConnector.cpp index faa4f93eaf67a..91d7f30bf5ec8 100644 --- a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxConnector.cpp +++ b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxConnector.cpp @@ -14,6 +14,8 @@ #include "presto_cpp/main/types/PrestoToVeloxConnector.h" #include +#include "presto_cpp/main/connectors/tpcds/TpcdsConnector.h" +#include "presto_cpp/main/connectors/tpcds/TpcdsConnectorSplit.h" #include "velox/connectors/hive/HiveConnector.h" #include "velox/connectors/hive/HiveConnectorSplit.h" #include "velox/connectors/hive/HiveDataSink.h" @@ -23,6 +25,7 @@ #include "velox/connectors/tpch/TpchConnector.h" #include "velox/connectors/tpch/TpchConnectorSplit.h" +using namespace facebook::velox; namespace facebook::presto { namespace { @@ -106,15 +109,16 @@ TypePtr stringToType( return typeParser.parse(typeString); } -connector::hive::HiveColumnHandle::ColumnType toHiveColumnType( +velox::connector::hive::HiveColumnHandle::ColumnType toHiveColumnType( protocol::ColumnType type) { switch (type) { case protocol::ColumnType::PARTITION_KEY: - return connector::hive::HiveColumnHandle::ColumnType::kPartitionKey; + return velox::connector::hive::HiveColumnHandle::ColumnType:: + kPartitionKey; case protocol::ColumnType::REGULAR: - return connector::hive::HiveColumnHandle::ColumnType::kRegular; + return velox::connector::hive::HiveColumnHandle::ColumnType::kRegular; case protocol::ColumnType::SYNTHESIZED: - return connector::hive::HiveColumnHandle::ColumnType::kSynthesized; + return velox::connector::hive::HiveColumnHandle::ColumnType::kSynthesized; default: VELOX_UNSUPPORTED( "Unsupported Hive column type: {}.", toJsonString(type)); @@ -783,7 +787,7 @@ std::unique_ptr toFilter( VELOX_UNSUPPORTED("Unsupported filter found."); } -std::unique_ptr toHiveTableHandle( +std::unique_ptr toHiveTableHandle( const protocol::TupleDomain& domainPredicate, const std::shared_ptr& remainingPredicate, bool isPushdownFilterEnabled, @@ -793,7 +797,7 @@ std::unique_ptr toHiveTableHandle( const protocol::Map& tableParameters, const VeloxExprConverter& exprConverter, const TypeParser& typeParser) { - connector::hive::SubfieldFilters subfieldFilters; + velox::connector::hive::SubfieldFilters subfieldFilters; auto domains = domainPredicate.domains; for (const auto& domain : *domains) { auto filter = domain.second; @@ -833,7 +837,7 @@ std::unique_ptr toHiveTableHandle( } if (tableParameters.empty()) { - return std::make_unique( + return std::make_unique( tableHandle.connectorId, tableName, isPushdownFilterEnabled, @@ -848,7 +852,7 @@ std::unique_ptr toHiveTableHandle( finalTableParameters[key] = value; } - return std::make_unique( + return std::make_unique( tableHandle.connectorId, tableName, isPushdownFilterEnabled, @@ -858,7 +862,7 @@ std::unique_ptr toHiveTableHandle( finalTableParameters); } -connector::hive::LocationHandle::TableType toTableType( +velox::connector::hive::LocationHandle::TableType toTableType( protocol::TableType tableType) { switch (tableType) { case protocol::TableType::NEW: @@ -866,17 +870,17 @@ connector::hive::LocationHandle::TableType toTableType( // So they can be treated as New. They do not require Append or Overwrite // semantics as applicable for regular tables. case protocol::TableType::TEMPORARY: - return connector::hive::LocationHandle::TableType::kNew; + return velox::connector::hive::LocationHandle::TableType::kNew; case protocol::TableType::EXISTING: - return connector::hive::LocationHandle::TableType::kExisting; + return velox::connector::hive::LocationHandle::TableType::kExisting; default: VELOX_UNSUPPORTED("Unsupported table type: {}.", toJsonString(tableType)); } } -std::shared_ptr toLocationHandle( +std::shared_ptr toLocationHandle( const protocol::LocationHandle& locationHandle) { - return std::make_shared( + return std::make_shared( locationHandle.targetPath, locationHandle.writePath, toTableType(locationHandle.tableType)); @@ -975,8 +979,8 @@ toHiveSortingColumns(const protocol::List& sortedBy) { std::shared_ptr toHiveBucketProperty( - const std::vector>& - inputColumns, + const std::vector>& inputColumns, const std::shared_ptr& bucketProperty, const TypeParser& typeParser) { if (bucketProperty == nullptr) { @@ -1248,7 +1252,7 @@ HivePrestoToVeloxConnector::toVeloxInsertTableHandle( const auto table = hiveInsertTableHandle->pageSinkMetadata.table; VELOX_USER_CHECK_NOT_NULL(table, "Table must not be null for insert query"); - return std::make_unique( + return std::make_unique( inputColumns, toLocationHandle(hiveInsertTableHandle->locationHandle), toFileFormat(hiveInsertTableHandle->tableStorageFormat, "TableWrite"), @@ -1261,20 +1265,20 @@ HivePrestoToVeloxConnector::toVeloxInsertTableHandle( table->storage.serdeParameters.end())); } -std::vector> +std::vector> HivePrestoToVeloxConnector::toHiveColumns( const protocol::List& inputColumns, const TypeParser& typeParser, bool& hasPartitionColumn) const { hasPartitionColumn = false; - std::vector> + std::vector> hiveColumns; hiveColumns.reserve(inputColumns.size()); for (const auto& columnHandle : inputColumns) { hasPartitionColumn |= columnHandle.columnType == protocol::ColumnType::PARTITION_KEY; hiveColumns.emplace_back( - std::dynamic_pointer_cast( + std::dynamic_pointer_cast( std::shared_ptr(toVeloxColumnHandle(&columnHandle, typeParser)))); } return hiveColumns; @@ -1299,7 +1303,7 @@ HivePrestoToVeloxConnector::createVeloxPartitionFunctionSpec( "Unsupported Hive bucket function type: {}", toJsonString(hivePartitioningHandle->bucketFunctionType)); effectivelyGather = hivePartitioningHandle->bucketCount == 1; - return std::make_unique( + return std::make_unique( hivePartitioningHandle->bucketCount, bucketToPartition, channels, @@ -1360,7 +1364,7 @@ IcebergPrestoToVeloxConnector::toVeloxSplit( {"$data_sequence_number", std::to_string(icebergSplit->dataSequenceNumber)}); - return std::make_unique( + return std::make_unique( catalogId, icebergSplit->path, toVeloxFileFormat(icebergSplit->fileFormat), @@ -1385,7 +1389,7 @@ IcebergPrestoToVeloxConnector::toVeloxColumnHandle( // TODO(imjalpreet): Modify 'hiveType' argument of the 'HiveColumnHandle' // constructor similar to how Hive Connector is handling for bucketing velox::type::fbhive::HiveTypeParser hiveTypeParser; - return std::make_unique( + return std::make_unique( icebergColumn->columnIdentity.name, toHiveColumnType(icebergColumn->columnType), stringToType(icebergColumn->type, typeParser), @@ -1470,7 +1474,7 @@ TpchPrestoToVeloxConnector::toVeloxSplit( auto tpchSplit = dynamic_cast(connectorSplit); VELOX_CHECK_NOT_NULL( tpchSplit, "Unexpected split type {}", connectorSplit->_type); - return std::make_unique( + return std::make_unique( catalogId, tpchSplit->totalParts, tpchSplit->partNumber); } @@ -1481,7 +1485,7 @@ TpchPrestoToVeloxConnector::toVeloxColumnHandle( auto tpchColumn = dynamic_cast(column); VELOX_CHECK_NOT_NULL( tpchColumn, "Unexpected column handle type {}", column->_type); - return std::make_unique( + return std::make_unique( tpchColumn->columnName); } @@ -1500,7 +1504,7 @@ TpchPrestoToVeloxConnector::toVeloxTableHandle( tpchLayout, "Unexpected layout type {}", tableHandle.connectorTableLayout->_type); - return std::make_unique( + return std::make_unique( tableHandle.connectorId, tpch::fromTableName(tpchLayout->table.tableName), tpchLayout->table.scaleFactor); @@ -1510,4 +1514,52 @@ std::unique_ptr TpchPrestoToVeloxConnector::createConnectorProtocol() const { return std::make_unique(); } + +std::unique_ptr +TpcdsPrestoToVeloxConnector::toVeloxSplit( + const protocol::ConnectorId& catalogId, + const protocol::ConnectorSplit* const connectorSplit) const { + auto tpcdsSplit = dynamic_cast(connectorSplit); + VELOX_CHECK_NOT_NULL( + tpcdsSplit, "Unexpected split type {}", connectorSplit->_type); + return std::make_unique( + catalogId, tpcdsSplit->totalParts, tpcdsSplit->partNumber); +} + +std::unique_ptr +TpcdsPrestoToVeloxConnector::toVeloxColumnHandle( + const protocol::ColumnHandle* column, + const TypeParser& typeParser) const { + auto tpcdsColumn = dynamic_cast(column); + VELOX_CHECK_NOT_NULL( + tpcdsColumn, "Unexpected column handle type {}", column->_type); + return std::make_unique( + tpcdsColumn->columnName); +} + +std::unique_ptr +TpcdsPrestoToVeloxConnector::toVeloxTableHandle( + const protocol::TableHandle& tableHandle, + const VeloxExprConverter& exprConverter, + const TypeParser& typeParser, + std::unordered_map< + std::string, + std::shared_ptr>& assignments) const { + auto tpcdsLayout = + std::dynamic_pointer_cast( + tableHandle.connectorTableLayout); + VELOX_CHECK_NOT_NULL( + tpcdsLayout, + "Unexpected layout type {}", + tableHandle.connectorTableLayout->_type); + return std::make_unique( + tableHandle.connectorId, + presto::connector::tpcds::fromTableName(tpcdsLayout->table.tableName), + tpcdsLayout->table.scaleFactor); +} + +std::unique_ptr +TpcdsPrestoToVeloxConnector::createConnectorProtocol() const { + return std::make_unique(); +} } // namespace facebook::presto diff --git a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxConnector.h b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxConnector.h index 754aaeddbef05..f00348805b456 100644 --- a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxConnector.h +++ b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxConnector.h @@ -210,4 +210,30 @@ class TpchPrestoToVeloxConnector final : public PrestoToVeloxConnector { std::unique_ptr createConnectorProtocol() const final; }; + +class TpcdsPrestoToVeloxConnector final : public PrestoToVeloxConnector { + public: + explicit TpcdsPrestoToVeloxConnector(std::string connectorId) + : PrestoToVeloxConnector(std::move(connectorId)) {} + + std::unique_ptr toVeloxSplit( + const protocol::ConnectorId& catalogId, + const protocol::ConnectorSplit* connectorSplit) const final; + + std::unique_ptr toVeloxColumnHandle( + const protocol::ColumnHandle* column, + const TypeParser& typeParser) const final; + + std::unique_ptr toVeloxTableHandle( + const protocol::TableHandle& tableHandle, + const VeloxExprConverter& exprConverter, + const TypeParser& typeParser, + std::unordered_map< + std::string, + std::shared_ptr>& assignments) + const final; + + std::unique_ptr createConnectorProtocol() + const final; +}; } // namespace facebook::presto diff --git a/presto-native-execution/presto_cpp/main/types/tests/CMakeLists.txt b/presto-native-execution/presto_cpp/main/types/tests/CMakeLists.txt index 4cd79ed1bf5bb..f10efab107253 100644 --- a/presto-native-execution/presto_cpp/main/types/tests/CMakeLists.txt +++ b/presto-native-execution/presto_cpp/main/types/tests/CMakeLists.txt @@ -20,6 +20,7 @@ target_link_libraries( GTest::gtest_main presto_operators presto_protocol + presto_tpcds_connector velox_hive_connector velox_tpch_connector velox_exec @@ -48,6 +49,7 @@ target_link_libraries( $ $ presto_operators + presto_tpcds_connector velox_core velox_dwio_common_exception velox_encode @@ -83,6 +85,7 @@ target_link_libraries( presto_to_velox_connector_test presto_protocol presto_operators + presto_tpcds_connector presto_type_converter presto_types velox_hive_connector diff --git a/presto-native-execution/presto_cpp/presto_protocol/ConnectorProtocol.h b/presto-native-execution/presto_cpp/presto_protocol/ConnectorProtocol.h index b91131791bb0c..9108aa1577b2c 100644 --- a/presto-native-execution/presto_cpp/presto_protocol/ConnectorProtocol.h +++ b/presto-native-execution/presto_cpp/presto_protocol/ConnectorProtocol.h @@ -272,4 +272,15 @@ using SystemConnectorProtocol = ConnectorProtocolTemplate< SystemTransactionHandle, NotImplemented>; +using TpcdsConnectorProtocol = ConnectorProtocolTemplate< + TpcdsTableHandle, + TpcdsTableLayoutHandle, + TpcdsColumnHandle, + NotImplemented, + NotImplemented, + TpcdsSplit, + TpcdsPartitioningHandle, + TpcdsTransactionHandle, + NotImplemented>; + } // namespace facebook::presto::protocol diff --git a/presto-native-execution/presto_cpp/presto_protocol/presto_protocol.cpp b/presto-native-execution/presto_cpp/presto_protocol/presto_protocol.cpp index 990a23c8564e5..585c5d998b600 100644 --- a/presto-native-execution/presto_cpp/presto_protocol/presto_protocol.cpp +++ b/presto-native-execution/presto_cpp/presto_protocol/presto_protocol.cpp @@ -991,6 +991,36 @@ void from_json(const json& j, std::shared_ptr& p) { * limitations under the License. */ +// TpcdsTransactionHandle is special since +// the corresponding class in Java is an enum. + +namespace facebook::presto::protocol { + +void to_json(json& j, const TpcdsTransactionHandle& p) { + j = json::array(); + j.push_back(p._type); + j.push_back(p.instance); +} + +void from_json(const json& j, TpcdsTransactionHandle& p) { + j[0].get_to(p._type); + j[1].get_to(p.instance); +} +} // namespace facebook::presto::protocol +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + // TpchTransactionHandle is special since // the corresponding class in Java is an enum. @@ -1022,6 +1052,7 @@ void from_json(const json& j, TpchTransactionHandle& p) { */ // dependency TpchTransactionHandle +// dependency TpcdsTransactionHandle namespace facebook::presto::protocol { void to_json(json& j, const std::shared_ptr& p) { @@ -8058,6 +8089,7 @@ static const std::pair IcebergTableType_enum_table[] = {IcebergTableType::MANIFESTS, "MANIFESTS"}, {IcebergTableType::PARTITIONS, "PARTITIONS"}, {IcebergTableType::FILES, "FILES"}, + {IcebergTableType::REFS, "REFS"}, {IcebergTableType::PROPERTIES, "PROPERTIES"}, {IcebergTableType::CHANGELOG, "CHANGELOG"}, {IcebergTableType::EQUALITY_DELETES, "EQUALITY_DELETES"}, @@ -13410,6 +13442,178 @@ void from_json(const json& j, TopNRowNumberNode& p) { } } // namespace facebook::presto::protocol namespace facebook::presto::protocol { +TpcdsColumnHandle::TpcdsColumnHandle() noexcept { + _type = "tpcds"; +} + +void to_json(json& j, const TpcdsColumnHandle& p) { + j = json::object(); + j["@type"] = "tpcds"; + to_json_key( + j, + "columnName", + p.columnName, + "TpcdsColumnHandle", + "String", + "columnName"); + to_json_key(j, "type", p.type, "TpcdsColumnHandle", "Type", "type"); +} + +void from_json(const json& j, TpcdsColumnHandle& p) { + p._type = j["@type"]; + from_json_key( + j, + "columnName", + p.columnName, + "TpcdsColumnHandle", + "String", + "columnName"); + from_json_key(j, "type", p.type, "TpcdsColumnHandle", "Type", "type"); +} +} // namespace facebook::presto::protocol +namespace facebook::presto::protocol { +TpcdsPartitioningHandle::TpcdsPartitioningHandle() noexcept { + _type = "tpcds"; +} + +void to_json(json& j, const TpcdsPartitioningHandle& p) { + j = json::object(); + j["@type"] = "tpcds"; + to_json_key( + j, "table", p.table, "TpcdsPartitioningHandle", "String", "table"); + to_json_key( + j, + "totalRows", + p.totalRows, + "TpcdsPartitioningHandle", + "int64_t", + "totalRows"); +} + +void from_json(const json& j, TpcdsPartitioningHandle& p) { + p._type = j["@type"]; + from_json_key( + j, "table", p.table, "TpcdsPartitioningHandle", "String", "table"); + from_json_key( + j, + "totalRows", + p.totalRows, + "TpcdsPartitioningHandle", + "int64_t", + "totalRows"); +} +} // namespace facebook::presto::protocol +namespace facebook::presto::protocol { +TpcdsTableHandle::TpcdsTableHandle() noexcept { + _type = "tpcds"; +} + +void to_json(json& j, const TpcdsTableHandle& p) { + j = json::object(); + j["@type"] = "tpcds"; + to_json_key( + j, "tableName", p.tableName, "TpcdsTableHandle", "String", "tableName"); + to_json_key( + j, + "scaleFactor", + p.scaleFactor, + "TpcdsTableHandle", + "double", + "scaleFactor"); +} + +void from_json(const json& j, TpcdsTableHandle& p) { + p._type = j["@type"]; + from_json_key( + j, "tableName", p.tableName, "TpcdsTableHandle", "String", "tableName"); + from_json_key( + j, + "scaleFactor", + p.scaleFactor, + "TpcdsTableHandle", + "double", + "scaleFactor"); +} +} // namespace facebook::presto::protocol +namespace facebook::presto::protocol { +TpcdsSplit::TpcdsSplit() noexcept { + _type = "tpcds"; +} + +void to_json(json& j, const TpcdsSplit& p) { + j = json::object(); + j["@type"] = "tpcds"; + to_json_key( + j, + "tableHandle", + p.tableHandle, + "TpcdsSplit", + "TpcdsTableHandle", + "tableHandle"); + to_json_key(j, "partNumber", p.partNumber, "TpcdsSplit", "int", "partNumber"); + to_json_key(j, "totalParts", p.totalParts, "TpcdsSplit", "int", "totalParts"); + to_json_key( + j, + "addresses", + p.addresses, + "TpcdsSplit", + "List", + "addresses"); + to_json_key(j, "noSexism", p.noSexism, "TpcdsSplit", "bool", "noSexism"); +} + +void from_json(const json& j, TpcdsSplit& p) { + p._type = j["@type"]; + from_json_key( + j, + "tableHandle", + p.tableHandle, + "TpcdsSplit", + "TpcdsTableHandle", + "tableHandle"); + from_json_key( + j, "partNumber", p.partNumber, "TpcdsSplit", "int", "partNumber"); + from_json_key( + j, "totalParts", p.totalParts, "TpcdsSplit", "int", "totalParts"); + from_json_key( + j, + "addresses", + p.addresses, + "TpcdsSplit", + "List", + "addresses"); + from_json_key(j, "noSexism", p.noSexism, "TpcdsSplit", "bool", "noSexism"); +} +} // namespace facebook::presto::protocol +namespace facebook::presto::protocol { +TpcdsTableLayoutHandle::TpcdsTableLayoutHandle() noexcept { + _type = "tpcds"; +} + +void to_json(json& j, const TpcdsTableLayoutHandle& p) { + j = json::object(); + j["@type"] = "tpcds"; + to_json_key( + j, + "table", + p.table, + "TpcdsTableLayoutHandle", + "TpcdsTableHandle", + "table"); +} + +void from_json(const json& j, TpcdsTableLayoutHandle& p) { + p._type = j["@type"]; + from_json_key( + j, + "table", + p.table, + "TpcdsTableLayoutHandle", + "TpcdsTableHandle", + "table"); +} +} // namespace facebook::presto::protocol +namespace facebook::presto::protocol { TpchColumnHandle::TpchColumnHandle() noexcept { _type = "tpch"; } diff --git a/presto-native-execution/presto_cpp/presto_protocol/presto_protocol.h b/presto-native-execution/presto_cpp/presto_protocol/presto_protocol.h index 942dd11ac730d..43a2846c9b3ab 100644 --- a/presto-native-execution/presto_cpp/presto_protocol/presto_protocol.h +++ b/presto-native-execution/presto_cpp/presto_protocol/presto_protocol.h @@ -477,6 +477,34 @@ struct AllOrNoneValueSet : public ValueSet { }; void to_json(json& j, const AllOrNoneValueSet& p); void from_json(const json& j, AllOrNoneValueSet& p); +} // namespace facebook::presto::protocol +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// TpcdsTransactionHandle is special since +// the corresponding class in Java is an enum. + +namespace facebook::presto::protocol { + +struct TpcdsTransactionHandle : public ConnectorTransactionHandle { + String instance = {}; +}; + +void to_json(json& j, const TpcdsTransactionHandle& p); + +void from_json(const json& j, TpcdsTransactionHandle& p); + } // namespace facebook::presto::protocol /* * Licensed under the Apache License, Version 2.0 (the "License"); @@ -2149,6 +2177,7 @@ enum class IcebergTableType { MANIFESTS, PARTITIONS, FILES, + REFS, PROPERTIES, CHANGELOG, EQUALITY_DELETES, @@ -3093,6 +3122,58 @@ struct TopNRowNumberNode : public PlanNode { void to_json(json& j, const TopNRowNumberNode& p); void from_json(const json& j, TopNRowNumberNode& p); } // namespace facebook::presto::protocol +namespace facebook::presto::protocol { +struct TpcdsColumnHandle : public ColumnHandle { + String columnName = {}; + Type type = {}; + + TpcdsColumnHandle() noexcept; +}; +void to_json(json& j, const TpcdsColumnHandle& p); +void from_json(const json& j, TpcdsColumnHandle& p); +} // namespace facebook::presto::protocol +namespace facebook::presto::protocol { +struct TpcdsPartitioningHandle : public ConnectorPartitioningHandle { + String table = {}; + int64_t totalRows = {}; + + TpcdsPartitioningHandle() noexcept; +}; +void to_json(json& j, const TpcdsPartitioningHandle& p); +void from_json(const json& j, TpcdsPartitioningHandle& p); +} // namespace facebook::presto::protocol +namespace facebook::presto::protocol { +struct TpcdsTableHandle : public ConnectorTableHandle { + String tableName = {}; + double scaleFactor = {}; + + TpcdsTableHandle() noexcept; +}; +void to_json(json& j, const TpcdsTableHandle& p); +void from_json(const json& j, TpcdsTableHandle& p); +} // namespace facebook::presto::protocol +namespace facebook::presto::protocol { +struct TpcdsSplit : public ConnectorSplit { + TpcdsTableHandle tableHandle = {}; + int partNumber = {}; + int totalParts = {}; + List addresses = {}; + bool noSexism = {}; + + TpcdsSplit() noexcept; +}; +void to_json(json& j, const TpcdsSplit& p); +void from_json(const json& j, TpcdsSplit& p); +} // namespace facebook::presto::protocol +namespace facebook::presto::protocol { +struct TpcdsTableLayoutHandle : public ConnectorTableLayoutHandle { + TpcdsTableHandle table = {}; + + TpcdsTableLayoutHandle() noexcept; +}; +void to_json(json& j, const TpcdsTableLayoutHandle& p); +void from_json(const json& j, TpcdsTableLayoutHandle& p); +} // namespace facebook::presto::protocol /* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/presto-native-execution/presto_cpp/presto_protocol/presto_protocol.yml b/presto-native-execution/presto_cpp/presto_protocol/presto_protocol.yml index cee79cb865dc7..7da9cd86dc013 100644 --- a/presto-native-execution/presto_cpp/presto_protocol/presto_protocol.yml +++ b/presto-native-execution/presto_cpp/presto_protocol/presto_protocol.yml @@ -53,6 +53,7 @@ AbstractClasses: - { name: IcebergColumnHandle, key: hive-iceberg } - { name: TpchColumnHandle, key: tpch } - { name: SystemColumnHandle, key: $system@system } + - { name: TpcdsColumnHandle, key: tpcds } ConnectorPartitioningHandle: super: JsonEncodedSubclass @@ -60,6 +61,7 @@ AbstractClasses: - { name: SystemPartitioningHandle, key: $remote } - { name: HivePartitioningHandle, key: hive} - { name: TpchPartitioningHandle, key: tpch} + - { name: TpcdsPartitioningHandle, key: tpcds} ConnectorTableHandle: super: JsonEncodedSubclass @@ -68,6 +70,7 @@ AbstractClasses: - { name: IcebergTableHandle, key: hive-iceberg } - { name: TpchTableHandle, key: tpch } - { name: SystemTableHandle, key: $system@system } + - { name: TpcdsTableHandle, key: tpcds } ConnectorOutputTableHandle: super: JsonEncodedSubclass @@ -85,6 +88,8 @@ AbstractClasses: - { name: HiveTransactionHandle, key: hive } - { name: RemoteTransactionHandle, key: $remote } - { name: SystemTransactionHandle, key: $system@system } + - { name: TpchTransactionHandle, key: tpch } + - { name: TpcdsTransactionHandle, key: tpcds } ConnectorTableLayoutHandle: super: JsonEncodedSubclass @@ -93,6 +98,7 @@ AbstractClasses: - { name: IcebergTableLayoutHandle, key: hive-iceberg } - { name: TpchTableLayoutHandle, key: tpch } - { name: SystemTableLayoutHandle, key: $system@system } + - { name: TpcdsTableLayoutHandle, key: tpcds } ConnectorMetadataUpdateHandle: super: JsonEncodedSubclass @@ -105,6 +111,7 @@ AbstractClasses: - { name: HiveSplit, key: hive } - { name: IcebergSplit, key: hive-iceberg } - { name: TpchSplit, key: tpch } + - { name: TpcdsSplit, key: tpcds } - { name: RemoteSplit, key: $remote } - { name: EmptySplit, key: $empty } - { name: SystemSplit, key: $system@system } @@ -265,6 +272,11 @@ JavaClasses: - presto-tpch/src/main/java/com/facebook/presto/tpch/TpchTableLayoutHandle.java - presto-tpch/src/main/java/com/facebook/presto/tpch/TpchColumnHandle.java - presto-tpch/src/main/java/com/facebook/presto/tpch/TpchPartitioningHandle.java + - presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsSplit.java + - presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsTableHandle.java + - presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsTableLayoutHandle.java + - presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsColumnHandle.java + - presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsPartitioningHandle.java - presto-spi/src/main/java/com/facebook/presto/spi/plan/LimitNode.java - presto-spi/src/main/java/com/facebook/presto/spi/function/LongVariableConstraint.java - presto-common/src/main/java/com/facebook/presto/common/predicate/Marker.java diff --git a/presto-native-execution/presto_cpp/presto_protocol/special/ConnectorTransactionHandle.cpp.inc b/presto-native-execution/presto_cpp/presto_protocol/special/ConnectorTransactionHandle.cpp.inc index 8ec2a94e84bd9..1b89a65615b1a 100644 --- a/presto-native-execution/presto_cpp/presto_protocol/special/ConnectorTransactionHandle.cpp.inc +++ b/presto-native-execution/presto_cpp/presto_protocol/special/ConnectorTransactionHandle.cpp.inc @@ -13,6 +13,7 @@ */ // dependency TpchTransactionHandle +// dependency TpcdsTransactionHandle namespace facebook::presto::protocol { void to_json(json& j, const std::shared_ptr& p) { diff --git a/presto-native-execution/presto_cpp/presto_protocol/special/TpcdsTransactionHandle.cpp.inc b/presto-native-execution/presto_cpp/presto_protocol/special/TpcdsTransactionHandle.cpp.inc new file mode 100644 index 0000000000000..f741669d72dc7 --- /dev/null +++ b/presto-native-execution/presto_cpp/presto_protocol/special/TpcdsTransactionHandle.cpp.inc @@ -0,0 +1,30 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// TpcdsTransactionHandle is special since +// the corresponding class in Java is an enum. + +namespace facebook::presto::protocol { + +void to_json(json& j, const TpcdsTransactionHandle& p) { + j = json::array(); + j.push_back(p._type); + j.push_back(p.instance); +} + +void from_json(const json& j, TpcdsTransactionHandle& p) { + j[0].get_to(p._type); + j[1].get_to(p.instance); +} +} // namespace facebook::presto::protocol diff --git a/presto-native-execution/presto_cpp/presto_protocol/special/TpcdsTransactionHandle.hpp.inc b/presto-native-execution/presto_cpp/presto_protocol/special/TpcdsTransactionHandle.hpp.inc new file mode 100644 index 0000000000000..d665925648813 --- /dev/null +++ b/presto-native-execution/presto_cpp/presto_protocol/special/TpcdsTransactionHandle.hpp.inc @@ -0,0 +1,28 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// TpcdsTransactionHandle is special since +// the corresponding class in Java is an enum. + +namespace facebook::presto::protocol { + +struct TpcdsTransactionHandle : public ConnectorTransactionHandle { + String instance = {}; +}; + +void to_json(json& j, const TpcdsTransactionHandle& p); + +void from_json(const json& j, TpcdsTransactionHandle& p); + +} // namespace facebook::presto::protocol diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/AbstractTestNativeTpcdsConnectorQueries.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/AbstractTestNativeTpcdsConnectorQueries.java new file mode 100644 index 0000000000000..6979598e25454 --- /dev/null +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/AbstractTestNativeTpcdsConnectorQueries.java @@ -0,0 +1,83 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.nativeworker; + +import com.facebook.presto.Session; +import com.facebook.presto.tests.AbstractTestQueryFramework; +import org.testng.annotations.Test; + +public abstract class AbstractTestNativeTpcdsConnectorQueries + extends AbstractTestQueryFramework +{ + @Override + public Session getSession() + { + return Session.builder(super.getSession()).setCatalog("tpcds").setSchema("tiny").build(); + } + + @Test + public void testTpcdsTinyTablesRowCount() + { + Session session = getSession(); + assertQuery(session, "SELECT count(*) FROM catalog_returns"); + assertQuery(session, "SELECT count(*) FROM catalog_sales"); + assertQuery(session, "SELECT count(*) FROM store_sales"); + assertQuery(session, "SELECT count(*) FROM store_returns"); + assertQuery(session, "SELECT count(*) FROM web_sales"); + assertQuery(session, "SELECT count(*) FROM web_returns"); + assertQuery(session, "SELECT count(*) FROM inventory"); + assertQuery(session, "SELECT count(*) FROM item"); + assertQuery(session, "SELECT count(*) FROM customer_address"); + assertQuery(session, "SELECT count(*) FROM customer_demographics"); + assertQuery(session, "SELECT count(*) FROM call_center"); + assertQuery(session, "SELECT count(*) FROM customer"); + assertQuery(session, "SELECT count(*) FROM web_site"); + assertQuery(session, "SELECT count(*) FROM web_page"); + assertQuery(session, "SELECT count(*) FROM promotion"); + assertQuery(session, "SELECT count(*) FROM reason"); + assertQuery(session, "SELECT count(*) FROM store"); + assertQuery(session, "SELECT count(*) FROM income_band"); + assertQuery(session, "SELECT count(*) FROM household_demographics"); + assertQuery(session, "SELECT count(*) FROM warehouse"); + assertQuery(session, "SELECT count(*) FROM catalog_page"); + assertQuery(session, "SELECT count(*) FROM date_dim"); + assertQuery(session, "SELECT count(*) FROM time_dim"); + assertQuery(session, "SELECT count(*) FROM ship_mode"); + } + + @Test + public void testTpcdsBasicQueries() + { + Session session = getSession(); + assertQuery(session, "SELECT cc_call_center_sk, cc_name, cc_manager, cc_mkt_id, trim(cast(cc_mkt_class as varchar)) FROM call_center"); + assertQuery(session, "SELECT ss_store_sk, SUM(ss_net_paid) AS total_sales " + + "FROM store_sales GROUP BY ss_store_sk ORDER BY total_sales DESC LIMIT 10"); + assertQuery(session, "SELECT sr_item_sk, SUM(sr_return_quantity) AS total_returns " + + "FROM store_returns WHERE sr_item_sk = 12345 GROUP BY sr_item_sk"); + assertQuery(session, "SELECT ws_order_number, SUM(ws_net_paid) AS total_paid FROM web_sales " + + "WHERE ws_sold_date_sk BETWEEN 2451180 AND 2451545 GROUP BY ws_order_number"); + assertQuery(session, "SELECT inv_item_sk, inv_quantity_on_hand FROM inventory WHERE inv_quantity_on_hand > 1000 " + + "ORDER BY inv_quantity_on_hand DESC"); + assertQuery(session, "SELECT SUM(ss_net_paid) AS total_revenue FROM store_sales, promotion " + + "WHERE p_promo_sk = 100 GROUP BY p_promo_sk"); + assertQuery(session, "SELECT trim(cast(c.c_customer_id as varchar)) FROM customer c " + + "JOIN customer_demographics cd ON c.c_customer_sk = cd.cd_demo_sk WHERE cd_purchase_estimate > 5000"); + assertQuery(session, "SELECT trim(cast(cd_gender as varchar)), AVG(cd_purchase_estimate) AS avg_purchase_estimate FROM customer_demographics" + + " GROUP BY cd_gender ORDER BY avg_purchase_estimate DESC"); + + // No row passes the filter. + assertQuery(session, + "SELECT s_store_sk, s_store_id, s_number_employees FROM store WHERE s_number_employees > 1000"); + } +} diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java index 69bb2433b3f18..f0953b9a4e3a2 100644 --- a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java @@ -478,6 +478,10 @@ public static Optional> getExternalWorkerLaunc Files.write(catalogDirectoryPath.resolve("tpchstandard.properties"), format("connector.name=tpch%n").getBytes()); + // Add a tpcds catalog. + Files.write(catalogDirectoryPath.resolve("tpcds.properties"), + format("connector.name=tpcds%n").getBytes()); + // Disable stack trace capturing as some queries (using TRY) generate a lot of exceptions. return new ProcessBuilder(prestoServerPath, "--logtostderr=1", "--v=1") .directory(tempDirectoryPath.toFile()) diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeTpcdsConnectorQueries.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeTpcdsConnectorQueries.java new file mode 100644 index 0000000000000..50c3cc3a10037 --- /dev/null +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeTpcdsConnectorQueries.java @@ -0,0 +1,35 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.nativeworker; + +import com.facebook.presto.testing.ExpectedQueryRunner; +import com.facebook.presto.testing.QueryRunner; + +public class TestPrestoNativeTpcdsConnectorQueries + extends AbstractTestNativeTpcdsConnectorQueries +{ + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + return PrestoNativeQueryRunnerUtils.createNativeQueryRunner(true); + } + + @Override + protected ExpectedQueryRunner createExpectedQueryRunner() + throws Exception + { + return PrestoNativeQueryRunnerUtils.createJavaQueryRunner(); + } +} diff --git a/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsConnectorFactory.java b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsConnectorFactory.java index 814e56e3136d9..1f5c9b88fb445 100644 --- a/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsConnectorFactory.java +++ b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsConnectorFactory.java @@ -65,6 +65,7 @@ public Connector create(String catalogName, Map config, Connecto { int splitsPerNode = getSplitsPerNode(config); NodeManager nodeManager = context.getNodeManager(); + Boolean isNativeExecution = context.getConnectorSystemConfig().isNativeExecution(); return new Connector() { @Override @@ -76,7 +77,7 @@ public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel @Override public ConnectorMetadata getMetadata(ConnectorTransactionHandle transactionHandle) { - return new TpcdsMetadata(); + return new TpcdsMetadata(isNativeExecution); } @Override diff --git a/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsMetadata.java b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsMetadata.java index d571057317e39..6933eed679133 100644 --- a/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsMetadata.java +++ b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsMetadata.java @@ -55,6 +55,8 @@ public class TpcdsMetadata implements ConnectorMetadata { + private final boolean isNativeExecution; + public static final String TINY_SCHEMA_NAME = "tiny"; public static final double TINY_SCALE_FACTOR = 0.01; @@ -64,8 +66,9 @@ public class TpcdsMetadata private final Set tableNames; private final TpcdsTableStatisticsFactory tpcdsTableStatisticsFactory = new TpcdsTableStatisticsFactory(); - public TpcdsMetadata() + public TpcdsMetadata(boolean isNativeExecution) { + this.isNativeExecution = isNativeExecution; ImmutableSet.Builder tableNames = ImmutableSet.builder(); for (Table tpcdsTable : Table.getBaseTables()) { tableNames.add(tpcdsTable.getName().toLowerCase(ENGLISH)); @@ -134,14 +137,14 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect Table table = Table.getTable(tpcdsTableHandle.getTableName()); String schemaName = scaleFactorSchemaName(tpcdsTableHandle.getScaleFactor()); - return getTableMetadata(schemaName, table); + return getTableMetadata(schemaName, table, isNativeExecution); } - private static ConnectorTableMetadata getTableMetadata(String schemaName, Table tpcdsTable) + private static ConnectorTableMetadata getTableMetadata(String schemaName, Table tpcdsTable, boolean isNativeExecution) { ImmutableList.Builder columns = ImmutableList.builder(); for (Column column : tpcdsTable.getColumns()) { - columns.add(new ColumnMetadata(column.getName(), getPrestoType(column.getType()))); + columns.add(new ColumnMetadata(column.getName(), getPrestoType(column.getType(), isNativeExecution))); } SchemaTableName tableName = new SchemaTableName(schemaName, tpcdsTable.getName()); return new ConnectorTableMetadata(tableName, columns.build()); @@ -189,7 +192,7 @@ public Map> listTableColumns(ConnectorSess for (String schemaName : getSchemaNames(session, Optional.ofNullable(prefix.getSchemaName()))) { for (Table tpcdsTable : Table.getBaseTables()) { if (prefix.getTableName() == null || tpcdsTable.getName().equals(prefix.getTableName())) { - ConnectorTableMetadata tableMetadata = getTableMetadata(schemaName, tpcdsTable); + ConnectorTableMetadata tableMetadata = getTableMetadata(schemaName, tpcdsTable, isNativeExecution); tableColumns.put(new SchemaTableName(schemaName, tpcdsTable.getName()), tableMetadata.getColumns()); } } @@ -243,7 +246,7 @@ public static double schemaNameToScaleFactor(String schemaName) } } - public static Type getPrestoType(ColumnType tpcdsType) + public static Type getPrestoType(ColumnType tpcdsType, boolean isNativeExecution) { switch (tpcdsType.getBase()) { case IDENTIFIER: @@ -254,8 +257,15 @@ public static Type getPrestoType(ColumnType tpcdsType) return DateType.DATE; case DECIMAL: return createDecimalType(tpcdsType.getPrecision().get(), tpcdsType.getScale().get()); - case CHAR: - return createCharType(tpcdsType.getPrecision().get()); + case CHAR: { + if (isNativeExecution) { + // Presto native does not support CHAR type yet. + return createVarcharType(tpcdsType.getPrecision().get()); + } + else { + return createCharType(tpcdsType.getPrecision().get()); + } + } case VARCHAR: return createVarcharType(tpcdsType.getPrecision().get()); case TIME: diff --git a/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsRecordSet.java b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsRecordSet.java index a3821d2cdc307..4480dcf36fdaf 100644 --- a/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsRecordSet.java +++ b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsRecordSet.java @@ -57,7 +57,7 @@ public TpcdsRecordSet(Results results, List columns) this.columns = ImmutableList.copyOf(columns); ImmutableList.Builder columnTypes = ImmutableList.builder(); for (Column column : columns) { - columnTypes.add(getPrestoType(column.getType())); + columnTypes.add(getPrestoType(column.getType(), false)); } this.columnTypes = columnTypes.build(); } @@ -103,7 +103,7 @@ public long getReadTimeNanos() @Override public Type getType(int field) { - return getPrestoType(columns.get(field).getType()); + return getPrestoType(columns.get(field).getType(), false); } @Override diff --git a/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsSplitManager.java b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsSplitManager.java index 77cd902330026..10ae57007cb6a 100644 --- a/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsSplitManager.java +++ b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsSplitManager.java @@ -24,10 +24,13 @@ import com.facebook.presto.spi.connector.ConnectorTransactionHandle; import com.google.common.collect.ImmutableList; +import java.util.HashSet; import java.util.Set; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; +import static java.util.Arrays.asList; +import static java.util.Collections.unmodifiableSet; import static java.util.Objects.requireNonNull; public class TpcdsSplitManager @@ -62,12 +65,24 @@ public ConnectorSplitSource getSplits( int totalParts = nodes.size() * splitsPerNode; int partNumber = 0; - // Split the data using split and skew by the number of nodes available. + // For larger tables, split the data using split and skew by the number of nodes available. + // The TPCDS connector in presto native uses dsdgen-c for data generation. For certain smaller tables, + // the data cannot be generated in parallel. For these cases, a single split should be processed by + // only one of the worker nodes. + Set smallTables = unmodifiableSet(new HashSet<>(asList("call_center", "item", "store", "web_page", "web_site"))); ImmutableList.Builder splits = ImmutableList.builder(); - for (Node node : nodes) { - for (int i = 0; i < splitsPerNode; i++) { - splits.add(new TpcdsSplit(tableHandle, partNumber, totalParts, ImmutableList.of(node.getHostAndPort()), noSexism)); - partNumber++; + if (smallTables.contains(tableHandle.getTableName())) { + Node node = nodes.stream() + .findFirst() + .orElse(null); + splits.add(new TpcdsSplit(tableHandle, 0, 1, ImmutableList.of(node.getHostAndPort()), noSexism)); + } + else { + for (Node node : nodes) { + for (int i = 0; i < splitsPerNode; i++) { + splits.add(new TpcdsSplit(tableHandle, partNumber, totalParts, ImmutableList.of(node.getHostAndPort()), noSexism)); + partNumber++; + } } } return new FixedSplitSource(splits.build()); diff --git a/presto-tpcds/src/test/java/com/facebook/presto/tpcds/TestTpcdsMetadataStatistics.java b/presto-tpcds/src/test/java/com/facebook/presto/tpcds/TestTpcdsMetadataStatistics.java index 5d732fff6e1b8..432ef7c665f43 100644 --- a/presto-tpcds/src/test/java/com/facebook/presto/tpcds/TestTpcdsMetadataStatistics.java +++ b/presto-tpcds/src/test/java/com/facebook/presto/tpcds/TestTpcdsMetadataStatistics.java @@ -45,7 +45,7 @@ public class TestTpcdsMetadataStatistics { private static final EstimateAssertion estimateAssertion = new EstimateAssertion(0.01); private static final ConnectorSession session = null; - private final TpcdsMetadata metadata = new TpcdsMetadata(); + private final TpcdsMetadata metadata = new TpcdsMetadata(false); @Test public void testNoTableStatsForNotSupportedSchema()