Skip to content

Commit

Permalink
Use a threadpool to fetch the files
Browse files Browse the repository at this point in the history
  • Loading branch information
paul-butcher committed Jun 12, 2024
1 parent c73786e commit 62b38b4
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 10 deletions.
14 changes: 9 additions & 5 deletions src/transferrer/common.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
import os
import boto3
import botocore

SOURCE_BUCKET = "wellcomecollection-editorial-photography"


def get_source_bucket():
return get_source_client().Bucket(SOURCE_BUCKET)
def get_source_bucket(max_connections=10):
return get_source_client(max_connections).Bucket(SOURCE_BUCKET)


def get_source_client():
def get_source_client(max_connections):
session = boto3.Session()
return session.resource('s3')
return session.resource('s3', config=botocore.config.Config(
region_name="eu-west-1",
max_pool_connections=max_connections
))


def shoot_number_to_folder_path(shoot_number):
Expand Down Expand Up @@ -71,4 +75,4 @@ def discard_file(file):
False
"""
return os.path.basename(file) == "shoot.csv" or file[-4:] == ".xml"
return file[-4:] in (".csv", ".xml")
28 changes: 23 additions & 5 deletions src/transferrer/download.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,43 @@
import os
import logging
from concurrent.futures import ThreadPoolExecutor
from functools import partial


from transferrer.common import should_download_file, shoot_number_to_folder_path, get_source_bucket

logger = logging.getLogger(__name__)
THREADS = 20


def download_s3_folder(bucket, s3_folder: str, local_dir: str):
os.makedirs(local_dir, exist_ok=True)
with ThreadPoolExecutor(max_workers=THREADS) as executor:
for key in executor.map(
partial(download_s3_file, local_dir=local_dir, s3_folder=s3_folder),
(obj for obj in bucket.objects.filter(Prefix=s3_folder) if should_download_file(obj.key))
):
logger.info(f"downloaded\t{key}")

downloadables = (obj for obj in bucket.objects.filter(Prefix=s3_folder) if should_download_file(obj.key))

for obj in downloadables:
target = os.path.join(local_dir, os.path.relpath(obj.key, s3_folder))
bucket.download_file(obj.key, target)
def download_s3_file(object_summary, *, local_dir: str, s3_folder: str):
    """Download a single S3 object to its mirrored location under ``local_dir``.

    The local path is ``local_dir`` joined with the object's key taken
    relative to ``s3_folder``.

    Args:
        object_summary: An object exposing ``key`` and
            ``Object().download_file(path)`` (presumably a boto3
            ``ObjectSummary`` - confirm against the caller).
        local_dir: Directory that receives the downloaded file.
        s3_folder: Key prefix that is stripped from the object's key.

    Returns:
        The key of the object that was downloaded.
    """
    key = object_summary.key
    logger.info("downloading\t%s", key)
    target = os.path.join(local_dir, os.path.relpath(key, s3_folder))
    # A key nested below s3_folder yields a target inside a sub-directory
    # that nothing else creates; exist_ok keeps this safe when several
    # worker threads create the same directory at once.
    parent = os.path.dirname(target)
    if parent:
        os.makedirs(parent, exist_ok=True)
    object_summary.Object().download_file(target)
    return key


def download_shoot_folder(bucket, shoot_number, local_dir):
    """Download the S3 folder belonging to ``shoot_number`` into ``local_dir``.

    The shoot number is translated to its folder path with
    ``shoot_number_to_folder_path`` and the actual transfer is delegated
    to ``download_s3_folder``.
    """
    folder_path = shoot_number_to_folder_path(shoot_number)
    download_s3_folder(bucket, folder_path, local_dir)


def download_shoot(shoot_number, local_dir):
    """Download the shoot ``shoot_number`` from the source bucket into ``local_dir``.

    Args:
        shoot_number: Identifier of the shoot to fetch.
        local_dir: Directory that receives the downloaded files.
    """
    # Downloads run on THREADS workers, so request a connection pool of the
    # same size; the default pool (10) is smaller than the worker count.
    download_shoot_folder(get_source_bucket(THREADS), shoot_number, local_dir)


if __name__ == "__main__":
import sys
shoot_number = sys.argv[1]
download_folder = os.path.join("download", shoot_number)
os.makedirs(download_folder, exist_ok=True)
download_shoot_folder(get_source_bucket(), sys.argv[1], download_folder)
download_shoot_folder(get_source_bucket(THREADS), sys.argv[1], download_folder)

0 comments on commit 62b38b4

Please sign in to comment.