Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add single node mode for query trace tool #12171

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions velox/common/base/TraceConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ TraceConfig::TraceConfig(
std::unordered_set<std::string> _queryNodeIds,
std::string _queryTraceDir,
UpdateAndCheckTraceLimitCB _updateAndCheckTraceLimitCB,
std::string _taskRegExp)
std::string _taskRegExp,
bool _singleNodeMode)
: queryNodes(std::move(_queryNodeIds)),
queryTraceDir(std::move(_queryTraceDir)),
updateAndCheckTraceLimitCB(std::move(_updateAndCheckTraceLimitCB)),
taskRegExp(std::move(_taskRegExp)) {
taskRegExp(std::move(_taskRegExp)),
singleNodeMode(_singleNodeMode) {
VELOX_CHECK(!queryNodes.empty(), "Query trace nodes cannot be empty");
}
} // namespace facebook::velox::exec::trace
5 changes: 4 additions & 1 deletion velox/common/base/TraceConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,14 @@ struct TraceConfig {
UpdateAndCheckTraceLimitCB updateAndCheckTraceLimitCB;
/// The trace task regexp.
std::string taskRegExp;
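/// If true, trace metadata is written only for the single traced plan node;
/// 'queryNodes' must then contain exactly one node id.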
bool singleNodeMode;

TraceConfig(
std::unordered_set<std::string> _queryNodeIds,
std::string _queryTraceDir,
UpdateAndCheckTraceLimitCB _updateAndCheckTraceLimitCB,
std::string _taskRegExp);
std::string _taskRegExp,
bool _singleNodeMode = false);
};
} // namespace facebook::velox::exec::trace
9 changes: 9 additions & 0 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,11 @@ class QueryConfig {
static constexpr const char* kQueryTraceTaskRegExp =
"query_trace_task_reg_exp";

/// Single-node trace mode. If true, only the metadata of the target trace
/// node is written, and the number of query trace nodes must be exactly 1.
static constexpr const char* kQueryTraceSingleNodeMode =
"query_trace_single_node_mode";

/// Config used to create operator trace directory. This config is provided to
/// underlying file system and the config is free form. The form should be
/// defined by the underlying file system.
Expand Down Expand Up @@ -788,6 +793,10 @@ class QueryConfig {
return get<std::string>(kQueryTraceTaskRegExp, "");
}

/// Returns the value of the 'query_trace_single_node_mode' config
/// (kQueryTraceSingleNodeMode); false is passed as the fallback value.
bool queryTraceSingleNodeMode() const {
return get<bool>(kQueryTraceSingleNodeMode, false);
}

/// Returns the value of kOpTraceDirectoryCreateConfig; an empty string is
/// passed as the fallback value.
std::string opTraceDirectoryCreateConfig() const {
return get<std::string>(kOpTraceDirectoryCreateConfig, "");
}
Expand Down
11 changes: 9 additions & 2 deletions velox/exec/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3084,7 +3084,8 @@ std::optional<trace::TraceConfig> Task::maybeMakeTraceConfig() const {
std::move(traceNodeIdSet),
traceDir,
std::move(updateAndCheckTraceLimitCB),
queryConfig.queryTraceTaskRegExp());
queryConfig.queryTraceTaskRegExp(),
queryConfig.queryTraceSingleNodeMode());
}

void Task::maybeInitTrace() {
Expand All @@ -3095,7 +3096,13 @@ void Task::maybeInitTrace() {
trace::createTraceDirectory(traceConfig_->queryTraceDir);
const auto metadataWriter = std::make_unique<trace::TaskTraceMetadataWriter>(
traceConfig_->queryTraceDir, memory::traceMemoryPool());
metadataWriter->write(queryCtx_, planFragment_.planNode);
if (traceConfig_->singleNodeMode) {
VELOX_USER_CHECK_EQ(traceConfig_->queryNodes.size(), 1);
metadataWriter->write(
queryCtx_, planFragment_.planNode, *traceConfig_->queryNodes.cbegin());
} else {
metadataWriter->write(queryCtx_, planFragment_.planNode);
}
}

void Task::testingVisitDrivers(const std::function<void(Driver*)>& callback) {
Expand Down
7 changes: 7 additions & 0 deletions velox/exec/TaskTraceWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,11 @@ void TaskTraceMetadataWriter::write(
file->close();
}

/// Single-node variant of write(): builds a copy of the plan node identified
/// by 'planNodeId' via copyTraceNode() and forwards that copy, instead of the
/// full plan, to the two-argument write().
void TaskTraceMetadataWriter::write(
    const std::shared_ptr<core::QueryCtx>& queryCtx,
    const core::PlanNodePtr& planNode,
    const core::PlanNodeId& planNodeId) {
  const core::PlanNodePtr tracedNode = copyTraceNode(planNode, planNodeId);
  write(queryCtx, tracedNode);
}

} // namespace facebook::velox::exec::trace
5 changes: 5 additions & 0 deletions velox/exec/TaskTraceWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ class TaskTraceMetadataWriter {
const std::shared_ptr<core::QueryCtx>& queryCtx,
const core::PlanNodePtr& planNode);

void write(
const std::shared_ptr<core::QueryCtx>& queryCtx,
const core::PlanNodePtr& planNode,
const core::PlanNodeId& planNodeId);

private:
const std::string traceDir_;
const std::shared_ptr<filesystems::FileSystem> fs_;
Expand Down
90 changes: 89 additions & 1 deletion velox/exec/TraceUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

#include <folly/json.h>

#include <numeric>
#include "velox/common/base/Exceptions.h"
#include "velox/common/file/File.h"
#include "velox/common/file/FileSystems.h"
Expand Down Expand Up @@ -265,4 +264,93 @@ bool canTrace(const std::string& operatorType) {
"TableWrite"};
return kSupportedOperatorTypes.count(operatorType) > 0;
}

namespace {
// Shared, always-empty source list. Kept at namespace scope so that
// DummySourceNode::sources() can hand out a reference to it.
const std::vector<core::PlanNodePtr> kEmptySources;
} // namespace

/// A DummySourceNode has no upstream nodes; always returns the shared empty
/// list.
const std::vector<core::PlanNodePtr>& DummySourceNode::sources() const {
return kEmptySources;
}

/// Returns the node name "DummySource". The same string is written to the
/// "name" field by serialize() and used as the key in
/// registerDummySourceSerDe().
std::string_view DummySourceNode::name() const {
return "DummySource";
}

/// Serializes this node as an object with two fields: "name" (always
/// "DummySource") and "outputType" (the serialized output row type).
folly::dynamic DummySourceNode::serialize() const {
  return folly::dynamic::object("name", "DummySource")(
      "outputType", outputType_->serialize());
}

/// Builds a standalone copy of the plan node with id 'nodeId' found under
/// 'node'. Every upstream of the copied node is replaced with a
/// DummySourceNode that carries only the upstream's output type.
///
/// Supported node types:
///  - HashJoinNode: both sources are replaced with dummy sources.
///  - FilterNode: its source is replaced with a dummy source.
///  - ProjectNode: if its source is named "Filter", that FilterNode is copied
///    too (keeping its own id) and the filter's source is replaced with a
///    dummy source; otherwise the project's source is replaced directly.
///
/// @param node Root of the plan to search.
/// @param nodeId Id of the plan node to copy.
/// @return The copied node, using 'nodeId' as its id.
/// @throws Raises via VELOX_UNSUPPORTED if no node with 'nodeId' is found or
/// the node is of any other type; raises via VELOX_CHECK if a traced
/// ProjectNode has no source.
core::PlanNodePtr copyTraceNode(
    const core::PlanNodePtr& node,
    core::PlanNodeId nodeId) {
  // Wraps the output type of an original upstream node in a source-less
  // placeholder node.
  const auto dummySourceOf =
      [](const core::PlanNodePtr& source) -> core::PlanNodePtr {
    return std::make_shared<DummySourceNode>(source->outputType());
  };

  const auto* traceNode = core::PlanNode::findFirstNode(
      node.get(), [&nodeId](const core::PlanNode* candidate) {
        return candidate->id() == nodeId;
      });

  // dynamic_cast of a null 'traceNode' yields null, so a missing node falls
  // through every branch to the VELOX_UNSUPPORTED at the end.
  if (const auto* hashJoinNode =
          dynamic_cast<const core::HashJoinNode*>(traceNode)) {
    const auto& joinSources = hashJoinNode->sources();
    return std::make_shared<core::HashJoinNode>(
        nodeId,
        hashJoinNode->joinType(),
        hashJoinNode->isNullAware(),
        hashJoinNode->leftKeys(),
        hashJoinNode->rightKeys(),
        hashJoinNode->filter(),
        dummySourceOf(joinSources[0]),
        dummySourceOf(joinSources[1]),
        hashJoinNode->outputType());
  }

  if (const auto* filterNode =
          dynamic_cast<const core::FilterNode*>(traceNode)) {
    // Single FilterNode.
    return std::make_shared<core::FilterNode>(
        nodeId,
        filterNode->filter(),
        dummySourceOf(filterNode->sources().front()));
  }

  if (const auto* projectNode =
          dynamic_cast<const core::ProjectNode*>(traceNode)) {
    const auto& projectSources = projectNode->sources();
    // Fail loudly instead of calling front() on an empty vector.
    VELOX_CHECK(
        !projectSources.empty(),
        "Traced ProjectNode {} has no source",
        nodeId);
    const auto& projectSource = projectSources.front();

    // A standalone ProjectNode.
    if (projectSource->name() != "Filter") {
      return std::make_shared<core::ProjectNode>(
          nodeId,
          projectNode->names(),
          projectNode->projections(),
          dummySourceOf(projectSource));
    }

    // -- ProjectNode [nodeId]
    //   -- FilterNode [nodeId - 1]
    const auto originalFilterNode =
        std::dynamic_pointer_cast<const core::FilterNode>(projectSource);
    VELOX_CHECK_NOT_NULL(originalFilterNode);

    auto copiedFilterNode = std::make_shared<core::FilterNode>(
        originalFilterNode->id(),
        originalFilterNode->filter(),
        dummySourceOf(originalFilterNode->sources().front()));
    return std::make_shared<core::ProjectNode>(
        nodeId,
        projectNode->names(),
        projectNode->projections(),
        std::move(copiedFilterNode));
  }

  VELOX_UNSUPPORTED(
      "Single node trace mode does not support plan node {}", nodeId);
}

/// Registers DummySourceNode::create under the name "DummySource" in the
/// shared-pointer deserialization-with-context registry.
void registerDummySourceSerDe() {
  DeserializationWithContextRegistryForSharedPtr().Register(
      "DummySource", DummySourceNode::create);
}
} // namespace facebook::velox::exec::trace
36 changes: 35 additions & 1 deletion velox/exec/TraceUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
#include <folly/dynamic.h>

namespace facebook::velox::exec::trace {

/// Creates a directory to store the query trace metadata and data.
void createTraceDirectory(
const std::string& traceDir,
Expand Down Expand Up @@ -142,4 +141,39 @@ folly::dynamic getTaskMetadata(

/// Checks whether the operator can be traced.
bool canTrace(const std::string& operatorType);

/// Placeholder plan node that stands in for the real upstream of a traced
/// node. It is constructed with an empty plan node id and carries only the
/// output row type of the node it replaces.
class DummySourceNode : public core::PlanNode {
 public:
  /// @param outputType Row type reported by outputType(). Taken by value
  /// (non-const) so that it is moved, not copied, into the node.
  explicit DummySourceNode(RowTypePtr outputType)
      : PlanNode(""), outputType_(std::move(outputType)) {}

  const RowTypePtr& outputType() const override {
    return outputType_;
  }

  const std::vector<core::PlanNodePtr>& sources() const override;

  std::string_view name() const override;

  folly::dynamic serialize() const override;

  /// Deserialization hook: builds a DummySourceNode from the "outputType"
  /// field of 'obj'. The context pointer is not used.
  static core::PlanNodePtr create(
      const folly::dynamic& obj,
      void* /*context*/) {
    return std::make_shared<DummySourceNode>(
        ISerializable::deserialize<RowType>(obj["outputType"]));
  }

 private:
  void addDetails(std::stringstream& stream) const override {
    // Nothing to add.
  }

  const RowTypePtr outputType_;
};

/// Registers the "DummySource" deserializer (DummySourceNode::create) so that
/// serialized plans containing DummySourceNode can be read back.
void registerDummySourceSerDe();

/// Finds the plan node with id 'nodeId' under 'node' and returns a copy of it
/// whose source nodes are replaced with DummySourceNode placeholders. Only
/// HashJoinNode, FilterNode and ProjectNode are supported; for a ProjectNode
/// whose source is a FilterNode, the filter is copied as well and the filter's
/// source is replaced instead. Any other node type is rejected with
/// VELOX_UNSUPPORTED.
core::PlanNodePtr copyTraceNode(
const core::PlanNodePtr& node,
core::PlanNodeId nodeId);
} // namespace facebook::velox::exec::trace
73 changes: 49 additions & 24 deletions velox/exec/tests/OperatorTraceTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class OperatorTraceTest : public HiveConnectorTestBase {
core::PlanNode::registerSerDe();
core::ITypedExpr::registerSerDe();
registerPartitionFunctionSerDe();
trace::registerDummySourceSerDe();
}

void SetUp() override {
Expand Down Expand Up @@ -99,7 +100,9 @@ class OperatorTraceTest : public HiveConnectorTestBase {
}

for (auto i = 0; i < left->sources().size(); ++i) {
isSamePlan(left->sources().at(i), right->sources().at(i));
if (!isSamePlan(left->sources().at(i), right->sources().at(i))) {
return false;
}
}
return true;
}
Expand Down Expand Up @@ -267,6 +270,7 @@ TEST_F(OperatorTraceTest, traceMetadata) {

const auto outputDir = TempDirectoryPath::create();
auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
core::PlanNodeId traceNodeId;
const auto planNode =
PlanBuilder(planNodeIdGenerator)
.values(rows, false)
Expand All @@ -282,6 +286,7 @@ TEST_F(OperatorTraceTest, traceMetadata) {
"c0 < 135",
{"c0", "c1", "c2"},
core::JoinType::kInner)
.capturePlanNodeId(traceNodeId)
.planNode();
const auto expectedQueryConfigs =
std::unordered_map<std::string, std::string>{
Expand All @@ -299,30 +304,50 @@ TEST_F(OperatorTraceTest, traceMetadata) {
executor_.get(),
core::QueryConfig(expectedQueryConfigs),
expectedConnectorProperties);
auto writer = trace::TaskTraceMetadataWriter(outputDir->getPath(), pool());
writer.write(queryCtx, planNode);
std::unordered_map<std::string, std::string> acutalQueryConfigs;
std::unordered_map<std::string, std::unordered_map<std::string, std::string>>
actualConnectorProperties;
core::PlanNodePtr actualQueryPlan;
auto reader = trace::TaskTraceMetadataReader(outputDir->getPath(), pool());
reader.read(acutalQueryConfigs, actualConnectorProperties, actualQueryPlan);

ASSERT_TRUE(isSamePlan(actualQueryPlan, planNode));
ASSERT_EQ(acutalQueryConfigs.size(), expectedQueryConfigs.size());
for (const auto& [key, value] : acutalQueryConfigs) {
ASSERT_EQ(acutalQueryConfigs.at(key), expectedQueryConfigs.at(key));
}
int i = 0;
for (const auto singleTraceMode : {true, false}) {
const auto tracePath =
fmt::format("{}/mode={}", outputDir->getPath(), singleTraceMode);
trace::createTraceDirectory(tracePath);
auto writer = trace::TaskTraceMetadataWriter(tracePath, pool());
if (singleTraceMode) {
writer.write(queryCtx, planNode, traceNodeId);
} else {
writer.write(queryCtx, planNode);
}
std::unordered_map<std::string, std::string> acutalQueryConfigs;
std::
unordered_map<std::string, std::unordered_map<std::string, std::string>>
actualConnectorProperties;
core::PlanNodePtr actualQueryPlan;
auto reader = trace::TaskTraceMetadataReader(tracePath, pool());
reader.read(acutalQueryConfigs, actualConnectorProperties, actualQueryPlan);

if (singleTraceMode) {
ASSERT_EQ(actualQueryPlan->id(), planNode->id());
ASSERT_EQ(actualQueryPlan->name(), planNode->name());
ASSERT_EQ(actualQueryPlan->sources().size(), planNode->sources().size());
ASSERT_EQ(actualQueryPlan->sources().front()->name(), "DummySource");
} else {
ASSERT_TRUE(isSamePlan(actualQueryPlan, planNode));
}

ASSERT_EQ(
actualConnectorProperties.size(), expectedConnectorProperties.size());
ASSERT_EQ(actualConnectorProperties.count("test_trace"), 1);
const auto expectedConnectorConfigs =
expectedConnectorProperties.at("test_trace")->rawConfigsCopy();
const auto actualConnectorConfigs =
actualConnectorProperties.at("test_trace");
for (const auto& [key, value] : actualConnectorConfigs) {
ASSERT_EQ(actualConnectorConfigs.at(key), expectedConnectorConfigs.at(key));
ASSERT_EQ(acutalQueryConfigs.size(), expectedQueryConfigs.size());
for (const auto& [key, value] : acutalQueryConfigs) {
ASSERT_EQ(acutalQueryConfigs.at(key), expectedQueryConfigs.at(key));
}

ASSERT_EQ(
actualConnectorProperties.size(), expectedConnectorProperties.size());
ASSERT_EQ(actualConnectorProperties.count("test_trace"), 1);
const auto expectedConnectorConfigs =
expectedConnectorProperties.at("test_trace")->rawConfigsCopy();
const auto actualConnectorConfigs =
actualConnectorProperties.at("test_trace");
for (const auto& [key, value] : actualConnectorConfigs) {
ASSERT_EQ(
actualConnectorConfigs.at(key), expectedConnectorConfigs.at(key));
}
}
}

Expand Down
Loading
Loading