
EVA-3457 - Split variant load #195

Merged: 5 commits, Jan 15, 2024
Changes from all commits
14 changes: 6 additions & 8 deletions eva_submission/eload_ingestion.py
@@ -42,7 +42,7 @@

class EloadIngestion(Eload):
config_section = 'ingestion' # top-level config key
all_tasks = ['metadata_load', 'accession', 'variant_load', 'annotation', 'optional_remap_and_cluster']
all_tasks = ['metadata_load', 'accession', 'variant_load', 'optional_remap_and_cluster']
nextflow_complete_value = '<complete>'

def __init__(self, eload_number, config_object: EloadConfig = None):
@@ -78,17 +78,16 @@ def ingest(
self.update_assembly_set_in_analysis()
do_accession = 'accession' in tasks
do_variant_load = 'variant_load' in tasks
annotation_only = 'annotation' in tasks and not do_variant_load

if instance_id:
self.eload_cfg.set(self.config_section, 'accession', 'instance_id', value=instance_id)
if do_accession:
self.update_config_with_hold_date(self.project_accession)

if do_accession or do_variant_load or annotation_only:
if do_accession or do_variant_load:
self.fill_vep_versions(vep_cache_assembly_name)
vcf_files_to_ingest = self._generate_csv_mappings_to_ingest()
self.run_accession_and_load_workflow(vcf_files_to_ingest, annotation_only, resume=resume, tasks=tasks)
self.run_accession_and_load_workflow(vcf_files_to_ingest, resume=resume, tasks=tasks)

if 'optional_remap_and_cluster' in tasks:
self.eload_cfg.set(self.config_section, 'clustering', 'instance_id', value=clustering_instance_id)
@@ -99,7 +98,7 @@ def ingest(
else:
self.warning(f'Could not find any current supported assembly for the submission, skipping clustering')

if do_accession or do_variant_load or annotation_only:
if do_accession or do_variant_load:
self._update_metadata_post_ingestion()

def _update_metadata_post_ingestion(self):
@@ -308,7 +307,7 @@ def _generate_csv_mappings_to_ingest(self):
self.warning(f"VCF files for analysis {analysis_alias} not found")
return vcf_files_to_ingest

def run_accession_and_load_workflow(self, vcf_files_to_ingest, annotation_only, resume, tasks=None):
def run_accession_and_load_workflow(self, vcf_files_to_ingest, resume, tasks=None):
instance_id = self.eload_cfg.query(self.config_section, 'accession', 'instance_id')
output_dir = os.path.join(self.project_dir, project_dirs['accessions'])
accession_properties_file = self.create_accession_properties(
@@ -331,12 +330,11 @@ def run_accession_and_load_workflow(self, vcf_files_to_ingest, annotation_only,
'executable': cfg['executable'],
'jar': cfg['jar'],
'taxonomy': self.taxonomy,
'annotation_only': annotation_only,
'accession_job_props': accession_properties_file,
'load_job_props': variant_load_properties_file,
'acc_import_job_props': accession_import_properties_file,
}
tasks = [task for task in tasks if task in ['accession', 'variant_load', 'annotation']]
tasks = [task for task in tasks if task in ['accession', 'variant_load']]
self.run_nextflow('accession_and_load', accession_config, resume, tasks)

def run_remap_and_cluster_workflow(self, target_assembly, resume):
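On the Python side the change is almost purely subtractive: `annotation` disappears from `all_tasks`, the `annotation_only` flag is gone, and the task filter passed to the Nextflow workflow shrinks to two entries. A minimal sketch of the resulting selection behaviour (illustrative only, not the actual `EloadIngestion` class):

```python
# Sketch: 'annotation' is no longer a recognised ingestion task, so it is
# filtered out instead of triggering an annotation-only run.
WORKFLOW_TASKS = ['accession', 'variant_load']  # mirrors the filter in run_accession_and_load_workflow

def select_workflow_tasks(requested):
    """Keep only the tasks the accession_and_load workflow understands."""
    return [task for task in requested if task in WORKFLOW_TASKS]

assert select_workflow_tasks(['metadata_load', 'variant_load']) == ['variant_load']
assert select_workflow_tasks(['annotation']) == []  # would previously have run annotation only
```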
113 changes: 90 additions & 23 deletions eva_submission/nextflow/accession_and_load.nf
@@ -132,13 +132,15 @@ workflow {
csi_index_vcf(csi_vcfs)
copy_to_ftp(csi_index_vcf.out.csi_indexed_vcf.toList(), accessioned_files_to_rm.toList())
}
if ("variant_load" in params.ingestion_tasks || "annotation" in params.ingestion_tasks) {
if ("variant_load" in params.ingestion_tasks) {
normalised_vcfs_ch = Channel.fromPath(params.valid_vcfs)
.splitCsv(header:true)
.map{row -> tuple(file(row.vcf_file).name, file(row.vcf_file), file(row.fasta), row.analysis_accession, row.db_name, row.vep_version, row.vep_cache_version, row.vep_species, row.aggregation)}
.combine(normalise_vcf.out.vcf_tuples, by:0)
.map{tuple(it[0], it[9], it[2], it[3], it[4], it[5], it[6], it[7], it[8])} // vcf_filename, normalised vcf, fasta, analysis_accession, db_name, vep_version, vep_cache_version, vep_species, aggregation
load_vcf(normalised_vcfs_ch)
load_variants_vcf(normalised_vcfs_ch)
run_vep_on_variants(normalised_vcfs_ch, load_variants_vcf.out.variant_load_complete)
calculate_statistics_vcf(normalised_vcfs_ch, load_variants_vcf.out.variant_load_complete)

if (!is_human_study) {
vcf_files_dbname = Channel.fromPath(params.valid_vcfs)
@@ -147,7 +149,7 @@
// vcf_files_dbname links each input file to its database; gating on
// all_accession_complete ensures accessioning has finished first
if (all_accession_complete){
import_accession(vcf_files_dbname, all_accession_complete, load_vcf.out.variant_load_complete)
import_accession(vcf_files_dbname, all_accession_complete, load_variants_vcf.out.variant_load_complete)
}
}
}
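The heart of this PR is visible in this hunk: the single `load_vcf` call becomes three chained calls. A rough sketch of the resulting process graph (process names from the diff; the Python dict is only an illustration, not Nextflow):

```python
# Downstream processes fan out from load_variants_vcf's completion signal
# instead of running inside one monolithic load_vcf process.
process_graph = {
    'normalise_vcf': ['load_variants_vcf'],
    'load_variants_vcf': [
        'run_vep_on_variants',       # skipped unless VEP versions are configured
        'calculate_statistics_vcf',  # skipped unless aggregation == "none"
        'import_accession',          # also waits on accessioning completion (non-human studies)
    ],
}
```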
@@ -302,14 +304,13 @@ process csi_index_vcf {
"""
}


/*
* Load into variant db.
*/
process load_vcf {
process load_variants_vcf {
clusterOptions {
return "-o $params.logs_dir/pipeline.${vcf_filename}.log \
-e $params.logs_dir/pipeline.${vcf_filename}.err"
return "-o $params.logs_dir/load_variants.${vcf_filename}.log \
-e $params.logs_dir/load_variants.${vcf_filename}.err"
}

input:
@@ -321,39 +322,105 @@ process load_vcf {
memory '5 GB'

script:
def pipeline_parameters = ""
def pipeline_parameters = " --spring.batch.job.names=load-vcf-job"
pipeline_parameters += " --input.vcf.aggregation=" + aggregation.toString().toUpperCase()
pipeline_parameters += " --input.vcf=" + vcf_file.toRealPath().toString()
pipeline_parameters += " --input.vcf.id=" + analysis_accession.toString()
pipeline_parameters += " --input.fasta=" + fasta.toString()
pipeline_parameters += " --spring.data.mongodb.database=" + db_name.toString()

if (params.annotation_only) {
pipeline_parameters += " --spring.batch.job.names=annotate-variants-job"
} else if(aggregation.toString() == "none"){
pipeline_parameters += " --spring.batch.job.names=genotyped-vcf-job"
} else{
pipeline_parameters += " --spring.batch.job.names=aggregated-vcf-job"
}
"""
java -Xmx4G -jar $params.jar.eva_pipeline --spring.config.location=file:$params.load_job_props --parameters.path=$params.load_job_props $pipeline_parameters
"""
}
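Where the old process chose one of three Spring Batch jobs (including the `annotation_only` branch), the renamed `load_variants_vcf` now always runs a single job. A hedged before/after sketch using the job names visible in the diff:

```python
def select_load_job_old(annotation_only: bool, aggregation: str) -> str:
    # Old behaviour: one process multiplexed three eva-pipeline jobs.
    if annotation_only:
        return 'annotate-variants-job'
    return 'genotyped-vcf-job' if aggregation == 'none' else 'aggregated-vcf-job'

# New behaviour: loading is always the same job; annotation and statistics
# move to the dedicated processes defined below.
NEW_LOAD_JOB = 'load-vcf-job'
```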


/*
* Run VEP using eva-pipeline.
*/
process run_vep_on_variants {
clusterOptions {
return "-o $params.logs_dir/annotation.${vcf_filename}.log \
-e $params.logs_dir/annotation.${vcf_filename}.err"
}

when:
vep_version.trim() != "" && vep_cache_version.trim() != ""

input:
tuple val(vcf_filename), val(vcf_file), val(fasta), val(analysis_accession), val(db_name), val(vep_version), val(vep_cache_version), val(vep_species), val(aggregation)
val variant_load_complete

output:
val true, emit: vep_run_complete

memory '5 GB'

script:
def pipeline_parameters = ""

pipeline_parameters += " --spring.batch.job.names=annotate-variants-job"

pipeline_parameters += " --input.vcf.aggregation=" + aggregation.toString().toUpperCase()
pipeline_parameters += " --input.vcf=" + vcf_file.toRealPath().toString()
pipeline_parameters += " --input.vcf.id=" + analysis_accession.toString()
pipeline_parameters += " --input.fasta=" + fasta.toString()

pipeline_parameters += " --spring.data.mongodb.database=" + db_name.toString()

if (vep_version.trim() == "" || vep_cache_version.trim() == "") {
pipeline_parameters += " --annotation.skip=true"
} else {
pipeline_parameters += " --annotation.skip=false"
pipeline_parameters += " --app.vep.version=" + vep_version.toString()
pipeline_parameters += " --app.vep.path=" + "${params.vep_path}/ensembl-vep-release-${vep_version}/vep"
pipeline_parameters += " --app.vep.cache.version=" + vep_cache_version.toString()
pipeline_parameters += " --app.vep.cache.species=" + vep_species.toString()
}
pipeline_parameters += " --annotation.skip=false"
pipeline_parameters += " --app.vep.version=" + vep_version.toString()
pipeline_parameters += " --app.vep.path=" + "${params.vep_path}/ensembl-vep-release-${vep_version}/vep"
pipeline_parameters += " --app.vep.cache.version=" + vep_cache_version.toString()
pipeline_parameters += " --app.vep.cache.species=" + vep_species.toString()

"""
java -Xmx4G -jar $params.jar.eva_pipeline --spring.config.location=file:$params.load_job_props --parameters.path=$params.load_job_props $pipeline_parameters
"""
}
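Note how the skip logic moved: previously the job always launched and toggled `--annotation.skip`; now a Nextflow `when:` block prevents the process from running at all if no VEP versions are configured, so `--annotation.skip=false` can be set unconditionally. The equivalent predicate, as an illustrative Python sketch:

```python
def should_run_vep(vep_version: str, vep_cache_version: str) -> bool:
    # Mirrors the process's `when:` condition: both versions must be non-empty.
    return vep_version.strip() != '' and vep_cache_version.strip() != ''

assert should_run_vep('111', '58')
assert not should_run_vep('', '58')  # old code would have run with --annotation.skip=true
```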



/*
* Calculate statistics using eva-pipeline.
*/
process calculate_statistics_vcf {
clusterOptions {
return "-o $params.logs_dir/statistics.${vcf_filename}.log \
-e $params.logs_dir/statistics.${vcf_filename}.err"
}

when:
// Statistics calculation is not required for already-aggregated analyses/studies
aggregation.toString() == "none"

input:
tuple val(vcf_filename), val(vcf_file), val(fasta), val(analysis_accession), val(db_name), val(vep_version), val(vep_cache_version), val(vep_species), val(aggregation)
val variant_load_complete

output:
val true, emit: statistics_calc_complete

memory '5 GB'

script:
def pipeline_parameters = ""


pipeline_parameters += " --spring.batch.job.names=calculate-statistics-job"

pipeline_parameters += " --input.vcf.aggregation=" + aggregation.toString().toUpperCase()

[Review thread on this line]
Contributor: If I'm understanding the old job configurations correctly, we should only run statistics calculation for genotyped VCFs.
Member Author: That's a good point and one I was not aware of. I'm a bit surprised by this, considering that there are both variant-level and study-level statistics in the MongoDB database. Maybe the study-level statistics are calculated in the variant load step.
Contributor: I wonder if this is why this assertion passes in your test, even though the stats step hasn't been run...
Member Author: Yes, that might be something else to investigate.

pipeline_parameters += " --input.vcf=" + vcf_file.toRealPath().toString()
pipeline_parameters += " --input.vcf.id=" + analysis_accession.toString()
pipeline_parameters += " --input.fasta=" + fasta.toString()

pipeline_parameters += " --spring.data.mongodb.database=" + db_name.toString()

"""
java -Xmx4G -jar $params.jar.eva_pipeline --spring.config.location=file:$params.load_job_props --parameters.path=$params.load_job_props $pipeline_parameters
"""
}
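All three split processes invoke the same eva-pipeline jar with near-identical inputs, differing mainly in the job name (plus the `--app.vep.*` settings for annotation). A hedged sketch of the shared parameter assembly; the helper function is hypothetical, but the flag and job names are taken from the diff:

```python
def build_pipeline_args(job_name, vcf, analysis_accession, fasta, db_name, aggregation):
    # Shared by load_variants_vcf ('load-vcf-job'), run_vep_on_variants
    # ('annotate-variants-job') and calculate_statistics_vcf
    # ('calculate-statistics-job'); per the review thread above, statistics
    # only run for genotyped VCFs (aggregation == 'none').
    return [
        f'--spring.batch.job.names={job_name}',
        f'--input.vcf.aggregation={aggregation.upper()}',
        f'--input.vcf={vcf}',
        f'--input.vcf.id={analysis_accession}',
        f'--input.fasta={fasta}',
        f'--spring.data.mongodb.database={db_name}',
    ]
```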

/*
* Import Accession Into Variant warehouse
*/
25 changes: 2 additions & 23 deletions tests/test_eload_ingestion.py
@@ -391,27 +391,6 @@ def test_ingest_variant_load_vep_versions_error(self):
config_file = os.path.join(self.resources_folder, 'projects/PRJEB12345/accession_and_load_params.yaml')
assert not os.path.exists(config_file)

def test_ingest_annotation_only(self):
with self._patch_metadata_handle(), \
patch.object(EloadIngestion, '_update_metadata_post_ingestion') as m_post_load_metadata, \
patch('eva_submission.eload_ingestion.get_all_results_for_query') as m_get_results, \
patch('eva_submission.eload_ingestion.command_utils.run_command_with_output', autospec=True), \
patch('eva_submission.eload_utils.get_metadata_connection_handle', autospec=True), \
patch('eva_submission.eload_utils.get_all_results_for_query') as m_get_alias_results, \
patch('eva_submission.eload_ingestion.get_vep_and_vep_cache_version') as m_get_vep_versions, \
patch('eva_submission.eload_ingestion.get_species_name_from_ncbi') as m_get_species, \
patch('eva_submission.eload_utils.requests.post') as m_post, \
patch('eva_submission.eload_ingestion.insert_new_assembly_and_taxonomy') as insert_asm_tax, \
self._patch_mongo_database():
m_get_alias_results.return_value = [['alias']]
m_get_vep_versions.return_value = (100, 100)
m_get_species.return_value = 'homo_sapiens'
m_post.return_value.text = self.get_mock_result_for_ena_date()
m_get_results.side_effect = default_db_results_for_accession()
self.eload.ingest(tasks=['annotation'])
assert os.path.exists(
os.path.join(self.resources_folder, 'projects/PRJEB12345/accession_and_load_params.yaml')
)

def test_ingest_clustering(self):
with self._patch_metadata_handle(), \
@@ -502,12 +481,12 @@ def test_resume_when_step_fails(self):

with self.assertRaises(subprocess.CalledProcessError):
self.eload.ingest()
for task in ['accession', 'variant_load', 'annotation']:
for task in ['accession', 'variant_load']:
nextflow_dir = self.eload.eload_cfg.query(self.eload.config_section, 'accession_and_load', 'nextflow_dir', task)
assert os.path.exists(nextflow_dir)

self.eload.ingest(resume=True)
for task in ['accession', 'variant_load', 'annotation']:
for task in ['accession', 'variant_load']:
assert self.eload.eload_cfg.query(self.eload.config_section, 'accession_and_load', 'nextflow_dir', task) == '<complete>'
assert not os.path.exists(nextflow_dir)
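
These assertions rely on the completion convention from `eload_ingestion.py`: when a task's workflow finishes, its `nextflow_dir` config entry is overwritten with the `<complete>` sentinel and the work directory is removed. A hypothetical helper illustrating that convention (not actual project code; the real class uses `eload_cfg.set`/`query`):

```python
import shutil

NEXTFLOW_COMPLETE = '<complete>'  # matches EloadIngestion.nextflow_complete_value

def mark_task_complete(cfg, task, nextflow_dir):
    # Hypothetical sketch: flip the config entry to the sentinel and delete
    # the per-task work directory, which is exactly what the test asserts.
    cfg['ingestion']['accession_and_load']['nextflow_dir'][task] = NEXTFLOW_COMPLETE
    shutil.rmtree(nextflow_dir, ignore_errors=True)
```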

10 changes: 5 additions & 5 deletions tests/test_vep_utils.py
@@ -64,10 +64,10 @@ def fake_dir(path, callback):

def test_get_vep_versions_from_ensembl(self):
vep_version, cache_version = get_vep_and_vep_cache_version_from_ensembl('GCA_000827895.1')
self.assertEqual(vep_version, 110)
self.assertEqual(cache_version, 57)
self.assertEqual(vep_version, 111)
self.assertEqual(cache_version, 58)
assert os.path.exists(os.path.join(cfg['vep_cache_path'], 'thelohanellus_kitauei'))
assert os.listdir(os.path.join(cfg['vep_cache_path'], 'thelohanellus_kitauei')) == ['57_ASM82789v1']
assert os.listdir(os.path.join(cfg['vep_cache_path'], 'thelohanellus_kitauei')) == ['58_ASM82789v1']

def test_get_vep_versions_from_ensembl_not_found(self):
vep_version, cache_version = get_vep_and_vep_cache_version_from_ensembl('GCA_015220235.1')
@@ -142,8 +142,8 @@ def test_get_species_and_assembly(self):
'GCA_000181335.4': ('felis_catus', 'Felis_catus_9.0', True, '9685'),
'GCA_000473445.2': ('anopheles_farauti', 'Anop_fara_FAR1_V2', True, '69004'),
'GCA_001704415.1': ('capra_hircus', 'ARS1', True, '9925'),
'GCA_002263795.2': ('bos_taurus', 'ARS-UCD1.2', True, '9913'),
'GCA_002742125.1': ('ovis_aries_rambouillet', 'Oar_rambouillet_v1.0', True, '9940'),
'GCA_002263795.2': ('bos_taurus', 'ARS-UCD1.2', False, '9913'),
'GCA_002742125.1': ('ovis_aries_rambouillet', 'Oar_rambouillet_v1.0', False, '9940'),
'GCA_002863925.1': ('equus_caballus', 'EquCab3.0', True, '9796')
}
for assembly in assemblies2results: