From 988bb4950b09d65eac0268bc9e7855a7547af49e Mon Sep 17 00:00:00 2001 From: Corey Kosak Date: Sun, 9 Jul 2023 23:36:32 -0400 Subject: [PATCH] C++ Client: tear everything down at the time of Client's destruction (#4154) * C++ Client: tear everything down at the time of Client's destruction * Remove "clean_shutdown_test" --- .../deephaven/client/impl/client_impl.h | 4 ++ .../deephaven/client/impl/table_handle_impl.h | 3 +- .../client/impl/table_handle_manager_impl.h | 12 +++++ .../private/deephaven/client/server/server.h | 32 +++++++++--- .../client/subscription/subscription_handle.h | 6 +-- .../deephaven/client/utility/executor.h | 14 ++++-- cpp-client/deephaven/client/src/client.cc | 13 +++-- .../client/src/impl/table_handle_impl.cc | 39 +++++++-------- .../src/impl/table_handle_manager_impl.cc | 19 +++++++ .../deephaven/client/src/server/server.cc | 33 ++++++++++-- .../src/subscription/subscribe_thread.cc | 46 +++++++++-------- .../deephaven/client/src/utility/executor.cc | 50 +++++++++++++++---- .../dhcore/ticking/subscription_handle.h | 2 +- cpp-client/tests/CMakeLists.txt | 3 +- 14 files changed, 196 insertions(+), 80 deletions(-) diff --git a/cpp-client/deephaven/client/include/private/deephaven/client/impl/client_impl.h b/cpp-client/deephaven/client/include/private/deephaven/client/impl/client_impl.h index 4bdb1842370..3a6d82a3e15 100644 --- a/cpp-client/deephaven/client/include/private/deephaven/client/impl/client_impl.h +++ b/cpp-client/deephaven/client/include/private/deephaven/client/impl/client_impl.h @@ -28,6 +28,10 @@ class ClientImpl { ClientImpl(Private, std::shared_ptr &&managerImpl); ~ClientImpl(); + void shutdown() { + managerImpl_->shutdown(); + } + const std::shared_ptr &managerImpl() const { return managerImpl_; } private: diff --git a/cpp-client/deephaven/client/include/private/deephaven/client/impl/table_handle_impl.h b/cpp-client/deephaven/client/include/private/deephaven/client/impl/table_handle_impl.h index 6238a6d6f2a..b0854fd93be 100644 --- a/cpp-client/deephaven/client/include/private/deephaven/client/impl/table_handle_impl.h +++ b/cpp-client/deephaven/client/include/private/deephaven/client/impl/table_handle_impl.h @@ -219,7 +219,7 @@ class TableHandleImpl : public std::enable_shared_from_this { std::shared_ptr subscribe(std::shared_ptr callback); std::shared_ptr subscribe(TableHandle::onTickCallback_t onTick, void *onTickUserData, TableHandle::onErrorCallback_t onError, void *onErrorUserData); - void unsubscribe(std::shared_ptr handle); + void unsubscribe(const std::shared_ptr &handle); /** * Used in tests. @@ -249,7 +249,6 @@ class TableHandleImpl : public std::enable_shared_from_this { std::shared_ptr managerImpl_; Ticket ticket_; std::shared_ptr lazyState_; - std::set> subscriptions_; }; } // namespace impl } // namespace deephaven::client diff --git a/cpp-client/deephaven/client/include/private/deephaven/client/impl/table_handle_manager_impl.h b/cpp-client/deephaven/client/include/private/deephaven/client/impl/table_handle_manager_impl.h index e4db1eeed61..1a9a935b64a 100644 --- a/cpp-client/deephaven/client/include/private/deephaven/client/impl/table_handle_manager_impl.h +++ b/cpp-client/deephaven/client/include/private/deephaven/client/impl/table_handle_manager_impl.h @@ -6,6 +6,7 @@ #include #include #include "deephaven/client/server/server.h" +#include "deephaven/client/subscription/subscription_handle.h" #include "deephaven/client/utility/executor.h" namespace deephaven::client::impl { @@ -14,6 +15,7 @@ class TableHandleImpl; class TableHandleManagerImpl final : public std::enable_shared_from_this { struct Private {}; typedef deephaven::client::server::Server Server; + typedef deephaven::client::subscription::SubscriptionHandle SubscriptionHandle; typedef deephaven::client::utility::Executor Executor; typedef io::deephaven::proto::backplane::grpc::AsOfJoinTablesRequest AsOfJoinTablesRequest; typedef io::deephaven::proto::backplane::grpc::ComboAggregateRequest ComboAggregateRequest; @@ -37,6 +39,8 @@ class TableHandleManagerImpl final : public std::enable_shared_from_this emptyTable(int64_t size); std::shared_ptr fetchTable(std::string tableName); std::shared_ptr timeTable(int64_t startTimeNanos, int64_t periodNanos); @@ -53,6 +57,9 @@ class TableHandleManagerImpl final : public std::enable_shared_from_this makeTableHandleFromTicket(std::string ticket); + void addSubscriptionHandle(std::shared_ptr handle); + void removeSubscriptionHandle(const std::shared_ptr &handle); + const std::optional &consoleId() const { return consoleId_; } const std::shared_ptr &server() const { return server_; } const std::shared_ptr &executor() const { return executor_; } @@ -63,5 +70,10 @@ class TableHandleManagerImpl final : public std::enable_shared_from_this server_; std::shared_ptr executor_; std::shared_ptr flightExecutor_; + // Protects the below for concurrent access. + std::mutex mutex_; + // The SubscriptionHandles for the tables we have subscribed to. We keep these at the TableHandleManagerImpl level + // so we can cleanly shut them all down when the TableHandleManagerImpl::shutdown() is called. + std::set> subscriptions_; }; } // namespace deephaven::client::impl diff --git a/cpp-client/deephaven/client/include/private/deephaven/client/server/server.h b/cpp-client/deephaven/client/include/private/deephaven/client/server/server.h index 5a765b91d25..46192523db3 100644 --- a/cpp-client/deephaven/client/include/private/deephaven/client/server/server.h +++ b/cpp-client/deephaven/client/include/private/deephaven/client/server/server.h @@ -131,6 +131,8 @@ class Server : public std::enable_shared_from_this { // TODO(kosak): decide on the multithreaded story here arrow::flight::FlightClient *flightClient() const { return flightClient_.get(); } + void shutdown(); + /** * Allocates a new Ticket from client-managed namespace. */ @@ -268,21 +270,39 @@ class Server : public std::enable_shared_from_this { std::string sessionToken_; std::chrono::milliseconds expirationInterval_; std::chrono::system_clock::time_point nextHandshakeTime_; + std::thread completionQueueThread_; + std::thread keepAliveThread_; }; template void Server::sendRpc(const TReq &req, std::shared_ptr> responseCallback, TStub *stub, const TPtrToMember &pm) { auto now = std::chrono::system_clock::now(); - // Keep this in a unique_ptr at first, for cleanup in case addAuthToken throws an exception. + // Keep this in a unique_ptr at first, in case we leave early due to cancellation or exception. auto response = std::make_unique>(now, std::move(responseCallback)); forEachHeaderNameAndValue([&response](const std::string &name, const std::string &value) { response->ctx_.AddMetadata(name, value); }); - auto rpc = (stub->*pm)(&response->ctx_, req, &completionQueue_); - // It is the responsibility of "processNextCompletionQueueItem" to deallocate the storage pointed - // to by 'response'. - auto *rp = response.release(); - rpc->Finish(&rp->response_, &rp->status_, rp); + + // Per the GRPC documentation for CompletionQueue::Shutdown(), we must not add items to the CompletionQueue after + // it has been shut down. So we do a test and enqueue while under lock. + std::unique_lock guard(mutex_); + if (!cancelled_) { + auto rpc = (stub->*pm)(&response->ctx_, req, &completionQueue_); + // It is the responsibility of "processNextCompletionQueueItem" to deallocate the storage pointed + // to by 'response'. + auto *rp = response.release(); + rpc->Finish(&rp->response_, &rp->status_, rp); + return; + } + + // If we get here, we are cancelled. So instead of enqueuing the request, we need to signal failure to the callback. + // This can be done without holding the lock. + // TODO(kosak): a slight code savings can be achieved if this error code is moved to a non-template context, + // since it is not dependent on any template arguments. + guard.unlock(); + const char *message = "Server cancelled. All further RPCs are being rejected"; + auto eptr = std::make_exception_ptr(std::runtime_error(DEEPHAVEN_DEBUG_MSG(message))); + response->onFailure(std::move(eptr)); } } // namespace deephaven::client::server diff --git a/cpp-client/deephaven/client/include/private/deephaven/client/subscription/subscription_handle.h b/cpp-client/deephaven/client/include/private/deephaven/client/subscription/subscription_handle.h index 4d7b10b9bb5..158539397cc 100644 --- a/cpp-client/deephaven/client/include/private/deephaven/client/subscription/subscription_handle.h +++ b/cpp-client/deephaven/client/include/private/deephaven/client/subscription/subscription_handle.h @@ -8,10 +8,8 @@ class SubscriptionHandle { public: virtual ~SubscriptionHandle() = default; /** - * Cancels the subscription. - * @param wait If true, waits for the internal subcription thread to be torn down. Use 'true' - * if you want to be sure that your callback will not be invoked after this call returns. + * Cancels the subscription and waits for the corresponding thread to die. */ - virtual void cancel(bool wait) = 0; + virtual void cancel() = 0; }; } // namespace deephaven::client::subscription diff --git a/cpp-client/deephaven/client/include/private/deephaven/client/utility/executor.h b/cpp-client/deephaven/client/include/private/deephaven/client/utility/executor.h index 9aaf8ea75eb..8b043666677 100644 --- a/cpp-client/deephaven/client/include/private/deephaven/client/utility/executor.h +++ b/cpp-client/deephaven/client/include/private/deephaven/client/utility/executor.h @@ -8,6 +8,7 @@ #include #include #include +#include #include "deephaven/dhcore/utility/callbacks.h" #include "deephaven/dhcore/utility/utility.h" @@ -17,11 +18,13 @@ class Executor { }; public: - static std::shared_ptr create(); + static std::shared_ptr create(std::string id); - explicit Executor(Private); + explicit Executor(Private, std::string id); ~Executor(); + void shutdown(); + typedef deephaven::dhcore::utility::Callback<> callback_t; void invoke(std::shared_ptr f); @@ -33,11 +36,14 @@ class Executor { private: static void threadStart(std::shared_ptr self); - [[noreturn]] - void runForever(); + void runUntilCancelled(); + // For debugging. + std::string id_; std::mutex mutex_; std::condition_variable condvar_; + bool cancelled_ = false; std::deque> todo_; + std::thread executorThread_; }; } // namespace deephaven::client::utility diff --git a/cpp-client/deephaven/client/src/client.cc b/cpp-client/deephaven/client/src/client.cc index 5921a558518..178e73e5235 100644 --- a/cpp-client/deephaven/client/src/client.cc +++ b/cpp-client/deephaven/client/src/client.cc @@ -48,8 +48,8 @@ void printTableData(std::ostream &s, const TableHandle &tableHandle, bool wantHe } // namespace Client Client::connect(const std::string &target, const ClientOptions &options) { - auto executor = Executor::create(); - auto flightExecutor = Executor::create(); + auto executor = Executor::create("Client executor"); + auto flightExecutor = Executor::create("Flight executor"); auto server = Server::createFromTarget(target, options); auto impl = ClientImpl::create(std::move(server), executor, flightExecutor, options.sessionType_); return Client(std::move(impl)); @@ -61,7 +61,14 @@ Client::Client(std::shared_ptr impl) : impl_(std::move(impl)) } Client::Client(Client &&other) noexcept = default; Client &Client::operator=(Client &&other) noexcept = default; -Client::~Client() = default; + +// There is only one Client associated with the server connection. Clients can only be moved, not copied. +// When the client owning the state is destructed, we tear down the connection. +Client::~Client() { + if (impl_ != nullptr) { + impl_->shutdown(); + } +} TableHandleManager Client::getManager() const { return TableHandleManager(impl_->managerImpl()); diff --git a/cpp-client/deephaven/client/src/impl/table_handle_impl.cc b/cpp-client/deephaven/client/src/impl/table_handle_impl.cc index e923010404b..efe96da2dc4 100644 --- a/cpp-client/deephaven/client/src/impl/table_handle_impl.cc +++ b/cpp-client/deephaven/client/src/impl/table_handle_impl.cc @@ -7,7 +7,6 @@ #include #include #include -#include #include #include #include @@ -371,20 +370,6 @@ std::shared_ptr TableHandleImpl::asOfJoin(AsOfJoinTablesRequest return TableHandleImpl::create(managerImpl_, std::move(resultTicket), std::move(ls)); } -std::shared_ptr TableHandleImpl::subscribe( - std::shared_ptr callback) { - // On the flight executor thread, we invoke DoExchange (waiting for a successful response). - // We wait for that response here. That makes the first part of this call synchronous. If there - // is an error in the DoExchange invocation, the caller will get an exception here. The - // remainder of the interaction (namely, the sending of a BarrageSubscriptionRequest and the - // parsing of all the replies) is done on a newly-created thread dedicated to that job. - auto schema = lazyState_->getSchema(); - auto handle = SubscriptionThread::start(managerImpl_->server(), managerImpl_->flightExecutor().get(), - schema, ticket_, std::move(callback)); - subscriptions_.insert(handle); - return handle; -} - namespace { class CStyleTickingCallback final : public TickingCallback { public: @@ -416,12 +401,22 @@ TableHandleImpl::subscribe(TableHandle::onTickCallback_t onTick, void *onTickUse return subscribe(std::move(cb)); } -void TableHandleImpl::unsubscribe(std::shared_ptr handle) { - auto node = subscriptions_.extract(handle); - if (node.empty()) { - return; - } - node.value()->cancel(true); +std::shared_ptr TableHandleImpl::subscribe(std::shared_ptr callback) { + // On the flight executor thread, we invoke DoExchange (waiting for a successful response). + // We wait for that response here. That makes the first part of this call synchronous. If there + // is an error in the DoExchange invocation, the caller will get an exception here. The + // remainder of the interaction (namely, the sending of a BarrageSubscriptionRequest and the + // parsing of all the replies) is done on a newly-created thread dedicated to that job. + auto schema = lazyState_->getSchema(); + auto handle = SubscriptionThread::start(managerImpl_->server(), managerImpl_->flightExecutor().get(), + schema, ticket_, std::move(callback)); + managerImpl_->addSubscriptionHandle(handle); + return handle; +} + +void TableHandleImpl::unsubscribe(const std::shared_ptr &handle) { + managerImpl_->removeSubscriptionHandle(handle); + handle->cancel(); } std::vector> TableHandleImpl::getColumnImpls() { @@ -648,7 +643,7 @@ class GetSchemaCallback final : arrow::flight::FlightCallOptions options; server_->forEachHeaderNameAndValue( [&options](const std::string &name, const std::string &value) { - options.headers.push_back(std::make_pair(name, value)); + options.headers.emplace_back(name, value); } ); diff --git a/cpp-client/deephaven/client/src/impl/table_handle_manager_impl.cc b/cpp-client/deephaven/client/src/impl/table_handle_manager_impl.cc index a1bfa17580d..41a2d1258f5 100644 --- a/cpp-client/deephaven/client/src/impl/table_handle_manager_impl.cc +++ b/cpp-client/deephaven/client/src/impl/table_handle_manager_impl.cc @@ -35,6 +35,15 @@ TableHandleManagerImpl::TableHandleManagerImpl(Private, std::optional && TableHandleManagerImpl::~TableHandleManagerImpl() = default; +void TableHandleManagerImpl::shutdown() { + for (const auto &sub : subscriptions_) { + sub->cancel(); + } + executor_->shutdown(); + flightExecutor_->shutdown(); + server_->shutdown(); +} + std::shared_ptr TableHandleManagerImpl::emptyTable(int64_t size) { auto resultTicket = server_->newTicket(); auto [cb, ls] = TableHandleImpl::createEtcCallback(nullptr, this, resultTicket); @@ -88,4 +97,14 @@ std::shared_ptr TableHandleManagerImpl::makeTableHandleFromTick server_->getExportedTableCreationResponseAsync(resultTicket, std::move(cb)); return TableHandleImpl::create(shared_from_this(), std::move(resultTicket), std::move(ls)); } + +void TableHandleManagerImpl::addSubscriptionHandle(std::shared_ptr handle) { + std::unique_lock guard(mutex_); + subscriptions_.insert(std::move(handle)); +} + +void TableHandleManagerImpl::removeSubscriptionHandle(const std::shared_ptr &handle) { + std::unique_lock guard(mutex_); + subscriptions_.erase(handle); +} } // namespace deephaven::client::impl diff --git a/cpp-client/deephaven/client/src/server/server.cc b/cpp-client/deephaven/client/src/server/server.cc index 68c24822a10..fe29c211f73 100644 --- a/cpp-client/deephaven/client/src/server/server.cc +++ b/cpp-client/deephaven/client/src/server/server.cc @@ -204,10 +204,8 @@ std::shared_ptr Server::createFromTarget( auto result = std::make_shared(Private(), std::move(as), std::move(cs), std::move(ss), std::move(ts), std::move(cfs), std::move(fc), copts.extraHeaders(), std::move(sessionToken), expirationInterval, nextHandshakeTime); - std::thread t1(&processCompletionQueueForever, result); - std::thread t2(&sendKeepaliveMessages, result); - t1.detach(); - t2.detach(); + result->completionQueueThread_ = std::thread(&processCompletionQueueForever, result); + result->keepAliveThread_ = std::thread(&sendKeepaliveMessages, result); return result; } @@ -236,6 +234,28 @@ Server::Server(Private, Server::~Server() = default; +void Server::shutdown() { + // TODO(cristianferretti): change to logging framework + std::cerr << DEEPHAVEN_DEBUG_MSG("Server shutdown requested\n"); + + std::unique_lock guard(mutex_); + if (cancelled_) { + guard.unlock(); // to be nice + std::cerr << DEEPHAVEN_DEBUG_MSG("Already cancelled\n"); + return; + } + cancelled_ = true; + guard.unlock(); + + // This will cause the completion queue thread to shut down. + completionQueue_.Shutdown(); + // This will cause the handshake thread to shut down (because cancelled_ is true). + condVar_.notify_all(); + + completionQueueThread_.join(); + keepAliveThread_.join(); +} + namespace { Ticket makeNewTicket(int32_t ticketId) { constexpr auto ticketSize = sizeof(ticketId); @@ -505,6 +525,8 @@ void Server::processCompletionQueueForever(const std::shared_ptr &self) break; } } + // TODO(cristianferretti): change to logging framework + std::cerr << DEEPHAVEN_DEBUG_MSG("Process completion queue thread exiting\n"); } bool Server::processNextCompletionQueueItem() { @@ -583,6 +605,9 @@ void Server::sendKeepaliveMessages(const std::shared_ptr &self) { break; } } + + // TODO(cristianferretti): change to logging framework + std::cerr << DEEPHAVEN_DEBUG_MSG("Keepalive thread exiting\n"); } bool Server::keepaliveHelper() { diff --git a/cpp-client/deephaven/client/src/subscription/subscribe_thread.cc b/cpp-client/deephaven/client/src/subscription/subscribe_thread.cc index 435ca36e366..88213cadc52 100644 --- a/cpp-client/deephaven/client/src/subscription/subscribe_thread.cc +++ b/cpp-client/deephaven/client/src/subscription/subscribe_thread.cc @@ -74,9 +74,9 @@ class UpdateProcessor final : public SubscriptionHandle { std::shared_ptr schema, std::shared_ptr callback); ~UpdateProcessor() final; - void cancel(bool wait) final; + void cancel() final; - static void runForever(const std::shared_ptr &self); + static void runUntilCancelled(std::shared_ptr self); void runForeverHelper(); private: @@ -84,10 +84,9 @@ class UpdateProcessor final : public SubscriptionHandle { std::shared_ptr schema_; std::shared_ptr callback_; - std::atomic cancelled_ = false; std::mutex mutex_; - std::condition_variable condVar_; - bool threadAlive_ = false; + bool cancelled_ = false; + std::thread thread_; }; class OwningBuffer final : public arrow::Buffer { @@ -106,7 +105,7 @@ struct ColumnSourceAndSize { ColumnSourceAndSize arrayToColumnSource(const arrow::Array &array); } // namespace -std::shared_ptr SubscriptionThread::start( std::shared_ptr server, +std::shared_ptr SubscriptionThread::start(std::shared_ptr server, Executor *flightExecutor, std::shared_ptr schema, const Ticket &ticket, std::shared_ptr callback) { std::promise> promise; @@ -172,30 +171,36 @@ std::shared_ptr UpdateProcessor::startThread( std::shared_ptr callback) { auto result = std::make_shared(std::move(fsr), std::move(schema), std::move(callback)); - std::thread t(&runForever, result); - t.detach(); + result->thread_ = std::thread(&runUntilCancelled, result); return result; } UpdateProcessor::UpdateProcessor(std::unique_ptr fsr, std::shared_ptr schema, std::shared_ptr callback) : fsr_(std::move(fsr)), schema_(std::move(schema)), callback_(std::move(callback)), - cancelled_(false), threadAlive_(true) {} -UpdateProcessor::~UpdateProcessor() = default; + cancelled_(false) {} -void UpdateProcessor::cancel(bool wait) { - cancelled_ = true; - fsr_->Cancel(); - if (!wait) { - return; - } +UpdateProcessor::~UpdateProcessor() { + cancel(); +} + +void UpdateProcessor::cancel() { + // TODO(cristianferretti): change to logging framework + std::cerr << DEEPHAVEN_DEBUG_MSG("Susbcription shutdown requested\n"); std::unique_lock guard(mutex_); - while (threadAlive_) { - condVar_.wait(guard); + if (cancelled_) { + guard.unlock(); // to be nice + std::cerr << DEEPHAVEN_DEBUG_MSG("Already cancelled\n"); + return; } + cancelled_ = true; + guard.unlock(); + + fsr_->Cancel(); + thread_.join(); } -void UpdateProcessor::runForever(const std::shared_ptr &self) { +void UpdateProcessor::runUntilCancelled(std::shared_ptr self) { try { self->runForeverHelper(); } catch (...) { @@ -204,9 +209,6 @@ void UpdateProcessor::runForever(const std::shared_ptr &self) { self->callback_->onFailure(std::current_exception()); } } - std::unique_lock guard(self->mutex_); - self->threadAlive_ = false; - self->condVar_.notify_all(); } void UpdateProcessor::runForeverHelper() { diff --git a/cpp-client/deephaven/client/src/utility/executor.cc b/cpp-client/deephaven/client/src/utility/executor.cc index 4bc75a0d19d..75665c2c558 100644 --- a/cpp-client/deephaven/client/src/utility/executor.cc +++ b/cpp-client/deephaven/client/src/utility/executor.cc @@ -8,24 +8,44 @@ #include "deephaven/dhcore/utility/utility.h" using deephaven::dhcore::utility::streamf; +using deephaven::dhcore::utility::stringf; namespace deephaven::client::utility { -std::shared_ptr Executor::create() { - auto result = std::make_shared(Private()); - std::thread t(&threadStart, result); - t.detach(); +std::shared_ptr Executor::create(std::string id) { + auto result = std::make_shared(Private(), std::move(id)); + result->executorThread_ = std::thread(&threadStart, result); return result; } -Executor::Executor(Private) {} +Executor::Executor(Private, std::string id) : id_(std::move(id)), cancelled_(false) {} Executor::~Executor() = default; +void Executor::shutdown() { + // TODO(cristianferretti): change to logging framework + std::cerr << DEEPHAVEN_DEBUG_MSG(stringf("Executor '%o' shutdown requested\n", id_)); + std::unique_lock guard(mutex_); + if (cancelled_) { + guard.unlock(); // to be nice + std::cerr << DEEPHAVEN_DEBUG_MSG("Already cancelled\n"); + return; + } + cancelled_ = true; + guard.unlock(); + condvar_.notify_all(); + executorThread_.join(); +} + void Executor::invoke(std::shared_ptr cb) { - mutex_.lock(); + std::unique_lock guard(mutex_); auto needsNotify = todo_.empty(); - todo_.push_back(std::move(cb)); - mutex_.unlock(); + if (cancelled_) { + auto message = stringf("Executor '%o' is cancelled: ignoring invoke()\n", id_); + throw std::runtime_error(DEEPHAVEN_DEBUG_MSG(message)); + } else { + todo_.push_back(std::move(cb)); + } + guard.unlock(); if (needsNotify) { condvar_.notify_all(); @@ -33,13 +53,21 @@ void Executor::invoke(std::shared_ptr cb) { } void Executor::threadStart(std::shared_ptr self) { - self->runForever(); + self->runUntilCancelled(); + // TODO(cristianferretti): change to logging framework + std::cerr << DEEPHAVEN_DEBUG_MSG(stringf("Executor '%o' thread exiting\n", self->id_)); } -void Executor::runForever() { +void Executor::runUntilCancelled() { std::unique_lock lock(mutex_); while (true) { - while (todo_.empty()) { + while (true) { + if (cancelled_) { + return; + } + if (!todo_.empty()) { + break; + } condvar_.wait(lock); } std::vector> localCallbacks( diff --git a/cpp-client/deephaven/dhcore/include/private/deephaven/dhcore/ticking/subscription_handle.h b/cpp-client/deephaven/dhcore/include/private/deephaven/dhcore/ticking/subscription_handle.h index 4d7b10b9bb5..5095461b79f 100644 --- a/cpp-client/deephaven/dhcore/include/private/deephaven/dhcore/ticking/subscription_handle.h +++ b/cpp-client/deephaven/dhcore/include/private/deephaven/dhcore/ticking/subscription_handle.h @@ -12,6 +12,6 @@ class SubscriptionHandle { * @param wait If true, waits for the internal subcription thread to be torn down. Use 'true' * if you want to be sure that your callback will not be invoked after this call returns. */ - virtual void cancel(bool wait) = 0; + virtual void cancel() = 0; }; } // namespace deephaven::client::subscription diff --git a/cpp-client/tests/CMakeLists.txt b/cpp-client/tests/CMakeLists.txt index c8be931b81b..87a25b70478 100644 --- a/cpp-client/tests/CMakeLists.txt +++ b/cpp-client/tests/CMakeLists.txt @@ -6,6 +6,7 @@ set(CMAKE_CXX_STANDARD 17) add_executable(tests add_drop_test.cc aggregates_test.cc + attributes_test.cc buffer_column_source_test.cc cython_support_test.cc head_and_tail_test.cc @@ -28,7 +29,7 @@ add_executable(tests view_test.cc third_party/catch.hpp - attributes_test.cc) + ) target_compile_options(tests PRIVATE -Wall -Werror) target_include_directories(tests PUBLIC "..")