Skip to content

Commit

Permalink
Nextflow pipeline remap for multiple taxonomies
Browse files Browse the repository at this point in the history
  • Loading branch information
tcezard committed Jan 5, 2024
1 parent c86ccf0 commit 7baff5e
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 19 deletions.
23 changes: 13 additions & 10 deletions eva_assembly_ingestion/assembly_ingestion_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,25 +155,26 @@ def run_remapping_and_clustering(self, instance, resume):
"""Run remapping and clustering for all source assemblies in the tracker marked as not Complete, resuming
the nextflow process if specified. (Note that this will also resume or rerun anything marked as Failed.)"""
source_assemblies_and_taxonomies = self.get_incomplete_assemblies_and_taxonomies()
self.info(f'Running remapping and clustering for the following assemblies: {source_assemblies}')
for source_assembly, taxonomies in source_assemblies_and_taxonomies:
self.process_one_assembly(source_assembly, taxonomies, instance, resume)
for source_assembly, taxonomy_list in source_assemblies_and_taxonomies:
self.info(f'Running remapping and clustering for the following assemblies: {source_assembly} '
f'for taxonomy {", ".join([str(t) for t in taxonomy_list])}')
self.process_one_assembly(source_assembly, taxonomy_list, instance, resume)

def get_incomplete_assemblies_and_taxonomies(self):
    """Return the source assemblies (with their taxonomies) that still need processing.

    Rows come from ``self.get_job_information_from_tracker()`` and are read
    positionally: index 1 is a comma separated string of taxonomies, index 3
    is the source assembly and index 6 is the job status.

    Returns:
        A list of ``(source_assembly, taxonomy_list)`` tuples, one per row
        whose status is anything other than ``'Completed'``. ``taxonomy_list``
        holds the taxonomies as strings, in the order they appear in the row.
    """
    incomplete_assemblies = []
    for row in self.get_job_information_from_tracker():
        taxonomies = row[1].split(',')  # Comma separated list of taxonomies
        source_assembly = row[3]
        status = row[6]
        if status != 'Completed':
            # list.append() accepts exactly one argument, so the pair has to be
            # wrapped in a tuple; callers unpack it as (assembly, taxonomies).
            incomplete_assemblies.append((source_assembly, taxonomies))
    return incomplete_assemblies

def process_one_assembly(self, source_assembly, taxonomies, instance, resume):
def process_one_assembly(self, source_assembly, taxonomy_list, instance, resume):
self.set_status_start(source_assembly)
base_directory = cfg['remapping']['base_directory']
nextflow_pipeline = os.path.join(os.path.dirname(__file__), 'nextflow', 'remap_cluster.nf')
assembly_directory = os.path.join(base_directory, str(taxonomies), source_assembly)
assembly_directory = os.path.join(base_directory, ",".join([str(t) for t in taxonomy_list]), source_assembly)
work_dir = os.path.join(assembly_directory, 'work')
os.makedirs(work_dir, exist_ok=True)

