Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EVA-3337 - Refactor to separate the auth from the submission #11

Merged
merged 4 commits into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions cli/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import json
from functools import lru_cache
from getpass import getpass

import requests
from ebi_eva_common_pyutils.logger import AppLogger
from urllib3.exceptions import ResponseError

from cli import LSRI_CLIENT_ID

ENA_AUTH_URL = "https://www.ebi.ac.uk/ena/submit/webin/auth/token",
LSRI_AUTH_URL = "http://www.ebi.ac.uk/eva/v1/submission/auth/lsri"
DEVICE_AUTHORISATION_URL ="https://login.elixir-czech.org/oidc/devicecode"


class LSRIAuth(AppLogger):
def __init__(self, client_id=LSRI_CLIENT_ID, device_authorization_url=DEVICE_AUTHORISATION_URL, auth_url=LSRI_AUTH_URL):
self.client_id = client_id
self.device_authorization_url = device_authorization_url
self.auth_url = auth_url

def token(self):
# Step 1: Get device code using device auth url
payload = {
'client_id': self.client_id,
'scope': 'openid'
}
response = requests.post(self.device_authorization_url, data=payload)
response_json = response.json()

device_code = response_json['device_code']
user_code = response_json['user_code']
verification_uri = response_json['verification_uri']
expires_in = response_json['expires_in']

# Display the user code and verification URI to the user
print(f'Please visit {verification_uri} and enter this user code: {user_code}')
# Delegate subsequent post-authentication processing (which requires LSRI client secret) to eva-submission-ws
# so that we can avoid storing that client secret in eva-sub-cli
response = requests.post(self.auth_url, timeout=expires_in,
headers={'Accept': 'application/hal+json'},
params={"deviceCode": device_code, "expiresIn": expires_in})
if response.status_code == 200:
self.info("LSRI authentication successful!")
return response.text
else:
raise ResponseError('LSRI Authentication Error')


class WebinAuth(AppLogger):

def __init__(self, ena_auth_url=ENA_AUTH_URL):
self.ena_auth_url = ena_auth_url

@lru_cache
apriltuesday marked this conversation as resolved.
Show resolved Hide resolved
def token(self):
self.info("Proceeding with ENA Webin authentication...")
username, password = self._get_webin_username_password()
headers = {"accept": "*/*", "Content-Type": "application/json"}
data = {"authRealms": ["ENA"], "username": username, "password": password}
response = requests.post(self.ena_auth_url, headers=headers, data=json.dumps(data))

if response.status_code == 200:
self.info("Webin authentication successful!")
return response.text
else:
raise ResponseError('Webin Authentication Error: Username or password not recognised')
tcezard marked this conversation as resolved.
Show resolved Hide resolved

def _get_webin_username_password(self):
username = input("Enter your ENA Webin username: ")
password = getpass("Enter your ENA Webin password: ")
return username, password

# Global auth for the session
auth = None


def get_auth():
global auth
if auth:
return auth
print("Choose an authentication method:")
print("1. ENA Webin")
print("2. LSRI")

choice = int(input("Enter the number corresponding to your choice: "))

if choice == 1:
auth = WebinAuth()
elif choice == 2:
auth = LSRIAuth()
apriltuesday marked this conversation as resolved.
Show resolved Hide resolved
else:
print("Invalid choice! Try again!")
get_auth()
return auth
30 changes: 0 additions & 30 deletions cli/lsri_auth.py

This file was deleted.

79 changes: 13 additions & 66 deletions cli/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,85 +2,32 @@
import json
import requests

from cli import LSRI_CLIENT_ID
from ebi_eva_common_pyutils.logger import logging_config
from getpass import getpass
from cli.lsri_auth import LSRIAuth
from cli.auth import get_auth

logger = logging_config.get_logger(__name__)
ENA_AUTH_URL="https://www.ebi.ac.uk/ena/submit/webin/auth/token",
SUBMISSION_INITIATE_WEBIN_URL ="http://www.ebi.ac.uk/eva/v1/submission/initiate/webin",
SUBMISSION_INITIATE_LSRI_URL ="http://www.ebi.ac.uk/eva/v1/submission/initiate/lsri"
SUBMISSION_INITIATE_URL = "http://www.ebi.ac.uk/eva/v1/submission/initiate"


class StudySubmitter:
def __init__(self, ena_auth_url=ENA_AUTH_URL, submission_initiate_webin_url = SUBMISSION_INITIATE_WEBIN_URL,
submission_initiate_lsri_url = SUBMISSION_INITIATE_LSRI_URL):
self.ena_auth_url = ena_auth_url
self.submission_initiate_webin_url = submission_initiate_webin_url
self.submission_initiate_lsri_url = submission_initiate_lsri_url

