Skip to content

Commit

Permalink
Update EBSCO reindexer args for simplicity (#2694)
Browse files Browse the repository at this point in the history
* Update EBSCO reindexer args for simplicity

* Apply auto-formatting rules

* fix some bugs in main.py, update infra

* fix tests

* Apply auto-formatting rules

---------

Co-authored-by: Buildkite on behalf of Wellcome Collection <wellcomedigitalplatform@wellcome.ac.uk>
  • Loading branch information
kenoir and weco-bot authored Aug 30, 2024
1 parent 46aea6d commit a2e3bea
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 71 deletions.
1 change: 1 addition & 0 deletions ebsco_adapter/ebsco_adapter/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ services:
environment:
- AWS_PROFILE=platform-developer
- OUTPUT_TOPIC_ARN=${OUTPUT_TOPIC_ARN}
- REINDEX_TOPIC_ARN=${REINDEX_TOPIC_ARN}
- CUSTOMER_ID=${CUSTOMER_ID}
- S3_BUCKET=${S3_BUCKET}
- S3_PREFIX=${S3_PREFIX}
Expand Down
3 changes: 2 additions & 1 deletion ebsco_adapter/ebsco_adapter/run_local.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ CUSTOMER_ID=$(aws ssm get-parameter --name /catalogue_pipeline/ebsco_adapter/cus
FTP_REMOTE_DIR=$(aws ssm get-parameter --name /catalogue_pipeline/ebsco_adapter/ftp_remote_dir --query "Parameter.Value" --output text)
S3_BUCKET=$(aws ssm get-parameter --name /catalogue_pipeline/ebsco_adapter/bucket_name --query "Parameter.Value" --output text)
OUTPUT_TOPIC_ARN=$(aws ssm get-parameter --name /catalogue_pipeline/ebsco_adapter/output_topic_arn --query "Parameter.Value" --output text)
REINDEX_TOPIC_ARN=$(aws ssm get-parameter --name /catalogue_pipeline/ebsco_adapter/reindex_topic_arn --query "Parameter.Value" --output text)

# Update the S3_PREFIX to be the environment (use dev for local testing)
S3_PREFIX=prod

export FTP_PASSWORD FTP_SERVER FTP_USERNAME CUSTOMER_ID FTP_REMOTE_DIR S3_BUCKET S3_PREFIX OUTPUT_TOPIC_ARN
export FTP_PASSWORD FTP_SERVER FTP_USERNAME CUSTOMER_ID FTP_REMOTE_DIR S3_BUCKET S3_PREFIX OUTPUT_TOPIC_ARN REINDEX_TOPIC_ARN

# Ensure the docker image is up to date
docker-compose --log-level ERROR build dev
Expand Down
56 changes: 34 additions & 22 deletions ebsco_adapter/ebsco_adapter/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
ftp_username = os.environ.get("FTP_USERNAME")
ftp_password = os.environ.get("FTP_PASSWORD")
ftp_remote_dir = os.environ.get("FTP_REMOTE_DIR")
sns_topic_arn = os.environ.get("OUTPUT_TOPIC_ARN")
output_topic_arn = os.environ.get("OUTPUT_TOPIC_ARN")
reindex_topic_arn = os.environ.get("REINDEX_TOPIC_ARN")

s3_bucket = os.environ.get("S3_BUCKET", "wellcomecollection-platform-ebsco-adapter")
Expand Down Expand Up @@ -56,9 +56,9 @@ def run_process(temp_dir, ebsco_ftp, s3_store, sns_publisher, invoked_at):


def run_reindex(s3_store, sns_publisher, invoked_at, reindex_type, ids=None):
assert reindex_type in ["full", "partial"], "Invalid reindex type"
assert reindex_type in ["reindex-full", "reindex-partial"], "Invalid reindex type"
assert (
ids is not None or reindex_type == "full"
ids is not None or reindex_type == "reindex-full"
), "You must provide IDs for partial reindexing"

print(f"Running reindex with type {reindex_type} and ids {ids} ...")
Expand Down Expand Up @@ -117,60 +117,72 @@ def _get_iso8601_invoked_at():
return invoked_at


# This script can be run locally, or invoked as an ECS task with the required
# command overrides.
#
# Usage: main.py --process-type <process_type> --reindex-ids <reindex_ids>
# process_type: Type of process (reindex-full, reindex-partial, scheduled)
# reindex_ids: Comma-separated list of IDs to reindex (for partial)
if __name__ == "__main__":
event = None
context = SimpleNamespace(invoked_function_arn=None)

reindex_processes = {"reindex-full", "reindex-partial"}
valid_process_types = reindex_processes.union({"scheduled"})

# Parse command line arguments for running locally
parser = argparse.ArgumentParser(description="Perform reindexing operations")
parser.add_argument(
"--reindex-type",
"--process-type",
type=str,
choices=["full", "partial"],
help="Type of reindexing (full or partial)",
choices=list(valid_process_types),
required=True,
help="Type of process (reindex-full, reindex-partial, scheduled)",
)
parser.add_argument(
"--reindex-ids",
type=str,
help="Comma-separated list of IDs to reindex (for partial)",
)
parser.add_argument(
"--scheduled-invoke",
action="store_true",
help="To run a regular process invocation, without reindexing.",
)

invoked_at = _get_iso8601_invoked_at()
args = parser.parse_args()

process_type = None
reindex_ids = None
sns_publisher = None

if args.reindex_type:
process_type = f"reindex-{args.reindex_type}"
sns_publisher = SnsPublisher(reindex_topic_arn)
if args.reindex_ids:
process_type = args.process_type

# Validate arguments, stop if invalid
if process_type not in valid_process_types:
raise ValueError(f"Invalid process type: {process_type}")

if process_type == "reindex-partial" and not args.reindex_ids:
raise ValueError("You must provide IDs for partial reindexing")
elif process_type == "reindex-partial":
reindex_ids = args.reindex_ids.split(",")
reindex_ids = [rid.strip() for rid in reindex_ids]
elif args.scheduled_invoke:
process_type = "scheduled"
sns_publisher = SnsPublisher(sns_topic_arn)

# Run the process
with ProcessMetrics(
process_type
) as metrics, tempfile.TemporaryDirectory() as temp_dir, EbscoFtp(
ftp_server, ftp_username, ftp_password, ftp_remote_dir
) as ebsco_ftp:
s3_store = S3Store(s3_bucket)

if args.reindex_type:
if process_type in reindex_processes:
sns_publisher = SnsPublisher(reindex_topic_arn)
run_reindex(
s3_store,
sns_publisher,
invoked_at,
args.reindex_type,
process_type,
reindex_ids,
)
elif args.scheduled_invoke:
else:
assert (
process_type == "scheduled"
), "Invalid process type, arg validation failed?!"
sns_publisher = SnsPublisher(output_topic_arn)
run_process(temp_dir, ebsco_ftp, s3_store, sns_publisher, invoked_at)
6 changes: 4 additions & 2 deletions ebsco_adapter/ebsco_adapter/src/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ def test_run_reindex():
assert fake_sns_client.test_get_published_messages() == []

print("\n--- Running test reindex with type full ---")
run_reindex(s3_store, sns_publisher, invoked_at, "full")
run_reindex(s3_store, sns_publisher, invoked_at, "reindex-full")

reindex_published_messages = fake_sns_client.test_get_published_messages()
assert reindex_published_messages == [ebs9579e, ebs29555e]
Expand All @@ -199,7 +199,9 @@ def test_run_reindex():
assert fake_sns_client.test_get_published_messages() == []

print("\n--- Running test reindex with type partial ---")
run_reindex(s3_store, sns_publisher, invoked_at, "partial", ["ebs9579e"])
run_reindex(
s3_store, sns_publisher, invoked_at, "reindex-partial", ["ebs9579e"]
)

reindex_published_messages = fake_sns_client.test_get_published_messages()
assert reindex_published_messages == [ebs9579e]
91 changes: 45 additions & 46 deletions ebsco_adapter/terraform/ftp_task.tf
Original file line number Diff line number Diff line change
Expand Up @@ -21,66 +21,65 @@ module "ftp_task" {
memory = 4096
}

resource "aws_scheduler_schedule" "ftp_task_schedule" {
name = "ebsco-adapter-ftp-schedule"
group_name = "default"

flexible_time_window {
mode = "OFF"
}

schedule_expression = "rate(1 days)"

# Disable the schedule for now
state = "ENABLED"

target {
arn = aws_ecs_cluster.cluster.arn
role_arn = aws_iam_role.eventbridge_task_scheduler.arn
resource "aws_cloudwatch_event_rule" "reindex_rule" {
name = "ebsco-adapter-reindex-rule"
description = "Rule to trigger custom reindex event for EBSCO adapter"
event_pattern = jsonencode({
"source" : ["weco.pipeline.reindex"],
"detail" : {
"ReindexTargets" : ["ebsco"]
}
})
}

ecs_parameters {
task_definition_arn = local.task_definition_arn_latest
launch_type = "FARGATE"
resource "aws_cloudwatch_event_target" "ftp_task_reindex_target" {
arn = aws_ecs_cluster.cluster.arn
rule = aws_cloudwatch_event_rule.reindex_rule.name
role_arn = aws_iam_role.eventbridge_task_scheduler.arn

network_configuration {
assign_public_ip = false
security_groups = [
aws_security_group.egress.id,
local.network_config.ec_privatelink_security_group_id
]
subnets = local.network_config.subnets
}
}
ecs_target {
task_definition_arn = local.task_definition_arn_latest
launch_type = "FARGATE"

input = jsonencode({
containerOverrides = [
{
name = "ebsco-adapter-ftp"
command = ["--scheduled-invoke"]
}
network_configuration {
assign_public_ip = false
security_groups = [
aws_security_group.egress.id,
local.network_config.ec_privatelink_security_group_id
]
})

retry_policy {
maximum_retry_attempts = 3
subnets = local.network_config.subnets
}
}

input = jsonencode({
containerOverrides = [
{
name = "ebsco-adapter-ftp"
command = ["--process-type", "reindex-full"]
}
]
})
}

resource "aws_cloudwatch_event_rule" "reindex_rule" {
name = "ebsco-adapter-reindex-rule"
description = "Rule to catch custom reindex event"
resource "aws_cloudwatch_event_rule" "schedule_rule" {
name = "ebsco-adapter-schedule-rule"
description = "Rule to schedule and manually trigger EBSCO adapter"

# Invoke the rule every day
schedule_expression = "rate(1 day)"

# Also allow manual invocation
event_pattern = jsonencode({
"source" : ["weco.pipeline.reindex"],
"source" : ["weco.pipeline.adapter"],
"detail" : {
"ReindexTargets" : ["ebsco"]
"InvokeTargets" : ["ebsco"]
}
})
}

resource "aws_cloudwatch_event_target" "ftp_task_reindex_target" {
resource "aws_cloudwatch_event_target" "ftp_task_schedule_target" {
arn = aws_ecs_cluster.cluster.arn
rule = aws_cloudwatch_event_rule.reindex_rule.name
rule = aws_cloudwatch_event_rule.schedule_rule.name
role_arn = aws_iam_role.eventbridge_task_scheduler.arn

ecs_target {
Expand All @@ -101,7 +100,7 @@ resource "aws_cloudwatch_event_target" "ftp_task_reindex_target" {
containerOverrides = [
{
name = "ebsco-adapter-ftp"
command = ["--reindex-type", "full"]
command = ["--process-type", "scheduled"]
}
]
})
Expand Down
7 changes: 7 additions & 0 deletions ebsco_adapter/terraform/parameters.tf
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,13 @@ resource "aws_ssm_parameter" "ebsco_adapter_output_topic_arn" {
value = module.ebsco_adapter_output_topic.arn
}

resource "aws_ssm_parameter" "ebsco_adapter_reindex_topic_arn" {
name = "/catalogue_pipeline/ebsco_adapter/reindex_topic_arn"
description = "The ARN of the SNS topic to publish reindex messages to"
type = "String"
value = local.reindexer_topic_arn
}

resource "aws_ssm_parameter" "ebsco_adapter_bucket_name" {
name = "/catalogue_pipeline/ebsco_adapter/bucket_name"
description = "The name of the S3 bucket to write files to"
Expand Down

0 comments on commit a2e3bea

Please sign in to comment.