Skip to content

Commit

Permalink
systemtests: add support for automatically querying timestamps
Browse files Browse the repository at this point in the history
  • Loading branch information
julienthevenoz authored May 23, 2024
1 parent 46486b4 commit 97ed0a9
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 107 deletions.
29 changes: 21 additions & 8 deletions systemtests/mcap_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

class McapHandler:
def __init__(self):
pass
self.takeoff_time = None

def read_messages(self, input_bag: str):
reader = rosbag2_py.SequentialReader()
Expand Down Expand Up @@ -36,18 +36,32 @@ def write_mcap_to_csv(self, inputbag:str, outputfile:str):
'''A method which translates an .mcap rosbag file format to a .csv file.
Also modifies the timestamp to start at 0.0 instead of the wall time.
Only written to translate the /tf topic but could easily be extended to other topics'''

t_start = None
t_start_bag = None #this is the timestamp of the first message we read in the bag (doesn't mean it's exactly the start time of the bag but close enough ?)
try:
print("Translating .mcap to .csv")
f = open(outputfile, 'w+')
writer = csv.writer(f)
writer.writerow(["# t"," x"," y"," z"])
for topic, msg, timestamp in self.read_messages(inputbag):
if topic =="/tf":
if t_start == None:
t_start = msg.transforms[0].header.stamp.sec + msg.transforms[0].header.stamp.nanosec *10**(-9)
t = msg.transforms[0].header.stamp.sec + msg.transforms[0].header.stamp.nanosec *10**(-9) - t_start
if t_start_bag == None:
t_start_bag = msg.transforms[0].header.stamp.sec + msg.transforms[0].header.stamp.nanosec *10**(-9)
t = msg.transforms[0].header.stamp.sec + msg.transforms[0].header.stamp.nanosec *10**(-9) -t_start_bag
writer.writerow([t, msg.transforms[0].transform.translation.x, msg.transforms[0].transform.translation.y, msg.transforms[0].transform.translation.z])
if topic == "/rosout": #and t_start_bag != None :
if msg.name == "crazyflie_server" and msg.function == "takeoff":
t = msg.stamp.sec + msg.stamp.nanosec *10**(-9) - t_start_bag
self.takeoff_time = t
#the takeoff message in simulation has a sligthly different name than IRL, so we need this to record the sim takeoff time
#BUT for some unknown reason the sim tests seem to work perfectly without even recording takeoff and adjusting the offset
#so I will leave this commented here just in case the sim tests have to be worked on
# if msg.name == "crazyflie_server" and msg.function == "_takeoff_callback":
# t = msg.stamp.sec + msg.stamp.nanosec *10**(-9)
# self.takeoff_time = t
# print(f"takeoff at {self.takeoff_time}")

#write the "takeoff command" time as a comment on the last line
writer.writerow([f"### takeoff time : {self.takeoff_time}"])
f.close()
except FileNotFoundError:
print(f"McapHandler : file {outputfile} not found")
Expand All @@ -67,5 +81,4 @@ def write_mcap_to_csv(self, inputbag:str, outputfile:str):
args:Namespace = parser.parse_args()

translator = McapHandler()
translator.write_mcap_to_csv(args.inputbag,args.outputfile)

translator.write_mcap_to_csv(args.inputbag,args.outputfile)
163 changes: 71 additions & 92 deletions systemtests/plotter_class.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import os
import sys
from matplotlib.backends.backend_pdf import PdfPages
from matplotlib.patches import Rectangle
import numpy as np
from crazyflie_py.uav_trajectory import Trajectory
from pathlib import Path
Expand All @@ -24,12 +25,12 @@ def __init__(self, sim_backend = False):

self.SIM = sim_backend #indicates if we are plotting data from real life test or from a simulated test. Default is false (real life test)
self.EPSILON = 0.15 # euclidian distance in [m] between ideal and recorded trajectory under which the drone has to stay to pass the test (NB : epsilon is doubled for multi_trajectory test)
self.ideal_takeoff = 0.6
self.ideal_traj_start = 5.6
self.ALLOWED_DEV_POINTS = 0.05 #allowed percentage of datapoints whose deviation > EPSILON while still passing test (currently % for fig8 and 10% for mt)
self.DELAY_CONST_FIG8 = 1.3 #delay const used to temporally align the ideal traj with the real traj
self.DELAY_CONST_MT = 5.5
if self.SIM : #It allows to temporally adjust the ideal and real trajectories on the graph. Could this be implemented in a better (not hardcoded) way ?
self.DELAY_CONST_FIG8 = -0.45 #for an unknown reason, the delay constants with the sim_backend is different
self.DELAY_CONST_MT = -0.3
# if self.SIM : #It allows to temporally adjust the ideal and real trajectories on the graph. Could this be implemented in a better (not hardcoded) way ?
# self.DELAY_CONST_FIG8 = 0#-0.45 #for an unknown reason, the delay constants with the sim_backend is different
# self.DELAY_CONST_MT = 0#-0.3
self.ALTITUDE_CONST_FIG8 = 1 #this is the altitude given for the takeoff in figure8.py. I should find a better solution than a symbolic constant ?

