End to end scheduling (#32)
* add failure list compilation

* add es to requirements

* polish up pending/failure lists

* polish up pending list check

* move failure/pending lists

* improve structure

* end to end scheduling

* tidy

* tidy

* improve commentary

* remove broken diagram

* fix diagram

* tidy

* print -> log

* help GH find the client-only code

* help GH find the client-only code

* fix stuff found in review

* longer timeout
paul-butcher authored Oct 17, 2024
1 parent 3350d40 commit 8902b09
Showing 37 changed files with 870 additions and 43 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
@@ -30,4 +30,4 @@ jobs:
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
- name: Test with pytest
run: |
-        pytest --doctest-modules
+        PYTHONPATH=$PWD/client:$PYTHONPATH pytest --doctest-modules
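The same PYTHONPATH arrangement is needed to run the suite locally, because the client/ scripts import one another as top-level modules. A sketch, run from the repository root:

```sh
# run the tests the same way CI does
PYTHONPATH=$PWD/client:$PYTHONPATH pytest --doctest-modules
```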
25 changes: 21 additions & 4 deletions Makefile
@@ -7,6 +7,7 @@ shoots/clean:
rm shoots/*transferred
rm shoots/*slice*
rm shoots/*failed*
+	rm shoots/*transfer_status

# Slice a given input file into manageable chunks, so that you can run them through the
# transfer process separately without overwhelming the target system.
@@ -58,8 +59,24 @@ shoots/clean:
# compile a list of shoots that failed since a given time, thus:
# make shoots/2024-08-06T15:33:00Z.failed
shoots/%.failed: src/compile_failure_list.py
-	python src/compile_failure_list.py $* > $@
+	python client/compile_failure_list.py $* > $@

+# Once the whole thing is done, check that everything has actually gone through
+# This produces a CSV recording
+# True (successfully transferred) or False (not successfully transferred)
+# against each shoot
+%.transfer_status: %
+	cat $< | python client/compile_pending_list.py $* > $@
+
+# Compile lists for retrying:
+
+# Some things may have failed in the target system
+# These are s3 keys that can be passed through the 'touched' target
+%.transfer_status.touchable: %.transfer_status
+	grep False $< | sed 's/,.*//' | python client/touchable.py production > $@
+
+# Others may have failed to transfer (or have been deleted from the target bucket due to expiry)
+# These are shoot identifiers that need to go back through the whole system again
+%.transfer_status.needs_transfer: %.transfer_status
+	grep False $< | sed 's/,.*//' | python client/untouchable.py production | sed 's/.*2754_//' | sed 's/\.zip//' > $@

-# Once the whole thing is done
-%.todo: %
-	cat $< | python src/compile_pending_list.py $* > $@
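Taken together, these targets form the verify-and-retry half of the pipeline. A sketch of one pass, assuming a hypothetical batch file shoots/batch-01 with one shoot identifier per line:

```sh
make shoots/batch-01.transfer_status                 # shoot, True/False CSV
make shoots/batch-01.transfer_status.touchable       # keys already on the target: re-touch these
make shoots/batch-01.transfer_status.needs_transfer  # shoots to resend through the whole system
```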
57 changes: 57 additions & 0 deletions client/compile_failure_list.py
@@ -0,0 +1,57 @@
"""
Produce a list of records that have failed since a given date/time.
Usage:
> python compile_failure_list.py 2024-09-05T14:00:00
This will print out the S3 keys of all the zips that have been
transferred to Archivematica but failed to fully process since 14:00 on 5 September 2024.
"""
import boto3
import datetime
from reporting_client import get_es_client


def get_failures_since(session, since_time):
es = get_es_client(session)
response = es.search(
index="storage_ingests",
size=100,
query=get_query(since_time),
source=False,
fields=["bag.info.externalIdentifier", "lastModifiedDate"]
)
print("\n".join(get_zip_paths(response["hits"]["hits"])))


def get_zip_paths(hits):
return (f'born-digital-accessions/{hit["fields"]["bag.info.externalIdentifier"][0]}.zip' for hit in hits)


def get_query(since_time):
return {
"bool": {
"filter": [
{"term": {
"status.id": "failed"
}},
{"range": {
"lastModifiedDate": {
"gte": since_time
}
}}
]
}
}


def main():
import sys
get_failures_since(boto3.Session(), datetime.datetime.fromisoformat(sys.argv[1]))



if __name__ == "__main__":
main()
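A run against the reporting cluster looks like this; it needs AWS credentials that can read the reporting secrets, and the output line here is hypothetical:

```sh
$ python client/compile_failure_list.py 2024-09-05T14:00:00
born-digital-accessions/2754_CP000123.zip
```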
108 changes: 108 additions & 0 deletions client/compile_pending_list.py
@@ -0,0 +1,108 @@
"""
Compile a list of the ingested status of the requested shoots.
Given a list of shoots that you want to have been ingested,
this will check whether they have all been successfully ingested (True)
or not (False).
A shoot may have not been ingested due to a failure, or because it
is yet to be transferred (either in progress or just not even started)
This contrasts with compile_failure_list.py, which produces a list of recent failures.
Usage:
Provide a newline separated list of shoot identifiers on STDIN,
e.g. given a file myfile.txt:
```
CP1G00D1
CP1BAAD1
CP000001
CP999999
```
where
* CP1G00D1 and CP000001 have both been ingested,
* CP1BAAD1 is somehow broken
* CP999999 is yet to be ingested
$ cat myfile.txt | python compile_pending_list.py
Output:
```
2754_CP1G00D1, True
2754_CP1BAAD1, False
2754_CP000001, True
2754_CP999999, False
```
"""

import boto3
from reporting_client import get_es_client


def get_successful_list(session, expected):
es = get_es_client(session)
response = es.search(
index="storage_ingests",
size=1000,
query=find_shoots_query(expected),
source=False,
fields=["bag.info.externalIdentifier", "lastModifiedDate"]
)
succeeded = get_identifiers(response["hits"]["hits"])
for shoot in expected:
if shoot in succeeded:
print(f'{shoot}, True')
else:
print(f'{shoot}, {is_cracked_shoot_successful(es, shoot)}')


def is_cracked_shoot_successful(es, shoot):
response = es.search(
index="storage_ingests",
size=1000,
query=find_subshoots_query(shoot),
source=False,
fields=["bag.info.externalIdentifier", "lastModifiedDate", "status.id"]
)

    return bool(response['hits']['hits']) and all((hit['fields']['status.id'][0] == "succeeded" for hit in response['hits']['hits']))


def get_identifiers(hits):
return [hit["fields"]["bag.info.externalIdentifier"][0] for hit in hits]


def find_shoots_query(shoots):
return {
"bool": {
"filter": [
{"term": {
"status.id": "succeeded"
}},
{"terms": {
"bag.info.externalIdentifier": shoots
}}
]
}
}


def find_subshoots_query(shoot):
return {
"bool": {
"filter": [
{"prefix": {
"bag.info.externalIdentifier": shoot
}}
]
}
}


def main():
import sys
get_successful_list(boto3.Session(), [f"2754_{shoot.strip()}" for shoot in sys.stdin.readlines()])


if __name__ == "__main__":
main()
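The is_cracked_shoot_successful fallback covers shoots that were split ("cracked") into several bags before ingest: the prefix query matches every bag whose externalIdentifier starts with the shoot identifier, and the shoot counts as successful only if all of them succeeded. A sketch with hypothetical identifiers:

```sh
# hypothetical: CP2CRACK was ingested as 2754_CP2CRACK_001 and
# 2754_CP2CRACK_002, and both bags have status succeeded
$ echo CP2CRACK | python client/compile_pending_list.py
2754_CP2CRACK, True
```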
22 changes: 22 additions & 0 deletions client/objects_on_target.py
@@ -0,0 +1,22 @@

import botocore

BUCKETS = {
"staging": "wellcomecollection-archivematica-staging-transfer-source",
"production": "wellcomecollection-archivematica-transfer-source"
}


def find_objects(session, bucket, object_keys, yield_on_found):
for object_key in object_keys:
full_key = f"born-digital-accessions/{object_key.strip()}.zip"
try:
session.client('s3').head_object(Bucket=bucket, Key=full_key)
if yield_on_found:
yield full_key
except botocore.exceptions.ClientError as e:
if e.response['Error']['Code'] == '404':
if not yield_on_found:
yield full_key
else:
raise
20 changes: 20 additions & 0 deletions client/reporting_client.py
@@ -0,0 +1,20 @@
from elasticsearch import Elasticsearch

def get_es_client(session):
"""
Returns an Elasticsearch client for the reporting cluster.
"""
username = get_secret_string(
session, secret_id="reporting/read_only/es_username"
)
password = get_secret_string(
session, secret_id=f"reporting/read_only/es_password"
)
host = get_secret_string(
session, secret_id=f"reporting/es_host"
)
return Elasticsearch(f"https://{host}", basic_auth=(username, password))


def get_secret_string(session, *, secret_id):
return session.client("secretsmanager").get_secret_value(SecretId=secret_id)["SecretString"]
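The three secret ids above are the only configuration this client needs. A quick way to check that your role can read them, outside the pipeline (assumes the AWS CLI and credentials):

```sh
aws secretsmanager get-secret-value --secret-id reporting/es_host --query SecretString --output text
```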
14 changes: 14 additions & 0 deletions client/start_restores.py
@@ -0,0 +1,14 @@
import sys
import boto3


def post_messages(session, shoot_numbers):
sns = session.resource("sns")
topic = sns.Topic(f"arn:aws:sns:eu-west-1:760097843905:restore_shoots-production")
for shoot_number in shoot_numbers:
print(f"requesting restore of {shoot_number}")
topic.publish(Message=shoot_number.strip())


if __name__ == "__main__":
post_messages(boto3.Session(), sys.stdin.readlines())
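Fed with the needs_transfer list from the Makefile, this re-enters shoots at the start of the pipeline. A sketch, reusing the hypothetical batch file from above:

```sh
cat shoots/batch-01.transfer_status.needs_transfer | python client/start_restores.py
```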
14 changes: 14 additions & 0 deletions client/start_touches.py
@@ -0,0 +1,14 @@
import sys
import boto3


def post_messages(session, environment, shoot_numbers):
sns = session.resource("sns")
topic = sns.Topic(f"arn:aws:sns:eu-west-1:404315009621:touch_shoots-{environment}")
for shoot_number in shoot_numbers:
print(f"requesting touch of {shoot_number}")
topic.publish(Message=shoot_number.strip())


if __name__ == "__main__":
post_messages(boto3.Session(), sys.argv[1], sys.stdin.readlines())
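start_touches.py is the same pattern for the touch topic, taking the environment as an argument; it publishes whatever lines it is given. A sketch, using a shoot number from the docstring examples above:

```sh
echo CP000001 | python client/start_touches.py staging
```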
File renamed without changes.
9 changes: 9 additions & 0 deletions client/touchable.py
@@ -0,0 +1,9 @@
import boto3
import sys

from objects_on_target import find_objects, BUCKETS

if __name__ == '__main__':
print("\n".join(
find_objects(boto3.Session(), BUCKETS[sys.argv[1]], sys.stdin.readlines(), True)
))
9 changes: 9 additions & 0 deletions client/untouchable.py
@@ -0,0 +1,9 @@
import boto3
import sys

from objects_on_target import find_objects, BUCKETS

if __name__ == '__main__':
print("\n".join(
find_objects(boto3.Session(), BUCKETS[sys.argv[1]], sys.stdin.readlines(), False)
))
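touchable.py and untouchable.py are thin inverses over find_objects: given the same identifiers on stdin, one prints the keys that exist in the transfer-source bucket, the other the keys that do not. A hypothetical run, assuming only the first shoot is on the target:

```sh
$ printf '2754_CP000001\n2754_CP999999\n' | python client/touchable.py staging
born-digital-accessions/2754_CP000001.zip
$ printf '2754_CP000001\n2754_CP999999\n' | python client/untouchable.py staging
born-digital-accessions/2754_CP999999.zip
```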
2 changes: 1 addition & 1 deletion src/check_status.py
@@ -12,7 +12,7 @@ def check_shoot_restore_status(bucket, shoot_number):
def check_folder_restore_status(bucket, s3_folder: str):
logger.info(s3_folder)
for obj in bucket.objects.filter(Prefix=s3_folder, OptionalObjectAttributes=[
-        'RestoreStatus',
+        'RestoreStatus'
]):
if should_download_file(obj.key):
status = obj.restore_status
59 changes: 34 additions & 25 deletions src/restore.py
@@ -1,36 +1,45 @@
import logging
+from functools import partial
import boto3
from botocore.exceptions import ClientError
+from concurrent.futures import ThreadPoolExecutor

from transferrer.common import should_download_file, shoot_number_to_folder_path, get_source_bucket

logger = logging.getLogger(__name__)


-def restore_s3_folder(bucket, s3_folder: str, days_to_keep=1):
-    logger.info(f"restoring folder: {s3_folder}")
-    for obj in bucket.objects.filter(Prefix=s3_folder):
-        if should_download_file(obj.key):
-            try:
-                logger.info(f"restoring object: {obj.key}")
-                obj.restore_object(
-                    RestoreRequest={
-                        'Days': days_to_keep,
-                        'GlacierJobParameters': {
-                            'Tier': 'Bulk'
-                        }
-                    }
-                )
-            except ClientError as e:
-                if "The operation is not valid for the object's storage class" in str(e):
-                    logger.info(f"attempt to restore non-glacier object: {obj.key}")
-                elif "RestoreAlreadyInProgress" in str(e):
-                    logger.info(f"redundant attempt to restore object: {obj.key}")
-                else:
-                    raise
+# This variable governs the degree of parallelism to use when restoring folders.
+# The correct number is to be discovered by experimentation.
+THREADS = 10
+
+
+def restore_s3_folder(bucket, s3_folder: str, days_to_keep=3):
+    with ThreadPoolExecutor(max_workers=THREADS) as executor:
+        for key in executor.map(
+            partial(restore_file, days_to_keep=days_to_keep),
+            (obj for obj in bucket.objects.filter(Prefix=s3_folder) if should_download_file(obj.key))
+        ):
+            logger.info(f"restore request sent: \t{key}")
+
+
+def restore_file(obj, *, days_to_keep):
+    try:
+        obj.restore_object(
+            RestoreRequest={
+                'Days': days_to_keep,
+                'GlacierJobParameters': {
+                    'Tier': 'Bulk'
+                }
+            }
+        )
+        return obj.key
+    except ClientError as e:
+        if "The operation is not valid for the object's storage class" in str(e):
+            logger.info(f"attempt to restore non-glacier object: {obj.key}")
+        elif "RestoreAlreadyInProgress" in str(e):
+            logger.info(f"redundant attempt to restore object: {obj.key}")
+        else:
+            logger.info(f"ignoring {obj.key}")
+            raise

def restore_shoot_folder(bucket, shoot_number):
restore_s3_folder(bucket, shoot_number_to_folder_path(shoot_number))