Skip to content

Commit

Permalink
Add failure handling for CREATE DATABASE commands
Browse files Browse the repository at this point in the history
  • Loading branch information
onurctirtir committed Feb 8, 2024
1 parent 2fae91c commit 731f191
Show file tree
Hide file tree
Showing 7 changed files with 282 additions and 14 deletions.
100 changes: 96 additions & 4 deletions src/backend/distributed/commands/database.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,32 @@
#include "distributed/deparse_shard_query.h"
#include "distributed/deparser.h"
#include "distributed/listutils.h"
#include "distributed/local_executor.h"
#include "distributed/metadata/distobject.h"
#include "distributed/metadata_sync.h"
#include "distributed/metadata_utility.h"
#include "distributed/multi_executor.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/serialize_distributed_ddls.h"
#include "distributed/shard_cleaner.h"
#include "distributed/worker_protocol.h"
#include "distributed/worker_transaction.h"


/*
 * Holds the database name that the user originally specified in CREATE
 * DATABASE. PreprocessCreateDatabaseStmt() stores stmt->dbname here (the
 * pointer itself, not a copy) right before overwriting that field with a
 * temporary name for failure handling purposes, and
 * PostprocessCreateDatabaseStmt() reads it back to build the rename command
 * and to restore stmt->dbname.
 */
static char *CreateDatabaseCommandOriginalDbName = NULL;


/*
 * The format string used when creating temporary databases for failure
 * handling purposes. The single placeholder is filled with the operation id
 * returned by RegisterOperationNeedingCleanup().
 *
 * NOTE(review): "%lu" assumes the operation id has the width of unsigned
 * long on every supported platform -- confirm against the OperationId
 * typedef, or consider a width-independent format macro.
 */
#define TEMP_DATABASE_NAME_FMT "citus_temp_database_%lu"


/*
* DatabaseCollationInfo is used to store collation related information of a database.
*/
Expand Down Expand Up @@ -453,7 +470,12 @@ PreprocessAlterDatabaseSetStmt(Node *node, const char *queryString,
*
* In this stage, we perform validations that we want to ensure before delegating to
* previous utility hooks because it might not be convenient to throw an error in an
* implicit transaction that creates a database.
* implicit transaction that creates a database. Also in this stage, we save the original
* database name and replace dbname field with a temporary name for failure handling
* purposes. We let Postgres create the database with the temporary name, insert a cleanup
* record for the temporary database name on all workers and let PostprocessCreateDatabaseStmt()
* return the distributed DDL job that both creates the database with the temporary name
* and then renames it back to its original name.
*
* We also serialize database commands globally by acquiring a Citus specific advisory
* lock based on OCLASS_DATABASE on the first primary worker node.
Expand All @@ -474,14 +496,40 @@ PreprocessCreateDatabaseStmt(Node *node, const char *queryString,

SerializeDistributedDDLsOnObjectClass(OCLASS_DATABASE);

OperationId operationId = RegisterOperationNeedingCleanup();

char *tempDatabaseName = psprintf(TEMP_DATABASE_NAME_FMT, operationId);

List *allNodes = TargetWorkerSetNodeList(ALL_SHARD_NODES, RowShareLock);
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, allNodes)
{
InsertCleanupRecordInSubtransaction(
CLEANUP_OBJECT_DATABASE,
pstrdup(quote_identifier(tempDatabaseName)),
workerNode->groupId,
CLEANUP_ON_FAILURE
);
}

CreateDatabaseCommandOriginalDbName = stmt->dbname;
stmt->dbname = tempDatabaseName;

return NIL;
}


/*
* PostprocessCreateDatabaseStmt is executed after the statement is applied to the local
* postgres instance. In this stage we prepare the commands that need to be run on
* all workers to create the database.
* postgres instance.
*
* In this stage, we first rename the temporary database back to its original name on the
* local node and then return a list of distributed DDL jobs that create the database with
* the temporary name and then rename it back to its original name. That way, if CREATE
* DATABASE fails on any of the nodes, the temporary database will be cleaned up by the
* cleanup records that we inserted in PreprocessCreateDatabaseStmt() and, in case of a
* failure, we won't leak any databases with the name that the user intended to use for
* the database.
*
*/
List *
Expand Down Expand Up @@ -517,7 +565,51 @@ PostprocessCreateDatabaseStmt(Node *node, const char *queryString)
*/
List *createDatabaseDDLJobList =
NontransactionalNodeDDLTaskList(REMOTE_NODES, createDatabaseCommands);
return createDatabaseDDLJobList;

