diff --git a/eva_submission/eload_brokering.py b/eva_submission/eload_brokering.py
index d085ea5..dff819d 100644
--- a/eva_submission/eload_brokering.py
+++ b/eva_submission/eload_brokering.py
@@ -12,7 +12,7 @@
 from eva_submission.ENA_submission.upload_to_ENA import ENAUploader, ENAUploaderAsync
 from eva_submission.biosample_submission.biosamples_submitters import SampleMetadataSubmitter, SampleReferenceSubmitter
 from eva_submission.eload_submission import Eload
-from eva_submission.eload_utils import read_md5
+from eva_submission.eload_utils import read_md5, get_nextflow_config_flag
 from eva_submission.submission_config import EloadConfig
 
 
@@ -169,7 +169,7 @@ def _run_brokering_prep_workflow(self):
             'output_dir': output_dir,
             'executable': cfg['executable']
         }
-        # run the validation
+        # run the brokering preparation
        brokering_config_file = os.path.join(self.eload_dir, 'brokering_config_file.yaml')
         with open(brokering_config_file, 'w') as open_file:
             yaml.safe_dump(brokering_config, open_file)
@@ -180,7 +180,8 @@ def _run_brokering_prep_workflow(self):
                 ' '.join((
                     cfg['executable']['nextflow'], brokering_script,
                     '-params-file', brokering_config_file,
-                    '-work-dir', output_dir
+                    '-work-dir', output_dir,
+                    get_nextflow_config_flag()
                 ))
             )
         except subprocess.CalledProcessError as e:
diff --git a/eva_submission/eload_ingestion.py b/eva_submission/eload_ingestion.py
index 1072509..9fff9df 100644
--- a/eva_submission/eload_ingestion.py
+++ b/eva_submission/eload_ingestion.py
@@ -21,7 +21,8 @@
 
 from eva_submission import NEXTFLOW_DIR
 from eva_submission.eload_submission import Eload
-from eva_submission.eload_utils import provision_new_database_for_variant_warehouse, check_project_exists_in_evapro
+from eva_submission.eload_utils import provision_new_database_for_variant_warehouse, check_project_exists_in_evapro, \
+    get_nextflow_config_flag
 from eva_submission.submission_config import EloadConfig
 from eva_submission.vep_utils import get_vep_and_vep_cache_version
 
@@ -684,7 +685,8 @@ def run_nextflow(self, workflow_name, params, resume, tasks=all_tasks):
                     cfg['executable']['nextflow'], nextflow_script,
                     '-params-file', params_file,
                     '-work-dir', work_dir,
-                    '-resume' if resume else ''
+                    '-resume' if resume else '',
+                    get_nextflow_config_flag()
                 ))
             )
             shutil.rmtree(work_dir)
diff --git a/eva_submission/eload_utils.py b/eva_submission/eload_utils.py
index 0e7b748..3e151ef 100644
--- a/eva_submission/eload_utils.py
+++ b/eva_submission/eload_utils.py
@@ -45,6 +45,7 @@ def is_single_insdc_sequence(reference_accession):
     return not NCBIAssembly.is_assembly_accession_format(reference_accession) and \
            NCBISequence.is_genbank_accession_format(reference_accession)
 