def file_guard(self, pdf_path):
Expand Down Expand Up @@ -74,30 +75,43 @@ def read_csv_and_set_arrays(self, ideal_csvfile, rosbag_csvfile):
self.adjust_arrays()
bag_arrays_size = len(self.bag_times)
print("number of datapoints in self.bag_*: ",bag_arrays_size)


#since the rosbag doesn't start at a reliable time, we need to adjust the ideal time array and the real one so that they aren't offset. For this we compare the time where the "takeoff" command was
# given to the crazyflie with the time of takeoff in the desired trajectory. With this we have the time-delay which we need to correct the offset (NB : empirically modified by 0.15 seconds this
#time-delay because that seems to be the delay between receiving the takeoff command and actually flying off)
with open(rosbag_csvfile) as f:
lines = f.readlines()
lastline = lines[-1] #get last line of csv file, where the takeoff time is written as comment
try:
self.takeoff_time = float(lastline[lastline.find(":") + 1 :]) #get the "takeoff" time from last line of csv
except ValueError: #if no takeoff was issued, there will be a "None" in the lastline which will produce value error when converting to float
print("Warning : No takeoff written in the lastline of the rosbag csv file : offset cannot be corrected.")
self.takeoff_time = 0
offset1 = (self.ideal_takeoff - self.takeoff_time) - 0.15


#####calculate ideal trajectory points corresponding to the times of recorded points

self.ideal_traj_x = np.empty([bag_arrays_size])
self.ideal_traj_y = np.empty([bag_arrays_size])
self.ideal_traj_z = np.empty([bag_arrays_size])
self.euclidian_dist = np.empty([bag_arrays_size])


no_match_in_idealcsv=[]

delay = 0
if self.test_name == "fig8":
delay = self.DELAY_CONST_FIG8
elif self.test_name == "mt":
delay = self.DELAY_CONST_MT

delay = offset1
# if self.test_name == "fig8" and self.SIM:
# delay = self.DELAY_CONST_FIG8
# elif self.test_name == "mt" and self.SIM:
# delay = self.DELAY_CONST_MT
for i in range(bag_arrays_size):
try:
pos = self.ideal_traj_csv.eval(self.bag_times[i] - delay).pos
pos = self.ideal_traj_csv.eval(self.bag_times[i] + delay).pos
except AssertionError:
no_match_in_idealcsv.append(i)
pos = [0,0,0] #for all recorded datapoints who cannot be matched to a corresponding ideal position we assume the drone is on its ground start position (ie those datapoints are before takeoff or after landing)


self.ideal_traj_x[i], self.ideal_traj_y[i], self.ideal_traj_z[i]= pos[0], pos[1], pos[2]

Expand Down Expand Up @@ -136,44 +150,33 @@ def no_match_warning(self, unmatched_values:list):


def adjust_arrays(self):
''' Method that trims the self.bag_* attributes to get rid of the datapoints where the drone is immobile on the ground and makes self.bag_times start at 0 [s]'''
''' Method that adjusts the self.bag_* attributes to get rid of the datapoints whose timestamp don't make sense'''

print(f"rosbag initial length {(self.bag_times[-1]-self.bag_times[0]) }s")
#find the takeoff time and landing times
ground_level = self.bag_z[0]
airborne = False
takeoff_index = None
landing_index = None
i=0
for z_coord in self.bag_z:
if not(airborne) and z_coord > ground_level + 0.05: #when altitude of the drone is 5cm above ground level, it started takeoff
takeoff_index = i
airborne = True
print(f"takeoff time is {self.bag_times[takeoff_index]}s")
if airborne and z_coord < ground_level + 0.05: #find when it lands again
landing_index = i
print(f"landing time is {self.bag_times[landing_index]}s")
break
i+=1



assert (takeoff_index != None) and (landing_index != None), "Plotter : couldn't find drone takeoff or landing"

#recurring problem : some messages recorded from /tf arrive way later (2-4 seconds) than when they were emitted, which makes a mess in the timestamps
#zB we have timestamps (in s) : 5.1, 5.2, 5.3, !!!3.7, 3.9, 4.5, 4.9!!!, 5.4, 5.5, 5.6, 5.7 etc. This bug almost always occurs in the first seconds of the recording & generally only happens once per recording
#and only concerns a very small percentage of datapoints (10-40 over about 1800 total), so it is not a big deal. We do not know if this bug stems from the Rosbag recording, from how /tf behaves or from the radio

