From 8902b09905604c452d73bf408c7fa0f8cbcdbcf6 Mon Sep 17 00:00:00 2001 From: Paul Butcher Date: Thu, 17 Oct 2024 10:14:36 +0100 Subject: [PATCH] End to end scheduling (#32) * add failure list compilation * add es to requirements * polish up pending/failure lists * polish up pending list check * move failure/pending lists * improve structure * end to end scheduling * tidy * tidy * improve commentary * remove broken diagram * fix diagram * tidy * print -> log * help GH find the client-only code * help GH find the client-only code * fix stuff found in review * longer timeout --- .github/workflows/test.yml | 2 +- Makefile | 25 +++- client/compile_failure_list.py | 57 +++++++++ client/compile_pending_list.py | 108 ++++++++++++++++++ client/objects_on_target.py | 22 ++++ client/reporting_client.py | 20 ++++ client/start_restores.py | 14 +++ client/start_touches.py | 14 +++ {src => client}/start_transfers.py | 0 client/touchable.py | 9 ++ client/untouchable.py | 9 ++ src/check_status.py | 2 +- src/restore.py | 59 ++++++---- src/restore_lambda.py | 74 ++++++++++++ src/transfer_throttle.py | 71 ++++++++++++ terraform/README.md | 35 ++++++ terraform/main.tf | 40 +++++-- terraform/modules/lambda_scheduler/main.tf | 24 ++++ .../modules/lambda_scheduler/variables.tf | 18 +++ .../modules/notification_queue/README.md | 7 ++ terraform/modules/notification_queue/main.tf | 2 +- .../modules/notification_queue/variables.tf | 6 + terraform/modules/restorer_lambda/main.tf | 104 +++++++++++++++++ terraform/modules/restorer_lambda/outputs.tf | 11 ++ terraform/modules/restorer_lambda/provider.tf | 7 ++ .../modules/restorer_lambda/variables.tf | 8 ++ terraform/modules/toucher_lambda/main.tf | 2 +- terraform/modules/transfer_throttle/README.md | 13 +++ terraform/modules/transfer_throttle/main.tf | 104 +++++++++++++++++ .../modules/transfer_throttle/outputs.tf | 4 + .../modules/transfer_throttle/provider.tf | 7 ++ .../modules/transfer_throttle/variables.tf | 20 ++++ .../modules/transferrer_lambda/variables.tf | 2 +- terraform/modules/transferrer_pipe/main.tf | 1 + terraform/modules/transferrer_pipe/outputs.tf | 3 + .../modules/transferrer_pipe/variables.tf | 7 +- terraform/provider.tf | 2 +- 37 files changed, 870 insertions(+), 43 deletions(-) create mode 100644 client/compile_failure_list.py create mode 100644 client/compile_pending_list.py create mode 100644 client/objects_on_target.py create mode 100644 client/reporting_client.py create mode 100644 client/start_restores.py create mode 100644 client/start_touches.py rename {src => client}/start_transfers.py (100%) create mode 100644 client/touchable.py create mode 100644 client/untouchable.py create mode 100644 src/restore_lambda.py create mode 100644 src/transfer_throttle.py create mode 100644 terraform/README.md create mode 100644 terraform/modules/lambda_scheduler/main.tf create mode 100644 terraform/modules/lambda_scheduler/variables.tf create mode 100644 terraform/modules/notification_queue/README.md create mode 100644 terraform/modules/restorer_lambda/main.tf create mode 100644 terraform/modules/restorer_lambda/outputs.tf create mode 100644 terraform/modules/restorer_lambda/provider.tf create mode 100644 terraform/modules/restorer_lambda/variables.tf create mode 100644 terraform/modules/transfer_throttle/README.md create mode 100644 terraform/modules/transfer_throttle/main.tf create mode 100644 terraform/modules/transfer_throttle/outputs.tf create mode 100644 terraform/modules/transfer_throttle/provider.tf create mode 100644 
terraform/modules/transfer_throttle/variables.tf create mode 100644 terraform/modules/transferrer_pipe/outputs.tf

diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index ecfd4e7..ca93fad 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -30,4 +30,4 @@ jobs:
         flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
     - name: Test with pytest
       run: |
-        pytest --doctest-modules
+        PYTHONPATH=$PWD/client:$PYTHONPATH pytest --doctest-modules
diff --git a/Makefile b/Makefile
index 3733532..0adc7fb 100644
--- a/Makefile
+++ b/Makefile
@@ -7,6 +7,7 @@ shoots/clean:
	rm shoots/*transferred
	rm shoots/*slice*
	rm shoots/*failed*
+	rm shoots/*transfer_status

 # Slice a given input file into manageable chunks, so that you can run them through the
 # transfer process separately without overwhelming the target system.
@@ -58,8 +59,24 @@ shoots/clean:
 # compile a list of shoots that failed since a given time, thus:
 # make shoots/2024-08-06T15:33:00Z.failed
-shoots/%.failed: src/compile_failure_list.py
-	python src/compile_failure_list.py $* > $@
+shoots/%.failed: client/compile_failure_list.py
+	python client/compile_failure_list.py $* > $@
+
+# Once the whole thing is done, check that everything has actually gone through.
+# This produces a CSV recording
+# True (successfully transferred) or False (not successfully transferred)
+# against each shoot
+%.transfer_status: %
+	cat $< | python client/compile_pending_list.py $* > $@
+
+# Compile lists for retrying:
+
+# Some things may have failed in the target system.
+# These are s3 keys that can be passed through the 'touched' target
+%.transfer_status.touchable: %.transfer_status
+	grep False $< | sed 's/,.*//' | python client/touchable.py production > $@
+
+# Others may have failed to transfer (or have been deleted from the target bucket due to expiry).
+# These are shoot identifiers that need to go back through the whole system again
+%.transfer_status.needs_transfer: %.transfer_status
+	grep False $< | sed 's/,.*//' | python client/untouchable.py production | sed 's/.*2754_//' | sed 's/\.zip//' > $@
-# Once the whole thing is done
-%.todo: %
-	cat $< | python src/compile_pending_list.py $* > $@
diff --git a/client/compile_failure_list.py b/client/compile_failure_list.py
new file mode 100644
index 0000000..c4acd03
--- /dev/null
+++ b/client/compile_failure_list.py
@@ -0,0 +1,57 @@
+"""
+Produce a list of records that have failed since a given date/time.
+Usage:
+
+> python compile_failure_list.py 2024-09-05T14:00:00
+
+This will print out the S3 keys of all the zips that have been
+transferred to Archivematica, but failed to fully process, since 1400 on the 5th of September 2024.
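+
+Each output line is an S3 key, e.g. (with a hypothetical shoot identifier):
+
+> born-digital-accessions/2754_CP000001.zip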
+ + +""" +import boto3 +import datetime +from reporting_client import get_es_client + + +def get_failures_since(session, since_time): + es = get_es_client(session) + response = es.search( + index="storage_ingests", + size=100, + query=get_query(since_time), + source=False, + fields=["bag.info.externalIdentifier", "lastModifiedDate"] + ) + print("\n".join(get_zip_paths(response["hits"]["hits"]))) + + +def get_zip_paths(hits): + return (f'born-digital-accessions/{hit["fields"]["bag.info.externalIdentifier"][0]}.zip' for hit in hits) + + +def get_query(since_time): + return { + "bool": { + "filter": [ + {"term": { + "status.id": "failed" + }}, + {"range": { + "lastModifiedDate": { + "gte": since_time + } + }} + ] + } + } + + +def main(): + import sys + get_failures_since(boto3.Session(), datetime.datetime.fromisoformat(sys.argv[1])) + + + +if __name__ == "__main__": + main() diff --git a/client/compile_pending_list.py b/client/compile_pending_list.py new file mode 100644 index 0000000..1ced4e6 --- /dev/null +++ b/client/compile_pending_list.py @@ -0,0 +1,108 @@ +""" +Compile a list of the ingested status of the requested shoots. + +Given a list of shoots that you want to have been ingested, +this will check whether they have all been successfully ingested (True) +or not (False). + +A shoot may have not been ingested due to a failure, or because it +is yet to be transferred (either in progress or just not even started) + +This contrasts with compile_failure_list.py, which produces a list of recent failures. + +Usage: +Provide a newline separated list of shoot identifiers on STDIN, +e.g. given a file myfile.txt: +``` +CP1G00D1 +CP1BAAD1 +CP000001 +CP999999 +``` +where +* CP1G00D1 and CP000001 have both been ingested, +* CP1BAAD1 is somehow broken +* CP999999 is yet to be ingested + +$ cat myfile.txt | python compile_pending_list.py + +Output: +``` +2754_CP1G00D1, True +2754_CP1BAAD1, False +2754_CP000001, True +2754_CP999999, False +``` +""" + +import boto3 +from reporting_client import get_es_client + + +def get_successful_list(session, expected): + es = get_es_client(session) + response = es.search( + index="storage_ingests", + size=1000, + query=find_shoots_query(expected), + source=False, + fields=["bag.info.externalIdentifier", "lastModifiedDate"] + ) + succeeded = get_identifiers(response["hits"]["hits"]) + for shoot in expected: + if shoot in succeeded: + print(f'{shoot}, True') + else: + print(f'{shoot}, {is_cracked_shoot_successful(es, shoot)}') + + +def is_cracked_shoot_successful(es, shoot): + response = es.search( + index="storage_ingests", + size=1000, + query=find_subshoots_query(shoot), + source=False, + fields=["bag.info.externalIdentifier", "lastModifiedDate", "status.id"] + ) + + return bool(response['hits']['hits']) and all((hit['fields']['status.id'] == "succeeded" for hit in response['hits']['hits'])) + + +def get_identifiers(hits): + return [hit["fields"]["bag.info.externalIdentifier"][0] for hit in hits] + + +def find_shoots_query(shoots): + return { + "bool": { + "filter": [ + {"term": { + "status.id": "succeeded" + }}, + {"terms": { + "bag.info.externalIdentifier": shoots + }} + ] + } + } + + +def find_subshoots_query(shoot): + return { + "bool": { + "filter": [ + {"prefix": { + "bag.info.externalIdentifier": shoot + }} + ] + } + } + + +def main(): + import sys + get_successful_list(boto3.Session(), [f"2754_{shoot.strip()}" for shoot in sys.stdin.readlines()]) + + +if __name__ == "__main__": + main() diff --git a/client/objects_on_target.py b/client/objects_on_target.py 
new file mode 100644 index 0000000..b315264 --- /dev/null +++ b/client/objects_on_target.py @@ -0,0 +1,22 @@ + +import botocore + +BUCKETS = { + "staging": "wellcomecollection-archivematica-staging-transfer-source", + "production": "wellcomecollection-archivematica-transfer-source" +} + + +def find_objects(session, bucket, object_keys, yield_on_found): + for object_key in object_keys: + full_key = f"born-digital-accessions/{object_key.strip()}.zip" + try: + session.client('s3').head_object(Bucket=bucket, Key=full_key) + if yield_on_found: + yield full_key + except botocore.exceptions.ClientError as e: + if e.response['Error']['Code'] == '404': + if not yield_on_found: + yield full_key + else: + raise diff --git a/client/reporting_client.py b/client/reporting_client.py new file mode 100644 index 0000000..a9ee5c4 --- /dev/null +++ b/client/reporting_client.py @@ -0,0 +1,20 @@ +from elasticsearch import Elasticsearch + +def get_es_client(session): + """ + Returns an Elasticsearch client for the reporting cluster. + """ + username = get_secret_string( + session, secret_id="reporting/read_only/es_username" + ) + password = get_secret_string( + session, secret_id=f"reporting/read_only/es_password" + ) + host = get_secret_string( + session, secret_id=f"reporting/es_host" + ) + return Elasticsearch(f"https://{host}", basic_auth=(username, password)) + + +def get_secret_string(session, *, secret_id): + return session.client("secretsmanager").get_secret_value(SecretId=secret_id)["SecretString"] diff --git a/client/start_restores.py b/client/start_restores.py new file mode 100644 index 0000000..fef48bb --- /dev/null +++ b/client/start_restores.py @@ -0,0 +1,14 @@ +import sys +import boto3 + + +def post_messages(session, shoot_numbers): + sns = session.resource("sns") + topic = sns.Topic(f"arn:aws:sns:eu-west-1:760097843905:restore_shoots-production") + for shoot_number in shoot_numbers: + print(f"requesting restore of {shoot_number}") + topic.publish(Message=shoot_number.strip()) + + +if __name__ == "__main__": + post_messages(boto3.Session(), sys.stdin.readlines()) diff --git a/client/start_touches.py b/client/start_touches.py new file mode 100644 index 0000000..7a0edf0 --- /dev/null +++ b/client/start_touches.py @@ -0,0 +1,14 @@ +import sys +import boto3 + + +def post_messages(session, environment, shoot_numbers): + sns = session.resource("sns") + topic = sns.Topic(f"arn:aws:sns:eu-west-1:404315009621:touch_shoots-{environment}") + for shoot_number in shoot_numbers: + print(f"requesting touch of {shoot_number}") + topic.publish(Message=shoot_number.strip()) + + +if __name__ == "__main__": + post_messages(boto3.Session(), sys.argv[1], sys.stdin.readlines()) diff --git a/src/start_transfers.py b/client/start_transfers.py similarity index 100% rename from src/start_transfers.py rename to client/start_transfers.py diff --git a/client/touchable.py b/client/touchable.py new file mode 100644 index 0000000..a3f270f --- /dev/null +++ b/client/touchable.py @@ -0,0 +1,9 @@ +import boto3 +import sys + +from objects_on_target import find_objects, BUCKETS + +if __name__ == '__main__': + print("\n".join( + find_objects(boto3.Session(), BUCKETS[sys.argv[1]], sys.stdin.readlines(), True) + )) diff --git a/client/untouchable.py b/client/untouchable.py new file mode 100644 index 0000000..84a5e32 --- /dev/null +++ b/client/untouchable.py @@ -0,0 +1,9 @@ +import boto3 +import sys + +from objects_on_target import find_objects, BUCKETS + +if __name__ == '__main__': + print("\n".join( + find_objects(boto3.Session(), 
BUCKETS[sys.argv[1]], sys.stdin.readlines(), False) + )) diff --git a/src/check_status.py b/src/check_status.py index 8983328..ef1195a 100644 --- a/src/check_status.py +++ b/src/check_status.py @@ -12,7 +12,7 @@ def check_shoot_restore_status(bucket, shoot_number): def check_folder_restore_status(bucket, s3_folder: str): logger.info(s3_folder) for obj in bucket.objects.filter(Prefix=s3_folder, OptionalObjectAttributes=[ - 'RestoreStatus', + 'RestoreStatus' ]): if should_download_file(obj.key): status = obj.restore_status diff --git a/src/restore.py b/src/restore.py index 61ea911..794384e 100644 --- a/src/restore.py +++ b/src/restore.py @@ -1,36 +1,45 @@ import logging +from functools import partial import boto3 from botocore.exceptions import ClientError +from concurrent.futures import ThreadPoolExecutor from transferrer.common import should_download_file, shoot_number_to_folder_path, get_source_bucket - logger = logging.getLogger(__name__) - -def restore_s3_folder(bucket, s3_folder: str, days_to_keep=1): - logger.info(f"restoring folder: {s3_folder}") - for obj in bucket.objects.filter(Prefix=s3_folder): - if should_download_file(obj.key): - try: - logger.info(f"restoring object: {obj.key}") - obj.restore_object( - RestoreRequest={ - 'Days': days_to_keep, - 'GlacierJobParameters': { - 'Tier': 'Bulk' - } - } - ) - except ClientError as e: - if "The operation is not valid for the object's storage class" in str(e): - logger.info(f"attempt to restore non-glacier object: {obj.key}") - elif "RestoreAlreadyInProgress" in str(e): - logger.info(f"redundant attempt to restore object: {obj.key}") - else: - raise +# This variable governs the degree of parallelism to use when restoring folders. +# The correct number is to be discovered by experimentation +THREADS = 10 + + +def restore_s3_folder(bucket, s3_folder: str, days_to_keep=3): + with ThreadPoolExecutor(max_workers=THREADS) as executor: + for key in executor.map( + partial(restore_file, days_to_keep=days_to_keep), + (obj for obj in bucket.objects.filter(Prefix=s3_folder) if should_download_file(obj.key)) + ): + logger.info(f"restore request sent: \t{key}") + + + +def restore_file(obj, *, days_to_keep): + try: + obj.restore_object( + RestoreRequest={ + 'Days': days_to_keep, + 'GlacierJobParameters': { + 'Tier': 'Bulk' + } + } + ) + return obj.key + except ClientError as e: + if "The operation is not valid for the object's storage class" in str(e): + logger.info(f"attempt to restore non-glacier object: {obj.key}") + elif "RestoreAlreadyInProgress" in str(e): + logger.info(f"redundant attempt to restore object: {obj.key}") else: - logger.info(f"ignoring {obj.key}") - + raise def restore_shoot_folder(bucket, shoot_number): restore_s3_folder(bucket, shoot_number_to_folder_path(shoot_number)) diff --git a/src/restore_lambda.py b/src/restore_lambda.py new file mode 100644 index 0000000..0d763d8 --- /dev/null +++ b/src/restore_lambda.py @@ -0,0 +1,74 @@ + +import os +import boto3 +import json +import logging +from restore import restore_shoot_folder +from transferrer.common import get_source_bucket + +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) +SHOOTS_PER_DAY = 60 + + +def notify_shoots_restored(topic, shoot_numbers): + for shoot in shoot_numbers: + notify_shoot_restored(topic, shoot) + + +def notify_shoot_restored(topic, shoot_number): + logger.info(f"requesting transfer of {shoot_number} from restored queue") + topic.publish(Message=shoot_number.strip()) + return shoot_number + + +def lambda_main(event, context): + bucket = 
get_source_bucket(boto3.Session()) + sqs = boto3.client('sqs') + sns = boto3.resource("sns") + source_queue = os.getenv("SOURCE_QUEUE") + + topic = sns.Topic(arn=os.getenv("COMPLETED_TOPIC")) + + notify_shoots_restored( + topic, + restore_from_messages(get_messages(sqs, source_queue, SHOOTS_PER_DAY), bucket, source_queue, sqs) + ) + + +def get_messages(sqs, source_queue, count): + logger.info(f"pulling messages from {source_queue}") + received = 0 + while received < count: + response = sqs.receive_message( + QueueUrl=source_queue, + # 10 is the biggest number SQS can think of. + # but it is not guaranteed that this call will actually pull 10, + # so we keep going until we've either managed to pull _count_ messages + # or we don't get any messages, implying that the queue is now empty. + MaxNumberOfMessages=min(10, count - received), + WaitTimeSeconds=20 + ) + messages = response.get('Messages', []) + message_count = len(messages) + if message_count == 0: + logger.info(f"pulled zero messages, quitting") + break + received += message_count + logger.info(f"pulled {message_count}, total {received}") + for message in messages: + yield message + + +def restore_from_messages(messages, bucket, source_queue, sqs): + + for message in messages: + logger.info('processing message') + try: + shoot_number = json.loads(message['Body'])['Message'] + restore_shoot_folder(bucket, shoot_number) + handle = message['ReceiptHandle'] + sqs.delete_message(QueueUrl=source_queue, ReceiptHandle=handle) + yield shoot_number + except Exception as err: + logger.exception(err) diff --git a/src/transfer_throttle.py b/src/transfer_throttle.py new file mode 100644 index 0000000..ac4f651 --- /dev/null +++ b/src/transfer_throttle.py @@ -0,0 +1,71 @@ +""" +Pulls messages from the restored queue and notifies the transfer queue. + +Archivematica can only handle about 20 transfers at a time, and about 60 in a day. +This Lambda is the throttle between the restorer restoring a whole day of material +and the transferrer which will try to transfer records as fast as the queue provides them. + +This also allows the restoration to complete before attempting a transfer. + +The restorer lambda pulls a full day's batch of transfers from the restore queue in the evening +and populates the queue_shoot_transfers with them. + +The following day, this throttle pulls those requested transfers off the queue at a manageable rate, +and puts them on the "transfer-shoots" queue, which triggers the actual transfer. + +""" +import os +import boto3 +import json +import logging + +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + + +def lambda_main(event, context): + source_queue = os.getenv("SOURCE_QUEUE") + target_topic = os.getenv("TARGET_TOPIC") + sns = boto3.resource("sns") + topic = sns.Topic(target_topic) + sqs = boto3.client('sqs') + + for message in push_messages(topic, get_messages(sqs, source_queue, 10)): + sqs.delete_message(QueueUrl=source_queue, ReceiptHandle=message['ReceiptHandle']) + + +def get_messages(sqs, source_queue, count): + logger.info(f"pulling messages from {source_queue}") + received = 0 + while received < count: + response = sqs.receive_message( + QueueUrl=source_queue, + # 10 is the biggest number SQS can think of. + # but it is not guaranteed that this call will actually pull 10, + # so we keep going until we've either managed to pull _count_ messages + # or we don't get any messages, implying that the queue is now empty. 
+            MaxNumberOfMessages=min(10, count - received),
+            WaitTimeSeconds=20
+        )
+        messages = response.get('Messages', [])
+        message_count = len(messages)
+        if message_count == 0:
+            logger.info(f"pulled zero messages, quitting")
+            break
+        received += message_count
+        logger.info(f"pulled {message_count}, total {received}")
+        for message in messages:
+            yield message
+
+
+def push_messages(topic, messages):
+    for message in messages:
+        yield push_message(topic, message)
+
+
+def push_message(topic, message):
+    shoot_number = json.loads(message['Body'])['Message']
+    logger.info(f"requesting transfer of {shoot_number} from restored queue")
+    topic.publish(Message=shoot_number.strip())
+    return message
+
diff --git a/terraform/README.md b/terraform/README.md
new file mode 100644
index 0000000..37d2d2c
--- /dev/null
+++ b/terraform/README.md
@@ -0,0 +1,35 @@
+# Restore and Transfer pipeline
+## How does it all fit together?
+
+In the evening, the Restorer
+* Pulls a day's worth of shoots from the restore queue
+* Restores them from Glacier
+* Notifies the transfer throttle queue.
+
+Restoration takes a nondeterministic amount of time, up to 12 hours.
+```mermaid
+sequenceDiagram
+    Restore Schedule->>+Restorer: It's 2000
+    Restorer->>+Restore Queue: Gimme 60
+    Restore Queue -->> Restorer: OK
+    Restorer->>S3: Restore
+    S3 -->> Restorer: On it!
+    Restorer->>-Transfer Throttle Queue: 60 shoots
+```
+
+Across the day, the transfer throttle
+* pulls a manageable quantity from its queue
+* shifts them onto the transfer queue
+
+The transferrer then transfers everything on its queue
+```mermaid
+sequenceDiagram
+    Transfer Schedule->>+Transfer Throttle: It's time
+    Transfer Throttle ->> Transfer Throttle Queue: Gimme 10
+    Transfer Throttle Queue -->> Transfer Throttle: OK
+    Transfer Throttle->>- Transfer Queue: 10 shoots
+    Transfer Queue ->>+ Transferrer: 10 shoots
+    Transferrer ->> Photography S3: Gimme Shoots
+    Photography S3 -->> Transferrer: OK
+    Transferrer ->>- Archive S3: 10 Shoots
+```
diff --git a/terraform/main.tf b/terraform/main.tf
index 7a43b7c..88b49cc 100644
--- a/terraform/main.tf
+++ b/terraform/main.tf
@@ -1,6 +1,6 @@
 locals {
   event_batching_window_timeout = 20
-  lambda_timeout = 120 //two minutes
+  lambda_timeout = 600 //ten minutes

   # The lambda event source pulls messages from SQS in batches, finally triggering the lambda
   # when either it has enough messages, or enough time has elapsed.
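A quick check that the scheduling arithmetic above holds together — a minimal sketch, using only figures from this patch (`SHOOTS_PER_DAY = 60` in src/restore_lambda.py; the throttle's `lambda_main` pulls up to 10 messages per run, scheduled six times each weekday):

```python
# Sanity check: the throttle's weekday drain capacity must cover the
# restorer's nightly batch, or queue_shoot_transfers will back up.
SHOOTS_PER_DAY = 60                          # src/restore_lambda.py
MESSAGES_PER_THROTTLE_RUN = 10               # src/transfer_throttle.py lambda_main
THROTTLE_RUNS = len([7, 9, 11, 13, 15, 16])  # cron(30 7,9,11,13,15,16 ? * MON-FRI *)

assert MESSAGES_PER_THROTTLE_RUN * THROTTLE_RUNS >= SHOOTS_PER_DAY
```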
@@ -27,6 +27,32 @@ data "archive_file" "toucher_zip" { source_file = "../src/touch.py" } + +data "archive_file" "transfer_throttle_zip" { + type = "zip" + output_path = "transfer_throttle.zip" + source_file = "../src/transfer_throttle.py" +} + + +module "restorer_lambda" { + source = "./modules/restorer_lambda" + lambda_zip = data.archive_file.lambda_zip + providers = { + aws: aws.platform + } +} + +module "toucher_lambda" { + source = "./modules/toucher_lambda" + environment = "production" + lambda_zip = data.archive_file.toucher_zip + providers = { + aws: aws.digitisation + } +} + + module "staging_lambda" { source = "./modules/transferrer_pipe" environment = "staging" @@ -35,7 +61,6 @@ module "staging_lambda" { providers = { aws: aws.digitisation } - } module "production_lambda" { @@ -46,16 +71,17 @@ module "production_lambda" { providers = { aws: aws.digitisation } - lambda_storage = 8192 + lambda_storage = 10240 lambda_timeout = 600 + extra_topics = [module.transfer_throttle.output_topic_arn] } - -module "toucher_lambda" { - source = "./modules/toucher_lambda" +module "transfer_throttle" { + source = "./modules/transfer_throttle" environment = "production" - lambda_zip = data.archive_file.toucher_zip + lambda_zip = data.archive_file.transfer_throttle_zip providers = { aws: aws.digitisation } + upstream_topic_arn = module.restorer_lambda.completion_topic_arn } \ No newline at end of file diff --git a/terraform/modules/lambda_scheduler/main.tf b/terraform/modules/lambda_scheduler/main.tf new file mode 100644 index 0000000..985ab77 --- /dev/null +++ b/terraform/modules/lambda_scheduler/main.tf @@ -0,0 +1,24 @@ + + +resource "aws_cloudwatch_event_rule" "schedule_rule" { + name = "${var.name}_schedule" + description = var.description + schedule_expression = var.cron +} + +resource "aws_cloudwatch_event_target" "lambda_target" { + rule = aws_cloudwatch_event_rule.schedule_rule.name + target_id = "${var.name}_lambda_target" + arn = var.lambda_arn +} + +resource "aws_lambda_permission" "allow_cloudwatch" { + statement_id = "AllowExecutionFromCloudWatch" + action = "lambda:InvokeFunction" + function_name = var.lambda_function_name + principal = "events.amazonaws.com" + source_arn = aws_cloudwatch_event_rule.schedule_rule.arn +} + + + diff --git a/terraform/modules/lambda_scheduler/variables.tf b/terraform/modules/lambda_scheduler/variables.tf new file mode 100644 index 0000000..b38b323 --- /dev/null +++ b/terraform/modules/lambda_scheduler/variables.tf @@ -0,0 +1,18 @@ +variable "name" { + type = string +} +variable "cron" { + type = string +} + +variable "lambda_function_name" { + type = string +} + +variable "lambda_arn" { + type = string +} + +variable "description" { + type = string +} \ No newline at end of file diff --git a/terraform/modules/notification_queue/README.md b/terraform/modules/notification_queue/README.md new file mode 100644 index 0000000..5c3e796 --- /dev/null +++ b/terraform/modules/notification_queue/README.md @@ -0,0 +1,7 @@ +# Notification Queue + +This module creates a queue/topic pair. Notifications sent to the topic +will become messages on the queue. + +If the queue needs to also subscribe to other topics, then these can be +provided as extra_topics. 
\ No newline at end of file
diff --git a/terraform/modules/notification_queue/main.tf b/terraform/modules/notification_queue/main.tf
index b75d311..f7a00de 100644
--- a/terraform/modules/notification_queue/main.tf
+++ b/terraform/modules/notification_queue/main.tf
@@ -14,7 +14,7 @@ module "input_queue" {

   queue_name = "${var.action_name}-${var.environment}"

-  topic_arns = [module.notification_topic.arn]
+  topic_arns = concat(var.extra_topics, [module.notification_topic.arn])
   visibility_timeout_seconds = var.queue_visibility_timeout
   max_receive_count = 1
   message_retention_seconds = 1209600
diff --git a/terraform/modules/notification_queue/variables.tf b/terraform/modules/notification_queue/variables.tf
index b5c9e4d..ab8bcb4 100644
--- a/terraform/modules/notification_queue/variables.tf
+++ b/terraform/modules/notification_queue/variables.tf
@@ -12,4 +12,10 @@ variable "environment" {

 variable "action_name" {
   type = string
+}
+
+variable "extra_topics" {
+  type        = list(string)
+  default     = []
+  description = "List of topics defined elsewhere that the queue should subscribe to"
 }
\ No newline at end of file
diff --git a/terraform/modules/restorer_lambda/main.tf b/terraform/modules/restorer_lambda/main.tf
new file mode 100644
index 0000000..7183994
--- /dev/null
+++ b/terraform/modules/restorer_lambda/main.tf
@@ -0,0 +1,104 @@
+locals {
+  lambda_name                    = "editorial-photography-transfer-restorer"
+  lambda_timeout                 = 60
+  input_queue_visibility_timeout = 120
+  environment                    = "production"
+  digitisation_account           = "404315009621"
+}
+
+
+module "input_queue" {
+  source                   = "../notification_queue"
+  environment              = local.environment
+  queue_visibility_timeout = local.input_queue_visibility_timeout
+  action_name              = "restore_shoots"
+}
+
+
+module "restore_lambda" {
+  source = "git@github.com:wellcomecollection/terraform-aws-lambda?ref=v1.2.0"
+
+  name    = local.lambda_name
+  runtime = "python3.12"
+  handler = "restore_lambda.lambda_main"
+
+  filename = var.lambda_zip.output_path
+  timeout  = local.lambda_timeout
+
+  source_code_hash = var.lambda_zip.output_base64sha256
+  environment = {
+    variables = {
+      COMPLETED_TOPIC = module.completion_topic.arn
+      SOURCE_QUEUE    = module.input_queue.queue_url
+    }
+  }
+}
+
+resource "aws_iam_role_policy" "restore_shoot_from_glacier" {
+  role = module.restore_lambda.lambda_role.name
+  name = "restore_shoot_from_glacier"
+  policy = jsonencode({
+    Version = "2012-10-17"
+    Statement = [
+      {
+        "Effect": "Allow",
+        "Action": ["s3:ListBucket", "s3:RestoreObject"],
+        "Resource": ["arn:aws:s3:::wellcomecollection-editorial-photography", "arn:aws:s3:::wellcomecollection-editorial-photography/*"]
+      },
+    ]
+    }
+  )
+}
+
+
+resource "aws_iam_role_policy" "pull_from_restore_queue" {
+  role = module.restore_lambda.lambda_role.name
+  name = "restore_queue_receive_message"
+  policy = jsonencode({
+    Version = "2012-10-17"
+    Statement = [
+      {
+        "Effect": "Allow",
+        "Action": ["sqs:ReceiveMessage", "sqs:DeleteMessage"]
+        "Resource": module.input_queue.queue_arn
+      },
+    ]
+    }
+  )
+}
+
+module "restorer_scheduler" {
+  source               = "../lambda_scheduler"
+  cron                 = "cron(0 20 ?
* SUN-THU *)" + description = "Restore a batch of shoots in the evening so they are ready to be transferred in the morning" + lambda_arn = module.restore_lambda.lambda.arn + lambda_function_name = module.restore_lambda.lambda.function_name + name = "restorer" +} + + +module "completion_topic" { + source = "github.com/wellcomecollection/terraform-aws-sns-topic.git?ref=v1.0.1" + name = "shoot_restored-${local.environment}" + cross_account_subscription_ids = [ + local.digitisation_account + ] +} + + +resource "aws_iam_role_policy" "notify_restored_topic" { + role = module.restore_lambda.lambda_role.name + name = "notify-shoot_restored-${local.environment}" + policy = jsonencode({ + Version = "2012-10-17" + Statement = [ + { + "Effect": "Allow", + "Action": "sns:Publish" + "Resource": module.completion_topic.arn + }, + ] + } + ) +} + diff --git a/terraform/modules/restorer_lambda/outputs.tf b/terraform/modules/restorer_lambda/outputs.tf new file mode 100644 index 0000000..63ff8f7 --- /dev/null +++ b/terraform/modules/restorer_lambda/outputs.tf @@ -0,0 +1,11 @@ +output "lambda" { + value = module.restore_lambda.lambda +} + +output "role" { + value = module.restore_lambda.lambda_role +} + +output "completion_topic_arn" { + value = module.completion_topic.arn +} diff --git a/terraform/modules/restorer_lambda/provider.tf b/terraform/modules/restorer_lambda/provider.tf new file mode 100644 index 0000000..7afdcf4 --- /dev/null +++ b/terraform/modules/restorer_lambda/provider.tf @@ -0,0 +1,7 @@ +terraform { + required_providers { + aws = { + source = "hashicorp/aws" + } + } +} \ No newline at end of file diff --git a/terraform/modules/restorer_lambda/variables.tf b/terraform/modules/restorer_lambda/variables.tf new file mode 100644 index 0000000..6b292df --- /dev/null +++ b/terraform/modules/restorer_lambda/variables.tf @@ -0,0 +1,8 @@ +variable "lambda_zip" { + type = object( + { + output_path = string, + output_base64sha256 = string + } + ) +} diff --git a/terraform/modules/toucher_lambda/main.tf b/terraform/modules/toucher_lambda/main.tf index 0d78ee5..329d590 100644 --- a/terraform/modules/toucher_lambda/main.tf +++ b/terraform/modules/toucher_lambda/main.tf @@ -91,7 +91,7 @@ resource "aws_cloudwatch_event_rule" "schedule_rule" { name = "toucher_schedule" description = "Trigger the toucher Lambda at 07:30, then five further times across the working day" - schedule_expression = "cron(30 7,9,11,13,15,16, ? * MON-FRI *)" + schedule_expression = "cron(30 7,9,11,13,15,16 ? * MON-FRI *)" } resource "aws_cloudwatch_event_target" "lambda_target" { diff --git a/terraform/modules/transfer_throttle/README.md b/terraform/modules/transfer_throttle/README.md new file mode 100644 index 0000000..aafaaaa --- /dev/null +++ b/terraform/modules/transfer_throttle/README.md @@ -0,0 +1,13 @@ +# Transfer throttle + +The transfer throttle moves messages from the restored queue to the transfer queue, +in batches of a suitable size for Archivematica to handle. 
+It runs when Archivematica starts in the morning, and spreads the load across the day.
+
+## Why
+
+This exists for two reasons:
+
+* Restoration takes up to 12 hours
+* Archivematica needs to be fed slowly over the day
+
diff --git a/terraform/modules/transfer_throttle/main.tf b/terraform/modules/transfer_throttle/main.tf
new file mode 100644
index 0000000..64b5808
--- /dev/null
+++ b/terraform/modules/transfer_throttle/main.tf
@@ -0,0 +1,104 @@
+locals {
+  lambda_name = "editorial-photography-transfer_throttle-${var.environment}"
+  buckets = tomap(
+    {
+      staging    = "wellcomecollection-archivematica-staging-transfer-source",
+      production = "wellcomecollection-archivematica-transfer-source"
+    }
+  )
+  target_bucket                  = lookup(local.buckets, var.environment)
+  input_queue_visibility_timeout = 300
+}
+
+module "input_queue" {
+  source                   = "../notification_queue"
+  environment              = var.environment
+  queue_visibility_timeout = local.input_queue_visibility_timeout
+  action_name              = "queue_shoot_transfers"
+  extra_topics             = [var.upstream_topic_arn]
+}
+
+module "transfer_throttle_lambda" {
+  source   = "git@github.com:wellcomecollection/terraform-aws-lambda?ref=v1.2.0"
+  name     = local.lambda_name
+  runtime  = "python3.12"
+  handler  = "transfer_throttle.lambda_main"
+  filename = var.lambda_zip.output_path
+  timeout  = 300
+
+  environment = {
+    variables = {
+      SOURCE_QUEUE = module.input_queue.queue_url
+      ENVIRONMENT  = var.environment
+      TARGET_TOPIC = module.output_topic.arn
+    }
+  }
+  source_code_hash = var.lambda_zip.output_base64sha256
+}
+
+
+module "transfer_scheduler" {
+  source               = "../lambda_scheduler"
+  cron                 = "cron(30 7,9,11,13,15,16 ? * MON-FRI *)"
+  description          = "Moves batches of shoots to the transferrer at a rate Archivematica can handle"
+  lambda_arn           = module.transfer_throttle_lambda.lambda.arn
+  lambda_function_name = module.transfer_throttle_lambda.lambda.function_name
+  name                 = "transfer_throttle"
+}
+
+
+resource "aws_iam_role_policy" "pull_from_input_queue" {
+  role = module.transfer_throttle_lambda.lambda_role.name
+  name = "queue_shoot_transfers_receive_message-${var.environment}"
+  policy = jsonencode({
+    Version = "2012-10-17"
+    Statement = [
+      {
+        "Effect": "Allow",
+        "Action": ["sqs:ReceiveMessage", "sqs:DeleteMessage"]
+        "Resource": module.input_queue.queue_arn
+      },
+    ]
+    }
+  )
+}
+
+
+module "output_topic" {
+  source = "github.com/wellcomecollection/terraform-aws-sns-topic.git?ref=v1.0.1"
+  name   = "transfer_throttle_output-${var.environment}"
+}
+
+resource "aws_iam_role_policy" "notify_output_topic" {
+  role = module.transfer_throttle_lambda.lambda_role.name
+  name = "notify_throttled_transfer-${var.environment}"
+  policy = jsonencode({
+    Version = "2012-10-17"
+    Statement = [
+      {
+        "Effect": "Allow",
+        "Action": "sns:Publish"
+        "Resource": module.output_topic.arn
+      },
+    ]
+    }
+  )
+}
+
diff --git a/terraform/modules/transfer_throttle/outputs.tf b/terraform/modules/transfer_throttle/outputs.tf
new file mode 100644
index 0000000..0b89f6d
--- /dev/null
+++ b/terraform/modules/transfer_throttle/outputs.tf
@@ -0,0 +1,4 @@
+
+output "output_topic_arn" {
+  value = module.output_topic.arn
+}
diff --git a/terraform/modules/transfer_throttle/provider.tf b/terraform/modules/transfer_throttle/provider.tf
new file mode 100644
index 0000000..7afdcf4
--- /dev/null
+++ b/terraform/modules/transfer_throttle/provider.tf
@@ -0,0 +1,7 @@
+terraform {
+  required_providers {
+    aws = {
+      source = "hashicorp/aws"
+    }
+  }
+}
\ No newline at end of file
diff --git
a/terraform/modules/transfer_throttle/variables.tf b/terraform/modules/transfer_throttle/variables.tf new file mode 100644 index 0000000..d0bd49a --- /dev/null +++ b/terraform/modules/transfer_throttle/variables.tf @@ -0,0 +1,20 @@ +variable "environment" { + type = string + validation { + condition = contains(["staging", "production"], var.environment) + error_message = "environment must be one of staging or production" + } +} + +variable "lambda_zip" { + type = object( + { + output_path = string, + output_base64sha256 = string + } + ) +} + +variable "upstream_topic_arn" { + type = string +} \ No newline at end of file diff --git a/terraform/modules/transferrer_lambda/variables.tf b/terraform/modules/transferrer_lambda/variables.tf index 721fe17..e1f22b1 100644 --- a/terraform/modules/transferrer_lambda/variables.tf +++ b/terraform/modules/transferrer_lambda/variables.tf @@ -21,4 +21,4 @@ variable "lambda_storage" { variable "lambda_timeout" { type = number -} \ No newline at end of file +} diff --git a/terraform/modules/transferrer_pipe/main.tf b/terraform/modules/transferrer_pipe/main.tf index ca973ba..d7f92c8 100644 --- a/terraform/modules/transferrer_pipe/main.tf +++ b/terraform/modules/transferrer_pipe/main.tf @@ -12,6 +12,7 @@ module "input_queue" { environment = var.environment queue_visibility_timeout = var.queue_visibility_timeout action_name = "transfer-shoots" + extra_topics = var.extra_topics } module "trigger" { diff --git a/terraform/modules/transferrer_pipe/outputs.tf b/terraform/modules/transferrer_pipe/outputs.tf new file mode 100644 index 0000000..5c959f9 --- /dev/null +++ b/terraform/modules/transferrer_pipe/outputs.tf @@ -0,0 +1,3 @@ +output "input_queue_arn" { + value = module.input_queue.queue_arn +} \ No newline at end of file diff --git a/terraform/modules/transferrer_pipe/variables.tf b/terraform/modules/transferrer_pipe/variables.tf index b47615b..d5d7f76 100644 --- a/terraform/modules/transferrer_pipe/variables.tf +++ b/terraform/modules/transferrer_pipe/variables.tf @@ -28,4 +28,9 @@ variable "lambda_storage" { variable "lambda_timeout" { type = number default = 300 //five minutes -} \ No newline at end of file +} + +variable "extra_topics" { + type = list(string) + default = [] +} diff --git a/terraform/provider.tf b/terraform/provider.tf index d822472..2b68168 100644 --- a/terraform/provider.tf +++ b/terraform/provider.tf @@ -4,7 +4,7 @@ provider "aws" { alias = "digitisation" allowed_account_ids = ["404315009621"] assume_role { - role_arn = "arn:aws:iam::404315009621:role/digitisation-developer" + role_arn = "arn:aws:iam::404315009621:role/digitisation-admin" } }
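The `get_messages` batching loop (duplicated in src/restore_lambda.py and src/transfer_throttle.py) can be exercised locally without touching AWS. A minimal sketch, assuming src/ is on PYTHONPATH and boto3 is installed; `StubSQS` is a hypothetical stand-in for the boto3 SQS client that returns short batches and then an empty response, mimicking the "SQS may return fewer than MaxNumberOfMessages" behaviour the loop is written around:

```python
from transfer_throttle import get_messages  # assumes src/ is on PYTHONPATH


class StubSQS:
    """Hypothetical stand-in for boto3's SQS client: hands out batches of
    at most 7 messages until the queue is drained, then empty responses."""
    def __init__(self, total):
        self.remaining = total

    def receive_message(self, QueueUrl, MaxNumberOfMessages, WaitTimeSeconds):
        batch = min(self.remaining, MaxNumberOfMessages, 7)
        self.remaining -= batch
        return {"Messages": [{"Body": "{}", "ReceiptHandle": "rh"}] * batch}


# Asking for 10: pulls 7, then 3 (= min(10, 10 - 7)), then stops.
assert len(list(get_messages(StubSQS(100), "queue-url", 10))) == 10
# Queue drains early: pulls 4, then an empty batch ends the loop.
assert len(list(get_messages(StubSQS(4), "queue-url", 10))) == 4
```

Note that both lambdas only delete a message after its work (restore, or publish to the transfer topic) has succeeded, so a crash mid-batch means SQS redelivers rather than drops.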