Skip to content

Commit

Permalink
add single node trace
Browse files Browse the repository at this point in the history
  • Loading branch information
duanmeng committed Jan 25, 2025
1 parent 6010f95 commit 4e5f620
Show file tree
Hide file tree
Showing 11 changed files with 364 additions and 164 deletions.
6 changes: 4 additions & 2 deletions velox/common/base/TraceConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ TraceConfig::TraceConfig(
std::unordered_set<std::string> _queryNodeIds,
std::string _queryTraceDir,
UpdateAndCheckTraceLimitCB _updateAndCheckTraceLimitCB,
std::string _taskRegExp)
std::string _taskRegExp,
bool _singleNodeMode)
: queryNodes(std::move(_queryNodeIds)),
queryTraceDir(std::move(_queryTraceDir)),
updateAndCheckTraceLimitCB(std::move(_updateAndCheckTraceLimitCB)),
taskRegExp(std::move(_taskRegExp)) {
taskRegExp(std::move(_taskRegExp)),
singleNodeMode(_singleNodeMode) {
VELOX_CHECK(!queryNodes.empty(), "Query trace nodes cannot be empty");
}
} // namespace facebook::velox::exec::trace
5 changes: 4 additions & 1 deletion velox/common/base/TraceConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,14 @@ struct TraceConfig {
UpdateAndCheckTraceLimitCB updateAndCheckTraceLimitCB;
/// The trace task regexp.
std::string taskRegExp;
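/// Single node trace mode. If true, only the target trace node's metadata is
/// written, and 'queryNodes' is expected to contain exactly one node id.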
bool singleNodeMode;

TraceConfig(
std::unordered_set<std::string> _queryNodeIds,
std::string _queryTraceDir,
UpdateAndCheckTraceLimitCB _updateAndCheckTraceLimitCB,
std::string _taskRegExp);
std::string _taskRegExp,
bool _singleNodeMode = false);
};
} // namespace facebook::velox::exec::trace
9 changes: 9 additions & 0 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,11 @@ class QueryConfig {
static constexpr const char* kQueryTraceTaskRegExp =
"query_trace_task_reg_exp";

/// Single trace node mode. If true, only the metadata of the target trace
/// node is written, and the number of query trace nodes must be exactly 1.
static constexpr const char* kQueryTraceSingleNodeMode =
"query_trace_single_node_mode";

/// Config used to create operator trace directory. This config is provided to
/// underlying file system and the config is free form. The form should be
/// defined by the underlying file system.
Expand Down Expand Up @@ -788,6 +793,10 @@ class QueryConfig {
return get<std::string>(kQueryTraceTaskRegExp, "");
}

/// Returns the value configured under 'query_trace_single_node_mode', using
/// false as the supplied default.
bool queryTraceSingleNodeMode() const {
  constexpr bool kDefaultSingleNodeMode{false};
  return get<bool>(kQueryTraceSingleNodeMode, kDefaultSingleNodeMode);
}

std::string opTraceDirectoryCreateConfig() const {
return get<std::string>(kOpTraceDirectoryCreateConfig, "");
}
Expand Down
11 changes: 9 additions & 2 deletions velox/exec/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3084,7 +3084,8 @@ std::optional<trace::TraceConfig> Task::maybeMakeTraceConfig() const {
std::move(traceNodeIdSet),
traceDir,
std::move(updateAndCheckTraceLimitCB),
queryConfig.queryTraceTaskRegExp());
queryConfig.queryTraceTaskRegExp(),
queryConfig.queryTraceSingleNodeMode());
}

void Task::maybeInitTrace() {
Expand All @@ -3095,7 +3096,13 @@ void Task::maybeInitTrace() {
trace::createTraceDirectory(traceConfig_->queryTraceDir);
const auto metadataWriter = std::make_unique<trace::TaskTraceMetadataWriter>(
traceConfig_->queryTraceDir, memory::traceMemoryPool());
metadataWriter->write(queryCtx_, planFragment_.planNode);
if (traceConfig_->singleNodeMode) {
VELOX_USER_CHECK_EQ(traceConfig_->queryNodes.size(), 1);
metadataWriter->write(
queryCtx_, planFragment_.planNode, *traceConfig_->queryNodes.cbegin());
} else {
metadataWriter->write(queryCtx_, planFragment_.planNode);
}
}

void Task::testingVisitDrivers(const std::function<void(Driver*)>& callback) {
Expand Down
7 changes: 7 additions & 0 deletions velox/exec/TaskTraceWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,11 @@ void TaskTraceMetadataWriter::write(
file->close();
}

/// Writes the trace metadata for a single traced plan node. The node with id
/// 'planNodeId' is first extracted from the plan rooted at 'planNode' via
/// copyTraceNode(), then the extracted copy is written through the two-argument
/// write() overload.
void TaskTraceMetadataWriter::write(
    const std::shared_ptr<core::QueryCtx>& queryCtx,
    const core::PlanNodePtr& planNode,
    const core::PlanNodeId& planNodeId) {
  const core::PlanNodePtr tracedNode = copyTraceNode(planNode, planNodeId);
  write(queryCtx, tracedNode);
}

} // namespace facebook::velox::exec::trace
5 changes: 5 additions & 0 deletions velox/exec/TaskTraceWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ class TaskTraceMetadataWriter {
const std::shared_ptr<core::QueryCtx>& queryCtx,
const core::PlanNodePtr& planNode);

void write(
const std::shared_ptr<core::QueryCtx>& queryCtx,
const core::PlanNodePtr& planNode,
const core::PlanNodeId& planNodeId);

private:
const std::string traceDir_;
const std::shared_ptr<filesystems::FileSystem> fs_;
Expand Down
90 changes: 89 additions & 1 deletion velox/exec/TraceUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

#include <folly/json.h>

#include <numeric>
#include "velox/common/base/Exceptions.h"
#include "velox/common/file/File.h"
#include "velox/common/file/FileSystems.h"
Expand Down Expand Up @@ -265,4 +264,93 @@ bool canTrace(const std::string& operatorType) {
"TableWrite"};
return kSupportedOperatorTypes.count(operatorType) > 0;
}

namespace {
// Shared, always-empty source list handed out by DummySourceNode::sources().
const std::vector<core::PlanNodePtr> kEmptySources{};
} // namespace

/// A dummy source has no upstream plan nodes, so the shared empty list is
/// returned.
const std::vector<core::PlanNodePtr>& DummySourceNode::sources() const {
  return kEmptySources;
}

/// Returns the plan node type name, "DummySource".
std::string_view DummySourceNode::name() const {
  static constexpr std::string_view kNodeName{"DummySource"};
  return kNodeName;
}

/// Serializes this node as an object with two fields: "name" (always
/// "DummySource") and "outputType" (the serialized output row type).
folly::dynamic DummySourceNode::serialize() const {
  folly::dynamic serialized = folly::dynamic::object("name", "DummySource")(
      "outputType", outputType_->serialize());
  return serialized;
}

/// Builds a standalone copy of the plan node with id 'nodeId' found in the plan
/// rooted at 'node'. The sources of the copy are replaced with DummySourceNode
/// placeholders that only carry the output type of the original sources.
///
/// Supported node types are HashJoinNode, FilterNode and ProjectNode. For a
/// ProjectNode whose source is a "Filter" node, the filter is copied as well
/// and the placeholder is put below the copied filter.
///
/// @param node Root of the plan to search.
/// @param nodeId Id of the plan node to copy.
/// @return The copied node, carrying the same id as the original.
/// Fails with VELOX_UNSUPPORTED if no node matches 'nodeId' or the matching
/// node is of an unsupported type, and with a check failure if a matching
/// ProjectNode has no source.
core::PlanNodePtr copyTraceNode(
    const core::PlanNodePtr& node,
    core::PlanNodeId nodeId) {
  const auto* traceNode = core::PlanNode::findFirstNode(
      node.get(), [&nodeId](const core::PlanNode* candidate) {
        return candidate->id() == nodeId;
      });
  if (traceNode == nullptr) {
    // Same error category as before (all casts below would fail on a null
    // node), but with an actionable message.
    VELOX_UNSUPPORTED("Trace plan node {} is not found in the plan", nodeId);
  }

  if (const auto* hashJoinNode =
          dynamic_cast<const core::HashJoinNode*>(traceNode)) {
    return std::make_shared<core::HashJoinNode>(
        nodeId,
        hashJoinNode->joinType(),
        hashJoinNode->isNullAware(),
        hashJoinNode->leftKeys(),
        hashJoinNode->rightKeys(),
        hashJoinNode->filter(),
        std::make_shared<DummySourceNode>(
            hashJoinNode->sources()[0]->outputType()),
        std::make_shared<DummySourceNode>(
            hashJoinNode->sources()[1]->outputType()),
        hashJoinNode->outputType());
  }

  if (const auto* filterNode =
          dynamic_cast<const core::FilterNode*>(traceNode)) {
    // Single FilterNode.
    return std::make_shared<core::FilterNode>(
        nodeId,
        filterNode->filter(),
        std::make_shared<DummySourceNode>(
            filterNode->sources().front()->outputType()));
  }

  if (const auto* projectNode =
          dynamic_cast<const core::ProjectNode*>(traceNode)) {
    const auto& projectSources = projectNode->sources();
    // The source's output type is needed below in both branches, so an empty
    // source list must be rejected before calling front().
    VELOX_CHECK(
        !projectSources.empty(),
        "Project node {} has no source to replace",
        nodeId);
    const auto& projectSource = projectSources.front();

    // A standalone ProjectNode.
    if (projectSource->name() != "Filter") {
      return std::make_shared<core::ProjectNode>(
          nodeId,
          projectNode->names(),
          projectNode->projections(),
          std::make_shared<DummySourceNode>(projectSource->outputType()));
    }

    // -- ProjectNode [nodeId]
    //   -- FilterNode [nodeId - 1]
    // NOTE(review): presumably the filter is kept because Filter and Project
    // are traced together as one unit -- confirm.
    const auto originalFilterNode =
        std::dynamic_pointer_cast<const core::FilterNode>(projectSource);
    VELOX_CHECK_NOT_NULL(originalFilterNode);

    auto copiedFilterNode = std::make_shared<core::FilterNode>(
        originalFilterNode->id(),
        originalFilterNode->filter(),
        std::make_shared<DummySourceNode>(
            originalFilterNode->sources().front()->outputType()));
    return std::make_shared<core::ProjectNode>(
        nodeId,
        projectNode->names(),
        projectNode->projections(),
        std::move(copiedFilterNode));
  }

  VELOX_UNSUPPORTED(
      "Single node trace does not support plan node type: {}",
      traceNode->name());
}

/// Registers DummySourceNode::create under the name "DummySource" in the
/// shared-pointer deserialization-with-context registry.
void registerDummySourceSerDe() {
  DeserializationWithContextRegistryForSharedPtr().Register(
      "DummySource", DummySourceNode::create);
}
} // namespace facebook::velox::exec::trace
37 changes: 36 additions & 1 deletion velox/exec/TraceUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
#include <folly/dynamic.h>

namespace facebook::velox::exec::trace {

/// Creates a directory to store the query trace metadata and data.
void createTraceDirectory(
const std::string& traceDir,
Expand Down Expand Up @@ -142,4 +141,40 @@ folly::dynamic getTaskMetadata(

/// Checks whether the operator can be traced.
bool canTrace(const std::string& operatorType);

/// Placeholder leaf plan node that stands in for the real sources of a copied
/// trace node (see copyTraceNode). It is created with an empty plan node id,
/// reports no sources and only carries an output row type.
class DummySourceNode : public core::PlanNode {
 public:
  /// @param outputType Row type this node reports as its output. Taken by
  /// non-const value so that it is actually moved into the member.
  explicit DummySourceNode(RowTypePtr outputType)
      : PlanNode(""), outputType_(std::move(outputType)) {}

  /// Returns the output row type supplied at construction.
  const RowTypePtr& outputType() const override {
    return outputType_;
  }

  /// Returns the (empty) list of sources of this node.
  const std::vector<core::PlanNodePtr>& sources() const override;

  /// Returns the plan node type name.
  std::string_view name() const override;

  /// Serializes the node name and its output type.
  folly::dynamic serialize() const override;

  /// Deserialization hook: rebuilds a DummySourceNode from the "outputType"
  /// field of 'obj'. The context argument is not used.
  static core::PlanNodePtr create(
      const folly::dynamic& obj,
      void* /*context*/) {
    return std::make_shared<DummySourceNode>(
        ISerializable::deserialize<RowType>(obj["outputType"]));
  }

 private:
  void addDetails(std::stringstream& /*stream*/) const override {
    // Nothing to add.
  }

  const RowTypePtr outputType_;
};

/// Registers the DummySourceNode deserializer (DummySourceNode::create) under
/// the name "DummySource".
void registerDummySourceSerDe();

/// Copies the plan node with id 'nodeId' found in the plan rooted at 'node',
/// replacing the sources of the copy with DummySourceNode placeholders that
/// only keep the original sources' output types.
core::PlanNodePtr copyTraceNode(
const core::PlanNodePtr& node,
core::PlanNodeId nodeId);
} // namespace facebook::velox::exec::trace
73 changes: 49 additions & 24 deletions velox/exec/tests/OperatorTraceTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class OperatorTraceTest : public HiveConnectorTestBase {
core::PlanNode::registerSerDe();
core::ITypedExpr::registerSerDe();
registerPartitionFunctionSerDe();
trace::registerDummySourceSerDe();
}

void SetUp() override {
Expand Down Expand Up @@ -99,7 +100,9 @@ class OperatorTraceTest : public HiveConnectorTestBase {
}

for (auto i = 0; i < left->sources().size(); ++i) {
isSamePlan(left->sources().at(i), right->sources().at(i));
if (!isSamePlan(left->sources().at(i), right->sources().at(i))) {
return false;
}
}
return true;
}
Expand Down Expand Up @@ -267,6 +270,7 @@ TEST_F(OperatorTraceTest, traceMetadata) {

const auto outputDir = TempDirectoryPath::create();
auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
core::PlanNodeId traceNodeId;
const auto planNode =
PlanBuilder(planNodeIdGenerator)
.values(rows, false)
Expand All @@ -282,6 +286,7 @@ TEST_F(OperatorTraceTest, traceMetadata) {
"c0 < 135",
{"c0", "c1", "c2"},
core::JoinType::kInner)
.capturePlanNodeId(traceNodeId)
.planNode();
const auto expectedQueryConfigs =
std::unordered_map<std::string, std::string>{
Expand All @@ -299,30 +304,50 @@ TEST_F(OperatorTraceTest, traceMetadata) {
executor_.get(),
core::QueryConfig(expectedQueryConfigs),
expectedConnectorProperties);
auto writer = trace::TaskTraceMetadataWriter(outputDir->getPath(), pool());
writer.write(queryCtx, planNode);
std::unordered_map<std::string, std::string> acutalQueryConfigs;
std::unordered_map<std::string, std::unordered_map<std::string, std::string>>
actualConnectorProperties;
core::PlanNodePtr actualQueryPlan;
auto reader = trace::TaskTraceMetadataReader(outputDir->getPath(), pool());
reader.read(acutalQueryConfigs, actualConnectorProperties, actualQueryPlan);

ASSERT_TRUE(isSamePlan(actualQueryPlan, planNode));
ASSERT_EQ(acutalQueryConfigs.size(), expectedQueryConfigs.size());
for (const auto& [key, value] : acutalQueryConfigs) {
ASSERT_EQ(acutalQueryConfigs.at(key), expectedQueryConfigs.at(key));
}
int i = 0;
for (const auto singleTraceMode : {true, false}) {
const auto tracePath =
fmt::format("{}/mode={}", outputDir->getPath(), singleTraceMode);
trace::createTraceDirectory(tracePath);
auto writer = trace::TaskTraceMetadataWriter(tracePath, pool());
if (singleTraceMode) {
writer.write(queryCtx, planNode, traceNodeId);
} else {
writer.write(queryCtx, planNode);
}
std::unordered_map<std::string, std::string> acutalQueryConfigs;
std::
unordered_map<std::string, std::unordered_map<std::string, std::string>>
actualConnectorProperties;
core::PlanNodePtr actualQueryPlan;
auto reader = trace::TaskTraceMetadataReader(tracePath, pool());
reader.read(acutalQueryConfigs, actualConnectorProperties, actualQueryPlan);

if (singleTraceMode) {
ASSERT_EQ(actualQueryPlan->id(), planNode->id());
ASSERT_EQ(actualQueryPlan->name(), planNode->name());
ASSERT_EQ(actualQueryPlan->sources().size(), planNode->sources().size());
ASSERT_EQ(actualQueryPlan->sources().front()->name(), "DummySource");
} else {
ASSERT_TRUE(isSamePlan(actualQueryPlan, planNode));
}

ASSERT_EQ(
actualConnectorProperties.size(), expectedConnectorProperties.size());
ASSERT_EQ(actualConnectorProperties.count("test_trace"), 1);
const auto expectedConnectorConfigs =
expectedConnectorProperties.at("test_trace")->rawConfigsCopy();
const auto actualConnectorConfigs =
actualConnectorProperties.at("test_trace");
for (const auto& [key, value] : actualConnectorConfigs) {
ASSERT_EQ(actualConnectorConfigs.at(key), expectedConnectorConfigs.at(key));
ASSERT_EQ(acutalQueryConfigs.size(), expectedQueryConfigs.size());
for (const auto& [key, value] : acutalQueryConfigs) {
ASSERT_EQ(acutalQueryConfigs.at(key), expectedQueryConfigs.at(key));
}

ASSERT_EQ(
actualConnectorProperties.size(), expectedConnectorProperties.size());
ASSERT_EQ(actualConnectorProperties.count("test_trace"), 1);
const auto expectedConnectorConfigs =
expectedConnectorProperties.at("test_trace")->rawConfigsCopy();
const auto actualConnectorConfigs =
actualConnectorProperties.at("test_trace");
for (const auto& [key, value] : actualConnectorConfigs) {
ASSERT_EQ(
actualConnectorConfigs.at(key), expectedConnectorConfigs.at(key));
}
}
}

Expand Down
Loading

0 comments on commit 4e5f620

Please sign in to comment.