Skip to content

Commit

Permalink
[native] Advance Velox and apply TaskState changes
Browse files Browse the repository at this point in the history
  • Loading branch information
zuyu authored and aditi-pandit committed Nov 27, 2024
1 parent af08811 commit 0ee4687
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 21 deletions.
14 changes: 9 additions & 5 deletions presto-native-execution/presto_cpp/main/PeriodicTaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -333,16 +333,20 @@ void PeriodicTaskManager::updateTaskStats() {
RECORD_METRIC_VALUE(
kCounterNumTasksBytesProcessed, taskManager_->getBytesProcessed());
RECORD_METRIC_VALUE(
kCounterNumTasksRunning, taskNumbers[velox::exec::TaskState::kRunning]);
kCounterNumTasksRunning,
taskNumbers[static_cast<int>(velox::exec::TaskState::kRunning)]);
RECORD_METRIC_VALUE(
kCounterNumTasksFinished, taskNumbers[velox::exec::TaskState::kFinished]);
kCounterNumTasksFinished,
taskNumbers[static_cast<int>(velox::exec::TaskState::kFinished)]);
RECORD_METRIC_VALUE(
kCounterNumTasksCancelled,
taskNumbers[velox::exec::TaskState::kCanceled]);
taskNumbers[static_cast<int>(velox::exec::TaskState::kCanceled)]);
RECORD_METRIC_VALUE(
kCounterNumTasksAborted, taskNumbers[velox::exec::TaskState::kAborted]);
kCounterNumTasksAborted,
taskNumbers[static_cast<int>(velox::exec::TaskState::kAborted)]);
RECORD_METRIC_VALUE(
kCounterNumTasksFailed, taskNumbers[velox::exec::TaskState::kFailed]);
kCounterNumTasksFailed,
taskNumbers[static_cast<int>(velox::exec::TaskState::kFailed)]);

