From d67c1d11bf45bb59a12a5dccd5daac5c7906319b Mon Sep 17 00:00:00 2001 From: Timothee Cezard Date: Wed, 25 May 2022 11:21:08 +0100 Subject: [PATCH 1/4] Add script to run on cron: run_ingestion_cron.sh is meant to be executed often directly on the cron server run_ingestion.sh is executed by run_ingestion_cron.sh to and require lots of memory --- bin/run_ingestion.sh | 87 +++++++++++++++++++++++++++++++++++++++ bin/run_ingestion_cron.sh | 30 ++++++++++++++ setup.py | 3 +- 3 files changed, 119 insertions(+), 1 deletion(-) create mode 100644 bin/run_ingestion.sh create mode 100644 bin/run_ingestion_cron.sh diff --git a/bin/run_ingestion.sh b/bin/run_ingestion.sh new file mode 100644 index 0000000..9d9d3cd --- /dev/null +++ b/bin/run_ingestion.sh @@ -0,0 +1,87 @@ +#!/bin/bash + +#email_recipient=*********** +#eva_dir=************** +# Grab the variables from a config bash script +source ~/.covid19dp_processing 2> /dev/null + +if [ -z "${email_recipient}" ] || [ -z "${eva_dir}" ]; +then + echo "run_ingestion.sh does not have access to email_recipient and eva_dir variables. Please set them above or populate ~/.covid19dp_processing" + exit 1 +fi + +tmp_dir=${eva_dir}/scratch +software_dir=${eva_dir}/software/covid19dp-submission/production_deployments/ +project_dir=${eva_dir}/data/PRJEB45554 +lock_file=${project_dir}/.lock_ingest_covid19dp_submission + +#Check if the previous process is still running +if [[ -e ${lock_file} ]]; +then + echo "processing in $(cat ${lock_file}) is still going. Exit" + exit 0 +fi + +# Check if there is an unfinished process +valid_dir=${project_dir}/30_eva_valid + +for dir in ${valid_dir}/????_??_??_??_??_??; +do + if [[ ! -e ${dir}/.process_complete ]]; + then + current_date=$(basename ${dir}) + break + fi +done + +# If the current_date is set and if it isn't then create a new one +if [ -z "$current_date" ]; +then + current_date=$(date --rfc-3339=second | cut -d '+' -f 1 | sed 's/[- :]/_/g' ) +fi + +processing_dir=${valid_dir}/${current_date} + +echo ${current_date} > ${lock_file} +# Ensure that the lock file is delete on exit of the script +trap 'rm ${lock_file}' EXIT + +mkdir -p ${processing_dir} +export TMPDIR=${tmp_dir} + +${software_dir}/production/bin/ingest_covid19dp_submission.py \ + --project-dir ${project_dir} --app-config-file ${software_dir}/app_config.yml \ + --nextflow-config-file ${software_dir}/workflow.config \ + --processed-analyses-file ${project_dir}/processed_analysis.txt --num-analyses 10000 \ + --resume-snapshot ${current_date} \ + >> ${processing_dir}/ingest_covid19dp.log \ + 2>> ${processing_dir}/ingest_covid19dp.err + +# Assess success +process_exit_status=$? +set -o pipefail && cat ${processing_dir}/ingest_covid19dp.log | grep 'submission_workflow.nf' | grep 'completed successfully' +grep_exit_status=$? +if [ ${process_exit_status} == 0 ] && [ ${grep_exit_status} == 0 ]; +then + touch ${processing_dir}/.process_complete + nb_processed=$(cat ${project_dir}/processed_analysis.txt| wc -l) + cat > ${processing_dir}/email <<- EOF +From: eva-noreply@ebi.ac.uk +To: ${email_recipient} +Subject: COVID19 Data Processing batch ${current_date} completed successfully + +Accessioning/Clustering of 10,000 new COVID19 samples started in ${current_date} is now complete. +The total number of sample processed is ${nb_processed} +EOF + cat ${processing_dir}/email | sendmail ${email_recipient} +else + cat > ${processing_dir}/email <<- EOF +From: eva-noreply@ebi.ac.uk +To: ${email_recipient} +Subject: COVID19 Data Processing batch ${current_date} failed + +Accessioning/Clustering processing batch ${current_date} failed. +EOF + cat ${processing_dir}/email | sendmail ${email_recipient} +fi \ No newline at end of file diff --git a/bin/run_ingestion_cron.sh b/bin/run_ingestion_cron.sh new file mode 100644 index 0000000..9b3d369 --- /dev/null +++ b/bin/run_ingestion_cron.sh @@ -0,0 +1,30 @@ +#!/bin/bash + +#email_recipient=*********** +# Grab the variables from a config bash script +source ~/.covid19dp_processing 2> /dev/null + +if [ -z "${email_recipient}" ] ; +then + echo "run_ingestion.sh does not have access to eva_dir variable. Please set it above or populate ~/.covid19dp_processing" + exit 1 +fi +software_dir=${eva_dir}/software/covid19dp-submission/production_deployments/ +project_dir=${eva_dir}/data/PRJEB45554 +lock_file=${project_dir}/.lock_ingest_covid19dp_submission + +#Check if the previous process is still running +if [[ -e ${lock_file} ]]; +then + echo "processing in $(cat ${lock_file}) is still going. Exit" + exit 0 +fi + +# Also check if there isn't a ingest_covid19dp process that would not started +if bjobs -o 'job_name:100' | grep 'ingest_covid19dp' > /dev/null; +then + echo "Lock file is not set but there is an ingest_covid19dp job running" +else + bsub -e ${project_dir}/run_ingestion.err -o ${project_dir}/run_ingestion.out -J ingest_covid19dp -M 20G \ + -R "rusage[mem=20960]" "${software_dir}/production/bin/run_ingestion.sh" +fi diff --git a/setup.py b/setup.py index 5d76612..fd05fdc 100644 --- a/setup.py +++ b/setup.py @@ -25,5 +25,6 @@ 'License :: OSI Approved :: Apache Software License', 'Programming Language :: Python :: 3' ], - scripts=glob.glob(join(base_dir, 'bin', '*.py')) + scripts=[join(base_dir, 'bin', s) for s in ('ingest_covid19dp_submission.py', 'run_ingestion.sh', + 'run_ingestion_cron.sh')] ) From a4316399bbf06b15490386fa954412d2e53e62fe Mon Sep 17 00:00:00 2001 From: Timothee Cezard Date: Wed, 25 May 2022 17:30:33 +0100 Subject: [PATCH 2/4] Update bin/run_ingestion.sh Co-authored-by: sundarvenkata-EBI --- bin/run_ingestion.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bin/run_ingestion.sh b/bin/run_ingestion.sh index 9d9d3cd..53586b1 100644 --- a/bin/run_ingestion.sh +++ b/bin/run_ingestion.sh @@ -72,7 +72,7 @@ To: ${email_recipient} Subject: COVID19 Data Processing batch ${current_date} completed successfully Accessioning/Clustering of 10,000 new COVID19 samples started in ${current_date} is now complete. -The total number of sample processed is ${nb_processed} +The total number of samples processed is ${nb_processed} EOF cat ${processing_dir}/email | sendmail ${email_recipient} else From 7e2cd0ea9398db658bec78c73a0d3eebc18b8221 Mon Sep 17 00:00:00 2001 From: Timothee Cezard Date: Thu, 26 May 2022 10:28:29 +0100 Subject: [PATCH 3/4] Add basic test to ensure that the locking feature works --- .github/workflows/python-package.yml | 3 +++ bin/run_ingestion.sh | 14 ++++++++---- bin/run_ingestion_cron.sh | 4 ++-- tests/resources/bin/bjobs | 3 +++ tests/resources/bin/bsub | 3 +++ tests/test_run_ingestion.sh | 32 ++++++++++++++++++++++++++++ 6 files changed, 53 insertions(+), 6 deletions(-) mode change 100644 => 100755 bin/run_ingestion.sh mode change 100644 => 100755 bin/run_ingestion_cron.sh create mode 100755 tests/resources/bin/bjobs create mode 100755 tests/resources/bin/bsub create mode 100755 tests/test_run_ingestion.sh diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 890cfd6..dcca67c 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -72,3 +72,6 @@ jobs: export ASPERA_ID_DSA=~/.aspera/connect/etc/asperaweb_id_dsa.openssh export NXF_DEFAULT_DSL=1 PYTHONPATH=. pytest tests + - name: Test bash script + run: | + tests/test_run_ingestion.sh diff --git a/bin/run_ingestion.sh b/bin/run_ingestion.sh old mode 100644 new mode 100755 index 53586b1..3cea099 --- a/bin/run_ingestion.sh +++ b/bin/run_ingestion.sh @@ -3,7 +3,7 @@ #email_recipient=*********** #eva_dir=************** # Grab the variables from a config bash script -source ~/.covid19dp_processing 2> /dev/null +source ~/.covid19dp_processing if [ -z "${email_recipient}" ] || [ -z "${eva_dir}" ]; then @@ -15,6 +15,8 @@ tmp_dir=${eva_dir}/scratch software_dir=${eva_dir}/software/covid19dp-submission/production_deployments/ project_dir=${eva_dir}/data/PRJEB45554 lock_file=${project_dir}/.lock_ingest_covid19dp_submission +number_to_process=10000 + #Check if the previous process is still running if [[ -e ${lock_file} ]]; @@ -42,18 +44,22 @@ then fi processing_dir=${valid_dir}/${current_date} +log_dir=${project_dir}/00_logs/${current_date} +public_dir=${project_dir}/60_eva_public/${current_date} echo ${current_date} > ${lock_file} # Ensure that the lock file is delete on exit of the script trap 'rm ${lock_file}' EXIT -mkdir -p ${processing_dir} +# Need to create the directories because we're using resume from the first execution so they won't be created by +# ingest_covid19dp_submission.py +mkdir -p ${processing_dir} ${log_dir} ${public_dir} export TMPDIR=${tmp_dir} ${software_dir}/production/bin/ingest_covid19dp_submission.py \ --project-dir ${project_dir} --app-config-file ${software_dir}/app_config.yml \ --nextflow-config-file ${software_dir}/workflow.config \ - --processed-analyses-file ${project_dir}/processed_analysis.txt --num-analyses 10000 \ + --processed-analyses-file ${project_dir}/processed_analysis.txt --num-analyses ${number_to_process} \ --resume-snapshot ${current_date} \ >> ${processing_dir}/ingest_covid19dp.log \ 2>> ${processing_dir}/ingest_covid19dp.err @@ -71,7 +77,7 @@ From: eva-noreply@ebi.ac.uk To: ${email_recipient} Subject: COVID19 Data Processing batch ${current_date} completed successfully -Accessioning/Clustering of 10,000 new COVID19 samples started in ${current_date} is now complete. +Accessioning/Clustering of ${number_to_process} new COVID19 samples started in ${current_date} is now complete. The total number of samples processed is ${nb_processed} EOF cat ${processing_dir}/email | sendmail ${email_recipient} diff --git a/bin/run_ingestion_cron.sh b/bin/run_ingestion_cron.sh old mode 100644 new mode 100755 index 9b3d369..f916063 --- a/bin/run_ingestion_cron.sh +++ b/bin/run_ingestion_cron.sh @@ -2,9 +2,9 @@ #email_recipient=*********** # Grab the variables from a config bash script -source ~/.covid19dp_processing 2> /dev/null +source ~/.covid19dp_processing -if [ -z "${email_recipient}" ] ; +if [ -z "${eva_dir}" ] ; then echo "run_ingestion.sh does not have access to eva_dir variable. Please set it above or populate ~/.covid19dp_processing" exit 1 diff --git a/tests/resources/bin/bjobs b/tests/resources/bin/bjobs new file mode 100755 index 0000000..08a7b20 --- /dev/null +++ b/tests/resources/bin/bjobs @@ -0,0 +1,3 @@ +#!/bin/bash + +echo "" \ No newline at end of file diff --git a/tests/resources/bin/bsub b/tests/resources/bin/bsub new file mode 100755 index 0000000..094336a --- /dev/null +++ b/tests/resources/bin/bsub @@ -0,0 +1,3 @@ +#!/bin/bash + +echo "" \ No newline at end of file diff --git a/tests/test_run_ingestion.sh b/tests/test_run_ingestion.sh new file mode 100755 index 0000000..d7caba4 --- /dev/null +++ b/tests/test_run_ingestion.sh @@ -0,0 +1,32 @@ +#!/bin/bash + +basedir=$(dirname $(dirname $(readlink -f $0))) +echo eva_dir=$basedir/tests/resources > ~/.covid19dp_processing +export PATH=$basedir/tests/resources/bin:$PATH +prj_dir=$basedir/tests/resources/data/PRJEB45554 + + +function tearDown { + rm -rf $basedir/tests/resources/data + rm ~/.covid19dp_processing +} +trap tearDown EXIT + + +if ${basedir}/bin/run_ingestion_cron.sh | grep '' > /dev/null ; +then + echo 'Test 1 Pass' +else + echo 'Test 1 Fail' + exit 1 +fi +mkdir -p $prj_dir +touch $prj_dir/.lock_ingest_covid19dp_submission + +if ${basedir}/bin/run_ingestion_cron.sh | grep '' > /dev/null ; +then + echo 'Test 2 Fail' + exit 1 +else + echo 'Test 2 Pass' +fi From 7dcd48ef74b591bb1249ecf79eb48189b878b8b2 Mon Sep 17 00:00:00 2001 From: Timothee Cezard Date: Thu, 26 May 2022 11:08:37 +0100 Subject: [PATCH 4/4] fix test --- covid19dp_submission/download_analyses.py | 1 - tests/test_download_analyses.py | 4 ++-- tests/test_ingest_covid19dp_submission.py | 4 ++-- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/covid19dp_submission/download_analyses.py b/covid19dp_submission/download_analyses.py index c71f6fb..9fb0350 100644 --- a/covid19dp_submission/download_analyses.py +++ b/covid19dp_submission/download_analyses.py @@ -145,7 +145,6 @@ def download_files(analyses_array, download_target_dir, processed_analyses_file) def download_files_via_aspera(analyses_array, download_target_dir, processed_analyses_file, ascp, aspera_id_dsa, batch_size=100): logger.info(f"total number of files to download: {len(analyses_array)}") - print(f'{len(analyses_array)} analysis') with open(processed_analyses_file, 'a') as open_file: # This copy won't change throughout the iteration for analysis_batch in chunked(copy.copy(analyses_array), batch_size): diff --git a/tests/test_download_analyses.py b/tests/test_download_analyses.py index e9b4dac..0444c34 100644 --- a/tests/test_download_analyses.py +++ b/tests/test_download_analyses.py @@ -48,13 +48,13 @@ def check_processed_analysis_file(self, number_of_lines): def test_download_analyses(self): data = self.get_processed_files_data() self.create_processed_analysis_file(data) - ascp_bin = os.path.expanduser("ascp") + ascp_bin = "ascp" aspera_id_dsa_key = os.environ['ASPERA_ID_DSA'] download_analyses(project=self.project, num_analyses=self.num_analyses_to_download, processed_analyses_file=self.processed_analyses_file, download_target_dir=self.download_target_dir, ascp=ascp_bin, aspera_id_dsa=aspera_id_dsa_key, batch_size=100) - vcf_files = glob.glob(f"{self.download_target_dir}/*.vcf") + vcf_files = glob.glob(f"{self.download_target_dir}/*.vcf") + glob.glob(f"{self.download_target_dir}/*.vcf.gz") self.assertEqual(self.num_analyses_to_download, len(vcf_files)) self.check_processed_analysis_file(len(data) + self.num_analyses_to_download) diff --git a/tests/test_ingest_covid19dp_submission.py b/tests/test_ingest_covid19dp_submission.py index 18b825b..118c5f1 100644 --- a/tests/test_ingest_covid19dp_submission.py +++ b/tests/test_ingest_covid19dp_submission.py @@ -92,9 +92,9 @@ def test_ingest_covid19dp_submission(self): nextflow_config_file=self.nextflow_config_file, resume=None) num_clustered_variants = self.mongo_db[self.accessioning_database_name]['clusteredVariantEntity'] \ .count_documents(filter={}) - self.assertEqual(52, num_clustered_variants) + self.assertEqual(50, num_clustered_variants) # check if files are synchronized to the ftp dir self.assertEqual(2, len(glob.glob(f"{self.app_config['submission']['public_ftp_dir']}/*"))) num_incremental_release_records = self.mongo_db[self.accessioning_database_name]['releaseRecordEntity']\ .count_documents(filter={}) - self.assertEqual(52, num_incremental_release_records) + self.assertEqual(50, num_incremental_release_records)