Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EVA-3696 - Processing via a scanner and new brokering method #232

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,7 @@ work
*.class
*.jar
*.log
*.err
*.err

# Might contain real credentials
tests/resources/bsd_webin_submission.yaml
121 changes: 78 additions & 43 deletions eva_sub_cli_processing/process_jobs.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,11 @@
from datetime import datetime

import requests
from ebi_eva_common_pyutils.common_utils import pretty_print
from ebi_eva_common_pyutils.config import cfg
from ebi_eva_common_pyutils.logger import AppLogger
from retry import retry


def _url_build(*args, **kwargs):
url = cfg.query('submissions', 'webservice', 'url') + '/' + '/'.join(args)
if kwargs:
return url + '?' + '&'.join(f'{k}={v}' for k, v in kwargs.items())
else:
return url


@retry(tries=5, backoff=2, jitter=.5)
def _get_submission_api(url):
auth = (cfg.query('submissions', 'webservice', 'admin_username'), cfg.query('submissions', 'webservice', 'admin_password'))
response = requests.get(url, auth=auth)
response.raise_for_status()
return response.json()


@retry(tries=5, backoff=2, jitter=.5)
def _put_submission_api(url):
auth = (cfg.query('submissions', 'webservice', 'admin_username'), cfg.query('submissions', 'webservice', 'admin_password'))
response = requests.put(url, auth=auth)
response.raise_for_status()
return response.json()
from eva_sub_cli_processing.sub_cli_brokering import SubCliProcessBrokering
from eva_sub_cli_processing.sub_cli_ingestion import SubCliProcessIngestion
from eva_sub_cli_processing.sub_cli_utils import get_from_sub_ws, sub_ws_url_build, VALIDATION, READY_FOR_PROCESSING, \
PROCESSING, BROKERING, INGESTION, SUCCESS, FAILURE, put_to_sub_ws
from eva_sub_cli_processing.sub_cli_validation import SubCliProcessValidation


def process_submissions():
Expand All @@ -53,47 +30,105 @@ def _process_submission(submission):
class SubmissionScanner(AppLogger):

statuses = []
step_statuses = []

def scan(self):
def _scan_per_status(self):
submissions = []
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was initially very confused about why we needed to scan both tables, before I realised that this scan is (I think) only used to add the first processing step. If that's the case, then maybe it could be a bit less generic and used only in that specific situation - even if we used it for other operations (e.g. scanning for cancelled submissions to clean up the db or something), I don't think creating a SubmissionStep for validation would make sense in those cases.

