Skip to content

Commit

Permalink
feat: Add index lookup join plan node (#12163)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #12163

Add an index lookup join plan node and add a method to the table handle that indicates whether the table supports
index lookup.

Reviewed By: mbasmanova

Differential Revision: D68429744

fbshipit-source-id: c09a16f396c5889a30943dcdfb568d32938d580e
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Jan 25, 2025
1 parent 292ea9d commit cac8000
Show file tree
Hide file tree
Showing 7 changed files with 502 additions and 25 deletions.
60 changes: 36 additions & 24 deletions velox/connectors/Connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ class ConnectorTableHandle : public ISerializable {
VELOX_UNSUPPORTED();
}

/// Returns true if the connector table handle supports index lookup.
/// The base implementation returns false; the method is virtual so it can be
/// overridden. IndexLookupJoinNode's constructor requires this to return true
/// for the table handle of its lookup (right) table scan.
virtual bool supportsIndexLookup() const {
return false;
}

virtual folly::dynamic serialize() const override;

protected:
Expand Down Expand Up @@ -282,10 +287,10 @@ class DataSource {
}
};

/// Collection of context data for use in a DataSource or DataSink. One instance
/// of this per DataSource and DataSink. This may be passed between threads but
/// methods must be invoked sequentially. Serializing use is the responsibility
/// of the caller.
/// Collection of context data for use in a DataSource, IndexSource or DataSink.
/// One instance of this per DataSource and DataSink. This may be passed between
/// threads but methods must be invoked sequentially. Serializing use is the
/// responsibility of the caller.
class ConnectorQueryCtx {
public:
ConnectorQueryCtx(
Expand Down Expand Up @@ -327,9 +332,9 @@ class ConnectorQueryCtx {
return operatorPool_;
}

/// Returns the connector's memory pool which is an aggregate kind of memory
/// pool, used for the data sink for table write that needs the hierarchical
/// memory pool management, such as HiveDataSink.
/// Returns the connector's memory pool which is an aggregate kind of
/// memory pool, used for the data sink for table write that needs the
/// hierarchical memory pool management, such as HiveDataSink.
///
/// @return The pointer held in 'connectorPool_'.
memory::MemoryPool* connectorMemoryPool() const {
return connectorPool_;
}
Expand All @@ -354,10 +359,10 @@ class ConnectorQueryCtx {
return cache_;
}

/// This is a combination of task id and the scan's PlanNodeId. This is an id
/// that allows sharing state between different threads of the same scan. This
/// is used for locating a scanTracker, which tracks the read density of
/// columns for prefetch and other memory hierarchy purposes.
/// This is a combination of task id and the scan's PlanNodeId. This is an
/// id that allows sharing state between different threads of the same
/// scan. This is used for locating a scanTracker, which tracks the read
/// density of columns for prefetch and other memory hierarchy purposes.
///
/// @return A const reference to the 'scanId_' member.
const std::string& scanId() const {
return scanId_;
}
Expand Down Expand Up @@ -441,8 +446,8 @@ class Connector {
VELOX_NYI("connectorConfig is not supported yet");
}

/// Returns true if this connector would accept a filter dynamically generated
/// during query execution.
/// Returns true if this connector would accept a filter dynamically
/// generated during query execution.
/// The default implementation returns false; the method is virtual so it can
/// be overridden.
virtual bool canAddDynamicFilter() const {
return false;
}
Expand All @@ -469,6 +474,11 @@ class Connector {
return false;
}

/// Returns true if the connector supports index lookup, otherwise false.
/// The default implementation returns false; the method is virtual so it can
/// be overridden.
virtual bool supportsIndexLookup() const {
return false;
}

virtual std::unique_ptr<DataSink> createDataSink(
RowTypePtr inputType,
std::shared_ptr<ConnectorInsertTableHandle> connectorInsertTableHandle,
Expand Down Expand Up @@ -517,9 +527,9 @@ class ConnectorFactory {
const std::string name_;
};

/// Adds a factory for creating connectors to the registry using connector name
/// as the key. Throws if factor with the same name is already present. Always
/// returns true. The return value makes it easy to use with
/// Adds a factory for creating connectors to the registry using connector
/// name as the key. Throws if a factory with the same name is already present.
/// Always returns true. The return value makes it easy to use with
/// FB_ANONYMOUS_VARIABLE.
bool registerConnectorFactory(std::shared_ptr<ConnectorFactory> factory);

Expand All @@ -528,12 +538,12 @@ bool registerConnectorFactory(std::shared_ptr<ConnectorFactory> factory);
bool hasConnectorFactory(const std::string& connectorName);

/// Unregister a connector factory by name.
/// Returns true if a connector with the specified name has been unregistered,
/// false otherwise.
/// Returns true if a connector with the specified name has been
/// unregistered, false otherwise.
bool unregisterConnectorFactory(const std::string& connectorName);

/// Returns a factory for creating connectors with the specified name. Throws if
/// factory doesn't exist.
/// Returns a factory for creating connectors with the specified name.
/// Throws if factory doesn't exist.
std::shared_ptr<ConnectorFactory> getConnectorFactory(
const std::string& connectorName);

Expand All @@ -542,14 +552,16 @@ std::shared_ptr<ConnectorFactory> getConnectorFactory(
/// true. The return value makes it easy to use with FB_ANONYMOUS_VARIABLE.
bool registerConnector(std::shared_ptr<Connector> connector);

/// Removes the connector with specified ID from the registry. Returns true if
/// connector was removed and false if connector didn't exist.
/// Removes the connector with specified ID from the registry. Returns true
/// if connector was removed and false if connector didn't exist.
bool unregisterConnector(const std::string& connectorId);

/// Returns a connector with specified ID. Throws if connector doesn't exist.
/// Returns a connector with specified ID. Throws if connector doesn't
/// exist.
std::shared_ptr<Connector> getConnector(const std::string& connectorId);

/// Returns a map of all (connectorId -> connector) pairs currently registered.
/// Returns a map of all (connectorId -> connector) pairs currently
/// registered.
const std::unordered_map<std::string, std::shared_ptr<Connector>>&
getAllConnectors();

Expand Down
81 changes: 81 additions & 0 deletions velox/core/PlanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,17 @@ std::vector<PlanNodePtr> deserializeSources(
return {};
}

/// Reads the optional "joinConditions" entry of a serialized plan node.
/// Returns an empty list when the entry is absent; otherwise deserializes the
/// entry into a list of typed expressions using 'context'.
std::vector<TypedExprPtr> deserializeJoinConditions(
    const folly::dynamic& obj,
    void* context) {
  const bool hasJoinConditions = obj.count("joinConditions") > 0;
  if (!hasJoinConditions) {
    return {};
  }

  const auto& serializedConditions = obj["joinConditions"];
  return ISerializable::deserialize<std::vector<ITypedExpr>>(
      serializedConditions, context);
}

PlanNodePtr deserializeSingleSource(const folly::dynamic& obj, void* context) {
auto sources = deserializeSources(obj, context);
VELOX_CHECK_EQ(1, sources.size());
Expand Down Expand Up @@ -1405,6 +1416,75 @@ PlanNodePtr MergeJoinNode::create(const folly::dynamic& obj, void* context) {
outputType);
}

/// Rebuilds an IndexLookupJoinNode from its serialized form.
///
/// The serialized node must carry exactly two sources, the second of which
/// must be a TableScanNode, and must not contain a "filter" entry. The
/// "joinConditions" entry is optional.
PlanNodePtr IndexLookupJoinNode::create(
    const folly::dynamic& obj,
    void* context) {
  // Source 0 is the left input; source 1 is the lookup table scan.
  auto inputs = deserializeSources(obj, context);
  VELOX_CHECK_EQ(2, inputs.size());
  TableScanNodePtr lookupScan =
      std::dynamic_pointer_cast<const TableScanNode>(inputs[1]);
  VELOX_CHECK_NOT_NULL(lookupScan);

  auto probeKeys = deserializeFields(obj["leftKeys"], context);
  auto lookupKeys = deserializeFields(obj["rightKeys"], context);

  // The node is always constructed with a null filter, so a serialized one is
  // rejected.
  VELOX_CHECK_EQ(obj.count("filter"), 0);

  auto conditions = deserializeJoinConditions(obj, context);
  auto rowType = deserializeRowType(obj["outputType"]);

  auto nodeId = deserializePlanNodeId(obj);
  const auto joinType = joinTypeFromName(obj["joinType"].asString());

  return std::make_shared<IndexLookupJoinNode>(
      std::move(nodeId),
      joinType,
      std::move(probeKeys),
      std::move(lookupKeys),
      std::move(conditions),
      inputs[0],
      std::move(lookupScan),
      std::move(rowType));
}

/// Serializes the node: the base serialization plus, when there are any, the
/// serialized join conditions under the "joinConditions" key. The key is
/// omitted entirely when the node has no join conditions.
folly::dynamic IndexLookupJoinNode::serialize() const {
  auto serialized = serializeBase();
  if (joinConditions_.empty()) {
    return serialized;
  }

  folly::dynamic conditions = folly::dynamic::array;
  conditions.reserve(joinConditions_.size());
  for (const auto& condition : joinConditions_) {
    conditions.push_back(condition->serialize());
  }
  serialized["joinConditions"] = std::move(conditions);
  return serialized;
}

/// Appends the join details of the base class to 'stream', followed by the
/// comma-separated join conditions when the node has any.
void IndexLookupJoinNode::addDetails(std::stringstream& stream) const {
  AbstractJoinNode::addDetails(stream);
  if (!joinConditions_.empty()) {
    std::vector<std::string> conditionTexts;
    conditionTexts.reserve(joinConditions_.size());
    for (const auto& condition : joinConditions_) {
      conditionTexts.push_back(condition->toString());
    }
    stream << ", joinConditions: [";
    stream << folly::join(", ", conditionTexts);
    stream << " ]";
  }
}

// static
bool IndexLookupJoinNode::isSupported(core::JoinType joinType) {
  // Only INNER and LEFT joins are accepted; every other join type is
  // rejected.
  return joinType == core::JoinType::kInner ||
      joinType == core::JoinType::kLeft;
}

NestedLoopJoinNode::NestedLoopJoinNode(
const PlanNodeId& id,
JoinType joinType,
Expand Down Expand Up @@ -2726,6 +2806,7 @@ void PlanNode::registerSerDe() {
registry.Register("HashJoinNode", HashJoinNode::create);
registry.Register("MergeExchangeNode", MergeExchangeNode::create);
registry.Register("MergeJoinNode", MergeJoinNode::create);
registry.Register("IndexLookupJoinNode", IndexLookupJoinNode::create);
registry.Register("NestedLoopJoinNode", NestedLoopJoinNode::create);
registry.Register("LimitNode", LimitNode::create);
registry.Register("LocalMergeNode", LocalMergeNode::create);
Expand Down
108 changes: 107 additions & 1 deletion velox/core/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -1762,12 +1762,118 @@ class MergeJoinNode : public AbstractJoinNode {

folly::dynamic serialize() const override;

/// If merge join supports this join type.
/// Returns true if the merge join supports this join type, otherwise false.
static bool isSupported(JoinType joinType);

static PlanNodePtr create(const folly::dynamic& obj, void* context);
};

/// Represents index lookup join. Translates to an exec::IndexLookupJoin
/// operator. Assumes the right input is a table scan source node that provides
/// indexed table lookup for the left input with the specified join keys and
/// conditions. The join keys must be a prefix of the index columns of the
/// lookup table. Each join condition must use columns from both sides. For the
/// right side, it can only use one index column. Each index column can either
/// be a join key or a join condition once. The table scan node of the right
/// input is translated to a connector::IndexSource within
/// exec::IndexLookupJoin.
///
/// Only INNER and LEFT joins are supported.
///
/// Take the following query for example: t is the left table and u is the
/// right table with indexed columns. 'sid' is the join key and
/// 'contains(t.event_list, u.event_type)' is the join condition.
///
/// SELECT t.sid, t.day_ts, u.event_type
/// FROM t LEFT JOIN u
/// ON t.sid = u.sid
/// AND contains(t.event_list, u.event_type)
/// AND t.ds BETWEEN '2024-01-01' AND '2024-01-07'
///
/// Here,
/// - 'joinType' is JoinType::kLeft
/// - 'left' describes a scan of t with a filter on 'ds': t.ds BETWEEN
/// '2024-01-01' AND '2024-01-07'
/// - 'right' describes indexed table 'u' with index keys sid, event_type (and
/// maybe some more)
/// - 'leftKeys' is a list of one key 't.sid'
/// - 'rightKeys' is a list of one key 'u.sid'
/// - 'joinConditions' is a list of one expression: contains(t.event_list,
/// u.event_type)
/// - 'outputType' contains 3 columns : t.sid, t.day_ts, u.event_type
///
class IndexLookupJoinNode : public AbstractJoinNode {
 public:
  /// @param id Plan node id.
  /// @param joinType Specifies the lookup join type. Only INNER and LEFT joins
  /// are supported.
  /// @param leftKeys Join keys from the left input. Must not be empty.
  /// @param rightKeys Join keys from the right (lookup) input. Must have the
  /// same number of entries as 'leftKeys'.
  /// @param joinConditions Additional join conditions. May be empty.
  /// @param left Plan node producing the left input.
  /// @param right Table scan node over the lookup table. Its table handle must
  /// report support for index lookup.
  /// @param outputType Output row type of the join.
  IndexLookupJoinNode(
      const PlanNodeId& id,
      JoinType joinType,
      const std::vector<FieldAccessTypedExprPtr>& leftKeys,
      const std::vector<FieldAccessTypedExprPtr>& rightKeys,
      const std::vector<TypedExprPtr>& joinConditions,
      PlanNodePtr left,
      TableScanNodePtr right,
      RowTypePtr outputType)
      : AbstractJoinNode(
            id,
            joinType,
            leftKeys,
            rightKeys,
            /*filter=*/nullptr,
            std::move(left),
            // Copied rather than moved: 'right' is still needed below to
            // initialize 'lookupSourceNode_'.
            right,
            // 'outputType' is a by-value sink parameter that is not used
            // again, so hand it over instead of copying the shared pointer.
            std::move(outputType)),
        lookupSourceNode_(std::move(right)),
        joinConditions_(joinConditions) {
    VELOX_USER_CHECK(
        !leftKeys.empty(),
        "The lookup join node requires at least one join key");
    VELOX_USER_CHECK_EQ(
        leftKeys_.size(),
        rightKeys_.size(),
        "The lookup join node requires same number of join keys on left and right sides");
    // TODO: add checks that (1) 'rightKeys_' form an index prefix, and (2)
    // each of 'joinConditions_' uses columns from both sides and uses exactly
    // one index column from the right side.
    VELOX_USER_CHECK(
        lookupSourceNode_->tableHandle()->supportsIndexLookup(),
        "The lookup table handle {} from connector {} doesn't support index lookup",
        lookupSourceNode_->tableHandle()->name(),
        lookupSourceNode_->tableHandle()->connectorId());
    VELOX_USER_CHECK(
        isSupported(joinType_),
        "Unsupported index lookup join type {}",
        joinTypeName(joinType_));
  }

  /// Returns the table scan node of the lookup (right) input.
  const TableScanNodePtr& lookupSource() const {
    return lookupSourceNode_;
  }

  /// Returns the join conditions this node was constructed with. May be
  /// empty.
  const std::vector<TypedExprPtr>& joinConditions() const {
    return joinConditions_;
  }

  /// Returns the name of this plan node kind: "IndexLookupJoin".
  std::string_view name() const override {
    return "IndexLookupJoin";
  }

  folly::dynamic serialize() const override;

  static PlanNodePtr create(const folly::dynamic& obj, void* context);

  /// Returns true if the lookup join supports this join type, otherwise false.
  static bool isSupported(JoinType joinType);

 private:
  void addDetails(std::stringstream& stream) const override;

  // The right input, kept with its concrete table scan type.
  const TableScanNodePtr lookupSourceNode_;

  const std::vector<TypedExprPtr> joinConditions_;
};

/// Represents inner/outer nested loop joins. Translates to an
/// exec::NestedLoopJoinProbe and exec::NestedLoopJoinBuild. A separate
/// pipeline is produced for the build side when generating exec::Operators.
Expand Down
1 change: 1 addition & 0 deletions velox/exec/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ add_executable(
HashJoinTest.cpp
HashPartitionFunctionTest.cpp
HashTableTest.cpp
IndexLookupJoinTest.cpp
LimitTest.cpp
LocalPartitionTest.cpp
Main.cpp
Expand Down
Loading

0 comments on commit cac8000

Please sign in to comment.