Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature pytest #385

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions .github/workflows/systemtests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,16 @@ jobs:
source /opt/ros/humble/setup.bash
. install/local_setup.bash
export ROS_LOCALHOST_ONLY=1
python3 src/crazyswarm2/systemtests/run_test.py
python3 src/crazyswarm2/systemtests/test_flights.py

- name: Upload files
id: step6
uses: actions/upload-artifact@v3
with:
name: pdf_bagfiles_and_logs
path: |
ros2_ws/bagfiles
ros2_ws/results
~/.ros/log




Expand Down
36 changes: 24 additions & 12 deletions systemtests/plotter_class.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from matplotlib.backends.backend_pdf import PdfPages
import numpy as np
from crazyflie_py.uav_trajectory import Trajectory
from pathlib import Path


class Plotter:
Expand All @@ -21,9 +22,10 @@ def __init__(self):
self.deviation = [] #list of all indexes where euclidian_distance(ideal - recorded) > EPSILON

self.EPSILON = 0.05 # euclidian distance in [m] between ideal and recorded trajectory under which the drone has to stay to pass the test
self.DELAY_CONST_FIG8 = 4.75 #this is the delay constant which I found by adding up all the time.sleep() etc in the figure8.py file. This should be implemented better later
self.ALTITUDE_CONST_FIG8 = 1.0 #this is the altitude given for the takeoff in figure8.py. I should find a better solution than a symbolic constant ?
self.DELAY_CONST_FIG8 = 4.75 #this is the delay constant which I found by adding up all the time.sleep() etc in the figure8.py file. This could be implemented better later ?
self.ALTITUDE_CONST_FIG8 = 1 #this is the altitude given for the takeoff in figure8.py. I should find a better solution than a symbolic constant ?
self.ALTITUDE_CONST_MULTITRAJ = 1 #takeoff altitude for traj0 in multi_trajectory.py
self.X_OFFSET_CONST_MULTITRAJ = -0.3 #offest on the x axis between ideal and real trajectory. Reason: ideal trajectory (traj0.csv) starts with offset of 0.3m and CrazyflieServer.startTrajectory() is relative to start position

def file_guard(self, pdf_path):
msg = None
Expand All @@ -45,18 +47,22 @@ def file_guard(self, pdf_path):
def read_csv_and_set_arrays(self, ideal_csvfile, rosbag_csvfile):
'''Method that reads the csv data of the ideal test trajectory and of the actual recorded trajectory and initializes the attribute arrays'''

#check which test we are plotting : figure8 or multi_trajectory
if("fig8" in rosbag_csvfile):
fig8 = True
#check which test we are plotting : figure8 or multi_trajectory or another one
if("figure8" in rosbag_csvfile):
fig8, m_t = True, False
print("Plotting fig8 test data")
else:
fig8 = False
elif "multi_trajectory" in rosbag_csvfile:
fig8, m_t = False, True
print("Plotting multi_trajectory test data")
else:
fig8, m_t = False, False
print("Plotting unspecified test data")

#get ideal trajectory data
self.ideal_traj_csv = Trajectory()
try:
path_to_ideal_csv = os.path.join(os.path.dirname(os.path.abspath(__file__)),ideal_csvfile)
print(path_to_ideal_csv)
self.ideal_traj_csv.loadcsv(path_to_ideal_csv)
except FileNotFoundError:
print("Plotter : File not found " + path_to_ideal_csv)
Expand Down Expand Up @@ -93,11 +99,15 @@ def read_csv_and_set_arrays(self, ideal_csvfile, rosbag_csvfile):
pos = [0,0,0] #for all recorded datapoints who cannot be matched to a corresponding ideal position we assume the drone is on its ground start position (ie those datapoints are before takeoff or after landing)


self.ideal_traj_x[i], self.ideal_traj_y[i]= pos[0], pos[1]
if(fig8) : #special case : in fig8 test no altitude is given in the trajectory polynomials but is stays constant after takeoff in figure8.py
self.ideal_traj_z[i] = self.ALTITUDE_CONST_FIG8
else : #for multi_trajectory test the altitude given in the trajectory polynomials is added to the takeoff altitude
self.ideal_traj_z[i] = self.ALTITUDE_CONST_MULTITRAJ + pos[2]
self.ideal_traj_x[i], self.ideal_traj_y[i], self.ideal_traj_z[i]= pos[0], pos[1], pos[2]

