Skip to content

Commit

Permalink
C++ Client: tear everything down at the time of Client's destruction (#…
Browse files Browse the repository at this point in the history
…4154)

* C++ Client: tear everything down at the time of Client's destruction

* Remove "clean_shutdown_test"
  • Loading branch information
kosak committed Jul 10, 2023
1 parent acb7cd5 commit 988bb49
Show file tree
Hide file tree
Showing 14 changed files with 196 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ class ClientImpl {
ClientImpl(Private, std::shared_ptr<TableHandleManagerImpl> &&managerImpl);
~ClientImpl();

void shutdown() {
managerImpl_->shutdown();
}

const std::shared_ptr<TableHandleManagerImpl> &managerImpl() const { return managerImpl_; }

private:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ class TableHandleImpl : public std::enable_shared_from_this<TableHandleImpl> {
std::shared_ptr<SubscriptionHandle> subscribe(std::shared_ptr<TickingCallback> callback);
std::shared_ptr<SubscriptionHandle> subscribe(TableHandle::onTickCallback_t onTick,
void *onTickUserData, TableHandle::onErrorCallback_t onError, void *onErrorUserData);
void unsubscribe(std::shared_ptr<SubscriptionHandle> handle);
void unsubscribe(const std::shared_ptr<SubscriptionHandle> &handle);

/**
* Used in tests.
Expand Down Expand Up @@ -249,7 +249,6 @@ class TableHandleImpl : public std::enable_shared_from_this<TableHandleImpl> {
std::shared_ptr<TableHandleManagerImpl> managerImpl_;
Ticket ticket_;
std::shared_ptr<internal::LazyState> lazyState_;
std::set<std::shared_ptr<SubscriptionHandle>> subscriptions_;
};
} // namespace impl
} // namespace deephaven::client
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <memory>
#include <optional>
#include "deephaven/client/server/server.h"
#include "deephaven/client/subscription/subscription_handle.h"
#include "deephaven/client/utility/executor.h"

namespace deephaven::client::impl {
Expand All @@ -14,6 +15,7 @@ class TableHandleImpl;
class TableHandleManagerImpl final : public std::enable_shared_from_this<TableHandleManagerImpl> {
struct Private {};
typedef deephaven::client::server::Server Server;
typedef deephaven::client::subscription::SubscriptionHandle SubscriptionHandle;
typedef deephaven::client::utility::Executor Executor;
typedef io::deephaven::proto::backplane::grpc::AsOfJoinTablesRequest AsOfJoinTablesRequest;
typedef io::deephaven::proto::backplane::grpc::ComboAggregateRequest ComboAggregateRequest;
Expand All @@ -37,6 +39,8 @@ class TableHandleManagerImpl final : public std::enable_shared_from_this<TableHa
TableHandleManagerImpl &operator=(const TableHandleManagerImpl &other) = delete;
~TableHandleManagerImpl();

void shutdown();

std::shared_ptr<TableHandleImpl> emptyTable(int64_t size);
std::shared_ptr<TableHandleImpl> fetchTable(std::string tableName);
std::shared_ptr<TableHandleImpl> timeTable(int64_t startTimeNanos, int64_t periodNanos);
Expand All @@ -53,6 +57,9 @@ class TableHandleManagerImpl final : public std::enable_shared_from_this<TableHa

std::shared_ptr<TableHandleImpl> makeTableHandleFromTicket(std::string ticket);

void addSubscriptionHandle(std::shared_ptr<SubscriptionHandle> handle);
void removeSubscriptionHandle(const std::shared_ptr<SubscriptionHandle> &handle);

const std::optional<Ticket> &consoleId() const { return consoleId_; }
const std::shared_ptr<Server> &server() const { return server_; }
const std::shared_ptr<Executor> &executor() const { return executor_; }
Expand All @@ -63,5 +70,10 @@ class TableHandleManagerImpl final : public std::enable_shared_from_this<TableHa
std::shared_ptr<Server> server_;
std::shared_ptr<Executor> executor_;
std::shared_ptr<Executor> flightExecutor_;
// Protects the below for concurrent access.
std::mutex mutex_;
// The SubscriptionHandles for the tables we have subscribed to. We keep these at the TableHandleManagerImpl level
// so we can cleanly shut them all down when the TableHandleManagerImpl::shutdown() is called.
std::set<std::shared_ptr<SubscriptionHandle>> subscriptions_;
};
} // namespace deephaven::client::impl
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ class Server : public std::enable_shared_from_this<Server> {
// TODO(kosak): decide on the multithreaded story here
arrow::flight::FlightClient *flightClient() const { return flightClient_.get(); }

void shutdown();

/**
* Allocates a new Ticket from client-managed namespace.
*/
Expand Down Expand Up @@ -268,21 +270,39 @@ class Server : public std::enable_shared_from_this<Server> {
std::string sessionToken_;
std::chrono::milliseconds expirationInterval_;
std::chrono::system_clock::time_point nextHandshakeTime_;
std::thread completionQueueThread_;
std::thread keepAliveThread_;
};

template<typename TReq, typename TResp, typename TStub, typename TPtrToMember>
void Server::sendRpc(const TReq &req, std::shared_ptr<SFCallback<TResp>> responseCallback,
TStub *stub, const TPtrToMember &pm) {
auto now = std::chrono::system_clock::now();
// Keep this in a unique_ptr at first, for cleanup in case addAuthToken throws an exception.
// Keep this in a unique_ptr at first, in case we leave early due to cancellation or exception.
auto response = std::make_unique<ServerResponseHolder<TResp>>(now, std::move(responseCallback));
forEachHeaderNameAndValue([&response](const std::string &name, const std::string &value) {
response->ctx_.AddMetadata(name, value);
});
auto rpc = (stub->*pm)(&response->ctx_, req, &completionQueue_);
// It is the responsibility of "processNextCompletionQueueItem" to deallocate the storage pointed
// to by 'response'.
auto *rp = response.release();
rpc->Finish(&rp->response_, &rp->status_, rp);

// Per the GRPC documentation for CompletionQueue::Shutdown(), we must not add items to the CompletionQueue after
// it has been shut down. So we do a test and enqueue while under lock.
std::unique_lock guard(mutex_);
if (!cancelled_) {
auto rpc = (stub->*pm)(&response->ctx_, req, &completionQueue_);
// It is the responsibility of "processNextCompletionQueueItem" to deallocate the storage pointed
// to by 'response'.
auto *rp = response.release();
rpc->Finish(&rp->response_, &rp->status_, rp);
return;
}

// If we get here, we are cancelled. So instead of enqueuing the request, we need to signal failure to the callback.
// This can be done without holding the lock.
// TODO(kosak): a slight code savings can be achieved if this error code is moved to a non-template context,
// since it is not dependent on any template arguments.
guard.unlock();
const char *message = "Server cancelled. All further RPCs are being rejected";
auto eptr = std::make_exception_ptr(std::runtime_error(DEEPHAVEN_DEBUG_MSG(message)));
response->onFailure(std::move(eptr));
}
} // namespace deephaven::client::server
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@ class SubscriptionHandle {
public:
virtual ~SubscriptionHandle() = default;
/**
* Cancels the subscription.
* @param wait If true, waits for the internal subcription thread to be torn down. Use 'true'
* if you want to be sure that your callback will not be invoked after this call returns.
* Cancels the subscription and waits for the corresponding thread to die.
*/
virtual void cancel(bool wait) = 0;
virtual void cancel() = 0;
};
} // namespace deephaven::client::subscription
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include "deephaven/dhcore/utility/callbacks.h"
#include "deephaven/dhcore/utility/utility.h"

