Skip to content

Commit

Permalink
Add failure handling for CREATE DATABASE commands (#7483)
Browse files Browse the repository at this point in the history
In preprocess phase, we save the original database name, replace
dbname field of CreatedbStmt with a temporary name (to let Postgres
to create the database with the temporary name locally) and then
we insert a cleanup record for the temporary database name on all
nodes **(\*\*)**.

And in postprocess phase, we first rename the temporary database
back to its original name for local node and then return a list of
distributed DDL jobs i) to create the database with the temporary
name and then ii) to rename it back to its original name on other
nodes. That way, if CREATE DATABASE fails on any of the nodes, the
temporary database will be cleaned up by the cleanup records that
we inserted in preprocess phase and in case of a failure, we won't
leak any databases called as the name that user intended to use for
the database.

Solves the problem documented in
#7369
for CREATE DATABASE commands.

**(\*\*):** To ensure that we insert cleanup records on all nodes,
with this PR we also start requiring having the coordinator in the
metadata because otherwise we would skip inserting a cleanup record
for the coordinator.
  • Loading branch information
onurctirtir authored and emelsimsek committed Mar 11, 2024
1 parent 478631d commit e0107d0
Show file tree
Hide file tree
Showing 15 changed files with 1,063 additions and 26 deletions.
129 changes: 120 additions & 9 deletions src/backend/distributed/commands/database.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,38 @@
#include "distributed/deparse_shard_query.h"
#include "distributed/deparser.h"
#include "distributed/listutils.h"
#include "distributed/local_executor.h"
#include "distributed/metadata/distobject.h"
#include "distributed/metadata_sync.h"
#include "distributed/metadata_utility.h"
#include "distributed/multi_executor.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/serialize_distributed_ddls.h"
#include "distributed/shard_cleaner.h"
#include "distributed/worker_protocol.h"
#include "distributed/worker_transaction.h"


/*
* Used to save original name of the database before it is replaced with a
* temporary name for failure handling purposes in PreprocessCreateDatabaseStmt().
*/
static char *CreateDatabaseCommandOriginalDbName = NULL;


/*
* The format string used when creating a temporary databases for failure
* handling purposes.
*
* The fields are as follows to ensure using a unique name for each temporary
* database:
* - operationId: The operation id returned by RegisterOperationNeedingCleanup().
* - groupId: The group id of the worker node where CREATE DATABASE command
* is issued from.
*/
#define TEMP_DATABASE_NAME_FMT "citus_temp_database_%lu_%d"