#special cases
if fig8:
self.ideal_traj_z[i] = self.ALTITUDE_CONST_FIG8 #special case: in fig8 no altitude is given in the trajectory polynomials (idealcsv) but is fixed as the takeoff altitude in figure8.py
elif m_t:
self.ideal_traj_z[i] = pos[2] + self.ALTITUDE_CONST_MULTITRAJ #for multi_trajectory the altitude given in the trajectory polynomials is added to the fixed takeoff altitude specified in multi_trajectory.py
self.ideal_traj_x[i] = pos[0] + self.X_OFFSET_CONST_MULTITRAJ #the x-axis is offset by 0.3 m because ideal start position not being (0,0,0)


self.euclidian_dist[i] = np.linalg.norm([self.ideal_traj_x[i]-self.bag_x[i],
self.ideal_traj_y[i]-self.bag_y[i], self.ideal_traj_z[i]-self.bag_z[i]])
Expand Down Expand Up @@ -341,3 +351,5 @@ def test_passed(self) -> bool :
if args.open:
import subprocess
subprocess.call(["xdg-open", args.pdf])


134 changes: 0 additions & 134 deletions systemtests/run_test.py

This file was deleted.

162 changes: 162 additions & 0 deletions systemtests/test_flights.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
import unittest

from pathlib import Path
import shutil
import os
from plotter_class import Plotter
from mcap_handler import McapHandler
from subprocess import Popen, PIPE, TimeoutExpired
import time
import signal
import atexit

#############################

def setUpModule():

path = Path(__file__) #Path(__file__) in this case "/home/github/actions-runner/_work/crazyswarm2/crazyswarm2/ros2_ws/src/crazyswarm2/systemtests/test_flights.py" ; path.parents[0]=.../systemstests

#delete results, logs and bags of previous experiments if they exist
if(Path(path.parents[3] / "results").exists()):
shutil.rmtree(path.parents[3] / "results")

os.makedirs(path.parents[3] / "results") # /home/github/actions-runner/_work/crazyswarm2/crazyswarm2/ros2_ws/results

def tearDownModule():
pass


def clean_process(process:Popen) -> int :
'''Kills process and its children on exit if they aren't already terminated (called with atexit). Returns 0 on termination, 1 if SIGKILL was needed'''
if process.poll() == None:
group_id = os.getpgid(process.pid)
print(f"cleaning process {group_id}")
os.killpg(group_id, signal.SIGTERM)
time.sleep(0.01) #necessary delay before first poll
i=0
while i < 10 and process.poll() == None: #in case the process termination is lazy and takes some time, we wait up to 0.5 sec per process
if process.poll() != None:
return 0 #if we have a returncode-> it terminated
time.sleep(0.05) #if not wait a bit longer
if(i == 9):
os.killpg(group_id, signal.SIGKILL)
return 1 #after 0.5s we stop waiting, consider it did not terminate correctly and kill it
return 0
else:
return 0 #process already terminated


class TestFlights(unittest.TestCase):

def __init__(self, methodName: str = "runTest") -> None:
super().__init__(methodName)
self.test_file = None
self.launch_crazyswarm : Popen = None
self.ros2_ws = Path(__file__).parents[3] #/home/github/actions-runner/_work/crazyswarm2/crazyswarm2/ros2_ws

def idFolderName(self):
return self.id().split(".")[-1] #returns the name of the test_function currently being run, for example "test_figure8"

# runs once per test_ function
def setUp(self):

if(Path(Path.home() / ".ros/log").exists()): #delete log files of the previous test
shutil.rmtree(Path.home() / ".ros/log")
self.test_file = None

# launch server
src = "source " + str(Path(__file__).parents[3] / "install/setup.bash") # -> "source /home/github/actions-runner/_work/crazyswarm2/crazyswarm2/ros2_ws/install/setup.bash"
command = f"{src} && ros2 launch crazyflie launch.py"
self.launch_crazyswarm = Popen(command, shell=True, stderr=True, stdout=PIPE, text=True,
start_new_session=True, executable="/bin/bash")
atexit.register(clean_process, self.launch_crazyswarm) #atexit helps us to make sure processes are cleaned even if script exits unexpectedly
time.sleep(1)


