diff --git a/hera_librarian/models/corrupt.py b/hera_librarian/models/corrupt.py
new file mode 100644
index 0000000..45e598e
--- /dev/null
+++ b/hera_librarian/models/corrupt.py
@@ -0,0 +1,24 @@
+"""
+Models for the corruption fixing endpoints.
+"""
+
+from pydantic import BaseModel
+
+
+class CorruptionPreparationRequest(BaseModel):
+    file_name: str
+    librarian_name: str
+
+
+class CorruptionPreparationResponse(BaseModel):
+    ready: bool
+
+
+class CorruptionResendRequest(BaseModel):
+    librarian_name: str
+    file_name: str
+
+
+class CorruptionResendResponse(BaseModel):
+    success: bool
+    destination_transfer_id: int
diff --git a/librarian_background/corruption_fixer.py b/librarian_background/corruption_fixer.py
index 4db04dd..09c62d4 100644
--- a/librarian_background/corruption_fixer.py
+++ b/librarian_background/corruption_fixer.py
@@ -9,10 +9,19 @@
 from sqlalchemy.orm import Session

 from hera_librarian.errors import LibrarianError, LibrarianHTTPError
+from hera_librarian.models.corrupt import (
+    CorruptionPreparationRequest,
+    CorruptionPreparationResponse,
+    CorruptionResendRequest,
+    CorruptionResendResponse,
+)
+from hera_librarian.transfer import TransferStatus
 from hera_librarian.utils import compare_checksums, get_hash_function_from_hash
 from librarian_server.database import get_session
 from librarian_server.orm.file import CorruptFile, File
+from librarian_server.orm.instance import Instance
 from librarian_server.orm.librarian import Librarian
+from librarian_server.orm.transfer import IncomingTransfer

 from .task import Task

@@ -50,16 +59,24 @@ def core(self, session: Session) -> bool:
                 "Attempting to fix {id} ({name})", id=corrupt.id, name=corrupt.file_name
             )

+            # First: query the file table to see if we still have the file. We do not store
+            # a foreign key in the corrupt table because we may have deleted the file and
+            # failed to contact the upstream.
+            stmt = select(File).filter_by(name=corrupt.file_name)
+            potential_file = session.execute(stmt).scalar_one_or_none()
+
+            stmt = select(Instance).filter_by(id=corrupt.instance_id)
+            potential_instance = session.execute(stmt).scalar_one_or_none()
+
             # Step A: Check that the file is actually corrupt
             try:
-                hash_function = get_hash_function_from_hash(corrupt.file.checksum)
-                instance = corrupt.instance
-                store = instance.store
+                hash_function = get_hash_function_from_hash(potential_file.checksum)
+                store = potential_instance.store
                 path_info = store.store_manager.path_info(
-                    instance.path, hash_function=hash_function
+                    potential_instance.path, hash_function=hash_function
                 )

-                if compare_checksums(corrupt.file.checksum, path_info.checksum):
+                if compare_checksums(potential_file.checksum, path_info.checksum):
                     logger.info(
                         "CorruptFile {id} stated that file {name} was corrupt in instance {inst_id} "
                         "but we just checked the checksums: {chk_a}=={chk_b} and the file is fine "
@@ -67,39 +84,38 @@ def core(self, session: Session) -> bool:
                         id=corrupt.id,
                         name=corrupt.file_name,
                         inst_id=corrupt.instance_id,
-                        chk_a=corrupt.file.checksum,
+                        chk_a=potential_file.checksum,
                         chk_b=path_info.checksum,
                     )
                     session.delete(corrupt)
                     session.commit()
                     continue
-            except FileNotFoundError:
-                logger.error(
-                    "Instance {} on store {} is missing, but we will continue with recovery (Instance: {})",
-                    instance.path,
-                    store.name,
-                    instance.id,
-                )

