-
Notifications
You must be signed in to change notification settings - Fork 3
/
data_join.py
184 lines (162 loc) · 7.58 KB
/
data_join.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
# Copyright 2018 Toyota Research Institute. All rights reserved.
import pypyodbc
import logging
import pandas
import numpy as np
import sql_functions
from typing import Tuple, List, Any
class ArbinTime:
    """Convert between Arbin timestamp integers and epoch seconds.

    The Arbin result databases store each timestamp as
    ``epoch_time * 10000000`` -- the least significant digit
    represents 100 microseconds.
    """

    def __init__(self) -> None:
        # Multiplier from epoch seconds to Arbin timestamp units.
        self.conversion_to_arbin_timestamp = 10000000

    def query(self, time: float) -> int:
        """Return *time* (epoch seconds) as an integer Arbin timestamp."""
        scale = self.conversion_to_arbin_timestamp
        return int(time * scale)

    def to_epoch(self, timestamp: int) -> float:
        """Return *timestamp* (Arbin units) as fractional epoch seconds."""
        scale = self.conversion_to_arbin_timestamp
        return timestamp / scale
def fill_times(row):
    """
    Return the elapsed step time for *row*.

    After the forward fill in pull_and_join, each row's 'Step_Time'
    holds the DateTime of the first preceding row that had a step
    index; subtracting it from the row's own 'DateTime' gives the
    time spent within that step.
    """
    absolute_time = row['DateTime']
    step_start_time = row['Step_Time']
    return absolute_time - step_start_time
def pull_and_join(cfg: Any, test_id: int, channel: int, starts: List,
                  stops: List,
                  dbs: List) -> Tuple[pandas.DataFrame, float, float]:
    """
    This is the primary data manipulation function. It calls the sql query
    functions and joins the returned data frames. Based on the step and cycle
    date time entries it fills in step time and test time. It also fills
    in values in columns that do not have a value for that time stamp.

    :param cfg: configuration object; must provide ATTEMPTS and whatever
        sql_functions.db_connect consumes
    :param test_id: test identifier (only used in the log message here)
    :param channel: channel number passed to the sql query functions
    :param starts: per-window start times in epoch seconds, parallel to
        stops and dbs
    :param stops: per-window stop times in epoch seconds
    :param dbs: per-window comma-separated database-name strings; note the
        last element of split(',') is discarded, so each string is expected
        to end with a trailing comma
    :return: (joined frame indexed by Data_Point, max(stops), row count);
        the third element is actually an int row count despite the
        declared float annotation
    """
    db_frames = []
    listed_windows = list(zip(starts, stops, dbs))
    arbin_time = ArbinTime()
    for window_index, window in enumerate(listed_windows):
        start = window[0]
        stop = window[1]
        # Number of leading databases in this window that returned no data;
        # used below so the test start time comes from the first database
        # that actually has rows.
        db_offset = 0
        set_test_start_flag = True
        for db_index, db in enumerate(window[2].split(',')[:-1]):
            logging.info('Getting data from:' + db)
            # Retry transient read failures up to cfg.ATTEMPTS times.
            for i in range(cfg.ATTEMPTS):
                try:
                    connection, cursor = sql_functions.db_connect(cfg, db)
                    steps_frame = sql_functions.find_steps(
                        connection, channel, arbin_time.query(start),
                        arbin_time.query(stop))
                    raw_frame = sql_functions.find_raw_data(
                        connection, channel, arbin_time.query(start),
                        arbin_time.query(stop))
                    aux_frame = sql_functions.find_auxiliary_data(
                        connection, channel, arbin_time.query(start),
                        arbin_time.query(stop))
                except pypyodbc.OperationalError:
                    logging.warning('Database read error')
                    continue
                except UnboundLocalError:
                    # NOTE(review): presumably raised from within
                    # sql_functions when the connection never came up --
                    # confirm against sql_functions.
                    logging.warning('Unknown database read error')
                    continue
                else:
                    connection.close()
                    break
            # NOTE(review): if every attempt failed, steps_frame/raw_frame
            # are unbound here and the .empty access below raises NameError.
            logging.info('Done getting info from: ' + db)
            if raw_frame.empty or steps_frame.empty:
                db_offset = db_offset + 1 # to deal with empty data frame and set start time correctly
                continue
            if not aux_frame.empty:
                # Resample aux channels onto the raw-data time index so the
                # outer join below does not produce duplicate rows.
                aux_frame = aux_interpolate(raw_frame.index, aux_frame)
            else:
                # No aux data: emit a single NaN placeholder row so the
                # concat still produces Temperature/Aux_Voltage columns.
                blank_data = {
                    'date_time': pandas.Series(raw_frame.index[0], index=[0]),
                    'Temperature': pandas.Series(np.NaN, index=[0]),
                    'Aux_Voltage': pandas.Series(np.NaN, index=[0])
                }
                aux_frame = pandas.DataFrame(blank_data)
            set_frame = pandas.concat(
                [raw_frame, steps_frame, aux_frame], axis=1, join='outer')
            set_frame.reset_index(inplace=True)
            # Test start time is taken once, from the first non-empty
            # database of the first window; Test_Time is relative to it.
            if db_index == (0 + db_offset) and window_index == 0 and set_test_start_flag:
                start_time = steps_frame.index[0]
                set_test_start_flag = False
            set_frame['Test_Time'] = arbin_time.to_epoch(
                set_frame.date_time - start_time)
            # Step_Time temporarily holds the absolute epoch time; it is
            # turned into per-step elapsed time by fill_times() further down.
            set_frame['Step_Time'] = arbin_time.to_epoch(set_frame.date_time)
            set_frame.date_time = arbin_time.to_epoch(set_frame.date_time)
            set_frame.loc[set_frame.Step_Index.isnull(), 'Step_Time'] = np.NaN
            # Columns expected downstream but not present in this data.
            set_frame['AC_Impedance'] = 0
            set_frame['Is_FC_Data'] = 0
            set_frame['ACI_Phase_Angle'] = 0
            set_frame.rename(columns={'date_time': 'DateTime'}, inplace=True)
            cols = [
                'Test_Time', 'DateTime', 'Step_Time', 'Step_Index',
                'Cycle_Index', 'Current', 'Voltage', 'Charge_Capacity',
                'Discharge_Capacity', 'Charge_Energy', 'Discharge_Energy',
                'dV/dt', 'Internal_Resistance', 'Temperature', 'Aux_Voltage'
            ]
            db_frames.append(set_frame[cols])
    query_last_time = max(stops)
    if not db_frames:
        logging.warning(
            'No data for test id:' + str(test_id) + ' channel:' + str(channel))
        return pandas.DataFrame(
            columns=['DateTime', 'Cycle_Index']), query_last_time, 0
    full_test_frame = pandas.concat(db_frames, ignore_index=True)
    # Forward-fill the NaNs left by the outer join wherever a source frame
    # had no value at that timestamp.
    # NOTE(review): fillna(method='ffill') is deprecated in recent pandas;
    # the modern spelling is full_test_frame.ffill(inplace=True).
    full_test_frame.fillna(method='ffill', inplace=True)
    full_test_frame = full_test_frame[np.isfinite(
        full_test_frame['Step_Index'])]
    # Convert the absolute times stashed in Step_Time to elapsed step time.
    full_test_frame.Step_Time = full_test_frame.apply(fill_times, axis=1)
    full_test_frame = full_test_frame[full_test_frame.Step_Time != 0]
    # delete the rows that were inserted by steps frame
    full_test_frame.reset_index(drop=True, inplace=True)
    full_test_frame.index.name = 'Data_Point'
    full_test_frame = full_test_frame.round({'Step_Time': 4})
    full_test_frame['Step_Index'] = full_test_frame.Step_Index.astype('int')
    full_test_frame['Cycle_Index'] = full_test_frame.Cycle_Index.astype('int')
    return full_test_frame, query_last_time, len(full_test_frame.index)