def submit_with_lsri_auth(self):
logger.info("Proceeding with LSRI authentication...")
# For now, it is OK for client ID to be hardcoded because, unlike client secret, it is not sensitive information
response = LSRIAuth(client_id=LSRI_CLIENT_ID,
device_authorization_url="https://login.elixir-czech.org/oidc/devicecode",
submission_initiation_url=self.submission_initiate_lsri_url).get_auth_response()
if response.status_code == 200:
logger.info("LSRI authentication successful!")
response_json = json.loads(response.text)
logger.info("Submission ID {} received!!".format(response_json["submissionId"]))
self.upload_submission(response_json["submissionId"], response_json["uploadUrl"])
else:
raise RuntimeError("Could not perform LSRI Authentication! Please try running this script again.")
def __init__(self, submission_initiate_url=SUBMISSION_INITIATE_URL):
self.auth = get_auth()
self.submission_initiate_url = submission_initiate_url

# TODO
def upload_submission(self, submission_id, submission_upload_url):
pass

@staticmethod
def _get_webin_credentials():
username = input("Enter your ENA Webin username: ")
password = getpass("Enter your ENA Webin password: ")
return username, password

def submit_with_webin_auth(self, username, password):
logger.info("Proceeding with ENA Webin authentication...")

headers = {"accept": "*/*", "Content-Type": "application/json"}
data = {"authRealms": ["ENA"], "username": username, "password": password}
response = requests.post(self.ena_auth_url, headers=headers, data=json.dumps(data))

if response.status_code == 200:
logger.info("Webin authentication successful!")
webin_token = response.text
response = requests.post(self.submission_initiate_webin_url,
headers={'Accept': 'application/hal+json',
'Authorization': 'Bearer ' + webin_token})
response_json = json.loads(response.text)
logger.info("Submission ID {} received!!".format(response_json["submissionId"]))
self.upload_submission(response_json["submissionId"], response_json["uploadUrl"])
else:
logger.error("Authentication failed!")

def auth_prompt(self):
print("Choose an authentication method:")
print("1. ENA Webin")
print("2. LSRI")

choice = int(input("Enter the number corresponding to your choice: "))

if choice == 1:
webin_username, webin_password = StudySubmitter._get_webin_credentials()
self.submit_with_webin_auth(webin_username, webin_password)
elif choice == 2:
self.submit_with_lsri_auth()
else:
logger.error("Invalid choice! Try again!")
self.auth_prompt()
def submit(self):
response = requests.post(self.submission_initiate_url,
headers={'Accept': 'application/hal+json',
'Authorization': 'Bearer ' + self.auth.token()})
response_json = json.loads(response.text)
apriltuesday marked this conversation as resolved.
Show resolved Hide resolved
logger.info("Submission ID {} received!!".format(response_json["submissionId"]))
self.upload_submission(response_json["submissionId"], response_json["uploadUrl"])


if __name__ == "__main__":
logging_config.add_stdout_handler()
submitter = StudySubmitter()
submitter.auth_prompt()
submitter.submit()
66 changes: 66 additions & 0 deletions tests/test_auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import json
import unittest
from unittest.mock import MagicMock, patch

from cli import LSRI_CLIENT_ID
from cli.auth import WebinAuth, LSRIAuth


class TestWebinAuth(unittest.TestCase):
def setUp(self):
self.auth = WebinAuth()

def test_webin_auth(self):
# Mock the response for ENA authentication
mock_auth_response = MagicMock()
mock_auth_response.status_code = 200
mock_auth_response.text = "mock_webin_token"

# Call the submit_with_webin_auth method
with patch.object(WebinAuth, '_get_webin_username_password', return_value=("mock_username", "mock_password")), \
patch("cli.auth.requests.post", return_value=mock_auth_response) as mock_post:
token = self.auth.token()

# Check if the ENA_AUTH_URL was called with the correct parameters
mock_post.assert_any_call(
self.auth.ena_auth_url,
headers={"accept": "*/*", "Content-Type": "application/json"},
data=json.dumps({"authRealms": ["ENA"], "username": "mock_username", "password": "mock_password"}),
)
assert token == 'mock_webin_token'

class TestLSRIAuth(unittest.TestCase):

def setUp(self):
self.auth = LSRIAuth()

@patch("cli.auth.requests.post")
def test_auth_with_lsri_auth(self, mock_post):
# Mock the response for OAuth device flow initiation
mock_device_response = MagicMock()
mock_device_response.status_code = 200
mock_device_response.json.return_value = {"device_code": "device_code", "user_code": "user_code",
"verification_uri": "verification_uri", "expires_in": 600}

# Mock the response for post-authentication response from eva-submission-ws
mock_auth_response = MagicMock()
mock_auth_response.status_code = 200
mock_auth_response.text = "mock_lsri_token"

# Set the side_effect attribute to return different responses
mock_post.side_effect = [mock_device_response, mock_auth_response]
token = self.auth.token()

