Skip to content

Commit

Permalink
First sketches of whole system
Browse files Browse the repository at this point in the history
  • Loading branch information
JBorrow committed Dec 12, 2024
1 parent 89f5d8d commit ce5ebf9
Show file tree
Hide file tree
Showing 6 changed files with 437 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,17 @@ def upgrade():
sa.Column("size", sa.BigInteger(), nullable=False),
sa.Column("checksum", sa.String(), nullable=False),
sa.Column("count", sa.Integer(), nullable=False),
sa.Column("replacement_requested", sa.Boolean(), nullable=False),
sa.PrimaryKeyConstraint("id"),
)

with op.batch_alter_table("outgoing_transfers") as batch_op:
batch_op.alter_column("file_name", nullable=True)


def downgrade():
op.drop_column("librarians", "transfers_enabled")
op.drop_table("corrupt_files")

with op.batch_alter_table("outgoing_transfers") as batch_op:
batch_op.alter_column("file_name", nullable=False)
139 changes: 139 additions & 0 deletions librarian_background/corruption_fixer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
"""
A background task that queries the corrupt files table and remedies them.
"""

from time import perf_counter

from loguru import logger
from sqlalchemy import select
from sqlalchemy.orm import Session

from hera_librarian.errors import LibrarianError, LibrarianHTTPError
from hera_librarian.utils import compare_checksums, get_hash_function_from_hash
from librarian_server.database import get_session
from librarian_server.orm.file import CorruptFile, File
from librarian_server.orm.librarian import Librarian

from .task import Task


class CorruptionFixer(Task):
"""
Checks in on corrupt files in the corrupt files table and remedies them.
"""

def on_call(self):
with get_session() as session:
return self.core(session=session)

def core(self, session: Session) -> bool:
query_start = perf_counter()

stmt = (
select(CorruptFile)
.filter(CorruptFile.replacement_requested != True)
.with_for_update(skip_locked=True)
)

results = session.execute(stmt).scalars().all()

query_end = perf_counter()

logger.info(
"Took {} s to query for {} corrupt files",
query_end - query_start,
len(results),
)

for corrupt in results:
logger.info(
"Attempting to fix {id} ({name})", id=corrupt.id, name=corrupt.file_name
)

# Step A: Check that the file is actually corrupt
try:
hash_function = get_hash_function_from_hash(corrupt.file.checksum)
instance = corrupt.instance
store = instance.store
path_info = store.store_manager.path_info(
instance.path, hash_function=hash_function
)

if compare_checksums(corrupt.file.checksum, path_info.checksum):
logger.info(
"CorruptFile {id} stated that file {name} was corrupt in instance {inst_id} "
"but we just checked the checksums: {chk_a}=={chk_b} and the file is fine "
"or was fixed behind our back; removing CorruptFile row",
id=corrupt.id,
name=corrupt.file_name,
inst_id=corrupt.instance_id,
chk_a=corrupt.file.checksum,
chk_b=path_info.checksum,
)
session.delete(corrupt)
session.commit()
continue
except FileNotFoundError:
logger.error(
"Instance {} on store {} is missing, but we will continue with recovery (Instance: {})",
instance.path,
store.name,
instance.id,
)

# Ok, so the file _really is corrupt_ or it is missing

# Remedy A: We have another local copy of the file!
# TODO: Implement this; it is not relevant for SO.
if len(corrupt.file.instances) > 1:
# Uhhh there is more than one instance here, we don't know what to do.
logger.error(
"File {name} has a corrupt instance {id} but there is {n} > 1 "
"instances of the file on this librarian; entered block that was "
"never completed and need manual remedy",
name=corrupt.file_name,
id=corrupt.instance_id,
n=len(corrupt.file.instances),
)
continue

# Remedy B: the origin of this file is another librarian. Ask for a new copy.
stmt = select(Librarian).filter_by(name=corrupt.file.source)
result = session.execute(stmt).scalars().one_or_none()

if result is None:
logger.error(
"File {name} has one and only one corrupt instance {id} but there is no "
"valid librarian matching {lib} in the librarians table so cannot "
"request a new valid copy of the file",
name=corrupt.file_name,
id=corrupt.instance_id,
lib=corrupt.file.source,
)
continue

# Use the librarian to ask for a new copy.
result: Librarian
client = result.client()

try:
client.ping()
except (LibrarianError, LibrarianHTTPError):
logger.error(
"Librarian {lib} is unreachable at the moment, cannot restore file {name}",
lib=result.name,
name=corrupt.file_name,
)
continue

# TODO: CALL PREPARE ENDPOINT

# TODO: Deal with the fact that we would have broken remote instances..?
corrupt.file.delete(session=session, commit=False, force=True)

# TODO: CALL RE-SEND ENDPOINT; DO NOT COMMIT UNTIL WE HEAR BACK; NOTE THAT WE WILL
# HAVE DELETED THE DATA EVEN IF WE FAIL (THAT IS NON-RECOVERABLE) BUT HAVING
# THE ROWS SIMPLIFIES THE LOGIC ABOVE.

