diff --git a/pytest_tests/testsuites/session_token/test_object_session_token.py b/pytest_tests/testsuites/session_token/test_object_session_token.py index 9996fa15c..9034ec858 100644 --- a/pytest_tests/testsuites/session_token/test_object_session_token.py +++ b/pytest_tests/testsuites/session_token/test_object_session_token.py @@ -4,8 +4,9 @@ import pytest from cluster_test_base import ClusterTestBase from common import WALLET_PASS +from epoch import get_epoch from file_helper import generate_file -from grpc_responses import SESSION_NOT_FOUND +from grpc_responses import EXPIRED_SESSION_TOKEN, SESSION_NOT_FOUND from neofs_testlib.utils.wallet import get_last_address_from_wallet from python_keywords.container import create_container from python_keywords.neofs_verbs import delete_object, put_object, put_object_to_random_node @@ -144,3 +145,73 @@ def test_object_session_token(self, default_wallet, object_size): endpoint=non_container_node.get_rpc_endpoint(), session=session_token, ) + + @allure.title("Verify session token expiration flags") + @pytest.mark.skip(reason="https://github.com/nspcc-dev/neofs-node/issues/2539") + @pytest.mark.nspcc_dev__neofs_node__issue_2539 + @pytest.mark.parametrize("expiration_flag", ["lifetime", "expire_at"]) + def test_session_token_expiration_flags( + self, default_wallet, simple_object_size, expiration_flag, cluster + ): + rpc_endpoint = self.cluster.storage_nodes[0].get_rpc_endpoint() + + with allure.step("Create Session Token with Lifetime param"): + current_epoch = get_epoch(self.shell, cluster) + + session_token = create_session_token( + shell=self.shell, + owner=get_last_address_from_wallet(default_wallet, ""), + wallet_path=default_wallet, + wallet_password=WALLET_PASS, + rpc_endpoint=rpc_endpoint, + lifetime=1 if expiration_flag == "lifetime" else None, + expire_at=current_epoch + 1 if expiration_flag == "expire_at" else None, + ) + + with allure.step("Create Private Container"): + un_locode = self.cluster.storage_nodes[0].get_un_locode() + locode = "SPB" if un_locode == "RU LED" else un_locode.split()[1] + placement_policy = ( + f"REP 1 IN LOC_{locode}_PLACE CBF 1 SELECT 1 FROM LOC_{locode} " + f'AS LOC_{locode}_PLACE FILTER "UN-LOCODE" ' + f'EQ "{un_locode}" AS LOC_{locode}' + ) + cid = create_container( + default_wallet, + shell=self.shell, + endpoint=self.cluster.default_rpc_endpoint, + rule=placement_policy, + ) + + with allure.step("Verify object operations with created session token are allowed"): + file_path = generate_file(simple_object_size) + oid = put_object( + wallet=default_wallet, + path=file_path, + cid=cid, + shell=self.shell, + endpoint=rpc_endpoint, + session=session_token, + ) + delete_object( + wallet=default_wallet, + cid=cid, + oid=oid, + shell=self.shell, + endpoint=rpc_endpoint, + session=session_token, + ) + + self.tick_epochs(2) + + with allure.step("Verify object operations with created session token are not allowed"): + file_path = generate_file(simple_object_size) + with pytest.raises(RuntimeError, match=EXPIRED_SESSION_TOKEN): + oid = put_object( + wallet=default_wallet, + path=file_path, + cid=cid, + shell=self.shell, + endpoint=rpc_endpoint, + session=session_token, + )