Skip to content

Commit

Permalink
[Bug](pipeline) fix pipeline task call finish_p_dependency more than …
Browse files Browse the repository at this point in the history
…once (apache#20851)

fix pipeline task call finish_p_dependency more than once

When pipeline task meet eos->PENDING_FINISH->CANCELED, this task will call finish_p_dependency twice.
  • Loading branch information
BiteTheDDDDt authored Jun 15, 2023
1 parent 11e164e commit 4bfceb7
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 13 deletions.
7 changes: 7 additions & 0 deletions be/src/pipeline/pipeline_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,8 @@ QueryContext* PipelineTask::query_context() {

// The FSM see PipelineTaskState's comment
void PipelineTask::set_state(PipelineTaskState state) {
DCHECK(_cur_state != PipelineTaskState::FINISHED);

if (_cur_state == state) {
return;
}
Expand All @@ -301,6 +303,11 @@ void PipelineTask::set_state(PipelineTaskState state) {
COUNTER_UPDATE(_block_by_sink_counts, 1);
}
}

if (state == PipelineTaskState::FINISHED) {
_finish_p_dependency();
}

_cur_state = state;
}

Expand Down
12 changes: 6 additions & 6 deletions be/src/pipeline/pipeline_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,12 +163,6 @@ class PipelineTask {

Status finalize();

void finish_p_dependency() {
for (const auto& p : _pipeline->_parents) {
p.lock()->finish_one_dependency(_previous_schedule_id);
}
}

PipelineFragmentContext* fragment_context() { return _fragment_context; }

QueryContext* query_context();
Expand Down Expand Up @@ -215,6 +209,12 @@ class PipelineTask {
int get_core_id() const { return this->_core_id; }

private:
void _finish_p_dependency() {
for (const auto& p : _pipeline->_parents) {
p.lock()->finish_one_dependency(_previous_schedule_id);
}
}

Status _open();
void _init_profile();
void _fresh_profile_counter();
Expand Down
5 changes: 0 additions & 5 deletions be/src/pipeline/task_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,6 @@ void TaskScheduler::_do_work(size_t index) {
"finalize fail:" + status.to_string());
_try_close_task(task, PipelineTaskState::CANCELED);
} else {
task->finish_p_dependency();
_try_close_task(task, PipelineTaskState::FINISHED);
}
continue;
Expand Down Expand Up @@ -338,10 +337,6 @@ void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state)
return;
}
task->set_state(state);
// TODO: rethink the logic
if (state == PipelineTaskState::CANCELED) {
task->finish_p_dependency();
}
task->fragment_context()->close_a_pipeline();
}
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/runtime_filter_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -281,8 +281,8 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
auto iter = _filter_map.find(std::to_string(request->filter_id()));
VLOG_ROW << "recv filter id:" << request->filter_id() << " " << request->ShortDebugString();
if (iter == _filter_map.end()) {
LOG(WARNING) << "unknown filter id:" << std::to_string(request->filter_id());
return Status::InvalidArgument("unknown filter id");
return Status::InvalidArgument("unknown filter id {}",
std::to_string(request->filter_id()));
}
cntVal = iter->second;
if (auto bf = cntVal->filter->get_bloomfilter()) {
Expand Down

0 comments on commit 4bfceb7

Please sign in to comment.