Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add failure handling for CREATE DATABASE commands #7483

Merged
merged 18 commits into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 120 additions & 9 deletions src/backend/distributed/commands/database.c
onurctirtir marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,38 @@
#include "distributed/deparse_shard_query.h"
#include "distributed/deparser.h"
#include "distributed/listutils.h"
#include "distributed/local_executor.h"
#include "distributed/metadata/distobject.h"
#include "distributed/metadata_sync.h"
#include "distributed/metadata_utility.h"
#include "distributed/multi_executor.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/serialize_distributed_ddls.h"
#include "distributed/shard_cleaner.h"
#include "distributed/worker_protocol.h"
#include "distributed/worker_transaction.h"


/*
* Used to save original name of the database before it is replaced with a
* temporary name for failure handling purposes in PreprocessCreateDatabaseStmt().
*/
static char *CreateDatabaseCommandOriginalDbName = NULL;


/*
* The format string used when creating a temporary database for failure
* handling purposes.
*
* The fields are as follows to ensure using a unique name for each temporary
* database:
* - operationId: The operation id returned by RegisterOperationNeedingCleanup().
* - groupId: The group id of the worker node where CREATE DATABASE command
* is issued from.
*/
#define TEMP_DATABASE_NAME_FMT "citus_temp_database_%lu_%d"


/*
* DatabaseCollationInfo is used to store collation related information of a database.
*/
Expand Down Expand Up @@ -286,8 +309,9 @@
* NontransactionalNodeDDLTask to run the command on the workers outside
* the transaction block.
*/

return NontransactionalNodeDDLTaskList(NON_COORDINATOR_NODES, commands);
bool warnForPartialFailure = true;
return NontransactionalNodeDDLTaskList(NON_COORDINATOR_NODES, commands,

Check warning on line 313 in src/backend/distributed/commands/database.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/commands/database.c#L312-L313

Added lines #L312 - L313 were not covered by tests
warnForPartialFailure);
}
else
{
Expand Down Expand Up @@ -453,7 +477,12 @@
*
* In this stage, we perform validations that we want to ensure before delegating to
* previous utility hooks because it might not be convenient to throw an error in an
* implicit transaction that creates a database.
* implicit transaction that creates a database. Also in this stage, we save the original
* database name and replace dbname field with a temporary name for failure handling
* purposes. We let Postgres create the database with the temporary name, insert a cleanup
* record for the temporary database name on all nodes and let PostprocessCreateDatabaseStmt()
* to return the distributed DDL job that both creates the database with the temporary name
* and then renames it back to its original name.
*
* We also serialize database commands globally by acquiring a Citus specific advisory
* lock based on OCLASS_DATABASE on the first primary worker node.
Expand All @@ -467,22 +496,56 @@
return NIL;
}

EnsurePropagationToCoordinator();
EnsureCoordinatorIsInMetadata();

CreatedbStmt *stmt = castNode(CreatedbStmt, node);
EnsureSupportedCreateDatabaseCommand(stmt);

SerializeDistributedDDLsOnObjectClass(OCLASS_DATABASE);

OperationId operationId = RegisterOperationNeedingCleanup();

char *tempDatabaseName = psprintf(TEMP_DATABASE_NAME_FMT,
operationId, GetLocalGroupId());

List *remoteNodes = TargetWorkerSetNodeList(ALL_SHARD_NODES, RowShareLock);
WorkerNode *remoteNode = NULL;
foreach_ptr(remoteNode, remoteNodes)
{
InsertCleanupRecordOutsideTransaction(
CLEANUP_OBJECT_DATABASE,
pstrdup(quote_identifier(tempDatabaseName)),
remoteNode->groupId,
CLEANUP_ON_FAILURE
);
}

CreateDatabaseCommandOriginalDbName = stmt->dbname;
stmt->dbname = tempDatabaseName;

/*
* Delete cleanup records in the same transaction so that if the current
* transaction fails for some reason, then the cleanup records won't be
* deleted. In the happy path, we will delete the cleanup records without
* deferring them to the background worker.
*/
FinalizeOperationNeedingCleanupOnSuccess("create database");

return NIL;
}


/*
* PostprocessCreateDatabaseStmt is executed after the statement is applied to the local
* postgres instance. In this stage we prepare the commands that need to be run on
* all workers to create the database.
* postgres instance.
*
* In this stage, we first rename the temporary database back to its original name for
* local node and then return a list of distributed DDL jobs to create the database with
* the temporary name and then to rename it back to its original name. That way, if CREATE
* DATABASE fails on any of the nodes, the temporary database will be cleaned up by the
* cleanup records that we inserted in PreprocessCreateDatabaseStmt() and in case of a
* failure, we won't leak any databases with the name that the user intended to use for
* the database.
*/
List *
PostprocessCreateDatabaseStmt(Node *node, const char *queryString)
Expand Down Expand Up @@ -515,9 +578,55 @@
* block, we need to use NontransactionalNodeDDLTaskList() to send the CREATE
* DATABASE statement to the workers.
*/
bool warnForPartialFailure = false;
List *createDatabaseDDLJobList =
NontransactionalNodeDDLTaskList(REMOTE_NODES, createDatabaseCommands);
return createDatabaseDDLJobList;
NontransactionalNodeDDLTaskList(REMOTE_NODES, createDatabaseCommands,
warnForPartialFailure);

