Skip to content

Commit

Permalink
Revert "[native] Advance Velox and apply TaskState changes"
Browse files Browse the repository at this point in the history
This reverts commit 0ee4687.
  • Loading branch information
zuyu authored and Mariam Almesfer committed Dec 2, 2024
1 parent 1961e6c commit 3c9b5c9
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 25 deletions.
14 changes: 5 additions & 9 deletions presto-native-execution/presto_cpp/main/PeriodicTaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -333,20 +333,16 @@ void PeriodicTaskManager::updateTaskStats() {
RECORD_METRIC_VALUE(
kCounterNumTasksBytesProcessed, taskManager_->getBytesProcessed());
RECORD_METRIC_VALUE(
kCounterNumTasksRunning,
taskNumbers[static_cast<int>(velox::exec::TaskState::kRunning)]);
kCounterNumTasksRunning, taskNumbers[velox::exec::TaskState::kRunning]);
RECORD_METRIC_VALUE(
kCounterNumTasksFinished,
taskNumbers[static_cast<int>(velox::exec::TaskState::kFinished)]);
kCounterNumTasksFinished, taskNumbers[velox::exec::TaskState::kFinished]);
RECORD_METRIC_VALUE(
kCounterNumTasksCancelled,
taskNumbers[static_cast<int>(velox::exec::TaskState::kCanceled)]);
taskNumbers[velox::exec::TaskState::kCanceled]);
RECORD_METRIC_VALUE(
kCounterNumTasksAborted,
taskNumbers[static_cast<int>(velox::exec::TaskState::kAborted)]);
kCounterNumTasksAborted, taskNumbers[velox::exec::TaskState::kAborted]);
RECORD_METRIC_VALUE(
kCounterNumTasksFailed,
taskNumbers[static_cast<int>(velox::exec::TaskState::kFailed)]);
kCounterNumTasksFailed, taskNumbers[velox::exec::TaskState::kFailed]);

