Skip to content

Commit

Permalink
First pass at adding multi-taxonomy support
Browse files Browse the repository at this point in the history
  • Loading branch information
tcezard committed Dec 20, 2023
1 parent 8eb8012 commit c86ccf0
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 38 deletions.
3 changes: 0 additions & 3 deletions bin/add_target_assembly.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@ def main():

load_config()

if not args.taxonomy or not args.target_assembly or not args.release_version:
raise ArgumentError(None, 'Must provide --taxonomy, --target_assembly, and --release_version')

job = AssemblyIngestionJob(args.taxonomy, args.target_assembly, args.release_version)
logging_config.add_stdout_handler()

Expand Down
116 changes: 81 additions & 35 deletions eva_assembly_ingestion/assembly_ingestion_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import datetime
import os
import subprocess
from functools import lru_cache

import yaml
from cached_property import cached_property
Expand All @@ -33,22 +34,38 @@
from eva_assembly_ingestion.parse_counts import count_variants_extracted, count_variants_remapped, \
count_variants_ingested

SUPPORTED_ASSEMBLY_TRACKER_TABLE = "evapro.supported_assembly_tracker"

class AssemblyIngestionJob(AppLogger):
all_tasks = ['load_tracker', 'remap_cluster', 'update_dbs']
tracking_table = 'eva_progress_tracker.remapping_tracker'

def __init__(self, taxonomy, target_assembly, release_version):
    """Set up an assembly ingestion job.

    :param taxonomy: source taxonomy id driving the job
    :param target_assembly: assembly accession to remap and cluster onto
    :param release_version: RS release version this job belongs to
    """
    # NOTE(review): this span comes from a diff view; `self.taxonomy` (old) and
    # `self.source_taxonomy` (new) both alias the same argument — confirm which
    # attribute the rest of the class still reads before removing either.
    self.taxonomy = taxonomy
    self.target_assembly = target_assembly
    self.release_version = release_version
    # Maven settings drive both metadata DB connections and Spring property generation.
    self.private_settings_file = cfg['maven']['settings_file']
    self.maven_profile = cfg['maven']['environment']
    self.properties_generator = SpringPropertiesGenerator(self.maven_profile, self.private_settings_file)
    self.source_taxonomy = taxonomy

def scientific_name(self, taxonomy):
    """Return the scientific name for *taxonomy*, memoised per instance.

    Replaces ``@lru_cache`` on the bound method: lru_cache keys on ``self`` and
    keeps every instance alive for the lifetime of the cache (ruff B019). A
    per-instance dict gives the same memoisation without the leak, and keeps
    the call signature ``self.scientific_name(taxonomy)`` unchanged.

    :param taxonomy: taxonomy id to look up
    :returns: scientific name string from the taxonomy service
    """
    cache = self.__dict__.setdefault('_scientific_name_cache', {})
    if taxonomy not in cache:
        cache[taxonomy] = get_scientific_name_from_taxonomy(taxonomy)
    return cache[taxonomy]

@cached_property
def taxonomies(self):
    """All taxonomies involved in this job: every taxonomy whose *current*
    supported assembly is the same as the source taxonomy's current assembly.

    Falls back to just the source taxonomy when the tracker has no entry.
    Cached: the query runs at most once per job instance.
    """
    # Fixes vs the scraped version:
    # 1. the outer query had the constant name written literally
    #    ("from SUPPORTED_ASSEMBLY_TRACKER_TABLE") instead of f-interpolated;
    # 2. the selected column must be taxonomy_id (the column used everywhere
    #    else against this table), not taxonomy;
    # 3. the results were kept as a generator, so `if not taxonomies` was
    #    always false (generator objects are truthy) and the cached value
    #    would be exhausted after its first use — materialise into a tuple.
    taxonomy_query = (
        f"SELECT taxonomy_id FROM {SUPPORTED_ASSEMBLY_TRACKER_TABLE} WHERE current=true AND assembly_id in ("
        f" SELECT assembly_id FROM {SUPPORTED_ASSEMBLY_TRACKER_TABLE} "
        f" WHERE taxonomy_id={self.source_taxonomy} AND current=true"
        f");"
    )
    with get_metadata_connection_handle(self.maven_profile, self.private_settings_file) as pg_conn:
        taxonomies = tuple(t for t, in get_all_results_for_query(pg_conn, taxonomy_query))
    if not taxonomies:
        taxonomies = (self.source_taxonomy,)
    return taxonomies