# runs once per test_ function
def tearDown(self) -> None:
clean_process(self.launch_crazyswarm) #kill crazyswarm_server and all of its child processes

# copy .ros/log files to results folder
if Path(Path.home() / ".ros/log").exists():
shutil.copytree(Path.home() / ".ros/log", Path(__file__).parents[3] / f"results/{self.idFolderName()}/roslogs")

return super().tearDown()



def record_start_and_clean(self, testname:str, max_wait:int):
'''Starts recording the /tf topic in a rosbag, starts the test, waits, closes the rosbag and terminate all processes. max_wait is the max amount of time you want to wait
before forcefully terminating the test flight script (in case it never finishes correctly).
NB the testname must be the name of the crayzflie_examples executable (ie the CLI grammar "ros2 run crazyflie_examples testname" must be valid)'''

src = f"source {str(self.ros2_ws)}/install/setup.bash"
try:
command = f"{src} && ros2 bag record -s mcap -o test_{testname} /tf"
record_bag = Popen(command, shell=True, stderr=PIPE, stdout=True, text=True,
cwd= self.ros2_ws / "results/", start_new_session=True, executable="/bin/bash")
atexit.register(clean_process, record_bag)

command = f"{src} && ros2 run crazyflie_examples {testname}"
start_flight_test = Popen(command, shell=True, stderr=True, stdout=True,
start_new_session=True, text=True, executable="/bin/bash")
atexit.register(clean_process, start_flight_test)

start_flight_test.wait(timeout=max_wait) #raise Timeoutexpired after max_wait seconds if start_flight_test didn't finish by itself
clean_process(start_flight_test)
clean_process(record_bag)

except TimeoutExpired: #if max_wait is exceeded
clean_process(start_flight_test)
clean_process(record_bag)

except KeyboardInterrupt: #if drone crashes, user can ^C to skip the waiting
clean_process(start_flight_test)
clean_process(record_bag)

#if something went wrong with the bash command lines in Popen, print the error
if record_bag.stderr != None:
print(testname," record_bag stderr: ", record_bag.stderr.readlines())
if start_flight_test.stderr != None:
print(testname," start_flight flight stderr: ", start_flight_test.stderr.readlines())


def translate_plot_and_check(self, testname:str) -> bool :
'''Translates rosbag .mcap format to .csv, then uses that csv to plot a pdf. Checks the deviation between ideal and real trajectories, i.e if the drone
successfully followed its given trajectory. Returns True if deviation < epsilon(defined in plotter_class.py) at every timestep, false if not. '''

# NB : the mcap filename is almost the same as the folder name but has _0 at the end
inputbag = f"{str(self.ros2_ws)}/results/test_{testname}/test_{testname}_0.mcap"
output_csv = f"{str(self.ros2_ws)}/results/test_{testname}/test_{testname}_0.csv"

writer = McapHandler()
writer.write_mcap_to_csv(inputbag, output_csv) #translate bag from mcap to csv
output_pdf = f"{str(self.ros2_ws)}/results/test_{testname}/results_{testname}.pdf"
rosbag_csv = output_csv

plotter = Plotter()
plotter.create_figures(self.test_file, rosbag_csv, output_pdf) #plot the data
return plotter.test_passed()



def test_figure8(self):
self.test_file = "../crazyflie_examples/crazyflie_examples/data/figure8.csv"
# run test
self.record_start_and_clean("figure8", 20)
#create the plot etc
test_passed = self.translate_plot_and_check("figure8")
assert test_passed, "figure8 test failed : deviation larger than epsilon"

def test_multi_trajectory(self):
self.test_file = "../crazyflie_examples/crazyflie_examples/data/multi_trajectory/traj0.csv"
self.record_start_and_clean("multi_trajectory", 80)
test_passed = self.translate_plot_and_check("multi_trajectory")
assert test_passed, "multitrajectory test failed : deviation larger than epsilon"




if __name__ == '__main__':
unittest.main()

Loading