-
Notifications
You must be signed in to change notification settings - Fork 0
/
ros.py
148 lines (119 loc) · 4.47 KB
/
ros.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#!/usr/bin/env python3
#
# ros.py
#
# Set up a very simple ROS node to listen for goals/commands and
# publish the current pose (position and heading).
#
# Node: /unicron (this will use your Pi's name)
#
# Publish: ~/pose geometry_msgs/Pose
# Subscribe: ~/goal geometry_msgs/Point
# Subscribe: ~/explore std_msgs/Empty
#
import rclpy
import socket
import traceback
from interface.ui_util import set_flags_to
import constants as const
from interface.ui_util import post
from math import pi, sin, cos
from rclpy.node import Node
from rclpy.time import Time, Duration
from geometry_msgs.msg import Point, Pose
from std_msgs.msg import Empty
# Simple Node Class
class SimpleNode(Node):
# Initialization.
def __init__(self, name, state, flags_in, out):
# Initialize the node, naming it as specified
super().__init__(name)
# Clear the last sent content (posx,posy,theta) and time.
self.posx = None
self.posy = None
self.theta = None
self.time = Time()
self.flags = flags_in
self.out = out
# Create the publisher for the pose information.
self.pub = self.create_publisher(Pose, '~/pose', 10)
# Then create subscribers for goal and explore commands.
self.create_subscription(Point, '~/goal', self.cb_goal, 10)
self.create_subscription(Empty, '~/explore', self.cb_explore, 10)
# Finally create a timer to drive the node.
self.timer = self.create_timer(0.1, lambda: self.cb_timer(state))
# Report and return.
self.get_logger().info("ROS Node '%s' running" % (name))
# Shutdown.
def shutdown(self):
# Destroy the timer and shut down the node.
self.destroy_timer(self.timer)
self.destroy_node()
# Timer callback.
def cb_timer(self, state):
if state[0] != (None, None):
posx = float(state[0][0])
posy = float(state[0][1])
theta = pi/4 * float(state[1]+2)
# Check whether to send (if something changed or 5s have passed).
now = self.get_clock().now()
if ((posx != self.posx) or (posy != self.posy) or
(theta != self.theta) or
(now - self.time > Duration(seconds=5.0))):
# Populate the message with the data and send.
msg = Pose()
msg.position.x = posx
msg.position.y = posy
msg.orientation.z = sin(theta/2)
msg.orientation.w = cos(theta/2)
self.pub.publish(msg)
# Retain as the last sent info.
self.posx = posx
self.posy = posy
self.theta = theta
self.time = now
# Goal command callback.
def cb_goal(self, msg):
# Extract the goal coordinates from the message.
xgoal = msg.x
ygoal = msg.y
# Report.
self.get_logger().info("Received goal command (%d,%d)" % (xgoal,ygoal))
post(f"Received ROS Goal Command ({xgoal}, {ygoal})", self.out)
# Inject the goal command into your robot thread, just as if
# you had typed the goal command in the UI thread.
cmd = const.CMD_DICT['goal']
cmd.append(f"{int(xgoal)},{int(ygoal)}")
cmd[const.DATA] = cmd[len(cmd) - 1]
print(cmd)
set_flags_to(self.flags, cmd)
print(cmd)
# Explore command callback.
def cb_explore(self, msg):
# Report.
self.get_logger().info("Received explore command")
post("Received ros explore command", self.out)
# Inject the explore command into your robot thread, just as
# if you had typed the explore command in the UI thread.
cmd = const.CMD_DICT['explore']
cmd.append(None)
set_flags_to(self.flags, cmd)
#
# Main ROS Thread Code
#
def runros(state, flags, out):
# Initialize ROS.
rclpy.init()
# Instantiate the simple node, named after the host name.
node = SimpleNode(socket.gethostname(), state, flags, out)
# Spin the node until interrupted.
try:
rclpy.spin(node)
except BaseException as ex:
print("Ending runros due to exception: %s" % repr(ex))
traceback.print_exc()
# Shutdown the node and ROS.
node.shutdown()
rclpy.shutdown()
if __name__ == "__main__":
runros()