diff --git a/OpenRTM_aist/GlobalFactory.py b/OpenRTM_aist/GlobalFactory.py index e6843010..092a1913 100644 --- a/OpenRTM_aist/GlobalFactory.py +++ b/OpenRTM_aist/GlobalFactory.py @@ -58,7 +58,7 @@ def addFactory(self, id, creator, prop=None): if id in self._creators: return self.ALREADY_EXISTS - self._creators[id] = creator + self._creators[id] = FactoryEntry(id, creator, prop) return self.FACTORY_OK # ReturnCode removeFactory(const Identifier& id) @@ -76,9 +76,34 @@ def createObject(self, id): if id not in self._creators: print("Factory.createObject return None id: ", id) return None - obj_ = self._creators[id]() + obj_ = self._creators[id].creator() return obj_ + def getProperties(self, id): + if not id in self._creators: + print("Factory.getProperties return None id: ", id) + return None + return self._creators[id].prop + + +class FactoryEntry: + def __init__(self, id, creator, prop=None): + self._id = id + self._creator = creator + self._prop = prop + + @property + def id(self): + return self._id + + @property + def creator(self): + return self._creator + + @property + def prop(self): + return self._prop + gfactory = None diff --git a/OpenRTM_aist/InPortBase.py b/OpenRTM_aist/InPortBase.py index c02534ac..d602d9a0 100644 --- a/OpenRTM_aist/InPortBase.py +++ b/OpenRTM_aist/InPortBase.py @@ -1175,10 +1175,25 @@ def initProviders(self): # InPortProvider supports "push" dataflow type if provider_types: + prop_options = OpenRTM_aist.Properties() self._rtcout.RTC_DEBUG("dataflow_type push is supported") self.appendProperty("dataport.dataflow_type", "push") for provider_type in provider_types: self.appendProperty("dataport.interface_type", provider_type) + prop_if = factory.getProperties(provider_type) + if prop_if is None: + prop_if = OpenRTM_aist.Properties() + + prop_node = prop_options.getNode(provider_type) + prop_node.mergeProperties(prop_if) + + prop = OpenRTM_aist.Properties() + OpenRTM_aist.NVUtil.copyToProperties( + prop, self._profile.properties) + prop_dataport = prop.getNode("dataport.interface_option") + prop_dataport.mergeProperties(prop_options) + OpenRTM_aist.NVUtil.copyFromProperties( + self._profile.properties, prop) self._providerTypes = provider_types return @@ -1219,10 +1234,25 @@ def initConsumers(self): # OutPortConsumer supports "pull" dataflow type if consumer_types: + prop_options = OpenRTM_aist.Properties() self._rtcout.RTC_PARANOID("dataflow_type pull is supported") self.appendProperty("dataport.dataflow_type", "pull") for consumer_type in consumer_types: self.appendProperty("dataport.interface_type", consumer_type) + prop_if = factory.getProperties(consumer_type) + if prop_if is None: + prop_if = OpenRTM_aist.Properties() + + prop_node = prop_options.getNode(consumer_type) + prop_node.mergeProperties(prop_if) + + prop = OpenRTM_aist.Properties() + OpenRTM_aist.NVUtil.copyToProperties( + prop, self._profile.properties) + prop_dataport = prop.getNode("dataport.interface_option") + prop_dataport.mergeProperties(prop_options) + OpenRTM_aist.NVUtil.copyFromProperties( + self._profile.properties, prop) self._consumerTypes = consumer_types return diff --git a/OpenRTM_aist/OutPortBase.py b/OpenRTM_aist/OutPortBase.py index d980d5c6..1983b32a 100644 --- a/OpenRTM_aist/OutPortBase.py +++ b/OpenRTM_aist/OutPortBase.py @@ -1185,11 +1185,27 @@ def initProviders(self): provider_types = provider_types + list(set_ptypes) # OutPortProvider supports "pull" dataflow type + if provider_types: + prop_options = OpenRTM_aist.Properties() self._rtcout.RTC_DEBUG("dataflow_type pull is supported") self.appendProperty("dataport.dataflow_type", "pull") for provider_type in provider_types: self.appendProperty("dataport.interface_type", provider_type) + prop_if = factory.getProperties(provider_type) + if prop_if is None: + prop_if = OpenRTM_aist.Properties() + + prop_node = prop_options.getNode(provider_type) + prop_node.mergeProperties(prop_if) + + prop = OpenRTM_aist.Properties() + OpenRTM_aist.NVUtil.copyToProperties( + prop, self._profile.properties) + prop_dataport = prop.getNode("dataport.interface_option") + prop_dataport.mergeProperties(prop_options) + OpenRTM_aist.NVUtil.copyFromProperties( + self._profile.properties, prop) self._providerTypes = provider_types @@ -1229,10 +1245,25 @@ def initConsumers(self): # InPortConsumer supports "push" dataflow type if consumer_types: + prop_options = OpenRTM_aist.Properties() self._rtcout.RTC_PARANOID("dataflow_type push is supported") self.appendProperty("dataport.dataflow_type", "push") for consumer_type in consumer_types: self.appendProperty("dataport.interface_type", consumer_type) + prop_if = factory.getProperties(consumer_type) + if prop_if is None: + prop_if = OpenRTM_aist.Properties() + + prop_node = prop_options.getNode(consumer_type) + prop_node.mergeProperties(prop_if) + + prop = OpenRTM_aist.Properties() + OpenRTM_aist.NVUtil.copyToProperties( + prop, self._profile.properties) + prop_dataport = prop.getNode("dataport.interface_option") + prop_dataport.mergeProperties(prop_options) + OpenRTM_aist.NVUtil.copyFromProperties( + self._profile.properties, prop) self._consumerTypes = consumer_types diff --git a/OpenRTM_aist/ext/transport/ROSTransport/ROSInPort.py b/OpenRTM_aist/ext/transport/ROSTransport/ROSInPort.py index 59475c2c..0c719f0e 100644 --- a/OpenRTM_aist/ext/transport/ROSTransport/ROSInPort.py +++ b/OpenRTM_aist/ext/transport/ROSTransport/ROSInPort.py @@ -100,6 +100,13 @@ def __init__(self): self._messageType = "ros:std_msgs/Float32" self._roscorehost = "" self._roscoreport = "" + self._so_reuseaddr = True + self._so_keepalive = True + self._tcp_keepcnt = 9 + self._tcp_keepidle = 60 + self._tcp_keepintvl = 10 + self._tcp_nodelay = True + self._sock_timeout = 60.0 self._tcp_connecters = [] self._con_mutex = threading.RLock() @@ -225,6 +232,44 @@ def init(self, prop): "marshaling_type", "ros:std_msgs/Float32") self._topic = prop.getProperty("ros.topic", "chatter") self._topic = "/" + self._topic + + self._so_reuseaddr = OpenRTM_aist.toBool(prop.getProperty( + "ros.so_reuseaddr"), "YES", "NO", True) + + self._so_keepalive = OpenRTM_aist.toBool(prop.getProperty( + "ros.so_keepalive"), "YES", "NO", True) + + try: + self._tcp_keepcnt = int(prop.getProperty( + "ros.tcp_keepcnt")) + except ValueError as error: + pass + # self._rtcout.RTC_ERROR(error) + + try: + self._tcp_keepidle = int(prop.getProperty( + "ros.tcp_keepidle")) + except ValueError as error: + pass + # self._rtcout.RTC_ERROR(error) + + try: + self._tcp_keepintvl = int(prop.getProperty( + "ros.tcp_keepintvl")) + except ValueError as error: + pass + # self._rtcout.RTC_ERROR(error) + + self._tcp_nodelay = OpenRTM_aist.toBool(prop.getProperty( + "ros.tcp_nodelay"), "YES", "NO", True) + + try: + self._sock_timeout = float(prop.getProperty( + "ros.sock.timeout")) + except ValueError as error: + pass + # self._rtcout.RTC_ERROR(error) + self._roscorehost = prop.getProperty("ros.roscore.host") self._roscoreport = prop.getProperty("ros.roscore.port") @@ -234,9 +279,10 @@ def init(self, prop): ROSInPort.ROS_MASTER_URI) env = os.getenv(ROSInPort.ROS_MASTER_URI) if env: - self._rtcout.RTC_VERBOSE("$%s: %s", (ROSInPort.ROS_MASTER_URI, env)) - env = env.replace("http://","") - env = env.replace("https://","") + self._rtcout.RTC_VERBOSE( + "$%s: %s", (ROSInPort.ROS_MASTER_URI, env)) + env = env.replace("http://", "") + env = env.replace("https://", "") envsplit = env.split(":") self._roscorehost = envsplit[0] if len(envsplit) >= 2: @@ -248,7 +294,6 @@ def init(self, prop): if not self._roscoreport: self._roscoreport = ROSInPort.ROS_DEFAULT_MASTER_PORT - self._rtcout.RTC_VERBOSE("topic name: %s", self._topic) self._rtcout.RTC_VERBOSE( "roscore address: %s:%s", @@ -268,7 +313,7 @@ def init(self, prop): info_type = info.datatype() else: self._rtcout.RTC_ERROR("can not found %s", self._messageType) - return + raise MemoryError("Message Type ERROR") self._rtcout.RTC_VERBOSE("caller id: %s", self._callerid) @@ -282,7 +327,8 @@ def init(self, prop): self._callerid, self._topic, info_type, self._topicmgr.getURI()) except xmlrpclib.Fault as err: self._rtcout.RTC_ERROR("XML-RPC ERROR: %s", err.faultString) - return + raise MemoryError("XML-RPC ERROR") + self.connect(self._callerid, self._topic, val) ## @@ -340,13 +386,47 @@ def connect(self, caller_id, topic, publishers): else: _, dest_addr, dest_port = result sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) - sock.setsockopt(socket.SOL_TCP, socket.TCP_KEEPCNT, 9) - sock.setsockopt(socket.SOL_TCP, socket.TCP_KEEPIDLE, 60) - sock.setsockopt(socket.SOL_TCP, socket.TCP_KEEPINTVL, 10) - sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - sock.settimeout(60.0) + so_reuseaddr = 1 + if not self._so_reuseaddr: + so_reuseaddr = 0 + sock.setsockopt(socket.SOL_SOCKET, + socket.SO_REUSEADDR, so_reuseaddr) + so_keepalive = 1 + if not self._so_keepalive: + so_keepalive = 0 + sock.setsockopt(socket.SOL_SOCKET, + socket.SO_KEEPALIVE, so_keepalive) + if self._so_keepalive: + sock.setsockopt( + socket.SOL_TCP, socket.TCP_KEEPCNT, self._tcp_keepcnt) + sock.setsockopt( + socket.SOL_TCP, socket.TCP_KEEPIDLE, self._tcp_keepidle) + sock.setsockopt( + socket.SOL_TCP, socket.TCP_KEEPINTVL, self._tcp_keepintvl) + + tcp_nodelay = 1 + if not self._tcp_nodelay: + tcp_nodelay = 0 + + sock.setsockopt(socket.IPPROTO_TCP, + socket.TCP_NODELAY, tcp_nodelay) + sock.settimeout(self._sock_timeout) + + self._rtcout.RTC_VERBOSE( + "SO_REUSEADDR: %s", self._so_reuseaddr) + self._rtcout.RTC_VERBOSE( + "SO_KEEPALIVE: %s", self._so_keepalive) + self._rtcout.RTC_VERBOSE( + "TCP_KEEPCNT: %d", self._tcp_keepcnt) + self._rtcout.RTC_VERBOSE( + "TCP_KEEPIDLE: %d", self._tcp_keepidle) + self._rtcout.RTC_VERBOSE( + "TCP_KEEPINTVL: %d", self._tcp_keepintvl) + self._rtcout.RTC_VERBOSE( + "TCP_NODELAY: %d", self._tcp_nodelay) + self._rtcout.RTC_VERBOSE( + "Socket timeout: %lf", self._sock_timeout) + sock.connect((dest_addr, dest_port)) fileno = sock.fileno() @@ -391,9 +471,12 @@ def connect(self, caller_id, topic, publishers): "Can not found %s", self._messageType) sock.setblocking(1) + tcp_nodelay = '1' + if not self._tcp_nodelay: + tcp_nodelay = '0' fields = {'topic': topic, 'message_definition': info_message_definition, - 'tcp_nodelay': '0', + 'tcp_nodelay': tcp_nodelay, 'md5sum': info_md5sum, 'type': info_type, 'callerid': self._callerid} @@ -794,6 +877,45 @@ def recieve(self): return +ros_sub_option = [ + "topic.__value__", "chatter", + "topic.__widget__", "text", + "topic.__constraint__", "none", + "roscore.host.__value__", "", + "roscore.host.__widget__", "text", + "roscore.host.__constraint__", "none", + "roscore.port.__value__", "", + "roscore.port.__widget__", "text", + "roscore.port.__constraint__", "none", + "node.name.__value__", "", + "node.name.__widget__", "text", + "node.name.__constraint__", "none", + "node.anonymous.__value__", "NO", + "node.anonymous.__widget__", "radio", + "node.anonymous.__constraint__", "(YES, NO)", + "so_reuseaddr.__value__", "YES", + "so_reuseaddr.__widget__", "radio", + "so_reuseaddr.__constraint__", "(YES, NO)", + "so_keepalive.__value__", "YES", + "so_keepalive.__widget__", "radio", + "so_keepalive.__constraint__", "(YES, NO)", + "tcp_nodelay.__value__", "YES", + "tcp_nodelay.__widget__", "radio", + "tcp_nodelay.__constraint__", "(YES, NO)", + "tcp_keepcnt.__value__", "9", + "tcp_keepcnt.__widget__", "spin", + "tcp_keepcnt.__constraint__", "1 <= x <= 10000", + "tcp_keepidle.__value__", "60", + "tcp_keepidle.__widget__", "spin", + "tcp_keepidle.__constraint__", "1 <= x <= 10000", + "tcp_keepintvl.__value__", "10", + "tcp_keepintvl.__widget__", "spin", + "tcp_keepintvl.__constraint__", "1 <= x <= 10000", + "ros.sock.timeout.__value__", "60", + "ros.sock.timeout.__widget__", "spin", + "ros.sock.timeout.__constraint__", "1 <= x <= 10000", + "" +] ## # @if jp # @brief モジュール登録関数 @@ -805,7 +927,11 @@ def recieve(self): # # @endif # + + def ROSInPortInit(): + prop = OpenRTM_aist.Properties(defaults_str=ros_sub_option) factory = OpenRTM_aist.InPortProviderFactory.instance() factory.addFactory("ros", - ROSInPort) + ROSInPort, + prop) diff --git a/OpenRTM_aist/ext/transport/ROSTransport/ROSOutPort.py b/OpenRTM_aist/ext/transport/ROSTransport/ROSOutPort.py index 6f87d580..c5decf5e 100644 --- a/OpenRTM_aist/ext/transport/ROSTransport/ROSOutPort.py +++ b/OpenRTM_aist/ext/transport/ROSTransport/ROSOutPort.py @@ -80,6 +80,7 @@ def __init__(self): self._topic = "chatter" self._roscorehost = "" self._roscoreport = "" + self._tcp_nodelay = True self._tcp_connecters = [] self._con_mutex = threading.RLock() self._message_data_sent = 0 @@ -148,6 +149,10 @@ def init(self, prop): "marshaling_type", "ros:std_msgs/Float32") self._topic = prop.getProperty("ros.topic", "chatter") self._topic = "/" + self._topic + + self._tcp_nodelay = OpenRTM_aist.toBool(prop.getProperty( + "ros.tcp_nodelay"), "YES", "NO", True) + self._roscorehost = prop.getProperty("ros.roscore.host") self._roscoreport = prop.getProperty("ros.roscore.port") @@ -203,6 +208,7 @@ def init(self, prop): self._topicmgr.getURI()) except xmlrpclib.Fault as err: self._rtcout.RTC_ERROR("XML-RPC ERROR: %s", err.faultString) + raise MemoryError("XML-RPC ERROR") ## # @if jp @@ -277,9 +283,12 @@ def connect(self, client_sock, header): "MD5sum in not match(%s:%s)", (info_md5sum, md5sum)) return + tcp_nodelay = '1' + if not self._tcp_nodelay: + tcp_nodelay = '0' fields = {'topic': topic_name, 'message_definition': info_message_definition, - 'tcp_nodelay': '0', + 'tcp_nodelay': tcp_nodelay, 'md5sum': info_md5sum, 'type': info_type, 'callerid': self._callerid} @@ -488,6 +497,28 @@ def datatype(self): return self._messageType +ros_pub_option = [ + "topic.__value__", "chatter", + "topic.__widget__", "text", + "topic.__constraint__", "none", + "roscore.host.__value__", "", + "roscore.host.__widget__", "text", + "roscore.host.__constraint__", "none", + "roscore.port.__value__", "", + "roscore.port.__widget__", "text", + "roscore.port.__constraint__", "none", + "node.name.__value__", "", + "node.name.__widget__", "text", + "node.name.__constraint__", "none", + "node.anonymous.__value__", "NO", + "node.anonymous.__widget__", "radio", + "node.anonymous.__constraint__", "(YES, NO)", + "tcp_nodelay.__value__", "YES", + "tcp_nodelay.__widget__", "radio", + "tcp_nodelay.__constraint__", "(YES, NO)", + "" +] + ## # @if jp # @brief モジュール登録関数 @@ -502,6 +533,8 @@ def datatype(self): def ROSOutPortInit(): + prop = OpenRTM_aist.Properties(defaults_str=ros_pub_option) factory = OpenRTM_aist.InPortConsumerFactory.instance() factory.addFactory("ros", - ROSOutPort) + ROSOutPort, + prop) diff --git a/OpenRTM_aist/ext/transport/ROSTransport/ROSTopicManager.py b/OpenRTM_aist/ext/transport/ROSTransport/ROSTopicManager.py index b59a73e0..629cc6ab 100644 --- a/OpenRTM_aist/ext/transport/ROSTransport/ROSTopicManager.py +++ b/OpenRTM_aist/ext/transport/ROSTransport/ROSTopicManager.py @@ -885,6 +885,29 @@ def getSubscriberLink(self, connection): return con return None + ## + # @if jp + # @brief 初期化 + # + # + # @else + # + # @brief + # + # + # @endif + + def init(): + global manager + global mutex + + guard = OpenRTM_aist.Guard.ScopedLock(mutex) + if manager is None: + manager = ROSTopicManager() + manager.start() + + init = staticmethod(init) + ## # @if jp # @brief インスタンス取得 diff --git a/OpenRTM_aist/ext/transport/ROSTransport/ROSTransport.py b/OpenRTM_aist/ext/transport/ROSTransport/ROSTransport.py index 6d74f4d2..4f377a2d 100644 --- a/OpenRTM_aist/ext/transport/ROSTransport/ROSTransport.py +++ b/OpenRTM_aist/ext/transport/ROSTransport/ROSTransport.py @@ -92,5 +92,6 @@ def ROSTransportInit(mgr): ROSInPort.ROSInPortInit() ROSOutPort.ROSOutPortInit() ROSSerializer.ROSSerializerInit() + ROSTopicManager.init() mgr.addManagerActionListener(ManagerActionListener()) diff --git a/OpenRTM_aist/ext/transport/ROSTransport/rtc.conf b/OpenRTM_aist/ext/transport/ROSTransport/rtc.conf index 550ac0bd..8d738840 100644 --- a/OpenRTM_aist/ext/transport/ROSTransport/rtc.conf +++ b/OpenRTM_aist/ext/transport/ROSTransport/rtc.conf @@ -4,5 +4,5 @@ logger.log_level: ERROR manager.modules.load_path: . manager.modules.preload: ROSTransport.py -manager.components.preconnect: ConsoleOut0.in?interface_type=ros&marshaling_type=ROSFloat32, ConsoleIn0.out?interface_type=ros&marshaling_type=ROSFloat32 +manager.components.preconnect: ConsoleOut0.in?interface_type=ros&marshaling_type=ros:std_msgs/Float32&ros.node.name=ConsoleOut0, ConsoleIn0.out?interface_type=ros&marshaling_type=ros:std_msgs/Float32&ros.node.name=ConsoleIn0 manager.components.preactivation: ConsoleOut0, ConsoleIn0 \ No newline at end of file