-            # Ok, so the file _really is corrupt_ or it is missing
-
-            # Remedy A: We have another local copy of the file!
-            # TODO: Implement this; it is not relevant for SO.
-            if len(corrupt.file.instances) > 1:
-                # Uhhh there is more than one instance here, we don't know what to do.
+                # Remedy A: We have another local copy of the file!
+                # TODO: Implement this; it is not relevant for SO.
+                if len(potential_file.instances) > 1:
+                    # Uhhh there is more than one instance here, we don't know what to do.
+                    logger.error(
+                        "File {name} has a corrupt instance {id} but there is {n} > 1 "
+                        "instances of the file on this librarian; entered block that was "
+                        "never completed and need manual remedy",
+                        name=corrupt.file_name,
+                        id=corrupt.instance_id,
+                        n=len(potential_file.instances),
+                    )
+                    continue
+            except (FileNotFoundError, AttributeError):
                 logger.error(
-                    "File {name} has a corrupt instance {id} but there is {n} > 1 "
-                    "instances of the file on this librarian; entered block that was "
-                    "never completed and need manual remedy",
-                    name=corrupt.file_name,
-                    id=corrupt.instance_id,
-                    n=len(corrupt.file.instances),
+                    "Instance {} is missing, but we will continue with recovery (File: {})",
+                    corrupt.instance_id,
+                    corrupt.file_name,
                 )
-                continue
+
+            # Ok, so the file _really is corrupt_ or it is missing and we only have one instance

             # Remedy B: the origin of this file is another librarian. Ask for a new copy.
-            stmt = select(Librarian).filter_by(name=corrupt.file.source)
-            result = session.execute(stmt).scalars().one_or_none()
+            stmt = select(Librarian).filter_by(name=corrupt.file_source)
+            result = session.execute(stmt).scalar_one_or_none()

             if result is None:
                 logger.error(
@@ -108,7 +124,7 @@ def core(self, session: Session) -> bool:
                     "request a new valid copy of the file",
                     name=corrupt.file_name,
                     id=corrupt.instance_id,
-                    lib=corrupt.file.source,
+                    lib=corrupt.file_source,
                 )
                 continue

@@ -126,14 +142,132 @@ def core(self, session: Session) -> bool:
                 )
                 continue

-            # TODO: CALL PREPARE ENDPOINT
+            prepare_request = CorruptionPreparationRequest(
+                file_name=corrupt.file_name, librarian_name=result.name
+            )

-            # TODO: Deal with the fact that we would have broken remote instances..?
-            corrupt.file.delete(session=session, commit=False, force=True)
+            try:
+                prepare_response: CorruptionPreparationResponse = client.post(
+                    endpoint="corrupt/prepare",
+                    request=prepare_request,
+                    response=CorruptionPreparationResponse,
+                )

-            # TODO: CALL RE-SEND ENDPOINT; DO NOT COMMIT UNTIL WE HEAR BACK; NOTE THAT WE WILL
-            # HAVE DELETED THE DATA EVEN IF WE FAIL (THAT IS NON-RECOVERABLE) BUT HAVING
-            # THE ROWS SIMPLIFIES THE LOGIC ABOVE.
+                if not prepare_response.ready:
+                    raise ValueError("Preparation endpoint returned False")
+            except (LibrarianError, LibrarianHTTPError, ValueError) as e:
+                logger.error(
+                    "Librarian {lib} contact during preparation for corruption fix to restore "
+                    "{name} did not succeed: {e}",
+                    lib=result.name,
+                    name=corrupt.file_name,
+                    e=e,
+                )
+                continue
+
+            # Also deletes remote instances, which would need repair (unlikely to occur). We
+            # _must_ commit: the files table is presumably read elsewhere during the resend.
+            if potential_file is not None:
+                potential_file.delete(session=session, commit=True, force=True)
+
+            resend_request = CorruptionResendRequest(
+                file_name=corrupt.file_name,
+                librarian_name=result.name,
+            )
+
+            try:
+                resend_response: CorruptionResendResponse = client.post(
+                    endpoint="corrupt/resend",
+                    request=resend_request,
+                    response=CorruptionResendResponse,
+                )
+
+                if not resend_response.success:
+                    raise ValueError("Failure during resend")
+            except (LibrarianError, LibrarianHTTPError, ValueError) as e:
+                logger.error(
+                    "Failed during the resend request flow for librarian {lib}, "
+                    "corrupt {id} for file {name} with {e}",
+                    lib=result.name,
+                    id=corrupt.id,
+                    name=corrupt.file_name,
+                    e=e,
+                )
+                continue

+            corrupt.incoming_transfer_id = resend_response.destination_transfer_id
             corrupt.replacement_requested = True
             session.commit()