def pull_meta_data(cfg: Any, test_name_channel_test_id: int,
                   test_name_channel_chan_id: int) -> pandas.DataFrame:
    """
    Look up the metadata for a test/channel pair in the ArbinMasterData
    database, retrying transient read failures up to cfg.ATTEMPTS times.

    :param cfg: configuration object; must provide ATTEMPTS and whatever
        sql_functions.db_connect consumes
    :param test_name_channel_test_id: test id to look up
    :param test_name_channel_chan_id: channel id to look up
    :return: data frame of metadata for the test/channel
    :raises RuntimeError: if every attempt fails (previously this path
        crashed with an UnboundLocalError on the return statement)
    """
    meta_data_frame = None
    for i in range(cfg.ATTEMPTS):
        try:
            connection, c = sql_functions.db_connect(cfg, "ArbinMasterData")
            meta_data_frame = sql_functions.find_meta_data(
                connection, test_name_channel_test_id,
                test_name_channel_chan_id)
        # BUG FIX: the original caught pymssql.OperationalError, but this
        # module only imports pypyodbc -- a real database error therefore
        # raised NameError instead of triggering a retry. Catch the
        # exception type of the driver actually in use, matching
        # pull_and_join above.
        except pypyodbc.OperationalError:
            logging.warning('Database read error')
            continue
        except UnboundLocalError:
            logging.warning('Unknown database read error')
            continue
        else:
            connection.close()
            break
    if meta_data_frame is None:
        raise RuntimeError(
            'Could not read meta data after ' + str(cfg.ATTEMPTS) +
            ' attempts')
    return meta_data_frame
def aux_interpolate(date_time: pandas.Series,
                    aux_frame: pandas.DataFrame) -> pandas.DataFrame:
    """
    Re-sample the auxiliary channels onto the main data file's time index.

    Linearly interpolates Temperature and Aux_Voltage at each timestamp in
    *date_time* so that the subsequent outer join and NaN fill do not
    produce duplicate values.

    :param date_time: target time index from the raw data frame
    :param aux_frame: aux data indexed by its own timestamps, with
        'Temperature' and 'Aux_Voltage' columns
    :return: frame indexed by *date_time* with both interpolated columns
    """
    resampled_columns = []
    for column_name in ('Temperature', 'Aux_Voltage'):
        values = np.interp(date_time, aux_frame.index, aux_frame[column_name])
        resampled_columns.append(
            pandas.DataFrame(
                data=values, index=date_time, columns=[column_name]))
    return pandas.concat(resampled_columns, axis=1, join='inner')