+
 def resolve_accession_from_text(reference_text):
     """
     :param reference_text:
@@ -262,3 +263,14 @@
         open_file.write('\t'.join(['na', 'na', 'na', 'na', contig_name, '<>', 'na', 'na', str(seq_length), 'na']) + '\n')
     return assembly_report_path
+
+
+def get_nextflow_config_flag():
+    """
+    Return the command-line flag for Nextflow to use the config provided in the environment variable SUBMISSION_NEXTFLOW_CONFIG.
+    If not provided, return an empty string, which allows Nextflow to use the default precedence as described here:
+    https://www.nextflow.io/docs/latest/config.html
+    """
+    env_val = os.getenv('SUBMISSION_NEXTFLOW_CONFIG')
+    if env_val:
+        return f'-c {env_val}'
+    return ''
diff --git a/eva_submission/eload_validation.py b/eva_submission/eload_validation.py
index 2378c54..12b5e38 100755
--- a/eva_submission/eload_validation.py
+++ b/eva_submission/eload_validation.py
@@ -13,7 +13,7 @@
 
 from eva_submission import NEXTFLOW_DIR
 from eva_submission.eload_submission import Eload
-from eva_submission.eload_utils import resolve_single_file_path, detect_vcf_aggregation
+from eva_submission.eload_utils import resolve_single_file_path, detect_vcf_aggregation, get_nextflow_config_flag
 from eva_submission.samples_checker import compare_spreadsheet_and_vcf
 from eva_submission.xlsx.xlsx_validation import EvaXlsxValidator
 
@@ -254,8 +254,8 @@ def _run_validation_workflow(self, validation_tasks):
             'validation_tasks': validation_tasks
         }
         # run the validation
-        validation_confg_file = os.path.join(self.eload_dir, 'validation_confg_file.yaml')
-        with open(validation_confg_file, 'w') as open_file:
+        validation_config_file = os.path.join(self.eload_dir, 'validation_config_file.yaml')
+        with open(validation_config_file, 'w') as open_file:
             yaml.safe_dump(validation_config, open_file)
         validation_script = os.path.join(NEXTFLOW_DIR, 'validation.nf')
         try:
@@ -264,8 +264,9 @@ def _run_validation_workflow(self, validation_tasks):
                 ' '.join((
                     'export NXF_OPTS="-Xms1g -Xmx8g"; ',
                     cfg['executable']['nextflow'], validation_script,
-                    '-params-file', validation_confg_file,
-                    '-work-dir', output_dir
+                    '-params-file', validation_config_file,
+                    '-work-dir', output_dir,
+                    get_nextflow_config_flag()
                 ))
             )
         except subprocess.CalledProcessError:
diff --git a/eva_submission/nextflow/accession_and_load.nf b/eva_submission/nextflow/accession_and_load.nf
index 2f17782..2086ca9 100644
--- a/eva_submission/nextflow/accession_and_load.nf
+++ b/eva_submission/nextflow/accession_and_load.nf
@@ -168,6 +168,7 @@ workflow {
 * Convert the genome to the same naming convention as the VCF
 */
 process prepare_genome {
+    label 'default_time', 'med_mem'
 
     input:
     tuple path(fasta), path(report), val(assembly_accession), path(vcf_files)
@@ -189,6 +190,8 @@
 * Normalise the VCF files
 */
 process normalise_vcf {
+    label 'default_time', 'med_mem'
+
     input:
     tuple val(vcf_filename), path(fasta), path(vcf_file), path(csi_file)
@@ -208,11 +211,10 @@
 * Accession VCFs
 */
 process accession_vcf {
-    clusterOptions "-g /accession/instance-${params.instance_id} \
-                    -o $params.logs_dir/${log_filename}.log \
-                    -e $params.logs_dir/${log_filename}.err"
+    label 'long_time', 'med_mem'
 
-    memory '6.7 GB'
+    clusterOptions "-o $params.logs_dir/${log_filename}.log \
+                    -e $params.logs_dir/${log_filename}.err"
 
     input:
     tuple val(vcf_filename), val(vcf_file), val(assembly_accession), val(aggregation), val(fasta), val(report)
@@ -236,7 +238,7 @@ process accession_vcf {
 
     """
-    (java -Xmx6g -jar $params.jar.accession_pipeline --spring.config.location=file:$params.accession_job_props $pipeline_parameters) || \
+    (java -Xmx${task.memory.toGiga()-1}G -jar $params.jar.accession_pipeline --spring.config.location=file:$params.accession_job_props $pipeline_parameters) || \
     # If accessioning fails due to missing variants, but the only missing variants are structural variants,
     # then we should treat this as a success from the perspective of the automation.
     # TODO revert once accessioning pipeline properly registers structural variants
