diff --git a/pghoard/common.py b/pghoard/common.py index ee444b8c..2062fd78 100644 --- a/pghoard/common.py +++ b/pghoard/common.py @@ -15,6 +15,7 @@ import re import tarfile import tempfile +import threading import time from contextlib import suppress from dataclasses import dataclass, field @@ -24,13 +25,16 @@ from threading import Thread from typing import (TYPE_CHECKING, Any, BinaryIO, Callable, Dict, Final, Optional, Protocol, Tuple, cast) +from pydantic import BaseModel, Field from rohmu import IO_BLOCK_SIZE, BaseTransfer, rohmufile from rohmu.errors import Error, InvalidConfigurationError from rohmu.typing import FileLike, HasName from pghoard import pgutil +from pghoard.metrics import Metrics TAR_METADATA_FILENAME: Final[str] = ".pghoard_tar_metadata.json" +PROGRESS_FILE: Final[str] = "persisted_progress_file.json" LOG = logging.getLogger("pghoard.common") @@ -100,6 +104,55 @@ class BaseBackupMode(StrEnum): pipe = "pipe" +class ProgressData(BaseModel): + current_progress: float = 0 + last_updated_time: float = 0 + + @property + def age(self) -> float: + return time.time() - self.last_updated_time + + def update(self, current_progress: float) -> None: + self.current_progress = current_progress + self.last_updated_time = time.time() + + +class PersistedProgress(BaseModel): + progress: Dict[str, ProgressData] = Field(default_factory=dict) + _lock: threading.Lock = threading.Lock() + + @classmethod + def read(cls, metrics: Metrics) -> "PersistedProgress": + if os.path.exists(PROGRESS_FILE): + with open(PROGRESS_FILE, "r") as file: + try: + return cls.parse_raw(file.read()) + except Exception as ex: # pylint: disable=broad-except + LOG.exception("Failed to read persisted progress file: %r", ex) + metrics.unexpected_exception(ex, where="read_persisted_progress") + return cls() + + def write(self, metrics: Metrics): + with self._lock: + try: + with tempfile.NamedTemporaryFile("w", delete=False, dir=os.path.dirname(PROGRESS_FILE)) as temp_file: + temp_file.write(self.json()) + temp_path = temp_file.name + os.rename(temp_path, PROGRESS_FILE) + except Exception as ex: # pylint: disable=broad-except + LOG.exception("Failed to write persisted progress file: %r", ex) + metrics.unexpected_exception(ex, where="write_persisted_progress") + + def get(self, key: str) -> ProgressData: + self.progress.setdefault(key, ProgressData()) + return self.progress[key] + + def reset(self, key: str, metrics: Metrics) -> None: + if key in self.progress: + del self.progress[key] + self.write(metrics=metrics) + + def create_pgpass_file(connection_string_or_info): """Look up password from the given object which can be a dict or a string and write a possible password in a pgpass file; diff --git a/pghoard/transfer.py b/pghoard/transfer.py index 1af3daf3..44f4a61e 100644 --- a/pghoard/transfer.py +++ b/pghoard/transfer.py @@ -25,7 +25,7 @@ from rohmu.typing import Metadata from pghoard.common import ( - CallbackEvent, CallbackQueue, FileType, PGHoardThread, Queue, QuitEvent, StrEnum, create_alert_file, + CallbackEvent, CallbackQueue, FileType, PersistedProgress, PGHoardThread, Queue, QuitEvent, StrEnum, create_alert_file, get_object_storage_config ) from pghoard.fetcher import FileFetchManager @@ -147,14 +147,33 @@ def untrack_upload_event(self, file_key: str) -> None: def increment(self, file_key: str, total_bytes_uploaded: float) -> None: metric_data = {} + persisted_progress = PersistedProgress.read(metrics=self.metrics) + with self._tracked_events_lock: if file_key not in self._tracked_events: raise Exception(f"UploadEvent for {file_key} is not being tracked.") file_type = self._tracked_events[file_key].file_type if file_type in ( - FileType.Basebackup, FileType.Basebackup_chunk, FileType.Basebackup_delta, FileType.Basebackup_delta_chunk + FileType.Basebackup, + FileType.Basebackup_chunk, + FileType.Basebackup_delta, + FileType.Basebackup_delta_chunk, ): + progress_info = persisted_progress.get(file_key) + if total_bytes_uploaded > progress_info.current_progress: + progress_info.update(total_bytes_uploaded) + persisted_progress.write(metrics=self.metrics) + self.metrics.gauge("pghoard.basebackup_stalled", 0) + else: + stalled_age = progress_info.age + self.metrics.gauge("pghoard.basebackup_stalled", stalled_age) + if stalled_age >= 600: + self.log.warning( + "Upload for file %s has been stalled for %s seconds.", + file_key, + stalled_age, + ) metric_data = { "metric": "pghoard.basebackup_bytes_uploaded", "inc_value": total_bytes_uploaded, @@ -410,6 +429,13 @@ def run_safe(self): time.monotonic() - start_time ) + if file_to_transfer.operation in {TransferOperation.Upload} and filetype in ( + FileType.Basebackup, FileType.Basebackup_chunk, FileType.Basebackup_delta, FileType.Basebackup_delta_chunk + ): + if result.success: + persisted_progress = PersistedProgress.read(metrics=self.metrics) + persisted_progress.reset(key, metrics=self.metrics) + self.fetch_manager.stop() self.log.debug("Quitting TransferAgent") @@ -513,6 +539,10 @@ def handle_upload(self, site, key, file_to_transfer: UploadEvent): # Sleep for a bit to avoid busy looping. Increase sleep time if the op fails multiple times self.sleep(min(0.5 * 2 ** (file_to_transfer.retry_number - 1), 20)) - + if file_to_transfer.file_type in ( + FileType.Basebackup, FileType.Basebackup_chunk, FileType.Basebackup_delta, FileType.Basebackup_delta_chunk + ) and file_to_transfer.retry_number > 2: + persisted_progress = PersistedProgress.read(metrics=self.metrics) + persisted_progress.reset(key, metrics=self.metrics) self.transfer_queue.put(file_to_transfer) return None diff --git a/test/test_common.py b/test/test_common.py index d575f79b..7aaa42e5 100644 --- a/test/test_common.py +++ b/test/test_common.py @@ -14,8 +14,9 @@ from mock.mock import Mock from rohmu.errors import Error +from pghoard import metrics from pghoard.common import ( - TAR_METADATA_FILENAME, create_pgpass_file, default_json_serialization, download_backup_meta_file, + TAR_METADATA_FILENAME, PersistedProgress, create_pgpass_file, default_json_serialization, download_backup_meta_file, extract_pg_command_version_string, extract_pghoard_bb_v2_metadata, extract_pghoard_delta_metadata, json_encode, pg_major_version, pg_version_string_to_number, write_json_file ) @@ -88,6 +89,55 @@ def test_json_serialization(self, tmpdir): assert ob2 == ob2_ + def test_persisted_progress(self, mocker, tmp_path): + test_progress_file = tmp_path / "test_progress.json" + original_time = 1625072042.123456 + test_data = { + "progress": { + "0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b": { + "current_progress": 100, + "last_updated_time": original_time + } + } + } + + with open(test_progress_file, "w") as file: + json.dump(test_data, file) + + mocker.patch("pghoard.common.PROGRESS_FILE", test_progress_file) + persisted_progress = PersistedProgress.read(metrics=metrics.Metrics(statsd={})) + assert "0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b" in persisted_progress.progress + assert persisted_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b" + ].current_progress == 100 + assert persisted_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b" + ].last_updated_time == 1625072042.123456 + + new_progress = 200 + progress_info = persisted_progress.get("0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b") + progress_info.update(new_progress) + persisted_progress.write(metrics=metrics.Metrics(statsd={})) + + updated_progress = PersistedProgress.read(metrics=metrics.Metrics(statsd={})) + assert updated_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b" + ].current_progress == new_progress + assert updated_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b" + ].last_updated_time > original_time + + def test_default_persisted_progress_creation(self, mocker, tmp_path): + tmp_file = tmp_path / "non_existent_progress.json" + assert not tmp_file.exists() + + mocker.patch("pghoard.common.PROGRESS_FILE", str(tmp_file)) + persisted_progress = PersistedProgress.read(metrics=metrics.Metrics(statsd={})) + + assert persisted_progress.progress == {} + persisted_progress.write(metrics=metrics.Metrics(statsd={})) + + assert tmp_file.exists() + with open(tmp_file, "r") as file: + data = json.load(file) + assert data == {"progress": {}} + def test_pg_major_version(): assert pg_major_version("10") == "10" diff --git a/test/test_transferagent.py b/test/test_transferagent.py index 703383fb..31475a96 100644 --- a/test/test_transferagent.py +++ b/test/test_transferagent.py @@ -15,7 +15,7 @@ from rohmu.errors import FileNotFoundFromStorageError, StorageError from pghoard import metrics -from pghoard.common import CallbackEvent, CallbackQueue, FileType, QuitEvent +from pghoard.common import (CallbackEvent, CallbackQueue, FileType, PersistedProgress, QuitEvent) from pghoard.transfer import (BaseTransferEvent, DownloadEvent, TransferAgent, UploadEvent, UploadEventProgressTracker) # pylint: disable=attribute-defined-outside-init @@ -316,3 +316,27 @@ def test_handle_metadata_error(self): evt = self.transfer_agent.handle_metadata(self.test_site, "foo", "bar") assert evt.success is False assert isinstance(evt.exception, Exception) + + def test_handle_upload_with_persisted_progress(self, mocker, tmp_path): + + temp_progress_file = tmp_path / "test_progress.json" + assert not temp_progress_file.exists() + + mocker.patch("pghoard.common.PROGRESS_FILE", temp_progress_file) + upload_event = UploadEvent( + backup_site_name="test_site", + file_type=FileType.Basebackup, + file_path=Path(self.foo_basebackup_path), + source_data=Path(self.foo_basebackup_path), + metadata={}, + file_size=3, + callback_queue=CallbackQueue(), + remove_after_upload=True + ) + + self.transfer_agent.handle_upload("test_site", self.foo_basebackup_path, upload_event) + updated_progress = PersistedProgress.read(metrics=metrics.Metrics(statsd={})) + assert temp_progress_file.exists() + assert updated_progress.progress[self.foo_basebackup_path].current_progress == 3 + if temp_progress_file.exists(): + temp_progress_file.unlink()