polishing off the python function and client scripts (#19)
* snagging the python function and client scripts

* Update tests to new interfaces

* deploy with Terraform (#20)

* deploy with Terraform

* Add Makefile (#21)

* Add Makefile

* set the splitter to 20

* neater permission

* Makefile description (#23)

* deploy with Terraform

* Add Makefile

* set the splitter to 20

* neater permission

* better description of touched
paul-butcher authored Jul 23, 2024
1 parent 44a273e commit 4581660
Showing 33 changed files with 639 additions and 38 deletions.
52 changes: 52 additions & 0 deletions Makefile
@@ -0,0 +1,52 @@
.PHONY: shoots/clean %.sliced
.SECONDARY:

# Remove intermediate/final files from the shoots folder
shoots/clean:
	rm shoots/*restored
	rm shoots/*transferred
	rm shoots/*slice*

# Request the Glacier restoration of the shoots in the given file.
# The file is expected to contain one shoot identifier per line.
# In order to run this, set your AWS profile to one with authority in the workflow account.
%.restored: %
	cat $< | python src/restore.py
	cp $< $@


# Request the Glacier transfer of the shoots in the given file.
# This rule depends on restoration having completed, which is not guaranteed
# (or even likely) if you run this rule without having previously requested the restoration.
# Any shoots that are not yet fully restored will result in a DLQ message that can eventually
# be redriven when the S3 objects are finally available for download.
# In order to run this, set your AWS profile to one with authority in the digitisation account.

# transfer to staging (see above)
%.transferred.staging: %.restored
	cat $< | python src/start_transfers.py staging
	cp $< $@


# transfer to production (see above)
%.transferred.production: %.restored
	cat $< | python src/start_transfers.py production
	cp $< $@

# Slice a given input file into manageable chunks, so that you can run them through the
# transfer process separately without overwhelming the target system.
# The right number for Archivematica is probably about 20.

%.sliced: %
	split -l 20 $< $<.

# Touch the files already on AWS. This will stimulate the corresponding transfer lambdas.
# The target system can sometimes be unexpectedly unavailable or overwhelmed,
# resulting in failures.
# This allows us to restart the process from just before the failure.
# In order to run this, set your AWS profile to one with authority in the digitisation account.
%.touched.staging: %
	cat $< | python src/touch.py staging

%.touched.production: %
	cat $< | python src/touch.py production
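For orientation, the Makefile targets above map directly onto the Python entry points added in this commit. A minimal sketch of driving the same restore-then-transfer sequence from Python (assuming src/ is on PYTHONPATH; shoots.txt is a hypothetical file with one shoot identifier per line, and, as the comments above note, the transfer step should only run once restoration has actually completed):

import boto3

from restore import restore_shoot_folders
from start_transfers import post_messages

# One shoot identifier per line, as the %.restored rule expects (file name is illustrative).
with open("shoots.txt") as f:
    shoot_numbers = f.readlines()

# Step 1: request Glacier restoration; the default session must carry
# credentials with authority in the workflow account, as restore.py assumes.
restore_shoot_folders(shoot_numbers)

# ...hours later, once the Bulk-tier restore has completed...

# Step 2: publish one SNS message per shoot to the staging transfer topic,
# this time with credentials for the digitisation account.
post_messages(boto3.Session(), "staging", shoot_numbers)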
26 changes: 26 additions & 0 deletions src/check_status.py
@@ -0,0 +1,26 @@
import logging
import boto3
from transferrer.common import should_download_file, shoot_number_to_folder_path, get_source_bucket

logger = logging.getLogger(__name__)


def check_shoot_restore_status(bucket, shoot_number):
    check_folder_restore_status(bucket, shoot_number_to_folder_path(shoot_number))


def check_folder_restore_status(bucket, s3_folder: str):
    logger.info(s3_folder)
    for obj in bucket.objects.filter(Prefix=s3_folder, OptionalObjectAttributes=[
        'RestoreStatus',
    ]):
        if should_download_file(obj.key):
            status = obj.restore_status
        else:
            status = "ignored"
        logger.info(f"{obj.key}\t{status}")


if __name__ == "__main__":
    import sys
    check_shoot_restore_status(get_source_bucket(boto3.Session()), sys.argv[1])
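The status values logged here come from the S3 RestoreStatus attribute requested via OptionalObjectAttributes. A hedged helper for summarising that value, assuming it is the ListObjectsV2 RestoreStatus structure (IsRestoreInProgress plus, once restored, RestoreExpiryDate), or None when no restore has been requested:

def summarise_restore_status(restore_status) -> str:
    # Collapse an S3 RestoreStatus structure into a short label.
    if restore_status is None:
        return "not requested"
    if restore_status.get("IsRestoreInProgress"):
        return "in progress"
    expiry = restore_status.get("RestoreExpiryDate")
    return f"restored until {expiry}" if expiry else "restored"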
35 changes: 35 additions & 0 deletions src/lambda_function.py
@@ -0,0 +1,35 @@
import os
import json
import boto3
from transferrer.transfer import transfer_shoot
import logging
logging.basicConfig(level=logging.INFO)

logger = logging.getLogger()
logger.setLevel(logging.INFO)

ACCESSION_NUMBER = os.getenv("ACCESSION_NUMBER", "2754")


def lambda_handler(event, context):
    shoots = [single_handler(shoot_id) for shoot_id in get_shoot_ids(event)]
    return {
        'statusCode': 200,
        'body': shoots
    }


def single_handler(shoot_id):
    logger.info(f"transferring {shoot_id}")
    transfer_shoot(
        from_session=boto3.Session(),
        to_session=boto3.Session(),
        shoot_number=shoot_id,
        accession_number=ACCESSION_NUMBER
    )
    return shoot_id


def get_shoot_ids(event):
    for record in event['Records']:
        yield json.loads(record['body'])["Message"]
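get_shoot_ids expects the shape that SQS delivers for an SNS subscription: each record's body is a JSON document whose Message field holds the shoot identifier published by start_transfers.py. A minimal event for local testing, trimmed to the only fields the handler reads (a real record also carries messageId, attributes, and so on):

import json
from lambda_function import get_shoot_ids  # assumes src/ is on PYTHONPATH

example_event = {
    "Records": [
        {"body": json.dumps({"Message": "CP000179"})},
        {"body": json.dumps({"Message": "CP000181"})},
    ]
}

assert list(get_shoot_ids(example_event)) == ["CP000179", "CP000181"]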
52 changes: 52 additions & 0 deletions src/restore.py
@@ -0,0 +1,52 @@
import logging
import boto3
from botocore.exceptions import ClientError

from transferrer.common import should_download_file, shoot_number_to_folder_path, get_source_bucket

logger = logging.getLogger(__name__)


def restore_s3_folder(bucket, s3_folder: str, days_to_keep=1):
    logger.info(f"restoring folder: {s3_folder}")
    for obj in bucket.objects.filter(Prefix=s3_folder):
        if should_download_file(obj.key):
            try:
                logger.info(f"restoring object: {obj.key}")
                obj.restore_object(
                    RestoreRequest={
                        'Days': days_to_keep,
                        'GlacierJobParameters': {
                            'Tier': 'Bulk'
                        }
                    }
                )
            except ClientError as e:
                if "The operation is not valid for the object's storage class" in str(e):
                    logger.info(f"attempt to restore non-glacier object: {obj.key}")
                elif "RestoreAlreadyInProgress" in str(e):
                    logger.info(f"redundant attempt to restore object: {obj.key}")
                else:
                    raise
        else:
            logger.info(f"ignoring {obj.key}")


def restore_shoot_folder(bucket, shoot_number):
    restore_s3_folder(bucket, shoot_number_to_folder_path(shoot_number))


def restore_shoot_folders(shoot_numbers):
    bucket = get_source_bucket(boto3.Session())
    for shoot_number in (n.strip() for n in shoot_numbers):
        logger.info(f"restoring shoot {shoot_number}")
        restore_shoot_folder(bucket, shoot_number)


def main():
    import sys
    restore_shoot_folders(sys.stdin.readlines())


if __name__ == "__main__":
    main()
14 changes: 14 additions & 0 deletions src/start_transfers.py
@@ -0,0 +1,14 @@
import sys
import boto3


def post_messages(session, environment, shoot_numbers):
    sns = session.resource("sns")
    topic = sns.Topic(f"arn:aws:sns:eu-west-1:404315009621:transfer-shoots-{environment}")
    for shoot_number in shoot_numbers:
        print(f"requesting transfer of {shoot_number}")
        topic.publish(Message=shoot_number.strip())


if __name__ == "__main__":
    post_messages(boto3.Session(), sys.argv[1], sys.stdin.readlines())
42 changes: 42 additions & 0 deletions src/touch.py
@@ -0,0 +1,42 @@
"""
Like unix touch, this updates objects in s3 without substantive change.
Call with a newline-separated list of keys, thus
echo 'born-digital-accessions/2754_CP000179.zip \n born-digital-accessions/2754_CP000181.zip' | touch.py
This is intended for use when Archivematica has ephemerally failed to accept some of the shoot zips.
A fortnightly list of failed zips is published to Slack, but you could also generate it by looking elsewhere.
"""
import sys

import boto3
import datetime

BUCKETS = {
"staging": "wellcomecollection-archivematica-staging-transfer-source",
"production": "wellcomecollection-archivematica-transfer-source"
}


def touch_object(session, bucket, key):
print(f"touching: {bucket}/{key}")
session.client('s3').copy_object(
Bucket=bucket,
CopySource={'Bucket': bucket, 'Key': key},
Key=key,
Metadata={
'touched': datetime.datetime.now().isoformat()
},
MetadataDirective="REPLACE"
)


def touch_objects(session, bucket, object_keys):
for object_key in object_keys:
touch_object(session, bucket, object_key.strip())


if __name__ == '__main__':
touch_objects(boto3.Session(), BUCKETS[sys.argv[1]], sys.stdin.readlines())
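A short usage sketch, reusing the example keys from the module docstring (the keys are illustrative; in practice they come from the fortnightly Slack report of failed zips):

import boto3
from touch import touch_objects, BUCKETS  # assumes src/ is on PYTHONPATH

failed_zips = [
    "born-digital-accessions/2754_CP000179.zip",
    "born-digital-accessions/2754_CP000181.zip",
]

# Copy each object onto itself with fresh metadata, so the corresponding
# transfer lambda is stimulated again for just these zips.
touch_objects(boto3.Session(), BUCKETS["staging"], failed_zips)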
7 changes: 3 additions & 4 deletions src/transferrer/common.py
@@ -5,12 +5,11 @@
 SOURCE_BUCKET = "wellcomecollection-editorial-photography"
 
 
-def get_source_bucket(max_connections=10):
-    return get_source_client(max_connections).Bucket(SOURCE_BUCKET)
+def get_source_bucket(session: boto3.session.Session, max_connections=10):
+    return get_source_client(session, max_connections).Bucket(SOURCE_BUCKET)
 
 
-def get_source_client(max_connections):
-    session = boto3.Session()
+def get_source_client(session: boto3.session.Session, max_connections):
     return session.resource('s3', config=botocore.config.Config(
         max_pool_connections=max_connections
     ))
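The point of this change is that callers now choose which credentials the source bucket is read with, rather than the module silently creating a default session. A brief sketch of the intended call pattern (the profile environment variable mirrors the one used in transfer.py; it is not required by common.py itself):

import os
import boto3
from transferrer.common import get_source_bucket

# In the Lambda, the execution role is enough, so a default session is passed in.
bucket = get_source_bucket(boto3.Session())

# A local run can instead point at the workflow account explicitly.
source_session = boto3.Session(profile_name=os.environ["AWS_SOURCE_PROFILE"])
bucket = get_source_bucket(source_session, max_connections=20)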
7 changes: 4 additions & 3 deletions src/transferrer/download.py
@@ -2,6 +2,7 @@
 import logging
 from concurrent.futures import ThreadPoolExecutor
 from functools import partial
+import boto3
 
 from transferrer.common import should_download_file, shoot_number_to_folder_path, get_source_bucket
 
@@ -12,11 +13,11 @@
 THREADS = 10
 
 
-def download_shoot(shoot_number, local_dir):
+def download_shoot(session: boto3.session.Session, shoot_number, local_dir):
     # Allowing enough connections for each thread to have two of them
     # prevents the `urllib3.connectionpool:Connection pool is full` warning
     # and allows for better connection reuse.
-    download_shoot_folder(get_source_bucket(max_connections=THREADS * 2), shoot_number, local_dir)
+    download_shoot_folder(get_source_bucket(session, max_connections=THREADS * 2), shoot_number, local_dir)
 
 
 def download_shoot_folder(bucket, shoot_number, local_dir):
@@ -51,4 +52,4 @@ def download_s3_file(object_summary, *, local_dir: str, s3_folder: str):
     import sys
     shoot_number = sys.argv[1]
     download_folder = os.path.join("download", shoot_number)
-    download_shoot(sys.argv[1], download_folder)
+    download_shoot(boto3.Session(), sys.argv[1], download_folder)
18 changes: 9 additions & 9 deletions src/transferrer/make_zip.py
@@ -56,12 +56,11 @@ def create_born_digital_folder(source_files, target_directory, accession_id):
 
 def move_files_to_objects_folder(source_files, target_directory):
     """
-    Move the contents of source directory into the objects subfolder in the target directory
+    Move the contents of source directory into the target directory
     """
-    os.makedirs(os.path.join(target_directory, "objects"))
+    os.makedirs(target_directory, exist_ok=True)
     for file_abspath in source_files:
-        shutil.move(file_abspath, os.path.join(target_directory, "objects"))
+        shutil.move(file_abspath, target_directory)
 
 
 def files_in_folder(source_directory):
@@ -103,21 +102,22 @@ def create_born_digital_zips(
             accession_id = f"{shoot_number_to_accession_id(accession_number, shoot_number)}_{ix:03d}"
             yield create_born_digital_zip(
                 full_paths(source_directory, batch),
-                os.path.join(target_directory, accession_id),
+                target_directory,
                 accession_id,
             )
     else:
         accession_id = shoot_number_to_accession_id(accession_number, shoot_number)
         yield create_born_digital_zip(
             full_paths(source_directory, filenames),
-            os.path.join(target_directory, accession_id),
+            target_directory,
             accession_id,
         )
 
 
-def create_born_digital_zip(source_files, target_directory, accession_id):
-    folder = create_born_digital_folder(source_files, target_directory, accession_id)
-    return shutil.make_archive(accession_id, "zip", folder)
+def create_born_digital_zip(source_files, target_directory: str, accession_id: str):
+    accession_path = os.path.join(target_directory, accession_id)
+    folder = create_born_digital_folder(source_files, accession_path, accession_id)
+    return shutil.make_archive(accession_path, "zip", folder)
 
 
 # copied from itertools 3.12 documentation.
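The trailing comment refers to a batched helper taken from the itertools documentation (itertools.batched itself only exists from Python 3.12). For reference, the recipe as published in those docs looks like this; it is a sketch of the standard recipe, not necessarily byte-for-byte what the collapsed part of the file contains:

from itertools import islice


def batched(iterable, n):
    # batched('ABCDEFG', 3) --> ('A', 'B', 'C') ('D', 'E', 'F') ('G',)
    if n < 1:
        raise ValueError("n must be at least one")
    it = iter(iterable)
    while batch := tuple(islice(it, n)):
        yield batch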
25 changes: 25 additions & 0 deletions src/transferrer/transfer.py
@@ -0,0 +1,25 @@
import os
import tempfile
from transferrer.download import download_shoot
from transferrer.make_zip import create_born_digital_zips
from transferrer.upload import upload
import boto3


def transfer_shoot(from_session, to_session, shoot_number, accession_number):
    with tempfile.TemporaryDirectory() as tmpfolder:
        source_folder = os.path.join(tmpfolder, "source")
        target_folder = os.path.join(tmpfolder, "target")
        download_shoot(from_session, shoot_number, source_folder)
        for zipfile in create_born_digital_zips(source_folder, target_folder, accession_number, shoot_number, 250):
            upload(to_session, zipfile)


if __name__ == "__main__":
    import sys
    transfer_shoot(
        from_session=boto3.Session(profile_name=os.environ["AWS_SOURCE_PROFILE"]),
        to_session=boto3.Session(profile_name=os.environ["AWS_TARGET_PROFILE"]),
        shoot_number=sys.argv[1],
        accession_number="2754"
    )
12 changes: 7 additions & 5 deletions src/transferrer/upload.py
@@ -7,15 +7,17 @@
 
 logger = logging.getLogger(__name__)
 
+TARGET_BUCKET = os.getenv("TARGET_BUCKET", "wellcomecollection-archivematica-staging-transfer-source")
+
 
-def upload(s3, zipfile_path, target_bucket_name="wellcomecollection-archivematica-staging-transfer-source"):
+def upload(session, zipfile_path, target_bucket_name=TARGET_BUCKET):
     logger.info(f"uploading {zipfile_path} to {target_bucket_name}")
-    get_target_bucket(s3, target_bucket_name).upload_file(zipfile_path, f"born-digital-accessions/{os.path.basename(zipfile_path)}")
+    get_target_bucket(session, target_bucket_name).upload_file(zipfile_path, f"born-digital-accessions/{os.path.basename(zipfile_path)}")
 
 
-def get_target_bucket(s3, target_bucket):
-    return s3.Bucket(target_bucket)
+def get_target_bucket(session, target_bucket):
+    return session.resource('s3').Bucket(target_bucket)
 
 
 if __name__ == "__main__":
-    upload( boto3.Session(profile_name=os.environ["AWS_TARGET_PROFILE"]).resource('s3'), sys.argv[1])
+    upload(boto3.Session(profile_name=os.environ["AWS_TARGET_PROFILE"]), sys.argv[1])