Skip to content

Commit

Permalink
Backup restore fixes (#120)
Browse files Browse the repository at this point in the history
* Add restore to new cluster integration test

* Fix unit name

* Change fixture scope

* Fix application removal

* Add auxiliary code

* Add backup logic code

* Fix pgBackRest path

* Fix data path

* Fix remaining details

* Move to single relation

* Block charm when using another cluster repo

* Add fixes from the k8s charm

* Extract logic to new method

* Remaining fixes copied from k8s charm

* Fix new blocked status check

* Rollback order of workload failures call
  • Loading branch information
marceloneppel authored May 28, 2023
1 parent 9e561ff commit 526756e
Show file tree
Hide file tree
Showing 9 changed files with 379 additions and 65 deletions.
182 changes: 152 additions & 30 deletions src/backups.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@
import tempfile
from datetime import datetime
from pathlib import Path
from subprocess import PIPE, run
from typing import Dict, List, Optional, Tuple
from subprocess import PIPE, TimeoutExpired, run
from typing import Dict, List, Optional, OrderedDict, Tuple

import boto3 as boto3
import botocore
from botocore.exceptions import ClientError
from charms.data_platform_libs.v0.s3 import CredentialsChangedEvent, S3Requirer
from charms.operator_libs_linux.v1 import snap
from jinja2 import Template
Expand All @@ -39,6 +40,12 @@

logger = logging.getLogger(__name__)

ANOTHER_CLUSTER_REPOSITORY_ERROR_MESSAGE = "the S3 repository has backups from another cluster"
FAILED_TO_ACCESS_CREATE_BUCKET_ERROR_MESSAGE = (
"failed to access/create the bucket, check your S3 settings"
)
FAILED_TO_INITIALIZE_STANZA_ERROR_MESSAGE = "failed to initialize stanza, check your S3 settings"


class ListBackupsError(Exception):
"""Raised when pgBackRest fails to list backups."""
Expand All @@ -62,6 +69,11 @@ def __init__(self, charm, relation_name: str):
self.framework.observe(self.charm.on.list_backups_action, self._on_list_backups_action)
self.framework.observe(self.charm.on.restore_action, self._on_restore_action)

@property
def stanza_name(self) -> str:
    """Name of the pgBackRest stanza used by this cluster.

    Built by joining the model name and the charm's cluster name with a dot.
    """
    model_name = self.model.name
    cluster_name = self.charm.cluster_name
    return f"{model_name}.{cluster_name}"

def _are_backup_settings_ok(self) -> Tuple[bool, Optional[str]]:
"""Validates whether backup settings are OK."""
if self.model.get_relation(self.relation_name) is None:
Expand Down Expand Up @@ -95,6 +107,35 @@ def _can_unit_perform_backup(self) -> Tuple[bool, Optional[str]]:

return self._are_backup_settings_ok()

def can_use_s3_repository(self) -> Tuple[bool, Optional[str]]:
    """Check whether the configured S3 repository can be used by this cluster.

    Runs the pgBackRest ``info`` command and, on the leader unit, compares the
    name of every stanza found in the repository with the stanza stored in the
    application peer data (falling back to this cluster's stanza name).

    Returns:
        a tuple of a boolean telling whether the repository can be used and,
        when it cannot, the message explaining why (``None`` otherwise).
    """
    info_command = [PGBACKREST_EXECUTABLE, PGBACKREST_CONFIGURATION_FILE, "info", "--output=json"]
    try:
        return_code, stdout, stderr = self._execute_command(info_command, timeout=30)
    except TimeoutExpired as timeout_error:
        logger.error(str(timeout_error))
        return False, FAILED_TO_INITIALIZE_STANZA_ERROR_MESSAGE

    if return_code != 0:
        logger.error(stderr)
        return False, FAILED_TO_INITIALIZE_STANZA_ERROR_MESSAGE

    # Only the leader inspects the repository content.
    if not self.charm.unit.is_leader():
        return True, None

    # Prevent creating backups and storing them in another cluster's repository.
    for repository_stanza in json.loads(stdout):
        found_name = repository_stanza.get("name")
        expected_name = self.charm.app_peer_data.get("stanza", self.stanza_name)
        if found_name == expected_name:
            continue

        # Prevent archiving of WAL files.
        self.charm.app_peer_data.update({"stanza": ""})
        self.charm.update_config()
        if self.charm._patroni.member_started:
            self.charm._patroni.reload_patroni_configuration()
        return False, ANOTHER_CLUSTER_REPOSITORY_ERROR_MESSAGE

    return True, None

def _change_connectivity_to_database(self, connectivity: bool) -> None:
"""Enable or disable the connectivity to the database."""
self.charm.unit_peer_data.update({"connectivity": "on" if connectivity else "off"})
Expand Down Expand Up @@ -122,6 +163,44 @@ def _construct_endpoint(self, s3_parameters: Dict) -> str:

return endpoint

def _create_bucket_if_not_exists(self) -> None:
s3_parameters, missing_parameters = self._retrieve_s3_parameters()
if missing_parameters:
return

bucket_name = s3_parameters["bucket"]
region = s3_parameters.get("region")
session = boto3.session.Session(
aws_access_key_id=s3_parameters["access-key"],
aws_secret_access_key=s3_parameters["secret-key"],
region_name=s3_parameters["region"],
)

try:
s3 = session.resource("s3", endpoint_url=self._construct_endpoint(s3_parameters))
except ValueError as e:
logger.exception("Failed to create a session '%s' in region=%s.", bucket_name, region)
raise e
bucket = s3.Bucket(bucket_name)
try:
bucket.meta.client.head_bucket(Bucket=bucket_name)
logger.info("Bucket %s exists.", bucket_name)
exists = True
except ClientError:
logger.warning("Bucket %s doesn't exist or you don't have access to it.", bucket_name)
exists = False
if not exists:
try:
bucket.create(CreateBucketConfiguration={"LocationConstraint": region})

bucket.wait_until_exists()
logger.info("Created bucket '%s' in region=%s", bucket_name, region)
except ClientError as error:
logger.exception(
"Couldn't create bucket named '%s' in region=%s.", bucket_name, region
)
raise error

def _empty_data_files(self) -> bool:
"""Empty the PostgreSQL data directory in preparation of backup restore."""
try:
Expand Down Expand Up @@ -196,31 +275,41 @@ def _generate_backup_list_output(self) -> str:
backup_list.append((backup_id, "physical", backup_status))
return self._format_backup_list(backup_list)

def _list_backups_ids(self, show_failed: bool) -> List[str]:
"""Retrieve the list of backup ids.
def _list_backups(self, show_failed: bool) -> OrderedDict[str, str]:
"""Retrieve the list of backups.
Args:
show_failed: whether to also return the failed backups.
Returns:
the list of previously created backups or an empty list if there is no backups
in the S3 bucket.
an ordered dict mapping each backup id to its stanza name, or an empty dict
if there are no backups in the S3 bucket.
"""
return_code, output, stderr = self._execute_command(
[PGBACKREST_EXECUTABLE, PGBACKREST_CONFIGURATION_FILE, "info", "--output=json"]
)
if return_code != 0:
raise ListBackupsError(f"Failed to list backups with error: {stderr}")

backups = json.loads(output)[0]["backup"]
return [
datetime.strftime(
datetime.strptime(backup["label"][:-1], PGBACKREST_BACKUP_ID_FORMAT),
BACKUP_ID_FORMAT,
repository_info = next(iter(json.loads(output)), None)

# If there are no backups, returns an empty dict.
if repository_info is None:
return OrderedDict[str, str]()

backups = repository_info["backup"]
stanza_name = repository_info["name"]
return OrderedDict[str, str](
(
datetime.strftime(
datetime.strptime(backup["label"][:-1], PGBACKREST_BACKUP_ID_FORMAT),
BACKUP_ID_FORMAT,
),
stanza_name,
)
for backup in backups
if show_failed or not backup["error"]
]
)

def _initialise_stanza(self) -> None:
"""Initialize the stanza.
Expand All @@ -232,7 +321,13 @@ def _initialise_stanza(self) -> None:
if not self.charm.unit.is_leader():
return

if self.charm.is_blocked:
# Enable stanza initialisation if the backup settings were fixed after being invalid
# or pointing to a repository where there are backups from another cluster.
if self.charm.is_blocked and self.charm.unit.status.message not in [
ANOTHER_CLUSTER_REPOSITORY_ERROR_MESSAGE,
FAILED_TO_ACCESS_CREATE_BUCKET_ERROR_MESSAGE,
FAILED_TO_INITIALIZE_STANZA_ERROR_MESSAGE,
]:
logger.warning("couldn't initialize stanza due to a blocked status")
return

Expand All @@ -243,17 +338,17 @@ def _initialise_stanza(self) -> None:
[
PGBACKREST_EXECUTABLE,
PGBACKREST_CONFIGURATION_FILE,
f"--stanza={self.charm.cluster_name}",
f"--stanza={self.stanza_name}",
"stanza-create",
]
)
if return_code != 0:
logger.error(stderr)
self.charm.unit.status = BlockedStatus("failed to initialize stanza")
self.charm.unit.status = BlockedStatus(FAILED_TO_INITIALIZE_STANZA_ERROR_MESSAGE)
return

