Skip to content

Commit

Permalink
Merge pull request #13 from tcezard/EVA2876_run_ingestion_on_cron
Browse files Browse the repository at this point in the history
EVA-2876 - Add script to run Covid19 processing on cron
  • Loading branch information
tcezard authored May 26, 2022
2 parents 7422272 + 7dcd48e commit 68c9887
Show file tree
Hide file tree
Showing 10 changed files with 170 additions and 6 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,6 @@ jobs:
export ASPERA_ID_DSA=~/.aspera/connect/etc/asperaweb_id_dsa.openssh
export NXF_DEFAULT_DSL=1
PYTHONPATH=. pytest tests
- name: Test bash script
run: |
tests/test_run_ingestion.sh
93 changes: 93 additions & 0 deletions bin/run_ingestion.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
#!/bin/bash
# Run one batch of the Covid19 data-processing ingestion pipeline and email
# the outcome. Resumes the first unfinished snapshot if one exists, otherwise
# starts a new timestamped batch. A lock file prevents concurrent runs.
# Required variables: email_recipient, eva_dir (set below or supplied by
# ~/.covid19dp_processing).

#email_recipient=***********
#eva_dir=**************
# Grab the variables from a config bash script
source ~/.covid19dp_processing

if [ -z "${email_recipient}" ] || [ -z "${eva_dir}" ]; then
  echo "run_ingestion.sh does not have access to email_recipient and eva_dir variables. Please set them above or populate ~/.covid19dp_processing"
  exit 1
fi

tmp_dir=${eva_dir}/scratch
software_dir=${eva_dir}/software/covid19dp-submission/production_deployments/
project_dir=${eva_dir}/data/PRJEB45554
lock_file=${project_dir}/.lock_ingest_covid19dp_submission
number_to_process=10000


# Check if the previous process is still running
if [[ -e "${lock_file}" ]]; then
  echo "processing in $(cat "${lock_file}") is still going. Exit"
  exit 0
fi

# Check if there is an unfinished process: reuse the snapshot directory of the
# first batch that was never marked complete.
valid_dir=${project_dir}/30_eva_valid
current_date=""  # initialise explicitly so a value is never inherited from the environment

for dir in "${valid_dir}"/????_??_??_??_??_??; do
  # When no snapshot directory exists the glob stays literal — skip it
  [[ -d "${dir}" ]] || continue
  if [[ ! -e "${dir}/.process_complete" ]]; then
    current_date=$(basename "${dir}")
    break
  fi
done

# If current_date was not resumed from an unfinished batch, create a new one
if [ -z "${current_date}" ]; then
  # e.g. 2022_05_26_12_00_00 (GNU date; '+' strips the timezone offset)
  current_date=$(date --rfc-3339=second | cut -d '+' -f 1 | sed 's/[- :]/_/g')
fi

processing_dir=${valid_dir}/${current_date}
log_dir=${project_dir}/00_logs/${current_date}
public_dir=${project_dir}/60_eva_public/${current_date}

echo "${current_date}" > "${lock_file}"
# Ensure that the lock file is deleted on exit of the script
trap 'rm -f "${lock_file}"' EXIT

# Need to create the directories because we're using resume from the first execution so they won't be created by
# ingest_covid19dp_submission.py
mkdir -p "${processing_dir}" "${log_dir}" "${public_dir}"
export TMPDIR=${tmp_dir}

"${software_dir}"/production/bin/ingest_covid19dp_submission.py \
  --project-dir "${project_dir}" --app-config-file "${software_dir}"/app_config.yml \
  --nextflow-config-file "${software_dir}"/workflow.config \
  --processed-analyses-file "${project_dir}"/processed_analysis.txt --num-analyses "${number_to_process}" \
  --resume-snapshot "${current_date}" \
  >> "${processing_dir}"/ingest_covid19dp.log \
  2>> "${processing_dir}"/ingest_covid19dp.err