CreatedbStmt *stmt = castNode(CreatedbStmt, node);

char *renameDatabaseCommand =
psprintf("ALTER DATABASE %s RENAME TO %s",
quote_identifier(stmt->dbname),
quote_identifier(CreateDatabaseCommandOriginalDbName));

List *renameDatabaseCommands = list_make3(DISABLE_DDL_PROPAGATION,
renameDatabaseCommand,
ENABLE_DDL_PROPAGATION);

/*
* We use NodeDDLTaskList() to send the RENAME DATABASE statement to the
* workers because we want to execute it in a coordinated transaction.
*/
List *renameDatabaseDDLJobList =
NodeDDLTaskList(REMOTE_NODES, renameDatabaseCommands);

/*
* Temporarily disable citus.enable_ddl_propagation before issuing
* rename command locally because we don't want to execute it on remote
* nodes yet. We will execute it on remote nodes by returning it as a
* distributed DDL job.
*
* The reason why we don't want to execute it on remote nodes yet is that
* the database is not created on remote nodes yet.
*/
int saveNestLevel = NewGUCNestLevel();
set_config_option("citus.enable_ddl_propagation", "off",
(superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION,
GUC_ACTION_LOCAL, true, 0, false);

ExecuteUtilityCommand(renameDatabaseCommand);

AtEOXact_GUC(true, saveNestLevel);

/*
* Restore the original database name because MarkObjectDistributed()
* resolves oid of the object based on the database name and is called
* after executing the distributed DDL job that renames temporary database.
*/
stmt->dbname = CreateDatabaseCommandOriginalDbName;

onurctirtir marked this conversation as resolved.
Show resolved Hide resolved
return list_concat(createDatabaseDDLJobList, renameDatabaseDDLJobList);
}


Expand Down Expand Up @@ -571,8 +680,10 @@
* use NontransactionalNodeDDLTaskList() to send the DROP DATABASE statement
* to the workers.
*/
bool warnForPartialFailure = true;
List *dropDatabaseDDLJobList =
NontransactionalNodeDDLTaskList(REMOTE_NODES, dropDatabaseCommands);
NontransactionalNodeDDLTaskList(REMOTE_NODES, dropDatabaseCommands,
warnForPartialFailure);
return dropDatabaseDDLJobList;
}

Expand Down
3 changes: 3 additions & 0 deletions src/backend/distributed/commands/index.c
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,7 @@ GenerateCreateIndexDDLJob(IndexStmt *createIndexStatement, const char *createInd
ddlJob->startNewTransaction = createIndexStatement->concurrent;
ddlJob->metadataSyncCommand = createIndexCommand;
ddlJob->taskList = CreateIndexTaskList(createIndexStatement);
ddlJob->warnForPartialFailure = true;

return ddlJob;
}
Expand Down Expand Up @@ -652,6 +653,7 @@ PreprocessReindexStmt(Node *node, const char *reindexCommand,
"concurrently");
ddlJob->metadataSyncCommand = reindexCommand;
ddlJob->taskList = CreateReindexTaskList(relationId, reindexStatement);
ddlJob->warnForPartialFailure = true;

ddlJobs = list_make1(ddlJob);
}
Expand Down Expand Up @@ -780,6 +782,7 @@ PreprocessDropIndexStmt(Node *node, const char *dropIndexCommand,
ddlJob->metadataSyncCommand = dropIndexCommand;
ddlJob->taskList = DropIndexTaskList(distributedRelationId, distributedIndexId,
dropIndexStatement);
ddlJob->warnForPartialFailure = true;

ddlJobs = list_make1(ddlJob);
}
Expand Down
13 changes: 9 additions & 4 deletions src/backend/distributed/commands/utility_hook.c
Original file line number Diff line number Diff line change
Expand Up @@ -1377,7 +1377,7 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
errhint("Use DROP INDEX CONCURRENTLY IF EXISTS to remove the "
"invalid index, then retry the original command.")));
}
else
else if (ddlJob->warnForPartialFailure)
{
ereport(WARNING,
(errmsg(
Expand All @@ -1386,9 +1386,9 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
"state.\nIf the problematic command is a CREATE operation, "
"consider using the 'IF EXISTS' syntax to drop the object,"
"\nif applicable, and then re-attempt the original command.")));

PG_RE_THROW();
}