corrupt.replacement_requested = True
session.commit()
159 changes: 89 additions & 70 deletions librarian_background/send_clone.py
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,92 @@ def handle_existing_file(
)


def send_file_batch(
files: list[File],
librarian: Librarian,
session: Session,
store_preference: str | None = None,
):
client = librarian.client()

outgoing_transfers, outgoing_information = process_batch(
files=files,
destination=librarian.name,
store_preference=store_preference,
)

session.add_all(outgoing_transfers)
session.commit()

response = use_batch_to_call_librarian(
outgoing_transfers=outgoing_transfers,
outgoing_information=outgoing_information,
client=client,
librarian=librarian,
session=session,
)

# We were unable to speak to the librarian, and have had our
# transfers cancelled for us. Time to move on to the next
# batch and hope for the best.

# Tested outside of the main loop.
if not response: # pragma: no cover
return False

# Ok, they got out stuff. Need to do two things now:
# - Create the queue send item
# - Update the transfers with their information.

send, transfer_provider, transfer_map = create_send_queue_item(
response=response,
outgoing_transfers=outgoing_transfers,
librarian=librarian,
session=session,
)

# Send is falsey if there was a problem in creating the send
# queue item. In that, case we've failed everything, and should break
# and come back later.

# Tested outside of the main loop.
if not send: # pragma: no cover
return False

# Now update the outgoing transfers with their information.
for transfer in outgoing_transfers:
remote_transfer_info: CloneBatchInitiationRequestFileItem = transfer_map.get(
transfer.id, None
)

if remote_transfer_info is None: # pragma: no cover
# This is an unreachable state; we already purged these
# scenarios.
logger.error(
"Trying to set parameters of a transfer that should not exist; "
"this should be an unreachable state."
)
# In this case, the best thing that we can do is fail this individual
# transfer and pick it up later.
transfer.fail_transfer(session=session, commit=False)

transfer.remote_transfer_id = remote_transfer_info.destination_transfer_id
transfer.transfer_data = transfer_provider
transfer.send_queue = send
transfer.send_queue_id = send.id
transfer.source_path = str(transfer.instance.path)
transfer.dest_path = str(remote_transfer_info.staging_location)

session.commit()

# Finally, call up the destination again and tell them everything is on its
# way.

call_destination_and_state_ongoing(send=send, session=session)

return True


class SendClone(Task):
"""
Launches clones of files to a remote librarian.
Expand Down Expand Up @@ -592,81 +678,14 @@ def core(self, session: Session):

files_tried += this_batch_size

outgoing_transfers, outgoing_information = process_batch(
success = send_file_batch(
files=files_to_try,
destination=self.destination_librarian,
store_preference=self.store_preference,
)

session.add_all(outgoing_transfers)
session.commit()

response = use_batch_to_call_librarian(
outgoing_transfers=outgoing_transfers,
outgoing_information=outgoing_information,
client=client,
librarian=librarian,
session=session,
)

# We were unable to speak to the librarian, and have had our
# transfers cancelled for us. Time to move on to the next
# batch and hope for the best.

# Tested outside of the main loop.
if not response: # pragma: no cover
continue

# Ok, they got out stuff. Need to do two things now:
# - Create the queue send item
# - Update the transfers with their information.

send, transfer_provider, transfer_map = create_send_queue_item(
response=response,
outgoing_transfers=outgoing_transfers,
librarian=librarian,
session=session,
store_preference=self.store_preference,
)

# Send is falsey if there was a problem in creating the send
# queue item. In that, case we've failed everything, and should break
# and come back later.

# Tested outside of the main loop.
if not send: # pragma: no cover
if not success:
break

# Now update the outgoing transfers with their information.
for transfer in outgoing_transfers:
remote_transfer_info: CloneBatchInitiationRequestFileItem = (
transfer_map.get(transfer.id, None)
)

if remote_transfer_info is None: # pragma: no cover
# This is an unreachable state; we already purged these
# scenarios.
logger.error(
"Trying to set parameters of a transfer that should not exist; "
"this should be an unreachable state."
)
# In this case, the best thing that we can do is fail this individual
# transfer and pick it up later.
transfer.fail_transfer(session=session, commit=False)

transfer.remote_transfer_id = (
remote_transfer_info.destination_transfer_id
)
transfer.transfer_data = transfer_provider
transfer.send_queue = send
transfer.send_queue_id = send.id
transfer.source_path = str(transfer.instance.path)
transfer.dest_path = str(remote_transfer_info.staging_location)

session.commit()

# Finally, call up the destination again and tell them everything is on its
# way.

call_destination_and_state_ongoing(send=send, session=session)

return
Loading

0 comments on commit ce5ebf9

Please sign in to comment.