diff --git a/Makefile b/Makefile
index 0adc7fb..e44f2fe 100644
--- a/Makefile
+++ b/Makefile
@@ -66,17 +66,22 @@ shoots/%.failed: src/compile_failure_list.py
 # True (successfully transferred) or False (not successfully transferred)
 # against each shoot
 %.transfer_status: %
-	cat $< | python client/compile_pending_list.py $* > $@
+	cat $< | AWS_PROFILE=${SOURCE_PROFILE} python client/compile_pending_list.py $* > $@
 
 # Compile lists for retrying:
 # Some things may have failed in the target system
 # These are s3 keys that can be passed through the 'touched' target
-%.transfer_status.touchable: %.transfer_status
-	grep False $< | sed 's/,.*//' | python client/touchable.py production > $@
+%.touchable: %
+	grep False $< | sed 's/,.*//' | AWS_PROFILE=${TARGET_PROFILE} python client/touchable.py production > $@
 
 # Others may have failed to transfer (or have been deleted from the target bucket due to expiry)
 # These are shoot identifiers that need to go back through the whole system again
-%.transfer_status.needs_transfer: %.transfer_status
-	grep False $< | sed 's/,.*//' | python client/untouchable.py production | sed 's/.*2754_//' | sed 's/\.zip//' > $@
+%.needs_transfer: %
+	grep False $< | sed 's/,.*//' | AWS_PROFILE=${TARGET_PROFILE} python client/untouchable.py production | sed 's/.*2754_//' | sed 's/\.zip//' > $@
+
+%.queued_for_touch.production: %.transfer_status.touchable
+	cat $< | AWS_PROFILE=${TARGET_PROFILE} python client/queue_touches.py production
+
+%.queued_for_transfer.production: %.transfer_status.needs_transfer
+	cat $< | AWS_PROFILE=${SOURCE_PROFILE} python client/start_restores.py production
diff --git a/client/touchable.py b/client/touchable.py
index a3f270f..fa59f31 100644
--- a/client/touchable.py
+++ b/client/touchable.py
@@ -1,9 +1,61 @@
 import boto3
 import sys
+import os
+from compile_pending_list import find_shoots_query, get_identifiers
+
+from reporting_client import get_es_client
+
+BUCKETS = {
+    "staging": "wellcomecollection-archivematica-staging-transfer-source",
+    "production": "wellcomecollection-archivematica-transfer-source"
+}
+
+
+def get_failed_subshoots(session, subshoots):
+    """
+    Given a list of shoots/subshoots (s3 keys),
+    return any that have not successfully been ingested.
+    """
+    es = get_es_client(session)
+    # make it a list, in case it's a generator. We need to go over it more than once.
+    subshoots = list(subshoots)
+    # subshoots is a list of S3 Keys - e.g. born-digital-accessions/2754_CP000200.zip
+    # In order to query the storage_ingests database, we need the base name of the
+    # file - e.g. 2754_CP000200,
+    # so knock off the leading/path/ and the .suffix
+    ids = [s[:-4].rpartition('/')[2] for s in subshoots]
+
+    response = es.search(
+        index="storage_ingests",
+        size=1000,
+        query=find_shoots_query(ids),
+        source=False,
+        fields=["bag.info.externalIdentifier", "lastModifiedDate"]
+    )
+    succeeded = get_identifiers(response["hits"]["hits"])
+    for pair in zip(subshoots, ids):
+        if pair[1] not in succeeded:
+            yield pair[0]
+
+
+def find_objects(session, bucket, object_keys):
+    """
+    Look in a bucket to find all the zip objects corresponding to
+    a list of shoot numbers (e.g. 2754_CP000200).
+
+    These objects may be named with the shoot number alone, e.g. 2754_CP000200.zip,
+    or may be part of a broken-up shoot with a suffix, e.g. 2754_CP000200_001.zip
+    """
+    bucket = session.resource('s3').Bucket(bucket)
+    for shoot_id in object_keys:
+        prefix = f"born-digital-accessions/{shoot_id.strip()}"
+        for obj in bucket.objects.filter(Prefix=prefix):
+            if obj.key[-4:] == ".zip":
+                yield obj.key
 
-from objects_on_target import find_objects, BUCKETS
 
 if __name__ == '__main__':
+    objects = find_objects(boto3.Session(profile_name="digitisation-developer"), BUCKETS[sys.argv[1]], sys.stdin.readlines())
     print("\n".join(
-        find_objects(boto3.Session(), BUCKETS[sys.argv[1]], sys.stdin.readlines(), True)
+        get_failed_subshoots(
+            boto3.Session(profile_name="platform-developer"), objects)
     ))
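A standalone sketch of the key-to-identifier step used by get_failed_subshoots above
(made-up keys, no AWS or Elasticsearch access needed): strip the leading path and the
".zip" suffix to recover the bag's externalIdentifier before querying storage_ingests.

    keys = [
        "born-digital-accessions/2754_CP000200.zip",
        "born-digital-accessions/2754_CP000200_001.zip",
    ]
    # same derivation as in get_failed_subshoots
    ids = [k[:-4].rpartition('/')[2] for k in keys]
    assert ids == ["2754_CP000200", "2754_CP000200_001"]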
diff --git a/client/untouchable.py b/client/untouchable.py
index 84a5e32..38b81e7 100644
--- a/client/untouchable.py
+++ b/client/untouchable.py
@@ -1,9 +1,30 @@
+"""
+Given a list of shoot numbers, return any of them that have not been transferred.
+
+This script assumes that the presence of any part of a shoot on the target system
+implies that the shoot has been transferred; i.e. if a shoot is a large one that needs
+to be broken apart, it does not check whether all parts have been successfully transferred.
+"""
+
 import boto3
 import sys
 
-from objects_on_target import find_objects, BUCKETS
+BUCKETS = {
+    "staging": "wellcomecollection-archivematica-staging-transfer-source",
+    "production": "wellcomecollection-archivematica-transfer-source"
+}
+
+
+def find_missing_objects(session, bucket, object_keys):
+    bucket = session.resource('s3').Bucket(bucket)
+    for shoot_id in object_keys:
+        prefix = f"born-digital-accessions/{shoot_id.strip()}"
+        try:
+            next(iter(bucket.objects.filter(Prefix=prefix)))
+        except StopIteration:
+            yield shoot_id.strip()
+
 
 if __name__ == '__main__':
     print("\n".join(
-        find_objects(boto3.Session(), BUCKETS[sys.argv[1]], sys.stdin.readlines(), False)
+        find_missing_objects(boto3.Session(), BUCKETS[sys.argv[1]], sys.stdin.readlines())
     ))
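The module docstring above states the rule find_missing_objects relies on: a shoot is
treated as transferred when any object exists under its prefix in the target bucket.
A minimal sketch of that rule, using an in-memory list of hypothetical keys in place
of the S3 listing:

    target_keys = ["born-digital-accessions/2754_CP000200_001.zip"]

    def appears_transferred(shoot_id):
        # any part under the prefix counts as "transferred"
        prefix = f"born-digital-accessions/{shoot_id.strip()}"
        return any(key.startswith(prefix) for key in target_keys)

    assert appears_transferred("2754_CP000200")       # one part is enough
    assert not appears_transferred("2754_CP000300")   # nothing under this prefix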
diff --git a/src/transferrer/download.py b/src/transferrer/download.py
index b4949e1..0adabe3 100644
--- a/src/transferrer/download.py
+++ b/src/transferrer/download.py
@@ -12,7 +12,7 @@
 
 # This variable governs the degree of parallelism to use when downloading files.
 # The correct number is to be discovered by experimentation
-THREADS = 10
+THREADS = 20
 
 
 def get_shoot_batches(bucket, s3_folder, max_batch_bytes):
@@ -41,18 +41,18 @@ def list_folder_objects(bucket, s3_folder):
     return (obj for obj in bucket.objects.filter(Prefix=s3_folder) if should_download_file(obj.key))
 
 
-def download_shoot(session: boto3.session.Session, shoot_number, local_dir, max_batch_bytes):
+def download_shoot(session: boto3.session.Session, shoot_number, local_dir, max_batch_bytes, ignore_suffixes):
     # Allowing enough connections for each thread to have two of them
     # prevents the `urllib3.connectionpool:Connection pool is full` warning
     # and allows for better connection reuse.
-    return download_shoot_folder(get_source_bucket(session, max_connections=THREADS * 2), shoot_number, local_dir, max_batch_bytes)
+    return download_shoot_folder(get_source_bucket(session, max_connections=THREADS * 5), shoot_number, local_dir, max_batch_bytes, ignore_suffixes)
 
 
-def download_shoot_folder(bucket, shoot_number, local_dir, max_batch_bytes):
-    return download_s3_folder(bucket, shoot_number_to_folder_path(shoot_number), local_dir, max_batch_bytes)
+def download_shoot_folder(bucket, shoot_number, local_dir, max_batch_bytes, ignore_suffixes):
+    return download_s3_folder(bucket, shoot_number_to_folder_path(shoot_number), local_dir, max_batch_bytes, ignore_suffixes)
 
 
-def download_s3_folder(bucket, s3_folder: str, local_dir: str, max_batch_bytes: int):
+def download_s3_folder(bucket, s3_folder: str, local_dir: str, max_batch_bytes: int, ignore_suffixes):
     """
     Download the relevant content from an s3 folder to local_dir.
 
@@ -67,12 +67,15 @@ def download_s3_folder(bucket, s3_folder: str, local_dir: str, max_batch_bytes:
         os.makedirs(local_dir, exist_ok=True)
         if use_suffix:
             suffix = f"_{ix:03d}"
-        with ThreadPoolExecutor(max_workers=THREADS) as executor:
-            files = list(executor.map(
-                partial(download_s3_file, local_dir=local_dir, s3_folder=s3_folder),
-                batch
-            ))
-        yield files, suffix
+        if suffix in ignore_suffixes:
+            logger.info(f"{suffix} already on target, ignoring")
+        else:
+            with ThreadPoolExecutor(max_workers=THREADS) as executor:
+                files = list(executor.map(
+                    partial(download_s3_file, local_dir=local_dir, s3_folder=s3_folder),
+                    batch
+                ))
+            yield files, suffix
 
 
 def download_s3_file(object_summary, *, local_dir: str, s3_folder: str):
diff --git a/src/transferrer/transfer.py b/src/transferrer/transfer.py
index d827068..f2ebc8f 100644
--- a/src/transferrer/transfer.py
+++ b/src/transferrer/transfer.py
@@ -2,18 +2,18 @@
 import tempfile
 from transferrer.download import download_shoot
 from transferrer.make_zip import make_zip_from
-from transferrer.upload import upload
+from transferrer.upload import upload, get_target_bucket
 import boto3
+import re
+
+re_extract_suffix = re.compile('(_\\d\\d\\d)\\.zip')
 
 MAX_SPACE_BYTES = os.getenv('MAX_SPACE_BYTES', 10240000000)  # maximum setting for Lambda Ephemeral Storage
 
-
-
+TARGET_BUCKET = os.getenv("TARGET_BUCKET")
 
 
 def shoot_number_to_accession_id(accession_number, shoot_number):
     """
     The accession id is simply the shoot_number prefixed with the accession number.
-
     >>> shoot_number_to_accession_id("2754", "CP000159")
     '2754_CP000159'
     """
@@ -21,7 +21,7 @@ def shoot_number_to_accession_id(accession_number, shoot_number):
         return accession_number + "_" + shoot_number
     else:
         raise ValueError(
-            f"misssing accession or shoot number - accession: '{accession_number}' shoot: '{shoot_number}'"
+            f"missing accession or shoot number - accession: '{accession_number}' shoot: '{shoot_number}'"
         )
 
 
@@ -34,7 +34,8 @@ def transfer_shoot(from_session, to_session, shoot_number, accession_number, max
     tmpfolder = root_dir.name
     source_folder = os.path.join(tmpfolder, "source")
     target_folder = os.path.join(tmpfolder, "target")
-    for files, suffix in download_shoot(from_session, shoot_number, source_folder, max_batch_bytes):
+    already_up = [match.group(1) for match in (re_extract_suffix.search(o.key) for o in get_target_bucket(to_session, TARGET_BUCKET).filter(Prefix=f"born-digital-accessions/{accession_id}")) if match]
+    for files, suffix in download_shoot(from_session, shoot_number, source_folder, max_batch_bytes, ignore_suffixes=already_up):
         upload(
             to_session,
             make_zip_from(files, source_folder, target_folder, accession_id, suffix)
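The already_up list computed in transfer_shoot pairs up with the suffix naming in
download_s3_folder (suffix = f"_{ix:03d}"): any _NNN suffix already present on the
target is extracted with re_extract_suffix and passed as ignore_suffixes, so that
batch is skipped rather than downloaded and re-uploaded. A small sketch with
hypothetical target keys, no S3 access needed:

    import re

    re_extract_suffix = re.compile('(_\\d\\d\\d)\\.zip')

    target_keys = [
        "born-digital-accessions/2754_CP000159_001.zip",
        "born-digital-accessions/2754_CP000159_003.zip",
    ]
    already_up = [m.group(1) for m in (re_extract_suffix.search(k) for k in target_keys) if m]
    assert already_up == ["_001", "_003"]
    # download_s3_folder would then skip the batches whose suffix is "_001" or "_003".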
diff --git a/terraform/main.tf b/terraform/main.tf
index c4d7eee..1d890f0 100644
--- a/terraform/main.tf
+++ b/terraform/main.tf
@@ -45,6 +45,7 @@ module "restorer_lambda" {
 
 module "toucher_lambda" {
   source     = "./modules/toucher_lambda"
+  environment = "production"
   lambda_zip = data.archive_file.toucher_zip
 
   providers = {
diff --git a/terraform/modules/transfer_throttle/main.tf b/terraform/modules/transfer_throttle/main.tf
index 64b5808..0a5cb14 100644
--- a/terraform/modules/transfer_throttle/main.tf
+++ b/terraform/modules/transfer_throttle/main.tf
@@ -41,7 +41,7 @@ module "transfer_throttle_lambda" {
 module "transfer_scheduler" {
   source               = "../lambda_scheduler"
   cron                 = "cron(30 7,9,11,13,15,16 ? * MON-FRI *)"
-  description          = "Moves batches of shoots to the transferer at a rate Archivematica can handle"
+  description          = "Moves batches of shoots to the transferrer at a rate Archivematica can handle"
   lambda_arn           = module.transfer_throttle_lambda.lambda.arn
   lambda_function_name = module.transfer_throttle_lambda.lambda.function_name
   name                 = "transfer_throttle"
diff --git a/terraform/modules/transferrer_lambda/main.tf b/terraform/modules/transferrer_lambda/main.tf
index 80f5c89..1ec85af 100644
--- a/terraform/modules/transferrer_lambda/main.tf
+++ b/terraform/modules/transferrer_lambda/main.tf
@@ -48,6 +48,22 @@ resource "aws_iam_role_policy" "write_to_archivematica_transfer_source" {
   )
 }
 
+resource "aws_iam_role_policy" "list_archivematica_transfer_source" {
+  role = module.transfer_lambda.lambda_role.name
+  name = "list_archivematica_transfer_source-${var.environment}"
+  policy = jsonencode({
+    Version = "2012-10-17"
+    Statement = [
+      {
+        "Effect": "Allow",
+        "Action": "s3:ListBucket",
+        "Resource": "arn:aws:s3:::${local.target_bucket}"
+      },
+    ]
+  }
+  )
+}
+
 resource "aws_iam_role_policy" "read_from_editorial_photography" {
   role = module.transfer_lambda.lambda_role.name
   name = "read_from_editorial_photography-${var.environment}"
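The new list_archivematica_transfer_source policy supports the listing that
transfer_shoot now performs against the target prefix before downloading; listing
objects by prefix requires s3:ListBucket on the bucket ARN. A rough boto3 sketch of
the kind of call involved, assuming the production bucket name from BUCKETS and
suitable credentials:

    import boto3

    bucket = boto3.Session().resource("s3").Bucket(
        "wellcomecollection-archivematica-transfer-source")
    # This prefix listing is the operation that needs s3:ListBucket.
    existing = [o.key for o in bucket.objects.filter(
        Prefix="born-digital-accessions/2754_CP000159")]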