Skip to content

Commit

Permalink
MFAコードを標準入力から入力できるオプションを用意しました。 (#652)
Browse files Browse the repository at this point in the history
* InvalidMfaCodeErrorに対応する

* mask対象を増やす

* 仮実装

* docstringを修正

* update resoruces.py

* 引数の設計

* エラーメッセージ
  • Loading branch information
yuji38kwmt authored May 29, 2024
1 parent 93fc20e commit 67ee3ee
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 22 deletions.
80 changes: 69 additions & 11 deletions annofabapi/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from requests.auth import AuthBase
from requests.cookies import RequestsCookieJar

from annofabapi.exceptions import MfaEnabledUserExecutionError, NotLoggedInError
from annofabapi.exceptions import InvalidMfaCodeError, MfaEnabledUserExecutionError, NotLoggedInError
from annofabapi.generated_api import AbstractAnnofabApi

logger = logging.getLogger(__name__)
Expand All @@ -23,6 +23,14 @@
"""HTTP Status Codeが429のときの、デフォルト(Retry-Afterヘッダがないとき)の待ち時間です。"""


def _read_mfa_code_from_stdin() -> str:
"""標準入力からMFAコードを読み込みます。"""
inputted_mfa_code = ""
while inputted_mfa_code == "":
inputted_mfa_code = input("Enter Annofab MFA Code: ")
return inputted_mfa_code


def _mask_senritive_value_for_dict(data: Dict[str, Any], keys: Collection[str]) -> Dict[str, Any]:
"""
dictに含まれているセンシティブな情報を"***"でマスクします。
Expand Down Expand Up @@ -148,7 +156,9 @@ def _create_request_body_for_logger(data: Any) -> Any: # noqa: ANN401
# bytes型のときは値を出力しても意味がないので、bytesであることが分かるようにする
return "(bytes)"

return _mask_senritive_value_for_dict(data, keys={"password", "old_password", "new_password", "id_token", "refresh_token", "access_token"})
return _mask_senritive_value_for_dict(
data, keys={"password", "old_password", "new_password", "id_token", "refresh_token", "access_token", "session"}
)


def _create_query_params_for_logger(params: Dict[str, Any]) -> Dict[str, Any]:
Expand Down Expand Up @@ -227,19 +237,23 @@ class AnnofabApi(AbstractAnnofabApi):
login_user_id: AnnofabにログインするときのユーザID
login_password: Annofabにログインするときのパスワード
endpoint_url: Annofab APIのエンドポイント。
input_mfa_code_via_stdin: MFAコードを標準入力から入力するかどうか
Attributes:
token_dict: login, refresh_tokenで取得したtoken情報
cookies: Signed Cookie情報
"""

def __init__(self, login_user_id: str, login_password: str, endpoint_url: str = DEFAULT_ENDPOINT_URL) -> None:
def __init__(
self, login_user_id: str, login_password: str, *, endpoint_url: str = DEFAULT_ENDPOINT_URL, input_mfa_code_via_stdin: bool = False
) -> None:
if not login_user_id or not login_password:
raise ValueError("login_user_id or login_password is empty.")

self.login_user_id = login_user_id
self.login_password = login_password
self.endpoint_url = endpoint_url
self.input_mfa_code_via_stdin = input_mfa_code_via_stdin
self.url_prefix = f"{endpoint_url}/api/v1"
self.session = requests.Session()

Expand Down Expand Up @@ -604,37 +618,81 @@ def _request_get_with_cookie(self, project_id: str, url: str) -> requests.Respon
#########################################
# Public Method : Login
#########################################
def _login_respond_to_auth_challenge(self, mfa_code: str, session: str) -> Dict[str, Any]:
"""
MFAコードによるログインを実行します。
``self.input_mfa_code_via_stdin`` が ``True`` AND ``mfa_code`` が正しくない場合は、標準入力から再度MFAコードの入力を求めます。
Args:
mfa_code: MFAコード
session: `login` APIのレスポンスに格納されている`session`
Raises:
InvalidMfaCodeError: ``self.input_mfa_code_via_stdin`` が ``False`` AND ``mfa_code`` が正しくない場合
"""
request_body = {"user_id": self.login_user_id, "mfa_code": mfa_code, "session": session}
url = f"{self.url_prefix}/login-respond-to-auth-challenge"

response = self._execute_http_request("post", url, json=request_body, raise_for_status=False)

json_obj = response.json()
# MFAコードが間違っているかどうかの判定が、メッセージでしかできなかったので、暫定的にメッセージで判定する
if response.status_code == requests.codes.bad_request:
assert len(json_obj["errors"]) > 0
error_message = json_obj["errors"][0]["message"]
if error_message in {"検証コードが間違っています", "検証コードの期限が切れています"}:
# 分かりやすいメッセージにするため「検証コード」を「MFAコード」に置き換える
new_error_message = error_message.replace("検証コード", "MFAコード")
if self.input_mfa_code_via_stdin:
logger.info(new_error_message)
new_mfa_code = _read_mfa_code_from_stdin()
return self._login_respond_to_auth_challenge(new_mfa_code, session)
else:
raise InvalidMfaCodeError(new_error_message)

_log_error_response(logger, response)
_raise_for_status(response)
return response.json()

def login(self, mfa_code: Optional[str] = None) -> None:
"""
ログインして、トークンをインスタンスに保持します。
MFAが有効化されている場合は、loginRespondToAuthChallenge APIを実行してトークンを取得します。
``self.input_mfa_code_via_stdin == True`` の場合は、標準入力からMFAコードの入力を求めます。
Args:
mfa_code: ``loginRespondToAuthChallenge``のレスポンスから取得したMFAコード。この引数はexperimentalです。将来削除される可能性があります。
Returns:
Tuple[Token, requests.Response]
Raises:
InvalidMfaCodeError: ``self.input_mfa_code_via_stdin`` が ``False`` AND ``mfa_code`` が正しくない場合
MfaEnabledUserExecutionError: ``self.input_mfa_code_via_stdin`` が ``False`` AND ``mfa_code`` が未指定の場合
"""
login_info = {"user_id": self.login_user_id, "password": self.login_password}

url = f"{self.url_prefix}/login"

login_response = self._execute_http_request("post", url, json=login_info)
json_obj = login_response.json()
if "token" not in json_obj:
login_json_obj = login_response.json()
if "token" not in login_json_obj:
# `login` APIのレスポンスのスキーマがLoginNeedChallengeResponseのとき
if mfa_code is None:
raise MfaEnabledUserExecutionError(self.login_user_id)
if self.input_mfa_code_via_stdin:
mfa_code = _read_mfa_code_from_stdin()
else:
raise MfaEnabledUserExecutionError(self.login_user_id)

mfa_param = {"user_id": self.login_user_id, "mfa_code": mfa_code, "session": json_obj["session"]}
mfa_url = f"{self.url_prefix}/login-respond-to-auth-challenge"
mfa_response = self._execute_http_request("post", mfa_url, json=mfa_param)
mfa_json_obj = mfa_response.json()
mfa_json_obj = self._login_respond_to_auth_challenge(mfa_code, login_json_obj["session"])
token_dict = mfa_json_obj["token"]
else:
# `login` APIのレスポンスのスキーマがloginRespondToAuthChallengeのとき
token_dict = json_obj["token"]
token_dict = login_json_obj["token"]

self.token_dict = token_dict
logger.debug("Logged in successfully. user_id = %s", self.login_user_id)

Expand Down
9 changes: 9 additions & 0 deletions annofabapi/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,12 @@ class MfaEnabledUserExecutionError(Exception):
def __init__(self, user_id: str) -> None:
message = f"User (User ID: {user_id}) cannot use annofab-api-python-client because MFA is enabled."
super().__init__(message)


class InvalidMfaCodeError(AnnofabApiException):
"""
MFAコードが間違っている場合のエラー
"""

def __init__(self, message: str) -> None:
super().__init__(message)
34 changes: 24 additions & 10 deletions annofabapi/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,17 @@ class Resource:
login_user_id: AnnofabにログインするときのユーザID
login_password: Annofabにログインするときのパスワード
endpoint_url: Annofab APIのエンドポイント。
input_mfa_code_via_stdin: MFAコードを標準入力から入力するかどうか
"""

def __init__(self, login_user_id: str, login_password: str, endpoint_url: str = DEFAULT_ENDPOINT_URL) -> None:
def __init__(
self, login_user_id: str, login_password: str, *, endpoint_url: str = DEFAULT_ENDPOINT_URL, input_mfa_code_via_stdin: bool = False
) -> None:
#: AnnofabApi Instance
self.api = AnnofabApi(login_user_id=login_user_id, login_password=login_password, endpoint_url=endpoint_url)
self.api = AnnofabApi(
login_user_id=login_user_id, login_password=login_password, endpoint_url=endpoint_url, input_mfa_code_via_stdin=input_mfa_code_via_stdin
)

#: Wrapper Instance
self.wrapper = Wrapper(self.api)
Expand All @@ -35,7 +40,13 @@ def __init__(self, login_user_id: str, login_password: str, endpoint_url: str =
logger.debug("Create annofabapi resource instance :: %s", {"login_user_id": login_user_id, "endpoint_url": endpoint_url})


def build(login_user_id: Optional[str] = None, login_password: Optional[str] = None, endpoint_url: str = DEFAULT_ENDPOINT_URL) -> Resource:
def build(
login_user_id: Optional[str] = None,
login_password: Optional[str] = None,
*,
endpoint_url: str = DEFAULT_ENDPOINT_URL,
input_mfa_code_via_stdin: bool = False,
) -> Resource:
"""
AnnofabApi, Wrapperのインスタンスを保持するインスタンスを生成する。
Expand All @@ -48,6 +59,7 @@ def build(login_user_id: Optional[str] = None, login_password: Optional[str] = N
login_user_id: AnnofabにログインするときのユーザID
login_password: Annofabにログインするときのパスワード
endpoint_url: Annofab APIのエンドポイント。
input_mfa_code_via_stdin: MFAコードを標準入力から入力するかどうか
Returns:
AnnofabApi, Wrapperのインスタンスを保持するインスタンス
Expand All @@ -57,26 +69,27 @@ def build(login_user_id: Optional[str] = None, login_password: Optional[str] = N
"""
if login_user_id is not None and login_password is not None:
return Resource(login_user_id, login_password, endpoint_url=endpoint_url)
return Resource(login_user_id, login_password, endpoint_url=endpoint_url, input_mfa_code_via_stdin=input_mfa_code_via_stdin)

elif login_user_id is None and login_password is None:
try:
return build_from_env(endpoint_url)
return build_from_env(endpoint_url=endpoint_url, input_mfa_code_via_stdin=input_mfa_code_via_stdin)
except CredentialsNotFoundError:
try:
return build_from_netrc(endpoint_url)
return build_from_netrc(endpoint_url=endpoint_url, input_mfa_code_via_stdin=input_mfa_code_via_stdin)
except CredentialsNotFoundError as e:
raise CredentialsNotFoundError("環境変数または`.netrc`ファイルにAnnofab認証情報はありませんでした。") from e
else:
raise ValueError("引数`login_user_id`か`login_password`のどちらか一方がNoneです。両方Noneでないか、両方Noneである必要があります。")


def build_from_netrc(endpoint_url: str = DEFAULT_ENDPOINT_URL) -> Resource:
def build_from_netrc(*, endpoint_url: str = DEFAULT_ENDPOINT_URL, input_mfa_code_via_stdin: bool = False) -> Resource:
"""
``.netrc`` ファイルから、annofabapi.Resourceインスタンスを生成する。
Args:
endpoint_url: Annofab APIのエンドポイント。
input_mfa_code_via_stdin: MFAコードを標準入力から入力するかどうか
Returns:
annofabapi.Resourceインスタンス
Expand All @@ -101,15 +114,16 @@ def build_from_netrc(endpoint_url: str = DEFAULT_ENDPOINT_URL) -> Resource:
if login_user_id is None or login_password is None:
raise CredentialsNotFoundError("User ID or password in the .netrc file are None.")

return Resource(login_user_id, login_password, endpoint_url=endpoint_url)
return Resource(login_user_id, login_password, endpoint_url=endpoint_url, input_mfa_code_via_stdin=input_mfa_code_via_stdin)


def build_from_env(endpoint_url: str = DEFAULT_ENDPOINT_URL) -> Resource:
def build_from_env(*, endpoint_url: str = DEFAULT_ENDPOINT_URL, input_mfa_code_via_stdin: bool = False) -> Resource:
"""
環境変数 ``ANNOFAB_USER_ID`` , ``ANNOFAB_PASSWORD`` から、annofabapi.Resourceインスタンスを生成する。
Args:
endpoint_url: Annofab APIのエンドポイント。
input_mfa_code_via_stdin: MFAコードを標準入力から入力するかどうか
Returns:
annofabapi.Resourceインスタンス
Expand All @@ -122,4 +136,4 @@ def build_from_env(endpoint_url: str = DEFAULT_ENDPOINT_URL) -> Resource:
if login_user_id is None or login_password is None:
raise CredentialsNotFoundError("`ANNOFAB_USER_ID` or `ANNOFAB_PASSWORD` environment variable are empty.")

return Resource(login_user_id, login_password, endpoint_url=endpoint_url)
return Resource(login_user_id, login_password, endpoint_url=endpoint_url, input_mfa_code_via_stdin=input_mfa_code_via_stdin)
2 changes: 1 addition & 1 deletion tests/test_local_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def test_build(self):
def test_build_with_endpoint(self):
user_id = "test_user"
password = "password"
resource = build(user_id, password, "https://localhost:8080")
resource = build(user_id, password, endpoint_url="https://localhost:8080")
assert resource.api.url_prefix == "https://localhost:8080/api/v1"
assert resource.api2.url_prefix == "https://localhost:8080/api/v2"
assert resource.api.login_user_id == user_id
Expand Down

0 comments on commit 67ee3ee

Please sign in to comment.