diff --git a/eva_assembly_ingestion/assembly_ingestion_job.py b/eva_assembly_ingestion/assembly_ingestion_job.py index 69133b7..6c681dc 100644 --- a/eva_assembly_ingestion/assembly_ingestion_job.py +++ b/eva_assembly_ingestion/assembly_ingestion_job.py @@ -155,25 +155,26 @@ def run_remapping_and_clustering(self, instance, resume): """Run remapping and clustering for all source assemblies in the tracker marked as not Complete, resuming the nextflow process if specified. (Note that this will also resume or rerun anything marked as Failed.)""" source_assemblies_and_taxonomies = self.get_incomplete_assemblies_and_taxonomies() - self.info(f'Running remapping and clustering for the following assemblies: {source_assemblies}') - for source_assembly, taxonomies in source_assemblies_and_taxonomies: - self.process_one_assembly(source_assembly, taxonomies, instance, resume) + for source_assembly, taxonomy_list in source_assemblies_and_taxonomies: + self.info(f'Running remapping and clustering for the following assemblies: {source_assembly} ' + f'for taxonomy {", ".join([str(t) for t in taxonomy_list])}') + self.process_one_assembly(source_assembly, taxonomy_list, instance, resume) def get_incomplete_assemblies_and_taxonomies(self): incomplete_assemblies = [] for row in self.get_job_information_from_tracker(): - taxonomies = row[1] + taxonomies = row[1].split(',') # Comma separated list of taxonomies source_assembly = row[3] status = row[6] if status != 'Completed': incomplete_assemblies.append(source_assembly, taxonomies) return incomplete_assemblies - def process_one_assembly(self, source_assembly, taxonomies, instance, resume): + def process_one_assembly(self, source_assembly, taxonomy_list, instance, resume): self.set_status_start(source_assembly) base_directory = cfg['remapping']['base_directory'] nextflow_pipeline = os.path.join(os.path.dirname(__file__), 'nextflow', 'remap_cluster.nf') - assembly_directory = os.path.join(base_directory, str(taxonomies), source_assembly) + assembly_directory = os.path.join(base_directory, ",".join([str(t) for t in taxonomy_list]), source_assembly) work_dir = os.path.join(assembly_directory, 'work') os.makedirs(work_dir, exist_ok=True) @@ -195,7 +196,7 @@ def process_one_assembly(self, source_assembly, taxonomies, instance, resume): remap_cluster_config_file = os.path.join(assembly_directory, 'remap_cluster_config.yaml') remapping_required = self.check_remapping_required(source_assembly) remap_cluster_config = { - 'taxonomies': taxonomies, + 'taxonomy_list': taxonomy_list, 'source_assembly_accession': source_assembly, 'target_assembly_accession': self.target_assembly, 'species_name': self.scientific_name, @@ -364,15 +365,17 @@ def update_dbs(self, source_of_assembly): self.warning('Not updating databases.') return with get_metadata_connection_handle(self.maven_profile, self.private_settings_file) as pg_conn: - add_to_supported_assemblies(metadata_connection_handle=pg_conn, source_of_assembly=source_of_assembly, - target_assembly=self.target_assembly, taxonomy_id=self.taxonomy) + for taxonomy in self.taxonomies: + add_to_supported_assemblies(metadata_connection_handle=pg_conn, source_of_assembly=source_of_assembly, + target_assembly=self.target_assembly, taxonomy_id=taxonomy) self.add_to_metadata() self.add_to_contig_alias() self.info('Metadata database updates complete.') def add_to_metadata(self): with get_metadata_connection_handle(self.maven_profile, self.private_settings_file) as pg_conn: - insert_new_assembly_and_taxonomy(pg_conn, self.target_assembly, self.taxonomy) + for taxonomy in self.taxonomies: + insert_new_assembly_and_taxonomy(pg_conn, self.target_assembly, taxonomy) def add_to_contig_alias(self): contig_alias_url, contig_alias_user, contig_alias_pass = get_contig_alias_db_creds_for_profile( diff --git a/eva_assembly_ingestion/nextflow/remap_cluster.nf b/eva_assembly_ingestion/nextflow/remap_cluster.nf index 3f7a8ed..b73f242 100644 --- a/eva_assembly_ingestion/nextflow/remap_cluster.nf +++ b/eva_assembly_ingestion/nextflow/remap_cluster.nf @@ -8,7 +8,7 @@ def helpMessage() { Remap one assembly version to another, cluster, and QC. Inputs: - --taxonomy_id taxonomy id of submitted variants that needs to be remapped. + --taxonomy_list list of taxonomy id of submitted variants that needs to be remapped. --source_assembly_accession assembly accession of the submitted variants are currently mapped to. --target_assembly_accession assembly accession the submitted variants will be remapped to. --species_name scientific name to be used for the species. @@ -35,8 +35,8 @@ params.help = null if (params.help) exit 0, helpMessage() // Test input files -if (!params.taxonomy_id || !params.source_assembly_accession || !params.target_assembly_accession || !params.species_name || !params.genome_assembly_dir ) { - if (!params.taxonomy_id) log.warn('Provide the taxonomy id of the source submitted variants using --taxonomy_id') +if (!params.taxonomy_list || !params.source_assembly_accession || !params.target_assembly_accession || !params.species_name || !params.genome_assembly_dir ) { + if (!params.taxonomy_list) log.warn('Provide the taxonomy id of the source submitted variants using --taxonomy_list') if (!params.source_assembly_accession) log.warn('Provide the source assembly using --source_assembly_accession') if (!params.target_assembly_accession) log.warn('Provide the target assembly using --target_assembly_accession') if (!params.species_name) log.warn('Provide a species name using --species_name') @@ -132,6 +132,7 @@ process extract_vcf_from_mongo { input: path source_fasta path source_report + each taxonomy output: // Store both vcfs (eva and dbsnp), emit: one channel @@ -145,6 +146,7 @@ process extract_vcf_from_mongo { --spring.config.location=file:${params.extraction_properties} \ --parameters.fasta=${source_fasta} \ --parameters.assemblyReportUrl=file:${source_report} \ + --parameters.taxonomy=${taxonomy} > ${params.source_assembly_accession}_vcf_extractor.log """ } @@ -330,7 +332,11 @@ workflow { update_source_genome(params.source_assembly_accession, retrieve_source_genome.out.source_fasta, retrieve_source_genome.out.source_report, params.remapping_config) update_target_genome(retrieve_target_genome.out.target_fasta, retrieve_target_genome.out.target_report, params.remapping_config) - extract_vcf_from_mongo(update_source_genome.out.updated_source_fasta, update_source_genome.out.updated_source_report) + extract_vcf_from_mongo( + update_source_genome.out.updated_source_fasta, + update_source_genome.out.updated_source_report, + params.taxonomy_list + ) remap_variants(extract_vcf_from_mongo.out.source_vcfs.flatten(), update_source_genome.out.updated_source_fasta, update_target_genome.out.updated_target_fasta) ingest_vcf_into_mongo(remap_variants.out.remapped_vcfs, update_target_genome.out.updated_target_report) diff --git a/tests/nextflow-tests/java/FakeExtractionPipeline.java b/tests/nextflow-tests/java/FakeExtractionPipeline.java index 79ff9be..eb69721 100644 --- a/tests/nextflow-tests/java/FakeExtractionPipeline.java +++ b/tests/nextflow-tests/java/FakeExtractionPipeline.java @@ -6,14 +6,21 @@ public class FakeExtractionPipeline { public static void main(String[] args) { String outString = "java -jar extraction.jar"; - String inFile = null; + String accession = null; + String taxonomy = null; for (String arg: args) { outString += " " + arg; - if (arg.startsWith("--parameters.fasta=")) - inFile = arg.substring("--parameters.fasta=".length(), arg.length()-"_custom.fa".length()); + if (arg.startsWith("--parameters.fasta=")){ + accession = arg.substring("--parameters.fasta=".length(), arg.length()-"_custom.fa".length()); + } + if (arg.startsWith("--parameters.taxonomy=")){ + taxonomy = arg.substring("--parameters.taxonomy=".length(), arg.length()); + } } System.out.println(outString); - System.out.println(inFile); + System.out.println(accession); + System.out.println(taxonomy); + String inFile = accession + "_" + taxonomy; // real pipeline gets this from properties String outFile1 = inFile + "_dbsnp.vcf"; diff --git a/tests/nextflow-tests/run_tests.sh b/tests/nextflow-tests/run_tests.sh index 5660edf..e3870b4 100755 --- a/tests/nextflow-tests/run_tests.sh +++ b/tests/nextflow-tests/run_tests.sh @@ -13,7 +13,6 @@ PATH=${SCRIPT_DIR}/bin:$PATH printf "\e[32m===== REMAPPING AND CLUSTERING PIPELINE =====\e[0m\n" nextflow run ${SOURCE_DIR}/eva_assembly_ingestion/nextflow/remap_cluster.nf -params-file test_config.yaml \ - --taxonomy_id 1234 \ --source_assembly_accession GCA_0000001 \ --target_assembly_accession GCA_0000002 \ --species_name "Thingy thungus" \ diff --git a/tests/nextflow-tests/test_config.yaml b/tests/nextflow-tests/test_config.yaml index ae7bfba..660f574 100644 --- a/tests/nextflow-tests/test_config.yaml +++ b/tests/nextflow-tests/test_config.yaml @@ -1,3 +1,4 @@ +taxonomy_list: [1233, 1234] executable: genome_downloader: ../../../bin/fake_genome_downloader.py