
Commit

Merge pull request #12 from tcezard/download_via_ascp
EVA-2860 - Download via Aspera
tcezard authored May 23, 2022
2 parents a66a6b9 + 15179f8 commit 7422272
Showing 11 changed files with 200 additions and 51 deletions.
8 changes: 8 additions & 0 deletions .github/workflows/python-package.yml
@@ -60,7 +60,15 @@ jobs:
run: |
sudo apt update
sudo apt -y install maven samtools bcftools parallel libbz2-dev liblzma-dev
- name: Set up ascp binary
# Might need to periodically update this URL
run: |
curl https://d3gcli72yxqn2z.cloudfront.net/connect_latest/v4/bin/ibm-aspera-connect_4.1.3.93_linux.tar.gz -o ibm-aspera-connect.tar.gz
tar xzvf ibm-aspera-connect.tar.gz
./ibm-aspera-connect_*_linux.sh
chmod +x ~/.aspera/connect/bin/ascp; sudo ln -s ~/.aspera/connect/bin/ascp /usr/bin
- name: Test with pytest
run: |
export ASPERA_ID_DSA=~/.aspera/connect/etc/asperaweb_id_dsa.openssh
export NXF_DEFAULT_DSL=1
PYTHONPATH=. pytest tests
6 changes: 6 additions & 0 deletions .gitlab-ci.yml
@@ -36,10 +36,16 @@ test:
- pip install pytest certifi
- if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
- python3 setup.py install
- curl https://d3gcli72yxqn2z.cloudfront.net/connect_latest/v4/bin/ibm-aspera-connect_4.1.3.93_linux.tar.gz -o ibm-aspera-connect.tar.gz
- tar xzf ibm-aspera-connect.tar.gz
- ./ibm-aspera-connect_*_linux.sh
- chmod +x ~/.aspera/connect/bin/ascp; ln -s ~/.aspera/connect/bin/ascp /usr/bin
# reduce the default SSL security level to get around a misconfiguration in the Ensembl server
# https://github.com/Ensembl/ensembl-rest/issues/427
# See https://askubuntu.com/a/1233456
- sed -i 's/DEFAULT@SECLEVEL=2/DEFAULT@SECLEVEL=1/' /usr/lib/ssl/openssl.cnf
- export ASPERA_ID_DSA=~/.aspera/connect/etc/asperaweb_id_dsa.openssh
- export NXF_DEFAULT_DSL=1
script:
- PYTHONPATH=. pytest tests
environment:
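Outside CI, the same Aspera pieces need to be in place before a run. A quick preflight check along these lines can confirm that (a hedged sketch: the paths mirror the CI setup above, and `check_aspera_setup` itself is a hypothetical helper, not part of this repository):

```python
import os
import shutil

# Hypothetical preflight helper mirroring the CI setup above (not part of this PR).
def check_aspera_setup():
    # ascp may be on PATH (symlinked in CI) or under the default Connect install dir.
    ascp = shutil.which('ascp') or os.path.expanduser('~/.aspera/connect/bin/ascp')
    # The CI exports ASPERA_ID_DSA pointing at the key bundled with Aspera Connect.
    key = os.environ.get('ASPERA_ID_DSA',
                         os.path.expanduser('~/.aspera/connect/etc/asperaweb_id_dsa.openssh'))
    if not os.path.exists(ascp):
        raise FileNotFoundError(f'ascp binary not found at {ascp}')
    if not os.path.exists(key):
        raise FileNotFoundError(f'Aspera key not found at {key}')
    return ascp, key


if __name__ == '__main__':
    print(check_aspera_setup())
```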
11 changes: 10 additions & 1 deletion README.md
@@ -17,7 +17,7 @@ pip install git+https://github.com/EBIvariation/covid19dp-submission.git@v0.1.2
## Usage

```
ingest_covid19dp_submission.py --project-dir /path/to/project/dir --num-analyses number/of/analysis/to/download --processed-analyses-file /file/containing/list/of/analyses/already/processed --app-config-file /path/to/app_config.yml --nextflow-config-file /path/to/nextflow.config
ingest_covid19dp_submission.py --project-dir /path/to/project/dir/PRJEB45554 --num-analyses 10000 --processed-analyses-file /file/containing/list/of/analyses/already/processed --app-config-file /path/to/app_config.yml --nextflow-config-file /path/to/nextflow.config
```

See [application configuration](covid19dp_submission/etc/example_app_config.yml) and [nextflow configuration](covid19dp_submission/etc/example_nextflow.config) examples.
@@ -33,3 +33,12 @@ The above command will run the following steps (see [workflow definition](covid1
* [Cluster the variants in the SARS-CoV-2](covid19dp_submission/steps/cluster_assembly.py) assembly in the accessioning warehouse.

For usage on the EBI cluster, see [here](https://www.ebi.ac.uk/panda/jira/browse/EVA-2495?focusedCommentId=366472&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-366472) (limited to EBI internal users only).


To resume a previous run:

```
ingest_covid19dp_submission.py --project-dir /path/to/project/dir/PRJEB45554 --num-analyses 10000 --processed-analyses-file /file/containing/list/of/analyses/already/processed --app-config-file /path/to/app_config.yml --nextflow-config-file /path/to/nextflow.config --resume-snapshot <processing_directory_name>
```

where the processing directory name is a timestamp such as `2022_05_18_11_00_41`, located inside the `30_eva_valid` folder.
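For reference, a snapshot name is simply a timestamp taken when a run starts; a minimal sketch of the naming scheme used in `ingest_covid19dp_submission.py`:

```python
from datetime import datetime

# Snapshot directories are named with a timestamp, e.g. 2022_05_18_11_00_41;
# pass an existing name to --resume-snapshot to reuse its downloaded files.
snapshot_name = datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
print(snapshot_name)
```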
19 changes: 10 additions & 9 deletions bin/ingest_covid19dp_submission.py
@@ -14,16 +14,15 @@
# limitations under the License.
import argparse

from ebi_eva_common_pyutils.logger import logging_config

from covid19dp_submission.ingest_covid19dp_submission import ingest_covid19dp_submission


def main():
parser = argparse.ArgumentParser(description='Ingest a snapshot submission from the Covid-19 data portal project',
formatter_class=argparse.RawTextHelpFormatter, add_help=False)
parser = argparse.ArgumentParser(description='Ingest a snapshot submission from the Covid-19 data portal project')
parser.add_argument("--project", default='PRJEB45554', required=False,
help="project from which analyses needs to be downloaded")
parser.add_argument("--snapshot-name", help="Snapshot name (ex: 2021_06_28_14_28_56)", default=None,
required=False)
parser.add_argument("--project-dir", help="Project directory (ex: /path/to/PRJ)", default=None, required=True)
parser.add_argument("--num-analyses", type=int, default=10000, required=False,
help="Number of analyses to download (max = 10000)")
@@ -33,13 +32,15 @@ def main():
help="Full path to the application config file (ex: /path/to/config.yml)", required=True)
parser.add_argument("--nextflow-config-file",
help="Full path to the Nextflow config file", default=None, required=False)
parser.add_argument("--resume",
help="Indicate if a previous concatenation job is to be resumed", action='store_true',
required=False)
parser.add_argument("--resume-snapshot", type=str, required=False,
help="Resume a previous job. You need to specify the snapshot name to be resumed "
"(ex: 2021_06_28_14_28_56)")
args = parser.parse_args()
ingest_covid19dp_submission(args.project, args.snapshot_name, args.project_dir, args.num_analyses,
logging_config.add_stdout_handler()

ingest_covid19dp_submission(args.project, args.project_dir, args.num_analyses,
args.processed_analyses_file, args.app_config_file, args.nextflow_config_file,
args.resume)
args.resume_snapshot)


if __name__ == "__main__":
65 changes: 56 additions & 9 deletions covid19dp_submission/download_analyses.py
@@ -13,28 +13,39 @@
# limitations under the License.

import argparse
import copy
import glob
import os
import urllib

import requests
from ebi_eva_common_pyutils.command_utils import run_command_with_output
from ebi_eva_common_pyutils.logger import logging_config
from more_itertools import chunked
from retry import retry

logger = logging_config.get_logger(__name__)


def download_analyses(project, num_analyses, processed_analyses_file, download_target_dir):
class UnfinishedBatchError(Exception):
pass


def download_analyses(project, num_analyses, processed_analyses_file, download_target_dir, ascp, aspera_id_dsa,
batch_size=100):
total_analyses = total_analyses_in_project(project)
logger.info(f"total analyses in project {project}: {total_analyses}")

analyses_array = get_analyses_to_process(project, num_analyses, total_analyses, processed_analyses_file)
logger.info(f"number of analyses to process: {len(analyses_array)}")

os.makedirs(download_target_dir, exist_ok=True)
download_files(analyses_array, download_target_dir, processed_analyses_file)
# Sending a shallow copy of the analyses_array because it will be modified during the download to accommodate
# the retry mechanism
download_files_via_aspera(copy.copy(analyses_array), download_target_dir, processed_analyses_file, ascp, aspera_id_dsa,
batch_size)

vcf_files_downloaded = glob.glob(f"{download_target_dir}/*.vcf")
vcf_files_downloaded = glob.glob(f"{download_target_dir}/*.vcf") + glob.glob(f"{download_target_dir}/*.vcf.gz")
logger.info(f"total number of files downloaded: {len(vcf_files_downloaded)}")

if len(analyses_array) != len(vcf_files_downloaded):
@@ -83,8 +94,10 @@ def get_analyses_to_process(project, num_analyses, total_analyses, processed_ana

@retry(logger=logger, tries=4, delay=120, backoff=1.2, jitter=(1, 3))
def get_analyses_from_ena(project, offset, limit):
analyses_url = f"https://www.ebi.ac.uk/ena/portal/api/filereport?result=analysis&accession={project}&offset={offset}" \
f"&limit={limit}&format=json&fields=run_ref,analysis_accession,submitted_ftp"
analyses_url = (
f"https://www.ebi.ac.uk/ena/portal/api/filereport?result=analysis&accession={project}&offset={offset}"
f"&limit={limit}&format=json&fields=run_ref,analysis_accession,submitted_ftp,submitted_aspera"
)
response = requests.get(analyses_url)
if response.status_code != 200:
logger.error(f"Error fetching analyses info from ENA for {project}")
@@ -103,9 +116,10 @@ def filter_out_processed_analyses(analyses_array, processed_analyses):

def get_processed_analyses(processed_analyses_file):
processed_analyses = set()
with open(processed_analyses_file, 'r') as file:
for line in file:
processed_analyses.add(line.split(",")[0])
if os.path.isfile(processed_analyses_file):
with open(processed_analyses_file, 'r') as file:
for line in file:
processed_analyses.add(line.split(",")[0])
return processed_analyses


@@ -127,6 +141,34 @@ def download_files(analyses_array, download_target_dir, processed_analyses_file)
os.remove(download_file_path)


@retry(exceptions=(UnfinishedBatchError,), logger=logger, tries=4, delay=10, backoff=1.2, jitter=(1, 3))
def download_files_via_aspera(analyses_array, download_target_dir, processed_analyses_file, ascp, aspera_id_dsa,
batch_size=100):
logger.info(f"total number of files to download: {len(analyses_array)}")
print(f'{len(analyses_array)} analyses')
with open(processed_analyses_file, 'a') as open_file:
# This copy won't change throughout the iteration
for analysis_batch in chunked(copy.copy(analyses_array), batch_size):
download_urls = [
f"era-fasp@{analysis['submitted_aspera']}" for analysis in analysis_batch
]
command = f'{ascp} -i {aspera_id_dsa} -QT -l 300m -P 33001 {" ".join(download_urls)} {download_target_dir}'
run_command_with_output(f"Download batch of covid19 data", command)

for analysis in analysis_batch:
expected_output_file = os.path.join(download_target_dir, os.path.basename(analysis['submitted_aspera']))
if os.path.exists(expected_output_file):
open_file.write(f"{analysis['analysis_accession']},{analysis['submitted_ftp']}\n")
# WARNING: This will modify the content of the original analysis array allowing the retry to
# only deal with a subset of files to download.
analyses_array.remove(analysis)
else:
logger.warn(f"Failed to download {analysis['submitted_aspera']}")
if len(analyses_array) > 0:
# Trigger a retry
raise UnfinishedBatchError(f'There are {len(analyses_array)} vcf files that were not downloaded')


@retry(logger=logger, tries=4, delay=120, backoff=1.2, jitter=(1, 3))
def download_file(download_url, download_file_path):
urllib.request.urlretrieve(download_url, download_file_path)
@@ -142,14 +184,19 @@ def main():
parser.add_argument("--processed-analyses-file", required=True,
help="full path to the file containing all the processed analyses")
parser.add_argument("--download-target-dir", required=True, help="Full path to the target download directory")
parser.add_argument("--ascp-bin", required=True, help="Full path to the ascp binary.")
parser.add_argument("--aspera-id-dsa-key", required=True, help="Full path to the aspera id dsa key.")
parser.add_argument("--batch-size", required=True, type=int, help="number of vcf file to download with each ascp "
"command.")

args = parser.parse_args()
logging_config.add_stdout_handler()

if args.num_analyses < 1 or args.num_analyses > 10000:
raise Exception("number of analyses to download should be between 1 and 10000")

download_analyses(args.project, args.num_analyses, args.processed_analyses_file, args.download_target_dir)
download_analyses(args.project, args.num_analyses, args.processed_analyses_file, args.download_target_dir,
args.ascp_bin, args.aspera_id_dsa_key, args.batch_size)


if __name__ == "__main__":
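The download logic above combines three pieces: batches of URLs are grouped with `more_itertools.chunked` and fetched with one `ascp` call each, successfully downloaded analyses are removed from the caller's (mutable) list, and a final `UnfinishedBatchError` hands only the leftovers back to the `retry` decorator on the next attempt. A condensed sketch of that pattern, with `fetch_batch` as a simulated stand-in for the real `ascp` invocation:

```python
import copy
import random

from more_itertools import chunked
from retry import retry


class UnfinishedBatchError(Exception):
    pass


def fetch_batch(batch):
    """Stand-in for one ascp call; here each item 'downloads' 90% of the time."""
    return [item for item in batch if random.random() < 0.9]


@retry(exceptions=(UnfinishedBatchError,), tries=4, delay=1)
def download_all(items, batch_size=100):
    # Iterate over a shallow copy so removing finished items from `items` is safe.
    for batch in chunked(copy.copy(items), batch_size):
        for done in fetch_batch(batch):
            items.remove(done)  # later retries only see what is still missing
    if items:
        raise UnfinishedBatchError(f'{len(items)} files were not downloaded')


download_all([f'file_{i}.vcf.gz' for i in range(250)])
```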
51 changes: 35 additions & 16 deletions covid19dp_submission/ingest_covid19dp_submission.py
@@ -21,21 +21,26 @@
import yaml
from ebi_eva_common_pyutils.command_utils import run_command_with_output
from ebi_eva_common_pyutils.config_utils import get_args_from_private_config_file
from ebi_eva_common_pyutils.logger import logging_config

from covid19dp_submission import NEXTFLOW_DIR
from covid19dp_submission.download_analyses import download_analyses
from covid19dp_submission.steps.vcf_vertical_concat.run_vcf_vertical_concat_pipeline import get_concat_result_file_name

logger = logging_config.get_logger(__name__)


def get_analyses_file_list(download_target_dir: str) -> List[str]:
return sorted([os.path.basename(member) for member in os.listdir(download_target_dir)
if member.lower().endswith(".vcf")])
if member.lower().endswith(".vcf") or member.lower().endswith(".vcf.gz")])


def _create_required_dirs(config: dict):
required_dirs = [config['submission']['download_target_dir'], config['submission']['concat_processing_dir'],
config['submission']['accession_output_dir'], config['submission']['public_ftp_dir'],
config['submission']['log_dir'], config['submission']['validation_dir']]
required_dirs = [config['submission']['download_target_dir'],
config['submission']['concat_processing_dir'],
config['submission']['accession_output_dir'],
config['submission']['log_dir'],
config['submission']['validation_dir']]
for dir_name in required_dirs:
os.makedirs(dir_name, exist_ok=True)

@@ -70,24 +75,35 @@ def _get_config(snapshot_name: str, project_dir: str, nextflow_config_file: str,
'script_path': os.path.dirname(inspect.getmodule(sys.modules[__name__]).__file__)}
config['executable']['nextflow_config_file'] = nextflow_config_file
config['executable']['nextflow_param_file'] = submission_param_file

return config


def ingest_covid19dp_submission(project: str, snapshot_name: str, project_dir: str, num_analyses: int,
def ingest_covid19dp_submission(project: str, project_dir: str, num_analyses: int,
processed_analyses_file: str, app_config_file: str, nextflow_config_file: str or None,
resume: bool):
resume: str):
process_new_snapshot = False
if snapshot_name is None:
if resume is None:
snapshot_name = datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
process_new_snapshot = True
else:
snapshot_name = resume

config = _get_config(snapshot_name, project_dir, nextflow_config_file, app_config_file)
_create_required_dirs(config)

if process_new_snapshot:
download_analyses(project, num_analyses, processed_analyses_file, config['submission']['download_target_dir'])

_create_required_dirs(config)
else:
# Check that the snapshot exists
assert os.path.exists(
config['submission']['download_target_dir']), f'Cannot resume execution for snapshot {snapshot_name}'

list_file = get_analyses_file_list(config['submission']['download_target_dir'])
if len(list_file) < num_analyses:
num_analyses = num_analyses - len(list_file)
download_analyses(project, num_analyses, processed_analyses_file, config['submission']['download_target_dir'],
config['executable']['ascp_bin'], config['aspera']['aspera_id_dsa_key'], config.get('download_batch_size', 100))
else:
logger.info(f'All {num_analyses} analyses have been downloaded already. Skipping.')
vcf_files_to_be_downloaded = create_download_file_list(config)
config['submission']['concat_result_file'] = \
get_concat_result_file_name(config['submission']['concat_processing_dir'], len(vcf_files_to_be_downloaded),
@@ -96,9 +112,12 @@ def ingest_covid19dp_submission(project: str, snapshot_name: str, project_dir: s
nextflow_file_to_run = os.path.join(NEXTFLOW_DIR, 'submission_workflow.nf')
yaml.safe_dump(config, open(config['executable']['nextflow_param_file'], "w"))

run_nextflow_command = f"{config['executable']['nextflow']} run {nextflow_file_to_run}"
run_nextflow_command += f" -c {nextflow_config_file}" if nextflow_config_file else ""
run_nextflow_command += f" -resume" if resume else ""
run_nextflow_command += f" --PYTHONPATH {config['executable']['python']['script_path']}"
run_nextflow_command += f" -params-file {config['executable']['nextflow_param_file']}"
# run the nextflow script in the download directory so that each execution is independent
run_nextflow_command = (f"cd {config['submission']['download_target_dir']}; "
f"{config['executable']['nextflow']} run {nextflow_file_to_run} "
f"-c {nextflow_config_file} "
f"--PYTHONPATH {config['executable']['python']['script_path']} "
f"-params-file {config['executable']['nextflow_param_file']}")
run_nextflow_command += " -resume" if resume else ""

run_command_with_output(f"Running submission pipeline: {nextflow_file_to_run}...", run_nextflow_command)
1 change: 1 addition & 0 deletions covid19dp_submission/nextflow/submission_workflow.nf
@@ -122,6 +122,7 @@ process sync_accessions_to_public_ftp {

script:
"""
mkdir -p $params.submission.public_ftp_dir
(rsync -av $params.submission.accession_output_dir/* $params.submission.public_ftp_dir) \
>> $params.submission.log_dir/sync_accessions_to_public_ftp.log 2>&1
"""
1 change: 1 addition & 0 deletions requirements.txt
@@ -2,3 +2,4 @@ ebi_eva_common_pyutils==0.3.15
pyyaml
retry
pymongo
more_itertools
4 changes: 4 additions & 0 deletions tests/resources/properties/app_config.yml
@@ -3,6 +3,10 @@ executable:
vcf_assembly_checker: vcf_assembly_checker_linux
vcf_validator: vcf_validator_linux
bcftools: bcftools
ascp_bin: ~/.aspera/connect/bin/ascp

aspera:
aspera_id_dsa_key: ~/.aspera/connect/etc/asperaweb_id_dsa.openssh

jar:
accession_pipeline: {accession_jar_file}
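These new keys are consumed in `ingest_covid19dp_submission.py` as `config['executable']['ascp_bin']` and `config['aspera']['aspera_id_dsa_key']`, with `download_batch_size` falling back to 100 when absent. A minimal sketch of reading them back (the file name here is illustrative):

```python
import os

import yaml

# Illustrative only: load an app config like the one above and resolve the Aspera settings.
with open('app_config.yml') as config_file:
    config = yaml.safe_load(config_file)

ascp_bin = os.path.expanduser(config['executable']['ascp_bin'])
aspera_key = os.path.expanduser(config['aspera']['aspera_id_dsa_key'])
batch_size = config.get('download_batch_size', 100)  # default mirrors the ingestion code
print(ascp_bin, aspera_key, batch_size)
```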