CreatedbStmt *stmt = castNode(CreatedbStmt, node);

char *renameDatabaseCommand =
psprintf("ALTER DATABASE %s RENAME TO %s",
quote_identifier(stmt->dbname),
quote_identifier(CreateDatabaseCommandOriginalDbName));

List *renameDatabaseCommands = list_make3(DISABLE_DDL_PROPAGATION,
renameDatabaseCommand,
ENABLE_DDL_PROPAGATION);

/*
* We use NodeDDLTaskList() to send the RENAME DATABASE statement to the
* workers because we want to execute it in a coordinated transaction.
*/
List *renameDatabaseDDLJobList =
NodeDDLTaskList(REMOTE_NODES, renameDatabaseCommands);

/*
* Temporarily disable citus.enable_ddl_propagation before issuing
* rename command locally because we don't want to execute it on remote
* nodes yet. We will execute it on remote nodes by returning it as a
* distributed DDL job.
*
* The reason why we don't want to execute it on remote nodes yet is that
* the database is not created on remote nodes yet.
*/
int saveNestLevel = NewGUCNestLevel();
set_config_option("citus.enable_ddl_propagation", "off",
(superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION,
GUC_ACTION_LOCAL, true, 0, false);

ExecuteUtilityCommand(renameDatabaseCommand);

AtEOXact_GUC(true, saveNestLevel);

/*
* Restore the original database name because MarkObjectDistributed()
* resolves oid of the object based on the database name and is called
* after executing the distributed DDL job that renames temporary database.
*/
stmt->dbname = CreateDatabaseCommandOriginalDbName;

return list_concat(createDatabaseDDLJobList, renameDatabaseDDLJobList);
}


Expand Down
72 changes: 65 additions & 7 deletions src/backend/distributed/operations/shard_cleaner.c
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ static bool TryDropReplicationSlotOutsideTransaction(char *replicationSlotName,
char *nodeName,
int nodePort);
static bool TryDropUserOutsideTransaction(char *username, char *nodeName, int nodePort);
static bool TryDropDatabaseOutsideTransaction(char *databaseName, char *nodeName,
int nodePort);

