Skip to content

Commit

Permalink
Add tests for CLI commands (#586)
Browse files Browse the repository at this point in the history
(cherry picked from commit 7add2f3)
  • Loading branch information
christophfroehlich authored and mergify[bot] committed Sep 25, 2024
1 parent 48d62f0 commit 616f1f3
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 3 deletions.
1 change: 1 addition & 0 deletions example_1/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ if(BUILD_TESTING)
ament_add_pytest_test(example_1_urdf_xacro test/test_urdf_xacro.py)
ament_add_pytest_test(view_example_1_launch test/test_view_robot_launch.py)
ament_add_pytest_test(run_example_1_launch test/test_rrbot_launch.py)
ament_add_pytest_test(run_example_1_launch_cli_direct test/test_rrbot_launch_cli_direct.py)
endif()


Expand Down
39 changes: 39 additions & 0 deletions example_1/test/test_rrbot_launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import os
import pytest
import unittest
import subprocess

from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
Expand Down Expand Up @@ -89,6 +90,44 @@ def test_check_if_msgs_published(self):
check_if_js_published("/joint_states", ["joint1", "joint2"])


class TestFixtureCLI(unittest.TestCase):

def setUp(self):
rclpy.init()
self.node = Node("test_node")

def tearDown(self):
self.node.destroy_node()
rclpy.shutdown()

def test_main(self, proc_output):

# Command to run the CLI
cname = "joint_trajectory_position_controller"
command = ["ros2", "control", "load_controller", cname]
subprocess.run(command, check=True)
check_controllers_running(self.node, [cname], state="unconfigured")
check_controllers_running(self.node, ["forward_position_controller"], state="active")

command = ["ros2", "control", "set_controller_state", cname, "inactive"]
subprocess.run(command, check=True)
check_controllers_running(self.node, [cname], state="inactive")
check_controllers_running(self.node, ["forward_position_controller"], state="active")

command = [
"ros2",
"control",
"set_controller_state",
"forward_position_controller",
"inactive",
]
subprocess.run(command, check=True)
command = ["ros2", "control", "set_controller_state", cname, "active"]
subprocess.run(command, check=True)
check_controllers_running(self.node, ["forward_position_controller"], state="inactive")
check_controllers_running(self.node, [cname], state="active")


# TODO(anyone): enable this if shutdown of ros2_control_node does not fail anymore
# @launch_testing.post_shutdown_test()
# # These tests are run after the processes in generate_test_description() have shutdown.
Expand Down
104 changes: 104 additions & 0 deletions example_1/test/test_rrbot_launch_cli_direct.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
# Copyright (c) 2024 AIT - Austrian Institute of Technology GmbH
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# * Neither the name of the {copyright_holder} nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Author: Christoph Froehlich

import os
import pytest
import unittest
import subprocess

from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import IncludeLaunchDescription
from launch.launch_description_sources import PythonLaunchDescriptionSource
from launch_testing.actions import ReadyToTest

# import launch_testing.markers
import rclpy
from rclpy.node import Node
from ros2_control_demo_testing.test_utils import check_controllers_running


# Executes the given launch file and checks if all nodes can be started
@pytest.mark.rostest
def generate_test_description():
launch_include = IncludeLaunchDescription(
PythonLaunchDescriptionSource(
os.path.join(
get_package_share_directory("ros2_control_demo_example_1"),
"launch/rrbot.launch.py",
)
),
launch_arguments={"gui": "False"}.items(),
)

return LaunchDescription([launch_include, ReadyToTest()])


class TestFixtureCliDirect(unittest.TestCase):

def setUp(self):
rclpy.init()
self.node = Node("test_node")

def tearDown(self):
self.node.destroy_node()
rclpy.shutdown()

def test_main(self, proc_output):

# Command to run the CLI
cname = "joint_trajectory_position_controller"
command = ["ros2", "control", "load_controller", "--set-state", "inactive", cname]
subprocess.run(command, check=True)
check_controllers_running(self.node, [cname], state="inactive")
check_controllers_running(self.node, ["forward_position_controller"], state="active")

command = [
"ros2",
"control",
"switch_controllers",
"--activate",
cname,
"--deactivate",
"forward_position_controller",
]
subprocess.run(command, check=True)
check_controllers_running(self.node, ["forward_position_controller"], state="inactive")
check_controllers_running(self.node, [cname], state="active")


# TODO(anyone): enable this if shutdown of ros2_control_node does not fail anymore
# @launch_testing.post_shutdown_test()
# # These tests are run after the processes in generate_test_description() have shutdown.
# class TestDescriptionCraneShutdown(unittest.TestCase):

# def test_exit_codes(self, proc_info):
# """Check if the processes exited normally."""
# launch_testing.asserts.assertExitCodes(proc_info)
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def check_node_running(node, node_name, timeout=5.0):
assert found, f"{node_name} not found!"


def check_controllers_running(node, cnames, namespace=""):
def check_controllers_running(node, cnames, namespace="", state="active"):

# wait for controller to be loaded before we call the CM services
found = {cname: False for cname in cnames} # Define 'found' as a dictionary
Expand Down Expand Up @@ -85,14 +85,14 @@ def check_controllers_running(node, cnames, namespace=""):
assert controllers, "No controllers found!"
for c in controllers:
for cname in cnames:
if c.name == cname and c.state == "active":
if c.name == cname and c.state == state:
found[cname] = True
break
time.sleep(0.1)

assert all(
found.values()
), f"Controller(s) not found or not active: {', '.join([cname for cname, is_found in found.items() if not is_found])}"
), f"Controller(s) not found or not {state}: {', '.join([cname for cname, is_found in found.items() if not is_found])}"


def check_if_js_published(topic, joint_names):
Expand Down

0 comments on commit 616f1f3

Please sign in to comment.