##+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
## Created by: Hang Zhang
## ECE Department, Rutgers University
## Email: zhang.hang@rutgers.edu
## Copyright (c) 2017
##
## This source code is licensed under the MIT-style license found in the
## LICENSE file in the root directory of this source tree
##+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
"""Encoding Custermized NN Module"""
import torch
import torch.nn as nn
from torch.nn import functional as F
from .splat import SplAtConv2d
from .rectify import RFConv2d
__all__ = ['ConvBnAct', 'GlobalAvgPool2d', 'GramMatrix',
'View', 'Sum', 'Mean', 'Normalize', 'ConcurrentModule',
'PyramidPooling']
class ConvBnAct(nn.Sequential):
def __init__(self, in_channels, out_channels, kernel_size, stride=1,
padding=0, dilation=1, radix=0, groups=1,
bias=True, padding_mode='zeros',
rectify=False, rectify_avg=False, act=True,
norm_layer=nn.BatchNorm2d):
super().__init__()
        if radix > 0:
            # Split-Attention convolution (ResNeSt-style); it consumes the
            # radix/rectify options and applies norm_layer internally.
            conv_layer = SplAtConv2d
            conv_kwargs = {'radix': radix, 'rectify': rectify,
                           'rectify_avg': rectify_avg, 'norm_layer': norm_layer}
        else:
            # Plain convolution, optionally replaced by rectified convolution.
            conv_layer = RFConv2d if rectify else nn.Conv2d
            conv_kwargs = {'average_mode': rectify_avg} if rectify else {}
self.add_module("conv", conv_layer(in_channels, out_channels, kernel_size=kernel_size, stride=stride,
padding=padding, dilation=dilation, groups=groups, bias=bias,
padding_mode=padding_mode, **conv_kwargs))
self.add_module("bn", nn.BatchNorm2d(out_channels))
if act:
self.add_module("relu", nn.ReLU())
class GlobalAvgPool2d(nn.Module):
    """Global average pooling over the input's spatial dimensions."""
    def __init__(self):
        super(GlobalAvgPool2d, self).__init__()
def forward(self, inputs):
return F.adaptive_avg_pool2d(inputs, 1).view(inputs.size(0), -1)
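# Sketch: pooling collapses the spatial dimensions, e.g.
#   >>> GlobalAvgPool2d()(torch.randn(2, 512, 7, 7)).shape
#   torch.Size([2, 512])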
class GramMatrix(nn.Module):
    r"""Gram matrix of a mini-batch of 4D convolutional feature maps,
    normalized by :math:`C \times H \times W`.

    .. math::
        \mathcal{G} = \sum_{h=1}^{H_i}\sum_{w=1}^{W_i} \mathcal{F}_{h,w}\mathcal{F}_{h,w}^T
    """
    def forward(self, y):
(b, ch, h, w) = y.size()
features = y.view(b, ch, w * h)
features_t = features.transpose(1, 2)
gram = features.bmm(features_t) / (ch * h * w)
return gram
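# Sketch: for a (B, C, H, W) feature map the Gram matrix is (B, C, C), e.g.
#   >>> GramMatrix()(torch.randn(2, 64, 8, 8)).shape
#   torch.Size([2, 64, 64])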
class View(nn.Module):
    """Reshape the input to the given size.

    Returns a view of the input tensor (no data copy); the requested size
    must be compatible with the input's number of elements.
    """
def __init__(self, *args):
super(View, self).__init__()
if len(args) == 1 and isinstance(args[0], torch.Size):
self.size = args[0]
else:
self.size = torch.Size(args)
def forward(self, input):
return input.view(self.size)
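# Sketch (sizes assumed for illustration): flatten conv features for a
# fully-connected head, 128 * 4 * 4 = 2048 elements per sample:
#   >>> View(-1, 2048)(torch.randn(2, 128, 4, 4)).shape
#   torch.Size([2, 2048])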
class Sum(nn.Module):
def __init__(self, dim, keep_dim=False):
super(Sum, self).__init__()
self.dim = dim
self.keep_dim = keep_dim
def forward(self, input):
return input.sum(self.dim, self.keep_dim)
class Mean(nn.Module):
def __init__(self, dim, keep_dim=False):
super(Mean, self).__init__()
self.dim = dim
self.keep_dim = keep_dim
def forward(self, input):
return input.mean(self.dim, self.keep_dim)
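# Sketch: Sum/Mean wrap tensor reductions so they can sit in an nn.Sequential,
# e.g. averaging over the temporal dimension of a (B, T, D) tensor:
#   >>> Mean(dim=1)(torch.randn(4, 10, 256)).shape
#   torch.Size([4, 256])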
class Normalize(nn.Module):
r"""Performs :math:`L_p` normalization of inputs over specified dimension.
Does:
.. math::
v = \frac{v}{\max(\lVert v \rVert_p, \epsilon)}
for each subtensor v over dimension dim of input. Each subtensor is
flattened into a vector, i.e. :math:`\lVert v \rVert_p` is not a matrix
norm.
With default arguments normalizes over the second dimension with Euclidean
norm.
Args:
p (float): the exponent value in the norm formulation. Default: 2
dim (int): the dimension to reduce. Default: 1
"""
def __init__(self, p=2, dim=1):
super(Normalize, self).__init__()
self.p = p
self.dim = dim
def forward(self, x):
return F.normalize(x, self.p, self.dim, eps=1e-8)
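# Sketch: rows of the output have (approximately) unit L2 norm:
#   >>> out = Normalize(p=2, dim=1)(torch.randn(4, 8))
#   >>> out.norm(p=2, dim=1)  # ~ tensor([1., 1., 1., 1.])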
class ConcurrentModule(nn.ModuleList):
r"""Feed to a list of modules concurrently.
The outputs of the layers are concatenated at channel dimension.
Args:
modules (iterable, optional): an iterable of modules to add
"""
def __init__(self, modules=None):
super(ConcurrentModule, self).__init__(modules)
def forward(self, x):
outputs = []
for layer in self:
outputs.append(layer(x))
return torch.cat(outputs, 1)
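# Sketch (channel counts are illustrative): two parallel 1x1 convs whose
# outputs are concatenated, 8 + 4 = 12 channels:
#   >>> branches = ConcurrentModule([nn.Conv2d(16, 8, 1), nn.Conv2d(16, 4, 1)])
#   >>> branches(torch.randn(2, 16, 32, 32)).shape
#   torch.Size([2, 12, 32, 32])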
class PyramidPooling(nn.Module):
"""
Reference:
Zhao, Hengshuang, et al. *"Pyramid scene parsing network."*
"""
def __init__(self, in_channels, norm_layer, up_kwargs):
super(PyramidPooling, self).__init__()
self.pool1 = nn.AdaptiveAvgPool2d(1)
self.pool2 = nn.AdaptiveAvgPool2d(2)
self.pool3 = nn.AdaptiveAvgPool2d(3)
self.pool4 = nn.AdaptiveAvgPool2d(6)
        out_channels = in_channels // 4
self.conv1 = nn.Sequential(nn.Conv2d(in_channels, out_channels, 1, bias=False),
norm_layer(out_channels),
nn.ReLU(True))
self.conv2 = nn.Sequential(nn.Conv2d(in_channels, out_channels, 1, bias=False),
norm_layer(out_channels),
nn.ReLU(True))
self.conv3 = nn.Sequential(nn.Conv2d(in_channels, out_channels, 1, bias=False),
norm_layer(out_channels),
nn.ReLU(True))
self.conv4 = nn.Sequential(nn.Conv2d(in_channels, out_channels, 1, bias=False),
norm_layer(out_channels),
nn.ReLU(True))
        # bilinear interpolation options forwarded to F.interpolate
self._up_kwargs = up_kwargs
def forward(self, x):
_, _, h, w = x.size()
feat1 = F.interpolate(self.conv1(self.pool1(x)), (h, w), **self._up_kwargs)
feat2 = F.interpolate(self.conv2(self.pool2(x)), (h, w), **self._up_kwargs)
feat3 = F.interpolate(self.conv3(self.pool3(x)), (h, w), **self._up_kwargs)
feat4 = F.interpolate(self.conv4(self.pool4(x)), (h, w), **self._up_kwargs)
return torch.cat((x, feat1, feat2, feat3, feat4), 1)
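# Sketch (shapes and up_kwargs assumed): each of the four branches contributes
# in_channels // 4 channels, so the output has 2 * in_channels channels:
#   >>> psp = PyramidPooling(2048, nn.BatchNorm2d,
#   ...                      {'mode': 'bilinear', 'align_corners': True})
#   >>> psp(torch.randn(1, 2048, 60, 60)).shape
#   torch.Size([1, 4096, 60, 60])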