diff --git a/ebsco_adapter/ebsco_adapter/docker-compose.yml b/ebsco_adapter/ebsco_adapter/docker-compose.yml index 5b51475a30..8eab6bbdef 100644 --- a/ebsco_adapter/ebsco_adapter/docker-compose.yml +++ b/ebsco_adapter/ebsco_adapter/docker-compose.yml @@ -13,6 +13,7 @@ services: environment: - AWS_PROFILE=platform-developer - OUTPUT_TOPIC_ARN=${OUTPUT_TOPIC_ARN} + - REINDEX_TOPIC_ARN=${REINDEX_TOPIC_ARN} - CUSTOMER_ID=${CUSTOMER_ID} - S3_BUCKET=${S3_BUCKET} - S3_PREFIX=${S3_PREFIX} diff --git a/ebsco_adapter/ebsco_adapter/run_local.sh b/ebsco_adapter/ebsco_adapter/run_local.sh index 59d4acb3e5..0d9a067e25 100755 --- a/ebsco_adapter/ebsco_adapter/run_local.sh +++ b/ebsco_adapter/ebsco_adapter/run_local.sh @@ -13,11 +13,12 @@ CUSTOMER_ID=$(aws ssm get-parameter --name /catalogue_pipeline/ebsco_adapter/cus FTP_REMOTE_DIR=$(aws ssm get-parameter --name /catalogue_pipeline/ebsco_adapter/ftp_remote_dir --query "Parameter.Value" --output text) S3_BUCKET=$(aws ssm get-parameter --name /catalogue_pipeline/ebsco_adapter/bucket_name --query "Parameter.Value" --output text) OUTPUT_TOPIC_ARN=$(aws ssm get-parameter --name /catalogue_pipeline/ebsco_adapter/output_topic_arn --query "Parameter.Value" --output text) +REINDEX_TOPIC_ARN=$(aws ssm get-parameter --name /catalogue_pipeline/ebsco_adapter/reindex_topic_arn --query "Parameter.Value" --output text) # Update the S3_PREFIX to be the environment (use dev for local testing) S3_PREFIX=prod -export FTP_PASSWORD FTP_SERVER FTP_USERNAME CUSTOMER_ID FTP_REMOTE_DIR S3_BUCKET S3_PREFIX OUTPUT_TOPIC_ARN +export FTP_PASSWORD FTP_SERVER FTP_USERNAME CUSTOMER_ID FTP_REMOTE_DIR S3_BUCKET S3_PREFIX OUTPUT_TOPIC_ARN REINDEX_TOPIC_ARN # Ensure the docker image is up to date docker-compose --log-level ERROR build dev diff --git a/ebsco_adapter/ebsco_adapter/src/main.py b/ebsco_adapter/ebsco_adapter/src/main.py index 31ef1b578a..f39ba3ea45 100644 --- a/ebsco_adapter/ebsco_adapter/src/main.py +++ b/ebsco_adapter/ebsco_adapter/src/main.py @@ -19,7 
+19,7 @@ ftp_username = os.environ.get("FTP_USERNAME") ftp_password = os.environ.get("FTP_PASSWORD") ftp_remote_dir = os.environ.get("FTP_REMOTE_DIR") -sns_topic_arn = os.environ.get("OUTPUT_TOPIC_ARN") +output_topic_arn = os.environ.get("OUTPUT_TOPIC_ARN") reindex_topic_arn = os.environ.get("REINDEX_TOPIC_ARN") s3_bucket = os.environ.get("S3_BUCKET", "wellcomecollection-platform-ebsco-adapter") @@ -56,9 +56,9 @@ def run_process(temp_dir, ebsco_ftp, s3_store, sns_publisher, invoked_at): def run_reindex(s3_store, sns_publisher, invoked_at, reindex_type, ids=None): - assert reindex_type in ["full", "partial"], "Invalid reindex type" + assert reindex_type in ["reindex-full", "reindex-partial"], "Invalid reindex type" assert ( - ids is not None or reindex_type == "full" + ids is not None or reindex_type == "reindex-full" ), "You must provide IDs for partial reindexing" print(f"Running reindex with type {reindex_type} and ids {ids} ...") @@ -117,46 +117,53 @@ def _get_iso8601_invoked_at(): return invoked_at +# This script can be run locally, or invoked as an ECS task with the required +# command overrides. 
+# +# Usage: main.py --process-type PROCESS_TYPE [--reindex-ids REINDEX_IDS] +# process_type: Type of process (reindex-full, reindex-partial, scheduled) +# reindex_ids: Comma-separated list of IDs to reindex (for partial) if __name__ == "__main__": event = None context = SimpleNamespace(invoked_function_arn=None) + reindex_processes = {"reindex-full", "reindex-partial"} + valid_process_types = reindex_processes.union({"scheduled"}) + # Parse command line arguments for running locally parser = argparse.ArgumentParser(description="Perform reindexing operations") parser.add_argument( - "--reindex-type", + "--process-type", type=str, - choices=["full", "partial"], - help="Type of reindexing (full or partial)", + choices=list(valid_process_types), + required=True, + help="Type of process (reindex-full, reindex-partial, scheduled)", ) parser.add_argument( "--reindex-ids", type=str, help="Comma-separated list of IDs to reindex (for partial)", ) - parser.add_argument( - "--scheduled-invoke", - action="store_true", - help="To run a regular process invocation, without reindexing.", - ) invoked_at = _get_iso8601_invoked_at() args = parser.parse_args() - process_type = None reindex_ids = None sns_publisher = None - if args.reindex_type: - process_type = f"reindex-{args.reindex_type}" - sns_publisher = SnsPublisher(reindex_topic_arn) - if args.reindex_ids: + process_type = args.process_type + + # Validate arguments, stop if invalid + if process_type not in valid_process_types: + raise ValueError(f"Invalid process type: {process_type}") + + if process_type == "reindex-partial" and not args.reindex_ids: + raise ValueError("You must provide IDs for partial reindexing") + elif process_type == "reindex-partial": reindex_ids = args.reindex_ids.split(",") reindex_ids = [rid.strip() for rid in reindex_ids] - elif args.scheduled_invoke: - process_type = "scheduled" - sns_publisher = SnsPublisher(sns_topic_arn) + # Run the process with ProcessMetrics( process_type ) as metrics, tempfile.TemporaryDirectory() as 
temp_dir, EbscoFtp( @@ -164,13 +171,18 @@ def _get_iso8601_invoked_at(): ) as ebsco_ftp: s3_store = S3Store(s3_bucket) - if args.reindex_type: + if process_type in reindex_processes: + sns_publisher = SnsPublisher(reindex_topic_arn) run_reindex( s3_store, sns_publisher, invoked_at, - args.reindex_type, + process_type, reindex_ids, ) - elif args.scheduled_invoke: + else: + assert ( + process_type == "scheduled" + ), "Invalid process type, arg validation failed?!" + sns_publisher = SnsPublisher(output_topic_arn) run_process(temp_dir, ebsco_ftp, s3_store, sns_publisher, invoked_at) diff --git a/ebsco_adapter/ebsco_adapter/src/test_main.py b/ebsco_adapter/ebsco_adapter/src/test_main.py index a1fa844d9a..81620d2140 100644 --- a/ebsco_adapter/ebsco_adapter/src/test_main.py +++ b/ebsco_adapter/ebsco_adapter/src/test_main.py @@ -190,7 +190,7 @@ def test_run_reindex(): assert fake_sns_client.test_get_published_messages() == [] print("\n--- Running test reindex with type full ---") - run_reindex(s3_store, sns_publisher, invoked_at, "full") + run_reindex(s3_store, sns_publisher, invoked_at, "reindex-full") reindex_published_messages = fake_sns_client.test_get_published_messages() assert reindex_published_messages == [ebs9579e, ebs29555e] @@ -199,7 +199,9 @@ def test_run_reindex(): assert fake_sns_client.test_get_published_messages() == [] print("\n--- Running test reindex with type partial ---") - run_reindex(s3_store, sns_publisher, invoked_at, "partial", ["ebs9579e"]) + run_reindex( + s3_store, sns_publisher, invoked_at, "reindex-partial", ["ebs9579e"] + ) reindex_published_messages = fake_sns_client.test_get_published_messages() assert reindex_published_messages == [ebs9579e] diff --git a/ebsco_adapter/terraform/ftp_task.tf b/ebsco_adapter/terraform/ftp_task.tf index 4363db828a..e85a2843fa 100644 --- a/ebsco_adapter/terraform/ftp_task.tf +++ b/ebsco_adapter/terraform/ftp_task.tf @@ -21,66 +21,65 @@ module "ftp_task" { memory = 4096 } -resource "aws_scheduler_schedule" 
"ftp_task_schedule" { - name = "ebsco-adapter-ftp-schedule" - group_name = "default" - - flexible_time_window { - mode = "OFF" - } - - schedule_expression = "rate(1 days)" - - # Disable the schedule for now - state = "ENABLED" - - target { - arn = aws_ecs_cluster.cluster.arn - role_arn = aws_iam_role.eventbridge_task_scheduler.arn +resource "aws_cloudwatch_event_rule" "reindex_rule" { + name = "ebsco-adapter-reindex-rule" + description = "Rule to trigger custom reindex event for EBSCO adapter" + event_pattern = jsonencode({ + "source" : ["weco.pipeline.reindex"], + "detail" : { + "ReindexTargets" : ["ebsco"] + } + }) +} - ecs_parameters { - task_definition_arn = local.task_definition_arn_latest - launch_type = "FARGATE" +resource "aws_cloudwatch_event_target" "ftp_task_reindex_target" { + arn = aws_ecs_cluster.cluster.arn + rule = aws_cloudwatch_event_rule.reindex_rule.name + role_arn = aws_iam_role.eventbridge_task_scheduler.arn - network_configuration { - assign_public_ip = false - security_groups = [ - aws_security_group.egress.id, - local.network_config.ec_privatelink_security_group_id - ] - subnets = local.network_config.subnets - } - } + ecs_target { + task_definition_arn = local.task_definition_arn_latest + launch_type = "FARGATE" - input = jsonencode({ - containerOverrides = [ - { - name = "ebsco-adapter-ftp" - command = ["--scheduled-invoke"] - } + network_configuration { + assign_public_ip = false + security_groups = [ + aws_security_group.egress.id, + local.network_config.ec_privatelink_security_group_id ] - }) - - retry_policy { - maximum_retry_attempts = 3 + subnets = local.network_config.subnets } } + + input = jsonencode({ + containerOverrides = [ + { + name = "ebsco-adapter-ftp" + command = ["--process-type", "reindex-full"] + } + ] + }) } -resource "aws_cloudwatch_event_rule" "reindex_rule" { - name = "ebsco-adapter-reindex-rule" - description = "Rule to catch custom reindex event" +resource "aws_cloudwatch_event_rule" "schedule_rule" { + name = 
"ebsco-adapter-schedule-rule" + description = "Rule to schedule and manually trigger EBSCO adapter" + + # Invoke the rule every day + schedule_expression = "rate(1 day)" + + # Also allow manual invocation event_pattern = jsonencode({ - "source" : ["weco.pipeline.reindex"], + "source" : ["weco.pipeline.adapter"], "detail" : { - "ReindexTargets" : ["ebsco"] + "InvokeTargets" : ["ebsco"] } }) } -resource "aws_cloudwatch_event_target" "ftp_task_reindex_target" { +resource "aws_cloudwatch_event_target" "ftp_task_schedule_target" { arn = aws_ecs_cluster.cluster.arn - rule = aws_cloudwatch_event_rule.reindex_rule.name + rule = aws_cloudwatch_event_rule.schedule_rule.name role_arn = aws_iam_role.eventbridge_task_scheduler.arn ecs_target { @@ -101,7 +100,7 @@ resource "aws_cloudwatch_event_target" "ftp_task_reindex_target" { containerOverrides = [ { name = "ebsco-adapter-ftp" - command = ["--reindex-type", "full"] + command = ["--process-type", "scheduled"] } ] }) diff --git a/ebsco_adapter/terraform/parameters.tf b/ebsco_adapter/terraform/parameters.tf index 23eef2a1a6..a86641c9d5 100644 --- a/ebsco_adapter/terraform/parameters.tf +++ b/ebsco_adapter/terraform/parameters.tf @@ -70,6 +70,13 @@ resource "aws_ssm_parameter" "ebsco_adapter_output_topic_arn" { value = module.ebsco_adapter_output_topic.arn } +resource "aws_ssm_parameter" "ebsco_adapter_reindex_topic_arn" { + name = "/catalogue_pipeline/ebsco_adapter/reindex_topic_arn" + description = "The ARN of the SNS topic to publish reindex messages to" + type = "String" + value = local.reindexer_topic_arn +} + resource "aws_ssm_parameter" "ebsco_adapter_bucket_name" { name = "/catalogue_pipeline/ebsco_adapter/bucket_name" description = "The name of the S3 bucket to write files to"