From 48e02fc8c80cb841ee2765fb7e2a7c653187cbb4 Mon Sep 17 00:00:00 2001 From: tcezard Date: Mon, 30 Oct 2023 13:44:09 +0000 Subject: [PATCH] remove original nextflow files and refactor the tests --- eva_submission/eload_ingestion.py | 30 ++- eva_submission/nextflow/accession.nf | 209 ------------------ eva_submission/nextflow/accession_and_load.nf | 65 +++--- eva_submission/nextflow/variant_load.nf | 149 ------------- .../run_tests_accession_and_variant_load.sh | 2 +- .../nextflow-tests/run_tests_variant_load.sh | 28 +++ .../test_ingestion_config_variant_load.yaml | 42 ++++ tests/test_eload_ingestion.py | 104 +++++---- 8 files changed, 188 insertions(+), 441 deletions(-) delete mode 100644 eva_submission/nextflow/accession.nf delete mode 100644 eva_submission/nextflow/variant_load.nf create mode 100755 tests/nextflow-tests/run_tests_variant_load.sh create mode 100644 tests/nextflow-tests/test_ingestion_config_variant_load.yaml diff --git a/eva_submission/eload_ingestion.py b/eva_submission/eload_ingestion.py index 3ed2916e..a02c6a4f 100644 --- a/eva_submission/eload_ingestion.py +++ b/eva_submission/eload_ingestion.py @@ -89,7 +89,7 @@ def ingest( if do_accession or do_variant_load or annotation_only: self.fill_vep_versions(vep_cache_assembly_name) vcf_files_to_ingest = self._generate_csv_mappings_to_ingest() - self.run_accession_and_load_workflow(vcf_files_to_ingest, annotation_only, resume=resume) + self.run_accession_and_load_workflow(vcf_files_to_ingest, annotation_only, resume=resume, tasks=tasks) if 'optional_remap_and_cluster' in tasks: self.eload_cfg.set(self.config_section, 'clustering', 'instance_id', value=clustering_instance_id) @@ -100,10 +100,13 @@ def ingest( else: self.warning(f'Could not find any current supported assembly for the submission, skipping clustering') - # update metadata - self.update_loaded_assembly_in_browsable_files() + if do_accession or do_variant_load or annotation_only: + self._update_metadata_post_ingestion() + + def _update_metadata_post_ingestion(self): self.insert_browsable_files() self.update_browsable_files_with_date() + self.update_loaded_assembly_in_browsable_files() self.update_files_with_ftp_path() self.refresh_study_browser() @@ -305,7 +308,7 @@ def _generate_csv_mappings_to_ingest(self): self.warning(f"VCF files for analysis {analysis_alias} not found") return vcf_files_to_ingest - def run_accession_and_load_workflow(self, vcf_files_to_ingest, annotation_only, resume): + def run_accession_and_load_workflow(self, vcf_files_to_ingest, annotation_only, resume, tasks=None): instance_id = self.eload_cfg.query(self.config_section, 'accession', 'instance_id') output_dir = os.path.join(self.project_dir, project_dirs['accessions']) accession_properties_file = self.create_accession_properties( @@ -331,9 +334,10 @@ def run_accession_and_load_workflow(self, vcf_files_to_ingest, annotation_only, 'annotation_only': annotation_only, 'accession_job_props': accession_properties_file, 'load_job_props': variant_load_properties_file, - 'acc_import_job_props': accession_import_properties_file + 'acc_import_job_props': accession_import_properties_file, + 'tasks': tasks } - self.run_nextflow('accession_and_load', accession_config, resume) + self.run_nextflow('accession_and_load', accession_config, resume, tasks) def run_remap_and_cluster_workflow(self, target_assembly, resume): clustering_instance = self.eload_cfg.query(self.config_section, 'clustering', 'instance_id') @@ -626,15 +630,21 @@ def refresh_study_browser(self): def valid_vcf_filenames(self): return 
list(self.project_dir.joinpath(project_dirs['valid']).glob('*.vcf.gz'))
 
-    def run_nextflow(self, workflow_name, params, resume):
+    def run_nextflow(self, workflow_name, params, resume, tasks=None):
         """
         Runs a Nextflow workflow using the provided parameters.
         This will create a Nextflow work directory and delete it if the process completes successfully.
         If the process fails, the work directory is preserved and the process can be resumed.
         """
         work_dir = None
+        if tasks and tasks is not self.all_tasks:
+            # The subsection name combines the workflow and the tasks to ensure resumability only applies to a workflow
+            # and its tasks
+            subsection_name = workflow_name + '_' + '_'.join(sorted(tasks))
+        else:
+            subsection_name = workflow_name
         if resume:
-            work_dir = self.eload_cfg.query(self.config_section, workflow_name, 'nextflow_dir')
+            work_dir = self.eload_cfg.query(self.config_section, subsection_name, 'nextflow_dir')
             if work_dir == self.nextflow_complete_value:
                 self.info(f'Nextflow {workflow_name} pipeline already completed, skipping.')
                 return
@@ -643,7 +653,7 @@ def run_nextflow(self, workflow_name, params, resume):
                 work_dir = None
         if not resume or not work_dir:
             work_dir = self.create_nextflow_temp_output_directory(base=self.project_dir)
-            self.eload_cfg.set(self.config_section, workflow_name, 'nextflow_dir', value=work_dir)
+            self.eload_cfg.set(self.config_section, subsection_name, 'nextflow_dir', value=work_dir)
 
         params_file = os.path.join(self.project_dir, f'{workflow_name}_params.yaml')
         with open(params_file, 'w') as open_file:
@@ -662,7 +672,7 @@ def run_nextflow(self, workflow_name, params, resume):
                 ))
             )
             shutil.rmtree(work_dir)
-            self.eload_cfg.set(self.config_section, str(workflow_name), 'nextflow_dir',
+            self.eload_cfg.set(self.config_section, str(subsection_name), 'nextflow_dir',
                                value=self.nextflow_complete_value)
         except subprocess.CalledProcessError as e:
             error_msg = f'Nextflow {workflow_name} pipeline failed: results might not be complete.'
diff --git a/eva_submission/nextflow/accession.nf b/eva_submission/nextflow/accession.nf
deleted file mode 100644
index 24ddd472..00000000
--- a/eva_submission/nextflow/accession.nf
+++ /dev/null
@@ -1,209 +0,0 @@
-#!/usr/bin/env nextflow
-
-nextflow.enable.dsl=2
-
-def helpMessage() {
-    log.info"""
-    Accession variant files and copy to public FTP.
- - Inputs: - --valid_vcfs csv file with the mappings for vcf file, assembly accession, fasta, assembly report, analysis_accession, db_name, aggregation - --project_accession project accession - --instance_id instance id to run accessioning - --accession_job_props properties file for accessioning job - --public_ftp_dir public FTP directory - --accessions_dir accessions directory (for properties files) - --public_dir directory for files to be made public - --logs_dir logs directory - --taxonomy taxonomy id - """ -} - -params.valid_vcfs = null -params.project_accession = null -params.instance_id = null -params.accession_job_props = null -params.public_ftp_dir = null -params.accessions_dir = null -params.public_dir = null -params.logs_dir = null -params.taxonomy = null -// executables -params.executable = ["bcftools": "bcftools", "tabix": "tabix"] -// java jars -params.jar = ["accession_pipeline": "accession_pipeline"] -// help -params.help = null - -// Show help message -if (params.help) exit 0, helpMessage() - -// Test input files -if (!params.valid_vcfs || !params.project_accession || !params.instance_id || !params.accession_job_props || !params.public_ftp_dir || !params.accessions_dir || !params.public_dir || !params.logs_dir || !params.taxonomy) { - if (!params.valid_vcfs) log.warn('Provide a csv file with the mappings (vcf file, assembly accession, fasta, assembly report, analysis_accession, db_name) --valid_vcfs') - if (!params.project_accession) log.warn('Provide a project accession using --project_accession') - if (!params.instance_id) log.warn('Provide an instance id using --instance_id') - if (!params.accession_job_props) log.warn('Provide job-specific properties using --accession_job_props') - if (!params.taxonomy) log.warn('Provide taxonomy id using --taxonomy') - if (!params.public_ftp_dir) log.warn('Provide public FTP directory using --public_ftp_dir') - if (!params.accessions_dir) log.warn('Provide accessions directory using --accessions_dir') - if (!params.public_dir) log.warn('Provide public directory using --public_dir') - if (!params.logs_dir) log.warn('Provide logs directory using --logs_dir') - exit 1, helpMessage() -} - -/* -Sequence of processes in case of: - non-human study: - accession_vcf -> sort_and_compress_vcf -> csi_index_vcf -> copy_to_ftp - human study (skip accessioning): - csi_index_vcf -> copy_to_ftp - -process input channels -accession_vcf -> valid_vcfs -csi_index_vcf -> csi_vcfs and compressed_vcf - -1. Check if the study we are working with is a human study or non-human by comparing the taxonomy_id of the study with human taxonomy_id (9606). -2. Provide values to the appropriate channels enabling them to start the corresponding processes. In case of non-human studies we want to start process - "accession_vcf" while in case of human studies we want to start processes "csi_index_vcf". - -non-human study: - - Initialize valid_vcfs channel with value so that it can start the process "accession_vcf". - - Initialize csi_vcfs channels as empty. This makes sure the processes "csi_index_vcf" are not started at the outset. - These processes will only be able to start after the process "sort_and_compress_vcf" finishes and create channels compressed_vcf with values. - -human study: - - Initialize valid_vcfs channel as empty, ensuring the process "accession_vcf" is not started and in turn accessioning part is also skipped - - Initialize csi_vcfs with values enabling them to start the processes "csi_index_vcf". 
-*/ -workflow { - is_human_study = (params.taxonomy == 9606) - - if (is_human_study) { - csi_vcfs = Channel.fromPath(params.valid_vcfs) - .splitCsv(header:true) - .map{row -> tuple(file(row.vcf_file))} - accessioned_files_to_rm = Channel.empty() - } else { - valid_vcfs = Channel.fromPath(params.valid_vcfs) - .splitCsv(header:true) - .map{row -> tuple(file(row.vcf_file), row.assembly_accession, row.aggregation, file(row.fasta), file(row.report))} - accession_vcf(valid_vcfs) - sort_and_compress_vcf(accession_vcf.out.accession_done) - csi_vcfs = sort_and_compress_vcf.out.compressed_vcf - accessioned_files_to_rm = accession_vcf.out.accessioned_filenames - } - csi_index_vcf(csi_vcfs) - copy_to_ftp(csi_index_vcf.out.csi_indexed_vcf.toList(), accessioned_files_to_rm.toList()) -} - - -/* - * Accession VCFs - */ -process accession_vcf { - clusterOptions "-g /accession/instance-${params.instance_id} \ - -o $params.logs_dir/${log_filename}.log \ - -e $params.logs_dir/${log_filename}.err" - - memory '6.7 GB' - - input: - tuple val(vcf_file), val(assembly_accession), val(aggregation), val(fasta), val(report) - - output: - val accessioned_filename, emit: accessioned_filenames - path "${accessioned_filename}.tmp", emit: accession_done - - script: - def pipeline_parameters = "" - pipeline_parameters += " --parameters.assemblyAccession=" + assembly_accession.toString() - pipeline_parameters += " --parameters.vcfAggregation=" + aggregation.toString() - pipeline_parameters += " --parameters.fasta=" + fasta.toString() - pipeline_parameters += " --parameters.assemblyReportUrl=file:" + report.toString() - pipeline_parameters += " --parameters.vcf=" + vcf_file.toString() - - vcf_filename = vcf_file.getFileName().toString() - accessioned_filename = vcf_filename.take(vcf_filename.indexOf(".vcf")) + ".accessioned.vcf" - log_filename = "accessioning.${vcf_filename}" - - pipeline_parameters += " --parameters.outputVcf=" + "${params.public_dir}/${accessioned_filename}" - - - """ - (java -Xmx6g -jar $params.jar.accession_pipeline --spring.config.location=file:$params.accession_job_props $pipeline_parameters) || \ - # If accessioning fails due to missing variants, but the only missing variants are structural variants, - # then we should treat this as a success from the perspective of the automation. - # TODO revert once accessioning pipeline properly registers structural variants - [[ \$(grep -o 'Skipped processing structural variant' ${params.logs_dir}/${log_filename}.log | wc -l) \ - == \$(grep -oP '\\d+(?= unaccessioned variants need to be checked)' ${params.logs_dir}/${log_filename}.log) ]] - echo "done" > ${accessioned_filename}.tmp - """ -} - - -/* - * Sort and compress accessioned VCFs - */ -process sort_and_compress_vcf { - publishDir params.public_dir, - mode: 'copy' - - input: - path tmp_file - - output: - // used by csi indexing process - path "*.gz", emit: compressed_vcf - - """ - filename=\$(basename $tmp_file) - filename=\${filename%.*} - $params.executable.bcftools sort -O z -o \${filename}.gz ${params.public_dir}/\${filename} - """ -} - - -process csi_index_vcf { - publishDir params.public_dir, - mode: 'copy' - - input: - path compressed_vcf - - output: - path "${compressed_vcf}.csi", emit: csi_indexed_vcf - - """ - $params.executable.bcftools index -c $compressed_vcf - """ -} - - -/* - * Copy files from eva_public to FTP folder. 
- */ - process copy_to_ftp { - label 'datamover' - - input: - // ensures that all indices are done before we copy - file csi_indices - val accessioned_vcfs - - script: - if( accessioned_vcfs.size() > 0 ) - """ - cd $params.public_dir - # remove the uncompressed accessioned vcf file - rm ${accessioned_vcfs.join(' ')} - rsync -va * ${params.public_ftp_dir}/${params.project_accession} - ls -l ${params.public_ftp_dir}/${params.project_accession}/* - """ - else - """ - cd $params.public_dir - rsync -va * ${params.public_ftp_dir}/${params.project_accession} - ls -l ${params.public_ftp_dir}/${params.project_accession}/* - """ - } diff --git a/eva_submission/nextflow/accession_and_load.nf b/eva_submission/nextflow/accession_and_load.nf index 6b76ff19..ce20121a 100644 --- a/eva_submission/nextflow/accession_and_load.nf +++ b/eva_submission/nextflow/accession_and_load.nf @@ -39,6 +39,8 @@ params.annotation_only = null params.executable = ["bcftools": "bcftools", "tabix": "tabix", "bgzip": "bgzip"] // java jars params.jar = ["accession_pipeline": "accession_pipeline", "eva_pipeline": "eva_pipeline"] +// ingestion tasks +params.ingestion_tasks = ["metadata_load", "accession", "variant_load", "annotation", "optional_remap_and_cluster"] // help params.help = null @@ -92,36 +94,43 @@ human study: - Initialize csi_vcfs with values enabling them to start the processes "csi_index_vcf". */ workflow { + all_accession_complete = null is_human_study = (params.taxonomy == 9606) - if (is_human_study) { - csi_vcfs = Channel.fromPath(params.valid_vcfs) - .splitCsv(header:true) - .map{row -> tuple(file(row.vcf_file))} - accessioned_files_to_rm = Channel.empty() - } else { - valid_vcfs = Channel.fromPath(params.valid_vcfs) - .splitCsv(header:true) - .map{row -> tuple(file(row.vcf_file), row.assembly_accession, row.aggregation, file(row.fasta), file(row.report))} - accession_vcf(valid_vcfs) - sort_and_compress_vcf(accession_vcf.out.accession_done) - csi_vcfs = sort_and_compress_vcf.out.compressed_vcf - accessioned_files_to_rm = accession_vcf.out.accessioned_filenames + if ("accession" in params.ingestion_tasks) { + if (is_human_study) { + csi_vcfs = Channel.fromPath(params.valid_vcfs) + .splitCsv(header:true) + .map{row -> tuple(file(row.vcf_file))} + accessioned_files_to_rm = Channel.empty() + } else { + valid_vcfs = Channel.fromPath(params.valid_vcfs) + .splitCsv(header:true) + .map{row -> tuple(file(row.vcf_file), row.assembly_accession, row.aggregation, file(row.fasta), file(row.report))} + accession_vcf(valid_vcfs) + sort_and_compress_vcf(accession_vcf.out.accession_done) + csi_vcfs = sort_and_compress_vcf.out.compressed_vcf + accessioned_files_to_rm = accession_vcf.out.accessioned_filenames + all_accession_complete = sort_and_compress_vcf.out.compressed_vcf + } + csi_index_vcf(csi_vcfs) + copy_to_ftp(csi_index_vcf.out.csi_indexed_vcf.toList(), accessioned_files_to_rm.toList()) } - csi_index_vcf(csi_vcfs) - copy_to_ftp(csi_index_vcf.out.csi_indexed_vcf.toList(), accessioned_files_to_rm.toList()) - - annotated_vcfs = Channel.fromPath(params.valid_vcfs) - .splitCsv(header:true) - .map{row -> tuple(file(row.vcf_file), file(row.fasta), row.analysis_accession, row.db_name, row.vep_version, row.vep_cache_version, row.vep_species, row.aggregation)} - load_vcf(annotated_vcfs) - - if (!is_human_study) { - vcf_files_dbname = Channel.fromPath(params.valid_vcfs) + if ("variant_load" in params.ingestion_tasks || "annotation" in params.ingestion_tasks) { + annotated_vcfs = Channel.fromPath(params.valid_vcfs) 
.splitCsv(header:true) - .map{row -> tuple(file(row.vcf_file), row.db_name)} - // the vcf_files_dbname give the link between input file and compressed_vcf is to ensure the accessioning has - // been completed - import_accession(vcf_files_dbname, sort_and_compress_vcf.out.compressed_vcf, load_vcf.out.variant_load_complete) + .map{row -> tuple(file(row.vcf_file), file(row.fasta), row.analysis_accession, row.db_name, row.vep_version, row.vep_cache_version, row.vep_species, row.aggregation)} + load_vcf(annotated_vcfs) + + if (!is_human_study) { + vcf_files_dbname = Channel.fromPath(params.valid_vcfs) + .splitCsv(header:true) + .map{row -> tuple(file(row.vcf_file), row.db_name)} + // the vcf_files_dbname give the link between input file and all_accession_complete is to ensure the + // accessioning has been completed + if (all_accession_complete){ + import_accession(vcf_files_dbname, all_accession_complete, load_vcf.out.variant_load_complete) + } + } } } @@ -300,7 +309,7 @@ process import_accession { input: tuple val(vcf_file), val(db_name) - path compressed_vcf + val all_accession_complete val variant_load_output memory '5 GB' diff --git a/eva_submission/nextflow/variant_load.nf b/eva_submission/nextflow/variant_load.nf deleted file mode 100644 index 2d05d3f7..00000000 --- a/eva_submission/nextflow/variant_load.nf +++ /dev/null @@ -1,149 +0,0 @@ -#!/usr/bin/env nextflow - -nextflow.enable.dsl=2 - -def helpMessage() { - log.info""" - Load variant files into variant warehouse. - - Inputs: - --valid_vcfs csv file with the mappings for vcf file, assembly accession, fasta, assembly report, - analysis_accession, db_name, vep version, vep cache version, aggregation - --project_accession project accession - --load_job_props properties file for variant load job - --acc_import_job_props properties file for accession import job - --annotation_only whether to only run annotation job - --taxonomy taxonomy id - --project_dir project directory - --logs_dir logs directory - """ -} - -params.valid_vcfs = null -params.vep_path = null -params.project_accession = null -params.load_job_props = null -params.acc_import_job_props = null -params.annotation_only = false -params.taxonomy = null -params.project_dir = null -params.logs_dir = null -// executables -params.executable = ["bgzip": "bgzip"] -// java jars -params.jar = ["eva_pipeline": "eva_pipeline"] -// help -params.help = null - -// Show help message -if (params.help) exit 0, helpMessage() - -// Test inputs -if (!params.valid_vcfs || !params.vep_path || !params.project_accession || !params.taxonomy || !params.load_job_props || !params.acc_import_job_props || !params.project_dir || !params.logs_dir) { - if (!params.valid_vcfs) log.warn('Provide a csv file with the mappings (vcf file, assembly accession, fasta, assembly report, analysis_accession, db_name) --valid_vcfs') - if (!params.vep_path) log.warn('Provide path to VEP installations using --vep_path') - if (!params.project_accession) log.warn('Provide project accession using --project_accession') - if (!params.taxonomy) log.warn('Provide taxonomy id using --taxonomy') - if (!params.load_job_props) log.warn('Provide path to variant load job properties file --load_job_props') - if (!params.acc_import_job_props) log.warn('Provide path to accession import job properties file using --acc_import_job_props') - if (!params.project_dir) log.warn('Provide project directory using --project_dir') - if (!params.logs_dir) log.warn('Provide logs directory using --logs_dir') - exit 1, helpMessage() -} - - -workflow { - 
unmerged_vcfs = Channel.fromPath(params.valid_vcfs) - .splitCsv(header:true) - .map{row -> tuple(file(row.vcf_file), file(row.fasta), row.analysis_accession, row.db_name, row.vep_version, row.vep_cache_version, row.vep_species, row.aggregation)} - load_vcf(unmerged_vcfs) - - if (params.taxonomy != 9606) { - vcf_files_list = Channel.fromPath(params.valid_vcfs) - .splitCsv(header:true) - .map{row -> tuple(file(row.vcf_file), row.db_name)} - import_accession(vcf_files_list, load_vcf.out.variant_load_complete) - } -} - -/* - * Load into variant db. - */ -process load_vcf { - clusterOptions { - log_filename = vcf_file.getFileName().toString() - return "-o $params.logs_dir/pipeline.${log_filename}.log \ - -e $params.logs_dir/pipeline.${log_filename}.err" - } - - input: - tuple val(vcf_file), val(fasta), val(analysis_accession), val(db_name), val(vep_version), val(vep_cache_version), val(vep_species), val(aggregation) - - output: - val true, emit: variant_load_complete - - memory '5 GB' - - script: - def pipeline_parameters = "" - - if (params.annotation_only) { - pipeline_parameters += " --spring.batch.job.names=annotate-variants-job" - } else if(aggregation.toString() == "none"){ - pipeline_parameters += " --spring.batch.job.names=genotyped-vcf-job" - } else{ - pipeline_parameters += " --spring.batch.job.names=aggregated-vcf-job" - } - - pipeline_parameters += " --input.vcf.aggregation=" + aggregation.toString().toUpperCase() - pipeline_parameters += " --input.vcf=" + vcf_file.toRealPath().toString() - pipeline_parameters += " --input.vcf.id=" + analysis_accession.toString() - pipeline_parameters += " --input.fasta=" + fasta.toString() - - pipeline_parameters += " --spring.data.mongodb.database=" + db_name.toString() - - if (vep_version.trim() == "" || vep_cache_version.trim() == "") { - pipeline_parameters += " --annotation.skip=true" - } else { - pipeline_parameters += " --annotation.skip=false" - pipeline_parameters += " --app.vep.version=" + vep_version.toString() - pipeline_parameters += " --app.vep.path=" + "${params.vep_path}/ensembl-vep-release-${vep_version}/vep" - pipeline_parameters += " --app.vep.cache.version=" + vep_cache_version.toString() - pipeline_parameters += " --app.vep.cache.species=" + vep_species.toString() - } - - """ - java -Xmx4G -jar $params.jar.eva_pipeline --spring.config.location=file:$params.load_job_props --parameters.path=$params.load_job_props $pipeline_parameters - """ -} - - -/* - * Import Accession Into Variant warehouse - */ -process import_accession { - clusterOptions { - log_filename = vcf_file.getFileName().toString() - return "-o $params.logs_dir/acc_import.${log_filename}.log \ - -e $params.logs_dir/acc_import.${log_filename}.err" - } - - input: - tuple val(vcf_file), val(db_name) - val variant_load_output - - memory '5 GB' - - script: - def pipeline_parameters = "" - - accessioned_report_name = vcf_file.getFileName().toString().replace('.vcf','.accessioned.vcf') - pipeline_parameters += " --input.accession.report=" + "${params.project_dir}/60_eva_public/${accessioned_report_name}" - pipeline_parameters += " --spring.batch.job.names=accession-import-job" - pipeline_parameters += " --spring.data.mongodb.database=" + db_name.toString() - - - """ - java -Xmx4G -jar $params.jar.eva_pipeline --spring.config.location=file:$params.acc_import_job_props --parameters.path=$params.acc_import_job_props $pipeline_parameters - """ -} diff --git a/tests/nextflow-tests/run_tests_accession_and_variant_load.sh 
b/tests/nextflow-tests/run_tests_accession_and_variant_load.sh index a8bdb0f5..3f3b9565 100755 --- a/tests/nextflow-tests/run_tests_accession_and_variant_load.sh +++ b/tests/nextflow-tests/run_tests_accession_and_variant_load.sh @@ -19,7 +19,7 @@ nextflow run ${SOURCE_DIR}/accession_and_load.nf -params-file test_ingestion_con # check for public files and logs printf "\e[32m====== Files made public ======\e[0m\n" for f in test1 test2 test3; do ls project/public/${f}.accessioned.vcf.gz project/public/${f}.accessioned.vcf.gz.csi; done -ls project/public/ | wc -l +ls -1 project/public/ | wc -l printf "\n\e[32m======== Commands run ========\e[0m\n" find work/ \( -name '*.out' -o -name '*.err' \) -exec cat {} \; diff --git a/tests/nextflow-tests/run_tests_variant_load.sh b/tests/nextflow-tests/run_tests_variant_load.sh new file mode 100755 index 00000000..211cbff0 --- /dev/null +++ b/tests/nextflow-tests/run_tests_variant_load.sh @@ -0,0 +1,28 @@ +#!/bin/bash + +set -Eeuo pipefail + +SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" +SOURCE_DIR="$(dirname $(dirname $SCRIPT_DIR))/eva_submission/nextflow" + +cwd=${PWD} +cd ${SCRIPT_DIR} +mkdir -p project project/accessions project/public ftp + +# run accession and variant load +# note public_dir needs to be an absolute path, unlike others in config +printf "\e[32m==== ACCESSION & VARIANT LOAD PIPELINES ====\e[0m\n" +nextflow run ${SOURCE_DIR}/accession_and_load.nf -params-file test_ingestion_config_variant_load.yaml \ + --project_dir ${SCRIPT_DIR}/project \ + --accessions_dir ${SCRIPT_DIR}/project/accessions \ + --public_dir ${SCRIPT_DIR}/project/public +# check for public files and logs +printf "\e[32m====== Files made public ======\e[0m\n" +ls project/public/ | wc -l +printf "\n\e[32m======== Commands run ========\e[0m\n" +#find work/ \( -name '*.out' -o -name '*.err' \) -exec cat {} \; + +# clean up +rm -rf work .nextflow* +rm -r project ftp +cd ${cwd} diff --git a/tests/nextflow-tests/test_ingestion_config_variant_load.yaml b/tests/nextflow-tests/test_ingestion_config_variant_load.yaml new file mode 100644 index 00000000..348e0f73 --- /dev/null +++ b/tests/nextflow-tests/test_ingestion_config_variant_load.yaml @@ -0,0 +1,42 @@ +project_accession: PRJEB12345 +taxonomy: 1234 +instance_id: 1 +public_ftp_dir: ../../ftp +logs_dir: ../../../project/logs +public_dir: ../../../project/public +valid_vcfs: vcf_files_to_ingest.csv +vep_path: /path/to/vep +ingestion_tasks: + - variant_load + +source_assemblies: + - GCA_0000001 + - GCA_0000002 + - GCA_0000003 +target_assembly_accession: GCA_0000003 + +accession_job_props: accession.properties +load_job_props: test_variant_load.properties +acc_import_job_props: test_accession_import.properties + +executable: + bcftools: ../../../bin/fake_bcftools.sh + bgzip: ../../../bin/fake_bgzip.sh + tabix: ../../../bin/fake_tabix.sh + samtools: samtools + bedtools: bedtools + minimap2: minimap2 + nextflow: nextflow + genome_downloader: ../../../bin/fake_genome_downloader.py + custom_assembly: ../../../bin/fake_custom_assembly.py + python_activate: ../../../bin/venv_activate + +nextflow: + remapping: ../../../bin/fake_remapping_pipeline.nf + +jar: + accession_pipeline: ../../../java/accession.jar + eva_pipeline: ../../../java/variant-load.jar + vcf_extractor: ../../../java/extraction.jar + vcf_ingestion: ../../../java/remap-loading.jar + clustering: ../../../java/clustering.jar diff --git a/tests/test_eload_ingestion.py b/tests/test_eload_ingestion.py index 6be71757..77ad1968 100644 --- 
a/tests/test_eload_ingestion.py +++ b/tests/test_eload_ingestion.py @@ -10,6 +10,17 @@ from eva_submission.submission_config import load_config +def default_db_results_for_update_metadata(): + # The update of metadata at the end of execution + browsable_files = [(1, 'ERA', 'filename_1', 'PRJ', 123), (2, 'ERA', 'filename_1', 'PRJ', 123)] + return [ + browsable_files, # insert_browsable_files files_query + browsable_files, # insert_browsable_files find_browsable_files_query + [(1, 'GCA_999')], # update_loaded_assembly_in_browsable_files + [(1, 'filename_1'), (2, 'filename_2')] # update_files_with_ftp_path + ] + + def default_db_results_for_metadata_load(): return [ [(391,)] # Check the assembly_set_id in update_assembly_set_in_analysis @@ -23,33 +34,32 @@ def default_db_results_for_target_assembly(): def default_db_results_for_accession(): - browsable_files = [(1, 'ERA', 'filename_1', 'PRJ', 123), (2, 'ERA', 'filename_1', 'PRJ', 123)] return [ - browsable_files, # insert_browsable_files files_query - browsable_files, # insert_browsable_files find_browsable_files_query - [(1, 'filename_1'), (2, 'filename_2')] # update_files_with_ftp_path + [('Test Study Name')] # get_study_name ] -def default_db_results_for_variant_load(): +def default_db_results_for_clustering(): return [ - [('Test Study Name')], # get_study_name - [(1, 'filename_1'), (2, 'filename_2')] # update_loaded_assembly_in_browsable_files + [('GCA_123',)] # current supported assembly ] -def default_db_results_for_clustering(): +def default_db_results_for_accession_and_load(): return [ - [('GCA_123',)] # current supported assembly + [('Test Study Name',)] # get_study_name ] +def default_db_results_for_accession(): + return default_db_results_for_accession_and_load() + default_db_results_for_update_metadata() + + def default_db_results_for_ingestion(): return ( default_db_results_for_metadata_load() - + default_db_results_for_accession() + + default_db_results_for_accession_and_load() + default_db_results_for_clustering() - + default_db_results_for_variant_load() ) @@ -155,20 +165,18 @@ def test_load_from_ena_script_fails(self): def test_ingest_all_tasks(self): with self._patch_metadata_handle(), \ + patch.object(EloadIngestion, '_update_metadata_post_ingestion') as m_post_load_metadata, \ patch('eva_submission.eload_ingestion.get_all_results_for_query') as m_get_results, \ patch('eva_submission.eload_ingestion.command_utils.run_command_with_output', autospec=True), \ patch('eva_submission.eload_utils.get_metadata_connection_handle', autospec=True), \ - patch('eva_submission.eload_utils.get_all_results_for_query') as m_get_alias_results, \ + patch('eva_submission.eload_submission.get_hold_date_from_ena') as m_get_hold_date, \ patch('eva_submission.eload_ingestion.get_vep_and_vep_cache_version') as m_get_vep_versions, \ - patch('eva_submission.eload_utils.requests.post') as m_post, \ patch('eva_submission.eload_ingestion.get_species_name_from_ncbi') as m_get_species, \ patch('eva_submission.eload_ingestion.get_assembly_name_and_taxonomy_id') as m_get_tax, \ patch('eva_submission.eload_ingestion.insert_new_assembly_and_taxonomy') as insert_asm_tax, \ self._patch_mongo_database(): - m_get_alias_results.return_value = [['alias']] m_get_vep_versions.return_value = (100, 100) m_get_species.return_value = 'homo_sapiens' - m_post.return_value.text = self.get_mock_result_for_ena_date() m_get_results.side_effect = default_db_results_for_ingestion() m_get_tax.return_value = ('name', '9090') self.eload.ingest(1) @@ -215,7 +223,7 @@ def 
test_ingest_accession(self): tasks=['accession'] ) assert os.path.exists( - os.path.join(self.resources_folder, 'projects/PRJEB12345/accession_params.yaml') + os.path.join(self.resources_folder, 'projects', 'PRJEB12345', 'accession_and_load_params.yaml') ) def test_ingest_variant_load(self): @@ -233,10 +241,10 @@ def test_ingest_variant_load(self): m_get_vep_versions.return_value = (100, 100) m_get_species.return_value = 'homo_sapiens' m_post.return_value.text = self.get_mock_result_for_ena_date() - m_get_results.side_effect = default_db_results_for_variant_load() + m_get_results.side_effect = default_db_results_for_accession() self.eload.ingest(tasks=['variant_load']) assert os.path.exists( - os.path.join(self.resources_folder, 'projects/PRJEB12345/variant_load_params.yaml') + os.path.join(self.resources_folder, 'projects/PRJEB12345/accession_and_load_params.yaml') ) def test_insert_browsable_files(self): @@ -315,6 +323,7 @@ def assert_vep_versions(self, vep_version, vep_cache_version, vep_species): def test_ingest_variant_load_vep_versions_found(self): with self._patch_metadata_handle(), \ + patch.object(EloadIngestion, '_update_metadata_post_ingestion') as m_post_load_metadata, \ patch('eva_submission.eload_ingestion.get_all_results_for_query') as m_get_results, \ patch('eva_submission.eload_ingestion.command_utils.run_command_with_output', autospec=True), \ patch('eva_submission.eload_utils.get_metadata_connection_handle', autospec=True), \ @@ -326,7 +335,7 @@ def test_ingest_variant_load_vep_versions_found(self): self._patch_mongo_database(): m_get_alias_results.return_value = [['alias']] m_post.return_value.text = self.get_mock_result_for_ena_date() - m_get_results.side_effect = default_db_results_for_variant_load() + m_get_results.side_effect = default_db_results_for_accession() m_get_vep_versions.return_value = (100, 100) m_get_species.return_value = 'homo_sapiens' self.eload.ingest(tasks=['variant_load']) @@ -338,6 +347,7 @@ def test_ingest_variant_load_vep_versions_not_found(self): but skip annotation. """ with self._patch_metadata_handle(), \ + patch.object(EloadIngestion, '_update_metadata_post_ingestion') as m_post_load_metadata, \ patch('eva_submission.eload_ingestion.get_all_results_for_query') as m_get_results, \ patch('eva_submission.eload_ingestion.command_utils.run_command_with_output', autospec=True), \ patch('eva_submission.eload_utils.get_metadata_connection_handle', autospec=True), \ @@ -349,7 +359,7 @@ def test_ingest_variant_load_vep_versions_not_found(self): self._patch_mongo_database(): m_get_alias_results.return_value = [['alias']] m_post.return_value.text = self.get_mock_result_for_ena_date() - m_get_results.side_effect = default_db_results_for_variant_load() + m_get_results.side_effect = default_db_results_for_accession_and_load() m_get_vep_versions.return_value = (None, None) m_get_species.return_value = 'homo_sapiens' self.eload.ingest(tasks=['variant_load']) @@ -360,6 +370,7 @@ def test_ingest_variant_load_vep_versions_error(self): If getting VEP cache version raises an exception, we should stop the loading process altogether. 
""" with self._patch_metadata_handle(), \ + patch.object(EloadIngestion, '_update_metadata_post_ingestion') as m_post_load_metadata, \ patch('eva_submission.eload_ingestion.get_all_results_for_query') as m_get_results, \ patch('eva_submission.eload_ingestion.command_utils.run_command_with_output', autospec=True), \ patch('eva_submission.eload_utils.get_metadata_connection_handle', autospec=True), \ @@ -370,15 +381,16 @@ def test_ingest_variant_load_vep_versions_error(self): self._patch_mongo_database(): m_get_alias_results.return_value = [['alias']] m_post.return_value.text = self.get_mock_result_for_ena_date() - m_get_results.side_effect = default_db_results_for_variant_load() + m_get_results.side_effect = default_db_results_for_accession_and_load() m_get_vep_versions.side_effect = ValueError() with self.assertRaises(ValueError): self.eload.ingest(tasks=['variant_load']) - config_file = os.path.join(self.resources_folder, 'projects/PRJEB12345/variant_load_params.yaml') + config_file = os.path.join(self.resources_folder, 'projects/PRJEB12345/accession_and_load_params.yaml') assert not os.path.exists(config_file) def test_ingest_annotation_only(self): with self._patch_metadata_handle(), \ + patch.object(EloadIngestion, '_update_metadata_post_ingestion') as m_post_load_metadata, \ patch('eva_submission.eload_ingestion.get_all_results_for_query') as m_get_results, \ patch('eva_submission.eload_ingestion.command_utils.run_command_with_output', autospec=True), \ patch('eva_submission.eload_utils.get_metadata_connection_handle', autospec=True), \ @@ -392,10 +404,10 @@ def test_ingest_annotation_only(self): m_get_vep_versions.return_value = (100, 100) m_get_species.return_value = 'homo_sapiens' m_post.return_value.text = self.get_mock_result_for_ena_date() - m_get_results.side_effect = default_db_results_for_variant_load() + m_get_results.side_effect = default_db_results_for_accession() self.eload.ingest(tasks=['annotation']) assert os.path.exists( - os.path.join(self.resources_folder, 'projects/PRJEB12345/variant_load_params.yaml') + os.path.join(self.resources_folder, 'projects/PRJEB12345/accession_and_load_params.yaml') ) def test_ingest_clustering(self): @@ -456,6 +468,7 @@ def test_ingest_clustering_supported_assembly_in_another_taxonomy(self): def test_resume_when_step_fails(self): with self._patch_metadata_handle(), \ + patch.object(EloadIngestion, '_update_metadata_post_ingestion') as m_post_load_metadata, \ patch('eva_submission.eload_ingestion.get_all_results_for_query') as m_get_results, \ patch('eva_submission.eload_ingestion.command_utils.run_command_with_output', autospec=True) as m_run_command, \ patch('eva_submission.eload_utils.get_metadata_connection_handle', autospec=True), \ @@ -470,8 +483,9 @@ def test_resume_when_step_fails(self): m_get_vep_versions.return_value = (100, 100) m_get_species.return_value = 'homo_sapiens' m_post.return_value.text = self.get_mock_result_for_ena_date() - m_get_results.side_effect = default_db_results_for_metadata_load() \ - + default_db_results_for_ingestion() + m_get_results.side_effect = (default_db_results_for_metadata_load() + + default_db_results_for_accession_and_load() + + default_db_results_for_ingestion()) m_run_command.side_effect = [ None, # metadata load @@ -485,7 +499,7 @@ def test_resume_when_step_fails(self): with self.assertRaises(subprocess.CalledProcessError): self.eload.ingest() - nextflow_dir = self.eload.eload_cfg.query(self.eload.config_section, 'accession', 'nextflow_dir') + nextflow_dir = 
self.eload.eload_cfg.query(self.eload.config_section, 'accession_and_load', 'nextflow_dir') assert os.path.exists(nextflow_dir) self.eload.ingest(resume=True) @@ -493,6 +507,7 @@ def test_resume_when_step_fails(self): def test_resume_completed_job(self): with self._patch_metadata_handle(), \ + patch.object(EloadIngestion, '_update_metadata_post_ingestion') as m_post_load_metadata, \ patch('eva_submission.eload_ingestion.get_all_results_for_query') as m_get_results, \ patch('eva_submission.eload_ingestion.command_utils.run_command_with_output', autospec=True) as m_run_command, \ patch('eva_submission.eload_utils.get_metadata_connection_handle', autospec=True), \ @@ -513,16 +528,17 @@ def test_resume_completed_job(self): # Resuming with no existing job execution is fine self.eload.ingest(resume=True) num_db_calls = m_get_results.call_count - assert m_run_command.call_count == 4 + assert m_run_command.call_count == 3 # If we resume a successfully completed job, everything in the python will re-run (including db queries) # but the nextflow calls will not self.eload.ingest(resume=True) - assert m_get_results.call_count == 2*num_db_calls - assert m_run_command.call_count == 5 # 1 per task, plus 1 for metadata + assert m_get_results.call_count == 2 * num_db_calls + assert m_run_command.call_count == 4 # 1 per task, plus 1 for metadata load def test_resume_with_tasks(self): with self._patch_metadata_handle(), \ + patch.object(EloadIngestion, '_update_metadata_post_ingestion') as m_post_load_metadata, \ patch('eva_submission.eload_ingestion.get_all_results_for_query') as m_get_results, \ patch('eva_submission.eload_ingestion.command_utils.run_command_with_output', autospec=True) as m_run_command, \ patch('eva_submission.eload_utils.get_metadata_connection_handle', autospec=True), \ @@ -537,36 +553,36 @@ def test_resume_with_tasks(self): m_get_species.return_value = 'homo_sapiens' m_post.return_value.text = self.get_mock_result_for_ena_date() m_get_results.side_effect = ( - default_db_results_for_variant_load() - + default_db_results_for_accession() - + default_db_results_for_variant_load() + default_db_results_for_accession_and_load() + + default_db_results_for_clustering() + + default_db_results_for_accession_and_load() ) m_run_command.side_effect = [ subprocess.CalledProcessError(1, 'nextflow accession'), # first accession fails - None, # variant load run alone + None, # remapping run alone None, # accession on resume - None, # variant load on resume ] + accession_config_section = 'accession_and_load_accession' + remap_config_section = 'remap_and_cluster' # Accession fails... with self.assertRaises(subprocess.CalledProcessError): self.eload.ingest(tasks=['accession'], resume=True) - accession_nextflow_dir = self.eload.eload_cfg.query(self.eload.config_section, 'accession', 'nextflow_dir') + accession_nextflow_dir = self.eload.eload_cfg.query(self.eload.config_section, accession_config_section, 'nextflow_dir') assert os.path.exists(accession_nextflow_dir) - # ...doesn't resume when we run just variant_load (successfully)... - self.eload.ingest(tasks=['variant_load'], resume=True) - new_accession_nextflow_dir = self.eload.eload_cfg.query(self.eload.config_section, 'accession', - 'nextflow_dir') - assert new_accession_nextflow_dir == accession_nextflow_dir + # ...doesn't resume when we run just optional_remap_and_cluster (successfully)... 
+ self.eload.ingest(tasks=['optional_remap_and_cluster'], resume=True) + new_remap_nextflow_dir = self.eload.eload_cfg.query(self.eload.config_section, remap_config_section, 'nextflow_dir') + assert new_remap_nextflow_dir != accession_nextflow_dir assert os.path.exists(accession_nextflow_dir) - load_nextflow_dir = self.eload.eload_cfg.query(self.eload.config_section, 'variant_load', 'nextflow_dir') - assert load_nextflow_dir == self.eload.nextflow_complete_value + assert not os.path.exists(new_remap_nextflow_dir) + assert new_remap_nextflow_dir == self.eload.nextflow_complete_value # ...and does resume when we run accession again. self.eload.ingest(tasks=['accession'], resume=True) - new_accession_nextflow_dir = self.eload.eload_cfg.query(self.eload.config_section, 'accession', + new_accession_nextflow_dir = self.eload.eload_cfg.query(self.eload.config_section, accession_config_section, 'nextflow_dir') assert new_accession_nextflow_dir == self.eload.nextflow_complete_value assert not os.path.exists(accession_nextflow_dir)
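
For reference, the per-task resumability keying that run_nextflow applies in this patch can be summarised in a short Python sketch, assuming ALL_TASKS stands in for the full ingestion task list and the helper name is illustrative only; the resulting section names match the ones asserted in test_resume_with_tasks and test_resume_when_step_fails.

# Minimal sketch of the config subsection naming used for Nextflow resumability.
# ALL_TASKS mirrors the ingestion task list; nextflow_config_section is a hypothetical helper.
# (The patched method performs an identity check against self.all_tasks; plain inequality
# captures the same intent in this standalone sketch.)
ALL_TASKS = ['metadata_load', 'accession', 'variant_load', 'annotation', 'optional_remap_and_cluster']


def nextflow_config_section(workflow_name, tasks=None):
    """Config key under which the Nextflow work directory (or completion marker) is stored."""
    if tasks and tasks != ALL_TASKS:
        # A partial run gets its own key, so resuming e.g. 'accession' alone neither reuses
        # nor overwrites the work directory of a run that executed every task.
        return workflow_name + '_' + '_'.join(sorted(tasks))
    return workflow_name


assert nextflow_config_section('accession_and_load', ['accession']) == 'accession_and_load_accession'
assert nextflow_config_section('accession_and_load', ALL_TASKS) == 'accession_and_load'
assert nextflow_config_section('remap_and_cluster') == 'remap_and_cluster'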