+
+        # Now check in on files that we already requested new copies of.
+        query_start = perf_counter()
+
+        stmt = (
+            select(CorruptFile)
+            .filter(CorruptFile.replacement_requested == True)
+            .with_for_update(skip_locked=True)
+        )
+
+        results = session.execute(stmt).scalars().all()
+
+        query_end = perf_counter()
+
+        logger.info(
+            "Took {} s to query for {} corrupt files already in progress",
+            query_end - query_start,
+            len(results),
+        )
+
+        for result in results:
+            stmt = select(IncomingTransfer).filter_by(id=result.incoming_transfer_id)
+            transfer = session.execute(stmt).scalar_one_or_none()
+            status = None if transfer is None else transfer.status  # no row: treat as failed
+            file_is_fixed = False
+
+            if status in [None, TransferStatus.FAILED, TransferStatus.CANCELLED]:
+                logger.warning(
+                    "Transfer for corrupt file {id} ({name}) is in status {status}",
+                    id=result.id,
+                    name=result.file_name,
+                    status=status,
+                )
+                # That's no good. We should check to see if we got the file anyway:
+                stmt = select(File).filter_by(name=result.file_name)
+                file = session.execute(stmt).scalar_one_or_none()
+
+                if file is not None:
+                    # Oh, we're good. Phew, we successfully ingested it.
+                    logger.info(
+                        "Though transfer is in status {status}, file {name} was successfully "
+                        "ingested anyway",
+                        status=status,
+                        name=result.file_name,
+                    )
+                    file_is_fixed = True
+                else:
+                    # We actually need to re-download it.
+                    logger.warning(
+                        "Re-setting corrupt file {id} ({name}) to not having a replacement requested "
+                        "as the transfer failed. It will be re-downloaded at the next invocation ",
+                        id=result.id,
+                        name=result.file_name,
+                    )
+                    result.replacement_requested = False
+            elif status in [TransferStatus.COMPLETED]:
+                # That's good, we got the file!
+                file_is_fixed = True
+            else:
+                file_is_fixed = False
+
+            if file_is_fixed:
+                logger.info(
+                    "Confirmed that corrupt file {id} ({name}) has been replaced with a new copy; "
+                    "deleting the CorruptFile row",
+                    id=result.id,
+                    name=result.file_name,
+                )
+                session.delete(result)
+
+        session.commit()
+
+        return True
diff --git a/librarian_background/send_clone.py b/librarian_background/send_clone.py
index 314d46a..2fdf5d0 100644
--- a/librarian_background/send_clone.py
+++ b/librarian_background/send_clone.py
@@ -532,7 +532,7 @@ def send_file_batch(

     call_destination_and_state_ongoing(send=send, session=session)

-    return True
+    return list(transfer_map.values())


 class SendClone(Task):
diff --git a/librarian_server/api/corrupt.py b/librarian_server/api/corrupt.py
index 2107012..cea2100 100644
--- a/librarian_server/api/corrupt.py
+++ b/librarian_server/api/corrupt.py
@@ -7,6 +7,12 @@
 from sqlalchemy.orm import Session

 from hera_librarian.exceptions import LibrarianError, LibrarianHTTPError
+from hera_librarian.models.corrupt import (
+    CorruptionPreparationRequest,
+    CorruptionPreparationResponse,
+    CorruptionResendRequest,
+    CorruptionResendResponse,
+)
 from hera_librarian.utils import compare_checksums, get_hash_function_from_hash
 from librarian_server.orm.instance import Instance, RemoteInstance
 from librarian_server.orm.librarian import Librarian
@@ -14,24 +20,19 @@
 router = APIRouter(prefix="/api/v2/corrupt")

 from loguru import logger
-from pydantic import BaseModel

 from ..database import yield_session
-from .auth import CallbackUserDependency, ReadappendUserDependency
-
-
-class CorruptionPreparationRequest(BaseModel):
-    file_name: str
-    librarian_name: str
-
-
-class CorruptionPreparationResponse(BaseModel):
-    ready: bool
+from .auth import CallbackUserDependency, User


 def user_and_librarian_validation_flow(
-    user, librarian_name, file_name, session
+    user: User, librarian_name: str, file_name: str, session: Session
 ) -> tuple[Librarian, File, Instance, list[RemoteInstance]]:
+    """
+    Figure out if this user is a librarian and that we can make file transfers
+    to that librarian for this file. Also validates the file on our librarian to make
+    sure it is not corrupt and is present.
+    """
     user_is_librarian = user.username == librarian_name

     stmt = select(Librarian).filter_by(name=librarian_name)
@@ -163,15 +164,6 @@ def prepare(
     return CorruptionPreparationResponse(ready=True)


-class CorruptionResendRequest(BaseModel):
-    librarian_name: str
-    file_name: str
-
-
-class CorruptionResendResponse(BaseModel):
-    success: bool
-
-
 @router.post("/resend")
 def resend(
     request: CorruptionResendRequest,
@@ -227,4 +219,9 @@ def resend(
         request.file_name,
     )

-    return CorruptionResendResponse(success=success)
+    # ``success`` may be falsy when nothing was sent; guard the index so the caller
+    # still receives success=False (-1 is a placeholder id) rather than an exception.
+    return CorruptionResendResponse(
+        success=bool(success),
+        destination_transfer_id=success[0] if success else -1,
+    )
diff --git a/librarian_server/orm/file.py b/librarian_server/orm/file.py
index 5f23aba..eb0d020 100644
--- a/librarian_server/orm/file.py
+++ b/librarian_server/orm/file.py
@@ -150,7 +150,10 @@ def delete(
 class CorruptFile(db.Base):
     """
     An ORM object for a file that has been marked as (potentially) corrupt
-    during a check. This will need to be verified and fixed.
+    during a check. This will need to be verified and fixed. We do not store
+    references to the files and instances table here because those may be
+    deleted as part of the recovery process. As such, we need to store
+    copies of that data so that we can ask upstream for file recovery.
     """

     __tablename__ = "corrupt_files"
@@ -158,16 +161,14 @@ class CorruptFile(db.Base):
     id: int = db.Column(db.Integer, primary_key=True)
     "The ID of the corrupt file."
     file_name: str = db.Column(
-        db.String(256), db.ForeignKey("files.name"), nullable=False
+        db.String(256),
     )
     "The name of the file."
-    file = db.relationship("File", primaryjoin="CorruptFile.file_name == File.name")
-    "The file object associated with this."
-    instance_id: int = db.Column(db.Integer, db.ForeignKey("instances.id"))
+    file_source: str = db.Column(db.String(256))
+    "The source of the file."
+    instance_id: int = db.Column(db.Integer)
     "The instance ID of the corrupt file."
-    instance = db.relationship(
-        "Instance", primaryjoin="CorruptFile.instance_id == Instance.id"
-    )
-    "The instance object associated with this."
+    instance_path: str = db.Column(db.String(256))
+    "The path of the corrupt instance, copied from the instance row."
     corrupt_time: datetime = db.Column(db.DateTime)
     "The time at which the file was marked as corrupt."
@@ -177,8 +178,11 @@ class CorruptFile(db.Base):
     "The checksum of the file that was re-computed and found to be incorrect."
     count: int = db.Column(db.Integer)
     "The number of times this file has been marked as corrupt."
+
     replacement_requested: bool = db.Column(db.Boolean, default=False)
     "Whether or not a replacement has been requested for this file."
+    incoming_transfer_id: int = db.Column(db.Integer)
+    "The incoming transfer associated with the replacement"

     @classmethod
     def new_corrupt_file(
@@ -204,8 +208,8 @@ def new_corrupt_file(

         return CorruptFile(
             file_name=instance.file.name,
-            file=instance.file,
+            file_source=instance.file.source,
             instance_id=instance.id,
-            instance=instance,
+            instance_path=instance.path,
             corrupt_time=datetime.now(timezone.utc),
             size=size,
diff --git a/tests/integration_test/test_send_queue.py b/tests/integration_test/test_send_queue.py
index 88e31e8..8808a54 100644
--- a/tests/integration_test/test_send_queue.py
+++ b/tests/integration_test/test_send_queue.py
@@ -402,6 +402,10 @@ def test_send_from_existing_file_row(
         for instance in file.instances:
             assert instance.available == False

+    # Now see what happens when we corrupt a file and run the appropriate background tasks.
+    from librarian_background.check_integrity import CheckIntegrity
+    from librarian_background.corruption_fixer import CorruptionFixer
+
     # Remove the librarians we added.
     assert mocked_admin_client.remove_librarian(name="live_server")
