diff --git a/eva_submission/nextflow/accession_and_load.nf b/eva_submission/nextflow/accession_and_load.nf
index ba4b041..406d1a8 100644
--- a/eva_submission/nextflow/accession_and_load.nf
+++ b/eva_submission/nextflow/accession_and_load.nf
@@ -139,8 +139,18 @@ workflow {
             .combine(normalise_vcf.out.vcf_tuples, by:0)
             .map{tuple(it[0], it[9], it[2], it[3], it[4], it[5], it[6], it[7], it[8])}  // vcf_filename, normalised vcf, fasta, analysis_accession, db_name, vep_version, vep_cache_version, vep_species, aggregation
         load_variants_vcf(normalised_vcfs_ch)
-        run_vep_on_variants(normalised_vcfs_ch, load_variants_vcf.out.variant_load_complete)
-        calculate_statistics_vcf(normalised_vcfs_ch, load_variants_vcf.out.variant_load_complete)
+        // Ensure that all the loads are complete before VEP annotation and statistics calculation start
+        vep_ch = normalised_vcfs_ch
+            .groupTuple(by: [4, 5, 6, 7])  // group by db_name, vep_version, vep_cache_version, vep_species
+            .map{tuple(it[4], it[5], it[6], it[7])}
+        run_vep_on_variants(vep_ch, load_variants_vcf.out.variant_load_complete.collect())
+        normalised_vcfs_ch.view()
+        stats_ch = normalised_vcfs_ch
+            .groupTuple(by: [3, 4, 8])  // group by analysis_accession, db_name, aggregation
+            .map{tuple(it[3], it[4], it[8], it[1])}  // analysis_accession, db_name, aggregation, grouped normalised_vcf_files
+        stats_ch.view()
+
+        calculate_statistics_vcf(stats_ch, load_variants_vcf.out.variant_load_complete.collect())

         if (!is_human_study) {
             vcf_files_dbname = Channel.fromPath(params.valid_vcfs)
@@ -348,7 +358,8 @@ process run_vep_on_variants {
     vep_version.trim() != "" && vep_cache_version.trim() != ""

     input:
-    tuple val(vcf_filename), val(vcf_file), val(fasta), val(analysis_accession), val(db_name), val(vep_version), val(vep_cache_version), val(vep_species), val(aggregation)
+    //tuple val(vcf_filename), val(vcf_file), val(fasta), val(analysis_accession), val(db_name), val(vep_version), val(vep_cache_version), val(vep_species), val(aggregation)
+    tuple val(db_name), val(vep_version), val(vep_cache_version), val(vep_species)
     val variant_load_complete

     output:
@@ -360,11 +371,7 @@
     def pipeline_parameters = ""

     pipeline_parameters += " --spring.batch.job.names=annotate-variants-job"
-
-    pipeline_parameters += " --input.vcf.aggregation=" + aggregation.toString().toUpperCase()
-    pipeline_parameters += " --input.vcf=" + vcf_file.toRealPath().toString()
-    pipeline_parameters += " --input.vcf.id=" + analysis_accession.toString()
-    pipeline_parameters += " --input.fasta=" + fasta.toString()
+    pipeline_parameters += " --input.vcf.id="  // left empty: the whole study will be annotated again

     pipeline_parameters += " --spring.data.mongodb.database=" + db_name.toString()

@@ -395,7 +402,7 @@ process calculate_statistics_vcf {
     aggregation.toString() == "none"

     input:
-    tuple val(vcf_filename), val(vcf_file), val(fasta), val(analysis_accession), val(db_name), val(vep_version), val(vep_cache_version), val(vep_species), val(aggregation)
+    tuple val(analysis_accession), val(db_name), val(aggregation), val(vcf_files)
     val variant_load_complete

     output:
@@ -410,9 +417,8 @@
     pipeline_parameters += " --spring.batch.job.names=calculate-statistics-job"

     pipeline_parameters += " --input.vcf.aggregation=" + aggregation.toString().toUpperCase()
-    pipeline_parameters += " --input.vcf=" + vcf_file.toRealPath().toString()
+    pipeline_parameters += " --input.vcf=" + file(vcf_files[0]).toRealPath().toString()  // if there are multiple files, only use the first
     pipeline_parameters += " --input.vcf.id=" + analysis_accession.toString()
-    pipeline_parameters += " --input.fasta=" + fasta.toString()

     pipeline_parameters += " --spring.data.mongodb.database=" + db_name.toString()
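
Note on the synchronization pattern this patch relies on: collect() turns the per-file queue of completion flags into a single value channel, which acts as a barrier, and groupTuple/map collapse the per-file channel into one element per key, so each downstream process runs once per group and only after every load_variants_vcf task has finished. Below is a minimal, self-contained sketch of that pattern; the process and channel names (load_one, per_db_step, vcfs, db_ch) are hypothetical stand-ins, not part of this patch.

nextflow.enable.dsl=2

// Hypothetical stand-in for load_variants_vcf: one task per VCF file.
process load_one {
    input:
    tuple val(vcf), val(db)

    output:
    val true, emit: load_complete

    script:
    "echo loading $vcf into $db"
}

// Hypothetical stand-in for the per-database steps (VEP, statistics).
process per_db_step {
    input:
    val db              // one task per distinct database
    val all_loads_done  // collect() yields a single list value, so this input
                        // only becomes available after every load_one task
                        // has emitted its completion flag

    script:
    "echo annotating whole study in $db"
}

workflow {
    vcfs = Channel.of(['a.vcf', 'db1'], ['b.vcf', 'db1'], ['c.vcf', 'db2'])
    load_one(vcfs)

    // Same shape as the patch: group rows that share a key (index 1) and
    // keep only the key, so per_db_step runs once per database.
    db_ch = vcfs.groupTuple(by: 1).map{ it[1] }

    per_db_step(db_ch, load_one.out.load_complete.collect())
}

Because collect() produces a value channel, it can be read once per db_ch element without being consumed, which is why a single barrier gates both run_vep_on_variants and calculate_statistics_vcf in the patch.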