# Assess success: the pipeline must exit 0 AND the log must contain the
# Nextflow "completed successfully" message for submission_workflow.nf
process_exit_status=$?
grep 'submission_workflow.nf' "${processing_dir}"/ingest_covid19dp.log | grep -q 'completed successfully'
grep_exit_status=$?
if [ "${process_exit_status}" -eq 0 ] && [ "${grep_exit_status}" -eq 0 ]; then
  touch "${processing_dir}"/.process_complete
  nb_processed=$(wc -l < "${project_dir}"/processed_analysis.txt)
  cat > "${processing_dir}"/email <<- EOF
From: eva-noreply@ebi.ac.uk
To: ${email_recipient}
Subject: COVID19 Data Processing batch ${current_date} completed successfully
Accessioning/Clustering of ${number_to_process} new COVID19 samples started in ${current_date} is now complete.
The total number of samples processed is ${nb_processed}
EOF
  sendmail "${email_recipient}" < "${processing_dir}"/email
else
  cat > "${processing_dir}"/email <<- EOF
From: eva-noreply@ebi.ac.uk
To: ${email_recipient}
Subject: COVID19 Data Processing batch ${current_date} failed
Accessioning/Clustering processing batch ${current_date} failed.
EOF
  sendmail "${email_recipient}" < "${processing_dir}"/email
fi
30 changes: 30 additions & 0 deletions bin/run_ingestion_cron.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#!/bin/bash
# Cron entry point: submit run_ingestion.sh to the LSF cluster, unless a
# previous batch is still running (lock file present) or an ingest_covid19dp
# job is already queued. Requires eva_dir (set below or supplied by
# ~/.covid19dp_processing).

#email_recipient=***********
# Grab the variables from a config bash script
source ~/.covid19dp_processing

if [ -z "${eva_dir}" ]; then
  echo "run_ingestion_cron.sh does not have access to eva_dir variable. Please set it above or populate ~/.covid19dp_processing"
  exit 1
fi
software_dir=${eva_dir}/software/covid19dp-submission/production_deployments/
project_dir=${eva_dir}/data/PRJEB45554
lock_file=${project_dir}/.lock_ingest_covid19dp_submission

# Check if the previous process is still running
if [[ -e "${lock_file}" ]]; then
  echo "processing in $(cat "${lock_file}") is still going. Exit"
  exit 0
fi

# Also check for an ingest_covid19dp job that is queued but has not started
# yet (it would not have created the lock file)
if bjobs -o 'job_name:100' | grep 'ingest_covid19dp' > /dev/null; then
  echo "Lock file is not set but there is an ingest_covid19dp job running"
else
  bsub -e "${project_dir}/run_ingestion.err" -o "${project_dir}/run_ingestion.out" -J ingest_covid19dp -M 20G \
    -R "rusage[mem=20960]" "${software_dir}/production/bin/run_ingestion.sh"
fi
1 change: 0 additions & 1 deletion covid19dp_submission/download_analyses.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ def download_files(analyses_array, download_target_dir, processed_analyses_file)
def download_files_via_aspera(analyses_array, download_target_dir, processed_analyses_file, ascp, aspera_id_dsa,
batch_size=100):
logger.info(f"total number of files to download: {len(analyses_array)}")
print(f'{len(analyses_array)} analysis')
with open(processed_analyses_file, 'a') as open_file:
# This copy won't change throughout the iteration
for analysis_batch in chunked(copy.copy(analyses_array), batch_size):
Expand Down
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,6 @@
'License :: OSI Approved :: Apache Software License',
'Programming Language :: Python :: 3'
],
scripts=glob.glob(join(base_dir, 'bin', '*.py'))
scripts=[join(base_dir, 'bin', s) for s in ('ingest_covid19dp_submission.py', 'run_ingestion.sh',
'run_ingestion_cron.sh')]
)
3 changes: 3 additions & 0 deletions tests/resources/bin/bjobs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/bin/bash
# Test stub for the LSF 'bjobs' command: reports no running jobs
# (a single empty line, matching no job name).

