diff --git a/crazyflie_sim/CMakeLists.txt b/crazyflie_sim/CMakeLists.txt index 61965fed1..e144c2c35 100644 --- a/crazyflie_sim/CMakeLists.txt +++ b/crazyflie_sim/CMakeLists.txt @@ -16,4 +16,20 @@ ament_python_install_package(${PROJECT_NAME} SCRIPTS_DESTINATION lib/${PROJECT_N # DESTINATION share/${PROJECT_NAME}/ # ) +if(BUILD_TESTING) + find_package(ament_cmake_pytest REQUIRED) + set(_pytest_tests + test/test_flake8.py + test/test_pep257.py + ) + foreach(_test_path ${_pytest_tests}) + get_filename_component(_test_name ${_test_path} NAME_WE) + ament_add_pytest_test(${_test_name} ${_test_path} + APPEND_ENV PYTHONPATH=${CMAKE_CURRENT_BINARY_DIR} + TIMEOUT 60 + WORKING_DIRECTORY ${CMAKE_SOURCE_DIR} + ) + endforeach() +endif() + ament_package() diff --git a/crazyflie_sim/crazyflie_sim/backend/none.py b/crazyflie_sim/crazyflie_sim/backend/none.py index c4bcf08a1..48b4c4bda 100644 --- a/crazyflie_sim/crazyflie_sim/backend/none.py +++ b/crazyflie_sim/crazyflie_sim/backend/none.py @@ -1,13 +1,14 @@ from __future__ import annotations from rclpy.node import Node -from rosgraph_msgs.msg import Clock from rclpy.time import Time -from ..sim_data_types import State, Action +from rosgraph_msgs.msg import Clock + +from ..sim_data_types import Action, State class Backend: - """Tracks the desired state perfectly (no physics simulation)""" + """Tracks the desired state perfectly (no physics simulation).""" def __init__(self, node: Node, names: list[str], states: list[State]): self.node = node @@ -33,4 +34,3 @@ def step(self, states_desired: list[State], actions: list[Action]) -> list[State def shutdown(self): pass - diff --git a/crazyflie_sim/crazyflie_sim/backend/np.py b/crazyflie_sim/crazyflie_sim/backend/np.py index eb2e43eb9..26e0c4cc3 100644 --- a/crazyflie_sim/crazyflie_sim/backend/np.py +++ b/crazyflie_sim/crazyflie_sim/backend/np.py @@ -1,16 +1,16 @@ from __future__ import annotations +import numpy as np from rclpy.node import Node -from rosgraph_msgs.msg import Clock from rclpy.time import Time -from ..sim_data_types import State, Action +from rosgraph_msgs.msg import Clock +import rowan +from ..sim_data_types import Action, State -import numpy as np -import rowan class Backend: - """Backend that uses newton-euler rigid-body dynamics implemented in numpy""" + """Backend that uses newton-euler rigid-body dynamics implemented in numpy.""" def __init__(self, node: Node, names: list[str], states: list[State]): self.node = node @@ -50,11 +50,11 @@ def shutdown(self): class Quadrotor: - """Basic rigid body quadrotor model (no drag) using numpy and rowan""" + """Basic rigid body quadrotor model (no drag) using numpy and rowan.""" def __init__(self, state): # parameters (Crazyflie 2.0 quadrotor) - self.mass = 0.034 # kg + self.mass = 0.034 # kg # self.J = np.array([ # [16.56,0.83,0.71], # [0.83,16.66,1.8], @@ -63,21 +63,21 @@ def __init__(self, state): self.J = np.array([16.571710e-6, 16.655602e-6, 29.261652e-6]) # Note: we assume here that our control is forces - arm_length = 0.046 # m + arm_length = 0.046 # m arm = 0.707106781 * arm_length - t2t = 0.006 # thrust-to-torque ratio + t2t = 0.006 # thrust-to-torque ratio self.B0 = np.array([ [1, 1, 1, 1], [-arm, -arm, arm, arm], [-arm, arm, arm, -arm], [-t2t, t2t, -t2t, t2t] ]) - self.g = 9.81 # not signed + self.g = 9.81 # not signed - if self.J.shape == (3,3): - self.inv_J = np.linalg.pinv(self.J) # full matrix -> pseudo inverse + if self.J.shape == (3, 3): + self.inv_J = np.linalg.pinv(self.J) # full matrix -> pseudo inverse else: - self.inv_J = 1 / self.J # diagonal matrix -> division + self.inv_J = 1 / self.J # diagonal matrix -> division self.state = state @@ -95,23 +95,28 @@ def rpm_to_force(rpm): # compute next state eta = np.dot(self.B0, force) - f_u = np.array([0,0,eta[0]]) - tau_u = np.array([eta[1],eta[2],eta[3]]) + f_u = np.array([0, 0, eta[0]]) + tau_u = np.array([eta[1], eta[2], eta[3]]) - # dynamics - # dot{p} = v + # dynamics + # dot{p} = v pos_next = self.state.pos + self.state.vel * dt - # mv = mg + R f_u - vel_next = self.state.vel + (np.array([0,0,-self.g]) + rowan.rotate(self.state.quat,f_u) / self.mass) * dt + # mv = mg + R f_u + vel_next = self.state.vel + ( + np.array([0, 0, -self.g]) + + rowan.rotate(self.state.quat, f_u) / self.mass) * dt # dot{R} = R S(w) # to integrate the dynamics, see # https://www.ashwinnarayan.com/post/how-to-integrate-quaternions/, and # https://arxiv.org/pdf/1604.08139.pdf - q_next = rowan.normalize(rowan.calculus.integrate(self.state.quat, self.state.omega, dt)) + q_next = rowan.normalize( + rowan.calculus.integrate( + self.state.quat, self.state.omega, dt)) - # mJ = Jw x w + tau_u - omega_next = self.state.omega + (self.inv_J * (np.cross(self.J * self.state.omega, self.state.omega) + tau_u)) * dt + # mJ = Jw x w + tau_u + omega_next = self.state.omega + ( + self.inv_J * (np.cross(self.J * self.state.omega, self.state.omega) + tau_u)) * dt self.state.pos = pos_next self.state.vel = vel_next @@ -121,5 +126,5 @@ def rpm_to_force(rpm): # if we fall below the ground, set velocities to 0 if self.state.pos[2] < 0: self.state.pos[2] = 0 - self.state.vel = [0,0,0] - self.state.omega = [0,0,0] + self.state.vel = [0, 0, 0] + self.state.omega = [0, 0, 0] diff --git a/crazyflie_sim/crazyflie_sim/crazyflie_server.py b/crazyflie_sim/crazyflie_sim/crazyflie_server.py index 0ba75f78f..8c3364e83 100755 --- a/crazyflie_sim/crazyflie_sim/crazyflie_server.py +++ b/crazyflie_sim/crazyflie_sim/crazyflie_server.py @@ -3,79 +3,84 @@ """ A crazyflie server for simulation. - 2022 - Wolfgang Hönig (TU Berlin) """ +from functools import partial +import importlib + +from crazyflie_interfaces.msg import FullState, Hover +from crazyflie_interfaces.srv import GoTo, Land, Takeoff +from crazyflie_interfaces.srv import NotifySetpointsStop, StartTrajectory, UploadTrajectory +from geometry_msgs.msg import Twist import rclpy from rclpy.node import Node import rowan -import importlib - -from crazyflie_interfaces.srv import Takeoff, Land, GoTo -from crazyflie_interfaces.srv import UploadTrajectory, StartTrajectory, NotifySetpointsStop -from crazyflie_interfaces.msg import Hover, FullState - from std_srvs.srv import Empty -from geometry_msgs.msg import Twist -from functools import partial # import BackendRviz from .backend_rviz # from .backend import * # from .backend.none import BackendNone from .crazyflie_sil import CrazyflieSIL, TrajectoryPolynomialPiece -from .sim_data_types import State, Action +from .sim_data_types import State class CrazyflieServer(Node): + def __init__(self): super().__init__( - "crazyflie_server", + 'crazyflie_server', allow_undeclared_parameters=True, automatically_declare_parameters_from_overrides=True, ) # Turn ROS parameters into a dictionary self._ros_parameters = self._param_to_dict(self._parameters) - self.cfs = dict() - - self.world_tf_name = "world" + self.cfs = {} + + self.world_tf_name = 'world' try: - self.world_tf_name = self._ros_parameters["world_tf_name"] + self.world_tf_name = self._ros_parameters['world_tf_name'] except KeyError: pass - robot_data = self._ros_parameters["robots"] + robot_data = self._ros_parameters['robots'] # Parse robots names = [] initial_states = [] for cfname in robot_data: - if robot_data[cfname]["enabled"]: - type_cf = robot_data[cfname]["type"] + if robot_data[cfname]['enabled']: + type_cf = robot_data[cfname]['type'] # do not include virtual objects - connection = self._ros_parameters['robot_types'][type_cf].get("connection", "crazyflie") - if connection == "crazyflie": + connection = self._ros_parameters['robot_types'][type_cf].get( + 'connection', 'crazyflie') + if connection == 'crazyflie': names.append(cfname) - pos = robot_data[cfname]["initial_position"] + pos = robot_data[cfname]['initial_position'] initial_states.append(State(pos)) # initialize backend by dynamically loading the module - backend_name = self._ros_parameters["sim"]["backend"] - module = importlib.import_module(".backend." + backend_name, package="crazyflie_sim") - class_ = getattr(module, "Backend") + backend_name = self._ros_parameters['sim']['backend'] + module = importlib.import_module('.backend.' + backend_name, package='crazyflie_sim') + class_ = getattr(module, 'Backend') self.backend = class_(self, names, initial_states) # initialize visualizations by dynamically loading the modules self.visualizations = [] - for vis_key in self._ros_parameters["sim"]["visualizations"]: - if self._ros_parameters["sim"]["visualizations"][vis_key]["enabled"]: - module = importlib.import_module(".visualization." + str(vis_key), package="crazyflie_sim") - class_ = getattr(module, "Visualization") - vis = class_(self, self._ros_parameters["sim"]["visualizations"][vis_key], names, initial_states) + for vis_key in self._ros_parameters['sim']['visualizations']: + if self._ros_parameters['sim']['visualizations'][vis_key]['enabled']: + module = importlib.import_module('.visualization.' + + str(vis_key), + package='crazyflie_sim') + class_ = getattr(module, 'Visualization') + vis = class_(self, + self._ros_parameters['sim']['visualizations'][vis_key], + names, + initial_states) self.visualizations.append(vis) - controller_name = backend_name = self._ros_parameters["sim"]["controller"] + controller_name = backend_name = self._ros_parameters['sim']['controller'] # create robot SIL objects for name, initial_state in zip(names, initial_states): @@ -86,51 +91,72 @@ def __init__(self): self.backend.time) # Create services for the entire swarm and each individual crazyflie - self.create_service(Empty, "all/emergency", self._emergency_callback) - self.create_service(Takeoff, "all/takeoff", self._takeoff_callback) - self.create_service(Land, "all/land", self._land_callback) - self.create_service(GoTo, "all/go_to", self._go_to_callback) - self.create_service(StartTrajectory, "all/start_trajectory", self._start_trajectory_callback) + self.create_service(Empty, 'all/emergency', self._emergency_callback) + self.create_service(Takeoff, 'all/takeoff', self._takeoff_callback) + self.create_service(Land, 'all/land', self._land_callback) + self.create_service(GoTo, 'all/go_to', self._go_to_callback) + self.create_service(StartTrajectory, + 'all/start_trajectory', + self._start_trajectory_callback) for name, _ in self.cfs.items(): self.create_service( - Empty, name + - "/emergency", partial(self._emergency_callback, name=name) + Empty, + name + '/emergency', + partial(self._emergency_callback, name=name) ) self.create_service( - Takeoff, name + - "/takeoff", partial(self._takeoff_callback, name=name) + Takeoff, + name + '/takeoff', + partial(self._takeoff_callback, name=name) ) self.create_service( - Land, name + "/land", partial(self._land_callback, name=name) + Land, + name + '/land', + partial(self._land_callback, name=name) ) self.create_service( - GoTo, name + "/go_to", partial(self._go_to_callback, name=name) + GoTo, + name + '/go_to', + partial(self._go_to_callback, name=name) ) self.create_service( - StartTrajectory, name + "/start_trajectory", partial(self._start_trajectory_callback, name=name) + StartTrajectory, + name + '/start_trajectory', + partial(self._start_trajectory_callback, name=name) ) self.create_service( - UploadTrajectory, name + "/upload_trajectory", partial(self._upload_trajectory_callback, name=name) + UploadTrajectory, + name + '/upload_trajectory', + partial(self._upload_trajectory_callback, name=name) ) self.create_service( - NotifySetpointsStop, name + "/notify_setpoints_stop", partial(self._notify_setpoints_stop_callback, name=name) + NotifySetpointsStop, + name + '/notify_setpoints_stop', + partial(self._notify_setpoints_stop_callback, name=name) ) self.create_subscription( - Twist, name + - "/cmd_vel_legacy", partial(self._cmd_vel_legacy_changed, name=name), 10 + Twist, + name + '/cmd_vel_legacy', + partial(self._cmd_vel_legacy_changed, name=name), + 10 ) self.create_subscription( - Hover, name + - "/cmd_hover", partial(self._cmd_hover_changed, name=name), 10 + Hover, + name + '/cmd_hover', + partial(self._cmd_hover_changed, name=name), + 10 ) self.create_subscription( - FullState, name + - "/cmd_full_state", partial(self._cmd_full_state_changed, name=name), 10 + FullState, + name + '/cmd_full_state', + partial(self._cmd_full_state_changed, name=name), + 10 ) # step as fast as possible - max_dt = 0.0 if "max_dt" not in self._ros_parameters["sim"] else self._ros_parameters["sim"]["max_dt"] + max_dt = 0.0 if 'max_dt' not in self._ros_parameters['sim'] \ + else self._ros_parameters['sim']['max_dt'] self.timer = self.create_timer(max_dt, self._timer_callback) self.is_shutdown = False @@ -147,7 +173,7 @@ def _timer_callback(self): states_desired = [cf.getSetpoint() for _, cf in self.cfs.items()] # execute the control loop - actions = [cf.executeController() for _, cf in self.cfs.items()] + actions = [cf.executeController() for _, cf in self.cfs.items()] # execute the physics simulator states_next = self.backend.step(states_desired, actions) @@ -160,9 +186,7 @@ def _timer_callback(self): vis.step(self.backend.time(), states_next, states_desired, actions) def _param_to_dict(self, param_ros): - """ - Turn ROS 2 parameters from the node into a dict - """ + """Turn ROS 2 parameters from the node into a dict.""" tree = {} for item in param_ros: t = tree @@ -173,58 +197,48 @@ def _param_to_dict(self, param_ros): t = t.setdefault(part, {}) return tree - def _emergency_callback(self, request, response, name="all"): - self.get_logger().info("emergency not yet implemented") + def _emergency_callback(self, request, response, name='all'): + self.get_logger().info('emergency not yet implemented') return response - def _takeoff_callback(self, request, response, name="all"): - """ - Service callback to take the crazyflie land to - a certain height in high level commander - """ - + def _takeoff_callback(self, request, response, name='all'): + """Service callback to takeoff the crazyflie.""" duration = float(request.duration.sec) + \ float(request.duration.nanosec / 1e9) self.get_logger().info( - f"takeoff(height={request.height} m," - + f"duration={duration} s," - + f"group_mask={request.group_mask}) {name}" + f'takeoff(height={request.height} m,' + + f'duration={duration} s,' + + f'group_mask={request.group_mask}) {name}' ) - cfs = self.cfs if name == "all" else {name: self.cfs[name]} + cfs = self.cfs if name == 'all' else {name: self.cfs[name]} for _, cf in cfs.items(): cf.takeoff(request.height, duration, request.group_mask) return response - def _land_callback(self, request, response, name="all"): - """ - Service callback to make the crazyflie land to - a certain height in high level commander - """ + def _land_callback(self, request, response, name='all'): + """Service callback to land the crazyflie.""" duration = float(request.duration.sec) + \ float(request.duration.nanosec / 1e9) self.get_logger().info( - f"land(height={request.height} m," - + f"duration={duration} s," - + f"group_mask={request.group_mask})" + f'land(height={request.height} m,' + + f'duration={duration} s,' + + f'group_mask={request.group_mask})' ) - cfs = self.cfs if name == "all" else {name: self.cfs[name]} + cfs = self.cfs if name == 'all' else {name: self.cfs[name]} for _, cf in cfs.items(): cf.land(request.height, duration, request.group_mask) return response - def _go_to_callback(self, request, response, name="all"): - """ - Service callback to have the crazyflie go to - a certain position in high level commander - """ + def _go_to_callback(self, request, response, name='all'): + """Service callback to have the crazyflie go to a position.""" duration = float(request.duration.sec) + \ float(request.duration.nanosec / 1e9) self.get_logger().info( - "go_to(position=%f,%f,%f m, yaw=%f rad, duration=%f s, relative=%d, group_mask=%d)" + 'go_to(position=%f,%f,%f m, yaw=%f rad, duration=%f s, relative=%d, group_mask=%d)' % ( request.goal.x, request.goal.y, @@ -235,21 +249,21 @@ def _go_to_callback(self, request, response, name="all"): request.group_mask, ) ) - cfs = self.cfs if name == "all" else {name: self.cfs[name]} + cfs = self.cfs if name == 'all' else {name: self.cfs[name]} for _, cf in cfs.items(): - cf.goTo([request.goal.x, request.goal.y, request.goal.z], + cf.goTo([request.goal.x, request.goal.y, request.goal.z], request.yaw, duration, request.relative, request.group_mask) - + return response - def _notify_setpoints_stop_callback(self, request, response, name="all"): - self.get_logger().info("Notify setpoint stop not yet implemented") + def _notify_setpoints_stop_callback(self, request, response, name='all'): + self.get_logger().info('Notify setpoint stop not yet implemented') return response - def _upload_trajectory_callback(self, request, response, name="all"): - self.get_logger().info("Upload trajectory(id=%d)" % (request.trajectory_id)) + def _upload_trajectory_callback(self, request, response, name='all'): + self.get_logger().info('Upload trajectory(id=%d)' % (request.trajectory_id)) - cfs = self.cfs if name == "all" else {name: self.cfs[name]} + cfs = self.cfs if name == 'all' else {name: self.cfs[name]} for _, cf in cfs.items(): pieces = [] for piece in request.pieces: @@ -259,14 +273,19 @@ def _upload_trajectory_callback(self, request, response, name="all"): poly_yaw = piece.poly_yaw duration = float(piece.duration.sec) + \ float(piece.duration.nanosec / 1e9) - pieces.append(TrajectoryPolynomialPiece(poly_x, poly_y, poly_z, poly_yaw, duration)) + pieces.append(TrajectoryPolynomialPiece( + poly_x, + poly_y, + poly_z, + poly_yaw, + duration)) cf.uploadTrajectory(request.trajectory_id, request.piece_offset, pieces) return response - - def _start_trajectory_callback(self, request, response, name="all"): + + def _start_trajectory_callback(self, request, response, name='all'): self.get_logger().info( - "start_trajectory(id=%d, timescale=%f, reverse=%d, relative=%d, group_mask=%d)" + 'start_trajectory(id=%d, timescale=%f, reverse=%d, relative=%d, group_mask=%d)' % ( request.trajectory_id, request.timescale, @@ -275,29 +294,38 @@ def _start_trajectory_callback(self, request, response, name="all"): request.group_mask, ) ) - cfs = self.cfs if name == "all" else {name: self.cfs[name]} + cfs = self.cfs if name == 'all' else {name: self.cfs[name]} for _, cf in cfs.items(): - cf.startTrajectory(request.trajectory_id, request.timescale, request.reversed, request.relative, request.group_mask) + cf.startTrajectory( + request.trajectory_id, + request.timescale, + request.reversed, + request.relative, + request.group_mask) return response - def _cmd_vel_legacy_changed(self, msg, name=""): + def _cmd_vel_legacy_changed(self, msg, name=''): """ - Topic update callback to control the attitude and thrust - of the crazyflie with teleop - """ - self.get_logger().info("cmd_vel_legacy not yet implemented") + Topic update callback. + Controls the attitude and thrust of the crazyflie with teleop. + """ + self.get_logger().info('cmd_vel_legacy not yet implemented') - def _cmd_hover_changed(self, msg, name=""): + def _cmd_hover_changed(self, msg, name=''): """ - Topic update callback to control the hover command - of the crazyflie from the velocity multiplexer (vel_mux) + Topic update callback for hover command. + + Used from the velocity multiplexer (vel_mux). """ - self.get_logger().info("cmd_hover not yet implemented") + self.get_logger().info('cmd_hover not yet implemented') def _cmd_full_state_changed(self, msg, name): - q = [msg.pose.orientation.w, msg.pose.orientation.x, msg.pose.orientation.y, msg.pose.orientation.z] + q = [msg.pose.orientation.w, + msg.pose.orientation.x, + msg.pose.orientation.y, + msg.pose.orientation.z] rpy = rowan.to_euler(q) self.cfs[name].cmdFullState( @@ -323,5 +351,5 @@ def main(args=None): crazyflie_server.destroy_node() -if __name__ == "__main__": +if __name__ == '__main__': main() diff --git a/crazyflie_sim/crazyflie_sim/crazyflie_sil.py b/crazyflie_sim/crazyflie_sim/crazyflie_sil.py index a4f9fe66b..cf904d746 100644 --- a/crazyflie_sim/crazyflie_sim/crazyflie_sil.py +++ b/crazyflie_sim/crazyflie_sim/crazyflie_sil.py @@ -1,21 +1,21 @@ #!/usr/bin/env python3 """ -Crazyflie Software-In-The-Loop Wrapper that uses the firmware Python bindings - +Crazyflie Software-In-The-Loop Wrapper that uses the firmware Python bindings. 2022 - Wolfgang Hönig (TU Berlin) """ from __future__ import annotations -import numpy as np - import cffirmware as firm +import numpy as np import rowan + from . import sim_data_types class TrajectoryPolynomialPiece: + def __init__(self, poly_x, poly_y, poly_z, poly_yaw, duration): self.poly_x = poly_x self.poly_y = poly_y @@ -23,6 +23,7 @@ def __init__(self, poly_x, poly_y, poly_z, poly_yaw, duration): self.poly_yaw = poly_yaw self.duration = duration + def copy_svec(v): return firm.mkvec(v.x, v.y, v.z) @@ -36,9 +37,7 @@ class CrazyflieSIL: MODE_LOW_POSITION = 3 MODE_LOW_VELOCITY = 4 - def __init__(self, name, initialPosition, controller_name, time_func): - # Core. self.name = name self.groupMask = 0 @@ -49,7 +48,7 @@ def __init__(self, name, initialPosition, controller_name, time_func): self.mode = CrazyflieSIL.MODE_IDLE self.planner = firm.planner() firm.plan_init(self.planner) - self.trajectories = dict() + self.trajectories = {} # previous state for HL commander self.cmdHl_pos = firm.mkvec(*initialPosition) @@ -68,7 +67,7 @@ def __init__(self, name, initialPosition, controller_name, time_func): self.state.velocity.y = 0 self.state.velocity.z = 0 self.state.attitude.roll = 0 - self.state.attitude.pitch = -0 # WARNING: this is in the legacy coordinate system + self.state.attitude.pitch = -0 # WARNING: this is in the legacy coordinate system self.state.attitude.yaw = 0 self.sensors = firm.sensorData_t() @@ -84,54 +83,65 @@ def __init__(self, name, initialPosition, controller_name, time_func): self.controller_name = controller_name # set up controller - if controller_name == "none": + if controller_name == 'none': self.controller = None - elif controller_name == "pid": + elif controller_name == 'pid': firm.controllerPidInit() self.controller = firm.controllerPid - elif controller_name == "mellinger": + elif controller_name == 'mellinger': self.mellinger_control = firm.controllerMellinger_t() firm.controllerMellingerInit(self.mellinger_control) self.controller = firm.controllerMellinger - elif controller_name == "brescianini": + elif controller_name == 'brescianini': firm.controllerBrescianiniInit() self.controller = firm.controllerBrescianini else: - raise ValueError("Unknown controller {}".format(controller_name)) + raise ValueError('Unknown controller {}'.format(controller_name)) def setGroupMask(self, groupMask): self.groupMask = groupMask - def takeoff(self, targetHeight, duration, groupMask = 0): + def takeoff(self, targetHeight, duration, groupMask=0): if self._isGroup(groupMask): self.mode = CrazyflieSIL.MODE_HIGH_POLY targetYaw = 0.0 - firm.plan_takeoff(self.planner, + firm.plan_takeoff( + self.planner, self.cmdHl_pos, - self.cmdHl_yaw, targetHeight, targetYaw, duration, self.time_func()) + self.cmdHl_yaw, + targetHeight, targetYaw, duration, self.time_func()) - def land(self, targetHeight, duration, groupMask = 0): + def land(self, targetHeight, duration, groupMask=0): if self._isGroup(groupMask): self.mode = CrazyflieSIL.MODE_HIGH_POLY targetYaw = 0.0 - firm.plan_land(self.planner, + firm.plan_land( + self.planner, self.cmdHl_pos, - self.cmdHl_yaw, targetHeight, targetYaw, duration, self.time_func()) + self.cmdHl_yaw, + targetHeight, targetYaw, duration, self.time_func()) # def stop(self, groupMask = 0): # if self._isGroup(groupMask): # self.mode = CrazyflieSIL.MODE_IDLE # firm.plan_stop(self.planner) - def goTo(self, goal, yaw, duration, relative = False, groupMask = 0): + def goTo(self, goal, yaw, duration, relative=False, groupMask=0): if self._isGroup(groupMask): if self.mode != CrazyflieSIL.MODE_HIGH_POLY: # We need to update to the latest firmware that has go_to_from. - raise ValueError("goTo from low-level modes not yet supported.") + raise ValueError('goTo from low-level modes not yet supported.') self.mode = CrazyflieSIL.MODE_HIGH_POLY - firm.plan_go_to(self.planner, relative, firm.mkvec(*goal), yaw, duration, self.time_func()) - - def uploadTrajectory(self, trajectoryId: int, pieceOffset: int, pieces: list[TrajectoryPolynomialPiece]): + firm.plan_go_to( + self.planner, + relative, + firm.mkvec(*goal), + yaw, duration, self.time_func()) + + def uploadTrajectory(self, + trajectoryId: int, + pieceOffset: int, + pieces: list[TrajectoryPolynomialPiece]): traj = firm.piecewise_traj() traj.t_begin = 0 traj.timescale = 1.0 @@ -148,7 +158,12 @@ def uploadTrajectory(self, trajectoryId: int, pieceOffset: int, pieces: list[Tra firm.poly4d_set(fwpiece, 3, coef, piece.poly_yaw[coef]) self.trajectories[trajectoryId] = traj - def startTrajectory(self, trajectoryId: int, timescale: float = 1.0, reverse: bool = False, relative: bool = True, groupMask: int = 0): + def startTrajectory(self, + trajectoryId: int, + timescale: float = 1.0, + reverse: bool = False, + relative: bool = True, + groupMask: int = 0): if self._isGroup(groupMask): self.mode = CrazyflieSIL.MODE_HIGH_POLY traj = self.trajectories[trajectoryId] @@ -185,7 +200,7 @@ def cmdFullState(self, pos, vel, acc, yaw, omega): self.setpoint.acceleration.x = acc[0] self.setpoint.acceleration.y = acc[1] self.setpoint.acceleration.z = acc[2] - + self.cmdHl_pos = copy_svec(self.setpoint.position) self.cmdHl_vel = copy_svec(self.setpoint.velocity) self.cmdHl_yaw = yaw @@ -282,18 +297,24 @@ def executeController(self): return None if self.mode != CrazyflieSIL.MODE_HIGH_POLY: - return sim_data_types.Action([0,0,0,0]) + return sim_data_types.Action([0, 0, 0, 0]) time_in_seconds = self.time_func() # ticks is essentially the time in milliseconds as an integer tick = int(time_in_seconds * 1000) - if self.controller_name != "mellinger": + if self.controller_name != 'mellinger': self.controller(self.control, self.setpoint, self.sensors, self.state, tick) else: - self.controller(self.mellinger_control, self.control, self.setpoint, self.sensors, self.state, tick) + self.controller( + self.mellinger_control, + self.control, + self.setpoint, + self.sensors, + self.state, + tick) return self._fwcontrol_to_sim_data_types_action() - # "private" methods + # 'private' methods def _isGroup(self, groupMask): return groupMask == 0 or (self.groupMask & groupMask) > 0 @@ -313,23 +334,29 @@ def pwm_to_rpm(pwm): def pwm_to_force(pwm): # polyfit using data and scripts from https://github.com/IMRCLab/crazyflie-system-id - p = [ 1.71479058e-09, 8.80284482e-05, -2.21152097e-01] + p = [1.71479058e-09, 8.80284482e-05, -2.21152097e-01] force_in_grams = np.polyval(p, pwm) force_in_newton = force_in_grams * 9.81 / 1000.0 return np.maximum(force_in_newton, 0) - return sim_data_types.Action([pwm_to_rpm(self.motors_thrust_pwm.motors.m1), - pwm_to_rpm(self.motors_thrust_pwm.motors.m2), - pwm_to_rpm(self.motors_thrust_pwm.motors.m3), - pwm_to_rpm(self.motors_thrust_pwm.motors.m4)]) - + return sim_data_types.Action( + [pwm_to_rpm(self.motors_thrust_pwm.motors.m1), + pwm_to_rpm(self.motors_thrust_pwm.motors.m2), + pwm_to_rpm(self.motors_thrust_pwm.motors.m3), + pwm_to_rpm(self.motors_thrust_pwm.motors.m4)]) @staticmethod def _fwsetpoint_to_sim_data_types_state(fwsetpoint): pos = np.array([fwsetpoint.position.x, fwsetpoint.position.y, fwsetpoint.position.z]) vel = np.array([fwsetpoint.velocity.x, fwsetpoint.velocity.y, fwsetpoint.velocity.z]) - acc = np.array([fwsetpoint.acceleration.x, fwsetpoint.acceleration.y, fwsetpoint.acceleration.z]) - omega = np.radians(np.array([fwsetpoint.attitudeRate.roll, fwsetpoint.attitudeRate.pitch, fwsetpoint.attitudeRate.yaw])) + acc = np.array([ + fwsetpoint.acceleration.x, + fwsetpoint.acceleration.y, + fwsetpoint.acceleration.z]) + omega = np.radians(np.array([ + fwsetpoint.attitudeRate.roll, + fwsetpoint.attitudeRate.pitch, + fwsetpoint.attitudeRate.yaw])) if fwsetpoint.mode.quat == firm.modeDisable: # compute rotation based on differential flatness diff --git a/crazyflie_sim/crazyflie_sim/sim_data_types.py b/crazyflie_sim/crazyflie_sim/sim_data_types.py index c5c484f49..8c1394001 100644 --- a/crazyflie_sim/crazyflie_sim/sim_data_types.py +++ b/crazyflie_sim/crazyflie_sim/sim_data_types.py @@ -1,8 +1,11 @@ import numpy as np + class State: - """Class that stores the state of a UAV as used in the simulator interface""" - def __init__(self, pos = np.zeros(3), vel = np.zeros(3), quat = np.array([1,0,0,0]), omega = np.zeros(3)): + """Class that stores the state of a UAV as used in the simulator interface.""" + + def __init__(self, pos=np.zeros(3), vel=np.zeros(3), + quat=np.array([1, 0, 0, 0]), omega=np.zeros(3)): # internally use one numpy array self._state = np.empty(13) self.pos = pos @@ -12,7 +15,7 @@ def __init__(self, pos = np.zeros(3), vel = np.zeros(3), quat = np.array([1,0,0, @property def pos(self): - """position [m; world frame]""" + """Position [m; world frame].""" return self._state[0:3] @pos.setter @@ -21,7 +24,7 @@ def pos(self, value): @property def vel(self): - """velocity [m/s; world frame]""" + """Velocity [m/s; world frame].""" return self._state[3:6] @vel.setter @@ -30,7 +33,7 @@ def vel(self, value): @property def quat(self): - """quaternion [qw, qx, qy, qz; body -> world]""" + """Quaternion [qw, qx, qy, qz; body -> world].""" return self._state[6:10] @quat.setter @@ -39,7 +42,7 @@ def quat(self, value): @property def omega(self): - """angular velocity [rad/s; body frame]""" + """Angular velocity [rad/s; body frame].""" return self._state[10:13] @omega.setter @@ -47,11 +50,13 @@ def omega(self, value): self._state[10:13] = value def __repr__(self) -> str: - return "State pos={}, vel={}, quat={}, omega={}".format(self.pos, self.vel, self.quat, self.omega) + return 'State pos={}, vel={}, quat={}, omega={}'.format( + self.pos, self.vel, self.quat, self.omega) class Action: - """Class that stores the action of a UAV as used in the simulator interface""" + """Class that stores the action of a UAV as used in the simulator interface.""" + def __init__(self, rpm): # internally use one numpy array self._action = np.empty(4) @@ -59,7 +64,7 @@ def __init__(self, rpm): @property def rpm(self): - """rotation per second [rpm]""" + """Rotation per second [rpm].""" return self._action @rpm.setter @@ -67,4 +72,4 @@ def rpm(self, value): self._action = value def __repr__(self) -> str: - return "Action rpm={}".format(self.rpm) + return 'Action rpm={}'.format(self.rpm) diff --git a/crazyflie_sim/crazyflie_sim/visualization/blender.py b/crazyflie_sim/crazyflie_sim/visualization/blender.py index ea876fd94..577687774 100755 --- a/crazyflie_sim/crazyflie_sim/visualization/blender.py +++ b/crazyflie_sim/crazyflie_sim/visualization/blender.py @@ -1,40 +1,43 @@ +import datetime +import os +from pathlib import Path + import bpy import numpy as np -import os -import datetime +from rclpy.node import Node import rowan as rw import yaml -from pathlib import Path -from rclpy.node import Node -from ..sim_data_types import State, Action -# rotation vectors are axis-angle format in "compact form", where +from ..sim_data_types import Action, State + + +# rotation vectors are axis-angle format in 'compact form', where # theta = norm(rvec) and axis = rvec / theta # they can be converted to a matrix using cv2. Rodrigues, see # https://docs.opencv.org/4.7.0/d9/d0c/group__calib3d.html#ga61585db663d9da06b68e70cfbf6a1eac def opencv2quat(rvec): angle = np.linalg.norm(rvec) if angle == 0: - q = np.array([1,0,0,0]) + q = np.array([1, 0, 0, 0]) else: axis = rvec.flatten() / angle q = rw.from_axis_angle(axis, angle) return q + class Visualization: def __init__(self, node: Node, params: dict, names: list[str], states: list[State]): self.node = node - ## blender + # blender # load environment world = bpy.context.scene.world world.use_nodes = True - background_paths = [""] - self.env = world.node_tree.nodes.new("ShaderNodeTexEnvironment") - if params["cycle_bg"]: + self.env = world.node_tree.nodes.new('ShaderNodeTexEnvironment') + if params['cycle_bg']: bg_paths = [] - p = Path(__file__).resolve().parent / "data/env" + p = Path(__file__).resolve().parent / 'data/env' print(p) for subdir, dirs, files in os.walk(p): bg_paths.extend([os.path.join(subdir, file) for file in files]) @@ -46,60 +49,64 @@ def __init__(self, node: Node, params: dict, names: list[str], states: list[Stat self.env.image = self.bg_imgs[self.bg_idx] else: self.cycle_bg = False - self.env.image = bpy.data.images.load(Path(__file__).resolve().parent / "data/env/env.jpg") + self.env.image = bpy.data.images.load( + Path(__file__).resolve().parent / 'data/env/env.jpg') node_tree = world.node_tree - node_tree.links.new(self.env.outputs["Color"], node_tree.nodes["Background"].inputs["Color"]) - - # import crazyflie object - bpy.ops.import_scene.obj(filepath=f"{Path(__file__).resolve().parent}/data/model/cf.obj", axis_forward="Y", axis_up="Z") - self.cf_default = bpy.data.objects["cf"] + node_tree.links.new( + self.env.outputs['Color'], + node_tree.nodes['Background'].inputs['Color']) + + # import crazyflie object + bpy.ops.import_scene.obj( + filepath=f'{Path(__file__).resolve().parent}/data/model/cf.obj', + axis_forward='Y', axis_up='Z') + self.cf_default = bpy.data.objects['cf'] # save scene self.scene = bpy.context.scene self.scene.render.resolution_x = 320 self.scene.render.resolution_y = 320 self.scene.render.pixel_aspect_x = 1.0 self.scene.render.pixel_aspect_y = 1.0 - self.scene.render.image_settings.file_format = "JPEG" - self.scene.unit_settings.length_unit = "METERS" - self.scene.unit_settings.system = "METRIC" + self.scene.render.image_settings.file_format = 'JPEG' + self.scene.unit_settings.length_unit = 'METERS' + self.scene.unit_settings.system = 'METRIC' self.scene.unit_settings.scale_length = 1.0 - #self.scene.render.threads = 2 # max CPU cores to use to render + # self.scene.render.threads = 2 # max CPU cores to use to render # remove default objects - bpy.data.objects.remove(bpy.data.objects["Cube"]) - bpy.data.objects.remove(bpy.data.objects["Light"]) + bpy.data.objects.remove(bpy.data.objects['Cube']) + bpy.data.objects.remove(bpy.data.objects['Light']) # create lamp - lamp_data = bpy.data.lights.new(name="Lamp", type="SUN") + lamp_data = bpy.data.lights.new(name='Lamp', type='SUN') lamp_data.energy = 1.5 lamp_data.angle = 0.19198621809482574 # 11 deg - self.lamp = bpy.data.objects.new(name="Lamp", object_data=lamp_data) + self.lamp = bpy.data.objects.new(name='Lamp', object_data=lamp_data) bpy.context.collection.objects.link(self.lamp) bpy.context.view_layer.objects.active = self.lamp # camera - self.camera = bpy.data.objects["Camera"] + self.camera = bpy.data.objects['Camera'] self.camera.data.lens = 0.7376461029052734 - self.camera.data.lens_unit = "FOV" - self.camera.data.sensor_fit = "AUTO" + self.camera.data.lens_unit = 'FOV' + self.camera.data.sensor_fit = 'AUTO' self.camera.data.sensor_width = 1.4 self.camera.data.sensor_height = 18 self.camera.data.angle = 1.518436431884765 # 87 deg self.camera.data.clip_start = 1.1e-6 # link camera to scene - self.cf_default.hide_render = False # set rotation mode to quaternion - self.cf_default.rotation_mode = "QUATERNION" - self.camera.rotation_mode = "QUATERNION" - self.lamp.rotation_mode = "QUATERNION" + self.cf_default.rotation_mode = 'QUATERNION' + self.camera.rotation_mode = 'QUATERNION' + self.lamp.rotation_mode = 'QUATERNION' - base = "simulation_results" - self.path = base + "/" + datetime.datetime.now().strftime("%Y-%m-%d_%H%M%S") + "/" - os.makedirs(self.path, exist_ok=True) + base = 'simulation_results' + self.path = base + '/' + datetime.datetime.now().strftime('%Y-%m-%d_%H%M%S') + '/' + os.makedirs(self.path, exist_ok=True) self.ts = [] self.frame = 0 - self.fps = params["fps"] # frames per second + self.fps = params['fps'] # frames per second self.names = names self.n = len(names) @@ -107,14 +114,13 @@ def __init__(self, node: Node, params: dict, names: list[str], states: list[Stat self.cam_state_filenames = [] # dictionary with (name, idx) pairs to later find corresponding cf - self.names_idx_map = dict() + self.names_idx_map = {} # init list self.cf_list = [self.cf_default] - self.cf_cameras = params["cf_cameras"] if "cf_cameras" in params else dict() + self.cf_cameras = params['cf_cameras'] if 'cf_cameras' in params else {} # matrix where rows are the constant transformation between camera and its cf # it should only be accessed if corresponding cf carries a camera self.Q_virt_cf_cam = np.zeros((self.n, 4)) - for idx, (name, state) in enumerate(zip(names, states)): self.names_idx_map[name] = idx @@ -128,38 +134,43 @@ def __init__(self, node: Node, params: dict, names: list[str], states: list[Stat # set positions self.cf_list[idx].location = np.array(state.pos) - self.tvecs = np.zeros((self.n, 3)) for name in names: - os.mkdir(self.path + "/" + name + "/") # create dir for every cf for saving images - csf = f"{self.path}/{name}/{name}.csv" - calibration_sf = f"{self.path}/{name}/calibration.yaml" + os.mkdir(self.path + '/' + name + '/') # create dir for every cf for saving images + csf = f'{self.path}/{name}/{name}.csv' + calibration_sf = f'{self.path}/{name}/calibration.yaml' # initialize /.csv self.state_filenames.append(csf) - ### self.cam_state_filenames.append(cam_sf) - with open(csf, "w") as file: - file.write("image_name,timestamp,x,y,z,qw,qx,qy,qz\n") + # self.cam_state_filenames.append(cam_sf) + with open(csf, 'w') as file: + file.write('image_name,timestamp,x,y,z,qw,qx,qy,qz\n') if name in self.cf_cameras: - calibration = self.cf_cameras[name]["calibration"] - calibration["dist_coeff"] = np.zeros(5).tolist() - calibration["camera_matrix"] = np.array([[170.0,0,160.0], [0,170.0,160.0],[0,0,1]]).tolist() - with open(calibration_sf, "w") as file: + calibration = self.cf_cameras[name]['calibration'] + calibration['dist_coeff'] = np.zeros(5).tolist() + calibration['camera_matrix'] = np.array([ + [170.0, 0, 160.0], + [0, 170.0, 160.0], + [0, 0, 1]]).tolist() + with open(calibration_sf, 'w') as file: yaml.dump(calibration, file) - rvec = np.array(calibration["rvec"]) if "rvec" in calibration else np.array([1.2092, -1.2092, 1.2092]) - tvec = np.array(calibration["tvec"]) if "tvec" in calibration else np.zeros(3) + rvec = np.array([1.2092, -1.2092, 1.2092]) + if 'rvec' in calibration: + rvec = np.array(calibration['rvec']) + tvec = np.array(calibration['tvec']) if 'tvec' in calibration else np.zeros(3) self.tvecs[self.names_idx_map[name]] = tvec q_real_camera_to_robot = rw.inverse(opencv2quat(rvec)) - q_virtual_camera_to_real_camera = rw.from_euler(np.pi, 0, 0, "xyz") - self.Q_virt_cf_cam[self.names_idx_map[name]] = rw.multiply(q_real_camera_to_robot, q_virtual_camera_to_real_camera) - + q_virtual_camera_to_real_camera = rw.from_euler(np.pi, 0, 0, 'xyz') + self.Q_virt_cf_cam[self.names_idx_map[name]] = rw.multiply( + q_real_camera_to_robot, + q_virtual_camera_to_real_camera) def step(self, t, states: list[State], states_desired: list[State], actions: list[Action]): self.ts.append(t) - # render and record `self.fps` frames per second + # render and record `self.fps` frames per second if t - self.frame / self.fps >= 1 / self.fps: self.frame += 1 # quaternion matrix - Q = np.zeros((self.n, 4)) + Q = np.zeros((self.n, 4)) # position matrix P = np.zeros((self.n, 3)) @@ -167,17 +178,20 @@ def step(self, t, states: list[State], states_desired: list[State], actions: lis for name, state in zip(self.names, states): idx = self.names_idx_map[name] Q[idx] = np.array(state.quat) - P[idx] = np.array(state.pos) + P[idx] = np.array(state.pos) # set rotations self.cf_list[idx].rotation_quaternion = Q[idx] # set positions self.cf_list[idx].location = P[idx] - # record states - image_name = f"{self.names[idx]}_{self.frame:05}.jpg" if name in self.cf_cameras else "None" # image capturing scene from cf's pov or None + # record states + # image capturing scene from cf's pov or None + image_name = None + if name in self.cf_cameras: + image_name = f'{self.names[idx]}_{self.frame:05}.jpg' # record cf's state in world frame - with open(self.state_filenames[idx], "a") as file: - file.write(f"{image_name},{t},{P[idx,0]},{P[idx,1]},{P[idx,2]},{Q[idx,0]},{Q[idx,1]},{Q[idx,2]},{Q[idx,3]}\n") + with open(self.state_filenames[idx], 'a') as file: + file.write(f'{image_name},{t},{P[idx,0]},{P[idx,1]},{P[idx,2]},{Q[idx,0]},{Q[idx,1]},{Q[idx,2]},{Q[idx,3]}\n') # noqa E501 # cycle background image if enabled if self.cycle_bg: @@ -199,12 +213,12 @@ def step(self, t, states: list[State], states_desired: list[State], actions: lis # hide corresponding cf for rendering self.cf_list[idx].hide_render = True # Render image - image_name = f"{name}_{self.frame:05}.jpg" # image capturing scene from cf's pov - self.scene.render.filepath = f"{self.path}/{name}/{image_name}" + image_name = f'{name}_{self.frame:05}.jpg' # image capturing scene from cf's pov + self.scene.render.filepath = f'{self.path}/{name}/{image_name}' bpy.ops.render.render(write_still=True) # show again after rendering self.cf_list[idx].hide_render = False def shutdown(self): - for idx in range(1,self.n): + for idx in range(1, self.n): bpy.data.objects.remove(self.cf_list[idx]) diff --git a/crazyflie_sim/crazyflie_sim/visualization/pdf.py b/crazyflie_sim/crazyflie_sim/visualization/pdf.py index bb789b296..d3b04507d 100644 --- a/crazyflie_sim/crazyflie_sim/visualization/pdf.py +++ b/crazyflie_sim/crazyflie_sim/visualization/pdf.py @@ -1,16 +1,18 @@ from __future__ import annotations -from rclpy.node import Node -from ..sim_data_types import State, Action - import copy + +from matplotlib.backends.backend_pdf import PdfPages +import matplotlib.pyplot as plt import numpy as np +from rclpy.node import Node import rowan -import matplotlib.pyplot as plt -from matplotlib.backends.backend_pdf import PdfPages + +from ..sim_data_types import Action, State + class Visualization: - """Plots current and desired states into a PDF""" + """Plots current and desired states into a PDF.""" def __init__(self, node: Node, params: dict, names: list[str], states: list[State]): self.node = node @@ -19,7 +21,7 @@ def __init__(self, node: Node, params: dict, names: list[str], states: list[Stat self.all_states = [] self.all_states_desired = [] self.all_actions = [] - self.filename = params["output_file"] + self.filename = params['output_file'] def step(self, t, states: list[State], states_desired: list[State], actions: list[Action]): self.ts.append(t) @@ -39,79 +41,79 @@ def shutdown(self): # position fig, axs = plt.subplots(3, 1, sharex=True) - axs[0].set_ylabel("px [m]") - axs[1].set_ylabel("py [m]") - axs[2].set_ylabel("pz [m]") - axs[-1].set_xlabel("Time [s]") + axs[0].set_ylabel('px [m]') + axs[1].set_ylabel('py [m]') + axs[2].set_ylabel('pz [m]') + axs[-1].set_xlabel('Time [s]') for d in range(3): - axs[d].plot(self.ts, cf_states[:,d], label="state") - axs[d].plot(self.ts, cf_states_desired[:,d], label="desired") + axs[d].plot(self.ts, cf_states[:, d], label='state') + axs[d].plot(self.ts, cf_states_desired[:, d], label='desired') axs[0].legend() pdf.savefig(fig) plt.close() # velocity fig, axs = plt.subplots(3, 1, sharex=True) - axs[0].set_ylabel("vx [m/s]") - axs[1].set_ylabel("vy [m/s]") - axs[2].set_ylabel("vz [m/s]") - axs[-1].set_xlabel("Time [s]") + axs[0].set_ylabel('vx [m/s]') + axs[1].set_ylabel('vy [m/s]') + axs[2].set_ylabel('vz [m/s]') + axs[-1].set_xlabel('Time [s]') for d in range(3): - axs[d].plot(self.ts, cf_states[:,3+d], label="state") - axs[d].plot(self.ts, cf_states_desired[:,3+d], label="desired") + axs[d].plot(self.ts, cf_states[:, 3+d], label='state') + axs[d].plot(self.ts, cf_states_desired[:, 3+d], label='desired') axs[0].legend() pdf.savefig(fig) plt.close() # orientation fig, axs = plt.subplots(3, 1, sharex=True) - axs[0].set_ylabel("roll [deg]") - axs[1].set_ylabel("pitch [deg]") - axs[2].set_ylabel("yaw [deg]") - axs[-1].set_xlabel("Time [s]") + axs[0].set_ylabel('roll [deg]') + axs[1].set_ylabel('pitch [deg]') + axs[2].set_ylabel('yaw [deg]') + axs[-1].set_xlabel('Time [s]') - rpy = np.degrees(rowan.to_euler(cf_states[:,6:10], convention='xyz')) - rpy_desired = np.degrees(rowan.to_euler(cf_states_desired[:,6:10], convention='xyz')) + rpy = np.degrees(rowan.to_euler(cf_states[:, 6:10], convention='xyz')) + rpy_desired = np.degrees( + rowan.to_euler(cf_states_desired[:, 6:10], convention='xyz')) for d in range(3): - axs[d].plot(self.ts, rpy[:,d], label="state") - axs[d].plot(self.ts, rpy_desired[:,d], label="desired") + axs[d].plot(self.ts, rpy[:, d], label='state') + axs[d].plot(self.ts, rpy_desired[:, d], label='desired') axs[0].legend() pdf.savefig(fig) plt.close() # omega fig, axs = plt.subplots(3, 1, sharex=True) - axs[0].set_ylabel("wx [deg/s]") - axs[1].set_ylabel("wy [deg/s]") - axs[2].set_ylabel("wz [deg/s]") - axs[-1].set_xlabel("Time [s]") + axs[0].set_ylabel('wx [deg/s]') + axs[1].set_ylabel('wy [deg/s]') + axs[2].set_ylabel('wz [deg/s]') + axs[-1].set_xlabel('Time [s]') for d in range(3): - axs[d].plot(self.ts, np.degrees(cf_states[:,10+d]), label="state") - axs[d].plot(self.ts, np.degrees(cf_states_desired[:,10+d]), label="desired") + axs[d].plot(self.ts, np.degrees(cf_states[:, 10+d]), label='state') + axs[d].plot(self.ts, np.degrees(cf_states_desired[:, 10+d]), label='desired') axs[0].legend() pdf.savefig(fig) plt.close() # actions fig, axs = plt.subplots(2, 2, sharex=True, sharey=True) - axs[0,0].set_ylabel("rpm") - axs[1,0].set_ylabel("rpm") - axs[1,0].set_xlabel("Time [s]") - axs[1,1].set_xlabel("Time [s]") - - axs[0,0].plot(self.ts, cf_actions[:,3], label="M4") - axs[0,0].set_title("M4") - axs[0,1].plot(self.ts, cf_actions[:,0], label="M1") - axs[0,1].set_title("M1") - axs[1,1].plot(self.ts, cf_actions[:,1], label="M2") - axs[1,1].set_title("M2") - axs[1,0].plot(self.ts, cf_actions[:,2], label="M3") - axs[1,0].set_title("M3") + axs[0, 0].set_ylabel('rpm') + axs[1, 0].set_ylabel('rpm') + axs[1, 0].set_xlabel('Time [s]') + axs[1, 1].set_xlabel('Time [s]') + + axs[0, 0].plot(self.ts, cf_actions[:, 3], label='M4') + axs[0, 0].set_title('M4') + axs[0, 1].plot(self.ts, cf_actions[:, 0], label='M1') + axs[0, 1].set_title('M1') + axs[1, 1].plot(self.ts, cf_actions[:, 1], label='M2') + axs[1, 1].set_title('M2') + axs[1, 0].plot(self.ts, cf_actions[:, 2], label='M3') + axs[1, 0].set_title('M3') pdf.savefig(fig) plt.close() - diff --git a/crazyflie_sim/crazyflie_sim/visualization/record_states.py b/crazyflie_sim/crazyflie_sim/visualization/record_states.py index 9734f1cd6..240bfae4d 100644 --- a/crazyflie_sim/crazyflie_sim/visualization/record_states.py +++ b/crazyflie_sim/crazyflie_sim/visualization/record_states.py @@ -1,50 +1,53 @@ from __future__ import annotations -import numpy as np -import os import datetime -from pathlib import Path +import os + +import numpy as np from rclpy.node import Node -from ..sim_data_types import State, Action + +from ..sim_data_types import Action, State class Visualization: - """Records states in given format file""" + """Records states in given format file.""" def __init__(self, node: Node, params: dict, names: list[str], states: list[State]): self.node = node self.names = names - self.outdir = params["output_dir"] if "output_dir" in params else "state_info" - self.outdir = self.outdir + "/" + datetime.datetime.now().strftime("%Y-%m-%d_%H%M%S") - os.makedirs(self.outdir, exist_ok=True) + self.outdir = params['output_dir'] if 'output_dir' in params else 'state_info' + self.outdir = self.outdir + '/' + datetime.datetime.now().strftime('%Y-%m-%d_%H%M%S') + os.makedirs(self.outdir, exist_ok=True) self.names_idx_map = {} - self.logging_time = params["logging_time"] if "logging_time" in params else 0.3 # how many seconds to leave between logs - self.last_log = -self.logging_time # timestamp of last log, initialized to -self.logging_time so log occurs at t=0 + # how many seconds to leave between logs + self.logging_time = params['logging_time'] if 'logging_time' in params else 0.3 + # timestamp of last log, initialized to -self.logging_time so log occurs at t=0 + self.last_log = -self.logging_time self.supported_formats = { - "csv": { - "log": self.__log_csv + 'csv': { + 'log': self.__log_csv }, - "np": { - "log": self.__log_np, - "shutdown": self.__shutdown_np + 'np': { + 'log': self.__log_np, + 'shutdown': self.__shutdown_np } - } - self.active_formats = [fmt for fmt in params["file_formats"]] + } + self.active_formats = params['file_formats'] for idx, name in enumerate(names): self.names_idx_map[name] = idx self.n = len(names) - if "csv" in self.active_formats: + if 'csv' in self.active_formats: for idx, name in enumerate(names): - os.makedirs(f"{self.outdir}/csv", exist_ok=True) - csf = f"{self.outdir}/csv/{name}.csv" + os.makedirs(f'{self.outdir}/csv', exist_ok=True) + csf = f'{self.outdir}/csv/{name}.csv' # initialize .csv - with open(csf, "w") as file: - file.write("timestamp,x,y,z,qw,qx,qy,qz\n") - self.ts = [] # list of timestamps - if "np" in self.active_formats: - os.makedirs(f"{self.outdir}/np", exist_ok=True) - self.Ps = [] # list of positions matrices (self.n x 3) - self.Qs = [] # list of quaternion matrices (self.n x 4) + with open(csf, 'w') as file: + file.write('timestamp,x,y,z,qw,qx,qy,qz\n') + self.ts = [] # list of timestamps + if 'np' in self.active_formats: + os.makedirs(f'{self.outdir}/np', exist_ok=True) + self.Ps = [] # list of positions matrices (self.n x 3) + self.Qs = [] # list of quaternion matrices (self.n x 4) def step(self, t, states: list[State], states_desired: list[State], actions: list[Action]): if t - self.last_log >= self.logging_time: @@ -52,32 +55,33 @@ def step(self, t, states: list[State], states_desired: list[State], actions: lis self.ts.append(t) P = np.zeros((self.n, 3)) Q = np.zeros((self.n, 4)) - for name, state in zip(self.names, states): - idx = self.names_idx_map[name] + for name, state in zip(self.names, states): + idx = self.names_idx_map[name] P[idx] = np.array(state.pos) Q[idx] = np.array(state.quat) for fmt in self.active_formats: - self.supported_formats[fmt]["log"](t, idx, P, Q) - + self.supported_formats[fmt]['log'](t, idx, P, Q) def __log_csv(self, t, idx: int, P: np.ndarray, Q: np.ndarray): - """ record states in csv file """ - with open(f"{self.outdir}/csv/{self.names[idx]}.csv", "a") as file: - file.write(f"{t},{P[idx,0]},{P[idx,1]},{P[idx,2]},{Q[idx,0]},{Q[idx,1]},{Q[idx,2]},{Q[idx,3]}\n") + """Record states in csv file.""" + with open(f'{self.outdir}/csv/{self.names[idx]}.csv', 'a') as file: + file.write(f'{t},{P[idx,0]},{P[idx,1]},{P[idx,2]},{Q[idx,0]},{Q[idx,1]},{Q[idx,2]},{Q[idx,3]}\n') # noqa E501 def __log_np(self, t, idx: int, P: np.ndarray, Q: np.ndarray): self.Ps.append(P) self.Qs.append(Q) - def __shutdown_np(self): P = np.array(self.Ps) Q = np.array(self.Qs) for idx, name in enumerate(self.names): - np.savez_compressed(f"{self.outdir}/np/{name}.npz", t=self.ts, pos=P[idx::self.n,idx,:], quat=Q[idx::self.n,idx,:]) + np.savez_compressed( + f'{self.outdir}/np/{name}.npz', + t=self.ts, + pos=P[idx::self.n, idx, :], + quat=Q[idx::self.n, idx, :]) def shutdown(self): for fmt in self.active_formats: - if "shutdown" in self.supported_formats[fmt]: - self.supported_formats[fmt]["shutdown"]() - + if 'shutdown' in self.supported_formats[fmt]: + self.supported_formats[fmt]['shutdown']() diff --git a/crazyflie_sim/crazyflie_sim/visualization/rviz.py b/crazyflie_sim/crazyflie_sim/visualization/rviz.py index 1c629ee66..c1ed92687 100644 --- a/crazyflie_sim/crazyflie_sim/visualization/rviz.py +++ b/crazyflie_sim/crazyflie_sim/visualization/rviz.py @@ -1,13 +1,14 @@ from __future__ import annotations from geometry_msgs.msg import TransformStamped -from tf2_ros import TransformBroadcaster from rclpy.node import Node -from ..sim_data_types import State, Action +from tf2_ros import TransformBroadcaster + +from ..sim_data_types import Action, State class Visualization: - """Publishes ROS 2 transforms of the states, so that they can be visualized in RVIZ""" + """Publishes ROS 2 transforms of the states, so that they can be visualized in RVIZ.""" def __init__(self, node: Node, params: dict, names: list[str], states: list[State]): self.node = node @@ -20,7 +21,7 @@ def step(self, t, states: list[State], states_desired: list[State], actions: lis for name, state in zip(self.names, states): msg = TransformStamped() msg.header.stamp = self.node.get_clock().now().to_msg() - msg.header.frame_id = "world" + msg.header.frame_id = 'world' msg.child_frame_id = name msg.transform.translation.x = state.pos[0] msg.transform.translation.y = state.pos[1] @@ -34,4 +35,3 @@ def step(self, t, states: list[State], states_desired: list[State], actions: lis def shutdown(self): pass - diff --git a/crazyflie_sim/package.xml b/crazyflie_sim/package.xml index dd1caa89f..0bdea17c3 100644 --- a/crazyflie_sim/package.xml +++ b/crazyflie_sim/package.xml @@ -17,6 +17,7 @@ ament_flake8 ament_pep257 python3-pytest + ament_cmake_pytest ament_cmake diff --git a/crazyflie_sim/test/test_copyright.py b/crazyflie_sim/test/test_copyright.py deleted file mode 100644 index cc8ff03f7..000000000 --- a/crazyflie_sim/test/test_copyright.py +++ /dev/null @@ -1,23 +0,0 @@ -# Copyright 2015 Open Source Robotics Foundation, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ament_copyright.main import main -import pytest - - -@pytest.mark.copyright -@pytest.mark.linter -def test_copyright(): - rc = main(argv=['.', 'test']) - assert rc == 0, 'Found errors'