diff --git a/alembic/versions/1def8c988372_add_librarian_transfer_toggling.py b/alembic/versions/1def8c988372_add_librarian_transfer_toggling.py index 74e1a6c..cf9291e 100644 --- a/alembic/versions/1def8c988372_add_librarian_transfer_toggling.py +++ b/alembic/versions/1def8c988372_add_librarian_transfer_toggling.py @@ -34,10 +34,17 @@ def upgrade(): sa.Column("size", sa.BigInteger(), nullable=False), sa.Column("checksum", sa.String(), nullable=False), sa.Column("count", sa.Integer(), nullable=False), + sa.Column("replacement_requested", sa.Boolean(), nullable=False), sa.PrimaryKeyConstraint("id"), ) + with op.batch_alter_table("outgoing_transfers") as batch_op: + batch_op.alter_column("file_name", nullable=True) + def downgrade(): op.drop_column("librarians", "transfers_enabled") op.drop_table("corrupt_files") + + with op.batch_alter_table("outgoing_transfers") as batch_op: + batch_op.alter_column("file_name", nullable=False) diff --git a/librarian_background/corruption_fixer.py b/librarian_background/corruption_fixer.py new file mode 100644 index 0000000..4db04dd --- /dev/null +++ b/librarian_background/corruption_fixer.py @@ -0,0 +1,139 @@ +""" +A background task that queries the corrupt files table and remedies them. +""" + +from time import perf_counter + +from loguru import logger +from sqlalchemy import select +from sqlalchemy.orm import Session + +from hera_librarian.errors import LibrarianError, LibrarianHTTPError +from hera_librarian.utils import compare_checksums, get_hash_function_from_hash +from librarian_server.database import get_session +from librarian_server.orm.file import CorruptFile, File +from librarian_server.orm.librarian import Librarian + +from .task import Task + + +class CorruptionFixer(Task): + """ + Checks in on corrupt files in the corrupt files table and remedies them. + """ + + def on_call(self): + with get_session() as session: + return self.core(session=session) + + def core(self, session: Session) -> bool: + query_start = perf_counter() + + stmt = ( + select(CorruptFile) + .filter(CorruptFile.replacement_requested != True) + .with_for_update(skip_locked=True) + ) + + results = session.execute(stmt).scalars().all() + + query_end = perf_counter() + + logger.info( + "Took {} s to query for {} corrupt files", + query_end - query_start, + len(results), + ) + + for corrupt in results: + logger.info( + "Attempting to fix {id} ({name})", id=corrupt.id, name=corrupt.file_name + ) + + # Step A: Check that the file is actually corrupt + try: + hash_function = get_hash_function_from_hash(corrupt.file.checksum) + instance = corrupt.instance + store = instance.store + path_info = store.store_manager.path_info( + instance.path, hash_function=hash_function + ) + + if compare_checksums(corrupt.file.checksum, path_info.checksum): + logger.info( + "CorruptFile {id} stated that file {name} was corrupt in instance {inst_id} " + "but we just checked the checksums: {chk_a}=={chk_b} and the file is fine " + "or was fixed behind our back; removing CorruptFile row", + id=corrupt.id, + name=corrupt.file_name, + inst_id=corrupt.instance_id, + chk_a=corrupt.file.checksum, + chk_b=path_info.checksum, + ) + session.delete(corrupt) + session.commit() + continue + except FileNotFoundError: + logger.error( + "Instance {} on store {} is missing, but we will continue with recovery (Instance: {})", + instance.path, + store.name, + instance.id, + ) + + # Ok, so the file _really is corrupt_ or it is missing + + # Remedy A: We have another local copy of the file! + # TODO: Implement this; it is not relevant for SO. + if len(corrupt.file.instances) > 1: + # Uhhh there is more than one instance here, we don't know what to do. + logger.error( + "File {name} has a corrupt instance {id} but there is {n} > 1 " + "instances of the file on this librarian; entered block that was " + "never completed and need manual remedy", + name=corrupt.file_name, + id=corrupt.instance_id, + n=len(corrupt.file.instances), + ) + continue + + # Remedy B: the origin of this file is another librarian. Ask for a new copy. + stmt = select(Librarian).filter_by(name=corrupt.file.source) + result = session.execute(stmt).scalars().one_or_none() + + if result is None: + logger.error( + "File {name} has one and only one corrupt instance {id} but there is no " + "valid librarian matching {lib} in the librarians table so cannot " + "request a new valid copy of the file", + name=corrupt.file_name, + id=corrupt.instance_id, + lib=corrupt.file.source, + ) + continue + + # Use the librarian to ask for a new copy. + result: Librarian + client = result.client() + + try: + client.ping() + except (LibrarianError, LibrarianHTTPError): + logger.error( + "Librarian {lib} is unreachable at the moment, cannot restore file {name}", + lib=result.name, + name=corrupt.file_name, + ) + continue + + # TODO: CALL PREPARE ENDPOINT + + # TODO: Deal with the fact that we would have broken remote instances..? + corrupt.file.delete(session=session, commit=False, force=True) + + # TODO: CALL RE-SEND ENDPOINT; DO NOT COMMIT UNTIL WE HEAR BACK; NOTE THAT WE WILL + # HAVE DELETED THE DATA EVEN IF WE FAIL (THAT IS NON-RECOVERABLE) BUT HAVING + # THE ROWS SIMPLIFIES THE LOGIC ABOVE. + + corrupt.replacement_requested = True + session.commit() diff --git a/librarian_background/send_clone.py b/librarian_background/send_clone.py index 08ecbf6..314d46a 100644 --- a/librarian_background/send_clone.py +++ b/librarian_background/send_clone.py @@ -449,6 +449,92 @@ def handle_existing_file( ) +def send_file_batch( + files: list[File], + librarian: Librarian, + session: Session, + store_preference: str | None = None, +): + client = librarian.client() + + outgoing_transfers, outgoing_information = process_batch( + files=files, + destination=librarian.name, + store_preference=store_preference, + ) + + session.add_all(outgoing_transfers) + session.commit() + + response = use_batch_to_call_librarian( + outgoing_transfers=outgoing_transfers, + outgoing_information=outgoing_information, + client=client, + librarian=librarian, + session=session, + ) + + # We were unable to speak to the librarian, and have had our + # transfers cancelled for us. Time to move on to the next + # batch and hope for the best. + + # Tested outside of the main loop. + if not response: # pragma: no cover + return False + + # Ok, they got out stuff. Need to do two things now: + # - Create the queue send item + # - Update the transfers with their information. + + send, transfer_provider, transfer_map = create_send_queue_item( + response=response, + outgoing_transfers=outgoing_transfers, + librarian=librarian, + session=session, + ) + + # Send is falsey if there was a problem in creating the send + # queue item. In that, case we've failed everything, and should break + # and come back later. + + # Tested outside of the main loop. + if not send: # pragma: no cover + return False + + # Now update the outgoing transfers with their information. + for transfer in outgoing_transfers: + remote_transfer_info: CloneBatchInitiationRequestFileItem = transfer_map.get( + transfer.id, None + ) + + if remote_transfer_info is None: # pragma: no cover + # This is an unreachable state; we already purged these + # scenarios. + logger.error( + "Trying to set parameters of a transfer that should not exist; " + "this should be an unreachable state." + ) + # In this case, the best thing that we can do is fail this individual + # transfer and pick it up later. + transfer.fail_transfer(session=session, commit=False) + + transfer.remote_transfer_id = remote_transfer_info.destination_transfer_id + transfer.transfer_data = transfer_provider + transfer.send_queue = send + transfer.send_queue_id = send.id + transfer.source_path = str(transfer.instance.path) + transfer.dest_path = str(remote_transfer_info.staging_location) + + session.commit() + + # Finally, call up the destination again and tell them everything is on its + # way. + + call_destination_and_state_ongoing(send=send, session=session) + + return True + + class SendClone(Task): """ Launches clones of files to a remote librarian. @@ -592,81 +678,14 @@ def core(self, session: Session): files_tried += this_batch_size - outgoing_transfers, outgoing_information = process_batch( + success = send_file_batch( files=files_to_try, - destination=self.destination_librarian, - store_preference=self.store_preference, - ) - - session.add_all(outgoing_transfers) - session.commit() - - response = use_batch_to_call_librarian( - outgoing_transfers=outgoing_transfers, - outgoing_information=outgoing_information, - client=client, - librarian=librarian, - session=session, - ) - - # We were unable to speak to the librarian, and have had our - # transfers cancelled for us. Time to move on to the next - # batch and hope for the best. - - # Tested outside of the main loop. - if not response: # pragma: no cover - continue - - # Ok, they got out stuff. Need to do two things now: - # - Create the queue send item - # - Update the transfers with their information. - - send, transfer_provider, transfer_map = create_send_queue_item( - response=response, - outgoing_transfers=outgoing_transfers, librarian=librarian, session=session, + store_preference=self.store_preference, ) - # Send is falsey if there was a problem in creating the send - # queue item. In that, case we've failed everything, and should break - # and come back later. - - # Tested outside of the main loop. - if not send: # pragma: no cover + if not success: break - # Now update the outgoing transfers with their information. - for transfer in outgoing_transfers: - remote_transfer_info: CloneBatchInitiationRequestFileItem = ( - transfer_map.get(transfer.id, None) - ) - - if remote_transfer_info is None: # pragma: no cover - # This is an unreachable state; we already purged these - # scenarios. - logger.error( - "Trying to set parameters of a transfer that should not exist; " - "this should be an unreachable state." - ) - # In this case, the best thing that we can do is fail this individual - # transfer and pick it up later. - transfer.fail_transfer(session=session, commit=False) - - transfer.remote_transfer_id = ( - remote_transfer_info.destination_transfer_id - ) - transfer.transfer_data = transfer_provider - transfer.send_queue = send - transfer.send_queue_id = send.id - transfer.source_path = str(transfer.instance.path) - transfer.dest_path = str(remote_transfer_info.staging_location) - - session.commit() - - # Finally, call up the destination again and tell them everything is on its - # way. - - call_destination_and_state_ongoing(send=send, session=session) - return diff --git a/librarian_server/api/corrupt.py b/librarian_server/api/corrupt.py new file mode 100644 index 0000000..25d5df0 --- /dev/null +++ b/librarian_server/api/corrupt.py @@ -0,0 +1,198 @@ +""" +API Endpoints for the upstream half of the corrupt files workflow. +""" + +from fastapi import APIRouter, Depends + +from hera_librarian.utils import get_hash_function_from_hash + +router = APIRouter(prefix="/api/v2/corrupt") + +from loguru import logger +from pydantic import BaseModel + +from ..database import yield_session +from .auth import CallbackUserDependency, ReadappendUserDependency + + +class CorruptionPreparationRequest(BaseModel): + file_name: str + librarian_name: str + + +class CorruptionPreparationResponse(BaseModel): + ready: bool + + +def user_and_librarian_validation_flow( + user, librarian_name, file_name +) -> tuple[Librarian, File, Instance, list[RemoteInstance]]: + user_is_librarian = user.username == librarian_name + + stmt = select(Librarian).filter_by(name=request.librarian_name) + librarian = session.execute(stmt).scalars().one_or_none() + + librarian_exists = librarian is not None + + stmt = select(RemoteInstance).filter_by( + file_name=request.file_name, librarian_id=librarian.id + ) + remote_instances = session.execute(stmt).scalars().all() + + remote_instance_registered_at_destination = bool(remote_instances) + + if not ( + remote_instance_registered_at_destination + and user_is_librarian + and librarian_exists + ): + # 401 + pass + + # So at this point we know: + # Downstream is the one asking for the new copy + # We sent them a copy that we confirmed + + # Check our own instance of the file to make sure it's not corrupted. + stmt = select(File).filter_by(file_name=request.file_name) + file = session.execute(stmt).scalars().one_or_none() + + try: + best_instance = [x for x in file.instances if x.available][0] + except IndexError: + # 400 + return + + hash_function = get_hash_function_from_hash(file.checksum) + path_info = best_instance.store.path_info( + best_instance.path, hash_function=hash_function + ) + + if not compare_checksums(file.checksum, path_info.checksum): + # Brother not this shit again + # 400 + # Add to corrupt files table + # Extremely unlikely + return + + # We know we have a valid copy of the file ready to go. + + from librarian_background import background_settings + + if not ( + background_settings.consume_queue + and background_settings.check_consumed_queue + and librarian.transfers_enabled + ): + # 400 we can't send anything! + return + + # Do we have login details for your librarian? + try: + librarian.client().ping(require_login=True) + except (LibrarianError, LibrarianHTTPError): + # Urrr we can't login no good + return + + return librarian, file, best_instance, remote_instances + + +@router.post("/prepare") +def prepare( + request: CorruptionPreparationRequest, + user: CallbackUserDependency, + session: Session = Depends(yield_session), +) -> CorruptionPreparationResponse: + """ + Prepare for a request to re-instate a downstream file. This checks: + + a) We can contact the downstream + b) We have a valid copy of the file + c) We have a send queue background task that will actually send the file. + + Possible response codes: + + 400 - We do not have a valid copy of the file either! + -> You are out of luck. Maybe try again later as we might restore from + a librarian above us in the chain? + 401 - You are asking about a file that was not sent to your librarian + -> Leave me alone! + 200 - Ready to send + -> Success! + """ + + logger.info( + "Recieved corruption remedy request for {} from {}", + request.file_name, + user.username, + ) + + user_and_librarian_validation_flow( + user, librarian_name=request.librarian_name, file_name=request.file_name + ) + + return CorruptionPreparationResponse(ready=True) + + +class CorruptionResendRequest(BaseModel): + librarian_name: str + file_name: str + + +class CorruptionResendResponse(BaseModel): + success: bool + + +@router.post("/resend") +def resend( + request: CorruptionResendRequest, + user: CallbackUserDependency, + session: Session, +) -> CorruptionResendResponse: + """ + Actually send a new copy of a file that we know you already have! We assume that + you deleted it before you called this endpoint, and that you called the prepare + endpoint to make sure we're all good to go first. We will: + + a) Delete our RemoteInstance(s) for this file on your librarian + b) Create an OutgoingTransfer and SendQueue + + This transfer will then take place asynchronously through your usual mechanisms. + You _must_ have a recieve clone task running on your librarian otherwise you won't + have the new file ingested. + + Possible response codes: + + 400 - We don't have a valid copy of the file. + 201 - We created the transfer + -> Success! + """ + + logger.info( + "Recieved corruption resend request for {} from {}", + request.file_name, + user.username, + ) + + librarian, file, instance, remote_instances = user_and_librarian_validation_flow( + user, librarian_name=request.librarian_name, file_name=request.file_name + ) + + from librarian_background.create_clone import send_file_batch + + success = send_file_batch(files=[file], librarian=librarian, session=session) + + if success: + logger.info( + "Successfully created send queue item to remedy corrupt data in {}", + request.file_name, + ) + session.delete(remote_instances) + session.commit() + else: + logger.info( + "Error creating send queue item to remedy corrupt data in {}", + request.file_name, + ) + + return CorruptionResendResponse(success=success) diff --git a/librarian_server/orm/file.py b/librarian_server/orm/file.py index a7d04e5..5f23aba 100644 --- a/librarian_server/orm/file.py +++ b/librarian_server/orm/file.py @@ -177,6 +177,8 @@ class CorruptFile(db.Base): "The checksum of the file that was re-computed and found to be incorrect." count: int = db.Column(db.Integer) "The number of times this file has been marked as corrupt." + replacement_requested: bool = db.Column(db.Boolean, default=False) + "Whether or not a replacement has been requested for this file." @classmethod def new_corrupt_file( diff --git a/librarian_server/orm/instance.py b/librarian_server/orm/instance.py index 8ee0240..18638b5 100644 --- a/librarian_server/orm/instance.py +++ b/librarian_server/orm/instance.py @@ -135,8 +135,8 @@ class RemoteInstance(db.Base): id = db.Column(db.Integer, primary_key=True, autoincrement=True, unique=True) "The unique ID of this instance." - file_name = db.Column(db.String(256), db.ForeignKey("files.name"), nullable=False) - "Name of the file this instance references." + file_name = db.Column(db.String(256), db.ForeignKey("files.name"), nullable=True) + "Name of the file this instance references; note this is NOT a foreign key" file = db.relationship( "File", back_populates="remote_instances",