Better retries #39

Open
wants to merge 30 commits into main

Changes from all commits

Commits (30)
3772ee6
add failure list compilation
paul-butcher Aug 7, 2024
3a45e6e
add es to requirements
paul-butcher Aug 7, 2024
1da6ee2
polish up pending/failure lists
paul-butcher Aug 9, 2024
8c8685c
polish up pending list check
paul-butcher Aug 9, 2024
f817087
move failure/pending lists
paul-butcher Sep 10, 2024
e1418c7
improve structure
paul-butcher Oct 2, 2024
dcfa212
Merge branch 'main' into failure-list-2
paul-butcher Oct 2, 2024
248effc
end to end scheduling
paul-butcher Oct 4, 2024
d04f7af
tidy
paul-butcher Oct 7, 2024
0ddc855
tidy
paul-butcher Oct 7, 2024
a740c38
improve commentary
paul-butcher Oct 7, 2024
f51eda5
remove broken diagram
paul-butcher Oct 7, 2024
3ab1a39
fix diagram
paul-butcher Oct 7, 2024
2cfb02a
tidy
paul-butcher Oct 7, 2024
20b7e1c
print -> log
paul-butcher Oct 7, 2024
1f5ea69
help GH find the client-only code
paul-butcher Oct 7, 2024
f2c5ec7
help GH find the client-only code
paul-butcher Oct 7, 2024
c5087e4
divide large shoots by maximum total size
paul-butcher Oct 10, 2024
e6d12f8
whoops, CLI
paul-butcher Oct 10, 2024
f2a0434
shuffle things about a bit
paul-butcher Oct 10, 2024
2b6038d
improve pending/failure reports
paul-butcher Oct 16, 2024
1490398
ignore partial shoots that are already on the target
paul-butcher Oct 22, 2024
0815e50
Merge branch 'improve-report' into huge-shoots
paul-butcher Oct 23, 2024
34a7629
improve retry/touch detection
paul-butcher Nov 20, 2024
c94a606
whoops merge
paul-butcher Nov 20, 2024
7bd28a6
whoops merge
paul-butcher Nov 20, 2024
a6005ee
whoops merge
paul-butcher Nov 20, 2024
24c8b4b
improve commentary
paul-butcher Nov 20, 2024
f137fbc
improve commentary
paul-butcher Nov 20, 2024
fb2979f
tidy up make
paul-butcher Nov 20, 2024
15 changes: 10 additions & 5 deletions Makefile
@@ -66,17 +66,22 @@ shoots/%.failed: src/compile_failure_list.py
# True (successfully transferred) or False (not successfully transferred)
# against each shoot
%.transfer_status: %
cat $< | python client/compile_pending_list.py $* > $@
cat $< | AWS_PROFILE=${SOURCE_PROFILE} python client/compile_pending_list.py $* > $@

# Compile lists for retrying:

# Some things may have failed in the target system
# These are s3 keys that can be passed through the 'touched' target
%.transfer_status.touchable: %.transfer_status
grep False $< | sed 's/,.*//' | python client/touchable.py production > $@
%.touchable: %
grep False $< | sed 's/,.*//' | AWS_PROFILE=${TARGET_PROFILE} python client/touchable.py production > $@

# Others may have failed to transfer (or have been deleted from the target bucket due to expiry)
# These are shoot identifiers that need to go back through the whole system again
%.transfer_status.needs_transfer: %.transfer_status
grep False $< | sed 's/,.*//' | python client/untouchable.py production | sed 's/.*2754_//' | sed 's/\.zip//' > $@
%.needs_transfer: %
grep False $< | sed 's/,.*//' | AWS_PROFILE=${TARGET_PROFILE} python client/untouchable.py production | sed 's/.*2754_//' | sed 's/\.zip//' > $@

%.queued_for_touch.production: %.transfer_status.touchable
cat $< | AWS_PROFILE=${TARGET_PROFILE} python client/queue_touches.py production

%.queued_for_transfer.production: %.transfer_status.needs_transfer
cat $< | AWS_PROFILE=${SOURCE_PROFILE} python client/start_restores.py production
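For context on the retry targets above: each line of a `.transfer_status` file pairs an S3 key with a True/False transfer result (e.g. `born-digital-accessions/2754_CP000200.zip,False` — the exact key format is an assumption based on the grep/sed pipeline). A minimal Python sketch of what `grep False $< | sed 's/,.*//'` extracts, purely as illustration:

def failed_keys(transfer_status_lines):
    # Equivalent of `grep False | sed 's/,.*//'`: keep lines containing "False",
    # then strip everything from the first comma onwards, leaving just the key.
    for line in transfer_status_lines:
        if "False" in line:
            yield line.strip().split(",", 1)[0]

# list(failed_keys(["born-digital-accessions/2754_CP000200.zip,False",
#                   "born-digital-accessions/2754_CP000201.zip,True"]))
# -> ["born-digital-accessions/2754_CP000200.zip"]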
56 changes: 54 additions & 2 deletions client/touchable.py
@@ -1,9 +1,61 @@
import boto3
import sys
import os
from compile_pending_list import find_shoots_query, get_identifiers

from reporting_client import get_es_client

BUCKETS = {
"staging": "wellcomecollection-archivematica-staging-transfer-source",
"production": "wellcomecollection-archivematica-transfer-source"
}

def get_failed_subshoots(session, subshoots):
"""
Given a list of shoots/subshoots, (s3 keys),
return any that have not successfully been ingested.
"""
es = get_es_client(session)
# make it a list, in case it's a generator. We need to go over it more than once.
subshoots = list(subshoots)
# subshoots is a list of S3 Keys - e.g. born-digital-accessions/2754_CP000200.zip
# In order to query the storage_ingests database, we need the base name of the
# file - e.g. 2754_CP000200.
# so knock off the leading/path/ and the .suffix
ids = [s[:-4].rpartition('/')[2] for s in subshoots]

response = es.search(
index="storage_ingests",
size=1000,
query=find_shoots_query(ids),
source=False,
fields=["bag.info.externalIdentifier", "lastModifiedDate"]
)
succeeded = get_identifiers(response["hits"]["hits"])
for pair in zip(subshoots, ids):
if pair[1] not in succeeded:
yield pair[0]


def find_objects(session, bucket, object_keys):
"""
Look in a bucket to find all the zip objects corresponding to
a list of shoot numbers (e.g. 2754_CP000200).

These objects may be named with the shoot number alone, e.g. 2754_CP000200.zip
or may be part of a broken-up shoot with a suffix, e.g. 2754_CP000200_001.zip
"""
bucket = session.resource('s3').Bucket(bucket)
for shoot_id in object_keys:
prefix = f"born-digital-accessions/{shoot_id.strip()}"
for obj in bucket.objects.filter(Prefix=prefix):
if obj.key[-4:] == ".zip":
yield obj.key

from objects_on_target import find_objects, BUCKETS

if __name__ == '__main__':
objects = find_objects(boto3.Session(profile_name="digitisation-developer"), BUCKETS[sys.argv[1]], sys.stdin.readlines())
print("\n".join(
find_objects(boto3.Session(), BUCKETS[sys.argv[1]], sys.stdin.readlines(), True)
get_failed_subshoots(
boto3.Session(profile_name="platform-developer"), objects)
))
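To make the key-to-identifier step in get_failed_subshoots concrete: stripping the trailing ".zip" and the leading path turns an S3 key into the bare identifier that is compared against successful ingests in the storage_ingests index. A quick worked example (the keys shown are hypothetical):

keys = [
    "born-digital-accessions/2754_CP000200.zip",
    "born-digital-accessions/2754_CP000200_001.zip",
]
# s[:-4] drops ".zip"; rpartition('/')[2] drops the leading path.
ids = [s[:-4].rpartition('/')[2] for s in keys]
assert ids == ["2754_CP000200", "2754_CP000200_001"]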
25 changes: 23 additions & 2 deletions client/untouchable.py
@@ -1,9 +1,30 @@
"""
Given a list of shoot numbers, return any of them that have not been transferred

This script assumes that the presence of any part of the shoot on the target system
implies that it has been transferred; i.e. if a shoot is a large one that needs to be
broken apart, it does not check whether all parts have been successfully transferred.
"""

import boto3
import sys

from objects_on_target import find_objects, BUCKETS
BUCKETS = {
"staging": "wellcomecollection-archivematica-staging-transfer-source",
"production": "wellcomecollection-archivematica-transfer-source"
}

def find_missing_objects(session, bucket, object_keys):
bucket = session.resource('s3').Bucket(bucket)
for shoot_id in object_keys:
prefix = f"born-digital-accessions/{shoot_id.strip()}"
try:
next(iter(bucket.objects.filter(Prefix=prefix)))
except StopIteration:
yield shoot_id.strip()


if __name__ == '__main__':
print("\n".join(
find_objects(boto3.Session(), BUCKETS[sys.argv[1]], sys.stdin.readlines(), False)
find_missing_objects(boto3.Session(), BUCKETS[sys.argv[1]], sys.stdin.readlines())
))
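As that docstring notes, a single surviving part is enough for a shoot to be skipped here. A pure-Python stand-in for the prefix check (the listed keys are hypothetical; the real code asks S3 via bucket.objects.filter):

target_keys = ["born-digital-accessions/2754_CP000200_001.zip"]  # hypothetical listing

def is_missing(shoot_id, keys=target_keys):
    # A shoot only counts as missing if *nothing* starts with its prefix,
    # so one part (e.g. _001) is enough to treat the whole shoot as transferred.
    prefix = f"born-digital-accessions/{shoot_id.strip()}"
    return not any(key.startswith(prefix) for key in keys)

assert not is_missing("2754_CP000200")  # a part is present, so not re-queued
assert is_missing("2754_CP000300")      # nothing on the target: needs_transfer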
27 changes: 15 additions & 12 deletions src/transferrer/download.py
@@ -12,7 +12,7 @@

# This variable governs the degree of parallelism to use when downloading files.
# The correct number is to be discovered by experimentation
THREADS = 10
THREADS = 20


def get_shoot_batches(bucket, s3_folder, max_batch_bytes):
@@ -41,18 +41,18 @@ def list_folder_objects(bucket, s3_folder):
return (obj for obj in bucket.objects.filter(Prefix=s3_folder) if should_download_file(obj.key))


def download_shoot(session: boto3.session.Session, shoot_number, local_dir, max_batch_bytes):
def download_shoot(session: boto3.session.Session, shoot_number, local_dir, max_batch_bytes, ignore_suffixes):
# Allowing enough connections for each thread to have two of them
# prevents the `urllib3.connectionpool:Connection pool is full` warning
# and allows for better connection reuse.
return download_shoot_folder(get_source_bucket(session, max_connections=THREADS * 2), shoot_number, local_dir, max_batch_bytes)
return download_shoot_folder(get_source_bucket(session, max_connections=THREADS * 5), shoot_number, local_dir, max_batch_bytes, ignore_suffixes)


def download_shoot_folder(bucket, shoot_number, local_dir, max_batch_bytes):
return download_s3_folder(bucket, shoot_number_to_folder_path(shoot_number), local_dir, max_batch_bytes)
def download_shoot_folder(bucket, shoot_number, local_dir, max_batch_bytes, ignore_suffixes):
return download_s3_folder(bucket, shoot_number_to_folder_path(shoot_number), local_dir, max_batch_bytes, ignore_suffixes)


def download_s3_folder(bucket, s3_folder: str, local_dir: str, max_batch_bytes: int):
def download_s3_folder(bucket, s3_folder: str, local_dir: str, max_batch_bytes: int, ignore_suffixes):
"""
Download the relevant content from an s3 folder to local_dir.

@@ -67,12 +67,15 @@ def download_s3_folder(bucket, s3_folder: str, local_dir: str, max_batch_bytes:
os.makedirs(local_dir, exist_ok=True)
if use_suffix:
suffix = f"_{ix:03d}"
with ThreadPoolExecutor(max_workers=THREADS) as executor:
files = list(executor.map(
partial(download_s3_file, local_dir=local_dir, s3_folder=s3_folder),
batch
))
yield files, suffix
if suffix in ignore_suffixes:
logger.info(f"{suffix} already on target, ignoring")
else:
with ThreadPoolExecutor(max_workers=THREADS) as executor:
files = list(executor.map(
partial(download_s3_file, local_dir=local_dir, s3_folder=s3_folder),
batch
))
yield files, suffix


def download_s3_file(object_summary, *, local_dir: str, s3_folder: str):
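To illustrate the new ignore_suffixes behaviour above: batches of a split shoot get zero-padded suffixes, and any suffix already present on the target is skipped rather than re-downloaded. A small sketch under assumed inputs (the batch contents and the rule for use_suffix are hypothetical here):

batches = [["a.tif"], ["b.tif"], ["c.tif"]]  # hypothetical batches of object keys
use_suffix = len(batches) > 1                # assumption: suffixes only for split shoots
ignore_suffixes = ["_001"]                   # e.g. part 001 is already on the target

for ix, batch in enumerate(batches):
    suffix = f"_{ix:03d}" if use_suffix else ""
    if suffix in ignore_suffixes:
        continue                             # already transferred, skip the download
    print(suffix, batch)                     # the real code downloads and yields (files, suffix)
# prints "_000 ['a.tif']" then "_002 ['c.tif']"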
13 changes: 7 additions & 6 deletions src/transferrer/transfer.py
@@ -2,26 +2,26 @@
import tempfile
from transferrer.download import download_shoot
from transferrer.make_zip import make_zip_from
from transferrer.upload import upload
from transferrer.upload import upload, get_target_bucket
import boto3
import re

re_extract_suffix = re.compile('(_\\d\\d\\d)\\.zip')

MAX_SPACE_BYTES = os.getenv('MAX_SPACE_BYTES', 10240000000) # maximum setting for Lambda Ephemeral Storage


TARGET_BUCKET = os.getenv("TARGET_BUCKET")

def shoot_number_to_accession_id(accession_number, shoot_number):
"""
The accession id is simply the shoot_number prefixed with the accession number.

>>> shoot_number_to_accession_id("2754", "CP000159")
'2754_CP000159'
"""
if accession_number and shoot_number:
return accession_number + "_" + shoot_number
else:
raise ValueError(
f"misssing accession or shoot number - accession: '{accession_number}' shoot: '{shoot_number}'"
f"missing accession or shoot number - accession: '{accession_number}' shoot: '{shoot_number}'"
)


@@ -34,7 +34,8 @@ def transfer_shoot(from_session, to_session, shoot_number, accession_number, max
tmpfolder = root_dir.name
source_folder = os.path.join(tmpfolder, "source")
target_folder = os.path.join(tmpfolder, "target")
for files, suffix in download_shoot(from_session, shoot_number, source_folder, max_batch_bytes):
already_up = [match.group(1) for match in (re_extract_suffix.search(o.key) for o in get_target_bucket(to_session, TARGET_BUCKET).filter(Prefix=f"born-digital-accessions/{accession_id}")) if match]
for files, suffix in download_shoot(from_session, shoot_number, source_folder, max_batch_bytes, ignore_suffixes=already_up):
upload(
to_session,
make_zip_from(files, source_folder, target_folder, accession_id, suffix)
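The already_up list above is built by pulling the _NNN suffix out of zip keys that already exist on the target for this accession. A worked example of what re_extract_suffix matches (the keys are hypothetical):

import re

re_extract_suffix = re.compile('(_\\d\\d\\d)\\.zip')

existing_keys = [
    "born-digital-accessions/2754_CP000159_001.zip",  # a part that is already uploaded
    "born-digital-accessions/2754_CP000159.zip",       # an unsuffixed zip: no match
]
already_up = [m.group(1) for m in (re_extract_suffix.search(k) for k in existing_keys) if m]
assert already_up == ["_001"]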
1 change: 1 addition & 0 deletions terraform/main.tf
@@ -45,6 +45,7 @@ module "restorer_lambda" {

module "toucher_lambda" {
source = "./modules/toucher_lambda"

environment = "production"
lambda_zip = data.archive_file.toucher_zip
providers = {
2 changes: 1 addition & 1 deletion terraform/modules/transfer_throttle/main.tf
@@ -41,7 +41,7 @@ module "transfer_throttle_lambda" {
module "transfer_scheduler" {
source = "../lambda_scheduler"
cron = "cron(30 7,9,11,13,15,16 ? * MON-FRI *)"
description = "Moves batches of shoots to the transferer at a rate Archivematica can handle"
description = "Moves batches of shoots to the transferrer at a rate Archivematica can handle"
lambda_arn = module.transfer_throttle_lambda.lambda.arn
lambda_function_name = module.transfer_throttle_lambda.lambda.function_name
name = "transfer_throttle"
16 changes: 16 additions & 0 deletions terraform/modules/transferrer_lambda/main.tf
@@ -48,6 +48,22 @@ resource "aws_iam_role_policy" "write_to_archivematica_transfer_source" {
)
}

resource "aws_iam_role_policy" "list_archivematica_transfer_source" {
role = module.transfer_lambda.lambda_role.name
name = "list_archivematica_transfer_source-${var.environment}"
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
"Effect": "Allow",
"Action": "s3:ListBucket",
"Resource": "arn:aws:s3:::${local.target_bucket}"
},
]
}
)
}
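This ListBucket grant is what the new target-bucket lookup in transfer.py relies on: enumerating existing zips under a prefix needs list permission on the bucket itself, not just object-level read/write. A minimal boto3 sketch of the kind of call it enables (the prefix is a placeholder; credentials are assumed to come from the lambda role):

import boto3

# Listing objects under a prefix issues an S3 list call, which requires
# s3:ListBucket on the bucket ARN (arn:aws:s3:::<bucket>), as granted above.
session = boto3.Session()
bucket = session.resource("s3").Bucket("wellcomecollection-archivematica-transfer-source")
for obj in bucket.objects.filter(Prefix="born-digital-accessions/2754_CP000159"):
    print(obj.key)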

resource "aws_iam_role_policy" "read_from_editorial_photography" {
role = module.transfer_lambda.lambda_role.name
name = "read_from_editorial_photography-${var.environment}"