# Store the stanza name to be used in configurations updates.
self.charm.app_peer_data.update({"stanza": self.charm.cluster_name})
self.charm.app_peer_data.update({"stanza": self.stanza_name})

# Update the configuration to use pgBackRest as the archiving mechanism.
self.charm.update_config()
Expand All @@ -268,18 +363,21 @@ def _initialise_stanza(self) -> None:
[
PGBACKREST_EXECUTABLE,
PGBACKREST_CONFIGURATION_FILE,
f"--stanza={self.charm.cluster_name}",
f"--stanza={self.stanza_name}",
"check",
]
)
if return_code != 0:
raise Exception(stderr)
self.charm.unit.status = ActiveStatus()
except RetryError as e:
# If the check command doesn't succeed, remove the stanza name
# and rollback the configuration.
self.charm.app_peer_data.update({"stanza": ""})
self.charm.update_config()

logger.exception(e)
self.charm.unit.status = BlockedStatus(
f"failed to initialize stanza with error {str(e)}"
)
self.charm.unit.status = BlockedStatus(FAILED_TO_INITIALIZE_STANZA_ERROR_MESSAGE)

@property
def _is_primary_pgbackrest_service_running(self) -> bool:
Expand All @@ -303,6 +401,17 @@ def _on_s3_credential_changed(self, event: CredentialsChangedEvent):
logger.debug("Cannot set pgBackRest configurations, missing configurations.")
return

