[#20033] YSQL: DDL commands must wait for rollback/roll-forward operations

Summary:
Currently DDL Atomicity operations are asynchronous. This means that if DDL Atomicity is enabled, even after a YSQL DDL operation finishes, some DocDB DDL operations pertaining to this transaction may still happen in the background.

Although this poses no correctness issues, a new DDL on the same table from the same PG session may sometimes fail because the DDL rollback/roll-forward for the previous transaction has not finished yet. Similarly, an immediately succeeding DML may fail because of a concurrent DocDB DDL on the table (for example, an ALTER TABLE operation from the asynchronous background task may increment the schema version, causing the DML to fail with SchemaVersionMismatch). Such failures can confuse the user.

To fix this, after committing/aborting the DDL operation, YSQL must wait for any rollback/roll-forward operations to finish before returning control to the user.

Upgrade Safety
* This change adds a new RPC, IsYsqlDdlVerificationDone. The TServers issue it to the YB-Master to find the status of the DDL transaction in a best-effort manner.
* The expectation is that TServers are upgraded after the master. Therefore, when a TS sends this request to the master, the master should be able to recognize the message and respond. Even if the request fails for any reason, it does not matter, as fetching the DDL rollback status is best-effort and does not affect correctness. The TServers simply log an error and continue.
If this request fails and the TS is unable to wait for the rollback to complete, any upcoming DDL or DML on the same table may immediately fail with "DDL verification in progress". This phase is transitory and ends once the DDL verification is finished.

**Upgrade/Rollback safety**
All changes are guarded by flag `ysql_ddl_transaction_wait_for_ddl_verification`
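
The best-effort behavior described in the upgrade notes can be sketched in a few lines. This is an illustration only, not the code in this commit: `SketchStatus` and `MaybeWaitForDdlVerification` are hypothetical names, and the boolean parameter merely stands in for `ysql_ddl_transaction_wait_for_ddl_verification`.

```cpp
#include <cassert>
#include <functional>
#include <iostream>
#include <string>

// Hypothetical stand-in for a status object; not YugabyteDB's Status class.
struct SketchStatus {
  bool ok;
  std::string message;
};

// Returns true if the wait was attempted, false if the flag disabled it.
// A failed wait is logged and otherwise ignored, because the wait is
// best effort and is assumed not to affect correctness.
bool MaybeWaitForDdlVerification(
    bool wait_flag_enabled,
    const std::function<SketchStatus()>& wait_fn) {
  if (!wait_flag_enabled) {
    return false;  // Flag off: return control to the caller immediately.
  }
  const SketchStatus s = wait_fn();
  if (!s.ok) {
    std::cerr << "Ignoring DDL verification wait failure: " << s.message << "\n";
  }
  return true;
}
```

With the flag off the wait function is never invoked; with the flag on, a failing wait is reported but does not propagate to the caller.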

Jira: DB-8998

Test Plan:
./yb_build.sh --cxx-test pg_ddl_atomicity-test
Jenkins run through https://phorge.dev.yugabyte.com/D30471
Additionally, a new stress test with error injection will be implemented in a follow-up patch.

Reviewers: hsunder, myang

Reviewed By: hsunder, myang

Subscribers: fizaa, yql, ybase

Differential Revision: https://phorge.dev.yugabyte.com/D30467
hari90 committed Mar 15, 2024
1 parent 4ead4bb commit 588e705
Showing 31 changed files with 1,512 additions and 752 deletions.
46 changes: 45 additions & 1 deletion src/yb/client/client-internal.cc
@@ -690,7 +690,12 @@ Status YBClient::Data::DeleteTable(YBClient* client,
indexed_table_name->GetFromTableIdentifierPB(resp.indexed_table());
}

LOG(INFO) << "Deleted table " << (!table_id.empty() ? table_id : table_name.ToString());
if (req.ysql_ddl_rollback_enabled()) {
LOG(INFO) << "Marked table " << (!table_id.empty() ? table_id : table_name.ToString())
<< " for deletion at the end of transaction " << txn->ToString();
} else {
LOG(INFO) << "Deleted table " << (!table_id.empty() ? table_id : table_name.ToString());
}
return Status::OK();
}

@@ -2884,6 +2889,45 @@ Status YBClient::Data::ReportYsqlDdlTxnStatus(
return Status::OK();
}

Status YBClient::Data::IsYsqlDdlVerificationInProgress(
const TransactionMetadata& txn,
CoarseTimePoint deadline,
bool *ddl_verification_in_progress) {
master::IsYsqlDdlVerificationDoneRequestPB req;
master::IsYsqlDdlVerificationDoneResponsePB resp;

txn.ToPB(req.mutable_transaction());
auto s = SyncLeaderMasterRpc(
deadline, req, &resp, "IsYsqlDdlVerificationDone",
&master::MasterDdlProxy::IsYsqlDdlVerificationDoneAsync);

// Check DDL verification is paused. This can happen during error injection tests. If so, no
// point continuing to retry.
if (resp.has_error()) {
if (resp.error().has_status() &&
resp.error().status().has_message() &&
resp.error().status().message().find("DDL Rollback is paused") != std::string::npos) {
*ddl_verification_in_progress = false;
}
return StatusFromPB(resp.error().status());
}

RETURN_NOT_OK(s);

*ddl_verification_in_progress = !resp.done();
return Status::OK();
}

Status YBClient::Data::WaitForDdlVerificationToFinish(
const TransactionMetadata& txn,
CoarseTimePoint deadline) {
return RetryFunc(
deadline,
Format("Waiting on YSQL DDL Verification for $0 to be completed", txn.transaction_id),
Format("Timed out on YSQL DDL Verification for $0 to be completed", txn.transaction_id),
std::bind(&YBClient::Data::IsYsqlDdlVerificationInProgress, this, txn, _1, _2));
}

Result<bool> YBClient::Data::CheckIfPitrActive(CoarseTimePoint deadline) {
CheckIfPitrActiveRequestPB req;
CheckIfPitrActiveResponsePB resp;
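
The `WaitForDdlVerificationToFinish` hunk above retries a status check until verification is done or a deadline passes. A minimal standalone sketch of that polling pattern follows. It is only loosely analogous to the `RetryFunc` call in the diff: `WaitUntilDone`, `WaitResult`, and the backoff values are assumptions made for illustration, and the real helper's retry policy may differ.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <functional>
#include <thread>

using Clock = std::chrono::steady_clock;

enum class WaitResult { kDone, kTimedOut };

// Polls `is_done` until it returns true or `deadline` passes, sleeping
// between attempts with a capped exponential backoff (assumed policy).
WaitResult WaitUntilDone(
    const std::function<bool()>& is_done,
    Clock::time_point deadline,
    std::chrono::milliseconds initial_backoff = std::chrono::milliseconds(1),
    std::chrono::milliseconds max_backoff = std::chrono::milliseconds(50)) {
  auto backoff = initial_backoff;
  while (true) {
    if (is_done()) {
      return WaitResult::kDone;
    }
    const auto now = Clock::now();
    if (now >= deadline) {
      return WaitResult::kTimedOut;
    }
    // Never sleep past the deadline.
    const auto remaining =
        std::chrono::duration_cast<std::chrono::milliseconds>(deadline - now);
    std::this_thread::sleep_for(std::min(backoff, remaining));
    backoff = std::min(backoff * 2, max_backoff);
  }
}
```

In the commit, the deadline for the wait is computed in `YBClient::WaitForDdlVerificationToFinish` as `ddl_verification_timeout_multiplier` times the default admin operation timeout; a caller of this sketch would pass an equivalent `Clock::now() + timeout` value.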
7 changes: 7 additions & 0 deletions src/yb/client/client-internal.h
@@ -442,6 +442,13 @@ class YBClient::Data {
Status ReportYsqlDdlTxnStatus(
const TransactionMetadata& txn, bool is_committed, const CoarseTimePoint& deadline);

Status IsYsqlDdlVerificationInProgress(
const TransactionMetadata& txn,
CoarseTimePoint deadline,
bool *ddl_verification_in_progress);

Status WaitForDdlVerificationToFinish(const TransactionMetadata& txn, CoarseTimePoint deadline);

Result<bool> CheckIfPitrActive(CoarseTimePoint deadline);

Status GetXClusterStreams(
3 changes: 2 additions & 1 deletion src/yb/client/client-test-util.cc
@@ -92,7 +92,8 @@ namespace {
const bool exists) {
ASSERT_OK(LoggedWaitFor([&]() -> Result<bool> {
auto ret = client->TableExists(
client::YBTableName(YQL_DATABASE_PGSQL, database_name, table_name));
client::YBTableName(YQL_DATABASE_PGSQL, database_name, table_name),
true /* skip_hidden */);
WARN_NOT_OK(ResultToStatus(ret), "TableExists call failed");
return ret.ok() && ret.get() == exists;
}, MonoDelta::FromSeconds(timeout_secs),
25 changes: 22 additions & 3 deletions src/yb/client/client.cc
@@ -278,6 +278,12 @@ DEFINE_NON_RUNTIME_uint32(wait_for_ysql_backends_catalog_version_client_master_r
20000,
"WaitForYsqlBackendsCatalogVersion client-to-master RPC timeout. Specifically, both the "
"postgres-to-tserver and tserver-to-master RPC timeout.");

DEFINE_RUNTIME_uint32(ddl_verification_timeout_multiplier, 5,
"Multiplier for the timeout used for DDL verification. DDL verification may involve waiting for"
" DDL operations to finish at the yb-master. This is a multiplier for"
" default_admin_operation_timeout which is the timeout used for a single DDL operation ");

TAG_FLAG(wait_for_ysql_backends_catalog_version_client_master_rpc_timeout_ms, advanced);

namespace yb {
@@ -2490,14 +2496,22 @@ Status YBClient::ReportYsqlDdlTxnStatus(const TransactionMetadata& txn, bool is_
return data_->ReportYsqlDdlTxnStatus(txn, is_committed, deadline);
}

Status YBClient::WaitForDdlVerificationToFinish(const TransactionMetadata& txn) {
auto deadline = CoarseMonoClock::Now() +
MonoDelta::FromSeconds(FLAGS_ddl_verification_timeout_multiplier *
default_admin_operation_timeout().ToSeconds());
return data_->WaitForDdlVerificationToFinish(txn, deadline);
}

Result<bool> YBClient::CheckIfPitrActive() {
auto deadline = CoarseMonoClock::Now() + default_rpc_timeout();
return data_->CheckIfPitrActive(deadline);
}

Result<std::vector<YBTableName>> YBClient::ListTables(const std::string& filter,
bool exclude_ysql,
const std::string& ysql_db_filter) {
const std::string& ysql_db_filter,
bool skip_hidden) {
ListTablesRequestPB req;
ListTablesResponsePB resp;

@@ -2522,6 +2536,9 @@ Result<std::vector<YBTableName>> YBClient::ListTables(const std::string& filter,
if (exclude_ysql && table_info.table_type() == TableType::PGSQL_TABLE_TYPE) {
continue;
}
if (skip_hidden && table_info.hidden()) {
continue;
}
result.emplace_back(master::GetDatabaseTypeForTable(table_info.table_type()),
table_info.namespace_().id(),
table_info.namespace_().name(),
@@ -2648,8 +2665,10 @@ Result<pair<Schema, uint32_t>> YBClient::GetTableSchemaFromSysCatalog(
return make_pair(current_schema, resp.version());
}

Result<bool> YBClient::TableExists(const YBTableName& table_name) {
for (const YBTableName& table : VERIFY_RESULT(ListTables(table_name.table_name()))) {
Result<bool> YBClient::TableExists(const YBTableName& table_name, bool skip_hidden) {
auto tables = VERIFY_RESULT(ListTables(
table_name.table_name(), /*exclude_ysql=*/false, /*ysql_db_filter=*/"", skip_hidden));
for (const YBTableName& table : tables) {
if (table == table_name) {
return true;
}
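
The `skip_hidden` parameter threaded through `ListTables` and `TableExists` in the hunks above boils down to one extra filter in the lookup loop. The sketch below shows that filter in isolation; `TableEntry` and `TableExistsSketch` are hypothetical stand-ins, not YugabyteDB types, and the in-memory vector replaces the master RPC that the real client issues.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical, simplified view of one table returned by a listing call.
struct TableEntry {
  std::string name;
  bool hidden;
};

// Returns true if a table called `name` is present. When `skip_hidden` is
// true, hidden tables are ignored, so a hidden table counts as absent.
bool TableExistsSketch(
    const std::vector<TableEntry>& tables,
    const std::string& name,
    bool skip_hidden = false) {
  for (const TableEntry& table : tables) {
    if (skip_hidden && table.hidden) {
      continue;
    }
    if (table.name == name) {
      return true;
    }
  }
  return false;
}
```

Defaulting `skip_hidden` to `false`, as the diff does, keeps existing callers' behavior unchanged.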
11 changes: 7 additions & 4 deletions src/yb/client/client.h
@@ -737,7 +737,8 @@ class YBClient {
Result<std::vector<YBTableName>> ListTables(
const std::string& filter = "",
bool exclude_ysql = false,
const std::string& ysql_db_filter = "");
const std::string& ysql_db_filter = "",
bool skip_hidden = false);

// List tables in a namespace.
//
@@ -817,9 +818,9 @@ class YBClient {
CoarseTimePoint deadline,
std::vector<std::string>* master_uuids);

// Check if the table given by 'table_name' exists.
// Result value is set only on success.
Result<bool> TableExists(const YBTableName& table_name);
// Check if the table given by 'table_name' exists. 'skip_hidden' indicates whether to consider
// hidden tables. Result value is set only on success.
Result<bool> TableExists(const YBTableName& table_name, bool skip_hidden = false);

Result<bool> IsLoadBalanced(uint32_t num_servers);
Result<bool> IsLoadBalancerIdle();
@@ -935,6 +936,8 @@ class YBClient {
// Provide the completion status of 'txn' to the YB-Master.
Status ReportYsqlDdlTxnStatus(const TransactionMetadata& txn, bool is_committed);

Status WaitForDdlVerificationToFinish(const TransactionMetadata& txn);

Result<bool> CheckIfPitrActive();

void LookupTabletByKey(const std::shared_ptr<YBTable>& table,
3 changes: 2 additions & 1 deletion src/yb/integration-tests/cdcsdk_test_base.cc
@@ -55,6 +55,7 @@
using std::string;

DECLARE_bool(ysql_enable_pack_full_row_update);
DECLARE_bool(ysql_ddl_transaction_wait_for_ddl_verification);

namespace yb {
using client::YBClient;
@@ -142,7 +143,7 @@ Status CDCSDKTestBase::SetUpWithParams(
ANNOTATE_UNPROTECTED_WRITE(FLAGS_max_replication_slots) = 500;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_allowed_preview_flags_csv) = "ysql_ddl_rollback_enabled";
ANNOTATE_UNPROTECTED_WRITE(FLAGS_ysql_ddl_rollback_enabled) = true;

ANNOTATE_UNPROTECTED_WRITE(FLAGS_ysql_ddl_transaction_wait_for_ddl_verification) = true;

MiniClusterOptions opts;
opts.num_masters = num_masters;
2 changes: 1 addition & 1 deletion src/yb/master/CMakeLists.txt
@@ -170,7 +170,7 @@ set(MASTER_SRCS
ysql_tablegroup_manager.cc
ysql_tablespace_manager.cc
ysql_ddl_handler.cc
ysql_transaction_ddl.cc)
ysql_ddl_verification_task.cc)

set(MASTER_DEPS
yb_common
49 changes: 45 additions & 4 deletions src/yb/master/catalog_entity_info.cc
@@ -516,9 +516,9 @@ TableType TableInfo::GetTableType() const {
return LockForRead()->pb.table_type();
}

bool TableInfo::IsBeingDroppedDueToDdlTxn(const std::string& txn_id_pb, bool txn_success) const {
bool TableInfo::IsBeingDroppedDueToDdlTxn(const std::string& pb_txn_id, bool txn_success) const {
auto l = LockForRead();
if (l->pb_transaction_id() != txn_id_pb) {
if (l->pb_transaction_id() != pb_txn_id) {
return false;
}
// The table can be dropped in 2 cases due to a DDL:
@@ -989,6 +989,36 @@ bool TableInfo::AttachedYCQLIndexDeletionInProgress(const TableId& index_table_i
IndexPermissions::INDEX_PERM_WRITE_AND_DELETE_WHILE_REMOVING;
}

void TableInfo::AddDdlTxnWaitingForSchemaVersion(int schema_version, const TransactionId& txn) {
std::lock_guard l(lock_);
auto res = ddl_txns_waiting_for_schema_version_.emplace(schema_version, txn);
// There should never have existed an entry for this schema version already as only one
// DDL transaction is allowed on an entity at a given time.
LOG_IF(DFATAL, !res.second) << "Found existing entry for schema version " << schema_version
<< " for table " << table_id_ << " with txn " << txn
<< " previous transaction " << res.first->second;
}

std::vector<TransactionId> TableInfo::EraseDdlTxnsWaitingForSchemaVersion(int schema_version) {
std::lock_guard l(lock_);
std::vector<TransactionId> txns;
auto upper_bound_iter = ddl_txns_waiting_for_schema_version_.upper_bound(schema_version);
// Ideally we will perform this erase operation at the end of every alter table operation, and
// thus we should be able to return only one schema version. However, alter table is an async
// operation. It is possible in the case of YSQL DDL Transaction verification that while an
// alter operation for a rollback operation is about to start (i.e. table in ALTERING state), a
// new alter operation can start on the same table (this is by design and poses no correctness
// issues). It may be possible that the TServers respond back with the latest schema version.
// To handle this case, we return the transactions waiting on all schema versions less than the
// requested schema version as well.
for (auto it = ddl_txns_waiting_for_schema_version_.begin(); it != upper_bound_iter; ++it) {
txns.push_back(it->second);
}
ddl_txns_waiting_for_schema_version_.erase(
ddl_txns_waiting_for_schema_version_.begin(), upper_bound_iter);
return txns;
}

void PersistentTableInfo::set_state(SysTablesEntryPB::State state, const string& msg) {
VLOG_WITH_FUNC(2) << "Setting state for " << name() << " to "
<< SysTablesEntryPB::State_Name(state) << " reason: " << msg;
@@ -1009,8 +1039,19 @@ const std::string& PersistentTableInfo::indexed_table_id() const {

Result<bool> PersistentTableInfo::is_being_modified_by_ddl_transaction(
const TransactionId& txn) const {
return has_ysql_ddl_txn_verifier_state() &&
txn == VERIFY_RESULT(FullyDecodeTransactionId(pb_transaction_id()));

return txn == VERIFY_RESULT(GetCurrentDdlTransactionId());
}

Result<TransactionId> PersistentTableInfo::GetCurrentDdlTransactionId() const {
TransactionId txn = TransactionId::Nil();
if (has_ysql_ddl_txn_verifier_state()) {
auto& pb_txn_id = pb_transaction_id();
RSTATUS_DCHECK(!pb_txn_id.empty(), InternalError,
"Table $0 has ysql_ddl_txn_verifier_state but no transaction", name());
txn = VERIFY_RESULT(FullyDecodeTransactionId(pb_txn_id));
}
return txn;
}

bool IsReplicationInfoSet(const ReplicationInfoPB& replication_info) {
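
`EraseDdlTxnsWaitingForSchemaVersion` in the hunk above relies on the ordering of `std::map` to collect every waiter at or below a schema version in a single range. The standalone sketch below reproduces just that range logic. Locking is omitted, `TxnId` is a plain string standing in for `TransactionId`, and `EraseWaitersUpTo` is a hypothetical name.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-in: transaction ids are plain strings in this sketch.
using TxnId = std::string;

// Removes and returns every transaction waiting on a schema version that is
// less than or equal to `schema_version`. upper_bound() yields the first
// entry with a strictly greater key, so [begin, upper_bound) is exactly the
// set of waiters that can be signalled.
std::vector<TxnId> EraseWaitersUpTo(std::map<int, TxnId>* waiters, int schema_version) {
  std::vector<TxnId> txns;
  const auto end = waiters->upper_bound(schema_version);
  for (auto it = waiters->begin(); it != end; ++it) {
    txns.push_back(it->second);
  }
  waiters->erase(waiters->begin(), end);
  return txns;
}
```

For example, with waiters registered on versions 3, 4, and 7, a call for version 4 returns the transactions for 3 and 4 and leaves only the entry for 7 in the map.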
16 changes: 16 additions & 0 deletions src/yb/master/catalog_entity_info.h
@@ -475,6 +475,10 @@ struct PersistentTableInfo : public Persistent<SysTablesEntryPB, SysRowEntryType

Result<bool> is_being_modified_by_ddl_transaction(const TransactionId& txn) const;

// Returns the transaction-id of the DDL transaction operating on it, Nil if no such DDL is
// happening.
Result<TransactionId> GetCurrentDdlTransactionId() const;

const std::string& state_name() const {
return SysTablesEntryPB_State_Name(pb.state());
}
@@ -737,6 +741,12 @@ class TableInfo : public RefCountedThreadSafe<TableInfo>,

bool AttachedYCQLIndexDeletionInProgress(const TableId& index_table_id) const;

void AddDdlTxnWaitingForSchemaVersion(int schema_version,
const TransactionId& txn) EXCLUDES(lock_);

std::vector<TransactionId> EraseDdlTxnsWaitingForSchemaVersion(
int schema_version) EXCLUDES(lock_);

private:
friend class RefCountedThreadSafe<TableInfo>;
~TableInfo();
@@ -788,6 +798,12 @@ class TableInfo : public RefCountedThreadSafe<TableInfo>,
// table from completing. Not needed once D23712 lands.
std::atomic_bool bootstrapping_xcluster_replication_ = false;

// Store a map of schema version->TransactionId of the DDL transaction that initiated the
// change. When a schema version n has propagated to all tablets, we use this map to signal all
// the DDL transactions waiting for schema version n and any k < n. The map is ordered by schema
// version to easily figure out all the entries for schema version < n.
std::map<int, TransactionId> ddl_txns_waiting_for_schema_version_ GUARDED_BY(lock_);

DISALLOW_COPY_AND_ASSIGN(TableInfo);
};

47 changes: 26 additions & 21 deletions src/yb/master/catalog_loaders.cc
@@ -38,8 +38,8 @@
#include "yb/master/async_rpc_tasks.h"
#include "yb/master/backfill_index.h"
#include "yb/master/master_util.h"
#include "yb/master/ysql_ddl_verification_task.h"
#include "yb/master/ysql_tablegroup_manager.h"
#include "yb/master/ysql_transaction_ddl.h"

#include "yb/util/flags.h"
#include "yb/util/status_format.h"
@@ -143,27 +143,38 @@ Status TableLoader::Visit(const TableId& table_id, const SysTablesEntryPB& metad
table_lock.Commit();
catalog_manager_->HandleNewTableId(table->id());

// Tables created as part of a Transaction should check transaction status and be deleted
// if the transaction is aborted.
if (metadata.has_transaction()) {
// Tables created/altered as part of a transaction should check transaction status to figure out
// their final state. Tables that have already been deleted can be safely skipped.
if (!table->is_deleted() && metadata.has_transaction()) {
TransactionMetadata txn = VERIFY_RESULT(TransactionMetadata::FromPB(metadata.transaction()));
if (metadata.ysql_ddl_txn_verifier_state_size() > 0) {
state_->AddPostLoadTask(
std::bind(&CatalogManager::ScheduleYsqlTxnVerification,
catalog_manager_, table, txn, state_->epoch),
"Verify DDL transaction for table " + table->ToString());
// This table is undergoing DDL changes. Update the transaction->tables mapping with this
// table being loaded. Post-load, we can verify whether the transaction is a success or
// failure and perform any required cleanup. However for now, add this to in-memory mapping
// right away so that the catalog manager is aware of this transaction and can correctly
// handle any IsYsqlDdlTransactionInProgress requests coming from YSQL.
const bool new_transaction =
catalog_manager_->CreateOrUpdateDdlTxnVerificationState(table, txn);
// A single DDL transaction may affect multiple tables. We need to create only one
// verification task for a DDL transaction, so schedule a post-load task only if this is
// a new transaction.
if (new_transaction) {
state_->AddPostLoadTask(
std::bind(&CatalogManager::ScheduleVerifyTransaction,
catalog_manager_, table, txn, state_->epoch),
"Verify DDL transaction for table " + table->ToString());
}
} else {
// This is a table/index for which YSQL transaction verification is not supported yet.
// For these, we only support rolling back creating the table. If the transaction has
// completed, merely check for the presence of this entity in the PG catalog.
LOG(INFO) << "Enqueuing table for Transaction Verification: " << table->ToString();
std::function<Status(bool)> when_done =
std::bind(&CatalogManager::VerifyTablePgLayer, catalog_manager_, table, _1, state_->epoch);
state_->AddPostLoadTask(
std::bind(&YsqlTransactionDdl::VerifyTransaction,
catalog_manager_->ysql_transaction_.get(),
txn, table, false /* has_ysql_ddl_txn_state */, when_done),
std::bind(&CatalogManager::ScheduleVerifyTablePgLayer,
catalog_manager_,
txn, table, state_->epoch),
"VerifyTransaction");

}
}

@@ -462,16 +473,10 @@ Status NamespaceLoader::Visit(const NamespaceId& ns_id, const SysNamespaceEntryP
LOG(INFO) << "Enqueuing keyspace for Transaction Verification: " << ns->ToString();
TransactionMetadata txn =
VERIFY_RESULT(TransactionMetadata::FromPB(metadata.transaction()));
std::function<Status(bool)> when_done = std::bind(
&CatalogManager::VerifyNamespacePgLayer, catalog_manager_, ns, _1, state_->epoch);
state_->AddPostLoadTask(
std::bind(
&YsqlTransactionDdl::VerifyTransaction,
catalog_manager_->ysql_transaction_.get(),
txn,
nullptr /* table */,
false /* has_ysql_ddl_state */,
when_done),
&CatalogManager::ScheduleVerifyNamespacePgLayer,
catalog_manager_, txn, ns, state_->epoch),
"VerifyTransaction");
}
break;
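
The `TableLoader::Visit` hunk above schedules a verification task only when `CreateOrUpdateDdlTxnVerificationState` reports a transaction it has not seen before, so a DDL transaction touching several tables gets one task. A minimal sketch of that "first sighting wins" bookkeeping is shown below. `DdlTxnRegistry` and its members are hypothetical, ids are plain strings, and the real catalog manager state is assumed to hold more than a table list.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical in-memory mapping from a DDL transaction to the tables it modifies.
class DdlTxnRegistry {
 public:
  // Records that `table_id` is being modified by `txn_id`.
  // Returns true only the first time `txn_id` is seen, which is the signal
  // to schedule a single verification task for the whole transaction.
  bool CreateOrUpdate(const std::string& txn_id, const std::string& table_id) {
    auto result = tables_by_txn_.emplace(txn_id, std::vector<std::string>{});
    result.first->second.push_back(table_id);
    return result.second;
  }

  // Number of tables currently registered under `txn_id`.
  std::size_t NumTables(const std::string& txn_id) const {
    const auto it = tables_by_txn_.find(txn_id);
    return it == tables_by_txn_.end() ? 0 : it->second.size();
  }

 private:
  std::unordered_map<std::string, std::vector<std::string>> tables_by_txn_;
};
```

Loading two tables that belong to the same transaction therefore registers both tables but returns `true` once, matching the `if (new_transaction)` guard in the diff.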