####get rid of datapoints before takeoff and after landing in bag_times, bag_x, bag_y, bag_y
#Since we use a lineplot, we need to get rid of these datapoints that are in an order that doesn't make sense so that the plot can be readable
self.nonsensical = [] #list of indexes of datapoints who arrived too late, meaning their timestamp doesn't follow the ones before
time = -1
for index,t in enumerate(self.bag_times):
if t > time:
time = t
else:
self.nonsensical.append(index)

assert len(self.bag_times) == len(self.bag_x) == len(self.bag_y) == len(self.bag_z), "Plotter : self.bag_* aren't the same size before trimming"
index_arr = np.arange(len(self.bag_times))
slicing_arr = np.delete(index_arr, index_arr[takeoff_index:landing_index+1]) #in our slicing array we take out all the indexes of when the drone is in flight so that it only contains the indexes of when the drone is on the ground

# #delete the datapoints where drone is on the ground
# self.bag_times = np.delete(self.bag_times, slicing_arr)
# self.bag_x = np.delete(self.bag_x, slicing_arr)
# self.bag_y = np.delete(self.bag_y, slicing_arr)
# self.bag_z = np.delete(self.bag_z, slicing_arr)
if self.nonsensical: #if self.nonsensical is not empty
self.bag_times = np.delete(self.bag_times, self.nonsensical)
self.bag_x = np.delete(self.bag_x, self.nonsensical)
self.bag_y = np.delete(self.bag_y, self.nonsensical)
self.bag_z = np.delete(self.bag_z, self.nonsensical)
print(f"{len(self.nonsensical)} datapoints were ignored because because their timestamp wasn't in the good order (delayed message problem). They go from index {self.nonsensical[0]} to {self.nonsensical[-1]}")

assert len(self.bag_times) == len(self.bag_x) == len(self.bag_y) == len(self.bag_z), "Plotter : self.bag_* aren't the same size after trimming"
assert len(self.bag_times) == len(self.bag_x) == len(self.bag_y) == len(self.bag_z), "Plotter : self.bag_* aren't the same size after adjusting arrays"

print(f"trimmed bag_times starts: {self.bag_times[0]}s and ends: {self.bag_times[-1]}, size: {len(self.bag_times)}")