/*
* DatabaseCollationInfo is used to store collation related information of a database.
*/
Expand Down Expand Up @@ -286,8 +309,9 @@ PreprocessAlterDatabaseStmt(Node *node, const char *queryString,
* NontransactionalNodeDDLTask to run the command on the workers outside
* the transaction block.
*/

return NontransactionalNodeDDLTaskList(NON_COORDINATOR_NODES, commands);
bool warnForPartialFailure = true;
return NontransactionalNodeDDLTaskList(NON_COORDINATOR_NODES, commands,
warnForPartialFailure);
}
else
{
Expand Down Expand Up @@ -453,7 +477,12 @@ PreprocessAlterDatabaseSetStmt(Node *node, const char *queryString,
*
* In this stage, we perform validations that we want to ensure before delegating to
* previous utility hooks because it might not be convenient to throw an error in an
* implicit transaction that creates a database.
* implicit transaction that creates a database. Also in this stage, we save the original
* database name and replace dbname field with a temporary name for failure handling
* purposes. We let Postgres create the database with the temporary name, insert a cleanup
* record for the temporary database name on all nodes and let PostprocessCreateDatabaseStmt()
* to return the distributed DDL job that both creates the database with the temporary name
* and then renames it back to its original name.
*
* We also serialize database commands globally by acquiring a Citus specific advisory
* lock based on OCLASS_DATABASE on the first primary worker node.
Expand All @@ -467,22 +496,56 @@ PreprocessCreateDatabaseStmt(Node *node, const char *queryString,
return NIL;
}

EnsurePropagationToCoordinator();
EnsureCoordinatorIsInMetadata();

CreatedbStmt *stmt = castNode(CreatedbStmt, node);
EnsureSupportedCreateDatabaseCommand(stmt);

SerializeDistributedDDLsOnObjectClass(OCLASS_DATABASE);

OperationId operationId = RegisterOperationNeedingCleanup();

char *tempDatabaseName = psprintf(TEMP_DATABASE_NAME_FMT,
operationId, GetLocalGroupId());

List *remoteNodes = TargetWorkerSetNodeList(ALL_SHARD_NODES, RowShareLock);
WorkerNode *remoteNode = NULL;
foreach_ptr(remoteNode, remoteNodes)
{
InsertCleanupRecordOutsideTransaction(
CLEANUP_OBJECT_DATABASE,
pstrdup(quote_identifier(tempDatabaseName)),
remoteNode->groupId,
CLEANUP_ON_FAILURE
);
}

CreateDatabaseCommandOriginalDbName = stmt->dbname;
stmt->dbname = tempDatabaseName;

/*
* Delete cleanup records in the same transaction so that if the current
* transactions fails for some reason, then the cleanup records won't be
* deleted. In the happy path, we will delete the cleanup records without
* deferring them to the background worker.
*/
FinalizeOperationNeedingCleanupOnSuccess("create database");

return NIL;
}


/*
* PostprocessCreateDatabaseStmt is executed after the statement is applied to the local
* postgres instance. In this stage we prepare the commands that need to be run on
* all workers to create the database.
* postgres instance.
*
* In this stage, we first rename the temporary database back to its original name for
* local node and then return a list of distributed DDL jobs to create the database with
* the temporary name and then to rename it back to its original name. That way, if CREATE
* DATABASE fails on any of the nodes, the temporary database will be cleaned up by the
* cleanup records that we inserted in PreprocessCreateDatabaseStmt() and in case of a
* failure, we won't leak any databases called as the name that user intended to use for
* the database.
*/
List *
PostprocessCreateDatabaseStmt(Node *node, const char *queryString)
Expand Down Expand Up @@ -515,9 +578,55 @@ PostprocessCreateDatabaseStmt(Node *node, const char *queryString)
* block, we need to use NontransactionalNodeDDLTaskList() to send the CREATE
* DATABASE statement to the workers.
*/
bool warnForPartialFailure = false;
List *createDatabaseDDLJobList =
NontransactionalNodeDDLTaskList(REMOTE_NODES, createDatabaseCommands);
return createDatabaseDDLJobList;
NontransactionalNodeDDLTaskList(REMOTE_NODES, createDatabaseCommands,
warnForPartialFailure);

CreatedbStmt *stmt = castNode(CreatedbStmt, node);

char *renameDatabaseCommand =
psprintf("ALTER DATABASE %s RENAME TO %s",
quote_identifier(stmt->dbname),
quote_identifier(CreateDatabaseCommandOriginalDbName));

List *renameDatabaseCommands = list_make3(DISABLE_DDL_PROPAGATION,
renameDatabaseCommand,
ENABLE_DDL_PROPAGATION);

/*
* We use NodeDDLTaskList() to send the RENAME DATABASE statement to the
* workers because we want to execute it in a coordinated transaction.
*/
List *renameDatabaseDDLJobList =
NodeDDLTaskList(REMOTE_NODES, renameDatabaseCommands);

/*
* Temporarily disable citus.enable_ddl_propagation before issuing
* rename command locally because we don't want to execute it on remote
* nodes yet. We will execute it on remote nodes by returning it as a
* distributed DDL job.
*
* The reason why we don't want to execute it on remote nodes yet is that
* the database is not created on remote nodes yet.
*/
int saveNestLevel = NewGUCNestLevel();
set_config_option("citus.enable_ddl_propagation", "off",
(superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION,
GUC_ACTION_LOCAL, true, 0, false);

ExecuteUtilityCommand(renameDatabaseCommand);

AtEOXact_GUC(true, saveNestLevel);

/*
* Restore the original database name because MarkObjectDistributed()
* resolves oid of the object based on the database name and is called
* after executing the distributed DDL job that renames temporary database.
*/
stmt->dbname = CreateDatabaseCommandOriginalDbName;

return list_concat(createDatabaseDDLJobList, renameDatabaseDDLJobList);
}


Expand Down Expand Up @@ -571,8 +680,10 @@ PreprocessDropDatabaseStmt(Node *node, const char *queryString,
* use NontransactionalNodeDDLTaskList() to send the DROP DATABASE statement
* to the workers.
*/
bool warnForPartialFailure = true;
List *dropDatabaseDDLJobList =
NontransactionalNodeDDLTaskList(REMOTE_NODES, dropDatabaseCommands);
NontransactionalNodeDDLTaskList(REMOTE_NODES, dropDatabaseCommands,
warnForPartialFailure);
return dropDatabaseDDLJobList;
}

Expand Down
3 changes: 3 additions & 0 deletions src/backend/distributed/commands/index.c
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,7 @@ GenerateCreateIndexDDLJob(IndexStmt *createIndexStatement, const char *createInd
ddlJob->startNewTransaction = createIndexStatement->concurrent;
ddlJob->metadataSyncCommand = createIndexCommand;
ddlJob->taskList = CreateIndexTaskList(createIndexStatement);
ddlJob->warnForPartialFailure = true;

return ddlJob;
}
Expand Down Expand Up @@ -652,6 +653,7 @@ PreprocessReindexStmt(Node *node, const char *reindexCommand,
"concurrently");
ddlJob->metadataSyncCommand = reindexCommand;
ddlJob->taskList = CreateReindexTaskList(relationId, reindexStatement);
ddlJob->warnForPartialFailure = true;

ddlJobs = list_make1(ddlJob);
}
Expand Down Expand Up @@ -780,6 +782,7 @@ PreprocessDropIndexStmt(Node *node, const char *dropIndexCommand,
ddlJob->metadataSyncCommand = dropIndexCommand;
ddlJob->taskList = DropIndexTaskList(distributedRelationId, distributedIndexId,
dropIndexStatement);
ddlJob->warnForPartialFailure = true;

ddlJobs = list_make1(ddlJob);
}
Expand Down
13 changes: 9 additions & 4 deletions src/backend/distributed/commands/utility_hook.c
Original file line number Diff line number Diff line change
Expand Up @@ -1377,7 +1377,7 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
errhint("Use DROP INDEX CONCURRENTLY IF EXISTS to remove the "
"invalid index, then retry the original command.")));
}
else
else if (ddlJob->warnForPartialFailure)
{
ereport(WARNING,
(errmsg(
Expand All @@ -1386,9 +1386,9 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
"state.\nIf the problematic command is a CREATE operation, "
"consider using the 'IF EXISTS' syntax to drop the object,"
"\nif applicable, and then re-attempt the original command.")));

PG_RE_THROW();
}

