From f8c6f1eb28c44afcdbcf86b6420404e0b323ab36 Mon Sep 17 00:00:00 2001 From: nitin-ebi <79518737+nitin-ebi@users.noreply.github.com> Date: Tue, 9 May 2023 10:40:17 +0100 Subject: [PATCH] EVA-3147 Use Spring properties generator for generating properties files for accession and variant load (#146) generate the properties file using springPropertiesGenerator --- eva_submission/eload_ingestion.py | 106 +++++------ .../etc/example_submission_config.yaml | 1 + eva_submission/ingestion_templates.py | 102 ----------- eva_submission/nextflow/accession.nf | 90 ++++----- eva_submission/nextflow/variant_load.nf | 148 +++++---------- eva_submission/vep_utils.py | 3 +- requirements.txt | 2 +- .../java/FakeAccessionPipeline.java | 5 +- .../nextflow-tests/test_ingestion_config.yaml | 14 +- .../test_ingestion_config_human.yaml | 14 +- .../test_ingestion_no_remapping_config.yaml | 14 +- tests/resources/settings.xml | 7 + tests/resources/submission_config.yml | 1 + tests/test_eload_ingestion.py | 172 +++++------------- 14 files changed, 199 insertions(+), 480 deletions(-) delete mode 100644 eva_submission/ingestion_templates.py diff --git a/eva_submission/eload_ingestion.py b/eva_submission/eload_ingestion.py index 28e94321..f3e639e4 100644 --- a/eva_submission/eload_ingestion.py +++ b/eva_submission/eload_ingestion.py @@ -9,13 +9,11 @@ from cached_property import cached_property from ebi_eva_common_pyutils import command_utils from ebi_eva_common_pyutils.config import cfg -from ebi_eva_common_pyutils.config_utils import get_mongo_uri_for_eva_profile, get_primary_mongo_creds_for_profile, \ - get_accession_pg_creds_for_profile, get_count_service_creds_for_profile +from ebi_eva_common_pyutils.config_utils import get_mongo_uri_for_eva_profile from ebi_eva_common_pyutils.ena_utils import get_assembly_name_and_taxonomy_id from ebi_eva_common_pyutils.metadata_utils import resolve_variant_warehouse_db_name, insert_new_assembly_and_taxonomy, \ get_assembly_set_from_metadata, add_to_supported_assemblies -from ebi_eva_common_pyutils.ncbi_utils import get_ncbi_assembly_dicts_from_term, \ - retrieve_species_scientific_name_from_tax_id_ncbi +from ebi_eva_common_pyutils.ncbi_utils import get_species_name_from_ncbi from ebi_eva_common_pyutils.pg_utils import get_all_results_for_query, execute_query from ebi_eva_common_pyutils.spring_properties import SpringPropertiesGenerator from ebi_eva_common_pyutils.assembly.assembly import get_supported_asm_from_ensembl @@ -25,7 +23,6 @@ from eva_submission.eload_utils import provision_new_database_for_variant_warehouse from eva_submission.submission_config import EloadConfig from eva_submission.vep_utils import get_vep_and_vep_cache_version -from eva_submission.ingestion_templates import accession_props_template, variant_load_props_template project_dirs = { 'logs': '00_logs', @@ -121,30 +118,11 @@ def fill_vep_versions(self, vep_cache_assembly_name=None): assembly_accession, vep_cache_assembly_name ) - vep_species = self.get_species_name_from_ncbi(assembly_accession) + vep_species = get_species_name_from_ncbi(assembly_accession) self.eload_cfg.set(self.config_section, 'vep', assembly_accession, 'version', value=vep_version) self.eload_cfg.set(self.config_section, 'vep', assembly_accession, 'cache_version', value=vep_cache_version) self.eload_cfg.set(self.config_section, 'vep', assembly_accession, 'species', value=vep_species) - def get_species_name_from_ncbi(self, assembly_acc): - # We first need to search for the species associated with the assembly - assembly_dicts = get_ncbi_assembly_dicts_from_term(assembly_acc) - taxid_and_assembly_name = set([ - (assembly_dict.get('taxid'), assembly_dict.get('assemblyname')) - for assembly_dict in assembly_dicts - if assembly_dict.get('assemblyaccession') == assembly_acc or - assembly_dict.get('synonym', {}).get('genbank') == assembly_acc - ]) - # This is a search so could retrieve multiple results - if len(taxid_and_assembly_name) != 1: - raise ValueError(f'Multiple assembly found for {assembly_acc}. ' - f'Cannot resolve single assembly for assembly {assembly_acc} in NCBI.') - - taxonomy_id, assembly_name = taxid_and_assembly_name.pop() - - scientific_name = retrieve_species_scientific_name_from_tax_id_ncbi(taxonomy_id) - return scientific_name.replace(' ', '_').lower() - def _get_vcf_files_from_brokering(self): vcf_files = [] analyses = self.eload_cfg.query('brokering', 'analyses') @@ -324,56 +302,39 @@ def _generate_csv_mappings_to_ingest(self): return vcf_files_to_ingest def run_accession_workflow(self, vcf_files_to_ingest, resume): - mongo_host, mongo_user, mongo_pass = get_primary_mongo_creds_for_profile(self.maven_profile, - self.private_settings_file) - pg_url, pg_user, pg_pass = get_accession_pg_creds_for_profile(self.maven_profile, self.private_settings_file) - counts_url, counts_user, counts_pass = get_count_service_creds_for_profile(self.maven_profile, - self.private_settings_file) - job_props = accession_props_template( - taxonomy_id=self.taxonomy, - project_accession=self.project_accession, - instance_id=self.eload_cfg.query(self.config_section, 'accession', 'instance_id'), - mongo_host=mongo_host, - mongo_user=mongo_user, - mongo_pass=mongo_pass, - postgres_url=pg_url, - postgres_user=pg_user, - postgres_pass=pg_pass, - counts_url=counts_url, - counts_user=counts_user, - counts_pass=counts_pass - ) + instance_id = self.eload_cfg.query(self.config_section, 'accession', 'instance_id') + output_dir = os.path.join(self.project_dir, project_dirs['accessions']) + accession_properties_file = self.create_accession_properties(instance_id=instance_id, + output_file_path=os.path.join(output_dir, 'accession.properties')) accession_config = { 'valid_vcfs': vcf_files_to_ingest, 'project_accession': self.project_accession, - 'instance_id': self.eload_cfg.query(self.config_section, 'accession', 'instance_id'), - 'accession_job_props': job_props, + 'instance_id': instance_id, + 'accession_job_props': accession_properties_file, 'public_ftp_dir': cfg['public_ftp_dir'], 'accessions_dir': os.path.join(self.project_dir, project_dirs['accessions']), 'public_dir': os.path.join(self.project_dir, project_dirs['public']), 'logs_dir': os.path.join(self.project_dir, project_dirs['logs']), 'executable': cfg['executable'], 'jar': cfg['jar'], + 'taxonomy': self.taxonomy } self.run_nextflow('accession', accession_config, resume) def run_variant_load_workflow(self, vcf_files_to_ingest, annotation_only, resume): - job_props = variant_load_props_template( - project_accession=self.project_accession, - study_name=self.get_study_name(), - output_dir=self.project_dir.joinpath(project_dirs['transformed']), - annotation_dir=self.project_dir.joinpath(project_dirs['annotation']), - stats_dir=self.project_dir.joinpath(project_dirs['stats']), - ) + variant_load_properties_file = self.create_variant_load_properties( + output_file_path=os.path.join(self.project_dir, 'variant_load.properties')) + accession_import_properties_file = self.create_accession_import_properties( + output_file_path=os.path.join(self.project_dir, 'accession_import.properties')) + load_config = { 'valid_vcfs': vcf_files_to_ingest, 'vep_path': cfg['vep_path'], - 'load_job_props': job_props, - 'acc_import_job_props': {'db.collections.variants.name': 'variants_2_0'}, + 'load_job_props': variant_load_properties_file, + 'acc_import_job_props': accession_import_properties_file, 'project_accession': self.project_accession, 'project_dir': str(self.project_dir), 'logs_dir': os.path.join(self.project_dir, project_dirs['logs']), - 'eva_pipeline_props': cfg['eva_pipeline_props'], 'executable': cfg['executable'], 'jar': cfg['jar'], 'annotation_only': annotation_only, @@ -496,6 +457,39 @@ def create_clustering_properties(self, output_file_path, clustering_instance, ta open_file.write(properties) return output_file_path + def create_accession_properties(self, instance_id, output_file_path): + properties = self.properties_generator.get_accessioning_properties( + instance=instance_id, + target_assembly=self._get_target_assembly(), + project_accession=self.project_accession, + taxonomy_accession=self.taxonomy + ) + with open(output_file_path, 'w') as open_file: + open_file.write(properties) + return output_file_path + + def create_variant_load_properties(self, output_file_path): + properties = self.properties_generator.get_variant_load_properties( + project_accession=self.project_accession, + study_name=self.get_study_name(), + output_dir=self.project_dir.joinpath(project_dirs['transformed']), + annotation_dir=self.project_dir.joinpath(project_dirs['annotation']), + stats_dir=self.project_dir.joinpath(project_dirs['stats']), + vep_cache_path=cfg['vep_cache_path'], + opencga_path=cfg['opencga_path'] + ) + with open(output_file_path, 'w') as open_file: + open_file.write(properties) + return output_file_path + + def create_accession_import_properties(self, output_file_path): + properties = self.properties_generator.get_accession_import_properties( + opencga_path=cfg['opencga_path'] + ) + with open(output_file_path, 'w') as open_file: + open_file.write(properties) + return output_file_path + def insert_browsable_files(self): with self.metadata_connection_handle as conn: # insert into browsable file table, if files not already there diff --git a/eva_submission/etc/example_submission_config.yaml b/eva_submission/etc/example_submission_config.yaml index f5d59987..c7220a83 100644 --- a/eva_submission/etc/example_submission_config.yaml +++ b/eva_submission/etc/example_submission_config.yaml @@ -7,6 +7,7 @@ eutils_api_key: 1234556lkxflk eva_pipeline_props: '/path/to/pipeline/properties' vep_path: '/path/to/vep' vep_cache_path: '/path/to/vep/cache' +opencga_path: '/path/to/opencga' maven: environment: 'internal' diff --git a/eva_submission/ingestion_templates.py b/eva_submission/ingestion_templates.py deleted file mode 100644 index 1adbb0c8..00000000 --- a/eva_submission/ingestion_templates.py +++ /dev/null @@ -1,102 +0,0 @@ -from ebi_eva_common_pyutils.config import cfg - - -# Name of collections in variant warehouse -annotation_metadata_collection_name = 'annotationMetadata_2_0' -annotation_collection_name = 'annotations_2_0' - - -def accession_props_template( - instance_id, - taxonomy_id, - project_accession, - postgres_url, - postgres_user, - postgres_pass, - mongo_host, - mongo_user, - mongo_pass, - counts_url, - counts_user, - counts_pass -): - """ - Get all properties needed for this accessioning job, except for the input - and output filenames which are filled in by Nextflow. - """ - return { - 'accessioning.instanceId': f'instance-{instance_id}', - 'accessioning.submitted.categoryId': 'ss', - 'accessioning.monotonic.ss.blockSize': 100000, - 'accessioning.monotonic.ss.blockStartValue': 5000000000, - 'accessioning.monotonic.ss.nextBlockInterval': 1000000000, - 'parameters.taxonomyAccession': taxonomy_id, - 'parameters.projectAccession': project_accession, - 'parameters.chunkSize': 100, - 'parameters.forceRestart': False, - 'parameters.contigNaming': 'NO_REPLACEMENT', - 'spring.batch.job.names': 'CREATE_SUBSNP_ACCESSION_JOB', - 'spring.datasource.driver-class-name': 'org.postgresql.Driver', - 'spring.datasource.url': postgres_url, - 'spring.datasource.username': postgres_user, - 'spring.datasource.password': postgres_pass, - 'spring.datasource.tomcat.max-active': 3, - 'spring.jpa.generate-ddl': True, - 'spring.data.mongodb.host': mongo_host, - 'spring.data.mongodb.port': 27017, - 'spring.data.mongodb.database': 'eva_accession_sharded', - 'spring.data.mongodb.username': mongo_user, - 'spring.data.mongodb.password': mongo_pass, - 'spring.data.mongodb.authentication-database': 'admin', - 'mongodb.read-preference': 'secondaryPreferred', - 'eva.count-stats.url': counts_url, - 'eva.count-stats.username': counts_user, - 'eva.count-stats.password': counts_pass, - 'spring.main.web-application-type': 'none', - 'spring.main.allow-bean-definition-overriding': True, - 'spring.jpa.properties.hibernate.jdbc.lob.non_contextual_creation': True, - # See https://vkuzel.com/spring-boot-jpa-hibernate-atomikos-postgresql-exception - # Disable feature detection by this undocumented parameter. - # Check the org.hibernate.engine.jdbc.internal.JdbcServiceImpl.configure method for more details. - 'spring.jpa.properties.hibernate.temp.use_jdbc_metadata_defaults': False, - # Because detection is disabled you have to set correct dialect by hand. - 'spring.jpa.database-platform': 'org.hibernate.dialect.PostgreSQL9Dialect' - } - - -def variant_load_props_template( - project_accession, - study_name, - output_dir, - annotation_dir, - stats_dir -): - """ - Get all properties needed for this variant load job, except for the vcf file - which is filled in by Nextflow after (optional) merge. - """ - return { - 'input.study.id': project_accession, - 'input.study.name': study_name, - 'input.study.type': 'COLLECTION', - 'output.dir': str(output_dir), - 'output.dir.annotation': str(annotation_dir), - 'output.dir.statistics': str(stats_dir), - 'db.collections.files.name': 'files_2_0', - 'db.collections.variants.name': 'variants_2_0', - 'db.collections.annotation-metadata.name': annotation_metadata_collection_name, - 'db.collections.annotations.name': annotation_collection_name, - 'app.vep.cache.path': cfg['vep_cache_path'], - 'app.vep.num-forks': 4, - 'app.vep.timeout': 500, - 'statistics.skip': False, - 'annotation.overwrite': False, - 'config.chunk.size': 200, - 'spring.jpa.properties.hibernate.jdbc.lob.non_contextual_creation': True, - # See https://vkuzel.com/spring-boot-jpa-hibernate-atomikos-postgresql-exception - # Disable feature detection by this undocumented parameter. - # Check the org.hibernate.engine.jdbc.internal.JdbcServiceImpl.configure method for more details. - 'spring.jpa.properties.hibernate.temp.use_jdbc_metadata_defaults': False, - # Because detection is disabled you have to set correct dialect by hand. - 'spring.jpa.database-platform': 'org.hibernate.dialect.PostgreSQL9Dialect' - } diff --git a/eva_submission/nextflow/accession.nf b/eva_submission/nextflow/accession.nf index 03d7af9d..24ddd472 100644 --- a/eva_submission/nextflow/accession.nf +++ b/eva_submission/nextflow/accession.nf @@ -10,11 +10,12 @@ def helpMessage() { --valid_vcfs csv file with the mappings for vcf file, assembly accession, fasta, assembly report, analysis_accession, db_name, aggregation --project_accession project accession --instance_id instance id to run accessioning - --accession_job_props job-specific properties, passed as a map + --accession_job_props properties file for accessioning job --public_ftp_dir public FTP directory --accessions_dir accessions directory (for properties files) --public_dir directory for files to be made public --logs_dir logs directory + --taxonomy taxonomy id """ } @@ -26,6 +27,7 @@ params.public_ftp_dir = null params.accessions_dir = null params.public_dir = null params.logs_dir = null +params.taxonomy = null // executables params.executable = ["bcftools": "bcftools", "tabix": "tabix"] // java jars @@ -37,12 +39,12 @@ params.help = null if (params.help) exit 0, helpMessage() // Test input files -if (!params.valid_vcfs || !params.project_accession || !params.instance_id || !params.accession_job_props || !params.public_ftp_dir || !params.accessions_dir || !params.public_dir || !params.logs_dir || !params.accession_job_props.'parameters.taxonomyAccession') { +if (!params.valid_vcfs || !params.project_accession || !params.instance_id || !params.accession_job_props || !params.public_ftp_dir || !params.accessions_dir || !params.public_dir || !params.logs_dir || !params.taxonomy) { if (!params.valid_vcfs) log.warn('Provide a csv file with the mappings (vcf file, assembly accession, fasta, assembly report, analysis_accession, db_name) --valid_vcfs') if (!params.project_accession) log.warn('Provide a project accession using --project_accession') if (!params.instance_id) log.warn('Provide an instance id using --instance_id') if (!params.accession_job_props) log.warn('Provide job-specific properties using --accession_job_props') - if (!params.accession_job_props.'parameters.taxonomyAccession') log.warn('Provide taxonomy_id in the job-specific properties (--accession_job_props) using field taxonomyAccession') + if (!params.taxonomy) log.warn('Provide taxonomy id using --taxonomy') if (!params.public_ftp_dir) log.warn('Provide public FTP directory using --public_ftp_dir') if (!params.accessions_dir) log.warn('Provide accessions directory using --accessions_dir') if (!params.public_dir) log.warn('Provide public directory using --public_dir') @@ -53,30 +55,29 @@ if (!params.valid_vcfs || !params.project_accession || !params.instance_id || !p /* Sequence of processes in case of: non-human study: - create_properties -> accession_vcf -> sort_and_compress_vcf -> csi_index_vcf -> copy_to_ftp + accession_vcf -> sort_and_compress_vcf -> csi_index_vcf -> copy_to_ftp human study (skip accessioning): csi_index_vcf -> copy_to_ftp process input channels -create_properties -> valid_vcfs +accession_vcf -> valid_vcfs csi_index_vcf -> csi_vcfs and compressed_vcf 1. Check if the study we are working with is a human study or non-human by comparing the taxonomy_id of the study with human taxonomy_id (9606). 2. Provide values to the appropriate channels enabling them to start the corresponding processes. In case of non-human studies we want to start process - "create_properties" while in case of human studies we want to start processes "csi_index_vcf". + "accession_vcf" while in case of human studies we want to start processes "csi_index_vcf". non-human study: - - Initialize valid_vcfs channel with value so that it can start the process "create_properties". + - Initialize valid_vcfs channel with value so that it can start the process "accession_vcf". - Initialize csi_vcfs channels as empty. This makes sure the processes "csi_index_vcf" are not started at the outset. These processes will only be able to start after the process "sort_and_compress_vcf" finishes and create channels compressed_vcf with values. human study: - - Initialize valid_vcfs channel as empty, ensuring the process "create_properties" is not started and in turn accessioning part is also skipped, as the process - "accession_vcf" depends on the output channels created by the process create_properties. + - Initialize valid_vcfs channel as empty, ensuring the process "accession_vcf" is not started and in turn accessioning part is also skipped - Initialize csi_vcfs with values enabling them to start the processes "csi_index_vcf". */ workflow { - is_human_study = (params.accession_job_props.'parameters.taxonomyAccession' == 9606) + is_human_study = (params.taxonomy == 9606) if (is_human_study) { csi_vcfs = Channel.fromPath(params.valid_vcfs) @@ -87,56 +88,15 @@ workflow { valid_vcfs = Channel.fromPath(params.valid_vcfs) .splitCsv(header:true) .map{row -> tuple(file(row.vcf_file), row.assembly_accession, row.aggregation, file(row.fasta), file(row.report))} - create_properties(valid_vcfs) - accession_vcf(create_properties.out.accession_props, create_properties.out.accessioned_filenames, create_properties.out.log_filenames) + accession_vcf(valid_vcfs) sort_and_compress_vcf(accession_vcf.out.accession_done) csi_vcfs = sort_and_compress_vcf.out.compressed_vcf - accessioned_files_to_rm = create_properties.out.accessioned_filenames + accessioned_files_to_rm = accession_vcf.out.accessioned_filenames } csi_index_vcf(csi_vcfs) copy_to_ftp(csi_index_vcf.out.csi_indexed_vcf.toList(), accessioned_files_to_rm.toList()) } -/* - * Create properties files for accession. - */ -process create_properties { - input: - tuple val(vcf_file), val(assembly_accession), val(aggregation), val(fasta), val(report) - - output: - path "${vcf_file.getFileName()}_accessioning.properties", emit: accession_props - val accessioned_filename, emit: accessioned_filenames - val log_filename, emit: log_filenames - - exec: - props = new Properties() - params.accession_job_props.each { k, v -> - props.setProperty(k, v.toString()) - } - props.setProperty("parameters.assemblyAccession", assembly_accession.toString()) - props.setProperty("parameters.vcfAggregation", aggregation.toString()) - props.setProperty("parameters.fasta", fasta.toString()) - props.setProperty("parameters.assemblyReportUrl", "file:" + report.toString()) - props.setProperty("parameters.vcf", vcf_file.toString()) - vcf_filename = vcf_file.getFileName().toString() - accessioned_filename = vcf_filename.take(vcf_filename.indexOf(".vcf")) + ".accessioned.vcf" - log_filename = "accessioning.${vcf_filename}" - props.setProperty("parameters.outputVcf", "${params.public_dir}/${accessioned_filename}") - - // need to explicitly store in workDir so next process can pick it up - // see https://github.com/nextflow-io/nextflow/issues/942#issuecomment-441536175 - props_file = new File("${task.workDir}/${vcf_filename}_accessioning.properties") - props_file.createNewFile() - props_file.newWriter().withWriter { w -> - props.each { k, v -> - w.write("$k=$v\n") - } - } - // make a copy for debugging purposes - new File("${params.accessions_dir}/${vcf_filename}_accessioning.properties") << props_file.asWritable() -} - /* * Accession VCFs @@ -149,17 +109,29 @@ process accession_vcf { memory '6.7 GB' input: - path accession_properties - val accessioned_filename - val log_filename + tuple val(vcf_file), val(assembly_accession), val(aggregation), val(fasta), val(report) output: + val accessioned_filename, emit: accessioned_filenames path "${accessioned_filename}.tmp", emit: accession_done + script: + def pipeline_parameters = "" + pipeline_parameters += " --parameters.assemblyAccession=" + assembly_accession.toString() + pipeline_parameters += " --parameters.vcfAggregation=" + aggregation.toString() + pipeline_parameters += " --parameters.fasta=" + fasta.toString() + pipeline_parameters += " --parameters.assemblyReportUrl=file:" + report.toString() + pipeline_parameters += " --parameters.vcf=" + vcf_file.toString() + + vcf_filename = vcf_file.getFileName().toString() + accessioned_filename = vcf_filename.take(vcf_filename.indexOf(".vcf")) + ".accessioned.vcf" + log_filename = "accessioning.${vcf_filename}" + + pipeline_parameters += " --parameters.outputVcf=" + "${params.public_dir}/${accessioned_filename}" + + """ - filename=\$(basename $accession_properties) - filename=\${filename%.*} - (java -Xmx6g -jar $params.jar.accession_pipeline --spring.config.name=\$filename) || \ + (java -Xmx6g -jar $params.jar.accession_pipeline --spring.config.location=file:$params.accession_job_props $pipeline_parameters) || \ # If accessioning fails due to missing variants, but the only missing variants are structural variants, # then we should treat this as a success from the perspective of the automation. # TODO revert once accessioning pipeline properly registers structural variants diff --git a/eva_submission/nextflow/variant_load.nf b/eva_submission/nextflow/variant_load.nf index 482d5720..74dab762 100644 --- a/eva_submission/nextflow/variant_load.nf +++ b/eva_submission/nextflow/variant_load.nf @@ -10,9 +10,8 @@ def helpMessage() { --valid_vcfs csv file with the mappings for vcf file, assembly accession, fasta, assembly report, analysis_accession, db_name, vep version, vep cache version, aggregation --project_accession project accession - --load_job_props variant load job-specific properties, passed as a map - --acc_import_job_props import accession job-specific properties, passed as a map - --eva_pipeline_props main properties file for eva pipeline + --load_job_props properties file for variant load job + --acc_import_job_props properties file for accession import job --annotation_only whether to only run annotation job --taxonomy taxonomy id --project_dir project directory @@ -25,7 +24,6 @@ params.vep_path = null params.project_accession = null params.load_job_props = null params.acc_import_job_props = null -params.eva_pipeline_props = null params.annotation_only = false params.taxonomy = null params.project_dir = null @@ -41,14 +39,13 @@ params.help = null if (params.help) exit 0, helpMessage() // Test inputs -if (!params.valid_vcfs || !params.vep_path || !params.project_accession || !params.taxonomy || !params.load_job_props || !params.acc_import_job_props || !params.eva_pipeline_props || !params.project_dir || !params.logs_dir) { +if (!params.valid_vcfs || !params.vep_path || !params.project_accession || !params.taxonomy || !params.load_job_props || !params.acc_import_job_props || !params.project_dir || !params.logs_dir) { if (!params.valid_vcfs) log.warn('Provide a csv file with the mappings (vcf file, assembly accession, fasta, assembly report, analysis_accession, db_name) --valid_vcfs') if (!params.vep_path) log.warn('Provide path to VEP installations using --vep_path') if (!params.project_accession) log.warn('Provide project accession using --project_accession') if (!params.taxonomy) log.warn('Provide taxonomy id using --taxonomy') - if (!params.load_job_props) log.warn('Provide job-specific properties using --load_job_props') - if (!params.acc_import_job_props) log.warn('Provide accession load properties using --acc_import_job_props') - if (!params.eva_pipeline_props) log.warn('Provide an EVA Pipeline properties file using --eva_pipeline_props') + if (!params.load_job_props) log.warn('Provide path to variant load job properties file --load_job_props') + if (!params.acc_import_job_props) log.warn('Provide path to accession import job properties file using --acc_import_job_props') if (!params.project_dir) log.warn('Provide project directory using --project_dir') if (!params.logs_dir) log.warn('Provide logs directory using --logs_dir') exit 1, helpMessage() @@ -59,147 +56,94 @@ workflow { unmerged_vcfs = Channel.fromPath(params.valid_vcfs) .splitCsv(header:true) .map{row -> tuple(file(row.vcf_file), file(row.fasta), row.analysis_accession, row.db_name, row.vep_version, row.vep_cache_version, row.vep_species, row.aggregation)} - create_properties(unmerged_vcfs) - load_vcf(create_properties.out.variant_load_props) + load_vcf(unmerged_vcfs) if (params.taxonomy != 9606) { vcf_files_list = Channel.fromPath(params.valid_vcfs) .splitCsv(header:true) .map{row -> tuple(file(row.vcf_file), row.db_name)} - create_properties_for_acc_import_job(vcf_files_list) - import_accession(load_vcf.out.variant_load_complete, create_properties_for_acc_import_job.out.accession_import_props) + import_accession(vcf_files_list, load_vcf.out.variant_load_complete) } } -/* - * Create properties files for load. - */ -process create_properties { - input: - tuple val(vcf_file), val(fasta), val(analysis_accession), val(db_name), val(vep_version), val(vep_cache_version), val(vep_species), val(aggregation) - - output: - path "load_${vcf_file.getFileName()}.properties", emit: variant_load_props - - exec: - props = new Properties() - params.load_job_props.each { k, v -> - props.setProperty(k, v.toString()) - } - if (params.annotation_only) { - props.setProperty("spring.batch.job.names", "annotate-variants-job") - } else { - props.setProperty("spring.batch.job.names", aggregation.toString() == "none" ? "genotyped-vcf-job" : "aggregated-vcf-job") - } - props.setProperty("input.vcf.aggregation", aggregation.toString().toUpperCase()) - props.setProperty("input.vcf", vcf_file.toRealPath().toString()) - props.setProperty("input.vcf.id", analysis_accession.toString()) - props.setProperty("input.fasta", fasta.toString()) - props.setProperty("spring.data.mongodb.database", db_name.toString()) - if (vep_version == "" || vep_cache_version == "") { - props.setProperty("annotation.skip", "true") - } else { - props.setProperty("annotation.skip", "false") - props.setProperty("app.vep.version", vep_version.toString()) - props.setProperty("app.vep.path", "${params.vep_path}/ensembl-vep-release-${vep_version}/vep") - props.setProperty("app.vep.cache.version", vep_cache_version.toString()) - props.setProperty("app.vep.cache.species", vep_species.toString()) - } - // need to explicitly store in workDir so next process can pick it up - // see https://github.com/nextflow-io/nextflow/issues/942#issuecomment-441536175 - props_file = new File("${task.workDir}/load_${vcf_file.getFileName()}.properties") - props_file.createNewFile() - props_file.newWriter().withWriter { w -> - props.each { k, v -> - w.write("$k=$v\n") - } - } - // make a copy for debugging purposes - new File("${params.project_dir}/load_${vcf_file.getFileName()}.properties") << props_file.asWritable() -} - - /* * Load into variant db. */ process load_vcf { clusterOptions { - log_filename = variant_load_properties.getFileName().toString() - log_filename = log_filename.substring(5, log_filename.indexOf('.properties')) + log_filename = vcf_file.getFileName().toString() return "-o $params.logs_dir/pipeline.${log_filename}.log \ -e $params.logs_dir/pipeline.${log_filename}.err" } input: - path variant_load_properties + tuple val(vcf_file), val(fasta), val(analysis_accession), val(db_name), val(vep_version), val(vep_cache_version), val(vep_species), val(aggregation) output: val true, emit: variant_load_complete memory '5 GB' - """ - java -Xmx4G -jar $params.jar.eva_pipeline --spring.config.location=file:$params.eva_pipeline_props --parameters.path=$variant_load_properties - """ -} + script: + def pipeline_parameters = "" + if (params.annotation_only) { + pipeline_parameters += " --spring.batch.job.names=annotate-variants-job" + } else if(aggregation.toString() == "none"){ + pipeline_parameters += " --spring.batch.job.names=genotyped-vcf-job" + } else{ + pipeline_parameters += " --spring.batch.job.names=aggregated-vcf-job" + } -/* - * Create properties files for Accession Import Job. - */ -process create_properties_for_acc_import_job { - input: - tuple val(vcf_file), val(db_name) + pipeline_parameters += " --input.vcf.aggregation=" + aggregation.toString().toUpperCase() + pipeline_parameters += " --input.vcf=" + vcf_file.toRealPath().toString() + pipeline_parameters += " --input.vcf.id=" + analysis_accession.toString() + pipeline_parameters += " --input.fasta=" + fasta.toString() - output: - path "${acc_import_property_file_name}", emit: accession_import_props + pipeline_parameters += " --spring.data.mongodb.database=" + db_name.toString() - exec: - props = new Properties() - params.acc_import_job_props.each { k, v -> - props.setProperty(k, v.toString()) + if (vep_version.trim() == "" || vep_cache_version.trim() == "") { + pipeline_parameters += " --annotation.skip=true" + } else { + pipeline_parameters += " --annotation.skip=false" + pipeline_parameters += " --app.vep.version=" + vep_version.toString() + pipeline_parameters += " --app.vep.path=" + "${params.vep_path}/ensembl-vep-release-${vep_version}/vep" + pipeline_parameters += " --app.vep.cache.version=" + vep_cache_version.toString() + pipeline_parameters += " --app.vep.cache.species=" + vep_species.toString() } - accessioned_report_name = vcf_file.getFileName().toString().replace('.vcf','.accessioned.vcf') - - props.setProperty("input.accession.report", "${params.project_dir}/60_eva_public/${accessioned_report_name}") - props.setProperty("spring.batch.job.names", "accession-import-job") - props.setProperty("spring.data.mongodb.database", db_name.toString()) - - // need to explicitly store in workDir so next process can pick it up - // see https://github.com/nextflow-io/nextflow/issues/942#issuecomment-441536175 - acc_import_property_file_name = "acc_import_${accessioned_report_name}.properties" - props_file = new File("${task.workDir}/${acc_import_property_file_name}") - props_file.createNewFile() - props_file.newWriter().withWriter { w -> - props.each { k, v -> - w.write("$k=$v\n") - } - } - // make a copy for debugging purposes - new File("${params.project_dir}/${acc_import_property_file_name}") << props_file.asWritable() + """ + java -Xmx4G -jar $params.jar.eva_pipeline --spring.config.location=file:$params.load_job_props $pipeline_parameters + """ } - /* * Import Accession Into Variant warehouse */ process import_accession { clusterOptions { - log_filename = accession_import_properties.getFileName().toString() - log_filename = log_filename.substring(11, log_filename.indexOf('.properties')) + log_filename = vcf_file.getFileName().toString() return "-o $params.logs_dir/pipeline.${log_filename}.log \ -e $params.logs_dir/pipeline.${log_filename}.err" } input: + tuple val(vcf_file), val(db_name) val variant_load_output - path accession_import_properties memory '5 GB' + script: + def pipeline_parameters = "" + + accessioned_report_name = vcf_file.getFileName().toString().replace('.vcf','.accessioned.vcf') + pipeline_parameters += " --input.accession.report=" + "${params.project_dir}/60_eva_public/${accessioned_report_name}" + pipeline_parameters += " --spring.batch.job.names=accession-import-job" + pipeline_parameters += " --spring.data.mongodb.database=" + db_name.toString() + + """ - java -Xmx4G -jar $params.jar.eva_pipeline --spring.config.location=file:$params.eva_pipeline_props --parameters.path=$accession_import_properties + java -Xmx4G -jar $params.jar.eva_pipeline --spring.config.location=file:$params.acc_import_job_props $pipeline_parameters """ } diff --git a/eva_submission/vep_utils.py b/eva_submission/vep_utils.py index c491c8fa..3bd357e8 100644 --- a/eva_submission/vep_utils.py +++ b/eva_submission/vep_utils.py @@ -16,7 +16,8 @@ from ebi_eva_common_pyutils.config import cfg from ebi_eva_common_pyutils.logger import logging_config as log_cfg -from eva_submission.ingestion_templates import annotation_metadata_collection_name +annotation_metadata_collection_name = 'annotationMetadata_2_0' +annotation_collection_name = 'annotations_2_0' logger = log_cfg.get_logger(__name__) diff --git a/requirements.txt b/requirements.txt index 2139d780..99b6be0c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ cached-property cerberus -ebi-eva-common-pyutils>=0.5.3.dev6 +ebi-eva-common-pyutils>=0.5.3 eva-vcf-merge>=0.0.6 humanize lxml diff --git a/tests/nextflow-tests/java/FakeAccessionPipeline.java b/tests/nextflow-tests/java/FakeAccessionPipeline.java index 07211de9..67f8033d 100644 --- a/tests/nextflow-tests/java/FakeAccessionPipeline.java +++ b/tests/nextflow-tests/java/FakeAccessionPipeline.java @@ -9,8 +9,9 @@ public static void main(String[] args) { String inFile = null; for (String arg: args) { outString += " " + arg; - if (arg.startsWith("--spring.config.name=")) - inFile = arg.substring("--spring.config.name=".length()); + if (arg.startsWith("--spring.config.location=")){ + inFile = arg.substring("--spring.config.location=file:".length()); + } } System.out.println(outString); diff --git a/tests/nextflow-tests/test_ingestion_config.yaml b/tests/nextflow-tests/test_ingestion_config.yaml index d2113903..0975fe21 100644 --- a/tests/nextflow-tests/test_ingestion_config.yaml +++ b/tests/nextflow-tests/test_ingestion_config.yaml @@ -13,17 +13,9 @@ source_assemblies: - GCA_0000003 target_assembly_accession: GCA_0000003 -accession_job_props: - test.prop1: x - test.prop2: y - parameters.taxonomyAccession: 1234 -eva_pipeline_props: test_eva_pipeline.properties -load_job_props: - test.prop1: a - test.prop2: b -acc_import_job_props: - test.prop1: a - test.prop2: b +accession_job_props: accession.properties +load_job_props: test_variant_load.properties +acc_import_job_props: test_accession_import.properties executable: bcftools: ../../../bin/fake_bcftools.sh diff --git a/tests/nextflow-tests/test_ingestion_config_human.yaml b/tests/nextflow-tests/test_ingestion_config_human.yaml index 2ffb2bf0..48e9364b 100644 --- a/tests/nextflow-tests/test_ingestion_config_human.yaml +++ b/tests/nextflow-tests/test_ingestion_config_human.yaml @@ -7,17 +7,9 @@ public_dir: ../../../project/public valid_vcfs: vcf_files_to_ingest.csv vep_path: /path/to/vep -accession_job_props: - test.prop1: x - test.prop2: y - parameters.taxonomyAccession: 9606 -eva_pipeline_props: test_eva_pipeline.properties -load_job_props: - test.prop1: a - test.prop2: b -acc_import_job_props: - test.prop1: a - test.prop2: b +accession_job_props: accession.properties +load_job_props: test_variant_load.properties +acc_import_job_props: test_accession_import.properties aggregation_type: none diff --git a/tests/nextflow-tests/test_ingestion_no_remapping_config.yaml b/tests/nextflow-tests/test_ingestion_no_remapping_config.yaml index f7d798ff..b65b1aee 100644 --- a/tests/nextflow-tests/test_ingestion_no_remapping_config.yaml +++ b/tests/nextflow-tests/test_ingestion_no_remapping_config.yaml @@ -10,17 +10,9 @@ source_assemblies: - GCA_0000003 target_assembly_accession: GCA_0000003 -accession_job_props: - test.prop1: x - test.prop2: y - parameters.taxonomyAccession: 1234 -eva_pipeline_props: test_eva_pipeline.properties -load_job_props: - test.prop1: a - test.prop2: b -acc_import_job_props: - test.prop1: a - test.prop2: b +accession_job_props: accession.properties +load_job_props: test_variant_load.properties +acc_import_job_props: test_accession_import.properties executable: bcftools: ../../../bin/fake_bcftools.sh diff --git a/tests/resources/settings.xml b/tests/resources/settings.xml index e26746f8..287b10a5 100644 --- a/tests/resources/settings.xml +++ b/tests/resources/settings.xml @@ -16,6 +16,13 @@ user pass accession + host + user + pass + files + variants + annotation-metadata + annotation host user pass diff --git a/tests/resources/submission_config.yml b/tests/resources/submission_config.yml index 92069ff2..cf11b31a 100644 --- a/tests/resources/submission_config.yml +++ b/tests/resources/submission_config.yml @@ -5,6 +5,7 @@ projects_dir: 'tests/resources/projects' eva_pipeline_props: '/path/to/pipeline/properties' vep_path: 'tests/resources/vep' vep_cache_path: 'tests/resources/vep_cache' +opencga_path: '/path/to/opencga' noah: genomes_dir: 'old/genomes' diff --git a/tests/test_eload_ingestion.py b/tests/test_eload_ingestion.py index 7b005143..4ce2b963 100644 --- a/tests/test_eload_ingestion.py +++ b/tests/test_eload_ingestion.py @@ -15,7 +15,10 @@ def default_db_results_for_metadata_load(): [(391,)] # Check the assembly_set_id in update_assembly_set_in_analysis ] - +def default_db_results_for_target_assembly(): + return [ + [('GCA_999')] + ] def default_db_results_for_accession(): browsable_files = [(1, 'ERA', 'filename_1', 'PRJ', 123), (2, 'ERA', 'filename_1', 'PRJ', 123)] return [ @@ -41,6 +44,7 @@ def default_db_results_for_clustering(): def default_db_results_for_ingestion(): return ( default_db_results_for_metadata_load() + + default_db_results_for_target_assembly() + default_db_results_for_accession() + default_db_results_for_clustering() + default_db_results_for_variant_load() @@ -99,7 +103,8 @@ def test_check_brokering_done(self): def test_check_variant_db_no_creation(self): with self._patch_metadata_handle(), self._patch_get_dbname('eva_ecaballus_30'), \ - self._patch_mongo_database(collection_names=['col1']) as m_mongo: + patch('eva_submission.eload_ingestion.insert_new_assembly_and_taxonomy') as insert_asm_tax, \ + self._patch_mongo_database(collection_names=['col1']) as m_mongo: self.eload.check_variant_db() # Check the database name is correct and has been set in the config @@ -119,7 +124,8 @@ def test_check_variant_db_name_not_creatd(self): def test_check_variant_db_with_creation(self): with self._patch_metadata_handle(), self._patch_get_dbname('eva_ecaballus_30'), \ - self._patch_mongo_database(collection_names=[]) as m_mongo: + patch('eva_submission.eload_ingestion.insert_new_assembly_and_taxonomy') as insert_asm_tax, \ + self._patch_mongo_database(collection_names=[]) as m_mongo: self.eload.check_variant_db() self.assertEqual( @@ -151,23 +157,13 @@ def test_ingest_all_tasks(self): patch('eva_submission.eload_utils.get_all_results_for_query') as m_get_alias_results, \ patch('eva_submission.eload_ingestion.get_vep_and_vep_cache_version') as m_get_vep_versions, \ patch('eva_submission.eload_utils.requests.post') as m_post, \ - patch('eva_submission.eload_ingestion.get_ncbi_assembly_dicts_from_term') as m_get_assembly_dict, \ - patch('eva_submission.eload_ingestion.retrieve_species_scientific_name_from_tax_id_ncbi') as m_get_scf_name, \ + patch('eva_submission.eload_ingestion.get_species_name_from_ncbi') as m_get_species, \ patch('eva_submission.eload_ingestion.get_assembly_name_and_taxonomy_id') as m_get_tax, \ + patch('eva_submission.eload_ingestion.insert_new_assembly_and_taxonomy') as insert_asm_tax, \ self._patch_mongo_database(): m_get_alias_results.return_value = [['alias']] m_get_vep_versions.return_value = (100, 100) - m_get_assembly_dict.return_value = [{ - "taxid": "9606", - "assemblyname": "GRCh37.p13", - "assemblyaccession": "GCA_000001000.1", - "synonym": { - "genbank": "GCA_000001405.14", - "refseq": "GCF_000001405.25", - "similarity": "identical" - } - }] - m_get_scf_name.return_value = 'Homo sapiens' + m_get_species.return_value = 'homo_sapiens' m_post.return_value.text = self.get_mock_result_for_ena_date() m_get_results.side_effect = default_db_results_for_ingestion() m_get_tax.return_value = ('name', '9090') @@ -179,6 +175,7 @@ def test_ingest_metadata_load(self): patch('eva_submission.eload_utils.get_metadata_connection_handle', autospec=True), \ patch('eva_submission.eload_utils.get_all_results_for_query') as m_get_alias_results, \ patch('eva_submission.eload_utils.requests.post') as m_post, \ + patch('eva_submission.eload_ingestion.insert_new_assembly_and_taxonomy') as insert_asm_tax, \ self._patch_mongo_database(): m_get_alias_results.return_value = [['alias']] m_post.return_value.text = self.get_mock_result_for_ena_date() @@ -200,25 +197,15 @@ def test_ingest_accession(self): patch('eva_submission.eload_utils.get_metadata_connection_handle', autospec=True), \ patch('eva_submission.eload_utils.get_all_results_for_query') as m_get_alias_results, \ patch('eva_submission.eload_ingestion.get_vep_and_vep_cache_version') as m_get_vep_versions, \ - patch('eva_submission.eload_ingestion.get_ncbi_assembly_dicts_from_term') as m_get_assembly_dict, \ - patch('eva_submission.eload_ingestion.retrieve_species_scientific_name_from_tax_id_ncbi') as m_get_scf_name, \ + patch('eva_submission.eload_ingestion.get_species_name_from_ncbi') as m_get_species, \ + patch('eva_submission.eload_ingestion.insert_new_assembly_and_taxonomy') as insert_asm_tax, \ patch('eva_submission.eload_utils.requests.post') as m_post, \ self._patch_mongo_database(): m_get_alias_results.return_value = [['alias']] m_get_vep_versions.return_value = (100, 100) - m_get_assembly_dict.return_value = [{ - "taxid": "9606", - "assemblyname": "GRCh37.p13", - "assemblyaccession": "GCA_000001000.1", - "synonym": { - "genbank": "GCA_000001405.14", - "refseq": "GCF_000001405.25", - "similarity": "identical" - } - }] - m_get_scf_name.return_value = 'Homo sapiens' + m_get_species.return_value = 'homo_sapiens' m_post.return_value.text = self.get_mock_result_for_ena_date() - m_get_results.side_effect = default_db_results_for_accession() + m_get_results.side_effect = default_db_results_for_target_assembly() + default_db_results_for_accession() self.eload.ingest( instance_id=1, tasks=['accession'] @@ -234,23 +221,13 @@ def test_ingest_variant_load(self): patch('eva_submission.eload_utils.get_metadata_connection_handle', autospec=True), \ patch('eva_submission.eload_utils.get_all_results_for_query') as m_get_alias_results, \ patch('eva_submission.eload_ingestion.get_vep_and_vep_cache_version') as m_get_vep_versions, \ - patch('eva_submission.eload_ingestion.get_ncbi_assembly_dicts_from_term') as m_get_assembly_dict, \ - patch('eva_submission.eload_ingestion.retrieve_species_scientific_name_from_tax_id_ncbi') as m_get_scf_name, \ + patch('eva_submission.eload_ingestion.get_species_name_from_ncbi') as m_get_species, \ patch('eva_submission.eload_utils.requests.post') as m_post, \ + patch('eva_submission.eload_ingestion.insert_new_assembly_and_taxonomy') as insert_asm_tax, \ self._patch_mongo_database(): m_get_alias_results.return_value = [['alias']] m_get_vep_versions.return_value = (100, 100) - m_get_assembly_dict.return_value = [{ - "taxid": "9606", - "assemblyname": "GRCh37.p13", - "assemblyaccession": "GCA_000001000.1", - "synonym": { - "genbank": "GCA_000001405.14", - "refseq": "GCF_000001405.25", - "similarity": "identical" - } - }] - m_get_scf_name.return_value = 'Homo sapiens' + m_get_species.return_value = 'homo_sapiens' m_post.return_value.text = self.get_mock_result_for_ena_date() m_get_results.side_effect = default_db_results_for_variant_load() self.eload.ingest(tasks=['variant_load']) @@ -339,25 +316,15 @@ def test_ingest_variant_load_vep_versions_found(self): patch('eva_submission.eload_utils.get_metadata_connection_handle', autospec=True), \ patch('eva_submission.eload_utils.get_all_results_for_query') as m_get_alias_results, \ patch('eva_submission.eload_ingestion.get_vep_and_vep_cache_version') as m_get_vep_versions, \ - patch('eva_submission.eload_ingestion.get_ncbi_assembly_dicts_from_term') as m_get_assembly_dict, \ - patch('eva_submission.eload_ingestion.retrieve_species_scientific_name_from_tax_id_ncbi') as m_get_scf_name, \ + patch('eva_submission.eload_ingestion.get_species_name_from_ncbi') as m_get_species, \ patch('eva_submission.eload_utils.requests.post') as m_post, \ + patch('eva_submission.eload_ingestion.insert_new_assembly_and_taxonomy') as insert_asm_tax, \ self._patch_mongo_database(): m_get_alias_results.return_value = [['alias']] m_post.return_value.text = self.get_mock_result_for_ena_date() m_get_results.side_effect = default_db_results_for_variant_load() m_get_vep_versions.return_value = (100, 100) - m_get_assembly_dict.return_value = [{ - "taxid": "9606", - "assemblyname": "GRCh37.p13", - "assemblyaccession": "GCA_000001000.1", - "synonym": { - "genbank": "GCA_000001405.14", - "refseq": "GCF_000001405.25", - "similarity": "identical" - } - }] - m_get_scf_name.return_value = 'Homo sapiens' + m_get_species.return_value = 'homo_sapiens' self.eload.ingest(tasks=['variant_load']) self.assert_vep_versions(100, 100, 'homo_sapiens') @@ -372,25 +339,15 @@ def test_ingest_variant_load_vep_versions_not_found(self): patch('eva_submission.eload_utils.get_metadata_connection_handle', autospec=True), \ patch('eva_submission.eload_utils.get_all_results_for_query') as m_get_alias_results, \ patch('eva_submission.eload_ingestion.get_vep_and_vep_cache_version') as m_get_vep_versions, \ - patch('eva_submission.eload_ingestion.get_ncbi_assembly_dicts_from_term') as m_get_assembly_dict, \ - patch('eva_submission.eload_ingestion.retrieve_species_scientific_name_from_tax_id_ncbi') as m_get_scf_name, \ + patch('eva_submission.eload_ingestion.get_species_name_from_ncbi') as m_get_species, \ patch('eva_submission.eload_utils.requests.post') as m_post, \ + patch('eva_submission.eload_ingestion.insert_new_assembly_and_taxonomy') as insert_asm_tax, \ self._patch_mongo_database(): m_get_alias_results.return_value = [['alias']] m_post.return_value.text = self.get_mock_result_for_ena_date() m_get_results.side_effect = default_db_results_for_variant_load() m_get_vep_versions.return_value = (None, None) - m_get_assembly_dict.return_value = [{ - "taxid": "9606", - "assemblyname": "GRCh37.p13", - "assemblyaccession": "GCA_000001000.1", - "synonym": { - "genbank": "GCA_000001405.14", - "refseq": "GCF_000001405.25", - "similarity": "identical" - } - }] - m_get_scf_name.return_value = 'Homo sapiens' + m_get_species.return_value = 'homo_sapiens' self.eload.ingest(tasks=['variant_load']) self.assert_vep_versions('', '', '') @@ -405,6 +362,7 @@ def test_ingest_variant_load_vep_versions_error(self): patch('eva_submission.eload_utils.get_all_results_for_query') as m_get_alias_results, \ patch('eva_submission.eload_ingestion.get_vep_and_vep_cache_version') as m_get_vep_versions, \ patch('eva_submission.eload_utils.requests.post') as m_post, \ + patch('eva_submission.eload_ingestion.insert_new_assembly_and_taxonomy') as insert_asm_tax, \ self._patch_mongo_database(): m_get_alias_results.return_value = [['alias']] m_post.return_value.text = self.get_mock_result_for_ena_date() @@ -422,23 +380,13 @@ def test_ingest_annotation_only(self): patch('eva_submission.eload_utils.get_metadata_connection_handle', autospec=True), \ patch('eva_submission.eload_utils.get_all_results_for_query') as m_get_alias_results, \ patch('eva_submission.eload_ingestion.get_vep_and_vep_cache_version') as m_get_vep_versions, \ - patch('eva_submission.eload_ingestion.get_ncbi_assembly_dicts_from_term') as m_get_assembly_dict, \ - patch('eva_submission.eload_ingestion.retrieve_species_scientific_name_from_tax_id_ncbi') as m_get_scf_name, \ + patch('eva_submission.eload_ingestion.get_species_name_from_ncbi') as m_get_species, \ patch('eva_submission.eload_utils.requests.post') as m_post, \ + patch('eva_submission.eload_ingestion.insert_new_assembly_and_taxonomy') as insert_asm_tax, \ self._patch_mongo_database(): m_get_alias_results.return_value = [['alias']] m_get_vep_versions.return_value = (100, 100) - m_get_assembly_dict.return_value = [{ - "taxid": "9606", - "assemblyname": "GRCh37.p13", - "assemblyaccession": "GCA_000001000.1", - "synonym": { - "genbank": "GCA_000001405.14", - "refseq": "GCF_000001405.25", - "similarity": "identical" - } - }] - m_get_scf_name.return_value = 'Homo sapiens' + m_get_species.return_value = 'homo_sapiens' m_post.return_value.text = self.get_mock_result_for_ena_date() m_get_results.side_effect = default_db_results_for_variant_load() self.eload.ingest(tasks=['annotation']) @@ -451,6 +399,7 @@ def test_ingest_clustering(self): patch('eva_submission.eload_ingestion.get_all_results_for_query') as m_get_results, \ patch('eva_submission.eload_ingestion.command_utils.run_command_with_output', autospec=True) as m_run_command, \ patch('eva_submission.eload_ingestion.get_assembly_name_and_taxonomy_id') as m_get_tax, \ + patch('eva_submission.eload_ingestion.insert_new_assembly_and_taxonomy') as insert_asm_tax, \ self._patch_mongo_database(): m_get_results.side_effect = default_db_results_for_clustering() m_get_tax.return_value = ('name', '9796') @@ -463,6 +412,7 @@ def test_ingest_clustering_no_supported_assembly(self): patch('eva_submission.eload_ingestion.get_all_results_for_query') as m_get_results, \ patch.object(EloadIngestion, '_get_target_assembly') as m_target_assembly, \ patch('eva_submission.eload_ingestion.command_utils.run_command_with_output', autospec=True) as m_run_command, \ + patch('eva_submission.eload_ingestion.insert_new_assembly_and_taxonomy') as insert_asm_tax, \ self._patch_mongo_database(): m_target_assembly.return_value = None m_get_results.return_value = [] @@ -477,6 +427,7 @@ def test_ingest_clustering_supported_assembly_in_another_taxonomy(self): patch.object(EloadIngestion, "_insert_new_supported_asm_from_ensembl", new=MagicMock()) as m_new_supported_asm, \ patch('eva_submission.eload_ingestion.get_assembly_name_and_taxonomy_id') as m_get_tax, \ patch('eva_submission.eload_ingestion.command_utils.run_command_with_output', autospec=True) as m_run_command, \ + patch('eva_submission.eload_ingestion.insert_new_assembly_and_taxonomy') as insert_asm_tax, \ self._patch_mongo_database(): m_get_tax.return_value = ('name', 66666) m_get_supported_asm.side_effect = [None, 'gca_in_another_tax'] @@ -490,6 +441,7 @@ def test_ingest_clustering_supported_assembly_in_another_taxonomy(self): patch.object(EloadIngestion, "_insert_new_supported_asm_from_ensembl", new=MagicMock()) as m_new_supported_asm, \ patch('eva_submission.eload_ingestion.get_assembly_name_and_taxonomy_id') as m_get_tax, \ patch('eva_submission.eload_ingestion.command_utils.run_command_with_output', autospec=True) as m_run_command, \ + patch('eva_submission.eload_ingestion.insert_new_assembly_and_taxonomy') as insert_asm_tax, \ self._patch_mongo_database(): m_get_tax.return_value = ('name', 66666) m_new_supported_asm.side_effect = [None, 'gca_in_another_tax'] @@ -506,25 +458,17 @@ def test_resume_when_step_fails(self): patch('eva_submission.eload_utils.get_all_results_for_query') as m_get_alias_results, \ patch('eva_submission.eload_ingestion.get_vep_and_vep_cache_version') as m_get_vep_versions, \ patch('eva_submission.eload_utils.requests.post') as m_post, \ - patch('eva_submission.eload_ingestion.get_ncbi_assembly_dicts_from_term') as m_get_assembly_dict, \ - patch('eva_submission.eload_ingestion.retrieve_species_scientific_name_from_tax_id_ncbi') as m_get_scf_name, \ + patch('eva_submission.eload_ingestion.get_species_name_from_ncbi') as m_get_species, \ patch('eva_submission.eload_ingestion.get_assembly_name_and_taxonomy_id') as m_get_tax, \ + patch('eva_submission.eload_ingestion.insert_new_assembly_and_taxonomy') as insert_asm_tax, \ self._patch_mongo_database(): m_get_alias_results.return_value = [['alias']] m_get_vep_versions.return_value = (100, 100) - m_get_assembly_dict.return_value = [{ - "taxid": "9606", - "assemblyname": "GRCh37.p13", - "assemblyaccession": "GCA_000001000.1", - "synonym": { - "genbank": "GCA_000001405.14", - "refseq": "GCF_000001405.25", - "similarity": "identical" - } - }] - m_get_scf_name.return_value = 'Homo sapiens' + m_get_species.return_value = 'homo_sapiens' m_post.return_value.text = self.get_mock_result_for_ena_date() - m_get_results.side_effect = default_db_results_for_metadata_load() + default_db_results_for_ingestion() + m_get_results.side_effect = default_db_results_for_metadata_load() \ + + default_db_results_for_target_assembly()\ + + default_db_results_for_ingestion() m_run_command.side_effect = [ None, # metadata load @@ -552,23 +496,13 @@ def test_resume_completed_job(self): patch('eva_submission.eload_utils.get_all_results_for_query') as m_get_alias_results, \ patch('eva_submission.eload_ingestion.get_vep_and_vep_cache_version') as m_get_vep_versions, \ patch('eva_submission.eload_utils.requests.post') as m_post, \ - patch('eva_submission.eload_ingestion.get_ncbi_assembly_dicts_from_term') as m_get_assembly_dict, \ - patch('eva_submission.eload_ingestion.retrieve_species_scientific_name_from_tax_id_ncbi') as m_get_scf_name, \ + patch('eva_submission.eload_ingestion.get_species_name_from_ncbi') as m_get_species, \ patch('eva_submission.eload_ingestion.get_assembly_name_and_taxonomy_id') as m_get_tax, \ + patch('eva_submission.eload_ingestion.insert_new_assembly_and_taxonomy') as insert_asm_tax, \ self._patch_mongo_database(): m_get_alias_results.return_value = [['alias']] m_get_vep_versions.return_value = (100, 100) - m_get_assembly_dict.return_value = [{ - "taxid": "9606", - "assemblyname": "GRCh37.p13", - "assemblyaccession": "GCA_000001000.1", - "synonym": { - "genbank": "GCA_000001405.14", - "refseq": "GCF_000001405.25", - "similarity": "identical" - } - }] - m_get_scf_name.return_value = 'Homo sapiens' + m_get_species.return_value = 'homo_sapiens' m_post.return_value.text = self.get_mock_result_for_ena_date() m_get_results.side_effect = default_db_results_for_ingestion() + default_db_results_for_ingestion() m_get_tax.return_value = ('name', '9796') @@ -591,26 +525,16 @@ def test_resume_with_tasks(self): patch('eva_submission.eload_utils.get_metadata_connection_handle', autospec=True), \ patch('eva_submission.eload_utils.get_all_results_for_query') as m_get_alias_results, \ patch('eva_submission.eload_ingestion.get_vep_and_vep_cache_version') as m_get_vep_versions, \ - patch('eva_submission.eload_ingestion.get_ncbi_assembly_dicts_from_term') as m_get_assembly_dict, \ - patch('eva_submission.eload_ingestion.retrieve_species_scientific_name_from_tax_id_ncbi') as m_get_scf_name, \ + patch('eva_submission.eload_ingestion.get_species_name_from_ncbi') as m_get_species, \ patch('eva_submission.eload_utils.requests.post') as m_post, \ + patch('eva_submission.eload_ingestion.insert_new_assembly_and_taxonomy') as insert_asm_tax, \ self._patch_mongo_database(): m_get_alias_results.return_value = [['alias']] m_get_vep_versions.return_value = (100, 100) - m_get_assembly_dict.return_value = [{ - "taxid": "9606", - "assemblyname": "GRCh37.p13", - "assemblyaccession": "GCA_000001000.1", - "synonym": { - "genbank": "GCA_000001405.14", - "refseq": "GCF_000001405.25", - "similarity": "identical" - } - }] - m_get_scf_name.return_value = 'Homo sapiens' + m_get_species.return_value = 'homo_sapiens' m_post.return_value.text = self.get_mock_result_for_ena_date() - m_get_results.side_effect = ( - default_db_results_for_variant_load() + m_get_results.side_effect = (default_db_results_for_target_assembly() + + default_db_results_for_variant_load() + default_db_results_for_target_assembly() + default_db_results_for_accession() + default_db_results_for_variant_load() )