static CleanupRecord * GetCleanupRecordByNameAndType(char *objectName,
CleanupObject type);
Expand Down Expand Up @@ -141,7 +143,6 @@ Datum
citus_cleanup_orphaned_resources(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
EnsureCoordinator();
PreventInTransactionBlock(true, "citus_cleanup_orphaned_resources");

int droppedCount = DropOrphanedResourcesForCleanup();
Expand Down Expand Up @@ -245,12 +246,6 @@ TryDropOrphanedResources()
static int
DropOrphanedResourcesForCleanup()
{
/* Only runs on Coordinator */
if (!IsCoordinator())
{
return 0;
}

List *cleanupRecordList = ListCleanupRecords();

/*
Expand Down Expand Up @@ -603,6 +598,12 @@ TryDropResourceByCleanupRecordOutsideTransaction(CleanupRecord *record,
return TryDropUserOutsideTransaction(record->objectName, nodeName, nodePort);
}

case CLEANUP_OBJECT_DATABASE:
{
return TryDropDatabaseOutsideTransaction(record->objectName, nodeName,
nodePort);
}

default:
{
ereport(WARNING, (errmsg(
Expand Down Expand Up @@ -883,6 +884,63 @@ TryDropUserOutsideTransaction(char *username,
}


/*
* TryDropDatabaseOutsideTransaction drops the database with the given name
* if it exists.
*/
static bool
TryDropDatabaseOutsideTransaction(char *databaseName, char *nodeName, int nodePort)
{
int connectionFlags = (OUTSIDE_TRANSACTION | FORCE_NEW_CONNECTION);
MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlags,
nodeName, nodePort,
CitusExtensionOwnerName(),
NULL);

if (PQstatus(connection->pgConn) != CONNECTION_OK)
{
return false;
}

/*
* We want to disable DDL propagation and set lock_timeout before issuing
* the DROP DATABASE command but we cannot do so in a way that's scoped
* to the DROP DATABASE command. This is because, we cannot use a
* transaction block for the DROP DATABASE command.
*
* For this reason, to avoid leaking the lock_timeout and DDL propagation
* settings to future commands, we force the connection to close at the end
* of the transaction.
*/
ForceConnectionCloseAtTransactionEnd(connection);

/*
* The DROP DATABASE command should not propagate, so we disable DDL
* propagation.
*/
List *commandList = list_make3(
"SET lock_timeout TO '1s'",
"SET citus.enable_ddl_propagation TO OFF;",
psprintf("DROP DATABASE IF EXISTS %s;", quote_identifier(databaseName))
);

bool executeCommand = true;

const char *commandString = NULL;
foreach_ptr(commandString, commandList)
{
if (ExecuteOptionalRemoteCommand(connection, commandString, NULL) != RESPONSE_OKAY)
{
executeCommand = false;
break;
}
}

CloseConnection(connection);
return executeCommand;
}


/*
* ErrorIfCleanupRecordForShardExists errors out if a cleanup record for the given
* shard name exists.
Expand Down
3 changes: 2 additions & 1 deletion src/include/distributed/shard_cleaner.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ typedef enum CleanupObject
CLEANUP_OBJECT_SUBSCRIPTION = 2,
CLEANUP_OBJECT_REPLICATION_SLOT = 3,
CLEANUP_OBJECT_PUBLICATION = 4,
CLEANUP_OBJECT_USER = 5
CLEANUP_OBJECT_USER = 5,
CLEANUP_OBJECT_DATABASE = 6
} CleanupObject;

/*
Expand Down
10 changes: 10 additions & 0 deletions src/test/regress/expected/alter_database_propagation.out
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,12 @@ DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing ALTER DATABASE regression RESET lock_timeout
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
set citus.enable_create_database_propagation=on;
SET citus.next_operation_id TO 3000;
create database "regression!'2";
NOTICE: issuing ALTER DATABASE citus_temp_database_3000 RENAME TO "regression!'2"
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing ALTER DATABASE citus_temp_database_3000 RENAME TO "regression!'2"
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
alter database "regression!'2" with CONNECTION LIMIT 100;
NOTICE: issuing ALTER DATABASE "regression!'2" WITH CONNECTION LIMIT 100;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
Expand Down Expand Up @@ -189,7 +194,12 @@ alter DATABASE local_regression rename to local_regression2;
drop database local_regression2;
set citus.enable_create_database_propagation=on;
drop database regression3;
SET citus.next_operation_id TO 3100;
create database "regression!'4";
NOTICE: issuing ALTER DATABASE citus_temp_database_3100 RENAME TO "regression!'4"
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing ALTER DATABASE citus_temp_database_3100 RENAME TO "regression!'4"
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
SELECT result FROM run_command_on_all_nodes(
$$
ALTER TABLESPACE alter_db_tablespace RENAME TO "ts-needs\!escape"
Expand Down
68 changes: 66 additions & 2 deletions src/test/regress/expected/create_drop_database_propagation.out
Original file line number Diff line number Diff line change
Expand Up @@ -428,10 +428,11 @@ SELECT * FROM public.check_database_on_all_nodes('my_template_database') ORDER B
set citus.enable_create_database_propagation=on;
SET citus.log_remote_commands = true;
set citus.grep_remote_commands = '%CREATE DATABASE%';
SET citus.next_operation_id TO 2000;
create database "mydatabase#1'2";
NOTICE: issuing CREATE DATABASE "mydatabase#1'2"
NOTICE: issuing CREATE DATABASE citus_temp_database_2000
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE DATABASE "mydatabase#1'2"
NOTICE: issuing CREATE DATABASE citus_temp_database_2000
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
set citus.grep_remote_commands = '%DROP DATABASE%';
drop database if exists "mydatabase#1'2";
Expand Down Expand Up @@ -1264,6 +1265,69 @@ SELECT 1 FROM run_command_on_all_nodes($$REVOKE ALL ON TABLESPACE pg_default FRO

DROP DATABASE no_createdb;
DROP USER no_createdb;
-- Test a failure scenario by trying to create a distributed database that
-- already exists on one of the nodes.
\c - - - :worker_1_port
CREATE DATABASE "test_\!failure";
NOTICE: Citus partially supports CREATE DATABASE for distributed databases
DETAIL: Citus does not propagate CREATE DATABASE command to other nodes
HINT: You can manually create a database and its extensions on other nodes.
\c - - - :master_port
SET citus.enable_create_database_propagation TO ON;
CREATE DATABASE "test_\!failure";
ERROR: database "test_\!failure" already exists
CONTEXT: while executing command on localhost:xxxxx
SET client_min_messages TO WARNING;
CALL citus_cleanup_orphaned_resources();
RESET client_min_messages;
SELECT result AS database_cleanedup_on_node FROM run_command_on_all_nodes($$SELECT COUNT(*)=0 FROM pg_database WHERE datname LIKE 'citus_temp_database_%'$$);
database_cleanedup_on_node
---------------------------------------------------------------------
t
t
t
(3 rows)

SELECT * FROM public.check_database_on_all_nodes($$test_\!failure$$) ORDER BY node_type, result;
node_type | result
---------------------------------------------------------------------
coordinator (local) | {"database_properties": null, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false}
worker node (remote) | {"database_properties": null, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false}
worker node (remote) | {"database_properties": {"datacl": null, "datname": "test_\\!failure", "datctype": "C", "encoding": "UTF8", "datcollate": "C", "tablespace": "pg_default", "daticurules": null, "datallowconn": true, "datconnlimit": -1, "daticulocale": null, "datistemplate": false, "database_owner": "postgres", "datcollversion": null, "datlocprovider": "c"}, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false}
(3 rows)

SET citus.enable_create_database_propagation TO OFF;
CREATE DATABASE "test_\!failure1";
NOTICE: Citus partially supports CREATE DATABASE for distributed databases
DETAIL: Citus does not propagate CREATE DATABASE command to other nodes
HINT: You can manually create a database and its extensions on other nodes.
\c - - - :worker_1_port
DROP DATABASE "test_\!failure";
SET citus.enable_create_database_propagation TO ON;
CREATE DATABASE "test_\!failure1";
ERROR: database "test_\!failure1" already exists
CONTEXT: while executing command on localhost:xxxxx
SET client_min_messages TO WARNING;
CALL citus_cleanup_orphaned_resources();
RESET client_min_messages;
SELECT result AS database_cleanedup_on_node FROM run_command_on_all_nodes($$SELECT COUNT(*)=0 FROM pg_database WHERE datname LIKE 'citus_temp_database_%'$$);
database_cleanedup_on_node
---------------------------------------------------------------------
t
t
t
(3 rows)

SELECT * FROM public.check_database_on_all_nodes($$test_\!failure1$$) ORDER BY node_type, result;
node_type | result
---------------------------------------------------------------------
coordinator (remote) | {"database_properties": {"datacl": null, "datname": "test_\\!failure1", "datctype": "C", "encoding": "UTF8", "datcollate": "C", "tablespace": "pg_default", "daticurules": null, "datallowconn": true, "datconnlimit": -1, "daticulocale": null, "datistemplate": false, "database_owner": "postgres", "datcollversion": null, "datlocprovider": "c"}, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false}
worker node (local) | {"database_properties": null, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false}
worker node (remote) | {"database_properties": null, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false}
(3 rows)

\c - - - :master_port
DROP DATABASE "test_\!failure1";
SET citus.enable_create_database_propagation TO ON;
--clean up resources created by this test
-- DROP TABLESPACE is not supported, so we need to drop it manually.
Expand Down
2 changes: 2 additions & 0 deletions src/test/regress/sql/alter_database_propagation.sql
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ alter database regression set lock_timeout to DEFAULT;
alter database regression RESET lock_timeout;

set citus.enable_create_database_propagation=on;
SET citus.next_operation_id TO 3000;
create database "regression!'2";
alter database "regression!'2" with CONNECTION LIMIT 100;
alter database "regression!'2" with IS_TEMPLATE true CONNECTION LIMIT 50;
Expand Down Expand Up @@ -90,6 +91,7 @@ set citus.enable_create_database_propagation=on;

drop database regression3;

SET citus.next_operation_id TO 3100;
create database "regression!'4";


Expand Down
Loading

0 comments on commit 731f191

Please sign in to comment.