Skip to content

Commit

Permalink
processing via a scanner and new brokering method
Browse files Browse the repository at this point in the history
  • Loading branch information
tcezard committed Dec 17, 2024
1 parent e265b27 commit b61560d
Show file tree
Hide file tree
Showing 9 changed files with 47 additions and 115 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,7 @@ work
*.class
*.jar
*.log
*.err
*.err

# Might contain real credentials
tests/resources/bsd_webin_submission.yaml
13 changes: 7 additions & 6 deletions eva_sub_cli_processing/process_jobs.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from ebi_eva_common_pyutils.common_utils import pretty_print
from ebi_eva_common_pyutils.logger import AppLogger

from eva_sub_cli_processing.sub_cli_brokering import SubCliBrokering
from eva_sub_cli_processing.sub_cli_ingestion import SubCliIngestion
from eva_sub_cli_processing.sub_cli_brokering import SubCliProcessBrokering
from eva_sub_cli_processing.sub_cli_ingestion import SubCliProcessIngestion
from eva_sub_cli_processing.sub_cli_utils import get_from_sub_ws, sub_ws_url_build, VALIDATION, READY_FOR_PROCESSING, \
PROCESSING, BROKERING, INGESTION, SUCCESS, FAILURE, put_to_sub_ws
from eva_sub_cli_processing.sub_cli_validation import SubCliValidation
from eva_sub_cli_processing.sub_cli_validation import SubCliProcessValidation


def process_submissions():
Expand Down Expand Up @@ -99,11 +99,12 @@ def submit_pipeline(self):
assert self.processing_status == READY_FOR_PROCESSING
# TODO: These jobs need to be submitted as independent processes
if self.processing_step == VALIDATION:
SubCliValidation(self.submission_id).validate()
process = SubCliProcessValidation(self.submission_id)
elif self.processing_step == BROKERING:
SubCliBrokering(self.submission_id).broker()
process = SubCliProcessBrokering(self.submission_id)
elif self.processing_step == INGESTION:
SubCliIngestion(self.submission_id).ingest()
process = SubCliProcessIngestion(self.submission_id)
process.start()

def _set_next_step(self):
if self.submission_status != PROCESSING and not self.processing_step:
Expand Down
87 changes: 8 additions & 79 deletions eva_sub_cli_processing/sub_cli_brokering.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,77 +4,18 @@

from eva_submission.ENA_submission.upload_to_ENA import ENAUploader, ENAUploaderAsync
from eva_submission.biosample_submission.biosamples_submitters import SampleReferenceSubmitter, \
SampleSubmitter
from eva_sub_cli_processing.sub_cli_submission import SubCli
SampleSubmitter, SampleJSONSubmitter
from eva_sub_cli_processing.sub_cli_submission import SubCliProcess


class SubCliBrokering(SubCli):
class SubCliProcessBrokering(SubCliProcess):

def broker(self, brokering_tasks_to_force=None, existing_project=None, async_upload=False, dry_ena_upload=False):
def start(self):
"""Run the brokering process"""
self.upload_to_bioSamples()
self.broker_to_ena(force=('ena' in brokering_tasks_to_force), existing_project=existing_project,
async_upload=async_upload, dry_ena_upload=dry_ena_upload)
self.update_biosamples_with_study(force=('update_biosamples' in brokering_tasks_to_force))

def prepare_brokering(self, force=False):
valid_analyses = self.eload_cfg.query('validation', 'valid', 'analyses', ret_default=[])
if not all([
self.eload_cfg.query('brokering', 'analyses', analysis, 'vcf_files')
for analysis in valid_analyses
]) or force:
output_dir = self._run_brokering_prep_workflow()
self._collect_brokering_prep_results(output_dir)
shutil.rmtree(output_dir)
else:
self.info('Preparation has already been run, Skip!')

def broker_to_ena(self, force=False, existing_project=None, async_upload=False, dry_ena_upload=False):
if not self.eload_cfg.query('brokering', 'ena', 'pass') or force:
ena_spreadsheet = os.path.join(self._get_dir('ena'), 'metadata_spreadsheet.xlsx')
# Set the project in the metadata sheet which is then converted to XML
self.update_metadata_spreadsheet(self.eload_cfg['validation']['valid']['metadata_spreadsheet'],
ena_spreadsheet, existing_project)
if async_upload:
ena_uploader = ENAUploaderAsync(self.eload, ena_spreadsheet, self._get_dir('ena'))
else:
ena_uploader = ENAUploader(self.eload, ena_spreadsheet, self._get_dir('ena'))

if ena_uploader.converter.is_existing_project:
# Set the project in the config, based on the spreadsheet
self.eload_cfg.set('brokering', 'ena', 'PROJECT', value=ena_uploader.converter.existing_project)
self.eload_cfg.set('brokering', 'ena', 'existing_project', value=True)