PG_RE_THROW();
}
PG_END_TRY();
}
Expand Down Expand Up @@ -1604,9 +1604,12 @@ DDLTaskList(Oid relationId, const char *commandString)
* NontransactionalNodeDDLTaskList builds a list of tasks to execute a DDL command on a
* given target set of nodes with cannotBeExecutedInTransaction is set to make sure
* that task list is executed outside a transaction block.
*
* Also sets warnForPartialFailure for the returned DDLJobs.
*/
List *
NontransactionalNodeDDLTaskList(TargetWorkerSet targets, List *commands)
NontransactionalNodeDDLTaskList(TargetWorkerSet targets, List *commands,
bool warnForPartialFailure)
{
List *ddlJobs = NodeDDLTaskList(targets, commands);
DDLJob *ddlJob = NULL;
Expand All @@ -1617,6 +1620,8 @@ NontransactionalNodeDDLTaskList(TargetWorkerSet targets, List *commands)
{
task->cannotBeExecutedInTransaction = true;
}

ddlJob->warnForPartialFailure = warnForPartialFailure;
}
return ddlJobs;
}
Expand Down
78 changes: 71 additions & 7 deletions src/backend/distributed/operations/shard_cleaner.c
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ static bool TryDropReplicationSlotOutsideTransaction(char *replicationSlotName,
char *nodeName,
int nodePort);
static bool TryDropUserOutsideTransaction(char *username, char *nodeName, int nodePort);
static bool TryDropDatabaseOutsideTransaction(char *databaseName, char *nodeName,
int nodePort);

static CleanupRecord * GetCleanupRecordByNameAndType(char *objectName,
CleanupObject type);
Expand Down Expand Up @@ -141,7 +143,6 @@ Datum
citus_cleanup_orphaned_resources(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
EnsureCoordinator();
PreventInTransactionBlock(true, "citus_cleanup_orphaned_resources");

int droppedCount = DropOrphanedResourcesForCleanup();
Expand Down Expand Up @@ -245,12 +246,6 @@ TryDropOrphanedResources()
static int
DropOrphanedResourcesForCleanup()
{
/* Only runs on Coordinator */
if (!IsCoordinator())
{
return 0;
}

List *cleanupRecordList = ListCleanupRecords();

/*
Expand Down Expand Up @@ -608,6 +603,12 @@ TryDropResourceByCleanupRecordOutsideTransaction(CleanupRecord *record,
return TryDropUserOutsideTransaction(record->objectName, nodeName, nodePort);
}

case CLEANUP_OBJECT_DATABASE:
{
return TryDropDatabaseOutsideTransaction(record->objectName, nodeName,
nodePort);
}

default:
{
ereport(WARNING, (errmsg(
Expand Down Expand Up @@ -888,6 +889,69 @@ TryDropUserOutsideTransaction(char *username,
}


/*
 * TryDropDatabaseOutsideTransaction drops the database with the given name
 * if it exists.
 *
 * Returns true when the connection was established and every command was
 * accepted by the remote node; returns false otherwise.
 */
static bool
TryDropDatabaseOutsideTransaction(char *databaseName, char *nodeName, int nodePort)
{
	int connectionFlags = (OUTSIDE_TRANSACTION | FORCE_NEW_CONNECTION);
	MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlags,
																nodeName, nodePort,
																CitusExtensionOwnerName(),
																NULL);

	if (PQstatus(connection->pgConn) != CONNECTION_OK)
	{
		return false;
	}

	/*
	 * We want to disable DDL propagation and set lock_timeout before issuing
	 * the DROP DATABASE command, but we cannot scope those settings to the
	 * DROP DATABASE command itself because DROP DATABASE cannot be run inside
	 * a transaction block.
	 *
	 * For this reason, to avoid leaking the lock_timeout and DDL propagation
	 * settings to future commands, we force the connection to close at the
	 * end of the transaction.
	 */
	ForceConnectionCloseAtTransactionEnd(connection);

	/*
	 * Disable DDL propagation because the DROP DATABASE command should not
	 * propagate beyond this node.
	 */
	List *commandList = list_make3(
		"SET lock_timeout TO '1s'",
		"SET citus.enable_ddl_propagation TO OFF;",
		psprintf("DROP DATABASE IF EXISTS %s;", quote_identifier(databaseName))
		);

	bool allCommandsSucceeded = true;

	const char *command = NULL;
	foreach_ptr(command, commandList)
	{
		/*
		 * Cannot use SendOptionalCommandListToWorkerOutsideTransactionWithConnection()
		 * because we don't want to open a transaction block on remote nodes as DROP
		 * DATABASE commands cannot be run inside a transaction block.
		 */
		if (ExecuteOptionalRemoteCommand(connection, command, NULL) != RESPONSE_OKAY)
		{
			allCommandsSucceeded = false;
			break;
		}
	}

	CloseConnection(connection);
	return allCommandsSucceeded;
}


/*
* ErrorIfCleanupRecordForShardExists errors out if a cleanup record for the given
* shard name exists.
Expand Down
Loading
Loading