diff --git a/src/yb/cdc/cdc_state_table.cc b/src/yb/cdc/cdc_state_table.cc index d299b4ebfe04..cb7c9c7905d4 100644 --- a/src/yb/cdc/cdc_state_table.cc +++ b/src/yb/cdc/cdc_state_table.cc @@ -30,8 +30,8 @@ #include "yb/util/atomic.h" #include "yb/util/logging.h" #include "yb/util/stol_utils.h" - #include "yb/util/string_util.h" + #include "yb/yql/cql/ql/util/statement_result.h" DEFINE_RUNTIME_int32(cdc_state_table_num_tablets, 0, @@ -323,8 +323,6 @@ CDCStateTable::CDCStateTable(std::shared_future client_future CHECK(client_future_.valid()); } -CDCStateTable::CDCStateTable(client::YBClient* client) : client_(client) { CHECK_NOTNULL(client); } - std::string CDCStateTableKey::ToString() const { return Format( "TabletId: $0, StreamId: $1 $2", tablet_id, stream_id, @@ -435,33 +433,28 @@ Result CDCStateTable::GenerateCreateCdcStateTableR } Status CDCStateTable::WaitForCreateTableToFinishWithCache() { - if (created_) { - return Status::OK(); + if (!created_) { + RETURN_NOT_OK(WaitForCreateTableToFinishWithoutCache()); + created_ = true; } - auto* client = VERIFY_RESULT(GetClient()); - RETURN_NOT_OK(client->WaitForCreateTableToFinish(kCdcStateYBTableName)); - created_ = true; return Status::OK(); } Status CDCStateTable::WaitForCreateTableToFinishWithoutCache() { - auto* client = VERIFY_RESULT(GetClient()); - return client->WaitForCreateTableToFinish(kCdcStateYBTableName); + return client().WaitForCreateTableToFinish(kCdcStateYBTableName); } -Status CDCStateTable::OpenTable(client::TableHandle* cdc_table) { - auto* client = VERIFY_RESULT(GetClient()); - RETURN_NOT_OK(cdc_table->Open(kCdcStateYBTableName, client)); - return Status::OK(); +Result> CDCStateTable::OpenTable() { + auto cdc_table = std::make_shared(); + RETURN_NOT_OK(cdc_table->Open(kCdcStateYBTableName, &client())); + return cdc_table; } Result> CDCStateTable::GetTable() { bool use_cache = GetAtomicFlag(&FLAGS_enable_cdc_state_table_caching); if (!use_cache) { RETURN_NOT_OK(WaitForCreateTableToFinishWithoutCache()); - auto cdc_table = std::make_shared(); - RETURN_NOT_OK(OpenTable(cdc_table.get())); - return cdc_table; + return OpenTable(); } { @@ -472,30 +465,16 @@ Result> CDCStateTable::GetTable() { } std::lock_guard l(mutex_); - if (cdc_table_) { - return cdc_table_; + if (!cdc_table_) { + RETURN_NOT_OK(WaitForCreateTableToFinishWithCache()); + cdc_table_ = VERIFY_RESULT(OpenTable()); } - RETURN_NOT_OK(WaitForCreateTableToFinishWithCache()); - auto cdc_table = std::make_shared(); - RETURN_NOT_OK(OpenTable(cdc_table.get())); - cdc_table_.swap(cdc_table); return cdc_table_; } -Result CDCStateTable::GetClient() { - if (!client_) { - CHECK(client_future_.valid()); - client_ = client_future_.get(); - } - - SCHECK(client_, IllegalState, "CDC Client not initialized or shutting down"); - return client_; -} - -Result> CDCStateTable::GetSession() { - auto* client = VERIFY_RESULT(GetClient()); - auto session = client->NewSession(client->default_rpc_timeout()); - return session; +std::shared_ptr CDCStateTable::MakeSession() { + auto& c = client(); + return c.NewSession(c.default_rpc_timeout()); } template @@ -509,7 +488,7 @@ Status CDCStateTable::WriteEntriesAsync( } auto cdc_table = VERIFY_RESULT(GetTable()); - auto session = VERIFY_RESULT(GetSession()); + auto session = MakeSession(); std::vector ops; ops.reserve(entries.size() * 2); @@ -621,14 +600,9 @@ Result CDCStateTable::GetTableRange( Result CDCStateTable::GetTableRangeAsync( CDCStateTableEntrySelector&& field_filter, Status* iteration_status) { - auto* client = VERIFY_RESULT(GetClient()); - - bool table_creation_in_progress = false; - RETURN_NOT_OK(client->IsCreateTableInProgress(kCdcStateYBTableName, &table_creation_in_progress)); - if (table_creation_in_progress) { - return STATUS(Uninitialized, "CDC State Table creation is in progress"); - } - + bool creation_in_progress = false; + RETURN_NOT_OK(client().IsCreateTableInProgress(kCdcStateYBTableName, &creation_in_progress)); + SCHECK(!creation_in_progress, Uninitialized, "CDC State Table creation is in progress"); return GetTableRange(std::move(field_filter), iteration_status); } @@ -645,7 +619,7 @@ Result> CDCStateTable::TryFetchEntry( narrow_cast(Schema::first_column_id() + kCdcStreamIdIdx); auto cdc_table = VERIFY_RESULT(GetTable()); - auto session = VERIFY_RESULT(GetSession()); + auto session = MakeSession(); const auto read_op = cdc_table->NewReadOp(); auto* const req_read = read_op->mutable_request(); diff --git a/src/yb/cdc/cdc_state_table.h b/src/yb/cdc/cdc_state_table.h index a40555673818..5e2d1ea4e6e3 100644 --- a/src/yb/cdc/cdc_state_table.h +++ b/src/yb/cdc/cdc_state_table.h @@ -12,16 +12,22 @@ #pragma once +#include +#include +#include #include +#include #include +#include #include "yb/client/table_handle.h" #include "yb/common/opid.h" -#include "yb/util/status.h" #include "yb/gutil/thread_annotations.h" +#include "yb/util/status.h" + namespace yb { namespace client { @@ -120,7 +126,6 @@ class CDCStateTableRange; class CDCStateTable { public: explicit CDCStateTable(std::shared_future client_future); - explicit CDCStateTable(client::YBClient* client); static const std::string& GetNamespaceName(); static const std::string& GetTableName(); @@ -148,12 +153,12 @@ class CDCStateTable { const CDCStateTableKey& key, CDCStateTableEntrySelector&& field_filter = {}) EXCLUDES(mutex_); private: - Result GetClient(); - Result> GetSession(); + client::YBClient& client() { return *client_future_.get(); } + std::shared_ptr MakeSession(); Status WaitForCreateTableToFinishWithCache() REQUIRES(mutex_); Status WaitForCreateTableToFinishWithoutCache(); Result> GetTable() EXCLUDES(mutex_); - Status OpenTable(client::TableHandle* cdc_table); + Result> OpenTable(); template Status WriteEntriesAsync( const std::vector& entries, QLWriteRequestPB::QLStmtType statement_type, @@ -169,7 +174,6 @@ class CDCStateTable { std::shared_mutex mutex_; std::shared_future client_future_; - client::YBClient* client_ = nullptr; std::shared_ptr cdc_table_ GUARDED_BY(mutex_); bool created_ GUARDED_BY(mutex_) = false; diff --git a/src/yb/client/advisory_lock-test.cc b/src/yb/client/advisory_lock-test.cc index 75186d10b8dd..a302227462c5 100644 --- a/src/yb/client/advisory_lock-test.cc +++ b/src/yb/client/advisory_lock-test.cc @@ -11,6 +11,9 @@ // under the License. // +#include +#include + #include "yb/client/meta_cache.h" #include "yb/client/session.h" #include "yb/client/table.h" @@ -18,19 +21,25 @@ #include "yb/client/transaction_pool.h" #include "yb/client/yb_op.h" #include "yb/client/yb_table_name.h" + #include "yb/common/transaction_error.h" + #include "yb/integration-tests/cluster_itest_util.h" +#include "yb/integration-tests/mini_cluster.h" +#include "yb/integration-tests/yb_mini_cluster_test_base.h" + #include "yb/master/master_defaults.h" + +#include "yb/rpc/sidecars.h" + #include "yb/tablet/tablet.h" #include "yb/tablet/tablet_peer.h" + #include "yb/tserver/mini_tablet_server.h" #include "yb/tserver/tablet_server.h" #include "yb/tserver/ysql_advisory_lock_table.h" -#include "yb/integration-tests/mini_cluster.h" -#include "yb/integration-tests/yb_mini_cluster_test_base.h" - -#include "yb/rpc/sidecars.h" +#include "yb/util/std_util.h" #include "yb/util/test_thread_holder.h" DECLARE_int32(catalog_manager_bg_task_wait_ms); @@ -87,10 +96,11 @@ class AdvisoryLockTest: public MiniClusterTestWithClient { Status WaitForCreateTableToFinishAndLoadTable() { client::YBTableName table_name( - YQL_DATABASE_CQL, master::kSystemNamespaceName, kPgAdvisoryLocksTableName); + YQL_DATABASE_CQL, master::kSystemNamespaceName, + std::string(tserver::kPgAdvisoryLocksTableName)); RETURN_NOT_OK(client_->WaitForCreateTableToFinish( table_name, CoarseMonoClock::Now() + 10s * kTimeMultiplier)); - advisory_locks_table_ = GetYsqlAdvisoryLocksTable(); + advisory_locks_table_.emplace(ValueAsFuture(client_.get())); table_ = VERIFY_RESULT(advisory_locks_table_->GetTable()); return Status::OK(); } @@ -104,16 +114,11 @@ class AdvisoryLockTest: public MiniClusterTestWithClient { Result> GetTablets() { CHECK_NOTNULL(table_.get()); - auto future = client_->LookupAllTabletsFuture(table_, CoarseMonoClock::Now() + 10s); - return future.get(); - } - - std::unique_ptr GetYsqlAdvisoryLocksTable() { - return std::make_unique(*client_.get()); + return client_->LookupAllTabletsFuture(table_, CoarseMonoClock::Now() + 10s).get(); } Result GetTable() { - return GetYsqlAdvisoryLocksTable()->GetTable(); + return tserver::YsqlAdvisoryLocksTable(ValueAsFuture(client_.get())).GetTable(); } Result StartTransaction( @@ -125,9 +130,8 @@ class AdvisoryLockTest: public MiniClusterTestWithClient { return txn; } - Status Commit(client::YBTransactionPtr txn) { - auto commit_future = txn->CommitFuture(TransactionRpcDeadline()); - return commit_future.get(); + static Status Commit(client::YBTransactionPtr txn) { + return txn->CommitFuture(TransactionRpcDeadline()).get(); } protected: @@ -137,8 +141,8 @@ class AdvisoryLockTest: public MiniClusterTestWithClient { } std::unique_ptr sidecars_; - std::unique_ptr advisory_locks_table_; client::YBTablePtr table_; + std::optional advisory_locks_table_; }; TEST_F(AdvisoryLockTest, TestAdvisoryLockTableCreated) { diff --git a/src/yb/integration-tests/cdc_service-int-test.cc b/src/yb/integration-tests/cdc_service-int-test.cc index f4565b1008d0..9b479f68f6e8 100644 --- a/src/yb/integration-tests/cdc_service-int-test.cc +++ b/src/yb/integration-tests/cdc_service-int-test.cc @@ -255,7 +255,7 @@ void AssertChangeRecords( } void VerifyCdcStateNotEmpty(client::YBClient* client) { - CDCStateTable cdc_state_table(client); + auto cdc_state_table = MakeCDCStateTable(client); Status s; auto table_range = ASSERT_RESULT( cdc_state_table.GetTableRange(CDCStateTableEntrySelector().IncludeCheckpoint(), &s)); @@ -278,7 +278,7 @@ Status VerifyCdcStateMatches( LOG(INFO) << Format("Verifying tablet: $0, stream: $1, op_id: $2", tablet_id, stream_id, OpId(term, index).ToString()); - CDCStateTable cdc_state_table(client); + auto cdc_state_table = MakeCDCStateTable(client); auto row = VERIFY_RESULT(cdc_state_table.TryFetchEntry( {tablet_id, stream_id}, CDCStateTableEntrySelector().IncludeCheckpoint())); SCHECK(row, IllegalState, "CDC state row not found"); @@ -295,7 +295,7 @@ void VerifyStreamDeletedFromCdcState( const xrepl::StreamId& stream_id, const TabletId& tablet_id, int timeout_secs = 10) { - CDCStateTable cdc_state_table(client); + auto cdc_state_table = MakeCDCStateTable(client); // The deletion of cdc_state rows for the specified stream happen in an asynchronous thread, // so even if the request has returned, it doesn't mean that the rows have been deleted yet. @@ -2004,7 +2004,7 @@ TEST_F(CDCServiceTestDurableMinReplicatedIndex, TestBootstrapProducer) { // Verify that for each of the table's tablets, a new row in cdc_state table with the returned // id was inserted. - CDCStateTable cdc_state_table(client_.get()); + auto cdc_state_table = MakeCDCStateTable(client_.get()); Status s; auto table_range = ASSERT_RESULT( cdc_state_table.GetTableRange(CDCStateTableEntrySelector().IncludeCheckpoint(), &s)); @@ -2161,7 +2161,7 @@ TEST_F(CDCServiceTestDurableMinReplicatedIndex, TestCdcMinReplicatedIndexAreRese WaitForCDCIndex( tablet_id, CDCService(tserver), 5, 4 * FLAGS_update_min_cdc_indices_interval_secs); - CDCStateTable cdc_state_table(client_.get()); + auto cdc_state_table = MakeCDCStateTable(client_.get()); std::vector keys_to_delete; for (auto& stream_id : stream_ids) { diff --git a/src/yb/integration-tests/cdc_state_table-test.cc b/src/yb/integration-tests/cdc_state_table-test.cc index bd20a36004e7..0894a64e8085 100644 --- a/src/yb/integration-tests/cdc_state_table-test.cc +++ b/src/yb/integration-tests/cdc_state_table-test.cc @@ -72,7 +72,7 @@ TEST_F(CDCStateTableTest, TestUpdateEntriesWithReplaceMapFalse) { xrepl::StreamId stream_id = ASSERT_RESULT(CreateDBStream()); - CDCStateTable cdc_state_table(test_client()); + auto cdc_state_table = MakeCDCStateTable(test_client()); auto entry_opt = ASSERT_RESULT(cdc_state_table.TryFetchEntry( {tablets[0].tablet_id(), stream_id}, CDCStateTableEntrySelector().IncludeAll())); @@ -126,7 +126,7 @@ TEST_F(CDCStateTableTest, TestUpdateEntriesWithReplaceMapTrue) { xrepl::StreamId stream_id = ASSERT_RESULT(CreateDBStream()); - CDCStateTable cdc_state_table(test_client()); + auto cdc_state_table = MakeCDCStateTable(test_client()); auto entry_opt = ASSERT_RESULT(cdc_state_table.TryFetchEntry( {tablets[0].tablet_id(), stream_id}, CDCStateTableEntrySelector().IncludeAll())); @@ -179,7 +179,7 @@ TEST_F(CDCStateTableTest, TestUpsertEntriesWithReplaceMapFalse) { xrepl::StreamId stream_id = ASSERT_RESULT(CreateDBStream()); - CDCStateTable cdc_state_table(test_client()); + auto cdc_state_table = MakeCDCStateTable(test_client()); auto entry_opt = ASSERT_RESULT(cdc_state_table.TryFetchEntry( {tablets[0].tablet_id(), stream_id}, CDCStateTableEntrySelector().IncludeAll())); @@ -234,7 +234,7 @@ TEST_F(CDCStateTableTest, TestUpsertEntriesWithReplaceMapTrue) { xrepl::StreamId stream_id = ASSERT_RESULT(CreateDBStream()); - CDCStateTable cdc_state_table(test_client()); + auto cdc_state_table = MakeCDCStateTable(test_client()); auto entry_opt = ASSERT_RESULT(cdc_state_table.TryFetchEntry( {tablets[0].tablet_id(), stream_id}, CDCStateTableEntrySelector().IncludeAll())); @@ -288,7 +288,7 @@ TEST_F(CDCStateTableTest, TestUpdateEntriesWithUpdateAndRemoveKeyInSingleBatch) xrepl::StreamId stream_id = ASSERT_RESULT(CreateDBStream()); - CDCStateTable cdc_state_table(test_client()); + auto cdc_state_table = MakeCDCStateTable(test_client()); auto entry_opt = ASSERT_RESULT(cdc_state_table.TryFetchEntry( {tablets[0].tablet_id(), stream_id}, CDCStateTableEntrySelector().IncludeAll())); @@ -349,7 +349,7 @@ TEST_F(CDCStateTableTest, TestRemovingNonExistentKeyFromMap) { xrepl::StreamId stream_id = ASSERT_RESULT(CreateDBStream()); - CDCStateTable cdc_state_table(test_client()); + auto cdc_state_table = MakeCDCStateTable(test_client()); auto entry_opt = ASSERT_RESULT(cdc_state_table.TryFetchEntry( {tablets[0].tablet_id(), stream_id}, CDCStateTableEntrySelector().IncludeAll())); @@ -404,7 +404,7 @@ TEST_F(CDCStateTableTest, TestInsertEntriesWithSameKeyTwice) { xrepl::StreamId stream_id = ASSERT_RESULT(CreateDBStream()); - CDCStateTable cdc_state_table(test_client()); + auto cdc_state_table = MakeCDCStateTable(test_client()); auto entry_opt = ASSERT_RESULT(cdc_state_table.TryFetchEntry( {tablets[0].tablet_id(), stream_id}, CDCStateTableEntrySelector().IncludeAll())); @@ -459,7 +459,7 @@ TEST_F(CDCStateTableTest, TestInsertAndUpsertEntriesWithSameKey) { xrepl::StreamId stream_id = ASSERT_RESULT(CreateDBStream()); - CDCStateTable cdc_state_table(test_client()); + auto cdc_state_table = MakeCDCStateTable(test_client()); auto entry_opt = ASSERT_RESULT(cdc_state_table.TryFetchEntry( {tablets[0].tablet_id(), stream_id}, CDCStateTableEntrySelector().IncludeAll())); @@ -514,7 +514,7 @@ TEST_F(CDCStateTableTest, TestUpdateEntriesWithNoExistingEntry) { xrepl::StreamId stream_id = ASSERT_RESULT(CreateDBStream()); - CDCStateTable cdc_state_table(test_client()); + auto cdc_state_table = MakeCDCStateTable(test_client()); auto entry_opt = ASSERT_RESULT(cdc_state_table.TryFetchEntry( {tablets[0].tablet_id(), stream_id}, CDCStateTableEntrySelector().IncludeAll())); @@ -551,7 +551,7 @@ TEST_F(CDCStateTableTest, TestUpsertEntriesWithNoExistingEntry) { xrepl::StreamId stream_id = ASSERT_RESULT(CreateDBStream()); - CDCStateTable cdc_state_table(test_client()); + auto cdc_state_table = MakeCDCStateTable(test_client()); auto entry_opt = ASSERT_RESULT(cdc_state_table.TryFetchEntry( {tablets[0].tablet_id(), stream_id}, CDCStateTableEntrySelector().IncludeAll())); @@ -591,7 +591,7 @@ TEST_F(CDCStateTableTest, TestUpsertEntriesWithRemoveKey) { xrepl::StreamId stream_id = ASSERT_RESULT(CreateDBStream()); - CDCStateTable cdc_state_table(test_client()); + auto cdc_state_table = MakeCDCStateTable(test_client()); auto entry_opt = ASSERT_RESULT(cdc_state_table.TryFetchEntry( {tablets[0].tablet_id(), stream_id}, CDCStateTableEntrySelector().IncludeAll())); diff --git a/src/yb/integration-tests/cdc_test_util.cc b/src/yb/integration-tests/cdc_test_util.cc index 2d26655d2ded..39a02dc6ef04 100644 --- a/src/yb/integration-tests/cdc_test_util.cc +++ b/src/yb/integration-tests/cdc_test_util.cc @@ -16,9 +16,15 @@ #include #include "yb/cdc/cdc_service.pb.h" +#include "yb/cdc/cdc_service.h" +#include "yb/cdc/cdc_state_table.h" + #include "yb/client/xcluster_client.h" + #include "yb/consensus/log.h" +#include "yb/dockv/doc_key.h" + #include "yb/rpc/rpc_controller.h" #include "yb/tablet/tablet_metadata.h" @@ -31,11 +37,9 @@ #include "yb/util/backoff_waiter.h" #include "yb/util/result.h" +#include "yb/util/std_util.h" #include "yb/util/test_macros.h" -#include "yb/cdc/cdc_service.h" -#include "yb/dockv/doc_key.h" - DECLARE_int32(update_min_cdc_indices_interval_secs); namespace yb { @@ -177,5 +181,10 @@ Result> GetCDCSDKTabletMetrics( SCHECK(tablet_peer, IllegalState, "Tablet not found", tablet_id); return cdc_service.GetCDCSDKTabletMetrics(*tablet_peer.get(), stream_id, create); } + +CDCStateTable MakeCDCStateTable(client::YBClient* client) { + return CDCStateTable(ValueAsFuture(client)); +} + } // namespace cdc } // namespace yb diff --git a/src/yb/integration-tests/cdc_test_util.h b/src/yb/integration-tests/cdc_test_util.h index 4a773283c30a..93fbf8dadb34 100644 --- a/src/yb/integration-tests/cdc_test_util.h +++ b/src/yb/integration-tests/cdc_test_util.h @@ -54,5 +54,8 @@ Result> GetXClusterTabletMetrics( Result> GetCDCSDKTabletMetrics( cdc::CDCServiceImpl& cdc_service, const TabletId& tablet_id, const xrepl::StreamId stream_id, cdc::CreateMetricsEntityIfNotFound create = cdc::CreateMetricsEntityIfNotFound::kTrue); + +CDCStateTable MakeCDCStateTable(client::YBClient* client); + } // namespace cdc } // namespace yb diff --git a/src/yb/integration-tests/cdcsdk_consumption_consistent_changes-test.cc b/src/yb/integration-tests/cdcsdk_consumption_consistent_changes-test.cc index fd38014a24f1..aacf6be352d3 100644 --- a/src/yb/integration-tests/cdcsdk_consumption_consistent_changes-test.cc +++ b/src/yb/integration-tests/cdcsdk_consumption_consistent_changes-test.cc @@ -3159,7 +3159,7 @@ TEST_F(CDCSDKConsumptionConsistentChangesTest, TestConsumptionAfterDroppingTable for (auto& entry : table_1_tablets) { expected_tablets.insert(entry.tablet_id()); } - CDCStateTable cdc_state_table(test_client()); + auto cdc_state_table = MakeCDCStateTable(test_client()); Status s; auto table_range = ASSERT_RESULT(cdc_state_table.GetTableRange(CDCStateTableEntrySelector().IncludeAll(), &s)); diff --git a/src/yb/integration-tests/cdcsdk_snapshot-test.cc b/src/yb/integration-tests/cdcsdk_snapshot-test.cc index cc87fe4f85ce..4d7018040e15 100644 --- a/src/yb/integration-tests/cdcsdk_snapshot-test.cc +++ b/src/yb/integration-tests/cdcsdk_snapshot-test.cc @@ -640,8 +640,8 @@ TEST_F(CDCSDKYsqlTest, InsertedRowInbetweenSnapshot) { } ASSERT_EQ(count, 100); - // Read the cdc_state table veriy that checkpoint set is non-zero - CDCStateTable cdc_state_table(test_client()); + // Read the cdc_state table verify that checkpoint set is non-zero + auto cdc_state_table = MakeCDCStateTable(test_client()); Status s; for (auto row_result : ASSERT_RESULT( cdc_state_table.GetTableRange(CDCStateTableEntrySelector().IncludeCheckpoint(), &s))) { diff --git a/src/yb/integration-tests/cdcsdk_tablet_split-test.cc b/src/yb/integration-tests/cdcsdk_tablet_split-test.cc index fd375bd9f05c..b2777b6b1842 100644 --- a/src/yb/integration-tests/cdcsdk_tablet_split-test.cc +++ b/src/yb/integration-tests/cdcsdk_tablet_split-test.cc @@ -826,11 +826,11 @@ TEST_F(CDCSDKTabletSplitTest, YB_DISABLE_TEST_IN_TSAN(TestTabletSplitBeforeBoots WaitUntilSplitIsSuccesful(tablets.Get(0).tablet_id(), table); SleepFor(MonoDelta::FromSeconds(10)); - // We are checking the 'cdc_state' table just after tablet split is succesfull, but since we + // We are checking the 'cdc_state' table just after tablet split is successful, but since we // haven't started streaming from the parent tablet, we should only see 2 rows. uint seen_rows = 0; TabletId parent_tablet_id = tablets[0].tablet_id(); - CDCStateTable cdc_state_table(test_client()); + auto cdc_state_table = MakeCDCStateTable(test_client()); Status s; ASSERT_OK(WaitFor( [&]() -> Result { @@ -905,7 +905,7 @@ void CDCSDKTabletSplitTest::TestCDCStateTableAfterTabletSplit(CDCCheckpointType // entries, one for the parent tablet and two for the children tablets. uint seen_rows = 0; TabletId parent_tablet_id = tablets[0].tablet_id(); - CDCStateTable cdc_state_table(test_client()); + auto cdc_state_table = MakeCDCStateTable(test_client()); Status s; for (auto row_result : ASSERT_RESULT( cdc_state_table.GetTableRange(CDCStateTableEntrySelector().IncludeCheckpoint(), &s))) { @@ -983,7 +983,7 @@ google::protobuf::RepeatedPtrField tablets; // We will not be seeing the entry corresponding to the parent tablet since that is deleted now. TabletId parent_tablet_id = tablets[0].tablet_id(); - CDCStateTable cdc_state_table(test_client()); + auto cdc_state_table = MakeCDCStateTable(test_client()); Status s; for (auto row_result : ASSERT_RESULT( cdc_state_table.GetTableRange({} /* just key columns */, &s))) { @@ -1848,7 +1848,7 @@ TEST_F(CDCSDKTabletSplitTest, YB_DISABLE_TEST_IN_TSAN(TestSplitAfterSplit)) { } // Verify that the cdc_state has only current set of children tablets. - CDCStateTable cdc_state_table(test_client()); + auto cdc_state_table = MakeCDCStateTable(test_client()); ASSERT_OK(WaitFor( [&]() -> Result { Status s; @@ -2000,7 +2000,7 @@ void CDCSDKTabletSplitTest::TestStreamMetaDataCleanupDropTableAfterTabletSplit( expected_tablet_ids.insert(tablet.tablet_id()); } - CDCStateTable cdc_state_table(test_client()); + auto cdc_state_table = MakeCDCStateTable(test_client()); ASSERT_OK(WaitFor( [&]() -> Result { Status s; @@ -2180,7 +2180,7 @@ void CDCSDKTabletSplitTest::TestCleanUpCDCStreamsMetadataDuringTabletSplit( // Incase there is some lag in completing the execution of delete operation on cdc_state table // triggered by the CleanUpCDCStreamMetadata thread. SleepFor(MonoDelta::FromSeconds(2)); - CDCStateTable cdc_state_table(test_client()); + auto cdc_state_table = MakeCDCStateTable(test_client()); Status s; std::unordered_set tablets_found; for (auto row_result : ASSERT_RESULT(cdc_state_table.GetTableRange( diff --git a/src/yb/integration-tests/cdcsdk_ysql-test.cc b/src/yb/integration-tests/cdcsdk_ysql-test.cc index d42ecfc048b5..8ea00453e55d 100644 --- a/src/yb/integration-tests/cdcsdk_ysql-test.cc +++ b/src/yb/integration-tests/cdcsdk_ysql-test.cc @@ -1060,7 +1060,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestAddTableAfterDropTable)) { expected_tablet_ids.insert(tablets[idx].Get(0).tablet_id()); } - CDCStateTable cdc_state_table(test_client()); + auto cdc_state_table = MakeCDCStateTable(test_client()); Status s; std::unordered_set tablets_found; for (auto row_result : ASSERT_RESULT( @@ -1148,7 +1148,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestAddTableAfterDropTableAndMast expected_tablet_ids.insert(tablets[idx].Get(0).tablet_id()); } - CDCStateTable cdc_state_table(test_client()); + auto cdc_state_table = MakeCDCStateTable(test_client()); Status s; std::unordered_set tablets_found; for (auto row_result : ASSERT_RESULT( @@ -1539,7 +1539,7 @@ void CDCSDKYsqlTest::TestMultipleActiveStreamOnSameTablet(CDCCheckpointType chec OpId min_checkpoint = OpId::Max(); - CDCStateTable cdc_state_table(test_client()); + auto cdc_state_table = MakeCDCStateTable(test_client()); Status s; for (auto row_result : ASSERT_RESULT( cdc_state_table.GetTableRange(CDCStateTableEntrySelector().IncludeCheckpoint(), &s))) { @@ -1663,7 +1663,7 @@ void CDCSDKYsqlTest::TestActiveAndInactiveStreamOnSameTablet(CDCCheckpointType c OpId active_stream_checkpoint; ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_state_checkpoint_update_interval_ms) = 100000; - CDCStateTable cdc_state_table(test_client()); + auto cdc_state_table = MakeCDCStateTable(test_client()); Status s; for (auto row_result : ASSERT_RESULT( cdc_state_table.GetTableRange(CDCStateTableEntrySelector().IncludeCheckpoint(), &s))) { @@ -3113,7 +3113,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestStreamMetaDataCleanupMultiTab for (const auto& tablet : tablets[2]) { table_3_tablet_ids.insert(tablet.tablet_id()); } - CDCStateTable cdc_state_table(test_client()); + auto cdc_state_table = MakeCDCStateTable(test_client()); ASSERT_OK(WaitFor( [&]() -> Result { Status s; @@ -3372,7 +3372,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCreateStreamAfterSetCheckpoin // Forcefully update the checkpoint of the stream as MAX. auto max_commit_op_id = OpId::Max(); - CDCStateTable cdc_state_table(test_client()); + auto cdc_state_table = MakeCDCStateTable(test_client()); CDCStateTableEntry entry(tablets[0].tablet_id(), stream_id); entry.checkpoint = max_commit_op_id; ASSERT_OK(cdc_state_table.UpdateEntries({entry})); @@ -5266,7 +5266,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestBackwardCompatibillitySupport // Here we are creating a scenario where active_time is not set in the cdc_state table because of // older server version, if we upgrade the server where active_time is part of cdc_state table, // GetChanges call should successful not intents GCed error. - CDCStateTable cdc_state_table(test_client()); + auto cdc_state_table = MakeCDCStateTable(test_client()); auto entry_opt = ASSERT_RESULT(cdc_state_table.TryFetchEntry( {tablets[0].tablet_id(), stream_id}, CDCStateTableEntrySelector().IncludeAll())); ASSERT_TRUE(entry_opt.has_value()); @@ -5326,7 +5326,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestBackwardCompatibillitySupport // Here we are creating a scenario where active_time is not set in the cdc_state table because of // older server version, if we upgrade the server where active_time is part of cdc_state table, // GetChanges call should successful not intents GCed error. - CDCStateTable cdc_state_table(test_client()); + auto cdc_state_table = MakeCDCStateTable(test_client()); // Insert some records in transaction. ASSERT_OK(WriteRowsHelper(0 /* start */, 100 /* end */, &test_cluster_, true)); @@ -8299,7 +8299,7 @@ TEST_F(CDCSDKYsqlTest, TestCDCStateEntryForReplicationSlot) { // cdc_state entry for the replication slot should only be seen when replication commands are // enabled and a consistent_snapshot stream is created. - CDCStateTable cdc_state_table(test_client()); + auto cdc_state_table = MakeCDCStateTable(test_client()); auto stream_id_1 = ASSERT_RESULT( CreateConsistentSnapshotStreamWithReplicationSlot(CDCSDKSnapshotOption::USE_SNAPSHOT)); auto checkpoint = ASSERT_RESULT(GetCDCSDKSnapshotCheckpoint(stream_id_1, tablets[0].tablet_id())); @@ -8926,7 +8926,7 @@ void CDCSDKYsqlTest::TestNonEligibleTableShouldNotGetAddedToCDCStream( std::unordered_set actual_tablets; CdcStateTableRow expected_row; - CDCStateTable cdc_state_table(test_client()); + auto cdc_state_table = MakeCDCStateTable(test_client()); Status s; auto table_range = ASSERT_RESULT(cdc_state_table.GetTableRange(CDCStateTableEntrySelector().IncludeAll(), &s)); @@ -9543,7 +9543,7 @@ void CDCSDKYsqlTest::TestChildTabletsOfNonEligibleTableDoNotGetAddedToCDCStream( tablets_not_expected_in_state_table.insert(tablet.tablet_id()); } - CDCStateTable cdc_state_table(test_client()); + auto cdc_state_table = MakeCDCStateTable(test_client()); bool seen_unexpected_tablets = false; Status s; auto table_range = @@ -9682,8 +9682,8 @@ TEST_F(CDCSDKYsqlTest, TestUserTableCleanupWithDropTable) { stream_id, expected_tables, "Waiting for GetDBStreamInfo after drop table.", unqualified_table_ids); // Entries in cdc state table should not have changed as both the tasks - table removal & drop - // table cleanup havent progressed. - CDCStateTable cdc_state_table(test_client()); + // table cleanup haven't progressed. + auto cdc_state_table = MakeCDCStateTable(test_client()); Status s; auto table_range = ASSERT_RESULT(cdc_state_table.GetTableRange({}, &s)); @@ -9857,7 +9857,7 @@ TEST_F(CDCSDKYsqlTest, TestNonEligibleTableCleanupWithDropTable) { /* expected_unqualified_table_ids */ {}); // Entries in cdc state table should not have changed as both the tasks - table removal & drop // table cleanup havent progressed. - CDCStateTable cdc_state_table(test_client()); + auto cdc_state_table = MakeCDCStateTable(test_client()); Status s; auto table_range = ASSERT_RESULT(cdc_state_table.GetTableRange({}, &s)); @@ -10017,7 +10017,7 @@ void CDCSDKYsqlTest::TestRemovalOfColocatedTableFromCDCStream(bool start_removal bool seen_streaming_entry = false; std::unordered_set snapshot_entries_for_colocated_tables; int num_cdc_state_entries = 0; - CDCStateTable cdc_state_table(test_client()); + auto cdc_state_table = MakeCDCStateTable(test_client()); Status s; auto table_range = ASSERT_RESULT(cdc_state_table.GetTableRange({}, &s)); for (auto row_result : table_range) { diff --git a/src/yb/integration-tests/cdcsdk_ysql_test_base.cc b/src/yb/integration-tests/cdcsdk_ysql_test_base.cc index f18515767198..0fb992788649 100644 --- a/src/yb/integration-tests/cdcsdk_ysql_test_base.cc +++ b/src/yb/integration-tests/cdcsdk_ysql_test_base.cc @@ -56,7 +56,7 @@ Result CDCSDKYsqlTest::GetUniverseId(PostgresMiniCluster* cluster) { void CDCSDKYsqlTest::VerifyCdcStateMatches( client::YBClient* client, const xrepl::StreamId& stream_id, const TabletId& tablet_id, const uint64_t term, const uint64_t index) { - CDCStateTable cdc_state_table(client); + auto cdc_state_table = MakeCDCStateTable(client); auto row = ASSERT_RESULT(cdc_state_table.TryFetchEntry( {tablet_id, stream_id}, CDCStateTableEntrySelector().IncludeCheckpoint())); @@ -104,7 +104,7 @@ Result CDCSDKYsqlTest::GetUniverseId(PostgresMiniCluster* cluster) { void CDCSDKYsqlTest::VerifyStreamDeletedFromCdcState( client::YBClient* client, const xrepl::StreamId& stream_id, const TabletId& tablet_id, int timeout_secs) { - CDCStateTable cdc_state_table(client); + auto cdc_state_table = MakeCDCStateTable(client); // The deletion of cdc_state rows for the specified stream happen in an asynchronous thread, // so even if the request has returned, it doesn't mean that the rows have been deleted yet. @@ -119,7 +119,7 @@ Result CDCSDKYsqlTest::GetUniverseId(PostgresMiniCluster* cluster) { Result CDCSDKYsqlTest::GetStreamCheckpointInCdcState( client::YBClient* client, const xrepl::StreamId& stream_id, const TabletId& tablet_id) { - CDCStateTable cdc_state_table(client); + auto cdc_state_table = MakeCDCStateTable(client); auto row = VERIFY_RESULT(cdc_state_table.TryFetchEntry( {tablet_id, stream_id}, CDCStateTableEntrySelector().IncludeCheckpoint())); SCHECK(row, IllegalState, "Row not found in cdc_state table"); @@ -130,7 +130,7 @@ Result CDCSDKYsqlTest::GetUniverseId(PostgresMiniCluster* cluster) { void CDCSDKYsqlTest::VerifyStreamCheckpointInCdcState( client::YBClient* client, const xrepl::StreamId& stream_id, const TabletId& tablet_id, OpIdExpectedValue op_id_expected_value, int timeout_secs) { - CDCStateTable cdc_state_table(client); + auto cdc_state_table = MakeCDCStateTable(client); ASSERT_OK(WaitFor( [&]() -> Result { @@ -2685,7 +2685,7 @@ Result CDCSDKYsqlTest::GetUniverseId(PostgresMiniCluster* cluster) { Result CDCSDKYsqlTest::GetLastActiveTimeFromCdcStateTable( const xrepl::StreamId& stream_id, const TabletId& tablet_id, client::YBClient* client) { - CDCStateTable cdc_state_table(client); + auto cdc_state_table = MakeCDCStateTable(client); auto row = VERIFY_RESULT(cdc_state_table.TryFetchEntry( {tablet_id, stream_id}, CDCStateTableEntrySelector().IncludeActiveTime())); @@ -2698,7 +2698,7 @@ Result CDCSDKYsqlTest::GetUniverseId(PostgresMiniCluster* cluster) { Result> CDCSDKYsqlTest::GetSnapshotDetailsFromCdcStateTable( const xrepl::StreamId& stream_id, const TabletId& tablet_id, client::YBClient* client) { - CDCStateTable cdc_state_table(client); + auto cdc_state_table = MakeCDCStateTable(client); auto row = VERIFY_RESULT(cdc_state_table.TryFetchEntry( {tablet_id, stream_id}, CDCStateTableEntrySelector().IncludeCDCSDKSafeTime().IncludeSnapshotKey())); @@ -2717,7 +2717,7 @@ Result CDCSDKYsqlTest::GetUniverseId(PostgresMiniCluster* cluster) { Result CDCSDKYsqlTest::GetSafeHybridTimeFromCdcStateTable( const xrepl::StreamId& stream_id, const TabletId& tablet_id, client::YBClient* client) { - CDCStateTable cdc_state_table(client); + auto cdc_state_table = MakeCDCStateTable(client); auto row = VERIFY_RESULT(cdc_state_table.TryFetchEntry( {tablet_id, stream_id}, CDCStateTableEntrySelector().IncludeCDCSDKSafeTime())); @@ -2788,7 +2788,7 @@ Result CDCSDKYsqlTest::GetUniverseId(PostgresMiniCluster* cluster) { void CDCSDKYsqlTest::CheckTabletsInCDCStateTable( const std::unordered_set expected_tablet_ids, client::YBClient* client, const xrepl::StreamId& stream_id, const std::string timeout_msg) { - CDCStateTable cdc_state_table(test_client()); + auto cdc_state_table = MakeCDCStateTable(test_client()); Status s; auto table_range = ASSERT_RESULT(cdc_state_table.GetTableRange({}, &s)); @@ -2821,7 +2821,7 @@ Result CDCSDKYsqlTest::GetUniverseId(PostgresMiniCluster* cluster) { Result CDCSDKYsqlTest::GetStateTableRowCount() { int num = 0; - CDCStateTable cdc_state_table(test_client()); + auto cdc_state_table = MakeCDCStateTable(test_client()); Status s; auto table_range = VERIFY_RESULT( cdc_state_table.GetTableRange(CDCStateTableEntrySelector().IncludeCheckpoint(), &s)); @@ -3777,7 +3777,7 @@ Result CDCSDKYsqlTest::GetUniverseId(PostgresMiniCluster* cluster) { const xrepl::StreamId stream_id, const std::string& tablet_id) { // Read the cdc_state table safe should be set to valid value. CdcStateTableRow expected_row; - CDCStateTable cdc_state_table(test_client()); + auto cdc_state_table = MakeCDCStateTable(test_client()); Status s; auto table_range = VERIFY_RESULT(cdc_state_table.GetTableRange(CDCStateTableEntrySelector().IncludeAll(), &s)); @@ -3836,7 +3836,7 @@ Result CDCSDKYsqlTest::GetUniverseId(PostgresMiniCluster* cluster) { Result> CDCSDKYsqlTest::ReadSlotEntryFromStateTable(const xrepl::StreamId& stream_id) { std::optional slot_row = std::nullopt; - CDCStateTable cdc_state_table(test_client()); + auto cdc_state_table = MakeCDCStateTable(test_client()); Status s; auto table_range = VERIFY_RESULT(cdc_state_table.GetTableRange(CDCStateTableEntrySelector().IncludeAll(), &s)); @@ -3898,7 +3898,7 @@ Result CDCSDKYsqlTest::GetUniverseId(PostgresMiniCluster* cluster) { auto commit_record_txn_id = last_record.row_message().pg_transaction_id(); auto commit_record_commit_time = last_record.row_message().commit_time(); - CDCStateTable cdc_state_table(test_client()); + auto cdc_state_table = MakeCDCStateTable(test_client()); auto slot_entry = ASSERT_RESULT(cdc_state_table.TryFetchEntry( {kCDCSDKSlotEntryTabletId, stream_id}, CDCStateTableEntrySelector().IncludeData().IncludeCDCSDKSafeTime())); diff --git a/src/yb/integration-tests/xcluster/xcluster-tablet-split-itest.cc b/src/yb/integration-tests/xcluster/xcluster-tablet-split-itest.cc index 2fc0c4542205..ec9c14ad594f 100644 --- a/src/yb/integration-tests/xcluster/xcluster-tablet-split-itest.cc +++ b/src/yb/integration-tests/xcluster/xcluster-tablet-split-itest.cc @@ -330,7 +330,7 @@ TEST_F(CdcTabletSplitITest, GetChangesOnSplitParentTablet) { // They should have the checkpoint set to the split_op, but not have any replication times yet as // they have not been polled for yet. const auto child_tablet_ids = ListActiveTabletIdsForTable(cluster_.get(), table_->id()); - cdc::CDCStateTable cdc_state_table(client_.get()); + auto cdc_state_table = cdc::MakeCDCStateTable(client_.get()); Status s; int children_found = 0; OpId split_op_checkpoint; @@ -1097,13 +1097,12 @@ TEST_F(XClusterExternalTabletSplitITest, MasterFailoverDuringProducerPostSplitOp auto tablet_ids = ASSERT_RESULT(GetTestTableTabletIds(0)); tablet_ids.erase(parent_tablet); - client::YBClient* producer_client( - producer_cluster_ ? producer_client_.get() : client_.get()); + auto* producer_client = producer_cluster_ ? producer_client_.get() : client_.get(); ASSERT_OK(WaitFor( [&]() -> Result { std::unordered_set tablet_ids_map(tablet_ids.begin(), tablet_ids.end()); - cdc::CDCStateTable cdc_state_table(producer_client); + auto cdc_state_table = cdc::MakeCDCStateTable(producer_client); Status s; for (auto row_result : VERIFY_RESULT(cdc_state_table.GetTableRange({} /* just key columns */, &s))) { diff --git a/src/yb/integration-tests/xcluster/xcluster-test.cc b/src/yb/integration-tests/xcluster/xcluster-test.cc index 369ac3868e69..016a98cba984 100644 --- a/src/yb/integration-tests/xcluster/xcluster-test.cc +++ b/src/yb/integration-tests/xcluster/xcluster-test.cc @@ -435,7 +435,7 @@ class XClusterTestNoParam : public XClusterYcqlTestBase { // Verify that for each of the table's tablets, a new row in cdc_state table with the // returned id was inserted. - cdc::CDCStateTable cdc_state_table(producer_client()); + auto cdc_state_table = cdc::MakeCDCStateTable(producer_client()); Status s; auto table_range = VERIFY_RESULT( cdc_state_table.GetTableRange(cdc::CDCStateTableEntrySelector().IncludeCheckpoint(), &s)); diff --git a/src/yb/integration-tests/xcluster/xcluster_ysql-test.cc b/src/yb/integration-tests/xcluster/xcluster_ysql-test.cc index b3a6bb1d75dd..7c20c32888d4 100644 --- a/src/yb/integration-tests/xcluster/xcluster_ysql-test.cc +++ b/src/yb/integration-tests/xcluster/xcluster_ysql-test.cc @@ -1561,7 +1561,7 @@ TEST_F(XClusterYsqlTest, SetupUniverseReplicationWithProducerBootstrapId) { // Verify that for each of the table's tablets, a new row in cdc_state table with the returned // id was inserted. - cdc::CDCStateTable cdc_state_table(producer_client()); + auto cdc_state_table = cdc::MakeCDCStateTable(producer_client()); Status s; auto table_range = ASSERT_RESULT( cdc_state_table.GetTableRange(cdc::CDCStateTableEntrySelector().IncludeCheckpoint(), &s)); @@ -3074,7 +3074,7 @@ TEST_F(XClusterYsqlTest, DropTableOnProducerOnly) { } auto& tablet_id = tablet_ids.front(); - cdc::CDCStateTable cdc_state_table(producer_client()); + auto cdc_state_table = cdc::MakeCDCStateTable(producer_client()); auto key = cdc::CDCStateTableKey(tablet_id, stream_id); auto cdc_row = ASSERT_RESULT( cdc_state_table.TryFetchEntry(key, cdc::CDCStateTableEntrySelector().IncludeAll())); diff --git a/src/yb/integration-tests/xcluster/xcluster_ysql_index-test.cc b/src/yb/integration-tests/xcluster/xcluster_ysql_index-test.cc index d69b98da7caf..5e7b84003208 100644 --- a/src/yb/integration-tests/xcluster/xcluster_ysql_index-test.cc +++ b/src/yb/integration-tests/xcluster/xcluster_ysql_index-test.cc @@ -608,7 +608,7 @@ TEST_F(XClusterDbScopedYsqlIndexProducerOnlyTest, IndexCheckpointLocation) { ASSERT_OK(producer_cluster_.client_->GetTablets(index_table_name, (int32_t)1, &tablet_ids, NULL)); ASSERT_EQ(tablet_ids.size(), 1); - cdc::CDCStateTable cdc_state_table(producer_client()); + auto cdc_state_table = cdc::MakeCDCStateTable(producer_client()); LOG(INFO) << "Fetching CDC state for tablet " << tablet_ids.front() << " and stream " << stream_id; auto key = cdc::CDCStateTableKey(tablet_ids.front(), stream_id); diff --git a/src/yb/master/util/yql_vtable_helpers.cc b/src/yb/master/util/yql_vtable_helpers.cc index 45dc014f8c2d..16cbf7df5f99 100644 --- a/src/yb/master/util/yql_vtable_helpers.cc +++ b/src/yb/master/util/yql_vtable_helpers.cc @@ -13,7 +13,7 @@ #include "yb/master/util/yql_vtable_helpers.h" -#include +#include #include @@ -25,11 +25,10 @@ #include "yb/util/net/net_util.h" #include "yb/util/result.h" #include "yb/util/status_format.h" +#include "yb/util/std_util.h" #include "yb/util/yb_partition.h" -namespace yb { -namespace master { -namespace util { +namespace yb::master::util { // Ideally, we want clients to use YB's own load-balancing policy for Cassandra to route the // requests to the respective nodes hosting the partition keys. But for clients using vanilla @@ -101,11 +100,9 @@ PublicPrivateIPFutures GetPublicPrivateIPFutures( const auto& private_host = common.private_rpc_addresses()[0].host(); if (private_host.empty()) { - std::promise> promise; - result.private_ip_future = promise.get_future(); - promise.set_value(STATUS_FORMAT( + result.private_ip_future = ValueAsFuture(Result(STATUS_FORMAT( IllegalState, "Tablet server $0 doesn't have any rpc addresses registered", - ts_info.tserver_instance().permanent_uuid())); + ts_info.tserver_instance().permanent_uuid()))); return result; } @@ -199,6 +196,4 @@ QLValuePB GetValueHelper::Apply(const bool bool_val, const DataType data_t return value_pb; } -} // namespace util -} // namespace master -} // namespace yb +} // namespace yb::master::util diff --git a/src/yb/master/util/yql_vtable_helpers.h b/src/yb/master/util/yql_vtable_helpers.h index d5b84fa7bb1c..0272d1ac6aec 100644 --- a/src/yb/master/util/yql_vtable_helpers.h +++ b/src/yb/master/util/yql_vtable_helpers.h @@ -14,6 +14,7 @@ #pragma once #include +#include // This include is needed because we use std::shared_future>, and IpAddress // is an alias for boost::asio::ip::address. If we just include net_fwd.h, we get this compilation diff --git a/src/yb/master/ysql/ysql_manager.cc b/src/yb/master/ysql/ysql_manager.cc index c5e4e27bcaab..57cc9704b4cf 100644 --- a/src/yb/master/ysql/ysql_manager.cc +++ b/src/yb/master/ysql/ysql_manager.cc @@ -190,7 +190,7 @@ Status YsqlManager::CreateYbAdvisoryLocksTableIfNeeded(const LeaderEpoch& epoch) CreateTableRequestPB req; CreateTableResponsePB resp; - req.set_name(kPgAdvisoryLocksTableName); + req.set_name(std::string(tserver::kPgAdvisoryLocksTableName)); req.mutable_namespace_()->set_name(kSystemNamespaceName); req.set_table_type(TableType::YQL_TABLE_TYPE); req.set_num_tablets(FLAGS_num_advisory_locks_tablets); diff --git a/src/yb/tserver/pg_client_service.cc b/src/yb/tserver/pg_client_service.cc index b58bc232c8da..891b785e21e9 100644 --- a/src/yb/tserver/pg_client_service.cc +++ b/src/yb/tserver/pg_client_service.cc @@ -307,7 +307,6 @@ class SessionInfo { static auto Make(rw_spinlock* txn_assignment_mutex, CoarseDuration lifetime, const TransactionBuilder& builder, - const YsqlAdvisoryLocksTableProvider& advisory_locks_table_provider, Args&&... args) { struct ConstructorAccessor : public SessionInfo { explicit ConstructorAccessor(rw_spinlock* txn_assignment_mutex) @@ -320,7 +319,6 @@ class SessionInfo { [&builder, txn_assignment = &session_info->txn_assignment_](auto&&... builder_args) { return builder(txn_assignment, std::forward(builder_args)...); }, - advisory_locks_table_provider, accessor, std::forward(args)...); return std::shared_ptr(std::move(accessor), session_info); @@ -444,7 +442,7 @@ class PgClientServiceImpl::Impl { clock_(clock), transaction_pool_provider_(std::move(transaction_pool_provider)), messenger_(*messenger), - table_cache_(client_future), + table_cache_(client_future_), check_expired_sessions_(&messenger->scheduler()), check_object_id_allocators_(&messenger->scheduler()), xcluster_context_(xcluster_context), @@ -456,10 +454,11 @@ class PgClientServiceImpl::Impl { METRIC_pg_client_exchange_response_size.Instantiate(metric_entity)), transaction_builder_([this](auto&&... args) { return BuildTransaction(std::forward(args)...); - }) { + }), + advisory_locks_table_(client_future_), + cdc_state_table_(client_future_) { DCHECK(!permanent_uuid.empty()); ScheduleCheckExpiredSessions(CoarseMonoClock::now()); - cdc_state_table_ = std::make_shared(client_future); if (FLAGS_pg_client_use_shared_memory) { WARN_NOT_OK(SharedExchange::Cleanup(instance_id_), "Cleanup shared memory failed"); } @@ -501,10 +500,9 @@ class PgClientServiceImpl::Impl { auto session_info = SessionInfo::Make( &txns_assignment_mutexes_[session_id % txns_assignment_mutexes_.size()], FLAGS_pg_client_session_expiration_ms * 1ms, transaction_builder_, - GetYsqlAdvisoryLocksTableFunc(), - session_id, &client(), clock_, &table_cache_, - xcluster_context_, pg_node_level_mutation_counter_, &response_cache_, &sequence_cache_, - shared_mem_pool_, stats_exchange_response_size_, messenger_.scheduler()); + session_id, &client(), clock_, &table_cache_, xcluster_context_, + pg_node_level_mutation_counter_, &response_cache_, &sequence_cache_, shared_mem_pool_, + stats_exchange_response_size_, messenger_.scheduler(), advisory_locks_table_); resp->set_session_id(session_id); if (FLAGS_pg_client_use_shared_memory) { resp->set_instance_id(instance_id_); @@ -2100,24 +2098,6 @@ class PgClientServiceImpl::Impl { std::chrono::seconds(FLAGS_check_pg_object_id_allocators_interval_secs)); } - YsqlAdvisoryLocksTableProvider GetYsqlAdvisoryLocksTableFunc() { - return [this]() -> YsqlAdvisoryLocksTable& { return *GetYsqlAdvisoryLocksTable(); }; - } - - YsqlAdvisoryLocksTable* GetYsqlAdvisoryLocksTable() EXCLUDES(advisory_locks_table_mutex_) { - { - SharedLock lock(advisory_locks_table_mutex_); - if (advisory_locks_table_) { - return advisory_locks_table_.get(); - } - } - UniqueLock lock(advisory_locks_table_mutex_); - if (!advisory_locks_table_) { - advisory_locks_table_ = std::make_unique(client()); - } - return advisory_locks_table_.get(); - } - const TabletServerIf& tablet_server_; std::shared_future client_future_; scoped_refptr clock_; @@ -2152,8 +2132,6 @@ class PgClientServiceImpl::Impl { CoarseTimePoint check_expired_sessions_time_ GUARDED_BY(mutex_); rpc::ScheduledTaskTracker check_object_id_allocators_; - std::shared_ptr cdc_state_table_; - const TserverXClusterContextIf* xcluster_context_; PgMutationCounter* pg_node_level_mutation_counter_; @@ -2182,9 +2160,9 @@ class PgClientServiceImpl::Impl { std::vector stopping_sessions_ GUARDED_BY(mutex_); - std::shared_mutex advisory_locks_table_mutex_; - std::unique_ptr advisory_locks_table_ - GUARDED_BY(advisory_locks_table_mutex_); + YsqlAdvisoryLocksTable advisory_locks_table_; + + std::optional cdc_state_table_; }; PgClientServiceImpl::PgClientServiceImpl( diff --git a/src/yb/tserver/pg_client_session.cc b/src/yb/tserver/pg_client_session.cc index 4025d37cd726..1a15000d588e 100644 --- a/src/yb/tserver/pg_client_session.cc +++ b/src/yb/tserver/pg_client_session.cc @@ -52,8 +52,8 @@ #include "yb/tserver/pg_sequence_cache.h" #include "yb/tserver/pg_shared_mem_pool.h" #include "yb/tserver/pg_table_cache.h" -#include "yb/tserver/tserver_xcluster_context_if.h" #include "yb/tserver/tserver_fwd.h" +#include "yb/tserver/tserver_xcluster_context_if.h" #include "yb/tserver/ysql_advisory_lock_table.h" #include "yb/util/backoff_waiter.h" @@ -899,19 +899,18 @@ void PgClientSession::ReadPointHistory::Clear() { PgClientSession::PgClientSession( TransactionBuilder&& transaction_builder, - const YsqlAdvisoryLocksTableProvider& advisory_locks_table_provider, SharedThisSource shared_this_source, uint64_t id, client::YBClient* client, const scoped_refptr& clock, PgTableCache* table_cache, const TserverXClusterContextIf* xcluster_context, PgMutationCounter* pg_node_level_mutation_counter, PgResponseCache* response_cache, PgSequenceCache* sequence_cache, PgSharedMemoryPool& shared_mem_pool, - const EventStatsPtr& stats_exchange_response_size, rpc::Scheduler& scheduler) + const EventStatsPtr& stats_exchange_response_size, rpc::Scheduler& scheduler, + YsqlAdvisoryLocksTable& advisory_locks_table) : shared_this_(std::shared_ptr(std::move(shared_this_source), this)), id_(id), client_(*client), clock_(clock), transaction_builder_(std::move(transaction_builder)), - advisory_locks_table_provider_(advisory_locks_table_provider), table_cache_(*table_cache), xcluster_context_(xcluster_context), pg_node_level_mutation_counter_(pg_node_level_mutation_counter), @@ -920,7 +919,8 @@ PgClientSession::PgClientSession( shared_mem_pool_(shared_mem_pool), big_shared_mem_expiration_task_(&scheduler), stats_exchange_response_size_(stats_exchange_response_size), - read_point_history_(PrefixLogger(id_)) {} + read_point_history_(PrefixLogger(id_)), + advisory_locks_table_(advisory_locks_table) {} Status PgClientSession::CreateTable( const PgCreateTableRequestPB& req, PgCreateTableResponsePB* resp, rpc::RpcContext* context) { @@ -2474,12 +2474,11 @@ Status PgClientSession::AcquireAdvisoryLock( VLOG(2) << "Servicing AcquireAdvisoryLock: " << req.ShortDebugString(); SCHECK(FLAGS_ysql_yb_enable_advisory_locks, NotSupported, "advisory locks are disabled"); SCHECK(!req.session(), NotSupported, "session-level advisory locks are not yet implemented"); - auto& advisory_locks_table = advisory_locks_table_provider_(); auto setup_session_result = VERIFY_RESULT(SetupSession( req.options(), context->GetClientDeadline(), HybridTime())); auto* session = setup_session_result.session_data.session.get(); for (const auto& lock : req.locks()) { - auto lock_op = VERIFY_RESULT(advisory_locks_table.CreateLockOp( + auto lock_op = VERIFY_RESULT(advisory_locks_table_.CreateLockOp( req.db_oid(), lock.lock_id().classid(), lock.lock_id().objid(), lock.lock_id().objsubid(), lock.lock_mode() == AdvisoryLockMode::LOCK_SHARE ? PgsqlLockRequestPB::PG_LOCK_SHARE : PgsqlLockRequestPB::PG_LOCK_EXCLUSIVE, diff --git a/src/yb/tserver/pg_client_session.h b/src/yb/tserver/pg_client_session.h index 7e4844ba69d6..e5cb56bfdbfc 100644 --- a/src/yb/tserver/pg_client_session.h +++ b/src/yb/tserver/pg_client_session.h @@ -57,15 +57,11 @@ DECLARE_bool(ysql_enable_db_catalog_version_mode); -namespace yb { - -class YsqlAdvisoryLocksTable; -using YsqlAdvisoryLocksTableProvider = std::function; - -namespace tserver { +namespace yb::tserver { class PgMutationCounter; class TserverXClusterContextIf; +class YsqlAdvisoryLocksTable; #define PG_CLIENT_SESSION_METHODS \ (AlterDatabase) \ @@ -157,13 +153,13 @@ class PgClientSession { PgClientSession( TransactionBuilder&& transaction_builder, - const YsqlAdvisoryLocksTableProvider& advisory_locks_table, SharedThisSource shared_this_source, uint64_t id, client::YBClient* client, const scoped_refptr& clock, PgTableCache* table_cache, const TserverXClusterContextIf* xcluster_context, PgMutationCounter* pg_node_level_mutation_counter, PgResponseCache* response_cache, PgSequenceCache* sequence_cache, PgSharedMemoryPool& shared_mem_pool, - const EventStatsPtr& stats_exchange_response_size, rpc::Scheduler& scheduler); + const EventStatsPtr& stats_exchange_response_size, rpc::Scheduler& scheduler, + YsqlAdvisoryLocksTable& advisory_locks_table); virtual ~PgClientSession() = default; @@ -332,7 +328,6 @@ class PgClientSession { client::YBClient& client_; scoped_refptr clock_; const TransactionBuilder transaction_builder_; - YsqlAdvisoryLocksTableProvider advisory_locks_table_provider_; PgTableCache& table_cache_; const TserverXClusterContextIf* xcluster_context_; PgMutationCounter* pg_node_level_mutation_counter_; @@ -356,6 +351,7 @@ class PgClientSession { simple_spinlock pending_data_mutex_; std::vector pending_data_ GUARDED_BY(pending_data_mutex_); + YsqlAdvisoryLocksTable& advisory_locks_table_; }; template @@ -386,5 +382,4 @@ inline void TryUpdateAshWaitState(const PgGetDatabaseInfoRequestPB&) {} inline void TryUpdateAshWaitState(const PgIsInitDbDoneRequestPB&) {} inline void TryUpdateAshWaitState(const PgCreateSequencesDataTableRequestPB&) {} -} // namespace tserver -} // namespace yb +} // namespace yb::tserver diff --git a/src/yb/tserver/pg_table_cache.cc b/src/yb/tserver/pg_table_cache.cc index 3477a18a2741..591ae658dcb0 100644 --- a/src/yb/tserver/pg_table_cache.cc +++ b/src/yb/tserver/pg_table_cache.cc @@ -72,7 +72,7 @@ using CacheEntryPtr = std::shared_ptr; class PgTableCache::Impl { public: explicit Impl(std::shared_future client_future) - : client_future_(client_future) {} + : client_future_(std::move(client_future)) {} Status GetInfo( const TableId& table_id, @@ -197,8 +197,7 @@ PgTableCache::PgTableCache(std::shared_future client_future) : impl_(new Impl(std::move(client_future))) { } -PgTableCache::~PgTableCache() { -} +PgTableCache::~PgTableCache() = default; Status PgTableCache::GetInfo( const TableId& table_id, diff --git a/src/yb/tserver/ysql_advisory_lock_table.cc b/src/yb/tserver/ysql_advisory_lock_table.cc index 642dcca8723c..507f82c96d42 100644 --- a/src/yb/tserver/ysql_advisory_lock_table.cc +++ b/src/yb/tserver/ysql_advisory_lock_table.cc @@ -13,20 +13,23 @@ // // -#include -#include "yb/client/yb_table_name.h" #include "yb/client/client.h" + #include "yb/client/yb_op.h" +#include "yb/client/yb_table_name.h" + #include "yb/master/master_defaults.h" -DECLARE_bool(ysql_yb_enable_advisory_locks); +#include "yb/tserver/ysql_advisory_lock_table.h" -namespace yb { +DECLARE_bool(ysql_yb_enable_advisory_locks); +namespace yb::tserver { namespace { -void SetLockId(PgsqlAdvisoryLockPB& lock, uint32_t db_oid, uint32_t class_oid, - uint32_t objid, uint32_t objsubid) { +void SetLockId( + PgsqlAdvisoryLockPB& lock, uint32_t db_oid, uint32_t class_oid, + uint32_t objid, uint32_t objsubid) { lock.add_lock_partition_column_values()->mutable_value()->set_uint32_value(db_oid); lock.add_lock_range_column_values()->mutable_value()->set_uint32_value(class_oid); lock.add_lock_range_column_values()->mutable_value()->set_uint32_value(objid); @@ -35,19 +38,15 @@ void SetLockId(PgsqlAdvisoryLockPB& lock, uint32_t db_oid, uint32_t class_oid, } // namespace -YsqlAdvisoryLocksTable::YsqlAdvisoryLocksTable(client::YBClient& client) - : client_(client) {} - -YsqlAdvisoryLocksTable::~YsqlAdvisoryLocksTable() { -} +YsqlAdvisoryLocksTable::YsqlAdvisoryLocksTable(std::shared_future client_future) + : client_future_(std::move(client_future)) {} Result YsqlAdvisoryLocksTable::GetTable() { SCHECK(FLAGS_ysql_yb_enable_advisory_locks, NotSupported, "Advisory locks are not enabled"); std::lock_guard l(mutex_); if (!table_) { - static const client::YBTableName table_name( - YQL_DATABASE_CQL, master::kSystemNamespaceName, kPgAdvisoryLocksTableName); - table_ = VERIFY_RESULT(client_.OpenTable(table_name)); + table_ = VERIFY_RESULT(client_future_.get()->OpenTable(client::YBTableName{ + YQL_DATABASE_CQL, master::kSystemNamespaceName, std::string(kPgAdvisoryLocksTableName)})); } return table_; } @@ -56,9 +55,10 @@ Result YsqlAdvisoryLocksTable::CreateLockOp( uint32_t db_oid, uint32_t class_oid, uint32_t objid, uint32_t objsubid, PgsqlLockRequestPB::PgsqlAdvisoryLockMode mode, bool wait, rpc::Sidecars* sidecars) { auto lock = client::YBPgsqlLockOp::NewLock(VERIFY_RESULT(GetTable()), sidecars); - SetLockId(*lock->mutable_request()->mutable_lock_id(), db_oid, class_oid, objid, objsubid); - lock->mutable_request()->set_lock_mode(mode); - lock->mutable_request()->set_wait(wait); + auto& lock_req = *lock->mutable_request(); + SetLockId(*lock_req.mutable_lock_id(), db_oid, class_oid, objid, objsubid); + lock_req.set_lock_mode(mode); + lock_req.set_wait(wait); return lock; } @@ -66,8 +66,9 @@ Result YsqlAdvisoryLocksTable::CreateUnlockOp( uint32_t db_oid, uint32_t class_oid, uint32_t objid, uint32_t objsubid, PgsqlLockRequestPB::PgsqlAdvisoryLockMode mode, rpc::Sidecars* sidecars) { auto unlock = client::YBPgsqlLockOp::NewUnlock(VERIFY_RESULT(GetTable()), sidecars); - SetLockId(*unlock->mutable_request()->mutable_lock_id(), db_oid, class_oid, objid, objsubid); - unlock->mutable_request()->set_lock_mode(mode); + auto& unlock_req = *unlock->mutable_request(); + SetLockId(*unlock_req.mutable_lock_id(), db_oid, class_oid, objid, objsubid); + unlock_req.set_lock_mode(mode); return unlock; } @@ -79,4 +80,4 @@ Result YsqlAdvisoryLocksTable::CreateUnlockAllOp( return unlock; } -} // namespace yb +} // namespace yb::tserver diff --git a/src/yb/tserver/ysql_advisory_lock_table.h b/src/yb/tserver/ysql_advisory_lock_table.h index a6fc459ff116..f8705d15650b 100644 --- a/src/yb/tserver/ysql_advisory_lock_table.h +++ b/src/yb/tserver/ysql_advisory_lock_table.h @@ -15,19 +15,32 @@ #pragma once +#include +#include +#include + #include "yb/client/client_fwd.h" -#include "yb/rpc/rpc_fwd.h" + #include "yb/common/pgsql_protocol.pb.h" +#include "yb/gutil/thread_annotations.h" + +#include "yb/rpc/rpc_fwd.h" + +#include "yb/util/result.h" + namespace yb { -constexpr char kPgAdvisoryLocksTableName[] = "pg_advisory_locks"; +class AdvisoryLockTest; + +namespace tserver { + +constexpr std::string_view kPgAdvisoryLocksTableName = "pg_advisory_locks"; // Helper class for the advisory locks table. class YsqlAdvisoryLocksTable { public: - explicit YsqlAdvisoryLocksTable(client::YBClient& client); - ~YsqlAdvisoryLocksTable(); + explicit YsqlAdvisoryLocksTable(std::shared_future client_future); Result CreateLockOp( uint32_t db_oid, uint32_t class_oid, uint32_t objid, uint32_t objsubid, @@ -42,15 +55,14 @@ class YsqlAdvisoryLocksTable { uint32_t db_oid, rpc::Sidecars* sidecars) EXCLUDES(mutex_); private: - friend class AdvisoryLockTest; + friend class yb::AdvisoryLockTest; Result GetTable() EXCLUDES(mutex_); std::mutex mutex_; - client::YBTablePtr table_ GUARDED_BY(mutex_);; - client::YBClient& client_; + client::YBTablePtr table_ GUARDED_BY(mutex_); + std::shared_future client_future_; }; -using YsqlAdvisoryLocksTableProvider = std::function; - +} // namespace tserver } // namespace yb diff --git a/src/yb/util/std_util.h b/src/yb/util/std_util.h index 59259384fe41..745f10901ceb 100644 --- a/src/yb/util/std_util.h +++ b/src/yb/util/std_util.h @@ -13,6 +13,10 @@ #pragma once +#include +#include +#include + // Implementation of std functions we want to use, but cannot until we switch to newer C++. namespace yb { @@ -116,4 +120,12 @@ auto binary_search_iterator( return it == end || !cmp(value, transform(*it)) ? it : end; } +template +auto ValueAsFuture(T&& value) { + using Tp = std::remove_cvref_t; + std::promise promise; + promise.set_value(std::forward(value)); + return promise.get_future(); +} + } // namespace yb diff --git a/src/yb/util/test_util.h b/src/yb/util/test_util.h index 1db5fea7f777..4c51ccd94eb4 100644 --- a/src/yb/util/test_util.h +++ b/src/yb/util/test_util.h @@ -35,7 +35,10 @@ #include #include +#include #include +#include +#include #include