End to end scheduling #32

Merged (19 commits), Oct 17, 2024
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
@@ -30,4 +30,4 @@ jobs:
         flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
     - name: Test with pytest
       run: |
-        pytest --doctest-modules
+        PYTHONPATH=$PWD/client:$PYTHONPATH pytest --doctest-modules
25 changes: 21 additions & 4 deletions Makefile
@@ -7,6 +7,7 @@ shoots/clean:
 	rm shoots/*transferred
 	rm shoots/*slice*
 	rm shoots/*failed*
+	rm shoots/*transfer_status

 # Slice a given input file into manageable chunks, so that you can run them through the
 # transfer process separately without overwhelming the target system.
@@ -58,8 +59,24 @@ shoots/clean:
 # compile a list of shoots that failed since a given time, thus:
 # make shoots/2024-08-06T15:33:00Z.failed
 shoots/%.failed: src/compile_failure_list.py
-	python src/compile_failure_list.py $* > $@
+	python client/compile_failure_list.py $* > $@

+# Once the whole thing is done, check that everything has actually gone through
+# This produces a CSV recording
+# True (successfully transferred) or False (not successfully transferred)
+# against each shoot
+%.transfer_status: %
+	cat $< | python client/compile_pending_list.py $* > $@
+
+# Compile lists for retrying:
+
+# Some things may have failed in the target system
+# These are s3 keys that can be passed through the 'touched' target
+%.transfer_status.touchable: %.transfer_status
+	grep False $< | sed 's/,.*//' | python client/touchable.py production > $@
+
+# Others may have failed to transfer (or have been deleted from the target bucket due to expiry)
+# These are shoot identifiers that need to go back through the whole system again
+%.transfer_status.needs_transfer: %.transfer_status
+	grep False $< | sed 's/,.*//' | python client/untouchable.py production | sed 's/.*2754_//' | sed 's/\.zip//' > $@
+
-# Once the whole thing is done
-%.todo: %
-	cat $< | python src/compile_pending_list.py $* > $@
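For readers tracing the grep/sed pipelines above: the tail of the needs_transfer recipe reduces the S3 keys that untouchable.py emits back to bare shoot identifiers. A Python sketch of that step (the function name is mine, not from the PR):

```python
# Sketch: what `sed 's/.*2754_//' | sed 's/\.zip//'` does to keys such as
# "born-digital-accessions/2754_CP000001.zip" emitted by untouchable.py.
def to_shoot_identifiers(missing_keys):
    for key in missing_keys:
        yield key.split("2754_")[-1].replace(".zip", "").strip()
```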
57 changes: 57 additions & 0 deletions client/compile_failure_list.py
@@ -0,0 +1,57 @@
"""
Produce a list of records that have failed since a given date/time.
Usage:

> python compile_failure_list.py 2024-09-05T40:00:00

This will print out the S3 keys of all the zips that have been
transferred to Archivematica, but failed to fully process, since 1400 on the 5th of September 2024.


"""
import boto3
import datetime
from reporting_client import get_es_client


def get_failures_since(session, since_time):
es = get_es_client(session)
response = es.search(
index="storage_ingests",
size=100,
query=get_query(since_time),
source=False,
fields=["bag.info.externalIdentifier", "lastModifiedDate"]
)
print("\n".join(get_zip_paths(response["hits"]["hits"])))


def get_zip_paths(hits):
return (f'born-digital-accessions/{hit["fields"]["bag.info.externalIdentifier"][0]}.zip' for hit in hits)


def get_query(since_time):
return {
"bool": {
"filter": [
{"term": {
"status.id": "failed"
}},
{"range": {
"lastModifiedDate": {
"gte": since_time
}
}}
]
}
}


def main():
import sys
get_failures_since(boto3.Session(), datetime.datetime.fromisoformat(sys.argv[1]))



if __name__ == "__main__":
main()
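As a quick illustration of the data shape (the hit below is hand-constructed, not a captured Elasticsearch response; requested fields arrive as lists, hence the [0] in get_zip_paths):

```python
from compile_failure_list import get_zip_paths

# A hypothetical hit, shaped like an Elasticsearch "fields" response.
hit = {"fields": {"bag.info.externalIdentifier": ["2754_CP1BAAD1"]}}

print(next(get_zip_paths([hit])))
# born-digital-accessions/2754_CP1BAAD1.zip
```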
108 changes: 108 additions & 0 deletions client/compile_pending_list.py
@@ -0,0 +1,108 @@
"""
Compile a list of the ingested status of the requested shoots.

Given a list of shoots that you want to have been ingested,
this will check whether they have all been successfully ingested (True)
or not (False).

A shoot may have not been ingested due to a failure, or because it
is yet to be transferred (either in progress or just not even started)

This contrasts with compile_failure_list.py, which produces a list of recent failures.

Usage:
Provide a newline separated list of shoot identifiers on STDIN,
e.g. given a file myfile.txt:
```
CP1G00D1
CP1BAAD1
CP000001
CP999999
```
where
* CP1G00D1 and CP000001 have both been ingested,
* CP1BAAD1 is somehow broken
* CP999999 is yet to be ingested

$ cat myfile.txt | python compile_pending_list.py

Output:
```
2754_CP1G00D1, True
2754_CP1BAAD1, False
2754_CP000001, True
2754_CP999999, False
```
"""

import boto3
from reporting_client import get_es_client


def get_successful_list(session, expected):
es = get_es_client(session)
response = es.search(
index="storage_ingests",
size=1000,
query=find_shoots_query(expected),
source=False,
fields=["bag.info.externalIdentifier", "lastModifiedDate"]
)
succeeded = get_identifiers(response["hits"]["hits"])
for shoot in expected:
if shoot in succeeded:
print(f'{shoot}, True')
else:
print(f'{shoot}, {is_cracked_shoot_successful(es, shoot)}')


def is_cracked_shoot_successful(es, shoot):
response = es.search(
index="storage_ingests",
size=1000,
query=find_subshoots_query(shoot),
source=False,
fields=["bag.info.externalIdentifier", "lastModifiedDate", "status.id"]
)

return bool(response['hits']['hits']) and all((hit['fields']['status.id'] == "succeeded" for hit in response['hits']['hits']))


def get_identifiers(hits):
return [hit["fields"]["bag.info.externalIdentifier"][0] for hit in hits]


def find_shoots_query(shoots):
return {
"bool": {
"filter": [
{"term": {
"status.id": "succeeded"
}},
{"terms": {
"bag.info.externalIdentifier": shoots
}}
]
}
}


def find_subshoots_query(shoot):
return {
"bool": {
"filter": [
{"prefix": {
"bag.info.externalIdentifier": shoot
}}
]
}
}


def main():
import sys
get_successful_list(boto3.Session(), [f"2754_{shoot.strip()}" for shoot in sys.stdin.readlines()])


if __name__ == "__main__":
main()
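The "cracked shoot" fallback covers shoots that were sliced into several bags, whose identifiers share the parent identifier as a prefix. A sketch of the query it issues (the identifier is borrowed from the docstring; the sub-shoot naming is an assumption, not spelled out in this PR):

```python
from compile_pending_list import find_subshoots_query

# Matches the shoot itself plus any sliced sub-shoots,
# e.g. (hypothetically) 2754_CP000001_001, 2754_CP000001_002, ...
print(find_subshoots_query("2754_CP000001"))
# {'bool': {'filter': [{'prefix': {'bag.info.externalIdentifier': '2754_CP000001'}}]}}
```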
22 changes: 22 additions & 0 deletions client/objects_on_target.py
@@ -0,0 +1,22 @@

import botocore.exceptions

BUCKETS = {
    "staging": "wellcomecollection-archivematica-staging-transfer-source",
    "production": "wellcomecollection-archivematica-transfer-source"
}


def find_objects(session, bucket, object_keys, yield_on_found):
    for object_key in object_keys:
        full_key = f"born-digital-accessions/{object_key.strip()}.zip"
        try:
            session.client('s3').head_object(Bucket=bucket, Key=full_key)
            if yield_on_found:
                yield full_key
        except botocore.exceptions.ClientError as e:
            if e.response['Error']['Code'] == '404':
                if not yield_on_found:
                    yield full_key
            else:
                raise
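The yield_on_found flag selects which side of the head_object check is reported. A usage sketch (the shoot numbers are illustrative, and this makes real S3 HEAD requests if run with credentials):

```python
import boto3

from objects_on_target import find_objects, BUCKETS

session = boto3.Session()
keys = ["2754_CP000001", "2754_CP999999"]

# Keys present in the bucket: candidates for the 'touch' retry route.
print(list(find_objects(session, BUCKETS["production"], keys, True)))

# Keys absent from the bucket: need to go through the whole transfer again.
print(list(find_objects(session, BUCKETS["production"], keys, False)))
```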
20 changes: 20 additions & 0 deletions client/reporting_client.py
@@ -0,0 +1,20 @@
from elasticsearch import Elasticsearch


def get_es_client(session):
    """
    Returns an Elasticsearch client for the reporting cluster.
    """
    username = get_secret_string(
        session, secret_id="reporting/read_only/es_username"
    )
    password = get_secret_string(
        session, secret_id="reporting/read_only/es_password"
    )
    host = get_secret_string(
        session, secret_id="reporting/es_host"
    )
    return Elasticsearch(f"https://{host}", basic_auth=(username, password))


def get_secret_string(session, *, secret_id):
    return session.client("secretsmanager").get_secret_value(SecretId=secret_id)["SecretString"]
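A minimal usage sketch, assuming the ambient AWS credentials can read the reporting/* secrets:

```python
import boto3

from reporting_client import get_es_client

es = get_es_client(boto3.Session())
print(es.info()["cluster_name"])  # smoke-test the connection
```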
14 changes: 14 additions & 0 deletions client/start_restores.py
@@ -0,0 +1,14 @@
import sys
import boto3


def post_messages(session, shoot_numbers):
    sns = session.resource("sns")
    topic = sns.Topic("arn:aws:sns:eu-west-1:760097843905:restore_shoots-production")
    for shoot_number in shoot_numbers:
        print(f"requesting restore of {shoot_number}")
        topic.publish(Message=shoot_number.strip())


if __name__ == "__main__":
    post_messages(boto3.Session(), sys.stdin.readlines())

Review comment on the hard-coded production topic ARN:

Contributor: No point in making the env configurable here, if only for testing?

Author: Could do, but there is no staging transfer throttle, which is the only way in which this matters. If we want to test things going to staging we can do that in steps using restore.py and start_transfers.py locally.
14 changes: 14 additions & 0 deletions client/start_touches.py
@@ -0,0 +1,14 @@
import sys
import boto3


def post_messages(session, environment, shoot_numbers):
    sns = session.resource("sns")
    topic = sns.Topic(f"arn:aws:sns:eu-west-1:404315009621:touch_shoots-{environment}")
    for shoot_number in shoot_numbers:
        print(f"requesting touch of {shoot_number}")
        topic.publish(Message=shoot_number.strip())


if __name__ == "__main__":
    post_messages(boto3.Session(), sys.argv[1], sys.stdin.readlines())
File renamed without changes.
9 changes: 9 additions & 0 deletions client/touchable.py
@@ -0,0 +1,9 @@
import boto3
import sys

from objects_on_target import find_objects, BUCKETS

if __name__ == '__main__':
    print("\n".join(
        find_objects(boto3.Session(), BUCKETS[sys.argv[1]], sys.stdin.readlines(), True)
    ))
9 changes: 9 additions & 0 deletions client/untouchable.py
@@ -0,0 +1,9 @@
import boto3
import sys

from objects_on_target import find_objects, BUCKETS

if __name__ == '__main__':
    print("\n".join(
        find_objects(boto3.Session(), BUCKETS[sys.argv[1]], sys.stdin.readlines(), False)
    ))
2 changes: 1 addition & 1 deletion src/check_status.py
@@ -12,7 +12,7 @@ def check_shoot_restore_status(bucket, shoot_number):
 def check_folder_restore_status(bucket, s3_folder: str):
     logger.info(s3_folder)
     for obj in bucket.objects.filter(Prefix=s3_folder, OptionalObjectAttributes=[
-        'RestoreStatus',
+        'RestoreStatus'
     ]):
         if should_download_file(obj.key):
             status = obj.restore_status
59 changes: 34 additions & 25 deletions src/restore.py
@@ -1,36 +1,45 @@
 import logging
+from functools import partial
 import boto3
 from botocore.exceptions import ClientError
+from concurrent.futures import ThreadPoolExecutor

 from transferrer.common import should_download_file, shoot_number_to_folder_path, get_source_bucket

 logger = logging.getLogger(__name__)


-def restore_s3_folder(bucket, s3_folder: str, days_to_keep=1):
-    logger.info(f"restoring folder: {s3_folder}")
-    for obj in bucket.objects.filter(Prefix=s3_folder):
-        if should_download_file(obj.key):
-            try:
-                logger.info(f"restoring object: {obj.key}")
-                obj.restore_object(
-                    RestoreRequest={
-                        'Days': days_to_keep,
-                        'GlacierJobParameters': {
-                            'Tier': 'Bulk'
-                        }
-                    }
-                )
-            except ClientError as e:
-                if "The operation is not valid for the object's storage class" in str(e):
-                    logger.info(f"attempt to restore non-glacier object: {obj.key}")
-                elif "RestoreAlreadyInProgress" in str(e):
-                    logger.info(f"redundant attempt to restore object: {obj.key}")
-                else:
-                    raise
+# This variable governs the degree of parallelism to use when restoring folders.
+# The correct number is to be discovered by experimentation.
+THREADS = 10
+
+
+def restore_s3_folder(bucket, s3_folder: str, days_to_keep=3):
+    with ThreadPoolExecutor(max_workers=THREADS) as executor:
+        for key in executor.map(
+            partial(restore_file, days_to_keep=days_to_keep),
+            (obj for obj in bucket.objects.filter(Prefix=s3_folder) if should_download_file(obj.key))
+        ):
+            logger.info(f"restore request sent: \t{key}")
+
+
+def restore_file(obj, *, days_to_keep):
+    try:
+        obj.restore_object(
+            RestoreRequest={
+                'Days': days_to_keep,
+                'GlacierJobParameters': {
+                    'Tier': 'Bulk'
+                }
+            }
+        )
+        return obj.key
+    except ClientError as e:
+        if "The operation is not valid for the object's storage class" in str(e):
+            logger.info(f"attempt to restore non-glacier object: {obj.key}")
+        elif "RestoreAlreadyInProgress" in str(e):
+            logger.info(f"redundant attempt to restore object: {obj.key}")
+        else:
+            logger.info(f"ignoring {obj.key}")
+            raise
+

 def restore_shoot_folder(bucket, shoot_number):
     restore_s3_folder(bucket, shoot_number_to_folder_path(shoot_number))
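The executor.map call hands each object to restore_file on one of THREADS worker threads, while the surrounding loop receives results lazily and in input order. A standalone sketch of that pattern (not from the PR):

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial


def send(item, *, tag):
    return f"{tag}: {item}"


with ThreadPoolExecutor(max_workers=10) as executor:
    # Results arrive in input order, regardless of which thread finished first.
    for result in executor.map(partial(send, tag="restored"), ["a", "b", "c"]):
        print(result)
```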