diff --git a/rosbridge_library/src/rosbridge_library/capabilities/action_result.py b/rosbridge_library/src/rosbridge_library/capabilities/action_result.py new file mode 100644 index 000000000..6fc979c7d --- /dev/null +++ b/rosbridge_library/src/rosbridge_library/capabilities/action_result.py @@ -0,0 +1,41 @@ +from rosbridge_library.capability import Capability +from rosbridge_library.internal import message_conversion, ros_loader + + +class ActionResult(Capability): + + action_result_msg_fields = [ + (True, "action", str), + (False, "id", str), + (False, "values", dict), + (True, "result", bool), + ] + + def __init__(self, protocol): + # Call superclass constructor + Capability.__init__(self, protocol) + + # Register the operations that this capability provides + protocol.register_operation("action_result", self.action_result) + + def action_result(self, message): + # Typecheck the args + self.basic_type_check(message, self.action_result_msg_fields) + + # check for the action + action_name = message["action"] + if action_name in self.protocol.external_action_list: + action_handler = self.protocol.external_action_list[action_name] + # parse the message + goal_id = message["id"] + values = message["values"] + # create a message instance + result = ros_loader.get_action_result_instance(action_handler.action_type) + message_conversion.populate_instance(values, result) + # pass along the result + action_handler.handle_result(goal_id, result) + else: + self.protocol.log( + "error", + f"Action {action_name} has not been advertised via rosbridge.", + ) diff --git a/rosbridge_library/src/rosbridge_library/capabilities/advertise_action.py b/rosbridge_library/src/rosbridge_library/capabilities/advertise_action.py new file mode 100644 index 000000000..0ab281c0d --- /dev/null +++ b/rosbridge_library/src/rosbridge_library/capabilities/advertise_action.py @@ -0,0 +1,136 @@ +import fnmatch + +import rclpy +from rclpy.action import ActionServer +from rclpy.callback_groups import ReentrantCallbackGroup +from rosbridge_library.capability import Capability +from rosbridge_library.internal import message_conversion +from rosbridge_library.internal.ros_loader import get_action_class + + +class AdvertisedActionHandler: + + id_counter = 1 + + def __init__(self, action_name, action_type, protocol): + self.goal_futures = {} + self.action_name = action_name + self.action_type = action_type + self.protocol = protocol + # setup the action + self.action_server = ActionServer( + protocol.node_handle, + get_action_class(action_type), + action_name, + self.execute_callback, + callback_group=ReentrantCallbackGroup(), # https://github.com/ros2/rclpy/issues/834#issuecomment-961331870 + ) + + def next_id(self): + id = self.id_counter + self.id_counter += 1 + return id + + async def execute_callback(self, goal): + # generate a unique ID + goal_id = f"action_goal:{self.action}:{self.next_id()}" + + future = rclpy.task.Future() + self.request_futures[goal_id] = future + + # build a request to send to the external client + goal_message = { + "op": "send_action_goal", + "id": goal_id, + "action": self.action_name, + "args": message_conversion.extract_values(goal), + } + self.protocol.send(goal_message) + + try: + return await future + finally: + del self.goal_futures[goal_id] + + def handle_result(self, goal_id, res): + """ + Called by the ActionResult capability to handle an action result from the external client. + """ + if goal_id in self.goal_futures: + self.goal_futures[goal_id].set_result(res) + else: + self.protocol.log("warning", f"Received action result for unrecognized id: {goal_id}") + + def graceful_shutdown(self): + """ + Signal the AdvertisedActionHandler to shutdown. + """ + if self.goal_futures: + incomplete_ids = ", ".join(self.goal_futures.keys()) + self.protocol.log( + "warning", + f"Action {self.action_name} was unadvertised with an action in progress, " + f"aborting action goals with request IDs {incomplete_ids}", + ) + for future_id in self.goal_futures: + future = self.goal_futures[future_id] + future.set_exception(RuntimeError(f"Goal {self.action_name} was unadvertised")) + self.action_server.destroy() + + +class AdvertiseAction(Capability): + actions_glob = None + + advertise_action_msg_fields = [(True, "action", str), (True, "type", str)] + + def __init__(self, protocol): + # Call superclass constructor + Capability.__init__(self, protocol) + + # Register the operations that this capability provides + protocol.register_operation("advertise_action", self.advertise_action) + + def advertise_action(self, message): + # Typecheck the args + self.basic_type_check(message, self.advertise_action_msg_fields) + + # parse the incoming message + action_name = message["action"] + + if AdvertiseAction.actions_glob is not None and AdvertiseAction.actions_glob: + self.protocol.log( + "debug", + "Action security glob enabled, checking action: " + action_name, + ) + match = False + for glob in AdvertiseAction.actions_glob: + if fnmatch.fnmatch(action_name, glob): + self.protocol.log( + "debug", + "Found match with glob " + glob + ", continuing action advertisement...", + ) + match = True + break + if not match: + self.protocol.log( + "warn", + "No match found for action, cancelling action advertisement for: " + + action_name, + ) + return + else: + self.protocol.log( + "debug", "No action security glob, not checking action advertisement." + ) + + # check for an existing entry + if action_name in self.protocol.external_action_list.keys(): + self.protocol.log("warn", f"Duplicate action advertised. Overwriting {action_name}.") + self.protocol.external_action_list[action_name].graceful_shutdown() + del self.protocol.external_action_list[action_name] + + # setup and store the action information + action_type = message["type"] + action_handler = AdvertisedActionHandler(action_name, action_type, self.protocol) + self.protocol.external_action_list[action_name] = action_handler + self.protocol.log("info", f"Advertised action {action_name}") diff --git a/rosbridge_library/src/rosbridge_library/internal/actions.py b/rosbridge_library/src/rosbridge_library/internal/actions.py new file mode 100644 index 000000000..db4943a72 --- /dev/null +++ b/rosbridge_library/src/rosbridge_library/internal/actions.py @@ -0,0 +1,135 @@ +# Software License Agreement (BSD License) +# +# Copyright (c) 2023, PickNik Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials provided +# with the distribution. +# * Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +import time +from threading import Thread + +import rclpy +from rclpy.action import ActionClient +from rclpy.expand_topic_name import expand_topic_name +from rosbridge_library.internal.message_conversion import ( + extract_values, + populate_instance, +) +from rosbridge_library.internal.ros_loader import ( + get_action_class, + get_action_goal_instance, +) + + +class InvalidActionException(Exception): + def __init__(self, action_name): + Exception.__init__(self, f"Action {action_name} does not exist") + + +class ActionClientHandler(Thread): + def __init__(self, action, action_type, args, success_callback, error_callback, node_handle): + """ + Create a client handler for the specified action. + Use start() to start in a separate thread or run() to run in this thread. + + Keyword arguments: + action -- the name of the action to execute. + args -- arguments to pass to the action. Can be an + ordered list, or a dict of name-value pairs. Anything else will be + treated as though no arguments were provided (which is still valid for + some kinds of actions) + success_callback -- a callback to call with the JSON result of the + service call + error_callback -- a callback to call if an error occurs. The + callback will be passed the exception that caused the failure + node_handle -- a ROS 2 node handle to call services. + """ + Thread.__init__(self) + self.daemon = True + self.action = action + self.action_type = action_type + self.args = args + self.success = success_callback + self.error = error_callback + self.node_handle = node_handle + + def run(self): + try: + # Call the service and pass the result to the success handler + self.success(send_goal(self.node_handle, self.action, self.action_type, args=self.args)) + except Exception as e: + # On error, just pass the exception to the error handler + self.error(e) + + +def args_to_action_goal_instance(action, inst, args): + """ " + Populate an action goal instance with the provided args + + args can be a dictionary of values, or a list, or None + + Propagates any exceptions that may be raised. + """ + msg = {} + if isinstance(args, list): + msg = dict(zip(inst.get_fields_and_field_types().keys(), args)) + elif isinstance(args, dict): + msg = args + + # Populate the provided instance, propagating any exceptions + populate_instance(msg, inst) + + +def send_goal(node_handle, action, action_type, args=None, sleep_time=0.001): + # Given the action nam and type, fetch a request instance + action_name = expand_topic_name(action, node_handle.get_name(), node_handle.get_namespace()) + action_class = get_action_class(action_type) + inst = get_action_goal_instance(action_type) + + # Populate the instance with the provided args + args_to_action_goal_instance(action_name, inst, args) + + client = ActionClient(node_handle, action_class, action_name) + send_goal_future = client.send_goal_async(inst) + while rclpy.ok() and not send_goal_future.done(): + time.sleep(sleep_time) + goal_handle = send_goal_future.result() + + if not goal_handle.accepted: + raise Exception("Action goal was rejected") # TODO: Catch better + + result = goal_handle.get_result() + client.destroy() + + if result is not None: + # Turn the response into JSON and pass to the callback + json_response = extract_values(result) + else: + raise Exception(result) + + return json_response diff --git a/rosbridge_library/src/rosbridge_library/internal/services.py b/rosbridge_library/src/rosbridge_library/internal/services.py index 7a95814d5..dc91158ed 100644 --- a/rosbridge_library/src/rosbridge_library/internal/services.py +++ b/rosbridge_library/src/rosbridge_library/internal/services.py @@ -47,8 +47,8 @@ class InvalidServiceException(Exception): - def __init__(self, servicename): - Exception.__init__(self, "Service %s does not exist" % servicename) + def __init__(self, service_name): + Exception.__init__(self, f"Service {service_name} does not exist") class ServiceCaller(Thread): diff --git a/rosbridge_library/src/rosbridge_library/protocol.py b/rosbridge_library/src/rosbridge_library/protocol.py index 8e2cc1aeb..c3a2f418d 100644 --- a/rosbridge_library/src/rosbridge_library/protocol.py +++ b/rosbridge_library/src/rosbridge_library/protocol.py @@ -82,6 +82,8 @@ class Protocol: delay_between_messages = 0 # global list of non-ros advertised services external_service_list = {} + # global list of non-ros advertised actions + external_action_list = {} # Use only BSON for the whole communication if the server has been started with bson_only_mode:=True bson_only_mode = False diff --git a/rosbridge_library/test/capabilities/test_action_capabilities.py b/rosbridge_library/test/capabilities/test_action_capabilities.py new file mode 100755 index 000000000..acd033301 --- /dev/null +++ b/rosbridge_library/test/capabilities/test_action_capabilities.py @@ -0,0 +1,99 @@ +#!/usr/bin/env python +import unittest +from json import dumps, loads + +import rclpy +from rclpy.node import Node +from rosbridge_library.capabilities.action_result import ActionResult +from rosbridge_library.capabilities.advertise_action import AdvertiseAction +from rosbridge_library.internal.exceptions import ( + InvalidArgumentException, + MissingArgumentException, +) +from rosbridge_library.protocol import Protocol + + +class TestActionCapabilities(unittest.TestCase): + def setUp(self): + rclpy.init() + self.node = Node("test_action_capabilities") + + self.node.declare_parameter("call_services_in_new_thread", False) + + self.proto = Protocol(self._testMethodName, self.node) + # change the log function so we can verify errors are logged + self.proto.log = self.mock_log + # change the send callback so we can access the rosbridge messages + # being sent + self.proto.send = self.local_send_cb + self.advertise = AdvertiseAction(self.proto) + # self.unadvertise = UnadvertiseService(self.proto) + self.result = ActionResult(self.proto) + # self.call_service = CallService(self.proto) + self.received_message = None + self.log_entries = [] + + def tearDown(self): + self.node.destroy_node() + rclpy.shutdown() + + def local_send_cb(self, msg): + self.received_message = msg + + def mock_log(self, loglevel, message, _=None): + self.log_entries.append((loglevel, message)) + + def test_advertise_missing_arguments(self): + advertise_msg = loads(dumps({"op": "advertise_action"})) + self.assertRaises(MissingArgumentException, self.advertise.advertise_action, advertise_msg) + + def test_advertise_invalid_arguments(self): + advertise_msg = loads(dumps({"op": "advertise_action", "type": 42, "action": None})) + self.assertRaises(InvalidArgumentException, self.advertise.advertise_action, advertise_msg) + + def test_result_missing_arguments(self): + result_msg = loads(dumps({"op": "action_result"})) + self.assertRaises(MissingArgumentException, self.result.action_result, result_msg) + + # this message has the optional fields, with correct types, but not the + # required ones + result_msg = loads(dumps({"op": "action_result", "id": "dummy_action", "values": "none"})) + self.assertRaises(MissingArgumentException, self.result.action_result, result_msg) + + def test_result_invalid_arguments(self): + result_msg = loads(dumps({"op": "action_result", "action": 5, "result": "error"})) + self.assertRaises(InvalidArgumentException, self.result.action_result, result_msg) + + def test_advertise_action(self): + action_path = "/fibonacci_1" + advertise_msg = loads( + dumps( + { + "op": "advertise_action", + "type": "example_interfaces/Fibonacci", + "action": action_path, + } + ) + ) + self.advertise.advertise_action(advertise_msg) + + def test_execute_advertised_action(self): + # Advertise the action + action_path = "/fibonacci_2" + advertise_msg = loads( + dumps( + { + "op": "advertise_action", + "type": "example_interfaces/Fibonacci", + "action": action_path, + } + ) + ) + self.received_message = None + self.advertise.advertise_action(advertise_msg) + + # TODO: Fill out the rest + + +if __name__ == "__main__": + unittest.main() diff --git a/rosbridge_library/test/internal/actions/test_actions.py b/rosbridge_library/test/internal/actions/test_actions.py new file mode 100755 index 000000000..81007d718 --- /dev/null +++ b/rosbridge_library/test/internal/actions/test_actions.py @@ -0,0 +1,189 @@ +#!/usr/bin/env python3 +import time +import unittest +from threading import Thread + +import numpy as np +import rclpy +from example_interfaces.action import Fibonacci +from rclpy.action import ActionClient, ActionServer +from rclpy.executors import SingleThreadedExecutor +from rclpy.node import Node +from rosbridge_library.internal import actions +from rosbridge_library.internal import message_conversion as c +from rosbridge_library.internal import ros_loader + + +class ActionTester: + def __init__(self, executor): + self.executor = executor + self.node = Node("action_tester") + self.executor.add_node(self.node) + self.action_server = ActionServer( + self.node, + Fibonacci, + "get_fibonacci_sequence", + self.execute_callback, + ) + + def __del__(self): + self.executor.remove_node(self.node) + + def start(self): + req = self.action_class.Goal() + gen = c.extract_values(req) + thread = actions.ActionClientHandler( + self.name, + self.name, + gen, + self.success, + self.error, + self.node, + ) + thread.start() + thread.join() + + def execute_callback(self, goal): + self.goal = goal + feedback_msg = Fibonacci.Feedback() + feedback_msg.sequence = [0, 1] + + for i in range(1, goal.request.order): + feedback_msg.sequence.append(feedback_msg.sequence[i] + feedback_msg.sequence[i - 1]) + goal.publish_feedback(feedback_msg) + time.sleep(0.1) + + goal.succeed() + result = Fibonacci.Result() + result.sequence = feedback_msg.sequence + return result + + def success(self, rsp): + self.rsp = rsp + + def error(self, exc): + self.exc = exc + + def validate(self, equality_function): + if hasattr(self, "exc"): + print(self.exc) + raise self.exc + equality_function(self.input, c.extract_values(self.req)) + equality_function(self.output, self.rsp) + + +class TestActions(unittest.TestCase): + def setUp(self): + rclpy.init() + self.executor = SingleThreadedExecutor() + self.node = Node("test_node") + self.executor.add_node(self.node) + + self.exec_thread = Thread(target=self.executor.spin) + self.exec_thread.start() + + def tearDown(self): + self.executor.remove_node(self.node) + self.executor.shutdown() + rclpy.shutdown() + + def msgs_equal(self, msg1, msg2): + if isinstance(msg1, str) and isinstance(msg2, str): + pass + else: + self.assertEqual(type(msg1), type(msg2)) + if type(msg1) in c.list_types: + for x, y in zip(msg1, msg2): + self.msgs_equal(x, y) + elif type(msg1) in c.primitive_types or type(msg1) is str: + self.assertEqual(msg1, msg2) + elif np.issubdtype(type(msg1), np.number): + self.assertEqual(msg1, msg2) + else: + for x in msg1: + self.assertTrue(x in msg2) + for x in msg2: + self.assertTrue(x in msg1) + for x in msg1: + self.msgs_equal(msg1[x], msg2[x]) + + def test_populate_goal_args(self): + # Test empty messages + for srv_type in ["TestEmpty", "TestFeedbackAndResult", "TestResultOnly"]: + cls = ros_loader.get_action_class("rosbridge_test_msgs/" + srv_type) + for args in [[], {}, None]: + # Should throw no exceptions + actions.args_to_action_goal_instance("", cls.Goal(), args) + + # TODO: Test actions with data message + + # TODO: Test actions with multiple fields + + def test_send_action_goal(self): + """Test a simple action call""" + ActionTester(self.executor) + self.result = None + + def get_response_callback(future): + goal_handle = future.result() + if not goal_handle.accepted: + return + result_future = future.result().get_result_async() + result_future.add_done_callback(get_result_callback) + + def get_result_callback(future): + self.result = future.result().result + + # First, call the action the 'proper' way + client = ActionClient(self.node, Fibonacci, "get_fibonacci_sequence") + client.wait_for_server() + goal = Fibonacci.Goal() + goal.order = 5 + future = client.send_goal_async(goal) + future.add_done_callback(get_response_callback) + while not future.done(): + time.sleep(0.1) + client.destroy() + + self.assertIsNotNone(self.result) + self.assertEqual(list(self.result.sequence), [0, 1, 1, 2, 3, 5]) + + # Now, call using the services + json_ret = actions.send_goal( + self.node, + "get_fibonacci_sequence", + "example_interfaces/Fibonacci", + {"order": 5}, + ) + self.assertEqual(list(json_ret["result"]["sequence"]), [0, 1, 1, 2, 3, 5]) + + def test_action_client_handler(self): + """Same as test_service_call but via the thread caller""" + ActionTester(self.executor) + + rcvd = {"json": None} + + def success(json): + rcvd["json"] = json + + def error(): + raise Exception() + + # Now, call using the services + actions.ActionClientHandler( + "get_fibonacci_sequence", + "example_interfaces/Fibonacci", + {"order": 5}, + success, + error, + self.node, + ).start() + + time.sleep(1.0) + + self.assertIsNotNone(rcvd["json"]) + self.assertEqual(list(rcvd["json"]["result"]["sequence"]), [0, 1, 1, 2, 3, 5]) + + +if __name__ == "__main__": + unittest.main() diff --git a/rosbridge_test_msgs/CMakeLists.txt b/rosbridge_test_msgs/CMakeLists.txt index 414990004..0380dbb58 100644 --- a/rosbridge_test_msgs/CMakeLists.txt +++ b/rosbridge_test_msgs/CMakeLists.txt @@ -30,6 +30,9 @@ rosidl_generate_interfaces(${PROJECT_NAME} srv/TestRequestAndResponse.srv srv/TestRequestOnly.srv srv/TestResponseOnly.srv + action/TestEmpty.action + action/TestFeedbackAndResult.action + action/TestResultOnly.action DEPENDENCIES builtin_interfaces geometry_msgs std_msgs ) diff --git a/rosbridge_test_msgs/action/TestEmpty.action b/rosbridge_test_msgs/action/TestEmpty.action new file mode 100644 index 000000000..32d74ecd3 --- /dev/null +++ b/rosbridge_test_msgs/action/TestEmpty.action @@ -0,0 +1,5 @@ +# Empty goal +--- +# Empty feedback +--- +# Empty result diff --git a/rosbridge_test_msgs/action/TestFeedbackAndResult.action b/rosbridge_test_msgs/action/TestFeedbackAndResult.action new file mode 100644 index 000000000..2dafbe60b --- /dev/null +++ b/rosbridge_test_msgs/action/TestFeedbackAndResult.action @@ -0,0 +1,6 @@ +# Empty goal +--- +# Empty feedback +--- +# Result +int32 data diff --git a/rosbridge_test_msgs/action/TestResultOnly.action b/rosbridge_test_msgs/action/TestResultOnly.action new file mode 100644 index 000000000..2dafbe60b --- /dev/null +++ b/rosbridge_test_msgs/action/TestResultOnly.action @@ -0,0 +1,6 @@ +# Empty goal +--- +# Empty feedback +--- +# Result +int32 data