Expand All @@ -17,11 +18,13 @@ class Executor {
};

public:
static std::shared_ptr<Executor> create();
static std::shared_ptr<Executor> create(std::string id);

explicit Executor(Private);
explicit Executor(Private, std::string id);
~Executor();

void shutdown();

typedef deephaven::dhcore::utility::Callback<> callback_t;

void invoke(std::shared_ptr<callback_t> f);
Expand All @@ -33,11 +36,14 @@ class Executor {

private:
static void threadStart(std::shared_ptr<Executor> self);
[[noreturn]]
void runForever();
void runUntilCancelled();

// For debugging.
std::string id_;
std::mutex mutex_;
std::condition_variable condvar_;
bool cancelled_ = false;
std::deque<std::shared_ptr<callback_t>> todo_;
std::thread executorThread_;
};
} // namespace deephaven::client::utility
13 changes: 10 additions & 3 deletions cpp-client/deephaven/client/src/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ void printTableData(std::ostream &s, const TableHandle &tableHandle, bool wantHe
} // namespace

Client Client::connect(const std::string &target, const ClientOptions &options) {
auto executor = Executor::create();
auto flightExecutor = Executor::create();
auto executor = Executor::create("Client executor");
auto flightExecutor = Executor::create("Flight executor");
auto server = Server::createFromTarget(target, options);
auto impl = ClientImpl::create(std::move(server), executor, flightExecutor, options.sessionType_);
return Client(std::move(impl));
Expand All @@ -61,7 +61,14 @@ Client::Client(std::shared_ptr<impl::ClientImpl> impl) : impl_(std::move(impl))
}
Client::Client(Client &&other) noexcept = default;
Client &Client::operator=(Client &&other) noexcept = default;
Client::~Client() = default;