Expand All @@ -200,15 +203,11 @@ def create_figures(self, ideal_csvfile:str, rosbag_csvfile:str, pdfname:str, ove


self.read_csv_and_set_arrays(ideal_csvfile,rosbag_csvfile)
offset_list = self.find_temporal_offset()
if len(offset_list) == 1:
offset_string = f"temporal offset : {offset_list[0]}s \n"
elif len(offset_list) ==2:
offset_string = f"averaged temporal offset : {(offset_list[0]+offset_list[1])/2}s \n"

passed="failed"
if self.test_passed():
passed = "passed"
test_result="failed"
passed, percentage = self.test_passed()
if passed:
test_result = "passed"

#create PDF to save figures
if(pdfname[-4:] != ".pdf"):
Expand All @@ -228,14 +227,19 @@ def create_figures(self, ideal_csvfile:str, rosbag_csvfile:str, pdfname:str, ove
name = "Unnamed test"
print("Plotter : test name not defined")

text = f'{name} trajectory test'
title = f'{name} trajectory test'
if self.SIM:
title += "(SIMULATION)"
title_text_settings = f'Settings:\n'
title_text_parameters = f'Parameters:\n'
for key, value in self.params.items():
title_text_parameters += f" {key}: {value}\n"
title_text_results = f'Results: test {passed}\n' + offset_string + f'max error : '
title_text_results = f'Results: test {test_result}\n' + f"acceptable deviation EPSILON: {self.EPSILON}[m]\n"
title_text_results += f"percentage of points > EPSILON : {percentage:.4f}%\n" + f"average error : {np.mean(self.euclidian_dist):.6f} [m]\n"
title_text_results += f"median error : {np.median(self.euclidian_dist):.6f} [m]\n" + f"max error : {np.max(self.euclidian_dist):.6f} [m]\n"


title_text = text + "\n" + title_text_settings + "\n" + title_text_parameters + "\n" + title_text_results
title_text = title + "\n" + title_text_settings + "\n" + title_text_parameters + "\n" + title_text_results
fig = plt.figure(figsize=(5,8))
fig.text(0.1, 0.1, title_text, size=11)

Expand All @@ -249,22 +253,20 @@ def create_figures(self, ideal_csvfile:str, rosbag_csvfile:str, pdfname:str, ove
ax.plot(self.bag_times, self.bag_x, label='Recorded trajectory')
ax.set_xlabel('time [s]')
ax.set_ylabel('x position [m]')
ax.set_title("Trajectory x")

ax.set_title("Trajectory x")
ax.grid(which='major', color='#DDDDDD', linewidth=0.8)
ax.grid(which='minor', color='#EEEEEE', linestyle=':', linewidth=0.5)
ax.minorticks_on()
fig.tight_layout(pad = 4)
fig.legend()

fig.legend()
pdf_pages.savefig(fig)

fig2, ax2 = plt.subplots()
ax2.plot(self.bag_times, self.ideal_traj_y, label='Ideal trajectory', linestyle="--", linewidth=1, zorder=10)
ax2.plot(self.bag_times, self.bag_y, label='Recorded trajectory')
ax2.set_xlabel('time [s]')
ax2.set_ylabel('y position [m]')
ax2.set_title("Trajectory y")
ax2.set_title("trajectory y")
ax2.grid(which='major', color='#DDDDDD', linewidth=0.8)
ax2.grid(which='minor', color='#EEEEEE', linestyle=':', linewidth=0.5)
ax2.minorticks_on()
Expand Down Expand Up @@ -324,52 +326,31 @@ def create_figures(self, ideal_csvfile:str, rosbag_csvfile:str, pdfname:str, ove
plt.tight_layout(pad=4)
pdf_pages.savefig(fig6)



pdf_pages.close()

print("Results saved in " + pdfname)

def test_passed(self) -> bool :
'''Returns True if the deviation between recorded and ideal trajectories of the drone didn't exceed EPSILON for more than ALLOWED_DEV_POINTS % of datapoints.
Returns False otherwise '''
def test_passed(self) -> tuple :
'''Returns a tuple containing (bool:passed, float:percentage). If the deviation between recorded and ideal trajectories of the drone didn't exceed
EPSILON for more than ALLOWED_DEV_POINTS % of datapoints, the boolean passed is True. Otherwise it is false. float:percentage is the percentage
of points which whose deviation is less than EPSILON'''

nb_dev_points = len(self.deviation)
threshold = self.ALLOWED_DEV_POINTS * len(self.bag_times)
percentage = (len(self.deviation) / len(self.bag_times)) * 100

if nb_dev_points < threshold:
print(f"Test {self.test_name} passed : {percentage:8.4f}% of datapoints had deviation larger than {self.EPSILON}m ({self.ALLOWED_DEV_POINTS * 100}% max for pass)")
return True
print(f"Test {self.test_name} passed : {percentage:.4f}% of datapoints had deviation larger than {self.EPSILON}m ({self.ALLOWED_DEV_POINTS * 100}% max for pass)")
return (True, percentage)
else:
print(f"Test {self.test_name} failed : The deviation between ideal and recorded trajectories is greater than {self.EPSILON}m for {percentage:8.4f}% of datapoints")
return False
return (False, percentage)

def find_temporal_offset(self) -> list :
''' Returns a list containing the on-graph temporal offset between real and ideal trajectory. If offset is different for x and y axis, returns both in the same list'''
peak_x = self.bag_x.argmax() #find index of extremum value of real trajectory along x axis
peak_time_x = self.bag_times[peak_x] #find corresponding time
peak_x_ideal = self.ideal_traj_x.argmax() #find index of extremum value of ideal traj along x axis
peak_time_x_ideal = self.bag_times[peak_x_ideal] #find corresponding time
offset_x = peak_time_x_ideal - peak_time_x

peak_y = self.bag_y.argmax() #find index of extremum value of real trajectory along y ayis
peak_time_y = self.bag_times[peak_y] #find corresponding time
peak_y_ideal = self.ideal_traj_y.argmax() #find index of extremum value of ideal traj along y ayis
peak_time_y_ideal = self.bag_times[peak_y_ideal] #find corresponding time
offset_y = peak_time_y_ideal - peak_time_y

if offset_x == offset_y:
print(f"On-graph temporal offset is {offset_x}s, delay const is {self.DELAY_CONST_FIG8} so uncorrected/absolute offset is {offset_x-self.DELAY_CONST_FIG8}")
return [offset_x]
else :
print(f"On-graph temporal offsets are {offset_x} & {offset_y} secs, delay const is {self.DELAY_CONST_FIG8}")
return [offset_x, offset_y]


if __name__=="__main__":

#command line utility
# command line utility

from argparse import ArgumentParser, Namespace
parser = ArgumentParser(description="Creates a pdf plotting the recorded trajectory of a drone against its desired trajectory")
Expand All @@ -384,6 +365,4 @@ def find_temporal_offset(self) -> list :
plotter.create_figures(args.desired_trajectory, args.recorded_trajectory, args.pdf, overwrite=args.overwrite)
if args.open:
import subprocess
subprocess.call(["xdg-open", args.pdf])


subprocess.call(["xdg-open", args.pdf])
Loading

0 comments on commit 97ed0a9

Please sign in to comment.