const auto driverCounts = taskManager_->getDriverCounts();
RECORD_METRIC_VALUE(kCounterNumQueuedDrivers, driverCounts.numQueuedDrivers);
Expand Down
10 changes: 5 additions & 5 deletions presto-native-execution/presto_cpp/main/PrestoTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,15 @@ namespace {

protocol::TaskState toPrestoTaskState(exec::TaskState state) {
switch (state) {
case exec::TaskState::kRunning:
case exec::kRunning:
return protocol::TaskState::RUNNING;
case exec::TaskState::kFinished:
case exec::kFinished:
return protocol::TaskState::FINISHED;
case exec::TaskState::kCanceled:
case exec::kCanceled:
return protocol::TaskState::CANCELED;
case exec::TaskState::kFailed:
case exec::kFailed:
return protocol::TaskState::FAILED;
case exec::TaskState::kAborted:
case exec::kAborted:
[[fallthrough]];
default:
return protocol::TaskState::ABORTED;
Expand Down
20 changes: 10 additions & 10 deletions presto-native-execution/presto_cpp/main/TaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,7 @@ std::unique_ptr<TaskInfo> TaskManager::deleteTask(
auto execTask = prestoTask->task;
if (execTask) {
auto state = execTask->state();
if (state == exec::TaskState::kRunning) {
if (state == exec::kRunning) {
execTask->requestAbort();
}
prestoTask->info.stats.endTime =
Expand Down Expand Up @@ -881,13 +881,13 @@ folly::Future<std::unique_ptr<Result>> TaskManager::getResults(
for (;;) {
if (prestoTask->taskStarted) {
// If the task has finished, then send completion result.
if (prestoTask->task->state() == exec::TaskState::kFinished) {
if (prestoTask->task->state() == exec::kFinished) {
promiseHolder->promise.setValue(createCompleteResult(token));
return std::move(future).via(httpSrvCpuExecutor_);
}
// If task is not running let the request timeout. The task may have
// failed at creation time and the coordinator hasn't yet caught up.
if (prestoTask->task->state() == exec::TaskState::kRunning) {
if (prestoTask->task->state() == exec::kRunning) {
getData(
promiseHolder,
folly::to_weak_ptr(state),
Expand Down Expand Up @@ -1166,11 +1166,11 @@ int32_t TaskManager::yieldTasks(

std::array<size_t, 5> TaskManager::getTaskNumbers(size_t& numTasks) const {
std::array<size_t, 5> res{0};
const auto taskMap = *taskMap_.rlock();
auto taskMap = taskMap_.rlock();
numTasks = 0;
for (const auto& [_, task] : taskMap) {
if (task->task) {
++res[static_cast<int>(task->task->state())];
for (const auto& pair : *taskMap) {
if (pair.second->task != nullptr) {
++res[pair.second->task->state()];
++numTasks;
}
}
Expand All @@ -1180,8 +1180,8 @@ std::array<size_t, 5> TaskManager::getTaskNumbers(size_t& numTasks) const {
int64_t TaskManager::getBytesProcessed() const {
const auto taskMap = *taskMap_.rlock();
int64_t totalCount = 0;
for (const auto& [_, task] : taskMap) {
totalCount += task->info.stats.processedInputDataSizeInBytes;
for (const auto& pair : taskMap) {
totalCount += pair.second->info.stats.processedInputDataSizeInBytes;
}
return totalCount;
}
Expand All @@ -1190,7 +1190,7 @@ void TaskManager::shutdown() {
size_t numTasks;
auto taskNumbers = getTaskNumbers(numTasks);
size_t seconds = 0;
while (taskNumbers[static_cast<int>(exec::TaskState::kRunning)] > 0) {
while (taskNumbers[velox::exec::TaskState::kRunning] > 0) {
PRESTO_SHUTDOWN_LOG(INFO)
<< "Waited (" << seconds
<< " seconds so far) for 'Running' tasks to complete. " << numTasks
Expand Down
2 changes: 1 addition & 1 deletion presto-native-execution/velox
Submodule velox updated 67 files
+3 −2 .github/workflows/conbench_upload.yml
+0 −5 scripts/setup-adapters.sh
+0 −1 scripts/velox_env_linux.yml
+0 −1 scripts/velox_env_mac.yml
+0 −3 velox/common/base/Counters.h
+3 −14 velox/common/file/FileSystems.cpp
+0 −6 velox/common/file/FileSystems.h
+13 −0 velox/common/file/tests/FaultyFile.cpp
+97 −1 velox/common/file/tests/FaultyFile.h
+1 −55 velox/common/file/tests/FaultyFileSystem.cpp
+0 −39 velox/common/file/tests/FaultyFileSystem.h
+0 −174 velox/common/file/tests/FaultyFileSystemOperations.h
+0 −22 velox/common/file/tests/FileTest.cpp
+1 −0 velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp
+1 −0 velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.cpp
+1 −0 velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h
+0 −7 velox/core/PlanNode.h
+0 −10 velox/core/QueryConfig.h
+1 −4 velox/docs/functions/presto/regexp.rst
+2 −7 velox/docs/functions/spark/regexp.rst
+1 −3 velox/docs/monitoring/metrics.rst
+51 −110 velox/dwio/common/Throttler.cpp
+5 −26 velox/dwio/common/Throttler.h
+36 −163 velox/dwio/common/tests/ThrottlerTest.cpp
+2 −5 velox/exec/Driver.cpp
+77 −32 velox/exec/HashProbe.cpp
+8 −6 velox/exec/HashProbe.h
+1 −3 velox/exec/Operator.cpp
+1 −1 velox/exec/OperatorTraceScan.cpp
+12 −109 velox/exec/Task.cpp
+2 −62 velox/exec/Task.h
+1 −27 velox/exec/TaskStructs.h
+8 −22 velox/exec/TraceUtil.cpp
+9 −6 velox/exec/TraceUtil.h
+164 −0 velox/exec/tests/HashJoinTest.cpp
+2 −264 velox/exec/tests/LocalPartitionTest.cpp
+0 −99 velox/exec/tests/TaskTest.cpp
+3 −43 velox/exec/tests/TraceUtilTest.cpp
+2 −6 velox/exec/tests/utils/HiveConnectorTestBase.cpp
+1 −6 velox/exec/tests/utils/PlanBuilder.cpp
+0 −4 velox/exec/tests/utils/PlanBuilder.h
+0 −87 velox/functions/lib/Re2Functions.h
+85 −2 velox/functions/prestosql/RegexpReplace.h
+0 −2 velox/functions/prestosql/tests/RegexpReplaceTest.cpp
+29 −53 velox/functions/sparksql/RegexFunctions.cpp
+1 −28 velox/functions/sparksql/tests/RegexFunctionsTest.cpp
+2 −9 velox/tool/trace/AggregationReplayer.h
+2 −9 velox/tool/trace/FilterProjectReplayer.h
+0 −1 velox/tool/trace/HashJoinReplayer.cpp
+2 −9 velox/tool/trace/HashJoinReplayer.h
+6 −10 velox/tool/trace/OperatorReplayerBase.cpp
+3 −3 velox/tool/trace/OperatorReplayerBase.h
+2 −9 velox/tool/trace/PartitionedOutputReplayer.cpp
+0 −1 velox/tool/trace/PartitionedOutputReplayer.h
+8 −3 velox/tool/trace/TableScanReplayer.cpp
+5 −8 velox/tool/trace/TableScanReplayer.h
+1 −8 velox/tool/trace/TableWriterReplayer.h
+6 −12 velox/tool/trace/TraceReplayRunner.cpp
+0 −1 velox/tool/trace/TraceReplayRunner.h
+1 −2 velox/tool/trace/tests/AggregationReplayerTest.cpp
+3 −24 velox/tool/trace/tests/FilterProjectReplayerTest.cpp
+9 −90 velox/tool/trace/tests/HashJoinReplayerTest.cpp
+1 −3 velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp
+3 −24 velox/tool/trace/tests/TableScanReplayerTest.cpp
+0 −2 velox/tool/trace/tests/TableWriterReplayerTest.cpp
+0 −20 velox/type/Filter.h
+0 −8 velox/type/tests/FilterTest.cpp

0 comments on commit 3c9b5c9

Please sign in to comment.