Skip to content

Commit

Permalink
PersonalAccessTokenに対応しました (#679)
Browse files Browse the repository at this point in the history
* api.pyをPATに対応して既存のテストが通るところまで

* 環境変数・引数にPATを指定してインスタンスを生成できるようにした

* AnnofabApiをいくつか修正

* Unauthorize時のlogin&retry処理を、IdPassが渡されていた時のみにした
* Patを渡された場合のlogin時、self.tokensにPatを転写するようにした
    * logoutの処理と対象になるように

* create_test_project.pyがendpointを引数に取れるようにした

* Patをcredentialsに利用する場合のtokensへの転写をコンストラクタではなくリクエストを実行する直前に行うようにした

* README.mdを更新

* 正しくないコメントを修正

* input_mfa_code_via_stdin のコメントを修正

* rename test_build.py to test_local_build.py

CIでのテスト対象とするため
  • Loading branch information
seraphr authored Sep 11, 2024
1 parent 20304c4 commit e118b8f
Show file tree
Hide file tree
Showing 8 changed files with 221 additions and 50 deletions.
23 changes: 22 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,19 @@ password = "YYYYYY"
service = build(user_id, password)
```

### PersonalAccessTokenをコンストラクタ引数に渡す場合

```python
# APIアクセス用のインスタンスを生成
from annofabapi import build


pat = "XXXXXX"

service = build(pat = pat)
```


### `.netrc`に認証情報を記載する場合
`.netrc`ファイルに、AnnofabのユーザIDとパスワードを記載します。

Expand All @@ -91,7 +104,13 @@ service = build_from_netrc()


### 環境変数に認証情報を設定する場合
環境変数`ANNOFAB_USER_ID``ANNOFAB_PASSWORD`にユーザIDとパスワードを設定します。


* IDとパスワードで認証する場合
* 環境変数`ANNOFAB_USER_ID``ANNOFAB_PASSWORD`にユーザIDとパスワードを設定します。
* パーソナルアクセストークンで認証する場合
* 環境変数`ANNOFAB_PAT`にトークンを設定します。
* `ANNOFAB_PAT`が設定されている場合、`ANNOFAB_USER_ID``ANNOFAB_PASSWORD`は無視されます。

```python
from annofabapi import build_from_env
Expand All @@ -109,6 +128,8 @@ service = build()

優先順位は以下の通りです。
1. 環境変数
1. `ANNOFAB_PAT`
2. `ANNOFAB_USER_ID`及び`ANNOFAB_PASSWORD`
2. `.netrc`


Expand Down
103 changes: 73 additions & 30 deletions annofabapi/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,17 @@
import time
from functools import wraps
from json import JSONDecodeError
from typing import Any, Callable, Collection, Dict, Optional, Tuple
from typing import Any, Callable, Collection, Dict, Optional, Tuple, Union

import backoff
import requests
from requests.auth import AuthBase
from requests.cookies import RequestsCookieJar

from annofabapi.credentials import IdPass, Pat, Tokens
from annofabapi.exceptions import InvalidMfaCodeError, MfaEnabledUserExecutionError, NotLoggedInError
from annofabapi.generated_api import AbstractAnnofabApi
from annofabapi.util.type_util import assert_noreturn

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -234,34 +236,34 @@ class AnnofabApi(AbstractAnnofabApi):
Web APIに対応したメソッドが存在するクラス。
Args:
login_user_id: AnnofabにログインするときのユーザID
login_password: Annofabにログインするときのパスワード
credentials: Annofabにログインするときの認証情報
endpoint_url: Annofab APIのエンドポイント。
input_mfa_code_via_stdin: MFAコードを標準入力から入力するかどうか
Falseを渡して且つMFAコードの入力を求められるアカウントを利用する場合、mfa_codeを引数にloginメソッドを直接呼び出さなければならず、そうしない場合は例外を送出する
Attributes:
token_dict: login, refresh_tokenで取得したtoken情報
tokens: login, refresh_tokenで取得したtoken情報
cookies: Signed Cookie情報
"""

def __init__(
self, login_user_id: str, login_password: str, *, endpoint_url: str = DEFAULT_ENDPOINT_URL, input_mfa_code_via_stdin: bool = False
) -> None:
if not login_user_id or not login_password:
def __init__(self, credentials: Union[IdPass, Pat], *, endpoint_url: str = DEFAULT_ENDPOINT_URL, input_mfa_code_via_stdin: bool = False) -> None:
if isinstance(credentials, IdPass) and (not credentials.user_id or not credentials.password):
raise ValueError("login_user_id or login_password is empty.")
if isinstance(credentials, Pat) and not credentials.token:
raise ValueError("pat is empty.")

self.login_user_id = login_user_id
self.login_password = login_password
self.credentials = credentials
self.endpoint_url = endpoint_url
self.input_mfa_code_via_stdin = input_mfa_code_via_stdin
self.url_prefix = f"{endpoint_url}/api/v1"
self.session = requests.Session()

self.token_dict: Optional[Dict[str, Any]] = None
self.tokens: Union[Tokens, Pat, None] = None

self.cookies: Optional[RequestsCookieJar] = None

self.__account_id: Optional[str] = None
self.__user_id: Optional[str] = None

class _MyToken(AuthBase):
"""
Expand Down Expand Up @@ -328,8 +330,9 @@ def _create_kwargs(
"params": new_params,
"headers": headers,
}
if self.token_dict is not None:
kwargs.update({"auth": self._MyToken(self.token_dict["id_token"])})
if self.tokens is not None:
token = self.tokens.auth_token
kwargs.update({"auth": self._MyToken(token)})

if request_body is not None:
if isinstance(request_body, (dict, list)):
Expand Down Expand Up @@ -495,6 +498,12 @@ def _request_wrapper(
else:
url = f"{self.url_prefix}{url_path}"

# patを使う場合は最初にtokensをセットする
# def logoutの呼び出しでtokensがNoneになった後にAPIを呼び出しても問題ないように(IdPassの場合も、自動loginしているので、その代わり)
# IdPassと同じ処理に合流させてしまうと、patが無効なときに無限ループしてしまうので、ここで1回だけ呼び出す
if self.tokens is None and isinstance(self.credentials, Pat):
self._login_pat(self.credentials)

kwargs = self._create_kwargs(query_params, header_params, request_body)
response = self.session.request(method=http_method.lower(), url=url, **kwargs)
# response.requestよりメソッド引数のrequest情報の方が分かりやすいので、メソッド引数のrequest情報を出力する。
Expand All @@ -512,8 +521,8 @@ def _request_wrapper(
},
)

# Unauthorized Errorならば、ログイン後に再度実行する
if response.status_code == requests.codes.unauthorized:
# ID/PASSが指定されており、Unauthorized Errorならば、ログイン後に再度実行する
if isinstance(self.credentials, IdPass) and response.status_code == requests.codes.unauthorized:
self.refresh_token()
return self._request_wrapper(
http_method,
Expand Down Expand Up @@ -616,7 +625,7 @@ def _request_get_with_cookie(self, project_id: str, url: str) -> requests.Respon
#########################################
# Public Method : Login
#########################################
def _login_respond_to_auth_challenge(self, mfa_code: str, session: str) -> Dict[str, Any]:
def _login_respond_to_auth_challenge(self, id_pass: IdPass, mfa_code: str, session: str) -> Dict[str, Any]:
"""
MFAコードによるログインを実行します。
Expand All @@ -629,7 +638,7 @@ def _login_respond_to_auth_challenge(self, mfa_code: str, session: str) -> Dict[
Raises:
InvalidMfaCodeError: ``self.input_mfa_code_via_stdin`` が ``False`` AND ``mfa_code`` が正しくない場合
"""
request_body = {"user_id": self.login_user_id, "mfa_code": mfa_code, "session": session}
request_body = {"user_id": id_pass.user_id, "mfa_code": mfa_code, "session": session}
url = f"{self.url_prefix}/login-respond-to-auth-challenge"

response = self._execute_http_request("post", url, json=request_body, raise_for_status=False)
Expand All @@ -645,7 +654,7 @@ def _login_respond_to_auth_challenge(self, mfa_code: str, session: str) -> Dict[
if self.input_mfa_code_via_stdin:
logger.info(new_error_message)
new_mfa_code = _read_mfa_code_from_stdin()
return self._login_respond_to_auth_challenge(new_mfa_code, session)
return self._login_respond_to_auth_challenge(id_pass, new_mfa_code, session)
else:
raise InvalidMfaCodeError(new_error_message)

Expand All @@ -671,7 +680,15 @@ def login(self, mfa_code: Optional[str] = None) -> None:
InvalidMfaCodeError: ``self.input_mfa_code_via_stdin`` が ``False`` AND ``mfa_code`` が正しくない場合
MfaEnabledUserExecutionError: ``self.input_mfa_code_via_stdin`` が ``False`` AND ``mfa_code`` が未指定の場合
"""
login_info = {"user_id": self.login_user_id, "password": self.login_password}
if isinstance(self.credentials, IdPass):
self._login_id_pass(self.credentials, mfa_code)
elif isinstance(self.credentials, Pat):
self._login_pat(self.credentials)
else:
assert_noreturn(self.credentials)

def _login_id_pass(self, id_pass: IdPass, mfa_code: Optional[str] = None) -> None:
login_info = {"user_id": id_pass.user_id, "password": id_pass.password}

url = f"{self.url_prefix}/login"

Expand All @@ -683,21 +700,24 @@ def login(self, mfa_code: Optional[str] = None) -> None:
if self.input_mfa_code_via_stdin:
mfa_code = _read_mfa_code_from_stdin()
else:
raise MfaEnabledUserExecutionError(self.login_user_id)
raise MfaEnabledUserExecutionError(id_pass.user_id)

mfa_json_obj = self._login_respond_to_auth_challenge(mfa_code, login_json_obj["session"])
mfa_json_obj = self._login_respond_to_auth_challenge(id_pass, mfa_code, login_json_obj["session"])
token_dict = mfa_json_obj["token"]
else:
# `login` APIのレスポンスのスキーマがloginRespondToAuthChallengeのとき
token_dict = login_json_obj["token"]

self.token_dict = token_dict
logger.debug("Logged in successfully. user_id = %s", self.login_user_id)
self.tokens = Tokens.from_dict(token_dict)
logger.debug("Logged in successfully. user_id = %s", id_pass.user_id)

def _login_pat(self, pat: Pat) -> None:
self.tokens = pat

def logout(self) -> None:
"""
ログアウトします。
ログアウト後は、インスタンス変数 ``token_dict`` をNoneにします。
ログアウト後は、インスタンス変数 ``tokens`` をNoneにします。
Expand All @@ -708,27 +728,33 @@ def logout(self) -> None:
NotLoggedInError: ログインしてない状態で関数を呼び出したときのエラー
"""

if self.token_dict is None:
if self.tokens is None:
raise NotLoggedInError
if isinstance(self.tokens, Pat):
self.tokens = None
return

request_body = self.token_dict
request_body = self.tokens.to_dict()
url = f"{self.url_prefix}/logout"
self._execute_http_request("POST", url, json=request_body)
self.token_dict = None
self.tokens = None

def refresh_token(self) -> None:
"""
トークンを再発行して、新しいトークン情報をインスタンスに保持します。
パーソナルアクセストークンでのアクセスをしている場合はrefreshを行いません。
ログインしていない場合やリフレッシュトークンの有効期限が切れている場合は、login APIを実行して新しいトークン情報をインスタンスに保持します。
"""

if self.token_dict is None:
if self.tokens is None:
# 一度もログインしていないときは、login APIを実行して、トークン情報をインスタンスに保持する(login関数内でインスタンスに保持している)
self.login()
return
if isinstance(self.tokens, Pat):
return

request_body = {"refresh_token": self.token_dict["refresh_token"]}
request_body = {"refresh_token": self.tokens.refresh_token}
url = f"{self.url_prefix}/refresh-token"
response = self._execute_http_request("POST", url, json=request_body)

Expand All @@ -737,11 +763,12 @@ def refresh_token(self) -> None:
self.login()
return

self.token_dict = response.json()
self.tokens = Tokens.from_dict(response.json())

#########################################
# Public Method : Other
#########################################

@property
def account_id(self) -> str:
"""
Expand All @@ -754,3 +781,19 @@ def account_id(self) -> str:
account_id = content["account_id"]
self.__account_id = account_id
return account_id

@property
def login_user_id(self) -> str:
"""
Annofabにログインするユーザのuser_id
"""
if self.__user_id is not None:
return self.__user_id
if isinstance(self.credentials, IdPass):
self.__user_id = self.credentials.user_id
return self.__user_id
else:
content, _ = self.get_my_account()
user_id = content["user_id"]
self.__user_id = user_id
return user_id
48 changes: 48 additions & 0 deletions annofabapi/credentials.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from dataclasses import dataclass
from typing import Dict, Protocol


class HasAuthToken(Protocol):
@property
def auth_token(self) -> str: ...


@dataclass(frozen=True)
class IdPass:
user_id: str
password: str


@dataclass(frozen=True)
class Pat(HasAuthToken):
"""Personal Access Token"""

token: str

@property
def auth_token(self) -> str:
return f"Bearer {self.token}"


@dataclass(frozen=True)
class Tokens(HasAuthToken):
"""IdPassを元にログインしたあとに取得されるトークン情報"""

id_token: str
access_token: str
refresh_token: str

@property
def auth_token(self) -> str:
return self.id_token

def to_dict(self) -> Dict[str, str]:
return {
"id_token": self.id_token,
"access_token": self.access_token,
"refresh_token": self.refresh_token,
}

@staticmethod
def from_dict(d: Dict[str, str]) -> "Tokens":
return Tokens(id_token=d["id_token"], access_token=d["access_token"], refresh_token=d["refresh_token"])
Loading

0 comments on commit e118b8f

Please sign in to comment.