diff --git a/src/backend/distributed/README.md b/src/backend/distributed/README.md index e3a9a7a33dd..7da5dcfab05 100644 --- a/src/backend/distributed/README.md +++ b/src/backend/distributed/README.md @@ -48,7 +48,7 @@ The purpose of this document is to provide comprehensive technical documentation - [Rebalancing algorithm](#rebalancing-algorithm) - [Shard moves](#shard-moves) - [Shard splits](#shard-splits) - - [Background tasks](#background-tasks) + - [Background task runner](#background-task-runner) - [Resource cleanup](#resource-cleanup) - [Logical decoding / CDC](#logical-decoding--cdc) - [CDC ordering](#cdc-ordering) @@ -2344,17 +2344,164 @@ In the past we had some bugs where we had a `palloc` failure while holding `Spin # Rebalancing -A high-level overview of the rebalancer is given in [this rebalancer blog post](https://www.citusdata.com/blog/2021/03/13/scaling-out-postgres-with-citus-open-source-shard-rebalancer/). +A high-level overview of the shard rebalancer is given in [this rebalancer blog post][rebalancer-post]. It is a bit outdated though, specifically that it uses `rebalance_table_shards()` instead of the newer `citus_rebalance_start()`. + +The shard rebalancer consists of 4 main parts: + +1. The rebalancing algorithm: Decides what moves/splits it should do to make + the cluster balanced. +2. The background task runner: Runs a full rebalance according to a plan + created by the planner. +3. A shard group moves/split: These are the smallest units of work that the + rebalancer does, if this fails midway through the move is aborted and the + shard group remains unchanged. +4. Deferred cleanup: The source shards stay present for a while after a move to + let long-running read queries continue, eventually they need to be cleaned + up. + +These parts interact, but they are pretty self-contained. Usually it's only +necessary to change one of them to add a feature/fix a bug. + +[rebalancer-post]: https://www.citusdata.com/blog/2021/03/13/scaling-out-postgres-with-citus-open-source-shard-rebalancer/ ## Rebalancing algorithm +The rebalancing algorithm tries to find an optimal placement of shard groups +across nodes. This is not an easy job, because this is a [co-NP-complete +problem](https://en.wikipedia.org/wiki/Knapsack_problem). So instead of going for +the fully optimal solution it uses a greedy approach to reach a local +optimum, which so far has proved effective in getting to a pretty optimal +solution. + +Even though it won't result in the perfect balance, the greedy approach has two +important practical benefits over a perfect solution: +1. It's relatively easy to understand why the algorithm decided on a certain move. +2. Every move makes the balance better. So if the rebalance is cancelled midway + through, the cluster will always be in a better situation than before. + +As described in the [this rebalancer blog post][rebalance-post] the algorithm +takes three inputs from the function in the `pg_dist_rebalance_strategy` table: + +1. Is a shard group allowed on a certain node? +2. What is the "cost" of a shard group, relative to the other shard groups? +3. What is the "capacity" of a node, relative to the other nodes? + +Cost and capacity are vague on purpose, this way users can choose their own +way to determine cost of a shard group, but **in practice "cost" is usually +disk size** (because `by_disk_size` is the default rebalance strategy). +Capacity is almost always set to 1, because almost all Citus clusters are +homogeneous (they contain the same nodes, except for maybe the coordinator). The +main usage for "Is a shard group allowed on a certain node?" is to be able to pin a +specific shard group to a specific node. + +There is one last definition that you should know to understand the algorithm +and that is "utilization". Utilization is the total cost of all shard groups +divided by capacity. In practice this means that utilization is almost always +the same as cost because as explained above capacity is almost always 1. So if +you see "utilization" in the algorithm, for all intents and purposes you can +read it as "cost". + +The way the general algorithm works is fairly straightforward. It starts by +creating an in-memory representation of the cluster, and then it tries to +improve that in-memory representation by making theoretical moves. So to be +clear the algorithm doesn't actually do any shard group moves, it only does +those moves to its in-memory representation. The way it determines what +theoretical moves to make is as follows (updating utilization of in-memory +nodes after every move): + +1. Find all shard groups that are on a node where they are not allowed (due to + "Is a shard group allowed on a certain node?") +2. Order those nodes by cost +3. Move them one-by one to nodes with the lowest utilization where they are + allowed. +4. If the cluster is balanced we are done. +5. Take the most utilized node (A) and take the least utilized node (B). +6. Try moving the shard group with the highest cost from A to B. +7. If the balance is "better" commit this move and continue from step 4. (See subsection below for + what is "better") +8. If the balance is worse/equal try again from step 6 with the shard group + with the next highest cost on node A. If this was the lowest cost shard on + node A, then try with the highest cost shard again but on the next least + utilized node after node B. If no moves helped with the balance, try with + the next most utilized node after node A. If we tried all moves for all + nodes like this, we are done (we cannot get a better balance). + + +Of course, the devil is in the details though. + +### When is the balance better? + +The main way to determine if the balance is better is by comparing the +utilization of node A and B, before and after the move and seeing if they are +net closer to the average utilization of the nodes in the cluster. The easiest +way to explain this is with a simple example: + +We have two nodes A and B. A has a utilization of 100GB and B has a utilization +of 70GB. So we will move a shard from A to B. A move of 15GB is obviously best, +it results in perfect balance (A=85GB, B=85GB). A move of a 10GB is still +great, both improved in balance (A=90GB, B=80GB). A move of 20GB is also good, +the result is the same as a move of 10GB only with the nodes swapped (A=80GB, +B=90GB). + +The 10GB vs 20GB move shows a limitation of the current algorithm. The +algorithm mostly makes choices based on the end state, not on the cost of +moving a shard. This is usually not a huge problem in practice though. + +### Thresholds + +The algorithm is full of thresholds, the main reason these exist is because +moving shards around isn't free. + +- `threshold`: Used to determine if the cluster is in a good enough state. For + the `by_disk_size` rebalance strategy this is 10%, so if all nodes are at + most 10% above or 10% below the average utilization then no moves are + necessary anymore (i.e. the nodes are balanced enough). The main reason for + this threshold is that these small differences in utilization are not + necessarily problematic and might very well resolve automatically over time. For example, consider a scenario in which + one shard gets mostly written in during the weekend, while another one during + the week. Moving shards on Monday and that you then have to move back on + Friday is not very helpful given the overhead of moving data around. +- `improvement_threshold`: This is used in cases where a shard group move from + node A to B swaps which node now has the highest utilization (so afterwards B + will have higher utilization than A). As described above this can still + result in better balance. This threshold is meant to work around a + particularly bad situation where we move a lot of data for very little + benefit. Imagine this situation: A=200GB and B=99, thus moving a 100GB shard + from A to B would bring their utilization closer to the average (A=100GB, + B=199GB). But obviously that's a tiny gain for a move of 100GB, which + probably takes lots of resources and time. The `improvement_threshold` is set + to 50% for the `by_disk_size` rebalance strategy. This means that this move + is only chosen if the utilization improvement is larger than 50% of the + utilization that the shard group causes on its current node. + +### How do multiple colocation groups impact the rebalancer algorithm? + +The previous section glossed over colocation groups a bit. The main reason for +that is that the algorithm doesn't handle multiple colocation groups very well. +If there are multiple colocation groups each colocation group gets balanced +completely separately. For the utilization calculations only the costs are used +for the shard groups in the colocation group that is currently being rebalanced. +The reasoning for this is that if you have two colocation groups, you probably +want to spread the shard groups from both colocation groups across multiple +nodes. And not have shard groups from colocation group 1 only be on node A and +shard groups from colocation group 2 only be on node B. + +There is an important caveat here though for colocation groups that have fewer +shard groups than the number of nodes in the cluster (in practice these are +usually colocation groups used by schema based sharding, i.e. with a single +shard group): The rebalancer algorithm balances the shard groups from these +colocation groups as if they are all all part of a single colocation group. +The main reason for this is to make sure that schemas for schema based sharding +are spread evenly across the nodes. + + ## Shard moves -Shard moves move a shard group placement to a different node (group). Moves are orchestrated by the `citus_move_shard_placement` UDF, which is also the function that the rebalancer runs to move a shard. +Shard moves move a shard group placement to a different node (group). It would be more correct if these were called "shard **group** moves", but in many places we don't due to historical reasons. Moves are orchestrated by the `citus_move_shard_placement` UDF, which is also the function that the rebalancer runs to move a shard. We implement blocking and non-blocking shard splits. Non-blocking shard moves use logical replication, which has an important limitation. If the (distributed) table does not have a replica identity (usually the primary key), then update/delete commands will error out once we create a publication. That means using a non-blocking move without a replica identity does incur some downtime. Since a blocking move is generally faster (in part because it forces out regular work), it may be less invasive. We therefore force the user to choose when trying to move a shard group that includes a table without a replica identity by supplying `shard_transfer_mode := 'force_logical'` or `shard_transfer_mode := 'block_writes'`. -The blocking-move is mostly a simplified variant of the non-blocking move (with locks taken upfront). A non-blocking move involves the following steps: +The blocking-move is mostly a simplified variant of the non-blocking move, where the write locks are taken upfront so that no catch-up using logical replication is needed. A non-blocking move involves the following steps: - **Create the new shard group placement on the target node**. We also create constraints that do not involve an index and set up ownership and access control. - **Create publication(s) on the source node**. We create publications containing the shards in the source shard group placement. We create one publications per table owner, mainly because we need one subscription per table owner to prevent privilege escalation issues on older versions of PostgreSQL (15 and below). @@ -2379,7 +2526,7 @@ A workaround for the replica identity problem is to always assign REPLICA IDENTI ## Shard splits -Shard splits convert one shard group ("split parent") into two or more shard groups ("split children") by splitting the hash range. The new shard groups can be placed on the node itself, or on other nodes. We implement blocking and non-blocking shard splits. The blocking variant is mostly a simplified version of non-blocking, so we only cover non-blocking here. Shard splits have many similarities to shard moves, and have the same `shard_transfer_mode` choice. +Shard splits convert one shard group ("split parent") into two or more shard groups ("split children") by splitting the hash range. Just like with shard moves it would be more correct to call these "shard **group** splits", but again we often don't. The new shard groups can be placed on the node itself, or on other nodes. We implement blocking and non-blocking shard splits. The blocking variant is mostly a simplified version of non-blocking, so we only cover non-blocking here. Shard splits have many similarities to shard moves, and have the same `shard_transfer_mode` choice. The shard split is a lengthy process performed by the `NonBlockingShardSplit` function, supported by a custom output plugin to handle writes that happen during the split. There are a few different entry-points in this logic, namely: `citus_split_shard_by_split_points`, `create_distributed_table_concurrently`, and `isolate_tenant_to_node`. @@ -2409,19 +2556,54 @@ A difference between splits and moves is that the old shard ID disappears. In ca ## Background tasks +In the past the only way to trigger a rebalance was to call +`rebalance_table_shards()`, this function run the rebalance using the current +session. This has the huge downside that the connection needs to be kept open +until the rebalance completes. So eventually we [introduced +`citus_rebalance_start()`](https://www.citusdata.com/blog/2022/09/19/citus-11-1-shards-postgres-tables-without-interruption/#rebalance-background), +which uses a background worker to do the rebalancing, so users can disconnect +their client and the rebalance continues. It even automatically retries moves +if they failed for some reason. + +The way this works is using a general background job infrastructure that Citus +has in the tables `pg_dist_backround_job` and `pg_dist_background_task`. +A job (often) contains multiple tasks. In case of the rebalancer, the job is +the full rebalance, and each of its tasks are separate shard group moves. + +### Parallel background task execution + +A big benefit of the background task infrastructure is that it can execute tasks +and jobs in parallel. This can make rebalancing go much faster especially in +clusters with many nodes. To ensure that we're not doing too many tasks in +parallel though we have a few ways to limit concurrency: + +1. Tasks can depend on each other. This makes sure that one task doesn't start + before all the ones that it depends on have finished. +2. The maximum number of parallel tasks being executed at the same time can be + limited using `citus.max_background_task_executors`. The default for + this is 4. +3. Tasks can specify which nodes are involved in the task, that way we can + control that a single node is not involved into too many tasks. The + rebalancer specifies both the source and target node as being involved in + the task. That together with the default of 1 for + `citus.max_background_task_executors_per_node` makes sure that a node + doesn't handle more than a single shard move at once, while still allowing + moves involving different nodes to happen in parallel. For larger machines + it can be beneficial to increase the default a bit. + ## Resource cleanup During a shard move/split, some PostgreSQL objects can be created that live outside of the scope of any transaction or are committed early. We need to make sure those objects are dropped once the shard move ends, either through failure or success. For instance, subscriptions and publications used for logical replication need to be dropped in case of failure, but also the target shard (in case of failure) and source shard (in case of success). To achieve that, we write records to pg_dist_cleanup before creating an object to remember that we need to clean it. We distinguish between a few scenarios: +**Cleanup-always**: For most resources that require cleanup records, cleanup should happen regardless of whether the operation succeeds or fails. For instance, subscriptions and publications should always be dropped. We achieve cleanup always by writing pg_dist_cleanup records in a subtransaction, and at the end of the operation we try to clean up object immediately and if it succeeds delete the record. If cleanup fails, we do not fail the whole operation, but instead leave the pg_dist_cleanup record in place for the maintenance daemon. + **Cleanup-on-failure**: Cleanup should only happen if the operation fails. The main example is the target shard of a move/split. We achieve cleanup-on-failure by writing pg_dist_cleanup records in a subtransaction (transaction on a localhost connection that commits immediately) and deleting them in the outer transaction that performs the move/split. That way, they remain in pg_dist_cleanup in case of failure, but disappear in case of success. **Cleanup-deferred-on-success**: Cleanup should only happen after the operation (move/split) succeeds. We use this to clean the source shards of a shard move. We previously dropped shards immediately as part of the transaction, but this frequently led to deadlocks at the end of a shard move. We achieve cleanup-on-success by writing pg_dist_cleanup records as part of the outer transaction that performs the move/split. -**Cleanup-always**: For most resources that require cleanup records, cleanup should happen regardless of whether the operation succeeds or fails. For instance, subscriptions and publications should always be dropped. We achieve cleanup always by writing pg_dist_cleanup records in a subtransaction, and at the end of the operation we try to clean up object immediately and if it succeeds delete the record. If cleanup fails, we do not fail the whole operation, but instead leave the pg_dist_cleanup record in place for the maintenance daemon. - -Resource cleaner (currently shard_cleaner.c) is part of the maintenance daemon and periodically checks pg_dist_cleanup for cleanup tasks. It’s important to prevent cleanup of operations that are already running. Therefore, each operation has a unique operation ID (from a sequence) and takes an advisory lock on the operation ID. The resource cleaner learns the operation ID from pg_dist_cleanup and attempts to acquire this lock. If it cannot acquire the lock, the operation is not done and cleanup is skipped. If it can, the operation is done, and the resource cleaner rechecks whether the record still exists, since it could have been deleted by the operation. +Resource cleaner (currently shard_cleaner.c) is part of the maintenance daemon and periodically checks pg_dist_cleanup for cleanup tasks. It’s important to prevent cleanup of operations that are still running. Therefore, each operation has a unique operation ID (from a sequence) and takes an advisory lock on the operation ID. The resource cleaner learns the operation ID from pg_dist_cleanup and attempts to acquire this lock. If it cannot acquire the lock, the operation is not done and cleanup is skipped. If it can, the operation is done, and the resource cleaner rechecks whether the record still exists, since it could have been deleted by the operation. Cleanup records always need to be committed before creating the actual object. It’s also important for the cleanup operation to be idempotent, since the server might crash immediately after committing a cleanup record, but before actually creating the object. Hence, the object might not exist when trying to clean it up. In that case, the cleanup is seen as successful, and the cleanup record removed.