diff --git a/src/backend/distributed/commands/database.c b/src/backend/distributed/commands/database.c index 33223f41686..bdbb8126c6f 100644 --- a/src/backend/distributed/commands/database.c +++ b/src/backend/distributed/commands/database.c @@ -40,15 +40,32 @@ #include "distributed/deparse_shard_query.h" #include "distributed/deparser.h" #include "distributed/listutils.h" +#include "distributed/local_executor.h" #include "distributed/metadata/distobject.h" #include "distributed/metadata_sync.h" #include "distributed/metadata_utility.h" #include "distributed/multi_executor.h" #include "distributed/relation_access_tracking.h" #include "distributed/serialize_distributed_ddls.h" +#include "distributed/shard_cleaner.h" #include "distributed/worker_protocol.h" #include "distributed/worker_transaction.h" + +/* + * Used to save the original name of the database before it is replaced with a + * temporary name for failure handling purposes in PreprocessCreateDatabaseStmt(). + */ +static char *CreateDatabaseCommandOriginalDbName = NULL; + + +/* + * The format string used when creating a temporary database for failure + * handling purposes. + */ +#define TEMP_DATABASE_NAME_FMT "citus_temp_database_%lu" + + /* * DatabaseCollationInfo is used to store collation related information of a database. */ @@ -453,7 +470,12 @@ PreprocessAlterDatabaseSetStmt(Node *node, const char *queryString, * * In this stage, we perform validations that we want to ensure before delegating to * previous utility hooks because it might not be convenient to throw an error in an - * implicit transaction that creates a database. + * implicit transaction that creates a database. Also in this stage, we save the original + * database name and replace the dbname field with a temporary name for failure handling + * purposes. 
We let Postgres create the database with the temporary name, insert a cleanup + * record for the temporary database name on all workers and let PostprocessCreateDatabaseStmt() + * return the distributed DDL job that both creates the database with the temporary name + * and then renames it back to its original name. * * We also serialize database commands globally by acquiring a Citus specific advisory * lock based on OCLASS_DATABASE on the first primary worker node. @@ -474,14 +496,40 @@ PreprocessCreateDatabaseStmt(Node *node, const char *queryString, SerializeDistributedDDLsOnObjectClass(OCLASS_DATABASE); + OperationId operationId = RegisterOperationNeedingCleanup(); + + char *tempDatabaseName = psprintf(TEMP_DATABASE_NAME_FMT, operationId); + + List *allNodes = TargetWorkerSetNodeList(ALL_SHARD_NODES, RowShareLock); + WorkerNode *workerNode = NULL; + foreach_ptr(workerNode, allNodes) + { + InsertCleanupRecordInSubtransaction( + CLEANUP_OBJECT_DATABASE, + pstrdup(quote_identifier(tempDatabaseName)), + workerNode->groupId, + CLEANUP_ON_FAILURE + ); + } + + CreateDatabaseCommandOriginalDbName = stmt->dbname; + stmt->dbname = tempDatabaseName; + return NIL; } /* * PostprocessCreateDatabaseStmt is executed after the statement is applied to the local - postgres instance. In this stage we prepare the commands that need to be run on - all workers to create the database. + postgres instance. + + In this stage, we first rename the temporary database back to its original name for + the local node and then return a list of distributed DDL jobs to create the database with + the temporary name and then to rename it back to its original name. That way, if CREATE + DATABASE fails on any of the nodes, the temporary database will be cleaned up by the + cleanup records that we inserted in PreprocessCreateDatabaseStmt() and in case of a + failure, we won't leak any databases with the name that the user intended to use for + the database. 
* */ List * @@ -517,7 +565,51 @@ PostprocessCreateDatabaseStmt(Node *node, const char *queryString) */ List *createDatabaseDDLJobList = NontransactionalNodeDDLTaskList(REMOTE_NODES, createDatabaseCommands); - return createDatabaseDDLJobList; + + CreatedbStmt *stmt = castNode(CreatedbStmt, node); + + char *renameDatabaseCommand = + psprintf("ALTER DATABASE %s RENAME TO %s", + quote_identifier(stmt->dbname), + quote_identifier(CreateDatabaseCommandOriginalDbName)); + + List *renameDatabaseCommands = list_make3(DISABLE_DDL_PROPAGATION, + renameDatabaseCommand, + ENABLE_DDL_PROPAGATION); + + /* + * We use NodeDDLTaskList() to send the RENAME DATABASE statement to the + * workers because we want to execute it in a coordinated transaction. + */ + List *renameDatabaseDDLJobList = + NodeDDLTaskList(REMOTE_NODES, renameDatabaseCommands); + + /* + * Temporarily disable citus.enable_ddl_propagation before issuing + * rename command locally because we don't want to execute it on remote + * nodes yet. We will execute it on remote nodes by returning it as a + * distributed DDL job. + * + * The reason why we don't want to execute it on remote nodes yet is that + * the database is not created on remote nodes yet. + */ + int saveNestLevel = NewGUCNestLevel(); + set_config_option("citus.enable_ddl_propagation", "off", + (superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION, + GUC_ACTION_LOCAL, true, 0, false); + + ExecuteUtilityCommand(renameDatabaseCommand); + + AtEOXact_GUC(true, saveNestLevel); + + /* + * Restore the original database name because MarkObjectDistributed() + * resolves oid of the object based on the database name and is called + * after executing the distributed DDL job that renames temporary database. 
+ */ + stmt->dbname = CreateDatabaseCommandOriginalDbName; + + return list_concat(createDatabaseDDLJobList, renameDatabaseDDLJobList); } diff --git a/src/backend/distributed/operations/shard_cleaner.c b/src/backend/distributed/operations/shard_cleaner.c index 79041453022..497874fa978 100644 --- a/src/backend/distributed/operations/shard_cleaner.c +++ b/src/backend/distributed/operations/shard_cleaner.c @@ -92,6 +92,8 @@ static bool TryDropReplicationSlotOutsideTransaction(char *replicationSlotName, char *nodeName, int nodePort); static bool TryDropUserOutsideTransaction(char *username, char *nodeName, int nodePort); +static bool TryDropDatabaseOutsideTransaction(char *databaseName, char *nodeName, + int nodePort); static CleanupRecord * GetCleanupRecordByNameAndType(char *objectName, CleanupObject type); @@ -141,7 +143,6 @@ Datum citus_cleanup_orphaned_resources(PG_FUNCTION_ARGS) { CheckCitusVersion(ERROR); - EnsureCoordinator(); PreventInTransactionBlock(true, "citus_cleanup_orphaned_resources"); int droppedCount = DropOrphanedResourcesForCleanup(); @@ -245,12 +246,6 @@ TryDropOrphanedResources() static int DropOrphanedResourcesForCleanup() { - /* Only runs on Coordinator */ - if (!IsCoordinator()) - { - return 0; - } - List *cleanupRecordList = ListCleanupRecords(); /* @@ -603,6 +598,12 @@ TryDropResourceByCleanupRecordOutsideTransaction(CleanupRecord *record, return TryDropUserOutsideTransaction(record->objectName, nodeName, nodePort); } + case CLEANUP_OBJECT_DATABASE: + { + return TryDropDatabaseOutsideTransaction(record->objectName, nodeName, + nodePort); + } + default: { ereport(WARNING, (errmsg( @@ -883,6 +884,63 @@ TryDropUserOutsideTransaction(char *username, } +/* + * TryDropDatabaseOutsideTransaction drops the database with the given name + * if it exists. 
+ */ +static bool +TryDropDatabaseOutsideTransaction(char *databaseName, char *nodeName, int nodePort) +{ + int connectionFlags = (OUTSIDE_TRANSACTION | FORCE_NEW_CONNECTION); + MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlags, + nodeName, nodePort, + CitusExtensionOwnerName(), + NULL); + + if (PQstatus(connection->pgConn) != CONNECTION_OK) + { + return false; + } + + /* + * We want to disable DDL propagation and set lock_timeout before issuing + * the DROP DATABASE command but we cannot do so in a way that's scoped + * to the DROP DATABASE command. This is because, we cannot use a + * transaction block for the DROP DATABASE command. + * + * For this reason, to avoid leaking the lock_timeout and DDL propagation + * settings to future commands, we force the connection to close at the end + * of the transaction. + */ + ForceConnectionCloseAtTransactionEnd(connection); + + /* + * The DROP DATABASE command should not propagate, so we disable DDL + * propagation. + */ + List *commandList = list_make3( + "SET lock_timeout TO '1s'", + "SET citus.enable_ddl_propagation TO OFF;", + psprintf("DROP DATABASE IF EXISTS %s;", quote_identifier(databaseName)) + ); + + bool executeCommand = true; + + const char *commandString = NULL; + foreach_ptr(commandString, commandList) + { + if (ExecuteOptionalRemoteCommand(connection, commandString, NULL) != RESPONSE_OKAY) + { + executeCommand = false; + break; + } + } + + CloseConnection(connection); + return executeCommand; +} + + /* * ErrorIfCleanupRecordForShardExists errors out if a cleanup record for the given * shard name exists. 
diff --git a/src/include/distributed/shard_cleaner.h b/src/include/distributed/shard_cleaner.h index e7d3dea1bf7..f489d9e2566 100644 --- a/src/include/distributed/shard_cleaner.h +++ b/src/include/distributed/shard_cleaner.h @@ -41,7 +41,8 @@ typedef enum CleanupObject CLEANUP_OBJECT_SUBSCRIPTION = 2, CLEANUP_OBJECT_REPLICATION_SLOT = 3, CLEANUP_OBJECT_PUBLICATION = 4, - CLEANUP_OBJECT_USER = 5 + CLEANUP_OBJECT_USER = 5, + CLEANUP_OBJECT_DATABASE = 6 } CleanupObject; /* diff --git a/src/test/regress/expected/alter_database_propagation.out b/src/test/regress/expected/alter_database_propagation.out index f01d39ab911..8bcff6cf9ba 100644 --- a/src/test/regress/expected/alter_database_propagation.out +++ b/src/test/regress/expected/alter_database_propagation.out @@ -140,7 +140,12 @@ DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing ALTER DATABASE regression RESET lock_timeout DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx set citus.enable_create_database_propagation=on; +SET citus.next_operation_id TO 3000; create database "regression!'2"; +NOTICE: issuing ALTER DATABASE citus_temp_database_3000 RENAME TO "regression!'2" +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing ALTER DATABASE citus_temp_database_3000 RENAME TO "regression!'2" +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx alter database "regression!'2" with CONNECTION LIMIT 100; NOTICE: issuing ALTER DATABASE "regression!'2" WITH CONNECTION LIMIT 100; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx @@ -189,7 +194,12 @@ alter DATABASE local_regression rename to local_regression2; drop database local_regression2; set citus.enable_create_database_propagation=on; drop database regression3; +SET citus.next_operation_id TO 3100; create database "regression!'4"; +NOTICE: issuing ALTER DATABASE citus_temp_database_3100 RENAME TO "regression!'4" +DETAIL: on server postgres@localhost:xxxxx connectionId: 
xxxxxxx +NOTICE: issuing ALTER DATABASE citus_temp_database_3100 RENAME TO "regression!'4" +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx SELECT result FROM run_command_on_all_nodes( $$ ALTER TABLESPACE alter_db_tablespace RENAME TO "ts-needs\!escape" diff --git a/src/test/regress/expected/create_drop_database_propagation.out b/src/test/regress/expected/create_drop_database_propagation.out index da4ec4eb711..588fa6dec85 100644 --- a/src/test/regress/expected/create_drop_database_propagation.out +++ b/src/test/regress/expected/create_drop_database_propagation.out @@ -428,10 +428,11 @@ SELECT * FROM public.check_database_on_all_nodes('my_template_database') ORDER B set citus.enable_create_database_propagation=on; SET citus.log_remote_commands = true; set citus.grep_remote_commands = '%CREATE DATABASE%'; +SET citus.next_operation_id TO 2000; create database "mydatabase#1'2"; -NOTICE: issuing CREATE DATABASE "mydatabase#1'2" +NOTICE: issuing CREATE DATABASE citus_temp_database_2000 DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE DATABASE "mydatabase#1'2" +NOTICE: issuing CREATE DATABASE citus_temp_database_2000 DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx set citus.grep_remote_commands = '%DROP DATABASE%'; drop database if exists "mydatabase#1'2"; @@ -1264,6 +1265,69 @@ SELECT 1 FROM run_command_on_all_nodes($$REVOKE ALL ON TABLESPACE pg_default FRO DROP DATABASE no_createdb; DROP USER no_createdb; +-- Test a failure scenario by trying to create a distributed database that +-- already exists on one of the nodes. +\c - - - :worker_1_port +CREATE DATABASE "test_\!failure"; +NOTICE: Citus partially supports CREATE DATABASE for distributed databases +DETAIL: Citus does not propagate CREATE DATABASE command to other nodes +HINT: You can manually create a database and its extensions on other nodes. 
+\c - - - :master_port +SET citus.enable_create_database_propagation TO ON; +CREATE DATABASE "test_\!failure"; +ERROR: database "test_\!failure" already exists +CONTEXT: while executing command on localhost:xxxxx +SET client_min_messages TO WARNING; +CALL citus_cleanup_orphaned_resources(); +RESET client_min_messages; +SELECT result AS database_cleanedup_on_node FROM run_command_on_all_nodes($$SELECT COUNT(*)=0 FROM pg_database WHERE datname LIKE 'citus_temp_database_%'$$); + database_cleanedup_on_node +--------------------------------------------------------------------- + t + t + t +(3 rows) + +SELECT * FROM public.check_database_on_all_nodes($$test_\!failure$$) ORDER BY node_type, result; + node_type | result +--------------------------------------------------------------------- + coordinator (local) | {"database_properties": null, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false} + worker node (remote) | {"database_properties": null, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false} + worker node (remote) | {"database_properties": {"datacl": null, "datname": "test_\\!failure", "datctype": "C", "encoding": "UTF8", "datcollate": "C", "tablespace": "pg_default", "daticurules": null, "datallowconn": true, "datconnlimit": -1, "daticulocale": null, "datistemplate": false, "database_owner": "postgres", "datcollversion": null, "datlocprovider": "c"}, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false} +(3 rows) + +SET citus.enable_create_database_propagation TO OFF; +CREATE DATABASE "test_\!failure1"; +NOTICE: Citus partially supports CREATE DATABASE for distributed databases +DETAIL: Citus does not propagate CREATE DATABASE command to other nodes +HINT: You can manually create a database and its extensions on other nodes. 
+\c - - - :worker_1_port +DROP DATABASE "test_\!failure"; +SET citus.enable_create_database_propagation TO ON; +CREATE DATABASE "test_\!failure1"; +ERROR: database "test_\!failure1" already exists +CONTEXT: while executing command on localhost:xxxxx +SET client_min_messages TO WARNING; +CALL citus_cleanup_orphaned_resources(); +RESET client_min_messages; +SELECT result AS database_cleanedup_on_node FROM run_command_on_all_nodes($$SELECT COUNT(*)=0 FROM pg_database WHERE datname LIKE 'citus_temp_database_%'$$); + database_cleanedup_on_node +--------------------------------------------------------------------- + t + t + t +(3 rows) + +SELECT * FROM public.check_database_on_all_nodes($$test_\!failure1$$) ORDER BY node_type, result; + node_type | result +--------------------------------------------------------------------- + coordinator (remote) | {"database_properties": {"datacl": null, "datname": "test_\\!failure1", "datctype": "C", "encoding": "UTF8", "datcollate": "C", "tablespace": "pg_default", "daticurules": null, "datallowconn": true, "datconnlimit": -1, "daticulocale": null, "datistemplate": false, "database_owner": "postgres", "datcollversion": null, "datlocprovider": "c"}, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false} + worker node (local) | {"database_properties": null, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false} + worker node (remote) | {"database_properties": null, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false} +(3 rows) + +\c - - - :master_port +DROP DATABASE "test_\!failure1"; SET citus.enable_create_database_propagation TO ON; --clean up resources created by this test -- DROP TABLESPACE is not supported, so we need to drop it manually. 
diff --git a/src/test/regress/sql/alter_database_propagation.sql b/src/test/regress/sql/alter_database_propagation.sql index 4904919a6a6..9a8b1fab8af 100644 --- a/src/test/regress/sql/alter_database_propagation.sql +++ b/src/test/regress/sql/alter_database_propagation.sql @@ -49,6 +49,7 @@ alter database regression set lock_timeout to DEFAULT; alter database regression RESET lock_timeout; set citus.enable_create_database_propagation=on; +SET citus.next_operation_id TO 3000; create database "regression!'2"; alter database "regression!'2" with CONNECTION LIMIT 100; alter database "regression!'2" with IS_TEMPLATE true CONNECTION LIMIT 50; @@ -90,6 +91,7 @@ set citus.enable_create_database_propagation=on; drop database regression3; +SET citus.next_operation_id TO 3100; create database "regression!'4"; diff --git a/src/test/regress/sql/create_drop_database_propagation.sql b/src/test/regress/sql/create_drop_database_propagation.sql index 329f4861203..011b36661d5 100644 --- a/src/test/regress/sql/create_drop_database_propagation.sql +++ b/src/test/regress/sql/create_drop_database_propagation.sql @@ -219,6 +219,7 @@ SELECT * FROM public.check_database_on_all_nodes('my_template_database') ORDER B set citus.enable_create_database_propagation=on; SET citus.log_remote_commands = true; set citus.grep_remote_commands = '%CREATE DATABASE%'; +SET citus.next_operation_id TO 2000; create database "mydatabase#1'2"; @@ -746,6 +747,46 @@ SELECT 1 FROM run_command_on_all_nodes($$REVOKE ALL ON TABLESPACE pg_default FRO DROP DATABASE no_createdb; DROP USER no_createdb; +-- Test a failure scenario by trying to create a distributed database that +-- already exists on one of the nodes. 
+ +\c - - - :worker_1_port +CREATE DATABASE "test_\!failure"; + +\c - - - :master_port + +SET citus.enable_create_database_propagation TO ON; + +CREATE DATABASE "test_\!failure"; + +SET client_min_messages TO WARNING; +CALL citus_cleanup_orphaned_resources(); +RESET client_min_messages; + +SELECT result AS database_cleanedup_on_node FROM run_command_on_all_nodes($$SELECT COUNT(*)=0 FROM pg_database WHERE datname LIKE 'citus_temp_database_%'$$); +SELECT * FROM public.check_database_on_all_nodes($$test_\!failure$$) ORDER BY node_type, result; + +SET citus.enable_create_database_propagation TO OFF; +CREATE DATABASE "test_\!failure1"; + +\c - - - :worker_1_port +DROP DATABASE "test_\!failure"; + +SET citus.enable_create_database_propagation TO ON; + +CREATE DATABASE "test_\!failure1"; + +SET client_min_messages TO WARNING; +CALL citus_cleanup_orphaned_resources(); +RESET client_min_messages; + +SELECT result AS database_cleanedup_on_node FROM run_command_on_all_nodes($$SELECT COUNT(*)=0 FROM pg_database WHERE datname LIKE 'citus_temp_database_%'$$); +SELECT * FROM public.check_database_on_all_nodes($$test_\!failure1$$) ORDER BY node_type, result; + +\c - - - :master_port + +DROP DATABASE "test_\!failure1"; + SET citus.enable_create_database_propagation TO ON; --clean up resources created by this test