From e265b272aa89bcdc65948cbaa8b776c42785809d Mon Sep 17 00:00:00 2001
From: tcezard
Date: Mon, 16 Dec 2024 14:12:58 +0000
Subject: [PATCH 1/2] processing via a scanner and new brokering method

---
 eva_sub_cli_processing/process_jobs.py       | 120 +++++++++++-------
 eva_sub_cli_processing/sub_cli_brokering.py  | 101 +++++++++++++++
 eva_sub_cli_processing/sub_cli_ingestion.py  |   8 ++
 eva_sub_cli_processing/sub_cli_submission.py |  49 +++++++
 eva_sub_cli_processing/sub_cli_utils.py      |  55 ++++++++
 eva_sub_cli_processing/sub_cli_validation.py |  11 ++
 .../biosamples_submitters.py                 |  85 +++++++++++--
 tests/test_biosamples_submission.py          |  29 ++++-
 .../test_process_jobs.py                     |   5 +-
 9 files changed, 406 insertions(+), 57 deletions(-)
 create mode 100644 eva_sub_cli_processing/sub_cli_brokering.py
 create mode 100755 eva_sub_cli_processing/sub_cli_ingestion.py
 create mode 100755 eva_sub_cli_processing/sub_cli_submission.py
 create mode 100755 eva_sub_cli_processing/sub_cli_utils.py
 create mode 100755 eva_sub_cli_processing/sub_cli_validation.py

diff --git a/eva_sub_cli_processing/process_jobs.py b/eva_sub_cli_processing/process_jobs.py
index d89d744..02091cb 100644
--- a/eva_sub_cli_processing/process_jobs.py
+++ b/eva_sub_cli_processing/process_jobs.py
@@ -1,34 +1,11 @@
-from datetime import datetime
-
-import requests
 from ebi_eva_common_pyutils.common_utils import pretty_print
-from ebi_eva_common_pyutils.config import cfg
 from ebi_eva_common_pyutils.logger import AppLogger
-from retry import retry
 
-
-def _url_build(*args, **kwargs):
-    url = cfg.query('submissions', 'webservice', 'url') + '/' + '/'.join(args)
-    if kwargs:
-        return url + '?' + '&'.join(f'{k}={v}' for k, v in kwargs.items())
-    else:
-        return url
-
-@retry(tries=5, backoff=2, jitter=.5)
-def _get_submission_api(url):
-    auth = (cfg.query('submissions', 'webservice', 'admin_username'), cfg.query('submissions', 'webservice', 'admin_password'))
-    response = requests.get(url, auth=auth)
-    response.raise_for_status()
-    return response.json()
-
-
-@retry(tries=5, backoff=2, jitter=.5)
-def _put_submission_api(url):
-    auth = (cfg.query('submissions', 'webservice', 'admin_username'), cfg.query('submissions', 'webservice', 'admin_password'))
-    response = requests.put(url, auth=auth)
-    response.raise_for_status()
-    return response.json()
+from eva_sub_cli_processing.sub_cli_brokering import SubCliBrokering
+from eva_sub_cli_processing.sub_cli_ingestion import SubCliIngestion
+from eva_sub_cli_processing.sub_cli_utils import get_from_sub_ws, sub_ws_url_build, VALIDATION, READY_FOR_PROCESSING, \
+    PROCESSING, BROKERING, INGESTION, SUCCESS, FAILURE, put_to_sub_ws
+from eva_sub_cli_processing.sub_cli_validation import SubCliValidation
 
 
 def process_submissions():
@@ -53,47 +30,104 @@ def _process_submission(submission):
 
 class SubmissionScanner(AppLogger):
 
     statuses = []
+    step_statuses = []
 
-    def scan(self):
+    def _scan_per_status(self):
         submissions = []
         for status in self.statuses:
-            for submission_data in _get_submission_api(_url_build('admin', 'submissions', 'status', status)):
-                submissions.append(Submission(
+            for submission_data in get_from_sub_ws(sub_ws_url_build('admin', 'submissions', 'status', status)):
+                submissions.append(SubmissionStep(
                     submission_id=submission_data.get('submissionId'),
-                    submission_status=submission_data.get('status'),
-                    uploaded_time=submission_data.get('uploadedTime')
+                    status=submission_data.get('status'),
+                    processing_step=VALIDATION,
+                    processing_status=READY_FOR_PROCESSING,
+                    last_update_time=submission_data.get('uploadedTime'),
+                    priority=5
                 ))
         return submissions
 
+    def _scan_per_step_status(self):
+        submissions = []
+        for step, status in self.step_statuses:
+            for submission_step_data in get_from_sub_ws(sub_ws_url_build('admin', 'submission-processes', step, status)):
+                submissions.append(SubmissionStep(
+                    submission_id=submission_step_data.get('submissionId'),
+                    status=PROCESSING,
+                    processing_step=step,
+                    processing_status=status,
+                    last_update_time=submission_step_data.get('lastUpdateTime'),
+                    priority=submission_step_data.get('priority')
+                ))
+        return submissions
+
+    def scan(self):
+        return self._scan_per_status() + self._scan_per_step_status()
+
     def report(self):
-        header = ['Submission Id', 'Submission status', 'Uploaded time']
+        header = ['Submission Id', 'Submission status', 'Processing step', 'Processing status', 'Last updated time',
+                  'Priority']
         scan = self.scan()
         lines = []
         for submission in scan:
-            lines.append((submission.submission_id, submission.submission_status, str(submission.uploaded_time)))
+            lines.append((submission.submission_id, submission.submission_status, submission.processing_step,
+                          submission.processing_status, str(submission.last_update_time), str(submission.priority)))
         pretty_print(header, lines)
 
 
 class NewSubmissionScanner(SubmissionScanner):
 
     statuses = ['UPLOADED']
+    step_statuses = []
 
 
-class Submission(AppLogger):
+class SubmissionStep(AppLogger):
 
-    def __init__(self, submission_id, submission_status, uploaded_time):
+    def __init__(self, submission_id, status, processing_step, processing_status, last_update_time, priority):
         self.submission_id = submission_id
-        self.submission_status = submission_status
-        self.uploaded_time = uploaded_time
+        self.submission_status = status
+        self.processing_step = processing_step
+        self.processing_status = processing_status
+        self.last_update_time = last_update_time
+        self.priority = priority
 
     def start(self):
-        response = _put_submission_api(_url_build('admin', 'submission', self.submission_id, 'status', 'PROCESSING'))
+        self._set_next_step()
+        self._update_submission_ws()
         self.submit_pipeline()
 
     def submit_pipeline(self):
-        # TODO: Actually submit a job for this submission
-        pass
+        assert self.processing_status == READY_FOR_PROCESSING
+        # TODO: These jobs need to be submitted as independent processes
+        if self.processing_step == VALIDATION:
+            SubCliValidation(self.submission_id).validate()
+        elif self.processing_step == BROKERING:
+            SubCliBrokering(self.submission_id).broker()
+        elif self.processing_step == INGESTION:
+            SubCliIngestion(self.submission_id).ingest()
+
+    def _set_next_step(self):
+        if self.submission_status != PROCESSING and not self.processing_step:
+            self.submission_status = PROCESSING
+            self.processing_step = VALIDATION
+            self.processing_status = READY_FOR_PROCESSING
+        elif self.processing_status == SUCCESS and self.processing_step == VALIDATION:
+            self.processing_step = BROKERING
+            self.processing_status = READY_FOR_PROCESSING
+        elif self.processing_status == SUCCESS and self.processing_step == BROKERING:
+            self.processing_step = INGESTION
+            self.processing_status = READY_FOR_PROCESSING
+        elif self.processing_status == FAILURE:
+            # TODO: Is there something we need to do before restarting a failed job?
+            self.processing_status = READY_FOR_PROCESSING
+
+    def _update_submission_ws(self):
+        put_to_sub_ws(sub_ws_url_build('admin', 'submission', self.submission_id, 'status', self.submission_status))
+        put_to_sub_ws(sub_ws_url_build('admin', 'submission-process', self.submission_id,
+                                       self.processing_step, self.processing_status))
 
     def __repr__(self):
         return f'Submission(submission_id={self.submission_id}, submission_status={self.submission_status}, ' \
-               f'uploaded_time={self.uploaded_time})'
+               f'processing_step={self.processing_step}, processing_status={self.processing_status}, ' \
+               f'last_update_time={self.last_update_time})'
+
+
+
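For orientation, the transitions that _set_next_step encodes above can be exercised on their own. A minimal hand-written sketch mirroring the hunk (constants as in sub_cli_utils; the web-service side effects are deliberately left out):

# Standalone sketch of the transitions in SubmissionStep._set_next_step above.
PROCESSING = 'PROCESSING'
VALIDATION, BROKERING, INGESTION = 'VALIDATION', 'BROKERING', 'INGESTION'
READY_FOR_PROCESSING, SUCCESS, FAILURE = 'READY_FOR_PROCESSING', 'SUCCESS', 'FAILURE'


def next_state(submission_status, step, step_status):
    """Return the (submission_status, step, step_status) triple to run next."""
    if submission_status != PROCESSING and not step:
        return PROCESSING, VALIDATION, READY_FOR_PROCESSING   # brand-new submission
    if step_status == SUCCESS and step == VALIDATION:
        return submission_status, BROKERING, READY_FOR_PROCESSING
    if step_status == SUCCESS and step == BROKERING:
        return submission_status, INGESTION, READY_FOR_PROCESSING
    if step_status == FAILURE:
        return submission_status, step, READY_FOR_PROCESSING  # retry the failed step
    return submission_status, step, step_status


assert next_state('UPLOADED', None, None) == (PROCESSING, VALIDATION, READY_FOR_PROCESSING)
assert next_state(PROCESSING, VALIDATION, SUCCESS)[1] == BROKERING
assert next_state(PROCESSING, BROKERING, FAILURE)[2] == READY_FOR_PROCESSING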
diff --git a/eva_sub_cli_processing/sub_cli_brokering.py b/eva_sub_cli_processing/sub_cli_brokering.py
new file mode 100644
index 0000000..2399885
--- /dev/null
+++ b/eva_sub_cli_processing/sub_cli_brokering.py
@@ -0,0 +1,101 @@
+import csv
+import os
+import shutil
+
+from eva_submission.ENA_submission.upload_to_ENA import ENAUploader, ENAUploaderAsync
+from eva_submission.biosample_submission.biosamples_submitters import SampleReferenceSubmitter, \
+    SampleSubmitter
+from eva_sub_cli_processing.sub_cli_submission import SubCli
+
+
+class SubCliBrokering(SubCli):
+
+    def broker(self, brokering_tasks_to_force=None, existing_project=None, async_upload=False, dry_ena_upload=False):
+        """Run the brokering process"""
+        self.upload_to_bioSamples()
+        self.broker_to_ena(force=('ena' in brokering_tasks_to_force), existing_project=existing_project,
+                           async_upload=async_upload, dry_ena_upload=dry_ena_upload)
+        self.update_biosamples_with_study(force=('update_biosamples' in brokering_tasks_to_force))
+
+    def prepare_brokering(self, force=False):
+        valid_analyses = self.eload_cfg.query('validation', 'valid', 'analyses', ret_default=[])
+        if not all([
+            self.eload_cfg.query('brokering', 'analyses', analysis, 'vcf_files')
+            for analysis in valid_analyses
+        ]) or force:
+            output_dir = self._run_brokering_prep_workflow()
+            self._collect_brokering_prep_results(output_dir)
+            shutil.rmtree(output_dir)
+        else:
+            self.info('Preparation has already been run, Skip!')
+
+    def broker_to_ena(self, force=False, existing_project=None, async_upload=False, dry_ena_upload=False):
+        if not self.eload_cfg.query('brokering', 'ena', 'pass') or force:
+            ena_spreadsheet = os.path.join(self._get_dir('ena'), 'metadata_spreadsheet.xlsx')
+            # Set the project in the metadata sheet which is then converted to XML
+            self.update_metadata_spreadsheet(self.eload_cfg['validation']['valid']['metadata_spreadsheet'],
+                                             ena_spreadsheet, existing_project)
+            if async_upload:
+                ena_uploader = ENAUploaderAsync(self.eload, ena_spreadsheet, self._get_dir('ena'))
+            else:
+                ena_uploader = ENAUploader(self.eload, ena_spreadsheet, self._get_dir('ena'))
+
+            if ena_uploader.converter.is_existing_project:
+                # Set the project in the config, based on the spreadsheet
+                self.eload_cfg.set('brokering', 'ena', 'PROJECT', value=ena_uploader.converter.existing_project)
+                self.eload_cfg.set('brokering', 'ena', 'existing_project', value=True)
+
+            # Upload the VCF to ENA FTP
+            files_to_upload = []
+            analyses = self.eload_cfg['brokering']['analyses']
+            for analysis in analyses:
+                for vcf_file_name in analyses[analysis]['vcf_files']:
+                    vcf_file_info = self.eload_cfg['brokering']['analyses'][analysis]['vcf_files'][vcf_file_name]
+                    files_to_upload.append(vcf_file_info['output_vcf_file'])
+                    files_to_upload.append(vcf_file_info['csi'])
+            if dry_ena_upload:
+                self.info(f'Would have uploaded the following files to FTP: \n' + "\n".join(files_to_upload))
+            else:
+                ena_uploader.upload_vcf_files_to_ena_ftp(files_to_upload)
+            # Upload XML to ENA
+            ena_uploader.upload_xml_files_to_ena(dry_ena_upload)
+            if not dry_ena_upload:
+                # Update the project accession in case we're working with existing project
+                # We should not be uploading additional analyses in the same ELOAD so no need to update
+                pre_existing_project = self.eload_cfg.query('brokering', 'ena', 'PROJECT')
+                if pre_existing_project and 'PROJECT' not in ena_uploader.results:
+                    ena_uploader.results['PROJECT'] = pre_existing_project
+                self.eload_cfg.set('brokering', 'ena', value=ena_uploader.results)
+                self.eload_cfg.set('brokering', 'ena', 'date', value=self.now)
+                self.eload_cfg.set('brokering', 'ena', 'hold_date', value=ena_uploader.converter.hold_date)
+                self.eload_cfg.set('brokering', 'ena', 'pass', value=not bool(ena_uploader.results['errors']))
+        else:
+            self.info('Brokering to ENA has already been run, Skip!')
+
+    def upload_to_bioSamples(self, force=False):
+        sample_submitter = SampleSubmitter(('create',))
+        sample_submitter.sample_data = self.submission.submission_detail.get('samples')
+        sample_name_to_accession = sample_submitter.submit_to_bioSamples()
+        # Check whether all samples have been accessioned
+        passed = (
+            bool(sample_name_to_accession)
+            and all(sample_name in sample_name_to_accession for sample_name in sample_submitter.all_sample_names())
+        )
+        if not passed:
+            raise ValueError(f'Not all samples were successfully brokered to BioSamples! '
+                             f'Found {len(sample_name_to_accession)} and expected '
+                             f'{len(sample_metadata_submitter.all_sample_names())}. '
+                             f'Missing samples are '
+                             f'{[sample_name for sample_name in sample_metadata_submitter.all_sample_names() if sample_name not in sample_name_to_accession]}')
+
+    def update_biosamples_with_study(self, force=False):
+        if not self.eload_cfg.query('brokering', 'Biosamples', 'backlinks') or force:
+            biosample_accession_list = self.eload_cfg.query('brokering', 'Biosamples', 'Samples').values()
+            project_accession = self.eload_cfg.query('brokering', 'ena', 'PROJECT')
+            if project_accession:
+                self.info(f'Add external reference to {len(biosample_accession_list)} BioSamples.')
+                sample_reference_submitter = SampleReferenceSubmitter(biosample_accession_list, project_accession)
+                sample_reference_submitter.submit_to_bioSamples()
+            self.eload_cfg.set('brokering', 'Biosamples', 'backlinks', value=project_accession)
+        else:
+            self.info('Adding external reference to BioSamples has already been done, Skip!')
diff --git a/eva_sub_cli_processing/sub_cli_ingestion.py b/eva_sub_cli_processing/sub_cli_ingestion.py
new file mode 100755
index 0000000..0222bde
--- /dev/null
+++ b/eva_sub_cli_processing/sub_cli_ingestion.py
@@ -0,0 +1,8 @@
+from eva_sub_cli_processing.sub_cli_submission import SubCli
+
+
+class SubCliIngestion(SubCli):
+
+    def ingest(self):
+        pass
+
diff --git a/eva_sub_cli_processing/sub_cli_submission.py b/eva_sub_cli_processing/sub_cli_submission.py
new file mode 100755
index 0000000..7e2f8b1
--- /dev/null
+++ b/eva_sub_cli_processing/sub_cli_submission.py
@@ -0,0 +1,49 @@
+import os
+import random
+import string
+from datetime import datetime
+
+from cached_property import cached_property
+from ebi_eva_common_pyutils.config import cfg
+from ebi_eva_common_pyutils.logger import AppLogger
+from ebi_eva_common_pyutils.logger import logging_config as log_cfg
+from ebi_eva_internal_pyutils.metadata_utils import get_metadata_connection_handle
+
+
+submission_logging_files = set()
+
+
+class SubCli(AppLogger):
+    def __init__(self, submission_id: str):
+        self.submission_id = submission_id
+        self.submission_dir = os.path.abspath(os.path.join(cfg['submission_dir'], self.submission_id))
+        os.makedirs(self.submission_dir, exist_ok=True)
+        self.create_log_file()
+
+    @cached_property
+    def submission_detail(self):
+        return _get_submission_api(_url_build('admin', 'submission', self.submission_id))
+
+    @property
+    def metadata_connection_handle(self):
+        return get_metadata_connection_handle(cfg['maven']['environment'], cfg['maven']['settings_file'])
+
+    def create_nextflow_temp_output_directory(self, base=None):
+        random_string = ''.join(random.choice(string.ascii_letters) for i in range(6))
+        if base is None:
+            output_dir = os.path.join(self.submission_dir, 'nextflow_output_' + random_string)
+        else:
+            output_dir = os.path.join(base, 'nextflow_output_' + random_string)
+        os.makedirs(output_dir)
+        return output_dir
+
+    @cached_property
+    def now(self):
+        return datetime.now()
+
+    def create_log_file(self):
+        logfile_name = os.path.join(self.submission_dir, "submission.log")
+        if logfile_name not in submission_logging_files:
+            log_cfg.add_file_handler(logfile_name)
+            submission_logging_files.add(logfile_name)
+
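The SubCli base class above gives each per-submission process its own working directory and log file. A hypothetical driver, assuming the global cfg has already been loaded from a configuration file that defines submission_dir:

# Hypothetical usage of the SubCli base class (sketch, not part of the patch).
from eva_sub_cli_processing.sub_cli_ingestion import SubCliIngestion

process = SubCliIngestion('sub123')   # creates <submission_dir>/sub123 and submission.log
process.info('starting ingestion')    # AppLogger method; also lands in submission.log
output_dir = process.create_nextflow_temp_output_directory()
# e.g. <submission_dir>/sub123/nextflow_output_kXqZbd
process.ingest()                      # placeholder in this patch; does nothing yet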
diff --git a/eva_sub_cli_processing/sub_cli_utils.py b/eva_sub_cli_processing/sub_cli_utils.py
new file mode 100755
index 0000000..e64ce8d
--- /dev/null
+++ b/eva_sub_cli_processing/sub_cli_utils.py
@@ -0,0 +1,55 @@
+import requests
+from ebi_eva_common_pyutils.config import cfg
+from retry import retry
+
+# Submission statuses
+OPEN = 'OPEN'
+UPLOADED = 'UPLOADED'
+COMPLETED = 'COMPLETED'
+TIMEOUT = 'TIMEOUT'
+FAILED = 'FAILED'
+CANCELLED = 'CANCELLED'
+PROCESSING = 'PROCESSING'
+
+# Processing steps
+VALIDATION = 'VALIDATION'
+BROKERING = 'BROKERING'
+INGESTION = 'INGESTION'
+PROCESSING_STEPS = [VALIDATION, BROKERING, INGESTION]
+
+# Processing statuses
+READY_FOR_PROCESSING = 'READY_FOR_PROCESSING'
+FAILURE = 'FAILURE'
+SUCCESS = 'SUCCESS'
+RUNNING = 'RUNNING'
+ON_HOLD = 'ON_HOLD'
+PROCESSING_STATUS = [READY_FOR_PROCESSING, FAILURE, SUCCESS, RUNNING, ON_HOLD]
+
+
+def sub_ws_auth():
+    return (
+        cfg.query('submissions', 'webservice', 'admin_username'),
+        cfg.query('submissions', 'webservice', 'admin_password')
+    )
+
+
+def sub_ws_url_build(*args, **kwargs):
+    url = cfg.query('submissions', 'webservice', 'url') + '/' + '/'.join(args)
+    if kwargs:
+        return url + '?' + '&'.join(f'{k}={v}' for k, v in kwargs.items())
+    else:
+        return url
+
+
+@retry(tries=5, backoff=2, jitter=.5)
+def get_from_sub_ws(url):
+    response = requests.get(url, auth=sub_ws_auth())
+    response.raise_for_status()
+    return response.json()
+
+
+@retry(tries=5, backoff=2, jitter=.5)
+def put_to_sub_ws(url):
+    response = requests.put(url, auth=sub_ws_auth())
+    response.raise_for_status()
+    return response.json()
diff --git a/eva_sub_cli_processing/sub_cli_validation.py b/eva_sub_cli_processing/sub_cli_validation.py
new file mode 100755
index 0000000..dd3270a
--- /dev/null
+++ b/eva_sub_cli_processing/sub_cli_validation.py
@@ -0,0 +1,11 @@
+from eva_sub_cli_processing.sub_cli_submission import SubCli
+
+
+class SubCliValidation(SubCli):
+
+    all_validation_tasks = ['metadata_check', 'assembly_check', 'aggregation_check', 'vcf_check', 'sample_check',
+                            'structural_variant_check', 'naming_convention_check']
+
+    def validate(self):
+        pass
+
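sub_cli_utils.py centralises authentication and retrying for the submission web service. A short sketch of how the helpers compose (the page query parameter is only there to illustrate the kwargs handling and is not necessarily supported by the real endpoint):

# Sketch composing the sub_cli_utils helpers; assumes a config that defines
# submissions.webservice.{url,admin_username,admin_password}.
from eva_sub_cli_processing.sub_cli_utils import sub_ws_url_build, get_from_sub_ws, UPLOADED

url = sub_ws_url_build('admin', 'submissions', 'status', UPLOADED, page=0)
# -> <webservice url>/admin/submissions/status/UPLOADED?page=0
for submission in get_from_sub_ws(url):   # retried up to 5 times with backoff
    print(submission.get('submissionId'), submission.get('status'))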
diff --git a/eva_submission/biosample_submission/biosamples_submitters.py b/eva_submission/biosample_submission/biosamples_submitters.py
index 77732e4..1c5ef71 100644
--- a/eva_submission/biosample_submission/biosamples_submitters.py
+++ b/eva_submission/biosample_submission/biosamples_submitters.py
@@ -331,16 +331,14 @@ def apply_mapping(bsd_data, map_key, value):
         elif map_key:
             bsd_data[map_key] = value
 
-    def _group_across_fields(self, grouped_data, header, values_to_group):
-        """Populate the grouped_data with the values. The grouped_data variable will be changed by this function"""
-        groupname = self.map_project_key(header.split()[0].lower())
-        if groupname not in grouped_data:
-            grouped_data[groupname] = []
-            for i, value in enumerate(values_to_group.split('\t')):
-                grouped_data[groupname].append({self.map_project_key(header): value})
-        else:
-            for i, value in enumerate(values_to_group.split('\t')):
-                grouped_data[groupname][i][self.map_project_key(header)] = value
+    def check_submit_done(self):
+        return all((s.get("accession") for s in self.sample_data))
+
+    def already_submitted_sample_names_to_accessions(self):
+        raise NotImplementedError()
+
+    def all_sample_names(self):
+        raise NotImplementedError()
 
     def submit_to_bioSamples(self):
         # Check that the data exists
@@ -353,6 +351,73 @@ def submit_to_bioSamples(self):
         return self.submitter.sample_name_to_accession
 
 
+class SampleJSONSubmitter(SampleSubmitter):
+    submitter_mapping = {
+        'email': 'E-mail',
+        'firstName': 'FirstName',
+        'lastName': 'LastName'
+    }
+
+    organisation_mapping = {
+        'laboratory': 'Name',
+        'address': 'Address',
+    }
+
+    def __init__(self, metadata_json, submit_type=('create',)):
+        super().__init__(submit_type=submit_type)
+        self.metadata_json = metadata_json
+        self.sample_data = self._convert_json_to_bsd_json()
+
+    @staticmethod
+    def serialize(value):
+        """Create a text representation of the value provided"""
+        if isinstance(value, date):
+            return value.strftime('%Y-%m-%d')
+        return str(value)
+
+    def _convert_json_to_bsd_json(self):
+        payloads = []
+        for sample in self.metadata_json.get('sample'):
+            bsd_sample_entry = {'characteristics': {}}
+
+            if 'sampleAccession' in sample:
+                bsd_sample_entry['accession'] = sample['sampleAccession']
+            if 'bioSampleObject' in sample:
+                bsd_sample_entry.update(sample['bioSampleObject'])
+            if 'submitterDetails' in self.metadata_json:
+                # add the submitter information to each BioSample
+                contacts = []
+                organisations = []
+                for submitter in self.metadata_json.get('submitterDetails'):
+                    contact = {}
+                    organisation = {}
+                    for key in submitter:
+                        self.apply_mapping(contact, self.submitter_mapping.get(key), submitter[key])
+                        self.apply_mapping(organisation, self.organisation_mapping.get(key), submitter[key])
+                    if contact:
+                        contacts.append(contact)
+                    if organisation:
+                        organisations.append(organisation)
+                self.apply_mapping(bsd_sample_entry, 'contact', contacts)
+                self.apply_mapping(bsd_sample_entry, 'organization', organisations)
+            bsd_sample_entry['release'] = _now
+            # Custom attributes added to all the BioSamples we create/modify
+            bsd_sample_entry['characteristics']['last_updated_by'] = [{'text': 'EVA'}]
+            payloads.append(bsd_sample_entry)
+
+        return payloads
+
+    def already_submitted_sample_names_to_accessions(self):
+        if self.check_submit_done():
+            return dict([
+                (sample_row.get('Sample ID'), sample_row.get('Sample Accession')) for sample_row in self.reader.samples
+            ])
+
+    def all_sample_names(self):
+        # We need to get back to the reader to get all the names that were present in the spreadsheet
+        return [sample_row.get('Sample Name') or sample_row.get('Sample ID') for sample_row in self.reader.samples]
+
+
 class SampleMetadataSubmitter(SampleSubmitter):
 
     sample_mapping = {
diff --git a/tests/test_biosamples_submission.py b/tests/test_biosamples_submission.py
index 206acc0..8a21376 100644
--- a/tests/test_biosamples_submission.py
+++ b/tests/test_biosamples_submission.py
@@ -12,7 +12,7 @@
 from eva_submission import ROOT_DIR
 from eva_submission.biosample_submission import biosamples_submitters
 from eva_submission.biosample_submission.biosamples_submitters import BioSamplesSubmitter, SampleMetadataSubmitter, \
-    SampleReferenceSubmitter
+    SampleReferenceSubmitter, SampleJSONSubmitter
 
 
 class BSDTestCase(TestCase):
@@ -363,3 +363,30 @@ def test_not_override_samples(self):
         sample_submitter.submit_to_bioSamples()
         m_follows_link.assert_not_called()
 
+class TestSampleJSONSubmitter(BSDTestCase):
+
+    def test_convert_json_to_bsd_json(self):
+        bio_sample_object1 = {
+            'characteristics': {
+                'description': [{'text': 'yellow croaker sample 12'}],
+                'geographic location (country and/or sea)': [{'text': 'China'}],
+                'scientific name': [{'text': 'Larimichthys polyactis'}],
+                'geographic location (region and locality)': [{'text': 'East China Sea,Liuheng, Putuo, Zhejiang'}],
+                'submission title': [{
+                    'text': 'Characterization of a large dataset of SNPs in Larimichthys polyactis using high throughput 2b-RAD sequencing'}],
+                'database name': [{'text': 'PRJNA592281'}],
+            },
+            'name': 'LH1',
+            'taxId': '334908'
+        }
+        json_data = {'sample': [
+            {'analysisAlias': 'alias1', 'sampleInVCF': 'S1', 'bioSampleAccession': 'SAME000001'},
+            {'analysisAlias': 'alias1', 'sampleInVCF': 'S1', 'bioSampleObject': bio_sample_object1}
+        ], 'submitterDetails': [
+            {'lastName': 'Doe', 'firstName': 'Jane', 'email': 'jane.doe@example.com', 'laboratory': 'Lab',
+             'address': '5 common road'}
+        ]
+
+        }
+        self.submitter = SampleJSONSubmitter(json_data)
+        print(self.submitter._convert_json_to_bsd_json())
diff --git a/tests/test_sub_cli_processing/test_process_jobs.py b/tests/test_sub_cli_processing/test_process_jobs.py
index 832f545..09575bd 100644
--- a/tests/test_sub_cli_processing/test_process_jobs.py
+++ b/tests/test_sub_cli_processing/test_process_jobs.py
@@ -34,10 +33,9 @@ def test_report(self):
         json_data = [
             {'submissionId': 'sub123', 'status': 'UPLOADED', 'uploadedTime': '2024-05-12'}
         ]
-
         with patch_get(json_data) as m_get, patch('builtins.print') as m_print:
             scanner.report()
             m_get.assert_called_once_with('https://test.com/admin/submissions/status/UPLOADED',
                                           auth=('admin', 'password'))
-            m_print.assert_any_call('| Submission Id | Submission status | Uploaded time |')
-            m_print.assert_any_call('| sub123 | UPLOADED | 2024-05-12 |')
+            m_print.assert_any_call('| Submission Id | Submission status | Processing step | Processing status | Last updated time | Priority |')
+            m_print.assert_any_call('| sub123 | UPLOADED | VALIDATION | READY_FOR_PROCESSING | 2024-05-12 | 5 |')
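For reference, SampleJSONSubmitter._convert_json_to_bsd_json turns each entry of the 'sample' list into one BioSamples payload: 'sampleAccession' maps to 'accession', any 'bioSampleObject' is merged in, and each 'submitterDetails' entry is mapped onto 'contact'/'organization'. A hand-written illustration of one resulting entry, not captured output (note the new test feeds 'bioSampleAccession', which this converter does not read, so that test entry would get no 'accession'):

# Illustrative payload for a sample entry that carries 'sampleAccession'
# (hand-written from the mapping above, not captured from a run).
{
    'accession': 'SAME000001',
    'characteristics': {'last_updated_by': [{'text': 'EVA'}]},
    'contact': [{'E-mail': 'jane.doe@example.com', 'FirstName': 'Jane', 'LastName': 'Doe'}],
    'organization': [{'Name': 'Lab', 'Address': '5 common road'}],
    'release': ...,  # _now at conversion time
}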
From b61560dda6290767829f4ccf387186f4102d1acd Mon Sep 17 00:00:00 2001
From: tcezard
Date: Tue, 17 Dec 2024 14:53:20 +0000
Subject: [PATCH 2/2] processing via a scanner and new brokering method

---
 .gitignore                                   |  5 +-
 eva_sub_cli_processing/process_jobs.py       | 13 +--
 eva_sub_cli_processing/sub_cli_brokering.py  | 87 ++-----------------
 eva_sub_cli_processing/sub_cli_ingestion.py  |  6 +-
 eva_sub_cli_processing/sub_cli_submission.py | 12 +--
 eva_sub_cli_processing/sub_cli_validation.py |  6 +-
 .../biosamples_submitters.py                 |  9 +-
 eva_submission/vep_utils.py                  | 12 ++-
 tests/test_biosamples_submission.py          | 12 +--
 9 files changed, 47 insertions(+), 115 deletions(-)

diff --git a/.gitignore b/.gitignore
index b59823a..e5a5e09 100644
--- a/.gitignore
+++ b/.gitignore
@@ -12,4 +12,7 @@ work
 *.class
 *.jar
 *.log
-*.err
\ No newline at end of file
+*.err
+
+# Might contain real credentials
+tests/resources/bsd_webin_submission.yaml
\ No newline at end of file
diff --git a/eva_sub_cli_processing/process_jobs.py b/eva_sub_cli_processing/process_jobs.py
index 02091cb..da5d3d9 100644
--- a/eva_sub_cli_processing/process_jobs.py
+++ b/eva_sub_cli_processing/process_jobs.py
@@ -1,11 +1,11 @@
 from ebi_eva_common_pyutils.common_utils import pretty_print
 from ebi_eva_common_pyutils.logger import AppLogger
 
-from eva_sub_cli_processing.sub_cli_brokering import SubCliBrokering
-from eva_sub_cli_processing.sub_cli_ingestion import SubCliIngestion
+from eva_sub_cli_processing.sub_cli_brokering import SubCliProcessBrokering
+from eva_sub_cli_processing.sub_cli_ingestion import SubCliProcessIngestion
 from eva_sub_cli_processing.sub_cli_utils import get_from_sub_ws, sub_ws_url_build, VALIDATION, READY_FOR_PROCESSING, \
     PROCESSING, BROKERING, INGESTION, SUCCESS, FAILURE, put_to_sub_ws
-from eva_sub_cli_processing.sub_cli_validation import SubCliValidation
+from eva_sub_cli_processing.sub_cli_validation import SubCliProcessValidation
 
 
 def process_submissions():
@@ -99,11 +99,12 @@ def submit_pipeline(self):
         assert self.processing_status == READY_FOR_PROCESSING
         # TODO: These jobs need to be submitted as independent processes
         if self.processing_step == VALIDATION:
-            SubCliValidation(self.submission_id).validate()
+            process = SubCliProcessValidation(self.submission_id)
         elif self.processing_step == BROKERING:
-            SubCliBrokering(self.submission_id).broker()
+            process = SubCliProcessBrokering(self.submission_id)
         elif self.processing_step == INGESTION:
-            SubCliIngestion(self.submission_id).ingest()
+            process = SubCliProcessIngestion(self.submission_id)
+        process.start()
 
     def _set_next_step(self):
         if self.submission_status != PROCESSING and not self.processing_step:
diff --git a/eva_sub_cli_processing/sub_cli_brokering.py b/eva_sub_cli_processing/sub_cli_brokering.py
index 2399885..f830199 100644
--- a/eva_sub_cli_processing/sub_cli_brokering.py
+++ b/eva_sub_cli_processing/sub_cli_brokering.py
@@ -4,77 +4,18 @@
 
 from eva_submission.ENA_submission.upload_to_ENA import ENAUploader, ENAUploaderAsync
 from eva_submission.biosample_submission.biosamples_submitters import SampleReferenceSubmitter, \
-    SampleSubmitter
-from eva_sub_cli_processing.sub_cli_submission import SubCli
+    SampleSubmitter, SampleJSONSubmitter
+from eva_sub_cli_processing.sub_cli_submission import SubCliProcess
 
 
-class SubCliBrokering(SubCli):
+class SubCliProcessBrokering(SubCliProcess):
 
-    def broker(self, brokering_tasks_to_force=None, existing_project=None, async_upload=False, dry_ena_upload=False):
+    def start(self):
         """Run the brokering process"""
         self.upload_to_bioSamples()
-        self.broker_to_ena(force=('ena' in brokering_tasks_to_force), existing_project=existing_project,
-                           async_upload=async_upload, dry_ena_upload=dry_ena_upload)
-        self.update_biosamples_with_study(force=('update_biosamples' in brokering_tasks_to_force))
 
-    def prepare_brokering(self, force=False):
-        valid_analyses = self.eload_cfg.query('validation', 'valid', 'analyses', ret_default=[])
-        if not all([
-            self.eload_cfg.query('brokering', 'analyses', analysis, 'vcf_files')
-            for analysis in valid_analyses
-        ]) or force:
-            output_dir = self._run_brokering_prep_workflow()
-            self._collect_brokering_prep_results(output_dir)
-            shutil.rmtree(output_dir)
-        else:
-            self.info('Preparation has already been run, Skip!')
-
-    def broker_to_ena(self, force=False, existing_project=None, async_upload=False, dry_ena_upload=False):
-        if not self.eload_cfg.query('brokering', 'ena', 'pass') or force:
-            ena_spreadsheet = os.path.join(self._get_dir('ena'), 'metadata_spreadsheet.xlsx')
-            # Set the project in the metadata sheet which is then converted to XML
-            self.update_metadata_spreadsheet(self.eload_cfg['validation']['valid']['metadata_spreadsheet'],
-                                             ena_spreadsheet, existing_project)
-            if async_upload:
-                ena_uploader = ENAUploaderAsync(self.eload, ena_spreadsheet, self._get_dir('ena'))
-            else:
-                ena_uploader = ENAUploader(self.eload, ena_spreadsheet, self._get_dir('ena'))
-
-            if ena_uploader.converter.is_existing_project:
-                # Set the project in the config, based on the spreadsheet
-                self.eload_cfg.set('brokering', 'ena', 'PROJECT', value=ena_uploader.converter.existing_project)
-                self.eload_cfg.set('brokering', 'ena', 'existing_project', value=True)
-
-            # Upload the VCF to ENA FTP
-            files_to_upload = []
-            analyses = self.eload_cfg['brokering']['analyses']
-            for analysis in analyses:
-                for vcf_file_name in analyses[analysis]['vcf_files']:
-                    vcf_file_info = self.eload_cfg['brokering']['analyses'][analysis]['vcf_files'][vcf_file_name]
-                    files_to_upload.append(vcf_file_info['output_vcf_file'])
-                    files_to_upload.append(vcf_file_info['csi'])
-            if dry_ena_upload:
-                self.info(f'Would have uploaded the following files to FTP: \n' + "\n".join(files_to_upload))
-            else:
-                ena_uploader.upload_vcf_files_to_ena_ftp(files_to_upload)
-            # Upload XML to ENA
-            ena_uploader.upload_xml_files_to_ena(dry_ena_upload)
-            if not dry_ena_upload:
-                # Update the project accession in case we're working with existing project
-                # We should not be uploading additional analyses in the same ELOAD so no need to update
-                pre_existing_project = self.eload_cfg.query('brokering', 'ena', 'PROJECT')
-                if pre_existing_project and 'PROJECT' not in ena_uploader.results:
-                    ena_uploader.results['PROJECT'] = pre_existing_project
-                self.eload_cfg.set('brokering', 'ena', value=ena_uploader.results)
-                self.eload_cfg.set('brokering', 'ena', 'date', value=self.now)
-                self.eload_cfg.set('brokering', 'ena', 'hold_date', value=ena_uploader.converter.hold_date)
-                self.eload_cfg.set('brokering', 'ena', 'pass', value=not bool(ena_uploader.results['errors']))
-        else:
-            self.info('Brokering to ENA has already been run, Skip!')
-
-    def upload_to_bioSamples(self, force=False):
-        sample_submitter = SampleSubmitter(('create',))
-        sample_submitter.sample_data = self.submission.submission_detail.get('samples')
+    def upload_to_bioSamples(self):
+        sample_submitter = SampleJSONSubmitter(self.submission_detail.get('metadataJson'), submit_type=('create',))
         sample_name_to_accession = sample_submitter.submit_to_bioSamples()
         # Check whether all samples have been accessioned
         passed = (
             bool(sample_name_to_accession)
             and all(sample_name in sample_name_to_accession for sample_name in sample_submitter.all_sample_names())
         )
         if not passed:
             raise ValueError(f'Not all samples were successfully brokered to BioSamples! '
                              f'Found {len(sample_name_to_accession)} and expected '
-                             f'{len(sample_metadata_submitter.all_sample_names())}. '
+                             f'{len(sample_submitter.all_sample_names())}. '
                              f'Missing samples are '
-                             f'{[sample_name for sample_name in sample_metadata_submitter.all_sample_names() if sample_name not in sample_name_to_accession]}')
-
-    def update_biosamples_with_study(self, force=False):
-        if not self.eload_cfg.query('brokering', 'Biosamples', 'backlinks') or force:
-            biosample_accession_list = self.eload_cfg.query('brokering', 'Biosamples', 'Samples').values()
-            project_accession = self.eload_cfg.query('brokering', 'ena', 'PROJECT')
-            if project_accession:
-                self.info(f'Add external reference to {len(biosample_accession_list)} BioSamples.')
-                sample_reference_submitter = SampleReferenceSubmitter(biosample_accession_list, project_accession)
-                sample_reference_submitter.submit_to_bioSamples()
-            self.eload_cfg.set('brokering', 'Biosamples', 'backlinks', value=project_accession)
-        else:
-            self.info('Adding external reference to BioSamples has already been done, Skip!')
+                             f'{[sample_name for sample_name in sample_submitter.all_sample_names() if sample_name not in sample_name_to_accession]}')
diff --git a/eva_sub_cli_processing/sub_cli_ingestion.py b/eva_sub_cli_processing/sub_cli_ingestion.py
index 0222bde..15dd1a0 100755
--- a/eva_sub_cli_processing/sub_cli_ingestion.py
+++ b/eva_sub_cli_processing/sub_cli_ingestion.py
@@ -1,8 +1,8 @@
-from eva_sub_cli_processing.sub_cli_submission import SubCli
+from eva_sub_cli_processing.sub_cli_submission import SubCliProcess
 
 
-class SubCliIngestion(SubCli):
+class SubCliProcessIngestion(SubCliProcess):
 
-    def ingest(self):
+    def start(self):
         pass
 
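With this commit every step class exposes the same start() entry point (stubbed on the base class in the next hunk), so the dispatch in submit_pipeline could equally be table-driven. An equivalent sketch, for illustration only; the patch itself keeps the if/elif chain:

# Dict-based equivalent of the dispatch submit_pipeline now performs.
from eva_sub_cli_processing.sub_cli_brokering import SubCliProcessBrokering
from eva_sub_cli_processing.sub_cli_ingestion import SubCliProcessIngestion
from eva_sub_cli_processing.sub_cli_validation import SubCliProcessValidation
from eva_sub_cli_processing.sub_cli_utils import VALIDATION, BROKERING, INGESTION

STEP_TO_PROCESS = {
    VALIDATION: SubCliProcessValidation,
    BROKERING: SubCliProcessBrokering,
    INGESTION: SubCliProcessIngestion,
}


def run_step(processing_step, submission_id):
    process = STEP_TO_PROCESS[processing_step](submission_id)
    process.start()   # each subclass overrides SubCliProcess.start()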
diff --git a/eva_sub_cli_processing/sub_cli_submission.py b/eva_sub_cli_processing/sub_cli_submission.py
index 7e2f8b1..11fc389 100755
--- a/eva_sub_cli_processing/sub_cli_submission.py
+++ b/eva_sub_cli_processing/sub_cli_submission.py
@@ -9,11 +9,12 @@
 from ebi_eva_common_pyutils.logger import logging_config as log_cfg
 from ebi_eva_internal_pyutils.metadata_utils import get_metadata_connection_handle
 
+from eva_sub_cli_processing.sub_cli_utils import sub_ws_url_build, get_from_sub_ws
 
 submission_logging_files = set()
 
 
-class SubCli(AppLogger):
+class SubCliProcess(AppLogger):
     def __init__(self, submission_id: str):
         self.submission_id = submission_id
         self.submission_dir = os.path.abspath(os.path.join(cfg['submission_dir'], self.submission_id))
@@ -22,11 +23,7 @@ def __init__(self, submission_id: str):
 
     @cached_property
     def submission_detail(self):
-        return _get_submission_api(_url_build('admin', 'submission', self.submission_id))
-
-    @property
-    def metadata_connection_handle(self):
-        return get_metadata_connection_handle(cfg['maven']['environment'], cfg['maven']['settings_file'])
+        return get_from_sub_ws(sub_ws_url_build('admin', 'submission', self.submission_id))
 
     def create_nextflow_temp_output_directory(self, base=None):
         random_string = ''.join(random.choice(string.ascii_letters) for i in range(6))
@@ -47,3 +44,6 @@ def create_log_file(self):
             log_cfg.add_file_handler(logfile_name)
             submission_logging_files.add(logfile_name)
 
+    def start(self):
+        raise NotImplementedError
+
diff --git a/eva_sub_cli_processing/sub_cli_validation.py b/eva_sub_cli_processing/sub_cli_validation.py
index dd3270a..93702d0 100755
--- a/eva_sub_cli_processing/sub_cli_validation.py
+++ b/eva_sub_cli_processing/sub_cli_validation.py
@@ -1,11 +1,11 @@
-from eva_sub_cli_processing.sub_cli_submission import SubCli
+from eva_sub_cli_processing.sub_cli_submission import SubCliProcess
 
 
-class SubCliValidation(SubCli):
+class SubCliProcessValidation(SubCliProcess):
 
     all_validation_tasks = ['metadata_check', 'assembly_check', 'aggregation_check', 'vcf_check', 'sample_check',
                             'structural_variant_check', 'naming_convention_check']
 
-    def validate(self):
+    def start(self):
         pass
 
diff --git a/eva_submission/biosample_submission/biosamples_submitters.py b/eva_submission/biosample_submission/biosamples_submitters.py
index 1c5ef71..ad81ba1 100644
--- a/eva_submission/biosample_submission/biosamples_submitters.py
+++ b/eva_submission/biosample_submission/biosamples_submitters.py
@@ -368,13 +368,6 @@ def __init__(self, metadata_json, submit_type=('create',)):
         self.metadata_json = metadata_json
         self.sample_data = self._convert_json_to_bsd_json()
 
-    @staticmethod
-    def serialize(value):
-        """Create a text representation of the value provided"""
-        if isinstance(value, date):
-            return value.strftime('%Y-%m-%d')
-        return str(value)
-
     def _convert_json_to_bsd_json(self):
         payloads = []
         for sample in self.metadata_json.get('sample'):
@@ -410,7 +403,7 @@ def _convert_json_to_bsd_json(self):
     def already_submitted_sample_names_to_accessions(self):
         if self.check_submit_done():
             return dict([
-                (sample_row.get('Sample ID'), sample_row.get('Sample Accession')) for sample_row in self.reader.samples
+                (sample_json.get('Sample ID'), sample_json.get('sampleAccession')) for sample_json in self.metadata_json.get('sample')
             ])
 
     def all_sample_names(self):
diff --git a/eva_submission/vep_utils.py b/eva_submission/vep_utils.py
index ca482c3..1d67354 100644
--- a/eva_submission/vep_utils.py
+++ b/eva_submission/vep_utils.py
@@ -198,9 +198,13 @@ def get_species_and_assembly(assembly_acc):
 
 @retry(tries=4, delay=2, backoff=1.2, jitter=(1, 3), logger=logger)
 def get_ftp_connection(url):
-    ftp = ftplib.FTP(url)
-    ftp.login()
-    return ftp
+    try:
+        ftp = ftplib.FTP(url)
+        ftp.login()
+        return ftp
+    except Exception as e:
+        logger.error(f'There was an issue accessing {url}')
+        raise e
 
 
 @retry(tries=8, delay=2, backoff=1.2, jitter=(1, 3), logger=logger)
@@ -306,7 +310,7 @@ def download_and_install_vep_version(vep_version):
             for chunk in response.iter_content(chunk_size=8192):
                 file.write(chunk)
     else:
-        raise (f'Error downloading Vep installation files for vep version {vep_version}')
+        raise Exception(f'Error downloading Vep installation files for vep version {vep_version}')
 
     # Unzip the Vep version
     with zipfile.ZipFile(destination, 'r') as zip_ref:
diff --git a/tests/test_biosamples_submission.py b/tests/test_biosamples_submission.py
index 8a21376..271dea1 100644
--- a/tests/test_biosamples_submission.py
+++ b/tests/test_biosamples_submission.py
@@ -56,22 +56,24 @@ def tearDown(self):
      'release': '2020-07-06T19:09:29.090Z'}
 ]
 
-
+# BioSamples does not support AAP login, so we have to use credentials from Webin.
+@pytest.mark.skip(reason='You need to set a config file with correct Webin credentials to run these tests')
 class TestBSDSubmitter(BSDTestCase):
     """
     Integration tests that will contact a test server for BSD.
     """
 
     def setUp(self) -> None:
-        file_name = os.path.join(self.resources_folder, 'bsd_submission.yaml')
+        file_name = os.path.join(self.resources_folder, 'bsd_webin_submission.yaml')
         self.config = None
         if os.path.isfile(file_name):
             with open(file_name) as open_file:
                 self.config = yaml.safe_load(open_file)
         self.sample_data = deepcopy(sample_data)
-        self.communicator = AAPHALCommunicator(self.config.get('aap_url'), self.config.get('bsd_url'),
-                                               self.config.get('username'), self.config.get('password'),
-                                               self.config.get('domain'))
+
+        self.communicator = WebinHALCommunicator(self.config.get('webin_url'), self.config.get('bsd_url'),
+                                                 self.config.get('webin_username'),
+                                                 self.config.get('webin_password'))
         self.submitter = BioSamplesSubmitter([self.communicator])
 
     def test_validate_in_bsd(self):
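The now-skipped integration tests read tests/resources/bsd_webin_submission.yaml, which the first hunk of this commit git-ignores because it may hold real credentials. A placeholder with the keys setUp expects could look like this (all values below are fake):

# Placeholder bsd_webin_submission.yaml content with the keys setUp() reads;
# the endpoints and credentials are fake examples, not real services.
import yaml

placeholder = '''
webin_url: https://example.org/ena/submit/webin
bsd_url: https://example.org/biosamples
webin_username: Webin-00000
webin_password: change-me
'''
config = yaml.safe_load(placeholder)
assert {'webin_url', 'bsd_url', 'webin_username', 'webin_password'} <= config.keys()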