Skip to content

Commit

Permalink
Merge branch 'branch-0.41' into performance-improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
pentschev authored Oct 23, 2024
2 parents 1982736 + b94958d commit 2399389
Show file tree
Hide file tree
Showing 7 changed files with 247 additions and 31 deletions.
21 changes: 15 additions & 6 deletions cpp/include/ucxx/delayed_submission.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class BaseDelayedSubmissionCollection {
std::string _name{"undefined"}; ///< The human-readable name of the collection, used for logging
bool _enabled{true}; ///< Whether the resource required to process the collection is enabled.
ItemIdType _itemId{0}; ///< The item ID counter, used to allow cancelation.
std::optional<ItemIdType> _processing{
std::nullopt}; ///< The ID of the item being processed, if any.
std::deque<std::pair<ItemIdType, T>> _collection{}; ///< The collection.
std::set<ItemIdType> _canceled{}; ///< IDs of canceled items.
std::mutex _mutex{}; ///< Mutex to provide access to `_collection`.
Expand Down Expand Up @@ -150,10 +152,17 @@ class BaseDelayedSubmissionCollection {
item = std::move(_collection.front());
_collection.pop_front();
if (_canceled.erase(item.first)) continue;
_processing = std::optional<ItemIdType>{item.first};
}

processItem(item.first, item.second);
}

{
// Clear the value of `_processing` as no more requests will be processed.
std::lock_guard<std::mutex> lock(_mutex);
_processing = std::nullopt;
}
}

/**
Expand All @@ -162,17 +171,17 @@ class BaseDelayedSubmissionCollection {
* Cancel a pending callback and thus do not execute it, unless the execution has
* already begun, in which case cancelation cannot be done.
*
* @throws std::runtime_error if the item is being processed and canceling is not
* possible anymore.
*
* @param[in] id the ID of the scheduled item, as returned by `schedule()`.
*/
void cancel(ItemIdType id)
{
// Hold `_mutex` for the whole operation: the processing loop resets `_processing` and
// consumes `_canceled` while holding the same mutex.
std::lock_guard<std::mutex> lock(_mutex);
// TODO: Check if not cancellable anymore? Will likely need a separate set to keep
// track of registered items.
//
// If the callback is already running
// and the user has no way of knowing that but still destroys it, undefined
// behavior may occur.
//
// NOTE(review): the check below covers the "already running" case described above by
// refusing to cancel the item whose ID is currently stored in `_processing`.
if (_processing.has_value() && _processing.value() == id)
throw std::runtime_error("Cannot cancel, item is being processed.");

// Record the ID as canceled; the processing loop erases it from `_canceled` and skips
// the matching item when that item is dequeued.
// NOTE(review): from what is visible here, an ID that is never dequeued again (e.g.,
// the item already completed) would remain in `_canceled` -- confirm whether that
// matters.
_canceled.insert(id);
// NOTE(review): `%lu` presumes `ItemIdType` is printable as `unsigned long` -- confirm.
ucxx_trace_req("Canceled item: %lu", id);
}
Expand Down
16 changes: 12 additions & 4 deletions cpp/include/ucxx/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -437,8 +437,12 @@ class Worker : public Component {
* If `period` is `0` this is a blocking call that only returns when the callback has been
* executed and will always return `true`, and if `period` is a positive integer the time
* in nanoseconds will be waited for the callback to complete and return `true` in the
* successful case or `false` otherwise. `period` only applies if the worker progress
* thread is running, otherwise the callback is immediately executed.
* successful case or `false` otherwise. However, if the callback can no longer be
* canceled (i.e., it has already started), this method will keep retrying and may never
* return if the callback never completes. Returning in that situation would be unsafe, as
* it would allow the caller to destroy the callback and its resources, causing undefined
* behavior. `period` only applies if the worker progress thread is running; otherwise the
* callback is executed immediately.
*
* @param[in] callback the callback to execute before progressing the worker.
* @param[in] period the time in nanoseconds to wait for the callback to complete.
Expand All @@ -462,8 +466,12 @@ class Worker : public Component {
* If `period` is `0` this is a blocking call that only returns when the callback has been
* executed and will always return `true`, and if `period` is a positive integer the time
* in nanoseconds will be waited for the callback to complete and return `true` in the
* successful case or `false` otherwise. `period` only applies if the worker progress
* thread is running, otherwise the callback is immediately executed.
* successful case or `false` otherwise. However, if the callback can no longer be
* canceled (i.e., it has already started), this method will keep retrying and may never
* return if the callback never completes. Returning in that situation would be unsafe, as
* it would allow the caller to destroy the callback and its resources, causing undefined
* behavior. `period` only applies if the worker progress thread is running; otherwise the
* callback is executed immediately.
*
* @param[in] callback the callback to execute before progressing the worker.
* @param[in] period the time in nanoseconds to wait for the callback to complete.
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/delayed_submission.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,6 @@ ItemIdType DelayedSubmissionCollection::registerGenericPost(DelayedSubmissionCal

// Forward the cancelation request for item `id` to the `_genericPre` collection.
void DelayedSubmissionCollection::cancelGenericPre(ItemIdType id)
{
  _genericPre.cancel(id);
}

void DelayedSubmissionCollection::cancelGenericPost(ItemIdType id) { _genericPre.cancel(id); }
void DelayedSubmissionCollection::cancelGenericPost(ItemIdType id) { _genericPost.cancel(id); }

} // namespace ucxx
6 changes: 4 additions & 2 deletions cpp/src/endpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,12 @@ void Endpoint::create(ucp_ep_params_t* params)
3000000000 /* 3s */))
break;

