diff --git a/.gitignore b/.gitignore
index b59823a..e5a5e09 100644
--- a/.gitignore
+++ b/.gitignore
@@ -12,4 +12,7 @@ work
 *.class
 *.jar
 *.log
-*.err
\ No newline at end of file
+*.err
+
+# Might contain real credentials
+tests/resources/bsd_webin_submission.yaml
\ No newline at end of file
diff --git a/eva_sub_cli_processing/process_jobs.py b/eva_sub_cli_processing/process_jobs.py
index d89d744..da5d3d9 100644
--- a/eva_sub_cli_processing/process_jobs.py
+++ b/eva_sub_cli_processing/process_jobs.py
@@ -1,34 +1,11 @@
-from datetime import datetime
-
-import requests
 from ebi_eva_common_pyutils.common_utils import pretty_print
-from ebi_eva_common_pyutils.config import cfg
 from ebi_eva_common_pyutils.logger import AppLogger
-from retry import retry
-
-
-def _url_build(*args, **kwargs):
-    url = cfg.query('submissions', 'webservice', 'url') + '/' + '/'.join(args)
-    if kwargs:
-        return url + '?' + '&'.join(f'{k}={v}' for k, v in kwargs.items())
-    else:
-        return url
-
-
-@retry(tries=5, backoff=2, jitter=.5)
-def _get_submission_api(url):
-    auth = (cfg.query('submissions', 'webservice', 'admin_username'), cfg.query('submissions', 'webservice', 'admin_password'))
-    response = requests.get(url, auth=auth)
-    response.raise_for_status()
-    return response.json()
-
-
-@retry(tries=5, backoff=2, jitter=.5)
-def _put_submission_api(url):
-    auth = (cfg.query('submissions', 'webservice', 'admin_username'), cfg.query('submissions', 'webservice', 'admin_password'))
-    response = requests.put(url, auth=auth)
-    response.raise_for_status()
-    return response.json()
+from eva_sub_cli_processing.sub_cli_brokering import SubCliProcessBrokering
+from eva_sub_cli_processing.sub_cli_ingestion import SubCliProcessIngestion
+from eva_sub_cli_processing.sub_cli_utils import get_from_sub_ws, sub_ws_url_build, VALIDATION, READY_FOR_PROCESSING, \
+    PROCESSING, BROKERING, INGESTION, SUCCESS, FAILURE, put_to_sub_ws
+from eva_sub_cli_processing.sub_cli_validation import SubCliProcessValidation
 
 
 def process_submissions():
@@ -53,47 +30,105 @@ def _process_submission(submission):
 
 class SubmissionScanner(AppLogger):
 
     statuses = []
+    step_statuses = []
 
-    def scan(self):
+    def _scan_per_status(self):
         submissions = []
         for status in self.statuses:
-            for submission_data in _get_submission_api(_url_build('admin', 'submissions', 'status', status)):
-                submissions.append(Submission(
+            for submission_data in get_from_sub_ws(sub_ws_url_build('admin', 'submissions', 'status', status)):
+                submissions.append(SubmissionStep(
                     submission_id=submission_data.get('submissionId'),
-                    submission_status=submission_data.get('status'),
-                    uploaded_time=submission_data.get('uploadedTime')
+                    status=submission_data.get('status'),
+                    processing_step=VALIDATION,
+                    processing_status=READY_FOR_PROCESSING,
+                    last_update_time=submission_data.get('uploadedTime'),
+                    priority=5
                 ))
         return submissions
 
+    def _scan_per_step_status(self):
+        submissions = []
+        for step, status in self.step_statuses:
+            for submission_step_data in get_from_sub_ws(sub_ws_url_build('admin', 'submission-processes', step, status)):
+                submissions.append(SubmissionStep(
+                    submission_id=submission_step_data.get('submissionId'),
+                    status=PROCESSING,
+                    processing_step=step,
+                    processing_status=status,
+                    last_update_time=submission_step_data.get('lastUpdateTime'),
+                    priority=submission_step_data.get('priority')
+                ))
+        return submissions
+
+    def scan(self):
+        return self._scan_per_status() + self._scan_per_step_status()
+
     def report(self):
-        header = ['Submission Id', 'Submission status', 'Uploaded time']
+        header = ['Submission Id', 'Submission status', 'Processing step', 'Processing status', 'Last updated time',
+                  'Priority']
         scan = self.scan()
         lines = []
         for submission in scan:
-            lines.append((submission.submission_id, submission.submission_status, str(submission.uploaded_time)))
+            lines.append((submission.submission_id, submission.submission_status, submission.processing_step,
+                          submission.processing_status, str(submission.last_update_time), str(submission.priority)))
         pretty_print(header, lines)
 
 
 class NewSubmissionScanner(SubmissionScanner):
 
     statuses = ['UPLOADED']
+    step_statuses = []
 
 
-class Submission(AppLogger):
+class SubmissionStep(AppLogger):
 
-    def __init__(self, submission_id, submission_status, uploaded_time):
+    def __init__(self, submission_id, status, processing_step, processing_status, last_update_time, priority):
         self.submission_id = submission_id
-        self.submission_status = submission_status
-        self.uploaded_time = uploaded_time
+        self.submission_status = status
+        self.processing_step = processing_step
+        self.processing_status = processing_status
+        self.last_update_time = last_update_time
+        self.priority = priority
 
     def start(self):
-        response = _put_submission_api(_url_build('admin', 'submission', self.submission_id, 'status', 'PROCESSING'))
+        self._set_next_step()
+        self._update_submission_ws()
         self.submit_pipeline()
 
     def submit_pipeline(self):
-        # TODO: Actually submit a job for this submission
-        pass
+        assert self.processing_status == READY_FOR_PROCESSING
+        # TODO: These jobs need to be submitted as independent processes
+        if self.processing_step == VALIDATION:
+            process = SubCliProcessValidation(self.submission_id)
+        elif self.processing_step == BROKERING:
+            process = SubCliProcessBrokering(self.submission_id)
+        elif self.processing_step == INGESTION:
+            process = SubCliProcessIngestion(self.submission_id)
+        else:
+            raise ValueError(f'Unknown processing step {self.processing_step}')
+        process.start()
+
+    def _set_next_step(self):
+        if self.submission_status != PROCESSING and not self.processing_step:
+            self.submission_status = PROCESSING
+            self.processing_step = VALIDATION
+            self.processing_status = READY_FOR_PROCESSING
+        elif self.processing_status == SUCCESS and self.processing_step == VALIDATION:
+            self.processing_step = BROKERING
+            self.processing_status = READY_FOR_PROCESSING
+        elif self.processing_status == SUCCESS and self.processing_step == BROKERING:
+            self.processing_step = INGESTION
+            self.processing_status = READY_FOR_PROCESSING
+        elif self.processing_status == FAILURE:
+            # TODO: Is there something we need to do before restarting a failed job?
+            self.processing_status = READY_FOR_PROCESSING
+
+    def _update_submission_ws(self):
+        put_to_sub_ws(sub_ws_url_build('admin', 'submission', self.submission_id, 'status', self.submission_status))
+        put_to_sub_ws(sub_ws_url_build('admin', 'submission-process', self.submission_id, self.processing_step,
+                                       self.processing_status))
 
     def __repr__(self):
         return f'Submission(submission_id={self.submission_id}, submission_status={self.submission_status}, ' \
-               f'uploaded_time={self.uploaded_time})'
+               f'processing_step={self.processing_step}, processing_status={self.processing_status}, ' \
+               f'last_update_time={self.last_update_time})'
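For orientation, a minimal driver sketch for the scanner above (the main() wrapper is hypothetical; it assumes cfg has already been loaded with the submissions webservice settings):

from eva_sub_cli_processing.process_jobs import NewSubmissionScanner

def main():
    scanner = NewSubmissionScanner()
    scanner.report()                  # pretty-print the submissions that would be picked up
    for submission_step in scanner.scan():
        submission_step.start()       # advance to the next step and submit the pipeline

if __name__ == '__main__':
    main()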
diff --git a/eva_sub_cli_processing/sub_cli_brokering.py b/eva_sub_cli_processing/sub_cli_brokering.py
new file mode 100644
index 0000000..f830199
--- /dev/null
+++ b/eva_sub_cli_processing/sub_cli_brokering.py
@@ -0,0 +1,30 @@
+import csv
+import os
+import shutil
+
+from eva_submission.ENA_submission.upload_to_ENA import ENAUploader, ENAUploaderAsync
+from eva_submission.biosample_submission.biosamples_submitters import SampleReferenceSubmitter, \
+    SampleSubmitter, SampleJSONSubmitter
+from eva_sub_cli_processing.sub_cli_submission import SubCliProcess
+
+
+class SubCliProcessBrokering(SubCliProcess):
+
+    def start(self):
+        """Run the brokering process"""
+        self.upload_to_bioSamples()
+
+    def upload_to_bioSamples(self):
+        sample_submitter = SampleJSONSubmitter(metadata_json=self.submission_detail.get('metadataJson'),
+                                               submit_type=('create',))
+        sample_name_to_accession = sample_submitter.submit_to_bioSamples()
+        # Check whether all samples have been accessioned
+        passed = (
+            bool(sample_name_to_accession)
+            and all(sample_name in sample_name_to_accession for sample_name in sample_submitter.all_sample_names())
+        )
+        if not passed:
+            raise ValueError(f'Not all samples were successfully brokered to BioSamples! '
+                             f'Found {len(sample_name_to_accession)} and expected '
+                             f'{len(sample_submitter.all_sample_names())}. '
+                             f'Missing samples are '
+                             f'{[sample_name for sample_name in sample_submitter.all_sample_names() if sample_name not in sample_name_to_accession]}')
diff --git a/eva_sub_cli_processing/sub_cli_ingestion.py b/eva_sub_cli_processing/sub_cli_ingestion.py
new file mode 100755
index 0000000..15dd1a0
--- /dev/null
+++ b/eva_sub_cli_processing/sub_cli_ingestion.py
@@ -0,0 +1,8 @@
+from eva_sub_cli_processing.sub_cli_submission import SubCliProcess
+
+
+class SubCliProcessIngestion(SubCliProcess):
+
+    def start(self):
+        pass
diff --git a/eva_sub_cli_processing/sub_cli_submission.py b/eva_sub_cli_processing/sub_cli_submission.py
new file mode 100755
index 0000000..11fc389
--- /dev/null
+++ b/eva_sub_cli_processing/sub_cli_submission.py
@@ -0,0 +1,49 @@
+import os
+import random
+import string
+from datetime import datetime
+
+from cached_property import cached_property
+from ebi_eva_common_pyutils.config import cfg
+from ebi_eva_common_pyutils.logger import AppLogger
+from ebi_eva_common_pyutils.logger import logging_config as log_cfg
+from ebi_eva_internal_pyutils.metadata_utils import get_metadata_connection_handle
+
+from eva_sub_cli_processing.sub_cli_utils import sub_ws_url_build, get_from_sub_ws
+
+submission_logging_files = set()
+
+
+class SubCliProcess(AppLogger):
+
+    def __init__(self, submission_id: str):
+        self.submission_id = submission_id
+        self.submission_dir = os.path.abspath(os.path.join(cfg['submission_dir'], self.submission_id))
+        os.makedirs(self.submission_dir, exist_ok=True)
+        self.create_log_file()
+
+    @cached_property
+    def submission_detail(self):
+        return get_from_sub_ws(sub_ws_url_build('admin', 'submission', self.submission_id))
+
+    def create_nextflow_temp_output_directory(self, base=None):
+        random_string = ''.join(random.choice(string.ascii_letters) for _ in range(6))
+        if base is None:
+            output_dir = os.path.join(self.submission_dir, 'nextflow_output_' + random_string)
+        else:
+            output_dir = os.path.join(base, 'nextflow_output_' + random_string)
+        os.makedirs(output_dir)
+        return output_dir
+
+    @cached_property
+    def now(self):
+        return datetime.now()
+
+    def create_log_file(self):
+        logfile_name = os.path.join(self.submission_dir, 'submission.log')
+        if logfile_name not in submission_logging_files:
+            log_cfg.add_file_handler(logfile_name)
+            submission_logging_files.add(logfile_name)
+
+    def start(self):
+        raise NotImplementedError
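The base class above defines the contract for every processing step: construct with a submission id, read submission_detail lazily from the submission web service, override start(). A sketch of a conforming subclass (EchoProcess is illustrative only; it assumes cfg already carries a 'submission_dir' entry and the webservice credentials):

from eva_sub_cli_processing.sub_cli_submission import SubCliProcess

class EchoProcess(SubCliProcess):
    def start(self):
        # submission_detail triggers one authenticated GET against the submission web service
        self.info(f'Processing {self.submission_id}: {self.submission_detail}')

EchoProcess('sub123').start()  # creates <submission_dir>/sub123 and logs to submission.log there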
diff --git a/eva_sub_cli_processing/sub_cli_utils.py b/eva_sub_cli_processing/sub_cli_utils.py
new file mode 100755
index 0000000..e64ce8d
--- /dev/null
+++ b/eva_sub_cli_processing/sub_cli_utils.py
@@ -0,0 +1,55 @@
+import requests
+from ebi_eva_common_pyutils.config import cfg
+from retry import retry
+
+# Submission statuses
+OPEN = 'OPEN'
+UPLOADED = 'UPLOADED'
+COMPLETED = 'COMPLETED'
+TIMEOUT = 'TIMEOUT'
+FAILED = 'FAILED'
+CANCELLED = 'CANCELLED'
+PROCESSING = 'PROCESSING'
+
+# Processing steps
+VALIDATION = 'VALIDATION'
+BROKERING = 'BROKERING'
+INGESTION = 'INGESTION'
+PROCESSING_STEPS = [VALIDATION, BROKERING, INGESTION]
+
+# Processing statuses
+READY_FOR_PROCESSING = 'READY_FOR_PROCESSING'
+FAILURE = 'FAILURE'
+SUCCESS = 'SUCCESS'
+RUNNING = 'RUNNING'
+ON_HOLD = 'ON_HOLD'
+PROCESSING_STATUS = [READY_FOR_PROCESSING, FAILURE, SUCCESS, RUNNING, ON_HOLD]
+
+
+def sub_ws_auth():
+    return (
+        cfg.query('submissions', 'webservice', 'admin_username'),
+        cfg.query('submissions', 'webservice', 'admin_password')
+    )
+
+
+def sub_ws_url_build(*args, **kwargs):
+    url = cfg.query('submissions', 'webservice', 'url') + '/' + '/'.join(args)
+    if kwargs:
+        return url + '?' + '&'.join(f'{k}={v}' for k, v in kwargs.items())
+    else:
+        return url
+
+
+@retry(tries=5, backoff=2, jitter=.5)
+def get_from_sub_ws(url):
+    response = requests.get(url, auth=sub_ws_auth())
+    response.raise_for_status()
+    return response.json()
+
+
+@retry(tries=5, backoff=2, jitter=.5)
+def put_to_sub_ws(url):
+    response = requests.put(url, auth=sub_ws_auth())
+    response.raise_for_status()
+    return response.json()
diff --git a/eva_sub_cli_processing/sub_cli_validation.py b/eva_sub_cli_processing/sub_cli_validation.py
new file mode 100755
index 0000000..93702d0
--- /dev/null
+++ b/eva_sub_cli_processing/sub_cli_validation.py
@@ -0,0 +1,11 @@
+from eva_sub_cli_processing.sub_cli_submission import SubCliProcess
+
+
+class SubCliProcessValidation(SubCliProcess):
+
+    all_validation_tasks = ['metadata_check', 'assembly_check', 'aggregation_check', 'vcf_check', 'sample_check',
+                            'structural_variant_check', 'naming_convention_check']
+
+    def start(self):
+        pass
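To illustrate the helper above: sub_ws_url_build joins positional parts into the path and renders keyword arguments as a query string (host value is a placeholder):

# Assuming cfg.query('submissions', 'webservice', 'url') returns 'https://host/eva/v1'
sub_ws_url_build('admin', 'submissions', 'status', 'UPLOADED')
# -> 'https://host/eva/v1/admin/submissions/status/UPLOADED'
sub_ws_url_build('admin', 'submissions', status='UPLOADED', priority=5)
# -> 'https://host/eva/v1/admin/submissions?status=UPLOADED&priority=5'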
diff --git a/eva_submission/biosample_submission/biosamples_submitters.py b/eva_submission/biosample_submission/biosamples_submitters.py
index 77732e4..ad81ba1 100644
--- a/eva_submission/biosample_submission/biosamples_submitters.py
+++ b/eva_submission/biosample_submission/biosamples_submitters.py
@@ -331,16 +331,14 @@ def apply_mapping(bsd_data, map_key, value):
         elif map_key:
             bsd_data[map_key] = value
 
-    def _group_across_fields(self, grouped_data, header, values_to_group):
-        """Populate the grouped_data with the values.
-        The grouped_data variable will be changed by this function"""
-        groupname = self.map_project_key(header.split()[0].lower())
-        if groupname not in grouped_data:
-            grouped_data[groupname] = []
-            for i, value in enumerate(values_to_group.split('\t')):
-                grouped_data[groupname].append({self.map_project_key(header): value})
-        else:
-            for i, value in enumerate(values_to_group.split('\t')):
-                grouped_data[groupname][i][self.map_project_key(header)] = value
+    def check_submit_done(self):
+        return all(s.get('accession') for s in self.sample_data)
+
+    def already_submitted_sample_names_to_accessions(self):
+        raise NotImplementedError()
+
+    def all_sample_names(self):
+        raise NotImplementedError()
 
     def submit_to_bioSamples(self):
         # Check that the data exists
@@ -353,6 +351,66 @@ def submit_to_bioSamples(self):
         return self.submitter.sample_name_to_accession
 
 
+class SampleJSONSubmitter(SampleSubmitter):
+
+    submitter_mapping = {
+        'email': 'E-mail',
+        'firstName': 'FirstName',
+        'lastName': 'LastName'
+    }
+
+    organisation_mapping = {
+        'laboratory': 'Name',
+        'address': 'Address',
+    }
+
+    def __init__(self, metadata_json, submit_type=('create',)):
+        super().__init__(submit_type=submit_type)
+        self.metadata_json = metadata_json
+        self.sample_data = self._convert_json_to_bsd_json()
+
+    def _convert_json_to_bsd_json(self):
+        payloads = []
+        for sample in self.metadata_json.get('sample'):
+            bsd_sample_entry = {'characteristics': {}}
+
+            if 'bioSampleAccession' in sample:
+                bsd_sample_entry['accession'] = sample['bioSampleAccession']
+            if 'bioSampleObject' in sample:
+                bsd_sample_entry.update(sample['bioSampleObject'])
+            if 'submitterDetails' in self.metadata_json:
+                # Add the submitter information to each BioSample
+                contacts = []
+                organisations = []
+                for submitter in self.metadata_json.get('submitterDetails'):
+                    contact = {}
+                    organisation = {}
+                    for key in submitter:
+                        self.apply_mapping(contact, self.submitter_mapping.get(key), submitter[key])
+                        self.apply_mapping(organisation, self.organisation_mapping.get(key), submitter[key])
+                    if contact:
+                        contacts.append(contact)
+                    if organisation:
+                        organisations.append(organisation)
+                self.apply_mapping(bsd_sample_entry, 'contact', contacts)
+                self.apply_mapping(bsd_sample_entry, 'organization', organisations)
+            bsd_sample_entry['release'] = _now
+            # Custom attributes added to all the BioSamples we create/modify
+            bsd_sample_entry['characteristics']['last_updated_by'] = [{'text': 'EVA'}]
+            payloads.append(bsd_sample_entry)
+
+        return payloads
+
+    def already_submitted_sample_names_to_accessions(self):
+        if self.check_submit_done():
+            return dict(
+                (sample_json.get('sampleInVCF'), sample_json.get('bioSampleAccession'))
+                for sample_json in self.metadata_json.get('sample')
+            )
+
+    def all_sample_names(self):
+        # The JSON metadata is the source of truth for the sample names in this submission
+        return [sample_json.get('sampleInVCF') for sample_json in self.metadata_json.get('sample')]
+
+
 class SampleMetadataSubmitter(SampleSubmitter):
 
     sample_mapping = {
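A sketch of the conversion SampleJSONSubmitter performs, with illustrative values (only the keys the class reads are shown; _now is a module-level timestamp in the existing file):

metadata_json = {
    'sample': [{'sampleInVCF': 'S1', 'bioSampleObject': {'name': 'S1', 'taxId': '9606', 'characteristics': {}}}],
    'submitterDetails': [{'firstName': 'Jane', 'lastName': 'Doe', 'email': 'jane.doe@example.com',
                          'laboratory': 'Lab', 'address': '5 common road'}]
}
submitter = SampleJSONSubmitter(metadata_json)
# submitter.sample_data[0] now carries the merged bioSampleObject plus 'contact'
# and 'organization' lists built via the two mappings, a 'release' timestamp,
# and the EVA 'last_updated_by' characteristic.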
diff --git a/eva_submission/vep_utils.py b/eva_submission/vep_utils.py
index ca482c3..1d67354 100644
--- a/eva_submission/vep_utils.py
+++ b/eva_submission/vep_utils.py
@@ -198,9 +198,13 @@ def get_species_and_assembly(assembly_acc):
 
 @retry(tries=4, delay=2, backoff=1.2, jitter=(1, 3), logger=logger)
 def get_ftp_connection(url):
-    ftp = ftplib.FTP(url)
-    ftp.login()
-    return ftp
+    try:
+        ftp = ftplib.FTP(url)
+        ftp.login()
+        return ftp
+    except Exception:
+        logger.error(f'There was an issue accessing {url}')
+        raise
 
 
 @retry(tries=8, delay=2, backoff=1.2, jitter=(1, 3), logger=logger)
@@ -306,7 +310,7 @@ def download_and_install_vep_version(vep_version):
                 for chunk in response.iter_content(chunk_size=8192):
                     file.write(chunk)
     else:
-        raise (f'Error downloading Vep installation files for vep version {vep_version}')
+        raise Exception(f'Error downloading VEP installation files for VEP version {vep_version}')
diff --git a/tests/test_biosamples_submission.py b/tests/test_biosamples_submission.py
index 206acc0..271dea1 100644
--- a/tests/test_biosamples_submission.py
+++ b/tests/test_biosamples_submission.py
@@ -12,7 +12,7 @@
 from eva_submission import ROOT_DIR
 from eva_submission.biosample_submission import biosamples_submitters
 from eva_submission.biosample_submission.biosamples_submitters import BioSamplesSubmitter, SampleMetadataSubmitter, \
-    SampleReferenceSubmitter
+    SampleReferenceSubmitter, SampleJSONSubmitter
 
 
 class BSDTestCase(TestCase):
@@ -56,22 +56,24 @@ def tearDown(self):
     'release': '2020-07-06T19:09:29.090Z'}
 ]
 
-
+# BioSamples does not support AAP login so we have to use credentials from Webin.
+@pytest.mark.skip(reason='You need to set a config file with correct Webin credentials to run these tests')
 class TestBSDSubmitter(BSDTestCase):
     """
     Integration tests that will contact a test server for BSD.
     """
 
     def setUp(self) -> None:
-        file_name = os.path.join(self.resources_folder, 'bsd_submission.yaml')
+        file_name = os.path.join(self.resources_folder, 'bsd_webin_submission.yaml')
         self.config = None
         if os.path.isfile(file_name):
            with open(file_name) as open_file:
                self.config = yaml.safe_load(open_file)
         self.sample_data = deepcopy(sample_data)
-        self.communicator = AAPHALCommunicator(self.config.get('aap_url'), self.config.get('bsd_url'),
-                                               self.config.get('username'), self.config.get('password'),
-                                               self.config.get('domain'))
+
+        self.communicator = WebinHALCommunicator(self.config.get('webin_url'), self.config.get('bsd_url'),
+                                                 self.config.get('webin_username'),
+                                                 self.config.get('webin_password'))
         self.submitter = BioSamplesSubmitter([self.communicator])
 
     def test_validate_in_bsd(self):
@@ -363,3 +365,30 @@ def test_not_override_samples(self):
         sample_submitter.submit_to_bioSamples()
         m_follows_link.assert_not_called()
 
+
+class TestSampleJSONSubmitter(BSDTestCase):
+
+    def test_convert_json_to_bsd_json(self):
+        bio_sample_object1 = {
+            'characteristics': {
+                'description': [{'text': 'yellow croaker sample 12'}],
+                'geographic location (country and/or sea)': [{'text': 'China'}],
+                'scientific name': [{'text': 'Larimichthys polyactis'}],
+                'geographic location (region and locality)': [{'text': 'East China Sea,Liuheng, Putuo, Zhejiang'}],
+                'submission title': [{
+                    'text': 'Characterization of a large dataset of SNPs in Larimichthys polyactis using high throughput 2b-RAD sequencing'}],
+                'database name': [{'text': 'PRJNA592281'}],
+            },
+            'name': 'LH1',
+            'taxId': '334908'
+        }
+        json_data = {
+            'sample': [
+                {'analysisAlias': 'alias1', 'sampleInVCF': 'S1', 'bioSampleAccession': 'SAME000001'},
+                {'analysisAlias': 'alias1', 'sampleInVCF': 'S1', 'bioSampleObject': bio_sample_object1}
+            ],
+            'submitterDetails': [
+                {'lastName': 'Doe', 'firstName': 'Jane', 'email': 'jane.doe@example.com', 'laboratory': 'Lab',
+                 'address': '5 common road'}
+            ]
+        }
+        self.submitter = SampleJSONSubmitter(json_data)
+        payloads = self.submitter._convert_json_to_bsd_json()
+        # Both samples should be converted: one carrying its accession, one its BioSample object
+        self.assertEqual(len(payloads), 2)
+        self.assertEqual(payloads[0].get('accession'), 'SAME000001')
+        self.assertEqual(payloads[1].get('name'), 'LH1')
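Since tests/resources/bsd_webin_submission.yaml is now gitignored, anyone un-skipping these integration tests has to supply it locally. Judging from the keys the test reads, it would look something like this (all values are placeholders):

webin_url: <Webin authentication endpoint>
bsd_url: <BioSamples test server URL>
webin_username: Webin-XXXXX
webin_password: <secret>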
diff --git a/tests/test_sub_cli_processing/test_process_jobs.py b/tests/test_sub_cli_processing/test_process_jobs.py
index 832f545..09575bd 100644
--- a/tests/test_sub_cli_processing/test_process_jobs.py
+++ b/tests/test_sub_cli_processing/test_process_jobs.py
@@ -34,10 +34,9 @@ def test_report(self):
         json_data = [
             {'submissionId': 'sub123', 'status': 'UPLOADED', 'uploadedTime': '2024-05-12'}
         ]
-
         with patch_get(json_data) as m_get, patch('builtins.print') as m_print:
             scanner.report()
             m_get.assert_called_once_with('https://test.com/admin/submissions/status/UPLOADED', auth=('admin', 'password'))
-            m_print.assert_any_call('| Submission Id | Submission status | Uploaded time |')
-            m_print.assert_any_call('| sub123 | UPLOADED | 2024-05-12 |')
+            m_print.assert_any_call('| Submission Id | Submission status | Processing step | Processing status | Last updated time | Priority |')
+            m_print.assert_any_call('| sub123 | UPLOADED | VALIDATION | READY_FOR_PROCESSING | 2024-05-12 | 5 |')
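With the extra columns, the mocked report() in this test now renders the scanned submission as (taken directly from the assertions above; an UPLOADED submission is seeded at the VALIDATION step, READY_FOR_PROCESSING, default priority 5):

| Submission Id | Submission status | Processing step | Processing status | Last updated time | Priority |
| sub123 | UPLOADED | VALIDATION | READY_FOR_PROCESSING | 2024-05-12 | 5 |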