Use multiple processors when PGUpgrade is given CPU resources
The --jobs flag allows some aspects of pg_upgrade to operate in
parallel. The PostgreSQL documentation says:

    This option can dramatically reduce the time to upgrade
    a multi-database server running on a multiprocessor machine.

Issue: PGO-1958
See: https://www.postgresql.org/docs/current/pgupgrade.html
cbandy committed Dec 9, 2024
1 parent 8fb1788 commit 5fae3e9
Showing 4 changed files with 131 additions and 18 deletions.
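
In short, the upgrade Job now passes a --jobs value to pg_upgrade that is derived from the container's CPU resources. A minimal sketch of the rule introduced in jobs.go below; the function name here is illustrative, not part of the commit:

package main

import "fmt"

// jobsFor mirrors the new rule in upgradeCommand: use all but one of the
// available CPUs, and never fewer than one job.
func jobsFor(availableCPUs int) int {
	return max(1, availableCPUs-1) // max is a Go 1.21+ builtin
}

func main() {
	for _, cpus := range []int{0, 1, 2, 3, 10} {
		fmt.Printf("cpus=%d -> --jobs=%d\n", cpus, jobsFor(cpus))
	}
	// 0, 1, and 2 CPUs all yield --jobs=1; 3 yields --jobs=2; 10 yields --jobs=9.
}

With two or fewer CPUs the Job still runs a single pg_upgrade process, so the new flag only changes behavior when the PGUpgrade resources grant at least three whole CPUs.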
39 changes: 31 additions & 8 deletions internal/controller/pgupgrade/jobs.go
@@ -12,9 +12,11 @@ import (
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"

"github.com/crunchydata/postgres-operator/internal/feature"
"github.com/crunchydata/postgres-operator/internal/initialize"
"github.com/crunchydata/postgres-operator/internal/naming"
"github.com/crunchydata/postgres-operator/pkg/apis/postgres-operator.crunchydata.com/v1beta1"
@@ -33,17 +35,17 @@ func pgUpgradeJob(upgrade *v1beta1.PGUpgrade) metav1.ObjectMeta {

// upgradeCommand returns an entrypoint that prepares the filesystem for
// and performs a PostgreSQL major version upgrade using pg_upgrade.
func upgradeCommand(upgrade *v1beta1.PGUpgrade, fetchKeyCommand string) []string {
oldVersion := fmt.Sprint(upgrade.Spec.FromPostgresVersion)
newVersion := fmt.Sprint(upgrade.Spec.ToPostgresVersion)
func upgradeCommand(oldVersion, newVersion int, fetchKeyCommand string, availableCPUs int) []string {
// Use multiple CPUs when three or more are available.
argJobs := fmt.Sprintf(` --jobs=%d`, max(1, availableCPUs-1))

// if the fetch key command is set for TDE, provide the value during initialization
initdb := `/usr/pgsql-"${new_version}"/bin/initdb -k -D /pgdata/pg"${new_version}"`
if fetchKeyCommand != "" {
initdb += ` --encryption-key-command "` + fetchKeyCommand + `"`
}

args := []string{oldVersion, newVersion}
args := []string{fmt.Sprint(oldVersion), fmt.Sprint(newVersion)}
script := strings.Join([]string{
`declare -r data_volume='/pgdata' old_version="$1" new_version="$2"`,
`printf 'Performing PostgreSQL upgrade from version "%s" to "%s" ...\n\n' "$@"`,
@@ -97,14 +99,14 @@ func upgradeCommand(upgrade *v1beta1.PGUpgrade, fetchKeyCommand string) []string
`echo -e "Step 5: Running pg_upgrade check...\n"`,
`time /usr/pgsql-"${new_version}"/bin/pg_upgrade --old-bindir /usr/pgsql-"${old_version}"/bin \`,
`--new-bindir /usr/pgsql-"${new_version}"/bin --old-datadir /pgdata/pg"${old_version}"\`,
` --new-datadir /pgdata/pg"${new_version}" --link --check`,
` --new-datadir /pgdata/pg"${new_version}" --link --check` + argJobs,

// Assuming the check completes successfully, the pg_upgrade command will
// be run that actually prepares the upgraded pgdata directory.
`echo -e "\nStep 6: Running pg_upgrade...\n"`,
`time /usr/pgsql-"${new_version}"/bin/pg_upgrade --old-bindir /usr/pgsql-"${old_version}"/bin \`,
`--new-bindir /usr/pgsql-"${new_version}"/bin --old-datadir /pgdata/pg"${old_version}" \`,
`--new-datadir /pgdata/pg"${new_version}" --link`,
`--new-datadir /pgdata/pg"${new_version}" --link` + argJobs,

// Since we have cleared the Patroni cluster step by removing the EndPoints, we copy patroni.dynamic.json
// from the old data dir to help retain PostgreSQL parameters you had set before.
@@ -118,10 +120,21 @@ func upgradeCommand(upgrade *v1beta1.PGUpgrade, fetchKeyCommand string) []string
return append([]string{"bash", "-ceu", "--", script, "upgrade"}, args...)
}

// largestWholeCPU returns the maximum CPU request or limit as a non-negative
// integer of CPUs. When resources lacks any CPU, the result is zero.
func largestWholeCPU(resources corev1.ResourceRequirements) int {
// Read CPU quantities as millicores then divide to get the "floor."
// NOTE: [resource.Quantity.Value] looks easier, but it rounds up.
return max(
int(resources.Limits.Cpu().ScaledValue(resource.Milli)/1000),
int(resources.Requests.Cpu().ScaledValue(resource.Milli)/1000),
0)
}

// generateUpgradeJob returns a Job that can upgrade the PostgreSQL data
// directory of the startup instance.
func (r *PGUpgradeReconciler) generateUpgradeJob(
_ context.Context, upgrade *v1beta1.PGUpgrade,
ctx context.Context, upgrade *v1beta1.PGUpgrade,
startup *appsv1.StatefulSet, fetchKeyCommand string,
) *batchv1.Job {
job := &batchv1.Job{}
@@ -167,6 +180,12 @@ func (r *PGUpgradeReconciler) generateUpgradeJob(
job.Spec.BackoffLimit = initialize.Int32(0)
job.Spec.Template.Spec.RestartPolicy = corev1.RestartPolicyNever

// When enabled, calculate the number of CPUs for pg_upgrade.
wholeCPUs := 0
if feature.Enabled(ctx, feature.PGUpgradeCPUConcurrency) {
wholeCPUs = largestWholeCPU(upgrade.Spec.Resources)
}

// Replace all containers with one that does the upgrade.
job.Spec.Template.Spec.EphemeralContainers = nil
job.Spec.Template.Spec.InitContainers = nil
@@ -179,7 +198,11 @@ func (r *PGUpgradeReconciler) generateUpgradeJob(
VolumeMounts: database.VolumeMounts,

// Use our upgrade command and the specified image and resources.
Command: upgradeCommand(upgrade, fetchKeyCommand),
Command: upgradeCommand(
upgrade.Spec.FromPostgresVersion,
upgrade.Spec.ToPostgresVersion,
fetchKeyCommand,
wholeCPUs),
Image: pgUpgradeContainerImage(upgrade),
ImagePullPolicy: upgrade.Spec.ImagePullPolicy,
Resources: upgrade.Spec.Resources,
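For reference, largestWholeCPU above deliberately floors fractional CPU quantities by reading millicores, because Quantity.Value() rounds up. A self-contained sketch of the difference, using only k8s.io/apimachinery; the 2200m value is illustrative:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	q := resource.MustParse("2200m") // 2.2 CPUs

	// Value() rounds fractions up to the nearest whole integer: prints 3.
	fmt.Println(q.Value())

	// Reading millicores and dividing floors instead: 2200/1000 prints 2,
	// which is the behavior largestWholeCPU relies on.
	fmt.Println(q.ScaledValue(resource.Milli) / 1000)
}

The floored value reaches upgradeCommand as availableCPUs only when the PGUpgradeCPUConcurrency gate is enabled; otherwise wholeCPUs stays zero and pg_upgrade keeps --jobs=1.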
89 changes: 87 additions & 2 deletions internal/controller/pgupgrade/jobs_test.go
@@ -16,11 +16,85 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
"sigs.k8s.io/yaml"

"github.com/crunchydata/postgres-operator/internal/feature"
"github.com/crunchydata/postgres-operator/internal/initialize"
"github.com/crunchydata/postgres-operator/internal/testing/cmp"
"github.com/crunchydata/postgres-operator/pkg/apis/postgres-operator.crunchydata.com/v1beta1"
)

func TestLargestWholeCPU(t *testing.T) {
assert.Equal(t, 0,
largestWholeCPU(corev1.ResourceRequirements{}),
"expected the zero value to be zero")

for _, tt := range []struct {
Name, ResourcesYAML string
Result int
}{
{
Name: "Negatives", ResourcesYAML: `{requests: {cpu: -3}, limits: {cpu: -5}}`,
Result: 0,
},
{
Name: "SmallPositive", ResourcesYAML: `limits: {cpu: 600m}`,
Result: 0,
},
{
Name: "FractionalPositive", ResourcesYAML: `requests: {cpu: 2200m}`,
Result: 2,
},
{
Name: "LargePositive", ResourcesYAML: `limits: {cpu: 10}`,
Result: 10,
},
{
Name: "RequestsAndLimits", ResourcesYAML: `{requests: {cpu: 2}, limits: {cpu: 4}}`,
Result: 4,
},
} {
t.Run(tt.Name, func(t *testing.T) {
var resources corev1.ResourceRequirements
assert.NilError(t, yaml.Unmarshal([]byte(tt.ResourcesYAML), &resources))
assert.Equal(t, tt.Result, largestWholeCPU(resources))
})
}
}

func TestUpgradeCommand(t *testing.T) {
expectScript := func(t *testing.T, script string) {
t.Helper()

t.Run("PrettyYAML", func(t *testing.T) {
b, err := yaml.Marshal(script)
assert.NilError(t, err)
assert.Assert(t, strings.HasPrefix(string(b), `|`),
"expected literal block scalar, got:\n%s", b)
})
}

t.Run("CPUs", func(t *testing.T) {
for _, tt := range []struct {
CPUs int
Jobs string
}{
{CPUs: 0, Jobs: "--jobs=1"},
{CPUs: 1, Jobs: "--jobs=1"},
{CPUs: 2, Jobs: "--jobs=1"},
{CPUs: 3, Jobs: "--jobs=2"},
{CPUs: 10, Jobs: "--jobs=9"},
} {
command := upgradeCommand(10, 11, "", tt.CPUs)
assert.Assert(t, len(command) > 3)
assert.DeepEqual(t, []string{"bash", "-ceu", "--"}, command[:3])

script := command[3]
assert.Assert(t, cmp.Contains(script, tt.Jobs))

expectScript(t, script)
}
})
}

func TestGenerateUpgradeJob(t *testing.T) {
ctx := context.Background()
reconciler := &PGUpgradeReconciler{}
@@ -120,11 +194,11 @@ spec:
echo -e "Step 5: Running pg_upgrade check...\n"
time /usr/pgsql-"${new_version}"/bin/pg_upgrade --old-bindir /usr/pgsql-"${old_version}"/bin \
--new-bindir /usr/pgsql-"${new_version}"/bin --old-datadir /pgdata/pg"${old_version}"\
--new-datadir /pgdata/pg"${new_version}" --link --check
--new-datadir /pgdata/pg"${new_version}" --link --check --jobs=1
echo -e "\nStep 6: Running pg_upgrade...\n"
time /usr/pgsql-"${new_version}"/bin/pg_upgrade --old-bindir /usr/pgsql-"${old_version}"/bin \
--new-bindir /usr/pgsql-"${new_version}"/bin --old-datadir /pgdata/pg"${old_version}" \
--new-datadir /pgdata/pg"${new_version}" --link
--new-datadir /pgdata/pg"${new_version}" --link --jobs=1
echo -e "\nStep 7: Copying patroni.dynamic.json...\n"
cp /pgdata/pg"${old_version}"/patroni.dynamic.json /pgdata/pg"${new_version}"
echo -e "\npg_upgrade Job Complete!"
@@ -149,6 +223,17 @@ spec:
status: {}
`))

t.Run(feature.PGUpgradeCPUConcurrency+"Enabled", func(t *testing.T) {
gate := feature.NewGate()
assert.NilError(t, gate.SetFromMap(map[string]bool{
feature.PGUpgradeCPUConcurrency: true,
}))
ctx := feature.NewContext(context.Background(), gate)

job := reconciler.generateUpgradeJob(ctx, upgrade, startup, "")
assert.Assert(t, cmp.MarshalContains(job, `--jobs=2`))
})

tdeJob := reconciler.generateUpgradeJob(ctx, upgrade, startup, "echo testKey")
assert.Assert(t, cmp.MarshalContains(tdeJob,
`/usr/pgsql-"${new_version}"/bin/initdb -k -D /pgdata/pg"${new_version}" --encryption-key-command "echo testKey"`))
20 changes: 12 additions & 8 deletions internal/feature/features.go
@@ -83,6 +83,9 @@ const (
// Support custom sidecars for pgBouncer Pods
PGBouncerSidecars = "PGBouncerSidecars"

// Adjust PGUpgrade parallelism according to CPU resources
PGUpgradeCPUConcurrency = "PGUpgradeCPUConcurrency"

// Support tablespace volumes
TablespaceVolumes = "TablespaceVolumes"

@@ -95,14 +98,15 @@ func NewGate() MutableGate {
gate := featuregate.NewFeatureGate()

if err := gate.Add(map[Feature]featuregate.FeatureSpec{
AppendCustomQueries: {Default: false, PreRelease: featuregate.Alpha},
AutoCreateUserSchema: {Default: true, PreRelease: featuregate.Beta},
AutoGrowVolumes: {Default: false, PreRelease: featuregate.Alpha},
BridgeIdentifiers: {Default: false, PreRelease: featuregate.Alpha},
InstanceSidecars: {Default: false, PreRelease: featuregate.Alpha},
PGBouncerSidecars: {Default: false, PreRelease: featuregate.Alpha},
TablespaceVolumes: {Default: false, PreRelease: featuregate.Alpha},
VolumeSnapshots: {Default: false, PreRelease: featuregate.Alpha},
AppendCustomQueries: {Default: false, PreRelease: featuregate.Alpha},
AutoCreateUserSchema: {Default: true, PreRelease: featuregate.Beta},
AutoGrowVolumes: {Default: false, PreRelease: featuregate.Alpha},
BridgeIdentifiers: {Default: false, PreRelease: featuregate.Alpha},
InstanceSidecars: {Default: false, PreRelease: featuregate.Alpha},
PGBouncerSidecars: {Default: false, PreRelease: featuregate.Alpha},
PGUpgradeCPUConcurrency: {Default: false, PreRelease: featuregate.Alpha},
TablespaceVolumes: {Default: false, PreRelease: featuregate.Alpha},
VolumeSnapshots: {Default: false, PreRelease: featuregate.Alpha},
}); err != nil {
panic(err)
}
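The new gate is registered as an alpha feature, so it defaults to off. A sketch of turning it on in code, mirroring the pattern used in the test above (usable only within the operator module, since the feature package is internal):

package main

import (
	"context"
	"fmt"

	"github.com/crunchydata/postgres-operator/internal/feature"
)

func main() {
	gate := feature.NewGate()

	// Alpha features default to false; opt in explicitly.
	if err := gate.SetFromMap(map[string]bool{
		feature.PGUpgradeCPUConcurrency: true,
	}); err != nil {
		panic(err)
	}

	// Reconcilers read the gate from the context.
	ctx := feature.NewContext(context.Background(), gate)
	fmt.Println(feature.Enabled(ctx, feature.PGUpgradeCPUConcurrency)) // true
}

In a running operator the gate would normally be toggled through its feature-gate configuration (for PGO, typically the PGO_FEATURE_GATES environment variable) rather than in code.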
1 change: 1 addition & 0 deletions internal/feature/features_test.go
@@ -22,6 +22,7 @@ func TestDefaults(t *testing.T) {
assert.Assert(t, false == gate.Enabled(BridgeIdentifiers))
assert.Assert(t, false == gate.Enabled(InstanceSidecars))
assert.Assert(t, false == gate.Enabled(PGBouncerSidecars))
assert.Assert(t, false == gate.Enabled(PGUpgradeCPUConcurrency))
assert.Assert(t, false == gate.Enabled(TablespaceVolumes))
assert.Assert(t, false == gate.Enabled(VolumeSnapshots))
}