PG_RE_THROW();
}
PG_END_TRY();
}
Expand Down Expand Up @@ -1604,9 +1604,12 @@ DDLTaskList(Oid relationId, const char *commandString)
* NontransactionalNodeDDLTaskList builds a list of tasks to execute a DDL command on a
* given target set of nodes with cannotBeExecutedInTransaction is set to make sure
* that task list is executed outside a transaction block.
*
* Also sets warnForPartialFailure for the returned DDLJobs.
*/
List *
NontransactionalNodeDDLTaskList(TargetWorkerSet targets, List *commands)
NontransactionalNodeDDLTaskList(TargetWorkerSet targets, List *commands,
bool warnForPartialFailure)
{
List *ddlJobs = NodeDDLTaskList(targets, commands);
DDLJob *ddlJob = NULL;
Expand All @@ -1617,6 +1620,8 @@ NontransactionalNodeDDLTaskList(TargetWorkerSet targets, List *commands)
{
task->cannotBeExecutedInTransaction = true;
}

ddlJob->warnForPartialFailure = warnForPartialFailure;
}
return ddlJobs;
}
Expand Down
78 changes: 71 additions & 7 deletions src/backend/distributed/operations/shard_cleaner.c
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ static bool TryDropReplicationSlotOutsideTransaction(char *replicationSlotName,
char *nodeName,
int nodePort);
static bool TryDropUserOutsideTransaction(char *username, char *nodeName, int nodePort);
static bool TryDropDatabaseOutsideTransaction(char *databaseName, char *nodeName,
int nodePort);