@@ -251,6 +253,8 @@
 * Sort and compress accessioned VCFs
 */
 process sort_and_compress_vcf {
+    label 'default_time', 'med_mem'
+
     publishDir params.public_dir,
         mode: 'copy'
@@ -269,6 +273,8 @@
 
 
 process csi_index_vcf {
+    label 'default_time', 'small_mem'
+
     publishDir params.public_dir,
         mode: 'copy'
@@ -288,7 +294,7 @@
 * Copy files from eva_public to FTP folder.
 */
 process copy_to_ftp {
-    label 'datamover'
+    label 'datamover', 'short_time', 'small_mem'
 
     input:
     // ensures that all indices are done before we copy
@@ -316,6 +322,8 @@ process csi_index_vcf {
 * Load into variant db.
 */
 process load_variants_vcf {
+    label 'long_time', 'med_mem'
+
     clusterOptions {
         return "-o $params.logs_dir/load_variants.${vcf_filename}.log \
                 -e $params.logs_dir/load_variants.${vcf_filename}.err"
     }
 
     input:
@@ -327,8 +335,6 @@ process load_variants_vcf {
     output:
     val true, emit: variant_load_complete
 
-    memory '5 GB'
-
     script:
     def pipeline_parameters = " --spring.batch.job.names=load-vcf-job"
     pipeline_parameters += " --input.vcf.aggregation=" + aggregation.toString().toUpperCase()
@@ -338,7 +344,7 @@ process load_variants_vcf {
     pipeline_parameters += " --spring.data.mongodb.database=" + db_name.toString()
 
     """
-    java -Xmx4G -jar $params.jar.eva_pipeline --spring.config.location=file:$params.load_job_props --parameters.path=$params.load_job_props $pipeline_parameters
+    java -Xmx${task.memory.toGiga()-1}G -jar $params.jar.eva_pipeline --spring.config.location=file:$params.load_job_props --parameters.path=$params.load_job_props $pipeline_parameters
     """
 }
@@ -347,6 +353,8 @@
 * Run VEP using eva-pipeline.
 */
 process run_vep_on_variants {
+    label 'long_time', 'med_mem'
+
     clusterOptions {
         return "-o $params.logs_dir/annotation.${analysis_accession}.log \
                 -e $params.logs_dir/annotation.${analysis_accession}.err"
     }
 
     input:
@@ -362,8 +370,6 @@ process run_vep_on_variants {
     output:
     val true, emit: vep_run_complete
 
-    memory '5 GB'
-
     script:
     def pipeline_parameters = ""
@@ -380,7 +386,7 @@ process run_vep_on_variants {
     pipeline_parameters += " --app.vep.cache.species=" + vep_species.toString()
 
     """
-    java -Xmx4G -jar $params.jar.eva_pipeline --spring.config.location=file:$params.load_job_props --parameters.path=$params.load_job_props $pipeline_parameters
+    java -Xmx${task.memory.toGiga()-1}G -jar $params.jar.eva_pipeline --spring.config.location=file:$params.load_job_props --parameters.path=$params.load_job_props $pipeline_parameters
     """
 }
@@ -390,6 +396,8 @@
 * Calculate statistics using eva-pipeline.
 */
 process calculate_statistics_vcf {
+    label 'long_time', 'med_mem'
+
     clusterOptions {
         return "-o $params.logs_dir/statistics.${analysis_accession}.log \
                 -e $params.logs_dir/statistics.${analysis_accession}.err"
     }
 
     input:
@@ -406,12 +414,9 @@ process calculate_statistics_vcf {
     output:
     val true, emit: statistics_calc_complete
 
-    memory '5 GB'
-
     script:
     def pipeline_parameters = ""
-
     pipeline_parameters += " --spring.batch.job.names=calculate-statistics-job"
     pipeline_parameters += " --input.vcf.aggregation=" + aggregation.toString().toUpperCase()
@@ -421,7 +426,7 @@ process calculate_statistics_vcf {
     pipeline_parameters += " --spring.data.mongodb.database=" + db_name.toString()
 
     """
-    java -Xmx4G -jar $params.jar.eva_pipeline --spring.config.location=file:$params.load_job_props --parameters.path=$params.load_job_props $pipeline_parameters
+    java -Xmx${task.memory.toGiga()-1}G -jar $params.jar.eva_pipeline --spring.config.location=file:$params.load_job_props --parameters.path=$params.load_job_props $pipeline_parameters
     """
 }
@@ -429,6 +434,8 @@
 * Import Accession Into Variant warehouse
 */
 process import_accession {
+    label 'default_time', 'med_mem'
+
     clusterOptions {
         log_filename = vcf_file.getFileName().toString()
         return "-o $params.logs_dir/acc_import.${log_filename}.log \
                 -e $params.logs_dir/acc_import.${log_filename}.err"
     }
@@ -440,8 +447,6 @@ process import_accession {
     input:
     val all_accession_complete
     val variant_load_output
 
-    memory '5 GB'
-
     script:
     def pipeline_parameters = ""
@@ -451,6 +456,6 @@ process import_accession {
     pipeline_parameters += " --spring.data.mongodb.database=" + db_name.toString()
 
     """
-    java -Xmx4G -jar $params.jar.eva_pipeline --spring.config.location=file:$params.acc_import_job_props --parameters.path=$params.acc_import_job_props $pipeline_parameters
+    java -Xmx${task.memory.toGiga()-1}G -jar $params.jar.eva_pipeline --spring.config.location=file:$params.acc_import_job_props --parameters.path=$params.acc_import_job_props $pipeline_parameters
     """
 }
diff --git a/eva_submission/nextflow/prepare_brokering.nf b/eva_submission/nextflow/prepare_brokering.nf
index f116bc6..7ea4334 100644
--- a/eva_submission/nextflow/prepare_brokering.nf
+++ b/eva_submission/nextflow/prepare_brokering.nf
@@ -62,6 +62,7 @@ workflow {
 * Compress the VCF file
 */
 process compress_vcf {
+    label 'default_time', 'small_mem'
 
     input:
     path vcf_file
@@ -85,6 +86,7 @@
 * Index the compressed VCF file
 */
 process csi_index_vcf {
+    label 'default_time', 'small_mem'
 
     input:
     tuple val(input_vcf), path(compressed_vcf)
@@ -101,6 +103,7 @@
 * Convert the genome to the same naming convention as the VCF
 */
 process prepare_genome {
+    label 'default_time', 'med_mem'
 
     input:
     tuple path(fasta), path(report), val(assembly_accession), path(vcf_files)
@@ -122,6 +125,8 @@
 * Normalise the VCF files
 */
 process normalise_vcf {
+    label 'default_time', 'med_mem'
+
     publishDir "$params.output_dir",
         overwrite: false,
         mode: "copy",
@@ -148,6 +153,7 @@
 * md5 the compressed vcf and its index
 */
 process md5_vcf_and_index {
+    label 'short_time', 'small_mem'
 
     publishDir "$params.output_dir",
         overwrite: true,
diff --git a/eva_submission/nextflow/remap_and_cluster.nf b/eva_submission/nextflow/remap_and_cluster.nf
index 59d0db8..fca924a 100644
--- a/eva_submission/nextflow/remap_and_cluster.nf
+++ b/eva_submission/nextflow/remap_and_cluster.nf
@@ -25,7 +25,6 @@ def helpMessage() {
 params.source_assemblies = null
 params.target_assembly_accession = null
 params.species_name = null
-params.memory = 8
 params.logs_dir = null
 // help
 params.help = null
@@ -46,6 +45,8 @@ if (!params.taxonomy_id || !params.source_assemblies || !params.target_assembly_
 
 
 process retrieve_source_genome {
+    label 'short_time', 'med_mem'
+
     when:
     source_assembly_accession != params.target_assembly_accession
@@ -65,6 +66,7 @@
 
 
 process retrieve_target_genome {
+    label 'short_time', 'med_mem'
 
     input:
     val target_assembly_accession
@@ -82,6 +84,7 @@
 }
 
 process update_source_genome {
+    label 'short_time', 'med_mem'
 
     input:
     tuple val(source_assembly_accession), path(source_fasta), path(source_report)
@@ -96,6 +99,7 @@
 }
 
 process update_target_genome {
+    label 'short_time', 'med_mem'
 
     input:
     path target_fasta
@@ -116,8 +120,7 @@
 * Extract the submitted variants to remap from the accessioning warehouse and store them in a VCF file.
 */
 process extract_vcf_from_mongo {
-    memory "${params.memory}GB"
-    clusterOptions "-g /accession"
+    label 'long_time', 'med_mem'
 
     when:
     source_assembly_accession != params.target_assembly_accession
@@ -133,7 +136,7 @@
     publishDir "$params.logs_dir", overwrite: true, mode: "copy", pattern: "*.log*"
 
     """
-    java -Xmx8G -jar $params.jar.vcf_extractor \
+    java -Xmx${task.memory.toGiga()-1}G -jar $params.jar.vcf_extractor \
         --spring.config.location=file:${params.extraction_properties} \
         --parameters.assemblyAccession=${source_assembly_accession} \
         --parameters.fasta=${source_fasta} \
@@ -147,7 +150,7 @@
 * Variant remapping pipeline
 */
 process remap_variants {
-    memory "${params.memory}GB"
+    label 'long_time', 'med_mem'
 
     input:
     tuple val(source_assembly_accession), path(source_fasta), path(source_vcf)
@@ -184,8 +187,7 @@
 * Ingest the remapped submitted variants from a VCF file into the accessioning warehouse.
 */
 process ingest_vcf_into_mongo {
-    memory "${params.memory}GB"
-    clusterOptions "-g /accession"
+    label 'long_time', 'med_mem'
 
     input:
     tuple val(source_assembly_accession), path(remapped_vcf)
@@ -198,7 +200,7 @@
 
     script:
     """
-    java -Xmx8G -jar $params.jar.vcf_ingestion \
+    java -Xmx${task.memory.toGiga()-1}G -jar $params.jar.vcf_ingestion \
         --spring.config.location=file:${params.ingestion_properties} \
         --parameters.remappedFrom=${source_assembly_accession} \
         --parameters.vcf=${remapped_vcf} \
@@ -212,8 +214,7 @@
 * Cluster target assembly.
 */
 process cluster_studies_from_mongo {
-    memory "${params.memory}GB"
-    clusterOptions "-g /accession/instance-${params.clustering_instance}"
+    label 'long_time', 'med_mem'
 
     input:
     path ingestion_log
@@ -225,7 +226,7 @@
     publishDir "$params.logs_dir", overwrite: true, mode: "copy"
 
     """
-    java -Xmx8G -jar $params.jar.clustering \
+    java -Xmx${task.memory.toGiga()-1}G -jar $params.jar.clustering \
         --spring.config.location=file:${params.clustering_properties} \
         --spring.batch.job.names=STUDY_CLUSTERING_JOB \
         > ${params.target_assembly_accession}_clustering.log
@@ -236,8 +237,7 @@
 * Run clustering QC job
 */
 process qc_clustering {
-    memory "${params.memory}GB"
-    clusterOptions "-g /accession"
+    label 'long_time', 'med_mem'
 
     input:
     path rs_report
@@ -248,7 +248,7 @@
     publishDir "$params.logs_dir", overwrite: true, mode: "copy", pattern: "*.log*"
 
     """
-    java -Xmx8G -jar $params.jar.clustering \
+    java -Xmx${task.memory.toGiga()-1}G -jar $params.jar.clustering \
         --spring.config.location=file:${params.clustering_properties} \
         --spring.batch.job.names=NEW_CLUSTERED_VARIANTS_QC_JOB \
         > ${params.target_assembly_accession}_clustering_qc.log
@@ -260,8 +260,7 @@
 * Run Back propagation of new clustered RS only if the remapping was performed
 */
 process backpropagate_clusters {
-    memory "${params.memory}GB"
-    clusterOptions "-g /accession"
+    label 'long_time', 'med_mem'
 
     input:
     tuple val(source_assembly_accession), path(remapped_vcf)
@@ -273,7 +272,7 @@
     publishDir "$params.logs_dir", overwrite: true, mode: "copy", pattern: "*.log*"
 
     """
-    java -Xmx8G -jar $params.jar.clustering \
+    java -Xmx${task.memory.toGiga()-1}G -jar $params.jar.clustering \
         --spring.config.location=file:${params.clustering_properties} \
         --parameters.remappedFrom=${source_assembly_accession} \
         --spring.batch.job.names=BACK_PROPAGATE_NEW_RS_JOB \
@@ -286,7 +285,7 @@ workflow {
     species_name = params.species_name.toLowerCase().replace(" ", "_")
     remapping_required = params.source_assemblies.any {it != params.target_assembly_accession}
 
-    if (remapping_required){
+    if (remapping_required) {
         retrieve_source_genome(params.source_assemblies, species_name)
         retrieve_target_genome(params.target_assembly_accession, species_name)
         update_source_genome(retrieve_source_genome.out.source_assembly, params.remapping_config)
@@ -300,7 +299,7 @@ workflow {
         // to make sure it does not run out of values when multiple remapping are performed
         // See https://www.nextflow.io/docs/latest/process.html#multiple-input-channels
         backpropagate_clusters(remap_variants.out.remapped_vcfs, qc_clustering.out.clustering_qc_log_filename)
-    }else{
+    } else {
         // We're using params.genome_assembly_dir because cluster_studies_from_mongo needs to receive a file object
         cluster_studies_from_mongo(params.genome_assembly_dir)
         qc_clustering(cluster_studies_from_mongo.out.rs_report_filename)
diff --git a/eva_submission/nextflow/validation.nf b/eva_submission/nextflow/validation.nf
index ed3e1e8..42e5762 100644
--- a/eva_submission/nextflow/validation.nf
+++ b/eva_submission/nextflow/validation.nf
@@ -54,6 +54,8 @@
 * Validate the VCF file format
 */
 process check_vcf_valid {
+    label 'long_time', 'med_mem'
+
     publishDir
         "$params.output_dir",
         overwrite: false,
         mode: "copy"
@@ -80,6 +82,8 @@
 * Validate the VCF reference allele
 */
 process check_vcf_reference {
+    label 'long_time', 'med_mem'
+
     publishDir
"$params.output_dir", overwrite: true, mode: "copy" @@ -106,6 +110,8 @@ process check_vcf_reference { * Detect the structural variant in VCF */ process detect_sv { + label 'default_time', 'med_mem' + publishDir "$params.output_dir", overwrite: false, mode: "copy" diff --git a/tests/nextflow-tests/nextflow.config b/tests/nextflow-tests/nextflow.config index 873d076..19648d4 100644 --- a/tests/nextflow-tests/nextflow.config +++ b/tests/nextflow-tests/nextflow.config @@ -1,5 +1,9 @@ process { - executor='local' + executor = 'local' + + time = '30m' + memory = '5 GB' + withLabel: datamover { tag = 'data movement happens in this process' } diff --git a/tests/test_eload_brokering.py b/tests/test_eload_brokering.py index 80be290..f7f0772 100644 --- a/tests/test_eload_brokering.py +++ b/tests/test_eload_brokering.py @@ -106,7 +106,7 @@ def test_run_brokering_prep_workflow(self): self.eload._run_brokering_prep_workflow() m_execute.assert_called_once_with( 'Nextflow brokering preparation process', - f'path_to_nextflow {nf_script} -params-file {config_file} -work-dir {temp_dir}' + f'path_to_nextflow {nf_script} -params-file {config_file} -work-dir {temp_dir} ' ) def test_parse_bcftools_norm_report(self): diff --git a/tests/test_eload_ingestion.py b/tests/test_eload_ingestion.py index d85bff7..da1bae5 100644 --- a/tests/test_eload_ingestion.py +++ b/tests/test_eload_ingestion.py @@ -592,8 +592,7 @@ def _post_run_nextflow_assert(self, m_run_command, workflow_name, work_dir, resu command = (f'export NXF_OPTS="-Xms1g -Xmx8g"; ' f'/path/to/nextflow {nextflow_script} -params-file {self.eload.project_dir}/workflow_params.yaml ' f'-work-dir {work_dir} ') - if resume: - command += '-resume' + command += '-resume ' if resume else ' ' m_run_command.assert_called_once_with('Nextflow workflow process', command) with open(os.path.join(self.eload.project_dir, 'workflow_params.yaml')) as open_file: params = yaml.safe_load(open_file)