Skip to content

Commit

Permalink
[#25373] YSQL: Avoid lazy initialization of YsqlAdvisoryLocksTable
Browse files Browse the repository at this point in the history
Summary:
Lazy initialization of the `YsqlAdvisoryLocksTable` object requires too much of tangling code. It is much easier to use `std::shared_future<client::YBClient*>` instead of `client::YBClient&` in the `YsqlAdvisoryLocksTable`'s constructor. Like other classes does: `PgTableCache`, `CDCStateTable`.

**Note:** In context of this diff some additional improvement is performed:
- created helper function `ValueAsFuture`
- removed redundant constructor from `CDCStateTable` (it breaks thread safety)
- included missed headers
Jira: DB-14603

Test Plan: Jenkins

Reviewers: pjain, hsunder, bkolagani, yyan, xCluster

Reviewed By: hsunder

Subscribers: ycdcxcluster, ybase, yql

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D40808
  • Loading branch information
d-uspenskiy committed Dec 22, 2024
1 parent 79c1abb commit 30962ae
Show file tree
Hide file tree
Showing 27 changed files with 216 additions and 228 deletions.
68 changes: 21 additions & 47 deletions src/yb/cdc/cdc_state_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
#include "yb/util/atomic.h"
#include "yb/util/logging.h"
#include "yb/util/stol_utils.h"

#include "yb/util/string_util.h"

#include "yb/yql/cql/ql/util/statement_result.h"

DEFINE_RUNTIME_int32(cdc_state_table_num_tablets, 0,
Expand Down Expand Up @@ -323,8 +323,6 @@ CDCStateTable::CDCStateTable(std::shared_future<client::YBClient*> client_future
CHECK(client_future_.valid());
}

CDCStateTable::CDCStateTable(client::YBClient* client) : client_(client) { CHECK_NOTNULL(client); }

std::string CDCStateTableKey::ToString() const {
return Format(
"TabletId: $0, StreamId: $1 $2", tablet_id, stream_id,
Expand Down Expand Up @@ -435,33 +433,28 @@ Result<master::CreateTableRequestPB> CDCStateTable::GenerateCreateCdcStateTableR
}

Status CDCStateTable::WaitForCreateTableToFinishWithCache() {
if (created_) {
return Status::OK();
if (!created_) {
RETURN_NOT_OK(WaitForCreateTableToFinishWithoutCache());
created_ = true;
}
auto* client = VERIFY_RESULT(GetClient());
RETURN_NOT_OK(client->WaitForCreateTableToFinish(kCdcStateYBTableName));
created_ = true;
return Status::OK();
}

Status CDCStateTable::WaitForCreateTableToFinishWithoutCache() {
auto* client = VERIFY_RESULT(GetClient());
return client->WaitForCreateTableToFinish(kCdcStateYBTableName);
return client().WaitForCreateTableToFinish(kCdcStateYBTableName);
}

Status CDCStateTable::OpenTable(client::TableHandle* cdc_table) {
auto* client = VERIFY_RESULT(GetClient());
RETURN_NOT_OK(cdc_table->Open(kCdcStateYBTableName, client));
return Status::OK();
Result<std::shared_ptr<client::TableHandle>> CDCStateTable::OpenTable() {
auto cdc_table = std::make_shared<client::TableHandle>();
RETURN_NOT_OK(cdc_table->Open(kCdcStateYBTableName, &client()));
return cdc_table;
}

Result<std::shared_ptr<client::TableHandle>> CDCStateTable::GetTable() {
bool use_cache = GetAtomicFlag(&FLAGS_enable_cdc_state_table_caching);
if (!use_cache) {
RETURN_NOT_OK(WaitForCreateTableToFinishWithoutCache());
auto cdc_table = std::make_shared<client::TableHandle>();
RETURN_NOT_OK(OpenTable(cdc_table.get()));
return cdc_table;
return OpenTable();
}

{
Expand All @@ -472,30 +465,16 @@ Result<std::shared_ptr<client::TableHandle>> CDCStateTable::GetTable() {
}

std::lock_guard l(mutex_);
if (cdc_table_) {
return cdc_table_;
if (!cdc_table_) {
RETURN_NOT_OK(WaitForCreateTableToFinishWithCache());
cdc_table_ = VERIFY_RESULT(OpenTable());
}
RETURN_NOT_OK(WaitForCreateTableToFinishWithCache());
auto cdc_table = std::make_shared<client::TableHandle>();
RETURN_NOT_OK(OpenTable(cdc_table.get()));
cdc_table_.swap(cdc_table);
return cdc_table_;
}

Result<client::YBClient*> CDCStateTable::GetClient() {
if (!client_) {
CHECK(client_future_.valid());
client_ = client_future_.get();
}

SCHECK(client_, IllegalState, "CDC Client not initialized or shutting down");
return client_;
}

Result<std::shared_ptr<client::YBSession>> CDCStateTable::GetSession() {
auto* client = VERIFY_RESULT(GetClient());
auto session = client->NewSession(client->default_rpc_timeout());
return session;
std::shared_ptr<client::YBSession> CDCStateTable::MakeSession() {
auto& c = client();
return c.NewSession(c.default_rpc_timeout());
}

template <class CDCEntry>
Expand All @@ -509,7 +488,7 @@ Status CDCStateTable::WriteEntriesAsync(
}

auto cdc_table = VERIFY_RESULT(GetTable());
auto session = VERIFY_RESULT(GetSession());
auto session = MakeSession();

std::vector<client::YBOperationPtr> ops;
ops.reserve(entries.size() * 2);
Expand Down Expand Up @@ -621,14 +600,9 @@ Result<CDCStateTableRange> CDCStateTable::GetTableRange(

Result<CDCStateTableRange> CDCStateTable::GetTableRangeAsync(
CDCStateTableEntrySelector&& field_filter, Status* iteration_status) {
auto* client = VERIFY_RESULT(GetClient());

bool table_creation_in_progress = false;
RETURN_NOT_OK(client->IsCreateTableInProgress(kCdcStateYBTableName, &table_creation_in_progress));
if (table_creation_in_progress) {
return STATUS(Uninitialized, "CDC State Table creation is in progress");
}

bool creation_in_progress = false;
RETURN_NOT_OK(client().IsCreateTableInProgress(kCdcStateYBTableName, &creation_in_progress));
SCHECK(!creation_in_progress, Uninitialized, "CDC State Table creation is in progress");
return GetTableRange(std::move(field_filter), iteration_status);
}

Expand All @@ -645,7 +619,7 @@ Result<std::optional<CDCStateTableEntry>> CDCStateTable::TryFetchEntry(
narrow_cast<ColumnIdRep>(Schema::first_column_id() + kCdcStreamIdIdx);

auto cdc_table = VERIFY_RESULT(GetTable());
auto session = VERIFY_RESULT(GetSession());
auto session = MakeSession();

const auto read_op = cdc_table->NewReadOp();
auto* const req_read = read_op->mutable_request();
Expand Down
16 changes: 10 additions & 6 deletions src/yb/cdc/cdc_state_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,22 @@

#pragma once

#include <future>
#include <memory>
#include <optional>
#include <shared_mutex>
#include <string>
#include <unordered_set>
#include <vector>

#include "yb/client/table_handle.h"

#include "yb/common/opid.h"

#include "yb/util/status.h"
#include "yb/gutil/thread_annotations.h"

#include "yb/util/status.h"

namespace yb {

namespace client {
Expand Down Expand Up @@ -120,7 +126,6 @@ class CDCStateTableRange;
class CDCStateTable {
public:
explicit CDCStateTable(std::shared_future<client::YBClient*> client_future);
explicit CDCStateTable(client::YBClient* client);

static const std::string& GetNamespaceName();
static const std::string& GetTableName();
Expand Down Expand Up @@ -148,12 +153,12 @@ class CDCStateTable {
const CDCStateTableKey& key, CDCStateTableEntrySelector&& field_filter = {}) EXCLUDES(mutex_);

private:
Result<client::YBClient*> GetClient();
Result<std::shared_ptr<client::YBSession>> GetSession();
client::YBClient& client() { return *client_future_.get(); }
std::shared_ptr<client::YBSession> MakeSession();
Status WaitForCreateTableToFinishWithCache() REQUIRES(mutex_);
Status WaitForCreateTableToFinishWithoutCache();
Result<std::shared_ptr<client::TableHandle>> GetTable() EXCLUDES(mutex_);
Status OpenTable(client::TableHandle* cdc_table);
Result<std::shared_ptr<client::TableHandle>> OpenTable();
template <class CDCEntry>
Status WriteEntriesAsync(
const std::vector<CDCEntry>& entries, QLWriteRequestPB::QLStmtType statement_type,
Expand All @@ -169,7 +174,6 @@ class CDCStateTable {

std::shared_mutex mutex_;
std::shared_future<client::YBClient*> client_future_;
client::YBClient* client_ = nullptr;

std::shared_ptr<client::TableHandle> cdc_table_ GUARDED_BY(mutex_);
bool created_ GUARDED_BY(mutex_) = false;
Expand Down
38 changes: 21 additions & 17 deletions src/yb/client/advisory_lock-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,35 @@
// under the License.
//

#include <future>
#include <optional>

#include "yb/client/meta_cache.h"
#include "yb/client/session.h"
#include "yb/client/table.h"
#include "yb/client/transaction.h"
#include "yb/client/transaction_pool.h"
#include "yb/client/yb_op.h"
#include "yb/client/yb_table_name.h"

#include "yb/common/transaction_error.h"

#include "yb/integration-tests/cluster_itest_util.h"
#include "yb/integration-tests/mini_cluster.h"
#include "yb/integration-tests/yb_mini_cluster_test_base.h"

#include "yb/master/master_defaults.h"

#include "yb/rpc/sidecars.h"

#include "yb/tablet/tablet.h"
#include "yb/tablet/tablet_peer.h"

#include "yb/tserver/mini_tablet_server.h"
#include "yb/tserver/tablet_server.h"
#include "yb/tserver/ysql_advisory_lock_table.h"

#include "yb/integration-tests/mini_cluster.h"
#include "yb/integration-tests/yb_mini_cluster_test_base.h"

#include "yb/rpc/sidecars.h"
#include "yb/util/std_util.h"
#include "yb/util/test_thread_holder.h"

DECLARE_int32(catalog_manager_bg_task_wait_ms);
Expand Down Expand Up @@ -87,10 +96,11 @@ class AdvisoryLockTest: public MiniClusterTestWithClient<MiniCluster> {

Status WaitForCreateTableToFinishAndLoadTable() {
client::YBTableName table_name(
YQL_DATABASE_CQL, master::kSystemNamespaceName, kPgAdvisoryLocksTableName);
YQL_DATABASE_CQL, master::kSystemNamespaceName,
std::string(tserver::kPgAdvisoryLocksTableName));
RETURN_NOT_OK(client_->WaitForCreateTableToFinish(
table_name, CoarseMonoClock::Now() + 10s * kTimeMultiplier));
advisory_locks_table_ = GetYsqlAdvisoryLocksTable();
advisory_locks_table_.emplace(ValueAsFuture(client_.get()));
table_ = VERIFY_RESULT(advisory_locks_table_->GetTable());
return Status::OK();
}
Expand All @@ -104,16 +114,11 @@ class AdvisoryLockTest: public MiniClusterTestWithClient<MiniCluster> {

Result<std::vector<client::internal::RemoteTabletPtr>> GetTablets() {
CHECK_NOTNULL(table_.get());
auto future = client_->LookupAllTabletsFuture(table_, CoarseMonoClock::Now() + 10s);
return future.get();
}

std::unique_ptr<YsqlAdvisoryLocksTable> GetYsqlAdvisoryLocksTable() {
return std::make_unique<YsqlAdvisoryLocksTable>(*client_.get());
return client_->LookupAllTabletsFuture(table_, CoarseMonoClock::Now() + 10s).get();
}

Result<client::YBTablePtr> GetTable() {
return GetYsqlAdvisoryLocksTable()->GetTable();
return tserver::YsqlAdvisoryLocksTable(ValueAsFuture(client_.get())).GetTable();
}

Result<client::YBTransactionPtr> StartTransaction(
Expand All @@ -125,9 +130,8 @@ class AdvisoryLockTest: public MiniClusterTestWithClient<MiniCluster> {
return txn;
}

Status Commit(client::YBTransactionPtr txn) {
auto commit_future = txn->CommitFuture(TransactionRpcDeadline());
return commit_future.get();
static Status Commit(client::YBTransactionPtr txn) {
return txn->CommitFuture(TransactionRpcDeadline()).get();
}

protected:
Expand All @@ -137,8 +141,8 @@ class AdvisoryLockTest: public MiniClusterTestWithClient<MiniCluster> {
}

std::unique_ptr<rpc::Sidecars> sidecars_;
std::unique_ptr<YsqlAdvisoryLocksTable> advisory_locks_table_;
client::YBTablePtr table_;
std::optional<tserver::YsqlAdvisoryLocksTable> advisory_locks_table_;
};

TEST_F(AdvisoryLockTest, TestAdvisoryLockTableCreated) {
Expand Down
10 changes: 5 additions & 5 deletions src/yb/integration-tests/cdc_service-int-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ void AssertChangeRecords(
}

void VerifyCdcStateNotEmpty(client::YBClient* client) {
CDCStateTable cdc_state_table(client);
auto cdc_state_table = MakeCDCStateTable(client);
Status s;
auto table_range = ASSERT_RESULT(
cdc_state_table.GetTableRange(CDCStateTableEntrySelector().IncludeCheckpoint(), &s));
Expand All @@ -278,7 +278,7 @@ Status VerifyCdcStateMatches(
LOG(INFO) << Format("Verifying tablet: $0, stream: $1, op_id: $2",
tablet_id, stream_id, OpId(term, index).ToString());

CDCStateTable cdc_state_table(client);
auto cdc_state_table = MakeCDCStateTable(client);
auto row = VERIFY_RESULT(cdc_state_table.TryFetchEntry(
{tablet_id, stream_id}, CDCStateTableEntrySelector().IncludeCheckpoint()));
SCHECK(row, IllegalState, "CDC state row not found");
Expand All @@ -295,7 +295,7 @@ void VerifyStreamDeletedFromCdcState(
const xrepl::StreamId& stream_id,
const TabletId& tablet_id,
int timeout_secs = 10) {
CDCStateTable cdc_state_table(client);
auto cdc_state_table = MakeCDCStateTable(client);

// The deletion of cdc_state rows for the specified stream happen in an asynchronous thread,
// so even if the request has returned, it doesn't mean that the rows have been deleted yet.
Expand Down Expand Up @@ -2004,7 +2004,7 @@ TEST_F(CDCServiceTestDurableMinReplicatedIndex, TestBootstrapProducer) {

// Verify that for each of the table's tablets, a new row in cdc_state table with the returned
// id was inserted.
CDCStateTable cdc_state_table(client_.get());
auto cdc_state_table = MakeCDCStateTable(client_.get());
Status s;
auto table_range = ASSERT_RESULT(
cdc_state_table.GetTableRange(CDCStateTableEntrySelector().IncludeCheckpoint(), &s));
Expand Down Expand Up @@ -2161,7 +2161,7 @@ TEST_F(CDCServiceTestDurableMinReplicatedIndex, TestCdcMinReplicatedIndexAreRese
WaitForCDCIndex(
tablet_id, CDCService(tserver), 5, 4 * FLAGS_update_min_cdc_indices_interval_secs);

CDCStateTable cdc_state_table(client_.get());
auto cdc_state_table = MakeCDCStateTable(client_.get());

std::vector<CDCStateTableKey> keys_to_delete;
for (auto& stream_id : stream_ids) {
Expand Down
Loading

0 comments on commit 30962ae

Please sign in to comment.