-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathIGC2CSV.py
281 lines (238 loc) · 10.1 KB
/
IGC2CSV.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
import sys
import os
import datetime
from math import radians, cos, sin, asin, sqrt
# Reads the IGC file and returns a flight dictionary
def parse_igc(flight):
flight['fixrecords'] = []
flight['optional_records'] = {}
file = open(flight['igcfile'], 'r')
for line in file:
line = line.rstrip()
linetype = line[0]
recordtypes[linetype](line, flight)
file.close()
return flight
# Adds a bunch of calculated fields to a flight dictionary
def crunch_flight(flight):
for index, record in enumerate(flight['fixrecords']):
#thisdatetime = datetime.datetime.strptime(record['timestamp'], '')
record['latdegrees'] = lat_to_degrees(record['latitude'])
record['londegrees'] = lon_to_degrees(record['longitude'])
record['time'] = datetime.time(int(record['timestamp'][0:2]), int(record['timestamp'][2:4]), int(record['timestamp'][4:6]), 0, )
if index > 0:
prevrecord = flight['fixrecords'][index-1]
# Because we only know the date of the FIRST B record, we have to do some shaky logic to determine when we cross the midnight barrier
# There's a theoretical edge case here where two B records are separated by more than 24 hours causing the date to be incorrect
# But that's a problem with the IGC spec and we can't do much about it
if(record['time'] < prevrecord['time']):
# We crossed the midnight barrier, so increment the date
record['date'] = prevrecord['date'] + datetime.timedelta(days=1)
else:
record['date'] = prevrecord['date']
record['datetime'] = datetime.datetime.combine(record['date'], record['time'])
record['time_delta'] = (record['datetime'] - prevrecord['datetime']).total_seconds()
record['running_time'] = (record['datetime'] - flight['datetime_start']).total_seconds()
record['distance_delta'] = haversine(record['londegrees'], record['latdegrees'], prevrecord['londegrees'], prevrecord['latdegrees'])
flight['distance_total'] += record['distance_delta']
record['distance_total'] = flight['distance_total']
record['distance_from_start'] = straight_line_distance(record['londegrees'], record['latdegrees'], record['alt-GPS'], flight['fixrecords'][0]['londegrees'], flight['fixrecords'][0]['latdegrees'], flight['fixrecords'][0]['alt-GPS'])
record['groundspeed'] = record['distance_delta'] / record['time_delta'] * 3600
flight['groundspeed_peak'] = max(record['groundspeed'], flight['groundspeed_peak'])
record['groundspeed_peak'] = flight['groundspeed_peak']
record['alt_gps_delta'] = record['alt-GPS'] - prevrecord['alt-GPS']
record['alt_pressure_delta'] = record['pressure'] - prevrecord['pressure']
record['climb_speed'] = record['alt_gps_delta'] / record['time_delta']
flight['climb_total'] += max(0, record['alt_gps_delta'])
record['climb_total'] = flight['climb_total']
flight['alt_peak'] = max(record['alt-GPS'], flight['alt_peak'])
flight['alt_floor'] = min(record['alt-GPS'], flight['alt_floor'])
if "TAS" in flight['optional_records']:
flight['tas_peak'] = max(record['opt_tas'], flight['tas_peak'])
record['tas_peak'] = flight['tas_peak']
else:
flight['time_start'] = record['time']
flight['datetime_start'] = datetime.datetime.combine(flight['flightdate'], flight['time_start'])
flight['altitude_start'] = record['alt-GPS']
flight['distance_total'] = 0
flight['climb_total'] = 0
flight['alt_peak'] = record['alt-GPS']
flight['alt_floor'] = record['alt-GPS']
flight['groundspeed_peak'] = 0
record['date'] = flight['flightdate']
record['datetime'] = datetime.datetime.combine(record['date'], record['time'])
record['running_time'] = 0
record['time_delta'] = 0
record['distance_delta'] = 0
record['distance_total'] = 0
record['groundspeed'] = 0
record['groundspeed_peak'] = 0
record['alt_gps_delta'] = 0
record['alt_pressure_delta'] = 0
record['climb_speed'] = 0
record['climb_total'] = 0
record['distance_from_start'] = 0
if "TAS" in flight['optional_records']:
flight['tas_peak'] = record['opt_tas']
record['tas_peak'] = 0
return flight
def logline_A(line, flight):
flight['manufacturer'] = line[1:]
return
# H Records are headers that give one-time information
# http://carrier.csi.cam.ac.uk/forsterlewis/soaring/igc_file_format/igc_format_2008.html#link_3.3
def logline_H(line, flight):
try:
headertypes[line[1:5]](line[5:], flight)
except KeyError:
print "Header (not implemented): {}".format(line[1:])
return
# Flight date header. This is the date that the FIRST B record was made on
# Date format: DDMMYY
# (did we learn nothing from Y2K?)
def logline_H_FDTE(line, flight):
flight['flightdate'] = datetime.date(int(line[4:6])+2000, int(line[2:4]), int(line[0:2]))
print "Flight date: {}".format(flight['flightdate'])
def logline_I(line, flight):
num = int(line[1:3])
for i in xrange(num):
field = line[3+7*i:10+7*i]
flight['optional_records'][field[4:7]] = (int(field[0:2])-1, int(field[2:4]))
def logline_B(line, flight):
flight['fixrecords'].append({
'timestamp' : line[1:7],
'latitude' : line[7:15],
'longitude' : line[15:24],
'AVflag' : line[24:25] == "A",
'pressure' : int(line[25:30]),
'alt-GPS' : int(line[30:35]),
})
for key, record in flight['optional_records'].iteritems():
flight['fixrecords'][-1]['opt_' + key.lower()] = line[record[0]:record[1]]
return
def logline_NotImplemented(line, flight):
print "Record Type {} not implemented: {}".format(line[0:1], line[1:])
return
recordtypes = {
'A' : logline_A,
'B' : logline_B,
'C' : logline_NotImplemented,
'D' : logline_NotImplemented,
'E' : logline_NotImplemented,
'F' : logline_NotImplemented,
'G' : logline_NotImplemented,
'H' : logline_H,
'I' : logline_I,
'J' : logline_NotImplemented,
'K' : logline_NotImplemented,
'L' : logline_NotImplemented,
}
headertypes = {
'FDTE' : logline_H_FDTE,
}
# IGC files store latitude as DDMMmmmN
def lat_to_degrees(lat):
direction = {'N':1, 'S':-1}
degrees = int(lat[0:2])
minutes = int(lat[2:7])
minutes /= 1000.
directionmod = direction[lat[7]]
return (degrees + minutes/60.) * directionmod
# IGC files store longitude as DDDMMmmmN
def lon_to_degrees(lon):
direction = {'E': 1, 'W':-1}
degrees = int(lon[0:3])
minutes = int(lon[3:8])
minutes /= 1000.
directionmod = direction[lon[8]]
return (degrees + minutes/60.) * directionmod
# haversine calculates the distance between two pairs of latitude/longitude
def haversine(lon1, lat1, lon2, lat2):
"""
Calculate the great circle distance between two points
on the earth (specified in decimal degrees)
"""
# convert decimal degrees to radians
lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])
# haversine formula
dlon = lon2 - lon1
dlat = lat2 - lat1
a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
c = 2 * asin(sqrt(a))
km = 6367 * c
return km
# Calculates the distance between two sets of latitude, longitude, and altitude, as a straight line
def straight_line_distance(lon1, lat1, alt1, lon2, lat2, alt2):
a = haversine(lon1, lat1, lon2, lat2)
b = (alt1 - alt2) / 1000. #altitude is in meters, but we're working in km here
c = sqrt(a**2. + b**2.)
return c
def get_output_filename(inputfilename):
head, tail = os.path.split(inputfilename)
filename, ext = os.path.splitext(tail)
outputfilename = filename + '.csv'
return outputfilename
if __name__ == "__main__":
print "Number of arguments: {}".format(len(sys.argv))
print "Argument List: {}".format(str(sys.argv))
defaultoutputfields = [
('Datetime (UTC)', 'record', 'datetime'),
('Elapsed Time', 'record', 'running_time'),
('Latitude (Degrees)', 'record', 'latdegrees'),
('Longitude (Degrees)', 'record', 'londegrees'),
('Altitude GPS', 'record', 'alt-GPS'),
('Distance Delta', 'record', 'distance_delta'),
('Distance Total', 'record', 'distance_total'),
('Groundspeed', 'record', 'groundspeed'),
('Groundspeed Peak', 'record', 'groundspeed_peak'),
('Altitude Delta (GPS)', 'record', 'alt_gps_delta'),
('Altitude Delta (Pressure)', 'record', 'alt_pressure_delta'),
('Climb Speed', 'record', 'climb_speed'),
('Climb Total', 'record', 'climb_total'),
('Max Altitude (flight)', 'flight', 'alt_peak'),
('Min Altitude (flight)', 'flight', 'alt_floor'),
('Distance From Start (straight line)', 'record', 'distance_from_start')
]
logbook = []
fileparam = sys.argv[1]
if os.path.isfile(fileparam):
logbook.append({'igcfile': os.path.abspath(fileparam)})
print "Single IGC file supplied: {}".format(logbook[-1]['igcfile'])
elif os.path.isdir(fileparam):
for filename in os.listdir(fileparam):
fileabs = os.path.join(fileparam, filename)
if not os.path.isfile(fileabs):
continue
root, ext = os.path.splitext(fileabs)
if ext.lower() == '.igc'.lower():
logbook.append({'igcfile': os.path.abspath(fileabs)})
else:
print 'Must indicate a file or directory to process'
exit()
print "{} flights ready to process...".format(len(logbook))
# Parse all files
for flight in logbook:
flight = parse_igc(flight)
# Crunch the telemetry numbers on all of the flights
for flight in logbook:
flight = crunch_flight(flight)
# Output the CSV file for all flights
for flight in logbook:
flight['outputfilename'] = get_output_filename(flight['igcfile'])
output = open(flight['outputfilename'], 'w')
outputfields = list(defaultoutputfields)
if 'TAS' in flight['optional_records']:
outputfields.append( ('True Airspeed', 'record', 'opt_tas') )
outputfields.append( ('True Airspeed Peak', 'record', 'tas_peak') )
header = ''
for field in outputfields:
header += field[0] + ','
output.write(header[:-1] + '\n')
for record in flight['fixrecords']:
recordline = ''
for field in outputfields:
if field[1] == 'record':
recordline += str(record[field[2]]) + ','
elif field[1] == 'flight':
recordline += str(flight[field[2]]) + ','
output.write(recordline[:-1] + '\n')