try:
self._create_bucket_if_not_exists()
except (ClientError, ValueError):
self.charm.unit.status = BlockedStatus(FAILED_TO_ACCESS_CREATE_BUCKET_ERROR_MESSAGE)
return

can_use_s3_repository, validation_message = self.can_use_s3_repository()
if not can_use_s3_repository:
self.charm.unit.status = BlockedStatus(validation_message)
return

self._initialise_stanza()

self.start_stop_pgbackrest_service()
Expand Down Expand Up @@ -331,7 +440,7 @@ def _on_create_backup_action(self, event) -> None:
metadata,
os.path.join(
s3_parameters["path"],
f"backup/{self.charm.cluster_name}/latest",
f"backup/{self.stanza_name}/latest",
),
s3_parameters,
):
Expand All @@ -351,7 +460,7 @@ def _on_create_backup_action(self, event) -> None:
command = [
PGBACKREST_EXECUTABLE,
PGBACKREST_CONFIGURATION_FILE,
f"--stanza={self.charm.cluster_name}",
f"--stanza={self.stanza_name}",
"--log-level-console=debug",
"--type=full",
"backup",
Expand Down Expand Up @@ -385,14 +494,14 @@ def _on_create_backup_action(self, event) -> None:
logs,
os.path.join(
s3_parameters["path"],
f"backup/{self.charm.cluster_name}/{backup_id}/backup.log",
f"backup/{self.stanza_name}/{backup_id}/backup.log",
),
s3_parameters,
)
event.fail("Failed to backup PostgreSQL")
else:
try:
backup_id = self._list_backups_ids(show_failed=True)[-1]
backup_id = list(self._list_backups(show_failed=True).keys())[-1]
except ListBackupsError as e:
logger.exception(e)
event.fail("Failed to check backup id")
Expand All @@ -408,7 +517,7 @@ def _on_create_backup_action(self, event) -> None:
logs,
os.path.join(
s3_parameters["path"],
f"backup/{self.charm.cluster_name}/{backup_id}/backup.log",
f"backup/{self.stanza_name}/{backup_id}/backup.log",
),
s3_parameters,
):
Expand Down Expand Up @@ -448,7 +557,8 @@ def _on_restore_action(self, event):
# Validate the provided backup id.
logger.info("Validating provided backup-id")
try:
if backup_id not in self._list_backups_ids(show_failed=False):
backups = self._list_backups(show_failed=False)
if backup_id not in backups.keys():
event.fail(f"Invalid backup-id: {backup_id}")
return
except ListBackupsError as e:
Expand Down Expand Up @@ -476,8 +586,8 @@ def _on_restore_action(self, event):
logger.info("Configuring Patroni to restore the backup")
self.charm.app_peer_data.update(
{
"archive-mode": "off",
"restoring-backup": f"{datetime.strftime(datetime.strptime(backup_id, BACKUP_ID_FORMAT), PGBACKREST_BACKUP_ID_FORMAT)}F",
"restore-stanza": backups[backup_id],
}
)
self.charm.update_config()
Expand Down Expand Up @@ -512,12 +622,21 @@ def _pre_restore_checks(self, event: ActionEvent) -> bool:
Returns:
a boolean indicating whether restore should be run.
"""
are_backup_settings_ok, validation_message = self._are_backup_settings_ok()
if not are_backup_settings_ok:
logger.warning(validation_message)
event.fail(validation_message)
return False

if not event.params.get("backup-id"):
event.fail("Missing backup-id to restore")
return False

logger.info("Checking if cluster is in blocked state")
if self.charm.is_blocked:
if (
self.charm.is_blocked
and self.charm.unit.status.message != ANOTHER_CLUSTER_REPOSITORY_ERROR_MESSAGE
):
error_message = "Cluster or unit is in a blocking state"
logger.warning(error_message)
event.fail(error_message)
Expand Down Expand Up @@ -565,7 +684,7 @@ def _render_pgbackrest_conf_file(self) -> bool:
s3_uri_style=s3_parameters["s3-uri-style"],
access_key=s3_parameters["access-key"],
secret_key=s3_parameters["secret-key"],
stanza=self.charm.cluster_name,
stanza=self.stanza_name,
storage_path=self.charm._storage_path,
user=BACKUP_USER,
)
Expand All @@ -576,7 +695,7 @@ def _render_pgbackrest_conf_file(self) -> bool:

def _restart_database(self) -> None:
"""Removes the restoring backup flag and restart the database."""
self.charm.app_peer_data.update({"archive-mode": "", "restoring-backup": ""})
self.charm.app_peer_data.update({"restoring-backup": ""})
self.charm.update_config()
self.charm._patroni.start_patroni()

Expand All @@ -597,6 +716,9 @@ def _retrieve_s3_parameters(self) -> Tuple[Dict, List[str]]:
)
return {}, missing_required_parameters

# Retrieve the backup path, strip its slashes and add a "/" in the beginning of the path.
s3_parameters["path"] = f'/{s3_parameters["path"].strip("/")}'

# Add some sensible defaults (as expected by the code) for missing optional parameters
s3_parameters.setdefault("endpoint", "https://s3.amazonaws.com")
s3_parameters.setdefault("region")
Expand Down
Loading

0 comments on commit 526756e

Please sign in to comment.