Skip to content

Commit

Permalink
Merge pull request #104 from yukinchan/qe-test-in-pattern-repository
Browse files Browse the repository at this point in the history
Add interop tests from QE
  • Loading branch information
mbaldessari authored Feb 8, 2024
2 parents 73ba7ca + 06bed13 commit 7d8a316
Show file tree
Hide file tree
Showing 8 changed files with 805 additions and 0 deletions.
2 changes: 2 additions & 0 deletions tests/interop/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
__version__ = "0.1.0"
__loggername__ = "css_logger"
51 changes: 51 additions & 0 deletions tests/interop/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import os

import pytest
from kubernetes import config
from kubernetes.client import Configuration
from openshift.dynamic import DynamicClient

from . import __loggername__
from .css_logger import CSS_Logger


def pytest_addoption(parser):
parser.addoption(
"--kubeconfig",
action="store",
default=None,
help="The full path to the kubeconfig file to be used",
)


@pytest.fixture(scope="session")
def get_kubeconfig(request):
if request.config.getoption("--kubeconfig"):
k8config = request.config.getoption("--kubeconfig")
elif "KUBECONFIG" in os.environ.keys() and os.environ["KUBECONFIG"]:
k8config = os.environ["KUBECONFIG"]
else:
raise ValueError(
"A kubeconfig file was not provided. Please provide one either "
"via the --kubeconfig command option or by setting a KUBECONFIG "
"environment variable"
)
return k8config


@pytest.fixture(scope="session")
def kube_config(get_kubeconfig):
kc = Configuration
config.load_kube_config(config_file=get_kubeconfig, client_configuration=kc)
return kc


@pytest.fixture(scope="session")
def openshift_dyn_client(get_kubeconfig):
return DynamicClient(client=config.new_client_from_config(get_kubeconfig))


@pytest.fixture(scope="session", autouse=True)
def setup_logger():
logger = CSS_Logger(__loggername__)
return logger
64 changes: 64 additions & 0 deletions tests/interop/crd.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
from ocp_resources.resource import NamespacedResource, Resource


class DeploymentConfig(NamespacedResource):
"""
OpenShift DeploymentConfig object.
"""

api_version = "apps.openshift.io/v1"
kind = "DeploymentConfig"


class ArgoCD(NamespacedResource):
"""
OpenShift ArgoCD / GitOps object.
"""

api_group = "argoproj.io"
api_version = NamespacedResource.ApiVersion.V1ALPHA1
kind = "Application"

@property
def health(self):
"""
Check the health of of the argocd application
:return: boolean
"""

if (
self.instance.status.operationState.phase == "Succeeded"
and self.instance.status.health.status == "Healthy"
):
return True
return False


class ManagedCluster(Resource):
"""
OpenShift Managed Cluster object.
"""

api_version = "cluster.open-cluster-management.io/v1"

@property
def self_registered(self):
"""
Check if managed cluster is self registered in to ACM running on hub site
:param name: (str) name of managed cluster
:param namespace: namespace
:return: Tuple of boolean and dict on success
"""
is_joined = False
status = dict()

for condition in self.instance.status.conditions:
if condition["type"] == "HubAcceptedManagedCluster":
status["HubAcceptedManagedCluster"] = condition["status"]
elif condition["type"] == "ManagedClusterConditionAvailable":
status["ManagedClusterConditionAvailable"] = condition["status"]
elif condition["type"] == "ManagedClusterJoined":
is_joined = True
status["ManagedClusterJoined"] = condition["status"]

return is_joined, status
52 changes: 52 additions & 0 deletions tests/interop/css_logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import logging
import os
from datetime import datetime
from logging.handlers import RotatingFileHandler

LOG_DIR = os.path.join(os.environ["WORKSPACE"], ".teflo/.results/test_execution_logs")
if not os.path.exists(LOG_DIR):
os.mkdir(LOG_DIR)


class CSS_Logger(object):
_logger = None

def __new__(cls, *args, **kwargs):
if cls._logger is None:
cls._logger = super(CSS_Logger, cls).__new__(cls)
# Put any initialization here.
cls._logger = logging.getLogger(args[0])
cls._logger.setLevel(logging.DEBUG)

pytest_current_test = os.environ.get("PYTEST_CURRENT_TEST")
split_test_name = pytest_current_test.split("::")[1]
short_test_name = split_test_name.split(" ")[0]

datestring = datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
filename = "{}_{}.log".format(short_test_name, datestring)
filepath = os.path.join(LOG_DIR, filename)

# Create a file handler for logging level above DEBUG
file_handler = RotatingFileHandler(
filepath, maxBytes=1024 * 1024 * 1024, backupCount=20
)

# Create a logging format
log_formatter = logging.Formatter(
"%(asctime)s "
"[%(levelname)s] "
"%(module)s:%(lineno)d "
"%(message)s"
)
file_handler.setFormatter(log_formatter)

# Create a stream handler for logging level above INFO
stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.INFO)
stream_handler.setFormatter(log_formatter)

# Add the handlers to the logger
cls._logger.addHandler(file_handler)
cls._logger.addHandler(stream_handler)

return cls._logger
148 changes: 148 additions & 0 deletions tests/interop/edge_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
import base64
import fileinput
import logging
import os
import subprocess

