From 67ee3eefd37969e53c62db811b1789d461800d9e Mon Sep 17 00:00:00 2001 From: yuji38kwmt Date: Wed, 29 May 2024 09:51:21 +0900 Subject: [PATCH] =?UTF-8?q?MFA=E3=82=B3=E3=83=BC=E3=83=89=E3=82=92?= =?UTF-8?q?=E6=A8=99=E6=BA=96=E5=85=A5=E5=8A=9B=E3=81=8B=E3=82=89=E5=85=A5?= =?UTF-8?q?=E5=8A=9B=E3=81=A7=E3=81=8D=E3=82=8B=E3=82=AA=E3=83=97=E3=82=B7?= =?UTF-8?q?=E3=83=A7=E3=83=B3=E3=82=92=E7=94=A8=E6=84=8F=E3=81=97=E3=81=BE?= =?UTF-8?q?=E3=81=97=E3=81=9F=E3=80=82=20(#652)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * InvalidMfaCodeErrorに対応する * mask対象を増やす * 仮実装 * docstringを修正 * update resoruces.py * 引数の設計 * エラーメッセージ --- annofabapi/api.py | 80 +++++++++++++++++++++++++++++++----- annofabapi/exceptions.py | 9 ++++ annofabapi/resource.py | 34 ++++++++++----- tests/test_local_resource.py | 2 +- 4 files changed, 103 insertions(+), 22 deletions(-) diff --git a/annofabapi/api.py b/annofabapi/api.py index ceec9fec..43225083 100644 --- a/annofabapi/api.py +++ b/annofabapi/api.py @@ -11,7 +11,7 @@ from requests.auth import AuthBase from requests.cookies import RequestsCookieJar -from annofabapi.exceptions import MfaEnabledUserExecutionError, NotLoggedInError +from annofabapi.exceptions import InvalidMfaCodeError, MfaEnabledUserExecutionError, NotLoggedInError from annofabapi.generated_api import AbstractAnnofabApi logger = logging.getLogger(__name__) @@ -23,6 +23,14 @@ """HTTP Status Codeが429のときの、デフォルト(Retry-Afterヘッダがないとき)の待ち時間です。""" +def _read_mfa_code_from_stdin() -> str: + """標準入力からMFAコードを読み込みます。""" + inputted_mfa_code = "" + while inputted_mfa_code == "": + inputted_mfa_code = input("Enter Annofab MFA Code: ") + return inputted_mfa_code + + def _mask_senritive_value_for_dict(data: Dict[str, Any], keys: Collection[str]) -> Dict[str, Any]: """ dictに含まれているセンシティブな情報を"***"でマスクします。 @@ -148,7 +156,9 @@ def _create_request_body_for_logger(data: Any) -> Any: # noqa: ANN401 # bytes型のときは値を出力しても意味がないので、bytesであることが分かるようにする return "(bytes)" - return _mask_senritive_value_for_dict(data, keys={"password", "old_password", "new_password", "id_token", "refresh_token", "access_token"}) + return _mask_senritive_value_for_dict( + data, keys={"password", "old_password", "new_password", "id_token", "refresh_token", "access_token", "session"} + ) def _create_query_params_for_logger(params: Dict[str, Any]) -> Dict[str, Any]: @@ -227,19 +237,23 @@ class AnnofabApi(AbstractAnnofabApi): login_user_id: AnnofabにログインするときのユーザID login_password: Annofabにログインするときのパスワード endpoint_url: Annofab APIのエンドポイント。 + input_mfa_code_via_stdin: MFAコードを標準入力から入力するかどうか Attributes: token_dict: login, refresh_tokenで取得したtoken情報 cookies: Signed Cookie情報 """ - def __init__(self, login_user_id: str, login_password: str, endpoint_url: str = DEFAULT_ENDPOINT_URL) -> None: + def __init__( + self, login_user_id: str, login_password: str, *, endpoint_url: str = DEFAULT_ENDPOINT_URL, input_mfa_code_via_stdin: bool = False + ) -> None: if not login_user_id or not login_password: raise ValueError("login_user_id or login_password is empty.") self.login_user_id = login_user_id self.login_password = login_password self.endpoint_url = endpoint_url + self.input_mfa_code_via_stdin = input_mfa_code_via_stdin self.url_prefix = f"{endpoint_url}/api/v1" self.session = requests.Session() @@ -604,37 +618,81 @@ def _request_get_with_cookie(self, project_id: str, url: str) -> requests.Respon ######################################### # Public Method : Login ######################################### + def _login_respond_to_auth_challenge(self, mfa_code: str, session: str) -> Dict[str, Any]: + """ + MFAコードによるログインを実行します。 + + ``self.input_mfa_code_via_stdin`` が ``True`` AND ``mfa_code`` が正しくない場合は、標準入力から再度MFAコードの入力を求めます。 + + Args: + mfa_code: MFAコード + session: `login` APIのレスポンスに格納されている`session` + + Raises: + InvalidMfaCodeError: ``self.input_mfa_code_via_stdin`` が ``False`` AND ``mfa_code`` が正しくない場合 + """ + request_body = {"user_id": self.login_user_id, "mfa_code": mfa_code, "session": session} + url = f"{self.url_prefix}/login-respond-to-auth-challenge" + + response = self._execute_http_request("post", url, json=request_body, raise_for_status=False) + + json_obj = response.json() + # MFAコードが間違っているかどうかの判定が、メッセージでしかできなかったので、暫定的にメッセージで判定する + if response.status_code == requests.codes.bad_request: + assert len(json_obj["errors"]) > 0 + error_message = json_obj["errors"][0]["message"] + if error_message in {"検証コードが間違っています", "検証コードの期限が切れています"}: + # 分かりやすいメッセージにするため「検証コード」を「MFAコード」に置き換える + new_error_message = error_message.replace("検証コード", "MFAコード") + if self.input_mfa_code_via_stdin: + logger.info(new_error_message) + new_mfa_code = _read_mfa_code_from_stdin() + return self._login_respond_to_auth_challenge(new_mfa_code, session) + else: + raise InvalidMfaCodeError(new_error_message) + + _log_error_response(logger, response) + _raise_for_status(response) + return response.json() + def login(self, mfa_code: Optional[str] = None) -> None: """ ログインして、トークンをインスタンスに保持します。 MFAが有効化されている場合は、loginRespondToAuthChallenge APIを実行してトークンを取得します。 + ``self.input_mfa_code_via_stdin == True`` の場合は、標準入力からMFAコードの入力を求めます。 + + Args: mfa_code: ``loginRespondToAuthChallenge``のレスポンスから取得したMFAコード。この引数はexperimentalです。将来削除される可能性があります。 Returns: Tuple[Token, requests.Response] + Raises: + InvalidMfaCodeError: ``self.input_mfa_code_via_stdin`` が ``False`` AND ``mfa_code`` が正しくない場合 + MfaEnabledUserExecutionError: ``self.input_mfa_code_via_stdin`` が ``False`` AND ``mfa_code`` が未指定の場合 """ login_info = {"user_id": self.login_user_id, "password": self.login_password} url = f"{self.url_prefix}/login" login_response = self._execute_http_request("post", url, json=login_info) - json_obj = login_response.json() - if "token" not in json_obj: + login_json_obj = login_response.json() + if "token" not in login_json_obj: # `login` APIのレスポンスのスキーマがLoginNeedChallengeResponseのとき if mfa_code is None: - raise MfaEnabledUserExecutionError(self.login_user_id) + if self.input_mfa_code_via_stdin: + mfa_code = _read_mfa_code_from_stdin() + else: + raise MfaEnabledUserExecutionError(self.login_user_id) - mfa_param = {"user_id": self.login_user_id, "mfa_code": mfa_code, "session": json_obj["session"]} - mfa_url = f"{self.url_prefix}/login-respond-to-auth-challenge" - mfa_response = self._execute_http_request("post", mfa_url, json=mfa_param) - mfa_json_obj = mfa_response.json() + mfa_json_obj = self._login_respond_to_auth_challenge(mfa_code, login_json_obj["session"]) token_dict = mfa_json_obj["token"] else: # `login` APIのレスポンスのスキーマがloginRespondToAuthChallengeのとき - token_dict = json_obj["token"] + token_dict = login_json_obj["token"] + self.token_dict = token_dict logger.debug("Logged in successfully. user_id = %s", self.login_user_id) diff --git a/annofabapi/exceptions.py b/annofabapi/exceptions.py index b5a275fa..f167f901 100644 --- a/annofabapi/exceptions.py +++ b/annofabapi/exceptions.py @@ -82,3 +82,12 @@ class MfaEnabledUserExecutionError(Exception): def __init__(self, user_id: str) -> None: message = f"User (User ID: {user_id}) cannot use annofab-api-python-client because MFA is enabled." super().__init__(message) + + +class InvalidMfaCodeError(AnnofabApiException): + """ + MFAコードが間違っている場合のエラー + """ + + def __init__(self, message: str) -> None: + super().__init__(message) diff --git a/annofabapi/resource.py b/annofabapi/resource.py index fc7ef5f9..d7f7a726 100644 --- a/annofabapi/resource.py +++ b/annofabapi/resource.py @@ -19,12 +19,17 @@ class Resource: login_user_id: AnnofabにログインするときのユーザID login_password: Annofabにログインするときのパスワード endpoint_url: Annofab APIのエンドポイント。 + input_mfa_code_via_stdin: MFAコードを標準入力から入力するかどうか """ - def __init__(self, login_user_id: str, login_password: str, endpoint_url: str = DEFAULT_ENDPOINT_URL) -> None: + def __init__( + self, login_user_id: str, login_password: str, *, endpoint_url: str = DEFAULT_ENDPOINT_URL, input_mfa_code_via_stdin: bool = False + ) -> None: #: AnnofabApi Instance - self.api = AnnofabApi(login_user_id=login_user_id, login_password=login_password, endpoint_url=endpoint_url) + self.api = AnnofabApi( + login_user_id=login_user_id, login_password=login_password, endpoint_url=endpoint_url, input_mfa_code_via_stdin=input_mfa_code_via_stdin + ) #: Wrapper Instance self.wrapper = Wrapper(self.api) @@ -35,7 +40,13 @@ def __init__(self, login_user_id: str, login_password: str, endpoint_url: str = logger.debug("Create annofabapi resource instance :: %s", {"login_user_id": login_user_id, "endpoint_url": endpoint_url}) -def build(login_user_id: Optional[str] = None, login_password: Optional[str] = None, endpoint_url: str = DEFAULT_ENDPOINT_URL) -> Resource: +def build( + login_user_id: Optional[str] = None, + login_password: Optional[str] = None, + *, + endpoint_url: str = DEFAULT_ENDPOINT_URL, + input_mfa_code_via_stdin: bool = False, +) -> Resource: """ AnnofabApi, Wrapperのインスタンスを保持するインスタンスを生成する。 @@ -48,6 +59,7 @@ def build(login_user_id: Optional[str] = None, login_password: Optional[str] = N login_user_id: AnnofabにログインするときのユーザID login_password: Annofabにログインするときのパスワード endpoint_url: Annofab APIのエンドポイント。 + input_mfa_code_via_stdin: MFAコードを標準入力から入力するかどうか Returns: AnnofabApi, Wrapperのインスタンスを保持するインスタンス @@ -57,26 +69,27 @@ def build(login_user_id: Optional[str] = None, login_password: Optional[str] = N """ if login_user_id is not None and login_password is not None: - return Resource(login_user_id, login_password, endpoint_url=endpoint_url) + return Resource(login_user_id, login_password, endpoint_url=endpoint_url, input_mfa_code_via_stdin=input_mfa_code_via_stdin) elif login_user_id is None and login_password is None: try: - return build_from_env(endpoint_url) + return build_from_env(endpoint_url=endpoint_url, input_mfa_code_via_stdin=input_mfa_code_via_stdin) except CredentialsNotFoundError: try: - return build_from_netrc(endpoint_url) + return build_from_netrc(endpoint_url=endpoint_url, input_mfa_code_via_stdin=input_mfa_code_via_stdin) except CredentialsNotFoundError as e: raise CredentialsNotFoundError("環境変数または`.netrc`ファイルにAnnofab認証情報はありませんでした。") from e else: raise ValueError("引数`login_user_id`か`login_password`のどちらか一方がNoneです。両方Noneでないか、両方Noneである必要があります。") -def build_from_netrc(endpoint_url: str = DEFAULT_ENDPOINT_URL) -> Resource: +def build_from_netrc(*, endpoint_url: str = DEFAULT_ENDPOINT_URL, input_mfa_code_via_stdin: bool = False) -> Resource: """ ``.netrc`` ファイルから、annofabapi.Resourceインスタンスを生成する。 Args: endpoint_url: Annofab APIのエンドポイント。 + input_mfa_code_via_stdin: MFAコードを標準入力から入力するかどうか Returns: annofabapi.Resourceインスタンス @@ -101,15 +114,16 @@ def build_from_netrc(endpoint_url: str = DEFAULT_ENDPOINT_URL) -> Resource: if login_user_id is None or login_password is None: raise CredentialsNotFoundError("User ID or password in the .netrc file are None.") - return Resource(login_user_id, login_password, endpoint_url=endpoint_url) + return Resource(login_user_id, login_password, endpoint_url=endpoint_url, input_mfa_code_via_stdin=input_mfa_code_via_stdin) -def build_from_env(endpoint_url: str = DEFAULT_ENDPOINT_URL) -> Resource: +def build_from_env(*, endpoint_url: str = DEFAULT_ENDPOINT_URL, input_mfa_code_via_stdin: bool = False) -> Resource: """ 環境変数 ``ANNOFAB_USER_ID`` , ``ANNOFAB_PASSWORD`` から、annofabapi.Resourceインスタンスを生成する。 Args: endpoint_url: Annofab APIのエンドポイント。 + input_mfa_code_via_stdin: MFAコードを標準入力から入力するかどうか Returns: annofabapi.Resourceインスタンス @@ -122,4 +136,4 @@ def build_from_env(endpoint_url: str = DEFAULT_ENDPOINT_URL) -> Resource: if login_user_id is None or login_password is None: raise CredentialsNotFoundError("`ANNOFAB_USER_ID` or `ANNOFAB_PASSWORD` environment variable are empty.") - return Resource(login_user_id, login_password, endpoint_url=endpoint_url) + return Resource(login_user_id, login_password, endpoint_url=endpoint_url, input_mfa_code_via_stdin=input_mfa_code_via_stdin) diff --git a/tests/test_local_resource.py b/tests/test_local_resource.py index 4f9c7a46..4a36c1b8 100644 --- a/tests/test_local_resource.py +++ b/tests/test_local_resource.py @@ -49,7 +49,7 @@ def test_build(self): def test_build_with_endpoint(self): user_id = "test_user" password = "password" - resource = build(user_id, password, "https://localhost:8080") + resource = build(user_id, password, endpoint_url="https://localhost:8080") assert resource.api.url_prefix == "https://localhost:8080/api/v1" assert resource.api2.url_prefix == "https://localhost:8080/api/v2" assert resource.api.login_user_id == user_id