"""
fitting_polynomials_numeric_2d.py
=================================
Examples of regression of in two dimensions, described in the paper "No
patterns in regression residuals," illustrating underspecified, correctly
specified, and overspecified regression of randomly-generated polynomial
surfaces on a regular 2D grid.
Uses numeric gradient descent-based optimization schemes provided by pytorch
(e.g. batch gradient descent, Adam, L-BFGS) rather than solving for the exact
linear algebra solution.
Saves output from each simulated regression into a uniquely timestamped
subfolder of ./plots/polynomials_numeric_2d/.
"""
import os
import pickle
import numpy as np
import pandas as pd
import torch
from torch.utils.data import DataLoader, TensorDataset
import fitting_polynomials_2d
import further_analysis_polynomials_2d
from fitting_polynomials_2d import ( # all parameters defined in core fitting module
x0x1_min,
x0x1_max,
nx,
fit_degree_lo,
fit_degree_true,
fit_degree_hi,
fit_degree_vhi,
coeff_signal_to_noise,
noise_sigma,
CLIM,
)
# Parameters
# ==========
# Output folder structure
PLTDIR = os.path.join(".", "plots")
PROJDIR = os.path.join(PLTDIR, "polynomials_2d_numeric")
# Which under/overspecifications to run?
all_specifications = ("lo", "true", "hi", "vhi")
run_specifications = ("lo", "true", "hi", "vhi")
# Use existing timestamp?
tstmp_exists = True
if tstmp_exists:
    # change this timestamp to reference pre-run regression example output from
    # fitting_polynomials_2d.py
tstmp = "2024-05-10T112411.980432"
tstmp_project_dir = os.path.join(PLTDIR, "polynomials_2d")
tsfolder, tsfile = further_analysis_polynomials_2d.pathfile(tstmp, projdir=tstmp_project_dir)
# PyTorch settings
n_epochs = int(1e6)
iter_stride = 1000
loss_function = torch.nn.MSELoss(reduction="mean")
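# optimizer configurations: each dict maps a torch.optim class to the keyword
# arguments used to construct it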
gradient_descent_optim = {torch.optim.Adam: {"lr": 0.001, "betas": (0.99, 0.999), "eps": 1e-8}}
lbfgs_optim = {
torch.optim.LBFGS: {
"lr": 1,
"max_iter": iter_stride,
"line_search_fn": "strong_wolfe",
"tolerance_grad": 1e-15,
"tolerance_change": 1e-13,
}
}
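# early exit: stop iterating once the training MSE matches the exact least
# squares minimum loss to within this relative tolerance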
early_exit_rtol = 1e-13
# use graphics card if available
device = "cuda" if torch.cuda.is_available() else "cpu"
# Main script
# ===========
if __name__ == "__main__":
# Output dict - will be pickled later
output = {}
# Prepare two independent variables on a grid
x0, x1 = fitting_polynomials_2d.square_grid(
min_val=x0x1_min, max_val=x0x1_max, nside=nx, endpoint=True, flatten_order="C")
# Design matrices
design_matrices = {
_spec: fitting_polynomials_2d.chebyshev_design_matrix(x0, x1, degree=_deg)
for _spec, _deg in zip(
all_specifications, (fit_degree_lo, fit_degree_true, fit_degree_hi, fit_degree_vhi))
}
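    # each design matrix column is a 2D Chebyshev polynomial feature evaluated
    # on the flattened grid (see fitting_polynomials_2d for construction details)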
# Build the true / ideal 2D contour, and the noise-added data, and plot
if tstmp_exists:
print(f"Loading fitting_polynomials_2d output data from {tsfile}")
with open(tsfile, "rb") as fin:
tsdata = pickle.load(fin)
ctrue = tsdata["ctrue"]
ztrue = tsdata["ztrue"]
zdata = tsdata["zdata"]
else:
# current timestamp
tstmp = pd.Timestamp.now().isoformat().replace(":", "")
ctrue = np.random.randn(design_matrices["true"].shape[-1]) * coeff_signal_to_noise
ztrue = (np.matmul(design_matrices["true"], ctrue)).reshape((nx, nx), order="C")
zdata = ztrue + noise_sigma * np.random.randn(*ztrue.shape)
outdir = fitting_polynomials_2d.build_output_folder_structure(tstmp, project_dir=PROJDIR)
fitting_polynomials_2d.plot_image(
ztrue, "Ideal model", filename=os.path.join(outdir, "ideal_"+tstmp+".png"), show=True)
fitting_polynomials_2d.plot_image(
zdata, "Data", filename=os.path.join(outdir, "data_"+tstmp+".png"), show=True)
output["ctrue"] = ctrue
output["ztrue"] = ztrue
output["zdata"] = zdata
# Prepare torch tensor and run
zflat = zdata.flatten(order="C")
ztensor = torch.from_numpy(zflat).to(device).view(-1, 1)
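    # .view(-1, 1) makes the target a column vector, matching the
    # (n_samples, 1) output shape of the torch.nn.Linear model used below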
for _spec in run_specifications:
_design_matrix = design_matrices[_spec]
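        # Exact linear algebra solution via least squares: its mean squared
        # error is the global minimum of the loss, reported below as "ideal"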
        _lstsq_coeffs = np.linalg.lstsq(_design_matrix, zflat, rcond=None)[0]
_lstsq_prediction = _design_matrix.dot(_lstsq_coeffs).reshape((nx, nx), order="C")
_lstsq_loss = ((zdata - _lstsq_prediction)**2).mean()
output[f"lstsq_pred_{_spec}"] = _lstsq_prediction
output[f"lstsq_loss_{_spec}"] = _lstsq_loss
# Initialize pytorch elements
_dataset = TensorDataset(torch.from_numpy(_design_matrix).to(device), ztensor)
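        # batch_size equals the full dataset size, so each training epoch
        # below performs a single full-batch step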
_loader = DataLoader(_dataset, batch_size=len(ztensor))
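        # bias-free linear layer whose weights are the polynomial coefficients;
        # .double() matches the float64 numpy design matrix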
_model = torch.nn.Linear(_design_matrix.shape[-1], 1, bias=False).double()
_model.to(device)
# Gradient descent optimization
_optimizer = [
_f(params=_model.parameters(), **_kw) for _f, _kw in gradient_descent_optim.items()][0]
print(f"Running Gradient Descent with {gradient_descent_optim}")
_model.train()
_loss0 = 0.
_losses = []
for i in range(1 + n_epochs):
for batch, (X, y) in enumerate(_loader):
_optimizer.zero_grad()
_pred = _model(X)
_loss = loss_function(_pred, y)
_losses.append(_loss.item())
# Backpropagation
_loss.backward()
_optimizer.step()
if (i == 0) or ((1 + i) % iter_stride == 0): # report
print(
f"GD/{tstmp}/{_spec}: epoch: {1 + i:d}/{n_epochs:d}, loss: {_loss:>7f}, "
f"dloss: {min(_loss - _loss0, 0):>7e}, ideal: {_lstsq_loss:>7f}"
)
_loss0 = _loss.item()
if np.isclose(_loss0 / _lstsq_loss, 1., atol=0, rtol=early_exit_rtol):
break
output[f"gd_pred_{_spec}"] = _pred.numpy(force=True).reshape((nx, nx), order="C")
output[f"gd_losses_{_spec}"] = _losses
        # L-BFGS optimization
_optimizer = [_f(params=_model.parameters(), **_kw) for _f, _kw in lbfgs_optim.items()][0]
print(f"Running L-BFGS with {lbfgs_optim}")
_loss0 = 0.
_losses = []
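        # each _optimizer.step(closure) call runs up to max_iter=iter_stride
        # internal L-BFGS iterations, so the outer loop allows n_epochs total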
for j in range(n_epochs // iter_stride):
for batch, (X, y) in enumerate(_loader):
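                # torch.optim.LBFGS requires a closure that re-evaluates the
                # model and returns the loss; it may call this several times
                # per step during line searches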
                def closure():
_optimizer.zero_grad()
_pred = _model(X)
_loss = loss_function(_pred, y)
_losses.append(_loss.item())
_loss.backward()
return _loss
_optimizer.step(closure)
            # report: re-evaluate the loss after this round of L-BFGS iterations
            with torch.no_grad():
                _pred = _model(X)
                _loss = loss_function(_pred, y)
print(
f"L-BFGS/{tstmp}/{_spec}: epoch: {(1 + j) * iter_stride:d}/{n_epochs:d}, "
f"loss: {_loss:>7f}, dloss: {min(_loss - _loss0, 0):>7e}, "
f"ideal: {_lstsq_loss:>7f}"
)
_loss0 = _loss.item()
            print(f"relative loss excess over lstsq minimum: {_loss0 / _lstsq_loss - 1.:.3e}")
if np.isclose(_loss0 / _lstsq_loss, 1., atol=0, rtol=early_exit_rtol):
break
output[f"lbfgs_pred_{_spec}"] = _pred.numpy(force=True).reshape((nx, nx), order="C")
output[f"lbfgs_losses_{_spec}"] = _losses
    # Calculate and plot residuals, prefixing filenames with the optimizer
    # name so gradient descent and L-BFGS plots do not overwrite each other
    for _optim in ("gd", "lbfgs"):
        for _spec, _fileprefix, _title in (
            ("lo", "lo", "Low degree polynomial residuals"),
            ("true", "matching", "Matching degree polynomial residuals"),
            ("hi", "hi", "High degree polynomial residuals"),
            ("vhi", "vhi", "Very high degree polynomial residuals"),
        ):
            _residuals = zdata - output[f"{_optim}_pred_{_spec}"]
            fitting_polynomials_2d.plot_image(
                _residuals,
                _title,
                filename=os.path.join(outdir, f"{_optim}_{_fileprefix}_{tstmp}.png"),
                clim=CLIM,
            )
            output[f"{_optim}_r_{_spec}"] = _residuals
# Save output for further analysis
outfile = os.path.join(outdir, f"output_{tstmp}_n{n_epochs}.pickle")
print("Saving to "+outfile)
with open(outfile, "wb") as fout:
pickle.dump(output, fout)