EVA-3560: SLURM migration #206

Merged: 5 commits, May 10, 2024
Changes from 2 commits
eva_submission/nextflow/accession_and_load.nf (24 additions & 19 deletions)
@@ -168,6 +168,7 @@ workflow {
  * Convert the genome to the same naming convention as the VCF
  */
 process prepare_genome {
+    label 'default_time', 'med_mem'
 
     input:
     tuple path(fasta), path(report), val(assembly_accession), path(vcf_files)
@@ -189,6 +190,8 @@ process prepare_genome {
  * Normalise the VCF files
  */
 process normalise_vcf {
+    label 'default_time', 'med_mem'
+
     input:
     tuple val(vcf_filename), path(fasta), path(vcf_file), path(csi_file)
 
@@ -208,11 +211,10 @@ process normalise_vcf {
  * Accession VCFs
  */
 process accession_vcf {
-    clusterOptions "-g /accession/instance-${params.instance_id} \
-        -o $params.logs_dir/${log_filename}.log \
-        -e $params.logs_dir/${log_filename}.err"
+    label 'long_time', 'med_mem'
 
-    memory '6.7 GB'
+    clusterOptions "-o $params.logs_dir/${log_filename}.log \
+        -e $params.logs_dir/${log_filename}.err"
 
     input:
     tuple val(vcf_filename), val(vcf_file), val(assembly_accession), val(aggregation), val(fasta), val(report)
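Note: the labels introduced throughout this PR ('short_time' / 'default_time' / 'long_time', 'small_mem' / 'med_mem', 'datamover') only take effect once the deployment's Nextflow config binds them to SLURM resources; that config is not part of this diff. A minimal sketch of what it might look like, where the executor choice, queue name, and every time/memory value are assumptions rather than values from this repository:

process {
    executor = 'slurm'

    // one selector per label; a process such as accession_vcf above declares
    // label 'long_time', 'med_mem' and therefore picks up both settings
    withLabel: small_mem    { memory = 2.GB }
    withLabel: med_mem      { memory = 8.GB }
    withLabel: short_time   { time = '1h' }
    withLabel: default_time { time = '6h' }
    withLabel: long_time    { time = '7d' }
    withLabel: datamover    { queue = 'datamover' }   // assumed dedicated transfer nodes
}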
@@ -236,7 +238,7 @@ process accession_vcf {
 
 
     """
-    (java -Xmx6g -jar $params.jar.accession_pipeline --spring.config.location=file:$params.accession_job_props $pipeline_parameters) || \
+    (java -Xmx${task.memory.toGiga()}G -jar $params.jar.accession_pipeline --spring.config.location=file:$params.accession_job_props $pipeline_parameters) || \
     # If accessioning fails due to missing variants, but the only missing variants are structural variants,
     # then we should treat this as a success from the perspective of the automation.
     # TODO revert once accessioning pipeline properly registers structural variants
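Note: the recurring -Xmx${task.memory.toGiga()}G substitution ties the JVM heap to whatever the scheduler actually granted, instead of a hardcoded value that can drift out of sync with the allocation. task.memory holds the resolved memory directive, and toGiga() rounds down to whole gigabytes, so a 6.7 GB grant becomes -Xmx6G and the heap stays inside the SLURM limit. A self-contained sketch (process name and jar are hypothetical):

process heap_sizing_demo {
    memory 6.GB    // in this PR the value arrives via a label such as 'med_mem'

    script:
    """
    # ${task.memory.toGiga()} is interpolated by Nextflow before submission,
    # so this line is submitted as: java -Xmx6G -jar example.jar
    java -Xmx${task.memory.toGiga()}G -jar example.jar
    """
}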
@@ -251,6 +253,8 @@ process accession_vcf {
  * Sort and compress accessioned VCFs
  */
 process sort_and_compress_vcf {
+    label 'default_time', 'med_mem'
+
     publishDir params.public_dir,
         mode: 'copy'
 
@@ -269,6 +273,8 @@ process sort_and_compress_vcf {
 
 
 process csi_index_vcf {
+    label 'default_time', 'small_mem'
+
     publishDir params.public_dir,
         mode: 'copy'
 
@@ -288,7 +294,7 @@ process csi_index_vcf {
  * Copy files from eva_public to FTP folder.
  */
 process copy_to_ftp {
-    label 'datamover'
+    label 'datamover', 'short_time', 'small_mem'
 
     input:
     // ensures that all indices are done before we copy
@@ -316,6 +322,8 @@ process csi_index_vcf {
  * Load into variant db.
  */
 process load_variants_vcf {
+    label 'long_time', 'med_mem'
+
     clusterOptions {
         return "-o $params.logs_dir/load_variants.${vcf_filename}.log \
             -e $params.logs_dir/load_variants.${vcf_filename}.err"
@@ -327,8 +335,6 @@ process load_variants_vcf {
     output:
     val true, emit: variant_load_complete
 
-    memory '5 GB'
-
     script:
     def pipeline_parameters = " --spring.batch.job.names=load-vcf-job"
     pipeline_parameters += " --input.vcf.aggregation=" + aggregation.toString().toUpperCase()
@@ -338,7 +344,7 @@ process load_variants_vcf {
     pipeline_parameters += " --spring.data.mongodb.database=" + db_name.toString()
 
     """
-    java -Xmx4G -jar $params.jar.eva_pipeline --spring.config.location=file:$params.load_job_props --parameters.path=$params.load_job_props $pipeline_parameters
+    java -Xmx${task.memory.toGiga()}G -jar $params.jar.eva_pipeline --spring.config.location=file:$params.load_job_props --parameters.path=$params.load_job_props $pipeline_parameters
     """
 }

@@ -347,6 +353,8 @@ process load_variants_vcf {
  * Run VEP using eva-pipeline.
  */
 process run_vep_on_variants {
+    label 'long_time', 'med_mem'
+
     clusterOptions {
         return "-o $params.logs_dir/annotation.${analysis_accession}.log \
             -e $params.logs_dir/annotation.${analysis_accession}.err"
@@ -362,8 +370,6 @@ process run_vep_on_variants {
     output:
     val true, emit: vep_run_complete
 
-    memory '5 GB'
-
     script:
     def pipeline_parameters = ""
 
@@ -380,7 +386,7 @@ process run_vep_on_variants {
     pipeline_parameters += " --app.vep.cache.species=" + vep_species.toString()
 
     """
-    java -Xmx4G -jar $params.jar.eva_pipeline --spring.config.location=file:$params.load_job_props --parameters.path=$params.load_job_props $pipeline_parameters
+    java -Xmx${task.memory.toGiga()}G -jar $params.jar.eva_pipeline --spring.config.location=file:$params.load_job_props --parameters.path=$params.load_job_props $pipeline_parameters
     """
 }

Expand All @@ -390,6 +396,8 @@ process run_vep_on_variants {
* Calculate statistics using eva-pipeline.
*/
process calculate_statistics_vcf {
label 'long_time', 'med_mem'

clusterOptions {
return "-o $params.logs_dir/statistics.${analysis_accession}.log \
-e $params.logs_dir/statistics.${analysis_accession}.err"
@@ -406,12 +414,9 @@ process calculate_statistics_vcf {
     output:
     val true, emit: statistics_calc_complete
 
-    memory '5 GB'
-
     script:
     def pipeline_parameters = ""
 
-
     pipeline_parameters += " --spring.batch.job.names=calculate-statistics-job"
 
     pipeline_parameters += " --input.vcf.aggregation=" + aggregation.toString().toUpperCase()
@@ -421,14 +426,16 @@ process calculate_statistics_vcf {
     pipeline_parameters += " --spring.data.mongodb.database=" + db_name.toString()
 
     """
-    java -Xmx4G -jar $params.jar.eva_pipeline --spring.config.location=file:$params.load_job_props --parameters.path=$params.load_job_props $pipeline_parameters
+    java -Xmx${task.memory.toGiga()}G -jar $params.jar.eva_pipeline --spring.config.location=file:$params.load_job_props --parameters.path=$params.load_job_props $pipeline_parameters
     """
 }
 
 /*
  * Import Accession Into Variant warehouse
  */
 process import_accession {
+    label 'default_time', 'med_mem'
+
     clusterOptions {
         log_filename = vcf_file.getFileName().toString()
         return "-o $params.logs_dir/acc_import.${log_filename}.log \
@@ -440,8 +447,6 @@ process import_accession {
     val all_accession_complete
     val variant_load_output
 
-    memory '5 GB'
-
     script:
     def pipeline_parameters = ""
 
@@ -451,6 +456,6 @@ process import_accession {
     pipeline_parameters += " --spring.data.mongodb.database=" + db_name.toString()
 
     """
-    java -Xmx4G -jar $params.jar.eva_pipeline --spring.config.location=file:$params.acc_import_job_props --parameters.path=$params.acc_import_job_props $pipeline_parameters
+    java -Xmx${task.memory.toGiga()}G -jar $params.jar.eva_pipeline --spring.config.location=file:$params.acc_import_job_props --parameters.path=$params.acc_import_job_props $pipeline_parameters
     """
 }
eva_submission/nextflow/prepare_brokering.nf (6 additions & 0 deletions)
@@ -62,6 +62,7 @@ workflow {
  * Compress the VCF file
  */
 process compress_vcf {
+    label 'default_time', 'small_mem'
 
     input:
     path vcf_file
@@ -85,6 +86,7 @@ process compress_vcf {
  * Index the compressed VCF file
  */
 process csi_index_vcf {
+    label 'default_time', 'small_mem'
 
     input:
     tuple val(input_vcf), path(compressed_vcf)
@@ -101,6 +103,7 @@ process csi_index_vcf {
  * Convert the genome to the same naming convention as the VCF
  */
 process prepare_genome {
+    label 'default_time', 'med_mem'
 
     input:
     tuple path(fasta), path(report), val(assembly_accession), path(vcf_files)
@@ -122,6 +125,8 @@ process prepare_genome {
  * Normalise the VCF files
  */
 process normalise_vcf {
+    label 'default_time', 'med_mem'
+
     publishDir "$params.output_dir",
         overwrite: false,
         mode: "copy",
@@ -148,6 +153,7 @@ process normalise_vcf {
  * md5 the compressed vcf and its index
  */
 process md5_vcf_and_index {
+    label 'short_time', 'small_mem'
 
     publishDir "$params.output_dir",
         overwrite: true,
eva_submission/nextflow/remap_and_cluster.nf (18 additions & 19 deletions)
@@ -25,7 +25,6 @@ def helpMessage() {
 params.source_assemblies = null
 params.target_assembly_accession = null
 params.species_name = null
-params.memory = 8
 params.logs_dir = null
 // help
 params.help = null
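Note: dropping params.memory removes the old --memory command-line knob that fed the memory "${params.memory}GB" directives below. Under the label scheme, a run that needs more headroom would presumably be tuned with a config override passed via nextflow -c; a sketch, assuming the label names from this PR and an invented file name:

// override.config (hypothetical): bump every med_mem process for one run
//     nextflow run remap_and_cluster.nf -c override.config ...
process {
    withLabel: med_mem { memory = 16.GB }
}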
@@ -46,6 +45,8 @@ if (!params.taxonomy_id || !params.source_assemblies || !params.target_assembly_accession || !params.species_name) {
 
 
 process retrieve_source_genome {
+    label 'short_time', 'med_mem'
+
     when:
     source_assembly_accession != params.target_assembly_accession
 
@@ -65,6 +66,7 @@ process retrieve_source_genome {
 
 
 process retrieve_target_genome {
+    label 'short_time', 'med_mem'
 
     input:
     val target_assembly_accession
@@ -82,6 +84,7 @@ process retrieve_target_genome {
 }
 
 process update_source_genome {
+    label 'short_time', 'med_mem'
 
     input:
     tuple val(source_assembly_accession), path(source_fasta), path(source_report)
@@ -96,6 +99,7 @@ process update_source_genome {
 }
 
 process update_target_genome {
+    label 'short_time', 'med_mem'
 
     input:
     path target_fasta
@@ -116,8 +120,7 @@ process update_target_genome {
  * Extract the submitted variants to remap from the accessioning warehouse and store them in a VCF file.
  */
 process extract_vcf_from_mongo {
-    memory "${params.memory}GB"
-    clusterOptions "-g /accession"
+    label 'long_time', 'med_mem'
 
     when:
     source_assembly_accession != params.target_assembly_accession
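Note: the deleted clusterOptions "-g /accession" lines (and the instance-specific variants in the other workflows) attached jobs to LSF job groups, which can cap how many run concurrently; SLURM has no direct job-group equivalent. If that throttling still matters, for example to protect the accessioning MongoDB, it would have to come from the Nextflow side instead. One possibility, with an assumed cap, is the executor queue size:

executor {
    queueSize = 50    // at most 50 tasks queued or running at once; value assumed
}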
@@ -133,7 +136,7 @@ process extract_vcf_from_mongo {
     publishDir "$params.logs_dir", overwrite: true, mode: "copy", pattern: "*.log*"
 
     """
-    java -Xmx8G -jar $params.jar.vcf_extractor \
+    java -Xmx${task.memory.toGiga()}G -jar $params.jar.vcf_extractor \
         --spring.config.location=file:${params.extraction_properties} \
         --parameters.assemblyAccession=${source_assembly_accession} \
         --parameters.fasta=${source_fasta} \
@@ -147,7 +150,7 @@ process extract_vcf_from_mongo {
  * Variant remapping pipeline
  */
 process remap_variants {
-    memory "${params.memory}GB"
+    label 'long_time', 'med_mem'
 
     input:
     tuple val(source_assembly_accession), path(source_fasta), path(source_vcf)
@@ -184,8 +187,7 @@ process remap_variants {
  * Ingest the remapped submitted variants from a VCF file into the accessioning warehouse.
  */
 process ingest_vcf_into_mongo {
-    memory "${params.memory}GB"
-    clusterOptions "-g /accession"
+    label 'long_time', 'med_mem'
 
     input:
     tuple val(source_assembly_accession), path(remapped_vcf)
@@ -198,7 +200,7 @@ process ingest_vcf_into_mongo {
 
     script:
     """
-    java -Xmx8G -jar $params.jar.vcf_ingestion \
+    java -Xmx${task.memory.toGiga()}G -jar $params.jar.vcf_ingestion \
         --spring.config.location=file:${params.ingestion_properties} \
         --parameters.remappedFrom=${source_assembly_accession} \
         --parameters.vcf=${remapped_vcf} \
@@ -212,8 +214,7 @@ process ingest_vcf_into_mongo {
  * Cluster target assembly.
  */
 process cluster_studies_from_mongo {
-    memory "${params.memory}GB"
-    clusterOptions "-g /accession/instance-${params.clustering_instance}"
+    label 'long_time', 'med_mem'
 
     input:
     path ingestion_log
@@ -225,7 +226,7 @@ process cluster_studies_from_mongo {
     publishDir "$params.logs_dir", overwrite: true, mode: "copy"
 
     """
-    java -Xmx8G -jar $params.jar.clustering \
+    java -Xmx${task.memory.toGiga()}G -jar $params.jar.clustering \
         --spring.config.location=file:${params.clustering_properties} \
         --spring.batch.job.names=STUDY_CLUSTERING_JOB \
         > ${params.target_assembly_accession}_clustering.log
@@ -236,8 +237,7 @@ process cluster_studies_from_mongo {
  * Run clustering QC job
  */
 process qc_clustering {
-    memory "${params.memory}GB"
-    clusterOptions "-g /accession"
+    label 'long_time', 'med_mem'
 
     input:
     path rs_report
@@ -248,7 +248,7 @@ process qc_clustering {
     publishDir "$params.logs_dir", overwrite: true, mode: "copy", pattern: "*.log*"
 
     """
-    java -Xmx8G -jar $params.jar.clustering \
+    java -Xmx${task.memory.toGiga()}G -jar $params.jar.clustering \
         --spring.config.location=file:${params.clustering_properties} \
         --spring.batch.job.names=NEW_CLUSTERED_VARIANTS_QC_JOB \
         > ${params.target_assembly_accession}_clustering_qc.log
@@ -260,8 +260,7 @@ process qc_clustering {
  * Run Back propagation of new clustered RS only if the remapping was performed
  */
 process backpropagate_clusters {
-    memory "${params.memory}GB"
-    clusterOptions "-g /accession"
+    label 'long_time', 'med_mem'
 
     input:
     tuple val(source_assembly_accession), path(remapped_vcf)
@@ -273,7 +272,7 @@ process backpropagate_clusters {
     publishDir "$params.logs_dir", overwrite: true, mode: "copy", pattern: "*.log*"
 
     """
-    java -Xmx8G -jar $params.jar.clustering \
+    java -Xmx${task.memory.toGiga()}G -jar $params.jar.clustering \
         --spring.config.location=file:${params.clustering_properties} \
         --parameters.remappedFrom=${source_assembly_accession} \
         --spring.batch.job.names=BACK_PROPAGATE_NEW_RS_JOB \
@@ -286,7 +285,7 @@ workflow {
     species_name = params.species_name.toLowerCase().replace(" ", "_")
 
     remapping_required = params.source_assemblies.any {it != params.target_assembly_accession}
-    if (remapping_required){
+    if (remapping_required) {
         retrieve_source_genome(params.source_assemblies, species_name)
         retrieve_target_genome(params.target_assembly_accession, species_name)
         update_source_genome(retrieve_source_genome.out.source_assembly, params.remapping_config)
@@ -300,7 +299,7 @@ workflow {
         // to make sure it does not run out of values when multiple remapping are performed
         // See https://www.nextflow.io/docs/latest/process.html#multiple-input-channels
         backpropagate_clusters(remap_variants.out.remapped_vcfs, qc_clustering.out.clustering_qc_log_filename)
-    }else{
+    } else {
         // We're using params.genome_assembly_dir because cluster_studies_from_mongo needs to receive a file object
         cluster_studies_from_mongo(params.genome_assembly_dir)
         qc_clustering(cluster_studies_from_mongo.out.rs_report_filename)
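Note: one idiom this migration enables, though the PR itself does not use it: because the -Xmx flags now follow task.memory, a label can be given a dynamic memory directive so that a task killed for exceeding its allocation is retried with a larger one, and the JVM heap scales along with it. A config sketch with all values assumed:

process {
    withLabel: med_mem {
        memory = { 8.GB * task.attempt }   // 8 GB on the first attempt, 16 GB on retry
        errorStrategy = 'retry'
        maxRetries = 2
    }
}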