diff --git a/src/yb/client/client-internal.cc b/src/yb/client/client-internal.cc index 294fa2c3d9c7..49c908834065 100644 --- a/src/yb/client/client-internal.cc +++ b/src/yb/client/client-internal.cc @@ -690,7 +690,12 @@ Status YBClient::Data::DeleteTable(YBClient* client, indexed_table_name->GetFromTableIdentifierPB(resp.indexed_table()); } - LOG(INFO) << "Deleted table " << (!table_id.empty() ? table_id : table_name.ToString()); + if (req.ysql_ddl_rollback_enabled()) { + LOG(INFO) << "Marked table " << (!table_id.empty() ? table_id : table_name.ToString()) + << " for deletion at the end of transaction " << txn->ToString(); + } else { + LOG(INFO) << "Deleted table " << (!table_id.empty() ? table_id : table_name.ToString()); + } return Status::OK(); } @@ -2884,6 +2889,45 @@ Status YBClient::Data::ReportYsqlDdlTxnStatus( return Status::OK(); } +Status YBClient::Data::IsYsqlDdlVerificationInProgress( + const TransactionMetadata& txn, + CoarseTimePoint deadline, + bool *ddl_verification_in_progress) { + master::IsYsqlDdlVerificationDoneRequestPB req; + master::IsYsqlDdlVerificationDoneResponsePB resp; + + txn.ToPB(req.mutable_transaction()); + auto s = SyncLeaderMasterRpc( + deadline, req, &resp, "IsYsqlDdlVerificationDone", + &master::MasterDdlProxy::IsYsqlDdlVerificationDoneAsync); + + // Check DDL verification is paused. This can happen during error injection tests. If so, no + // point continuing to retry. 
+ if (resp.has_error()) { + if (resp.error().has_status() && + resp.error().status().has_message() && + resp.error().status().message().find("DDL Rollback is paused") != std::string::npos) { + *ddl_verification_in_progress = false; + } + return StatusFromPB(resp.error().status()); + } + + RETURN_NOT_OK(s); + + *ddl_verification_in_progress = !resp.done(); + return Status::OK(); +} + +Status YBClient::Data::WaitForDdlVerificationToFinish( + const TransactionMetadata& txn, + CoarseTimePoint deadline) { + return RetryFunc( + deadline, + Format("Waiting on YSQL DDL Verification for $0 to be completed", txn.transaction_id), + Format("Timed out on YSQL DDL Verification for $0 to be completed", txn.transaction_id), + std::bind(&YBClient::Data::IsYsqlDdlVerificationInProgress, this, txn, _1, _2)); +} + Result YBClient::Data::CheckIfPitrActive(CoarseTimePoint deadline) { CheckIfPitrActiveRequestPB req; CheckIfPitrActiveResponsePB resp; diff --git a/src/yb/client/client-internal.h b/src/yb/client/client-internal.h index fdfb3d43d3f2..249b3840c269 100644 --- a/src/yb/client/client-internal.h +++ b/src/yb/client/client-internal.h @@ -442,6 +442,13 @@ class YBClient::Data { Status ReportYsqlDdlTxnStatus( const TransactionMetadata& txn, bool is_committed, const CoarseTimePoint& deadline); + Status IsYsqlDdlVerificationInProgress( + const TransactionMetadata& txn, + CoarseTimePoint deadline, + bool *ddl_verification_in_progress); + + Status WaitForDdlVerificationToFinish(const TransactionMetadata& txn, CoarseTimePoint deadline); + Result CheckIfPitrActive(CoarseTimePoint deadline); Status GetXClusterStreams( diff --git a/src/yb/client/client-test-util.cc b/src/yb/client/client-test-util.cc index 87c294b68dac..4fc46af76573 100644 --- a/src/yb/client/client-test-util.cc +++ b/src/yb/client/client-test-util.cc @@ -92,7 +92,8 @@ namespace { const bool exists) { ASSERT_OK(LoggedWaitFor([&]() -> Result { auto ret = client->TableExists( - client::YBTableName(YQL_DATABASE_PGSQL, 
database_name, table_name)); + client::YBTableName(YQL_DATABASE_PGSQL, database_name, table_name), + true /* skip_hidden */); WARN_NOT_OK(ResultToStatus(ret), "TableExists call failed"); return ret.ok() && ret.get() == exists; }, MonoDelta::FromSeconds(timeout_secs), diff --git a/src/yb/client/client.cc b/src/yb/client/client.cc index 10d4bef05e6b..ae3d3301ebc6 100644 --- a/src/yb/client/client.cc +++ b/src/yb/client/client.cc @@ -278,6 +278,12 @@ DEFINE_NON_RUNTIME_uint32(wait_for_ysql_backends_catalog_version_client_master_r 20000, "WaitForYsqlBackendsCatalogVersion client-to-master RPC timeout. Specifically, both the " "postgres-to-tserver and tserver-to-master RPC timeout."); + +DEFINE_RUNTIME_uint32(ddl_verification_timeout_multiplier, 5, + "Multiplier for the timeout used for DDL verification. DDL verification may involve waiting for" + " DDL operations to finish at the yb-master. This is a multiplier for" + " default_admin_operation_timeout which is the timeout used for a single DDL operation "); + TAG_FLAG(wait_for_ysql_backends_catalog_version_client_master_rpc_timeout_ms, advanced); namespace yb { @@ -2490,6 +2496,13 @@ Status YBClient::ReportYsqlDdlTxnStatus(const TransactionMetadata& txn, bool is_ return data_->ReportYsqlDdlTxnStatus(txn, is_committed, deadline); } +Status YBClient::WaitForDdlVerificationToFinish(const TransactionMetadata& txn) { + auto deadline = CoarseMonoClock::Now() + + MonoDelta::FromSeconds(FLAGS_ddl_verification_timeout_multiplier * + default_admin_operation_timeout().ToSeconds()); + return data_->WaitForDdlVerificationToFinish(txn, deadline); +} + Result YBClient::CheckIfPitrActive() { auto deadline = CoarseMonoClock::Now() + default_rpc_timeout(); return data_->CheckIfPitrActive(deadline); @@ -2497,7 +2510,8 @@ Result YBClient::CheckIfPitrActive() { Result> YBClient::ListTables(const std::string& filter, bool exclude_ysql, - const std::string& ysql_db_filter) { + const std::string& ysql_db_filter, + bool skip_hidden) { 
ListTablesRequestPB req; ListTablesResponsePB resp; @@ -2522,6 +2536,9 @@ Result> YBClient::ListTables(const std::string& filter, if (exclude_ysql && table_info.table_type() == TableType::PGSQL_TABLE_TYPE) { continue; } + if (skip_hidden && table_info.hidden()) { + continue; + } result.emplace_back(master::GetDatabaseTypeForTable(table_info.table_type()), table_info.namespace_().id(), table_info.namespace_().name(), @@ -2648,8 +2665,10 @@ Result> YBClient::GetTableSchemaFromSysCatalog( return make_pair(current_schema, resp.version()); } -Result YBClient::TableExists(const YBTableName& table_name) { - for (const YBTableName& table : VERIFY_RESULT(ListTables(table_name.table_name()))) { +Result YBClient::TableExists(const YBTableName& table_name, bool skip_hidden) { + auto tables = VERIFY_RESULT(ListTables( + table_name.table_name(), /*exclude_ysql=*/false, /*ysql_db_filter=*/"", skip_hidden)); + for (const YBTableName& table : tables) { if (table == table_name) { return true; } diff --git a/src/yb/client/client.h b/src/yb/client/client.h index ec596f80a489..7ca1a63a041a 100644 --- a/src/yb/client/client.h +++ b/src/yb/client/client.h @@ -737,7 +737,8 @@ class YBClient { Result> ListTables( const std::string& filter = "", bool exclude_ysql = false, - const std::string& ysql_db_filter = ""); + const std::string& ysql_db_filter = "", + bool skip_hidden = false); // List tables in a namespace. // @@ -817,9 +818,9 @@ class YBClient { CoarseTimePoint deadline, std::vector* master_uuids); - // Check if the table given by 'table_name' exists. - // Result value is set only on success. - Result TableExists(const YBTableName& table_name); + // Check if the table given by 'table_name' exists. 'skip_hidden' indicates whether to consider + // hidden tables. Result value is set only on success. 
+ Result TableExists(const YBTableName& table_name, bool skip_hidden = false); Result IsLoadBalanced(uint32_t num_servers); Result IsLoadBalancerIdle(); @@ -935,6 +936,8 @@ class YBClient { // Provide the completion status of 'txn' to the YB-Master. Status ReportYsqlDdlTxnStatus(const TransactionMetadata& txn, bool is_committed); + Status WaitForDdlVerificationToFinish(const TransactionMetadata& txn); + Result CheckIfPitrActive(); void LookupTabletByKey(const std::shared_ptr& table, diff --git a/src/yb/integration-tests/cdcsdk_test_base.cc b/src/yb/integration-tests/cdcsdk_test_base.cc index f2b4c08e8b81..53529c926a17 100644 --- a/src/yb/integration-tests/cdcsdk_test_base.cc +++ b/src/yb/integration-tests/cdcsdk_test_base.cc @@ -55,6 +55,7 @@ using std::string; DECLARE_bool(ysql_enable_pack_full_row_update); +DECLARE_bool(ysql_ddl_transaction_wait_for_ddl_verification); namespace yb { using client::YBClient; @@ -142,7 +143,7 @@ Status CDCSDKTestBase::SetUpWithParams( ANNOTATE_UNPROTECTED_WRITE(FLAGS_max_replication_slots) = 500; ANNOTATE_UNPROTECTED_WRITE(FLAGS_allowed_preview_flags_csv) = "ysql_ddl_rollback_enabled"; ANNOTATE_UNPROTECTED_WRITE(FLAGS_ysql_ddl_rollback_enabled) = true; - + ANNOTATE_UNPROTECTED_WRITE(FLAGS_ysql_ddl_transaction_wait_for_ddl_verification) = true; MiniClusterOptions opts; opts.num_masters = num_masters; diff --git a/src/yb/master/CMakeLists.txt b/src/yb/master/CMakeLists.txt index 25bd2e50e237..13d472593c73 100644 --- a/src/yb/master/CMakeLists.txt +++ b/src/yb/master/CMakeLists.txt @@ -170,7 +170,7 @@ set(MASTER_SRCS ysql_tablegroup_manager.cc ysql_tablespace_manager.cc ysql_ddl_handler.cc - ysql_transaction_ddl.cc) + ysql_ddl_verification_task.cc) set(MASTER_DEPS yb_common diff --git a/src/yb/master/catalog_entity_info.cc b/src/yb/master/catalog_entity_info.cc index 24f3766df185..b3c421090d6a 100644 --- a/src/yb/master/catalog_entity_info.cc +++ b/src/yb/master/catalog_entity_info.cc @@ -516,9 +516,9 @@ TableType 
TableInfo::GetTableType() const { return LockForRead()->pb.table_type(); } -bool TableInfo::IsBeingDroppedDueToDdlTxn(const std::string& txn_id_pb, bool txn_success) const { +bool TableInfo::IsBeingDroppedDueToDdlTxn(const std::string& pb_txn_id, bool txn_success) const { auto l = LockForRead(); - if (l->pb_transaction_id() != txn_id_pb) { + if (l->pb_transaction_id() != pb_txn_id) { return false; } // The table can be dropped in 2 cases due to a DDL: @@ -989,6 +989,36 @@ bool TableInfo::AttachedYCQLIndexDeletionInProgress(const TableId& index_table_i IndexPermissions::INDEX_PERM_WRITE_AND_DELETE_WHILE_REMOVING; } +void TableInfo::AddDdlTxnWaitingForSchemaVersion(int schema_version, const TransactionId& txn) { + std::lock_guard l(lock_); + auto res = ddl_txns_waiting_for_schema_version_.emplace(schema_version, txn); + // There should never have existed an entry for this schema version already as only one + // DDL transaction is allowed on an entity at a given time. + LOG_IF(DFATAL, !res.second) << "Found existing entry for schema version " << schema_version + << " for table " << table_id_ << " with txn " << txn + << " previous transaction " << res.first->second; +} + +std::vector TableInfo::EraseDdlTxnsWaitingForSchemaVersion(int schema_version) { + std::lock_guard l(lock_); + std::vector txns; + auto upper_bound_iter = ddl_txns_waiting_for_schema_version_.upper_bound(schema_version); + // Ideally we will perform this erase operation at the end of every alter table operation, and + // thus we should be able to return only one schema version. However, alter table is an async + // operation. It is possible in the case of YSQL DDL Transaction verification that while an + // alter operation for a rollback operation is about to start (i.e. table in ALTERING state), a + // new alter operation can start on the same table (this is by design and poses no correctness + // issues). It may be possible that the TServers respond back with the latest schema version. 
+ // To handle this case, we return the transactions waiting on all schema versions less than the + // requested schema version as well. + for (auto it = ddl_txns_waiting_for_schema_version_.begin(); it != upper_bound_iter; ++it) { + txns.push_back(it->second); + } + ddl_txns_waiting_for_schema_version_.erase( + ddl_txns_waiting_for_schema_version_.begin(), upper_bound_iter); + return txns; +} + void PersistentTableInfo::set_state(SysTablesEntryPB::State state, const string& msg) { VLOG_WITH_FUNC(2) << "Setting state for " << name() << " to " << SysTablesEntryPB::State_Name(state) << " reason: " << msg; @@ -1009,8 +1039,19 @@ const std::string& PersistentTableInfo::indexed_table_id() const { Result PersistentTableInfo::is_being_modified_by_ddl_transaction( const TransactionId& txn) const { - return has_ysql_ddl_txn_verifier_state() && - txn == VERIFY_RESULT(FullyDecodeTransactionId(pb_transaction_id())); + // A Nil 'txn' must never match a table without verifier state, whose current DDL id is Nil. + return has_ysql_ddl_txn_verifier_state() && txn == VERIFY_RESULT(GetCurrentDdlTransactionId()); +} + +Result PersistentTableInfo::GetCurrentDdlTransactionId() const { + TransactionId txn = TransactionId::Nil(); + if (has_ysql_ddl_txn_verifier_state()) { + auto& pb_txn_id = pb_transaction_id(); + RSTATUS_DCHECK(!pb_txn_id.empty(), InternalError, + "Table $0 has ysql_ddl_txn_verifier_state but no transaction", name()); + txn = VERIFY_RESULT(FullyDecodeTransactionId(pb_txn_id)); + } + return txn; } bool IsReplicationInfoSet(const ReplicationInfoPB& replication_info) { diff --git a/src/yb/master/catalog_entity_info.h b/src/yb/master/catalog_entity_info.h index 897c6d841417..b9dced9746f2 100644 --- a/src/yb/master/catalog_entity_info.h +++ b/src/yb/master/catalog_entity_info.h @@ -475,6 +475,10 @@ struct PersistentTableInfo : public Persistent is_being_modified_by_ddl_transaction(const TransactionId& txn) const; + // Returns the transaction-id of the DDL transaction operating on it, Nil if no such DDL is + // happening. 
+ Result GetCurrentDdlTransactionId() const; + const std::string& state_name() const { return SysTablesEntryPB_State_Name(pb.state()); } @@ -737,6 +741,12 @@ class TableInfo : public RefCountedThreadSafe, bool AttachedYCQLIndexDeletionInProgress(const TableId& index_table_id) const; + void AddDdlTxnWaitingForSchemaVersion(int schema_version, + const TransactionId& txn) EXCLUDES(lock_); + + std::vector EraseDdlTxnsWaitingForSchemaVersion( + int schema_version) EXCLUDES(lock_); + private: friend class RefCountedThreadSafe; ~TableInfo(); @@ -788,6 +798,12 @@ class TableInfo : public RefCountedThreadSafe, // table from completing. Not needed once D23712 lands. std::atomic_bool bootstrapping_xcluster_replication_ = false; + // Store a map of schema version->TransactionId of the DDL transaction that initiated the + // change. When a schema version n has propagated to all tablets, we use this map to signal all + // the DDL transactions waiting for schema version n and any k < n. The map is ordered by schema + // version to easily figure out all the entries for schema version < n. 
+ std::map ddl_txns_waiting_for_schema_version_ GUARDED_BY(lock_); + DISALLOW_COPY_AND_ASSIGN(TableInfo); }; diff --git a/src/yb/master/catalog_loaders.cc b/src/yb/master/catalog_loaders.cc index 289e9bed9d9d..0b1b5746b250 100644 --- a/src/yb/master/catalog_loaders.cc +++ b/src/yb/master/catalog_loaders.cc @@ -38,8 +38,8 @@ #include "yb/master/async_rpc_tasks.h" #include "yb/master/backfill_index.h" #include "yb/master/master_util.h" +#include "yb/master/ysql_ddl_verification_task.h" #include "yb/master/ysql_tablegroup_manager.h" -#include "yb/master/ysql_transaction_ddl.h" #include "yb/util/flags.h" #include "yb/util/status_format.h" @@ -143,27 +143,38 @@ Status TableLoader::Visit(const TableId& table_id, const SysTablesEntryPB& metad table_lock.Commit(); catalog_manager_->HandleNewTableId(table->id()); - // Tables created as part of a Transaction should check transaction status and be deleted - // if the transaction is aborted. - if (metadata.has_transaction()) { + // Tables created/altered as part of a transaction should check transaction status to figure out + // their final state. Tables that have already been deleted can be safely skipped. + if (!table->is_deleted() && metadata.has_transaction()) { TransactionMetadata txn = VERIFY_RESULT(TransactionMetadata::FromPB(metadata.transaction())); if (metadata.ysql_ddl_txn_verifier_state_size() > 0) { - state_->AddPostLoadTask( - std::bind(&CatalogManager::ScheduleYsqlTxnVerification, - catalog_manager_, table, txn, state_->epoch), - "Verify DDL transaction for table " + table->ToString()); + // This table is undergoing DDL changes. Update the transaction->tables mapping with this + // table being loaded. Post-load, we can verify whether the transaction is a success or + // failure and perform any required cleanup. 
However for now, add this to in-memory mapping + // right away so that catalog manager is aware of this transaction and can correctly + // handle any IsYsqlDdlTransactionInProgress requests coming from YSQL. + const bool new_transaction = + catalog_manager_->CreateOrUpdateDdlTxnVerificationState(table, txn); + // A single DDL transaction may affect multiple tables. We need to create only one + // verification task for a DDL transaction, so schedule a post-load task only if this is + // a new transaction. + if (new_transaction) { + state_->AddPostLoadTask( + std::bind(&CatalogManager::ScheduleVerifyTransaction, + catalog_manager_, table, txn, state_->epoch), + "Verify DDL transaction for table " + table->ToString()); + } } else { // This is a table/index for which YSQL transaction verification is not supported yet. // For these, we only support rolling back creating the table. If the transaction has // completed, merely check for the presence of this entity in the PG catalog. LOG(INFO) << "Enqueuing table for Transaction Verification: " << table->ToString(); - std::function when_done = - std::bind(&CatalogManager::VerifyTablePgLayer, catalog_manager_, table, _1, state_->epoch); state_->AddPostLoadTask( - std::bind(&YsqlTransactionDdl::VerifyTransaction, - catalog_manager_->ysql_transaction_.get(), - txn, table, false /* has_ysql_ddl_txn_state */, when_done), + std::bind(&CatalogManager::ScheduleVerifyTablePgLayer, + catalog_manager_, + txn, table, state_->epoch), "VerifyTransaction"); + } } @@ -462,16 +473,10 @@ Status NamespaceLoader::Visit(const NamespaceId& ns_id, const SysNamespaceEntryP LOG(INFO) << "Enqueuing keyspace for Transaction Verification: " << ns->ToString(); TransactionMetadata txn = VERIFY_RESULT(TransactionMetadata::FromPB(metadata.transaction())); - std::function when_done = std::bind( - &CatalogManager::VerifyNamespacePgLayer, catalog_manager_, ns, _1, state_->epoch); state_->AddPostLoadTask( std::bind( - 
&YsqlTransactionDdl::VerifyTransaction, - catalog_manager_->ysql_transaction_.get(), - txn, - nullptr /* table */, - false /* has_ysql_ddl_state */, - when_done), + &CatalogManager::ScheduleVerifyNamespacePgLayer, + catalog_manager_, txn, ns, state_->epoch), "VerifyTransaction"); } break; diff --git a/src/yb/master/catalog_manager.cc b/src/yb/master/catalog_manager.cc index 80d3b5db9e65..25e3c5e6cb7c 100644 --- a/src/yb/master/catalog_manager.cc +++ b/src/yb/master/catalog_manager.cc @@ -161,8 +161,8 @@ #include "yb/master/yql_triggers_vtable.h" #include "yb/master/yql_types_vtable.h" #include "yb/master/yql_views_vtable.h" +#include "yb/master/ysql_ddl_verification_task.h" #include "yb/master/ysql_tablegroup_manager.h" -#include "yb/master/ysql_transaction_ddl.h" #include "yb/rpc/messenger.h" #include "yb/rpc/rpc_controller.h" @@ -1048,9 +1048,6 @@ Status CatalogManager::Init() { state_ = kStarting; } - ysql_transaction_ = std::make_unique( - sys_catalog_.get(), master_->client_future(), background_tasks_thread_pool_.get()); - // Initialize the metrics emitted by the catalog manager. load_balance_policy_->InitMetrics(); @@ -1506,6 +1503,12 @@ Status CatalogManager::RunLoaders(SysCatalogLoadingState* state) { ClearXReplState(); + // Clear Ddl transaction state. + { + LockGuard l(ddl_txn_verifier_mutex_); + ysql_ddl_txn_verfication_state_map_.clear(); + } + std::vector> descs; master_->ts_manager()->GetAllDescriptors(&descs); for (const auto& ts_desc : descs) { @@ -3524,12 +3527,7 @@ Status CatalogManager::CreateYsqlSysTable( // Verify Transaction gets committed, which occurs after table create finishes. 
if (req->has_transaction() && PREDICT_TRUE(FLAGS_enable_transactional_ddl_gc)) { LOG(INFO) << "Enqueuing table for Transaction Verification: " << req->name(); - std::function when_done = - std::bind(&CatalogManager::VerifyTablePgLayer, this, table, _1, epoch); - WARN_NOT_OK(background_tasks_thread_pool_->SubmitFunc( - std::bind(&YsqlTransactionDdl::VerifyTransaction, ysql_transaction_.get(), - txn, table, false /* has_ysql_txn_ddl_state */, when_done)), - "Could not submit VerifyTransaction to thread pool"); + ScheduleVerifyTablePgLayer(txn, table, epoch); } tablet::ChangeMetadataRequestPB change_req; @@ -4335,15 +4333,10 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req, // Verify Transaction gets committed, which occurs after table create finishes. if (req.has_transaction()) { if (schedule_ysql_txn_verifier) { - ScheduleYsqlTxnVerification(table, txn, epoch); + RETURN_NOT_OK(ScheduleYsqlTxnVerification(table, txn, epoch)); } else if (PREDICT_TRUE(FLAGS_enable_transactional_ddl_gc)) { LOG(INFO) << "Enqueuing table for Transaction Verification: " << req.name(); - std::function when_done = - std::bind(&CatalogManager::VerifyTablePgLayer, this, table, _1, epoch); - WARN_NOT_OK(background_tasks_thread_pool_->SubmitFunc( - std::bind(&YsqlTransactionDdl::VerifyTransaction, ysql_transaction_.get(), - txn, table, false /* has_ysql_ddl_txn_state */, when_done)), - "Could not submit VerifyTransaction to thread pool"); + ScheduleVerifyTablePgLayer(txn, table, epoch); } } @@ -4416,38 +4409,23 @@ Status CatalogManager::CreateTableIfNotFound( return Status::OK(); } -Status CatalogManager::VerifyTablePgLayer( - scoped_refptr table, bool rpc_success, const LeaderEpoch& epoch) { - // Upon Transaction completion, check pg system table using OID to ensure SUCCESS. - - bool entry_exists = false; - if (!table->IsColocationParentTable()) { - // Check that pg_class still has an entry for the table. 
- const PgOid database_oid = VERIFY_RESULT(GetPgsqlDatabaseOidByTableId(table->id())); - const auto pg_class_table_id = GetPgsqlTableId(database_oid, kPgClassTableOid); - - PgOid pg_table_oid = VERIFY_RESULT(table->GetPgTableOid()); - PgOid relfilenode_oid = VERIFY_RESULT(table->GetPgRelfilenodeOid()); - - entry_exists = VERIFY_RESULT( - ysql_transaction_->PgEntryExists(pg_class_table_id, pg_table_oid, - // If relfilenode_oid is the same as pg table oid, this is isn't a rewritten table and - // we don't need to perform additional checks on the relfilenode column. - relfilenode_oid == pg_table_oid ? boost::none : boost::make_optional(relfilenode_oid))); - } else { - // The table we have is a dummy parent table, hence not present in YSQL. - // We need to check a tablegroup instead. - const auto tablegroup_id = GetTablegroupIdFromParentTableId(table->id()); - const PgOid database_oid = VERIFY_RESULT(GetPgsqlDatabaseOidByTablegroupId(tablegroup_id)); - const auto pg_yb_tablegroup_table_id = GetPgsqlTableId(database_oid, kPgYbTablegroupTableOid); - const PgOid tablegroup_oid = VERIFY_RESULT(GetPgsqlTablegroupOid(tablegroup_id)); +void CatalogManager::ScheduleVerifyTablePgLayer(TransactionMetadata txn, + const TableInfoPtr& table, + const LeaderEpoch& epoch) { + auto when_done = [this, table, epoch](Result exists) { + WARN_NOT_OK(VerifyTablePgLayer(table, exists, epoch), "Failed to verify table"); + }; + TableSchemaVerificationTask::CreateAndStartTask( + *this, table, txn, std::move(when_done), sys_catalog_.get(), master_->client_future(), + *master_->messenger(), epoch, false /* ddl_atomicity_enabled */); +} - entry_exists = VERIFY_RESULT( - ysql_transaction_->PgEntryExists(pg_yb_tablegroup_table_id, - tablegroup_oid, - boost::none /* relfilenode_oid */)); +Status CatalogManager::VerifyTablePgLayer( + scoped_refptr table, Result exists, const LeaderEpoch& epoch) { + if (!exists.ok()) { + return exists.status(); } - + // Upon Transaction completion, check pg system table 
using OID to ensure SUCCESS. auto l = table->LockForWrite(); auto* mutable_table_info = table->mutable_metadata()->mutable_dirty(); auto& metadata = mutable_table_info->pb; @@ -4458,19 +4436,12 @@ Status CatalogManager::VerifyTablePgLayer( "Unexpected table state ($0), abandoning transaction GC work for $1", SysTablesEntryPB_State_Name(metadata.state()), table->ToString())); - // #5981: Mark un-retryable rpc failures as pass to avoid infinite retry of GC'd txns. - const bool txn_check_passed = entry_exists || !rpc_success; - - if (txn_check_passed) { + if (exists.get()) { // Remove the transaction from the entry since we're done processing it. metadata.clear_transaction(); RETURN_NOT_OK(sys_catalog_->Upsert(epoch, table)); - if (entry_exists) { - LOG_WITH_PREFIX(INFO) << "Table transaction succeeded: " << table->ToString(); - } else { - LOG_WITH_PREFIX(WARNING) - << "Unknown RPC failure, removing transaction on table: " << table->ToString(); - } + LOG_WITH_PREFIX(INFO) << "Table transaction succeeded: " << table->ToString(); + // Commit the in-memory state. l.Commit(); } else { @@ -6286,7 +6257,6 @@ Status CatalogManager::DeleteTable( if (req->ysql_ddl_rollback_enabled()) { DCHECK(req->has_transaction()); DCHECK(req->transaction().has_transaction_id()); - RETURN_NOT_OK(WaitForDdlVerificationToFinish(table, req->transaction().transaction_id())); } scoped_refptr indexed_table; @@ -6336,8 +6306,14 @@ Status CatalogManager::DeleteTable( // If the table is already undergoing DDL transaction verification as part of a different // transaction then fail this request. 
if (l->pb_transaction_id() != req->transaction().transaction_id()) { - const Status s = STATUS(TryAgain, "Table is undergoing DDL transaction verification"); - return SetupError(resp->mutable_error(), MasterErrorPB::TABLE_SCHEMA_CHANGE_IN_PROGRESS, s); + auto txn_meta = VERIFY_RESULT(TransactionMetadata::FromPB(l->pb.transaction())); + auto req_txn_meta = VERIFY_RESULT(TransactionMetadata::FromPB(req->transaction())); + RETURN_NOT_OK(TriggerDdlVerificationIfNeeded(txn_meta, epoch)); + return STATUS_EC_FORMAT(TryAgain, + MasterError(MasterErrorPB::TABLE_SCHEMA_CHANGE_IN_PROGRESS), + "DDL transaction $0 cannot continue as table $1 is undergoing DDL transaction " + "verification for $2", req_txn_meta.transaction_id, table->name(), + txn_meta.transaction_id); } // This DROP operation is part of a DDL transaction that has already made changes // to this table. @@ -6367,7 +6343,7 @@ Status CatalogManager::DeleteTable( TRACE("Committing in-memory state as part of DeleteTable operation"); l.Commit(); if (!ysql_txn_verifier_state_present) { - ScheduleYsqlTxnVerification(table, txn, epoch); + RETURN_NOT_OK(ScheduleYsqlTxnVerification(table, txn, epoch)); } return Status::OK(); } @@ -6789,6 +6765,16 @@ void CatalogManager::CleanUpDeletedTables(const LeaderEpoch& epoch) { for (auto& lock : table_locks) { lock.Commit(); } + for (auto table : tables_to_update_on_disk) { + // Clean up any DDL verification state that is waiting for this table to start deleting. + auto res = table->LockForRead()->GetCurrentDdlTransactionId(); + WARN_NOT_OK( + res, Format("Failed to get current DDL transaction for table $0", table->ToString())); + if (!res.ok() || res.get() == TransactionId::Nil()) { + continue; + } + RemoveDdlTransactionState(table->id(), {res.get()}); + } // TODO: Check if we want to delete the totally deleted table from the sys_catalog here. // TODO: SysCatalog::DeleteItem() if we've DELETED all user tables in a DELETING namespace. 
// TODO: Also properly handle namespace_ids_map_.erase(table->namespace_id()) @@ -7085,7 +7071,6 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req, if (req->ysql_ddl_rollback_enabled()) { DCHECK(req->has_transaction()); DCHECK(req->transaction().has_transaction_id()); - RETURN_NOT_OK(WaitForDdlVerificationToFinish(table, req->transaction().transaction_id())); } TRACE("Locking table"); @@ -7095,8 +7080,14 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req, // If the table is already undergoing an alter operation, return failure. if (req->ysql_ddl_rollback_enabled() && l->has_ysql_ddl_txn_verifier_state()) { if (l->pb_transaction_id() != req->transaction().transaction_id()) { - const Status s = STATUS(TryAgain, "Table is undergoing DDL transaction verification"); - return SetupError(resp->mutable_error(), MasterErrorPB::TABLE_SCHEMA_CHANGE_IN_PROGRESS, s); + auto txn_meta = VERIFY_RESULT(TransactionMetadata::FromPB(l->pb.transaction())); + auto req_txn_meta = VERIFY_RESULT(TransactionMetadata::FromPB(req->transaction())); + RETURN_NOT_OK(TriggerDdlVerificationIfNeeded(txn_meta, epoch)); + return STATUS_EC_FORMAT(TryAgain, + MasterError(MasterErrorPB::TABLE_SCHEMA_CHANGE_IN_PROGRESS), + "DDL transaction $0 cannot continue as table $1 is undergoing DDL transaction " + "verification for $2", req_txn_meta.transaction_id, table->name(), + txn_meta.transaction_id); } } @@ -7260,31 +7251,39 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req, // rolls back. TransactionMetadata txn; bool schedule_ysql_txn_verifier = false; - if (!req->ysql_ddl_rollback_enabled()) { - // If DDL rollback is no longer enabled, make sure that there is no transaction - // verification state present. 
- table_pb.clear_ysql_ddl_txn_verifier_state(); - } else if (req->has_transaction() && table->GetTableType() == PGSQL_TABLE_TYPE) { - if (!l->has_ysql_ddl_txn_verifier_state()) { - table_pb.mutable_transaction()->CopyFrom(req->transaction()); - auto *ddl_state = table_pb.add_ysql_ddl_txn_verifier_state(); - SchemaToPB(previous_schema, ddl_state->mutable_previous_schema()); - ddl_state->set_previous_table_name(previous_table_name); - schedule_ysql_txn_verifier = true; + // DDL rollback is not applicable for the alter change that sets wal_retention_secs. + if (!req->has_wal_retention_secs()) { + if (!req->ysql_ddl_rollback_enabled()) { + // If DDL rollback is no longer enabled, make sure that there is no transaction + // verification state present. + if (l->has_ysql_ddl_txn_verifier_state()) { + auto txn_id = + VERIFY_RESULT(TransactionMetadata::FromPB(l->pb.transaction())).transaction_id; + LOG(INFO) << "Clearing ysql_ddl_txn_verifier state for table " << table->ToString(); + table_pb.clear_ysql_ddl_txn_verifier_state(); + RemoveDdlTransactionState(table->id(), {txn_id}); + } + } else if (req->has_transaction() && table->GetTableType() == PGSQL_TABLE_TYPE) { + if (!l->has_ysql_ddl_txn_verifier_state()) { + table_pb.mutable_transaction()->CopyFrom(req->transaction()); + auto *ddl_state = table_pb.add_ysql_ddl_txn_verifier_state(); + SchemaToPB(previous_schema, ddl_state->mutable_previous_schema()); + ddl_state->set_previous_table_name(previous_table_name); + schedule_ysql_txn_verifier = true; + } + txn = VERIFY_RESULT(TransactionMetadata::FromPB(req->transaction())); + RSTATUS_DCHECK(!txn.status_tablet.empty(), Corruption, "Given incomplete Transaction"); + DCHECK_EQ(table_pb.ysql_ddl_txn_verifier_state_size(), 1); + table_pb.mutable_ysql_ddl_txn_verifier_state(0)->set_contains_alter_table_op(true); } - txn = VERIFY_RESULT(TransactionMetadata::FromPB(req->transaction())); - RSTATUS_DCHECK(!txn.status_tablet.empty(), Corruption, "Given incomplete Transaction"); - 
DCHECK_EQ(table_pb.ysql_ddl_txn_verifier_state_size(), 1); - table_pb.mutable_ysql_ddl_txn_verifier_state(0)->set_contains_alter_table_op(true); } - // Update the in-memory state. TRACE("Committing in-memory state"); l.Commit(); // Verify Transaction gets committed, which occurs after table alter finishes. if (schedule_ysql_txn_verifier) { - ScheduleYsqlTxnVerification(table, txn, epoch); + RETURN_NOT_OK(ScheduleYsqlTxnVerification(table, txn, epoch)); } RETURN_NOT_OK(SendAlterTableRequest(table, epoch, req)); @@ -9210,50 +9209,44 @@ void CatalogManager::ProcessPendingNamespace( ns_write_lock.Commit(); if (has_transaction) { LOG(INFO) << "Enqueuing keyspace for Transaction Verification: " << ns->ToString(); - std::function when_done = - std::bind(&CatalogManager::VerifyNamespacePgLayer, this, ns, _1, epoch); - WARN_NOT_OK( - background_tasks_thread_pool_->SubmitFunc(std::bind( - &YsqlTransactionDdl::VerifyTransaction, ysql_transaction_.get(), txn, - nullptr /* table */, false /* has_ysql_ddl_state */, when_done)), - "Could not submit VerifyTransaction to thread pool"); + ScheduleVerifyNamespacePgLayer(txn, ns, epoch); } } -Status CatalogManager::VerifyNamespacePgLayer( - scoped_refptr ns, bool rpc_success, const LeaderEpoch& epoch) { - // Upon Transaction completion, check pg system table using OID to ensure SUCCESS. 
- const auto pg_table_id = GetPgsqlTableId(atoi(kSystemNamespaceId), kPgDatabaseTableOid); - const PgOid database_oid = VERIFY_RESULT(GetPgsqlDatabaseOid(ns->id())); - auto entry_exists = VERIFY_RESULT( - ysql_transaction_->PgEntryExists(pg_table_id, - database_oid, - boost::none /* relfilenode_oid */)); +void CatalogManager::ScheduleVerifyNamespacePgLayer( + TransactionMetadata txn, scoped_refptr ns, const LeaderEpoch& epoch) { + auto when_done = [this, ns, epoch](Result result) { + WARN_NOT_OK(VerifyNamespacePgLayer(ns, result, epoch), "VerifyNamespacePgLayer"); + }; + NamespaceVerificationTask::CreateAndStartTask( + *this, ns, txn, std::move(when_done), sys_catalog_.get(), master_->client_future(), + *master_->messenger(), epoch); +} + +Status CatalogManager::VerifyNamespacePgLayer(scoped_refptr ns, + Result exists, + const LeaderEpoch& epoch) { + if (!exists.ok()) { + return exists.status(); + } auto l = ns->LockForWrite(); SysNamespaceEntryPB& metadata = ns->mutable_metadata()->mutable_dirty()->pb; - // #5981: Mark un-retryable rpc failures as pass to avoid infinite retry of GC'd txns. - bool txn_check_passed = entry_exists || !rpc_success; - - if (txn_check_passed) { + if (exists.get()) { // Passed checks. Remove the transaction from the entry since we're done processing it. 
SCHECK_EQ(metadata.state(), SysNamespaceEntryPB::RUNNING, Aborted, - Substitute("Invalid Namespace state ($0), abandoning transaction GC work for $1", + Format("Invalid Namespace state ($0), abandoning transaction GC work for $1", SysNamespaceEntryPB_State_Name(metadata.state()), ns->ToString())); metadata.clear_transaction(); RETURN_NOT_OK(sys_catalog_->Upsert(leader_ready_term(), ns)); - if (entry_exists) { - LOG(INFO) << "Namespace transaction succeeded: " << ns->ToString(); - } else { - LOG(WARNING) << "Unknown RPC Failure, removing transaction on namespace: " << ns->ToString(); - } + LOG(INFO) << "Namespace transaction succeeded: " << ns->ToString(); // Commit the namespace in-memory state. l.Commit(); } else { // Transaction failed. We need to delete this Database now. SCHECK(metadata.state() == SysNamespaceEntryPB::RUNNING || metadata.state() == SysNamespaceEntryPB::FAILED, Aborted, - Substitute("Invalid Namespace state ($0), aborting delete.", + Format("Invalid Namespace state ($0), aborting delete.", SysNamespaceEntryPB_State_Name(metadata.state()), ns->ToString())); LOG(INFO) << "Namespace transaction failed, deleting: " << ns->ToString(); metadata.set_state(SysNamespaceEntryPB::DELETING); @@ -11247,14 +11240,14 @@ void CatalogManager::ExtractTabletsToProcess( } } -bool CatalogManager::AreTablesDeleting() { +bool CatalogManager::AreTablesDeletingOrHiding() { SharedLock lock(mutex_); for (const auto& table : tables_->GetAllTables()) { auto table_lock = table->LockForRead(); // TODO(jason): possibly change this to started_deleting when we begin removing DELETED tables // from tables_ (see CleanUpDeletedTables). - if (table_lock->is_deleting()) { + if (table_lock->is_deleting() || table_lock->is_hiding()) { return true; } } @@ -11384,6 +11377,9 @@ Status CatalogManager::HandleTabletSchemaVersionReport( } } + // Clean up any DDL verification state that is waiting for this Alter to complete. 
+ RemoveDdlTransactionState(table->id(), table->EraseDdlTxnsWaitingForSchemaVersion(version)); + // With Replication Enabled, verify that we've finished applying the New Schema. // This may need to be refactored when we support Replication + Active Index Backfill in #7613. if (IsTableXClusterConsumer(*table)) { @@ -13293,6 +13289,13 @@ void CatalogManager::CheckTableDeleted(const TableInfoPtr& table, const LeaderEp return; } lock.Commit(); + // Clean up any DDL verification state that is waiting for this table to be deleted. + auto res = table->LockForRead()->GetCurrentDdlTransactionId(); + WARN_NOT_OK( + res, "Failed to get current DDL transaction for table " + table->ToString()); + if (res.ok() && res.get() != TransactionId::Nil()) { + RemoveDdlTransactionState(table->id(), {res.get()}); + } }), "Failed to submit update table task"); } diff --git a/src/yb/master/catalog_manager.h b/src/yb/master/catalog_manager.h index 1a3954a5d043..e7ba9285cfe4 100644 --- a/src/yb/master/catalog_manager.h +++ b/src/yb/master/catalog_manager.h @@ -183,11 +183,17 @@ YB_DEFINE_ENUM( (kReady) ); -using DdlTxnIdToTablesMap = - std::unordered_map>, TransactionIdHash>; - const std::string& GetIndexedTableId(const SysTablesEntryPB& pb); +YB_DEFINE_ENUM(YsqlDdlVerificationState, + (kDdlInProgress) + (kDdlPostProcessing) + (kDdlPostProcessingFailed)); + +YB_DEFINE_ENUM(TxnState, (kUnknown) (kCommitted) (kAborted)); + +struct YsqlTableDdlTxnState; + // The component of the master which tracks the state and location // of tables/tablets in the cluster. // @@ -342,9 +348,12 @@ class CatalogManager : public tserver::TabletPeerLookupIf, Status WaitForAlterTableToFinish(const TableId& table_id, CoarseTimePoint deadline); + void ScheduleVerifyTablePgLayer(TransactionMetadata txn, + const TableInfoPtr& table, const LeaderEpoch& epoch); + // Called when transaction associated with table create finishes. Verifies postgres layer present. 
- Status VerifyTablePgLayer( - scoped_refptr table, bool txn_query_succeeded, const LeaderEpoch& epoch); + Status VerifyTablePgLayer(scoped_refptr table, Result exists, + const LeaderEpoch& epoch); // Truncate the specified table. // @@ -436,40 +445,59 @@ class CatalogManager : public tserver::TabletPeerLookupIf, Result GetTableNamespaceId(TableId table_id) EXCLUDES(mutex_); - void ScheduleYsqlTxnVerification(const scoped_refptr& table, - const TransactionMetadata& txn, const LeaderEpoch& epoch) - EXCLUDES(ddl_txn_verifier_mutex_); - - Status YsqlTableSchemaChecker(scoped_refptr table, - const std::string& txn_id_pb, - bool txn_rpc_success, const LeaderEpoch& epoch); - - Status YsqlDdlTxnCompleteCallback(scoped_refptr table, - const std::string& txn_id_pb, - bool success, const LeaderEpoch& epoch); + Status ScheduleYsqlTxnVerification(const TableInfoPtr& table, + const TransactionMetadata& txn, const LeaderEpoch& epoch) + EXCLUDES(ddl_txn_verifier_mutex_); + + // If YsqlDdlVerificationState already exists for 'txn', update it by adding an entry for 'table'. + // Otherwise, create a new YsqlDdlVerificationState for 'txn' with an entry for 'table'. + // Returns true if a new YsqlDdlVerificationState was created. + bool CreateOrUpdateDdlTxnVerificationState( + const TableInfoPtr& table, const TransactionMetadata& txn) + EXCLUDES(ddl_txn_verifier_mutex_); + + // Schedules a task to find the status of 'txn' and update the schema of 'table' based on whether + // 'txn' was committed or aborted. This function should only ever be invoked after using + // the above CreateOrUpdateDdlTxnVerificationState to verify that no task for the same transaction + // has already been invoked. Scheduling two tasks for the same transaction will not lead to any + // correctness issues, but will lead to unnecessary work (i.e. polling the transaction + // coordinator and performing schema comparison). 
+ Status ScheduleVerifyTransaction(const TableInfoPtr& table, + const TransactionMetadata& txn, const LeaderEpoch& epoch); + + Status YsqlTableSchemaChecker(TableInfoPtr table, + const std::string& pb_txn_id, + Result is_committed, + const LeaderEpoch& epoch); + + Status YsqlDdlTxnCompleteCallback(const std::string& pb_txn_id, + bool is_committed, + const LeaderEpoch& epoch); Status YsqlDdlTxnCompleteCallbackInternal( TableInfo* table, const TransactionId& txn_id, bool success, const LeaderEpoch& epoch); - Status HandleSuccessfulYsqlDdlTxn( - TableInfo* table, TableInfo::WriteLock* l, const LeaderEpoch& epoch); + Status HandleSuccessfulYsqlDdlTxn(const YsqlTableDdlTxnState txn_data); - Status HandleAbortedYsqlDdlTxn( - TableInfo* table, TableInfo::WriteLock* l, const LeaderEpoch& epoch); + Status HandleAbortedYsqlDdlTxn(const YsqlTableDdlTxnState txn_data); - Status ClearYsqlDdlTxnState(TableInfo* table, TableInfo::WriteLock* l, const LeaderEpoch& epoch); + Status ClearYsqlDdlTxnState(const YsqlTableDdlTxnState txn_data); - Status YsqlDdlTxnAlterTableHelper(TableInfo *table, - TableInfo::WriteLock* l, + Status YsqlDdlTxnAlterTableHelper(const YsqlTableDdlTxnState txn_data, const std::vector& ddl_log_entries, - const std::string& new_table_name, - const LeaderEpoch& epoch); + const std::string& new_table_name); - Status YsqlDdlTxnDropTableHelper( - TableInfo* table, TableInfo::WriteLock* l, const LeaderEpoch& epoch); + Status YsqlDdlTxnDropTableHelper(const YsqlTableDdlTxnState txn_data); Status WaitForDdlVerificationToFinish( - const scoped_refptr& table, const std::string& pb_txn_id); + const TableInfoPtr& table, const std::string& pb_txn_id); + + void UpdateDdlVerificationState(const TransactionId& txn, YsqlDdlVerificationState state); + + void RemoveDdlTransactionState( + const TableId& table_id, const std::vector& txn_ids); + + Status TriggerDdlVerificationIfNeeded(const TransactionMetadata& txn, const LeaderEpoch& epoch); // Get the information about the 
specified table. Status GetTableSchema(const GetTableSchemaRequestPB* req, @@ -1166,6 +1194,12 @@ class CatalogManager : public tserver::TabletPeerLookupIf, rpc::RpcContext* rpc, const LeaderEpoch& epoch); + Status IsYsqlDdlVerificationDone( + const IsYsqlDdlVerificationDoneRequestPB* req, + IsYsqlDdlVerificationDoneResponsePB* resp, + rpc::RpcContext* rpc, + const LeaderEpoch& epoch); + Status GetStatefulServiceLocation( const GetStatefulServiceLocationRequestPB* req, GetStatefulServiceLocationResponsePB* resp); @@ -1587,6 +1621,7 @@ class CatalogManager : public tserver::TabletPeerLookupIf, friend class YsqlBackendsManager; friend class BackendsCatalogVersionJob; friend class AddTableToXClusterTargetTask; + friend class VerifyDdlTransactionTask; FRIEND_TEST(yb::MasterPartitionedTest, VerifyOldLeaderStepsDown); @@ -1665,8 +1700,11 @@ class CatalogManager : public tserver::TabletPeerLookupIf, TransactionMetadata txn, const LeaderEpoch& epoch); // Called when transaction associated with NS create finishes. Verifies postgres layer present. - Status VerifyNamespacePgLayer( - scoped_refptr ns, bool txn_query_succeeded, const LeaderEpoch& epoch); + void ScheduleVerifyNamespacePgLayer(TransactionMetadata txn, + scoped_refptr ns, const LeaderEpoch& epoch); + + Status VerifyNamespacePgLayer(scoped_refptr ns, Result exists, + const LeaderEpoch& epoch); Status ConsensusStateToTabletLocations(const consensus::ConsensusStatePB& cstate, TabletLocationsPB* locs_pb); @@ -1791,7 +1829,7 @@ class CatalogManager : public tserver::TabletPeerLookupIf, TableToTabletInfos *tablets_to_process); // Determine whether any tables are in the DELETING state. - bool AreTablesDeleting() override; + bool AreTablesDeletingOrHiding() override; // Task that takes care of the tablet assignments/creations. // Loops through the "not created" tablets and sends a CreateTablet() request. 
@@ -2408,9 +2446,6 @@ class CatalogManager : public tserver::TabletPeerLookupIf, // A pointer to the system.partitions tablet for the RebuildYQLSystemPartitions bg task. std::shared_ptr system_partitions_tablet_ = nullptr; - // Handles querying and processing YSQL DDL Transactions as a catalog manager background task. - std::unique_ptr ysql_transaction_; - std::atomic time_elected_leader_; std::unique_ptr cdc_state_client_; @@ -3164,13 +3199,24 @@ class CatalogManager : public tserver::TabletPeerLookupIf, rpc::ScheduledTaskTracker refresh_ysql_tablespace_info_task_; - // Guards ddl_txn_id_to_table_map_ below. - mutable MutexType ddl_txn_verifier_mutex_; + struct YsqlDdlTransactionState { + // Indicates whether the transaction is committed or aborted or unknown. + TxnState txn_state; + + // Indicates the verification state of the DDL transaction. + YsqlDdlVerificationState state; + + // The table info objects of the tables affected by this transaction. + std::vector> tables; + }; // This map stores the transaction ids of all the DDL transactions undergoing verification. // For each transaction, it also stores pointers to the table info objects of the tables affected // by that transaction. 
- DdlTxnIdToTablesMap ddl_txn_id_to_table_map_ GUARDED_BY(ddl_txn_verifier_mutex_); + mutable MutexType ddl_txn_verifier_mutex_; + + std::unordered_map + ysql_ddl_txn_verfication_state_map_ GUARDED_BY(ddl_txn_verifier_mutex_); ServerRegistrationPB server_registration_; diff --git a/src/yb/master/catalog_manager_bg_tasks.cc b/src/yb/master/catalog_manager_bg_tasks.cc index 94879c6d2991..6828300e6295 100644 --- a/src/yb/master/catalog_manager_bg_tasks.cc +++ b/src/yb/master/catalog_manager_bg_tasks.cc @@ -266,7 +266,7 @@ void CatalogManagerBgTasks::Run() { WARN_NOT_OK(catalog_manager_->clone_state_manager()->Run(), "Failed to run CloneStateManager: "); - if (!to_delete.empty() || catalog_manager_->AreTablesDeleting()) { + if (!to_delete.empty() || catalog_manager_->AreTablesDeletingOrHiding()) { catalog_manager_->CleanUpDeletedTables(l.epoch()); } diff --git a/src/yb/master/catalog_manager_if.h b/src/yb/master/catalog_manager_if.h index 62ed9235d4a2..64b0031bc20a 100644 --- a/src/yb/master/catalog_manager_if.h +++ b/src/yb/master/catalog_manager_if.h @@ -254,7 +254,7 @@ class CatalogManagerIf { virtual Result> GetTabletInfo(const TabletId& tablet_id) = 0; - virtual bool AreTablesDeleting() = 0; + virtual bool AreTablesDeletingOrHiding() = 0; virtual Status GetCurrentConfig(consensus::ConsensusStatePB *cpb) const = 0; diff --git a/src/yb/master/master_ddl.proto b/src/yb/master/master_ddl.proto index 2f0400a508f6..1a631ac22a4f 100644 --- a/src/yb/master/master_ddl.proto +++ b/src/yb/master/master_ddl.proto @@ -711,6 +711,10 @@ message GetUDTypeInfoResponsePB { optional UDTypeInfoPB udtype = 2; } +// ============================================================================ +// YSQL DDL Atomicity +// ============================================================================ + message ReportYsqlDdlTxnStatusRequestPB { // The transaction whose status is being reported. 
optional bytes transaction_id = 1; @@ -723,6 +727,16 @@ message ReportYsqlDdlTxnStatusResponsePB { optional MasterErrorPB error = 1; } +message IsYsqlDdlVerificationDoneRequestPB { + optional TransactionMetadataPB transaction = 1; +} + +message IsYsqlDdlVerificationDoneResponsePB { + optional MasterErrorPB error = 1; + + optional bool done = 2; +} + service MasterDdl { option (yb.rpc.custom_service_name) = "yb.master.MasterService"; @@ -775,4 +789,6 @@ service MasterDdl { rpc ReportYsqlDdlTxnStatus(ReportYsqlDdlTxnStatusRequestPB) returns (ReportYsqlDdlTxnStatusResponsePB); + rpc IsYsqlDdlVerificationDone(IsYsqlDdlVerificationDoneRequestPB) returns + (IsYsqlDdlVerificationDoneResponsePB); } diff --git a/src/yb/master/master_ddl_service.cc b/src/yb/master/master_ddl_service.cc index 4faebd55dbdc..e5f9f67f714d 100644 --- a/src/yb/master/master_ddl_service.cc +++ b/src/yb/master/master_ddl_service.cc @@ -53,6 +53,7 @@ class MasterDdlServiceImpl : public MasterServiceBase, public MasterDdlIf { (IsDeleteNamespaceDone) (IsDeleteTableDone) (IsTruncateTableDone) + (IsYsqlDdlVerificationDone) (LaunchBackfillIndexForTable) (ListNamespaces) (ListTablegroups) diff --git a/src/yb/master/multi_step_monitored_task.cc b/src/yb/master/multi_step_monitored_task.cc index a8864b3b3394..3d0848508d22 100644 --- a/src/yb/master/multi_step_monitored_task.cc +++ b/src/yb/master/multi_step_monitored_task.cc @@ -195,17 +195,15 @@ void MultiStepMonitoredTask::EndTask(const Status& status) { TaskCompleted(status); - StdStatusCallback callback; - callback.swap(completion_callback_); - completion_timestamp_ = MonoTime::Now(); LOG_WITH_PREFIX(INFO) << this << " task ended" << (status.ok() ? " successfully" : ""); + auto retain_self = shared_from_this(); UnregisterTask(); - // Unsafe to use 'this' beyond this point. 
- if (callback) { - callback(status); + if (completion_callback_) { + completion_callback_(status); + completion_callback_ = nullptr; } } diff --git a/src/yb/master/ysql_ddl_handler.cc b/src/yb/master/ysql_ddl_handler.cc index ee4112936de3..6c6b0dddb6cb 100644 --- a/src/yb/master/ysql_ddl_handler.cc +++ b/src/yb/master/ysql_ddl_handler.cc @@ -15,9 +15,10 @@ #include "yb/master/catalog_manager.h" #include "yb/master/master.h" #include "yb/master/master_ddl.pb.h" -#include "yb/master/ysql_transaction_ddl.h" +#include "yb/master/ysql_ddl_verification_task.h" #include "yb/util/backoff_waiter.h" +#include "yb/util/sync_point.h" #include "yb/util/trace.h" DEFINE_RUNTIME_bool(retry_if_ddl_txn_verification_pending, true, @@ -35,6 +36,9 @@ DEFINE_test_flag(int32, ysql_max_random_delay_before_ddl_verification_usecs, 0, DEFINE_test_flag(bool, pause_ddl_rollback, false, "Pause DDL rollback"); +DEFINE_test_flag(bool, hang_on_ddl_verification_progress, false, + "Used in tests to simulate a hang while checking ddl verification progress."); + using namespace std::placeholders; using std::shared_ptr; using std::string; @@ -43,149 +47,210 @@ using std::vector; namespace yb { namespace master { -void CatalogManager::ScheduleYsqlTxnVerification( - const scoped_refptr& table, const TransactionMetadata& txn, +/* + * This file contains all the logic required for YSQL DDL transaction verification. This is done + * by maintaining the verification state for every YSQL DDL transaction in + * 'ysql_ddl_txn_verfication_state_map_'. Each transaction id is associated with a vector of tables + * that it affects and has 2 states. + * 1. txn_state: This indicates whether the transaction has finished, and whether it has + * committed or aborted. This is updated either when the poller in ysql_transaction_ddl finishes + * or when PG sends a ReportYsqlDdlTxnStatus RPC. + * 2. state: This indicates the state of the transaction verification. It can be one of the + * following: + * a. 
kDdlInProgress: This indicates that the transaction is not finished yet. + * b. kDdlPostProcessing: This indicates that the transaction is finished and 'txn_state' + * can tell us whether the transaction is committed or aborted. + * c. kDdlPostProcessingFailed: This indicates due to any error (such as failure to schedule + * callbacks/could not get reply from the transaction coordinator) we could not perform the + * rollback/rollforward necessary. In this case, the tables remain with the DDL state on them. + * If a future DDL transaction tries to modify the same table, it will re-trigger the DDL + * verification if it sees that the transaction is in kDdlPostProcessingFailed. A master + * restart will also re-trigger the DDL verification. In future we could have a background + * thread that periodically checks for such transactions and re-triggers the DDL verification. + * If DDL transaction is verified successfully, it will be removed from the map. Note that DDL + * transaction verification can thus be kicked off in 4 ways: + * a) When a DDL is started, we kick off a poller through ysql_transaction_ddl that checks the + * transaction status and starts the DDL transaction verification once the transaction finishes. + * b) When a DDL is finished, YSQL will send a ReportYsqlDdlTxnStatus RPC to the master with the + * status of the transaction. This will also trigger the DDL transaction verification. + * c) YSQL sends IsYsqlDdlVerificationDone RPC to the master to check whether the DDL transaction + * verification is complete before returning to the client. This may also trigger DDL + * verification if the transaction is in kDdlPostProcessingFailed state. + * d) When another DDL tries to modify the same table, it will trigger DDL verification if the + * old transaction is in kDdlPostProcessingFailed state. 
+*/ +Status CatalogManager::ScheduleYsqlTxnVerification( + const TableInfoPtr& table, const TransactionMetadata& txn, const LeaderEpoch& epoch) { - // Add this transaction to the map containing all the transactions yet to be - // verified. - { - LockGuard lock(ddl_txn_verifier_mutex_); - ddl_txn_id_to_table_map_[txn.transaction_id].push_back(table); - } + + bool new_transaction = CreateOrUpdateDdlTxnVerificationState(table, txn); if (FLAGS_TEST_disable_ysql_ddl_txn_verification) { LOG(INFO) << "Skip scheduling table " << table->ToString() << " for transaction verification " << "as TEST_disable_ysql_ddl_txn_verification is set"; - return; + return Status::OK(); } - // Schedule transaction verification. - auto l = table->LockForRead(); - LOG(INFO) << "Enqueuing table for DDL transaction Verification: " << table->name() - << " id: " << table->id() << " schema version: " << l->pb.version() - << " for transaction " << txn; - std::function when_done = - std::bind(&CatalogManager::YsqlTableSchemaChecker, this, table, - l->pb_transaction_id(), _1, epoch); - // For now, just print warning if submission to thread pool fails. Fix this as part of - // #13358. 
- WARN_NOT_OK(background_tasks_thread_pool_->SubmitFunc( - std::bind(&YsqlTransactionDdl::VerifyTransaction, ysql_transaction_.get(), txn, table, - true /* has_ysql_ddl_state */, when_done)), - "Could not submit VerifyTransaction to thread pool"); + if (new_transaction) { + return ScheduleVerifyTransaction(table, txn, epoch); + } + return Status::OK(); } -Status CatalogManager::YsqlTableSchemaChecker( - scoped_refptr table, const string& pb_txn_id, bool txn_rpc_success, - const LeaderEpoch& epoch) { - if (!txn_rpc_success) { - return STATUS_FORMAT(IllegalState, "Failed to find Transaction Status for table $0", - table->ToString()); +bool CatalogManager::CreateOrUpdateDdlTxnVerificationState( + const TableInfoPtr& table, const TransactionMetadata& txn) { + LockGuard lock(ddl_txn_verifier_mutex_); + auto state = FindOrNull(ysql_ddl_txn_verfication_state_map_, txn.transaction_id); + if (state) { + // This transaction is already being verified. Add this table to the list of tables modified + // by this transaction and return. 
+ LOG_IF(DFATAL, state->txn_state == TxnState::kCommitted) + << "Transaction " << txn << " is already complete, but received request to verify table " + << table; + LOG(INFO) << "Enqueuing table " << table << " to the list of tables being verified for " + << "transaction " << txn; + state->tables.push_back(table); + return false; } - bool is_committed = VERIFY_RESULT(ysql_transaction_->PgSchemaChecker(table)); - return YsqlDdlTxnCompleteCallback(table, pb_txn_id, is_committed, epoch); + + LOG(INFO) << "Enqueuing table " << table->ToString() + << " for schema comparison for transaction " << txn; + ysql_ddl_txn_verfication_state_map_.emplace(txn.transaction_id, + YsqlDdlTransactionState{TxnState::kUnknown, + YsqlDdlVerificationState::kDdlInProgress, + {table}}); + return true; } -Status CatalogManager::ReportYsqlDdlTxnStatus( - const ReportYsqlDdlTxnStatusRequestPB* req, ReportYsqlDdlTxnStatusResponsePB* resp, - rpc::RpcContext* rpc, const LeaderEpoch& epoch) { - DCHECK(req); - const auto& req_txn = req->transaction_id(); - SCHECK(!req_txn.empty(), IllegalState, - "Received ReportYsqlDdlTxnStatus request without transaction id"); - auto txn = VERIFY_RESULT(FullyDecodeTransactionId(req_txn)); +Status CatalogManager::ScheduleVerifyTransaction( + const TableInfoPtr& table, const TransactionMetadata& txn, + const LeaderEpoch& epoch) { + auto l = table->LockForRead(); + LOG(INFO) << "Enqueuing table for DDL transaction Verification: " << table->name() + << " id: " << table->id() << " schema version: " << l->pb.version() + << " for transaction " << txn; + const string txn_id_pb = l->pb_transaction_id(); + auto when_done = [this, table, txn_id_pb, epoch](Result is_committed) { + WARN_NOT_OK(YsqlTableSchemaChecker(table, txn_id_pb, is_committed, epoch), + "YsqlTableSchemaChecker failed"); + }; + TableSchemaVerificationTask::CreateAndStartTask( + *this, table, txn, std::move(when_done), sys_catalog_.get(), master_->client_future(), + *master_->messenger(), epoch, true /* 
ddl_atomicity_enabled */); + return Status::OK(); +} - const auto is_committed = req->is_committed(); - LOG(INFO) << "Received ReportYsqlDdlTxnStatus request for transaction " << txn - << ". Status: " << (is_committed ? "Success" : "Aborted"); - { - SharedLock lock(ddl_txn_verifier_mutex_); - const auto iter = ddl_txn_id_to_table_map_.find(txn); - if (iter == ddl_txn_id_to_table_map_.end()) { - // Transaction not found in the list of transactions to be verified. Ideally this means that - // the YB-Master background task somehow got to it before PG backend sent this report. However - // it is possible to receive this report BEFORE we added the transaction to the map if: - // 1. The transaction failed before performing any DocDB schema change. - // 2. Transaction failed and this report arrived in the small window between schema change - // initiation and scheduling the verification task. - // We have to do nothing in case of (1). In case of (2), it is safe to do nothing as the - // background task will take care of it. This is not optimal but (2) is expected to be very - // rare. - LOG(INFO) << "DDL transaction " << txn << " not found in list of transactions to be " - << "verified, nothing to do"; +Status CatalogManager::YsqlTableSchemaChecker(TableInfoPtr table, + const string& pb_txn_id, + Result is_committed, + const LeaderEpoch& epoch) { + if (!is_committed.ok()) { + auto txn = VERIFY_RESULT(FullyDecodeTransactionId(pb_txn_id)); + LockGuard lock(ddl_txn_verifier_mutex_); + auto verifier_state = FindOrNull(ysql_ddl_txn_verfication_state_map_, txn); + if (!verifier_state) { + VLOG(3) << "Transaction " << txn << " is already verified, ignoring"; return Status::OK(); } - for (const auto& table : iter->second) { - // Submit this table for transaction verification. - LOG(INFO) << "Enqueuing table " << table->ToString() << " for handling " - << (is_committed ? 
"successful " : "aborted ") << txn; - WARN_NOT_OK( - background_tasks_thread_pool_->SubmitFunc([this, table, req_txn, is_committed, epoch]() { - WARN_NOT_OK( - YsqlDdlTxnCompleteCallback(table, req_txn, is_committed, epoch), - "Transaction verification failed for table " + table->ToString()); - }), - "Could not submit YsqlDdlTxnCompleteCallback to thread pool"); + + if (verifier_state->state == YsqlDdlVerificationState::kDdlPostProcessing) { + // Verification is already in progress. + VLOG(3) << "Transaction " << txn << " is already being verified, ignoring"; + return Status::OK(); } + verifier_state->state = YsqlDdlVerificationState::kDdlPostProcessingFailed; + return STATUS_FORMAT(IllegalState, + "Find Transaction Status for table $0 txn: $1 failed with $2", + table->ToString(), txn, is_committed.status()); } - return Status::OK(); + + return YsqlDdlTxnCompleteCallback(pb_txn_id, is_committed.get(), epoch); } -Status CatalogManager::YsqlDdlTxnCompleteCallback(scoped_refptr table, - const string& pb_txn_id, - bool success, +Status CatalogManager::YsqlDdlTxnCompleteCallback(const string& pb_txn_id, + bool is_committed, const LeaderEpoch& epoch) { + SleepFor(MonoDelta::FromMicroseconds(RandomUniformInt(0, FLAGS_TEST_ysql_max_random_delay_before_ddl_verification_usecs))); - DCHECK(!pb_txn_id.empty()); - DCHECK(table); - const auto& table_id = table->id(); - const auto txn_id = VERIFY_RESULT(FullyDecodeTransactionId(pb_txn_id)); - bool table_present = false; + auto txn = VERIFY_RESULT(FullyDecodeTransactionId(pb_txn_id)); + LOG(INFO) << "YsqlDdlTxnCompleteCallback for transaction " + << txn << " is_committed: " << (is_committed ? 
"true" : "false"); + + vector tables; { LockGuard lock(ddl_txn_verifier_mutex_); - const auto iter = ddl_txn_id_to_table_map_.find(txn_id); - if (iter == ddl_txn_id_to_table_map_.end()) { - LOG(INFO) << "DDL transaction " << txn_id << " for table " << table->ToString() - << " is already verified, ignoring"; + auto verifier_state = FindOrNull(ysql_ddl_txn_verfication_state_map_, txn); + if (!verifier_state) { + VLOG(3) << "Transaction " << txn << " is already verified, ignoring"; return Status::OK(); } - auto& tables = iter->second; - auto removed_elements_iter = std::remove_if(tables.begin(), tables.end(), - [&table_id](const scoped_refptr& table) { - return table->id() == table_id; - }); - if (removed_elements_iter != tables.end()) { - tables.erase(removed_elements_iter, tables.end()); - table_present = true; - if (tables.empty()) { - ddl_txn_id_to_table_map_.erase(iter); - } + auto state = verifier_state->state; + if (state == YsqlDdlVerificationState::kDdlPostProcessing) { + // Verification is already in progress. + VLOG(3) << "Transaction " << txn << " is already being verified, ignoring"; + return Status::OK(); } + + tables = verifier_state->tables; + verifier_state->txn_state = + (is_committed) ? TxnState::kCommitted : TxnState::kAborted; + verifier_state->state = YsqlDdlVerificationState::kDdlPostProcessing; } - if (!table_present) { - LOG(INFO) << "DDL transaction " << txn_id << " for table " << table->ToString() - << " is already verified, ignoring"; - return Status::OK(); - } - if (table->is_index()) { - // This is an index. If the indexed table is being deleted or marked for deletion, then skip - // doing anything as the deletion of the table will delete this index. 
- const auto& indexed_table_id = table->indexed_table_id(); - auto indexed_table = VERIFY_RESULT(FindTableById(indexed_table_id)); - if (table->IsBeingDroppedDueToDdlTxn(pb_txn_id, success) && - indexed_table->IsBeingDroppedDueToDdlTxn(pb_txn_id, success)) { - VLOG(1) << "Skipping DDL transaction verification for index " << table->ToString() - << " as the indexed table " << indexed_table->ToString() - << " is also being dropped"; - return Status::OK(); + + bool ddl_verification_success = true; + for (auto& table : tables) { + if (table->is_index()) { + // This is an index. If the indexed table is being deleted or marked for deletion, then skip + // doing anything as the deletion of the table will delete this index. + const auto& indexed_table_id = table->indexed_table_id(); + auto indexed_table = VERIFY_RESULT(FindTableById(indexed_table_id)); + if (table->IsBeingDroppedDueToDdlTxn(pb_txn_id, is_committed) && + indexed_table->IsBeingDroppedDueToDdlTxn(pb_txn_id, is_committed)) { + LOG(INFO) << "Skipping DDL transaction verification for index " << table->ToString() + << " as the indexed table " << indexed_table->ToString() + << " is also being dropped"; + continue; + } + } + + auto s = background_tasks_thread_pool_->SubmitFunc([this, table, txn, is_committed, epoch]() { + auto s = YsqlDdlTxnCompleteCallbackInternal(table.get(), txn, is_committed, epoch); + if (!s.ok()) { + LOG(WARNING) << "YsqlDdlTxnCompleteCallback failed for table " << table->ToString() + << " txn " << txn << ": " << s.ToString(); + UpdateDdlVerificationState(txn, YsqlDdlVerificationState::kDdlPostProcessingFailed); + } + }); + if (!s.ok()) { + ddl_verification_success = false; } } - return YsqlDdlTxnCompleteCallbackInternal(table.get(), txn_id, success, epoch); + if (!ddl_verification_success) { + UpdateDdlVerificationState(txn, YsqlDdlVerificationState::kDdlPostProcessingFailed); + } + return Status::OK(); +} + +Status CatalogManager::ReportYsqlDdlTxnStatus( + const 
ReportYsqlDdlTxnStatusRequestPB* req, ReportYsqlDdlTxnStatusResponsePB* resp, + rpc::RpcContext* rpc, const LeaderEpoch& epoch) { + DCHECK(req); + const auto& req_txn = req->transaction_id(); + SCHECK(!req_txn.empty(), IllegalState, + "Received ReportYsqlDdlTxnStatus request without transaction id"); + return YsqlDdlTxnCompleteCallback(req_txn, req->is_committed(), epoch); } +struct YsqlTableDdlTxnState { + TableInfo* table; + TableInfo::WriteLock& write_lock; + LeaderEpoch epoch; + TransactionId ddl_txn_id; +}; + Status CatalogManager::YsqlDdlTxnCompleteCallbackInternal( TableInfo* table, const TransactionId& txn_id, bool success, const LeaderEpoch& epoch) { @@ -196,8 +261,8 @@ Status CatalogManager::YsqlDdlTxnCompleteCallbackInternal( auto l = table->LockForWrite(); if (!VERIFY_RESULT(l->is_being_modified_by_ddl_transaction(txn_id))) { // Transaction verification completed for this table. - LOG(INFO) << "Verification of transaction " << txn_id << " for " << id - << " is already complete, ignoring"; + VLOG(3) << "Verification of transaction " << txn_id << " for " << id + << " is already complete, ignoring"; return Status::OK(); } LOG(INFO) << "YsqlDdlTxnCompleteCallback for " << id @@ -212,31 +277,39 @@ Status CatalogManager::YsqlDdlTxnCompleteCallbackInternal( "Unexpected table state ($0), abandoning DDL rollback for $1", SysTablesEntryPB_State_Name(metadata.state()), table->ToString()); + auto txn_data = YsqlTableDdlTxnState { + .table = table, + .write_lock = l, + .epoch = epoch, + .ddl_txn_id = txn_id + }; + if (success) { - RETURN_NOT_OK(HandleSuccessfulYsqlDdlTxn(table, &l, epoch)); + RETURN_NOT_OK(HandleSuccessfulYsqlDdlTxn(txn_data)); } else { - RETURN_NOT_OK(HandleAbortedYsqlDdlTxn(table, &l, epoch)); + RETURN_NOT_OK(HandleAbortedYsqlDdlTxn(txn_data)); } return Status::OK(); } Status CatalogManager::HandleSuccessfulYsqlDdlTxn( - TableInfo* const table, TableInfo::WriteLock* l, const LeaderEpoch& epoch) { + const YsqlTableDdlTxnState txn_data) { // The 
only DDL operations that roll-forward (i.e. take complete effect after commit) are DROP // TABLE and DROP COLUMN. - if ((*l)->is_being_deleted_by_ysql_ddl_txn()) { - return YsqlDdlTxnDropTableHelper(table, l, epoch); + auto& l = txn_data.write_lock; + if (l->is_being_deleted_by_ysql_ddl_txn()) { + return YsqlDdlTxnDropTableHelper(txn_data); } vector cols_being_dropped; - auto& mutable_pb = l->mutable_data()->pb; + auto& mutable_pb = l.mutable_data()->pb; for (const auto& col : mutable_pb.schema().columns()) { if (col.marked_for_deletion()) { cols_being_dropped.push_back(col.name()); } } if (cols_being_dropped.empty()) { - return ClearYsqlDdlTxnState(table, l, epoch); + return ClearYsqlDdlTxnState(txn_data); } Schema current_schema; RETURN_NOT_OK(SchemaFromPB(mutable_pb.schema(), ¤t_schema)); @@ -247,61 +320,57 @@ Status CatalogManager::HandleSuccessfulYsqlDdlTxn( RETURN_NOT_OK(builder.RemoveColumn(col)); ddl_log_entries.emplace_back( master_->clock()->Now(), - table->id(), + txn_data.table->id(), mutable_pb, Format("Drop column $0", col)); } SchemaToPB(builder.Build(), mutable_pb.mutable_schema()); - return YsqlDdlTxnAlterTableHelper(table, l, ddl_log_entries, "" /* new_table_name */, epoch); + return YsqlDdlTxnAlterTableHelper(txn_data, ddl_log_entries, "" /* new_table_name */); } -Status CatalogManager::HandleAbortedYsqlDdlTxn(TableInfo *const table, - TableInfo::WriteLock* l, - const LeaderEpoch& epoch) { - auto& mutable_pb = l->mutable_data()->pb; +Status CatalogManager::HandleAbortedYsqlDdlTxn(const YsqlTableDdlTxnState txn_data) { + auto& mutable_pb = txn_data.write_lock.mutable_data()->pb; const auto& ddl_state = mutable_pb.ysql_ddl_txn_verifier_state(0); if (ddl_state.contains_create_table_op()) { // This table was created in this aborted transaction. Drop this table. 
- return YsqlDdlTxnDropTableHelper(table, l, epoch); + return YsqlDdlTxnDropTableHelper(txn_data); } if (ddl_state.contains_alter_table_op()) { - LOG(INFO) << "Alter transaction on " << table->id() - << " failed, rolling back its schema changes"; std::vector ddl_log_entries; ddl_log_entries.emplace_back( master_->clock()->Now(), - table->id(), + txn_data.table->id(), mutable_pb, "Rollback of DDL Transaction"); mutable_pb.mutable_schema()->CopyFrom(ddl_state.previous_schema()); const string new_table_name = ddl_state.previous_table_name(); mutable_pb.set_name(new_table_name); - return YsqlDdlTxnAlterTableHelper(table, l, ddl_log_entries, new_table_name, epoch); + return YsqlDdlTxnAlterTableHelper(txn_data, ddl_log_entries, new_table_name); } // This must be a failed Delete transaction. DCHECK(ddl_state.contains_drop_table_op()); - return ClearYsqlDdlTxnState(table, l, epoch); + return ClearYsqlDdlTxnState(txn_data); } -Status CatalogManager::ClearYsqlDdlTxnState( - TableInfo* table, TableInfo::WriteLock* l, const LeaderEpoch& epoch) { - auto& pb = l->mutable_data()->pb; - LOG(INFO) << "Clearing ysql_ddl_txn_verifier_state from table " << table->id(); +Status CatalogManager::ClearYsqlDdlTxnState(const YsqlTableDdlTxnState txn_data) { + auto& pb = txn_data.write_lock.mutable_data()->pb; + VLOG(3) << "Clearing ysql_ddl_txn_verifier_state from table " << txn_data.table->id(); pb.clear_ysql_ddl_txn_verifier_state(); pb.clear_transaction(); - RETURN_NOT_OK(sys_catalog_->Upsert(epoch, table)); - l->Commit(); + + RETURN_NOT_OK(sys_catalog_->Upsert(txn_data.epoch, txn_data.table)); + txn_data.write_lock.Commit(); + RemoveDdlTransactionState(txn_data.table->id(), {txn_data.ddl_txn_id}); return Status::OK(); } -Status CatalogManager::YsqlDdlTxnAlterTableHelper(TableInfo *table, - TableInfo::WriteLock* l, +Status CatalogManager::YsqlDdlTxnAlterTableHelper(const YsqlTableDdlTxnState txn_data, const std::vector& ddl_log_entries, - const string& new_table_name, - const 
LeaderEpoch& epoch) { - auto& table_pb = l->mutable_data()->pb; - table_pb.set_version(table_pb.version() + 1); + const string& new_table_name) { + auto& table_pb = txn_data.write_lock.mutable_data()->pb; + const int target_schema_version = table_pb.version() + 1; + table_pb.set_version(target_schema_version); table_pb.set_updates_only_index_permissions(false); table_pb.set_state(SysTablesEntryPB::ALTERING); table_pb.set_state_msg( @@ -312,32 +381,36 @@ Status CatalogManager::YsqlDdlTxnAlterTableHelper(TableInfo *table, // Update sys-catalog with the new table schema. RETURN_NOT_OK(UpdateSysCatalogWithNewSchema( - table, + txn_data.table, ddl_log_entries, "" /* new_namespace_id */, new_table_name, - epoch, + txn_data.epoch, nullptr /* resp */)); - l->Commit(); + txn_data.write_lock.Commit(); + + // Enqueue this transaction to be notified when the alter operation is updated. + auto table = txn_data.table; + table->AddDdlTxnWaitingForSchemaVersion(target_schema_version, txn_data.ddl_txn_id); + LOG(INFO) << "Sending Alter Table request as part of rollback for table " << table->name(); - return SendAlterTableRequestInternal(table, TransactionId::Nil(), epoch); + return SendAlterTableRequestInternal(table, TransactionId::Nil(), txn_data.epoch); } -Status CatalogManager::YsqlDdlTxnDropTableHelper( - TableInfo* table, TableInfo::WriteLock* l, const LeaderEpoch& epoch) { - LOG(INFO) << "Dropping " << table->ToString(); - l->Commit(); +Status CatalogManager::YsqlDdlTxnDropTableHelper(const YsqlTableDdlTxnState txn_data) { + auto table = txn_data.table; + txn_data.write_lock.Commit(); DeleteTableRequestPB dtreq; DeleteTableResponsePB dtresp; dtreq.mutable_table()->set_table_name(table->name()); dtreq.mutable_table()->set_table_id(table->id()); dtreq.set_is_index_table(table->is_index()); - return DeleteTableInternal(&dtreq, &dtresp, nullptr /* rpc */, epoch); + return DeleteTableInternal(&dtreq, &dtresp, nullptr /* rpc */, txn_data.epoch); } Status 
CatalogManager::WaitForDdlVerificationToFinish( - const scoped_refptr& table, const string& pb_txn_id) { + const TableInfoPtr& table, const string& pb_txn_id) { auto is_ddl_in_progress = [&] { TRACE("Locking table"); @@ -365,7 +438,7 @@ Status CatalogManager::WaitForDdlVerificationToFinish( RetryFunc(CoarseMonoClock::Now() + std::chrono::milliseconds(FLAGS_wait_for_ysql_ddl_verification_timeout_ms), "Waiting for ddl transaction", - Format("Table is undergoing DDL transaction verification: $0", table->ToString()), + Format("Table is undergoing DDL transaction verification: $0", table->ToStringWithState()), [&](CoarseTimePoint deadline, bool *ddl_verification_in_progress) -> Status { *ddl_verification_in_progress = is_ddl_in_progress(); return Status::OK(); @@ -377,10 +450,125 @@ Status CatalogManager::WaitForDdlVerificationToFinish( // retry more often), instead return a TryAgain error. if (s.IsTimedOut()) { return STATUS_FORMAT(TryAgain, "Table is undergoing DDL transaction verification: $0", - table->ToString()); + table->ToStringWithState()); } return s; } +Status CatalogManager::IsYsqlDdlVerificationDone( + const IsYsqlDdlVerificationDoneRequestPB* req, IsYsqlDdlVerificationDoneResponsePB* resp, + rpc::RpcContext* rpc, const LeaderEpoch& epoch) { + + if (GetAtomicFlag(&FLAGS_TEST_hang_on_ddl_verification_progress)) { + TEST_SYNC_POINT("YsqlDdlHandler::IsYsqlDdlVerificationDone:Fail"); + } + + auto txn = VERIFY_RESULT(TransactionMetadata::FromPB(req->transaction())); + const auto& req_txn = txn.transaction_id; + SCHECK(!FLAGS_TEST_pause_ddl_rollback, InvalidArgument, + "DDL Rollback is paused for txn $0", txn.transaction_id); + + bool is_done = false; + { + LockGuard lock(ddl_txn_verifier_mutex_); + is_done = !ysql_ddl_txn_verfication_state_map_.contains(req_txn); + } + resp->set_done(is_done); + VLOG(1) << "Received IsYsqlDdlVerificationDone request for transaction " << req_txn + << " responding with " << (is_done ? 
"true" : "false"); + WARN_NOT_OK(TriggerDdlVerificationIfNeeded(txn, epoch), + Format("Failed to re-trigger DDL verification for transaction $0", req_txn)); + return Status::OK(); +} + +void CatalogManager::UpdateDdlVerificationState(const TransactionId& txn, + YsqlDdlVerificationState state) { + LockGuard lock(ddl_txn_verifier_mutex_); + auto verifier_state = FindOrNull(ysql_ddl_txn_verfication_state_map_, txn); + if (verifier_state) { + LOG(INFO) << "Updating the verification state for " << txn << " to " << state; + verifier_state->state = state; + } +} + +void CatalogManager::RemoveDdlTransactionState( + const TableId& table_id, const std::vector& txn_ids) { + if (txn_ids.size() == 0) { + return; + } + LockGuard lock(ddl_txn_verifier_mutex_); + for (const auto& txn_id : txn_ids) { + auto iter = ysql_ddl_txn_verfication_state_map_.find(txn_id); + if (iter == ysql_ddl_txn_verfication_state_map_.end()) { + continue; + } + LOG(INFO) << "Removing " << table_id << " from DDL Verification state for " << txn_id; + auto& tables = iter->second.tables; + const auto num_tables = std::erase_if(tables, + [&table_id](const TableInfoPtr& table) { + return table->id() == table_id; + }); + DCHECK_LE(num_tables, 1); + if (tables.empty()) { + LOG(INFO) << "Erasing DDL Verification state for " << txn_id; + ysql_ddl_txn_verfication_state_map_.erase(iter); + } + } +} + +Status CatalogManager::TriggerDdlVerificationIfNeeded( + const TransactionMetadata& txn, const LeaderEpoch& epoch) { + if (FLAGS_TEST_disable_ysql_ddl_txn_verification) { + LOG(INFO) << "Skip transaction verification as TEST_disable_ysql_ddl_txn_verification is set"; + return Status::OK(); + } + + TableInfoPtr table; + { + LockGuard lock(ddl_txn_verifier_mutex_); + auto verifier_state = FindOrNull(ysql_ddl_txn_verfication_state_map_, txn.transaction_id); + if (!verifier_state) { + VLOG(3) << "Not triggering Ddl Verification as transaction already completed " << txn; + return Status::OK(); + } + + auto state = 
verifier_state->state; + if (state != YsqlDdlVerificationState::kDdlPostProcessingFailed) { + VLOG(3) << "Not triggering Ddl Verification as it is in progress " << txn; + return Status::OK(); + } + + table = verifier_state->tables.front(); + if (verifier_state->txn_state != TxnState::kUnknown) { + // We already know whether this transaction is a success or a failure. We don't need to poll + // the transaction coordinator at this point. We can simply invoke post DDL verification + // directly. + const bool is_committed = verifier_state->txn_state == TxnState::kCommitted; + const string pb_txn_id = table->LockForRead()->pb_transaction_id(); + return background_tasks_thread_pool_->SubmitFunc( + [this, pb_txn_id, is_committed, epoch]() { + WARN_NOT_OK(YsqlDdlTxnCompleteCallback(pb_txn_id, is_committed, epoch), + "YsqlDdlTxnCompleteCallback failed"); + } + ); + } + } + + // Schedule transaction verification. + auto l = table->LockForRead(); + LOG(INFO) << "Enqueuing table for DDL transaction Verification: " << table->name() + << " id: " << table->id() << " schema version: " << l->pb.version() + << " for transaction " << txn; + + const string txn_id_pb = l->pb_transaction_id(); + auto when_done = [this, table, txn_id_pb, epoch](Result is_committed) { + WARN_NOT_OK(YsqlTableSchemaChecker(table, txn_id_pb, is_committed, epoch), + "YsqlTableSchemaChecker failed"); + }; + TableSchemaVerificationTask::CreateAndStartTask( + *this, table, txn, std::move(when_done), sys_catalog_.get(), master_->client_future(), + *master_->messenger(), epoch, true /* ddl_atomicity_enabled */); + return Status::OK(); +} } // namespace master } // namespace yb diff --git a/src/yb/master/ysql_transaction_ddl.cc b/src/yb/master/ysql_ddl_verification_task.cc similarity index 54% rename from src/yb/master/ysql_transaction_ddl.cc rename to src/yb/master/ysql_ddl_verification_task.cc index 0f1727bdbfcd..2f6aed97f3d4 100644 --- a/src/yb/master/ysql_transaction_ddl.cc +++ 
b/src/yb/master/ysql_ddl_verification_task.cc @@ -12,7 +12,7 @@ // #include -#include "yb/master/ysql_transaction_ddl.h" +#include "yb/master/ysql_ddl_verification_task.h" #include "yb/client/transaction_rpc.h" @@ -24,6 +24,7 @@ #include "yb/gutil/casts.h" +#include "yb/master/catalog_manager.h" #include "yb/master/master_util.h" #include "yb/master/sys_catalog.h" @@ -49,6 +50,9 @@ DEFINE_test_flag(bool, skip_transaction_verification, false, "Test only flag to keep the txn metadata in SysTablesEntryPB and skip" " transaction verification on the master"); +DEFINE_test_flag(int32, ysql_ddl_transaction_verification_failure_percentage, 0, + "Inject random failure in checking transaction status for DDL transactions"); + using std::string; using std::vector; @@ -60,25 +64,16 @@ static const char* kTablegroupNameColName = "grpname"; namespace { -bool IsTableModifiedByTransaction(TableInfo* table, - const TransactionMetadata& transaction) { - auto l = table->LockForRead(); - const auto& txn = transaction.transaction_id; - auto result = l->is_being_modified_by_ddl_transaction(txn); - if (!result.ok()) { - LOG(ERROR) << "Failed to parse transaction for table " << table->id() - << " skipping transaction verification"; - return false; - } - if (!result.get()) { - LOG(INFO) << "Verification of DDL transaction " << txn << " already completed for table " - << table->id(); - return false; - } - return true; -} +struct PgColumnFields { + // Order determines the order in which the columns were created. This is equal to the + // 'attnum' field in the pg_attribute table in PG catalog. 
+ int order; + std::string attname; -string PrintPgCols(const vector& pg_cols) { + PgColumnFields(int attnum, std::string name) : order(attnum), attname(name) {} +}; + +string PrintPgCols(const vector& pg_cols) { std::stringstream ss; ss << "{ "; for (const auto& col : pg_cols) { @@ -88,33 +83,81 @@ string PrintPgCols(const vector& pg_cols) { return ss.str(); } -} // namespace +Status PgEntryExistsWithReadTime( + SysCatalogTable* sys_catalog, + const TableId& pg_table_id, + PgOid entry_oid, + boost::optional relfilenode_oid, + const ReadHybridTime& read_time, + bool* result, + HybridTime* read_restart_ht); -YsqlTransactionDdl::~YsqlTransactionDdl() { - // Shutdown any outstanding RPCs. - rpcs_.Shutdown(); +Status PgSchemaCheckerWithReadTime(SysCatalogTable* sys_catalog, + const scoped_refptr& table, + const ReadHybridTime& read_time, + bool* result, + HybridTime* read_restart_ht); + +bool MatchPgDocDBSchemaColumns(const scoped_refptr& table, + const Schema& schema, + const vector& pg_cols); + +Result> ReadPgAttribute(SysCatalogTable* sys_catalog, + scoped_refptr table); + +Status ReadPgAttributeWithReadTime( + SysCatalogTable* sys_catalog, + const scoped_refptr& table, + const ReadHybridTime& read_time, + vector* pg_cols, + HybridTime* read_restart_ht); + +Result> +GetPgCatalogTableScanIterator( + SysCatalogTable* sys_catalog, + const PgTableReadData& read_data, + PgOid oid_value, + const dockv::ReaderProjection& projection, + RequestScope* request_scope) { + // Use Scan to query the given table, filtering by lookup_oid_col. 
+ auto iter = VERIFY_RESULT(read_data.NewUninitializedIterator(projection)); + + PgsqlConditionPB cond; + cond.add_operands()->set_column_id(projection.columns.front().id.rep()); + cond.set_op(QL_OP_EQUAL); + cond.add_operands()->mutable_value()->set_uint32_value(oid_value); + const dockv::KeyEntryValues empty_key_components; + docdb::DocPgsqlScanSpec spec( + read_data.schema(), rocksdb::kDefaultQueryId, empty_key_components, empty_key_components, + &cond, std::nullopt /* hash_code */, std::nullopt /* max_hash_code */); + // Grab a RequestScope to prevent intent clean up, before we Init the iterator. + *request_scope = VERIFY_RESULT(VERIFY_RESULT(sys_catalog->Tablet())->CreateRequestScope()); + RETURN_NOT_OK(iter->Init(spec)); + return iter; } -Result YsqlTransactionDdl::PgEntryExists(const TableId& pg_table_id, - PgOid entry_oid, - boost::optional relfilenode_oid) { +Result PgEntryExists(SysCatalogTable& sys_catalog, + const TableId& pg_table_id, + PgOid entry_oid, + boost::optional relfilenode_oid) { bool result = false; - RETURN_NOT_OK(sys_catalog_->ReadWithRestarts(std::bind( - &YsqlTransactionDdl::PgEntryExistsWithReadTime, this, pg_table_id, entry_oid, relfilenode_oid, + RETURN_NOT_OK(sys_catalog.ReadWithRestarts(std::bind( + &PgEntryExistsWithReadTime, &sys_catalog, pg_table_id, entry_oid, relfilenode_oid, std::placeholders::_1, &result, std::placeholders::_2))); return result; } // Note: "relfilenode_oid" is only used for rewritten tables. For rewritten tables, we need to // check both the oid and the relfilenode columns in pg_class. 
-Status YsqlTransactionDdl::PgEntryExistsWithReadTime( +Status PgEntryExistsWithReadTime( + SysCatalogTable* sys_catalog, const TableId& pg_table_id, PgOid entry_oid, boost::optional relfilenode_oid, const ReadHybridTime& read_time, bool* result, HybridTime* read_restart_ht) { - auto read_data = VERIFY_RESULT(sys_catalog_->TableReadData(pg_table_id, read_time)); + auto read_data = VERIFY_RESULT(sys_catalog->TableReadData(pg_table_id, read_time)); auto oid_col = VERIFY_RESULT(read_data.ColumnByName("oid")).rep(); ColumnIdRep relfilenode_col = kInvalidColumnId.rep(); @@ -131,7 +174,7 @@ Status YsqlTransactionDdl::PgEntryExistsWithReadTime( RequestScope request_scope; auto iter = VERIFY_RESULT( - GetPgCatalogTableScanIterator(read_data, entry_oid, projection, &request_scope)); + GetPgCatalogTableScanIterator(sys_catalog, read_data, entry_oid, projection, &request_scope)); // If no rows found, the entry does not exist. qlexpr::QLTableRow row; @@ -156,152 +199,19 @@ Status YsqlTransactionDdl::PgEntryExistsWithReadTime( return Status::OK(); } -Result> -YsqlTransactionDdl::GetPgCatalogTableScanIterator( - const PgTableReadData& read_data, - PgOid oid_value, - const dockv::ReaderProjection& projection, - RequestScope* request_scope) { - // Use Scan to query the given table, filtering by lookup_oid_col. - auto iter = VERIFY_RESULT(read_data.NewUninitializedIterator(projection)); - - PgsqlConditionPB cond; - cond.add_operands()->set_column_id(projection.columns.front().id.rep()); - cond.set_op(QL_OP_EQUAL); - cond.add_operands()->mutable_value()->set_uint32_value(oid_value); - const dockv::KeyEntryValues empty_key_components; - docdb::DocPgsqlScanSpec spec( - read_data.schema(), rocksdb::kDefaultQueryId, empty_key_components, empty_key_components, - &cond, std::nullopt /* hash_code */, std::nullopt /* max_hash_code */); - // Grab a RequestScope to prevent intent clean up, before we Init the iterator. 
- *request_scope = VERIFY_RESULT(VERIFY_RESULT(sys_catalog_->Tablet())->CreateRequestScope()); - RETURN_NOT_OK(iter->Init(spec)); - return iter; -} - -void YsqlTransactionDdl::VerifyTransaction( - const TransactionMetadata& transaction_metadata, - scoped_refptr table, - bool has_ysql_ddl_txn_state, - std::function complete_callback) { - if (FLAGS_TEST_skip_transaction_verification) { - return; - } - - SleepFor(MonoDelta::FromMilliseconds(FLAGS_ysql_transaction_bg_task_wait_ms)); - - if (has_ysql_ddl_txn_state && !IsTableModifiedByTransaction(table.get(), transaction_metadata)) { - // The table no longer has any ddl transaction verification state pertaining to - // 'transaction_metadata'. It was parallelly completed in some other thread, so there is - // nothing to do. - return; - } - - YB_LOG_EVERY_N_SECS(INFO, 1) << "Verifying Transaction " << transaction_metadata; - - tserver::GetTransactionStatusRequestPB req; - req.set_tablet_id(transaction_metadata.status_tablet); - req.add_transaction_id()->assign( - pointer_cast(transaction_metadata.transaction_id.data()), - transaction_metadata.transaction_id.size()); - - auto client = client_future_.get(); - if (!client) { - LOG(WARNING) << "Shutting down. Cannot get GetTransactionStatus: " << transaction_metadata; - return; - } - // Prepare the rpc after checking if it is shutting down in case it returns because of - // client is null and leave the reserved rpc as uninitialized. - auto rpc_handle = rpcs_.Prepare(); - if (rpc_handle == rpcs_.InvalidHandle()) { - LOG(WARNING) << "Shutting down. Cannot send GetTransactionStatus: " << transaction_metadata; - return; - } - // We need to query the TransactionCoordinator here. Can't use TransactionStatusResolver in - // TransactionParticipant since this TransactionMetadata may not have any actual data flushed yet. 
- *rpc_handle = client::GetTransactionStatus( - TransactionRpcDeadline(), - nullptr /* tablet */, - client, - &req, - [this, rpc_handle, transaction_metadata, table, has_ysql_ddl_txn_state, complete_callback] - (Status status, const tserver::GetTransactionStatusResponsePB& resp) { - auto retained = rpcs_.Unregister(rpc_handle); - TransactionReceived(transaction_metadata, table, has_ysql_ddl_txn_state, - complete_callback, std::move(status), resp); - }); - (**rpc_handle).SendRpc(); -} - -void YsqlTransactionDdl::TransactionReceived( - const TransactionMetadata& transaction, - scoped_refptr table, - bool has_ysql_ddl_txn_state, - std::function complete_callback, - Status txn_status, const tserver::GetTransactionStatusResponsePB& resp) { - if (has_ysql_ddl_txn_state && !IsTableModifiedByTransaction(table.get(), transaction)) { - // The table no longer has any ddl transaction verification state pertaining to - // 'transaction_metadata'. It was parallelly completed in some other thread, so there is - // nothing to do. - return; - } - - if (!txn_status.ok()) { - LOG(WARNING) << "Transaction Status attempt (" << transaction.ToString() - << ") failed with status " << txn_status; - WARN_NOT_OK(thread_pool_->SubmitFunc([complete_callback] () { - WARN_NOT_OK(complete_callback(false /* txn_rpc_success */), "Callback failure"); - }), "Failed to enqueue callback"); - return; - // #5981: Improve failure handling to retry transient errors or recognize transaction complete. 
- } - if (resp.has_error()) { - const Status s = StatusFromPB(resp.error().status()); - const tserver::TabletServerErrorPB::Code code = resp.error().code(); - LOG(WARNING) << "Transaction Status attempt (" << transaction.ToString() - << ") failed with error code " << tserver::TabletServerErrorPB::Code_Name(code) - << ": " << s; - WARN_NOT_OK(thread_pool_->SubmitFunc([complete_callback] () { - WARN_NOT_OK(complete_callback(false /* txn_rpc_success */), "Callback failure"); - }), "Failed to enqueue callback"); - // #5981: Maybe have the same heuristic as above? - return; - } - YB_LOG_EVERY_N_SECS(INFO, 1) << "Got Response for " << transaction.ToString() - << ", resp: " << resp.ShortDebugString(); - bool is_pending = (resp.status_size() == 0); - for (int i = 0; i < resp.status_size() && !is_pending; ++i) { - // NOTE: COMMITTED state is also "pending" because we need APPLIED. - is_pending = resp.status(i) == TransactionStatus::PENDING || - resp.status(i) == TransactionStatus::COMMITTED; - } - if (is_pending) { - // Re-enqueue if transaction is still pending. - WARN_NOT_OK(thread_pool_->SubmitFunc( - std::bind(&YsqlTransactionDdl::VerifyTransaction, this, - transaction, table, has_ysql_ddl_txn_state, complete_callback)), - "Could not submit VerifyTransaction to thread pool"); - return; - } - // If this transaction isn't pending, then the transaction is in a terminal state. - // Note: We ignore the resp.status() now, because it could be ABORT'd but actually a SUCCESS. - // Determine whether the transaction was a success by comparing with the PG schema. 
- WARN_NOT_OK(thread_pool_->SubmitFunc([complete_callback] () { - WARN_NOT_OK(complete_callback(true /* txn_rpc_success */), "Callback failure"); - }), "Failed to enqueue callback"); -} - -Result YsqlTransactionDdl::PgSchemaChecker(const scoped_refptr& table) { +Result PgSchemaChecker(SysCatalogTable& sys_catalog, const scoped_refptr& table) { bool result = false; - RETURN_NOT_OK(sys_catalog_->ReadWithRestarts(std::bind( - &YsqlTransactionDdl::PgSchemaCheckerWithReadTime, this, table, std::placeholders::_1, &result, + RETURN_NOT_OK(sys_catalog.ReadWithRestarts( + std::bind(&PgSchemaCheckerWithReadTime, &sys_catalog, table, std::placeholders::_1, &result, std::placeholders::_2))); return result; } -Status YsqlTransactionDdl::PgSchemaCheckerWithReadTime( - const scoped_refptr& table, const ReadHybridTime& read_time, bool* result, - HybridTime* read_restart_ht) { +Status PgSchemaCheckerWithReadTime(SysCatalogTable* sys_catalog, + const scoped_refptr& table, + const ReadHybridTime& read_time, + bool* result, + HybridTime* read_restart_ht) { PgOid oid = kPgInvalidOid; string pg_catalog_table_id, name_col; @@ -321,16 +231,17 @@ Status YsqlTransactionDdl::PgSchemaCheckerWithReadTime( name_col = kTableNameColName; } - auto read_data = VERIFY_RESULT(sys_catalog_->TableReadData(pg_catalog_table_id, read_time)); + auto read_data = VERIFY_RESULT(sys_catalog->TableReadData(pg_catalog_table_id, read_time)); auto oid_col_id = VERIFY_RESULT(read_data.ColumnByName("oid")).rep(); auto relname_col_id = VERIFY_RESULT(read_data.ColumnByName(name_col)).rep(); dockv::ReaderProjection projection; ColumnIdRep relfilenode_col_id = kInvalidColumnId.rep(); - // If this isn't a system table, we need to check if the relfilenode in pg_class matches the // DocDB table ID (as the table may have been rewritten). - bool check_relfilenode = !table->is_system(); + // Colocated parent tables are never rewritten so we can skip them.
Also, they do not exist in + // YSQL so we cannot check its relfilenode. + bool check_relfilenode = !table->is_system() && !table->IsColocationParentTable(); if (check_relfilenode) { relfilenode_col_id = VERIFY_RESULT(read_data.ColumnByName("relfilenode")).rep(); projection.Init(read_data.schema(), {oid_col_id, relname_col_id, relfilenode_col_id}); @@ -339,8 +250,8 @@ Status YsqlTransactionDdl::PgSchemaCheckerWithReadTime( } RequestScope request_scope; - auto iter = - VERIFY_RESULT(GetPgCatalogTableScanIterator(read_data, oid, projection, &request_scope)); + auto iter = VERIFY_RESULT(GetPgCatalogTableScanIterator( + sys_catalog, read_data, oid, projection, &request_scope)); auto l = table->LockForRead(); if (!l->has_ysql_ddl_txn_verifier_state()) { @@ -410,7 +321,7 @@ Status YsqlTransactionDdl::PgSchemaCheckerWithReadTime( return Status::OK(); } - vector pg_cols = VERIFY_RESULT(ReadPgAttribute(table)); + vector pg_cols = VERIFY_RESULT(ReadPgAttribute(sys_catalog, table)); // In DocDB schema, columns are sorted based on 'order'. sort(pg_cols.begin(), pg_cols.end(), [](const auto& lhs, const auto& rhs) { return lhs.order < rhs.order; @@ -444,10 +355,10 @@ Status YsqlTransactionDdl::PgSchemaCheckerWithReadTime( table->ToString()); } -bool YsqlTransactionDdl::MatchPgDocDBSchemaColumns( +bool MatchPgDocDBSchemaColumns( const scoped_refptr& table, const Schema& schema, - const vector& pg_cols) { + const vector& pg_cols) { const string& fail_msg = "Schema mismatch for table " + table->ToString() + " with schema " + schema.ToString() + " and PG catalog schema " + PrintPgCols(pg_cols); @@ -461,7 +372,8 @@ bool YsqlTransactionDdl::MatchPgDocDBSchemaColumns( size_t pg_idx = 0; for (const auto& col : columns) { // 'ybrowid' is a column present only in DocDB. Skip it. 
- if (col.name() == "ybrowid" || col.name() == "ybidxbasectid") { + if (col.name() == "ybrowid" || col.name() == "ybidxbasectid" || + col.name() == "ybuniqueidxkeysuffix") { continue; } @@ -502,17 +414,18 @@ bool YsqlTransactionDdl::MatchPgDocDBSchemaColumns( return true; } -Result> -YsqlTransactionDdl::ReadPgAttribute(scoped_refptr table) { +Result> ReadPgAttribute(SysCatalogTable* sys_catalog, + scoped_refptr table) { vector pg_cols; - RETURN_NOT_OK(sys_catalog_->ReadWithRestarts(std::bind( - &YsqlTransactionDdl::ReadPgAttributeWithReadTime, this, table, std::placeholders::_1, + RETURN_NOT_OK(sys_catalog->ReadWithRestarts(std::bind( + &ReadPgAttributeWithReadTime, sys_catalog, table, std::placeholders::_1, &pg_cols, std::placeholders::_2))); return pg_cols; } -Status YsqlTransactionDdl::ReadPgAttributeWithReadTime( - scoped_refptr table, +Status ReadPgAttributeWithReadTime( + SysCatalogTable* sys_catalog, + const scoped_refptr& table, const ReadHybridTime& read_time, vector* pg_cols, HybridTime* read_restart_ht) { @@ -521,7 +434,7 @@ Status YsqlTransactionDdl::ReadPgAttributeWithReadTime( const PgOid database_oid = VERIFY_RESULT(GetPgsqlDatabaseOidByTableId(table->id())); const PgOid table_oid = VERIFY_RESULT(table->GetPgTableOid()); auto read_data = - VERIFY_RESULT(sys_catalog_->TableReadData(database_oid, kPgAttributeTableOid, read_time)); + VERIFY_RESULT(sys_catalog->TableReadData(database_oid, kPgAttributeTableOid, read_time)); const auto attrelid_col_id = VERIFY_RESULT(read_data.ColumnByName("attrelid")).rep(); const auto attname_col_id = VERIFY_RESULT(read_data.ColumnByName("attname")).rep(); const auto atttypid_col_id = VERIFY_RESULT(read_data.ColumnByName("atttypid")).rep(); @@ -532,7 +445,7 @@ Status YsqlTransactionDdl::ReadPgAttributeWithReadTime( RequestScope request_scope; auto iter = VERIFY_RESULT(GetPgCatalogTableScanIterator( - read_data, table_oid, projection, &request_scope)); + sys_catalog, read_data, table_oid, projection, &request_scope)); 
qlexpr::QLTableRow row; while (VERIFY_RESULT(iter->FetchNext(&row))) { @@ -570,6 +483,290 @@ Status YsqlTransactionDdl::ReadPgAttributeWithReadTime( *read_restart_ht = VERIFY_RESULT(iter->RestartReadHt()); return Status::OK(); } +} // namespace + +PollTransactionStatusBase::~PollTransactionStatusBase() { + // Shutdown any outstanding RPCs. + rpcs_.Shutdown(); +} + +Status PollTransactionStatusBase::VerifyTransaction() { + if (FLAGS_TEST_skip_transaction_verification) { + return Status::OK(); + } + + YB_LOG_EVERY_N_SECS(INFO, 1) << "Verifying Transaction " << transaction_; + + tserver::GetTransactionStatusRequestPB req; + req.set_tablet_id(transaction_.status_tablet); + req.add_transaction_id()->assign( + pointer_cast(transaction_.transaction_id.data()), + transaction_.transaction_id.size()); + + auto client = client_future_.get(); + if (!client) { + return STATUS_FORMAT(IllegalState, + "Shutting down. Cannot send GetTransactionStatus: $0", transaction_); + } + // Prepare the rpc after checking if it is shutting down in case it returns because of + // client is null and leave the reserved rpc as uninitialized. + auto rpc_handle = rpcs_.Prepare(); + if (rpc_handle == rpcs_.InvalidHandle()) { + return STATUS_FORMAT(IllegalState, + "Shutting down. Cannot send GetTransactionStatus: $0", transaction_); + } + // We need to query the TransactionCoordinator here. Can't use TransactionStatusResolver in + // TransactionParticipant since this TransactionMetadata may not have any actual data flushed yet. 
+ *rpc_handle = client::GetTransactionStatus( + TransactionRpcDeadline(), + nullptr /* tablet */, + client, + &req, + [this, rpc_handle] + (Status status, const tserver::GetTransactionStatusResponsePB& resp) { + auto retained = rpcs_.Unregister(rpc_handle); + TransactionReceived(std::move(status), resp); + }); + (**rpc_handle).SendRpc(); + return Status::OK(); +} + +void PollTransactionStatusBase::TransactionReceived( + Status txn_status, const tserver::GetTransactionStatusResponsePB& resp) { + if (FLAGS_TEST_ysql_ddl_transaction_verification_failure_percentage > 0 && + RandomUniformInt(1, 99) <= FLAGS_TEST_ysql_ddl_transaction_verification_failure_percentage) { + LOG(ERROR) << "Injecting failure for transaction, inducing failure to enqueue callback"; + FinishPollTransaction(STATUS_FORMAT(InternalError, "Injected failure")); + return; + } + + if (!txn_status.ok()) { + LOG(WARNING) << "Transaction Status attempt (" << transaction_ + << ") failed with status " << txn_status; + FinishPollTransaction(txn_status); + return; + } + if (resp.has_error()) { + const Status s = StatusFromPB(resp.error().status()); + FinishPollTransaction(STATUS_FORMAT( + InternalError, "Transaction Status attempt failed with error code $0: $1", + tserver::TabletServerErrorPB::Code_Name(resp.error().code()), s)); + return; + } + YB_LOG_EVERY_N_SECS(INFO, 1) << "Got Response for " << transaction_ + << ", resp: " << resp.ShortDebugString(); + bool is_pending = (resp.status_size() == 0); + for (int i = 0; i < resp.status_size() && !is_pending; ++i) { + // NOTE: COMMITTED state is also "pending" because we need APPLIED. + is_pending = resp.status(i) == TransactionStatus::PENDING || + resp.status(i) == TransactionStatus::COMMITTED; + } + if (is_pending) { + TransactionPending(); + return; + } + // If this transaction isn't pending, then the transaction is in a terminal state. + // Note: We ignore the resp.status() now, because it could be ABORT'd but actually a SUCCESS. 
+ // Determine whether the transaction was a success by comparing with the PG schema. + FinishPollTransaction(Status::OK()); +} + +NamespaceVerificationTask::NamespaceVerificationTask( + CatalogManager& catalog_manager, scoped_refptr ns, + const TransactionMetadata& transaction, std::function)> complete_callback, + SysCatalogTable* sys_catalog, std::shared_future client_future, + rpc::Messenger& messenger, const LeaderEpoch& epoch) + : MultiStepNamespaceTaskBase( + catalog_manager, *catalog_manager.AsyncTaskPool(), messenger, *ns, epoch), + PollTransactionStatusBase(transaction, std::move(client_future)), + sys_catalog_(*sys_catalog) { + completion_callback_ = [this, + complete_callback = std::move(complete_callback)](const Status& status) { + if (!status.ok()) { + complete_callback(status); + } else { + complete_callback(entry_exists_); + } + }; +} + +void NamespaceVerificationTask::CreateAndStartTask( + CatalogManager& catalog_manager, + scoped_refptr ns, + const TransactionMetadata& transaction, + std::function)> complete_callback, + SysCatalogTable* sys_catalog, + std::shared_future client_future, + rpc::Messenger& messenger, + const LeaderEpoch& epoch) { + + auto task = std::make_shared(catalog_manager, ns, transaction, + std::move(complete_callback), + sys_catalog, + std::move(client_future), + messenger, epoch); + + task->Start(); +} + +Status NamespaceVerificationTask::FirstStep() { + // Schedule verify transaction with some delay. 
+ this->ScheduleNextStepWithDelay([this] { + return VerifyTransaction(); + }, "VerifyTransaction", MonoDelta::FromMilliseconds(FLAGS_ysql_transaction_bg_task_wait_ms)); + return Status::OK(); +} + +void NamespaceVerificationTask::TransactionPending() { + this->ScheduleNextStep([this] { + return VerifyTransaction(); + }, "VerifyTransaction"); +} + +void NamespaceVerificationTask::FinishPollTransaction(Status status) { + ScheduleNextStep( + std::bind(&NamespaceVerificationTask::CheckNsExists, this, std::move(status)), + "CheckNsExists"); +} + +Status NamespaceVerificationTask::CheckNsExists(Status txn_rpc_status) { + RETURN_NOT_OK(txn_rpc_status); + const auto pg_table_id = GetPgsqlTableId(atoi(kSystemNamespaceId), kPgDatabaseTableOid); + const PgOid database_oid = VERIFY_RESULT(GetPgsqlDatabaseOid(namespace_info_.id())); + entry_exists_ = VERIFY_RESULT( + PgEntryExists(sys_catalog_, pg_table_id, database_oid, boost::none /* relfilenodeoid */)); + + Complete(); + return Status::OK(); +} + +Status NamespaceVerificationTask::ValidateRunnable() { + RETURN_NOT_OK(MultiStepNamespaceTaskBase::ValidateRunnable()); + const auto& l = namespace_info_.LockForRead(); + SCHECK(l->pb.has_transaction(), IllegalState, + "Namespace $0 is not being modified by any transaction", namespace_info_.ToString()); + SCHECK_EQ(l->pb.state(), SysNamespaceEntryPB::RUNNING, Aborted, + Format("Invalid Namespace state ($0), abandoning transaction GC work for $1", + SysNamespaceEntryPB_State_Name(l->pb.state()), namespace_info_.ToString())); + return Status::OK(); +} + +TableSchemaVerificationTask::TableSchemaVerificationTask( + CatalogManager& catalog_manager, scoped_refptr table, + const TransactionMetadata& transaction, std::function)> complete_callback, + SysCatalogTable* sys_catalog, std::shared_future client_future, + rpc::Messenger& messenger, const LeaderEpoch& epoch, bool ddl_atomicity_enabled) + : MultiStepTableTaskBase( + catalog_manager, *catalog_manager.AsyncTaskPool(), messenger, 
std::move(table), epoch), + PollTransactionStatusBase(transaction, std::move(client_future)), + sys_catalog_(*sys_catalog), + ddl_atomicity_enabled_(ddl_atomicity_enabled) { + completion_callback_ = [this, + complete_callback = std::move(complete_callback)](const Status& status) { + if (!status.ok()) { + complete_callback(status); + } else { + complete_callback(is_committed_); + } + }; +} + +void TableSchemaVerificationTask::CreateAndStartTask( + CatalogManager& catalog_manager, + scoped_refptr table, + const TransactionMetadata& transaction, + std::function)> complete_callback, + SysCatalogTable* sys_catalog, + std::shared_future client_future, + rpc::Messenger& messenger, + const LeaderEpoch& epoch, + bool ddl_atomicity_enabled) { + + auto task = std::make_shared( + catalog_manager, table, transaction, complete_callback, sys_catalog, + std::move(client_future), messenger, epoch, ddl_atomicity_enabled); + + task->Start(); +} + +Status TableSchemaVerificationTask::FirstStep() { + // Schedule verify transaction with some delay. + this->ScheduleNextStepWithDelay([this] { + return VerifyTransaction(); + }, "VerifyTransaction", MonoDelta::FromMilliseconds(FLAGS_ysql_transaction_bg_task_wait_ms)); + return Status::OK(); +} + +void TableSchemaVerificationTask::TransactionPending() { + // Still pending: re-run VerifyTransaction as the next step, without an explicit delay.
+ this->ScheduleNextStep([this] { + return VerifyTransaction(); + }, "VerifyTransaction"); +} + +Status TableSchemaVerificationTask::ValidateRunnable() { + RETURN_NOT_OK(MultiStepTableTaskBase::ValidateRunnable()); + if (!ddl_atomicity_enabled_) { + return Status::OK(); + } + auto l = table_info_->LockForRead(); + SCHECK(VERIFY_RESULT(l->is_being_modified_by_ddl_transaction(transaction_.transaction_id)), + IllegalState, + "Table $0 is being modified by transaction $1, not $2", table_info_->ToString(), + l->GetCurrentDdlTransactionId(), transaction_.transaction_id); + SCHECK(table_info_->is_running(), IllegalState, "Task $0 failed since table is in state $1", + description(), l->state_name()); + return Status::OK(); +} + +void TableSchemaVerificationTask::FinishPollTransaction(Status status) { + ScheduleNextStep([this, status] { + return ddl_atomicity_enabled_ ? CompareSchema(status) : CheckTableExists(status); + }, "Compare Schema"); +} + +Status TableSchemaVerificationTask::FinishTask(Result is_committed) { + RETURN_NOT_OK(is_committed); + + is_committed_ = *is_committed; + Complete(); + return Status::OK(); +} + +Status TableSchemaVerificationTask::CheckTableExists(Status txn_rpc_success) { + RETURN_NOT_OK(txn_rpc_success); + if (!table_info_->IsColocationParentTable()) { + // Check that pg_class still has an entry for the table. + const PgOid database_oid = VERIFY_RESULT(GetPgsqlDatabaseOidByTableId(table_info_->id())); + const auto pg_class_table_id = GetPgsqlTableId(database_oid, kPgClassTableOid); + + PgOid pg_table_oid = VERIFY_RESULT(table_info_->GetPgTableOid()); + PgOid relfilenode_oid = VERIFY_RESULT(table_info_->GetPgRelfilenodeOid()); + + return FinishTask(PgEntryExists( + sys_catalog_, pg_class_table_id, pg_table_oid, + // If relfilenode_oid is the same as pg table oid, this isn't a rewritten table and + // we don't need to perform additional checks on the relfilenode column. + relfilenode_oid == pg_table_oid ?
boost::none : boost::make_optional(relfilenode_oid))); + } + // The table we have is a dummy parent table, hence not present in YSQL. + // We need to check a tablegroup instead. + const auto tablegroup_id = GetTablegroupIdFromParentTableId(table_info_->id()); + const PgOid database_oid = VERIFY_RESULT(GetPgsqlDatabaseOidByTablegroupId(tablegroup_id)); + const auto pg_yb_tablegroup_table_id = GetPgsqlTableId(database_oid, kPgYbTablegroupTableOid); + const PgOid tablegroup_oid = VERIFY_RESULT(GetPgsqlTablegroupOid(tablegroup_id)); + + return FinishTask(PgEntryExists( + sys_catalog_, pg_yb_tablegroup_table_id, tablegroup_oid, boost::none /* relfilenode_oid */)); +} + +Status TableSchemaVerificationTask::CompareSchema(Status txn_rpc_success) { + RETURN_NOT_OK(txn_rpc_success); + + // If the transaction was a success, we need to compare the schema of the table in PG catalog + // with the schema in DocDB. + return FinishTask(PgSchemaChecker(sys_catalog_, table_info_)); +} } // namespace master } // namespace yb diff --git a/src/yb/master/ysql_ddl_verification_task.h b/src/yb/master/ysql_ddl_verification_task.h new file mode 100644 index 000000000000..152022bd30de --- /dev/null +++ b/src/yb/master/ysql_ddl_verification_task.h @@ -0,0 +1,191 @@ +// Copyright (c) YugaByte, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. 
+// + +#pragma once + +#include +#include + +#include "yb/client/client_fwd.h" + +#include "yb/common/entity_ids.h" +#include "yb/common/pg_types.h" +#include "yb/common/transaction.h" + +#include "yb/docdb/doc_rowwise_iterator.h" + +#include "yb/master/catalog_entity_info.h" +#include "yb/master/catalog_entity_tasks.h" +#include "yb/master/master_fwd.h" +#include "yb/master/multi_step_monitored_task.h" + +#include "yb/rpc/rpc.h" + +#include "yb/util/status_fwd.h" +#include "yb/util/threadpool.h" + +namespace yb { +namespace tserver { +class GetTransactionStatusResponsePB; +} + +namespace master { +/* + * Currently the metadata for YSQL Tables is stored in both the PG catalog and DocDB schema. + * This class helps maintain consistency between the two schemas. When a DDL transaction fails, + * since the PG catalog is modified using the DocDB transactions framework, the changes to the + * PG catalog are automatically rolled-back. The tasks in this file help perform similar rollback + * for the DocDB schema upon transaction failure. + * When a DDL transaction modifies the DocDB schema, the transaction metadata, and the current + * schema of the table is stored in the SysTablesEntryPB. At this point, future DDL operations on + * this table are blocked until verification is complete. A poller is scheduled to monitor whether + * the transaction is complete. Once the transaction is detected to be complete, it compares the PG + * catalog and the DocDB schema and finds whether the transaction is a success or a failure. + * Based on whether the transaction was a success or failure, the CatalogManager will effect + * rollback or roll-forward of the DocDB schema. + * + * Note that the above protocol introduces eventual consistency between the two types of metadata. + * However this does not affect correctness because of the following two properties: + * 1) When inconsistent, the DocDB schema always has more columns/constraints than PG schema.
+ * 2) Clients always use the PG schema (which is guaranteed to not return uncommitted state) + * to prepare their read/write requests. + * + * These two properties ensure that we can't have orphaned data or failed integrity checks + * or use DDL entities created by uncommitted transactions. + */ + +// Helper class that encapsulates the logic to poll the transaction status. +class PollTransactionStatusBase { + public: + PollTransactionStatusBase( + const TransactionMetadata& transaction, + std::shared_future client_future) + : transaction_(transaction), + client_future_(std::move(client_future)) {} + + virtual ~PollTransactionStatusBase(); + + protected: + Status VerifyTransaction(); + virtual void TransactionPending() = 0; + virtual void FinishPollTransaction(Status s) = 0; + + TransactionMetadata transaction_; + + private: + void TransactionReceived(Status txn_status, + const tserver::GetTransactionStatusResponsePB& response); + + std::shared_future client_future_; + rpc::Rpcs rpcs_; +}; + +class NamespaceVerificationTask : public MultiStepNamespaceTaskBase, + public PollTransactionStatusBase { + public: + static void CreateAndStartTask( + CatalogManager& catalog_manager, + scoped_refptr ns, + const TransactionMetadata& transaction, + std::function)> complete_callback, + SysCatalogTable* sys_catalog, + std::shared_future client_future, + rpc::Messenger& messenger, + const LeaderEpoch& epoch); + + server::MonitoredTaskType type() const override { + return server::MonitoredTaskType::kNamespaceVerification; + } + + std::string type_name() const override { return "Namespace verification"; } + + std::string description() const override { + return Format("TableSchemaVerificationTask for $0", namespace_info_.ToString()); + }; + + ~NamespaceVerificationTask() = default; + + NamespaceVerificationTask( + CatalogManager& catalog_manager, + scoped_refptr ns, + const TransactionMetadata& transaction, + std::function)> complete_callback, + SysCatalogTable* sys_catalog, + 
std::shared_future client_future, + rpc::Messenger& messenger, + const LeaderEpoch& epoch); + + private: + Status FirstStep() override; + void TransactionPending() override; + Status ValidateRunnable() override; + void FinishPollTransaction(Status s) override; + Status CheckNsExists(Status status); + + SysCatalogTable& sys_catalog_; + bool entry_exists_ = false; +}; + +class TableSchemaVerificationTask : public MultiStepTableTaskBase, + public PollTransactionStatusBase { + public: + static void CreateAndStartTask( + CatalogManager& catalog_manager, + scoped_refptr table, + const TransactionMetadata& transaction, + std::function)> complete_callback, + SysCatalogTable* sys_catalog, + std::shared_future client_future, + rpc::Messenger& messenger, + const LeaderEpoch& epoch, + bool ddl_atomicity_enabled); + + server::MonitoredTaskType type() const override { + return server::MonitoredTaskType::TableSchemaVerification; + } + + std::string type_name() const override { return "TableSchemaVerificationTask"; } + + std::string description() const override { + return Format("TableSchemaVerificationTask for $0", table_info_->ToString()); + }; + + ~TableSchemaVerificationTask() = default; + + TableSchemaVerificationTask( + CatalogManager& catalog_manager, + scoped_refptr table, + const TransactionMetadata& transaction, + std::function)> complete_callback, + SysCatalogTable* sys_catalog, + std::shared_future client_future, + rpc::Messenger& messenger, + const LeaderEpoch& epoch, + bool ddl_atomicity_enabled); + + private: + Status FirstStep() override; + void TransactionPending() override; + Status ValidateRunnable() override; + Status CheckTableExists(Status s); + Status CompareSchema(Status s); + Status FinishTask(Result is_committed); + void FinishPollTransaction(Status s) override; + + SysCatalogTable& sys_catalog_; + bool ddl_atomicity_enabled_; + bool is_committed_ = false; +}; + +} // namespace master +} // namespace yb diff --git a/src/yb/master/ysql_transaction_ddl.h 
b/src/yb/master/ysql_transaction_ddl.h deleted file mode 100644 index ce14151eaf04..000000000000 --- a/src/yb/master/ysql_transaction_ddl.h +++ /dev/null @@ -1,149 +0,0 @@ -// Copyright (c) YugaByte, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except -// in compliance with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations -// under the License. -// - -#pragma once - -#include -#include - -#include "yb/client/client_fwd.h" - -#include "yb/common/entity_ids.h" -#include "yb/common/pg_types.h" -#include "yb/common/transaction.h" - -#include "yb/docdb/doc_rowwise_iterator.h" - -#include "yb/master/catalog_entity_info.h" -#include "yb/master/master_fwd.h" - -#include "yb/rpc/rpc.h" - -#include "yb/util/status_fwd.h" -#include "yb/util/threadpool.h" - -namespace yb { -namespace tserver { -class GetTransactionStatusResponsePB; -} - -namespace master { - -/* - * Currently the metadata for YSQL Tables is stored in both the PG catalog and DocDB schema. - * This class helps maintain consistency between the two schemas. When a DDL transaction fails, - * since the PG catalog is modified using the DocDB transactions framework, the changes to the - * PG catalog are automatically rolled-back. This class helps perform similar rollback for the - * DocDB schema upon transaction failure. - * When a DDL transaction modifies the DocDB schema, the transaction metadata, and the current - * schema of the table is stored in the SysTablesEntryPB. At this point, future DDL operations on - * this table are blocked until verification is complete. 
A poller is scheduled through this - * class. It monitors whether the transaction is complete. Once the transaction is detected to - * be complete, it compares the PG catalog and the DocDB schema and finds whether the transaction - * is a success or a failure. - * Based on whether the transaction was a success or failure, the CatalogManager will effect - * rollback or roll-forward of the DocDB schema. - * - * Note that the above protocol introduces eventual consistency between the two types of metadata. - * However this mostly not affect correctness because of the following two properties: - * 1) When inconsistent, the DocDB schema always has more columns/constraints than PG schema. - * 2) Clients always use the PG schema (which is guaranteed to not return uncommitted state) - * to prepare their read/write requests. - * - * These two properties ensure that we can't have orphaned data or failed integrity checks - * or use DDL entities created by uncommitted transactions. - */ - -class YsqlTransactionDdl { - public: - struct PgColumnFields { - // Order determines the order in which the columns were created. This is equal to the - // 'attnum' field in the pg_attribute table in PG catalog. 
- int order; - std::string attname; - - PgColumnFields(int attnum, std::string name) : order(attnum), attname(name) {} - }; - - YsqlTransactionDdl( - const SysCatalogTable* sys_catalog, std::shared_future client_future, - ThreadPool* thread_pool) - : sys_catalog_(sys_catalog), client_future_(std::move(client_future)), - thread_pool_(thread_pool) {} - - ~YsqlTransactionDdl(); - - void set_thread_pool(yb::ThreadPool* thread_pool) { - thread_pool_ = thread_pool; - } - - void VerifyTransaction(const TransactionMetadata& transaction, - scoped_refptr table, - bool has_ysql_ddl_txn_state, - std::function complete_callback); - - Result PgEntryExists(const TableId& tableId, - PgOid entry_oid, - boost::optional relfilenode_oid); - Status PgEntryExistsWithReadTime( - const TableId& tableId, - PgOid entry_oid, - boost::optional - relfilenode_oid, - const ReadHybridTime& read_time, - bool* result, - HybridTime* read_restart_ht); - - Result PgSchemaChecker(const scoped_refptr& table); - Status PgSchemaCheckerWithReadTime( - const scoped_refptr& table, - const ReadHybridTime& read_time, - bool* result, - HybridTime* read_restart_ht); - - protected: - void TransactionReceived(const TransactionMetadata& transaction, - scoped_refptr table, - bool has_ysql_ddl_txn_state, - std::function complete_callback, - Status txn_status, - const tserver::GetTransactionStatusResponsePB& response); - - bool MatchPgDocDBSchemaColumns(const scoped_refptr& table, - const Schema& schema, - const std::vector& pg_cols); - - Result> ReadPgAttribute(scoped_refptr table); - Status ReadPgAttributeWithReadTime( - scoped_refptr table, - const ReadHybridTime& read_time, - std::vector* pg_cols, - HybridTime* read_restart_ht); - - // Scan table 'pg_table_id' for all rows that satisfy the SQL filter - // 'WHERE old_col_name = oid_value'. Each returned row contains the columns specified in - // 'col_names'. 
- Result> GetPgCatalogTableScanIterator( - const PgTableReadData& read_data, - PgOid oid_value, - const dockv::ReaderProjection& projection, - RequestScope* request_scope); - - const SysCatalogTable* sys_catalog_; - std::shared_future client_future_; - ThreadPool* thread_pool_; - rpc::Rpcs rpcs_; -}; - -} // namespace master -} // namespace yb diff --git a/src/yb/server/monitored_task.h b/src/yb/server/monitored_task.h index 0619006cf6e2..1826546e0d4b 100644 --- a/src/yb/server/monitored_task.h +++ b/src/yb/server/monitored_task.h @@ -87,7 +87,9 @@ YB_DEFINE_ENUM(MonitoredTaskType, (kAddTableToXClusterTarget) (kMarkTableAsRunning) (kAddTableToXClusterSource) - (kAddNamespaceToXClusterSource)); + (kAddNamespaceToXClusterSource) + (kNamespaceVerification) + (TableSchemaVerification)); class MonitoredTask : public std::enable_shared_from_this { public: diff --git a/src/yb/tserver/pg_client_session.cc b/src/yb/tserver/pg_client_session.cc index d4522c3c4d83..266d39cb6f26 100644 --- a/src/yb/tserver/pg_client_session.cc +++ b/src/yb/tserver/pg_client_session.cc @@ -80,6 +80,10 @@ DEFINE_RUNTIME_string(ysql_sequence_cache_method, "connection", "Where sequence values are cached for both existing and new sequences. Valid values are " "\"connection\" and \"server\""); +DEFINE_RUNTIME_bool(ysql_ddl_transaction_wait_for_ddl_verification, false, + "If set, DDL transactions will wait for DDL verification to complete before " + "returning to the client. 
"); + DECLARE_bool(ysql_serializable_isolation_for_ddl_txn); DECLARE_bool(ysql_ddl_rollback_enabled); DECLARE_bool(yb_enable_cdc_consistent_snapshot_streams); @@ -956,7 +960,7 @@ Status PgClientSession::FinishTransaction( } const TransactionMetadata* metadata = nullptr; - if (has_docdb_schema_changes && FLAGS_report_ysql_ddl_txn_status_to_master) { + if (has_docdb_schema_changes) { metadata = VERIFY_RESULT(GetDdlTransactionMetadata(true, context->GetClientDeadline())); LOG_IF(DFATAL, !metadata) << "metadata is required"; } @@ -995,12 +999,28 @@ Status PgClientSession::FinishTransaction( txn_value->Abort(); } - if (metadata) { - // If we failed to report the status of this DDL transaction, we can just log and ignore it, - // as the poller in the YB-Master will figure out the status of this transaction using the - // transaction status tablet and PG catalog. - ERROR_NOT_OK(client().ReportYsqlDdlTxnStatus(*metadata, req.commit()), - "Sending ReportYsqlDdlTxnStatus call failed"); + // If this transaction was DDL that had DocDB syscatalog changes, then the YB-Master may have + // any operations postponed to the end of transaction. Report the status of the transaction and + // wait for the post-processing by YB-Master to end. + if (FLAGS_ysql_ddl_rollback_enabled && has_docdb_schema_changes && metadata) { + if (FLAGS_report_ysql_ddl_txn_status_to_master) { + // If we failed to report the status of this DDL transaction, we can just log and ignore it, + // as the poller in the YB-Master will figure out the status of this transaction using the + // transaction status tablet and PG catalog. + ERROR_NOT_OK(client().ReportYsqlDdlTxnStatus(*metadata, req.commit()), + "Sending ReportYsqlDdlTxnStatus call failed"); + } + + if (FLAGS_ysql_ddl_transaction_wait_for_ddl_verification) { + // Wait for DDL verification to end. 
This may include actions such as a) removing an added + // column in case of ADD COLUMN abort b) dropping a column marked for deletion in case of + // DROP COLUMN commit. c) removing DELETE marker on a column if DROP COLUMN aborted d) Roll + // back changes to table/column names in case of transaction abort. e) dropping a table in + // case of DROP TABLE commit. All the above actions take place only after the transaction + // is completed. + ERROR_NOT_OK(client().WaitForDdlVerificationToFinish(*metadata), + "WaitForDdlVerificationToFinish call failed"); + } } return Status::OK(); } diff --git a/src/yb/yql/pggate/pg_client.cc b/src/yb/yql/pggate/pg_client.cc index 1fe3c05cb1f2..2e9ac4af97e6 100644 --- a/src/yb/yql/pggate/pg_client.cc +++ b/src/yb/yql/pggate/pg_client.cc @@ -51,6 +51,7 @@ DECLARE_bool(use_node_hostname_for_local_tserver); DECLARE_int32(backfill_index_client_rpc_timeout_ms); DECLARE_int32(yb_client_admin_operation_timeout_sec); +DECLARE_uint32(ddl_verification_timeout_multiplier); DEFINE_UNKNOWN_uint64(pg_client_heartbeat_interval_ms, 10000, "Pg client heartbeat interval in ms."); @@ -473,10 +474,11 @@ class PgClient::Impl : public BigDataFetcher { tserver::PgFinishTransactionRequestPB req; req.set_session_id(session_id_); req.set_commit(commit); + bool has_docdb_schema_changes = false; if (ddl_mode) { ddl_mode->ToPB(req.mutable_ddl_mode()); + has_docdb_schema_changes = ddl_mode->has_docdb_schema_changes; } - tserver::PgFinishTransactionResponsePB resp; if (PREDICT_FALSE(yb_debug_log_docdb_requests)) { @@ -485,7 +487,19 @@ class PgClient::Impl : public BigDataFetcher { (ddl_mode ? " DDL" : "")); } - RETURN_NOT_OK(proxy_->FinishTransaction(req, &resp, PrepareController())); + // If docdb schema changes are present, then this transaction had DDL changes that changed the + // DocDB schema. In this case FinishTransaction has to wait for any post-processing for these + // DDLs to complete.
Some examples of such post-processing is rolling back any DocDB schema + // changes in case this transaction was aborted (or) dropping a column/table marked for deletion + // after commit. Increase the deadline in that case for this operation. FinishTransaction waits + // for FLAGS_ddl_verification_timeout_multiplier times the normal timeout for this operation, + // so we have to wait longer than that here. + auto deadline = !has_docdb_schema_changes ? CoarseTimePoint() : + CoarseMonoClock::Now() + + MonoDelta::FromSeconds((FLAGS_ddl_verification_timeout_multiplier + 1) * + FLAGS_yb_client_admin_operation_timeout_sec); + RETURN_NOT_OK(proxy_->FinishTransaction(req, &resp, PrepareController(deadline))); + return ResponseStatus(resp); } diff --git a/src/yb/yql/pgwrapper/pg_ddl_atomicity-test.cc b/src/yb/yql/pgwrapper/pg_ddl_atomicity-test.cc index db3bc8ebc9e7..e57f39e42c87 100644 --- a/src/yb/yql/pgwrapper/pg_ddl_atomicity-test.cc +++ b/src/yb/yql/pgwrapper/pg_ddl_atomicity-test.cc @@ -36,6 +36,7 @@ #include "yb/util/backoff_waiter.h" #include "yb/util/monotime.h" #include "yb/util/string_util.h" +#include "yb/util/sync_point.h" #include "yb/util/test_thread_holder.h" #include "yb/util/timestamp.h" #include "yb/util/tsan_util.h" @@ -45,8 +46,20 @@ #include "yb/yql/pgwrapper/pg_ddl_atomicity_test_base.h" #include "yb/yql/pgwrapper/pg_test_utils.h" +#include "yb/yql/pgwrapper/pg_mini_test_base.h" +#include "yb/master/mini_master.h" +#include "yb/master/master.h" + +using namespace std::literals; using std::string; using std::vector; +using namespace std::literals; + +DECLARE_bool(TEST_hang_on_ddl_verification_progress); +DECLARE_string(allowed_preview_flags_csv); +DECLARE_bool(ysql_ddl_rollback_enabled); +DECLARE_bool(report_ysql_ddl_txn_status_to_master); +DECLARE_bool(ysql_ddl_transaction_wait_for_ddl_verification); namespace yb { namespace pgwrapper { @@ -108,7 +121,7 @@ TEST_F(PgDdlAtomicityTest, TestDatabaseGC) { VerifyNamespaceNotExists(client.get(), test_name); 
} -TEST_F(PgDdlAtomicityTest, TestCreateDbFailureAndRestartGC) { +TEST_F(PgDdlAtomicityTest, TestCreateDbAndRestartGC) { NamespaceName test_name = "test_pgsql"; auto client = ASSERT_RESULT(cluster_->CreateClient()); auto conn = ASSERT_RESULT(Connect()); @@ -218,6 +231,38 @@ TEST_F( threads.Stop(); } +TEST_F(PgDdlAtomicityTest, FailureRecoveryTestWithAbortedTxn) { + // Make TransactionParticipant::Impl::CheckForAbortedTransactions and TabletLoader::Visit deadlock + // on the mutex. GH issue #15849. + + // Temporarily disable abort cleanup. This flag will be reset when we RestartMaster. + ASSERT_OK(cluster_->SetFlagOnMasters("transactions_poll_check_aborted", "true")); + + auto client = ASSERT_RESULT(cluster_->CreateClient()); + auto conn = ASSERT_RESULT(Connect()); + ASSERT_OK(conn.Execute(CreateTableStmt(kDropTable))); + + // Create an aborted transaction so that TransactionParticipant::Impl::CheckForAbortedTransactions + // has something to do. + ASSERT_OK(conn.TestFailDdl(CreateTableStmt(kCreateTable))); + + // Crash in the middle of a DDL so that TabletLoader::Visit will perform some writes to + // sys_catalog on CatalogManager startup. + ASSERT_OK(cluster_->SetFlagOnMasters("TEST_simulate_crash_after_table_marked_deleting", "true")); + // Set pause rollback flag. + ASSERT_OK(cluster_->SetFlagOnMasters("TEST_pause_ddl_rollback", "true")); + ASSERT_OK(conn.Execute(DropTableStmt(kDropTable))); + + ASSERT_EQ(cluster_->master_daemons().size(), 1); + // Give enough time for CheckForAbortedTransactions to start and get stuck. + cluster_->GetLeaderMaster()->mutable_flags()->push_back( + "--TEST_delay_sys_catalog_reload_secs=10"); + + RestartMaster(); + + VerifyTableNotExists(client.get(), kDatabase, kDropTable, 40); +} + // Class for sanity test. 
class PgDdlAtomicitySanityTest : public PgDdlAtomicityTest { protected: @@ -226,8 +271,10 @@ class PgDdlAtomicitySanityTest : public PgDdlAtomicityTest { "--allowed_preview_flags_csv=ysql_ddl_rollback_enabled"); options->extra_tserver_flags.push_back("--ysql_ddl_rollback_enabled=true"); options->extra_tserver_flags.push_back("--report_ysql_ddl_txn_status_to_master=true"); + options->extra_tserver_flags.push_back("--ysql_ddl_transaction_wait_for_ddl_verification=true"); // TODO (#19975): Enable read committed isolation options->extra_tserver_flags.push_back("--yb_enable_read_committed_isolation=false"); + options->extra_master_flags.push_back("--vmodule=ysql_ddl_handler=5,ysql_transaction_ddl=5"); } }; @@ -256,6 +303,15 @@ TEST_F(PgDdlAtomicitySanityTest, BasicTest) { ASSERT_OK(VerifyRowsAfterDdlSuccess(&conn, num_rows)); } +TEST_F(PgDdlAtomicitySanityTest, BasicTest1) { + auto conn = ASSERT_RESULT(Connect()); + ASSERT_OK(conn.Execute(CreateTableStmt(kCreateTable))); + ASSERT_OK(conn.TestFailDdl(DropColumnStmt(kCreateTable))); + auto client = ASSERT_RESULT(cluster_->CreateClient()); + // After the failed DROP COLUMN, the table is expected to still have all three columns. + ASSERT_OK(VerifySchema(client.get(), kDatabase, kCreateTable, {"key", "value", "num"})); +} + TEST_F(PgDdlAtomicitySanityTest, CreateFailureRollback) { auto conn = ASSERT_RESULT(Connect()); auto client = ASSERT_RESULT(cluster_->CreateClient()); @@ -678,36 +734,6 @@ TEST_F(PgDdlAtomicitySanityTest, YB_DISABLE_TEST(FailureRecoveryTest)) { ASSERT_NOK(conn.Execute(DropTableStmt(kAddCol))); } -TEST_F(PgDdlAtomicityTest, FailureRecoveryTestWithAbortedTxn) { - // Make TransactionParticipant::Impl::CheckForAbortedTransactions and TabletLoader::Visit deadlock - // on the mutex. GH issue #15849. - - // Temporarily disable abort cleanup. This flag will be reset when we RestartMaster.
- ASSERT_OK(cluster_->SetFlagOnMasters("transactions_poll_check_aborted", "true")); - - auto client = ASSERT_RESULT(cluster_->CreateClient()); - auto conn = ASSERT_RESULT(Connect()); - ASSERT_OK(conn.Execute(CreateTableStmt(kDropTable))); - - // Create an aborted transaction so that TransactionParticipant::Impl::CheckForAbortedTransactions - // has something to do. - ASSERT_OK(conn.TestFailDdl(CreateTableStmt(kCreateTable))); - - // Crash in the middle of a DDL so that TabletLoader::Visit will perform some writes to - // sys_catalog on CatalogManager startup. - ASSERT_OK(cluster_->SetFlagOnMasters("TEST_simulate_crash_after_table_marked_deleting", "true")); - ASSERT_OK(conn.Execute(DropTableStmt(kDropTable))); - - ASSERT_EQ(cluster_->master_daemons().size(), 1); - // Give enough time for CheckForAbortedTransactions to start and get stuck. - cluster_->GetLeaderMaster()->mutable_flags()->push_back( - "--TEST_delay_sys_catalog_reload_secs=10"); - - RestartMaster(); - - VerifyTableNotExists(client.get(), kDatabase, kDropTable, 40); -} - TEST_F(PgDdlAtomicitySanityTest, AddReplicaIdentityTest) { ASSERT_OK( cluster_->SetFlagOnMasters("allowed_preview_flags_csv", "ysql_yb_enable_replica_identity")); @@ -1258,12 +1284,11 @@ TEST_F(PgDdlAtomicitySnapshotTest, SnapshotTest) { ASSERT_OK(VerifySchema(client.get(), kDatabase, table, {"key", "value", "num"})); } - /* - TODO (deepthi): Uncomment the following code after #14679 is fixed. // Run different failing DDL operations on the tables. - ASSERT_OK(conn.TestFailDdl(RenameTableStmt(add_col_test))); - ASSERT_OK(conn.TestFailDdl(RenameColumnStmt(drop_table_test))); - */ + ASSERT_OK(cluster_->SetFlagOnMasters("TEST_pause_ddl_rollback", "true")); + ASSERT_OK(conn.TestFailDdl(RenameTableStmt(kAddCol))); + ASSERT_OK(conn.TestFailDdl(RenameColumnStmt(kDropTable))); + ASSERT_OK(cluster_->SetFlagOnMasters("TEST_pause_ddl_rollback", "false")); // Restore to before rollback. 
LOG(INFO) << "Start restoration to timestamp " << hybrid_time_before_rollback; @@ -1337,7 +1362,7 @@ Status PgDdlAtomicitySnapshotTest::ListSnapshotTest(DdlErrorInjection inject_err RETURN_NOT_OK(testListSnapshots(&conn, inject_error, DropTableStmt(kDropTable), snapshot_id, ExpectSuccess::kFalse)); // Wait for the table to be deleted before the next test. - client::VerifyTableNotExists(client.get(), kDatabase, kDropTable, 20); + client::VerifyTableNotExists(client.get(), kDatabase, kDropTable, 40); // Verify that an index marked for deletion causes ListSnapshots to fail. return testListSnapshots(&conn, inject_error, DropIndexStmt(kDropIndex), snapshot_id, @@ -1358,6 +1383,8 @@ class PgLibPqMatviewTest: public PgDdlAtomicitySanityTest { options->extra_tserver_flags.push_back( "--allowed_preview_flags_csv=ysql_ddl_rollback_enabled"); options->extra_tserver_flags.push_back("--ysql_ddl_rollback_enabled=true"); + options->extra_master_flags.push_back("--vmodule=ysql_ddl_handler=3,ysql_transaction_ddl=3"); + options->extra_tserver_flags.push_back("--ysql_ddl_transaction_wait_for_ddl_verification=true"); } protected: void MatviewTest(); @@ -1427,6 +1454,7 @@ TEST_F(PgLibPqMatviewTest, MatviewTestWithoutPgOptimization) { } TEST_F(PgLibPqMatviewTest, MatviewTest) { + ASSERT_OK(cluster_->SetFlagOnTServers("report_ysql_ddl_txn_status_to_master", "true")); MatviewTest(); } @@ -1476,9 +1504,12 @@ class PgLibPqTableRewrite: auto num_tables = VERIFY_RESULT(client->ListTables(kTable)).size(); auto num_indexes = VERIFY_RESULT(client->ListTables(kIndex)).size(); auto num_matviews = VERIFY_RESULT(client->ListTables(kMaterializedView)).size(); + LOG(INFO) << "Number of tables: " << num_tables << ", indexes: " << num_indexes + << ", materialized views: " << num_matviews; return num_tables == 1 && num_indexes == 1 && num_matviews == 1; }, MonoDelta::FromSeconds(60), "Verify that we dropped the stale DocDB tables"); } + const std::string kTable = "test_table"; const std::string kIndex =
"test_idx"; const std::string kMaterializedView = "test_mv"; @@ -1557,5 +1588,62 @@ TEST_P(PgLibPqTableRewrite, {"ybrowid", "a_renamed", "b"})); } +class PgDdlAtomicityMiniClusterTest : public PgMiniTestBase { + protected: + void SetUp() override { + ANNOTATE_UNPROTECTED_WRITE(FLAGS_allowed_preview_flags_csv) = "ysql_ddl_rollback_enabled"; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_ysql_ddl_rollback_enabled) = true; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_report_ysql_ddl_txn_status_to_master) = true; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_ysql_ddl_transaction_wait_for_ddl_verification) = true; + pgwrapper::PgMiniTestBase::SetUp(); + } +}; + +TEST_F(PgDdlAtomicityMiniClusterTest, TestWaitForRollbackWithMasterRestart) { + SyncPoint::GetInstance()->LoadDependency( + {{"YsqlDdlHandler::IsYsqlDdlVerificationDone:Fail", + "PgDdlAtomicitySanityTest::TestWaitForRollbackWithMasterRestart:WaitForFail"}}); + SyncPoint::GetInstance()->EnableProcessing(); + + auto conn = ASSERT_RESULT(Connect()); + const auto kDropCol = "drop_col"; + ASSERT_OK(conn.ExecuteFormat("CREATE TABLE $0 (key INT PRIMARY KEY, value TEXT, num real)", + kDropCol)); + + TestThreadHolder thread_holder; + + // Fetch the table id before starting the thread. + auto client = ASSERT_RESULT(cluster_->CreateClient()); + const auto table_id = ASSERT_RESULT(GetTableIdByTableName(client.get(), "yugabyte", kDropCol)); + ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_hang_on_ddl_verification_progress) = true; + + // Start the thread to drop the column. + thread_holder.AddThreadFunctor([&conn, kDropCol] { + ASSERT_OK(conn.ExecuteFormat("ALTER TABLE $0 DROP COLUMN value", kDropCol)); + }); + + // Wait until the alter operation hits the sync point in ysql_ddl_handler. + TEST_SYNC_POINT("PgDdlAtomicitySanityTest::TestWaitForRollbackWithMasterRestart:WaitForFail"); + + // Restart master to simulate the case where IsYsqlDdlVerificationDone poller in YSQL spans a + // master restart.
+ ASSERT_OK(RestartMaster()); + + // Allow verification to proceed and wait for thread to finish. + ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_hang_on_ddl_verification_progress) = false; + thread_holder.JoinAll(); + + // Verify that alter table was successful. + std::shared_ptr table_info = std::make_shared(); + Synchronizer sync; + ASSERT_OK(client->GetTableSchemaById(table_id, table_info, sync.AsStatusCallback())); + ASSERT_OK(sync.Wait()); + + const auto& columns = table_info->schema.columns(); + ASSERT_EQ(columns.size(), 2); + ASSERT_EQ(columns[0].name(), "key"); + ASSERT_EQ(columns[1].name(), "num"); +} + } // namespace pgwrapper } // namespace yb diff --git a/src/yb/yql/pgwrapper/pg_ddl_atomicity_test_base.h b/src/yb/yql/pgwrapper/pg_ddl_atomicity_test_base.h index 20f75c4174e4..959cabc29280 100644 --- a/src/yb/yql/pgwrapper/pg_ddl_atomicity_test_base.h +++ b/src/yb/yql/pgwrapper/pg_ddl_atomicity_test_base.h @@ -20,6 +20,8 @@ #include "yb/yql/pgwrapper/libpq_utils.h" #include "yb/util/status.h" +using namespace std::literals; + namespace yb::pgwrapper { YB_STRONGLY_TYPED_BOOL(DdlErrorInjection); @@ -226,7 +228,7 @@ class PgDdlAtomicityTestBase : public LibPqTestBase { const std::string kDatabase = "yugabyte"; constexpr static std::string_view kDdlVerificationError = - "Table is undergoing DDL transaction verification"sv; + "TABLE_SCHEMA_CHANGE_IN_PROGRESS"sv; }; } // namespace yb::pgwrapper diff --git a/src/yb/yql/pgwrapper/pg_drop_column_test.cc b/src/yb/yql/pgwrapper/pg_drop_column_test.cc index c594242d8910..f7cd43e0ec72 100644 --- a/src/yb/yql/pgwrapper/pg_drop_column_test.cc +++ b/src/yb/yql/pgwrapper/pg_drop_column_test.cc @@ -26,7 +26,8 @@ class PgDropColumnSanityTest : public LibPqTestBase { options->extra_tserver_flags.push_back( "--allowed_preview_flags_csv=ysql_ddl_rollback_enabled"); options->extra_tserver_flags.push_back("--ysql_ddl_rollback_enabled=true"); - options->extra_tserver_flags.push_back("--report_ysql_ddl_txn_status_to_master=false"); +
options->extra_tserver_flags.push_back("--report_ysql_ddl_txn_status_to_master=true"); + options->extra_tserver_flags.push_back("--ysql_ddl_transaction_wait_for_ddl_verification=true"); } public: @@ -63,7 +64,7 @@ void PgDropColumnSanityTest::SetupTables() { ASSERT_OK(conn.ExecuteFormat( "INSERT INTO $0 VALUES (1, 1), (2, 2), (3, 3), (4, 4), (5, 5)", table)); // Disable rollback. - ASSERT_OK(cluster_->SetFlagOnMasters("TEST_disable_ysql_ddl_txn_verification", "true")); + ASSERT_OK(cluster_->SetFlagOnMasters("TEST_pause_ddl_rollback", "true")); // Issue Alter Table Drop column. ASSERT_OK(conn.TestFailDdl(Format("ALTER TABLE $0 DROP COLUMN col_to_test", table))); } @@ -166,7 +167,7 @@ class PgDropReferencingColumnFKTest : public PgDropColumnSanityTest { " (5, 5), (6, 6), (7, 7)", table)); ASSERT_OK(conn.ExecuteFormat("INSERT INTO $0 VALUES (6, 6), (7, 7)", ref_table)); // Disable rollback. - ASSERT_OK(cluster_->SetFlagOnMasters("TEST_disable_ysql_ddl_txn_verification", "true")); + ASSERT_OK(cluster_->SetFlagOnMasters("TEST_pause_ddl_rollback", "true")); // Issue Alter Table Drop column. ASSERT_OK(conn.TestFailDdl(Format("ALTER TABLE $0 DROP COLUMN col", ref_table))); } @@ -205,7 +206,7 @@ class PgDropColumnColocatedTableTest : public PgDropColumnSanityTest { ASSERT_OK(conn.ExecuteFormat("INSERT INTO $0 VALUES (1, 1), (2, 2), (3, 3), (4, 4)," " (5, 5)", table)); // Disable rollback. - ASSERT_OK(cluster_->SetFlagOnMasters("TEST_disable_ysql_ddl_txn_verification", "true")); + ASSERT_OK(cluster_->SetFlagOnMasters("TEST_pause_ddl_rollback", "true")); // Issue Alter Table Drop column. ASSERT_OK(conn.TestFailDdl(Format("ALTER TABLE $0 DROP COLUMN col_to_test", table))); } @@ -237,7 +238,7 @@ class PgDropColumnTablegroupTest : public PgDropColumnSanityTest { ASSERT_OK(conn.ExecuteFormat("INSERT INTO $0 VALUES (1, 1), (2, 2), (3, 3), (4, 4)," " (5, 5)", table)); // Disable rollback. 
- ASSERT_OK(cluster_->SetFlagOnMasters("TEST_disable_ysql_ddl_txn_verification", "true")); + ASSERT_OK(cluster_->SetFlagOnMasters("TEST_pause_ddl_rollback", "true")); // Issue Alter Table Drop column. ASSERT_OK(conn.TestFailDdl(Format("ALTER TABLE $0 DROP COLUMN col_to_test", table))); } diff --git a/src/yb/yql/pgwrapper/pg_mini-test.cc b/src/yb/yql/pgwrapper/pg_mini-test.cc index 39625c1c05f1..c9b4e2ea4922 100644 --- a/src/yb/yql/pgwrapper/pg_mini-test.cc +++ b/src/yb/yql/pgwrapper/pg_mini-test.cc @@ -1036,21 +1036,22 @@ TEST_F(PgMiniTest, DropDBMarkDeleted) { auto *catalog_manager = &ASSERT_RESULT(cluster_->GetLeaderMiniMaster())->catalog_manager(); PGConn conn = ASSERT_RESULT(Connect()); - ASSERT_FALSE(catalog_manager->AreTablesDeleting()); + ASSERT_FALSE(catalog_manager->AreTablesDeletingOrHiding()); ASSERT_OK(conn.ExecuteFormat("CREATE DATABASE $0", kDatabaseName)); ASSERT_OK(conn.ExecuteFormat("DROP DATABASE $0", kDatabaseName)); // System tables should be deleting then deleted. int num_sleeps = 0; - while (catalog_manager->AreTablesDeleting() && (num_sleeps++ != kMaxNumSleeps)) { + while (catalog_manager->AreTablesDeletingOrHiding() && (num_sleeps++ != kMaxNumSleeps)) { LOG(INFO) << "Tables are deleting..."; std::this_thread::sleep_for(kSleepTime); } - ASSERT_FALSE(catalog_manager->AreTablesDeleting()) << "Tables should have finished deleting"; + ASSERT_FALSE(catalog_manager->AreTablesDeletingOrHiding()) + << "Tables should have finished deleting"; // Make sure that the table deletions are persisted. ASSERT_OK(RestartCluster()); // Refresh stale local variable after RestartSync. 
catalog_manager = &ASSERT_RESULT(cluster_->GetLeaderMiniMaster())->catalog_manager(); - ASSERT_FALSE(catalog_manager->AreTablesDeleting()); + ASSERT_FALSE(catalog_manager->AreTablesDeletingOrHiding()); } TEST_F(PgMiniTest, DropDBWithTables) { @@ -1078,16 +1079,17 @@ TEST_F(PgMiniTest, DropDBWithTables) { ASSERT_OK(conn.ExecuteFormat("DROP DATABASE $0", kDatabaseName)); // User and system tables should be deleting then deleted. int num_sleeps = 0; - while (catalog_manager->AreTablesDeleting() && (num_sleeps++ != kMaxNumSleeps)) { + while (catalog_manager->AreTablesDeletingOrHiding() && (num_sleeps++ != kMaxNumSleeps)) { LOG(INFO) << "Tables are deleting..."; std::this_thread::sleep_for(kSleepTime); } - ASSERT_FALSE(catalog_manager->AreTablesDeleting()) << "Tables should have finished deleting"; + ASSERT_FALSE(catalog_manager->AreTablesDeletingOrHiding()) + << "Tables should have finished deleting"; // Make sure that the table deletions are persisted. ASSERT_OK(RestartCluster()); catalog_manager = &ASSERT_RESULT(cluster_->GetLeaderMiniMaster())->catalog_manager(); sys_tablet = ASSERT_RESULT(catalog_manager->GetTabletInfo(master::kSysCatalogTabletId)); - ASSERT_FALSE(catalog_manager->AreTablesDeleting()); + ASSERT_FALSE(catalog_manager->AreTablesDeletingOrHiding()); { auto tablet_lock = sys_tablet->LockForWrite(); num_tables_after = tablet_lock->pb.table_ids_size(); diff --git a/src/yb/yql/pgwrapper/pg_mini_test_base.cc b/src/yb/yql/pgwrapper/pg_mini_test_base.cc index 80e5a342a756..e0b6fb2a774e 100644 --- a/src/yb/yql/pgwrapper/pg_mini_test_base.cc +++ b/src/yb/yql/pgwrapper/pg_mini_test_base.cc @@ -17,6 +17,7 @@ #include "yb/gutil/casts.h" +#include "yb/master/master.h" #include "yb/master/mini_master.h" #include "yb/master/sys_catalog_initialization.h" @@ -132,6 +133,14 @@ Status PgMiniTestBase::RestartCluster() { return pg_supervisor_->Start(); } +// Restarts the leader mini master and waits until its catalog manager is leader and ready. +Status PgMiniTestBase::RestartMaster() { + LOG(INFO) << "Restarting Master"; + auto mini_master =
VERIFY_RESULT(cluster_->GetLeaderMiniMaster()); + RETURN_NOT_OK(mini_master->Restart()); + return mini_master->master()->WaitUntilCatalogManagerIsLeaderAndReadyForTests(); +} + void PgMiniTestBase::OverrideMiniClusterOptions(MiniClusterOptions* options) {} const std::shared_ptr PgMiniTestBase::PickPgTabletServer( diff --git a/src/yb/yql/pgwrapper/pg_mini_test_base.h b/src/yb/yql/pgwrapper/pg_mini_test_base.h index cd227caa2044..0537ae92e783 100644 --- a/src/yb/yql/pgwrapper/pg_mini_test_base.h +++ b/src/yb/yql/pgwrapper/pg_mini_test_base.h @@ -67,6 +67,8 @@ class PgMiniTestBase : public MiniClusterTestWithClient { Status RestartCluster(); + Status RestartMaster(); + const HostPort& pg_host_port() const { return pg_host_port_; } diff --git a/src/yb/yql/pgwrapper/pg_namespace_master_restart-test.cc b/src/yb/yql/pgwrapper/pg_namespace_master_restart-test.cc index b8a9e506c8f5..cf7fb5c5e691 100644 --- a/src/yb/yql/pgwrapper/pg_namespace_master_restart-test.cc +++ b/src/yb/yql/pgwrapper/pg_namespace_master_restart-test.cc @@ -32,13 +32,6 @@ class PgNamespaceMasterRestartTest : public PgMiniTestBase { void SetUp() override { pgwrapper::PgMiniTestBase::SetUp(); } - - void RestartMaster() { - LOG(INFO) << "Restarting Master"; - auto mini_master_ = ASSERT_RESULT(cluster_->GetLeaderMiniMaster()); - ASSERT_OK(mini_master_->Restart()); - ASSERT_OK(mini_master_->master()->WaitUntilCatalogManagerIsLeaderAndReadyForTests()); - } }; TEST_F(PgNamespaceMasterRestartTest, CreateNamespaceWithDelay) { @@ -64,7 +57,7 @@ TEST_F(PgNamespaceMasterRestartTest, CreateNamespaceWithDelay) { TEST_SYNC_POINT("PgNamespaceMasterRestartTest::CreateNamespaceWithDelay:WaitForFail"); // Restart master - RestartMaster(); + ASSERT_OK(RestartMaster()); // Stop threads thread_holder.JoinAll();