Skip to content

Commit

Permalink
Bugfix/batch size (#209)
Browse files Browse the repository at this point in the history
* Fix a bug at CleanSubmitQueueCb_.

* Add the SHUTDOWN state handling.

Signed-off-by: RileyW <wrllrwwrllrw@gmail.com>

* Fix repeated channel.

Signed-off-by: RileyW <wrllrwwrllrw@gmail.com>

---------

Signed-off-by: RileyW <wrllrwwrllrw@gmail.com>
Co-authored-by: NamelessOIer <namelessoier@protonmail.com>
  • Loading branch information
RileyWen and NamelessOIer authored Jan 8, 2024
1 parent 7637d0d commit df9844c
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 12 deletions.
45 changes: 34 additions & 11 deletions src/CraneCtld/CranedKeeper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,12 @@ CranedKeeper::CqTag *CranedKeeper::InitCranedStateMachine_(
break;
}

case GRPC_CHANNEL_SHUTDOWN: {
CRANE_WARN("Unexpected InitializingCraned SHUTDOWN state!");
next_tag_type = std::nullopt;
break;
}

case GRPC_CHANNEL_CONNECTING: {
if (raw_craned->m_prev_channel_state_ == GRPC_CHANNEL_CONNECTING) {
if (raw_craned->m_failure_retry_times_ <
Expand Down Expand Up @@ -481,10 +487,6 @@ CranedKeeper::CqTag *CranedKeeper::InitCranedStateMachine_(
// CranedKeeper::RegisterNewCraneds. Execution should never reach here.
CRANE_ERROR("Unexpected InitializingCraned IDLE state!");
break;

case GRPC_CHANNEL_SHUTDOWN:
CRANE_ERROR("Unexpected InitializingCraned SHUTDOWN state!");
break;
}

// CRANE_TRACE("Exit InitCranedStateMachine_");
Expand Down Expand Up @@ -564,6 +566,11 @@ CranedKeeper::CqTag *CranedKeeper::EstablishedCranedStateMachine_(
g_thread_pool->push_task(m_craned_rec_from_temp_failure_cb_,
craned->m_craned_id_);

{
util::lock_guard guard(m_unavail_craned_set_mtx_);
m_connecting_craned_set_.erase(craned->m_craned_id_);
}

next_tag_type = CqTag::kEstablishedCraned;
}
break;
Expand All @@ -575,6 +582,11 @@ CranedKeeper::CqTag *CranedKeeper::EstablishedCranedStateMachine_(
// READY -> TRANSIENT_FAILURE -> CONNECTING
CRANE_TRACE("READY -> TRANSIENT_FAILURE -> CONNECTING");

{
util::lock_guard guard(m_unavail_craned_set_mtx_);
m_connecting_craned_set_.emplace(craned->m_craned_id_);
}

craned->m_invalid_ = true;
if (m_craned_is_temp_down_cb_)
g_thread_pool->push_task(m_craned_is_temp_down_cb_,
Expand Down Expand Up @@ -623,10 +635,18 @@ CranedKeeper::CqTag *CranedKeeper::EstablishedCranedStateMachine_(
break;
}

case GRPC_CHANNEL_SHUTDOWN:
CRANE_ERROR("Unexpected SHUTDOWN channel state on Established Craned {}!",
craned->m_craned_id_);
case GRPC_CHANNEL_SHUTDOWN: {
CRANE_WARN("Unexpected SHUTDOWN channel state on Established Craned {}!",
craned->m_craned_id_);

craned->m_invalid_ = true;
if (m_craned_is_temp_down_cb_)
g_thread_pool->push_task(m_craned_is_temp_down_cb_,
craned->m_craned_id_);

next_tag_type = std::nullopt;
break;
}
}

if (next_tag_type.has_value()) {
Expand Down Expand Up @@ -700,10 +720,13 @@ void CranedKeeper::ConnectCranedNode_(CranedId const &craned_id) {
/*ms*/);
channel_args.SetInt(GRPC_ARG_MAX_RECONNECT_BACKOFF_MS,
30 /*s*/ * 1000 /*ms*/);
// channel_args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, 5 /*s*/ * 1000 /*ms*/);
// channel_args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 10 /*s*/ * 1000
// /*ms*/); channel_args.SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1
// /*true*/);

// Sometimes, Craned might crash without cleaning up sockets and
// the socket will remain ESTABLISHED state even if that craned has died.
// Open KeepAlive option in case of such situation.
channel_args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, 5 /*s*/ * 1000 /*ms*/);
channel_args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 6 /*s*/ * 1000 /*ms*/);
channel_args.SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1 /*true*/);

CRANE_TRACE("Creating a channel to {}:{}. Channel count: {}", craned_id,
kCranedDefaultPort, m_channel_count_.fetch_add(1) + 1);
Expand Down
2 changes: 1 addition & 1 deletion src/CraneCtld/TaskScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1121,7 +1121,7 @@ void TaskScheduler::CleanSubmitQueueCb_() {
submit_tasks.resize(batch_size);

size_t actual_size =
m_submit_task_queue_.try_dequeue_bulk(submit_tasks.begin(), actual_size);
m_submit_task_queue_.try_dequeue_bulk(submit_tasks.begin(), batch_size);

if (actual_size == 0) return;

Expand Down

0 comments on commit df9844c

Please sign in to comment.