Skip to content

Commit

Permalink
remove original nextflow files and refactor the tests
Browse files Browse the repository at this point in the history
  • Loading branch information
tcezard committed Oct 30, 2023
1 parent 3c0245a commit 9155bce
Show file tree
Hide file tree
Showing 8 changed files with 188 additions and 441 deletions.
30 changes: 20 additions & 10 deletions eva_submission/eload_ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def ingest(
if do_accession or do_variant_load or annotation_only:
self.fill_vep_versions(vep_cache_assembly_name)
vcf_files_to_ingest = self._generate_csv_mappings_to_ingest()
self.run_accession_and_load_workflow(vcf_files_to_ingest, annotation_only, resume=resume)
self.run_accession_and_load_workflow(vcf_files_to_ingest, annotation_only, resume=resume, tasks=tasks)

if 'optional_remap_and_cluster' in tasks:
self.eload_cfg.set(self.config_section, 'clustering', 'instance_id', value=clustering_instance_id)
Expand All @@ -100,10 +100,13 @@ def ingest(
else:
self.warning(f'Could not find any current supported assembly for the submission, skipping clustering')

# update metadata
self.update_loaded_assembly_in_browsable_files()
if do_accession or do_variant_load or annotation_only:
self._update_metadata_post_ingestion()

def _update_metadata_post_ingestion(self):
self.insert_browsable_files()
self.update_browsable_files_with_date()
self.update_loaded_assembly_in_browsable_files()
self.update_files_with_ftp_path()
self.refresh_study_browser()

Expand Down Expand Up @@ -305,7 +308,7 @@ def _generate_csv_mappings_to_ingest(self):
self.warning(f"VCF files for analysis {analysis_alias} not found")
return vcf_files_to_ingest

def run_accession_and_load_workflow(self, vcf_files_to_ingest, annotation_only, resume):
def run_accession_and_load_workflow(self, vcf_files_to_ingest, annotation_only, resume, tasks=None):
instance_id = self.eload_cfg.query(self.config_section, 'accession', 'instance_id')
output_dir = os.path.join(self.project_dir, project_dirs['accessions'])
accession_properties_file = self.create_accession_properties(
Expand All @@ -331,9 +334,10 @@ def run_accession_and_load_workflow(self, vcf_files_to_ingest, annotation_only,
'annotation_only': annotation_only,
'accession_job_props': accession_properties_file,
'load_job_props': variant_load_properties_file,
'acc_import_job_props': accession_import_properties_file
'acc_import_job_props': accession_import_properties_file,
'tasks': tasks
}
self.run_nextflow('accession_and_load', accession_config, resume)
self.run_nextflow('accession_and_load', accession_config, resume, tasks)

def run_remap_and_cluster_workflow(self, target_assembly, resume):
clustering_instance = self.eload_cfg.query(self.config_section, 'clustering', 'instance_id')
Expand Down Expand Up @@ -626,15 +630,21 @@ def refresh_study_browser(self):
def valid_vcf_filenames(self):
return list(self.project_dir.joinpath(project_dirs['valid']).glob('*.vcf.gz'))

def run_nextflow(self, workflow_name, params, resume):
def run_nextflow(self, workflow_name, params, resume, tasks=None):
"""
Runs a Nextflow workflow using the provided parameters.
This will create a Nextflow work directory and delete it if the process completes successfully.
If the process fails, the work directory is preserved and the process can be resumed.
"""
work_dir = None
if tasks and tasks is not self.all_tasks:
# The subsection name combine the workflow and the tasks to ensure resumability only applies to a workflow
# and its tasks
subsection_name = workflow_name + '_' + '_'.join(sorted(tasks))
else:
subsection_name = workflow_name
if resume:
work_dir = self.eload_cfg.query(self.config_section, workflow_name, 'nextflow_dir')
work_dir = self.eload_cfg.query(self.config_section, subsection_name, 'nextflow_dir')
if work_dir == self.nextflow_complete_value:
self.info(f'Nextflow {workflow_name} pipeline already completed, skipping.')
return
Expand All @@ -643,7 +653,7 @@ def run_nextflow(self, workflow_name, params, resume):
work_dir = None
if not resume or not work_dir:
work_dir = self.create_nextflow_temp_output_directory(base=self.project_dir)
self.eload_cfg.set(self.config_section, workflow_name, 'nextflow_dir', value=work_dir)
self.eload_cfg.set(self.config_section, subsection_name, 'nextflow_dir', value=work_dir)

params_file = os.path.join(self.project_dir, f'{workflow_name}_params.yaml')
with open(params_file, 'w') as open_file:
Expand All @@ -662,7 +672,7 @@ def run_nextflow(self, workflow_name, params, resume):
))
)
shutil.rmtree(work_dir)
self.eload_cfg.set(self.config_section, str(workflow_name), 'nextflow_dir',
self.eload_cfg.set(self.config_section, str(subsection_name), 'nextflow_dir',
value=self.nextflow_complete_value)
except subprocess.CalledProcessError as e:
error_msg = f'Nextflow {workflow_name} pipeline failed: results might not be complete.'
Expand Down
209 changes: 0 additions & 209 deletions eva_submission/nextflow/accession.nf

This file was deleted.

65 changes: 37 additions & 28 deletions eva_submission/nextflow/accession_and_load.nf
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ params.annotation_only = null
params.executable = ["bcftools": "bcftools", "tabix": "tabix", "bgzip": "bgzip"]
// java jars
params.jar = ["accession_pipeline": "accession_pipeline", "eva_pipeline": "eva_pipeline"]
// ingestion tasks
params.ingestion_tasks = ["metadata_load", "accession", "variant_load", "annotation", "optional_remap_and_cluster"]
// help
params.help = null

Expand Down Expand Up @@ -92,36 +94,43 @@ human study:
- Initialize csi_vcfs with values enabling them to start the processes "csi_index_vcf".
*/
workflow {
all_accession_complete = null
is_human_study = (params.taxonomy == 9606)
if (is_human_study) {
csi_vcfs = Channel.fromPath(params.valid_vcfs)
.splitCsv(header:true)
.map{row -> tuple(file(row.vcf_file))}
accessioned_files_to_rm = Channel.empty()
} else {
valid_vcfs = Channel.fromPath(params.valid_vcfs)
.splitCsv(header:true)
.map{row -> tuple(file(row.vcf_file), row.assembly_accession, row.aggregation, file(row.fasta), file(row.report))}
accession_vcf(valid_vcfs)
sort_and_compress_vcf(accession_vcf.out.accession_done)
csi_vcfs = sort_and_compress_vcf.out.compressed_vcf
accessioned_files_to_rm = accession_vcf.out.accessioned_filenames
if ("accession" in params.ingestion_tasks) {
if (is_human_study) {
csi_vcfs = Channel.fromPath(params.valid_vcfs)
.splitCsv(header:true)
.map{row -> tuple(file(row.vcf_file))}
accessioned_files_to_rm = Channel.empty()
} else {
valid_vcfs = Channel.fromPath(params.valid_vcfs)
.splitCsv(header:true)
.map{row -> tuple(file(row.vcf_file), row.assembly_accession, row.aggregation, file(row.fasta), file(row.report))}
accession_vcf(valid_vcfs)
sort_and_compress_vcf(accession_vcf.out.accession_done)
csi_vcfs = sort_and_compress_vcf.out.compressed_vcf
accessioned_files_to_rm = accession_vcf.out.accessioned_filenames
all_accession_complete = sort_and_compress_vcf.out.compressed_vcf
}
csi_index_vcf(csi_vcfs)
copy_to_ftp(csi_index_vcf.out.csi_indexed_vcf.toList(), accessioned_files_to_rm.toList())
}
csi_index_vcf(csi_vcfs)
copy_to_ftp(csi_index_vcf.out.csi_indexed_vcf.toList(), accessioned_files_to_rm.toList())

annotated_vcfs = Channel.fromPath(params.valid_vcfs)
.splitCsv(header:true)
.map{row -> tuple(file(row.vcf_file), file(row.fasta), row.analysis_accession, row.db_name, row.vep_version, row.vep_cache_version, row.vep_species, row.aggregation)}
load_vcf(annotated_vcfs)

if (!is_human_study) {
vcf_files_dbname = Channel.fromPath(params.valid_vcfs)
if ("variant_load" in params.ingestion_tasks || "annotation" in params.ingestion_tasks) {
annotated_vcfs = Channel.fromPath(params.valid_vcfs)
.splitCsv(header:true)
.map{row -> tuple(file(row.vcf_file), row.db_name)}
// the vcf_files_dbname give the link between input file and compressed_vcf is to ensure the accessioning has
// been completed
import_accession(vcf_files_dbname, sort_and_compress_vcf.out.compressed_vcf, load_vcf.out.variant_load_complete)
.map{row -> tuple(file(row.vcf_file), file(row.fasta), row.analysis_accession, row.db_name, row.vep_version, row.vep_cache_version, row.vep_species, row.aggregation)}
load_vcf(annotated_vcfs)

if (!is_human_study) {
vcf_files_dbname = Channel.fromPath(params.valid_vcfs)
.splitCsv(header:true)
.map{row -> tuple(file(row.vcf_file), row.db_name)}
// the vcf_files_dbname give the link between input file and all_accession_complete is to ensure the
// accessioning has been completed
if (all_accession_complete){
import_accession(vcf_files_dbname, all_accession_complete, load_vcf.out.variant_load_complete)
}
}
}
}

Expand Down Expand Up @@ -300,7 +309,7 @@ process import_accession {

input:
tuple val(vcf_file), val(db_name)
path compressed_vcf
val all_accession_complete
val variant_load_output

memory '5 GB'
Expand Down
Loading

0 comments on commit 9155bce

Please sign in to comment.