diff --git a/velox/connectors/Connector.h b/velox/connectors/Connector.h index 220f104cb590..b51bbc22ad13 100644 --- a/velox/connectors/Connector.h +++ b/velox/connectors/Connector.h @@ -118,6 +118,11 @@ class ConnectorTableHandle : public ISerializable { VELOX_UNSUPPORTED(); } + /// Returns true if the connector table handle supports index lookup. + virtual bool supportsIndexLookup() const { + return false; + } + virtual folly::dynamic serialize() const override; protected: @@ -282,10 +287,10 @@ class DataSource { } }; -/// Collection of context data for use in a DataSource or DataSink. One instance -/// of this per DataSource and DataSink. This may be passed between threads but -/// methods must be invoked sequentially. Serializing use is the responsibility -/// of the caller. +/// Collection of context data for use in a DataSource, IndexSource or DataSink. +/// One instance of this per DataSource and DataSink. This may be passed between +/// threads but methods must be invoked sequentially. Serializing use is the +/// responsibility of the caller. class ConnectorQueryCtx { public: ConnectorQueryCtx( @@ -327,9 +332,9 @@ class ConnectorQueryCtx { return operatorPool_; } - /// Returns the connector's memory pool which is an aggregate kind of memory - /// pool, used for the data sink for table write that needs the hierarchical - /// memory pool management, such as HiveDataSink. + /// Returns the connector's memory pool which is an aggregate kind of + /// memory pool, used for the data sink for table write that needs the + /// hierarchical memory pool management, such as HiveDataSink. memory::MemoryPool* connectorMemoryPool() const { return connectorPool_; } @@ -354,10 +359,10 @@ class ConnectorQueryCtx { return cache_; } - /// This is a combination of task id and the scan's PlanNodeId. This is an id - /// that allows sharing state between different threads of the same scan. 
This - /// is used for locating a scanTracker, which tracks the read density of - /// columns for prefetch and other memory hierarchy purposes. + /// This is a combination of task id and the scan's PlanNodeId. This is an + /// id that allows sharing state between different threads of the same + /// scan. This is used for locating a scanTracker, which tracks the read + /// density of columns for prefetch and other memory hierarchy purposes. const std::string& scanId() const { return scanId_; } @@ -441,8 +446,8 @@ class Connector { VELOX_NYI("connectorConfig is not supported yet"); } - /// Returns true if this connector would accept a filter dynamically generated - /// during query execution. + /// Returns true if this connector would accept a filter dynamically + /// generated during query execution. virtual bool canAddDynamicFilter() const { return false; } @@ -469,6 +474,11 @@ class Connector { return false; } + /// Returns true if the connector supports index lookup, otherwise false. + virtual bool supportsIndexLookup() const { + return false; + } + virtual std::unique_ptr createDataSink( RowTypePtr inputType, std::shared_ptr connectorInsertTableHandle, @@ -517,9 +527,9 @@ class ConnectorFactory { const std::string name_; }; -/// Adds a factory for creating connectors to the registry using connector name -/// as the key. Throws if factor with the same name is already present. Always -/// returns true. The return value makes it easy to use with +/// Adds a factory for creating connectors to the registry using connector +/// name as the key. Throws if factory with the same name is already present. +/// Always returns true. The return value makes it easy to use with /// FB_ANONYMOUS_VARIABLE. bool registerConnectorFactory(std::shared_ptr factory); @@ -528,12 +538,12 @@ bool registerConnectorFactory(std::shared_ptr factory); bool hasConnectorFactory(const std::string& connectorName); /// Unregister a connector factory by name. 
-/// Returns true if a connector with the specified name has been unregistered, -/// false otherwise. +/// Returns true if a connector with the specified name has been +/// unregistered, false otherwise. bool unregisterConnectorFactory(const std::string& connectorName); -/// Returns a factory for creating connectors with the specified name. Throws if -/// factory doesn't exist. +/// Returns a factory for creating connectors with the specified name. +/// Throws if factory doesn't exist. std::shared_ptr getConnectorFactory( const std::string& connectorName); @@ -542,14 +552,16 @@ std::shared_ptr getConnectorFactory( /// true. The return value makes it easy to use with FB_ANONYMOUS_VARIABLE. bool registerConnector(std::shared_ptr connector); -/// Removes the connector with specified ID from the registry. Returns true if -/// connector was removed and false if connector didn't exist. +/// Removes the connector with specified ID from the registry. Returns true +/// if connector was removed and false if connector didn't exist. bool unregisterConnector(const std::string& connectorId); -/// Returns a connector with specified ID. Throws if connector doesn't exist. +/// Returns a connector with specified ID. Throws if connector doesn't +/// exist. std::shared_ptr getConnector(const std::string& connectorId); -/// Returns a map of all (connectorId -> connector) pairs currently registered. +/// Returns a map of all (connectorId -> connector) pairs currently +/// registered. 
const std::unordered_map>& getAllConnectors(); diff --git a/velox/core/PlanNode.cpp b/velox/core/PlanNode.cpp index a86b9343a182..66fda5f72e70 100644 --- a/velox/core/PlanNode.cpp +++ b/velox/core/PlanNode.cpp @@ -40,6 +40,17 @@ std::vector deserializeSources( return {}; } +std::vector deserializeJoinConditions( + const folly::dynamic& obj, + void* context) { + if (obj.count("joinConditions") == 0) { + return {}; + } + + return ISerializable::deserialize>( + obj["joinConditions"], context); +} + PlanNodePtr deserializeSingleSource(const folly::dynamic& obj, void* context) { auto sources = deserializeSources(obj, context); VELOX_CHECK_EQ(1, sources.size()); @@ -1405,6 +1416,75 @@ PlanNodePtr MergeJoinNode::create(const folly::dynamic& obj, void* context) { outputType); } +PlanNodePtr IndexLookupJoinNode::create( + const folly::dynamic& obj, + void* context) { + auto sources = deserializeSources(obj, context); + VELOX_CHECK_EQ(2, sources.size()); + TableScanNodePtr lookupSource = + std::dynamic_pointer_cast(sources[1]); + VELOX_CHECK_NOT_NULL(lookupSource); + + auto leftKeys = deserializeFields(obj["leftKeys"], context); + auto rightKeys = deserializeFields(obj["rightKeys"], context); + + VELOX_CHECK_EQ(obj.count("filter"), 0); + + auto joinConditions = deserializeJoinConditions(obj, context); + + auto outputType = deserializeRowType(obj["outputType"]); + + return std::make_shared( + deserializePlanNodeId(obj), + joinTypeFromName(obj["joinType"].asString()), + std::move(leftKeys), + std::move(rightKeys), + std::move(joinConditions), + sources[0], + std::move(lookupSource), + std::move(outputType)); +} + +folly::dynamic IndexLookupJoinNode::serialize() const { + auto obj = serializeBase(); + if (!joinConditions_.empty()) { + folly::dynamic serializedJoins = folly::dynamic::array; + serializedJoins.reserve(joinConditions_.size()); + for (const auto& joinCondition : joinConditions_) { + serializedJoins.push_back(joinCondition->serialize()); + } + obj["joinConditions"] = 
std::move(serializedJoins); + } + return obj; +} + +void IndexLookupJoinNode::addDetails(std::stringstream& stream) const { + AbstractJoinNode::addDetails(stream); + if (joinConditions_.empty()) { + return; + } + + std::vector joinConditionStrs; + joinConditionStrs.reserve(joinConditions_.size()); + for (const auto& joinCondition : joinConditions_) { + joinConditionStrs.push_back(joinCondition->toString()); + } + stream << ", joinConditions: [" << folly::join(", ", joinConditionStrs) + << " ]"; +} + +// static +bool IndexLookupJoinNode::isSupported(core::JoinType joinType) { + switch (joinType) { + case core::JoinType::kInner: + [[fallthrough]]; + case core::JoinType::kLeft: + return true; + default: + return false; + } +} + NestedLoopJoinNode::NestedLoopJoinNode( const PlanNodeId& id, JoinType joinType, @@ -2726,6 +2806,7 @@ void PlanNode::registerSerDe() { registry.Register("HashJoinNode", HashJoinNode::create); registry.Register("MergeExchangeNode", MergeExchangeNode::create); registry.Register("MergeJoinNode", MergeJoinNode::create); + registry.Register("IndexLookupJoinNode", IndexLookupJoinNode::create); registry.Register("NestedLoopJoinNode", NestedLoopJoinNode::create); registry.Register("LimitNode", LimitNode::create); registry.Register("LocalMergeNode", LocalMergeNode::create); diff --git a/velox/core/PlanNode.h b/velox/core/PlanNode.h index ad69363cf29f..84cac1c7e468 100644 --- a/velox/core/PlanNode.h +++ b/velox/core/PlanNode.h @@ -1762,12 +1762,118 @@ class MergeJoinNode : public AbstractJoinNode { folly::dynamic serialize() const override; - /// If merge join supports this join type. + /// Returns true if the merge join supports this join type, otherwise false. static bool isSupported(JoinType joinType); static PlanNodePtr create(const folly::dynamic& obj, void* context); }; +/// Represents index lookup join. Translates to an exec::IndexLookupJoin +/// operator. 
Assumes the right input is a table scan source node that provides +/// indexed table lookup for the left input with the specified join keys and +/// conditions. The join keys must be a prefix of the index columns of the +/// lookup table. Each join condition must use columns from both sides. For the +/// right side, it can only use one index column. Each index column can either +/// be a join key or a join condition once. The table scan node of the right +/// input is translated to a connector::IndexSource within +/// exec::IndexLookupJoin. +/// +/// Only INNER and LEFT joins are supported. +/// +/// Take the following query for example, t is left table, u is the right table +/// with indexed columns. 'sid' is the join key. 'u.event_type in t.event_list' +/// is the join condition. +/// +/// SELECT t.sid, t.day_ts, u.event_type +/// FROM t LEFT JOIN u +/// ON t.sid = u.sid +/// AND contains(t.event_list, u.event_type) +/// AND t.ds BETWEEN '2024-01-01' AND '2024-01-07' +/// +/// Here, +/// - 'joinType' is JoinType::kLeft +/// - 'left' describes scan of t with a filter on 'ds':t.ds BETWEEN '2024-01-01' +/// AND '2024-01-07' +/// - 'right' describes indexed table 'u' with index keys sid, event_type (and +/// maybe some more) +/// - 'leftKeys' is a list of one key 't.sid' +/// - 'rightKeys' is a list of one key 'u.sid' +/// - 'joinConditions' is a list of one expression: contains(t.event_list, +/// u.event_type) +/// - 'outputType' contains 3 columns: t.sid, t.day_ts, u.event_type +/// +class IndexLookupJoinNode : public AbstractJoinNode { + public: + /// @param joinType Specifies the lookup join type. Only INNER and LEFT joins + /// are supported. 
+ IndexLookupJoinNode( + const PlanNodeId& id, + JoinType joinType, + const std::vector& leftKeys, + const std::vector& rightKeys, + const std::vector& joinConditions, + PlanNodePtr left, + TableScanNodePtr right, + RowTypePtr outputType) + : AbstractJoinNode( + id, + joinType, + leftKeys, + rightKeys, + /*filter=*/nullptr, + std::move(left), + right, + outputType), + lookupSourceNode_(std::move(right)), + joinConditions_(joinConditions) { + VELOX_USER_CHECK( + !leftKeys.empty(), + "The lookup join node requires at least one join key"); + VELOX_USER_CHECK_EQ( + leftKeys_.size(), + rightKeys_.size(), + "The lookup join node requires same number of join keys on left and right sides"); + // TODO: add check that (1) 'rightKeys_' form an index prefix. (2) each of + // 'joinConditions_' uses columns from both sides and uses exactly one index + // column from the right side. + VELOX_USER_CHECK( + lookupSourceNode_->tableHandle()->supportsIndexLookup(), + "The lookup table handle {} from connector {} doesn't support index lookup", + lookupSourceNode_->tableHandle()->name(), + lookupSourceNode_->tableHandle()->connectorId()); + VELOX_USER_CHECK( + isSupported(joinType_), + "Unsupported index lookup join type {}", + joinTypeName(joinType_)); + } + + const TableScanNodePtr& lookupSource() const { + return lookupSourceNode_; + } + + const std::vector& joinConditions() const { + return joinConditions_; + } + + std::string_view name() const override { + return "IndexLookupJoin"; + } + + folly::dynamic serialize() const override; + + static PlanNodePtr create(const folly::dynamic& obj, void* context); + + /// Returns true if the lookup join supports this join type, otherwise false. + static bool isSupported(JoinType joinType); + + private: + void addDetails(std::stringstream& stream) const override; + + const TableScanNodePtr lookupSourceNode_; + + const std::vector joinConditions_; +}; + /// Represents inner/outer nested loop joins. 
Translates to an /// exec::NestedLoopJoinProbe and exec::NestedLoopJoinBuild. A separate /// pipeline is produced for the build side when generating exec::Operators. diff --git a/velox/exec/tests/CMakeLists.txt b/velox/exec/tests/CMakeLists.txt index 4a0f43fbb15a..6a76674b8aad 100644 --- a/velox/exec/tests/CMakeLists.txt +++ b/velox/exec/tests/CMakeLists.txt @@ -51,6 +51,7 @@ add_executable( HashJoinTest.cpp HashPartitionFunctionTest.cpp HashTableTest.cpp + IndexLookupJoinTest.cpp LimitTest.cpp LocalPartitionTest.cpp Main.cpp diff --git a/velox/exec/tests/IndexLookupJoinTest.cpp b/velox/exec/tests/IndexLookupJoinTest.cpp new file mode 100644 index 000000000000..272ed92f778e --- /dev/null +++ b/velox/exec/tests/IndexLookupJoinTest.cpp @@ -0,0 +1,224 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "velox/common/base/tests/GTestUtils.h" +#include "velox/connectors/Connector.h" +#include "velox/core/PlanNode.h" +#include "velox/exec/tests/utils/HiveConnectorTestBase.h" +#include "velox/exec/tests/utils/PlanBuilder.h" + +using namespace facebook::velox; +using namespace facebook::velox::exec; +using namespace facebook::velox::exec::test; +using namespace facebook::velox::common::testutil; + +namespace { +class IndexLookupJoinTest : public HiveConnectorTestBase { + protected: + IndexLookupJoinTest() = default; + + void SetUp() override { + HiveConnectorTestBase::SetUp(); + core::PlanNode::registerSerDe(); + connector::hive::HiveColumnHandle::registerSerDe(); + Type::registerSerDe(); + core::ITypedExpr::registerSerDe(); + } + + void TearDown() override { + HiveConnectorTestBase::TearDown(); + } + + void testSerde(const core::PlanNodePtr& plan) { + auto serialized = plan->serialize(); + + auto copy = ISerializable::deserialize(serialized, pool()); + + ASSERT_EQ(plan->toString(true, true), copy->toString(true, true)); + } +}; + +class IndexTableHandle : public connector::ConnectorTableHandle { + public: + explicit IndexTableHandle(std::string connectorId) + : ConnectorTableHandle(std::move(connectorId)) {} + + ~IndexTableHandle() override = default; + + std::string toString() const override { + static const std::string str{"IndexTableHandle"}; + return str; + } + + const std::string& name() const override { + static const std::string connectorName{"IndexTableHandle"}; + return connectorName; + } + + bool supportsIndexLookup() const override { + return true; + } + + folly::dynamic serialize() const override { + folly::dynamic obj = folly::dynamic::object; + obj["name"] = name(); + obj["connectorId"] = connectorId(); + return obj; + } + + static std::shared_ptr create( + const folly::dynamic& obj, + void* context) { + return std::make_shared(obj["connectorId"].getString()); + } + + static void registerSerDe() { + auto& registry = 
DeserializationWithContextRegistryForSharedPtr(); + registry.Register("IndexTableHandle", create); + } +}; + +TEST_F(IndexLookupJoinTest, planNodeAndSerde) { + IndexTableHandle::registerSerDe(); + + auto indexConnectorHandle = + std::make_shared("IndexConnector"); + + auto left = makeRowVector( + {"t0", "t1", "t2"}, + {makeFlatVector({1, 2, 3}), + makeFlatVector({10, 20, 30}), + makeFlatVector({10, 30, 20})}); + + auto right = makeRowVector( + {"u0", "u1", "u2"}, + {makeFlatVector({1, 2, 3}), + makeFlatVector({10, 20, 30}), + makeFlatVector({10, 30, 20})}); + + auto planNodeIdGenerator = std::make_shared(); + + auto planBuilder = exec::test::PlanBuilder(); + auto nonIndexTableScan = std::dynamic_pointer_cast( + exec::test::PlanBuilder::TableScanBuilder(planBuilder) + .outputType(std::dynamic_pointer_cast(right->type())) + .endTableScan() + .planNode()); + VELOX_CHECK_NOT_NULL(nonIndexTableScan); + + auto indexTableScan = std::dynamic_pointer_cast( + exec::test::PlanBuilder::TableScanBuilder(planBuilder) + .tableHandle(indexConnectorHandle) + .outputType(std::dynamic_pointer_cast(right->type())) + .endTableScan() + .planNode()); + VELOX_CHECK_NOT_NULL(indexTableScan); + + for (const auto joinType : {core::JoinType::kLeft, core::JoinType::kInner}) { + auto plan = PlanBuilder(planNodeIdGenerator) + .values({left}) + .indexLookupJoin( + {"t0"}, + {"u0"}, + indexTableScan, + {}, + {"t0", "u1", "t2", "t1"}, + joinType) + .planNode(); + auto indexLookupJoinNode = + std::dynamic_pointer_cast(plan); + ASSERT_TRUE(indexLookupJoinNode->joinConditions().empty()); + ASSERT_EQ( + indexLookupJoinNode->lookupSource()->tableHandle()->connectorId(), + "IndexConnector"); + testSerde(plan); + } + + // with join conditions. 
+ for (const auto joinType : {core::JoinType::kLeft, core::JoinType::kInner}) { + auto plan = PlanBuilder(planNodeIdGenerator) + .values({left}) + .indexLookupJoin( + {"t0"}, + {"u0"}, + indexTableScan, + {"u1 > t2"}, + {"t0", "u1", "t2", "t1"}, + joinType) + .planNode(); + auto indexLookupJoinNode = + std::dynamic_pointer_cast(plan); + ASSERT_EQ(indexLookupJoinNode->joinConditions().size(), 1); + ASSERT_EQ( + indexLookupJoinNode->lookupSource()->tableHandle()->connectorId(), + "IndexConnector"); + testSerde(plan); + } + + // bad join type. + { + VELOX_ASSERT_USER_THROW( + PlanBuilder(planNodeIdGenerator) + .values({left}) + .indexLookupJoin( + {"t0"}, + {"u0"}, + indexTableScan, + {}, + {"t0", "u1", "t2", "t1"}, + core::JoinType::kFull) + .planNode(), + "Unsupported index lookup join type FULL"); + } + + // bad table handle. + { + VELOX_ASSERT_USER_THROW( + PlanBuilder(planNodeIdGenerator) + .values({left}) + .indexLookupJoin( + {"t0"}, {"u0"}, nonIndexTableScan, {}, {"t0", "u1", "t2", "t1"}) + .planNode(), + "The lookup table handle hive_table from connector test-hive doesn't support index lookup"); + } + + // Non-matched join keys. + { + VELOX_ASSERT_THROW( + PlanBuilder(planNodeIdGenerator) + .values({left}) + .indexLookupJoin( + {"t0", "t1"}, + {"u0"}, + indexTableScan, + {"u1 > t2"}, + {"t0", "u1", "t2", "t1"}) + .planNode(), + "JoinNode requires same number of join keys on left and right sides"); + } + + // No join keys. 
+ { + VELOX_ASSERT_THROW( + PlanBuilder(planNodeIdGenerator) + .values({left}) + .indexLookupJoin( + {}, {}, indexTableScan, {"u1 > t2"}, {"t0", "u1", "t2", "t1"}) + .planNode(), + "JoinNode requires at least one join key"); + } +} +} // namespace diff --git a/velox/exec/tests/utils/PlanBuilder.cpp b/velox/exec/tests/utils/PlanBuilder.cpp index cd419343d65c..478fa4c194ca 100644 --- a/velox/exec/tests/utils/PlanBuilder.cpp +++ b/velox/exec/tests/utils/PlanBuilder.cpp @@ -1572,6 +1572,39 @@ PlanBuilder& PlanBuilder::nestedLoopJoin( return *this; } +PlanBuilder& PlanBuilder::indexLookupJoin( + const std::vector& leftKeys, + const std::vector& rightKeys, + const core::TableScanNodePtr& right, + const std::vector& joinConditions, + const std::vector& outputLayout, + core::JoinType joinType) { + VELOX_CHECK_NOT_NULL(planNode_, "indexLookupJoin cannot be the source node"); + const auto inputType = concat(planNode_->outputType(), right->outputType()); + auto outputType = extract(inputType, outputLayout); + + auto leftKeyFields = fields(planNode_->outputType(), leftKeys); + auto rightKeyFields = fields(right->outputType(), rightKeys); + + std::vector joinConditionExprs{}; + joinConditionExprs.reserve(joinConditions.size()); + for (const auto& joinCondition : joinConditions) { + joinConditionExprs.push_back( + parseExpr(joinCondition, inputType, options_, pool_)); + } + + planNode_ = std::make_shared( + nextPlanNodeId(), + joinType, + std::move(leftKeyFields), + std::move(rightKeyFields), + std::move(joinConditionExprs), + std::move(planNode_), + right, + std::move(outputType)); + return *this; +} + PlanBuilder& PlanBuilder::unnest( const std::vector& replicateColumns, const std::vector& unnestColumns, diff --git a/velox/exec/tests/utils/PlanBuilder.h b/velox/exec/tests/utils/PlanBuilder.h index 63ca50797cda..0c4c5ee68b52 100644 --- a/velox/exec/tests/utils/PlanBuilder.h +++ b/velox/exec/tests/utils/PlanBuilder.h @@ -1100,6 +1100,26 @@ class PlanBuilder { const std::vector& 
outputLayout, core::JoinType joinType = core::JoinType::kInner); + /// Add an IndexLookupJoinNode to join two inputs using one or more join keys + /// plus optional join conditions. First input comes from the preceding plan + /// node. Second input is specified in 'right' parameter and must be a + /// table source with the connector table handle with index lookup support. + /// + /// @param right The right input source with index lookup support. + /// @param joinCondition SQL expressions as the join conditions. Each join + /// condition must use columns from both sides. For the right side, it can + /// only use one index column. + /// @param joinType Type of the join supported: inner, left. + /// + /// See hashJoin method for the description of the other parameters. + PlanBuilder& indexLookupJoin( + const std::vector& leftKeys, + const std::vector& rightKeys, + const core::TableScanNodePtr& right, + const std::vector& joinCondition, + const std::vector& outputLayout, + core::JoinType joinType = core::JoinType::kInner); + /// Add an UnnestNode to unnest one or more columns of type array or map. /// /// The output will contain 'replicatedColumns' followed by unnested columns,