Expand All @@ -195,7 +196,7 @@ def process_one_assembly(self, source_assembly, taxonomies, instance, resume):
remap_cluster_config_file = os.path.join(assembly_directory, 'remap_cluster_config.yaml')
remapping_required = self.check_remapping_required(source_assembly)
remap_cluster_config = {
'taxonomies': taxonomies,
'taxonomy_list': taxonomy_list,
'source_assembly_accession': source_assembly,
'target_assembly_accession': self.target_assembly,
'species_name': self.scientific_name,
Expand Down Expand Up @@ -364,15 +365,17 @@ def update_dbs(self, source_of_assembly):
self.warning('Not updating databases.')
return
with get_metadata_connection_handle(self.maven_profile, self.private_settings_file) as pg_conn:
add_to_supported_assemblies(metadata_connection_handle=pg_conn, source_of_assembly=source_of_assembly,
target_assembly=self.target_assembly, taxonomy_id=self.taxonomy)
for taxonomy in self.taxonomies:
add_to_supported_assemblies(metadata_connection_handle=pg_conn, source_of_assembly=source_of_assembly,
target_assembly=self.target_assembly, taxonomy_id=taxonomy)
self.add_to_metadata()
self.add_to_contig_alias()
self.info('Metadata database updates complete.')

def add_to_metadata(self):
    """Register the target assembly in the metadata database for every taxonomy.

    Opens one metadata connection for ``self.maven_profile`` /
    ``self.private_settings_file`` and calls
    ``insert_new_assembly_and_taxonomy`` once per entry of ``self.taxonomies``,
    always with ``self.target_assembly``.
    """
    with get_metadata_connection_handle(self.maven_profile, self.private_settings_file) as pg_conn:
        # One insert per taxonomy, mirroring the per-taxonomy loop in update_dbs;
        # the former single call with self.taxonomy is superseded by this loop.
        for taxonomy in self.taxonomies:
            insert_new_assembly_and_taxonomy(pg_conn, self.target_assembly, taxonomy)

def add_to_contig_alias(self):
contig_alias_url, contig_alias_user, contig_alias_pass = get_contig_alias_db_creds_for_profile(
Expand Down
14 changes: 10 additions & 4 deletions eva_assembly_ingestion/nextflow/remap_cluster.nf
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ def helpMessage() {
Remap one assembly version to another, cluster, and QC.
Inputs:
--taxonomy_id taxonomy id of submitted variants that needs to be remapped.
--taxonomy_list list of taxonomy id of submitted variants that needs to be remapped.
--source_assembly_accession assembly accession of the submitted variants are currently mapped to.
--target_assembly_accession assembly accession the submitted variants will be remapped to.
--species_name scientific name to be used for the species.
Expand All @@ -35,8 +35,8 @@ params.help = null
if (params.help) exit 0, helpMessage()

// Test input files
if (!params.taxonomy_id || !params.source_assembly_accession || !params.target_assembly_accession || !params.species_name || !params.genome_assembly_dir ) {
if (!params.taxonomy_id) log.warn('Provide the taxonomy id of the source submitted variants using --taxonomy_id')
if (!params.taxonomy_list || !params.source_assembly_accession || !params.target_assembly_accession || !params.species_name || !params.genome_assembly_dir ) {
if (!params.taxonomy_list) log.warn('Provide the taxonomy id of the source submitted variants using --taxonomy_list')
if (!params.source_assembly_accession) log.warn('Provide the source assembly using --source_assembly_accession')
if (!params.target_assembly_accession) log.warn('Provide the target assembly using --target_assembly_accession')
if (!params.species_name) log.warn('Provide a species name using --species_name')
Expand Down Expand Up @@ -132,6 +132,7 @@ process extract_vcf_from_mongo {
input:
path source_fasta
path source_report
each taxonomy

output:
// Store both vcfs (eva and dbsnp), emit: one channel
Expand All @@ -145,6 +146,7 @@ process extract_vcf_from_mongo {
--spring.config.location=file:${params.extraction_properties} \
--parameters.fasta=${source_fasta} \
--parameters.assemblyReportUrl=file:${source_report} \
--parameters.taxonomy=${taxonomy} \
> ${params.source_assembly_accession}_vcf_extractor.log
"""
}
Expand Down Expand Up @@ -330,7 +332,11 @@ workflow {
update_source_genome(params.source_assembly_accession, retrieve_source_genome.out.source_fasta,
retrieve_source_genome.out.source_report, params.remapping_config)
update_target_genome(retrieve_target_genome.out.target_fasta, retrieve_target_genome.out.target_report, params.remapping_config)
extract_vcf_from_mongo(update_source_genome.out.updated_source_fasta, update_source_genome.out.updated_source_report)
extract_vcf_from_mongo(
update_source_genome.out.updated_source_fasta,
update_source_genome.out.updated_source_report,
params.taxonomy_list
)
remap_variants(extract_vcf_from_mongo.out.source_vcfs.flatten(), update_source_genome.out.updated_source_fasta,
update_target_genome.out.updated_target_fasta)
ingest_vcf_into_mongo(remap_variants.out.remapped_vcfs, update_target_genome.out.updated_target_report)
Expand Down
15 changes: 11 additions & 4 deletions tests/nextflow-tests/java/FakeExtractionPipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,21 @@ public class FakeExtractionPipeline {

public static void main(String[] args) {
String outString = "java -jar extraction.jar";
String inFile = null;
String accession = null;
String taxonomy = null;
for (String arg: args) {
outString += " " + arg;
if (arg.startsWith("--parameters.fasta="))
inFile = arg.substring("--parameters.fasta=".length(), arg.length()-"_custom.fa".length());
if (arg.startsWith("--parameters.fasta=")){
accession = arg.substring("--parameters.fasta=".length(), arg.length()-"_custom.fa".length());
}
if (arg.startsWith("--parameters.taxonomy=")){
taxonomy = arg.substring("--parameters.taxonomy=".length(), arg.length());
}
}
System.out.println(outString);
System.out.println(inFile);
System.out.println(accession);
System.out.println(taxonomy);
String inFile = accession + "_" + taxonomy;

// real pipeline gets this from properties
String outFile1 = inFile + "_dbsnp.vcf";
Expand Down
1 change: 0 additions & 1 deletion tests/nextflow-tests/run_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ PATH=${SCRIPT_DIR}/bin:$PATH

printf "\e[32m===== REMAPPING AND CLUSTERING PIPELINE =====\e[0m\n"
nextflow run ${SOURCE_DIR}/eva_assembly_ingestion/nextflow/remap_cluster.nf -params-file test_config.yaml \
--taxonomy_id 1234 \
--source_assembly_accession GCA_0000001 \
--target_assembly_accession GCA_0000002 \
--species_name "Thingy thungus" \
Expand Down
1 change: 1 addition & 0 deletions tests/nextflow-tests/test_config.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
taxonomy_list: [1233, 1234]

executable:
genome_downloader: ../../../bin/fake_genome_downloader.py
Expand Down

0 comments on commit 7baff5e

Please sign in to comment.