From 4580ab8bf35f60bb1bbf119cc508de12bbbba7f9 Mon Sep 17 00:00:00 2001 From: Josh Borrow Date: Thu, 25 Jan 2024 09:38:14 -0500 Subject: [PATCH 01/22] Add core error logging framework --- .../versions/71df5b41ae41_initial_schema.py | 62 +++++--------- hera_librarian/errors.py | 33 ++++++++ hera_librarian/transfer.py | 23 ++++++ librarian_server/logger.py | 43 +++++++++- librarian_server/orm/__init__.py | 3 +- librarian_server/orm/errors.py | 73 +++++++++++++++++ librarian_server/orm/transfer.py | 21 +---- tests/server_unit_test/test_error.py | 80 +++++++++++++++++++ 8 files changed, 273 insertions(+), 65 deletions(-) create mode 100644 hera_librarian/errors.py create mode 100644 hera_librarian/transfer.py create mode 100644 librarian_server/orm/errors.py create mode 100644 tests/server_unit_test/test_error.py diff --git a/alembic/versions/71df5b41ae41_initial_schema.py b/alembic/versions/71df5b41ae41_initial_schema.py index 0a7926e..ce97ad3 100644 --- a/alembic/versions/71df5b41ae41_initial_schema.py +++ b/alembic/versions/71df5b41ae41_initial_schema.py @@ -14,51 +14,16 @@ branch_labels = None depends_on = None -from alembic import op -from sqlalchemy import ( - Column, - DateTime, - BigInteger, - String, - Integer, - PrimaryKeyConstraint, - ForeignKey, - Enum, - PickleType, - Boolean, -) - import enum +from sqlalchemy import (BigInteger, Boolean, Column, DateTime, Enum, + ForeignKey, Integer, PickleType, PrimaryKeyConstraint, + String) -class DeletionPolicy(enum.Enum): - """ - Enumeration for whether or not a file can be deleted from a store. - - Always defaults to 'DISALLOWED' when parsing. - """ - - DISALLOWED = 0 - ALLOWED = 1 - - -class TransferStatus(enum.Enum): - """ - The status of a transfer. - """ - - INITIATED = 0 - "Transfer has been initiated, but client has not yet started moving data" - ONGOING = 1 - "Client is currently (asynchronously) moving data to us. This is not possible with all transfer managers." - STAGED = 2 - "Transfer has been staged, server is ready to complete the transfer." - COMPLETED = 3 - "Transfer is completed" - FAILED = 4 - "Transfer has been confirmed to have failed." - CANCELLED = 5 - "Transfer has been cancelled by the client." +from alembic import op +from hera_librarian.deletion import DeletionPolicy +from hera_librarian.errors import ErrorCategory, ErrorSeverity +from hera_librarian.transfer import TransferStatus def upgrade(): @@ -196,7 +161,18 @@ def upgrade(): # Securely store authenticator using a password hashing function Column("authenticator", String(256), nullable=False), Column("last_seen", DateTime(), nullable=False), - Column("last_heard", DateTime(), nullable=False) + Column("last_heard", DateTime(), nullable=False), + ) + + op.create_table( + "errors", + Column("id", Integer(), primary_key=True, autoincrement=True, unique=True), + Column("severity", Enum(ErrorSeverity), nullable=False), + Column("category", Enum(ErrorCategory), nullable=False), + Column("message", String(256), nullable=False), + Column("raised_time", DateTime(), nullable=False), + Column("cleared_time", DateTime()), + Column("cleared", Boolean(), nullable=False), ) diff --git a/hera_librarian/errors.py b/hera_librarian/errors.py new file mode 100644 index 0000000..d6cb081 --- /dev/null +++ b/hera_librarian/errors.py @@ -0,0 +1,33 @@ +""" +Error enumeraion for the librarian. Categories of errors. +""" + +from enum import Enum + +class ErrorSeverity(Enum): + """ + Severity of errors. + """ + + CRITICAL = "critical" + "Critical errors are those that need to be fixed immediately." + + WARNING = "warning" + "Warnings are errors that are not critical, but still need to be fixed." + + INFO = "info" + "Informational errors are those that are not critical and do not need to be fixed." + + +class ErrorCategory(Enum): + """ + Categories of errors. + """ + + DATA_INTEGRITY = "data_integrity" + "Data integrity errors are those that indicate that the data on the librarian is not correct (has the wrong checksum)." + + DATA_AVAILABILITY = "data_availability" + "Data availability errors are those that indicate that data available in the database is not available on the librarian." + + diff --git a/hera_librarian/transfer.py b/hera_librarian/transfer.py new file mode 100644 index 0000000..e27a31c --- /dev/null +++ b/hera_librarian/transfer.py @@ -0,0 +1,23 @@ +""" +TransferStatus enum. +""" + +from enum import Enum + +class TransferStatus(Enum): + """ + The status of a transfer. + """ + + INITIATED = 0 + "Transfer has been initiated, but client has not yet started moving data" + ONGOING = 1 + "Client is currently (asynchronously) moving data to us. This is not possible with all transfer managers." + STAGED = 2 + "Transfer has been staged, server is ready to complete the transfer." + COMPLETED = 3 + "Transfer is completed" + FAILED = 4 + "Transfer has been confirmed to have failed." + CANCELLED = 5 + "Transfer has been cancelled by the client." \ No newline at end of file diff --git a/librarian_server/logger.py b/librarian_server/logger.py index 9f4b491..de036a1 100644 --- a/librarian_server/logger.py +++ b/librarian_server/logger.py @@ -4,6 +4,10 @@ import logging as log +from sqlalchemy.orm import Session + +from hera_librarian.errors import ErrorCategory, ErrorSeverity + from .settings import server_settings logging_level = log.getLevelName(server_settings.log_level) @@ -14,4 +18,41 @@ format="(%(module)s:%(funcName)s) [%(asctime)s] {%(levelname)s}:%(message)s", ) -log.debug("Logging set up.") \ No newline at end of file +error_severity_to_logging_level = { + ErrorSeverity.CRITICAL: log.CRITICAL, + ErrorSeverity.WARNING: log.WARNING, + ErrorSeverity.INFO: log.INFO, +} + +log.debug("Logging set up.") + + +def log_to_database( + severity: ErrorSeverity, category: ErrorCategory, message: str, session: Session +) -> None: + """ + Log an error to the database. + + Parameters + ---------- + + severity : ErrorSeverity + The severity of this error. + category : ErrorCategory + The category of this error. + message : str + The message describing this error. + session : Session + The database session to use. + """ + + # Avoid circular imports. + from .orm.errors import Error + + log_level = error_severity_to_logging_level[severity] + log.log(log_level, message) + + error = Error.new_error(severity, category, message) + + session.add(error) + session.commit() diff --git a/librarian_server/orm/__init__.py b/librarian_server/orm/__init__.py index 3beec66..1376fe0 100644 --- a/librarian_server/orm/__init__.py +++ b/librarian_server/orm/__init__.py @@ -6,4 +6,5 @@ from .instance import Instance from .storemetadata import StoreMetadata from .transfer import IncomingTransfer, TransferStatus, CloneTransfer, OutgoingTransfer -from .librarian import Librarian \ No newline at end of file +from .librarian import Librarian +from .errors import Error \ No newline at end of file diff --git a/librarian_server/orm/errors.py b/librarian_server/orm/errors.py new file mode 100644 index 0000000..094ff15 --- /dev/null +++ b/librarian_server/orm/errors.py @@ -0,0 +1,73 @@ +""" +ORM for 'errors' table, describing (potentially critical) errors +that need to be remedied by an outside entity. +""" + +from .. import database as db + +from datetime import datetime +from sqlalchemy.orm import Session + +from hera_librarian.errors import ErrorCategory, ErrorSeverity + + +class Error(db.Base): + """ + Represents an error that needs to be fixed. + """ + + __tablename__ = "errors" + + id = db.Column(db.Integer, primary_key=True, autoincrement=True, unique=True) + "The unique ID of this error." + severity = db.Column(db.Enum(ErrorSeverity), nullable=False) + "The severity of this error." + category = db.Column(db.Enum(ErrorCategory), nullable=False) + "The category of this error." + message = db.Column(db.String(256), nullable=False) + "The message describing this error." + raised_time = db.Column(db.DateTime, nullable=False) + "The time at which this error was raised." + cleared_time = db.Column(db.DateTime, nullable=True) + "The time at which this error was cleared." + cleared = db.Column(db.Boolean, nullable=False) + "Whether or not this error has been cleared." + + @classmethod + def new_error( + self, severity: ErrorSeverity, category: ErrorCategory, message: str + ) -> "Error": + """ + Create a new error object. + + Parameters + ---------- + severity : ErrorSeverity + The severity of this error. + category : ErrorCategory + The category of this error. + message : str + The message describing this error. + """ + return Error( + severity=severity, + category=category, + message=message, + raised_time=datetime.utcnow(), + cleared_time=None, + cleared=False, + ) + + def clear(self, session: Session) -> None: + """ + Clear this error. + + Parameters + ---------- + session : Session + The database session to use. + """ + + self.cleared = True + self.cleared_time = datetime.utcnow() + session.commit() diff --git a/librarian_server/orm/transfer.py b/librarian_server/orm/transfer.py index f185251..ea22b2d 100644 --- a/librarian_server/orm/transfer.py +++ b/librarian_server/orm/transfer.py @@ -9,8 +9,8 @@ from .librarian import Librarian from hera_librarian.models.clone import CloneFailRequest, CloneFailResponse +from hera_librarian.transfer import TransferStatus -from enum import Enum import datetime from typing import TYPE_CHECKING @@ -22,25 +22,6 @@ from sqlalchemy.orm import Session -class TransferStatus(Enum): - """ - The status of a transfer. - """ - - INITIATED = 0 - "Transfer has been initiated, but client has not yet started moving data" - ONGOING = 1 - "Client is currently (asynchronously) moving data to us. This is not possible with all transfer managers." - STAGED = 2 - "Transfer has been staged, server is ready to complete the transfer." - COMPLETED = 3 - "Transfer is completed" - FAILED = 4 - "Transfer has been confirmed to have failed." - CANCELLED = 5 - "Transfer has been cancelled by the client." - - class IncomingTransfer(db.Base): """ An incoming transfer to this librarian. Created once an upload is initialized, diff --git a/tests/server_unit_test/test_error.py b/tests/server_unit_test/test_error.py new file mode 100644 index 0000000..bea9c04 --- /dev/null +++ b/tests/server_unit_test/test_error.py @@ -0,0 +1,80 @@ +""" +Tests we can log errors. +""" + +from hera_librarian.errors import ErrorCategory, ErrorSeverity + + +def test_error_to_db(test_server, test_orm): + # Don't import until we've set up server settings for logging. + from librarian_server.logger import log_to_database + + _, session_maker, _ = test_server + + with session_maker() as session: + log_to_database( + ErrorSeverity.CRITICAL, ErrorCategory.DATA_AVAILABILITY, "test", session + ) + log_to_database( + ErrorSeverity.INFO, ErrorCategory.DATA_AVAILABILITY, "test", session + ) + log_to_database( + ErrorSeverity.WARNING, ErrorCategory.DATA_AVAILABILITY, "test", session + ) + log_to_database( + ErrorSeverity.CRITICAL, ErrorCategory.DATA_INTEGRITY, "test", session + ) + + # Check that they were logged correctly + with session_maker() as session: + errors = session.query(test_orm.Error).all() + + assert len(errors) == 4 + + for error in errors: + assert error.message == "test" + assert error.cleared is False + assert error.cleared_time is None + + assert errors[0].severity == ErrorSeverity.CRITICAL + assert errors[0].category == ErrorCategory.DATA_AVAILABILITY + + assert errors[1].severity == ErrorSeverity.INFO + assert errors[1].category == ErrorCategory.DATA_AVAILABILITY + + assert errors[2].severity == ErrorSeverity.WARNING + assert errors[2].category == ErrorCategory.DATA_AVAILABILITY + + assert errors[3].severity == ErrorSeverity.CRITICAL + assert errors[3].category == ErrorCategory.DATA_INTEGRITY + + # Check we can clear them + + with session_maker() as session: + errors = session.query(test_orm.Error).all() + + for error in errors: + error.clear(session) + + # Check that they were cleared correctly + with session_maker() as session: + errors = session.query(test_orm.Error).all() + + assert len(errors) == 4 + + for error in errors: + assert error.message == "test" + assert error.cleared is True + assert error.cleared_time is not None + + assert errors[0].severity == ErrorSeverity.CRITICAL + assert errors[0].category == ErrorCategory.DATA_AVAILABILITY + + assert errors[1].severity == ErrorSeverity.INFO + assert errors[1].category == ErrorCategory.DATA_AVAILABILITY + + assert errors[2].severity == ErrorSeverity.WARNING + assert errors[2].category == ErrorCategory.DATA_AVAILABILITY + + assert errors[3].severity == ErrorSeverity.CRITICAL + assert errors[3].category == ErrorCategory.DATA_INTEGRITY From 028f18a2fe3c27cd9a7fab470cdbff851f665cff Mon Sep 17 00:00:00 2001 From: Josh Borrow Date: Thu, 25 Jan 2024 12:47:06 -0500 Subject: [PATCH 02/22] Add logging to check_integrity --- librarian_background/check_integrity.py | 33 ++++++++++++++++--------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/librarian_background/check_integrity.py b/librarian_background/check_integrity.py index 5179d12..399fa73 100644 --- a/librarian_background/check_integrity.py +++ b/librarian_background/check_integrity.py @@ -2,18 +2,18 @@ Task for checking the integrity of the store. """ -from .task import Task - -import logging import datetime +import logging from schedule import CancelJob +from sqlalchemy.orm import Session -from librarian_server.orm import StoreMetadata, Instance from librarian_server.database import get_session +from librarian_server.logger import (ErrorCategory, ErrorSeverity, + log_to_database) +from librarian_server.orm import Instance, StoreMetadata -from sqlalchemy.orm import Session - +from .task import Task logger = logging.getLogger("schedule") @@ -50,8 +50,11 @@ def core(self, session: Session): store = self.get_store(session=session) except ValueError: # Store doesn't exist. Cancel this job. - logger.error( - f"Store {self.store_name} does not exist. Cancelling job. Please update the configuration." + log_to_database( + severity=ErrorSeverity.CRITICAL, + category=ErrorCategory.CONFIGURATION, + message=f"Store {self.store_name} does not exist. Cancelling job. Please update the configuration.", + session=session, ) return CancelJob @@ -73,7 +76,12 @@ def core(self, session: Session): path_info = store.store_manager.path_info(file.path) except FileNotFoundError: all_files_fine = False - logger.error(f"File {file.path} on store {store.name} is missing!") + log_to_database( + severity=ErrorSeverity.ERROR, + category=ErrorCategory.DATA_AVAILABILITY, + message=f"File {file.path} on store {store.name} is missing. (Instance: {file.id})", + session=session, + ) continue # Compare checksum to database @@ -88,8 +96,11 @@ def core(self, session: Session): else: # File is not fine. Log it. all_files_fine = False - logger.error( - f"File {file.path} on store {store.name} has an incorrect checksum. Expected {expected_checksum}, got {path_info.md5}." + log_to_database( + severity=ErrorSeverity.ERROR, + category=ErrorCategory.DATA_INTEGRITY, + message=f"File {file.path} on store {store.name} has an incorrect checksum. Expected {expected_checksum}, got {path_info.md5}. (Instance: {file.id})", + session=session, ) if all_files_fine: From 05e10143fcf0593003a5aab34fe9e926a26f5758 Mon Sep 17 00:00:00 2001 From: Josh Borrow Date: Thu, 25 Jan 2024 12:47:13 -0500 Subject: [PATCH 03/22] Add new "ERROR" level --- hera_librarian/errors.py | 6 +++++- librarian_server/logger.py | 1 + 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/hera_librarian/errors.py b/hera_librarian/errors.py index d6cb081..3b2419f 100644 --- a/hera_librarian/errors.py +++ b/hera_librarian/errors.py @@ -12,6 +12,9 @@ class ErrorSeverity(Enum): CRITICAL = "critical" "Critical errors are those that need to be fixed immediately." + ERROR = "error" + "Errors are those that need to be fixed, but are not critical." + WARNING = "warning" "Warnings are errors that are not critical, but still need to be fixed." @@ -30,4 +33,5 @@ class ErrorCategory(Enum): DATA_AVAILABILITY = "data_availability" "Data availability errors are those that indicate that data available in the database is not available on the librarian." - + CONFIGURATION = "configuration" + "Configuration errors are those that indicate that the librarian has been configured incorrectly." diff --git a/librarian_server/logger.py b/librarian_server/logger.py index de036a1..3224d2a 100644 --- a/librarian_server/logger.py +++ b/librarian_server/logger.py @@ -20,6 +20,7 @@ error_severity_to_logging_level = { ErrorSeverity.CRITICAL: log.CRITICAL, + ErrorSeverity.ERROR: log.ERROR, ErrorSeverity.WARNING: log.WARNING, ErrorSeverity.INFO: log.INFO, } From e73bdb4a6120e53c4d7c7484c498573ac58f288b Mon Sep 17 00:00:00 2001 From: Josh Borrow Date: Thu, 25 Jan 2024 12:55:09 -0500 Subject: [PATCH 04/22] Add 'Caller' to error so we can track where it's from. --- .../versions/71df5b41ae41_initial_schema.py | 1 + librarian_server/logger.py | 19 +++++++++++++++++-- librarian_server/orm/errors.py | 6 +++++- 3 files changed, 23 insertions(+), 3 deletions(-) diff --git a/alembic/versions/71df5b41ae41_initial_schema.py b/alembic/versions/71df5b41ae41_initial_schema.py index ce97ad3..8b1e5be 100644 --- a/alembic/versions/71df5b41ae41_initial_schema.py +++ b/alembic/versions/71df5b41ae41_initial_schema.py @@ -173,6 +173,7 @@ def upgrade(): Column("raised_time", DateTime(), nullable=False), Column("cleared_time", DateTime()), Column("cleared", Boolean(), nullable=False), + Column("caller", String(256)), ) diff --git a/librarian_server/logger.py b/librarian_server/logger.py index 3224d2a..f1d3496 100644 --- a/librarian_server/logger.py +++ b/librarian_server/logger.py @@ -2,6 +2,7 @@ Logging setup. Use this as 'from logger import log' """ +import inspect import logging as log from sqlalchemy.orm import Session @@ -45,15 +46,29 @@ def log_to_database( The message describing this error. session : Session The database session to use. + + Notes + ----- + + Automatically stores the above frame's file name, function, and line number in + the 'caller' field of the error. """ - + # Avoid circular imports. from .orm.errors import Error log_level = error_severity_to_logging_level[severity] log.log(log_level, message) - error = Error.new_error(severity, category, message) + caller = ( + inspect.stack()[1].filename + + ":" + + inspect.stack()[1].function + + ":" + + str(inspect.stack()[1].lineno) + ) + + error = Error.new_error(severity, category, message, caller=caller) session.add(error) session.commit() diff --git a/librarian_server/orm/errors.py b/librarian_server/orm/errors.py index 094ff15..ac586ee 100644 --- a/librarian_server/orm/errors.py +++ b/librarian_server/orm/errors.py @@ -3,6 +3,7 @@ that need to be remedied by an outside entity. """ +from typing import Optional from .. import database as db from datetime import datetime @@ -32,10 +33,12 @@ class Error(db.Base): "The time at which this error was cleared." cleared = db.Column(db.Boolean, nullable=False) "Whether or not this error has been cleared." + caller = db.Column(db.String(256), nullable=True) + "The caller that raised this error." @classmethod def new_error( - self, severity: ErrorSeverity, category: ErrorCategory, message: str + self, severity: ErrorSeverity, category: ErrorCategory, message: str, caller: Optional[str] = None ) -> "Error": """ Create a new error object. @@ -56,6 +59,7 @@ def new_error( raised_time=datetime.utcnow(), cleared_time=None, cleared=False, + caller=caller, ) def clear(self, session: Session) -> None: From 4e239da247e50367e9171940badbd25f47fa291f Mon Sep 17 00:00:00 2001 From: Josh Borrow Date: Thu, 25 Jan 2024 13:02:06 -0500 Subject: [PATCH 05/22] Update create_clone to store failture states in database. --- hera_librarian/errors.py | 6 +++ librarian_background/create_clone.py | 77 ++++++++++++++++++---------- 2 files changed, 56 insertions(+), 27 deletions(-) diff --git a/hera_librarian/errors.py b/hera_librarian/errors.py index 3b2419f..ce5197c 100644 --- a/hera_librarian/errors.py +++ b/hera_librarian/errors.py @@ -35,3 +35,9 @@ class ErrorCategory(Enum): CONFIGURATION = "configuration" "Configuration errors are those that indicate that the librarian has been configured incorrectly." + + STORE_FULL = "store_full" + "Store full errors are those that indicate that a store is full and cannot accept new data." + + PROGRAMMING = "programming" + "Programming errors are those that indicate that the librarian has a bug, i.e. we have reached an 'unreachable' state." \ No newline at end of file diff --git a/librarian_background/create_clone.py b/librarian_background/create_clone.py index 7dc5eae..c3fbba6 100644 --- a/librarian_background/create_clone.py +++ b/librarian_background/create_clone.py @@ -4,19 +4,20 @@ """ -from .task import Task - -import logging import datetime +import logging +from pathlib import Path from schedule import CancelJob -from pathlib import Path +from sqlalchemy.orm import Session -from librarian_server.orm import StoreMetadata, Instance, CloneTransfer, TransferStatus from librarian_server.database import get_session +from librarian_server.logger import (ErrorCategory, ErrorSeverity, + log_to_database) +from librarian_server.orm import (CloneTransfer, Instance, StoreMetadata, + TransferStatus) -from sqlalchemy.orm import Session - +from .task import Task logger = logging.getLogger("schedule") @@ -36,15 +37,13 @@ class CreateLocalClone(Task): # TODO: In the future, we could implement a _rolling_ n day clone here, i.e. only keep the last n days of files on the clone_to store. def get_store(self, name: str, session: Session) -> StoreMetadata: - possible_metadata = ( - session.query(StoreMetadata).filter_by(name=name).first() - ) + possible_metadata = session.query(StoreMetadata).filter_by(name=name).first() if not possible_metadata: raise ValueError(f"Store {name} does not exist.") return possible_metadata - + def on_call(self): with get_session() as session: return self.core(session=session) @@ -54,8 +53,11 @@ def core(self, session: Session): store_from = self.get_store(self.clone_from, session) except ValueError: # Store doesn't exist. Cancel this job. - logger.error( - f"Store {self.clone_from} does not exist. Cancelling job. Please update the configuration." + log_to_database( + severity=ErrorSeverity.CRITICAL, + category=ErrorCategory.CONFIGURATION, + message=f"Store {self.clone_from} does not exist. Cancelling job. Please update the configuration.", + session=session, ) return CancelJob @@ -63,8 +65,11 @@ def core(self, session: Session): store_to = self.get_store(self.clone_to, session) except ValueError: # Store doesn't exist. Cancel this job. - logger.error( - f"Store {self.clone_to} does not exist. Cancelling job. Please update the configuration." + log_to_database( + severity=ErrorSeverity.CRITICAL, + category=ErrorCategory.CONFIGURATION, + message=f"Store {self.clone_to} does not exist. Cancelling job. Please update the configuration.", + session=session, ) return CancelJob @@ -107,7 +112,7 @@ def core(self, session: Session): session.add(transfer) session.commit() - # TODO: Check if there is an already existing transfer! Maybe it is running asynchronously? Maybe we need to check the status? + # TODO: Check if there is an already existing transfer! Maybe it is running asynchronously? Maybe we need to check the status? # Now we can clone the file to the clone_to store. try: @@ -115,8 +120,13 @@ def core(self, session: Session): file_size=instance.file.size, file_name=instance.file.name ) except ValueError: - logger.error( - f"File {instance.file.name} is too large to fit on store {store_to}. Skipping." + # TODO: In the future where we have multiple potential clone stores for SneakerNet we should + # automatically fail-over to the next store. + log_to_database( + severity=ErrorSeverity.ERROR, + category=ErrorCategory.STORE_FULL, + message=f"File {instance.file.name} is too large to fit on store {store_to}. Skipping. (Instance {instance.id})", + session=session, ) transfer.fail_transfer(session=session) @@ -146,8 +156,11 @@ def core(self, session: Session): continue except FileNotFoundError as e: - logger.error( - f"File {instance.path} does not exist on store {store_from}. Skipping." + log_to_database( + severity=ErrorSeverity.ERROR, + category=ErrorCategory.DATA_AVAILABILITY, + message=f"File {instance.path} does not exist on store {store_from}. Skipping. (Instance {instance.id})", + session=session, ) transfer.fail_transfer(session=session) @@ -157,8 +170,11 @@ def core(self, session: Session): continue if not success: - logger.error( - f"Failed to transfer file {instance.path} to store {store_to}. Skipping." + log_to_database( + severity=ErrorSeverity.ERROR, + category=ErrorCategory.DATA_AVAILABILITY, + message=f"Failed to transfer file {instance.path} to store {store_to}. Skipping. (Instance {instance.id})", + session=session, ) transfer.fail_transfer(session=session) @@ -177,9 +193,12 @@ def core(self, session: Session): path_info = store_to.store_manager.path_info(staged_path) if path_info.md5 != instance.file.checksum: - logger.error( - f"File {instance.path} on store {store_to} has an incorrect checksum. " - f"Expected {instance.file.checksum}, got {path_info.md5}." + log_to_database( + severity=ErrorSeverity.ERROR, + category=ErrorCategory.DATA_INTEGRITY, + message=f"File {instance.path} on store {store_to} has an incorrect checksum. " + f"Expected {instance.file.checksum}, got {path_info.md5}. (Instance {instance.id})", + session=session, ) transfer.fail_transfer(session=session) @@ -194,9 +213,13 @@ def core(self, session: Session): staging_path=staged_path, store_path=Path(instance.file.name) ) except FileExistsError: - logger.error( - f"File {instance.path} already exists on store {store_to}. Skipping." + log_to_database( + severity=ErrorSeverity.CRITICAL, + category=ErrorCategory.PROGRAMMING, + message=f"File {instance.path} already exists on store {store_to}. Skipping. (Instance {instance.id})", + session=session, ) + store_to.store_manager.unstage(staging_name) transfer.fail_transfer(session=session) From 52cc31ddfd730d8bbbc3da1e8824c065bb0320f4 Mon Sep 17 00:00:00 2001 From: Josh Borrow Date: Thu, 25 Jan 2024 13:04:16 -0500 Subject: [PATCH 06/22] Make error message a true varchat --- alembic/versions/71df5b41ae41_initial_schema.py | 2 +- librarian_server/orm/errors.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/alembic/versions/71df5b41ae41_initial_schema.py b/alembic/versions/71df5b41ae41_initial_schema.py index 8b1e5be..b7ede11 100644 --- a/alembic/versions/71df5b41ae41_initial_schema.py +++ b/alembic/versions/71df5b41ae41_initial_schema.py @@ -169,7 +169,7 @@ def upgrade(): Column("id", Integer(), primary_key=True, autoincrement=True, unique=True), Column("severity", Enum(ErrorSeverity), nullable=False), Column("category", Enum(ErrorCategory), nullable=False), - Column("message", String(256), nullable=False), + Column("message", String, nullable=False), Column("raised_time", DateTime(), nullable=False), Column("cleared_time", DateTime()), Column("cleared", Boolean(), nullable=False), diff --git a/librarian_server/orm/errors.py b/librarian_server/orm/errors.py index ac586ee..f9a2269 100644 --- a/librarian_server/orm/errors.py +++ b/librarian_server/orm/errors.py @@ -25,7 +25,7 @@ class Error(db.Base): "The severity of this error." category = db.Column(db.Enum(ErrorCategory), nullable=False) "The category of this error." - message = db.Column(db.String(256), nullable=False) + message = db.Column(db.String, nullable=False) "The message describing this error." raised_time = db.Column(db.DateTime, nullable=False) "The time at which this error was raised." From 7edc02983b5dbe5fab0479971aac6f6017d75e93 Mon Sep 17 00:00:00 2001 From: Josh Borrow Date: Fri, 26 Jan 2024 09:06:50 -0500 Subject: [PATCH 07/22] Minor fixes in send/recv clones. Not done there yet --- librarian_background/recieve_clone.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/librarian_background/recieve_clone.py b/librarian_background/recieve_clone.py index eca971e..4504a35 100644 --- a/librarian_background/recieve_clone.py +++ b/librarian_background/recieve_clone.py @@ -21,6 +21,7 @@ TransferStatus, Librarian, ) +from librarian_server.logger import log_to_database, ErrorCategory, ErrorSeverity from hera_librarian.deletion import DeletionPolicy from hera_librarian.models.clone import ( @@ -69,6 +70,7 @@ def core(self, session: Session): store: StoreMetadata = transfer.store if store is None: + # TODO: Check if this should be a programming error. logger.error( f"Transfer {transfer.id} has no store associated with it. Skipping for now." ) @@ -80,6 +82,7 @@ def core(self, session: Session): try: path_info = store.store_manager.path_info(Path(transfer.staging_path)) except TypeError: + # TODO: Check if this should be a programming error. logger.error( f"Transfer {transfer.id} has no staging path associated with it. Skipping for now." ) @@ -88,7 +91,7 @@ def core(self, session: Session): continue - # TODO: Make this check more robust? + # TODO: Make this check more robust? Could have transfer managers provide checks? if ( path_info.md5 == transfer.transfer_checksum and path_info.size == transfer.transfer_size @@ -159,8 +162,8 @@ def core(self, session: Session): try: response: CloneCompleteResponse = ( - librarian.client.do_pydantic_http_post( - endpoint="/api/v2/clone/complete", + librarian.client.post( + endpoint="clone/complete", request_model=request, response_model=CloneCompleteResponse, ) From 884efe51586f28c2e56696c435841d1674b0db0a Mon Sep 17 00:00:00 2001 From: Josh Borrow Date: Fri, 26 Jan 2024 09:35:55 -0500 Subject: [PATCH 08/22] Add endpoint error models --- hera_librarian/models/errors.py | 91 +++++++++++++++++++++++++++++++++ 1 file changed, 91 insertions(+) create mode 100644 hera_librarian/models/errors.py diff --git a/hera_librarian/models/errors.py b/hera_librarian/models/errors.py new file mode 100644 index 0000000..79bb9de --- /dev/null +++ b/hera_librarian/models/errors.py @@ -0,0 +1,91 @@ +""" +Models for error endpoints. +""" + +from datetime import datetime +from typing import Optional + +from pydantic import BaseModel, Field, RootModel + +from ..errors import ErrorCategory, ErrorSeverity + + +class ErrorRequest(BaseModel): + """ + A request for a set of errors from the librarian. + """ + + id: Optional[int] = None + "The ID of the error to search for. If left empty, all errors will be returned." + category: Optional[ErrorCategory] = None + "The category of errors to return. If left empty, all errors will be returned." + severity: Optional[ErrorSeverity] = None + "The severity of errors to return. If left empty, all errors will be returned." + create_time_window: Optional[tuple[datetime, ...]] = Field( + default=None, min_length=2, max_length=2 + ) + "The time window to search for files in. This is a tuple of two datetimes, the first being the start and the second being the end. Note that the datetimes should be in UTC." + include_resolved: bool = False + "Whether or not to include resolved errors in the response. By default, we do not." + max_results: int = 64 + "The number of errors to return." + + +class ErrorResponse(BaseModel): + """ + The response model for an individual error. We actually return + ErrorResponses, defined below, which is a list of these. + """ + + id: int + "The ID of this error." + severity: ErrorSeverity + "The severity of this error." + category: ErrorCategory + "The category of this error." + message: str + "The message describing this error." + raised_time: datetime + "The time at which this error was raised." + cleared_time: Optional[datetime] + "The time at which this error was cleared." + cleared: bool + "Whether or not this error has been cleared." + caller: Optional[str] + "The caller that raised this error." + + +ErrorResponses = RootModel[list[ErrorResponse]] + + +class ErrorClearRequest(BaseModel): + """ + A request to clear an error. + """ + + id: int + "The ID of the error to clear." + + +class ErrorClearResponse(BaseModel): + """ + The response to an error clear request. + """ + + id: int + "The ID of the error that was cleared." + cleared_time: datetime + "The time at which the error was cleared." + cleared: bool + "Whether or not the error was cleared." + + +class ErrorSearchFailedResponse(BaseModel): + """ + The response to an error search request that failed. + """ + + reason: str + "The reason the search failed." + suggested_remedy: str + "A suggested remedy for the failure." From 21f3a31d3418af8228e92802c25a31a7735d86db Mon Sep 17 00:00:00 2001 From: Josh Borrow Date: Fri, 26 Jan 2024 13:29:48 -0500 Subject: [PATCH 09/22] Added API Endpoints --- librarian_server/api/errors.py | 53 +++++++++++++++++++++++ librarian_server/api/search.py | 77 +++++++++++++++++++++++++++++++++- 2 files changed, 129 insertions(+), 1 deletion(-) create mode 100644 librarian_server/api/errors.py diff --git a/librarian_server/api/errors.py b/librarian_server/api/errors.py new file mode 100644 index 0000000..7bede5d --- /dev/null +++ b/librarian_server/api/errors.py @@ -0,0 +1,53 @@ +""" +Endpoints for the API that allows the control of errors. + +For searching errors, see /search.py. +""" + +from datetime import datetime +from typing import Optional + +from fastapi import APIRouter, Depends, Response, status +from sqlalchemy.orm import Session + +from hera_librarian.models.errors import (ErrorClearRequest, + ErrorClearResponse, + ErrorSearchFailedResponse) +from librarian_server.database import yield_session +from librarian_server.orm import Error, ErrorCategory, ErrorSeverity + +router = APIRouter("/api/v2/error") + + +@router.post("/clear", response_model=ErrorClearResponse | ErrorSearchFailedResponse) +def clear_error( + request: ErrorClearRequest, + response: Response, + session: Session = Depends(yield_session), +): + """ + Clears an error. + + Possible response codes: + + 200 - OK. Error cleared successfully. + 404 - No error found to match search criteria. + """ + + error = session.get(Error, request.id) + + if error is None: + response.status_code = status.HTTP_404_NOT_FOUND + + return ErrorSearchFailedResponse( + error_message="No error found to with ID {request.id} to clear.", + suggested_remedy="Check you are searching for a valid error ID.", + ) + + error.clear(session) + + error = session.get(Error, request.id) + + return ErrorClearResponse( + id=request.id, cleared_time=error.cleared_time, cleared=error.cleared + ) diff --git a/librarian_server/api/search.py b/librarian_server/api/search.py index 2b4b4e0..46300ad 100644 --- a/librarian_server/api/search.py +++ b/librarian_server/api/search.py @@ -2,10 +2,16 @@ Contains endpoints for searching the files uploaded to the librarian. """ +from typing import Optional + from fastapi import APIRouter, Depends, Response, status from sqlalchemy import select from sqlalchemy.orm import Session +from hera_librarian.models.errors import (ErrorSearchFailedResponse, + ErrorSearchRequest, + ErrorSearchResponse, + ErrorSearchResponses) from hera_librarian.models.search import (FileSearchFailedResponse, FileSearchRequest, FileSearchResponse, @@ -15,8 +21,8 @@ from ..database import yield_session from ..logger import log +from ..orm.errors import Error, ErrorCategory, ErrorSeverity from ..orm.file import File -from ..orm.instance import Instance, RemoteInstance from ..settings import server_settings router = APIRouter(prefix="/api/v2/search") @@ -101,3 +107,72 @@ def file( ) return FileSearchResponses(respond_files) + + +@router.post("/error", response_model=ErrorSearchResponses | ErrorSearchFailedResponse) +def error( + request: FileSearchRequest, + response: Response, + session: Session = Depends(yield_session), +): + """ + Searches for errors based upon the FileSearchRequest. + + Possible response codes: + + 200 - OK. Search completed successfully. + 404 - No file found to match search criteria. + """ + + log.debug(f"Received error search request: {request}") + + # Start to build our query. + query = select(Error) + + if request.id is not None: + query = query.where(Error.id == request.id) + + if request.category is not None: + query = query.where(Error.category == request.category) + + if request.severity is not None: + query = query.where(Error.severity == request.severity) + + if request.create_time_window is not None: + query.where(Error.raised_time >= request.create_time_window[0]) + query.where(Error.raised_time <= request.create_time_window[1]) + + if request.include_resolved is False: + query = query.where(Error.cleared == False) + + query.order_by(Error.raised_time) + query.limit(max(min(request.max_results, server_settings.max_search_results), 0)) + + results = session.execute(query).scalars().all() + + if len(results) == 0: + log.debug(f"No errors found. Returning 'error'.") + response.status_code = status.HTTP_404_NOT_FOUND + return ErrorSearchFailedResponse( + reason="No files found.", + suggested_remedy="Check that you are searching for the correct file.", + ) + + # Build the response. + respond_errors = [] + + for result in results: + respond_errors.append( + ErrorSearchResponse( + id=result.id, + severity=result.severity, + category=result.category, + message=result.message, + raised_time=result.raised_time, + cleared_time=result.cleared_time, + cleared=result.cleared, + caller=result.caller, + ) + ) + + return ErrorSearchResponses(respond_errors) From f0c6aa3983642886e312582abf79d190ed9288d8 Mon Sep 17 00:00:00 2001 From: Josh Borrow Date: Fri, 26 Jan 2024 13:59:23 -0500 Subject: [PATCH 10/22] Added a few tests for error searching --- hera_librarian/models/errors.py | 6 ++-- librarian_server/api/search.py | 4 +-- tests/conftest.py | 32 +++++++++++++++++-- tests/server_unit_test/test_search.py | 46 ++++++++++++++++++++++++--- 4 files changed, 76 insertions(+), 12 deletions(-) diff --git a/hera_librarian/models/errors.py b/hera_librarian/models/errors.py index 79bb9de..746ebcf 100644 --- a/hera_librarian/models/errors.py +++ b/hera_librarian/models/errors.py @@ -10,7 +10,7 @@ from ..errors import ErrorCategory, ErrorSeverity -class ErrorRequest(BaseModel): +class ErrorSearchRequest(BaseModel): """ A request for a set of errors from the librarian. """ @@ -31,7 +31,7 @@ class ErrorRequest(BaseModel): "The number of errors to return." -class ErrorResponse(BaseModel): +class ErrorSearchResponse(BaseModel): """ The response model for an individual error. We actually return ErrorResponses, defined below, which is a list of these. @@ -55,7 +55,7 @@ class ErrorResponse(BaseModel): "The caller that raised this error." -ErrorResponses = RootModel[list[ErrorResponse]] +ErrorSearchResponses = RootModel[list[ErrorSearchResponse]] class ErrorClearRequest(BaseModel): diff --git a/librarian_server/api/search.py b/librarian_server/api/search.py index 46300ad..406b07e 100644 --- a/librarian_server/api/search.py +++ b/librarian_server/api/search.py @@ -111,12 +111,12 @@ def file( @router.post("/error", response_model=ErrorSearchResponses | ErrorSearchFailedResponse) def error( - request: FileSearchRequest, + request: ErrorSearchRequest, response: Response, session: Session = Depends(yield_session), ): """ - Searches for errors based upon the FileSearchRequest. + Searches for errors based upon the ErrorSearchRequest. Possible response codes: diff --git a/tests/conftest.py b/tests/conftest.py index eaeb4ae..1161319 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -7,12 +7,14 @@ import os import random import shutil +import datetime from pathlib import Path from subprocess import run import pytest from hera_librarian.utils import get_md5_from_path, get_size_from_path +from hera_librarian.errors import ErrorCategory, ErrorSeverity from .server import Server, server_setup @@ -324,9 +326,9 @@ def test_server_with_missing_file(test_server, test_orm): @pytest.fixture(scope="function") -def test_server_with_many_files(test_server, test_orm): +def test_server_with_many_files_and_errors(test_server, test_orm): """ - Test server with a valid file and instance in the store. + Test server with many valid files and some errors in the database. """ session = test_server[1]() @@ -335,6 +337,7 @@ def test_server_with_many_files(test_server, test_orm): data = random.randbytes(1024) + # Add many files file_names = [f"many_server_example_file_{x}.txt" for x in range(128)] for file_name in file_names: @@ -363,6 +366,26 @@ def test_server_with_many_files(test_server, test_orm): session.commit() + # Add errors + error_ids = [] + + for x in range(128): + error = test_orm.Error.new_error( + severity=random.choice([e for e in ErrorSeverity]), + category=random.choice([e for e in ErrorCategory]), + message=f"Test error {x}", + caller="test", + ) + + if x % 2: + error.cleared = True + error.cleared_time = datetime.datetime.utcnow() + + session.add(error) + session.commit() + + error_ids.append(error.id) + session.close() yield test_server @@ -378,6 +401,11 @@ def test_server_with_many_files(test_server, test_orm): session.delete(file) session.delete(instance) + for error_id in error_ids: + error = session.get(test_orm.Error, error_id) + + session.delete(error) + session.commit() session.close() diff --git a/tests/server_unit_test/test_search.py b/tests/server_unit_test/test_search.py index 520cf82..d2d72ea 100644 --- a/tests/server_unit_test/test_search.py +++ b/tests/server_unit_test/test_search.py @@ -4,13 +4,16 @@ import datetime +from hera_librarian.models.errors import (ErrorSearchFailedResponse, + ErrorSearchRequest, + ErrorSearchResponses) from hera_librarian.models.search import (FileSearchFailedResponse, FileSearchRequest, FileSearchResponse, FileSearchResponses) -def test_search_by_filename(test_server_with_many_files, test_client): +def test_search_by_filename(test_server_with_many_files_and_errors, test_client): request = FileSearchRequest(name="many_server_example_file_0.txt") response = test_client.post( @@ -24,7 +27,7 @@ def test_search_by_filename(test_server_with_many_files, test_client): response = FileSearchResponses.model_validate_json(response.content) -def test_search_by_created_time(test_server_with_many_files, test_client): +def test_search_by_created_time(test_server_with_many_files_and_errors, test_client): request = FileSearchRequest( create_time_window=( datetime.datetime.utcnow() - datetime.timedelta(days=1), @@ -43,7 +46,7 @@ def test_search_by_created_time(test_server_with_many_files, test_client): response = FileSearchResponses.model_validate_json(response.content) -def test_search_by_source(test_server_with_many_files, test_client): +def test_search_by_source(test_server_with_many_files_and_errors, test_client): request = FileSearchRequest(source="test") response = test_client.post( @@ -57,7 +60,7 @@ def test_search_by_source(test_server_with_many_files, test_client): response = FileSearchResponses.model_validate_json(response.content) -def test_search_by_uploader(test_server_with_many_files, test_client): +def test_search_by_uploader(test_server_with_many_files_and_errors, test_client): request = FileSearchRequest(uploader="test") response = test_client.post( @@ -71,7 +74,7 @@ def test_search_by_uploader(test_server_with_many_files, test_client): response = FileSearchResponses.model_validate_json(response.content) -def test_failed_search(test_server_with_many_files, test_client): +def test_failed_search(test_server_with_many_files_and_errors, test_client): request = FileSearchRequest(name="this_file_does_not_exist") response = test_client.post( @@ -83,3 +86,36 @@ def test_failed_search(test_server_with_many_files, test_client): assert response.status_code == 404 response = FileSearchFailedResponse.model_validate_json(response.content) + + +def test_error_all_search( + test_server_with_many_files_and_errors, test_client, test_orm +): + request = ErrorSearchRequest(include_resolved=False) + + response = test_client.post( + "/api/v2/search/error", + headers={"Content-Type": "application/json"}, + content=request.model_dump_json(), + ) + + assert response.status_code == 200 + + response = ErrorSearchResponses.model_validate_json(response.content) + + for model in response.root: + assert model.cleared is False + + +def test_failed_error_search(test_server_with_many_files_and_errors, test_client): + request = ErrorSearchRequest(id=-1) + + response = test_client.post( + "/api/v2/search/error", + headers={"Content-Type": "application/json"}, + content=request.model_dump_json(), + ) + + assert response.status_code == 404 + + response = ErrorSearchFailedResponse.model_validate_json(response.content) From 4f6b5242fac83a1d261b62ded65b88fb7081edb5 Mon Sep 17 00:00:00 2001 From: Josh Borrow Date: Fri, 26 Jan 2024 14:31:22 -0500 Subject: [PATCH 11/22] Added full test coverage for search endpoint --- librarian_server/api/search.py | 16 +++---- tests/server_unit_test/test_clone.py | 3 +- tests/server_unit_test/test_search.py | 60 ++++++++++++++++++++++----- tests/server_unit_test/test_upload.py | 8 +++- 4 files changed, 65 insertions(+), 22 deletions(-) diff --git a/librarian_server/api/search.py b/librarian_server/api/search.py index 406b07e..df5ddfa 100644 --- a/librarian_server/api/search.py +++ b/librarian_server/api/search.py @@ -5,7 +5,7 @@ from typing import Optional from fastapi import APIRouter, Depends, Response, status -from sqlalchemy import select +from sqlalchemy import desc, select from sqlalchemy.orm import Session from hera_librarian.models.errors import (ErrorSearchFailedResponse, @@ -61,8 +61,9 @@ def file( if request.source is not None: query = query.where(File.source == request.source) - query.order_by(File.create_time) - query.limit(max(min(request.max_results, server_settings.max_search_results), 0)) + query = query.order_by(desc(File.create_time)) + max_results = max(min(request.max_results, server_settings.max_search_results), 0) + query = query.limit(max_results) # Execute the query. results = session.execute(query).scalars().all() @@ -139,14 +140,15 @@ def error( query = query.where(Error.severity == request.severity) if request.create_time_window is not None: - query.where(Error.raised_time >= request.create_time_window[0]) - query.where(Error.raised_time <= request.create_time_window[1]) + query = query.where(Error.raised_time >= request.create_time_window[0]) + query = query.where(Error.raised_time <= request.create_time_window[1]) if request.include_resolved is False: query = query.where(Error.cleared == False) - query.order_by(Error.raised_time) - query.limit(max(min(request.max_results, server_settings.max_search_results), 0)) + query = query.order_by(desc(Error.raised_time)) + max_results = max(min(request.max_results, server_settings.max_search_results), 0) + query = query.limit(max_results) results = session.execute(query).scalars().all() diff --git a/tests/server_unit_test/test_clone.py b/tests/server_unit_test/test_clone.py index db60884..fd4156f 100644 --- a/tests/server_unit_test/test_clone.py +++ b/tests/server_unit_test/test_clone.py @@ -337,7 +337,6 @@ def test_incoming_transfer_endpoints( # Check it's in the database with correct status - # Clean up that garbage with get_session() as session: transfer = session.get(test_orm.OutgoingTransfer, transfer_id) @@ -423,7 +422,7 @@ def test_clone_file_exists(test_client, test_server, test_orm, garbage_filename) ) _, get_session, _ = test_server - + with get_session() as session: session.add(file) session.commit() diff --git a/tests/server_unit_test/test_search.py b/tests/server_unit_test/test_search.py index d2d72ea..b8ab2a8 100644 --- a/tests/server_unit_test/test_search.py +++ b/tests/server_unit_test/test_search.py @@ -4,9 +4,10 @@ import datetime -from hera_librarian.models.errors import (ErrorSearchFailedResponse, +from hera_librarian.models.errors import (ErrorCategory, + ErrorSearchFailedResponse, ErrorSearchRequest, - ErrorSearchResponses) + ErrorSearchResponses, ErrorSeverity) from hera_librarian.models.search import (FileSearchFailedResponse, FileSearchRequest, FileSearchResponse, @@ -91,21 +92,44 @@ def test_failed_search(test_server_with_many_files_and_errors, test_client): def test_error_all_search( test_server_with_many_files_and_errors, test_client, test_orm ): - request = ErrorSearchRequest(include_resolved=False) + def make_request(request): + response = test_client.post( + "/api/v2/search/error", + headers={"Content-Type": "application/json"}, + content=request.model_dump_json(), + ) - response = test_client.post( - "/api/v2/search/error", - headers={"Content-Type": "application/json"}, - content=request.model_dump_json(), - ) + assert response.status_code == 200 - assert response.status_code == 200 + return ErrorSearchResponses.model_validate_json(response.content).root - response = ErrorSearchResponses.model_validate_json(response.content) + response = make_request(ErrorSearchRequest(include_resolved=False)) - for model in response.root: + for model in response: assert model.cleared is False + response = make_request(ErrorSearchRequest(include_resolved=True)) + + includes_cleared = False + for model in response: + includes_cleared = includes_cleared or model.cleared + + assert includes_cleared + + response = make_request(ErrorSearchRequest(max_results=1)) + + assert len(response) == 1 + + response = make_request(ErrorSearchRequest(severity=ErrorSeverity.CRITICAL)) + + for model in response: + assert model.severity == ErrorSeverity.CRITICAL + + response = make_request(ErrorSearchRequest(category=ErrorCategory.CONFIGURATION)) + + for model in response: + assert model.category == ErrorCategory.CONFIGURATION + def test_failed_error_search(test_server_with_many_files_and_errors, test_client): request = ErrorSearchRequest(id=-1) @@ -119,3 +143,17 @@ def test_failed_error_search(test_server_with_many_files_and_errors, test_client assert response.status_code == 404 response = ErrorSearchFailedResponse.model_validate_json(response.content) + + request = ErrorSearchRequest( + create_time_window=[datetime.datetime.min, datetime.datetime.min] + ) + + response = test_client.post( + "/api/v2/search/error", + headers={"Content-Type": "application/json"}, + content=request.model_dump_json(), + ) + + assert response.status_code == 404 + + response = ErrorSearchFailedResponse.model_validate_json(response.content) diff --git a/tests/server_unit_test/test_upload.py b/tests/server_unit_test/test_upload.py index 7e24c52..0b5a73c 100644 --- a/tests/server_unit_test/test_upload.py +++ b/tests/server_unit_test/test_upload.py @@ -45,7 +45,9 @@ def test_negative_upload_size(test_client: TestClient): def test_extreme_upload_size( - test_client: TestClient, test_server: tuple[FastAPI, callable, Server], test_orm: Any + test_client: TestClient, + test_server: tuple[FastAPI, callable, Server], + test_orm: Any, ): """ Tests that an upload size that is too large results in an error. @@ -79,7 +81,9 @@ def test_extreme_upload_size( def test_valid_stage( - test_client: TestClient, test_server: tuple[FastAPI, callable, Server], test_orm: Any + test_client: TestClient, + test_server: tuple[FastAPI, callable, Server], + test_orm: Any, ): """ Tests that a valid stage works. From af8f79dbc0b5a2ad69eec4765410478af9a30ba0 Mon Sep 17 00:00:00 2001 From: Josh Borrow Date: Fri, 26 Jan 2024 15:09:07 -0500 Subject: [PATCH 12/22] Unit tests for error --- librarian_server/__init__.py | 10 +++-- librarian_server/api/__init__.py | 3 +- librarian_server/api/errors.py | 16 ++++++-- tests/server_unit_test/test_error.py | 61 ++++++++++++++++++++++++++++ 4 files changed, 83 insertions(+), 7 deletions(-) diff --git a/librarian_server/__init__.py b/librarian_server/__init__.py index e4f8ec3..3247424 100644 --- a/librarian_server/__init__.py +++ b/librarian_server/__init__.py @@ -5,9 +5,11 @@ asynchronously, and that background tasks can work on any available ASGI server. """ -from .settings import server_settings from fastapi import FastAPI +from .settings import server_settings + + def main() -> FastAPI: from .logger import log @@ -18,11 +20,13 @@ def main() -> FastAPI: log.debug("Adding API router.") - from .api import upload_router, ping_router, clone_router, search_router + from .api import (clone_router, error_router, ping_router, search_router, + upload_router) app.include_router(upload_router) app.include_router(ping_router) app.include_router(clone_router) app.include_router(search_router) + app.include_router(error_router) - return app \ No newline at end of file + return app diff --git a/librarian_server/api/__init__.py b/librarian_server/api/__init__.py index 5bbab80..ee185c4 100644 --- a/librarian_server/api/__init__.py +++ b/librarian_server/api/__init__.py @@ -8,4 +8,5 @@ from .upload import router as upload_router from .ping import router as ping_router from .clone import router as clone_router -from .search import router as search_router \ No newline at end of file +from .search import router as search_router +from .errors import router as error_router \ No newline at end of file diff --git a/librarian_server/api/errors.py b/librarian_server/api/errors.py index 7bede5d..51df2a0 100644 --- a/librarian_server/api/errors.py +++ b/librarian_server/api/errors.py @@ -10,13 +10,14 @@ from fastapi import APIRouter, Depends, Response, status from sqlalchemy.orm import Session +from hera_librarian.errors import ErrorCategory, ErrorSeverity from hera_librarian.models.errors import (ErrorClearRequest, ErrorClearResponse, ErrorSearchFailedResponse) from librarian_server.database import yield_session -from librarian_server.orm import Error, ErrorCategory, ErrorSeverity +from librarian_server.orm import Error -router = APIRouter("/api/v2/error") +router = APIRouter(prefix="/api/v2/error") @router.post("/clear", response_model=ErrorClearResponse | ErrorSearchFailedResponse) @@ -31,6 +32,7 @@ def clear_error( Possible response codes: 200 - OK. Error cleared successfully. + 400 - Error has already been cleared. 404 - No error found to match search criteria. """ @@ -40,7 +42,15 @@ def clear_error( response.status_code = status.HTTP_404_NOT_FOUND return ErrorSearchFailedResponse( - error_message="No error found to with ID {request.id} to clear.", + reason="No error found to with ID {request.id} to clear.", + suggested_remedy="Check you are searching for a valid error ID.", + ) + + if error.cleared: + response.status_code = status.HTTP_400_BAD_REQUEST + + return ErrorSearchFailedResponse( + reason="Error with ID {request.id} has already been cleared.", suggested_remedy="Check you are searching for a valid error ID.", ) diff --git a/tests/server_unit_test/test_error.py b/tests/server_unit_test/test_error.py index bea9c04..4ee05d1 100644 --- a/tests/server_unit_test/test_error.py +++ b/tests/server_unit_test/test_error.py @@ -3,6 +3,9 @@ """ from hera_librarian.errors import ErrorCategory, ErrorSeverity +from hera_librarian.models.errors import (ErrorClearRequest, + ErrorClearResponse, + ErrorSearchFailedResponse) def test_error_to_db(test_server, test_orm): @@ -78,3 +81,61 @@ def test_error_to_db(test_server, test_orm): assert errors[3].severity == ErrorSeverity.CRITICAL assert errors[3].category == ErrorCategory.DATA_INTEGRITY + + +def test_clear_endpoint(test_server_with_many_files_and_errors, test_client, test_orm): + """ + Test the clear endpoint. + """ + + request = ErrorClearRequest(id=-1) + + response = test_client.post( + "/api/v2/error/clear", + headers={"Content-Type": "application/json"}, + content=request.model_dump_json(), + ) + + assert response.status_code == 404 + + response = ErrorSearchFailedResponse.model_validate_json(response.content) + + # Find an un-cleared item in the database. + with test_server_with_many_files_and_errors[1]() as session: + error_id = session.query(test_orm.Error).filter_by(cleared=False).first().id + + request = ErrorClearRequest(id=error_id) + + response = test_client.post( + "/api/v2/error/clear", + headers={"Content-Type": "application/json"}, + content=request.model_dump_json(), + ) + + assert response.status_code == 200 + + response = ErrorClearResponse.model_validate_json(response.content) + + assert response.cleared is True + + # Check it was cleared in the database. + + with test_server_with_many_files_and_errors[1]() as session: + error = session.get(test_orm.Error, error_id) + + assert error.cleared is True + assert error.cleared_time is not None + + # Check we can't clear it again. + + request = ErrorClearRequest(id=error_id) + + response = test_client.post( + "/api/v2/error/clear", + headers={"Content-Type": "application/json"}, + content=request.model_dump_json(), + ) + + assert response.status_code == 400 + + response = ErrorSearchFailedResponse.model_validate_json(response.content) From 04017f9ff23b5804ccfd900c95253f6fb99850a8 Mon Sep 17 00:00:00 2001 From: Josh Borrow Date: Fri, 26 Jan 2024 15:19:01 -0500 Subject: [PATCH 13/22] Added initial client --- hera_librarian/client.py | 94 ++++++++++++++++++++++++++++++++++ librarian_server/api/search.py | 5 +- 2 files changed, 97 insertions(+), 2 deletions(-) diff --git a/hera_librarian/client.py b/hera_librarian/client.py index cb984b8..d30cb9f 100644 --- a/hera_librarian/client.py +++ b/hera_librarian/client.py @@ -10,7 +10,11 @@ from pydantic import BaseModel from .deletion import DeletionPolicy +from .errors import ErrorCategory, ErrorSeverity from .exceptions import LibrarianError, LibrarianHTTPError +from .models.errors import (ErrorClearRequest, ErrorClearResponse, + ErrorSearchFailedResponse, ErrorSearchRequest, + ErrorSearchResponse, ErrorSearchResponses) from .models.ping import PingRequest, PingResponse from .models.search import (FileSearchRequest, FileSearchResponse, FileSearchResponses) @@ -352,3 +356,93 @@ def search_files( raise e return response.root + + def search_errors( + self, + id: Optional[int] = None, + category: Optional[ErrorCategory] = None, + severity: Optional[ErrorSeverity] = None, + create_time_window: Optional[tuple[datetime, ...]] = None, + include_resolved: bool = False, + max_results: int = 64, + ) -> list[ErrorSearchResponse]: + """ + Search for files on this librarain. + Parameters + ---------- + id : Optional[int], optional + The ID of the error to search for. If left empty, all errors will be + returned., by default None + category : Optional[ErrorCategory], optional + The category of errors to return. If left empty, all errors will be + returned., by default None + severity : Optional[ErrorSeverity], optional + The severity of errors to return. If left empty, all errors will be + returned., by default None + create_time_window : Optional[tuple[datetime, ...]], optional + The time window to search for files in. This is a tuple of two + datetimes, the first being the start and the second being the end. + Note that the datetimes should be in UTC., by default None + include_resolved : bool, optional + Whether or not to include resolved errors in the response. By + default, we do not., by default False + max_results : int, optional + The number of errors to return., by default 64 + + Returns + ------- + list[ErrorSearchResponse] + A list of errors that match the query. + """ + + try: + response: ErrorSearchResponses = self.post( + endpoint="search/errors", + request=ErrorSearchRequest( + id=id, + category=category, + severity=severity, + create_time_window=create_time_window, + include_resolved=include_resolved, + max_results=max_results, + ), + response=ErrorSearchResponses, + ) + except LibrarianHTTPError as e: + if e.status_code == 404 and e.reason == "No errors found.": + return [] + else: + raise e + + return response.root + + def clear_error( + self, + id: int, + ): + """ + Clear an error on this librarain. + + Parameters + ---------- + id : int + The ID of the error to clear. + + Raises + ------ + + ValueError + If the provided error ID is not found, or if the error has already been cleared. + """ + + try: + self.post( + endpoint="error/clear", + request=ErrorClearRequest(id=id), + response=ErrorClearResponse, + ) + except LibrarianHTTPError as e: + if e.status_code == 404 and "No error found with ID" in e.reason: + raise ValueError(e.reason) + elif e.status_code == 400 and "Error with ID" in e.reason: + raise ValueError(e.reason) diff --git a/librarian_server/api/search.py b/librarian_server/api/search.py index df5ddfa..d086862 100644 --- a/librarian_server/api/search.py +++ b/librarian_server/api/search.py @@ -156,8 +156,9 @@ def error( log.debug(f"No errors found. Returning 'error'.") response.status_code = status.HTTP_404_NOT_FOUND return ErrorSearchFailedResponse( - reason="No files found.", - suggested_remedy="Check that you are searching for the correct file.", + reason="No errors found.", + suggested_remedy="Check that you are searching for the correct " + "errors, or maybe you have a happy librarian!", ) # Build the response. From a174994c8be469f5a8d904c4b82b09b9f0deb282 Mon Sep 17 00:00:00 2001 From: Josh Borrow Date: Fri, 26 Jan 2024 16:20:26 -0500 Subject: [PATCH 14/22] Added integration test of client and server working in tandem for errors --- hera_librarian/cli.py | 8 +++ hera_librarian/client.py | 7 ++- librarian_server/api/errors.py | 4 +- tests/integration_test/conftest.py | 29 ++++++++- tests/integration_test/test_errors.py | 84 +++++++++++++++++++++++++++ 5 files changed, 127 insertions(+), 5 deletions(-) create mode 100644 tests/integration_test/test_errors.py diff --git a/hera_librarian/cli.py b/hera_librarian/cli.py index 0bc51c1..ba339ab 100644 --- a/hera_librarian/cli.py +++ b/hera_librarian/cli.py @@ -388,6 +388,14 @@ def upload(args): return +def search_errors(args): + """ + Search for errors on the librarian. + """ + + raise NotImplementedError + + # make the base parser def generate_parser(): """Make a librarian ArgumentParser. diff --git a/hera_librarian/client.py b/hera_librarian/client.py index d30cb9f..28e972b 100644 --- a/hera_librarian/client.py +++ b/hera_librarian/client.py @@ -397,7 +397,7 @@ def search_errors( try: response: ErrorSearchResponses = self.post( - endpoint="search/errors", + endpoint="search/error", request=ErrorSearchRequest( id=id, category=category, @@ -411,7 +411,7 @@ def search_errors( except LibrarianHTTPError as e: if e.status_code == 404 and e.reason == "No errors found.": return [] - else: + else: # pragma: no cover raise e return response.root @@ -443,6 +443,9 @@ def clear_error( ) except LibrarianHTTPError as e: if e.status_code == 404 and "No error found with ID" in e.reason: + print("HELLO WORLD") raise ValueError(e.reason) elif e.status_code == 400 and "Error with ID" in e.reason: raise ValueError(e.reason) + else: # pragma: no cover + raise e diff --git a/librarian_server/api/errors.py b/librarian_server/api/errors.py index 51df2a0..ba1f837 100644 --- a/librarian_server/api/errors.py +++ b/librarian_server/api/errors.py @@ -42,7 +42,7 @@ def clear_error( response.status_code = status.HTTP_404_NOT_FOUND return ErrorSearchFailedResponse( - reason="No error found to with ID {request.id} to clear.", + reason=f"No error found with ID {request.id} to clear.", suggested_remedy="Check you are searching for a valid error ID.", ) @@ -50,7 +50,7 @@ def clear_error( response.status_code = status.HTTP_400_BAD_REQUEST return ErrorSearchFailedResponse( - reason="Error with ID {request.id} has already been cleared.", + reason=f"Error with ID {request.id} has already been cleared.", suggested_remedy="Check you are searching for a valid error ID.", ) diff --git a/tests/integration_test/conftest.py b/tests/integration_test/conftest.py index 71f545e..4b3f657 100644 --- a/tests/integration_test/conftest.py +++ b/tests/integration_test/conftest.py @@ -19,7 +19,7 @@ @pytest.fixture(scope="package") -def server(xprocess, tmp_path_factory, request): +def server(xprocess, tmp_path_factory, request) -> Server: """ Starts a single server with pytest-xprocess. """ @@ -95,3 +95,30 @@ def librarian_client_command_line(server): os.environ["LIBRARIAN_CLIENT_CONNECTIONS"] = connections yield "test-A" + + +@pytest.fixture(scope="package") +def librarian_database_session_maker(server: Server): + """ + Generates a session maker for the database for the librarian + running in the other process. Use this to make database changes + behind the librarian's back (sparingly!). + + If using this, ask yourself if there should be a client API + endpoint for this instead. + """ + + from sqlalchemy import create_engine + from sqlalchemy.orm import sessionmaker + + engine = create_engine( + server.SQLALCHEMY_DATABASE_URI, connect_args={"check_same_thread": False} + ) + + SessionMaker = sessionmaker(bind=engine, autocommit=False, autoflush=False) + + yield SessionMaker + + del SessionMaker + + engine.dispose() diff --git a/tests/integration_test/test_errors.py b/tests/integration_test/test_errors.py new file mode 100644 index 0000000..55dd7a7 --- /dev/null +++ b/tests/integration_test/test_errors.py @@ -0,0 +1,84 @@ +""" +Tests that the client can handle communicating with the librarian about errors correctly. +""" + +import pytest + +from hera_librarian.errors import ErrorCategory, ErrorSeverity + + +@pytest.fixture(scope="function") +def server_with_fake_errors(server, test_orm, librarian_database_session_maker): + """ + Starts a server with a fake error. + """ + + with librarian_database_session_maker() as session: + error = test_orm.Error.new_error( + severity=ErrorSeverity.CRITICAL, + category=ErrorCategory.CONFIGURATION, + message="This is a fake error.", + ) + + session.add(error) + session.commit() + + error_id = error.id + + yield server + + with librarian_database_session_maker() as session: + error = session.get(test_orm.Error, error_id) + + session.delete(error) + session.commit() + + +def test_error_search(server_with_fake_errors, librarian_client): + """ + Tests that the client can search for errors correctly. + """ + + all_errors = librarian_client.search_errors() + + assert len(all_errors) > 0 + + assert all_errors[0].id is not None + assert all_errors[0].severity is not None + assert all_errors[0].category is not None + assert all_errors[0].message is not None + assert all_errors[0].raised_time is not None + assert all_errors[0].cleared_time is None + assert all_errors[0].cleared is False + + # See if we can clear this error. + + error_to_clear = all_errors[0].id + + cleared_error = librarian_client.clear_error(error_to_clear) + + all_errors = librarian_client.search_errors( + id=error_to_clear, include_resolved=True + ) + + assert all_errors[0].cleared + + # Check what happens if we clear it again + + with pytest.raises(ValueError): + cleared_error = librarian_client.clear_error(error_to_clear) + + +def test_error_search_missing(server_with_fake_errors, librarian_client): + """ + Tests that the client can handle searching for errors that don't exist. + """ + + all_errors = librarian_client.search_errors(id=-1) + + assert len(all_errors) == 0 + + # Try to clear it + + with pytest.raises(ValueError): + cleared_error = librarian_client.clear_error(-1) From c44521202475af389dfdc111756494f6be890173 Mon Sep 17 00:00:00 2001 From: Josh Borrow Date: Fri, 26 Jan 2024 16:45:35 -0500 Subject: [PATCH 15/22] Add cli parsers --- hera_librarian/cli.py | 53 +++++++++++++++++++++++++++++++++++++--- hera_librarian/client.py | 2 +- 2 files changed, 51 insertions(+), 4 deletions(-) diff --git a/hera_librarian/cli.py b/hera_librarian/cli.py index ba339ab..11058a8 100644 --- a/hera_librarian/cli.py +++ b/hera_librarian/cli.py @@ -17,7 +17,7 @@ import dateutil.parser from . import LibrarianClient -from .exceptions import LibrarianClientRemovedFunctionality, LibrarianError +from .exceptions import LibrarianClientRemovedFunctionality, LibrarianError, LibrarianHTTPError from .settings import client_settings __version__ = "TEST" @@ -26,7 +26,6 @@ # define some common help strings _conn_name_help = "Which Librarian to talk to; as in ~/.hl_client.cfg." - def die(fmt, *args): """Exit the script with the specifying error string. @@ -53,6 +52,13 @@ def die(fmt, *args): sys.exit(1) +def get_client(conn_name): + if conn_name not in client_settings.connections: + die("Connection name {} not found in client settings.".format(conn_name)) + + return LibrarianClient.from_info(client_settings.connections[conn_name]) + + # from https://stackoverflow.com/questions/17330139/python-printing-a-dictionary-as-a-horizontal-table-with-headers def print_table(dict_list, col_list=None, col_names=None): """Pretty print a list of dictionaries as a dynamically sized table. @@ -393,7 +399,48 @@ def search_errors(args): Search for errors on the librarian. """ - raise NotImplementedError + client = get_client(args.conn_name) + + try: + errors = client.search_errors( + id=args.id, + category=args.category, + severity=args.severity, + create_time_window=args.create_time_window, + include_resolved=args.include_resolved, + max_results=args.max_results, + ) + except LibrarianHTTPError as e: + die(f"Unexpected error communicating with the librarian server: {e.reason}") + + if len(errors) == 0: + print("No errors found.") + return + + print_table( + [e.dict() for e in errors], + col_list=["id", "severity", "category", "message", "raised_time", "cleared_time", "cleared", "caller"], + col_names=["ID", "Severity", "Category", "Message", "Raised", "Cleared", "Cleared Time", "Caller"], + ) + + return 0 + + +def clear_error(args): + """ + Clear an error on the librarian. + """ + + client = get_client(args.conn_name) + + try: + client.clear_error(args.id) + except ValueError as e: + die(f"Unable to find or clear error on the librarian: {e.args[0]}") + except LibrarianHTTPError as e: + die(f"Unexpected error communicating with the librarian server: {e.reason}") + + return 0 # make the base parser diff --git a/hera_librarian/client.py b/hera_librarian/client.py index 28e972b..5f0d36c 100644 --- a/hera_librarian/client.py +++ b/hera_librarian/client.py @@ -368,6 +368,7 @@ def search_errors( ) -> list[ErrorSearchResponse]: """ Search for files on this librarain. + Parameters ---------- id : Optional[int], optional @@ -443,7 +444,6 @@ def clear_error( ) except LibrarianHTTPError as e: if e.status_code == 404 and "No error found with ID" in e.reason: - print("HELLO WORLD") raise ValueError(e.reason) elif e.status_code == 400 and "Error with ID" in e.reason: raise ValueError(e.reason) From df3482e085b0224e07f6b70db5e5a85d4e544334 Mon Sep 17 00:00:00 2001 From: Josh Borrow Date: Mon, 29 Jan 2024 13:22:08 -0500 Subject: [PATCH 16/22] Add parsing for search errors and clear errors --- hera_librarian/cli.py | 182 +++++++++++++++++++++++++++++++++------ hera_librarian/errors.py | 8 +- 2 files changed, 165 insertions(+), 25 deletions(-) diff --git a/hera_librarian/cli.py b/hera_librarian/cli.py index 11058a8..b5e0151 100644 --- a/hera_librarian/cli.py +++ b/hera_librarian/cli.py @@ -13,11 +13,13 @@ import sys import time from pathlib import Path +from typing import Optional import dateutil.parser from . import LibrarianClient -from .exceptions import LibrarianClientRemovedFunctionality, LibrarianError, LibrarianHTTPError +from .exceptions import (LibrarianClientRemovedFunctionality, LibrarianError, + LibrarianHTTPError) from .settings import client_settings __version__ = "TEST" @@ -26,6 +28,7 @@ # define some common help strings _conn_name_help = "Which Librarian to talk to; as in ~/.hl_client.cfg." + def die(fmt, *args): """Exit the script with the specifying error string. @@ -55,10 +58,39 @@ def die(fmt, *args): def get_client(conn_name): if conn_name not in client_settings.connections: die("Connection name {} not found in client settings.".format(conn_name)) - + return LibrarianClient.from_info(client_settings.connections[conn_name]) +def parse_create_time_window( + args, + start_time_name: str = "create_time_start", + end_time_name: str = "create_time_end", +) -> Optional[tuple[datetime.datetime, datetime.datetime]]: + """ + Parses a window to search for files between two times. + """ + + create_time_window = None + + if args.create_time_start is not None or args.create_time_end is not None: + create_time_window = [] + + if args.create_time_start is not None: + create_time_window.append(dateutil.parser.parse(args.create_time_start)) + else: + create_time_window.append(datetime.datetime.min) + + if args.create_time_end is not None: + create_time_window.append(dateutil.parser.parse(args.create_time_end)) + else: + create_time_window.append(datetime.datetime.max) + + create_time_window = tuple(create_time_window) + + return create_time_window + + # from https://stackoverflow.com/questions/17330139/python-printing-a-dictionary-as-a-horizontal-table-with-headers def print_table(dict_list, col_list=None, col_names=None): """Pretty print a list of dictionaries as a dynamically sized table. @@ -267,23 +299,7 @@ def search_files(args): # Create the search request - # Start with the most complex part, parsing dates... - create_time_window = None - - if args.create_time_start is not None or args.create_time_end is not None: - create_time_window = [] - - if args.create_time_start is not None: - create_time_window.append(dateutil.parser.parse(args.create_time_start)) - else: - create_time_window.append(datetime.datetime.min) - - if args.create_time_end is not None: - create_time_window.append(dateutil.parser.parse(args.create_time_end)) - else: - create_time_window.append(datetime.datetime.max) - - create_time_window = tuple(create_time_window) + create_time_window = parse_create_time_window(args) # Perform the search @@ -391,7 +407,7 @@ def upload(args): except Exception as e: die("Upload failed (unknown error): {}".format(e)) - return + return 0 def search_errors(args): @@ -416,11 +432,29 @@ def search_errors(args): if len(errors) == 0: print("No errors found.") return - + print_table( [e.dict() for e in errors], - col_list=["id", "severity", "category", "message", "raised_time", "cleared_time", "cleared", "caller"], - col_names=["ID", "Severity", "Category", "Message", "Raised", "Cleared", "Cleared Time", "Caller"], + col_list=[ + "id", + "severity", + "category", + "message", + "raised_time", + "cleared_time", + "cleared", + "caller", + ], + col_names=[ + "ID", + "Severity", + "Category", + "Message", + "Raised", + "Cleared", + "Cleared Time", + "Caller", + ], ) return 0 @@ -439,7 +473,7 @@ def clear_error(args): die(f"Unable to find or clear error on the librarian: {e.args[0]}") except LibrarianHTTPError as e: die(f"Unexpected error communicating with the librarian server: {e.reason}") - + return 0 @@ -484,6 +518,7 @@ def generate_parser(): config_set_file_deletion_policy_subparser(sub_parsers) config_stage_files_subparser(sub_parsers) config_upload_subparser(sub_parsers) + config_search_errors_subparser(sub_parsers) return ap @@ -995,6 +1030,105 @@ def config_upload_subparser(sub_parsers): return +def config_search_errors_subparser(sub_parsers): + # function documentation + doc = """Search for errors in the librarian. + + """ + example = """Search for errors matching the query, for instance to find all errors + with a level of 'CRITICAL', you would use: + + librarian search-errors LIBRARIAN_NAME --severity=critical + + """ + + hlp = "Search for errors matching a query" + + from .errors import ErrorCategory, ErrorSeverity + + # add sub parser + sp = sub_parsers.add_parser( + "search-errors", description=doc, epilog=example, help=hlp + ) + + sp.add_argument("conn_name", metavar="CONNECTION-NAME", help=_conn_name_help) + + sp.add_argument( + "--id", + help="Search for an error with this ID.", + type=int, + ) + + sp.add_argument( + "-c", + "--category", + type=ErrorCategory, + choices=list(ErrorCategory), + ) + + sp.add_argument( + "-s", + "--severity", + help="Search for errors with this severity.", + type=ErrorSeverity, + choices=list(ErrorSeverity), + ) + + sp.add_argument( + "--create-time-start", + help="Search for errors who were created after this date and time. Use a parseable date string, if no timezone is specified, UTC is assumed.", + ) + + sp.add_argument( + "--create-time-end", + help="Search for errors who were created before this date and time. Use a parseable date string, if no timezone is specified, UTC is assumed.", + ) + + sp.add_argument( + "--include-resolved", + action="store_true", + help="If this flag is present, include errors that have been cleared in the search. Otherwise, only active errors are returned.", + ) + + sp.add_argument( + "--max-results", + type=int, + default=64, + help="Maximum number of results to return.", + ) + + sp.set_defaults(func=search_errors) + + +def config_clear_error_subparser(sub_parsers): + # function documentation + doc = """Clear an error on the librarian. + + """ + example = """Clear an error with the given ID: + + librarian clear-error LIBRARIAN_NAME 1234 + + """ + + hlp = "Clear an error on the librarian" + + # add sub parser + sp = sub_parsers.add_parser( + "clear-error", description=doc, epilog=example, help=hlp + ) + + sp.add_argument("conn_name", metavar="CONNECTION-NAME", help=_conn_name_help) + + sp.add_argument( + "id", + help="The ID of the error to clear.", + type=int, + ) + + sp.set_defaults(func=clear_error) + + def main(): # make a parser and run the specified command parser = generate_parser() diff --git a/hera_librarian/errors.py b/hera_librarian/errors.py index ce5197c..3510e4d 100644 --- a/hera_librarian/errors.py +++ b/hera_librarian/errors.py @@ -21,6 +21,9 @@ class ErrorSeverity(Enum): INFO = "info" "Informational errors are those that are not critical and do not need to be fixed." + def __str__(self): + return self.value + class ErrorCategory(Enum): """ @@ -40,4 +43,7 @@ class ErrorCategory(Enum): "Store full errors are those that indicate that a store is full and cannot accept new data." PROGRAMMING = "programming" - "Programming errors are those that indicate that the librarian has a bug, i.e. we have reached an 'unreachable' state." \ No newline at end of file + "Programming errors are those that indicate that the librarian has a bug, i.e. we have reached an 'unreachable' state." + + def __str__(self): + return self.value \ No newline at end of file From 040372e74a5fb320986a851372d3e9b07a5b59ae Mon Sep 17 00:00:00 2001 From: Josh Borrow Date: Mon, 29 Jan 2024 13:49:21 -0500 Subject: [PATCH 17/22] Add cli test --- hera_librarian/cli.py | 6 ++- tests/integration_test/test_errors.py | 66 +++++++++++++++++++++++---- 2 files changed, 61 insertions(+), 11 deletions(-) diff --git a/hera_librarian/cli.py b/hera_librarian/cli.py index b5e0151..206af06 100644 --- a/hera_librarian/cli.py +++ b/hera_librarian/cli.py @@ -417,12 +417,14 @@ def search_errors(args): client = get_client(args.conn_name) + create_time_window = parse_create_time_window(args) + try: errors = client.search_errors( id=args.id, category=args.category, severity=args.severity, - create_time_window=args.create_time_window, + create_time_window=create_time_window, include_resolved=args.include_resolved, max_results=args.max_results, ) @@ -519,6 +521,7 @@ def generate_parser(): config_stage_files_subparser(sub_parsers) config_upload_subparser(sub_parsers) config_search_errors_subparser(sub_parsers) + config_clear_error_subparser(sub_parsers) return ap @@ -1122,6 +1125,7 @@ def config_clear_error_subparser(sub_parsers): sp.add_argument( "id", + metavar="ERROR-ID", help="The ID of the error to clear.", type=int, ) diff --git a/tests/integration_test/test_errors.py b/tests/integration_test/test_errors.py index 55dd7a7..a831b17 100644 --- a/tests/integration_test/test_errors.py +++ b/tests/integration_test/test_errors.py @@ -2,6 +2,8 @@ Tests that the client can handle communicating with the librarian about errors correctly. """ +import subprocess + import pytest from hera_librarian.errors import ErrorCategory, ErrorSeverity @@ -13,24 +15,28 @@ def server_with_fake_errors(server, test_orm, librarian_database_session_maker): Starts a server with a fake error. """ + error_ids = [] + with librarian_database_session_maker() as session: - error = test_orm.Error.new_error( - severity=ErrorSeverity.CRITICAL, - category=ErrorCategory.CONFIGURATION, - message="This is a fake error.", - ) + for error in range(32): + error = test_orm.Error.new_error( + severity=ErrorSeverity.CRITICAL, + category=ErrorCategory.CONFIGURATION, + message="This is a fake error.", + ) - session.add(error) - session.commit() + session.add(error) + session.commit() - error_id = error.id + error_ids.append(error.id) yield server with librarian_database_session_maker() as session: - error = session.get(test_orm.Error, error_id) + for error_id in error_ids: + error = session.get(test_orm.Error, error_id) + session.delete(error) - session.delete(error) session.commit() @@ -82,3 +88,43 @@ def test_error_search_missing(server_with_fake_errors, librarian_client): with pytest.raises(ValueError): cleared_error = librarian_client.clear_error(-1) + + +def test_error_search_cli_path(server_with_fake_errors, librarian_client_command_line): + """ + Tests that the CLI can search for errors correctly. + """ + + captured = subprocess.check_output( + [ + "librarian", + "search-errors", + librarian_client_command_line, + "--id=30", + "--include-resolved", + ] + ) + + assert "This is a fake error." in str(captured) + + captured = subprocess.check_output( + [ + "librarian", + "clear-error", + librarian_client_command_line, + "30", + ] + ) + + assert captured == b"" + + captured = subprocess.check_output( + [ + "librarian", + "search-errors", + librarian_client_command_line, + "--include-resolved", + ] + ) + + assert "True" in str(captured) From 23115a627bfb4b59b336e778766282ab279369f8 Mon Sep 17 00:00:00 2001 From: Josh Borrow Date: Mon, 29 Jan 2024 13:52:31 -0500 Subject: [PATCH 18/22] Change test server store names for bg test execution --- tests/server.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/server.py b/tests/server.py index e0b1fbb..ff1df4a 100644 --- a/tests/server.py +++ b/tests/server.py @@ -75,7 +75,7 @@ def server_setup(tmp_path_factory) -> Server: store_config = [ { - "store_name": "test_store", + "store_name": "local_store", "store_type": "local", "ingestable": True, "store_data": { @@ -90,7 +90,7 @@ def server_setup(tmp_path_factory) -> Server: }, }, { - "store_name": "test_store_clone", + "store_name": "local_clone", "store_type": "local", "ingestable": False, "store_data": { From fcb31637b3bda0596463dce97aafb23dfba2f7eb Mon Sep 17 00:00:00 2001 From: Josh Borrow Date: Mon, 29 Jan 2024 14:14:51 -0500 Subject: [PATCH 19/22] Added recv error flagging --- hera_librarian/errors.py | 3 +++ librarian_background/recieve_clone.py | 31 ++++++++++++++++++++------- 2 files changed, 26 insertions(+), 8 deletions(-) diff --git a/hera_librarian/errors.py b/hera_librarian/errors.py index 3510e4d..c61ebd6 100644 --- a/hera_librarian/errors.py +++ b/hera_librarian/errors.py @@ -45,5 +45,8 @@ class ErrorCategory(Enum): PROGRAMMING = "programming" "Programming errors are those that indicate that the librarian has a bug, i.e. we have reached an 'unreachable' state." + LIBRARIAN_NETWORK_AVAILABILITY = "librarian_network_availability" + "Librarian network availability errors are those that indicate that the network connection to other librarians is not available." + def __str__(self): return self.value \ No newline at end of file diff --git a/librarian_background/recieve_clone.py b/librarian_background/recieve_clone.py index 4504a35..0284c8d 100644 --- a/librarian_background/recieve_clone.py +++ b/librarian_background/recieve_clone.py @@ -70,9 +70,14 @@ def core(self, session: Session): store: StoreMetadata = transfer.store if store is None: - # TODO: Check if this should be a programming error. - logger.error( - f"Transfer {transfer.id} has no store associated with it. Skipping for now." + log_to_database( + severity=ErrorSeverity.CRITICAL, + category=ErrorCategory.PROGRAMMING, + message=( + f"Transfer {transfer.id} has no store associated with it. " + "Skipping for now, but this should never happen." + ), + session=session, ) all_transfers_succeeded = False @@ -82,9 +87,13 @@ def core(self, session: Session): try: path_info = store.store_manager.path_info(Path(transfer.staging_path)) except TypeError: - # TODO: Check if this should be a programming error. - logger.error( - f"Transfer {transfer.id} has no staging path associated with it. Skipping for now." + log_to_database( + severity=ErrorSeverity.ERROR, + category=ErrorCategory.DATA_AVAILABILITY, + message=( + f"Transfer {transfer.id}: cannot get information about staging " + f"path: {transfer.staging_path}. Skipping for now." + ), ) all_transfers_succeeded = False @@ -169,8 +178,14 @@ def core(self, session: Session): ) ) except Exception as e: - logger.error( - f"Failed to call back to librarian {librarian.name} with exception {e}." + log_to_database( + severity=ErrorSeverity.ERROR, + category=ErrorCategory.LIBRARIAN_NETWORK_AVAILABILITY, + message=( + f"Failed to call back to librarian {librarian.name} " + f"with exception {e}." + ), + session=session, ) else: logger.error( From 2d569aaabceef651e76eb8744d534bf49a98a494 Mon Sep 17 00:00:00 2001 From: Josh Borrow Date: Mon, 29 Jan 2024 14:38:59 -0500 Subject: [PATCH 20/22] Add logging to clones --- librarian_background/recieve_clone.py | 1 + librarian_background/send_clone.py | 69 +++++++++++++++++++++------ 2 files changed, 55 insertions(+), 15 deletions(-) diff --git a/librarian_background/recieve_clone.py b/librarian_background/recieve_clone.py index 0284c8d..5e623c2 100644 --- a/librarian_background/recieve_clone.py +++ b/librarian_background/recieve_clone.py @@ -94,6 +94,7 @@ def core(self, session: Session): f"Transfer {transfer.id}: cannot get information about staging " f"path: {transfer.staging_path}. Skipping for now." ), + session=session, ) all_transfers_succeeded = False diff --git a/librarian_background/send_clone.py b/librarian_background/send_clone.py index e30662c..191bb99 100644 --- a/librarian_background/send_clone.py +++ b/librarian_background/send_clone.py @@ -22,6 +22,7 @@ Librarian, ) from librarian_server.settings import server_settings +from librarian_server.logger import log_to_database from hera_librarian.models.clone import ( CloneInitiationRequest, @@ -29,6 +30,7 @@ CloneOngoingRequest, CloneOngoingResponse ) +from hera_librarian.errors import ErrorCategory, ErrorSeverity from typing import TYPE_CHECKING @@ -70,9 +72,14 @@ def core(self, session: Session): ).first() if librarian is None: - logger.error( - f"Librarian {self.destination_librarian} does not exist within database." - "Cancelling job. Please update the configuration." + log_to_database( + severity=ErrorSeverity.CRITICAL, + category=ErrorCategory.LIBRARIAN_NETWORK_AVAILABILITY, + description=( + f"Librarian {self.destination_librarian} does not exist within database. " + "Cancelling job. Please update the configuration (and re-start the librarian)." + ), + session=session, ) return CancelJob @@ -81,10 +88,18 @@ def core(self, session: Session): try: client.ping() except Exception as e: - logger.error( - f"Librarian {self.destination_librarian} is unreachable. Cancelling job." + log_to_database( + severity=ErrorSeverity.ERROR, + category=ErrorCategory.LIBRARIAN_NETWORK_AVAILABILITY, + description=( + f"Librarian {self.destination_librarian} is unreachable. Skipping sending clones." + ), + session=session, ) - return CancelJob + + # No point canceling job, our freind could just be down for a while. + + return current_time = datetime.datetime.utcnow() age_in_days = datetime.timedelta(days=self.age_in_days) @@ -106,9 +121,16 @@ def core(self, session: Session): ).first() if use_store is None: - logger.error( - "Store {self.store_preference} does not exist. Cancelling job. Please update the configuration." + log_to_database( + severity=ErrorSeverity.CRITICAL, + category=ErrorCategory.CONFIGURATION, + description=( + f"Store {self.store_preference} does not exist. Cancelling job. " + "Please update the configuration." + ), + session=session, ) + return CancelJob for file in files_without_remote_instances: @@ -165,8 +187,14 @@ def core(self, session: Session): response_model=CloneInitiationResponse, ) except Exception as e: - logger.error( - f"Failed to stage clone for file {file.name} with exception {e}." + log_to_database( + severity=ErrorSeverity.ERROR, + category=ErrorCategory.LIBRARIAN_NETWORK_AVAILABILITY, + description=( + f"Unable to communicate with remote librarian for {transfer.id} " + f"to stage clone with exception {e}." + ), + session=session, ) # Mark the transfer as failed. @@ -199,8 +227,14 @@ def core(self, session: Session): continue if not success: - logger.error( - f"Failed to transfer file {instance.path} to remote store. Skipping." + log_to_database( + severity=ErrorSeverity.ERROR, + category=ErrorCategory.DATA_AVAILABILITY, + description=( + f"Unable to transfer file {instance.path} to remote store. " + "Skipping." + ), + session=session, ) transfer.fail_transfer(session=session) @@ -227,9 +261,14 @@ def core(self, session: Session): response_model=CloneOngoingResponse, ) except Exception as e: - logger.error( - f"Unable to communicate with remote librarian for {transfer.id} " - f"to let it know that the transfer is ongoing with exception {e}." + log_to_database( + severity=ErrorSeverity.ERROR, + category=ErrorCategory.LIBRARIAN_NETWORK_AVAILABILITY, + description=( + f"Unable to communicate with remote librarian for {transfer.id} " + f"to let it know that the transfer is ongoing with exception {e}." + ), + session=session, ) # Don't fail the transfer... I mean, the data is on the way (it might From 020b7d64012628c012995ef2d225c6081c5044ba Mon Sep 17 00:00:00 2001 From: Josh Borrow Date: Tue, 30 Jan 2024 13:40:42 -0500 Subject: [PATCH 21/22] Fix debug message in create_clone --- librarian_background/create_clone.py | 5 ++- librarian_server/api/upload.py | 62 +++++++++++++++++----------- 2 files changed, 41 insertions(+), 26 deletions(-) diff --git a/librarian_background/create_clone.py b/librarian_background/create_clone.py index c3fbba6..25b8c64 100644 --- a/librarian_background/create_clone.py +++ b/librarian_background/create_clone.py @@ -155,11 +155,14 @@ def core(self, session: Session): transfer.fail_transfer(session=session) continue + else: + break + except FileNotFoundError as e: log_to_database( severity=ErrorSeverity.ERROR, category=ErrorCategory.DATA_AVAILABILITY, - message=f"File {instance.path} does not exist on store {store_from}. Skipping. (Instance {instance.id})", + message=f"File {e.filename} does not exist when trying to clone from {store_from}. Skipping. (Instance {instance.id})", session=session, ) diff --git a/librarian_server/api/upload.py b/librarian_server/api/upload.py index 95a77a3..f835da1 100644 --- a/librarian_server/api/upload.py +++ b/librarian_server/api/upload.py @@ -3,32 +3,34 @@ stores. """ -from ..webutil import ServerError -from ..orm.storemetadata import StoreMetadata -from ..orm.transfer import TransferStatus, IncomingTransfer -from ..orm.file import File -from ..database import yield_session -from ..logger import log - -from hera_librarian.models.uploads import ( - UploadInitiationRequest, - UploadInitiationResponse, - UploadCompletionRequest, - UploadFailedResponse, -) - from pathlib import Path from typing import Optional -from fastapi import APIRouter, Response, status, Depends -from sqlalchemy.orm import Session +from fastapi import APIRouter, Depends, Response, status from sqlalchemy import select +from sqlalchemy.orm import Session + +from hera_librarian.models.uploads import (UploadCompletionRequest, + UploadFailedResponse, + UploadInitiationRequest, + UploadInitiationResponse) + +from ..database import yield_session +from ..logger import log +from ..orm.file import File +from ..orm.storemetadata import StoreMetadata +from ..orm.transfer import IncomingTransfer, TransferStatus +from ..webutil import ServerError router = APIRouter(prefix="/api/v2/upload") @router.post("/stage", response_model=UploadInitiationResponse | UploadFailedResponse) -def stage(request: UploadInitiationRequest, response: Response, session: Session = Depends(yield_session)): +def stage( + request: UploadInitiationRequest, + response: Response, + session: Session = Depends(yield_session), +): """ Initiates an upload to a store. @@ -69,12 +71,16 @@ def stage(request: UploadInitiationRequest, response: Response, session: Session ) # First, try to see if this is someone trying to re-start an existing transfer! - existing_transfer = session.query(IncomingTransfer).filter( - (IncomingTransfer.transfer_checksum == request.upload_checksum) - & (IncomingTransfer.status != TransferStatus.FAILED) - & (IncomingTransfer.status != TransferStatus.COMPLETED) - & (IncomingTransfer.status != TransferStatus.CANCELLED) - ).all() + existing_transfer = ( + session.query(IncomingTransfer) + .filter( + (IncomingTransfer.transfer_checksum == request.upload_checksum) + & (IncomingTransfer.status != TransferStatus.FAILED) + & (IncomingTransfer.status != TransferStatus.COMPLETED) + & (IncomingTransfer.status != TransferStatus.CANCELLED) + ) + .all() + ) if len(existing_transfer) != 0: log.info( @@ -164,7 +170,11 @@ def stage(request: UploadInitiationRequest, response: Response, session: Session @router.post("/commit") -def commit(request: UploadCompletionRequest, response: Response, session: Session = Depends(yield_session)): +def commit( + request: UploadCompletionRequest, + response: Response, + session: Session = Depends(yield_session), +): """ Commits a file to a store, called once it has been uploaded. @@ -177,7 +187,9 @@ def commit(request: UploadCompletionRequest, response: Response, session: Sessio log.debug(f"Received upload completion request: {request}") - store: StoreMetadata = session.query(StoreMetadata).filter_by(name=request.store_name).first() + store: StoreMetadata = ( + session.query(StoreMetadata).filter_by(name=request.store_name).first() + ) # Go grab the transfer from the database. transfer = session.get(IncomingTransfer, request.transfer_id) From 4200bb42f21862df8c546f1938b4b625ae473e28 Mon Sep 17 00:00:00 2001 From: Josh Borrow Date: Tue, 30 Jan 2024 14:39:16 -0500 Subject: [PATCH 22/22] Fixed create_clone bugs --- librarian_background/create_clone.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/librarian_background/create_clone.py b/librarian_background/create_clone.py index 25b8c64..3be0baf 100644 --- a/librarian_background/create_clone.py +++ b/librarian_background/create_clone.py @@ -117,7 +117,8 @@ def core(self, session: Session): # Now we can clone the file to the clone_to store. try: staging_name, staged_path = store_to.store_manager.stage( - file_size=instance.file.size, file_name=instance.file.name + file_size=instance.file.size, + file_name=Path(instance.file.name).name, ) except ValueError: # TODO: In the future where we have multiple potential clone stores for SneakerNet we should @@ -212,8 +213,12 @@ def core(self, session: Session): continue + resolved_store_path = store_to.store_manager.store( + Path(instance.file.name) + ) + store_to.store_manager.commit( - staging_path=staged_path, store_path=Path(instance.file.name) + staging_path=staged_path, store_path=resolved_store_path ) except FileExistsError: log_to_database(