Skip to content

Commit

Permalink
Merge pull request #248 from Limmen/dev
Browse files Browse the repository at this point in the history
cadvisor
  • Loading branch information
Limmen authored Aug 21, 2023
2 parents 6fd5730 + dc5c03d commit f795715
Showing 1 changed file with 116 additions and 3 deletions.
119 changes: 116 additions & 3 deletions simulation-system/libs/csle-cluster/tests/test_cluster_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,13 +179,15 @@ def test_startDockerEngine(self, grpc_stub, mocker: pytest_mock.MockFixture) ->
'ManagementSystemController.is_docker_engine_running', return_value=True)
mocker.patch('csle_common.controllers.management_system_controller.'
'ManagementSystemController.start_docker_engine', return_value=(False, None, None))
response: ServiceStatusDTO = csle_cluster.cluster_manager.query_cluster_manager.start_docker_engine(stub=grpc_stub)
response: ServiceStatusDTO = csle_cluster.cluster_manager.query_cluster_manager.start_docker_engine(
stub=grpc_stub)
assert response.running
mocker.patch('csle_common.controllers.management_system_controller.'
'ManagementSystemController.is_docker_engine_running', return_value=False)
mocker.patch('csle_common.controllers.management_system_controller.'
'ManagementSystemController.start_docker_engine', return_value=(True, "PIPE", "PIPE"))
response: ServiceStatusDTO = csle_cluster.cluster_manager.query_cluster_manager.start_docker_engine(stub=grpc_stub)
response: ServiceStatusDTO = csle_cluster.cluster_manager.query_cluster_manager.start_docker_engine(
stub=grpc_stub)
assert response.running

def test_stopDockerEngine(self, grpc_stub, mocker: pytest_mock.MockFixture) -> None:
Expand All @@ -200,7 +202,8 @@ def test_stopDockerEngine(self, grpc_stub, mocker: pytest_mock.MockFixture) -> N
'ManagementSystemController.is_docker_engine_running', return_value=True)
mocker.patch('csle_common.controllers.management_system_controller.'
'ManagementSystemController.stop_docker_engine', return_value=(False, None, None))
response: ServiceStatusDTO = csle_cluster.cluster_manager.query_cluster_manager.stop_docker_engine(stub=grpc_stub)
response: ServiceStatusDTO = csle_cluster.cluster_manager.query_cluster_manager.stop_docker_engine(
stub=grpc_stub)
assert not response.running

mocker.patch('csle_common.controllers.management_system_controller.'
Expand Down Expand Up @@ -253,3 +256,113 @@ def test_stopNginx(self, grpc_stub, mocker: pytest_mock.MockFixture) -> None:
'ManagementSystemController.stop_nginx', return_value=(True, "PIPE", "PIPE"))
response: ServiceStatusDTO = csle_cluster.cluster_manager.query_cluster_manager.stop_nginx(stub=grpc_stub)
assert not response.running

def test_startCAdvisor(self, grpc_stub, mocker: pytest_mock.MockFixture) -> None:
"""
Tests the startCAdvisor grpc
:param grpc_stub: the stub for the GRPC server to make the request to
:param mocker: the mocker object to mock functions with external dependencies
:return: None
"""
mocker.patch('csle_common.controllers.management_system_controller.'
'ManagementSystemController.start_cadvisor', return_value=False)
response: ServiceStatusDTO = csle_cluster.cluster_manager.query_cluster_manager.start_cadvisor(stub=grpc_stub)
assert response.running
mocker.patch('csle_common.controllers.management_system_controller.'
'ManagementSystemController.start_cadvisor', return_value=True)
response: ServiceStatusDTO = csle_cluster.cluster_manager.query_cluster_manager.start_cadvisor(stub=grpc_stub)
assert response.running