printf '\n'
3 changes: 3 additions & 0 deletions tests/resources/bin/bsub
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/bin/bash
# Test stub for the LSF 'bsub' command: pretends the job was submitted
# by emitting the marker the test script greps for.

printf '%s\n' '<submitted>'
4 changes: 2 additions & 2 deletions tests/test_download_analyses.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ def check_processed_analysis_file(self, number_of_lines):
def test_download_analyses(self):
data = self.get_processed_files_data()
self.create_processed_analysis_file(data)
ascp_bin = os.path.expanduser("ascp")
ascp_bin = "ascp"
aspera_id_dsa_key = os.environ['ASPERA_ID_DSA']
download_analyses(project=self.project, num_analyses=self.num_analyses_to_download,
processed_analyses_file=self.processed_analyses_file,
download_target_dir=self.download_target_dir, ascp=ascp_bin,
aspera_id_dsa=aspera_id_dsa_key, batch_size=100)
vcf_files = glob.glob(f"{self.download_target_dir}/*.vcf")
vcf_files = glob.glob(f"{self.download_target_dir}/*.vcf") + glob.glob(f"{self.download_target_dir}/*.vcf.gz")
self.assertEqual(self.num_analyses_to_download, len(vcf_files))
self.check_processed_analysis_file(len(data) + self.num_analyses_to_download)

Expand Down
4 changes: 2 additions & 2 deletions tests/test_ingest_covid19dp_submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,9 @@ def test_ingest_covid19dp_submission(self):
nextflow_config_file=self.nextflow_config_file, resume=None)
num_clustered_variants = self.mongo_db[self.accessioning_database_name]['clusteredVariantEntity'] \
.count_documents(filter={})
self.assertEqual(52, num_clustered_variants)
self.assertEqual(50, num_clustered_variants)
# check if files are synchronized to the ftp dir
self.assertEqual(2, len(glob.glob(f"{self.app_config['submission']['public_ftp_dir']}/*")))
num_incremental_release_records = self.mongo_db[self.accessioning_database_name]['releaseRecordEntity']\
.count_documents(filter={})
self.assertEqual(52, num_incremental_release_records)
self.assertEqual(50, num_incremental_release_records)
32 changes: 32 additions & 0 deletions tests/test_run_ingestion.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#!/bin/bash
# Integration test for bin/run_ingestion_cron.sh, run with stubbed
# bjobs/bsub from tests/resources/bin.
# Test 1: no lock file -> the cron script must submit (bsub prints <submitted>).
# Test 2: lock file present -> the cron script must NOT submit.

basedir=$(dirname "$(dirname "$(readlink -f "$0")")")
config_file=~/.covid19dp_processing
config_backup=

# Preserve any real user config so running the test locally does not destroy it
if [ -e "$config_file" ]; then
  config_backup=$(mktemp)
  cp "$config_file" "$config_backup"
fi

echo "eva_dir=$basedir/tests/resources" > "$config_file"
export PATH=$basedir/tests/resources/bin:$PATH
prj_dir=$basedir/tests/resources/data/PRJEB45554


function tearDown {
  rm -rf "$basedir/tests/resources/data"
  rm -f "$config_file"
  # Restore the user's original config if we backed one up
  if [ -n "$config_backup" ]; then
    mv "$config_backup" "$config_file"
  fi
}
trap tearDown EXIT


if "${basedir}/bin/run_ingestion_cron.sh" | grep '<submitted>' > /dev/null; then
  echo 'Test 1 Pass'
else
  echo 'Test 1 Fail'
  exit 1
fi
mkdir -p "$prj_dir"
touch "$prj_dir/.lock_ingest_covid19dp_submission"

if "${basedir}/bin/run_ingestion_cron.sh" | grep '<submitted>' > /dev/null; then
  echo 'Test 2 Fail'
  exit 1
else
  echo 'Test 2 Pass'
fi

0 comments on commit 68c9887

Please sign in to comment.