def run_all(self, tasks, instance, source_of_assembly, resume):
if 'load_tracker' in tasks:
Expand All @@ -59,13 +76,13 @@ def run_all(self, tasks, instance, source_of_assembly, resume):
self.update_dbs(source_of_assembly)

def load_tracker(self):
"""Load the tracking table with the source assemblies for this taxonomy. Will not load anything if jobs in
"""Load the tracking table with the source assemblies for these taxonomies. Will not load anything if jobs in
the tracker already exist for this taxonomy/target assembly pair."""
header_to_print = (
'Sources', 'Taxonomy', 'Scientific Name', 'Assembly', 'Target Assembly', 'Num Studies', 'Status')
'Sources', 'Taxonomies', 'Scientific Name', 'Assembly', 'Target Assembly', 'Num Studies', 'Status')
existing_jobs = self.get_job_information_from_tracker()
if existing_jobs:
self.warning(f'Jobs already exist for taxonomy {self.taxonomy} and target assembly {self.target_assembly}, '
self.warning(f'Jobs already exist for taxonomies {self.taxonomies} and target assembly {self.target_assembly}, '
f'not loading anything new.')
pretty_print(header_to_print, existing_jobs)
return
Expand All @@ -75,18 +92,18 @@ def load_tracker(self):
'remapping_status')
rows = []
rows_to_print = []
for source_assembly, projects in self.get_source_assemblies_and_projects():
rows.append(('EVA', self.taxonomy, self.scientific_name, source_assembly, self.target_assembly,
for source_assembly, taxonomy, projects in self.get_source_assemblies_and_projects():
rows.append(('EVA', taxonomy, self.scientific_name, source_assembly, self.target_assembly,
1, self.release_version, len(projects), 1, None, 'Pending'))
rows_to_print.append(('EVA', self.taxonomy, self.scientific_name, source_assembly, self.target_assembly,
rows_to_print.append(('EVA', taxonomy, self.scientific_name, source_assembly, self.target_assembly,
len(projects), 'Pending'))
for source_assembly, num_studies in self.get_source_assemblies_and_num_studies_dbsnp():
rows.append(('DBSNP', self.taxonomy, self.scientific_name, source_assembly, self.target_assembly,
for source_assembly, taxonomy, num_studies in self.get_source_assemblies_and_num_studies_dbsnp():
rows.append(('DBSNP', taxonomy, self.scientific_name, source_assembly, self.target_assembly,
1, self.release_version, num_studies, 1, None, 'Pending'))
rows_to_print.append(('DBSNP', self.taxonomy, self.scientific_name, source_assembly, self.target_assembly,
rows_to_print.append(('DBSNP', taxonomy, self.scientific_name, source_assembly, self.target_assembly,
num_studies, 'Pending'))
if len(rows) == 0:
self.warning(f'Nothing to process for taxonomy {self.taxonomy} and target assembly {self.target_assembly}')
self.warning(f'Nothing to process for taxonomy {self.taxonomies} and target assembly {self.target_assembly}')
return
with get_metadata_connection_handle(self.maven_profile, self.private_settings_file) as pg_conn:
with pg_conn.cursor() as cursor:
Expand All @@ -95,63 +112,68 @@ def load_tracker(self):
pretty_print(header_to_print, rows_to_print)

def get_job_information_from_tracker(self):
    """Gets jobs from tracker by target assembly, taxonomies, and release version.

    :returns: one row per job group:
        (source, taxonomies_csv, scientific_name, origin_assembly_accession,
         assembly_accession, num_studies, remapping_status)
    """
    # Two SQL fixes: a space is required before GROUP BY (the f-string
    # fragments otherwise fuse into "...))GROUP BY"), and in PostgreSQL every
    # selected non-aggregated column must appear in GROUP BY — grouping by the
    # aggregated `taxonomy` column alone is invalid.
    query = (
        f"SELECT source, string_agg(taxonomy::text, ','), scientific_name, origin_assembly_accession, "
        f"assembly_accession, num_studies, remapping_status "
        f"FROM {self.tracking_table} "
        f"WHERE release_version={self.release_version} "
        f"AND assembly_accession='{self.target_assembly}' "
        f"AND taxonomy in ({', '.join(str(t) for t in self.taxonomies)}) "
        f"GROUP BY source, scientific_name, origin_assembly_accession, "
        f"assembly_accession, num_studies, remapping_status"
    )
    with get_metadata_connection_handle(self.maven_profile, self.private_settings_file) as pg_conn:
        return get_all_results_for_query(pg_conn, query)

def get_source_assemblies_and_projects(self):
    """Query metadata for all public projects with these taxonomies, of these
    getting all reference accessions for all analyses.

    :returns: rows of (vcf_reference_accession, taxonomy_id, [project_accession, ...])
    """
    # Reconstructed post-change query: the scraped diff interleaved the old
    # single-taxonomy SELECT/WHERE/GROUP BY lines with the new ones, which
    # would concatenate into invalid SQL. ena_status=4 and hidden_in_eva=0
    # restrict to public, visible projects.
    with get_metadata_connection_handle(self.maven_profile, self.private_settings_file) as pg_conn:
        query = (
            f"SELECT DISTINCT vcf_reference_accession, taxonomy_id, ARRAY_AGG(project_accession) "
            f"FROM evapro.project "
            f"LEFT OUTER JOIN evapro.project_taxonomy USING (project_accession) "
            f"LEFT OUTER JOIN evapro.project_analysis USING (project_accession) "
            f"LEFT OUTER JOIN evapro.analysis USING (analysis_accession) "
            f"WHERE taxonomy_id in ({', '.join(str(t) for t in self.taxonomies)}) "
            f"AND ena_status=4 AND hidden_in_eva=0 "
            f"GROUP BY vcf_reference_accession, taxonomy_id"
        )
        return get_all_results_for_query(pg_conn, query)

def get_source_assemblies_and_num_studies_dbsnp(self):
    """Return (origin_assembly_accession, taxonomy, num_studies) rows for dbSNP sources.

    Source assemblies for dbSNP are not in metadata, so instead we get them
    from release 3 in the tracker.
    """
    # Reconstructed post-change query: the scraped diff interleaved the old
    # single-taxonomy WHERE clause with the new IN-list version.
    with get_metadata_connection_handle(self.maven_profile, self.private_settings_file) as pg_conn:
        query = (
            f"SELECT origin_assembly_accession, taxonomy, num_studies FROM {self.tracking_table} "
            f"WHERE release_version=3 AND taxonomy in ({', '.join(str(t) for t in self.taxonomies)}) "
            f"AND source='DBSNP'"
        )
        return get_all_results_for_query(pg_conn, query)

def run_remapping_and_clustering(self, instance, resume):
    """Run remapping and clustering for all source assemblies in the tracker marked as not
    Complete, resuming the nextflow process if specified. (Note that this will also resume
    or rerun anything marked as Failed.)

    :param instance: accessioning instance id passed through to per-assembly processing
    :param resume: whether to resume an interrupted nextflow run
    """
    source_assemblies_and_taxonomies = self.get_incomplete_assemblies_and_taxonomies()
    # Log only the assembly accessions. The pre-fix log line referenced
    # `source_assemblies`, a variable removed by the rename above, which
    # would raise NameError at runtime.
    self.info(f'Running remapping and clustering for the following assemblies: '
              f'{[assembly for assembly, _ in source_assemblies_and_taxonomies]}')
    for source_assembly, taxonomies in source_assemblies_and_taxonomies:
        self.process_one_assembly(source_assembly, taxonomies, instance, resume)

def get_incomplete_assemblies_and_taxonomies(self):
    """Return (source_assembly, taxonomies) pairs for tracker jobs not yet 'Completed'.

    Row layout comes from get_job_information_from_tracker():
    index 1 = aggregated taxonomies, 3 = origin assembly, 6 = remapping status.
    """
    incomplete_assemblies = []
    for row in self.get_job_information_from_tracker():
        taxonomies = row[1]
        source_assembly = row[3]
        status = row[6]
        if status != 'Completed':
            # list.append takes exactly one argument: append a tuple.
            # The original `append(source_assembly, taxonomies)` raises
            # TypeError on the first incomplete row.
            incomplete_assemblies.append((source_assembly, taxonomies))
    return incomplete_assemblies

def process_one_assembly(self, source_assembly, instance, resume):
def process_one_assembly(self, source_assembly, taxonomies, instance, resume):
self.set_status_start(source_assembly)
base_directory = cfg['remapping']['base_directory']
nextflow_pipeline = os.path.join(os.path.dirname(__file__), 'nextflow', 'remap_cluster.nf')
assembly_directory = os.path.join(base_directory, str(self.taxonomy), source_assembly)
assembly_directory = os.path.join(base_directory, str(taxonomies), source_assembly)
work_dir = os.path.join(assembly_directory, 'work')
os.makedirs(work_dir, exist_ok=True)

Expand All @@ -173,7 +195,7 @@ def process_one_assembly(self, source_assembly, instance, resume):
remap_cluster_config_file = os.path.join(assembly_directory, 'remap_cluster_config.yaml')
remapping_required = self.check_remapping_required(source_assembly)
remap_cluster_config = {
'taxonomy_id': self.taxonomy,
'taxonomies': taxonomies,
'source_assembly_accession': source_assembly,
'target_assembly_accession': self.target_assembly,
'species_name': self.scientific_name,
Expand Down Expand Up @@ -221,7 +243,6 @@ def check_remapping_required(self, source_assembly):

def create_extraction_properties(self, output_file_path, source_assembly):
properties = self.properties_generator.get_remapping_extraction_properties(
taxonomy=self.taxonomy,
source_assembly=source_assembly,
output_folder='.',
)
Expand Down Expand Up @@ -337,9 +358,9 @@ def count_variants_from_logs(self, assembly_directory, source_assembly):

def update_dbs(self, source_of_assembly):
"""Update all relevant databases to reflect the new assembly."""
incomplete_assemblies = self.get_incomplete_assemblies()
if incomplete_assemblies:
self.warning(f'Processing for the following source assemblies is not yet complete: {incomplete_assemblies}')
incomplete_assemblies_taxonomies = self.get_incomplete_assemblies_and_taxonomies()
if incomplete_assemblies_taxonomies:
self.warning(f'Processing for the following source assemblies is not yet complete: {incomplete_assemblies_taxonomies}')
self.warning('Not updating databases.')
return
with get_metadata_connection_handle(self.maven_profile, self.private_settings_file) as pg_conn:
Expand All @@ -358,3 +379,28 @@ def add_to_contig_alias(self):
self.maven_profile, self.private_settings_file)
client = ContigAliasClient(contig_alias_url, contig_alias_user, contig_alias_pass)
client.insert_assembly(self.target_assembly)



def get_taxonomy_involved(taxonomy_id: int):
    # NOTE(review): this function looks half-migrated ("first pass" commit):
    # `results`, `target_assembly`, `logger`, `today`, `metadata_connection_handle`
    # and `source_of_assembly` are all undefined in this scope, so it cannot run
    # as written — confirm whether these should become parameters or module state.
    # NOTE(review): the f-string SQL below interpolates values directly; fine for
    # trusted internal ids/dates, but parameterized queries would be safer — verify.
    # First check if the current assembly is already target - if so don't do anything

    if len(results) > 0 and results[0][0] == target_assembly:
        logger.warning(f'Current assembly for taxonomy {taxonomy_id} is already {target_assembly}!')
        return

    # Deprecate the last current assembly
    update_query = (
        f"UPDATE {SUPPORTED_ASSEMBLY_TRACKER_TABLE} "
        f"SET current=false, end_date='{today}' "
        f"WHERE taxonomy_id={taxonomy_id} AND current=true;"
    )
    execute_query(metadata_connection_handle, update_query)

    # Then insert the new assembly
    insert_query = (
        f"INSERT INTO {SUPPORTED_ASSEMBLY_TRACKER_TABLE} "
        f"(taxonomy_id, source, assembly_id, current, start_date) "
        f"VALUES({taxonomy_id}, '{source_of_assembly}', '{target_assembly}', true, '{today}');"
    )
    execute_query(metadata_connection_handle, insert_query)

0 comments on commit c86ccf0

Please sign in to comment.