static CleanupRecord * GetCleanupRecordByNameAndType(char *objectName,
CleanupObject type);
Expand Down Expand Up @@ -141,7 +143,6 @@ Datum
citus_cleanup_orphaned_resources(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
EnsureCoordinator();
PreventInTransactionBlock(true, "citus_cleanup_orphaned_resources");

int droppedCount = DropOrphanedResourcesForCleanup();
Expand Down Expand Up @@ -245,12 +246,6 @@ TryDropOrphanedResources()
static int
DropOrphanedResourcesForCleanup()
{
/* Only runs on Coordinator */
if (!IsCoordinator())
{
return 0;
}

List *cleanupRecordList = ListCleanupRecords();

/*
Expand Down Expand Up @@ -608,6 +603,12 @@ TryDropResourceByCleanupRecordOutsideTransaction(CleanupRecord *record,
return TryDropUserOutsideTransaction(record->objectName, nodeName, nodePort);
}

case CLEANUP_OBJECT_DATABASE:
{
return TryDropDatabaseOutsideTransaction(record->objectName, nodeName,
nodePort);
}

default:
{
ereport(WARNING, (errmsg(
Expand Down Expand Up @@ -888,6 +889,69 @@ TryDropUserOutsideTransaction(char *username,
}


/*
* TryDropDatabaseOutsideTransaction drops the database with the given name
* if it exists.
*/
static bool
TryDropDatabaseOutsideTransaction(char *databaseName, char *nodeName, int nodePort)
{
int connectionFlags = (OUTSIDE_TRANSACTION | FORCE_NEW_CONNECTION);
MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlags,
nodeName, nodePort,
CitusExtensionOwnerName(),
NULL);

if (PQstatus(connection->pgConn) != CONNECTION_OK)
{
return false;
}

/*
* We want to disable DDL propagation and set lock_timeout before issuing
* the DROP DATABASE command but we cannot do so in a way that's scoped
* to the DROP DATABASE command. This is because, we cannot use a
* transaction block for the DROP DATABASE command.
*
* For this reason, to avoid leaking the lock_timeout and DDL propagation
* settings to future commands, we force the connection to close at the end
* of the transaction.
*/
ForceConnectionCloseAtTransactionEnd(connection);

/*
* The DROP DATABASE command should not propagate, so we disable DDL
* propagation.
*/
List *commandList = list_make3(
"SET lock_timeout TO '1s'",
"SET citus.enable_ddl_propagation TO OFF;",
psprintf("DROP DATABASE IF EXISTS %s;", quote_identifier(databaseName))
);

bool executeCommand = true;

const char *commandString = NULL;
foreach_ptr(commandString, commandList)
{
/*
* Cannot use SendOptionalCommandListToWorkerOutsideTransactionWithConnection()
* because we don't want to open a transaction block on remote nodes as DROP
* DATABASE commands cannot be run inside a transaction block.
*/
if (ExecuteOptionalRemoteCommand(connection, commandString, NULL) !=
RESPONSE_OKAY)
{
executeCommand = false;
break;
}
}

CloseConnection(connection);
return executeCommand;
}


/*
* ErrorIfCleanupRecordForShardExists errors out if a cleanup record for the given
* shard name exists.
Expand Down
Loading

0 comments on commit e0107d0

Please sign in to comment.