diff --git a/README.md b/README.md index 0860827..6c01772 100644 --- a/README.md +++ b/README.md @@ -63,7 +63,7 @@ reporting: ms-teams-channels: automation, qa-team emails: example@example.com run: - treat_skips_as_failures: false + treat-skips-as-failures: false display-name: Nightly Regression Suite build: 1.12.1.96-SNAPSHOT environment: TEST-1 @@ -85,7 +85,7 @@ reporting: would be marked as passed. If the required configurations are not provided, there is a warning displayed in logs with the problem description and the names of options -which need to be specified. Parameter names are case insensitive and can be written in upper and lower registers. +which need to be specified. Parameter names are case-insensitive and can be written in upper and lower registers. @@ -94,11 +94,381 @@ which need to be specified. Parameter names are case insensitive and can be writ There is two options for activation of Zebrunner listener. Command line argument: -```bash -robot --listener robotframework_zebrunner.ZebrunnerListener ... ``` - +robot --listener robotframework_zebrunner.ZebrunnerListener +``` Import Zebrunner library into your project: ``` Library robotframework_zebrunner.ZebrunnerLib ``` + + +## Collecting captured screenshot +Sometimes it may be useful to have the ability to track captured screenshots in scope of Zebrunner Reporting. The agent comes +with the API allowing you to send your screenshots to Zebrunner, so that they could be attached to the test. + +``` +*** Settings *** +Library robotframework_zebrunner.ZebrunnerLib + +With screenshot + ... + Create File ${OUTPUTDIR}/selenium-screenshot-1.png + Capture Page Screenshot ${OUTPUTDIR}/selenium-screenshot-1.png + Attach Test Screenshot ${OUTPUTDIR}/selenium-screenshot-1.png + ... +``` + +## Additional reporting capabilities + +### Tracking test maintainer + +You may want to add transparency to the process of automation maintenance by having an engineer responsible for +evolution of specific tests or test classes. Zebrunner comes with a concept of a maintainer - a person that can be +assigned to maintain tests. In order to keep track of those, the agent comes with the `maintainer` tag. + +See a sample test bellow: + +``` +*** Settings *** +Library robotframework_zebrunner.ZebrunnerLib + +With maintainer + [Tags] maintainer= + Should Be True 5 + 5 == 10 +``` + +### Attaching labels +In some cases, it may be useful to attach some meta information related to a test. The agent comes with a concept of a label. +Label is a key-value pair associated with a test. The key and value are represented by a `str`. Labels can be attached to +tests and test runs. + +There is a tag that can be used to attach labels to a test. There is also an API to attach labels during test or execution. +The agent has functions that can be used to attach labels. +``` +*** Settings *** +Library robotframework_zebrunner.ZebrunnerLib + +With labels + [Tags] labels: some_label=123, other_label=234 + + Attach Test Label some_test_label value + Attach Test Run Label some_test_run_label value + Should Be True 5 + 5 == 10 +``` +The test from the sample above attaches 3 test-level labels. + +### Reverting test registration +In some cases it might be handy not to register test execution in Zebrunner. This may be caused by very special circumstances of test environment or execution conditions. + + +Zebrunner agent comes with a convenient method revert_registration() from CurrentTest class for reverting test registration at runtime. The following code snippet shows a case where test is not reported on Monday. +``` +*** Settings *** +Library robotframework_zebrunner.ZebrunnerLib + +Revert test + Current Test Revert Registration + Should Be True 5 + 5 == 10 +``` + +### Overriding run attributes +This section contains information on agent APIs allowing to manipulate test run attributes during runtime. + +#### Setting build at runtime +All the configuration mechanisms listed above provide possibility to declaratively set test run build. But there might be cases when actual build becomes available only at runtime. + +For such cases Zebrunner agent has a special method that can be used at any moment of the suite execution: +``` +*** Settings *** +Library robotframework_zebrunner.ZebrunnerLib +Suite Setup Current Test Run Set Build 0.0.1 +``` + +#### Setting locale +If you want to get full reporting experience and collect as much information in Zebrunner as its possible, you may want to report the test run locale. + +For this, Zebrunner agent has a special method that can be used at any moment of the suite execution: +``` +*** Settings *** +Library robotframework_zebrunner.ZebrunnerLib +Suite Setup Current Test Run Set Locale EN +``` + +#### Overriding platform +A test run in Zebrunner may have platform associated with the run. If there is at least one initiated `Remote` driver session within the test run, then its platform will be displayed as a platform of the whole test run. Even if subsequent `Remote` drivers are initiated on another platform, the very first one will be displayed as the run platform. + +In some cases you may want to override the platform of the first `Remote` driver session. Another problem is that it is not possible to specify `API` as a platform. + +Zebrunner provides two special methods to solve both of these problems: `Current Test Run Set Platform` and `Current Test Run Set Platform Version`. +In the example below, the hook sets the API as a platform associated with the current test run. + +``` +*** Settings *** +Library robotframework_zebrunner.ZebrunnerLib +Suite Setup Current Test Run Set Platform API +``` + +### Collecting additional artifacts +In case your tests or an entire test run produce some artifacts, it may be useful to track them in Zebrunner. +The agent comes with a few convenient methods for uploading artifacts in Zebrunner and linking them to the currently running test or test run. +Artifacts and artifact references can be attached using functions. Together with an artifact +or artifact reference, you must provide the display name. For the file, this name must contain the file extension that +reflects the actual content of the file. If the file extension does not match the file content, this file will not be +saved in Zebrunner. Artifact reference can have an arbitrary name. + +#### Attaching artifact to test +``` +*** Settings *** +Library robotframework_zebrunner.ZebrunnerLib + +With artifact + Attach Test Artifact requirements.txt + ... +``` + +#### Attaching artifact reference to test + +``` +*** Settings *** +Library robotframework_zebrunner.ZebrunnerLib + +With artifact reference + Attach Test Artifact Reference google https://google.com + ... +``` + + +#### Attaching artifact to test run + +``` +*** Settings *** +Library robotframework_zebrunner.ZebrunnerLib + +Attach Test Run Artifact requirements.txt +``` + +#### Attaching artifact reference to test run + +``` +*** Settings *** +Library robotframework_zebrunner.ZebrunnerLib + +Attach Test Run Artifact Reference google https://google.com +``` + +Artifact uploading process is performed in the foreground now, so it will block the execution thread while sending. +The background uploading will be available in upcoming releases. + +## Syncing test executions with external TCM systems + +Zebrunner provides an ability to upload test results to external test case management systems (TCMs) on test run finish. For some TCMs it is possible to upload results in real-time during the test run execution. + +This functionality is currently supported for TestRail, Xray, Zephyr Squad and Zephyr Scale. + +### TestRail + +For successful upload of test run results in TestRail, two steps must be performed: + +1. Integration with TestRail is configured and enabled for Zebrunner project +2. Configuration is performed on the tests side + + +#### Configuration + +Zebrunner agent has a special `Test Rail` keywords: + +`Test Rail Set Suite Id ` +: Mandatory. The method sets TestRail suite id for current test run. This method must be invoked before all tests. + +`Test Rail Set Case Id ` or `test_rail_case_id=` tag +: Mandatory. Using these mechanisms you can set TestRail's case associated with specific automated test. It is highly recommended using the `test_rail_case_id` tag instead of keyword. Use the keyword only for special cases + +`Test Rail Disable Sync` +: Optional. Disables result upload. Same as `Test Rail Set Suite Id`, this keyword must be invoked before all tests + +`Test Rail Include All Test Cases In New Run` +: Optional. Includes all cases from suite into newly created run in TestRail. Same as `#set_suite_id(str)`, this method must be invoked before all tests + +`Test Rail Enable Real Time Sync` +: Optional. Enables real-time results upload. In this mode, result of test execution will be uploaded immediately after test finish. This method also automatically invokes `Test Rail Include All Test Cases In New Run`. Same as `Test Rail Set Suite Id`, this keyword must be invoked before all tests + +`Test Rail Set Run Id ` +: Optional. Adds result into existing TestRail run. If not provided, test run is treated as new. Same as `Test Rail Set Suite Id`, this keyword must be invoked before all tests + +`Test Rail Set Run Name ` +: Optional. Sets custom name for new TestRail run. By default, Zebrunner test run name is used. Same as `Test Rail Set Suite Id`, this keyword must be invoked before all tests + +`Test Rail Set Milestone ` +: Optional. Adds result in TestRail milestone with the given name. Same as `Test Rail Set Suite Id`, this keyword must be invoked before all tests + +`Test Rail Set Assignee ` +: Optional. Sets TestRail run assignee. Same as `Test Rail Set Suite Id`, this keyword must be invoked before all tests + +By default, a new run containing only cases assigned to the tests will be created in TestRail on test run finish. + +#### Example + +In the example below, a new run with name "Best run ever" will be created in TestRail on test run finish. Suite id is `321` and assignee is "Deve Loper". Results of the `awesome_test1` will be uploaded as result of cases with id `10000`, `10001`, `10002`. Results of the `awesome_test2` will be uploaded as result of case with id `20000`. +``` +*** Settings *** +Library robotframework_zebrunner.ZebrunnerLib +Suite Setup Run Keywords Test Rail Set Suite Id 1 +... AND Test Rail Set Run Name Best run ever +... AND Test Rail Set Assignee Deve Loper + +With Case Id + [Tags] test_rail_case_id=1 + ... + +With Another Case id + Test Rail Set Case Id 2 +``` + +### Xray + +For successful upload of test run results in Xray two steps must be performed: + +1. Xray integration is configured and enabled in Zebrunner project +2. Xray configuration is performed on the tests side + +#### Configuration + +Zebrunner agent has a special `Xray` keywords to control results upload: + +`Xray Set Execution Key ` +: Mandatory. The method sets Xray execution key. This method must be invoked before all tests. + +`Xray Set Test Key ` or `xray_test_key=` +: Mandatory. Using these mechanisms you can set test keys associated with specific automated test. It is highly recommended using the `xray_test_key` tag instead of keyword. Use the keyword only for special cases + +`Xray Disable Sync` +: Optional. Disables result upload. Same as `Xray Set Execution Key`, this method must be invoked before all tests + +`Xray Enable Real Time Sync` +: Optional. Enables real-time results upload. In this mode, result of test execution will be uploaded immediately after test finish. Same as `Xray Set Execution Key`, this method must be invoked before all tests + +By default, results will be uploaded to Xray on test run finish. + +#### Example + +In the example below, results will be uploaded to execution with key `ZBR-42`. Results of the `awesome_test1` will be uploaded as result of tests with key `ZBR-10000`, `ZBR-10001`, `ZBR-10002`. Results of the `awesome_test2` will be uploaded as result of test with key `ZBR-20000`. + +``` +*** Settings *** +Library robotframework_zebrunner.ZebrunnerLib +Suite Setup Xray Set Execution Key ZBR-42 + + +With test key + [Tags] xray_test_key=ZBR-10000 xray_test_key=ZBR-10001 xray_test_key=ZBR-10002 + ... + +With Another Test Key + Xray Set Test Key ZBR-20000 + +``` + + +### Zephyr + +For successful upload of test run results in Zephyr two steps must be performed: + +1. Zephyr integration is configured and enabled in Zebrunner project +2. Zephyr configuration is performed on the tests side + +Described steps work for both Zephyr Squad and Zephyr Scale. + +#### Configuration + +Zebrunner agent has a special `Zephyr` keywords to control results upload: + +`Zephyr Set Test Cycle Key ` +: Mandatory. The method sets Zephyr test cycle key. This method must be invoked before all tests. + +`Zephyr Set Jira Project Key ` +: Mandatory. Sets Zephyr Jira project key. Same as `Zephyr Set Test Cycle Key`, this method must be invoked before all tests + +`Zephyr Set Test Case Key ` or `zephyr_test_case_key=` +: Mandatory. Using these mechanisms you can set test case keys associated with specific automated test. It is highly recommended using the `zephyr_test_case_key=1` tag instead of keyword. Use the keyword only for special cases + +`Zephyr Disable Sync` +: Optional. Disables result upload. Same as `Zephyr Set Test Cycle Key`, this method must be invoked before all tests + +`Zephyr Enable Real Time Sync` +: Optional. Enables real-time results upload. In this mode, result of test execution will be uploaded immediately after test finish. Same as `Zephyr Set Test Cycle Key`, this method must be invoked before all tests + +By default, results will be uploaded to Zephyr on test run finish. + +#### Example + +In the example below, results will be uploaded to test cycle with key `ZBR-R42` from project with key `ZBR`. Results of the `awesome_test1` will be uploaded as result of tests with key `ZBR-T10000`, `ZBR-T10001`, `ZBR-T10002`. Results of the `awesome_test2` will be uploaded as result of test with key `ZBR-T20000`. + +``` +*** Settings *** +Library robotframework_zebrunner.ZebrunnerLib +Suite Setup Run Keywords Zephyr Set Test Cycle Key ZBR-R42 +... AND Zephyr Set Jira Project Key ZBR + + +With test key + [Tags] zephyr_test_case_key=ZBR-T10000 zephyr_test_case_key=ZBR-T10001 zephyr_test_case_key=ZBR-T10002 + ... + +With Another Test Key + Zephyr Set Test Case Key ZBR-T20000 + + +#conftest.py +from pytest_zebrunner.tcm import Zephyr + +@pytest.hookimpl(trylast=True) +def pytest_sessionstart(session: Session) -> None: + Zephyr.set_test_cycle_key("ZBR-R42"); + Zephyr.set_jira_project_key("ZBR"); + +``` + +## Selenium WebDriver support + +The Zebrunner test agent is capable of tracking tests along with remote Selenium WebDriver sessions. + +### Capturing session artifacts + +Zebrunner supports 3 types of test session artifacts: + +- Video recording +- Session log +- VNC streaming + +Test agent itself does not capture those artifacts since it has no control over underlying Selenium Grid implementation, however, it is possible to attach appropriate artifact references by providing specially designed set of driver session capabilities (**enabling capabilities**) - see the table below for more details. Only the `True` value is considered as trigger to save the link. + +| Artifact | Display name | Enabling capability | Default reference | Reference overriding capability | +|-----------------|--------------|---------------------|----------------------------------------------------|---------------------------------| +| Video recording | Video | enableVideo | `artifacts/test-sessions//video.mp4` | videoLink | +| Session log | Log | enableLog | `artifacts/test-sessions//session.log` | logLink | +| VNC streaming | | enableVNC | `/ws/vnc/` | vncLink | + +The **display name** is the name of the artifact that will be displayed on Zebrunner UI. This value is predefined and unfortunately can not be changed at the moment. + +The **default reference** is a reference to a location, where artifact is **expected to reside** in S3-compatible storage once created by test environment - it is important that it stays in sync with test environment configuration. It is possible to override references if needed by providing **reference overriding capabilities**. Note, that special `` placeholder is supported and can be used as part of the value of those capabilities allowing runtime session id (generated by WebDriver) to be included into actual reference value. + +#### VNC streaming + +VNC is an artifact of a special type. They don't have a name and are not displayed among other artifacts. They are displayed in the video section on Zebrunner UI during session execution and are dropped off on session close. + +Default reference to the VNC streaming is based on `provider` capability. Value of this capability will be converted to preconfigured integration from **Test Environment Provider** group. The resolved integration must have a filled in URL property and be enabled in order to save the link to VNC streaming. The `` placeholder of the default link will be replaced by the host of the obtained integration URL. Also, the `http` protocol in the VNC streaming url will be automatically replaced by `ws`, and `https` protocol will be replaced by `wss`. Currently, we only support Selenium, Zebrunner and MCloud integrations. + + +## Contribution + +To check out the project and build from the source, do the following: +``` +git clone git://github.com/zebrunner/python-agent-pytest.git +cd python-agent-pytest +``` + +## License + +Zebrunner reporting agent for PyTest is released under version 2.0 of the [Apache License](https://www.apache.org/licenses/LICENSE-2.0). + diff --git a/examples/example.robot b/examples/example.robot index d2a7307..33db0c8 100644 --- a/examples/example.robot +++ b/examples/example.robot @@ -21,12 +21,12 @@ Super long long long long long long long long long long long long long long long Open Google Chrome [Tags] ${caps} Evaluate {"enableVideo": True, "enableLogs": True, "enableVNC": True, "provider": "zebrunner"} - SeleniumLibrary.Open Browser https://google.com chrome remote_url=https://tolik:90eaktVT97VqUOy5@engine.zebrunner.dev/wd/hub desired_capabilities=${caps} + SeleniumLibrary.Open Browser https://google.com chrome remote_url=http://localhost:4444/wd/hub desired_capabilities=${caps} SeleniumLibrary.Close Browser Open Firefox [Tags] ${caps} Evaluate {"enableVideo": True, "enableLogs": True, "enableVNC": True, "provider": "zebrunner"} - SeleniumLibrary.Open Browser https://google.com firefox remote_url=https://tolik:90eaktVT97VqUOy5@engine.zebrunner.dev/wd/hub desired_capabilities=${caps} + SeleniumLibrary.Open Browser https://google.com firefox remote_url=http://localhost:4444/wd/hub desired_capabilities=${caps} Should Be True 2 + 2 == 5 SeleniumLibrary.Close Browser diff --git a/pyproject.toml b/pyproject.toml index b5c25ab..1215c89 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "robotframework-zebrunner" -version = "0.1.8" +version = "0.3.1" description = "Robotframework connector for Zebrunner reporting" authors = ["Anatoliy Platonov "] license = "Apache" diff --git a/src/robotframework_zebrunner/__init__.py b/src/robotframework_zebrunner/__init__.py index bffa404..8714c06 100644 --- a/src/robotframework_zebrunner/__init__.py +++ b/src/robotframework_zebrunner/__init__.py @@ -1,2 +1,2 @@ +from .library import ZebrunnerLib from .listener import ZebrunnerListener -from .listener import ZebrunnerLib diff --git a/src/robotframework_zebrunner/api/client.py b/src/robotframework_zebrunner/api/client.py index 48b9891..8f368bf 100644 --- a/src/robotframework_zebrunner/api/client.py +++ b/src/robotframework_zebrunner/api/client.py @@ -1,6 +1,5 @@ import json import logging -import sys import time from datetime import datetime, timedelta, timezone from pathlib import Path @@ -12,78 +11,56 @@ from .models import ( ArtifactReferenceModel, + AttachTestsToSessionModel, FinishTestModel, FinishTestSessionModel, LabelModel, LogRecordModel, + PlatformModel, RerunDataModel, StartTestModel, StartTestRunModel, StartTestSessionModel, ) +from ..errors import AgentApiError +from ..utils import Singleton logger = logging.getLogger(__name__) def log_response(response: Response, log_level: int = logging.DEBUG) -> None: - """ - Logger customized configuration. - - Args: - response (Response): Http response from server to client - log_level (int): Logging messages which are less severe than level will be ignored. - - Attributes: - request (Request): Request instance associated to the current response. - """ + """Logger customized configuration""" request = response.request request.read() logger.log( log_level, - f"Request {request.method} {request.url}\n" # type: ignore + f"Request {request.method} {request.url}\n" f"Headers: \n{pformat(dict(request.headers))}\n\n" - f"Content: \n{request.content}\n\n" + f"Content: \n{request.content!r}\n\n" f"Response Code: {response.status_code}\n" - f"Response Content: \n{pformat(response.json())}", + f" Content: \n{pformat(response.json())}", ) -class ZebrunnerAPI: +class ZebrunnerAPI(metaclass=Singleton): """ - A class used to represent ZebrunnerAPI using Singleton Pattern which guarantees that will exist only one instance - of this class. - - Attributes: - authenticated (bool): True when a valid access token is given. - + A singleton Zebrunner API representation """ - authenticated = False + _authenticated = False def __init__(self, service_url: str = None, access_token: str = None): - """ - Args: - service_url (str): Url to access Zebrunner API. None by default. - access_token (str): Access token to access Zebrunner API. None by default. - """ if service_url and access_token: self.service_url = service_url.rstrip("/") self.access_token = access_token self._client = Client() self._auth_token = None - self.authenticated = False + self._authenticated = False def _sign_request(self, request: Request) -> Request: """ Returns a request with the _auth_token set to the authorization request header. - - Args: - request (Request): - - Returns: - request (Request): Request whose authorization request header has been set with _auth_token. - """ request.headers["Authorization"] = f"Bearer {self._auth_token}" return request @@ -91,282 +68,355 @@ def _sign_request(self, request: Request) -> Request: def auth(self) -> None: """ Validates the user access token with http post method and if it is correct, authenticates the user. - - Attributes: - url (str): Url to validates the user access token. - """ if not self.access_token or not self.service_url: return - url = self.service_url + "/api/iam/v1/auth/refresh" + url = f"{self.service_url}/api/iam/v1/auth/refresh" try: response = self._client.post(url, json={"refreshToken": self.access_token}) except httpx.RequestError as e: - logger.warning("Error while sending request to zebrunner.", exc_info=e) - return + raise AgentApiError("Failed to authorize zebrunner agent", e) - if response.status_code != 200: - log_response(response, logging.ERROR) - sys.exit("Authorization failed!") + if not response.is_success: + raise AgentApiError( + "Failed to authorize zebrunner agent", {"status_code": response.status_code, "body": response.json()} + ) self._auth_token = response.json()["authToken"] - self._client.auth = self._sign_request # type: ignore - self.authenticated = True + self._client.auth = self._sign_request # type: ignore + self._authenticated = True def start_test_run(self, project_key: str, body: StartTestRunModel) -> Optional[int]: """ - Execute an http post with the given project_key and body, which contains StartTestRunModel. - If everything is OK, returns response id value for test run. Otherwise, logs errors and returns None. - - Args: - project_key (str): - body (StartTestRunModel): Entity with TestRun properties. - - Attributes: - url (str): Url to access api reporting test runs. - - Returns: - (int, optional): Returns http response 'id' value if http post was OK. Otherwise, returns None. + Send POST request creating new test run. Raise ApiAgentException if request failed """ - url = self.service_url + "/api/reporting/v1/test-runs" + url = f"{self.service_url}/api/reporting/v1/test-runs" try: response = self._client.post( url, params={"projectKey": project_key}, json=body.dict(exclude_none=True, by_alias=True) ) except httpx.RequestError as e: - logger.warning("Error while sending request to zebrunner.", exc_info=e) - return None + raise AgentApiError("Failed to create test run", e) - if response.status_code != 200: - log_response(response, logging.ERROR) - return None + if not response.is_success: + raise AgentApiError( + "Failed to create test run. Non successful response code", + {"status_code": response.status_code, "body": response.json()}, + ) return response.json()["id"] def start_test(self, test_run_id: int, body: StartTestModel) -> Optional[int]: """ - Execute an http post with the given test_run_id and body, which contains StartTestModel. - If everything is OK, returns response id value for test. Otherwise, logs errors and returns None. - - Args: - test_run_id (int): Number that identifies test_run. - body (StartTestModel): Entity with Test properties. - - Returns: - (int, optional): Returns http response 'id' value if http post was OK. Otherwise, returns None. + Send POST request creating new test. Raise AgentApiError in case of any exceptions """ - url = self.service_url + f"/api/reporting/v1/test-runs/{test_run_id}/tests" + url = f"{self.service_url}/api/reporting/v1/test-runs/{test_run_id}/tests" try: response = self._client.post(url, json=body.dict(exclude_none=True, by_alias=True)) except httpx.RequestError as e: - logger.warning("Error while sending request to zebrunner.", exc_info=e) - return None + raise AgentApiError("Failed to create test", e) - if response.status_code != 200: - log_response(response, logging.ERROR) - return None + if not response.is_success: + raise AgentApiError( + "Failed to create test. Non successful response code", + {"status_code": response.status_code, "body": response.json()}, + ) return response.json()["id"] def update_test(self, test_run_id: int, test_id: int, test: StartTestModel) -> Optional[int]: - url = self.service_url + f"/api/reporting/v1/test-runs/{test_run_id}/tests/{test_id}/" + """ + Send PUT request updating some test. Raise AgentApiError in case of any exceptions + """ + + url = f"{self.service_url}/api/reporting/v1/test-runs/{test_run_id}/tests/{test_id}/" try: response = self._client.post(url, json=test.dict(exclude_none=True, by_alias=True)) except httpx.RequestError as e: - logger.warning("Error while sending request to zebrunner.", exc_info=e) - return None + raise AgentApiError("Failed to update test", e) - if response.status_code != 200: - log_response(response, logging.ERROR) - return None + if not response.is_success: + raise AgentApiError( + "Failed to update test. Non successful status code", + {"status_code": response.status_code, "body": response.json()}, + ) return response.json()["id"] def finish_test(self, test_run_id: int, test_id: int, body: FinishTestModel) -> None: """ - Execute an http put with the given test_run_id, test_id, and body, which contains FinishTestModel. - If everything is OK, finish the Test. Otherwise, logs errors. - - Args: - test_run_id (int): Number that identifies test_run. - test_id (int): Number that identifies test. - body (FinishTestModel): Entity with FinishTest properties. - + Send PUT request finishing current test. Raise AgentApiError in case of any exceptions """ - url = self.service_url + f"/api/reporting/v1/test-runs/{test_run_id}/tests/{test_id}" + url = f"{self.service_url}/api/reporting/v1/test-runs/{test_run_id}/tests/{test_id}" try: response = self._client.put(url, json=body.dict(exclude_none=True, by_alias=True)) except httpx.RequestError as e: - logger.warning("Error while sending request to zebrunner.", exc_info=e) - return + raise AgentApiError("Failed to finish test", e) - if response.status_code != 200: - log_response(response, logging.ERROR) + if not response.is_success: + raise AgentApiError( + "Failed to finish test. Non successful status code", + {"status_code": response.status_code, "body": response.json()}, + ) def finish_test_run(self, test_run_id: int) -> None: """ - Execute an http put with the given test_run_id. - If everything is OK, updates endedAt value finishing test_run. Otherwise, logs errors. - - Args: - test_run_id (int): Number that identifies test_run. + Send PUT request finishing current test run. Raise AgentApiError in case of any exceptions """ - url = self.service_url + f"/api/reporting/v1/test-runs/{test_run_id}" + url = f"{self.service_url}/api/reporting/v1/test-runs/{test_run_id}" try: response = self._client.put( url, json={"endedAt": (datetime.utcnow().replace(tzinfo=timezone.utc) - timedelta(seconds=1)).isoformat()}, ) except httpx.RequestError as e: - logger.warning("Error while sending request to zebrunner.", exc_info=e) - return + raise AgentApiError("Failed to finish test run", e) - if response.status_code != 200: - log_response(response, logging.ERROR) - return + if not response.is_success: + raise AgentApiError( + "Failed to finish test run. Non successful status code", + {"status_code": response.status_code, "body": response.json()}, + ) def send_logs(self, test_run_id: int, logs: List[LogRecordModel]) -> None: """ - Convert a list of LogRecordModel to dictionary format(json) and send them to logs endpoint - associated with the appropriate test_run_id, for reporting. - - Args: - test_run_id (int): Number that identifies test_run. - logs (List[LogRecordModel]): List of LogRecordModel to send for reporting. + Send POST request uploading logs. Raise AgentApiError in case of any exceptions """ - url = self.service_url + f"/api/reporting/v1/test-runs/{test_run_id}/logs" + url = f"{self.service_url}/api/reporting/v1/test-runs/{test_run_id}/logs" body = [x.dict(exclude_none=True, by_alias=True) for x in logs] - self._client.post(url, json=body) + try: + response = self._client.post(url, json=body) + except httpx.RequestError as e: + raise AgentApiError("Failed to send logs", e) + + if not response.is_success: + raise AgentApiError( + "Failed to send logs. Non successful status code", + {"status_code": response.status_code, "body": response.json()}, + ) def send_screenshot(self, test_run_id: int, test_id: int, image_path: Union[str, Path]) -> None: """ - Open an image file by its path, read the binary content and send it to screenshots endpoint - associated with the appropriate test_id, for reporting. - - Args: - test_run_id (int): Number that identifies test_run. - test_id (int): Number that identifies test. - image_path (Union[str,Path)]: Path to identify image location in directory structure. - - Raises: - FileNotFoundError: If screenshot file is not reachable. - + Send screenshot to zebrunner. Raise AgentApiError in case of any exceptions """ - url = self.service_url + f"/api/reporting/v1/test-runs/{test_run_id}/tests/{test_id}/screenshots" + url = f"{self.service_url}/api/reporting/v1/test-runs/{test_run_id}/tests/{test_id}/screenshots" with open(image_path, "rb") as image: - self._client.post( - url, - content=image.read(), - headers={"Content-Type": "image/png", "x-zbr-screenshot-captured-at": str(round(time.time() * 1000))}, - ) + try: + response = self._client.post( + url, + content=image.read(), + headers={ + "Content-Type": "image/png", + "x-zbr-screenshot-captured-at": str(round(time.time() * 1000)), + }, + ) + except httpx.RequestError as e: + raise AgentApiError("Failed to send screenshot", e) + + if not response.is_success: + raise AgentApiError( + "Failed to send screenshot. Non successful status code", + {"status_code": response.status_code, "body": response.json()}, + ) def send_artifact(self, filename: Union[str, Path], test_run_id: int, test_id: Optional[int] = None) -> None: """ - Open a file by its path, read the binary content and sent it to artifacts endpoint - associated with the appropriate test_id if one is given. - Otherwise the artifacts endpoint is the one associated with only the appropriate test_run_id. - - Args: - filename (Union[str, Path]): Path to identify artifact location in directory structure. - test_run_id (int): Number that identifies test_run. - test_id: (int, optional): Number that identifies test. - - Raises: - FileNotFoundError: If artifact file is not reachable. - + Send artifact to zebrunner. Attach it to test run if test_id is None else attach it to test. + Raise AgentApiError in case of any exceptions """ if test_id: url = f"{self.service_url}/api/reporting/v1/test-runs/{test_run_id}/tests/{test_id}/artifacts" else: url = f"{self.service_url}/api/reporting/v1/test-runs/{test_run_id}/artifacts" + with open(filename, "rb") as file: - self._client.post(url, files={"file": file}) + try: + response = self._client.post(url, files={"file": file}) + except httpx.RequestError as e: + raise AgentApiError("Failed to send artifact", e) + + if not response.is_success: + raise AgentApiError( + "Failed to send artifact. Non successful status code", + {"status_code": response.status_code, "body": response.json()}, + ) def send_artifact_references( self, references: List[ArtifactReferenceModel], test_run_id: int, test_id: Optional[int] = None ) -> None: """ - Convert a list of ArtifactsReferenceModel to dictionary format(json) and send them to - test artifact-references endpoint associated with the appropriate test_id if one is given. - Otherwise the artifact-references endpoint is the one associated with only the appropriate test_run_id. - - Args: - references (List[ArtifactReferenceModel]): List of artifacts references to send for reporting. - test_run_id (int): Number that identifies test_run. - test_id: (int, optional): Number that identifies test. + Send artifact reference to zebrunner. Attach it to test run if test_id is None else attach it to test. + Raise AgentApiError in case of any exceptions """ if test_id: url = f"{self.service_url}/api/reporting/v1/test-runs/{test_run_id}/tests/{test_id}/artifact-references" else: url = f"{self.service_url}/api/reporting/v1/test-runs/{test_run_id}/artifact-references/" json_items = [item.dict(exclude_none=True, by_alias=True) for item in references] - self._client.put(url, json={"items": json_items}) - def send_labels(self, labels: List[LabelModel], test_run_id: int, test_id: Optional[int] = None) -> None: - """ - Convert a list of LabelModel to dictionary format(json) and send them to - labels endpoint associated with the appropiate test_id if one is given. - Otherwise, the labels endpoint is the one associated with only the appropriate test_run_id. + try: + response = self._client.put(url, json={"items": json_items}) + except httpx.RequestError as e: + raise AgentApiError("Failed to send artifact reference", e) - Args: - labels (List[LabelModel]): List of labels to send for reporting. - test_run_id (int): Number that identifies test_run. - test_id: (optional int): Number that identifies test. + if not response.is_success: + raise AgentApiError( + "Failed to send artifact reference. Non successful status code", + {"status_code": response.status_code, "body": response.json()}, + ) + def send_labels(self, labels: List[LabelModel], test_run_id: int, test_id: Optional[int] = None) -> None: + """ + Send labels to zebrunner. Attach it to test run if test_id is None else attach it to test. + Raise AgentApiError in case of any exceptions """ if test_id: url = f"{self.service_url}/api/reporting/v1/test-runs/{test_run_id}/tests/{test_id}/labels" else: url = f"{self.service_url}/api/reporting/v1/test-runs/{test_run_id}/labels" labels_json = [label.dict(exclude_none=True, by_alias=True) for label in labels] - self._client.put(url, json={"items": labels_json}) + + try: + response = self._client.put(url, json={"items": labels_json}) + except httpx.RequestError as e: + raise AgentApiError("Failed to send labels", e) + + if not response.is_success: + raise AgentApiError( + "Failed to send labels. Non successful status code", + {"status_code": response.status_code, "body": response.json()}, + ) def start_test_session(self, test_run_id: int, body: StartTestSessionModel) -> Optional[str]: """ - Execute an http post with the given test_run_id and body, which contains StartTestSessionModel. - If everything is OK, returns response id value for test. Otherwise, logs errors and returns None. + Send POST request starting test session. Raise AgentApiError in case of any exceptions + """ + url = f"{self.service_url}/api/reporting/v1/test-runs/{test_run_id}/test-sessions" - Args: - test_run_id (int): Number that identifies test_run. - body (StartTestSessionModel): Entity with TestSession properties. + try: + response = self._client.post(url, json=body.dict(exclude_none=True, by_alias=True)) + except httpx.RequestError as e: + raise AgentApiError("Failed to start session", e) - Returns: - (string, optional): Returns http response 'id' value if http post was OK. Otherwise, returns None. - """ - url = self.service_url + f"/api/reporting/v1/test-runs/{test_run_id}/test-sessions" - response = self._client.post(url, json=body.dict(exclude_none=True, by_alias=True)) if not response.status_code == 200: - log_response(response, logging.ERROR) - return None + raise AgentApiError( + "Failed to start session. Non successful status code", + {"status_code": response.status_code, "body": response.json()}, + ) return response.json().get("id") - def finish_test_session(self, test_run_id: int, session_id: str, body: FinishTestSessionModel) -> None: + def add_tests_to_session(self, test_run_id: int, session_id: str, related_tests: List[int]) -> None: + """ + Send PUT request attaching new test to test session. Raise AgentApiError in case of any exceptions """ - Execute an http put with the given test_run_id, zebrunner_id and body, which contains FinishTestSessionModel. - If everything is OK, finish the test_session. + url = f"{self.service_url}/api/reporting/v1/test-runs/{test_run_id}/test-sessions/{session_id}" + body = AttachTestsToSessionModel(test_ids=related_tests) + try: + response = self._client.put(url, json=body.dict(exclude_none=True, by_alias=True)) + except httpx.RequestError as e: + raise AgentApiError("Failed to attach tests to session", e) + + if not response.is_success: + raise AgentApiError( + "Failed to attach tests to session. Non successful status code", + {"status_code": response.status_code, "body": response.json()}, + ) + def finish_test_session(self, test_run_id: int, test_id: str, body: FinishTestSessionModel) -> None: """ - url = self.service_url + f"/api/reporting/v1/test-runs/{test_run_id}/test-sessions/{session_id}" - self._client.put(url, json=body.dict(exclude_none=True, by_alias=True)) + Send PUT request finishing test session. Raise AgentApiError in case of any exceptions + """ + url = f"{self.service_url}/api/reporting/v1/test-runs/{test_run_id}/test-sessions/{test_id}" + try: + response = self._client.put(url, json=body.dict(exclude_none=True, by_alias=True)) + except httpx.RequestError as e: + raise AgentApiError("Failed to start session", e) + + if not response.is_success: + raise AgentApiError( + "Failed to finish session. Non successful status code", + {"status_code": response.status_code, "body": response.json()}, + ) def get_rerun_tests(self, run_context: str) -> RerunDataModel: - """""" - url = self.service_url + "/api/reporting/v1/run-context-exchanges" + """Exchange run context on tests to run. Raise AgentApiError in case of any exceptions""" + url = f"{self.service_url}/api/reporting/v1/run-context-exchanges" run_context_dict = json.loads(run_context) - response = self._client.post(url, json=run_context_dict) + try: + response = self._client.post(url, json=run_context_dict) + except httpx.RequestError as e: + raise AgentApiError("Failed to get rerun tests", e) + + if not response.is_success: + raise AgentApiError( + "Failed to get rerun tests. Non successful status code", + {"status_code": response.status_code, "body": response.json()}, + ) + response_data = response.json() for test in response_data["testsToRun"]: - correlation_data = test["correlationData"] - if correlation_data is not None: - test["correlationData"] = json.loads(correlation_data) + test["correlationData"] = json.loads(test["correlationData"]) if test["correlationData"] else None + return RerunDataModel(**response_data) + def reverse_test_registration(self, test_run_id: int, test_id: int) -> None: + """Send PUT request reversing test registration. Raise AgentApiError in case of any exceptions""" + + url = f"{self.service_url}/api/reporting/v1/test-runs/{test_run_id}/tests/{test_id}" + + try: + response = self._client.delete(url) + except httpx.RequestError as e: + raise AgentApiError("Failed to revert test registration", e) + + if not response.is_success: + raise AgentApiError( + "Failed to revert test registration. Non successful status code", + {"status_code": response.status_code, "body": response.json()}, + ) + + def set_test_run_platform(self, run_id: int, platform: PlatformModel) -> None: + """Update test run platform. Raise AgentApiError in case of any exceptions""" + url = f"{self.service_url}/api/reporting/v1/test-runs/{run_id}/platform" + + try: + response = self._client.put(url, json=platform.dict(exclude_none=True, by_alias=True)) + except httpx.RequestError as e: + raise AgentApiError("failed to set test run platform", e) + + if not response.is_success: + raise AgentApiError( + "Failed to set test run platform. Non successful status code", + {"status_code": response.status_code, "body": response.json()}, + ) + + def patch_test_run_build(self, run_id: int, build: str) -> None: + """Set test run build. Raise AgentApiError in case of any exceptions""" + url = f"{self.service_url}/api/reporting/v1/test-runs/{run_id}" + + body = { + "op": "replace", + "path": "/config/build", + "value": build, + } + + try: + response = self._client.patch(url, json=[body]) + except httpx.RequestError as e: + raise AgentApiError("failed to patch test run build", e) + + if not response.is_success: + raise AgentApiError( + "Failed to patch test run platform. Non successful status code", + {"status_code": response.status_code, "body": response.json()}, + ) + def close(self) -> None: """ Close the connection pool without block-usage. diff --git a/src/robotframework_zebrunner/api/models.py b/src/robotframework_zebrunner/api/models.py index 3199e22..1adfde9 100644 --- a/src/robotframework_zebrunner/api/models.py +++ b/src/robotframework_zebrunner/api/models.py @@ -9,9 +9,6 @@ def generate_uuid() -> str: """ Generate an universal unique identifier. - - Returns: - (str): Universal unique identifier (uuid). """ return str(uuid4()) @@ -19,9 +16,6 @@ def generate_uuid() -> str: def generate_datetime_str() -> str: """ Generate a DateTime string in ISO format. - - Returns: - (str): DateTime in ISO format. """ return (datetime.utcnow()).replace(tzinfo=timezone.utc).isoformat() @@ -72,6 +66,7 @@ class Config: class TestRunConfigModel(CamelModel): environment: Optional[str] = None build: Optional[str] = None + treat_skips_as_failures: bool class MilestoneModel(CamelModel): @@ -154,6 +149,10 @@ class FinishTestSessionModel(CamelModel): test_ids: List[int] = [] +class AttachTestsToSessionModel(CamelModel): + test_ids: List[int] = [] + + class ArtifactReferenceModel(CamelModel): name: str value: str @@ -174,3 +173,8 @@ class RerunDataModel(CamelModel): reason: Optional[str] run_only_specific_tests: bool tests_to_run: List[TestModel] + + +class PlatformModel(CamelModel): + name: str + version: Optional[str] diff --git a/src/robotframework_zebrunner/context.py b/src/robotframework_zebrunner/context.py new file mode 100644 index 0000000..5a57bc5 --- /dev/null +++ b/src/robotframework_zebrunner/context.py @@ -0,0 +1,31 @@ +from typing import Optional + +from pydantic.error_wrappers import ValidationError + +from .settings import load_settings + + +class ZebrunnerContext: + def __init__(self) -> None: + self.test_run_id: Optional[int] = None + self.test_id: Optional[int] = None + self.is_reverted = False + try: + self.settings = load_settings() + except ValidationError: + self.settings = None # type: ignore + + @property + def is_configured(self) -> bool: + return self.settings is not None + + @property + def test_is_active(self) -> bool: + return self.is_configured and self.test_run_is_active and self.test_id is not None + + @property + def test_run_is_active(self) -> bool: + return self.is_configured and self.test_run_id is not None + + +zebrunner_context = ZebrunnerContext() diff --git a/src/robotframework_zebrunner/errors.py b/src/robotframework_zebrunner/errors.py new file mode 100644 index 0000000..c6da301 --- /dev/null +++ b/src/robotframework_zebrunner/errors.py @@ -0,0 +1,6 @@ +class AgentApiError(Exception): + pass + + +class AgentError(Exception): + pass diff --git a/src/robotframework_zebrunner/library.py b/src/robotframework_zebrunner/library.py new file mode 100644 index 0000000..d46902a --- /dev/null +++ b/src/robotframework_zebrunner/library.py @@ -0,0 +1,265 @@ +import logging +import sys +from pathlib import Path +from typing import Optional, Union + +from .api.client import ZebrunnerAPI +from .api.models import ArtifactReferenceModel, LabelModel, PlatformModel +from .context import zebrunner_context +from .errors import AgentApiError, AgentError +from .listener import ZebrunnerListener +from .tcm.test_rail import TestRail +from .tcm.xray import Xray +from .tcm.zephyr import Zephyr + + +class ZebrunnerLib: + if not ("robotframework_zebrunner.ZebrunnerListener" in sys.argv): + ROBOT_LIBRARY_LISTENER = ZebrunnerListener() + + ROBOT_LIBRARY_SCOPE = "GLOBAL" + + def attach_test_screenshot(self, path: Union[str, Path]) -> None: + """ + Send screenshot to zebrunner and attach it to test + """ + if not zebrunner_context.test_is_active: + raise AgentError("There is no active test to attach screenshot") + + try: + api = ZebrunnerAPI( + zebrunner_context.settings.server.hostname, + zebrunner_context.settings.server.access_token, + ) + api.send_screenshot( + zebrunner_context.test_run_id, zebrunner_context.test_id, path + ) + except AgentApiError as e: + logging.error("Failed to attach test screenshot", exc_info=e) + + def attach_test_artifact(self, path: Union[str, Path]) -> None: + """ + Send artifact to zebrunner and attach it to test. Artifact is any file from disk + """ + if not zebrunner_context.test_is_active: + raise AgentError("There is no active test to attach artifact") + + try: + api = ZebrunnerAPI( + zebrunner_context.settings.server.hostname, + zebrunner_context.settings.server.access_token, + ) + api.send_artifact( + path, zebrunner_context.test_run_id, zebrunner_context.test_id + ) + except AgentApiError as e: + logging.error("Failed to attach test artifact", exc_info=e) + + def attach_test_run_artifact(self, path: Union[str, Path]) -> None: + """ + Send artifact to zebrunner and attach it to test run. Artifact is any file from disk + """ + if not zebrunner_context.test_run_is_active: + raise AgentError("There is no active test run to attach artifact") + + try: + api = ZebrunnerAPI( + zebrunner_context.settings.server.hostname, + zebrunner_context.settings.server.access_token, + ) + api.send_artifact(path, zebrunner_context.test_run_id) + except AgentApiError as e: + logging.error("Failed to attach test run artifact", exc_info=e) + + def attach_test_artifact_reference(self, name: str, ref: str) -> None: + """ + Send artifact reference to zebrunner and attach it to test. Artifact reference is a URL + """ + if not zebrunner_context.test_is_active: + raise AgentError("There is no active test to attach artifact reference") + + try: + api = ZebrunnerAPI( + zebrunner_context.settings.server.hostname, + zebrunner_context.settings.server.access_token, + ) + api.send_artifact_references( + [ArtifactReferenceModel(name=name, value=ref)], + zebrunner_context.test_run_id, + zebrunner_context.test_id, + ) + except AgentApiError as e: + logging.error("Failed to attach test artifact reference", exc_info=e) + + def attach_test_run_artifact_reference(self, name: str, ref: str) -> None: + """ + Send artifact reference to zebrunner and attach it to test run. Artifact reference is a URL + """ + if not zebrunner_context.test_run_is_active: + raise AgentError("There is no active test run to attach artifact reference") + + try: + api = ZebrunnerAPI( + zebrunner_context.settings.server.hostname, + zebrunner_context.settings.server.access_token, + ) + api.send_artifact_references( + [ArtifactReferenceModel(name=name, value=ref)], + zebrunner_context.test_run_id, + ) + except AgentApiError as e: + logging.error("Failed to attach test run artifact reference", exc_info=e) + + def attach_test_label(self, name: str, value: str) -> None: + """ + Attach label to test in zebrunner + """ + if not zebrunner_context.test_is_active: + raise AgentError("There is no active test to attach label") + + try: + api = ZebrunnerAPI( + zebrunner_context.settings.server.hostname, + zebrunner_context.settings.server.access_token, + ) + api.send_labels( + [LabelModel(key=name, value=value)], + zebrunner_context.test_run_id, + zebrunner_context.test_id, + ) + except AgentApiError as e: + logging.error("Failed to attach label to test", exc_info=e) + + def attach_test_run_label(self, name: str, value: str) -> None: + """ + Attach label to test run in zebrunner + """ + if not zebrunner_context.test_run_is_active: + raise AgentError("There is no active test run to attach label") + + try: + api = ZebrunnerAPI( + zebrunner_context.settings.server.hostname, + zebrunner_context.settings.server.access_token, + ) + api.send_labels( + [LabelModel(key=name, value=value)], zebrunner_context.test_run_id + ) + except AgentApiError as e: + logging.error("Failed to attach label to test run", exc_info=e) + + def current_test_revert_registration(self) -> None: + if not zebrunner_context.test_is_active: + raise AgentError("There is not active test to revert") + + settings = zebrunner_context.settings + try: + api = ZebrunnerAPI(settings.server.hostname, settings.server.access_token) + api.reverse_test_registration(zebrunner_context.test_run_id, zebrunner_context.test_id) + zebrunner_context.is_reverted = True + except AgentApiError as e: + logging.error("Failed to revert test registration", exc_info=e) + + def current_test_run_set_build(self, build: str) -> None: + if not build.strip(): + raise AgentError("Build must not be empty") + + if not zebrunner_context.test_run_is_active: + raise AgentError("There is not active test run to set build") + + settings = zebrunner_context.settings + try: + api = ZebrunnerAPI(settings.server.hostname, settings.server.access_token) + api.patch_test_run_build(zebrunner_context.test_run_id, build) + except AgentApiError as e: + logging.error("Failed to set build", exc_info=e) + + def current_test_run_set_locale(self, locale: str) -> None: + if not locale.strip(): + raise AgentError("Locale must no be empty") + if not zebrunner_context.test_run_is_active: + raise AgentError("There is not active test run to set locale") + + label = "com.zebrunner.app/sut.locale" + settings = zebrunner_context.settings + try: + api = ZebrunnerAPI(settings.server.hostname, settings.server.access_token) + api.send_labels([LabelModel(key=label, value=locale)], zebrunner_context.test_run_id, None) + except AgentApiError as e: + logging.error("failed to set locale", exc_info=e) + + def current_test_run_set_platform(self, name: str) -> None: + if not name.strip(): + raise AgentError("Platform must not be empty") + + self.current_test_run_set_platform_version(name, None) + + def current_test_run_set_platform_version(self, name: str, version: Optional[str]) -> None: + if not name.strip(): + raise AgentError("Platform must not be empty") + if not zebrunner_context.test_run_is_active: + raise AgentError("There is not active test run to set platform") + + settings = zebrunner_context.settings + try: + api = ZebrunnerAPI(settings.server.hostname, settings.server.access_token) + api.set_test_run_platform( + zebrunner_context.test_run_id, + PlatformModel(name=name, version=version), + ) + except AgentApiError as e: + logging.error("Failed to set platform", exc_info=e) + + def test_rail_disable_sync(self) -> None: + TestRail.disable_sync() + + def test_rail_enable_real_time_sync(self) -> None: + TestRail.enable_real_time_sync() + + def test_rail_include_all_test_cases_in_new_run(self) -> None: + TestRail.include_all_test_cases_in_new_run() + + def test_rail_set_suite_id(self, suite_id: str) -> None: + TestRail.set_suite_id(suite_id) + + def test_rail_set_run_id(self, run_id: str) -> None: + TestRail.set_run_id(run_id) + + def test_rail_set_run_name(self, run_name: str) -> None: + TestRail.set_run_name(run_name) + + def test_rail_set_milestone(self, milestone: str) -> None: + TestRail.set_milestone(milestone) + + def test_rail_set_assignee(self, assignee: str) -> None: + TestRail.set_assignee(assignee) + + def test_rail_set_case_id(self, test_case_id: str) -> None: + TestRail.set_case_id(test_case_id) + + def xray_disable_sync(self) -> None: + Xray.disable_sync() + + def xray_enable_real_time_sync(self) -> None: + Xray.enable_real_time_sync() + + def xray_set_execution_key(self, execution_key: str) -> None: + Xray.set_execution_key(execution_key) + + def xray_set_test_key(self, test_key: str) -> None: + Xray.set_test_key(test_key) + + def zephyr_disable_sync(self) -> None: + Zephyr.disable_sync() + + def zephyr_enable_real_time_sync(self) -> None: + Zephyr.enable_real_time_sync() + + def zephyr_set_test_cycle_key(self, key: str) -> None: + Zephyr.set_test_cycle_key(key) + + def zephyr_set_jira_project_key(self, key: str) -> None: + Zephyr.set_jira_project_key(key) + + def zephyr_set_test_case_key(self, key: str) -> None: + Zephyr.set_test_case_key(key) diff --git a/src/robotframework_zebrunner/listener.py b/src/robotframework_zebrunner/listener.py index ac76410..c5d0fa0 100644 --- a/src/robotframework_zebrunner/listener.py +++ b/src/robotframework_zebrunner/listener.py @@ -1,60 +1,57 @@ -from distutils.command.build import build -import json import logging -from pprint import pformat +import os +import sys import time +from datetime import datetime from typing import List, Optional -from pydantic import ValidationError -from robot import running, result +from robot import result, running from robot.libraries.BuiltIn import BuiltIn -from robotframework_zebrunner.logs import LogBuffer - from .api.client import ZebrunnerAPI -from .api.models import FinishTestModel, LogRecordModel, StartTestModel, StartTestRunModel, TestStatus -from .api.models import ( - MilestoneModel, - NotificationTargetModel, - NotificationsModel, - NotificationsType, - TestRunConfigModel, -) +from .api.models import (FinishTestModel, LabelModel, LogRecordModel, + MilestoneModel, NotificationsModel, NotificationsType, + NotificationTargetModel, StartTestModel, + StartTestRunModel, TestRunConfigModel, TestStatus) from .ci_loaders import resolve_ci_context +from .context import zebrunner_context +from .errors import AgentApiError +from .logs import LogBuffer from .selenium_integration import SeleniumSessionManager, inject_driver -from .settings import Settings, load_settings +from .settings import NotificationsSettings +from .tcm import TestRail, Xray, Zephyr class ZebrunnerListener: ROBOT_LISTENER_API_VERSION = 3 - test_run_id: Optional[int] - test_id: Optional[int] - settings: Optional[Settings] - session_manager: Optional[SeleniumSessionManager] + TEST_RUN_ID_KEY = "ZEBRUNNER_TEST_RUN_ID" + RUNNING_COUNT_KEY = "ZEBRUNNER_RUNNING_TESTS_COUNT" + ZEBRUNNER_LOCK = "zebrunner" + + session_manager: SeleniumSessionManager log_buffer: LogBuffer def __init__(self) -> None: - self.test_run_id = None - self.test_id = None - try: - self.settings = load_settings() - except ValidationError as e: - logging.error("Failed to load zebrunner config", exc_info=e) - self.settings = None - return - - server_config = self.settings.server - self.api = ZebrunnerAPI(server_config.hostname, server_config.access_token) + self.api = ZebrunnerAPI( + zebrunner_context.settings.server.hostname, + zebrunner_context.settings.server.access_token, + ) try: self.api.auth() - except SystemExit as e: + except AgentApiError as e: logging.error(str(e)) - raise + sys.exit(os.EX_CONFIG) + + def _pabotlib(self): + if self._is_pabot_enabled(): + builtin = BuiltIn() + return builtin.get_library_instance("pabot.PabotLib") - self.log_buffer = LogBuffer(self.api, self.test_run_id) + return None - def _is_pabot_enabled(self) -> bool: + @staticmethod + def _is_pabot_enabled() -> bool: try: builtin = BuiltIn() builtin.import_library("pabot.PabotLib") @@ -68,172 +65,240 @@ def _is_pabot_enabled(self) -> bool: return False def _start_test_run(self, data: running.TestSuite) -> Optional[int]: - if not self.settings: - return None + settings = zebrunner_context.settings + display_name = ( + settings.run.display_name + if settings.run + else f"Unnamed {datetime.utcnow()}" + ) - display_name = self.settings.run.display_name if self.settings.run else "Default suite" start_run = StartTestRunModel( name=display_name or data.name, framework="robotframework", config=TestRunConfigModel( - environment=self.settings.run.environment, - build=self.settings.run.build, + environment=settings.run.environment, + build=settings.run.build, + treat_skips_as_failures=settings.run.treat_skips_as_failures, ), - ci_context=resolve_ci_context() + ci_context=resolve_ci_context(), ) - if self.settings.run.context: - zebrunner_run_context = self.api.get_rerun_tests(self.settings.run.context) + if settings.run.context: + zebrunner_run_context = self.api.get_rerun_tests(settings.run.context) start_run.uuid = zebrunner_run_context.test_run_uuid - # if not zebrunner_run_context.run_allowed: - # pytest.exit(f"Run not allowed by zebrunner! Reason: {zebrunner_run_context.reason}") - # if zebrunner_run_context.run_only_specific_tests and not zebrunner_run_context.tests_to_run: - # pytest.exit("Aborted. No tests to run!!") + builtin = BuiltIn() + if not zebrunner_run_context.run_allowed: + builtin.fatal_error( + f"Run not allowed by zebrunner! Reason: {zebrunner_run_context.reason}" + ) + if ( + zebrunner_run_context.run_only_specific_tests + and not zebrunner_run_context.tests_to_run + ): + builtin.fatal("Aborted. No tests to run!!") - if self.settings.milestone: + if settings.milestone: start_run.milestone = MilestoneModel( - id=self.settings.milestone.id, - name=self.settings.milestone.name, + id=settings.milestone.id, name=settings.milestone.name ) - if self.settings.notification: - notification = self.settings.notification - targets: List[NotificationTargetModel] = [] - if notification.emails: - targets.append( - NotificationTargetModel( - type=NotificationsType.EMAIL_RECIPIENTS.value, - value=notification.emails, - ) - ) - - if notification.slack_channels: - targets.append( - NotificationTargetModel( - type=NotificationsType.SLACK_CHANNELS.value, - value=notification.slack_channels, - ) - ) - - if notification.ms_teams_channels: - targets.append( - NotificationTargetModel( - type=NotificationsType.MS_TEAMS_CHANNELS.value, - value=notification.ms_teams_channels, - ) - ) - + if settings.notification and _get_notification_targets(settings.notification): start_run.notifications = NotificationsModel( - notify_on_each_failure=self.settings.notification.notify_on_each_failure, - targets=targets, + notify_on_each_failure=settings.notification.notify_on_each_failure, + targets=_get_notification_targets(settings.notification), ) start_run.ci_context = resolve_ci_context() - return self.api.start_test_run(self.settings.project_key, start_run) + return self.api.start_test_run(settings.project_key, start_run) def start_suite(self, data: running.TestSuite, result: result.TestSuite) -> None: - # Skip all nonroot suites + # Skip all non root suites if data.id != "s1": return - if self._is_pabot_enabled(): - # Lock, create run or get from variables, release lock - from pabot.pabotlib import PabotLib - builtin = BuiltIn() - pabot: PabotLib = builtin.get_library_instance("pabot.PabotLib") - pabot.acquire_lock("zebrunner") + settings = zebrunner_context.settings + pabotlib = self._pabotlib() + if pabotlib: try: - if pabot.get_parallel_value_for_key("ZEBRUNNER_TEST_RUN_ID"): - self.test_run_id = pabot.get_parallel_value_for_key("ZEBRUNNER_TEST_RUN_ID") + pabotlib.acquire_lock(self.ZEBRUNNER_LOCK) + if pabotlib.get_parallel_value_for_key(self.TEST_RUN_ID_KEY): + zebrunner_context.test_run_id = pabotlib.get_parallel_value_for_key(self.TEST_RUN_ID_KEY) else: - self.test_run_id = self._start_test_run(data) - pabot.set_parallel_value_for_key("ZEBRUNNER_TEST_RUN_ID", self.test_run_id) + zebrunner_context.test_run_id = self._start_test_run(data) + pabotlib.set_parallel_value_for_key(self.TEST_RUN_ID_KEY, zebrunner_context.test_run_id) finally: - pabot.release_lock("zebrunner") + pabotlib.release_lock(self.ZEBRUNNER_LOCK) else: - self.test_run_id = self._start_test_run(data) + zebrunner_context.test_run_id = self._start_test_run(data) - if self.settings and self.settings.send_logs and self.test_run_id: - self.log_buffer.test_run_id = self.test_run_id - - if self.settings and self.test_run_id: - self.session_manager = inject_driver(self.settings, self.api, self.test_run_id) + if zebrunner_context.test_run_is_active: + self.session_manager = inject_driver( + settings, self.api, zebrunner_context.test_run_id + ) + if settings.send_logs: + self.log_buffer = LogBuffer(self.api, zebrunner_context.test_run_id) - def end_suite(self, name: str, attributes: dict) -> None: - if not self.settings: + def end_suite(self, data, result) -> None: + if not zebrunner_context.test_run_is_active: return - if self.test_run_id: - self.log_buffer.push_logs() - builtin = BuiltIn() - is_pabot_last = builtin.get_variable_value("${PABOTISLASTEXECUTIONINPOOL}") - if is_pabot_last == "1" or is_pabot_last is None: - if self.test_run_id: - self.api.finish_test_run(self.test_run_id) - if self.session_manager: - self.session_manager.finish_all_sessions() + self.log_buffer.push_logs() + + pabotlib = self._pabotlib() + if pabotlib: + try: + pabotlib.acquire_lock(self.ZEBRUNNER_LOCK) + running_count = pabotlib.get_parallel_value_for_key(self.RUNNING_COUNT_KEY) + if running_count != 0: + return + finally: + pabotlib.release_lock(self.ZEBRUNNER_LOCK) + + self.api.finish_test_run(zebrunner_context.test_run_id) + self.session_manager.finish_all_sessions() + + if zebrunner_context.settings.save_test_run: + with open(".zbr-test-run-id", "w") as f: + f.write(str(zebrunner_context.test_run_id)) def start_test(self, data: running.TestCase, result: result.TestCase) -> None: - if not self.settings: + if not zebrunner_context.test_run_is_active: return - if self.test_run_id: - start_test = StartTestModel( - name=data.name[:255], - class_name=data.parent.longname[:255], - test_case=data.parent.longname[:255], - method_name=data.name[:255], - ) - self.test_id = self.api.start_test(self.test_run_id, start_test) + maintainer = None + labels: List[LabelModel] = [] + + for tag in data.tags: + if tag.startswith("maintainer"): + maintainer = tag.split("=")[1].strip() + + if tag.startswith("labels: "): + tag_labels = tag.replace("labels: ", "").split(",") + for label in tag_labels: + labels.append( + LabelModel( + key=label.split("=")[0].strip(), + value=label.split("=")[1].strip(), + ) + ) + + if tag.startswith("test_rail_case_id"): + labels.append( + LabelModel( + key=TestRail.CASE_ID, + value=tag.split("=")[1].strip(), + ) + ) + + if tag.startswith("xray_test_key"): + labels.append( + LabelModel( + key=Xray.TEST_KEY, + value=tag.split("=")[1].strip(), + ) + ) + + if tag.startswith("zephyr_test_case_key"): + labels.append( + LabelModel( + key=Zephyr.TEST_CASE_KEY, value=tag.split("=")[1].strip() + ) + ) + + start_test = StartTestModel( + name=data.name, + class_name=data.parent.longname, + test_case=data.parent.longname, + method_name=data.name, + maintainer=maintainer, + labels=labels or None, + ) + zebrunner_context.test_id = self.api.start_test(zebrunner_context.test_run_id, start_test) + zebrunner_context.is_reverted = False + + if zebrunner_context.test_is_active: + self.session_manager.add_test(zebrunner_context.test_id) + + + pabotlib = self._pabotlib() + if pabotlib: + try: + pabotlib.acquire_lock(self.ZEBRUNNER_LOCK) + if pabotlib.get_parallel_value_for_key(self.RUNNING_COUNT_KEY): + running_count = pabotlib.get_parallel_value_for_key(self.RUNNING_COUNT_KEY) + pabotlib.set_parallel_value_for_key(self.RUNNING_COUNT_KEY, running_count + 1) + else: + pabotlib.set_parallel_value_for_key(self.RUNNING_COUNT_KEY, 1) + finally: + pabotlib.release_lock(self.ZEBRUNNER_LOCK) - if self.session_manager and self.test_id: - self.session_manager.add_test(self.test_id) def end_test(self, data: running.TestCase, attributes: result.TestCase) -> None: - if not self.settings: + if not zebrunner_context.test_is_active or zebrunner_context.is_reverted: return - if self.test_id and self.test_run_id: - if self.session_manager: - self.session_manager.remove_test(self.test_id) + self.session_manager.remove_test(zebrunner_context.test_id) + if attributes.status == "PASS": + status = TestStatus.PASSED + elif attributes.status == "FAIL": status = TestStatus.FAILED - if attributes.status == "PASS": - status = TestStatus.PASSED - elif attributes.status == "FAIL": - status = TestStatus.FAILED - else: - status = TestStatus.SKIPPED + else: + status = TestStatus.SKIPPED - finish_test = FinishTestModel(result=status.value, reason=attributes.message) - self.api.finish_test(self.test_run_id, self.test_id, finish_test) + finish_test = FinishTestModel(result=status.value, reason=attributes.message) + self.api.finish_test( + zebrunner_context.test_run_id, zebrunner_context.test_id, finish_test + ) + pabotlib = self._pabotlib() + if pabotlib: + try: + pabotlib.acquire_lock(self.ZEBRUNNER_LOCK) + if pabotlib.get_parallel_value_for_key(self.RUNNING_COUNT_KEY): + running_count = pabotlib.get_parallel_value_for_key(self.RUNNING_COUNT_KEY) + pabotlib.set_parallel_value_for_key(self.RUNNING_COUNT_KEY, running_count - 1) + finally: + pabotlib.release_lock(self.ZEBRUNNER_LOCK) def log_message(self, message: result.Message) -> None: - if not self.test_run_id or not self.test_id: + if not zebrunner_context.test_is_active: return self.log_buffer.add_log( LogRecordModel( - test_id=self.test_id, + test_id=zebrunner_context.test_id, level=message.level, timestamp=str(round(time.time() * 1000)), message=message.message, ) ) - def output_file(self, path: str) -> None: - if not self.test_run_id: - return - - self.api.send_artifact(path, self.test_run_id, self.test_id) - def log_file(self, path: str) -> None: - if not self.test_run_id: - return - - self.api.send_artifact(path, self.test_run_id, self.test_id) +def _get_notification_targets( + notification: Optional[NotificationsSettings], +) -> List[NotificationTargetModel]: + if notification is None: + return [] + targets = [] + if notification.emails: + targets.append( + NotificationTargetModel( + type=NotificationsType.EMAIL_RECIPIENTS, value=notification.emails + ) + ) + if notification.slack_channels: + targets.append( + NotificationTargetModel( + type=NotificationsType.SLACK_CHANNELS, value=notification.slack_channels + ) + ) + if notification.ms_teams_channels: + targets.append( + NotificationTargetModel( + type=NotificationsType.MS_TEAMS_CHANNELS, + value=notification.ms_teams_channels, + ) + ) -class ZebrunnerLib: - ROBOT_LIBRARY_LISTENER = ZebrunnerListener() - ROBOT_LIBRARY_SCOPE = "GLOBAL" + return targets diff --git a/src/robotframework_zebrunner/logs.py b/src/robotframework_zebrunner/logs.py index 6fa28d2..37588e0 100644 --- a/src/robotframework_zebrunner/logs.py +++ b/src/robotframework_zebrunner/logs.py @@ -1,7 +1,5 @@ - - -from datetime import datetime, timedelta import logging +from datetime import datetime, timedelta from typing import List, Optional import httpx diff --git a/src/robotframework_zebrunner/settings.py b/src/robotframework_zebrunner/settings.py index 433bc9b..b195d88 100644 --- a/src/robotframework_zebrunner/settings.py +++ b/src/robotframework_zebrunner/settings.py @@ -14,30 +14,19 @@ class TestRunSettings(BaseModel): - """ - A class that inherit from BaseModel and represents test_run settings. - """ - display_name: Optional[str] = None build: Optional[str] = None environment: Optional[str] = None context: Optional[str] = None + treat_skips_as_failures: bool = True class ServerSettings(BaseModel): - """ - A class that inherit from BaseModel and represents server settings. - """ - hostname: str access_token: str class NotificationsSettings(BaseModel): - """ - A class that inherit from BaseModel and represents notifications settings. - """ - slack_channels: Optional[str] = None ms_teams_channels: Optional[str] = None emails: Optional[str] = None @@ -45,19 +34,11 @@ class NotificationsSettings(BaseModel): class MilestoneSettings(BaseModel): - """ - A class that inherit from BaseModel and represents milestone settings. - """ - id: Optional[str] name: Optional[str] class ZebrunnerSettings(BaseModel): - """ - Zebrunner settings provided by launcher - """ - @property def desired_capabilities(self) -> Optional[dict]: try: @@ -71,13 +52,10 @@ def desired_capabilities(self) -> Optional[dict]: class Settings(BaseModel): - """ - A class that inherit from BaseModel and represents some settings. - """ - enabled: bool = True project_key: str = "DEF" send_logs: bool = True + save_test_run: bool = False server: ServerSettings run: TestRunSettings = TestRunSettings() notification: Optional[NotificationsSettings] = None @@ -89,12 +67,6 @@ def _list_settings(model: Type[BaseModel]) -> List: """ Extracts and returns a list with all model fields. Also goes deeper into fields that extend from BaseModel and extract theirs fields too. - - Args: - model (Type[BaseModel]): A model to list its fields. - - Returns: - setting_names (list): List with all model fields. """ setting_names = [] for field_name, field_value in model.__fields__.items(): @@ -114,11 +86,6 @@ def _put_by_path(settings_dict: dict, path: List[str], value: Any) -> None: Creates a dictionary with the first item in path as key and set value as its value if the amount of items in path is one. Otherwise, creates a set of nested dictionaries, with the first item in path at the top of the head. - - Args: - settings_dict (dict): Dictionary with settings fields. - path (List[str]): Strings to be set as dictionary keys. - value: Some value to be set to las dictionary key. """ if len(path) == 1: settings_dict[path[0]] = value @@ -132,11 +99,6 @@ def _get_by_path(settings_dict: dict, path: List[str], default_value: Any = None """ Returns the value of first path item key if path list has only one element. Otherwise, returns values of every key in path list recursively. - - Args: - settings_dict (dict): - path (List[str]): - default_value (optional): """ if len(path) == 1: return settings_dict.get(path[0], default_value) @@ -146,7 +108,6 @@ def _get_by_path(settings_dict: dict, path: List[str], default_value: Any = None def _load_env(path_list: List[List[str]]) -> dict: - """""" dotenv.load_dotenv(".env") settings: Dict[str, Any] = {} for path in path_list: diff --git a/src/robotframework_zebrunner/tcm/__init__.py b/src/robotframework_zebrunner/tcm/__init__.py new file mode 100644 index 0000000..eaf45ac --- /dev/null +++ b/src/robotframework_zebrunner/tcm/__init__.py @@ -0,0 +1,5 @@ +"""Test case management integration package""" + +from .test_rail import TestRail # noqa: F401 +from .xray import Xray # noqa: F401 +from .zephyr import Zephyr # noqa: F401 diff --git a/src/robotframework_zebrunner/tcm/base.py b/src/robotframework_zebrunner/tcm/base.py new file mode 100644 index 0000000..aefec87 --- /dev/null +++ b/src/robotframework_zebrunner/tcm/base.py @@ -0,0 +1,28 @@ +import logging + +from ..api.client import ZebrunnerAPI +from ..api.models import LabelModel +from ..context import zebrunner_context + + +class AgentException(Exception): + pass + + +class BaseTcm: + @staticmethod + def _attach_label(name: str, value: str) -> None: + api = ZebrunnerAPI(zebrunner_context.settings.server.hostname, zebrunner_context.settings.server.access_token) + if not zebrunner_context.test_run_is_active: + logging.error(f"Failed to attach label '{name}: {value}' to test run because it has not been started yet.") + return + label = LabelModel(key=name, value=value) + api.send_labels([label], zebrunner_context.test_run_id, zebrunner_context.test_id) + + @staticmethod + def _verify_no_tests() -> None: + if zebrunner_context.test_is_active: + raise AgentException( + "The TCM configuration must be provided before start of tests. Hint: move the " + "configuration to the code block which is executed before all tests." + ) diff --git a/src/robotframework_zebrunner/tcm/test_rail.py b/src/robotframework_zebrunner/tcm/test_rail.py new file mode 100644 index 0000000..c6dddd3 --- /dev/null +++ b/src/robotframework_zebrunner/tcm/test_rail.py @@ -0,0 +1,58 @@ +from .base import BaseTcm + + +class TestRail(BaseTcm): + SYNC_ENABLED = "com.zebrunner.app/tcm.testrail.sync.enabled" + SYNC_REAL_TIME = "com.zebrunner.app/tcm.testrail.sync.real-time" + INCLUDE_ALL = "com.zebrunner.app/tcm.testrail.include-all-cases" + SUITE_ID = "com.zebrunner.app/tcm.testrail.suite-id" + RUN_ID = "com.zebrunner.app/tcm.testrail.run-id" + RUN_NAME = "com.zebrunner.app/tcm.testrail.run-name" + MILESTONE = "com.zebrunner.app/tcm.testrail.milestone" + ASSIGNEE = "com.zebrunner.app/tcm.testrail.assignee" + CASE_ID = "com.zebrunner.app/tcm.testrail.case-id" + + @staticmethod + def disable_sync() -> None: + TestRail._verify_no_tests() + TestRail._attach_label(TestRail.SYNC_ENABLED, "false") + + @staticmethod + def enable_real_time_sync() -> None: + TestRail._verify_no_tests() + TestRail._attach_label(TestRail.SYNC_REAL_TIME, "true") + TestRail._attach_label(TestRail.INCLUDE_ALL, "true") + + @staticmethod + def include_all_test_cases_in_new_run() -> None: + TestRail._verify_no_tests() + TestRail._attach_label(TestRail.INCLUDE_ALL, "true") + + @staticmethod + def set_suite_id(suite_id: str) -> None: + TestRail._verify_no_tests() + TestRail._attach_label(TestRail.SUITE_ID, suite_id) + + @staticmethod + def set_run_id(run_id: str) -> None: + TestRail._verify_no_tests() + TestRail._attach_label(TestRail.RUN_ID, run_id) + + @staticmethod + def set_run_name(run_name: str) -> None: + TestRail._verify_no_tests() + TestRail._attach_label(TestRail.RUN_NAME, run_name) + + @staticmethod + def set_milestone(milestone: str) -> None: + TestRail._verify_no_tests() + TestRail._attach_label(TestRail.MILESTONE, milestone) + + @staticmethod + def set_assignee(assignee: str) -> None: + TestRail._verify_no_tests() + TestRail._attach_label(TestRail.ASSIGNEE, assignee) + + @staticmethod + def set_case_id(test_case_id: str) -> None: + TestRail._attach_label(TestRail.CASE_ID, test_case_id) diff --git a/src/robotframework_zebrunner/tcm/xray.py b/src/robotframework_zebrunner/tcm/xray.py new file mode 100644 index 0000000..4439ff0 --- /dev/null +++ b/src/robotframework_zebrunner/tcm/xray.py @@ -0,0 +1,27 @@ +from .base import BaseTcm + + +class Xray(BaseTcm): + SYNC_ENABLED = "com.zebrunner.app/tcm.xray.sync.enabled" + SYNC_REAL_TIME = "com.zebrunner.app/tcm.xray.sync.real-time" + EXECUTION_KEY = "com.zebrunner.app/tcm.xray.test-execution-key" + TEST_KEY = "com.zebrunner.app/tcm.xray.test-key" + + @staticmethod + def disable_sync() -> None: + Xray._verify_no_tests() + Xray._attach_label(Xray.SYNC_ENABLED, "false") + + @staticmethod + def enable_real_time_sync() -> None: + Xray._verify_no_tests() + Xray._attach_label(Xray.SYNC_REAL_TIME, "true") + + @staticmethod + def set_execution_key(execution_key: str) -> None: + Xray._verify_no_tests() + Xray._attach_label(Xray.EXECUTION_KEY, execution_key) + + @staticmethod + def set_test_key(test_key: str) -> None: + Xray._attach_label(Xray.TEST_KEY, test_key) diff --git a/src/robotframework_zebrunner/tcm/zephyr.py b/src/robotframework_zebrunner/tcm/zephyr.py new file mode 100644 index 0000000..08f6f05 --- /dev/null +++ b/src/robotframework_zebrunner/tcm/zephyr.py @@ -0,0 +1,33 @@ +from .base import BaseTcm + + +class Zephyr(BaseTcm): + SYNC_ENABLED = "com.zebrunner.app/tcm.zephyr.sync.enabled" + SYNC_REAL_TIME = "com.zebrunner.app/tcm.zephyr.sync.real-time" + TEST_CYCLE_KEY = "com.zebrunner.app/tcm.zephyr.test-cycle-key" + JIRA_PROJECT_KEY = "com.zebrunner.app/tcm.zephyr.jira-project-key" + TEST_CASE_KEY = "com.zebrunner.app/tcm.zephyr.test-case-key" + + @staticmethod + def disable_sync() -> None: + Zephyr._verify_no_tests() + Zephyr._attach_label(Zephyr.SYNC_ENABLED, "false") + + @staticmethod + def enable_real_time_sync() -> None: + Zephyr._verify_no_tests() + Zephyr._attach_label(Zephyr.SYNC_REAL_TIME, "true") + + @staticmethod + def set_test_cycle_key(key: str) -> None: + Zephyr._verify_no_tests() + Zephyr._attach_label(Zephyr.TEST_CYCLE_KEY, key) + + @staticmethod + def set_jira_project_key(key: str) -> None: + Zephyr._verify_no_tests() + Zephyr._attach_label(Zephyr.JIRA_PROJECT_KEY, key) + + @staticmethod + def set_test_case_key(key: str) -> None: + Zephyr._attach_label(Zephyr.TEST_CASE_KEY, key) diff --git a/src/robotframework_zebrunner/utils.py b/src/robotframework_zebrunner/utils.py new file mode 100644 index 0000000..b2064e6 --- /dev/null +++ b/src/robotframework_zebrunner/utils.py @@ -0,0 +1,12 @@ +class Singleton(type): + """ + A class that inherit form 'type' and allows to implement Singleton Pattern. + """ + + __instance = None + + def __call__(cls, *args, **kwargs): # type: ignore + if not isinstance(cls.__instance, cls): + cls.__instance = super(Singleton, cls).__call__(*args, **kwargs) + + return cls.__instance