# Upload the VCF to ENA FTP
files_to_upload = []
analyses = self.eload_cfg['brokering']['analyses']
for analysis in analyses:
for vcf_file_name in analyses[analysis]['vcf_files']:
vcf_file_info = self.eload_cfg['brokering']['analyses'][analysis]['vcf_files'][vcf_file_name]
files_to_upload.append(vcf_file_info['output_vcf_file'])
files_to_upload.append(vcf_file_info['csi'])
if dry_ena_upload:
self.info(f'Would have uploaded the following files to FTP: \n' + "\n".join(files_to_upload))
else:
ena_uploader.upload_vcf_files_to_ena_ftp(files_to_upload)
# Upload XML to ENA
ena_uploader.upload_xml_files_to_ena(dry_ena_upload)
if not dry_ena_upload:
# Update the project accession in case we're working with existing project
# We should not be uploading additional analysis in the same ELOAD so no need to update
pre_existing_project = self.eload_cfg.query('brokering', 'ena', 'PROJECT')
if pre_existing_project and 'PROJECT' not in ena_uploader.results:
ena_uploader.results['PROJECT'] = pre_existing_project
self.eload_cfg.set('brokering', 'ena', value=ena_uploader.results)
self.eload_cfg.set('brokering', 'ena', 'date', value=self.now)
self.eload_cfg.set('brokering', 'ena', 'hold_date', value=ena_uploader.converter.hold_date)
self.eload_cfg.set('brokering', 'ena', 'pass', value=not bool(ena_uploader.results['errors']))
else:
self.info('Brokering to ENA has already been run, Skip!')

def upload_to_bioSamples(self, force=False):
sample_submitter = SampleSubmitter(('create',))
sample_submitter.sample_data = self.submission.submission_detail.get('samples')
def upload_to_bioSamples(self, ):
sample_submitter = SampleJSONSubmitter(('create',), metadata_json=self.submission_detail.get('metadataJson'))
sample_name_to_accession = sample_submitter.submit_to_bioSamples()
# Check whether all samples have been accessioned
passed = (
Expand All @@ -84,18 +25,6 @@ def upload_to_bioSamples(self, force=False):
if not passed:
raise ValueError(f'Not all samples were successfully brokered to BioSamples! '
f'Found {len(sample_name_to_accession)} and expected '
f'{len(sample_metadata_submitter.all_sample_names())}. '
f'{len(sample_submitter.all_sample_names())}. '
f'Missing samples are '
f'{[sample_name for sample_name in sample_metadata_submitter.all_sample_names() if sample_name not in sample_name_to_accession]}')

def update_biosamples_with_study(self, force=False):
if not self.eload_cfg.query('brokering', 'Biosamples', 'backlinks') or force:
biosample_accession_list = self.eload_cfg.query('brokering', 'Biosamples', 'Samples').values()
project_accession = self.eload_cfg.query('brokering', 'ena', 'PROJECT')
if project_accession:
self.info(f'Add external reference to {len(biosample_accession_list)} BioSamples.')
sample_reference_submitter = SampleReferenceSubmitter(biosample_accession_list, project_accession)
sample_reference_submitter.submit_to_bioSamples()
self.eload_cfg.set('brokering', 'Biosamples', 'backlinks', value=project_accession)
else:
self.info('Adding external reference to BioSamples has already been done, Skip!')
f'{[sample_name for sample_name in sample_submitter.all_sample_names() if sample_name not in sample_name_to_accession]}')
6 changes: 3 additions & 3 deletions eva_sub_cli_processing/sub_cli_ingestion.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from eva_sub_cli_processing.sub_cli_submission import SubCli
from eva_sub_cli_processing.sub_cli_submission import SubCliProcess


class SubCliIngestion(SubCli):
class SubCliProcessIngestion(SubCliProcess):

def ingest(self):
def start(self):
pass

12 changes: 6 additions & 6 deletions eva_sub_cli_processing/sub_cli_submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@
from ebi_eva_common_pyutils.logger import logging_config as log_cfg
from ebi_eva_internal_pyutils.metadata_utils import get_metadata_connection_handle

from eva_sub_cli_processing.sub_cli_utils import sub_ws_url_build, get_from_sub_ws

submission_logging_files = set()


class SubCli(AppLogger):
class SubCliProcess(AppLogger):
def __init__(self, submission_id: str):
self.submission_id = submission_id
self.submission_dir = os.path.abspath(os.path.join(cfg['submission_dir'], self.submission_id))
Expand All @@ -22,11 +23,7 @@ def __init__(self, submission_id: str):