// There is only one Client associated with the server connection. Clients can only be moved, not copied.
// When the client owning the state is destructed, we tear down the connection.
Client::~Client() {
if (impl_ != nullptr) {
impl_->shutdown();
}
}

TableHandleManager Client::getManager() const {
return TableHandleManager(impl_->managerImpl());
Expand Down
39 changes: 17 additions & 22 deletions cpp-client/deephaven/client/src/impl/table_handle_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <arrow/flight/client.h>
#include <arrow/flight/types.h>
#include <arrow/scalar.h>
Expand Down Expand Up @@ -371,20 +370,6 @@ std::shared_ptr<TableHandleImpl> TableHandleImpl::asOfJoin(AsOfJoinTablesRequest
return TableHandleImpl::create(managerImpl_, std::move(resultTicket), std::move(ls));
}

std::shared_ptr<SubscriptionHandle> TableHandleImpl::subscribe(
std::shared_ptr<TickingCallback> callback) {
// On the flight executor thread, we invoke DoExchange (waiting for a successful response).
// We wait for that response here. That makes the first part of this call synchronous. If there
// is an error in the DoExchange invocation, the caller will get an exception here. The
// remainder of the interaction (namely, the sending of a BarrageSubscriptionRequest and the
// parsing of all the replies) is done on a newly-created thread dedicated to that job.
auto schema = lazyState_->getSchema();
auto handle = SubscriptionThread::start(managerImpl_->server(), managerImpl_->flightExecutor().get(),
schema, ticket_, std::move(callback));
subscriptions_.insert(handle);
return handle;
}

namespace {
class CStyleTickingCallback final : public TickingCallback {
public:
Expand Down Expand Up @@ -416,12 +401,22 @@ TableHandleImpl::subscribe(TableHandle::onTickCallback_t onTick, void *onTickUse
return subscribe(std::move(cb));
}

void TableHandleImpl::unsubscribe(std::shared_ptr<SubscriptionHandle> handle) {
auto node = subscriptions_.extract(handle);
if (node.empty()) {
return;
}
node.value()->cancel(true);
std::shared_ptr<SubscriptionHandle> TableHandleImpl::subscribe(std::shared_ptr<TickingCallback> callback) {
// On the flight executor thread, we invoke DoExchange (waiting for a successful response).
// We wait for that response here. That makes the first part of this call synchronous. If there
// is an error in the DoExchange invocation, the caller will get an exception here. The
// remainder of the interaction (namely, the sending of a BarrageSubscriptionRequest and the
// parsing of all the replies) is done on a newly-created thread dedicated to that job.
auto schema = lazyState_->getSchema();
auto handle = SubscriptionThread::start(managerImpl_->server(), managerImpl_->flightExecutor().get(),
schema, ticket_, std::move(callback));
managerImpl_->addSubscriptionHandle(handle);
return handle;
}

void TableHandleImpl::unsubscribe(const std::shared_ptr<SubscriptionHandle> &handle) {
managerImpl_->removeSubscriptionHandle(handle);
handle->cancel();
}

std::vector<std::shared_ptr<ColumnImpl>> TableHandleImpl::getColumnImpls() {
Expand Down Expand Up @@ -648,7 +643,7 @@ class GetSchemaCallback final :
arrow::flight::FlightCallOptions options;
server_->forEachHeaderNameAndValue(
[&options](const std::string &name, const std::string &value) {
options.headers.push_back(std::make_pair(name, value));
options.headers.emplace_back(name, value);
}
);

Expand Down
19 changes: 19 additions & 0 deletions cpp-client/deephaven/client/src/impl/table_handle_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,15 @@ TableHandleManagerImpl::TableHandleManagerImpl(Private, std::optional<Ticket> &&

TableHandleManagerImpl::~TableHandleManagerImpl() = default;

void TableHandleManagerImpl::shutdown() {
for (const auto &sub : subscriptions_) {
sub->cancel();
}
executor_->shutdown();
flightExecutor_->shutdown();
server_->shutdown();
}

std::shared_ptr<TableHandleImpl> TableHandleManagerImpl::emptyTable(int64_t size) {
auto resultTicket = server_->newTicket();
auto [cb, ls] = TableHandleImpl::createEtcCallback(nullptr, this, resultTicket);
Expand Down Expand Up @@ -88,4 +97,14 @@ std::shared_ptr<TableHandleImpl> TableHandleManagerImpl::makeTableHandleFromTick
server_->getExportedTableCreationResponseAsync(resultTicket, std::move(cb));
return TableHandleImpl::create(shared_from_this(), std::move(resultTicket), std::move(ls));
}

void TableHandleManagerImpl::addSubscriptionHandle(std::shared_ptr<SubscriptionHandle> handle) {
std::unique_lock guard(mutex_);
subscriptions_.insert(std::move(handle));
}

void TableHandleManagerImpl::removeSubscriptionHandle(const std::shared_ptr<SubscriptionHandle> &handle) {
std::unique_lock guard(mutex_);
subscriptions_.erase(handle);
}
} // namespace deephaven::client::impl
33 changes: 29 additions & 4 deletions cpp-client/deephaven/client/src/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,8 @@ std::shared_ptr<Server> Server::createFromTarget(
auto result = std::make_shared<Server>(Private(), std::move(as), std::move(cs),
std::move(ss), std::move(ts), std::move(cfs), std::move(fc), copts.extraHeaders(),
std::move(sessionToken), expirationInterval, nextHandshakeTime);
std::thread t1(&processCompletionQueueForever, result);
std::thread t2(&sendKeepaliveMessages, result);
t1.detach();
t2.detach();
result->completionQueueThread_ = std::thread(&processCompletionQueueForever, result);
result->keepAliveThread_ = std::thread(&sendKeepaliveMessages, result);
return result;
}

Expand Down Expand Up @@ -236,6 +234,28 @@ Server::Server(Private,

Server::~Server() = default;

void Server::shutdown() {
// TODO(cristianferretti): change to logging framework
std::cerr << DEEPHAVEN_DEBUG_MSG("Server shutdown requested\n");

std::unique_lock<std::mutex> guard(mutex_);
if (cancelled_) {
guard.unlock(); // to be nice
std::cerr << DEEPHAVEN_DEBUG_MSG("Already cancelled\n");
return;
}
cancelled_ = true;
guard.unlock();

// This will cause the completion queue thread to shut down.
completionQueue_.Shutdown();
// This will cause the handshake thread to shut down (because cancelled_ is true).
condVar_.notify_all();

completionQueueThread_.join();
keepAliveThread_.join();
}

namespace {
Ticket makeNewTicket(int32_t ticketId) {
constexpr auto ticketSize = sizeof(ticketId);
Expand Down Expand Up @@ -505,6 +525,8 @@ void Server::processCompletionQueueForever(const std::shared_ptr<Server> &self)
break;
}
}
// TODO(cristianferretti): change to logging framework
std::cerr << DEEPHAVEN_DEBUG_MSG("Process completion queue thread exiting\n");
}

bool Server::processNextCompletionQueueItem() {
Expand Down Expand Up @@ -583,6 +605,9 @@ void Server::sendKeepaliveMessages(const std::shared_ptr<Server> &self) {
break;
}
}

// TODO(cristianferretti): change to logging framework
std::cerr << DEEPHAVEN_DEBUG_MSG("Keepalive thread exiting\n");
}

bool Server::keepaliveHelper() {
Expand Down
Loading

0 comments on commit 988bb49

Please sign in to comment.