import requests
import yaml
from ocp_resources.secret import Secret
from requests import HTTPError, RequestException
from urllib3.exceptions import InsecureRequestWarning, ProtocolError

from . import __loggername__

logger = logging.getLogger(__loggername__)


def load_yaml_file(file_path):
"""
Load and parse the yaml file
:param file_path: (str) file path
:return: (dict) yaml_config_obj in the form of Python dict
"""
yaml_config_obj = None
with open(file_path, "r") as yfh:
try:
yaml_config_obj = yaml.load(yfh, Loader=yaml.FullLoader)
except Exception as ex:
raise yaml.YAMLError("YAML Syntax Error:\n %s" % ex)
logger.info("Yaml Config : %s", yaml_config_obj)
return yaml_config_obj


def find_number_of_edge_sites(dir_path):
"""
Find the number of edge (managed cluster) sites folder
:param dir_path: (dtr) dir path where edge site manifest resides
:return: (list) site_names
"""
site_names = list()
list_of_dirs = os.listdir(path=dir_path)

for site_dir in list_of_dirs:
if "staging" in site_dir:
site_names.append(site_dir)

return site_names


def get_long_live_bearer_token(
dyn_client, namespace="default", sub_string="default-token"
):
"""
Get bearer token from secrets to authorize openshift cluster
:param sub_string: (str) substring of secrets name to find actual secret name since openshift append random
5 ascii digit at the end of every secret name
:param namespace: (string) name of namespace where secret exist
:return: (string) secret token for specified secret
"""
filtered_secrets = []
try:
for secret in Secret.get(dyn_client=dyn_client, namespace=namespace):
if sub_string in secret.instance.metadata.name:
filtered_secrets.append(secret.instance.data.token)
except StopIteration:
logger.exception(
"Specified substring %s doesn't exist in namespace %s",
sub_string,
namespace,
)
except ProtocolError as e:
# See https://github.com/kubernetes-client/python/issues/1225
logger.info(
"Skip %s... because kubelet disconnect client after default" " 10m...",
e,
)

# All secret tokens in openshift are base64 encoded.
# Decode base64 string into byte and convert byte to str
if len(filtered_secrets) > 0:
bearer_token = base64.b64decode(filtered_secrets[-1]).decode()
return bearer_token
else:
return None


def get_site_response(site_url, bearer_token):
"""
:param site_url: (str) Site API end point
:param bearer_token: (str) bearer token
:return: (dict) site_response
"""
site_response = None
headers = {"Authorization": "Bearer " + bearer_token}

try:
# Suppress only the single warning from urllib3 needed.
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
site_response = requests.get(site_url, headers=headers, verify=False)
except (ConnectionError, HTTPError, RequestException) as e:
logger.exception(
"Failed to connect %s due to refused connection or unsuccessful"
" status code %s",
site_url,
e,
)
logger.debug("Site Response %s: ", site_response)

return site_response


def execute_shell_command_local(cmd):
"""
Executes a shell command in a subprocess, wait until it has completed.
:param cmd: Command to execute.
"""
proc = subprocess.Popen(
cmd,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
)
(out, error) = proc.communicate()
exit_code = proc.wait()
return exit_code, out, error


def modify_file_content(file_name):
with open(file_name, "r") as frb:
logger.debug(f"Current content : {frb.readlines()}")

with fileinput.FileInput(file_name, inplace=True, backup=".bak") as file:
for line in file:
print(
line.replace(
'SENSOR_TEMPERATURE_ENABLED: "false"',
'SENSOR_TEMPERATURE_ENABLED: "true"',
),
end="",
)

with open(file_name, "r") as fra:
contents = fra.readlines()
logger.debug(f"Modified content : {contents}")

return contents
50 changes: 50 additions & 0 deletions tests/interop/test_scale_image_generator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import logging
import time

import pytest

from . import __loggername__
from .crd import DeploymentConfig

logger = logging.getLogger(__loggername__)


@pytest.mark.scale_image_generator
def test_scale_image_generator(openshift_dyn_client):
name = "image-generator"
replicas = 1
body = {"spec": {"replicas": replicas}, "metadata": {"name": name}}

logger.info("Check current replicas for image-generator deploymentconfig")
dc = DeploymentConfig.get(
dyn_client=openshift_dyn_client, namespace="xraylab-1", name=name
)
dc = next(dc)

if int(dc.instance.status.replicas) != 0:
err_msg = "Expected 0 replicas for image-generator deploymentconfig"
logger.error(f"FAIL: {err_msg}")
assert False, err_msg
else:
logger.info(f"Replicas found: {dc.instance.status.replicas}")

logger.info("Scale image-generator deploymentconfig")
dc.update(resource_dict=body)

logger.info("Wait for image-generator pod")
timeout = time.time() + 120
while time.time() < timeout:
time.sleep(5)
dc = DeploymentConfig.get(
dyn_client=openshift_dyn_client, namespace="xraylab-1", name=name
)
dc = next(dc)
if int(dc.instance.status.replicas) == replicas:
break

if int(dc.instance.status.replicas) != replicas:
err_msg = f"Expected {replicas} replicas for {name} deploymentconfig"
logger.error(f"FAIL: {err_msg}")
assert False, err_msg
else:
logger.info("PASS: Scale image-generator test passed")
Loading

0 comments on commit 7d8a316

Please sign in to comment.