# Check if the device initiation flow was called with the correct parameters
device_authorization_url = "https://login.elixir-czech.org/oidc/devicecode"
mock_post.assert_any_call(device_authorization_url,
data={'client_id': LSRI_CLIENT_ID, 'scope': 'openid'})

# Check if the post-authentication call to eva-submission-ws was called with the correct parameters
mock_post.assert_any_call(
self.auth.auth_url, timeout=600 ,headers={'Accept': 'application/hal+json'},
params={'deviceCode': 'device_code', 'expiresIn': 600}
)
# Check the total number of calls to requests.post
assert mock_post.call_count == 2
assert token == 'mock_lsri_token'
77 changes: 15 additions & 62 deletions tests/test_submit.py
Original file line number Diff line number Diff line change
@@ -1,78 +1,31 @@
import json
import unittest
from unittest.mock import MagicMock, patch
from unittest.mock import MagicMock, patch, Mock

from cli import LSRI_CLIENT_ID
from cli.auth import WebinAuth, LSRIAuth
from cli.submit import StudySubmitter


class TestStudySubmitter(unittest.TestCase):
def setUp(self):
self.submitter = StudySubmitter()
class TestSubmit(unittest.TestCase):

@patch("cli.submit.requests.post")
def test_submit_with_webin_auth(self, mock_post):
# Mock the response for ENA authentication
mock_auth_response = MagicMock()
mock_auth_response.status_code = 200
mock_auth_response.text = "mock_webin_token"

# Mock the response for WEBIN_SUBMIT_ENDPOINT
mock_submit_response = MagicMock()
mock_submit_response.text = json.dumps({"submissionId": "mock_submission_id", "uploadUrl": "mock_upload_url"})

# Set the side_effect attribute to return different responses
mock_post.side_effect = [mock_auth_response, mock_submit_response]

# Call the submit_with_webin_auth method
self.submitter.submit_with_webin_auth("mock_username", "mock_password")

# Check if the ENA_AUTH_URL was called with the correct parameters
mock_post.assert_any_call(
self.submitter.ena_auth_url,
headers={"accept": "*/*", "Content-Type": "application/json"},
data=json.dumps({"authRealms": ["ENA"], "username": "mock_username", "password": "mock_password"}),
)

# Check if the WEBIN_SUBMIT_ENDPOINT was called with the correct parameters
mock_post.assert_any_call(
self.submitter.submission_initiate_webin_url, headers={'Accept': 'application/hal+json',
'Authorization': 'Bearer ' + 'mock_webin_token'}
)

# Check the total number of calls to requests.post
assert mock_post.call_count == 2

@patch("cli.submit.requests.post")
def test_submit_with_lsri_auth(self, mock_post):
# Mock the response for OAuth device flow initiation
# see get_auth_response() in LSRIAuth class
mock_auth_response = MagicMock()
mock_auth_response.status_code = 200
mock_auth_response.json.return_value = {"device_code": "device_code", "user_code": "user_code",
"verification_uri": "verification_uri", "expires_in": 600}
def setUp(self) -> None:
self.token = 'a token'
with patch('cli.submit.get_auth', return_value=Mock(token=Mock(return_value=self.token))):
self.submitter = StudySubmitter()

def test_submit(self):
# Mock the response for post-authentication response from eva-submission-ws
# see get_auth_response() in LSRIAuth class
mock_submit_response = MagicMock()
mock_submit_response.status_code = 200
mock_submit_response.text = json.dumps({"submissionId": "mock_submission_id", "uploadUrl": "mock_upload_url"})
mock_submit_response.text = json.dumps({"submissionId": "mock_submission_id",
'uploadUrl': 'directory to use for upload'})

# Set the side_effect attribute to return different responses
mock_post.side_effect = [mock_auth_response, mock_submit_response]
self.submitter.submit_with_lsri_auth()

# Check if the device initiation flow was called with the correct parameters
device_authorization_url = "https://login.elixir-czech.org/oidc/devicecode"
print(mock_post.mock_calls)
mock_post.assert_any_call(device_authorization_url,
data={'client_id': LSRI_CLIENT_ID, 'scope': 'openid'})

# Check if the post-authentication call to eva-submission-ws was called with the correct parameters
mock_post.assert_any_call(
self.submitter.submission_initiate_lsri_url, headers={'Accept': 'application/hal+json'},
params={'deviceCode': 'device_code', 'expiresIn': 600}
)
with patch('cli.submit.requests.post', return_value=mock_submit_response) as mock_post:
self.submitter.submit()
mock_post.assert_called_once_with('http://www.ebi.ac.uk/eva/v1/submission/initiate',
headers={'Accept': 'application/hal+json', 'Authorization': 'Bearer a token'})

# Check the total number of calls to requests.post
assert mock_post.call_count == 2
# TODO: Check that upload_submission was called with submission id
Loading