This PR improves monitoring of PostgreSQL basebackups. During a backup, it regularly checks how much data has been uploaded and compares that to the last amount recorded in a persisted progress file. If the upload has advanced, it updates the record with the new byte count and the current time. If the upload has not advanced since the previous check, it reports the time elapsed since the last recorded progress and emits a stalled metric. Once a backup completes, the record is reset.

[SRE-7631]
sebinsunny committed Feb 7, 2024
1 parent 2b2ea98 commit cf6b3ef
Showing 4 changed files with 122 additions and 4 deletions.
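
At its core, the new stall check added to UploadEventProgressTracker.increment() in pghoard/transfer.py boils down to the comparison below. This is a condensed sketch, not the verbatim implementation: check_stall is an illustrative name, and persisted and metrics stand in for the PersistedProgress instance and pghoard's metrics client used in the diff.

import time

def check_stall(persisted, key, total_bytes_uploaded, metrics):
    """Compare the current upload size against the last persisted high-water mark."""
    now = time.monotonic()
    info = persisted.get_persisted_progress_for_key(key)
    if total_bytes_uploaded > info.current_progress:
        # The upload advanced: persist the new byte count and clear the gauge.
        persisted.update_persisted_progress(key, total_bytes_uploaded, now)
        metrics.gauge("basebackup_stalled", 0)
    else:
        # No advance since the last persisted sample: report how long it has stalled.
        metrics.gauge("basebackup_stalled", now - info.last_updated_time)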
38 changes: 38 additions & 0 deletions pghoard/common.py
@@ -24,13 +24,15 @@
from threading import Thread
from typing import (TYPE_CHECKING, Any, BinaryIO, Callable, Dict, Final, Optional, Protocol, Tuple, cast)

from pydantic import BaseModel, ValidationError
from rohmu import IO_BLOCK_SIZE, BaseTransfer, rohmufile
from rohmu.errors import Error, InvalidConfigurationError
from rohmu.typing import FileLike, HasName

from pghoard import pgutil

TAR_METADATA_FILENAME: Final[str] = ".pghoard_tar_metadata.json"
PROGRESS_FILE: Final[str] = "persisted_progress_file.json"

LOG = logging.getLogger("pghoard.common")

@@ -100,6 +102,42 @@ class BaseBackupMode(StrEnum):
    pipe = "pipe"


class ProgressData(BaseModel):
    current_progress: float
    last_updated_time: float


class PersistedProgress(BaseModel):
    progress: Dict[str, ProgressData] = {}

    @classmethod
    def read_persisted_progress(cls) -> "PersistedProgress":
        if os.path.exists(PROGRESS_FILE):
            with open(PROGRESS_FILE, "r") as file:
                try:
                    return cls.parse_raw(file.read())
                except ValidationError as e:
                    LOG.error("Validation error while reading persisted progress file: %s", e)
        return cls()

    def update_persisted_progress(self, key: str, current_progress: float, current_time: float) -> None:
        self.progress[key] = ProgressData(current_progress=current_progress, last_updated_time=current_time)
        self.save()

    def reset_persisted_progress(self, key: str) -> None:
        if key in self.progress:
            del self.progress[key]
            self.save()

    def get_persisted_progress_for_key(self, key: str) -> ProgressData:
        default_progress = ProgressData(current_progress=0, last_updated_time=time.monotonic())
        return self.progress.get(key, default_progress)

    def save(self):
        with open(PROGRESS_FILE, "w") as file:
            file.write(self.json())
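
A minimal usage sketch of the PersistedProgress API above (the key is hypothetical; note that save() writes persisted_progress_file.json into the current working directory):

import time
from pghoard.common import PersistedProgress

progress = PersistedProgress.read_persisted_progress()
progress.update_persisted_progress("example-basebackup-key", 1024, time.monotonic())

info = progress.get_persisted_progress_for_key("example-basebackup-key")
assert info.current_progress == 1024

# Completing a backup drops the record, as the TransferAgent does below.
progress.reset_persisted_progress("example-basebackup-key")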


def create_pgpass_file(connection_string_or_info):
    """Look up password from the given object which can be a dict or a
    string and write a possible password in a pgpass file;
28 changes: 26 additions & 2 deletions pghoard/transfer.py
@@ -25,7 +25,7 @@
from rohmu.typing import Metadata

from pghoard.common import (
-    CallbackEvent, CallbackQueue, FileType, PGHoardThread, Queue, QuitEvent, StrEnum, create_alert_file,
+    CallbackEvent, CallbackQueue, FileType, PersistedProgress, PGHoardThread, Queue, QuitEvent, StrEnum, create_alert_file,
    get_object_storage_config
)
from pghoard.fetcher import FileFetchManager
@@ -147,6 +147,9 @@ def untrack_upload_event(self, file_key: str) -> None:

def increment(self, file_key: str, total_bytes_uploaded: float) -> None:
    metric_data = {}
    now = time.monotonic()
    persisted_progress = PersistedProgress.read_persisted_progress()

    with self._tracked_events_lock:
        if file_key not in self._tracked_events:
            raise Exception(f"UploadEvent for {file_key} is not being tracked.")
@@ -155,6 +158,16 @@ def increment(self, file_key: str, total_bytes_uploaded: float) -> None:
    if file_type in (
        FileType.Basebackup, FileType.Basebackup_chunk, FileType.Basebackup_delta, FileType.Basebackup_delta_chunk
    ):
        progress_info = persisted_progress.get_persisted_progress_for_key(file_key)
        last_persisted_progress = progress_info.current_progress
        last_persisted_time = progress_info.last_updated_time
        if total_bytes_uploaded > last_persisted_progress:
            persisted_progress.update_persisted_progress(file_key, total_bytes_uploaded, now)
            self.metrics.gauge("basebackup_stalled", 0)
        else:
            stalled_time = now - last_persisted_time
            self.metrics.gauge("basebackup_stalled", stalled_time)
            self.log.warning("Upload for file %s has been stalled for %s seconds.", file_key, stalled_time)
    metric_data = {
        "metric": "pghoard.basebackup_bytes_uploaded",
        "inc_value": total_bytes_uploaded,
@@ -410,6 +423,13 @@ def run_safe(self):
            time.monotonic() - start_time
        )

        if file_to_transfer.operation == TransferOperation.Upload and filetype in (
            FileType.Basebackup, FileType.Basebackup_chunk, FileType.Basebackup_delta, FileType.Basebackup_delta_chunk
        ):
            if result.success:
                persisted_progress = PersistedProgress.read_persisted_progress()
                persisted_progress.reset_persisted_progress(key)

    self.fetch_manager.stop()
    self.log.debug("Quitting TransferAgent")

@@ -513,6 +533,10 @@ def handle_upload(self, site, key, file_to_transfer: UploadEvent):

        # Sleep for a bit to avoid busy looping. Increase sleep time if the op fails multiple times.
        self.sleep(min(0.5 * 2 ** (file_to_transfer.retry_number - 1), 20))

        if file_to_transfer.file_type in (
            FileType.Basebackup, FileType.Basebackup_chunk, FileType.Basebackup_delta, FileType.Basebackup_delta_chunk
        ):
            persisted_progress = PersistedProgress.read_persisted_progress()
            persisted_progress.reset_persisted_progress(key)
        self.transfer_queue.put(file_to_transfer)
        return None
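
Both the success path in run_safe() and the retry path in handle_upload() above clear the persisted record for basebackup-type files, so the next attempt starts a fresh stall timer. A sketch of that shared step (the helper name is mine, not part of the diff):

from pghoard.common import FileType, PersistedProgress

BASEBACKUP_FILE_TYPES = (
    FileType.Basebackup, FileType.Basebackup_chunk,
    FileType.Basebackup_delta, FileType.Basebackup_delta_chunk,
)

def reset_progress_for_basebackup(file_type: FileType, key: str) -> None:
    """Drop the persisted progress record for a finished or re-queued basebackup upload."""
    if file_type in BASEBACKUP_FILE_TYPES:
        PersistedProgress.read_persisted_progress().reset_persisted_progress(key)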
40 changes: 39 additions & 1 deletion test/test_common.py
@@ -9,13 +9,14 @@
import os
from pathlib import Path
from typing import Any, Dict
from unittest.mock import patch

import pytest
from mock.mock import Mock
from rohmu.errors import Error

from pghoard.common import (
-    TAR_METADATA_FILENAME, create_pgpass_file, default_json_serialization, download_backup_meta_file,
+    TAR_METADATA_FILENAME, PersistedProgress, create_pgpass_file, default_json_serialization, download_backup_meta_file,
    extract_pg_command_version_string, extract_pghoard_bb_v2_metadata, extract_pghoard_delta_metadata, json_encode,
    pg_major_version, pg_version_string_to_number, write_json_file
)
@@ -88,6 +89,43 @@ def test_json_serialization(self, tmpdir):

        assert ob2 == ob2_

    def test_persisted_progress(self):
        test_progress_file = "test_progress.json"
        key = "0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b"

        test_data = {
            "progress": {
                key: {
                    "current_progress": 100,
                    "last_updated_time": 1625072042.123456
                }
            }
        }

        with open(test_progress_file, "w") as file:
            json.dump(test_data, file)

        with patch("pghoard.common.PROGRESS_FILE", test_progress_file):
            persisted_progress = PersistedProgress.read_persisted_progress()
            assert key in persisted_progress.progress
            assert persisted_progress.progress[key].current_progress == 100
            assert persisted_progress.progress[key].last_updated_time == 1625072042.123456

            new_progress = 200
            new_time = 1625072099.123456
            persisted_progress.update_persisted_progress(key, new_progress, new_time)

            updated_progress = PersistedProgress.read_persisted_progress()
            assert updated_progress.progress[key].current_progress == new_progress
            assert updated_progress.progress[key].last_updated_time == new_time

        os.remove(test_progress_file)


def test_pg_major_version():
assert pg_major_version("10") == "10"
20 changes: 19 additions & 1 deletion test/test_transferagent.py
@@ -15,7 +15,7 @@
from rohmu.errors import FileNotFoundFromStorageError, StorageError

from pghoard import metrics
-from pghoard.common import CallbackEvent, CallbackQueue, FileType, QuitEvent
+from pghoard.common import (PROGRESS_FILE, CallbackEvent, CallbackQueue, FileType, PersistedProgress, QuitEvent)
from pghoard.transfer import (BaseTransferEvent, DownloadEvent, TransferAgent, UploadEvent, UploadEventProgressTracker)

# pylint: disable=attribute-defined-outside-init
@@ -316,3 +316,21 @@ def test_handle_metadata_error(self):
        evt = self.transfer_agent.handle_metadata(self.test_site, "foo", "bar")
        assert evt.success is False
        assert isinstance(evt.exception, Exception)

    def test_handle_upload_with_persisted_progress(self):
        upload_event = UploadEvent(
            backup_site_name="test_site",
            file_type=FileType.Basebackup,
            file_path=Path(self.foo_basebackup_path),
            source_data=Path(self.foo_basebackup_path),
            metadata={},
            file_size=3,
            callback_queue=CallbackQueue(),
            remove_after_upload=True
        )

        self.transfer_agent.handle_upload("test_site", self.foo_basebackup_path, upload_event)
        updated_progress = PersistedProgress.read_persisted_progress()
        assert updated_progress.progress[self.foo_basebackup_path].current_progress == 3
        if os.path.exists(PROGRESS_FILE):
            os.remove(PROGRESS_FILE)
