Advanced Computing Platform for Theoretical Physics

Commit 07593cf1 authored by Chen-Xiao Dong

Add new layer


Signed-off-by: chenxiaodong <cxdong@iphy.ac.cn>
parent 85c2cec3
import math
import torch
import torch.nn as nn
from torch.autograd import Function
import numpy as np
def normalf(x):
    # wrap a scalar into the periodic cell [-1, 1] by shifting in steps of 2
    y = x
    while (y > 1) or (y < -1):
        if y > 1:
            y = y - 2
        else:
            y = y + 2
    return y
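# An equivalent vectorized closed form (a sketch; it differs from normalf only
# at the exact boundary y = 1, which it maps to -1):
#   torch.remainder(x + 1.0, 2.0) - 1.0
# wraps a whole tensor into [-1, 1) in one call.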
class Normalayer(Function):
    # periodic wrap with a straight-through gradient: forward maps every entry
    # into [-1, 1]; backward passes the gradient through unchanged, since the
    # wrap is locally a shift by a constant
    @staticmethod
    def forward(ctx, x):
        z = x.detach().numpy()
        z = np.array(list(map(lambda row: list(map(normalf, row)), z)))
        return torch.from_numpy(z).float()

    @staticmethod
    def backward(ctx, grad_output):
        return grad_output
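# Quick sanity check (hedged sketch): the wrap acts entrywise, e.g.
#   Normalayer.apply(torch.tensor([[1.5, -1.2]]))  ->  tensor([[-0.5, 0.8]])
# while the backward pass is exactly the identity on the incoming gradient.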
class Perlayer(nn.Module):
    # periodic boundary layer: wraps coordinates into [-1, 1];
    # volume-preserving, so logjac stays 0
    def __init__(self):
        super(Perlayer, self).__init__()
        self.logjac = 0

    def forward(self, x):
        return Normalayer.apply(x)

    def inverse(self, y):
        return Normalayer.apply(y)
class SLinearlayer(nn.Module):
    # shift layer: adds a learnable displacement, then wraps back into the
    # cell; a pure translation, so logjac stays 0
    def __init__(self, dim):
        super(SLinearlayer, self).__init__()
        self.disp = nn.Parameter(torch.FloatTensor(dim), requires_grad=True)
        self.disp.data.uniform_(0, 0.3)
        self.logjac = 0

    def forward(self, x):
        y = self.disp + x
        return Normalayer.apply(y)

    def inverse(self, y):
        x = y - self.disp
        return Normalayer.apply(x)
class SNonLinearlayer(nn.Module):
    # fixed exponential warp on [-1, 1], odd in x and fixing the endpoints:
    # y = sign(x) * (exp(|x|) - 1) / (e - 1)
    def __init__(self):
        super(SNonLinearlayer, self).__init__()
        self.logjac = 0

    def forward(self, x):
        y = torch.where(x >= 0, (torch.exp(x) - 1) / (math.e - 1),
                        -(torch.exp(-x) - 1) / (math.e - 1))
        # log |dy/dx| = |x| - log(e - 1)
        self.logjac = torch.where(x >= 0, x - math.log(math.e - 1),
                                  -x - math.log(math.e - 1)).sum(1)
        return y

    def inverse(self, y):
        x = torch.where(y >= 0, torch.log(y * (math.e - 1) + 1),
                        -torch.log(-y * (math.e - 1) + 1))
        self.logjac = torch.where(y >= 0, -torch.log(y + 1 / (math.e - 1)),
                                  -torch.log(-y + 1 / (math.e - 1))).sum(1)
        return x
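# Sanity check (hedged sketch): for any x in [-1, 1], inverse(forward(x))
# should reproduce x up to float32 rounding, and the forward and inverse
# logjac values should cancel:
#   layer = SNonLinearlayer()
#   x = torch.rand(4, 3) * 2 - 1
#   assert torch.allclose(layer.inverse(layer.forward(x)), x, atol=1e-5)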
class SNonLinearlayerplus(nn.Module):
    # learnable version of SNonLinearlayer with per-dimension slope
    # m = exp(k):  y = sign(x) * (exp(m|x|) - 1) / (exp(m) - 1)
    def __init__(self, dim):
        super(SNonLinearlayerplus, self).__init__()
        self.k = nn.Parameter(torch.FloatTensor(dim), requires_grad=True)
        self.k.data.uniform_(-1, 0.2)
        self.logjac = 0

    def forward(self, x):
        m = torch.exp(self.k)
        y = torch.where(x >= 0, (torch.exp(m * x) - 1) / (torch.exp(m) - 1),
                        -(torch.exp(-m * x) - 1) / (torch.exp(m) - 1))
        self.logjac = torch.where(x >= 0,
                                  torch.log(m) + m * x - torch.log(torch.exp(m) - 1),
                                  torch.log(m) - m * x - torch.log(torch.exp(m) - 1)).sum(1)
        return y

    def inverse(self, y):
        m = torch.exp(self.k)
        x = torch.where(y >= 0, torch.log(y * (torch.exp(m) - 1) + 1) / m,
                        -torch.log(-y * (torch.exp(m) - 1) + 1) / m)
        self.logjac = torch.where(y >= 0,
                                  -torch.log(m) - torch.log(y + 1 / (torch.exp(m) - 1)),
                                  -torch.log(m) - torch.log(-y + 1 / (torch.exp(m) - 1))).sum(1)
        return x
class NewSNonLinearlayerplus(nn.Module):
    # piecewise exponential warp: the same map as SNonLinearlayerplus applied
    # independently on [0, 1] (slope m1) and [-1, 0] (slope m2), each interval
    # rescaled around its midpoint, so the points -1, -0.5, 0, 0.5, 1 are fixed
    def __init__(self, dim):
        super(NewSNonLinearlayerplus, self).__init__()
        self.k1 = nn.Parameter(torch.FloatTensor(dim), requires_grad=True)
        self.k2 = nn.Parameter(torch.FloatTensor(dim), requires_grad=True)
        self.k1.data.uniform_(0, 0.5)
        self.k2.data.uniform_(0, 0.5)
        self.logjac = 0

    def forward(self, x):
        m1 = torch.exp(self.k1)
        m2 = torch.exp(self.k2)
        y1 = torch.where(x >= 0.5,
                         0.5 + 0.5 * (torch.exp(m1 * (x - 0.5) * 2) - 1) / (torch.exp(m1) - 1),
                         0.5 - 0.5 * (torch.exp(-m1 * (x - 0.5) * 2) - 1) / (torch.exp(m1) - 1))
        y2 = torch.where(x >= -0.5,
                         -0.5 + 0.5 * (torch.exp(m2 * (x + 0.5) * 2) - 1) / (torch.exp(m2) - 1),
                         -0.5 - 0.5 * (torch.exp(-m2 * (x + 0.5) * 2) - 1) / (torch.exp(m2) - 1))
        y = torch.where(x >= 0, y1, y2)
        lj1 = torch.where(x >= 0.5,
                          torch.log(m1) + m1 * (x - 0.5) * 2 - torch.log(torch.exp(m1) - 1),
                          torch.log(m1) - m1 * (x - 0.5) * 2 - torch.log(torch.exp(m1) - 1))
        lj2 = torch.where(x >= -0.5,
                          torch.log(m2) + m2 * (x + 0.5) * 2 - torch.log(torch.exp(m2) - 1),
                          torch.log(m2) - m2 * (x + 0.5) * 2 - torch.log(torch.exp(m2) - 1))
        self.logjac = torch.where(x >= 0, lj1, lj2).sum(1)
        return y

    def inverse(self, y):
        m1 = torch.exp(self.k1)
        m2 = torch.exp(self.k2)
        x1 = torch.where(y >= 0.5,
                         0.5 + torch.log((2 * y - 1) * (torch.exp(m1) - 1) + 1) / m1 / 2,
                         0.5 - torch.log((-2 * y + 1) * (torch.exp(m1) - 1) + 1) / m1 / 2)
        x2 = torch.where(y >= -0.5,
                         -0.5 + torch.log((2 * y + 1) * (torch.exp(m2) - 1) + 1) / m2 / 2,
                         -0.5 - torch.log((-2 * y - 1) * (torch.exp(m2) - 1) + 1) / m2 / 2)
        x = torch.where(y >= 0, x1, x2)
        lj1 = torch.where(x >= 0.5,
                          -torch.log(m1) - torch.log(2 * y - 1 + 1 / (torch.exp(m1) - 1)),
                          -torch.log(m1) - torch.log(-2 * y + 1 + 1 / (torch.exp(m1) - 1)))
        lj2 = torch.where(x >= -0.5,
                          -torch.log(m2) - torch.log(2 * y + 1 + 1 / (torch.exp(m2) - 1)),
                          -torch.log(m2) - torch.log(-2 * y - 1 + 1 / (torch.exp(m2) - 1)))
        self.logjac = torch.where(y >= 0, lj1, lj2).sum(1)
        return x
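# The same roundtrip identity as above holds for the learnable layers
# (hedged sketch):
#   layer = NewSNonLinearlayerplus(dim=3)
#   x = torch.rand(5, 3) * 2 - 1
#   assert torch.allclose(layer.inverse(layer.forward(x)), x, atol=1e-5)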
class parallelop(Function):
    # applies two triangular mixing matrices built from one parameter block m
    # of shape (n, n-1): a unit lower-triangular m1 (strict lower part of m)
    # and a unit upper-triangular m2 (strict upper part of m), with a periodic
    # wrap in between:  y = Normalayer(x @ m1.T) @ m2.T
    @staticmethod
    def forward(ctx, x, m):
        m = m.detach().numpy()
        n = np.size(x.detach().numpy(), 1)
        m1 = np.zeros((n, n))
        m2 = np.zeros((n, n))
        for a in range(0, n):
            for b in range(0, a):
                m1[a][b] = m[a][b]
            m1[a][a] = 1
        for a in range(0, n):
            for b in range(a, n - 1):
                m2[a][b + 1] = m[a][b]
            m2[a][a] = 1
        m1 = torch.from_numpy(m1).float()
        m2 = torch.from_numpy(m2).float()
        y1 = x.mm(m1.t())
        y1 = Normalayer.apply(y1)
        y2 = y1.mm(m2.t())
        ctx.save_for_backward(x, y1, m1, m2)
        return y2

    @staticmethod
    def backward(ctx, grad_output):
        input1, input2, m1, m2 = ctx.saved_tensors
        # chain rule through y2 = y1 @ m2.T and y1 = x @ m1.T; the wrap in
        # between has identity gradient
        grad_input1 = grad_output.mm(m2)
        grad_input2 = grad_input1.mm(m1)
        grad_m = None
        if ctx.needs_input_grad[1]:
            grad_m2 = grad_output.t().mm(input2)
            grad_m1 = grad_input1.t().mm(input1)
            # pack the strict lower part of grad_m1 and the strict upper part
            # of grad_m2 back into the (n, n-1) parameter block
            n = np.size(input1.detach().numpy(), 1)
            m = np.zeros((n, n - 1))
            for a in range(0, n):
                for b in range(0, a):
                    m[a][b] = grad_m1[a][b]
            for a in range(0, n):
                for b in range(a, n - 1):
                    m[a][b] = grad_m2[a][b + 1]
            grad_m = torch.from_numpy(m).float()
        return grad_input2, grad_m
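# For n = 3, the parameter block m (shape (3, 2)) is unpacked as
#   m1 = [[1,      0,      0],      m2 = [[1, m[0,0], m[0,1]],
#         [m[1,0], 1,      0],            [0, 1,      m[1,1]],
#         [m[2,0], m[2,1], 1]]            [0, 0,      1     ]]
# Both factors have unit diagonal, so each has determinant 1 and the mixing
# step is volume-preserving (Mixlayer below keeps logjac at zero accordingly).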
class Mixlayer(nn.Module):
    # volume-preserving coordinate mixing via parallelop, followed by a wrap
    def __init__(self, dim):
        super(Mixlayer, self).__init__()
        self.m = nn.Parameter(torch.FloatTensor(dim, dim - 1), requires_grad=True)
        self.m.data.uniform_(0, 0.1)
        self.logjac = 0

    def forward(self, x):
        self.logjac = x.new_zeros(x.shape[0])
        y = parallelop.apply(x, self.m)
        y = Normalayer.apply(y)
        return y

    def inverse(self, y):
        self.logjac = y.new_zeros(y.shape[0])
        x = parallelop.apply(y, -self.m)
        x = Normalayer.apply(x)
        return x
class LieFlow(nn.Module):
    def __init__(self, depth, dim, udim, device='cpu', name=None):
        super(LieFlow, self).__init__()
        self.device = device
        self.dim = dim
        self.layers = nn.ModuleList()
        if name is None:
            self.name = 'LieFlow'
        else:
            self.name = name
        self.layers.append(Perlayer())
        for d in range(depth):
            self.layers.append(Mixlayer(dim))
            for e in range(udim):
                self.layers.append(SLinearlayer(dim))
                self.layers.append(SNonLinearlayerplus(dim))
            if (d % 2 == 0):
                self.layers.append(SLinearlayer(dim))
                self.layers.append(NewSNonLinearlayerplus(dim))
        self.layers.append(Perlayer())

    def forward(self, x):
        '''
        from latent to physical
        '''
        self.logjac = x.new_zeros(x.shape[0])
        for d in self.layers:
            x = d(x)
            self.logjac += d.logjac
        return x

    def inverse(self, y):
        '''
        from physical to latent
        '''
        self.logjac = y.new_zeros(y.shape[0])
        for d in reversed(self.layers):
            y = d.inverse(y)
            self.logjac += d.logjac
        return y

    def calgd(self, z):
        # log-density of a Gaussian with std 0.5 wrapped onto the periodic
        # cell [-1, 1]: sum over images z + 2m, truncated to m in {-1, 0, 1}
        logp = z.new_zeros(z.shape[0])
        l = z.new_zeros(z.size())
        for m in range(-1, 2):
            l += torch.exp(-2 * (z.add(m * 2)).pow(2))
        logp += torch.log(l).add(-0.5 * math.log(0.5 * math.pi)).sum(1)
        return logp

    def sample(self, batch_size):
        z = torch.Tensor(batch_size, self.dim).normal_(0, 0.5)
        x = self.forward(z)
        logp = self.calgd(z) - self.logjac
        return x, logp

    def logprob(self, x):
        z = self.inverse(x)
        return self.calgd(z) + self.logjac
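# Minimal usage sketch (hedged; untrained weights, illustrative sizes only):
#   flow = LieFlow(depth=2, dim=3, udim=2)
#   x, logp = flow.sample(batch_size=64)  # samples on [-1, 1]^3 with log-probs
#   lp = flow.logprob(x.detach())         # log-prob of given configurations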
if __name__ == '__main__':
    '''
    model = LieFlow(depth=2, dim=3, udim=2, name='test')
    print(model)
    y = torch.zeros(10, 3, requires_grad=True)
    x = model.forward(y)
    print(y)
    print(x)
    '''
    # quick check of parallelop's custom backward on a small example
    x = torch.zeros(3, 2, requires_grad=True)
    print(x)
    y = torch.randn(1, 3, requires_grad=True)
    print(y)
    l = parallelop.apply(y, x)
    m = l.mean()
    m.backward()
    print(l)
    print(m)
    print(x.grad)
    print(y.grad)