def test_stopCAdvisor(self, grpc_stub, mocker: pytest_mock.MockFixture) -> None:
"""
Tests the stopCAdvisor grpc
:param grpc_stub: the stub for the GRPC server to make the request to
:param mocker: the mocker object to mock functions with external dependencies
:return: None
"""
mocker.patch('csle_common.controllers.management_system_controller.'
'ManagementSystemController.stop_cadvisor', return_value=False)
response: ServiceStatusDTO = csle_cluster.cluster_manager.query_cluster_manager.stop_cadvisor(stub=grpc_stub)
assert not response.running
mocker.patch('csle_common.controllers.management_system_controller.'
'ManagementSystemController.stop_cadvisor', return_value=True)
response: ServiceStatusDTO = csle_cluster.cluster_manager.query_cluster_manager.stop_cadvisor(stub=grpc_stub)
assert not response.running

def test_startNodeExporter(self, grpc_stub, mocker: pytest_mock.MockFixture) -> None:
"""
Tests the startNodeExporter grpc
:param grpc_stub: the stub for the GRPC server to make the request to
:param mocker: the mocker object to mock functions with external dependencies
:return: None
"""
mocker.patch('csle_common.controllers.management_system_controller.'
'ManagementSystemController.start_node_exporter', return_value=False)
response: ServiceStatusDTO = csle_cluster.cluster_manager.query_cluster_manager.start_node_exporter(
stub=grpc_stub)
assert response.running
mocker.patch('csle_common.controllers.management_system_controller.'
'ManagementSystemController.start_node_exporter', return_value=True)
response: ServiceStatusDTO = csle_cluster.cluster_manager.query_cluster_manager.start_node_exporter(
stub=grpc_stub)
assert response.running

def test_stopNodeExporter(self, grpc_stub, mocker: pytest_mock.MockFixture) -> None:
"""
Tests the stopNodeExporter grpc
:param grpc_stub: the stub for the GRPC server to make the request to
:param mocker: the mocker object to mock functions with external dependencies
:return: None
"""
mocker.patch('csle_common.controllers.management_system_controller.'
'ManagementSystemController.stop_node_exporter', return_value=False)
response: ServiceStatusDTO = csle_cluster.cluster_manager.query_cluster_manager.stop_node_exporter(
stub=grpc_stub)
assert not response.running
mocker.patch('csle_common.controllers.management_system_controller.'
'ManagementSystemController.stop_node_exporter', return_value=True)
response: ServiceStatusDTO = csle_cluster.cluster_manager.query_cluster_manager.stop_node_exporter(
stub=grpc_stub)
assert not response.running

def test_startGrafana(self, grpc_stub, mocker: pytest_mock.MockFixture) -> None:
"""
Tests the startGrafana grpc
:param grpc_stub: the stub for the GRPC server to make the request to
:param mocker: the mocker object to mock functions with external dependencies
:return: None
"""
mocker.patch('csle_common.controllers.management_system_controller.'
'ManagementSystemController.start_grafana', return_value=False)
response: ServiceStatusDTO = csle_cluster.cluster_manager.query_cluster_manager.start_grafana(
stub=grpc_stub)
assert response.running
mocker.patch('csle_common.controllers.management_system_controller.'
'ManagementSystemController.start_grafana', return_value=True)
response: ServiceStatusDTO = csle_cluster.cluster_manager.query_cluster_manager.start_grafana(
stub=grpc_stub)
assert response.running

def test_stopGrafana(self, grpc_stub, mocker: pytest_mock.MockFixture) -> None:
"""
Tests the stopGrafana grpc
:param grpc_stub: the stub for the GRPC server to make the request to
:param mocker: the mocker object to mock functions with external dependencies
:return: None
"""
mocker.patch('csle_common.controllers.management_system_controller.'
'ManagementSystemController.stop_grafana', return_value=False)
response: ServiceStatusDTO = csle_cluster.cluster_manager.query_cluster_manager.stop_grafana(
stub=grpc_stub)
assert not response.running
mocker.patch('csle_common.controllers.management_system_controller.'
'ManagementSystemController.stop_grafana', return_value=True)
response: ServiceStatusDTO = csle_cluster.cluster_manager.query_cluster_manager.stop_grafana(
stub=grpc_stub)
assert not response.running

0 comments on commit f795715

Please sign in to comment.