const auto driverCounts = taskManager_->getDriverCounts();
RECORD_METRIC_VALUE(kCounterNumQueuedDrivers, driverCounts.numQueuedDrivers);
Expand Down
10 changes: 5 additions & 5 deletions presto-native-execution/presto_cpp/main/PrestoTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,15 @@ namespace {

protocol::TaskState toPrestoTaskState(exec::TaskState state) {
switch (state) {
case exec::kRunning:
case exec::TaskState::kRunning:
return protocol::TaskState::RUNNING;
case exec::kFinished:
case exec::TaskState::kFinished:
return protocol::TaskState::FINISHED;
case exec::kCanceled:
case exec::TaskState::kCanceled:
return protocol::TaskState::CANCELED;
case exec::kFailed:
case exec::TaskState::kFailed:
return protocol::TaskState::FAILED;
case exec::kAborted:
case exec::TaskState::kAborted:
[[fallthrough]];
default:
return protocol::TaskState::ABORTED;
Expand Down
20 changes: 10 additions & 10 deletions presto-native-execution/presto_cpp/main/TaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,7 @@ std::unique_ptr<TaskInfo> TaskManager::deleteTask(
auto execTask = prestoTask->task;
if (execTask) {
auto state = execTask->state();
if (state == exec::kRunning) {
if (state == exec::TaskState::kRunning) {
execTask->requestAbort();
}
prestoTask->info.stats.endTime =
Expand Down Expand Up @@ -881,13 +881,13 @@ folly::Future<std::unique_ptr<Result>> TaskManager::getResults(
for (;;) {
if (prestoTask->taskStarted) {
// If the task has finished, then send completion result.
if (prestoTask->task->state() == exec::kFinished) {
if (prestoTask->task->state() == exec::TaskState::kFinished) {
promiseHolder->promise.setValue(createCompleteResult(token));
return std::move(future).via(httpSrvCpuExecutor_);
}
// If task is not running let the request timeout. The task may have
// failed at creation time and the coordinator hasn't yet caught up.
if (prestoTask->task->state() == exec::kRunning) {
if (prestoTask->task->state() == exec::TaskState::kRunning) {
getData(
promiseHolder,
folly::to_weak_ptr(state),
Expand Down Expand Up @@ -1166,11 +1166,11 @@ int32_t TaskManager::yieldTasks(

std::array<size_t, 5> TaskManager::getTaskNumbers(size_t& numTasks) const {
std::array<size_t, 5> res{0};
auto taskMap = taskMap_.rlock();
const auto taskMap = *taskMap_.rlock();
numTasks = 0;
for (const auto& pair : *taskMap) {
if (pair.second->task != nullptr) {
++res[pair.second->task->state()];
for (const auto& [_, task] : taskMap) {
if (task->task) {
++res[static_cast<int>(task->task->state())];
++numTasks;
}
}
Expand All @@ -1180,8 +1180,8 @@ std::array<size_t, 5> TaskManager::getTaskNumbers(size_t& numTasks) const {
int64_t TaskManager::getBytesProcessed() const {
const auto taskMap = *taskMap_.rlock();
int64_t totalCount = 0;
for (const auto& pair : taskMap) {
totalCount += pair.second->info.stats.processedInputDataSizeInBytes;
for (const auto& [_, task] : taskMap) {
totalCount += task->info.stats.processedInputDataSizeInBytes;
}
return totalCount;
}
Expand All @@ -1190,7 +1190,7 @@ void TaskManager::shutdown() {
size_t numTasks;
auto taskNumbers = getTaskNumbers(numTasks);
size_t seconds = 0;
while (taskNumbers[velox::exec::TaskState::kRunning] > 0) {
while (taskNumbers[static_cast<int>(exec::TaskState::kRunning)] > 0) {
PRESTO_SHUTDOWN_LOG(INFO)
<< "Waited (" << seconds
<< " seconds so far) for 'Running' tasks to complete. " << numTasks
Expand Down
2 changes: 1 addition & 1 deletion presto-native-execution/velox
Submodule velox updated 67 files
+2 −3 .github/workflows/conbench_upload.yml
+5 −0 scripts/setup-adapters.sh
+1 −0 scripts/velox_env_linux.yml
+1 −0 scripts/velox_env_mac.yml
+3 −0 velox/common/base/Counters.h
+14 −3 velox/common/file/FileSystems.cpp
+6 −0 velox/common/file/FileSystems.h
+0 −13 velox/common/file/tests/FaultyFile.cpp
+1 −97 velox/common/file/tests/FaultyFile.h
+55 −1 velox/common/file/tests/FaultyFileSystem.cpp
+39 −0 velox/common/file/tests/FaultyFileSystem.h
+174 −0 velox/common/file/tests/FaultyFileSystemOperations.h
+22 −0 velox/common/file/tests/FileTest.cpp
+0 −1 velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp
+0 −1 velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.cpp
+0 −1 velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h
+7 −0 velox/core/PlanNode.h
+10 −0 velox/core/QueryConfig.h
+4 −1 velox/docs/functions/presto/regexp.rst
+7 −2 velox/docs/functions/spark/regexp.rst
+3 −1 velox/docs/monitoring/metrics.rst
+110 −51 velox/dwio/common/Throttler.cpp
+26 −5 velox/dwio/common/Throttler.h
+163 −36 velox/dwio/common/tests/ThrottlerTest.cpp
+5 −2 velox/exec/Driver.cpp
+32 −77 velox/exec/HashProbe.cpp
+6 −8 velox/exec/HashProbe.h
+3 −1 velox/exec/Operator.cpp
+1 −1 velox/exec/OperatorTraceScan.cpp
+109 −12 velox/exec/Task.cpp
+62 −2 velox/exec/Task.h
+27 −1 velox/exec/TaskStructs.h
+22 −8 velox/exec/TraceUtil.cpp
+6 −9 velox/exec/TraceUtil.h
+0 −164 velox/exec/tests/HashJoinTest.cpp
+264 −2 velox/exec/tests/LocalPartitionTest.cpp
+99 −0 velox/exec/tests/TaskTest.cpp
+43 −3 velox/exec/tests/TraceUtilTest.cpp
+6 −2 velox/exec/tests/utils/HiveConnectorTestBase.cpp
+6 −1 velox/exec/tests/utils/PlanBuilder.cpp
+4 −0 velox/exec/tests/utils/PlanBuilder.h
+87 −0 velox/functions/lib/Re2Functions.h
+2 −85 velox/functions/prestosql/RegexpReplace.h
+2 −0 velox/functions/prestosql/tests/RegexpReplaceTest.cpp
+53 −29 velox/functions/sparksql/RegexFunctions.cpp
+28 −1 velox/functions/sparksql/tests/RegexFunctionsTest.cpp
+9 −2 velox/tool/trace/AggregationReplayer.h
+9 −2 velox/tool/trace/FilterProjectReplayer.h
+1 −0 velox/tool/trace/HashJoinReplayer.cpp
+9 −2 velox/tool/trace/HashJoinReplayer.h
+10 −6 velox/tool/trace/OperatorReplayerBase.cpp
+3 −3 velox/tool/trace/OperatorReplayerBase.h
+9 −2 velox/tool/trace/PartitionedOutputReplayer.cpp
+1 −0 velox/tool/trace/PartitionedOutputReplayer.h
+3 −8 velox/tool/trace/TableScanReplayer.cpp
+8 −5 velox/tool/trace/TableScanReplayer.h
+8 −1 velox/tool/trace/TableWriterReplayer.h
+12 −6 velox/tool/trace/TraceReplayRunner.cpp
+1 −0 velox/tool/trace/TraceReplayRunner.h
+2 −1 velox/tool/trace/tests/AggregationReplayerTest.cpp
+24 −3 velox/tool/trace/tests/FilterProjectReplayerTest.cpp
+90 −9 velox/tool/trace/tests/HashJoinReplayerTest.cpp
+3 −1 velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp
+24 −3 velox/tool/trace/tests/TableScanReplayerTest.cpp
+2 −0 velox/tool/trace/tests/TableWriterReplayerTest.cpp
+20 −0 velox/type/Filter.h
+8 −0 velox/type/tests/FilterTest.cpp

0 comments on commit 0ee4687

Please sign in to comment.