import sys
 from typing import Dict, List, Set
 from setuptools import setup
     "geoalchemy2": "GeoAlchemy2~=0.12",
     "google-cloud-monitoring": "google-cloud-monitoring>=2.0.0",
     "google-cloud-storage": "google-cloud-storage==1.43.0",
-    "gcsfs": "gcsfs>=2023.1.0",
"gcsfs": "gcsfs>=2023.10.0",
     "great-expectations": "great-expectations>=0.18.0,<0.18.14",
     "grpc-tools": "grpcio-tools>=1.47.2",
     "msal": "msal~=1.2",
if sys.version_info >= (3, 9):
test.add("locust~=2.32.0")
 e2e_test = {
     # playwright dependencies
"""Base class for resource tests."""
from locust.contrib.fasthttp import FastHttpSession
from requests.auth import AuthBase
class BearerAuth(AuthBase):
def __init__(self, token):
self.token = token
def __call__(self, r):
r.headers["authorization"] = "Bearer " + self.token
return r
def login_user(client: FastHttpSession) -> BearerAuth:
resp = client.post(
"/api/v1/users/login",
json={"email": "admin@open-metadata.org", "password": "YWRtaW4="},
)
token = resp.json().get("accessToken")
return BearerAuth(token)
## Adding a new resource to load tests
Add a new `*.py` file to `test_resources/tasks`. The naming does not matter, but we use the resource name as defined in Java, but seperated by `_` (e.g. `TestCaseResource` becomes `test_case_tasks.py`).
In your newly created file, you'll need to import at minimum 1 package
from locust import task, TaskSet
`task` will be used as a decorator to define our task that will run as part of our load test. `TaskSet` wil be inherited by our task set class.
Here is an example of a locust task definition. The integer argument in `@task` will give a specific weigth to the task (i.e. increasing its probability to be ran)
class TestCaseResultTasks(TaskSet):
"""Test case result resource load test"""
def _list_test_case_results(self, start_ts: int, end_ts: int, days_range: str):
"""List test case results for a given time range
Args:
start_ts (int): start timestamp
end_ts (int): end timestamp
range (str): 
"""
for test_case in self.test_cases:
fqn = test_case.get("fullyQualifiedName")
if fqn:
self.client.get(
f"{TEST_CASE_RESULT_RESOURCE_PATH}/{fqn}",
params={ # type: ignore
"startTs": start_ts,
"endTs": end_ts,
},
auth=self.bearer,
name=f"{TEST_CASE_RESULT_RESOURCE_PATH}/[fqn]/{days_range}"
)
@task(3)
def list_test_case_results_30_days(self):
"""List test case results for the last 30 days. Weighted 3"""
now = datetime.now()
last_30_days = int((now - timedelta(days=30)).timestamp() * 1000)
self._list_test_case_results(last_30_days, int(now.timestamp() * 1000), "30_days")
Notice how we use `self.client.get` to perform the request. This is provided by locust `HttpSession`. If the request needs to be authenticated, you can use `auth=self.bearer`. You will need to first define `self.bearer`, you can achieve this using the `on_start` hook from locust.
from _openmetadata_testutils.helpers.login_user import login_user
class TestCaseResultTasks(TaskSet):
"""Test case result resource load test"""
[...]
def on_start(self):
"""Get a list of test cases to fetch results for"""
self.bearer = login_user(self.client)
resp = self.client.get(f"{TEST_CASE_RESOURCE_PATH}", params={"limit": 100}, auth=self.bearer)
json = resp.json()
self.test_cases = json.get("data", [])
You MUST define a `def stop(self)` methodd in your `TaskSet` class as shown below so that control is given back to the parent user class.
class TestCaseResultTasks(TaskSet):
"""Test case result resource load test"""
[...]
@task
def stop(self):
self.interrupt()
If your request contains a parameter (i.e. `/api/v1/dataQuality/testCases/testCaseResults/{fqn}`) you can name your request so all the request sent you will be grouped together like this
params={ # type: ignore
"startTs": start_ts,
"endTs": end_ts,
},
auth=self.bearer,
name=f"{TEST_CASE_RESULT_RESOURCE_PATH}/[fqn]/{days_range}"
Notice the argument `name=f"{TEST_CASE_RESULT_RESOURCE_PATH}/[fqn]/{days_range}"`, this will define under which name the requests will be grouped. Example of statistics summary below grouped by the request `name`
Type,Name,Request Count,Failure Count,Median Response Time,Average Response Time,Min Response Time,Max Response Time,Average Content Size,Requests/s,Failures/s,50%,66%,75%,80%,90%,95%,98%,99%,99.9%,99.99%,100%
As a final step in `test_resources/manifest.yaml` add the resources, the metrics and the thresholds you want to test.
type: GET
99%: 100
type: GET
99%: 100
This will test that our GET request for the defined resources are running 99% of the time in less than 100 milliseconds (0.1 seconds).
Below is a list of all the metrics you can use:
- Request Count
- Failure Count
- Median Response Time
- Average Response Time
- Min Response Time
- Max Response Time
- Average Content Size
- Requests/s
- Failures/s
- 50%
- 66%
- 75%
- 80%
- 90%
- 95%
- 98%
- 99%
- 99.9%
- 99.99%
- 100%
"""Run test case result resource load test"""
import csv
import os
import sys
from pathlib import Path
from unittest import TestCase, skipIf
import yaml
from ingestion.tests.load.utils import run_load_test
def run_all_resources(summary_file: str, locust_file: str):
"""Test test case result resource"""
args = [
"locust",
"--headless",
"-H",
"http://localhost:8585",
"--user",
os.getenv("LOCUST_USER", "50"),
"--spawn-rate",
"1",
"-f",
str(locust_file),
"--run-time",
os.getenv("LOCUST_RUNTIME", "1m"),
"--only-summary",
"--csv",
str(summary_file),
]
run_load_test(args)
class TestAllResources(TestCase):
"""Test class to run all resources load test"""
@skipIf(sys.version_info < (3, 9), "locust is not supported on python 3.8")
def test_all_resources(self):
"""Test all resources"""
directory = Path(__file__).parent
test_resources_dir = directory.joinpath("test_resources")
locust_file = test_resources_dir.joinpath("all_resources.py")
summary_file = directory.parent.joinpath("load/summaries/all_")
manifest_file = test_resources_dir.joinpath("manifest.yaml")
run_all_resources(str(summary_file), str(locust_file))
with open(manifest_file, "r", encoding="utf-8") as f:
manifest = yaml.safe_load(f)
with open(str(summary_file) + "_stats.csv", "r", encoding="utf-8") as f:
reader = csv.DictReader(f)
for row in reader:
name = row.get("Name")
if name in manifest:
resource = manifest.get(name)
type_ = resource.get("type")
if type_ == row.get("Type"):
for metric, threshold in resource.items():
with self.subTest(
+                                stat = row.get(metric)
+                                if stat:
+                                    stat = int(stat)
+                                    self.assertLessEqual(
+                                        stat,
+                                        threshold,
+                                        msg=f"{metric} for {name} is greater than threshold",
+                                    )
+"""Test class to run all resources load test"""
+import importlib.util
+import inspect
+import logging
+from pathlib import Path
+from typing import List
+from locust import HttpUser, TaskSet, constant
+TASKS_DIR = "tasks"
+logger = logging.getLogger(__name__)
+def get_all_tasks_set() -> List:
+    resource_classes = []
+    wd = Path(__file__).parent.joinpath(TASKS_DIR)
+    for file_path in wd.glob("*.py"):
+        if not str(file_path).startswith("base_"):
+            module_path = str(file_path)
+            module_name = file_path.stem
+            spec = importlib.util.spec_from_file_location(module_name, module_path)
+            if not spec:
+                logger.error(f"Could not load module {module_name}")
+                continue
+            module = importlib.util.module_from_spec(spec)
+            spec.loader.exec_module(module)  # type: ignore
+            for _, obj in inspect.getmembers(module, inspect.isclass):
+                if obj.__module__ == module_name:
+                    resource_classes.append(obj)
+    return resource_classes
+class AllResources(TaskSet):
+    """Execute tasks for all resources"""
+    @classmethod
+    def set_tasks(cls):
+        tasks = get_all_tasks_set()
+        cls.tasks = set(tasks)
+class All(HttpUser):
+    host = "http://localhost:8585"
+    wait_time = constant(1)  # closed workload
+    AllResources.set_tasks()
+    tasks = [AllResources]
+# Description: This file contains the manifest for the test resources.
+# You can add as a key of a resource any metric available in `summaries/all__summaries.csv` file.
+# times should be expressed in milliseconds (e.g. 1000ms = 1s)
+  type: GET
+  99%: 1000
+  type: GET
+  99%: 1000
+  type: GET
+  99%: 1000
+  type: GET
+  99%: 6000
+"""Load test for the test case result resources"""
+from datetime import datetime, timedelta
+from locust import TaskSet, task
+from _openmetadata_testutils.helpers.login_user import login_user
+TEST_CASE_RESULT_RESOURCE_PATH = "/api/v1/dataQuality/testCases/testCaseResults"
+TEST_CASE_RESOURCE_PATH = "/api/v1/dataQuality/testCases"
+class TestCaseResultTasks(TaskSet):
+    """Test case result resource load test"""
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.test_cases = []
+    def _list_test_case_results(self, start_ts: int, end_ts: int, days_range: str):
+        """List test case results for a given time range
+        Args:
+            start_ts (int): start timestamp
+            end_ts (int): end timestamp
+            range (str):
+        """
+        for test_case in self.test_cases:
+            fqn = test_case.get("fullyQualifiedName")
+            if fqn:
+                self.client.get(
+                    f"{TEST_CASE_RESULT_RESOURCE_PATH}/{fqn}",
+                    params={  # type: ignore
+                        "startTs": start_ts,
+                        "endTs": end_ts,
+                    },
+                    auth=self.bearer,
+                    name=f"{TEST_CASE_RESULT_RESOURCE_PATH}/[fqn]/{days_range}",
+                )
+    @task(3)
+    def list_test_case_results_30_days(self):
+        """List test case results for the last 30 days. Weighted 3"""
+        now = datetime.now()
+        last_30_days = int((now - timedelta(days=30)).timestamp() * 1000)
+        self._list_test_case_results(
+            last_30_days, int(now.timestamp() * 1000), "30_days"
+        )
+    @task(2)
+    def list_test_case_results_60_days(self):
+        """List test case results for the last 60 days. Weighted 2"""
+        now = datetime.now()
+        last_60_days = int((now - timedelta(days=60)).timestamp() * 1000)
+        self._list_test_case_results(
+            last_60_days, int(now.timestamp() * 1000), "60_days"
+        )
+    @task
+    def list_test_case_results_180_days(self):
+        """List test case results for the last 180 days"""
+        now = datetime.now()
+        last_180_days = int((now - timedelta(days=180)).timestamp() * 1000)
+        self._list_test_case_results(
+            last_180_days, int(now.timestamp() * 1000), "180_days"
+        )
+    @task
+    def stop(self):
+        self.interrupt()
+    def on_start(self):
+        """Get a list of test cases to fetch results for"""
+        self.bearer = login_user(self.client)
+        resp = self.client.get(
+            f"{TEST_CASE_RESOURCE_PATH}",
+            params={"limit": 100},
+            auth=self.bearer,
+            name=f"{TEST_CASE_RESOURCE_PATH}?limit=100",
+        )
+        json = resp.json()
+        self.test_cases = json.get("data", [])
+"""Load test for the test case resources"""
+from locust import TaskSet, task
+from _openmetadata_testutils.helpers.login_user import login_user
+TEST_CASE_RESOURCE_PATH = "/api/v1/dataQuality/testCases"
+class TestCaseTasks(TaskSet):
+    """Test case resource load test"""
+    def _list_test_cases(self):
+        """Paginate through the test cases"""
+        resp = self.client.get(
+            f"{TEST_CASE_RESOURCE_PATH}",
+            params={"limit": 10},
+            auth=self.bearer,
+            name=f"{TEST_CASE_RESOURCE_PATH}?limit=10",
+        )
+        after = resp.json().get("paging", {}).get("after")
+        while after:
+            resp = self.client.get(
+                f"{TEST_CASE_RESOURCE_PATH}",
+                params={"limit": 10, "after": after},
+                auth=self.bearer,
+                name=f"{TEST_CASE_RESOURCE_PATH}?limit=10",
+            )
+            after = resp.json().get("paging", {}).get("after")
+    @task(2)
+    def list_test_cases(self):
+        """List test cases. Weighted 2"""
+        self._list_test_cases()
+    @task
+    def stop(self):
+        self.interrupt()
+    def on_start(self):
+        self.bearer = login_user(self.client)
+"""Utils functions for load testing."""
+import sys
+from typing import List
+import pytest
+from locust import main
+TEST_CASE_RESOURCE_PATH = "/api/v1/dataQuality/testCases"
+TEST_CASE_RESULT_RESOURCE_PATH = "/api/v1/dataQuality/testCases/testCaseResults"
+def run_load_test(args: List[str]):
+    """Test test case result resource"""
+    original_argv = sys.argv
+    try:
+        sys.argv = args
+        with pytest.raises(SystemExit) as excinfo:
+            main.main()
+        assert excinfo.value.code == 0
+    finally:
+        sys.argv = original_argv