-
Notifications
You must be signed in to change notification settings - Fork 1
/
coupling.py
104 lines (88 loc) · 4.86 KB
/
coupling.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import torch
import torch.nn as nn
import numpy as np
from net import NN
# Default device for this module: use CUDA when torch reports it, else the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
class CouplingLayer(nn.Module):
    """Affine or additive coupling layer for 4-D ``(n, c, h, w)`` tensors.

    The input is split along the channel axis into ``xa`` (the first
    ``channels // 2`` channels) and ``xb`` (the remaining channels). ``xb`` is
    passed through unchanged and fed to ``self.net``; the network output is
    used to transform ``xa``:

    * ``"affine"``:   ``ya = s * xa + t`` with ``s = sigmoid(raw_s + 2)``
    * ``"additive"``: ``ya = xa + t``

    Args:
        channels: number of channels of the tensors passed to ``forward``.
        coupling: ``"affine"`` or ``"additive"``.
        device: device the layer is moved to (also forwarded to ``NN``).
        nn_init_last_zeros: forwarded to ``NN`` as ``init_last_zeros``.

    Raises:
        ValueError: if ``coupling`` is not ``"affine"`` or ``"additive"``.
    """

    def __init__(self, channels, coupling, device, nn_init_last_zeros=False):
        super(CouplingLayer, self).__init__()
        self.coupling = coupling
        self.channels = channels

        if coupling == "affine":
            # The network emits a scale and a shift for every channel of xa.
            channels_out = channels
        elif coupling == "additive":
            # The network emits only a shift for every channel of xa.
            channels_out = channels // 2
        else:
            # Raising a plain string is itself a TypeError in Python 3, so a
            # real exception type is needed for the message to reach the user.
            raise ValueError("only affine and additive coupling is implemented")

        self.net = NN(channels_in=channels // 2, channels_out=channels_out,
                      device=device, init_last_zeros=nn_init_last_zeros)
        self.to(device)

    def forward(self, x, logdet=None, reverse=False):
        """Apply the coupling transform, or its inverse when ``reverse``.

        Args:
            x: 4-D input tensor with ``self.channels`` channels on dim 1.
            logdet: running log-determinant. Only the affine forward pass
                updates it; ``None`` is treated as zero there. The additive
                forward pass returns it unchanged.
            reverse: when true, invert the transform instead of applying it.

        Returns:
            ``(y, logdet)`` in the forward direction, ``y`` alone in reverse.

        Raises:
            AssertionError: if the mean of the output is NaN or infinite
                (not checked for the additive reverse pass).
            ValueError: if ``self.coupling`` is not a supported kind.
        """
        n = x.size(0)
        direction = "reverse" if reverse else "forward"
        xa, xb = self.split(x, mode="split-by-chunk")

        if self.coupling == "affine":
            s, t = self._scale_and_shift(xb)
            if reverse:
                ya = (xa - t) / s
            else:
                ya = s * xa + t
                layer_logdet = torch.log(s).view(n, -1).sum(-1)
                # logdet defaults to None; start the accumulation from this
                # layer's contribution instead of failing on None + tensor.
                logdet = layer_logdet if logdet is None else logdet + layer_logdet
            y = torch.cat([ya, xb], dim=1)
            self._assert_finite(direction, y, x, xa, ya, t, s)
        elif self.coupling == "additive":
            t = self.net(xb)
            ya = xa - t if reverse else xa + t
            y = torch.cat([ya, xb], dim=1)
            if not reverse:
                self._assert_finite(direction, y, x, xa, ya, t)
        else:
            raise ValueError("only affine and additive coupling is implemented")

        return y if reverse else (y, logdet)

    def _scale_and_shift(self, xb):
        """Return the affine scale ``s`` and shift ``t`` computed from ``xb``."""
        s_and_t = self.net(xb)
        s, t = self.split(s_and_t, mode="split-by-alternating")
        # sigmoid keeps the scale in (0, 1); the +2 offset is applied to the
        # raw network output before squashing.
        s = torch.sigmoid(s + 2.)
        return s, t

    @staticmethod
    def _assert_finite(direction, y, x, xa, ya, t, s=None):
        """Raise AssertionError when the mean of ``y`` is NaN or infinite.

        The diagnostic message lists the means of the tensors involved; ``s``
        is only reported when it exists (affine coupling).
        """
        y_mean = y.mean().item()
        if np.isnan(y_mean):
            kind = "nan"
        elif np.isinf(y_mean):
            kind = "inf"
        else:
            return

        fields = [("x", "%0.4f", x), ("xa", "%0.4f", xa),
                  ("ya", "%0.4f", ya), ("t", "%0.3f", t)]
        if s is not None:
            fields.insert(0, ("s", "%0.4f", s))
        details = ", ".join(
            ("%s=" + fmt) % (label, tensor.mean().item())
            for label, fmt, tensor in fields
        )
        raise AssertionError("%s in coupling %s: %s" % (kind, direction, details))

    def split(self, x, mode):
        """Split ``x`` into two tensors along the channel axis (dim 1).

        Args:
            x: 4-D tensor.
            mode: ``"split-by-chunk"`` cuts at ``self.channels // 2``;
                ``"split-by-alternating"`` takes even-indexed and odd-indexed
                channels.

        Returns:
            Tuple ``(xa, xb)``.

        Raises:
            ValueError: if ``mode`` is not one of the two supported modes.
        """
        if mode == "split-by-chunk":
            half = self.channels // 2
            return x[:, :half, :, :], x[:, half:, :, :]
        if mode == "split-by-alternating":
            xa = x[:, 0::2, :, :].contiguous()
            xb = x[:, 1::2, :, :].contiguous()
            return xa, xb
        raise ValueError("unknown split mode: %r" % (mode,))
if __name__ == "__main__":
    # Round-trip smoke test: push a random batch through an affine coupling
    # layer, invert it, and print the norm of the reconstruction error.
    shape = (16, 64, 32, 32)
    layer = CouplingLayer(
        channels=64,
        coupling="affine",
        device=device,
        nn_init_last_zeros=False,
    )
    samples = np.random.normal(5, 10, shape)
    inputs = torch.tensor(samples, dtype=torch.float, device=device)
    initial_logdet = torch.tensor(
        0, dtype=torch.float, device=device, requires_grad=True
    )
    outputs, total_logdet = layer(inputs, logdet=initial_logdet, reverse=False)
    recovered = layer(outputs, reverse=True)
    reconstruction_error = torch.norm(recovered - inputs)
    print(reconstruction_error.item())