@cached_property
def submission_detail(self):
return _get_submission_api(_url_build('admin', 'submission', self.submission_id))

@property
def metadata_connection_handle(self):
return get_metadata_connection_handle(cfg['maven']['environment'], cfg['maven']['settings_file'])
return get_from_sub_ws(sub_ws_url_build('admin', 'submission', self.submission_id))

def create_nextflow_temp_output_directory(self, base=None):
random_string = ''.join(random.choice(string.ascii_letters) for i in range(6))
Expand All @@ -47,3 +44,6 @@ def create_log_file(self):
log_cfg.add_file_handler(logfile_name)
submission_logging_files.add(logfile_name)

def start(self):
raise NotImplementedError

6 changes: 3 additions & 3 deletions eva_sub_cli_processing/sub_cli_validation.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from eva_sub_cli_processing.sub_cli_submission import SubCli
from eva_sub_cli_processing.sub_cli_submission import SubCliProcess


class SubCliValidation(SubCli):
class SubCliProcessValidation(SubCliProcess):

all_validation_tasks = ['metadata_check', 'assembly_check', 'aggregation_check', 'vcf_check', 'sample_check',
'structural_variant_check', 'naming_convention_check']

def validate(self):
def start(self):
pass

9 changes: 1 addition & 8 deletions eva_submission/biosample_submission/biosamples_submitters.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,13 +368,6 @@ def __init__(self, metadata_json, submit_type=('create',)):
self.metadata_json = metadata_json
self.sample_data = self._convert_json_to_bsd_json()

@staticmethod
def serialize(value):
"""Create a text representation of the value provided"""
if isinstance(value, date):
return value.strftime('%Y-%m-%d')
return str(value)

def _convert_json_to_bsd_json(self):
payloads = []
for sample in self.metadata_json.get('sample'):
Expand Down Expand Up @@ -410,7 +403,7 @@ def _convert_json_to_bsd_json(self):
def already_submitted_sample_names_to_accessions(self):
if self.check_submit_done():
return dict([
(sample_row.get('Sample ID'), sample_row.get('Sample Accession')) for sample_row in self.reader.samples
(sample_json.get('Sample ID'), sample_json.get('sampleAccession')) for sample_json in self.metadata_json.get('sample')
])

def all_sample_names(self):
Expand Down
12 changes: 8 additions & 4 deletions eva_submission/vep_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,13 @@ def get_species_and_assembly(assembly_acc):

@retry(tries=4, delay=2, backoff=1.2, jitter=(1, 3), logger=logger)
def get_ftp_connection(url):
ftp = ftplib.FTP(url)
ftp.login()
return ftp
try:
ftp = ftplib.FTP(url)
ftp.login()
return ftp
except Exception as e:
logger.error(f'There was an issue accessing {url}')
raise e


@retry(tries=8, delay=2, backoff=1.2, jitter=(1, 3), logger=logger)
Expand Down Expand Up @@ -306,7 +310,7 @@ def download_and_install_vep_version(vep_version):
for chunk in response.iter_content(chunk_size=8192):
file.write(chunk)
else:
raise (f'Error downloading Vep installation files for vep version {vep_version}')
raise Exception(f'Error downloading Vep installation files for vep version {vep_version}')

# Unzip the Vep version
with zipfile.ZipFile(destination, 'r') as zip_ref:
Expand Down
12 changes: 7 additions & 5 deletions tests/test_biosamples_submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,22 +56,24 @@ def tearDown(self):
'release': '2020-07-06T19:09:29.090Z'}
]


# BioSamples does not support AAP login so we have to use credentials from Webin.
@pytest.mark.skip(reason='You need to set a config file with correct Webin credential to run these test')
class TestBSDSubmitter(BSDTestCase):
"""
Integration tests that will contact a test server for BSD.
"""

def setUp(self) -> None:
file_name = os.path.join(self.resources_folder, 'bsd_submission.yaml')
file_name = os.path.join(self.resources_folder, 'bsd_webin_submission.yaml')
self.config = None
if os.path.isfile(file_name):
with open(file_name) as open_file:
self.config = yaml.safe_load(open_file)
self.sample_data = deepcopy(sample_data)
self.communicator = AAPHALCommunicator(self.config.get('aap_url'), self.config.get('bsd_url'),
self.config.get('username'), self.config.get('password'),
self.config.get('domain'))

self.communicator = WebinHALCommunicator(self.config.get('webin_url'), self.config.get('bsd_url'),
self.config.get('webin_username'),
self.config.get('webin_password'))
self.submitter = BioSamplesSubmitter([self.communicator])

def test_validate_in_bsd(self):
Expand Down

0 comments on commit b61560d

Please sign in to comment.