for status in self.statuses:
for submission_data in _get_submission_api(_url_build('admin', 'submissions', 'status', status)):
submissions.append(Submission(
for submission_data in get_from_sub_ws(sub_ws_url_build('admin', 'submissions', 'status', status)):
submissions.append(SubmissionStep(
submission_id=submission_data.get('submissionId'),
submission_status=submission_data.get('status'),
uploaded_time=submission_data.get('uploadedTime')
status=submission_data.get('status'),
processing_step=VALIDATION,
processing_status=READY_FOR_PROCESSING,
last_update_time=submission_data.get('uploadedTime'),
priority=5
))
return submissions

def _scan_per_step_status(self):
submissions = []
for step, status in self.step_statuses:
for submission_step_data in get_from_sub_ws(sub_ws_url_build('admin', 'submission-processes', step, status)):
submissions.append(SubmissionStep(
submission_id=submission_step_data.get('submissionId'),
status=PROCESSING,
processing_step=step,
processing_status=status,
last_update_time=submission_step_data.get('lastUpdateTime'),
priority=submission_step_data.get('priority')
))
return submissions

def scan(self):
return self._scan_per_status() + self._scan_per_step_status()

def report(self):
header = ['Submission Id', 'Submission status', 'Uploaded time']
header = ['Submission Id', 'Submission status', 'Processing step', 'Processing status', 'Last updated time',
'Priority']
scan = self.scan()
lines = []
for submission in scan:
lines.append((submission.submission_id, submission.submission_status, str(submission.uploaded_time)))
lines.append((submission.submission_id, submission.submission_status, submission.processing_step,
submission.processing_status, str(submission.last_update_time), str(submission.priority)))
pretty_print(header, lines)


class NewSubmissionScanner(SubmissionScanner):

statuses = ['UPLOADED']
step_statuses = []

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is mostly a matter of taste, but I think I would prefer the different scanning tasks as methods rather than classes. So we would have just one SubmissionScanner with the generic _scan_per_step_status helper method, and find_new_submissions, find_completed_submission_steps, etc. that call that method with the relevant statuses.

On the other hand, maybe there's more functionality that would go into these subclasses that I'm not thinking of, in which case having the extra classes makes sense.


class Submission(AppLogger):
class SubmissionStep(AppLogger):

def __init__(self, submission_id, submission_status, uploaded_time):
def __init__(self, submission_id, status, processing_step, processing_status, last_update_time, priority):
self.submission_id = submission_id
self.submission_status = submission_status
self.uploaded_time = uploaded_time
self.submission_status = status
self.processing_step = processing_step
self.processing_status = processing_status
self.last_update_time = last_update_time
self.priority = priority

def start(self):
response = _put_submission_api(_url_build('admin', 'submission', self.submission_id, 'status', 'PROCESSING'))
self._set_next_step()
self._update_submission_ws()
self.submit_pipeline()

def submit_pipeline(self):
# TODO: Actually submit a job for this submission
pass
assert self.processing_status == READY_FOR_PROCESSING
# TODO: These jobs needs to be submitted as independent processes
if self.processing_step == VALIDATION:
process = SubCliProcessValidation(self.submission_id)
elif self.processing_step == BROKERING:
process = SubCliProcessBrokering(self.submission_id)
elif self.processing_step == INGESTION:
process = SubCliProcessIngestion(self.submission_id)
process.start()

def _set_next_step(self):
if self.submission_status != PROCESSING and not self.processing_step:
self.submission_status = PROCESSING
self.processing_step = VALIDATION
self.processing_status = READY_FOR_PROCESSING
elif self.processing_status == SUCCESS and self.processing_step == VALIDATION:
self.processing_step = BROKERING
self.processing_status = READY_FOR_PROCESSING
elif self.processing_status == SUCCESS and self.processing_step == BROKERING:
self.processing_step = INGESTION
self.processing_status = READY_FOR_PROCESSING
elif self.processing_status == FAILURE:
# TODO: Is there something we need to do before restarting a failed job
self.processing_status = READY_FOR_PROCESSING

def _update_submission_ws(self):
put_to_sub_ws(sub_ws_url_build('admin', 'submission', self.submission_id, 'status', self.submission_status))
put_to_sub_ws('admin', 'submission-process', self.submission_id, self.processing_step, self.processing_status)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing call to sub_ws_url_build

def __repr__(self):
return f'Submission(submission_id={self.submission_id}, submission_status={self.submission_status}, ' \
f'uploaded_time={self.uploaded_time})'
f'processing_step={self.processing_step}, processing_status={self.processing_status}' \
f'last_update_time={self.last_update_time})'



30 changes: 30 additions & 0 deletions eva_sub_cli_processing/sub_cli_brokering.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import csv
import os
import shutil

from eva_submission.ENA_submission.upload_to_ENA import ENAUploader, ENAUploaderAsync
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not all imports are being used

from eva_submission.biosample_submission.biosamples_submitters import SampleReferenceSubmitter, \
SampleSubmitter, SampleJSONSubmitter
from eva_sub_cli_processing.sub_cli_submission import SubCliProcess


class SubCliProcessBrokering(SubCliProcess):

def start(self):
"""Run the brokering process"""
self.upload_to_bioSamples()

def upload_to_bioSamples(self, ):
sample_submitter = SampleJSONSubmitter(('create',), metadata_json=self.submission_detail.get('metadataJson'))
sample_name_to_accession = sample_submitter.submit_to_bioSamples()
# Check whether all samples have been accessioned
passed = (
bool(sample_name_to_accession)
and all(sample_name in sample_name_to_accession for sample_name in sample_submitter.all_sample_names())
)
if not passed:
raise ValueError(f'Not all samples were successfully brokered to BioSamples! '
f'Found {len(sample_name_to_accession)} and expected '
f'{len(sample_submitter.all_sample_names())}. '
f'Missing samples are '
f'{[sample_name for sample_name in sample_submitter.all_sample_names() if sample_name not in sample_name_to_accession]}')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This process needs to set the processing status to RUNNING, SUCCESS or FAILURE, correct? Or is that someone else's responsibility?

8 changes: 8 additions & 0 deletions eva_sub_cli_processing/sub_cli_ingestion.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from eva_sub_cli_processing.sub_cli_submission import SubCliProcess


class SubCliProcessIngestion(SubCliProcess):

def start(self):
pass

49 changes: 49 additions & 0 deletions eva_sub_cli_processing/sub_cli_submission.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import os
import random
import string
from datetime import datetime

from cached_property import cached_property
from ebi_eva_common_pyutils.config import cfg
from ebi_eva_common_pyutils.logger import AppLogger
from ebi_eva_common_pyutils.logger import logging_config as log_cfg
from ebi_eva_internal_pyutils.metadata_utils import get_metadata_connection_handle

from eva_sub_cli_processing.sub_cli_utils import sub_ws_url_build, get_from_sub_ws

submission_logging_files = set()


class SubCliProcess(AppLogger):
def __init__(self, submission_id: str):
self.submission_id = submission_id
self.submission_dir = os.path.abspath(os.path.join(cfg['submission_dir'], self.submission_id))
os.makedirs(self.submission_dir, exist_ok=True)
self.create_log_file()

@cached_property
def submission_detail(self):
return get_from_sub_ws(sub_ws_url_build('admin', 'submission', self.submission_id))

def create_nextflow_temp_output_directory(self, base=None):
random_string = ''.join(random.choice(string.ascii_letters) for i in range(6))
if base is None:
output_dir = os.path.join(self.submission_dir, 'nextflow_output_' + random_string)
else:
output_dir = os.path.join(base, 'nextflow_output_' + random_string)
os.makedirs(output_dir)
return output_dir

@cached_property
def now(self):
return datetime.now()

def create_log_file(self):
logfile_name = os.path.join(self.submission_dir, "submission.log")
if logfile_name not in submission_logging_files:
log_cfg.add_file_handler(logfile_name)
submission_logging_files.add(logfile_name)

def start(self):
raise NotImplementedError

55 changes: 55 additions & 0 deletions eva_sub_cli_processing/sub_cli_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import requests
from ebi_eva_common_pyutils.config import cfg
from retry import retry

# Submission statuses
OPEN = 'OPEN'
UPLOADED = 'UPLOADED'
COMPLETED = 'COMPLETED'
TIMEOUT = 'TIMEOUT'
FAILED = 'FAILED'
CANCELLED = 'CANCELLED'
PROCESSING = 'PROCESSING'

# Processing steps
VALIDATION = 'VALIDATION'
BROKERING = 'BROKERING'
INGESTION = 'INGESTION'
PROCESSING_STEPS = [VALIDATION, BROKERING, INGESTION]

# Processing statuses
READY_FOR_PROCESSING = 'READY_FOR_PROCESSING'
FAILURE = 'FAILURE'
SUCCESS = 'SUCCESS'
RUNNING = 'RUNNING'
ON_HOLD = 'ON_HOLD'
PROCESSING_STATUS = [READY_FOR_PROCESSING, FAILURE, SUCCESS, RUNNING, ON_HOLD]


def sub_ws_auth():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think it's worth extracting the submission WS client into common-pyutils, so it can be used in both eva-sub-cli and eva-submission? It's some extra refactoring, but I think it could be beneficial in the long run to keep python interactions with the submission WS in one place.

return (
cfg.query('submissions', 'webservice', 'admin_username'),
cfg.query('submissions', 'webservice', 'admin_password')
)


def sub_ws_url_build(*args, **kwargs):
url = cfg.query('submissions', 'webservice', 'url') + '/' + '/'.join(args)
if kwargs:
return url + '?' + '&'.join(f'{k}={v}' for k, v in kwargs.items())
else:
return url


@retry(tries=5, backoff=2, jitter=.5)
def get_from_sub_ws(url):
response = requests.get(url, auth=sub_ws_auth())
response.raise_for_status()
return response.json()


@retry(tries=5, backoff=2, jitter=.5)
def put_to_sub_ws(url):
response = requests.put(url, auth=sub_ws_auth())
response.raise_for_status()
return response.json()
11 changes: 11 additions & 0 deletions eva_sub_cli_processing/sub_cli_validation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from eva_sub_cli_processing.sub_cli_submission import SubCliProcess


class SubCliProcessValidation(SubCliProcess):

all_validation_tasks = ['metadata_check', 'assembly_check', 'aggregation_check', 'vcf_check', 'sample_check',
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will we use these tasks for sub cli processing?

'structural_variant_check', 'naming_convention_check']

def start(self):
pass

Loading
Loading