Add progress based basebackup metrics #615

Merged · 1 commit · Mar 18, 2024
4 changes: 3 additions & 1 deletion pghoard/basebackup/base.py
@@ -30,7 +30,7 @@
from pghoard.basebackup.delta import DeltaBaseBackup
from pghoard.common import (
TAR_METADATA_FILENAME, BackupFailure, BaseBackupFormat, BaseBackupMode, CallbackEvent, CompressionData, EncryptionData,
FileType, NoException, PGHoardThread, connection_string_using_pgpass, download_backup_meta_file,
FileType, NoException, PersistedProgress, PGHoardThread, connection_string_using_pgpass, download_backup_meta_file,
extract_pghoard_bb_v2_metadata, replication_connection_string_and_slot_using_pgpass, set_stream_nonblocking,
set_subprocess_stdout_and_stderr_nonblocking, terminate_subprocess
)
@@ -564,6 +564,8 @@ def run_local_tar_basebackup(self, delta: bool = False, with_delta_stats: bool =
db_conn.commit()

self.log.info("Starting to backup %r and %r tablespaces to %r", pgdata, len(tablespaces), compressed_base)
progress_instance = PersistedProgress()
progress_instance.reset_all(metrics=self.metrics)
start_time = time.monotonic()

if delta:
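For orientation only (not part of the diff): the two added lines clear any progress persisted by an earlier backup attempt before the timer starts, so the stall metrics emitted later in the run reflect only the current backup. A minimal sketch of that call, assuming a Metrics instance like the one the thread already holds:

    from pghoard.common import PersistedProgress
    from pghoard.metrics import Metrics

    metrics = Metrics(statsd={})         # stand-in for self.metrics
    progress = PersistedProgress()       # fresh model with an empty progress mapping
    progress.reset_all(metrics=metrics)  # drops every tracked key and rewrites the progress file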
35 changes: 32 additions & 3 deletions pghoard/basebackup/delta.py
@@ -18,15 +18,18 @@

from rohmu import BaseTransfer, rohmufile
from rohmu.dates import now
from rohmu.delta.common import (BackupManifest, BackupPath, SnapshotFile, SnapshotHash, SnapshotResult, SnapshotUploadResult)
from rohmu.delta.common import (
BackupManifest, BackupPath, ProgressMetrics, ProgressStep, SnapshotFile, SnapshotHash, SnapshotResult,
SnapshotUploadResult
)
from rohmu.delta.snapshot import Snapshotter
from rohmu.errors import FileNotFoundFromStorageError
from rohmu.typing import HasRead, HasSeek

from pghoard.basebackup.chunks import ChunkUploader
from pghoard.common import (
BackupFailure, BaseBackupFormat, CallbackQueue, CompressionData, EncryptionData, FileLikeWithName, FileType,
FileTypePrefixes, download_backup_meta_file, extract_pghoard_delta_metadata
FileTypePrefixes, PersistedProgress, download_backup_meta_file, extract_pghoard_delta_metadata
)
from pghoard.metrics import Metrics
from pghoard.transfer import TransferQueue, UploadEvent
@@ -45,6 +48,8 @@ class HasReadAndSeek(HasRead, HasSeek, Protocol):

FilesChunk = Set[Tuple]
SnapshotFiles = Dict[str, SnapshotFile]
PROGRESS_CHECK_INTERVAL = 10
STALLED_PROGRESS_THRESHOLD = 600

EMPTY_FILE_HASH = hashlib.blake2s().hexdigest()

@@ -73,9 +78,33 @@ def __init__(
self.tracked_snapshot_files: SnapshotFiles = self._list_existing_files()
self.chunk_uploader = chunk_uploader
self.data_file_format = data_file_format
self.last_flush_time: float = 0

def _snapshot(self, snapshotter: Snapshotter) -> SnapshotResult:
snapshotter.snapshot(reuse_old_snapshotfiles=False)
def progress_callback(progress_step: ProgressStep, progress_data: ProgressMetrics):
key = "snapshot_progress"
elapsed: float = time.monotonic() - self.last_flush_time
if elapsed > PROGRESS_CHECK_INTERVAL:
persisted_progress = PersistedProgress.read(self.metrics)
progress_info = persisted_progress.get(key)
tags: dict = {"phase": progress_step.value}

if progress_data["handled"] > progress_info.current_progress:
progress_info.update(progress_data["handled"])
persisted_progress.write(self.metrics)
self.last_flush_time = time.monotonic()
self.metrics.gauge("pghoard.seconds_since_backup_progress_stalled", 0, tags=tags)
else:
stalled_age = progress_info.age
self.metrics.gauge("pghoard.seconds_since_backup_progress_stalled", stalled_age, tags=tags)

if stalled_age >= STALLED_PROGRESS_THRESHOLD:
self.log.warning(
"Snapshot progress for %s has been stalled for %s seconds.", progress_step, stalled_age
)

self.last_flush_time = time.monotonic()
snapshotter.snapshot(reuse_old_snapshotfiles=False, progress_callback=progress_callback)
snapshot_result = SnapshotResult(end=None, state=None, hashes=None)
snapshot_result.state = snapshotter.get_snapshot_state()
snapshot_result.hashes = [
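In short, the new callback throttles itself to one check per PROGRESS_CHECK_INTERVAL seconds, persists the snapshotter's "handled" counter whenever it has advanced, and otherwise reports how long the phase has been stuck, warning once the age passes STALLED_PROGRESS_THRESHOLD. A minimal restatement of that decision with the persistence and throttling stripped out (illustrative sketch, not code from the diff):

    import logging

    log = logging.getLogger("example")

    PROGRESS_CHECK_INTERVAL = 10      # seconds between progress checks
    STALLED_PROGRESS_THRESHOLD = 600  # seconds without progress before a warning is logged

    def stall_gauge_value(handled: float, persisted: float, seconds_since_update: float) -> float:
        """Value emitted as pghoard.seconds_since_backup_progress_stalled for one snapshot phase."""
        if handled > persisted:
            # Progress advanced: the real callback persists the new value and zeroes the gauge.
            return 0.0
        if seconds_since_update >= STALLED_PROGRESS_THRESHOLD:
            log.warning("snapshot progress stalled for %s seconds", seconds_since_update)
        return seconds_since_update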
69 changes: 69 additions & 0 deletions pghoard/common.py
@@ -15,6 +15,7 @@
import re
import tarfile
import tempfile
import threading
import time
from contextlib import suppress
from dataclasses import dataclass, field
@@ -24,13 +25,16 @@
from threading import Thread
from typing import (TYPE_CHECKING, Any, BinaryIO, Callable, Dict, Final, Optional, Protocol, Tuple, cast)

from pydantic import BaseModel, Field
from rohmu import IO_BLOCK_SIZE, BaseTransfer, rohmufile
from rohmu.errors import Error, InvalidConfigurationError
from rohmu.typing import FileLike, HasName

from pghoard import pgutil
from pghoard.metrics import Metrics

TAR_METADATA_FILENAME: Final[str] = ".pghoard_tar_metadata.json"
PROGRESS_FILE: Final[str] = "persisted_progress_file.json"

LOG = logging.getLogger("pghoard.common")

@@ -100,6 +104,71 @@ class BaseBackupMode(StrEnum):
pipe = "pipe"


class ProgressData(BaseModel):
current_progress: float = 0
last_updated_time: float = 0

@property
def age(self) -> float:
return time.time() - self.last_updated_time

def update(self, current_progress: float) -> None:
self.current_progress = current_progress
self.last_updated_time = time.time()


def atomic_write(file_path: str, data: str, temp_dir: Optional[str] = None):
temp_dir = temp_dir or os.path.dirname(file_path)
temp_file = None
try:
with tempfile.NamedTemporaryFile("w", delete=False, dir=temp_dir) as temp_file:
temp_file.write(data)
temp_path = temp_file.name
os.rename(temp_path, file_path)
except Exception as ex: # pylint: disable=broad-except
LOG.exception("Failed to write file atomically: %r", ex)
if temp_file:
with suppress(FileNotFoundError):
os.unlink(temp_file.name)


class PersistedProgress(BaseModel):
progress: Dict[str, ProgressData] = Field(default_factory=dict)
_lock: threading.Lock = threading.Lock()

@classmethod
def read(cls, metrics: Metrics) -> "PersistedProgress":
if os.path.exists(PROGRESS_FILE):
with open(PROGRESS_FILE, "r") as file:
try:
return cls.parse_raw(file.read())
except Exception as ex: # pylint: disable=broad-except
LOG.exception("Failed to read persisted progress file: %r", ex)
metrics.unexpected_exception(ex, where="read_persisted_progress")
return cls()

def write(self, metrics: Metrics):
with self._lock:
try:
data = self.json()
atomic_write(PROGRESS_FILE, data)
except Exception as ex: # pylint: disable=broad-except
metrics.unexpected_exception(ex, where="write_persisted_progress")

def get(self, key: str) -> ProgressData:
self.progress.setdefault(key, ProgressData())
return self.progress[key]

def reset(self, key: str, metrics: Metrics) -> None:
if key in self.progress:
del self.progress[key]
self.write(metrics=metrics)

def reset_all(self, metrics: Metrics) -> None:
self.progress = {}
self.write(metrics=metrics)


def create_pgpass_file(connection_string_or_info):
"""Look up password from the given object which can be a dict or a
string and write a possible password in a pgpass file;
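A round-trip sketch of the new PersistedProgress helper, mirroring how the tests below exercise it (the key and byte count are made up for illustration; Metrics(statsd={}) is the no-op metrics sink the tests construct):

    from pghoard.common import PersistedProgress
    from pghoard.metrics import Metrics

    metrics = Metrics(statsd={})                      # no-op metrics sink, as in the tests
    state = PersistedProgress.read(metrics)           # parses PROGRESS_FILE, or returns an empty model
    info = state.get("example-file-key")              # creates a ProgressData entry on first access
    if 12345 > info.current_progress:
        info.update(12345)                            # records the new value and the update time
        state.write(metrics)                          # atomic rewrite of PROGRESS_FILE under the lock
    print(info.age)                                   # seconds since the last recorded update
    state.reset("example-file-key", metrics=metrics)  # drop the key once its upload has completed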
36 changes: 33 additions & 3 deletions pghoard/transfer.py
@@ -25,7 +25,7 @@
from rohmu.typing import Metadata

from pghoard.common import (
CallbackEvent, CallbackQueue, FileType, PGHoardThread, Queue, QuitEvent, StrEnum, create_alert_file,
CallbackEvent, CallbackQueue, FileType, PersistedProgress, PGHoardThread, Queue, QuitEvent, StrEnum, create_alert_file,
get_object_storage_config
)
from pghoard.fetcher import FileFetchManager
Expand Down Expand Up @@ -147,14 +147,33 @@ def untrack_upload_event(self, file_key: str) -> None:

def increment(self, file_key: str, total_bytes_uploaded: float) -> None:
metric_data = {}
persisted_progress = PersistedProgress.read(metrics=self.metrics)

with self._tracked_events_lock:
if file_key not in self._tracked_events:
raise Exception(f"UploadEvent for {file_key} is not being tracked.")

file_type = self._tracked_events[file_key].file_type
if file_type in (
FileType.Basebackup, FileType.Basebackup_chunk, FileType.Basebackup_delta, FileType.Basebackup_delta_chunk
FileType.Basebackup,
FileType.Basebackup_chunk,
FileType.Basebackup_delta,
FileType.Basebackup_delta_chunk,
):
progress_info = persisted_progress.get(file_key)
if total_bytes_uploaded > progress_info.current_progress:
progress_info.update(total_bytes_uploaded)
persisted_progress.write(metrics=self.metrics)
self.metrics.gauge("pghoard.seconds_since_backup_progress_stalled", 0)
else:
stalled_age = progress_info.age
self.metrics.gauge("pghoard.seconds_since_backup_progress_stalled", stalled_age)
if stalled_age >= 600:
self.log.warning(
"Upload for file %s has been stalled for %s seconds.",
file_key,
stalled_age,
)
metric_data = {
"metric": "pghoard.basebackup_bytes_uploaded",
"inc_value": total_bytes_uploaded,
@@ -410,6 +429,13 @@ def run_safe(self):
time.monotonic() - start_time
)

if file_to_transfer.operation in {TransferOperation.Upload} and filetype in (
FileType.Basebackup, FileType.Basebackup_chunk, FileType.Basebackup_delta, FileType.Basebackup_delta_chunk
):
if result.success:
persisted_progress = PersistedProgress.read(metrics=self.metrics)
persisted_progress.reset(key, metrics=self.metrics)

self.fetch_manager.stop()
self.log.debug("Quitting TransferAgent")

@@ -513,6 +539,10 @@ def handle_upload(self, site, key, file_to_transfer: UploadEvent):

# Sleep for a bit to avoid busy looping. Increase sleep time if the op fails multiple times
self.sleep(min(0.5 * 2 ** (file_to_transfer.retry_number - 1), 20))

if file_to_transfer.file_type in (
FileType.Basebackup, FileType.Basebackup_chunk, FileType.Basebackup_delta, FileType.Basebackup_delta_chunk
) and file_to_transfer.retry_number < 2:
persisted_progress = PersistedProgress.read(metrics=self.metrics)
persisted_progress.reset(key, metrics=self.metrics)
self.transfer_queue.put(file_to_transfer)
return None
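Taken together, the transfer-side changes follow one lifecycle: increment() persists the growing byte count for basebackup uploads and drives the stall gauge, run_safe() drops the entry once the upload succeeds, and handle_upload() drops it again on the first retries so a restarted upload is not immediately reported as stalled. The gauge rule itself reduces to a few lines (illustrative sketch; the 600-second threshold mirrors the literal used in increment()):

    def upload_stall_seconds(bytes_uploaded: float, persisted_bytes: float, seconds_since_update: float) -> float:
        """Value emitted as pghoard.seconds_since_backup_progress_stalled for one basebackup file."""
        if bytes_uploaded > persisted_bytes:
            return 0.0                  # forward progress: the byte count is persisted and the gauge resets
        return seconds_since_update     # stalled: the gauge reports the age, with a warning at >= 600 s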
43 changes: 30 additions & 13 deletions test/basebackup/test_delta.py
@@ -442,22 +442,39 @@ def test_upload_single_delta_files_progress(
delta_hashes = {file_hash for _, file_hash in delta_files}

with patch.object(deltabasebackup, "_delta_upload_hexdigest") as mock_delta_upload_hexdigest, \
patch.object(deltabasebackup, "metrics") as mock_metrics, \
patch.object(deltabasebackup, "metrics") as mock_metrics, \
patch.object(snapshotter, "update_snapshot_file_data"):
mock_delta_upload_hexdigest.side_effect = [(200, 10, file_hash, True) for file_hash in delta_hashes]
with snapshotter.lock:
deltabasebackup._snapshot(snapshotter=snapshotter) # pylint: disable=protected-access
deltabasebackup._upload_single_delta_files( # pylint: disable=protected-access
todo_hexdigests=delta_hashes, snapshotter=snapshotter, progress=initial_progress
)
expected_calls = [
mock.call(
"pghoard.basebackup_estimated_progress",
initial_progress + (idx + 1) * (100 - initial_progress) / files_count,
tags={"site": "delta"}
) for idx in range(files_count)
]
assert mock_metrics.gauge.mock_calls == expected_calls
with patch("pghoard.basebackup.delta.PROGRESS_CHECK_INTERVAL", new=1):
deltabasebackup._snapshot(snapshotter=snapshotter) # pylint: disable=protected-access
deltabasebackup._upload_single_delta_files( # pylint: disable=protected-access
todo_hexdigests=delta_hashes, snapshotter=snapshotter, progress=initial_progress
)
expected_calls = [
mock.call(
"pghoard.seconds_since_backup_progress_stalled", 0, tags={"phase": "creating_missing_directories"}
),
mock.call("pghoard.seconds_since_backup_progress_stalled", 0, tags={"phase": "adding_missing_files"}),
]

expected_calls += [
mock.call(
"pghoard.seconds_since_backup_progress_stalled",
0,
tags={"phase": "processing_and_hashing_snapshot_files"}
) for _ in range(files_count)
]

expected_calls += [
mock.call(
"pghoard.basebackup_estimated_progress",
initial_progress + (idx + 1) * (100 - initial_progress) / files_count,
tags={"site": "delta"}
) for idx in range(files_count)
]

assert mock_metrics.gauge.mock_calls == expected_calls


def test_upload_single_delta_files(
52 changes: 51 additions & 1 deletion test/test_common.py
@@ -14,8 +14,9 @@
from mock.mock import Mock
from rohmu.errors import Error

from pghoard import metrics
from pghoard.common import (
TAR_METADATA_FILENAME, create_pgpass_file, default_json_serialization, download_backup_meta_file,
TAR_METADATA_FILENAME, PersistedProgress, create_pgpass_file, default_json_serialization, download_backup_meta_file,
extract_pg_command_version_string, extract_pghoard_bb_v2_metadata, extract_pghoard_delta_metadata, json_encode,
pg_major_version, pg_version_string_to_number, write_json_file
)
@@ -88,6 +89,55 @@ def test_json_serialization(self, tmpdir):

assert ob2 == ob2_

def test_persisted_progress(self, mocker, tmp_path):
test_progress_file = tmp_path / "test_progress.json"
original_time = 1625072042.123456
test_data = {
"progress": {
"0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b": {
"current_progress": 100,
"last_updated_time": original_time
}
}
}

with open(test_progress_file, "w") as file:
json.dump(test_data, file)

mocker.patch("pghoard.common.PROGRESS_FILE", test_progress_file)
persisted_progress = PersistedProgress.read(metrics=metrics.Metrics(statsd={}))
assert "0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b" in persisted_progress.progress
assert persisted_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b"
].current_progress == 100
assert persisted_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b"
].last_updated_time == 1625072042.123456

new_progress = 200
progress_info = persisted_progress.get("0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b")
progress_info.update(new_progress)
persisted_progress.write(metrics=metrics.Metrics(statsd={}))

updated_progress = PersistedProgress.read(metrics=metrics.Metrics(statsd={}))
assert updated_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b"
].current_progress == new_progress
assert updated_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b"
].last_updated_time > original_time

def test_default_persisted_progress_creation(self, mocker, tmp_path):
tmp_file = tmp_path / "non_existent_progress.json"
assert not tmp_file.exists()

mocker.patch("pghoard.common.PROGRESS_FILE", str(tmp_file))
persisted_progress = PersistedProgress.read(metrics=metrics.Metrics(statsd={}))

assert persisted_progress.progress == {}
persisted_progress.write(metrics=metrics.Metrics(statsd={}))

assert tmp_file.exists()
with open(tmp_file, "r") as file:
data = json.load(file)
assert data == {"progress": {}}


def test_pg_major_version():
assert pg_major_version("10") == "10"