if (i == maxAttempts - 1)
if (i == maxAttempts - 1) {
status = UCS_ERR_TIMED_OUT;
ucxx_error("Timeout waiting for ucp_ep_create, all attempts failed");
else
} else {
ucxx_warn("Timeout waiting for ucp_ep_create, retrying");
}
}
utils::ucsErrorThrow(status);
} else {
Expand Down
40 changes: 30 additions & 10 deletions cpp/src/worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -347,11 +347,21 @@ bool Worker::registerGenericPre(DelayedSubmissionCallbackType callback, uint64_t
}
signalWorkerFunction();

auto ret = callbackNotifier.wait(period, signalWorkerFunction);

if (!ret) _delayedSubmissionCollection->cancelGenericPre(id);

return ret;
size_t retryCount = 0;
while (true) {
auto ret = callbackNotifier.wait(period, signalWorkerFunction);

try {
if (!ret) _delayedSubmissionCollection->cancelGenericPre(id);
return ret;
} catch (const std::runtime_error& e) {
if (++retryCount % 10 == 0)
ucxx_warn(
"Could not cancel after %lu attempts, the callback has not returned and the process "
"may stop responding.",
retryCount);
}
}
}
}

Expand Down Expand Up @@ -384,11 +394,21 @@ bool Worker::registerGenericPost(DelayedSubmissionCallbackType callback, uint64_
}
signalWorkerFunction();

auto ret = callbackNotifier.wait(period, signalWorkerFunction);

if (!ret) _delayedSubmissionCollection->cancelGenericPost(id);

return ret;
size_t retryCount = 0;
while (true) {
auto ret = callbackNotifier.wait(period, signalWorkerFunction);

try {
if (!ret) _delayedSubmissionCollection->cancelGenericPost(id);
return ret;
} catch (const std::runtime_error& e) {
if (++retryCount % 10 == 0)
ucxx_warn(
"Could not cancel after %lu attempts, the callback has not returned and the process "
"may stop responding.",
retryCount);
}
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/worker_progress_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ void WorkerProgressThread::stop()
});
_signalWorkerFunction();
if (!callbackNotifierPost.wait(3000000000)) {
_delayedSubmissionCollection->cancelGenericPre(idPost);
_delayedSubmissionCollection->cancelGenericPost(idPost);
}

_thread.join();
Expand Down
Loading

0 comments on commit 2399389

Please sign in to comment.