From 5c91e88362a02b1844af28cd6d0fe01f022f3c9d Mon Sep 17 00:00:00 2001 From: fsae computer <> Date: Thu, 25 Apr 2024 13:11:19 +1200 Subject: [PATCH 1/4] interpolate centerline --- src/moa/moa_bringup/launch/sim_launch.py | 12 +- .../path_planning/path_planning/bound_coods | 4 + .../path_planning/path_planning/bound_coods2 | 4 + .../path_planning/trajectory_generation.py | 4 +- .../trajectory_optimisation_CS.py | 331 ++++++++---------- .../visualise_trajectories.py | 185 ++++------ .../visualise_trajectories_demo.py | 7 +- 7 files changed, 228 insertions(+), 319 deletions(-) create mode 100644 src/planning/path_planning/path_planning/bound_coods create mode 100644 src/planning/path_planning/path_planning/bound_coods2 diff --git a/src/moa/moa_bringup/launch/sim_launch.py b/src/moa/moa_bringup/launch/sim_launch.py index 125c7bd..9199600 100644 --- a/src/moa/moa_bringup/launch/sim_launch.py +++ b/src/moa/moa_bringup/launch/sim_launch.py @@ -4,11 +4,11 @@ def generate_launch_description(): return launch.LaunchDescription([ # cone detect - launch_ros.actions.Node( - package='cone_mapping', - executable='dbscan', - name='listener', - ), + # launch_ros.actions.Node( + # package='cone_mapping', + # executable='dbscan', + # name='listener', + # ), # path generation launch_ros.actions.Node( @@ -37,7 +37,7 @@ def generate_launch_description(): # path viz launch_ros.actions.Node( package='path_planning_visualization', - executable='visualize2', + executable='visualize', name='path_viz', ), diff --git a/src/planning/path_planning/path_planning/bound_coods b/src/planning/path_planning/path_planning/bound_coods new file mode 100644 index 0000000..7636868 --- /dev/null +++ b/src/planning/path_planning/path_planning/bound_coods @@ -0,0 +1,4 @@ +-13.984399896783033 -13.753242748183307 -13.522085599583582 -13.290928450983856 -13.05977130238413 -12.828614153784404 -12.597457005184678 -12.366299856584952 -12.135142707985226 -11.9039855593855 -11.672828410785774 -11.441671262186048 -11.210514113586322 -10.979356964986597 -10.74819981638687 -10.517042667787145 -10.285885519187419 -10.054728370587693 -9.823571221987965 -9.592414073388241 -9.361256924788513 -9.13009977618879 -8.898942627589062 -8.667785478989337 -8.43662833038961 -8.205471181789886 -7.974314033190159 -7.743156884590433 -7.511999735990707 -7.280842587390981 -7.049685438791255 -6.818528290191529 -6.587371141591803 -6.356213992992077 -6.125056844392351 -5.893899695792625 -5.662742547192899 -5.431585398593173 -5.200428249993447 -4.969271101393721 -4.738113952793995 -4.506956804194269 -4.275799655594543 -4.044642506994817 -3.8134853583950914 -3.5823282097953655 -3.3511710611956396 -3.1200139125959137 -2.8888567639961877 -2.6576996153964614 +18.00000101729649 17.87396674394038 17.747932470584274 17.62189819722816 17.49586392387205 17.36982965051594 17.24379537715983 17.11776110380372 16.99172683044761 16.8656925570915 16.723749949164805 16.564488014449786 16.405226079734767 16.24596414501975 16.086702210304736 15.927440275589715 15.768178340874702 15.60891640615968 15.449654471444662 15.243598109882772 15.016342330608962 14.789086551335156 14.561830772061345 14.334574992787541 14.10731921351373 13.880063434239922 13.652807654966114 13.389405065848925 13.115060535444195 12.840716005039466 12.566371474634739 12.292026944230006 12.017682413825277 11.743337883420548 11.40061738510908 11.045069945062286 10.689522505015493 10.333975064968698 9.978427624921904 9.620174315110118 9.129500537664088 8.638826760218056 8.148152982772025 7.657479205325993 7.0797583828357205 6.368003065412987 5.6562477479902515 4.926519334899599 3.7132599223374685 2.5000005097753353 +-9.104301541482327 -8.871501531845682 -8.638701522209038 -8.405901512572395 -8.17310150293575 -7.940301493299106 -7.707501483662462 -7.474701474025817 -7.241901464389173 -7.009101454752529 -6.776301445115885 -6.543501435479241 -6.310701425842597 -6.077901416205952 -5.845101406569308 -5.612301396932663 -5.37950138729602 -5.146701377659376 -4.9139013680227315 -4.681101358386087 -4.4483013487494425 -4.215501339112799 -3.9827013294761544 -3.74990131983951 -3.5171013102028663 -3.284301300566222 -3.0515012909295773 -2.818701281292933 -2.5859012716562892 -2.3531012620196448 -2.1203012523830003 -1.8875012427463567 -1.6547012331097122 -1.4219012234730677 -1.189101213836424 -0.9563012041997787 -0.723501194563136 -0.4907011849264915 -0.257901175289847 -0.02510116565320253 0.20769884398344196 0.44049885362008645 0.6732988632567292 0.9060988728933737 1.1388988825300181 1.3716988921666626 1.6044989018033071 1.8372989114399516 2.0700989210765943 2.302898930713238 +21.09999984182598 20.925851561371385 20.75170328091679 20.57755500046219 20.403406720007595 20.229258439552996 20.055110159098398 19.880961878643802 19.706813598189207 19.532665317734608 19.35851703728001 19.18082601200009 18.9672104316014 18.753594851202706 18.539979270804018 18.32636369040533 18.11274811000664 17.899132529607947 17.68551694920926 17.471901368810567 17.258285788411875 17.024671570043896 16.73384583089838 16.44302009175287 16.15219435260736 15.861368613461845 15.570542874316335 15.27971713517082 14.98889139602531 14.698065656879798 14.361528507753558 14.001735788644298 13.641943069535035 13.282150350425773 12.922357631316512 12.562564912207248 12.202772193097989 11.78718052761864 11.299531749635975 10.811882971653308 10.324234193670641 9.836585415687976 9.348936637705314 8.701670759843026 7.984220134415498 7.266769508987968 6.549318883560439 5.5676250313189986 4.348812112735477 3.12999919415195 diff --git a/src/planning/path_planning/path_planning/bound_coods2 b/src/planning/path_planning/path_planning/bound_coods2 new file mode 100644 index 0000000..7636868 --- /dev/null +++ b/src/planning/path_planning/path_planning/bound_coods2 @@ -0,0 +1,4 @@ +-13.984399896783033 -13.753242748183307 -13.522085599583582 -13.290928450983856 -13.05977130238413 -12.828614153784404 -12.597457005184678 -12.366299856584952 -12.135142707985226 -11.9039855593855 -11.672828410785774 -11.441671262186048 -11.210514113586322 -10.979356964986597 -10.74819981638687 -10.517042667787145 -10.285885519187419 -10.054728370587693 -9.823571221987965 -9.592414073388241 -9.361256924788513 -9.13009977618879 -8.898942627589062 -8.667785478989337 -8.43662833038961 -8.205471181789886 -7.974314033190159 -7.743156884590433 -7.511999735990707 -7.280842587390981 -7.049685438791255 -6.818528290191529 -6.587371141591803 -6.356213992992077 -6.125056844392351 -5.893899695792625 -5.662742547192899 -5.431585398593173 -5.200428249993447 -4.969271101393721 -4.738113952793995 -4.506956804194269 -4.275799655594543 -4.044642506994817 -3.8134853583950914 -3.5823282097953655 -3.3511710611956396 -3.1200139125959137 -2.8888567639961877 -2.6576996153964614 +18.00000101729649 17.87396674394038 17.747932470584274 17.62189819722816 17.49586392387205 17.36982965051594 17.24379537715983 17.11776110380372 16.99172683044761 16.8656925570915 16.723749949164805 16.564488014449786 16.405226079734767 16.24596414501975 16.086702210304736 15.927440275589715 15.768178340874702 15.60891640615968 15.449654471444662 15.243598109882772 15.016342330608962 14.789086551335156 14.561830772061345 14.334574992787541 14.10731921351373 13.880063434239922 13.652807654966114 13.389405065848925 13.115060535444195 12.840716005039466 12.566371474634739 12.292026944230006 12.017682413825277 11.743337883420548 11.40061738510908 11.045069945062286 10.689522505015493 10.333975064968698 9.978427624921904 9.620174315110118 9.129500537664088 8.638826760218056 8.148152982772025 7.657479205325993 7.0797583828357205 6.368003065412987 5.6562477479902515 4.926519334899599 3.7132599223374685 2.5000005097753353 +-9.104301541482327 -8.871501531845682 -8.638701522209038 -8.405901512572395 -8.17310150293575 -7.940301493299106 -7.707501483662462 -7.474701474025817 -7.241901464389173 -7.009101454752529 -6.776301445115885 -6.543501435479241 -6.310701425842597 -6.077901416205952 -5.845101406569308 -5.612301396932663 -5.37950138729602 -5.146701377659376 -4.9139013680227315 -4.681101358386087 -4.4483013487494425 -4.215501339112799 -3.9827013294761544 -3.74990131983951 -3.5171013102028663 -3.284301300566222 -3.0515012909295773 -2.818701281292933 -2.5859012716562892 -2.3531012620196448 -2.1203012523830003 -1.8875012427463567 -1.6547012331097122 -1.4219012234730677 -1.189101213836424 -0.9563012041997787 -0.723501194563136 -0.4907011849264915 -0.257901175289847 -0.02510116565320253 0.20769884398344196 0.44049885362008645 0.6732988632567292 0.9060988728933737 1.1388988825300181 1.3716988921666626 1.6044989018033071 1.8372989114399516 2.0700989210765943 2.302898930713238 +21.09999984182598 20.925851561371385 20.75170328091679 20.57755500046219 20.403406720007595 20.229258439552996 20.055110159098398 19.880961878643802 19.706813598189207 19.532665317734608 19.35851703728001 19.18082601200009 18.9672104316014 18.753594851202706 18.539979270804018 18.32636369040533 18.11274811000664 17.899132529607947 17.68551694920926 17.471901368810567 17.258285788411875 17.024671570043896 16.73384583089838 16.44302009175287 16.15219435260736 15.861368613461845 15.570542874316335 15.27971713517082 14.98889139602531 14.698065656879798 14.361528507753558 14.001735788644298 13.641943069535035 13.282150350425773 12.922357631316512 12.562564912207248 12.202772193097989 11.78718052761864 11.299531749635975 10.811882971653308 10.324234193670641 9.836585415687976 9.348936637705314 8.701670759843026 7.984220134415498 7.266769508987968 6.549318883560439 5.5676250313189986 4.348812112735477 3.12999919415195 diff --git a/src/planning/path_planning/path_planning/trajectory_generation.py b/src/planning/path_planning/path_planning/trajectory_generation.py index 08fb491..44ea781 100644 --- a/src/planning/path_planning/path_planning/trajectory_generation.py +++ b/src/planning/path_planning/path_planning/trajectory_generation.py @@ -159,10 +159,10 @@ def get_position_of_cart(self, cone_map): return x, y, theta def get_transformation_matrix(self, position_and_orientation): - theta = position_and_orientation[2] - np.pi/2 + # theta = position_and_orientation[2] - np.pi/2 cart_x = position_and_orientation[0] cart_y = position_and_orientation[1] - # theta = position_and_orientation[2] + theta = position_and_orientation[2] # 2d trasformation matrix rotation_matrix = np.array([[np.cos(theta), -np.sin(theta)],[np.sin(theta), np.cos(theta)]]) position_vector = np.array([[cart_x], [cart_y]]) diff --git a/src/planning/path_planning/path_planning/trajectory_optimisation_CS.py b/src/planning/path_planning/path_planning/trajectory_optimisation_CS.py index 4a5bfef..a6e9981 100644 --- a/src/planning/path_planning/path_planning/trajectory_optimisation_CS.py +++ b/src/planning/path_planning/path_planning/trajectory_optimisation_CS.py @@ -3,6 +3,7 @@ from shapely import LineString, MultiPoint from shapely import Point as shapelyPoint from scipy import interpolate +import matplotlib.pyplot as plt import os import rclpy @@ -66,81 +67,49 @@ def set_boundaries(self, msg: ConeMap): leftboundary = [] rightboundary = [] for i in range(len(cones)): + x = cones[i].pose.pose.position.x + y = cones[i].pose.pose.position.y if i != 0: - x = cones[i].pose.pose.position.x - y = cones[i].pose.pose.position.y # blue - left if cones[i].colour == 0: leftboundary.append([x,y]) elif cones[i].colour == 2: rightboundary.append([x,y]) + else: + car_position = [x,y] + - # interpolate - funcL = interpolate.interp1d([P[0] for P in leftboundary], [P[1] for P in leftboundary], kind='cubic') - funcR = interpolate.interp1d([P[0] for P in rightboundary], [P[1] for P in rightboundary], kind='cubic') + # get local boundary points + # position_orientation = self.get_position_of_cart(msg) + # starting_index = self.get_local_boundary(position_orientation, car_position, leftboundary, rightboundary) + # leftboundary = leftboundary[starting_index:] + # rightboundary = rightboundary[starting_index:] - xlrange = np.linspace(min([P[0] for P in leftboundary]), max([P[0] for P in leftboundary])) - xrrange = np.linspace(min([P[0] for P in rightboundary]), max([P[0] for P in rightboundary])) - self.get_logger().info(f"length of xlrange = {len(xlrange)}") - leftboundary = [] - rightboundary = [] - for P in xlrange: - leftboundary.append([P,funcL(P)]) + # leftboundary, rightboundary = self.interpolate_boundary(leftboundary, rightboundary) + + # get center line + coods = self.get_center_line(leftboundary, rightboundary) - for P in xrrange: - rightboundary.append([P,funcR(P)]) - - # get correspnoding right boundary based on left boundary x1, y1, x2, y2 = leftboundary[0][0], leftboundary[0][1], rightboundary[0][0], rightboundary[0][1] track_width = self.get_distance(x1, y1, x2, y2) - # get right boundary - # leftboundary = self.get_left_boundary(rightboundary, track_width) - print(f"left boundary = {len(leftboundary)}, right boundary = {len(rightboundary)}") + # print(f"left boundary = {len(leftboundary)}, right boundary = {len(rightboundary)}") # adjust boundaries # self._leftboundary, self._rightboundary = self.get_adjusted_boundaries(leftboundary, rightboundary) self._leftboundary = leftboundary self._rightboundary = rightboundary - - # print coordinate lists + self._track_width = track_width + self._center_coods = coods + + # plot if self._once: - with open(f'{os.path.dirname(__file__)}/bound_coods', 'w') as fh: - xl=[i[0] for i in self._leftboundary] - yl=[i[1] for i in self._leftboundary] - xr=[i[0] for i in self._rightboundary] - yr=[i[1] for i in self._rightboundary] - for P in xl: - fh.write("{} ".format(P)) - fh.write("\n") - for P in yl: - fh.write("{} ".format(P)) - fh.write("\n") - for P in xr: - fh.write("{} ".format(P)) - fh.write("\n") - for P in yr: - fh.write("{} ".format(P)) - fh.write("\n") - fh.close() - - with open(f'{os.path.dirname(__file__)}/bound_coods2', 'w') as fh: - xl=[i[0] for i in leftboundary] - yl=[i[1] for i in leftboundary] - xr=[i[0] for i in rightboundary] - yr=[i[1] for i in rightboundary] - for P in xl: - fh.write("{} ".format(P)) - fh.write("\n") - for P in yl: - fh.write("{} ".format(P)) - fh.write("\n") - for P in xr: - fh.write("{} ".format(P)) - fh.write("\n") - for P in yr: - fh.write("{} ".format(P)) - fh.write("\n") - fh.close() - # self._once = False + # plot + plt.plot([P[0] for P in leftboundary], [P[1] for P in leftboundary], "-g", label='leftboundary') + plt.plot([P[0] for P in rightboundary], [P[1] for P in rightboundary], "-r", label='rightboundary') + plt.plot([P[0] for P in coods], [P[1] for P in coods], "ob", label='centerline') + plt.plot([car_position[0]],[car_position[1]],'or', label='car position') + plt.legend() + plt.show() + self._once = False def get_left_boundary(self, rightboundary, track_width): angle = -90 * np.pi / 180 @@ -168,54 +137,70 @@ def get_left_boundary(self, rightboundary, track_width): return leftboundary - # def get_adjusted_boundaries(self, leftboundaryI, rightboundaryI): - # # distance away from boundaries - # distance = 3 - # # left and right boundary lists - # leftboundary = [] - # rightboundary = [] - # # each point on the boundary - # num_points = len(leftboundaryI) - # # left - # for i in range(num_points-10): - # # if i != num_points-1: - # # # get left and right unit vector - forward - # # left_vector = self.get_vector(leftboundaryI[i], leftboundaryI[i+1], True) - # # else: - # # # backward - # # left_vector = self.get_vector(leftboundaryI[i], leftboundaryI[i-1], True) - # left_vector = self.get_vector(leftboundaryI[i], leftboundaryI[i+1], True) - - # # once vectors are found - rotate them and get new point - # angle = 90 * np.pi / 180 - # left_vector = self.get_rotated_vector(angle, left_vector) - - # # append new point - # new_left = leftboundaryI[i] + distance * left_vector - - # leftboundary.append(new_left) - - # num_points = len(rightboundaryI) - # for i in range(num_points-10): - # # if i < num_points-1: - # # # get left and right unit vector - forward - # # right_vector = self.get_vector(rightboundaryI[i], rightboundaryI[i+1], True) - # # else: - # # # backward - # # right_vector = self.get_vector(rightboundaryI[i-1], rightboundaryI[i], True) - # right_vector = self.get_vector(rightboundaryI[i], rightboundaryI[i+1], True) - - # # once vectors are found - rotate them and get new point - # angle = -90 * np.pi / 180 - # right_vector = self.get_rotated_vector(angle, right_vector) - - # # append new point - # new_right = rightboundaryI[i] + distance * right_vector - - # rightboundary.append(new_right) - - # return leftboundary, rightboundary + def get_center_line(self, leftboundary, rightboundary): + '''approximates the track's center line''' + # the midpoint is the average of the coordinates + coods = [] + x = [] + y = [] + num_cones = min(len(leftboundary), len(rightboundary)) + for i in range(num_cones): + x1, y1 = leftboundary[i] + x2, y2 = rightboundary[i] + tmp = self.get_avg_point(x1,y1,x2,y2) + x.append(tmp[0]) + y.append(tmp[1]) + + # interpolate center line + fun = interpolate.interp1d(x, y, kind='quadratic') + xrange = np.linspace(min(x), max(x)) + for X in xrange: + coods.append([X, float(fun(X))]) + + return coods + + def get_local_boundary(self, position_orientation, car_position, leftboundary, rightboundary): + position_vector, rotation_matrix = self.get_transformation_matrix(position_orientation) + x1, y1 = car_position + # output = self.apply_transformation(position_vector, rotation_matrix, x1, y1) - position_vector + dist = [0]*len(leftboundary) + dist2 = [0]*len(rightboundary) + for i, P in enumerate(leftboundary): + x2, y2 = P + # output = self.apply_transformation(position_vector, rotation_matrix, x2, y2) - position_vector + # y2 = output[1][0] + dist[i] = self.get_distance(x1, y1, x2, y2) + # self.get_logger().info(f"dist = {ydist[i]}") + for i, P in enumerate(rightboundary): + x2, y2 = P + dist2[i] = self.get_distance(x1, y1, x2, y2) + + start_index = max(np.argmin(np.array(dist)), np.argmin(np.array(dist2))) + + return start_index + + def interpolate_boundary(self, leftboundary, rightboundary): + # interpolate + lx = [P[0] for P in leftboundary] + ly = [P[1] for P in leftboundary] + rx = [P[0] for P in rightboundary] + + ry = [P[1] for P in rightboundary] + funcL = interpolate.interp1d(lx, ly, kind='quadratic') + funcR = interpolate.interp1d(rx, ry, kind='quadratic') + + xlrange = np.linspace(min(lx), max(lx)) + xrrange = np.linspace(min(rx), max(rx)) + self.get_logger().info(f"length of xlrange = {len(xlrange)}") + lb = [] + rb = [] + for P in xlrange: + lb.append([P,funcL(P)]) + + for P in xrrange: + rb.append([P,funcR(P)]) + return lb, rb def get_vector(self, p1, p2, unit=True): '''calculates point vector based on slope''' @@ -257,7 +242,7 @@ def delete_optimise_trajectories(self, msg: AllTrajectories): trajectories = msg.trajectories self.get_logger().info(f"number of paths before deletion = {len(trajectories)}") - self.trajectory_deletion(trajectories, states) + # self.trajectory_deletion(trajectories, states) self.get_logger().info(f"number of paths after deletion = {len(trajectories)}") best_trajectory_idx = self.optimisation(trajectories, states) @@ -274,7 +259,7 @@ def delete_optimise_trajectories(self, msg: AllTrajectories): self.best_trajectory_publisher.publish(posearray_msg) # publish valid (within boundaries) trajectories including center line - ps = [Pose(position=Point(x=P[0], y=P[1], z=0.0)) for P in self._center_line_coordinates] + ps = [Pose(position=Point(x=P[0], y=P[1], z=0.0)) for P in self._center_coods] trajectories.append(PoseArray(poses=ps)) alltrajectories_msg = { "id": msg.id, @@ -334,11 +319,10 @@ def set_within_boundary_trajectories(self, trajectories, states): pts_list = [(P[0], P[1]) for P in self._rightboundary] # if self._debug: # pts_list.pop(0) - self.get_logger().info(f"left boundary = {self._rightboundary}") self._right_boundary_linestring = LineString(pts_list) # track width - track_width = self.get_track_width() + track_width = self._track_width for i in range(len(trajectories)): # trajectory line string @@ -443,10 +427,9 @@ def get_best_trajectory_state(self, trajectories, states): trajectory_lengths = np.zeros(len(trajectories)) # get track width - width = self.get_track_width() + width = self._track_width # get center line - center_linestring, self._center_line_coordinates = self.get_center_line() - + center_linestring = LineString(self._center_coods) for i in range(len(trajectories)): trajectory = self.get_shapely_linestring(trajectories[i].poses) @@ -490,56 +473,9 @@ def get_shapely_point(self, x, y) -> shapelyPoint: def get_shapely_linestring(self, poses) -> LineString: return LineString([(P.position.x, P.position.y) for P in poses]) - - def get_track_width(self): - return self._right_boundary_linestring.distance(self._left_boundary_linestring) - - def get_center_line(self): - '''approximates the track's center line''' - # the midpoint is the average of the coordinates - coods = [] - num_cones = min(len(self._leftboundary), len(self._rightboundary)) - for i in range(num_cones): - x1 = self._leftboundary[i][0] - y1 = self._leftboundary[i][1] - x2 = self._rightboundary[i][0] - y2 = self._rightboundary[i][1] - coods.append(self.get_avg_point(x1,y1,x2,y2)) - - # # perform extrapolation to extend line - # func = interpolate.interp1d([P[0] for P in coods], [P[1] for P in coods], kind='cubic', fill_value='extrapolate') - - # into_future_points = 0 - # into_future_distance = 0 - - # # track direction - # if coods[-1][0] > coods[-2][0]: - # # positive/right - # direction = 1 - # elif coods[-1][0] < coods[-2][0]: - # # negative/left - # direction = -1 - # elif abs(coods[-1][0] - coods[-2][0]) <= 1e-3: - # # straight - # into_future_distance = 0 - # into_future_points = 0 - - # if into_future_points != 0: - # xnew = [] - # into_future_distance += direction - # for i in range(into_future_points): - # xnew.append(coods[-1][0] + into_future_distance) - # into_future_distance += direction - # # get new pts - # ynew = func(xnew) - # # add approximated coordinates to center line - # for i in range(into_future_points): - # coods.append((xnew[i],ynew[i])) - - return LineString(coods), coods def get_avg_point(self, x1, y1, x2, y2): - return (((x1 + x2)/2), ((y1 + y2)/2)) + return [((x1 + x2)/2), ((y1 + y2)/2)] def get_geometry_distance(self, geometry1, geometry2): '''calculates the eucledian distance bewteen two shapely geometries''' @@ -561,30 +497,55 @@ def get_best_trajectory_index(self, trajectory_lengths, trajectory_distances): return idx except ValueError: return None - - def DEBUG_generate_trajectories(self, n): - # list of all trajectories and states (for now just steering angle in rads) - all_traj = [] - all_states = -np.random.random(n) + np.random.random(n) - # make n pose arrays - x = y = z = 0.0 - for i in range(n): - # contains poses for ith pose array - temp = [] - # make m poses with random coordinates - for j in range(3): - pose = Pose() - pose.position.x, pose.position.y = x, y - # pose_array.poses = pose - temp.append(pose) - # calculate new x, y, z sqrt((x2-x1)^2+(y2-y1)^2) = length with x2 unknown - x = np.sqrt(0.1**2) + x - pose_array = PoseArray() - pose_array.poses = temp - # append pose array to all trajectories - all_traj.append(pose_array) - - return all_traj, all_states + + def get_position_of_cart(self, cone_map): + # first cone + localization_data = cone_map.cones[0] + x = localization_data.pose.pose.position.x + y = localization_data.pose.pose.position.y + theta = localization_data.pose.pose.orientation.w + return x, y, theta + + def get_transformation_matrix(self, position_and_orientation): + # theta = position_and_orientation[2] - np.pi/2 + cart_x = position_and_orientation[0] + cart_y = position_and_orientation[1] + theta = position_and_orientation[2] + # 2d trasformation matrix + rotation_matrix = np.array([[np.cos(theta), -np.sin(theta)],[np.sin(theta), np.cos(theta)]]) + position_vector = np.array([[cart_x], [cart_y]]) + + return position_vector, rotation_matrix + + def apply_transformation(self, position_vector, rotation_matrix, point_x, point_y): + point = np.array([[point_x], [point_y]]) + # matrix multiplication for rotation then translate from car position + transformed_point = np.matmul(rotation_matrix, point) + position_vector + return transformed_point + + # def DEBUG_generate_trajectories(self, n): + # # list of all trajectories and states (for now just steering angle in rads) + # all_traj = [] + # all_states = -np.random.random(n) + np.random.random(n) + # # make n pose arrays + # x = y = z = 0.0 + # for i in range(n): + # # contains poses for ith pose array + # temp = [] + # # make m poses with random coordinates + # for j in range(3): + # pose = Pose() + # pose.position.x, pose.position.y = x, y + # # pose_array.poses = pose + # temp.append(pose) + # # calculate new x, y, z sqrt((x2-x1)^2+(y2-y1)^2) = length with x2 unknown + # x = np.sqrt(0.1**2) + x + # pose_array = PoseArray() + # pose_array.poses = temp + # # append pose array to all trajectories + # all_traj.append(pose_array) + + # return all_traj, all_states def main(): rclpy.init() diff --git a/src/visualization/path_planning_visualization/path_planning_visualization/visualise_trajectories.py b/src/visualization/path_planning_visualization/path_planning_visualization/visualise_trajectories.py index d90ff55..ac9215f 100644 --- a/src/visualization/path_planning_visualization/path_planning_visualization/visualise_trajectories.py +++ b/src/visualization/path_planning_visualization/path_planning_visualization/visualise_trajectories.py @@ -6,6 +6,7 @@ from ackermann_msgs.msg import AckermannDrive from builtin_interfaces.msg import Time, Duration +from std_msgs.msg import Int16, Int32MultiArray import rclpy from rclpy.node import Node @@ -18,135 +19,79 @@ def __init__(self): self.next_destination_vis = [] - self.pubviz = self.create_publisher(SceneUpdate, 'visualization_trajectories', 5) + self.pubviz = self.create_publisher(SceneUpdate, 'visualization_trajectories', 10) # sub to all trajectories points and states - #self.all_paths = self.create_subscription(AllTrajectories, "moa/inbound_trajectories", self.show_paths, 5) - # self.all_paths = self.create_subscription(AllTrajectories, "moa/trajectories", self.show_paths, 5) - #self.all_states = self.create_subscription(AllStates, "moa/inbound_states", self.get_all_states, 5) - # selected path - #self.chosen_states = self.create_subscription(AckermannDrive, "moa/selected_trajectory", self.get_chosen_state_idx, 5) - #self.chosen_path = self.create_subscription(PoseArray, "moa/selected_trajectory", self.get_chosen_trajectory, 5) - self.chosen_path = self.create_subscription(PoseArray, "moa/selected_trajectory", self.show_chosen_paths, 5) - self.next_destination = self.create_subscription(Pose, "moa/next_destination", self.save_next_destination, 5) + self.create_subscription(AllTrajectories, "moa/inbound_trajectories", self.set_inbound_trajectories, 10) + self.create_subscription(AllTrajectories, "moa/trajectories", self.show_paths, 10) + self.create_subscription(Int16, "moa/best_trajectory_index", self.get_chosen_trajectory, 10) self.id = 1 - # def get_all_states(self, msg:AllStates) -> None: self.states = [i.steering_angle for i in msg.states] - def get_chosen_state_idx(self, msg:AckermannDrive) -> None: - if hasattr(self, "states"): - self.chosen_idx = np.where(np.isclose(self.states, msg.steering_angle, 1e-3))[0][0] + def get_chosen_trajectory(self, msg: Int16) -> None: + print(f"chosen idx got={msg.data}") + self.chosen_trajectory = msg.data - def get_chosen_trajectory(self, msg: PoseArray) -> None: - self.chosen_trajectory = msg + def set_inbound_trajectories(self, msg: AllTrajectories) -> None: + self.inbounds = msg + # def set_out_of_bounds_indicies(self, msg: Int32MultiArray) -> None: + # self.invalid_bounds_indicies = msg.data def show_paths(self, msg: AllTrajectories): - #if not hasattr(self,"chosen_trajectory"): - # self.get_logger().info("attribute not initialized") - # return - - line_list = [] - # list of pose array - pths = msg.trajectories - pths.append(self.chosen_trajectory) - for i in range(len(pths)): - if i == len(pths) - 1: - tcols = Color(r=255.0, g=255.0, b=255.0, a=1.0) - elif i == len(pths) - 2: - tcols = Color(r=0.0, g=255.0, b=0.0, a=1.0) - else: - tcols = Color(r=255.0, g=0.0, b=0.0, a=1.0) - pts = [] - for j in range(len(pths[i].poses)): - # get a particular pose - _ = pths[i].poses[j].position - pts.append(_) - args = {'type': LinePrimitive.LINE_STRIP, - 'pose': Pose(position=Point(x=0.0,y=0.0,z=0.0), orientation=Quaternion(x=0.0,y=0.0,z=0.0,w=0.0)), - 'thickness': 2.0, - 'scale_invariant': True, - 'points': pts, - 'color': tcols} - line_list.append(LinePrimitive(**args)) - - # arrow primitive code if needed - # args = {'pose': Pose(position=Point(x=1.0,y=0.0,z=0.0), orientation=Quaternion(x=0.0,y=0.0,z=0.0,w=0.0)), - # 'shaft_length': 1.0, - # 'shaft_diameter': 0.1, - # 'head_length': 2.5, - # 'head_diameter': 0.5, - # 'color': Color(r=67.0,g=125.0,b=100.0,a=1.0)} - # msg = ArrowPrimitive(**args) - - # scene entity encapsulates these primitive objects - sargs = {'timestamp': Time(sec=0,nanosec=0), - 'frame_id': 'global_frame', - 'id': f'{self.id}', - 'lifetime': Duration(sec=3,nanosec=0), - 'frame_locked': False, - 'lines': line_list, - 'spheres': self.next_destination_vis} - # scene update is a wrapper for scene entity - scene_update_msg = SceneUpdate(entities=[SceneEntity(**sargs)]) - - self.pubviz.publish(scene_update_msg) - #self.get_logger().info("Published msg") - - self.id += 1 - - def show_chosen_paths(self, msg: PoseArray): - tcols = Color(r=0.0, g=255.0, b=0.0, a=1.0) - pts = [] - line_list = []; - for j in range(len(msg.poses)): - # get a particular pose - _ = msg.poses[j].position - pts.append(_) - args = {'type': LinePrimitive.LINE_STRIP, - 'pose': Pose(position=Point(x=0.0, y=0.0, z=0.0), - orientation=Quaternion(x=0.0, y=0.0, z=0.0, w=0.0)), - 'thickness': 2.0, - 'scale_invariant': True, - 'points': pts, - 'color': tcols} - line_list.append(LinePrimitive(**args)) - - # arrow primitive code if needed - # args = {'pose': Pose(position=Point(x=1.0,y=0.0,z=0.0), orientation=Quaternion(x=0.0,y=0.0,z=0.0,w=0.0)), - # 'shaft_length': 1.0, - # 'shaft_diameter': 0.1, - # 'head_length': 2.5, - # 'head_diameter': 0.5, - # 'color': Color(r=67.0,g=125.0,b=100.0,a=1.0)} - # msg = ArrowPrimitive(**args) - - # scene entity encapsulates these primitive objects - sargs = {'timestamp': Time(sec=0,nanosec=0), - 'frame_id': 'global_frame', - 'id': f'{self.id}', - 'lifetime': Duration(sec=3,nanosec=0), - 'frame_locked': False, - 'lines': line_list, - 'spheres': self.next_destination_vis} - - # scene update is a wrapper for scene entity - scene_update_msg = SceneUpdate(entities=[SceneEntity(**sargs)]) - - self.pubviz.publish(scene_update_msg) - #self.get_logger().info("Published msg") - - self.id += 1 - - def save_next_destination(self, msg : Pose): - tcols = Color(r=255.0, g=255.0, b=0.0, a=1.0) - args = {'pose': msg, - 'size': Vector3(x=1.0, y=1.0, z=1.0), - 'color': tcols} - if len(self.next_destination_vis) == 0: - self.next_destination_vis.append(SpherePrimitive(**args)) - else: - self.next_destination_vis[0] = SpherePrimitive(**args) + if hasattr(self,"chosen_trajectory"): + line_list = [] + paths = msg.trajectories + paths.append(self.inbounds.trajectories[-1]) # append center line + + for i in range(len(paths)): + # choose color + # chosen + if i == self.chosen_trajectory: + # green + tcols = Color(r=0.0, g=255.0, b=0.0, a=1.0) + thickness = 5.0 + # center line + elif i == len(paths) - 1: + # blue + tcols = Color(r=0.0, g=0.0, b=255.0, a=1.0) + thickness = 3.0 + # self.get_logger().info(f"center pts: {len(pths[i].poses)}") + # other lines + else: + tcols = Color(r=255.0, g=255.0, b=255.0, a=0.8) + thickness = 1.0 + + # get points + points = [] + for j in range(len(paths[i].poses)): + points.append(paths[i].poses[j].position) + args = {'type': LinePrimitive.LINE_STRIP, + 'pose': Pose(position=Point(x=0.0,y=0.0,z=0.0), orientation=Quaternion(x=0.0,y=0.0,z=0.0,w=0.0)), + 'thickness': thickness, + 'scale_invariant': True, + 'points': points, + 'color': tcols} + line_list.append(LinePrimitive(**args)) + + + # scene entity encapsulates these primitive objects + sargs = {'timestamp': Time(sec=0,nanosec=0), + 'frame_id': 'global_frame', + 'id': f'{self.id}', + 'lifetime': Duration(sec=2,nanosec=100), + 'frame_locked': False, + 'lines': line_list} + + # scene update is a wrapper for scene entity + self.pubviz.publish(SceneUpdate(entities=[SceneEntity(**sargs)])) + self.get_logger().info("Published msg") + + self.id += 1 + return + + self.get_logger().info("attributes not initialized") + return def main(): diff --git a/src/visualization/path_planning_visualization/path_planning_visualization/visualise_trajectories_demo.py b/src/visualization/path_planning_visualization/path_planning_visualization/visualise_trajectories_demo.py index e82940e..d815a22 100644 --- a/src/visualization/path_planning_visualization/path_planning_visualization/visualise_trajectories_demo.py +++ b/src/visualization/path_planning_visualization/path_planning_visualization/visualise_trajectories_demo.py @@ -23,12 +23,6 @@ def __init__(self): # sub to all trajectories points and states self.all_paths = self.create_subscription(AllTrajectories, "moa/inbound_trajectories", self.set_inbound_trajectories, 10) self.create_subscription(AllTrajectories, "moa/trajectories", self.show_paths, 10) - #self.all_states = self.create_subscription(AllStates, "moa/inbound_states", self.get_all_states, 5) - # selected path - #self.chosen_states = self.create_subscription(AckermannDrive, "moa/selected_trajectory", self.get_chosen_state_idx, 5) - # self.create_subscription(PoseArray, "moa/selected_trajectory", self.get_chosen_trajectory, 5) - # self.chosen_path = self.create_subscription(PoseArray, "moa/selected_trajectory", self.show_chosen_paths, 5) - # self.next_destination = self.create_subscription(Pose, "moa/next_destination", self.save_next_destination, 5) self.create_subscription(Int16, "moa/best_trajectory_index", self.get_chosen_trajectory, 10) self.create_subscription(Int32MultiArray,"moa/out_of_bounds",self.set_out_of_bounds_indicies, 10) @@ -69,6 +63,7 @@ def show_paths(self, msg: AllTrajectories): # blue tcols = Color(r=0.0, g=0.0, b=255.0, a=1.0) thickness = 3.0 + self.get_logger().info(f"center pts: {len(pths[i].poses)}") # out of bounds elif i in self.invalid_bounds_indicies: appeneded += 1 From 6398553027f18a654b6b705d0644fc405320a2f5 Mon Sep 17 00:00:00 2001 From: Tanish Bhatt Date: Sun, 2 Jun 2024 21:57:55 +1200 Subject: [PATCH 2/4] code cleanup - not working atm --- .../trajectory_optimisation_CS.py | 227 +++++++----------- 1 file changed, 90 insertions(+), 137 deletions(-) diff --git a/src/planning/path_planning/path_planning/trajectory_optimisation_CS.py b/src/planning/path_planning/path_planning/trajectory_optimisation_CS.py index a6e9981..0693f5b 100644 --- a/src/planning/path_planning/path_planning/trajectory_optimisation_CS.py +++ b/src/planning/path_planning/path_planning/trajectory_optimisation_CS.py @@ -23,26 +23,21 @@ def __init__(self): super().__init__("trajectory_optimisation") self.get_logger().info("Trajectory Optimisation Node Started") - self.declare_parameters( - namespace='', - parameters=[ - ('debug', True) - ] - ) + # self.declare_parameters( + # namespace='', + # parameters=[ + # ('', True) + # ] + # ) # attributes - self._debug = self.get_parameter('debug').get_parameter_value().bool_value self._once = True # subscribers self.create_subscription(AllStates, "moa/states", self.set_states, 10) - self.create_subscription(AllTrajectories, "moa/trajectories", self.delete_optimise_trajectories, 10) - self.create_subscription(AckermannDrive, "moa/cur_vel", self.set_current_speed, 10) - # self.create_subscription(BoundaryStamped, "track/bound_l", self.set_left_boundary, 10) - # self.create_subscription(BoundaryStamped, "track/bound_r", self.set_right_boundary, 10) - # only debugging - self.create_subscription(ConeMap, "cone_map", self.set_boundaries, 10) - self.best_steering_angle_pub = self.create_publisher(Float32, "moa/selected_steering_angle", 10) + self.create_subscription(AllTrajectories, "moa/trajectories", self.get_generated_trajectories, 10) + self.create_subscription(ConeMap, "cone_map", self.callback, 10) + # self.create_subscription(AckermannDrive, "moa/cur_vel", self.set_current_speed, 10) # publishers # self.best_trajectory_publisher = self.create_publisher(AckermannDrive, "moa/selected_trajectory", 10) @@ -50,33 +45,35 @@ def __init__(self): self.within_boundary_trajectories_publisher = self.create_publisher(AllTrajectories, 'moa/inbound_trajectories', 10) self.best_trajectory_index = self.create_publisher(Int16, "moa/best_trajectory_index", 10) self.out_of_bounds_indicies = self.create_publisher(Int32MultiArray, "moa/out_of_bounds", 10) + self.best_steering_angle_pub = self.create_publisher(Float32, "moa/selected_steering_angle", 10) def set_states(self, msg: AllStates) -> None: self._state_msg = msg - def set_current_speed(self, msg: AckermannDrive) -> None: self._current_speed = msg.speed - - def set_left_boundary(self, msg: BoundaryStamped) -> None: self._leftboundary = msg.coords - - def set_right_boundary(self, msg: BoundaryStamped) -> None: self._rightboundary = msg.coords - - def set_boundaries(self, msg: ConeMap): - if self._debug: - cones = msg.cones - # loop through each cone - leftboundary = [] - rightboundary = [] - for i in range(len(cones)): - x = cones[i].pose.pose.position.x - y = cones[i].pose.pose.position.y - if i != 0: - # blue - left - if cones[i].colour == 0: - leftboundary.append([x,y]) - elif cones[i].colour == 2: - rightboundary.append([x,y]) - else: - car_position = [x,y] + def set_generated_trajectories(self, msg: AllTrajectories) -> None: self._trajectories_msg = msg + + def callback(self, msg:ConeMap) -> None: + '''retrives cone msg and find trajectory closest to centerline''' + self.get_logger().info(f"all states: {hasattr(self,'_state_msg')}" + f" | current speed: {hasattr(self,'_current_speed')}" \ + f" | left boundaries: {hasattr(self,'_leftboundary')}"\ + f" | right boundaries: {hasattr(self,'_rightboundary')}") + + cones = msg.cones # get cones + + # boundary list + leftboundary = [] + rightboundary = [] + for i in range(len(cones)): + x = cones[i].pose.pose.position.x # x point + y = cones[i].pose.pose.position.y # y point + if i != 0: + if cones[i].colour == 0: # 0 is blue which is left + leftboundary.append([x,y]) + elif cones[i].colour == 2: # 2 is yellow which is right + rightboundary.append([x,y]) + else: + car_position = [x,y] # first cone is car position # get local boundary points @@ -88,29 +85,75 @@ def set_boundaries(self, msg: ConeMap): # leftboundary, rightboundary = self.interpolate_boundary(leftboundary, rightboundary) # get center line - coods = self.get_center_line(leftboundary, rightboundary) + center_coordinates = self.get_center_line(leftboundary, rightboundary) - x1, y1, x2, y2 = leftboundary[0][0], leftboundary[0][1], rightboundary[0][0], rightboundary[0][1] - track_width = self.get_distance(x1, y1, x2, y2) - # print(f"left boundary = {len(leftboundary)}, right boundary = {len(rightboundary)}") + # x1, y1, x2, y2 = leftboundary[0][0], leftboundary[0][1], rightboundary[0][0], rightboundary[0][1] + # track_width = self.get_distance(x1, y1, x2, y2) + # adjust boundaries # self._leftboundary, self._rightboundary = self.get_adjusted_boundaries(leftboundary, rightboundary) - self._leftboundary = leftboundary - self._rightboundary = rightboundary - self._track_width = track_width - self._center_coods = coods + # self._leftboundary = leftboundary + # self._rightboundary = rightboundary + # self._track_width = track_width + # self._center_coods = coods + + + if self._state_msg.id == self._trajectories_msg.id: # check if the steering angle (states) and generate trajectories are the same + states = [ackerman_msg.steering_angle for ackerman_msg in self._state_msg.states] + trajectories = self._trajectories_msg + + self.get_logger().info(f"trajectories before deletion = {len(trajectories)}") + self.trajectory_deletion(trajectories, states) # delete invalid trajectories + self.get_logger().info(f"number of paths after deletion = {len(trajectories)}") + + best_trajectory_idx = self.optimisation(trajectories, states) # find best/optimised trajectory + if best_trajectory_idx == None: # check if no trajectory found + self.get_logger().info("no valid trajectories found") + else: + self.get_logger().info(f"best steering angle is = {states[best_trajectory_idx]}") + + # publish best trajectory + args = {"header": Header(stamp=Time(sec=0,nanosec=0), frame_id='best_trajectory'), + "poses": trajectories[best_trajectory_idx].poses} + posearray_msg = PoseArray(**args) + + # publish valid (within boundaries) trajectories including center line + ps = [Pose(position=Point(x=P[0], y=P[1], z=0.0)) for P in self._center_coods] + trajectories.append(PoseArray(poses=ps)) + alltrajectories_msg = {"id": msg.id, "trajectories": trajectories} + + # publish within boundary trajectory states + # states_msg = [] + # for sta in states: + # sargs = {"steering_angle": sta, + # "steering_angle_velocity": 0.0, + # "speed": self._current_speed, + # "acceleration": 0.0, + # "jerk": 0.0} + # states_msg.append(AckermannDrive(**sargs)) + # self.within_boundary_states_publisher.publish(AllStates(id=msg.id, states=states_msg)) + + self.within_boundary_trajectories_publisher.publish(AllTrajectories(**alltrajectories_msg)) # within bound pub + self.best_trajectory_publisher.publish(posearray_msg) # best trajectory pub + self.best_steering_angle_pub.publish(Float32(data=states[best_trajectory_idx])) # optimal steering angle pub + + self.get_logger().info("msg published") + else: + self.get_logger().info(f"Ids state:{self._state_msg.id} and trajectory:{self._trajectories_msg.id} do not match") - # plot + + # plot track if self._once: # plot plt.plot([P[0] for P in leftboundary], [P[1] for P in leftboundary], "-g", label='leftboundary') plt.plot([P[0] for P in rightboundary], [P[1] for P in rightboundary], "-r", label='rightboundary') - plt.plot([P[0] for P in coods], [P[1] for P in coods], "ob", label='centerline') + plt.plot([P[0] for P in center_coordinates], [P[1] for P in center_coordinates], "ob", label='centerline') plt.plot([car_position[0]],[car_position[1]],'or', label='car position') plt.legend() plt.show() self._once = False + def get_left_boundary(self, rightboundary, track_width): angle = -90 * np.pi / 180 leftboundary = [] @@ -220,73 +263,6 @@ def get_magnitude(self, vector): return np.sqrt(sum(vector**2)) - def delete_optimise_trajectories(self, msg: AllTrajectories): - '''deletes trajectories then optimises to find the best path for car''' - - if self._debug: self._current_speed = 0.0 - self.get_logger().info(f"all states: {hasattr(self,'_state_msg')}" - f" | current speed: {hasattr(self,'_current_speed')}" \ - f" | left boundaries: {hasattr(self,'_leftboundary')}"\ - f" | right boundaries: {hasattr(self,'_rightboundary')}") - - if hasattr(self,"_state_msg") and hasattr(self,"_current_speed") \ - and hasattr(self,"_leftboundary") and hasattr(self,"_rightboundary"): - # check ids - if self._state_msg.id != msg.id: - self.get_logger().info(f"Ids state:{self._state_msg.id} and trajectory:{msg.id} do not match") - # return - - # set trajectories and states as a list - # states = [self._state_msg.states[i].steering_angle for i in range(len(self._state_msg.states))] - states = [command.steering_angle for command in self._state_msg.states] - trajectories = msg.trajectories - - self.get_logger().info(f"number of paths before deletion = {len(trajectories)}") - # self.trajectory_deletion(trajectories, states) - self.get_logger().info(f"number of paths after deletion = {len(trajectories)}") - best_trajectory_idx = self.optimisation(trajectories, states) - - # check for no trajectories - if best_trajectory_idx == None: - self.get_logger().info("no valid trajectories found") - return - self.get_logger().info(f"best state is = {states[best_trajectory_idx]}") - - # publish best trajectory - args = {"header": Header(stamp=Time(sec=0,nanosec=0), frame_id='best_trajectory'), - "poses": trajectories[best_trajectory_idx].poses} - posearray_msg = PoseArray(**args) - self.best_trajectory_publisher.publish(posearray_msg) - - # publish valid (within boundaries) trajectories including center line - ps = [Pose(position=Point(x=P[0], y=P[1], z=0.0)) for P in self._center_coods] - trajectories.append(PoseArray(poses=ps)) - alltrajectories_msg = { - "id": msg.id, - "trajectories": trajectories - } - self.within_boundary_trajectories_publisher.publish(AllTrajectories(**alltrajectories_msg)) - - # publish within boundary trajectory states - # states_msg = [] - # for sta in states: - # sargs = {"steering_angle": sta, - # "steering_angle_velocity": 0.0, - # "speed": self._current_speed, - # "acceleration": 0.0, - # "jerk": 0.0} - # states_msg.append(AckermannDrive(**sargs)) - # self.within_boundary_states_publisher.publish(AllStates(id=msg.id, states=states_msg)) - - # publish best steering angle - self.best_steering_angle_pub.publish(Float32(data=states[best_trajectory_idx])) - - self.get_logger().info("msg published") - - return - - return - # =========================================================== # TRAJECTORY DELETION def trajectory_deletion(self, trajectories, states): @@ -523,29 +499,6 @@ def apply_transformation(self, position_vector, rotation_matrix, point_x, point_ transformed_point = np.matmul(rotation_matrix, point) + position_vector return transformed_point - # def DEBUG_generate_trajectories(self, n): - # # list of all trajectories and states (for now just steering angle in rads) - # all_traj = [] - # all_states = -np.random.random(n) + np.random.random(n) - # # make n pose arrays - # x = y = z = 0.0 - # for i in range(n): - # # contains poses for ith pose array - # temp = [] - # # make m poses with random coordinates - # for j in range(3): - # pose = Pose() - # pose.position.x, pose.position.y = x, y - # # pose_array.poses = pose - # temp.append(pose) - # # calculate new x, y, z sqrt((x2-x1)^2+(y2-y1)^2) = length with x2 unknown - # x = np.sqrt(0.1**2) + x - # pose_array = PoseArray() - # pose_array.poses = temp - # # append pose array to all trajectories - # all_traj.append(pose_array) - - # return all_traj, all_states def main(): rclpy.init() From 710f614312474d64da6a34b6c5d4b109ac493206 Mon Sep 17 00:00:00 2001 From: Tanish Bhatt Date: Mon, 3 Jun 2024 15:10:51 +1200 Subject: [PATCH 3/4] more cleanup - still not work atm --- .../trajectory_optimisation_CS.py | 354 +++++++----------- 1 file changed, 144 insertions(+), 210 deletions(-) diff --git a/src/planning/path_planning/path_planning/trajectory_optimisation_CS.py b/src/planning/path_planning/path_planning/trajectory_optimisation_CS.py index 0693f5b..22949a6 100644 --- a/src/planning/path_planning/path_planning/trajectory_optimisation_CS.py +++ b/src/planning/path_planning/path_planning/trajectory_optimisation_CS.py @@ -40,7 +40,6 @@ def __init__(self): # self.create_subscription(AckermannDrive, "moa/cur_vel", self.set_current_speed, 10) # publishers - # self.best_trajectory_publisher = self.create_publisher(AckermannDrive, "moa/selected_trajectory", 10) self.best_trajectory_publisher = self.create_publisher(PoseArray, "moa/selected_trajectory", 10) self.within_boundary_trajectories_publisher = self.create_publisher(AllTrajectories, 'moa/inbound_trajectories', 10) self.best_trajectory_index = self.create_publisher(Int16, "moa/best_trajectory_index", 10) @@ -92,10 +91,10 @@ def callback(self, msg:ConeMap) -> None: # adjust boundaries # self._leftboundary, self._rightboundary = self.get_adjusted_boundaries(leftboundary, rightboundary) - # self._leftboundary = leftboundary - # self._rightboundary = rightboundary + self._leftboundary = leftboundary + self._rightboundary = rightboundary # self._track_width = track_width - # self._center_coods = coods + self._center_coods = center_coordinates if self._state_msg.id == self._trajectories_msg.id: # check if the steering angle (states) and generate trajectories are the same @@ -122,22 +121,12 @@ def callback(self, msg:ConeMap) -> None: trajectories.append(PoseArray(poses=ps)) alltrajectories_msg = {"id": msg.id, "trajectories": trajectories} - # publish within boundary trajectory states - # states_msg = [] - # for sta in states: - # sargs = {"steering_angle": sta, - # "steering_angle_velocity": 0.0, - # "speed": self._current_speed, - # "acceleration": 0.0, - # "jerk": 0.0} - # states_msg.append(AckermannDrive(**sargs)) - # self.within_boundary_states_publisher.publish(AllStates(id=msg.id, states=states_msg)) - + # publish msgs self.within_boundary_trajectories_publisher.publish(AllTrajectories(**alltrajectories_msg)) # within bound pub self.best_trajectory_publisher.publish(posearray_msg) # best trajectory pub self.best_steering_angle_pub.publish(Float32(data=states[best_trajectory_idx])) # optimal steering angle pub - self.get_logger().info("msg published") + self.get_logger().info("OPTIMAL TRAJECTORY COMPUTED") else: self.get_logger().info(f"Ids state:{self._state_msg.id} and trajectory:{self._trajectories_msg.id} do not match") @@ -153,33 +142,6 @@ def callback(self, msg:ConeMap) -> None: plt.show() self._once = False - - def get_left_boundary(self, rightboundary, track_width): - angle = -90 * np.pi / 180 - leftboundary = [] - # loop through all left boundary points - for i in range(len(rightboundary)): - P = rightboundary[i] - # check what point is - if i == 0: - # forward point - vector = self.get_vector(rightboundary[i], rightboundary[i+1], True) - elif i == len(rightboundary)-1: - # backward point - vector = self.get_vector(rightboundary[i-1], rightboundary[i], True) - else: - # centeral points - vector = self.get_vector(rightboundary[i-1], rightboundary[i+1], True) - - # rotate vector - vector = self.get_rotated_vector(angle,vector) - # compute new point - new_point = P + track_width * vector - # append new point - leftboundary.append(new_point) - - return leftboundary - def get_center_line(self, leftboundary, rightboundary): '''approximates the track's center line''' # the midpoint is the average of the coordinates @@ -201,67 +163,13 @@ def get_center_line(self, leftboundary, rightboundary): coods.append([X, float(fun(X))]) return coods - - def get_local_boundary(self, position_orientation, car_position, leftboundary, rightboundary): - position_vector, rotation_matrix = self.get_transformation_matrix(position_orientation) - x1, y1 = car_position - # output = self.apply_transformation(position_vector, rotation_matrix, x1, y1) - position_vector - dist = [0]*len(leftboundary) - dist2 = [0]*len(rightboundary) - for i, P in enumerate(leftboundary): - x2, y2 = P - # output = self.apply_transformation(position_vector, rotation_matrix, x2, y2) - position_vector - # y2 = output[1][0] - dist[i] = self.get_distance(x1, y1, x2, y2) - # self.get_logger().info(f"dist = {ydist[i]}") - for i, P in enumerate(rightboundary): - x2, y2 = P - dist2[i] = self.get_distance(x1, y1, x2, y2) - - start_index = max(np.argmin(np.array(dist)), np.argmin(np.array(dist2))) - return start_index - - def interpolate_boundary(self, leftboundary, rightboundary): - # interpolate - lx = [P[0] for P in leftboundary] - ly = [P[1] for P in leftboundary] - rx = [P[0] for P in rightboundary] - - ry = [P[1] for P in rightboundary] - funcL = interpolate.interp1d(lx, ly, kind='quadratic') - funcR = interpolate.interp1d(rx, ry, kind='quadratic') - - xlrange = np.linspace(min(lx), max(lx)) - xrrange = np.linspace(min(rx), max(rx)) - self.get_logger().info(f"length of xlrange = {len(xlrange)}") - lb = [] - rb = [] - for P in xlrange: - lb.append([P,funcL(P)]) - - for P in xrrange: - rb.append([P,funcR(P)]) - - return lb, rb - - def get_vector(self, p1, p2, unit=True): - '''calculates point vector based on slope''' - vector = np.array(p2)-np.array(p1) - if unit: - return vector / self.get_magnitude(vector) - - return vector - - def get_rotated_vector(self, angle, vector): - # rotation matrix - rotate = np.array([[np.cos(angle), -np.sin(angle)], [np.sin(angle), np.cos(angle)]]) - - return np.dot(rotate,np.array(vector)) - - def get_magnitude(self, vector): - return np.sqrt(sum(vector**2)) + def get_shapely_linestring(self, points) -> LineString: + '''create a shapely linestring from an array/list of [x,y] points''' + return LineString([(P[0], P[1]) for P in points]) + def get_avg_point(self, x1, y1, x2, y2): + return [((x1 + x2)/2), ((y1 + y2)/2)] # =========================================================== # TRAJECTORY DELETION @@ -289,33 +197,22 @@ def set_within_boundary_trajectories(self, trajectories, states): # # trajectories.pop([i for i in range(len(trajectories)) if T==trajectories[i]][0]) # break - # shapely library - self._left_boundary_linestring = LineString([(P[0], P[1]) for P in self._leftboundary]) - - pts_list = [(P[0], P[1]) for P in self._rightboundary] - # if self._debug: - # pts_list.pop(0) - self._right_boundary_linestring = LineString(pts_list) + left_boundary_linestring = self.get_shapely_linestring(self._leftboundary) + right_boundary_linestring = self.get_shapely_linestring(self._rightboundary) # track width track_width = self._track_width - for i in range(len(trajectories)): - # trajectory line string - trajectory = LineString([(P.position.x, P.position.y) for P in trajectories[i].poses]) - - if i == len(trajectories)/2 - 1: - print('the long one') + for i in range(len(trajectories)): # loop each trajectory + trajectory = self.get_shapely_linestring([[P.position.x, P.position.y] for P in trajectories[i].poses]) tmp = None - # check intersection - if trajectory.intersects(self._left_boundary_linestring): + if trajectory.intersects(left_boundary_linestring): # left bound intersection # get intersection point/s - tmp = trajectory.intersection(self._left_boundary_linestring) + tmp = trajectory.intersection(left_boundary_linestring) intersection = "left" - - elif trajectory.intersects(self._right_boundary_linestring): - tmp = trajectory.intersection(self._right_boundary_linestring) + elif trajectory.intersects(right_boundary_linestring): # right bound intersection + tmp = trajectory.intersection(right_boundary_linestring) intersection = "right" if tmp is not None: @@ -345,6 +242,7 @@ def set_within_boundary_trajectories(self, trajectories, states): print(f"removed indices = {remove_trajectories_indices}") self.out_of_bounds_indicies.publish(Int32MultiArray(data=remove_trajectories_indices)) + # remove out of bound trajectories [(trajectories.pop(index), states.pop(index)) for index in list(reversed(remove_trajectories_indices))] def get_distance(self,x1,y1,x2,y2): @@ -390,115 +288,151 @@ def compare_with_boundary(self, x, y, tol): # =========================================================== # OPTIMISATION - def optimisation(self, trajectories, states): + def optimisation(self, trajectories): '''returns the best trajectory''' self.get_logger().info("Optimisation Started") - return self.get_best_trajectory_state(trajectories, states) + return self.get_best_trajectory_state(trajectories) - def get_best_trajectory_state(self, trajectories, states): - '''finds the best trajectory''' + def get_best_trajectory_state(self, trajectories): + '''finds the best trajectory from the set''' trajectory_distances = np.ones(len(trajectories)) * np.inf - trajectory_lengths = np.zeros(len(trajectories)) - # get track width width = self._track_width # get center line center_linestring = LineString(self._center_coods) - for i in range(len(trajectories)): - trajectory = self.get_shapely_linestring(trajectories[i].poses) - - # distance to center line - # average_distance = self.get_average_distance_to_center_trajectory(trajectories[i], center_linestring, to_center_line) - # endpoint - P = trajectories[i].poses[-1].position - end_point = shapelyPoint(P.x,P.y) - average_distance = end_point.distance(center_linestring) + for i in range(len(trajectories)): # loop through each trajectory + # trajectory = self.get_shapely_linestring(trajectories[i].poses) # trajectory i as a line + P = trajectories[i].poses[-1].position # end point of trajectory i + end_point = shapelyPoint(P.x,P.y) # as a point + trajectory_distances[i] = end_point.distance(center_linestring) # distance to centerline - trajectory_distances[i] = average_distance - trajectory_lengths[i] = trajectory.length + return self.get_best_trajectory_index(trajectory_distances) - return self.get_best_trajectory_index(trajectory_lengths, trajectory_distances) - - def get_average_distance_to_center_trajectory(self, trajectory, center_linestring, toCenterLine=True): - # if distance to centerline (from trajecotry) or other way round - if toCenterLine: - num_points = len(trajectory.poses) - else: - num_points = len(center_linestring) - - total_distance = np.zeros(num_points) - - for j in range(num_points): - if toCenterLine: - ps = trajectory.poses[j] - point = self.get_shapely_point(ps.position.x, ps.position.y) - total_distance[j] = point.distance(center_linestring) - else: - ps = center_linestring[j] - point = self.get_shapely_point(ps[0], ps[1]) - # TRAJECTORY MUST BE A LINESTRING NOT A POSEARRAY - total_distance[j] = point.distance(trajectory) - - return np.mean(total_distance) - - def get_shapely_point(self, x, y) -> shapelyPoint: - return shapelyPoint(x,y) - - def get_shapely_linestring(self, poses) -> LineString: - return LineString([(P.position.x, P.position.y) for P in poses]) - - def get_avg_point(self, x1, y1, x2, y2): - return [((x1 + x2)/2), ((y1 + y2)/2)] - - def get_geometry_distance(self, geometry1, geometry2): - '''calculates the eucledian distance bewteen two shapely geometries''' - # this is to calculate trajectory length using only poses/points (DECAP) - # poses = trajectory.poses - # p0 = np.array([poses[0].position.x, poses[0].position.y]) - # p1 = np.array([poses[1].position.x, poses[1].position.y]) - # intL = np.linalg.norm(p1-p0,ord=2) - # arc_length = (len(poses)-1) * intL - - return geometry1.distance(geometry2) - # return frechet_distance(geometry1, geometry2) - - def get_best_trajectory_index(self, trajectory_lengths, trajectory_distances): + def get_best_trajectory_index(self, trajectory_distances): + idx = None try: objective_function = trajectory_distances idx = int(np.ceil(np.argmin(objective_function))) self.best_trajectory_index.publish(Int16(data=idx)) - return idx except ValueError: - return None + self.get_logger().info("error calculating objective value") + + return idx + +# FUNCTIONS/CODE NOT NEEDED ATM BUT MAYBE IN THE FUTURE + # def get_left_boundary(self, rightboundary, track_width): + # angle = -90 * np.pi / 180 + # leftboundary = [] + # # loop through all left boundary points + # for i in range(len(rightboundary)): + # P = rightboundary[i] + # # check what point is + # if i == 0: + # # forward point + # vector = self.get_vector(rightboundary[i], rightboundary[i+1], True) + # elif i == len(rightboundary)-1: + # # backward point + # vector = self.get_vector(rightboundary[i-1], rightboundary[i], True) + # else: + # # centeral points + # vector = self.get_vector(rightboundary[i-1], rightboundary[i+1], True) + + # # rotate vector + # vector = self.get_rotated_vector(angle,vector) + # # compute new point + # new_point = P + track_width * vector + # # append new point + # leftboundary.append(new_point) + + # return leftboundary + + # def get_local_boundary(self, position_orientation, car_position, leftboundary, rightboundary): + # position_vector, rotation_matrix = self.get_transformation_matrix(position_orientation) + # x1, y1 = car_position + # # output = self.apply_transformation(position_vector, rotation_matrix, x1, y1) - position_vector + # dist = [0]*len(leftboundary) + # dist2 = [0]*len(rightboundary) + # for i, P in enumerate(leftboundary): + # x2, y2 = P + # # output = self.apply_transformation(position_vector, rotation_matrix, x2, y2) - position_vector + # # y2 = output[1][0] + # dist[i] = self.get_distance(x1, y1, x2, y2) + # # self.get_logger().info(f"dist = {ydist[i]}") + # for i, P in enumerate(rightboundary): + # x2, y2 = P + # dist2[i] = self.get_distance(x1, y1, x2, y2) + + # start_index = max(np.argmin(np.array(dist)), np.argmin(np.array(dist2))) + + # return start_index + + # def interpolate_boundary(self, leftboundary, rightboundary): + # # interpolate + # lx = [P[0] for P in leftboundary] + # ly = [P[1] for P in leftboundary] + # rx = [P[0] for P in rightboundary] + + # ry = [P[1] for P in rightboundary] + # funcL = interpolate.interp1d(lx, ly, kind='quadratic') + # funcR = interpolate.interp1d(rx, ry, kind='quadratic') + + # xlrange = np.linspace(min(lx), max(lx)) + # xrrange = np.linspace(min(rx), max(rx)) + # self.get_logger().info(f"length of xlrange = {len(xlrange)}") + # lb = [] + # rb = [] + # for P in xlrange: + # lb.append([P,funcL(P)]) + + # for P in xrrange: + # rb.append([P,funcR(P)]) + + # return lb, rb + + # def get_vector(self, p1, p2, unit=True): + # '''calculates point vector based on slope''' + # vector = np.array(p2)-np.array(p1) + # if unit: + # return vector / self.get_magnitude(vector) - def get_position_of_cart(self, cone_map): - # first cone - localization_data = cone_map.cones[0] - x = localization_data.pose.pose.position.x - y = localization_data.pose.pose.position.y - theta = localization_data.pose.pose.orientation.w - return x, y, theta - - def get_transformation_matrix(self, position_and_orientation): - # theta = position_and_orientation[2] - np.pi/2 - cart_x = position_and_orientation[0] - cart_y = position_and_orientation[1] - theta = position_and_orientation[2] - # 2d trasformation matrix - rotation_matrix = np.array([[np.cos(theta), -np.sin(theta)],[np.sin(theta), np.cos(theta)]]) - position_vector = np.array([[cart_x], [cart_y]]) - - return position_vector, rotation_matrix - - def apply_transformation(self, position_vector, rotation_matrix, point_x, point_y): - point = np.array([[point_x], [point_y]]) - # matrix multiplication for rotation then translate from car position - transformed_point = np.matmul(rotation_matrix, point) + position_vector - return transformed_point + # return vector + + # def get_rotated_vector(self, angle, vector): + # # rotation matrix + # rotate = np.array([[np.cos(angle), -np.sin(angle)], [np.sin(angle), np.cos(angle)]]) + # return np.dot(rotate,np.array(vector)) + + # def get_magnitude(self, vector): + # return np.sqrt(sum(vector**2)) + + # def get_position_of_cart(self, cone_map): + # # first cone + # localization_data = cone_map.cones[0] + # x = localization_data.pose.pose.position.x + # y = localization_data.pose.pose.position.y + # theta = localization_data.pose.pose.orientation.w + # return x, y, theta + + # def get_transformation_matrix(self, position_and_orientation): + # # theta = position_and_orientation[2] - np.pi/2 + # cart_x = position_and_orientation[0] + # cart_y = position_and_orientation[1] + # theta = position_and_orientation[2] + # # 2d trasformation matrix + # rotation_matrix = np.array([[np.cos(theta), -np.sin(theta)],[np.sin(theta), np.cos(theta)]]) + # position_vector = np.array([[cart_x], [cart_y]]) + + # return position_vector, rotation_matrix + + # def apply_transformation(self, position_vector, rotation_matrix, point_x, point_y): + # point = np.array([[point_x], [point_y]]) + # # matrix multiplication for rotation then translate from car position + # transformed_point = np.matmul(rotation_matrix, point) + position_vector + # return transformed_point def main(): rclpy.init() From 0080cff60ff6427c144573a8eaec7d4d869ea959 Mon Sep 17 00:00:00 2001 From: Tanish Bhatt Date: Wed, 5 Jun 2024 22:07:08 +1200 Subject: [PATCH 4/4] center line code cleanedup - works need smoother centerline --- .../path_planning/boundaries_drawn.png | Bin 0 -> 2396 bytes .../path_planning/trajectory_generation.py | 6 +- .../trajectory_optimisation_CS.py | 266 +++++++++--------- .../visualise_trajectories.py | 2 +- 4 files changed, 140 insertions(+), 134 deletions(-) create mode 100644 src/planning/path_planning/path_planning/boundaries_drawn.png diff --git a/src/planning/path_planning/path_planning/boundaries_drawn.png b/src/planning/path_planning/path_planning/boundaries_drawn.png new file mode 100644 index 0000000000000000000000000000000000000000..8722e47213f249d39d568626335c2b1699cbf0fa GIT binary patch literal 2396 zcmeAS@N?(olHy`uVBq!ia0y~yU}|7sV0^&A1{5*9c;^X_vMh0pC<)F_D=AMbN@eg( zEGfvzFUiSFQYcF;D$dN$GuE@vGuBbaC@Co@w$j(ng)7j@FG|-x{wyLGXb5M4M`SSr z1Aih2Gp?{-p2@(#anjSpF{EP7+iQl53<^943=Zsn{*b9pjjO8c{23hv2BUNDfjaLQ zH83z3vI#IK9AjW&I3nS|&>+Fg!61;#z{rrqqrl+M!^pzWVK}O2Gzdmh!DvPpEel4A j!_gXHv{IyNyFgE^l6}we<(ra#?G*-3S3j3^P6 None: self._state_msg = msg - def set_generated_trajectories(self, msg: AllTrajectories) -> None: self._trajectories_msg = msg + def set_generated_trajectories(self, msg: AllTrajectories) -> None: + self._trajectories_msg = msg + self.get_logger().info(f"traj len = {len(msg.trajectories)}") def callback(self, msg:ConeMap) -> None: '''retrives cone msg and find trajectory closest to centerline''' - self.get_logger().info(f"all states: {hasattr(self,'_state_msg')}" - f" | current speed: {hasattr(self,'_current_speed')}" \ - f" | left boundaries: {hasattr(self,'_leftboundary')}"\ - f" | right boundaries: {hasattr(self,'_rightboundary')}") - - cones = msg.cones # get cones - # boundary list - leftboundary = [] - rightboundary = [] - for i in range(len(cones)): - x = cones[i].pose.pose.position.x # x point - y = cones[i].pose.pose.position.y # y point - if i != 0: - if cones[i].colour == 0: # 0 is blue which is left - leftboundary.append([x,y]) - elif cones[i].colour == 2: # 2 is yellow which is right - rightboundary.append([x,y]) - else: - car_position = [x,y] # first cone is car position - + if hasattr(self, "_state_msg") and hasattr(self, "_trajectories_msg"): + self.get_logger().info(f"all states: {hasattr(self,'_state_msg')}" + f" | current speed: {hasattr(self,'_current_speed')}" \ + f" | left boundaries: {hasattr(self,'_leftboundary')}"\ + f" | right boundaries: {hasattr(self,'_rightboundary')}") + leftboundary, rightboundary, car_position = self.get_boundaries(msg.cones) # get boundaries + # get local boundary points # position_orientation = self.get_position_of_cart(msg) # starting_index = self.get_local_boundary(position_orientation, car_position, leftboundary, rightboundary) @@ -84,83 +74,101 @@ def callback(self, msg:ConeMap) -> None: # leftboundary, rightboundary = self.interpolate_boundary(leftboundary, rightboundary) # get center line - center_coordinates = self.get_center_line(leftboundary, rightboundary) + centerline = self.get_center_line(leftboundary, rightboundary) # x1, y1, x2, y2 = leftboundary[0][0], leftboundary[0][1], rightboundary[0][0], rightboundary[0][1] - # track_width = self.get_distance(x1, y1, x2, y2) # adjust boundaries - # self._leftboundary, self._rightboundary = self.get_adjusted_boundaries(leftboundary, rightboundary) self._leftboundary = leftboundary self._rightboundary = rightboundary - # self._track_width = track_width - self._center_coods = center_coordinates + self._centerline = centerline - if self._state_msg.id == self._trajectories_msg.id: # check if the steering angle (states) and generate trajectories are the same + if self._state_msg.id == self._trajectories_msg.id: # check if the steering angle (states) and generated trajectories are the same states = [ackerman_msg.steering_angle for ackerman_msg in self._state_msg.states] - trajectories = self._trajectories_msg + trajectories = self._trajectories_msg.trajectories.copy() # COPY THIS! else errors will pop up like centerline added in trajectories + - self.get_logger().info(f"trajectories before deletion = {len(trajectories)}") - self.trajectory_deletion(trajectories, states) # delete invalid trajectories - self.get_logger().info(f"number of paths after deletion = {len(trajectories)}") + if self._delete: + self.get_logger().info(f"trajectories before deletion = {len(trajectories)}") + self.trajectory_deletion(trajectories, states) # delete invalid trajectories + self.get_logger().info(f"number of paths after deletion = {len(trajectories)}") - best_trajectory_idx = self.optimisation(trajectories, states) # find best/optimised trajectory + best_trajectory_idx = self.optimisation(trajectories) # find best/optimised trajectory + if best_trajectory_idx == None: # check if no trajectory found self.get_logger().info("no valid trajectories found") else: + self.get_logger().info(f"best indx = {best_trajectory_idx}") self.get_logger().info(f"best steering angle is = {states[best_trajectory_idx]}") # publish best trajectory - args = {"header": Header(stamp=Time(sec=0,nanosec=0), frame_id='best_trajectory'), + args1 = {"header": Header(stamp=Time(sec=0,nanosec=0), frame_id='path_optimisation'), "poses": trajectories[best_trajectory_idx].poses} - posearray_msg = PoseArray(**args) - # publish valid (within boundaries) trajectories including center line - ps = [Pose(position=Point(x=P[0], y=P[1], z=0.0)) for P in self._center_coods] + ps = [Pose(position=Point(x=P[0], y=P[1], z=0.0)) for P in centerline] trajectories.append(PoseArray(poses=ps)) - alltrajectories_msg = {"id": msg.id, "trajectories": trajectories} + args2 = {"id": self._trajectories_msg.id, "trajectories": trajectories} + args3 = {"data": states[best_trajectory_idx]} # float32 msg # publish msgs - self.within_boundary_trajectories_publisher.publish(AllTrajectories(**alltrajectories_msg)) # within bound pub - self.best_trajectory_publisher.publish(posearray_msg) # best trajectory pub - self.best_steering_angle_pub.publish(Float32(data=states[best_trajectory_idx])) # optimal steering angle pub + self.best_trajectory_publisher.publish(PoseArray(**args1)) # best trajectory pub + self.within_boundary_trajectories_publisher.publish(AllTrajectories(**args2)) # within bound pub + self.best_steering_angle_pub.publish(Float32(**args3)) # optimal steering angle pub self.get_logger().info("OPTIMAL TRAJECTORY COMPUTED") else: self.get_logger().info(f"Ids state:{self._state_msg.id} and trajectory:{self._trajectories_msg.id} do not match") - # plot track if self._once: # plot - plt.plot([P[0] for P in leftboundary], [P[1] for P in leftboundary], "-g", label='leftboundary') - plt.plot([P[0] for P in rightboundary], [P[1] for P in rightboundary], "-r", label='rightboundary') - plt.plot([P[0] for P in center_coordinates], [P[1] for P in center_coordinates], "ob", label='centerline') + plt.plot([P[0] for P in leftboundary], [P[1] for P in leftboundary], "ob", label='leftboundary') + plt.plot([P[0] for P in rightboundary], [P[1] for P in rightboundary], "oy", label='rightboundary') + plt.plot([P[0] for P in centerline], [P[1] for P in centerline], "ok", label='centerline') plt.plot([car_position[0]],[car_position[1]],'or', label='car position') plt.legend() plt.show() self._once = False - + + def get_boundaries(self, cones): + leftboundary = [] + rightboundary = [] + for i in range(len(cones)): + x = cones[i].pose.pose.position.x # x point + y = cones[i].pose.pose.position.y # y point + if i != 0: + if cones[i].colour == 0: # 0 is blue which is left + leftboundary.append([x,y]) + elif cones[i].colour == 2: # 2 is yellow which is right + rightboundary.append([x,y]) + else: + car_position = [x,y] # first cone is car position + + return leftboundary, rightboundary, car_position + + def get_center_line(self, leftboundary, rightboundary): '''approximates the track's center line''' # the midpoint is the average of the coordinates coods = [] - x = [] - y = [] + xps = [] + yps = [] num_cones = min(len(leftboundary), len(rightboundary)) for i in range(num_cones): x1, y1 = leftboundary[i] x2, y2 = rightboundary[i] - tmp = self.get_avg_point(x1,y1,x2,y2) - x.append(tmp[0]) - y.append(tmp[1]) + x, y = self.get_avg_point(x1,y1,x2,y2) + xps.append(x) + yps.append(y) + coods.append([x,y]) # interpolate center line - fun = interpolate.interp1d(x, y, kind='quadratic') - xrange = np.linspace(min(x), max(x)) - for X in xrange: - coods.append([X, float(fun(X))]) + if False: + fun = interpolate.interp1d(x, y, kind='quadratic') + xrange = np.linspace(min(x), max(x)) + for X in xrange: + coods.append([X, float(fun(X))]) return coods @@ -171,8 +179,7 @@ def get_shapely_linestring(self, points) -> LineString: def get_avg_point(self, x1, y1, x2, y2): return [((x1 + x2)/2), ((y1 + y2)/2)] - # =========================================================== - # TRAJECTORY DELETION + ''' ---------------------------------- DELETION ---------------------------------- ''' def trajectory_deletion(self, trajectories, states): '''Caller for trajectory deletion if there are trajectories''' @@ -200,44 +207,40 @@ def set_within_boundary_trajectories(self, trajectories, states): left_boundary_linestring = self.get_shapely_linestring(self._leftboundary) right_boundary_linestring = self.get_shapely_linestring(self._rightboundary) - # track width - track_width = self._track_width - for i in range(len(trajectories)): # loop each trajectory trajectory = self.get_shapely_linestring([[P.position.x, P.position.y] for P in trajectories[i].poses]) + num_poses = len(trajectories[i].poses) - tmp = None + # check trajectory intersection + ips = None # ips = intersection points if trajectory.intersects(left_boundary_linestring): # left bound intersection # get intersection point/s - tmp = trajectory.intersection(left_boundary_linestring) + ips = trajectory.intersection(left_boundary_linestring) intersection = "left" elif trajectory.intersects(right_boundary_linestring): # right bound intersection - tmp = trajectory.intersection(right_boundary_linestring) + ips = trajectory.intersection(right_boundary_linestring) intersection = "right" - if tmp is not None: - # new pose list - list_of_poses = [] - if type(tmp) is MultiPoint: - tmp_x = tmp.centroid.x - tmp_y = tmp.centroid.y + if ips is not None: + remove_pose_indices = [] # poses to remove (index) + if type(ips) is MultiPoint: + tmp_x = ips.centroid.x + tmp_y = ips.centroid.y else: - tmp_x = tmp.x - tmp_y = tmp.y - # go through each pose - for j, P in enumerate(trajectories[i].poses): - # get distance between points - # distance = self.get_distance(x1=tmp_x, y1=tmp_y, x2=P.position.x, y2=P.position.y) - in_bounds = self.get_in_of_bounds(inter=intersection,x1=tmp_x,y1=tmp_y,x2=P.position.x,y2=P.position.y) - if in_bounds: - list_of_poses.append(P) + tmp_x = ips.x + tmp_y = ips.y + + for j, P in enumerate(trajectories[i].poses): # loop through trajectory i + if num_poses >= 2: + in_bounds = self.is_within_boundaries(inter=intersection,x1=tmp_x,y1=tmp_y,x2=P.position.x,y2=P.position.y) # get if pose outside bounds + if not in_bounds: + remove_pose_indices.append(j) # if pose outside else: - if len(list_of_poses) < 2: - remove_trajectories_indices.append(i) - else: - trajectories[i].poses = list_of_poses + remove_trajectories_indices.append(i) # remove trajectory and stop break - + + [trajectories[i].poses.pop(index) for index in list(reversed(remove_pose_indices))] # remove out of bound poses for trajectory i + # publish indicies print(f"removed indices = {remove_trajectories_indices}") self.out_of_bounds_indicies.publish(Int32MultiArray(data=remove_trajectories_indices)) @@ -248,46 +251,14 @@ def set_within_boundary_trajectories(self, trajectories, states): def get_distance(self,x1,y1,x2,y2): return np.sqrt(((x2-x1)**2 + (y2-y1)**2)) - def get_in_of_bounds(self,inter,x1,y1,x2,y2): + def is_within_boundaries(self,inter,x1,y1,x2,y2): if inter == "right": return x2x1 and y2 398: + print("stop here") self.best_trajectory_index.publish(Int16(data=idx)) except ValueError: self.get_logger().info("error calculating objective value") return idx - -# FUNCTIONS/CODE NOT NEEDED ATM BUT MAYBE IN THE FUTURE + ''' ---------------------------------- OPTIMISATION ---------------------------------- ''' + +''' DEPRECATED/UNUSED CODE ATM BUT MAYBE NEEDED IN THE FUTURE ''' # def get_left_boundary(self, rightboundary, track_width): # angle = -90 * np.pi / 180 # leftboundary = [] @@ -434,6 +406,38 @@ def get_best_trajectory_index(self, trajectory_distances): # transformed_point = np.matmul(rotation_matrix, point) + position_vector # return transformed_point + # def compare_with_boundary(self, x, y, tol): + # ''' + # Compares a coordinate with the left and right boundary coordinates of the track map + + # inputs + # x (float): x position of coordinate + # y (float): y position of coordinate + # return + # (boolean): True if point on either boundary + + # * Assumes same number of points are given for left and right boundary + # ''' + # lxyt = [] + # rxyt = [] + # for i in range(len(self.rightbound)): + # blx, bly = self.leftbound[i] + # brx, bry = self.rightbound[i] + # # on left boundary + # if i != 0: + # # lxyt = abs(x-blx) <= 1e-1 and abs(y-bly) <= 1e-1 + # lxyt.append(np.sqrt(((x-blx)**2 + (y-bly)**2))) + # else: + # lxyt.append(np.inf) + # # on right boundary + # # rxyt = abs(x-brx) <= 1e-1 and abs(y-bry) <= 1e-1 + # rxyt.append(np.sqrt(((x-brx)**2 + (y-bry)**2))) + # # check if near boundary + # if min(lxyt) <= tol or min(rxyt) <= tol: + # return True + # else: + # return False + def main(): rclpy.init() exe = SingleThreadedExecutor() diff --git a/src/visualization/path_planning_visualization/path_planning_visualization/visualise_trajectories.py b/src/visualization/path_planning_visualization/path_planning_visualization/visualise_trajectories.py index ac9215f..c1dc31e 100644 --- a/src/visualization/path_planning_visualization/path_planning_visualization/visualise_trajectories.py +++ b/src/visualization/path_planning_visualization/path_planning_visualization/visualise_trajectories.py @@ -39,7 +39,7 @@ def set_inbound_trajectories(self, msg: AllTrajectories) -> None: # self.invalid_bounds_indicies = msg.data def show_paths(self, msg: AllTrajectories): - if hasattr(self,"chosen_trajectory"): + if hasattr(self,"chosen_trajectory") and hasattr(self, "inbounds"): line_list = [] paths = msg.trajectories paths.append(self.inbounds.trajectories[-1]) # append center line