Skip to content

Commit

Permalink
refactor: comment and clean up ppc
Browse files Browse the repository at this point in the history
  • Loading branch information
akevinge committed Apr 24, 2024
1 parent e4d1270 commit c9d94f4
Showing 1 changed file with 80 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,29 @@ class Constants:
MAX_THROTTLE = 0.6
# Max speed in m/s
MAX_SPEED: float = 1.5
# Path adjustment gain
PATH_ADJUSTMENT_GAIN: float = 0.5


class VehicleState:
def __init__(self, l):
self.l = l
"""Vehicle state class that holds the current pose and velocity of the vehicle."""

def __init__(self):
self.pose: Pose | None = None
self.velocity: float | None = None

def loaded(self) -> bool:
"""Check if the vehicle state is loaded with pose and velocity.
Returns:
bool: True if pose and velocity are not None, False otherwise.
"""
return self.pose is not None and self.velocity is not None

def calc_throttle_brake(self) -> tuple[float, float] | None:
"""Calculate throttle and brake based on the current velocity.
Returns:
tuple[float, float] | None: Throttle and brake values (None if vehicle state is not fulled loaded).
"""
if not self.loaded():
return

Expand All @@ -56,6 +65,14 @@ def calc_throttle_brake(self) -> tuple[float, float] | None:
return throttle, brake

def calc_steer(self, target: tuple[float, float]) -> float | None:
"""Calculate the steering angle based on the target point.
Args:
target (tuple[float, float]): Target point (x, y) to steer towards.
Returns:
float | None: Steering angle in the range [-1, 1] (None if vehicle state is not fulled loaded).
"""
if not self.loaded():
return

Expand All @@ -68,23 +85,40 @@ def calc_steer(self, target: tuple[float, float]) -> float | None:
return -delta / (math.pi / 2)

def lookahead_distance(self) -> float:
"""Calculate the lookahead distance based on the current velocity.
Returns:
float: Lookahead distance in meters.
"""
return Constants.kf * self.velocity + Constants.LD


class PursuitPath:
def __init__(self, l, path: list[tuple[float, float]] = []):
self.l = l
"""Pursuit path class that holds the path and calculates the target point for the vehicle.
Args:
path (list[tuple[float, float]], optional): List of points (x, y) that represent the path. Defaults to [].
"""

def __init__(self, path: list[tuple[float, float]] = []):
self.path: npt.NDArray[np.float64] = np.asarray(path)
self.prev_index: int | None = None
self.target_index: int | None = None

def set_path(self, path: list[tuple[float, float]]) -> None:
"""Set the path.
Args:
path (list[tuple[float, float]]): List of points (x, y) that represent the path.
"""
self.path = np.asarray(path)
self.prev_index = None
self.target_index = None

def _calc_nearest_index(self, vs: VehicleState) -> int:
if len(self.path) == 0 or not vs.loaded():
return
def _calc_nearest_index(self) -> int:
"""Calculate the nearest index of the path to the vehicle's current position.
Returns:
int: Nearest index of the path to the vehicle's current position.
"""
min_dist = float("inf")
min_index = 0
for i, (x, y) in enumerate(self.path):
Expand All @@ -96,29 +130,43 @@ def _calc_nearest_index(self, vs: VehicleState) -> int:
return min_index

def get_current_path(self) -> npt.NDArray[np.float64]:
return np.copy(self.path[: self.prev_index])
"""Get the current path up to and including the target point."""
target_index = self.target_index + 1 if self.target_index is not None else 0
return np.copy(self.path[:target_index])

def calc_target_point(self, vs: VehicleState) -> tuple[float, float] | None:
"""Calculate the target point based on the vehicle state.
Args:
vs (VehicleState): Vehicle state object that holds the current pose and velocity of the vehicle.
Returns:
tuple[float, float] | None: Target point (x, y) to steer towards (None if path is empty or vehicle state is not fulled loaded).
"""
if len(self.path) == 0 or not vs.loaded():
return

if self.prev_index is None:
self.prev_index = self._calc_nearest_index(vs)
# If calculating the target point for the first time, find the nearest index to the vehicle's current position.
if self.target_index is None:
self.target_index = self._calc_nearest_index()

for i, (x, y) in enumerate(self.path[self.prev_index :]):
# Find the first point that is further than the lookahead distance.
# If looking at the same path, start from the last target index.
for i, (x, y) in enumerate(self.path[self.target_index :]):
dist = math.hypot(x, y)
if dist > vs.lookahead_distance():
self.prev_index = i
self.target_index = i
return (x, y)


class PursePursuitController(Node):
"""Pure pursuit controller node that publishes vehicle control commands based on the pure pursuit algorithm."""

def __init__(self):
super().__init__("pure_pursuit_controler")

self.vehicle_state = VehicleState(l=self.get_logger())
self.path = PursuitPath(l=self.get_logger())
self.vehicle_state = VehicleState()
self.path = PursuitPath()
self.target_waypoint = None
self.clock = Clock().clock

Expand All @@ -138,7 +186,6 @@ def __init__(self):
self.command_publisher = self.create_publisher(
VehicleControl, "/vehicle/control", 1
)

self.barrier_marker_pub = self.create_publisher(
Marker, "/planning/barrier_marker", 1
)
Expand All @@ -153,21 +200,27 @@ def __init__(self):
def clock_callback(self, msg: Clock):
self.clock = msg.clock

def odometry_callback(self, msg: Odometry):
self.vehicle_state.pose = msg.pose.pose

def speed_callback(self, msg: VehicleSpeed):
self.vehicle_state.velocity = msg.speed

def route_callback(self, msg: Path):
"""Callback function for the path subscriber.
Recalculates the path and target waypoint based on the received path message.
"""
path = [
(pose_stamped.pose.position.x, pose_stamped.pose.position.y)
for pose_stamped in msg.poses
]
self.path.set_path(path)
self.target_waypoint = self.path.calc_target_point(self.vehicle_state)

def odometry_callback(self, msg: Odometry):
self.vehicle_state.pose = msg.pose.pose

def speed_callback(self, msg: VehicleSpeed):
self.vehicle_state.velocity = msg.speed

def control_callback(self):
"""Calculate and publish the vehicle control commands based on the pure pursuit algorithm."""

# If no target waypoint, do nothing.
if not self.target_waypoint:
return
Expand All @@ -194,6 +247,7 @@ def control_callback(self):
self.command_publisher.publish(control_msg)

def visualize_waypoint_callback(self):
"""Visualize the target waypoint in RVIZ."""
if self.target_waypoint is None:
return

Expand Down Expand Up @@ -225,6 +279,7 @@ def visualize_waypoint_callback(self):
self.barrier_marker_pub.publish(arrow_marker)

def visualize_path_callback(self):
"""Visualize the path in RVIZ."""
current_path = self.path.get_current_path()

if len(current_path) == 0:
Expand Down

0 comments on commit c9d94f4

Please sign in to comment.