-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathtest.py
148 lines (130 loc) · 5.26 KB
/
test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#!/usr/bin/env python
# ------------------------------------------------------------------------------------------------------%
# Created by "Thieu" at 13:45, 28/06/2021 %
# %
# Email: nguyenthieu2102@gmail.com %
# Homepage: https://www.researchgate.net/profile/Nguyen_Thieu2 %
# Github: https://github.com/thieu1995 %
# ------------------------------------------------------------------------------------------------------%
from pandas import DataFrame, Series, concat, read_csv
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import MinMaxScaler
from keras.models import Sequential
from keras.layers import Dense
from math import sqrt
import matplotlib
# be able to save images on server
matplotlib.use('Agg')
from matplotlib import pyplot
import numpy
# frame a sequence as a supervised learning problem
def timeseries_to_supervised(data, lag=1):
df = DataFrame(data)
columns = [df.shift(i) for i in range(1, lag + 1)]
columns.append(df)
df = concat(columns, axis=1)
return df
# create a differenced series
def difference(dataset, interval=1):
diff = list()
for i in range(interval, len(dataset)):
value = dataset[i] - dataset[i - interval]
diff.append(value)
return Series(diff)
# invert differenced value
def inverse_difference(history, yhat, interval=1):
return yhat + history[-interval]
# scale train and test data to [-1, 1]
def scale(train, test):
# fit scaler
scaler = MinMaxScaler(feature_range=(-1, 1))
scaler = scaler.fit(train)
# transform train
train = train.reshape(train.shape[0], train.shape[1])
train_scaled = scaler.transform(train)
# transform test
test = test.reshape(test.shape[0], test.shape[1])
test_scaled = scaler.transform(test)
return scaler, train_scaled, test_scaled
# inverse scaling for a forecasted value
def invert_scale(scaler, X, yhat):
new_row = [x for x in X] + [yhat]
array = numpy.array(new_row)
array = array.reshape(1, len(array))
inverted = scaler.inverse_transform(array)
return inverted[0, -1]
# fit an MLP network to training data
def fit_model(train, batch_size, nb_epoch, neurons):
X, y = train[:, 0:-1], train[:, -1]
model = Sequential()
model.add(Dense(neurons, activation='relu', input_dim=X.shape[1]))
model.add(Dense(1))
model.compile(loss='mean_squared_error', optimizer='adam')
model.fit(X, y, epochs=nb_epoch, batch_size=batch_size, verbose=2, shuffle=False)
return model
def draw_predict(y_test=None, y_pred=None, pathfile=None):
pyplot.plot(y_test)
pyplot.plot(y_pred)
pyplot.ylabel('CPU')
pyplot.xlabel('Timestamp')
pyplot.legend(['Actual', 'Predict'], loc='upper right')
pyplot.savefig(pathfile + ".png")
pyplot.close()
return None
# run a repeated experiment
def experiment(repeats, series, epochs, lag, neurons, test_size, batch_size):
# transform data to be stationary
raw_values = series.values
diff_values = difference(raw_values, 1)
# transform data to be supervised learning
supervised = timeseries_to_supervised(diff_values, lag)
supervised_values = supervised.values[lag:, :]
# split data into train and test-sets
train, test = supervised_values[0:-test_size], supervised_values[-test_size:]
# transform the scale of the data
scaler, train_scaled, test_scaled = scale(train, test)
# run experiment
error_scores = list()
for r in range(repeats):
# fit the model
train_trimmed = train_scaled[2:, :]
model = fit_model(train_trimmed, batch_size, epochs, neurons)
# forecast test dataset
test_reshaped = test_scaled[:, 0:-1]
output = model.predict(test_reshaped, batch_size=batch_size)
predictions = list()
for i in range(len(output)):
yhat = output[i, 0]
X = test_scaled[i, 0:-1]
# invert scaling
yhat = invert_scale(scaler, X, yhat)
# invert differencing
yhat = inverse_difference(raw_values, yhat, len(test_scaled) + 1 - i)
# store forecast
predictions.append(yhat)
# report performance
rmse = sqrt(mean_squared_error(raw_values[-test_size:], predictions))
draw_predict(raw_values[-test_size:][:100], predictions[:100], f"mlp2-{dataname}-{lag}-{neurons}-{test_size}-{batch_size}-{epochs}")
print('%d) Test RMSE: %.3f' % (r + 1, rmse))
error_scores.append(rmse)
return error_scores
dataname = "it_uk_fm_5m_noise_fm2_noise2"
# load dataset
series = read_csv(f'./data/formatted/{dataname}.csv', usecols=[0])
dataset = series.values
# experiment
repeats = 1
results = DataFrame()
lags = [12, 24, 36, 48]
neurons = 6
test_size = 1000
batch_size = 256
# vary training epochs
epoch = 1000
for lag in lags:
results[str(lag)] = experiment(repeats, series, epoch, lag, neurons, test_size, batch_size)
# summarize results
print(results.describe())
# save boxplot
results.boxplot()
pyplot.savefig('boxplot_epochs.png')