From 52cc83d36b7663a77b79fd2258d2ca871af73e55 Mon Sep 17 00:00:00 2001 From: zhaohu xing <920232796@qq.com> Date: Wed, 30 Nov 2022 14:56:12 +0800 Subject: fix bugs Signed-off-by: zhaohu xing <920232796@qq.com> --- ldm/modules/attention.py | 261 ------- ldm/modules/diffusionmodules/__init__.py | 0 ldm/modules/diffusionmodules/model.py | 835 ---------------------- ldm/modules/diffusionmodules/openaimodel.py | 961 -------------------------- ldm/modules/diffusionmodules/util.py | 267 ------- ldm/modules/distributions/__init__.py | 0 ldm/modules/distributions/distributions.py | 92 --- ldm/modules/ema.py | 76 -- ldm/modules/encoders/__init__.py | 0 ldm/modules/encoders/modules.py | 234 ------- ldm/modules/encoders/xlmr.py | 137 ---- ldm/modules/image_degradation/__init__.py | 2 - ldm/modules/image_degradation/bsrgan.py | 730 ------------------- ldm/modules/image_degradation/bsrgan_light.py | 650 ----------------- ldm/modules/image_degradation/utils/test.png | Bin 441072 -> 0 bytes ldm/modules/image_degradation/utils_image.py | 916 ------------------------ ldm/modules/losses/__init__.py | 1 - ldm/modules/losses/contperceptual.py | 111 --- ldm/modules/losses/vqperceptual.py | 167 ----- ldm/modules/x_transformer.py | 641 ----------------- 20 files changed, 6081 deletions(-) delete mode 100644 ldm/modules/attention.py delete mode 100644 ldm/modules/diffusionmodules/__init__.py delete mode 100644 ldm/modules/diffusionmodules/model.py delete mode 100644 ldm/modules/diffusionmodules/openaimodel.py delete mode 100644 ldm/modules/diffusionmodules/util.py delete mode 100644 ldm/modules/distributions/__init__.py delete mode 100644 ldm/modules/distributions/distributions.py delete mode 100644 ldm/modules/ema.py delete mode 100644 ldm/modules/encoders/__init__.py delete mode 100644 ldm/modules/encoders/modules.py delete mode 100644 ldm/modules/encoders/xlmr.py delete mode 100644 ldm/modules/image_degradation/__init__.py delete mode 100644 ldm/modules/image_degradation/bsrgan.py delete mode 100644 ldm/modules/image_degradation/bsrgan_light.py delete mode 100644 ldm/modules/image_degradation/utils/test.png delete mode 100644 ldm/modules/image_degradation/utils_image.py delete mode 100644 ldm/modules/losses/__init__.py delete mode 100644 ldm/modules/losses/contperceptual.py delete mode 100644 ldm/modules/losses/vqperceptual.py delete mode 100644 ldm/modules/x_transformer.py (limited to 'ldm/modules') diff --git a/ldm/modules/attention.py b/ldm/modules/attention.py deleted file mode 100644 index f4eff39c..00000000 --- a/ldm/modules/attention.py +++ /dev/null @@ -1,261 +0,0 @@ -from inspect import isfunction -import math -import torch -import torch.nn.functional as F -from torch import nn, einsum -from einops import rearrange, repeat - -from ldm.modules.diffusionmodules.util import checkpoint - - -def exists(val): - return val is not None - - -def uniq(arr): - return{el: True for el in arr}.keys() - - -def default(val, d): - if exists(val): - return val - return d() if isfunction(d) else d - - -def max_neg_value(t): - return -torch.finfo(t.dtype).max - - -def init_(tensor): - dim = tensor.shape[-1] - std = 1 / math.sqrt(dim) - tensor.uniform_(-std, std) - return tensor - - -# feedforward -class GEGLU(nn.Module): - def __init__(self, dim_in, dim_out): - super().__init__() - self.proj = nn.Linear(dim_in, dim_out * 2) - - def forward(self, x): - x, gate = self.proj(x).chunk(2, dim=-1) - return x * F.gelu(gate) - - -class FeedForward(nn.Module): - def __init__(self, dim, dim_out=None, mult=4, glu=False, dropout=0.): - super().__init__() - inner_dim = int(dim * mult) - dim_out = default(dim_out, dim) - project_in = nn.Sequential( - nn.Linear(dim, inner_dim), - nn.GELU() - ) if not glu else GEGLU(dim, inner_dim) - - self.net = nn.Sequential( - project_in, - nn.Dropout(dropout), - nn.Linear(inner_dim, dim_out) - ) - - def forward(self, x): - return self.net(x) - - -def zero_module(module): - """ - Zero out the parameters of a module and return it. - """ - for p in module.parameters(): - p.detach().zero_() - return module - - -def Normalize(in_channels): - return torch.nn.GroupNorm(num_groups=32, num_channels=in_channels, eps=1e-6, affine=True) - - -class LinearAttention(nn.Module): - def __init__(self, dim, heads=4, dim_head=32): - super().__init__() - self.heads = heads - hidden_dim = dim_head * heads - self.to_qkv = nn.Conv2d(dim, hidden_dim * 3, 1, bias = False) - self.to_out = nn.Conv2d(hidden_dim, dim, 1) - - def forward(self, x): - b, c, h, w = x.shape - qkv = self.to_qkv(x) - q, k, v = rearrange(qkv, 'b (qkv heads c) h w -> qkv b heads c (h w)', heads = self.heads, qkv=3) - k = k.softmax(dim=-1) - context = torch.einsum('bhdn,bhen->bhde', k, v) - out = torch.einsum('bhde,bhdn->bhen', context, q) - out = rearrange(out, 'b heads c (h w) -> b (heads c) h w', heads=self.heads, h=h, w=w) - return self.to_out(out) - - -class SpatialSelfAttention(nn.Module): - def __init__(self, in_channels): - super().__init__() - self.in_channels = in_channels - - self.norm = Normalize(in_channels) - self.q = torch.nn.Conv2d(in_channels, - in_channels, - kernel_size=1, - stride=1, - padding=0) - self.k = torch.nn.Conv2d(in_channels, - in_channels, - kernel_size=1, - stride=1, - padding=0) - self.v = torch.nn.Conv2d(in_channels, - in_channels, - kernel_size=1, - stride=1, - padding=0) - self.proj_out = torch.nn.Conv2d(in_channels, - in_channels, - kernel_size=1, - stride=1, - padding=0) - - def forward(self, x): - h_ = x - h_ = self.norm(h_) - q = self.q(h_) - k = self.k(h_) - v = self.v(h_) - - # compute attention - b,c,h,w = q.shape - q = rearrange(q, 'b c h w -> b (h w) c') - k = rearrange(k, 'b c h w -> b c (h w)') - w_ = torch.einsum('bij,bjk->bik', q, k) - - w_ = w_ * (int(c)**(-0.5)) - w_ = torch.nn.functional.softmax(w_, dim=2) - - # attend to values - v = rearrange(v, 'b c h w -> b c (h w)') - w_ = rearrange(w_, 'b i j -> b j i') - h_ = torch.einsum('bij,bjk->bik', v, w_) - h_ = rearrange(h_, 'b c (h w) -> b c h w', h=h) - h_ = self.proj_out(h_) - - return x+h_ - - -class CrossAttention(nn.Module): - def __init__(self, query_dim, context_dim=None, heads=8, dim_head=64, dropout=0.): - super().__init__() - inner_dim = dim_head * heads - context_dim = default(context_dim, query_dim) - - self.scale = dim_head ** -0.5 - self.heads = heads - - self.to_q = nn.Linear(query_dim, inner_dim, bias=False) - self.to_k = nn.Linear(context_dim, inner_dim, bias=False) - self.to_v = nn.Linear(context_dim, inner_dim, bias=False) - - self.to_out = nn.Sequential( - nn.Linear(inner_dim, query_dim), - nn.Dropout(dropout) - ) - - def forward(self, x, context=None, mask=None): - h = self.heads - - q = self.to_q(x) - context = default(context, x) - k = self.to_k(context) - v = self.to_v(context) - - q, k, v = map(lambda t: rearrange(t, 'b n (h d) -> (b h) n d', h=h), (q, k, v)) - - sim = einsum('b i d, b j d -> b i j', q, k) * self.scale - - if exists(mask): - mask = rearrange(mask, 'b ... -> b (...)') - max_neg_value = -torch.finfo(sim.dtype).max - mask = repeat(mask, 'b j -> (b h) () j', h=h) - sim.masked_fill_(~mask, max_neg_value) - - # attention, what we cannot get enough of - attn = sim.softmax(dim=-1) - - out = einsum('b i j, b j d -> b i d', attn, v) - out = rearrange(out, '(b h) n d -> b n (h d)', h=h) - return self.to_out(out) - - -class BasicTransformerBlock(nn.Module): - def __init__(self, dim, n_heads, d_head, dropout=0., context_dim=None, gated_ff=True, checkpoint=True): - super().__init__() - self.attn1 = CrossAttention(query_dim=dim, heads=n_heads, dim_head=d_head, dropout=dropout) # is a self-attention - self.ff = FeedForward(dim, dropout=dropout, glu=gated_ff) - self.attn2 = CrossAttention(query_dim=dim, context_dim=context_dim, - heads=n_heads, dim_head=d_head, dropout=dropout) # is self-attn if context is none - self.norm1 = nn.LayerNorm(dim) - self.norm2 = nn.LayerNorm(dim) - self.norm3 = nn.LayerNorm(dim) - self.checkpoint = checkpoint - - def forward(self, x, context=None): - return checkpoint(self._forward, (x, context), self.parameters(), self.checkpoint) - - def _forward(self, x, context=None): - x = self.attn1(self.norm1(x)) + x - x = self.attn2(self.norm2(x), context=context) + x - x = self.ff(self.norm3(x)) + x - return x - - -class SpatialTransformer(nn.Module): - """ - Transformer block for image-like data. - First, project the input (aka embedding) - and reshape to b, t, d. - Then apply standard transformer action. - Finally, reshape to image - """ - def __init__(self, in_channels, n_heads, d_head, - depth=1, dropout=0., context_dim=None): - super().__init__() - self.in_channels = in_channels - inner_dim = n_heads * d_head - self.norm = Normalize(in_channels) - - self.proj_in = nn.Conv2d(in_channels, - inner_dim, - kernel_size=1, - stride=1, - padding=0) - - self.transformer_blocks = nn.ModuleList( - [BasicTransformerBlock(inner_dim, n_heads, d_head, dropout=dropout, context_dim=context_dim) - for d in range(depth)] - ) - - self.proj_out = zero_module(nn.Conv2d(inner_dim, - in_channels, - kernel_size=1, - stride=1, - padding=0)) - - def forward(self, x, context=None): - # note: if no context is given, cross-attention defaults to self-attention - b, c, h, w = x.shape - x_in = x - x = self.norm(x) - x = self.proj_in(x) - x = rearrange(x, 'b c h w -> b (h w) c') - for block in self.transformer_blocks: - x = block(x, context=context) - x = rearrange(x, 'b (h w) c -> b c h w', h=h, w=w) - x = self.proj_out(x) - return x + x_in \ No newline at end of file diff --git a/ldm/modules/diffusionmodules/__init__.py b/ldm/modules/diffusionmodules/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/ldm/modules/diffusionmodules/model.py b/ldm/modules/diffusionmodules/model.py deleted file mode 100644 index 533e589a..00000000 --- a/ldm/modules/diffusionmodules/model.py +++ /dev/null @@ -1,835 +0,0 @@ -# pytorch_diffusion + derived encoder decoder -import math -import torch -import torch.nn as nn -import numpy as np -from einops import rearrange - -from ldm.util import instantiate_from_config -from ldm.modules.attention import LinearAttention - - -def get_timestep_embedding(timesteps, embedding_dim): - """ - This matches the implementation in Denoising Diffusion Probabilistic Models: - From Fairseq. - Build sinusoidal embeddings. - This matches the implementation in tensor2tensor, but differs slightly - from the description in Section 3.5 of "Attention Is All You Need". - """ - assert len(timesteps.shape) == 1 - - half_dim = embedding_dim // 2 - emb = math.log(10000) / (half_dim - 1) - emb = torch.exp(torch.arange(half_dim, dtype=torch.float32) * -emb) - emb = emb.to(device=timesteps.device) - emb = timesteps.float()[:, None] * emb[None, :] - emb = torch.cat([torch.sin(emb), torch.cos(emb)], dim=1) - if embedding_dim % 2 == 1: # zero pad - emb = torch.nn.functional.pad(emb, (0,1,0,0)) - return emb - - -def nonlinearity(x): - # swish - return x*torch.sigmoid(x) - - -def Normalize(in_channels, num_groups=32): - return torch.nn.GroupNorm(num_groups=num_groups, num_channels=in_channels, eps=1e-6, affine=True) - - -class Upsample(nn.Module): - def __init__(self, in_channels, with_conv): - super().__init__() - self.with_conv = with_conv - if self.with_conv: - self.conv = torch.nn.Conv2d(in_channels, - in_channels, - kernel_size=3, - stride=1, - padding=1) - - def forward(self, x): - x = torch.nn.functional.interpolate(x, scale_factor=2.0, mode="nearest") - if self.with_conv: - x = self.conv(x) - return x - - -class Downsample(nn.Module): - def __init__(self, in_channels, with_conv): - super().__init__() - self.with_conv = with_conv - if self.with_conv: - # no asymmetric padding in torch conv, must do it ourselves - self.conv = torch.nn.Conv2d(in_channels, - in_channels, - kernel_size=3, - stride=2, - padding=0) - - def forward(self, x): - if self.with_conv: - pad = (0,1,0,1) - x = torch.nn.functional.pad(x, pad, mode="constant", value=0) - x = self.conv(x) - else: - x = torch.nn.functional.avg_pool2d(x, kernel_size=2, stride=2) - return x - - -class ResnetBlock(nn.Module): - def __init__(self, *, in_channels, out_channels=None, conv_shortcut=False, - dropout, temb_channels=512): - super().__init__() - self.in_channels = in_channels - out_channels = in_channels if out_channels is None else out_channels - self.out_channels = out_channels - self.use_conv_shortcut = conv_shortcut - - self.norm1 = Normalize(in_channels) - self.conv1 = torch.nn.Conv2d(in_channels, - out_channels, - kernel_size=3, - stride=1, - padding=1) - if temb_channels > 0: - self.temb_proj = torch.nn.Linear(temb_channels, - out_channels) - self.norm2 = Normalize(out_channels) - self.dropout = torch.nn.Dropout(dropout) - self.conv2 = torch.nn.Conv2d(out_channels, - out_channels, - kernel_size=3, - stride=1, - padding=1) - if self.in_channels != self.out_channels: - if self.use_conv_shortcut: - self.conv_shortcut = torch.nn.Conv2d(in_channels, - out_channels, - kernel_size=3, - stride=1, - padding=1) - else: - self.nin_shortcut = torch.nn.Conv2d(in_channels, - out_channels, - kernel_size=1, - stride=1, - padding=0) - - def forward(self, x, temb): - h = x - h = self.norm1(h) - h = nonlinearity(h) - h = self.conv1(h) - - if temb is not None: - h = h + self.temb_proj(nonlinearity(temb))[:,:,None,None] - - h = self.norm2(h) - h = nonlinearity(h) - h = self.dropout(h) - h = self.conv2(h) - - if self.in_channels != self.out_channels: - if self.use_conv_shortcut: - x = self.conv_shortcut(x) - else: - x = self.nin_shortcut(x) - - return x+h - - -class LinAttnBlock(LinearAttention): - """to match AttnBlock usage""" - def __init__(self, in_channels): - super().__init__(dim=in_channels, heads=1, dim_head=in_channels) - - -class AttnBlock(nn.Module): - def __init__(self, in_channels): - super().__init__() - self.in_channels = in_channels - - self.norm = Normalize(in_channels) - self.q = torch.nn.Conv2d(in_channels, - in_channels, - kernel_size=1, - stride=1, - padding=0) - self.k = torch.nn.Conv2d(in_channels, - in_channels, - kernel_size=1, - stride=1, - padding=0) - self.v = torch.nn.Conv2d(in_channels, - in_channels, - kernel_size=1, - stride=1, - padding=0) - self.proj_out = torch.nn.Conv2d(in_channels, - in_channels, - kernel_size=1, - stride=1, - padding=0) - - - def forward(self, x): - h_ = x - h_ = self.norm(h_) - q = self.q(h_) - k = self.k(h_) - v = self.v(h_) - - # compute attention - b,c,h,w = q.shape - q = q.reshape(b,c,h*w) - q = q.permute(0,2,1) # b,hw,c - k = k.reshape(b,c,h*w) # b,c,hw - w_ = torch.bmm(q,k) # b,hw,hw w[b,i,j]=sum_c q[b,i,c]k[b,c,j] - w_ = w_ * (int(c)**(-0.5)) - w_ = torch.nn.functional.softmax(w_, dim=2) - - # attend to values - v = v.reshape(b,c,h*w) - w_ = w_.permute(0,2,1) # b,hw,hw (first hw of k, second of q) - h_ = torch.bmm(v,w_) # b, c,hw (hw of q) h_[b,c,j] = sum_i v[b,c,i] w_[b,i,j] - h_ = h_.reshape(b,c,h,w) - - h_ = self.proj_out(h_) - - return x+h_ - - -def make_attn(in_channels, attn_type="vanilla"): - assert attn_type in ["vanilla", "linear", "none"], f'attn_type {attn_type} unknown' - print(f"making attention of type '{attn_type}' with {in_channels} in_channels") - if attn_type == "vanilla": - return AttnBlock(in_channels) - elif attn_type == "none": - return nn.Identity(in_channels) - else: - return LinAttnBlock(in_channels) - - -class Model(nn.Module): - def __init__(self, *, ch, out_ch, ch_mult=(1,2,4,8), num_res_blocks, - attn_resolutions, dropout=0.0, resamp_with_conv=True, in_channels, - resolution, use_timestep=True, use_linear_attn=False, attn_type="vanilla"): - super().__init__() - if use_linear_attn: attn_type = "linear" - self.ch = ch - self.temb_ch = self.ch*4 - self.num_resolutions = len(ch_mult) - self.num_res_blocks = num_res_blocks - self.resolution = resolution - self.in_channels = in_channels - - self.use_timestep = use_timestep - if self.use_timestep: - # timestep embedding - self.temb = nn.Module() - self.temb.dense = nn.ModuleList([ - torch.nn.Linear(self.ch, - self.temb_ch), - torch.nn.Linear(self.temb_ch, - self.temb_ch), - ]) - - # downsampling - self.conv_in = torch.nn.Conv2d(in_channels, - self.ch, - kernel_size=3, - stride=1, - padding=1) - - curr_res = resolution - in_ch_mult = (1,)+tuple(ch_mult) - self.down = nn.ModuleList() - for i_level in range(self.num_resolutions): - block = nn.ModuleList() - attn = nn.ModuleList() - block_in = ch*in_ch_mult[i_level] - block_out = ch*ch_mult[i_level] - for i_block in range(self.num_res_blocks): - block.append(ResnetBlock(in_channels=block_in, - out_channels=block_out, - temb_channels=self.temb_ch, - dropout=dropout)) - block_in = block_out - if curr_res in attn_resolutions: - attn.append(make_attn(block_in, attn_type=attn_type)) - down = nn.Module() - down.block = block - down.attn = attn - if i_level != self.num_resolutions-1: - down.downsample = Downsample(block_in, resamp_with_conv) - curr_res = curr_res // 2 - self.down.append(down) - - # middle - self.mid = nn.Module() - self.mid.block_1 = ResnetBlock(in_channels=block_in, - out_channels=block_in, - temb_channels=self.temb_ch, - dropout=dropout) - self.mid.attn_1 = make_attn(block_in, attn_type=attn_type) - self.mid.block_2 = ResnetBlock(in_channels=block_in, - out_channels=block_in, - temb_channels=self.temb_ch, - dropout=dropout) - - # upsampling - self.up = nn.ModuleList() - for i_level in reversed(range(self.num_resolutions)): - block = nn.ModuleList() - attn = nn.ModuleList() - block_out = ch*ch_mult[i_level] - skip_in = ch*ch_mult[i_level] - for i_block in range(self.num_res_blocks+1): - if i_block == self.num_res_blocks: - skip_in = ch*in_ch_mult[i_level] - block.append(ResnetBlock(in_channels=block_in+skip_in, - out_channels=block_out, - temb_channels=self.temb_ch, - dropout=dropout)) - block_in = block_out - if curr_res in attn_resolutions: - attn.append(make_attn(block_in, attn_type=attn_type)) - up = nn.Module() - up.block = block - up.attn = attn - if i_level != 0: - up.upsample = Upsample(block_in, resamp_with_conv) - curr_res = curr_res * 2 - self.up.insert(0, up) # prepend to get consistent order - - # end - self.norm_out = Normalize(block_in) - self.conv_out = torch.nn.Conv2d(block_in, - out_ch, - kernel_size=3, - stride=1, - padding=1) - - def forward(self, x, t=None, context=None): - #assert x.shape[2] == x.shape[3] == self.resolution - if context is not None: - # assume aligned context, cat along channel axis - x = torch.cat((x, context), dim=1) - if self.use_timestep: - # timestep embedding - assert t is not None - temb = get_timestep_embedding(t, self.ch) - temb = self.temb.dense[0](temb) - temb = nonlinearity(temb) - temb = self.temb.dense[1](temb) - else: - temb = None - - # downsampling - hs = [self.conv_in(x)] - for i_level in range(self.num_resolutions): - for i_block in range(self.num_res_blocks): - h = self.down[i_level].block[i_block](hs[-1], temb) - if len(self.down[i_level].attn) > 0: - h = self.down[i_level].attn[i_block](h) - hs.append(h) - if i_level != self.num_resolutions-1: - hs.append(self.down[i_level].downsample(hs[-1])) - - # middle - h = hs[-1] - h = self.mid.block_1(h, temb) - h = self.mid.attn_1(h) - h = self.mid.block_2(h, temb) - - # upsampling - for i_level in reversed(range(self.num_resolutions)): - for i_block in range(self.num_res_blocks+1): - h = self.up[i_level].block[i_block]( - torch.cat([h, hs.pop()], dim=1), temb) - if len(self.up[i_level].attn) > 0: - h = self.up[i_level].attn[i_block](h) - if i_level != 0: - h = self.up[i_level].upsample(h) - - # end - h = self.norm_out(h) - h = nonlinearity(h) - h = self.conv_out(h) - return h - - def get_last_layer(self): - return self.conv_out.weight - - -class Encoder(nn.Module): - def __init__(self, *, ch, out_ch, ch_mult=(1,2,4,8), num_res_blocks, - attn_resolutions, dropout=0.0, resamp_with_conv=True, in_channels, - resolution, z_channels, double_z=True, use_linear_attn=False, attn_type="vanilla", - **ignore_kwargs): - super().__init__() - if use_linear_attn: attn_type = "linear" - self.ch = ch - self.temb_ch = 0 - self.num_resolutions = len(ch_mult) - self.num_res_blocks = num_res_blocks - self.resolution = resolution - self.in_channels = in_channels - - # downsampling - self.conv_in = torch.nn.Conv2d(in_channels, - self.ch, - kernel_size=3, - stride=1, - padding=1) - - curr_res = resolution - in_ch_mult = (1,)+tuple(ch_mult) - self.in_ch_mult = in_ch_mult - self.down = nn.ModuleList() - for i_level in range(self.num_resolutions): - block = nn.ModuleList() - attn = nn.ModuleList() - block_in = ch*in_ch_mult[i_level] - block_out = ch*ch_mult[i_level] - for i_block in range(self.num_res_blocks): - block.append(ResnetBlock(in_channels=block_in, - out_channels=block_out, - temb_channels=self.temb_ch, - dropout=dropout)) - block_in = block_out - if curr_res in attn_resolutions: - attn.append(make_attn(block_in, attn_type=attn_type)) - down = nn.Module() - down.block = block - down.attn = attn - if i_level != self.num_resolutions-1: - down.downsample = Downsample(block_in, resamp_with_conv) - curr_res = curr_res // 2 - self.down.append(down) - - # middle - self.mid = nn.Module() - self.mid.block_1 = ResnetBlock(in_channels=block_in, - out_channels=block_in, - temb_channels=self.temb_ch, - dropout=dropout) - self.mid.attn_1 = make_attn(block_in, attn_type=attn_type) - self.mid.block_2 = ResnetBlock(in_channels=block_in, - out_channels=block_in, - temb_channels=self.temb_ch, - dropout=dropout) - - # end - self.norm_out = Normalize(block_in) - self.conv_out = torch.nn.Conv2d(block_in, - 2*z_channels if double_z else z_channels, - kernel_size=3, - stride=1, - padding=1) - - def forward(self, x): - # timestep embedding - temb = None - - # downsampling - hs = [self.conv_in(x)] - for i_level in range(self.num_resolutions): - for i_block in range(self.num_res_blocks): - h = self.down[i_level].block[i_block](hs[-1], temb) - if len(self.down[i_level].attn) > 0: - h = self.down[i_level].attn[i_block](h) - hs.append(h) - if i_level != self.num_resolutions-1: - hs.append(self.down[i_level].downsample(hs[-1])) - - # middle - h = hs[-1] - h = self.mid.block_1(h, temb) - h = self.mid.attn_1(h) - h = self.mid.block_2(h, temb) - - # end - h = self.norm_out(h) - h = nonlinearity(h) - h = self.conv_out(h) - return h - - -class Decoder(nn.Module): - def __init__(self, *, ch, out_ch, ch_mult=(1,2,4,8), num_res_blocks, - attn_resolutions, dropout=0.0, resamp_with_conv=True, in_channels, - resolution, z_channels, give_pre_end=False, tanh_out=False, use_linear_attn=False, - attn_type="vanilla", **ignorekwargs): - super().__init__() - if use_linear_attn: attn_type = "linear" - self.ch = ch - self.temb_ch = 0 - self.num_resolutions = len(ch_mult) - self.num_res_blocks = num_res_blocks - self.resolution = resolution - self.in_channels = in_channels - self.give_pre_end = give_pre_end - self.tanh_out = tanh_out - - # compute in_ch_mult, block_in and curr_res at lowest res - in_ch_mult = (1,)+tuple(ch_mult) - block_in = ch*ch_mult[self.num_resolutions-1] - curr_res = resolution // 2**(self.num_resolutions-1) - self.z_shape = (1,z_channels,curr_res,curr_res) - print("Working with z of shape {} = {} dimensions.".format( - self.z_shape, np.prod(self.z_shape))) - - # z to block_in - self.conv_in = torch.nn.Conv2d(z_channels, - block_in, - kernel_size=3, - stride=1, - padding=1) - - # middle - self.mid = nn.Module() - self.mid.block_1 = ResnetBlock(in_channels=block_in, - out_channels=block_in, - temb_channels=self.temb_ch, - dropout=dropout) - self.mid.attn_1 = make_attn(block_in, attn_type=attn_type) - self.mid.block_2 = ResnetBlock(in_channels=block_in, - out_channels=block_in, - temb_channels=self.temb_ch, - dropout=dropout) - - # upsampling - self.up = nn.ModuleList() - for i_level in reversed(range(self.num_resolutions)): - block = nn.ModuleList() - attn = nn.ModuleList() - block_out = ch*ch_mult[i_level] - for i_block in range(self.num_res_blocks+1): - block.append(ResnetBlock(in_channels=block_in, - out_channels=block_out, - temb_channels=self.temb_ch, - dropout=dropout)) - block_in = block_out - if curr_res in attn_resolutions: - attn.append(make_attn(block_in, attn_type=attn_type)) - up = nn.Module() - up.block = block - up.attn = attn - if i_level != 0: - up.upsample = Upsample(block_in, resamp_with_conv) - curr_res = curr_res * 2 - self.up.insert(0, up) # prepend to get consistent order - - # end - self.norm_out = Normalize(block_in) - self.conv_out = torch.nn.Conv2d(block_in, - out_ch, - kernel_size=3, - stride=1, - padding=1) - - def forward(self, z): - #assert z.shape[1:] == self.z_shape[1:] - self.last_z_shape = z.shape - - # timestep embedding - temb = None - - # z to block_in - h = self.conv_in(z) - - # middle - h = self.mid.block_1(h, temb) - h = self.mid.attn_1(h) - h = self.mid.block_2(h, temb) - - # upsampling - for i_level in reversed(range(self.num_resolutions)): - for i_block in range(self.num_res_blocks+1): - h = self.up[i_level].block[i_block](h, temb) - if len(self.up[i_level].attn) > 0: - h = self.up[i_level].attn[i_block](h) - if i_level != 0: - h = self.up[i_level].upsample(h) - - # end - if self.give_pre_end: - return h - - h = self.norm_out(h) - h = nonlinearity(h) - h = self.conv_out(h) - if self.tanh_out: - h = torch.tanh(h) - return h - - -class SimpleDecoder(nn.Module): - def __init__(self, in_channels, out_channels, *args, **kwargs): - super().__init__() - self.model = nn.ModuleList([nn.Conv2d(in_channels, in_channels, 1), - ResnetBlock(in_channels=in_channels, - out_channels=2 * in_channels, - temb_channels=0, dropout=0.0), - ResnetBlock(in_channels=2 * in_channels, - out_channels=4 * in_channels, - temb_channels=0, dropout=0.0), - ResnetBlock(in_channels=4 * in_channels, - out_channels=2 * in_channels, - temb_channels=0, dropout=0.0), - nn.Conv2d(2*in_channels, in_channels, 1), - Upsample(in_channels, with_conv=True)]) - # end - self.norm_out = Normalize(in_channels) - self.conv_out = torch.nn.Conv2d(in_channels, - out_channels, - kernel_size=3, - stride=1, - padding=1) - - def forward(self, x): - for i, layer in enumerate(self.model): - if i in [1,2,3]: - x = layer(x, None) - else: - x = layer(x) - - h = self.norm_out(x) - h = nonlinearity(h) - x = self.conv_out(h) - return x - - -class UpsampleDecoder(nn.Module): - def __init__(self, in_channels, out_channels, ch, num_res_blocks, resolution, - ch_mult=(2,2), dropout=0.0): - super().__init__() - # upsampling - self.temb_ch = 0 - self.num_resolutions = len(ch_mult) - self.num_res_blocks = num_res_blocks - block_in = in_channels - curr_res = resolution // 2 ** (self.num_resolutions - 1) - self.res_blocks = nn.ModuleList() - self.upsample_blocks = nn.ModuleList() - for i_level in range(self.num_resolutions): - res_block = [] - block_out = ch * ch_mult[i_level] - for i_block in range(self.num_res_blocks + 1): - res_block.append(ResnetBlock(in_channels=block_in, - out_channels=block_out, - temb_channels=self.temb_ch, - dropout=dropout)) - block_in = block_out - self.res_blocks.append(nn.ModuleList(res_block)) - if i_level != self.num_resolutions - 1: - self.upsample_blocks.append(Upsample(block_in, True)) - curr_res = curr_res * 2 - - # end - self.norm_out = Normalize(block_in) - self.conv_out = torch.nn.Conv2d(block_in, - out_channels, - kernel_size=3, - stride=1, - padding=1) - - def forward(self, x): - # upsampling - h = x - for k, i_level in enumerate(range(self.num_resolutions)): - for i_block in range(self.num_res_blocks + 1): - h = self.res_blocks[i_level][i_block](h, None) - if i_level != self.num_resolutions - 1: - h = self.upsample_blocks[k](h) - h = self.norm_out(h) - h = nonlinearity(h) - h = self.conv_out(h) - return h - - -class LatentRescaler(nn.Module): - def __init__(self, factor, in_channels, mid_channels, out_channels, depth=2): - super().__init__() - # residual block, interpolate, residual block - self.factor = factor - self.conv_in = nn.Conv2d(in_channels, - mid_channels, - kernel_size=3, - stride=1, - padding=1) - self.res_block1 = nn.ModuleList([ResnetBlock(in_channels=mid_channels, - out_channels=mid_channels, - temb_channels=0, - dropout=0.0) for _ in range(depth)]) - self.attn = AttnBlock(mid_channels) - self.res_block2 = nn.ModuleList([ResnetBlock(in_channels=mid_channels, - out_channels=mid_channels, - temb_channels=0, - dropout=0.0) for _ in range(depth)]) - - self.conv_out = nn.Conv2d(mid_channels, - out_channels, - kernel_size=1, - ) - - def forward(self, x): - x = self.conv_in(x) - for block in self.res_block1: - x = block(x, None) - x = torch.nn.functional.interpolate(x, size=(int(round(x.shape[2]*self.factor)), int(round(x.shape[3]*self.factor)))) - x = self.attn(x) - for block in self.res_block2: - x = block(x, None) - x = self.conv_out(x) - return x - - -class MergedRescaleEncoder(nn.Module): - def __init__(self, in_channels, ch, resolution, out_ch, num_res_blocks, - attn_resolutions, dropout=0.0, resamp_with_conv=True, - ch_mult=(1,2,4,8), rescale_factor=1.0, rescale_module_depth=1): - super().__init__() - intermediate_chn = ch * ch_mult[-1] - self.encoder = Encoder(in_channels=in_channels, num_res_blocks=num_res_blocks, ch=ch, ch_mult=ch_mult, - z_channels=intermediate_chn, double_z=False, resolution=resolution, - attn_resolutions=attn_resolutions, dropout=dropout, resamp_with_conv=resamp_with_conv, - out_ch=None) - self.rescaler = LatentRescaler(factor=rescale_factor, in_channels=intermediate_chn, - mid_channels=intermediate_chn, out_channels=out_ch, depth=rescale_module_depth) - - def forward(self, x): - x = self.encoder(x) - x = self.rescaler(x) - return x - - -class MergedRescaleDecoder(nn.Module): - def __init__(self, z_channels, out_ch, resolution, num_res_blocks, attn_resolutions, ch, ch_mult=(1,2,4,8), - dropout=0.0, resamp_with_conv=True, rescale_factor=1.0, rescale_module_depth=1): - super().__init__() - tmp_chn = z_channels*ch_mult[-1] - self.decoder = Decoder(out_ch=out_ch, z_channels=tmp_chn, attn_resolutions=attn_resolutions, dropout=dropout, - resamp_with_conv=resamp_with_conv, in_channels=None, num_res_blocks=num_res_blocks, - ch_mult=ch_mult, resolution=resolution, ch=ch) - self.rescaler = LatentRescaler(factor=rescale_factor, in_channels=z_channels, mid_channels=tmp_chn, - out_channels=tmp_chn, depth=rescale_module_depth) - - def forward(self, x): - x = self.rescaler(x) - x = self.decoder(x) - return x - - -class Upsampler(nn.Module): - def __init__(self, in_size, out_size, in_channels, out_channels, ch_mult=2): - super().__init__() - assert out_size >= in_size - num_blocks = int(np.log2(out_size//in_size))+1 - factor_up = 1.+ (out_size % in_size) - print(f"Building {self.__class__.__name__} with in_size: {in_size} --> out_size {out_size} and factor {factor_up}") - self.rescaler = LatentRescaler(factor=factor_up, in_channels=in_channels, mid_channels=2*in_channels, - out_channels=in_channels) - self.decoder = Decoder(out_ch=out_channels, resolution=out_size, z_channels=in_channels, num_res_blocks=2, - attn_resolutions=[], in_channels=None, ch=in_channels, - ch_mult=[ch_mult for _ in range(num_blocks)]) - - def forward(self, x): - x = self.rescaler(x) - x = self.decoder(x) - return x - - -class Resize(nn.Module): - def __init__(self, in_channels=None, learned=False, mode="bilinear"): - super().__init__() - self.with_conv = learned - self.mode = mode - if self.with_conv: - print(f"Note: {self.__class__.__name} uses learned downsampling and will ignore the fixed {mode} mode") - raise NotImplementedError() - assert in_channels is not None - # no asymmetric padding in torch conv, must do it ourselves - self.conv = torch.nn.Conv2d(in_channels, - in_channels, - kernel_size=4, - stride=2, - padding=1) - - def forward(self, x, scale_factor=1.0): - if scale_factor==1.0: - return x - else: - x = torch.nn.functional.interpolate(x, mode=self.mode, align_corners=False, scale_factor=scale_factor) - return x - -class FirstStagePostProcessor(nn.Module): - - def __init__(self, ch_mult:list, in_channels, - pretrained_model:nn.Module=None, - reshape=False, - n_channels=None, - dropout=0., - pretrained_config=None): - super().__init__() - if pretrained_config is None: - assert pretrained_model is not None, 'Either "pretrained_model" or "pretrained_config" must not be None' - self.pretrained_model = pretrained_model - else: - assert pretrained_config is not None, 'Either "pretrained_model" or "pretrained_config" must not be None' - self.instantiate_pretrained(pretrained_config) - - self.do_reshape = reshape - - if n_channels is None: - n_channels = self.pretrained_model.encoder.ch - - self.proj_norm = Normalize(in_channels,num_groups=in_channels//2) - self.proj = nn.Conv2d(in_channels,n_channels,kernel_size=3, - stride=1,padding=1) - - blocks = [] - downs = [] - ch_in = n_channels - for m in ch_mult: - blocks.append(ResnetBlock(in_channels=ch_in,out_channels=m*n_channels,dropout=dropout)) - ch_in = m * n_channels - downs.append(Downsample(ch_in, with_conv=False)) - - self.model = nn.ModuleList(blocks) - self.downsampler = nn.ModuleList(downs) - - - def instantiate_pretrained(self, config): - model = instantiate_from_config(config) - self.pretrained_model = model.eval() - # self.pretrained_model.train = False - for param in self.pretrained_model.parameters(): - param.requires_grad = False - - - @torch.no_grad() - def encode_with_pretrained(self,x): - c = self.pretrained_model.encode(x) - if isinstance(c, DiagonalGaussianDistribution): - c = c.mode() - return c - - def forward(self,x): - z_fs = self.encode_with_pretrained(x) - z = self.proj_norm(z_fs) - z = self.proj(z) - z = nonlinearity(z) - - for submodel, downmodel in zip(self.model,self.downsampler): - z = submodel(z,temb=None) - z = downmodel(z) - - if self.do_reshape: - z = rearrange(z,'b c h w -> b (h w) c') - return z - diff --git a/ldm/modules/diffusionmodules/openaimodel.py b/ldm/modules/diffusionmodules/openaimodel.py deleted file mode 100644 index fcf95d1e..00000000 --- a/ldm/modules/diffusionmodules/openaimodel.py +++ /dev/null @@ -1,961 +0,0 @@ -from abc import abstractmethod -from functools import partial -import math -from typing import Iterable - -import numpy as np -import torch as th -import torch.nn as nn -import torch.nn.functional as F - -from ldm.modules.diffusionmodules.util import ( - checkpoint, - conv_nd, - linear, - avg_pool_nd, - zero_module, - normalization, - timestep_embedding, -) -from ldm.modules.attention import SpatialTransformer - - -# dummy replace -def convert_module_to_f16(x): - pass - -def convert_module_to_f32(x): - pass - - -## go -class AttentionPool2d(nn.Module): - """ - Adapted from CLIP: https://github.com/openai/CLIP/blob/main/clip/model.py - """ - - def __init__( - self, - spacial_dim: int, - embed_dim: int, - num_heads_channels: int, - output_dim: int = None, - ): - super().__init__() - self.positional_embedding = nn.Parameter(th.randn(embed_dim, spacial_dim ** 2 + 1) / embed_dim ** 0.5) - self.qkv_proj = conv_nd(1, embed_dim, 3 * embed_dim, 1) - self.c_proj = conv_nd(1, embed_dim, output_dim or embed_dim, 1) - self.num_heads = embed_dim // num_heads_channels - self.attention = QKVAttention(self.num_heads) - - def forward(self, x): - b, c, *_spatial = x.shape - x = x.reshape(b, c, -1) # NC(HW) - x = th.cat([x.mean(dim=-1, keepdim=True), x], dim=-1) # NC(HW+1) - x = x + self.positional_embedding[None, :, :].to(x.dtype) # NC(HW+1) - x = self.qkv_proj(x) - x = self.attention(x) - x = self.c_proj(x) - return x[:, :, 0] - - -class TimestepBlock(nn.Module): - """ - Any module where forward() takes timestep embeddings as a second argument. - """ - - @abstractmethod - def forward(self, x, emb): - """ - Apply the module to `x` given `emb` timestep embeddings. - """ - - -class TimestepEmbedSequential(nn.Sequential, TimestepBlock): - """ - A sequential module that passes timestep embeddings to the children that - support it as an extra input. - """ - - def forward(self, x, emb, context=None): - for layer in self: - if isinstance(layer, TimestepBlock): - x = layer(x, emb) - elif isinstance(layer, SpatialTransformer): - x = layer(x, context) - else: - x = layer(x) - return x - - -class Upsample(nn.Module): - """ - An upsampling layer with an optional convolution. - :param channels: channels in the inputs and outputs. - :param use_conv: a bool determining if a convolution is applied. - :param dims: determines if the signal is 1D, 2D, or 3D. If 3D, then - upsampling occurs in the inner-two dimensions. - """ - - def __init__(self, channels, use_conv, dims=2, out_channels=None, padding=1): - super().__init__() - self.channels = channels - self.out_channels = out_channels or channels - self.use_conv = use_conv - self.dims = dims - if use_conv: - self.conv = conv_nd(dims, self.channels, self.out_channels, 3, padding=padding) - - def forward(self, x): - assert x.shape[1] == self.channels - if self.dims == 3: - x = F.interpolate( - x, (x.shape[2], x.shape[3] * 2, x.shape[4] * 2), mode="nearest" - ) - else: - x = F.interpolate(x, scale_factor=2, mode="nearest") - if self.use_conv: - x = self.conv(x) - return x - -class TransposedUpsample(nn.Module): - 'Learned 2x upsampling without padding' - def __init__(self, channels, out_channels=None, ks=5): - super().__init__() - self.channels = channels - self.out_channels = out_channels or channels - - self.up = nn.ConvTranspose2d(self.channels,self.out_channels,kernel_size=ks,stride=2) - - def forward(self,x): - return self.up(x) - - -class Downsample(nn.Module): - """ - A downsampling layer with an optional convolution. - :param channels: channels in the inputs and outputs. - :param use_conv: a bool determining if a convolution is applied. - :param dims: determines if the signal is 1D, 2D, or 3D. If 3D, then - downsampling occurs in the inner-two dimensions. - """ - - def __init__(self, channels, use_conv, dims=2, out_channels=None,padding=1): - super().__init__() - self.channels = channels - self.out_channels = out_channels or channels - self.use_conv = use_conv - self.dims = dims - stride = 2 if dims != 3 else (1, 2, 2) - if use_conv: - self.op = conv_nd( - dims, self.channels, self.out_channels, 3, stride=stride, padding=padding - ) - else: - assert self.channels == self.out_channels - self.op = avg_pool_nd(dims, kernel_size=stride, stride=stride) - - def forward(self, x): - assert x.shape[1] == self.channels - return self.op(x) - - -class ResBlock(TimestepBlock): - """ - A residual block that can optionally change the number of channels. - :param channels: the number of input channels. - :param emb_channels: the number of timestep embedding channels. - :param dropout: the rate of dropout. - :param out_channels: if specified, the number of out channels. - :param use_conv: if True and out_channels is specified, use a spatial - convolution instead of a smaller 1x1 convolution to change the - channels in the skip connection. - :param dims: determines if the signal is 1D, 2D, or 3D. - :param use_checkpoint: if True, use gradient checkpointing on this module. - :param up: if True, use this block for upsampling. - :param down: if True, use this block for downsampling. - """ - - def __init__( - self, - channels, - emb_channels, - dropout, - out_channels=None, - use_conv=False, - use_scale_shift_norm=False, - dims=2, - use_checkpoint=False, - up=False, - down=False, - ): - super().__init__() - self.channels = channels - self.emb_channels = emb_channels - self.dropout = dropout - self.out_channels = out_channels or channels - self.use_conv = use_conv - self.use_checkpoint = use_checkpoint - self.use_scale_shift_norm = use_scale_shift_norm - - self.in_layers = nn.Sequential( - normalization(channels), - nn.SiLU(), - conv_nd(dims, channels, self.out_channels, 3, padding=1), - ) - - self.updown = up or down - - if up: - self.h_upd = Upsample(channels, False, dims) - self.x_upd = Upsample(channels, False, dims) - elif down: - self.h_upd = Downsample(channels, False, dims) - self.x_upd = Downsample(channels, False, dims) - else: - self.h_upd = self.x_upd = nn.Identity() - - self.emb_layers = nn.Sequential( - nn.SiLU(), - linear( - emb_channels, - 2 * self.out_channels if use_scale_shift_norm else self.out_channels, - ), - ) - self.out_layers = nn.Sequential( - normalization(self.out_channels), - nn.SiLU(), - nn.Dropout(p=dropout), - zero_module( - conv_nd(dims, self.out_channels, self.out_channels, 3, padding=1) - ), - ) - - if self.out_channels == channels: - self.skip_connection = nn.Identity() - elif use_conv: - self.skip_connection = conv_nd( - dims, channels, self.out_channels, 3, padding=1 - ) - else: - self.skip_connection = conv_nd(dims, channels, self.out_channels, 1) - - def forward(self, x, emb): - """ - Apply the block to a Tensor, conditioned on a timestep embedding. - :param x: an [N x C x ...] Tensor of features. - :param emb: an [N x emb_channels] Tensor of timestep embeddings. - :return: an [N x C x ...] Tensor of outputs. - """ - return checkpoint( - self._forward, (x, emb), self.parameters(), self.use_checkpoint - ) - - - def _forward(self, x, emb): - if self.updown: - in_rest, in_conv = self.in_layers[:-1], self.in_layers[-1] - h = in_rest(x) - h = self.h_upd(h) - x = self.x_upd(x) - h = in_conv(h) - else: - h = self.in_layers(x) - emb_out = self.emb_layers(emb).type(h.dtype) - while len(emb_out.shape) < len(h.shape): - emb_out = emb_out[..., None] - if self.use_scale_shift_norm: - out_norm, out_rest = self.out_layers[0], self.out_layers[1:] - scale, shift = th.chunk(emb_out, 2, dim=1) - h = out_norm(h) * (1 + scale) + shift - h = out_rest(h) - else: - h = h + emb_out - h = self.out_layers(h) - return self.skip_connection(x) + h - - -class AttentionBlock(nn.Module): - """ - An attention block that allows spatial positions to attend to each other. - Originally ported from here, but adapted to the N-d case. - https://github.com/hojonathanho/diffusion/blob/1e0dceb3b3495bbe19116a5e1b3596cd0706c543/diffusion_tf/models/unet.py#L66. - """ - - def __init__( - self, - channels, - num_heads=1, - num_head_channels=-1, - use_checkpoint=False, - use_new_attention_order=False, - ): - super().__init__() - self.channels = channels - if num_head_channels == -1: - self.num_heads = num_heads - else: - assert ( - channels % num_head_channels == 0 - ), f"q,k,v channels {channels} is not divisible by num_head_channels {num_head_channels}" - self.num_heads = channels // num_head_channels - self.use_checkpoint = use_checkpoint - self.norm = normalization(channels) - self.qkv = conv_nd(1, channels, channels * 3, 1) - if use_new_attention_order: - # split qkv before split heads - self.attention = QKVAttention(self.num_heads) - else: - # split heads before split qkv - self.attention = QKVAttentionLegacy(self.num_heads) - - self.proj_out = zero_module(conv_nd(1, channels, channels, 1)) - - def forward(self, x): - return checkpoint(self._forward, (x,), self.parameters(), True) # TODO: check checkpoint usage, is True # TODO: fix the .half call!!! - #return pt_checkpoint(self._forward, x) # pytorch - - def _forward(self, x): - b, c, *spatial = x.shape - x = x.reshape(b, c, -1) - qkv = self.qkv(self.norm(x)) - h = self.attention(qkv) - h = self.proj_out(h) - return (x + h).reshape(b, c, *spatial) - - -def count_flops_attn(model, _x, y): - """ - A counter for the `thop` package to count the operations in an - attention operation. - Meant to be used like: - macs, params = thop.profile( - model, - inputs=(inputs, timestamps), - custom_ops={QKVAttention: QKVAttention.count_flops}, - ) - """ - b, c, *spatial = y[0].shape - num_spatial = int(np.prod(spatial)) - # We perform two matmuls with the same number of ops. - # The first computes the weight matrix, the second computes - # the combination of the value vectors. - matmul_ops = 2 * b * (num_spatial ** 2) * c - model.total_ops += th.DoubleTensor([matmul_ops]) - - -class QKVAttentionLegacy(nn.Module): - """ - A module which performs QKV attention. Matches legacy QKVAttention + input/ouput heads shaping - """ - - def __init__(self, n_heads): - super().__init__() - self.n_heads = n_heads - - def forward(self, qkv): - """ - Apply QKV attention. - :param qkv: an [N x (H * 3 * C) x T] tensor of Qs, Ks, and Vs. - :return: an [N x (H * C) x T] tensor after attention. - """ - bs, width, length = qkv.shape - assert width % (3 * self.n_heads) == 0 - ch = width // (3 * self.n_heads) - q, k, v = qkv.reshape(bs * self.n_heads, ch * 3, length).split(ch, dim=1) - scale = 1 / math.sqrt(math.sqrt(ch)) - weight = th.einsum( - "bct,bcs->bts", q * scale, k * scale - ) # More stable with f16 than dividing afterwards - weight = th.softmax(weight.float(), dim=-1).type(weight.dtype) - a = th.einsum("bts,bcs->bct", weight, v) - return a.reshape(bs, -1, length) - - @staticmethod - def count_flops(model, _x, y): - return count_flops_attn(model, _x, y) - - -class QKVAttention(nn.Module): - """ - A module which performs QKV attention and splits in a different order. - """ - - def __init__(self, n_heads): - super().__init__() - self.n_heads = n_heads - - def forward(self, qkv): - """ - Apply QKV attention. - :param qkv: an [N x (3 * H * C) x T] tensor of Qs, Ks, and Vs. - :return: an [N x (H * C) x T] tensor after attention. - """ - bs, width, length = qkv.shape - assert width % (3 * self.n_heads) == 0 - ch = width // (3 * self.n_heads) - q, k, v = qkv.chunk(3, dim=1) - scale = 1 / math.sqrt(math.sqrt(ch)) - weight = th.einsum( - "bct,bcs->bts", - (q * scale).view(bs * self.n_heads, ch, length), - (k * scale).view(bs * self.n_heads, ch, length), - ) # More stable with f16 than dividing afterwards - weight = th.softmax(weight.float(), dim=-1).type(weight.dtype) - a = th.einsum("bts,bcs->bct", weight, v.reshape(bs * self.n_heads, ch, length)) - return a.reshape(bs, -1, length) - - @staticmethod - def count_flops(model, _x, y): - return count_flops_attn(model, _x, y) - - -class UNetModel(nn.Module): - """ - The full UNet model with attention and timestep embedding. - :param in_channels: channels in the input Tensor. - :param model_channels: base channel count for the model. - :param out_channels: channels in the output Tensor. - :param num_res_blocks: number of residual blocks per downsample. - :param attention_resolutions: a collection of downsample rates at which - attention will take place. May be a set, list, or tuple. - For example, if this contains 4, then at 4x downsampling, attention - will be used. - :param dropout: the dropout probability. - :param channel_mult: channel multiplier for each level of the UNet. - :param conv_resample: if True, use learned convolutions for upsampling and - downsampling. - :param dims: determines if the signal is 1D, 2D, or 3D. - :param num_classes: if specified (as an int), then this model will be - class-conditional with `num_classes` classes. - :param use_checkpoint: use gradient checkpointing to reduce memory usage. - :param num_heads: the number of attention heads in each attention layer. - :param num_heads_channels: if specified, ignore num_heads and instead use - a fixed channel width per attention head. - :param num_heads_upsample: works with num_heads to set a different number - of heads for upsampling. Deprecated. - :param use_scale_shift_norm: use a FiLM-like conditioning mechanism. - :param resblock_updown: use residual blocks for up/downsampling. - :param use_new_attention_order: use a different attention pattern for potentially - increased efficiency. - """ - - def __init__( - self, - image_size, - in_channels, - model_channels, - out_channels, - num_res_blocks, - attention_resolutions, - dropout=0, - channel_mult=(1, 2, 4, 8), - conv_resample=True, - dims=2, - num_classes=None, - use_checkpoint=False, - use_fp16=False, - num_heads=-1, - num_head_channels=-1, - num_heads_upsample=-1, - use_scale_shift_norm=False, - resblock_updown=False, - use_new_attention_order=False, - use_spatial_transformer=False, # custom transformer support - transformer_depth=1, # custom transformer support - context_dim=None, # custom transformer support - n_embed=None, # custom support for prediction of discrete ids into codebook of first stage vq model - legacy=True, - ): - super().__init__() - if use_spatial_transformer: - assert context_dim is not None, 'Fool!! You forgot to include the dimension of your cross-attention conditioning...' - - if context_dim is not None: - assert use_spatial_transformer, 'Fool!! You forgot to use the spatial transformer for your cross-attention conditioning...' - from omegaconf.listconfig import ListConfig - if type(context_dim) == ListConfig: - context_dim = list(context_dim) - - if num_heads_upsample == -1: - num_heads_upsample = num_heads - - if num_heads == -1: - assert num_head_channels != -1, 'Either num_heads or num_head_channels has to be set' - - if num_head_channels == -1: - assert num_heads != -1, 'Either num_heads or num_head_channels has to be set' - - self.image_size = image_size - self.in_channels = in_channels - self.model_channels = model_channels - self.out_channels = out_channels - self.num_res_blocks = num_res_blocks - self.attention_resolutions = attention_resolutions - self.dropout = dropout - self.channel_mult = channel_mult - self.conv_resample = conv_resample - self.num_classes = num_classes - self.use_checkpoint = use_checkpoint - self.dtype = th.float16 if use_fp16 else th.float32 - self.num_heads = num_heads - self.num_head_channels = num_head_channels - self.num_heads_upsample = num_heads_upsample - self.predict_codebook_ids = n_embed is not None - - time_embed_dim = model_channels * 4 - self.time_embed = nn.Sequential( - linear(model_channels, time_embed_dim), - nn.SiLU(), - linear(time_embed_dim, time_embed_dim), - ) - - if self.num_classes is not None: - self.label_emb = nn.Embedding(num_classes, time_embed_dim) - - self.input_blocks = nn.ModuleList( - [ - TimestepEmbedSequential( - conv_nd(dims, in_channels, model_channels, 3, padding=1) - ) - ] - ) - self._feature_size = model_channels - input_block_chans = [model_channels] - ch = model_channels - ds = 1 - for level, mult in enumerate(channel_mult): - for _ in range(num_res_blocks): - layers = [ - ResBlock( - ch, - time_embed_dim, - dropout, - out_channels=mult * model_channels, - dims=dims, - use_checkpoint=use_checkpoint, - use_scale_shift_norm=use_scale_shift_norm, - ) - ] - ch = mult * model_channels - if ds in attention_resolutions: - if num_head_channels == -1: - dim_head = ch // num_heads - else: - num_heads = ch // num_head_channels - dim_head = num_head_channels - if legacy: - #num_heads = 1 - dim_head = ch // num_heads if use_spatial_transformer else num_head_channels - layers.append( - AttentionBlock( - ch, - use_checkpoint=use_checkpoint, - num_heads=num_heads, - num_head_channels=dim_head, - use_new_attention_order=use_new_attention_order, - ) if not use_spatial_transformer else SpatialTransformer( - ch, num_heads, dim_head, depth=transformer_depth, context_dim=context_dim - ) - ) - self.input_blocks.append(TimestepEmbedSequential(*layers)) - self._feature_size += ch - input_block_chans.append(ch) - if level != len(channel_mult) - 1: - out_ch = ch - self.input_blocks.append( - TimestepEmbedSequential( - ResBlock( - ch, - time_embed_dim, - dropout, - out_channels=out_ch, - dims=dims, - use_checkpoint=use_checkpoint, - use_scale_shift_norm=use_scale_shift_norm, - down=True, - ) - if resblock_updown - else Downsample( - ch, conv_resample, dims=dims, out_channels=out_ch - ) - ) - ) - ch = out_ch - input_block_chans.append(ch) - ds *= 2 - self._feature_size += ch - - if num_head_channels == -1: - dim_head = ch // num_heads - else: - num_heads = ch // num_head_channels - dim_head = num_head_channels - if legacy: - #num_heads = 1 - dim_head = ch // num_heads if use_spatial_transformer else num_head_channels - self.middle_block = TimestepEmbedSequential( - ResBlock( - ch, - time_embed_dim, - dropout, - dims=dims, - use_checkpoint=use_checkpoint, - use_scale_shift_norm=use_scale_shift_norm, - ), - AttentionBlock( - ch, - use_checkpoint=use_checkpoint, - num_heads=num_heads, - num_head_channels=dim_head, - use_new_attention_order=use_new_attention_order, - ) if not use_spatial_transformer else SpatialTransformer( - ch, num_heads, dim_head, depth=transformer_depth, context_dim=context_dim - ), - ResBlock( - ch, - time_embed_dim, - dropout, - dims=dims, - use_checkpoint=use_checkpoint, - use_scale_shift_norm=use_scale_shift_norm, - ), - ) - self._feature_size += ch - - self.output_blocks = nn.ModuleList([]) - for level, mult in list(enumerate(channel_mult))[::-1]: - for i in range(num_res_blocks + 1): - ich = input_block_chans.pop() - layers = [ - ResBlock( - ch + ich, - time_embed_dim, - dropout, - out_channels=model_channels * mult, - dims=dims, - use_checkpoint=use_checkpoint, - use_scale_shift_norm=use_scale_shift_norm, - ) - ] - ch = model_channels * mult - if ds in attention_resolutions: - if num_head_channels == -1: - dim_head = ch // num_heads - else: - num_heads = ch // num_head_channels - dim_head = num_head_channels - if legacy: - #num_heads = 1 - dim_head = ch // num_heads if use_spatial_transformer else num_head_channels - layers.append( - AttentionBlock( - ch, - use_checkpoint=use_checkpoint, - num_heads=num_heads_upsample, - num_head_channels=dim_head, - use_new_attention_order=use_new_attention_order, - ) if not use_spatial_transformer else SpatialTransformer( - ch, num_heads, dim_head, depth=transformer_depth, context_dim=context_dim - ) - ) - if level and i == num_res_blocks: - out_ch = ch - layers.append( - ResBlock( - ch, - time_embed_dim, - dropout, - out_channels=out_ch, - dims=dims, - use_checkpoint=use_checkpoint, - use_scale_shift_norm=use_scale_shift_norm, - up=True, - ) - if resblock_updown - else Upsample(ch, conv_resample, dims=dims, out_channels=out_ch) - ) - ds //= 2 - self.output_blocks.append(TimestepEmbedSequential(*layers)) - self._feature_size += ch - - self.out = nn.Sequential( - normalization(ch), - nn.SiLU(), - zero_module(conv_nd(dims, model_channels, out_channels, 3, padding=1)), - ) - if self.predict_codebook_ids: - self.id_predictor = nn.Sequential( - normalization(ch), - conv_nd(dims, model_channels, n_embed, 1), - #nn.LogSoftmax(dim=1) # change to cross_entropy and produce non-normalized logits - ) - - def convert_to_fp16(self): - """ - Convert the torso of the model to float16. - """ - self.input_blocks.apply(convert_module_to_f16) - self.middle_block.apply(convert_module_to_f16) - self.output_blocks.apply(convert_module_to_f16) - - def convert_to_fp32(self): - """ - Convert the torso of the model to float32. - """ - self.input_blocks.apply(convert_module_to_f32) - self.middle_block.apply(convert_module_to_f32) - self.output_blocks.apply(convert_module_to_f32) - - def forward(self, x, timesteps=None, context=None, y=None,**kwargs): - """ - Apply the model to an input batch. - :param x: an [N x C x ...] Tensor of inputs. - :param timesteps: a 1-D batch of timesteps. - :param context: conditioning plugged in via crossattn - :param y: an [N] Tensor of labels, if class-conditional. - :return: an [N x C x ...] Tensor of outputs. - """ - assert (y is not None) == ( - self.num_classes is not None - ), "must specify y if and only if the model is class-conditional" - hs = [] - t_emb = timestep_embedding(timesteps, self.model_channels, repeat_only=False) - emb = self.time_embed(t_emb) - - if self.num_classes is not None: - assert y.shape == (x.shape[0],) - emb = emb + self.label_emb(y) - - h = x.type(self.dtype) - for module in self.input_blocks: - h = module(h, emb, context) - hs.append(h) - h = self.middle_block(h, emb, context) - for module in self.output_blocks: - h = th.cat([h, hs.pop()], dim=1) - h = module(h, emb, context) - h = h.type(x.dtype) - if self.predict_codebook_ids: - return self.id_predictor(h) - else: - return self.out(h) - - -class EncoderUNetModel(nn.Module): - """ - The half UNet model with attention and timestep embedding. - For usage, see UNet. - """ - - def __init__( - self, - image_size, - in_channels, - model_channels, - out_channels, - num_res_blocks, - attention_resolutions, - dropout=0, - channel_mult=(1, 2, 4, 8), - conv_resample=True, - dims=2, - use_checkpoint=False, - use_fp16=False, - num_heads=1, - num_head_channels=-1, - num_heads_upsample=-1, - use_scale_shift_norm=False, - resblock_updown=False, - use_new_attention_order=False, - pool="adaptive", - *args, - **kwargs - ): - super().__init__() - - if num_heads_upsample == -1: - num_heads_upsample = num_heads - - self.in_channels = in_channels - self.model_channels = model_channels - self.out_channels = out_channels - self.num_res_blocks = num_res_blocks - self.attention_resolutions = attention_resolutions - self.dropout = dropout - self.channel_mult = channel_mult - self.conv_resample = conv_resample - self.use_checkpoint = use_checkpoint - self.dtype = th.float16 if use_fp16 else th.float32 - self.num_heads = num_heads - self.num_head_channels = num_head_channels - self.num_heads_upsample = num_heads_upsample - - time_embed_dim = model_channels * 4 - self.time_embed = nn.Sequential( - linear(model_channels, time_embed_dim), - nn.SiLU(), - linear(time_embed_dim, time_embed_dim), - ) - - self.input_blocks = nn.ModuleList( - [ - TimestepEmbedSequential( - conv_nd(dims, in_channels, model_channels, 3, padding=1) - ) - ] - ) - self._feature_size = model_channels - input_block_chans = [model_channels] - ch = model_channels - ds = 1 - for level, mult in enumerate(channel_mult): - for _ in range(num_res_blocks): - layers = [ - ResBlock( - ch, - time_embed_dim, - dropout, - out_channels=mult * model_channels, - dims=dims, - use_checkpoint=use_checkpoint, - use_scale_shift_norm=use_scale_shift_norm, - ) - ] - ch = mult * model_channels - if ds in attention_resolutions: - layers.append( - AttentionBlock( - ch, - use_checkpoint=use_checkpoint, - num_heads=num_heads, - num_head_channels=num_head_channels, - use_new_attention_order=use_new_attention_order, - ) - ) - self.input_blocks.append(TimestepEmbedSequential(*layers)) - self._feature_size += ch - input_block_chans.append(ch) - if level != len(channel_mult) - 1: - out_ch = ch - self.input_blocks.append( - TimestepEmbedSequential( - ResBlock( - ch, - time_embed_dim, - dropout, - out_channels=out_ch, - dims=dims, - use_checkpoint=use_checkpoint, - use_scale_shift_norm=use_scale_shift_norm, - down=True, - ) - if resblock_updown - else Downsample( - ch, conv_resample, dims=dims, out_channels=out_ch - ) - ) - ) - ch = out_ch - input_block_chans.append(ch) - ds *= 2 - self._feature_size += ch - - self.middle_block = TimestepEmbedSequential( - ResBlock( - ch, - time_embed_dim, - dropout, - dims=dims, - use_checkpoint=use_checkpoint, - use_scale_shift_norm=use_scale_shift_norm, - ), - AttentionBlock( - ch, - use_checkpoint=use_checkpoint, - num_heads=num_heads, - num_head_channels=num_head_channels, - use_new_attention_order=use_new_attention_order, - ), - ResBlock( - ch, - time_embed_dim, - dropout, - dims=dims, - use_checkpoint=use_checkpoint, - use_scale_shift_norm=use_scale_shift_norm, - ), - ) - self._feature_size += ch - self.pool = pool - if pool == "adaptive": - self.out = nn.Sequential( - normalization(ch), - nn.SiLU(), - nn.AdaptiveAvgPool2d((1, 1)), - zero_module(conv_nd(dims, ch, out_channels, 1)), - nn.Flatten(), - ) - elif pool == "attention": - assert num_head_channels != -1 - self.out = nn.Sequential( - normalization(ch), - nn.SiLU(), - AttentionPool2d( - (image_size // ds), ch, num_head_channels, out_channels - ), - ) - elif pool == "spatial": - self.out = nn.Sequential( - nn.Linear(self._feature_size, 2048), - nn.ReLU(), - nn.Linear(2048, self.out_channels), - ) - elif pool == "spatial_v2": - self.out = nn.Sequential( - nn.Linear(self._feature_size, 2048), - normalization(2048), - nn.SiLU(), - nn.Linear(2048, self.out_channels), - ) - else: - raise NotImplementedError(f"Unexpected {pool} pooling") - - def convert_to_fp16(self): - """ - Convert the torso of the model to float16. - """ - self.input_blocks.apply(convert_module_to_f16) - self.middle_block.apply(convert_module_to_f16) - - def convert_to_fp32(self): - """ - Convert the torso of the model to float32. - """ - self.input_blocks.apply(convert_module_to_f32) - self.middle_block.apply(convert_module_to_f32) - - def forward(self, x, timesteps): - """ - Apply the model to an input batch. - :param x: an [N x C x ...] Tensor of inputs. - :param timesteps: a 1-D batch of timesteps. - :return: an [N x K] Tensor of outputs. - """ - emb = self.time_embed(timestep_embedding(timesteps, self.model_channels)) - - results = [] - h = x.type(self.dtype) - for module in self.input_blocks: - h = module(h, emb) - if self.pool.startswith("spatial"): - results.append(h.type(x.dtype).mean(dim=(2, 3))) - h = self.middle_block(h, emb) - if self.pool.startswith("spatial"): - results.append(h.type(x.dtype).mean(dim=(2, 3))) - h = th.cat(results, axis=-1) - return self.out(h) - else: - h = h.type(x.dtype) - return self.out(h) - diff --git a/ldm/modules/diffusionmodules/util.py b/ldm/modules/diffusionmodules/util.py deleted file mode 100644 index a952e6c4..00000000 --- a/ldm/modules/diffusionmodules/util.py +++ /dev/null @@ -1,267 +0,0 @@ -# adopted from -# https://github.com/openai/improved-diffusion/blob/main/improved_diffusion/gaussian_diffusion.py -# and -# https://github.com/lucidrains/denoising-diffusion-pytorch/blob/7706bdfc6f527f58d33f84b7b522e61e6e3164b3/denoising_diffusion_pytorch/denoising_diffusion_pytorch.py -# and -# https://github.com/openai/guided-diffusion/blob/0ba878e517b276c45d1195eb29f6f5f72659a05b/guided_diffusion/nn.py -# -# thanks! - - -import os -import math -import torch -import torch.nn as nn -import numpy as np -from einops import repeat - -from ldm.util import instantiate_from_config - - -def make_beta_schedule(schedule, n_timestep, linear_start=1e-4, linear_end=2e-2, cosine_s=8e-3): - if schedule == "linear": - betas = ( - torch.linspace(linear_start ** 0.5, linear_end ** 0.5, n_timestep, dtype=torch.float64) ** 2 - ) - - elif schedule == "cosine": - timesteps = ( - torch.arange(n_timestep + 1, dtype=torch.float64) / n_timestep + cosine_s - ) - alphas = timesteps / (1 + cosine_s) * np.pi / 2 - alphas = torch.cos(alphas).pow(2) - alphas = alphas / alphas[0] - betas = 1 - alphas[1:] / alphas[:-1] - betas = np.clip(betas, a_min=0, a_max=0.999) - - elif schedule == "sqrt_linear": - betas = torch.linspace(linear_start, linear_end, n_timestep, dtype=torch.float64) - elif schedule == "sqrt": - betas = torch.linspace(linear_start, linear_end, n_timestep, dtype=torch.float64) ** 0.5 - else: - raise ValueError(f"schedule '{schedule}' unknown.") - return betas.numpy() - - -def make_ddim_timesteps(ddim_discr_method, num_ddim_timesteps, num_ddpm_timesteps, verbose=True): - if ddim_discr_method == 'uniform': - c = num_ddpm_timesteps // num_ddim_timesteps - ddim_timesteps = np.asarray(list(range(0, num_ddpm_timesteps, c))) - elif ddim_discr_method == 'quad': - ddim_timesteps = ((np.linspace(0, np.sqrt(num_ddpm_timesteps * .8), num_ddim_timesteps)) ** 2).astype(int) - else: - raise NotImplementedError(f'There is no ddim discretization method called "{ddim_discr_method}"') - - # assert ddim_timesteps.shape[0] == num_ddim_timesteps - # add one to get the final alpha values right (the ones from first scale to data during sampling) - steps_out = ddim_timesteps + 1 - if verbose: - print(f'Selected timesteps for ddim sampler: {steps_out}') - return steps_out - - -def make_ddim_sampling_parameters(alphacums, ddim_timesteps, eta, verbose=True): - # select alphas for computing the variance schedule - alphas = alphacums[ddim_timesteps] - alphas_prev = np.asarray([alphacums[0]] + alphacums[ddim_timesteps[:-1]].tolist()) - - # according the the formula provided in https://arxiv.org/abs/2010.02502 - sigmas = eta * np.sqrt((1 - alphas_prev) / (1 - alphas) * (1 - alphas / alphas_prev)) - if verbose: - print(f'Selected alphas for ddim sampler: a_t: {alphas}; a_(t-1): {alphas_prev}') - print(f'For the chosen value of eta, which is {eta}, ' - f'this results in the following sigma_t schedule for ddim sampler {sigmas}') - return sigmas, alphas, alphas_prev - - -def betas_for_alpha_bar(num_diffusion_timesteps, alpha_bar, max_beta=0.999): - """ - Create a beta schedule that discretizes the given alpha_t_bar function, - which defines the cumulative product of (1-beta) over time from t = [0,1]. - :param num_diffusion_timesteps: the number of betas to produce. - :param alpha_bar: a lambda that takes an argument t from 0 to 1 and - produces the cumulative product of (1-beta) up to that - part of the diffusion process. - :param max_beta: the maximum beta to use; use values lower than 1 to - prevent singularities. - """ - betas = [] - for i in range(num_diffusion_timesteps): - t1 = i / num_diffusion_timesteps - t2 = (i + 1) / num_diffusion_timesteps - betas.append(min(1 - alpha_bar(t2) / alpha_bar(t1), max_beta)) - return np.array(betas) - - -def extract_into_tensor(a, t, x_shape): - b, *_ = t.shape - out = a.gather(-1, t) - return out.reshape(b, *((1,) * (len(x_shape) - 1))) - - -def checkpoint(func, inputs, params, flag): - """ - Evaluate a function without caching intermediate activations, allowing for - reduced memory at the expense of extra compute in the backward pass. - :param func: the function to evaluate. - :param inputs: the argument sequence to pass to `func`. - :param params: a sequence of parameters `func` depends on but does not - explicitly take as arguments. - :param flag: if False, disable gradient checkpointing. - """ - if flag: - args = tuple(inputs) + tuple(params) - return CheckpointFunction.apply(func, len(inputs), *args) - else: - return func(*inputs) - - -class CheckpointFunction(torch.autograd.Function): - @staticmethod - def forward(ctx, run_function, length, *args): - ctx.run_function = run_function - ctx.input_tensors = list(args[:length]) - ctx.input_params = list(args[length:]) - - with torch.no_grad(): - output_tensors = ctx.run_function(*ctx.input_tensors) - return output_tensors - - @staticmethod - def backward(ctx, *output_grads): - ctx.input_tensors = [x.detach().requires_grad_(True) for x in ctx.input_tensors] - with torch.enable_grad(): - # Fixes a bug where the first op in run_function modifies the - # Tensor storage in place, which is not allowed for detach()'d - # Tensors. - shallow_copies = [x.view_as(x) for x in ctx.input_tensors] - output_tensors = ctx.run_function(*shallow_copies) - input_grads = torch.autograd.grad( - output_tensors, - ctx.input_tensors + ctx.input_params, - output_grads, - allow_unused=True, - ) - del ctx.input_tensors - del ctx.input_params - del output_tensors - return (None, None) + input_grads - - -def timestep_embedding(timesteps, dim, max_period=10000, repeat_only=False): - """ - Create sinusoidal timestep embeddings. - :param timesteps: a 1-D Tensor of N indices, one per batch element. - These may be fractional. - :param dim: the dimension of the output. - :param max_period: controls the minimum frequency of the embeddings. - :return: an [N x dim] Tensor of positional embeddings. - """ - if not repeat_only: - half = dim // 2 - freqs = torch.exp( - -math.log(max_period) * torch.arange(start=0, end=half, dtype=torch.float32) / half - ).to(device=timesteps.device) - args = timesteps[:, None].float() * freqs[None] - embedding = torch.cat([torch.cos(args), torch.sin(args)], dim=-1) - if dim % 2: - embedding = torch.cat([embedding, torch.zeros_like(embedding[:, :1])], dim=-1) - else: - embedding = repeat(timesteps, 'b -> b d', d=dim) - return embedding - - -def zero_module(module): - """ - Zero out the parameters of a module and return it. - """ - for p in module.parameters(): - p.detach().zero_() - return module - - -def scale_module(module, scale): - """ - Scale the parameters of a module and return it. - """ - for p in module.parameters(): - p.detach().mul_(scale) - return module - - -def mean_flat(tensor): - """ - Take the mean over all non-batch dimensions. - """ - return tensor.mean(dim=list(range(1, len(tensor.shape)))) - - -def normalization(channels): - """ - Make a standard normalization layer. - :param channels: number of input channels. - :return: an nn.Module for normalization. - """ - return GroupNorm32(32, channels) - - -# PyTorch 1.7 has SiLU, but we support PyTorch 1.5. -class SiLU(nn.Module): - def forward(self, x): - return x * torch.sigmoid(x) - - -class GroupNorm32(nn.GroupNorm): - def forward(self, x): - return super().forward(x.float()).type(x.dtype) - -def conv_nd(dims, *args, **kwargs): - """ - Create a 1D, 2D, or 3D convolution module. - """ - if dims == 1: - return nn.Conv1d(*args, **kwargs) - elif dims == 2: - return nn.Conv2d(*args, **kwargs) - elif dims == 3: - return nn.Conv3d(*args, **kwargs) - raise ValueError(f"unsupported dimensions: {dims}") - - -def linear(*args, **kwargs): - """ - Create a linear module. - """ - return nn.Linear(*args, **kwargs) - - -def avg_pool_nd(dims, *args, **kwargs): - """ - Create a 1D, 2D, or 3D average pooling module. - """ - if dims == 1: - return nn.AvgPool1d(*args, **kwargs) - elif dims == 2: - return nn.AvgPool2d(*args, **kwargs) - elif dims == 3: - return nn.AvgPool3d(*args, **kwargs) - raise ValueError(f"unsupported dimensions: {dims}") - - -class HybridConditioner(nn.Module): - - def __init__(self, c_concat_config, c_crossattn_config): - super().__init__() - self.concat_conditioner = instantiate_from_config(c_concat_config) - self.crossattn_conditioner = instantiate_from_config(c_crossattn_config) - - def forward(self, c_concat, c_crossattn): - c_concat = self.concat_conditioner(c_concat) - c_crossattn = self.crossattn_conditioner(c_crossattn) - return {'c_concat': [c_concat], 'c_crossattn': [c_crossattn]} - - -def noise_like(shape, device, repeat=False): - repeat_noise = lambda: torch.randn((1, *shape[1:]), device=device).repeat(shape[0], *((1,) * (len(shape) - 1))) - noise = lambda: torch.randn(shape, device=device) - return repeat_noise() if repeat else noise() \ No newline at end of file diff --git a/ldm/modules/distributions/__init__.py b/ldm/modules/distributions/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/ldm/modules/distributions/distributions.py b/ldm/modules/distributions/distributions.py deleted file mode 100644 index f2b8ef90..00000000 --- a/ldm/modules/distributions/distributions.py +++ /dev/null @@ -1,92 +0,0 @@ -import torch -import numpy as np - - -class AbstractDistribution: - def sample(self): - raise NotImplementedError() - - def mode(self): - raise NotImplementedError() - - -class DiracDistribution(AbstractDistribution): - def __init__(self, value): - self.value = value - - def sample(self): - return self.value - - def mode(self): - return self.value - - -class DiagonalGaussianDistribution(object): - def __init__(self, parameters, deterministic=False): - self.parameters = parameters - self.mean, self.logvar = torch.chunk(parameters, 2, dim=1) - self.logvar = torch.clamp(self.logvar, -30.0, 20.0) - self.deterministic = deterministic - self.std = torch.exp(0.5 * self.logvar) - self.var = torch.exp(self.logvar) - if self.deterministic: - self.var = self.std = torch.zeros_like(self.mean).to(device=self.parameters.device) - - def sample(self): - x = self.mean + self.std * torch.randn(self.mean.shape).to(device=self.parameters.device) - return x - - def kl(self, other=None): - if self.deterministic: - return torch.Tensor([0.]) - else: - if other is None: - return 0.5 * torch.sum(torch.pow(self.mean, 2) - + self.var - 1.0 - self.logvar, - dim=[1, 2, 3]) - else: - return 0.5 * torch.sum( - torch.pow(self.mean - other.mean, 2) / other.var - + self.var / other.var - 1.0 - self.logvar + other.logvar, - dim=[1, 2, 3]) - - def nll(self, sample, dims=[1,2,3]): - if self.deterministic: - return torch.Tensor([0.]) - logtwopi = np.log(2.0 * np.pi) - return 0.5 * torch.sum( - logtwopi + self.logvar + torch.pow(sample - self.mean, 2) / self.var, - dim=dims) - - def mode(self): - return self.mean - - -def normal_kl(mean1, logvar1, mean2, logvar2): - """ - source: https://github.com/openai/guided-diffusion/blob/27c20a8fab9cb472df5d6bdd6c8d11c8f430b924/guided_diffusion/losses.py#L12 - Compute the KL divergence between two gaussians. - Shapes are automatically broadcasted, so batches can be compared to - scalars, among other use cases. - """ - tensor = None - for obj in (mean1, logvar1, mean2, logvar2): - if isinstance(obj, torch.Tensor): - tensor = obj - break - assert tensor is not None, "at least one argument must be a Tensor" - - # Force variances to be Tensors. Broadcasting helps convert scalars to - # Tensors, but it does not work for torch.exp(). - logvar1, logvar2 = [ - x if isinstance(x, torch.Tensor) else torch.tensor(x).to(tensor) - for x in (logvar1, logvar2) - ] - - return 0.5 * ( - -1.0 - + logvar2 - - logvar1 - + torch.exp(logvar1 - logvar2) - + ((mean1 - mean2) ** 2) * torch.exp(-logvar2) - ) diff --git a/ldm/modules/ema.py b/ldm/modules/ema.py deleted file mode 100644 index c8c75af4..00000000 --- a/ldm/modules/ema.py +++ /dev/null @@ -1,76 +0,0 @@ -import torch -from torch import nn - - -class LitEma(nn.Module): - def __init__(self, model, decay=0.9999, use_num_upates=True): - super().__init__() - if decay < 0.0 or decay > 1.0: - raise ValueError('Decay must be between 0 and 1') - - self.m_name2s_name = {} - self.register_buffer('decay', torch.tensor(decay, dtype=torch.float32)) - self.register_buffer('num_updates', torch.tensor(0,dtype=torch.int) if use_num_upates - else torch.tensor(-1,dtype=torch.int)) - - for name, p in model.named_parameters(): - if p.requires_grad: - #remove as '.'-character is not allowed in buffers - s_name = name.replace('.','') - self.m_name2s_name.update({name:s_name}) - self.register_buffer(s_name,p.clone().detach().data) - - self.collected_params = [] - - def forward(self,model): - decay = self.decay - - if self.num_updates >= 0: - self.num_updates += 1 - decay = min(self.decay,(1 + self.num_updates) / (10 + self.num_updates)) - - one_minus_decay = 1.0 - decay - - with torch.no_grad(): - m_param = dict(model.named_parameters()) - shadow_params = dict(self.named_buffers()) - - for key in m_param: - if m_param[key].requires_grad: - sname = self.m_name2s_name[key] - shadow_params[sname] = shadow_params[sname].type_as(m_param[key]) - shadow_params[sname].sub_(one_minus_decay * (shadow_params[sname] - m_param[key])) - else: - assert not key in self.m_name2s_name - - def copy_to(self, model): - m_param = dict(model.named_parameters()) - shadow_params = dict(self.named_buffers()) - for key in m_param: - if m_param[key].requires_grad: - m_param[key].data.copy_(shadow_params[self.m_name2s_name[key]].data) - else: - assert not key in self.m_name2s_name - - def store(self, parameters): - """ - Save the current parameters for restoring later. - Args: - parameters: Iterable of `torch.nn.Parameter`; the parameters to be - temporarily stored. - """ - self.collected_params = [param.clone() for param in parameters] - - def restore(self, parameters): - """ - Restore the parameters stored with the `store` method. - Useful to validate the model with EMA parameters without affecting the - original optimization process. Store the parameters before the - `copy_to` method. After validation (or model saving), use this to - restore the former parameters. - Args: - parameters: Iterable of `torch.nn.Parameter`; the parameters to be - updated with the stored parameters. - """ - for c_param, param in zip(self.collected_params, parameters): - param.data.copy_(c_param.data) diff --git a/ldm/modules/encoders/__init__.py b/ldm/modules/encoders/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/ldm/modules/encoders/modules.py b/ldm/modules/encoders/modules.py deleted file mode 100644 index ededbe43..00000000 --- a/ldm/modules/encoders/modules.py +++ /dev/null @@ -1,234 +0,0 @@ -import torch -import torch.nn as nn -from functools import partial -import clip -from einops import rearrange, repeat -from transformers import CLIPTokenizer, CLIPTextModel -import kornia - -from ldm.modules.x_transformer import Encoder, TransformerWrapper # TODO: can we directly rely on lucidrains code and simply add this as a reuirement? --> test - - -class AbstractEncoder(nn.Module): - def __init__(self): - super().__init__() - - def encode(self, *args, **kwargs): - raise NotImplementedError - - - -class ClassEmbedder(nn.Module): - def __init__(self, embed_dim, n_classes=1000, key='class'): - super().__init__() - self.key = key - self.embedding = nn.Embedding(n_classes, embed_dim) - - def forward(self, batch, key=None): - if key is None: - key = self.key - # this is for use in crossattn - c = batch[key][:, None] - c = self.embedding(c) - return c - - -class TransformerEmbedder(AbstractEncoder): - """Some transformer encoder layers""" - def __init__(self, n_embed, n_layer, vocab_size, max_seq_len=77, device="cuda"): - super().__init__() - self.device = device - self.transformer = TransformerWrapper(num_tokens=vocab_size, max_seq_len=max_seq_len, - attn_layers=Encoder(dim=n_embed, depth=n_layer)) - - def forward(self, tokens): - tokens = tokens.to(self.device) # meh - z = self.transformer(tokens, return_embeddings=True) - return z - - def encode(self, x): - return self(x) - - -class BERTTokenizer(AbstractEncoder): - """ Uses a pretrained BERT tokenizer by huggingface. Vocab size: 30522 (?)""" - def __init__(self, device="cuda", vq_interface=True, max_length=77): - super().__init__() - from transformers import BertTokenizerFast # TODO: add to reuquirements - self.tokenizer = BertTokenizerFast.from_pretrained("bert-base-uncased") - self.device = device - self.vq_interface = vq_interface - self.max_length = max_length - - def forward(self, text): - batch_encoding = self.tokenizer(text, truncation=True, max_length=self.max_length, return_length=True, - return_overflowing_tokens=False, padding="max_length", return_tensors="pt") - tokens = batch_encoding["input_ids"].to(self.device) - return tokens - - @torch.no_grad() - def encode(self, text): - tokens = self(text) - if not self.vq_interface: - return tokens - return None, None, [None, None, tokens] - - def decode(self, text): - return text - - -class BERTEmbedder(AbstractEncoder): - """Uses the BERT tokenizr model and add some transformer encoder layers""" - def __init__(self, n_embed, n_layer, vocab_size=30522, max_seq_len=77, - device="cuda",use_tokenizer=True, embedding_dropout=0.0): - super().__init__() - self.use_tknz_fn = use_tokenizer - if self.use_tknz_fn: - self.tknz_fn = BERTTokenizer(vq_interface=False, max_length=max_seq_len) - self.device = device - self.transformer = TransformerWrapper(num_tokens=vocab_size, max_seq_len=max_seq_len, - attn_layers=Encoder(dim=n_embed, depth=n_layer), - emb_dropout=embedding_dropout) - - def forward(self, text): - if self.use_tknz_fn: - tokens = self.tknz_fn(text)#.to(self.device) - else: - tokens = text - z = self.transformer(tokens, return_embeddings=True) - return z - - def encode(self, text): - # output of length 77 - return self(text) - - -class SpatialRescaler(nn.Module): - def __init__(self, - n_stages=1, - method='bilinear', - multiplier=0.5, - in_channels=3, - out_channels=None, - bias=False): - super().__init__() - self.n_stages = n_stages - assert self.n_stages >= 0 - assert method in ['nearest','linear','bilinear','trilinear','bicubic','area'] - self.multiplier = multiplier - self.interpolator = partial(torch.nn.functional.interpolate, mode=method) - self.remap_output = out_channels is not None - if self.remap_output: - print(f'Spatial Rescaler mapping from {in_channels} to {out_channels} channels after resizing.') - self.channel_mapper = nn.Conv2d(in_channels,out_channels,1,bias=bias) - - def forward(self,x): - for stage in range(self.n_stages): - x = self.interpolator(x, scale_factor=self.multiplier) - - - if self.remap_output: - x = self.channel_mapper(x) - return x - - def encode(self, x): - return self(x) - -class FrozenCLIPEmbedder(AbstractEncoder): - """Uses the CLIP transformer encoder for text (from Hugging Face)""" - def __init__(self, version="openai/clip-vit-large-patch14", device="cuda", max_length=77): - super().__init__() - self.tokenizer = CLIPTokenizer.from_pretrained(version) - self.transformer = CLIPTextModel.from_pretrained(version) - self.device = device - self.max_length = max_length - self.freeze() - - def freeze(self): - self.transformer = self.transformer.eval() - for param in self.parameters(): - param.requires_grad = False - - def forward(self, text): - batch_encoding = self.tokenizer(text, truncation=True, max_length=self.max_length, return_length=True, - return_overflowing_tokens=False, padding="max_length", return_tensors="pt") - tokens = batch_encoding["input_ids"].to(self.device) - outputs = self.transformer(input_ids=tokens) - - z = outputs.last_hidden_state - return z - - def encode(self, text): - return self(text) - - -class FrozenCLIPTextEmbedder(nn.Module): - """ - Uses the CLIP transformer encoder for text. - """ - def __init__(self, version='ViT-L/14', device="cuda", max_length=77, n_repeat=1, normalize=True): - super().__init__() - self.model, _ = clip.load(version, jit=False, device="cpu") - self.device = device - self.max_length = max_length - self.n_repeat = n_repeat - self.normalize = normalize - - def freeze(self): - self.model = self.model.eval() - for param in self.parameters(): - param.requires_grad = False - - def forward(self, text): - tokens = clip.tokenize(text).to(self.device) - z = self.model.encode_text(tokens) - if self.normalize: - z = z / torch.linalg.norm(z, dim=1, keepdim=True) - return z - - def encode(self, text): - z = self(text) - if z.ndim==2: - z = z[:, None, :] - z = repeat(z, 'b 1 d -> b k d', k=self.n_repeat) - return z - - -class FrozenClipImageEmbedder(nn.Module): - """ - Uses the CLIP image encoder. - """ - def __init__( - self, - model, - jit=False, - device='cuda' if torch.cuda.is_available() else 'cpu', - antialias=False, - ): - super().__init__() - self.model, _ = clip.load(name=model, device=device, jit=jit) - - self.antialias = antialias - - self.register_buffer('mean', torch.Tensor([0.48145466, 0.4578275, 0.40821073]), persistent=False) - self.register_buffer('std', torch.Tensor([0.26862954, 0.26130258, 0.27577711]), persistent=False) - - def preprocess(self, x): - # normalize to [0,1] - x = kornia.geometry.resize(x, (224, 224), - interpolation='bicubic',align_corners=True, - antialias=self.antialias) - x = (x + 1.) / 2. - # renormalize according to clip - x = kornia.enhance.normalize(x, self.mean, self.std) - return x - - def forward(self, x): - # x is assumed to be in range [-1,1] - return self.model.encode_image(self.preprocess(x)) - - -if __name__ == "__main__": - from ldm.util import count_params - model = FrozenCLIPEmbedder() - count_params(model, verbose=True) \ No newline at end of file diff --git a/ldm/modules/encoders/xlmr.py b/ldm/modules/encoders/xlmr.py deleted file mode 100644 index beab3fdf..00000000 --- a/ldm/modules/encoders/xlmr.py +++ /dev/null @@ -1,137 +0,0 @@ -from transformers import BertPreTrainedModel,BertModel,BertConfig -import torch.nn as nn -import torch -from transformers.models.xlm_roberta.configuration_xlm_roberta import XLMRobertaConfig -from transformers import XLMRobertaModel,XLMRobertaTokenizer -from typing import Optional - -class BertSeriesConfig(BertConfig): - def __init__(self, vocab_size=30522, hidden_size=768, num_hidden_layers=12, num_attention_heads=12, intermediate_size=3072, hidden_act="gelu", hidden_dropout_prob=0.1, attention_probs_dropout_prob=0.1, max_position_embeddings=512, type_vocab_size=2, initializer_range=0.02, layer_norm_eps=1e-12, pad_token_id=0, position_embedding_type="absolute", use_cache=True, classifier_dropout=None,project_dim=512, pooler_fn="average",learn_encoder=False,model_type='bert',**kwargs): - - super().__init__(vocab_size, hidden_size, num_hidden_layers, num_attention_heads, intermediate_size, hidden_act, hidden_dropout_prob, attention_probs_dropout_prob, max_position_embeddings, type_vocab_size, initializer_range, layer_norm_eps, pad_token_id, position_embedding_type, use_cache, classifier_dropout, **kwargs) - self.project_dim = project_dim - self.pooler_fn = pooler_fn - self.learn_encoder = learn_encoder - -class RobertaSeriesConfig(XLMRobertaConfig): - def __init__(self, pad_token_id=1, bos_token_id=0, eos_token_id=2,project_dim=512,pooler_fn='cls',learn_encoder=False, **kwargs): - super().__init__(pad_token_id=pad_token_id, bos_token_id=bos_token_id, eos_token_id=eos_token_id, **kwargs) - self.project_dim = project_dim - self.pooler_fn = pooler_fn - self.learn_encoder = learn_encoder - - -class BertSeriesModelWithTransformation(BertPreTrainedModel): - - _keys_to_ignore_on_load_unexpected = [r"pooler"] - _keys_to_ignore_on_load_missing = [r"position_ids", r"predictions.decoder.bias"] - config_class = BertSeriesConfig - - def __init__(self, config=None, **kargs): - # modify initialization for autoloading - if config is None: - config = XLMRobertaConfig() - config.attention_probs_dropout_prob= 0.1 - config.bos_token_id=0 - config.eos_token_id=2 - config.hidden_act='gelu' - config.hidden_dropout_prob=0.1 - config.hidden_size=1024 - config.initializer_range=0.02 - config.intermediate_size=4096 - config.layer_norm_eps=1e-05 - config.max_position_embeddings=514 - - config.num_attention_heads=16 - config.num_hidden_layers=24 - config.output_past=True - config.pad_token_id=1 - config.position_embedding_type= "absolute" - - config.type_vocab_size= 1 - config.use_cache=True - config.vocab_size= 250002 - config.project_dim = 768 - config.learn_encoder = False - super().__init__(config) - self.roberta = XLMRobertaModel(config) - self.transformation = nn.Linear(config.hidden_size,config.project_dim) - self.pre_LN=nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps) - self.tokenizer = XLMRobertaTokenizer.from_pretrained('xlm-roberta-large') - self.pooler = lambda x: x[:,0] - self.post_init() - - def encode(self,c): - device = next(self.parameters()).device - text = self.tokenizer(c, - truncation=True, - max_length=77, - return_length=False, - return_overflowing_tokens=False, - padding="max_length", - return_tensors="pt") - text["input_ids"] = torch.tensor(text["input_ids"]).to(device) - text["attention_mask"] = torch.tensor( - text['attention_mask']).to(device) - features = self(**text) - return features['projection_state'] - - def forward( - self, - input_ids: Optional[torch.Tensor] = None, - attention_mask: Optional[torch.Tensor] = None, - token_type_ids: Optional[torch.Tensor] = None, - position_ids: Optional[torch.Tensor] = None, - head_mask: Optional[torch.Tensor] = None, - inputs_embeds: Optional[torch.Tensor] = None, - encoder_hidden_states: Optional[torch.Tensor] = None, - encoder_attention_mask: Optional[torch.Tensor] = None, - output_attentions: Optional[bool] = None, - return_dict: Optional[bool] = None, - output_hidden_states: Optional[bool] = None, - ) : - r""" - """ - - return_dict = return_dict if return_dict is not None else self.config.use_return_dict - - - outputs = self.roberta( - input_ids=input_ids, - attention_mask=attention_mask, - token_type_ids=token_type_ids, - position_ids=position_ids, - head_mask=head_mask, - inputs_embeds=inputs_embeds, - encoder_hidden_states=encoder_hidden_states, - encoder_attention_mask=encoder_attention_mask, - output_attentions=output_attentions, - output_hidden_states=True, - return_dict=return_dict, - ) - - # last module outputs - sequence_output = outputs[0] - - - # project every module - sequence_output_ln = self.pre_LN(sequence_output) - - # pooler - pooler_output = self.pooler(sequence_output_ln) - pooler_output = self.transformation(pooler_output) - projection_state = self.transformation(outputs.last_hidden_state) - - return { - 'pooler_output':pooler_output, - 'last_hidden_state':outputs.last_hidden_state, - 'hidden_states':outputs.hidden_states, - 'attentions':outputs.attentions, - 'projection_state':projection_state, - 'sequence_out': sequence_output - } - - -class RobertaSeriesModelWithTransformation(BertSeriesModelWithTransformation): - base_model_prefix = 'roberta' - config_class= RobertaSeriesConfig \ No newline at end of file diff --git a/ldm/modules/image_degradation/__init__.py b/ldm/modules/image_degradation/__init__.py deleted file mode 100644 index 7836cada..00000000 --- a/ldm/modules/image_degradation/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -from ldm.modules.image_degradation.bsrgan import degradation_bsrgan_variant as degradation_fn_bsr -from ldm.modules.image_degradation.bsrgan_light import degradation_bsrgan_variant as degradation_fn_bsr_light diff --git a/ldm/modules/image_degradation/bsrgan.py b/ldm/modules/image_degradation/bsrgan.py deleted file mode 100644 index 32ef5616..00000000 --- a/ldm/modules/image_degradation/bsrgan.py +++ /dev/null @@ -1,730 +0,0 @@ -# -*- coding: utf-8 -*- -""" -# -------------------------------------------- -# Super-Resolution -# -------------------------------------------- -# -# Kai Zhang (cskaizhang@gmail.com) -# https://github.com/cszn -# From 2019/03--2021/08 -# -------------------------------------------- -""" - -import numpy as np -import cv2 -import torch - -from functools import partial -import random -from scipy import ndimage -import scipy -import scipy.stats as ss -from scipy.interpolate import interp2d -from scipy.linalg import orth -import albumentations - -import ldm.modules.image_degradation.utils_image as util - - -def modcrop_np(img, sf): - ''' - Args: - img: numpy image, WxH or WxHxC - sf: scale factor - Return: - cropped image - ''' - w, h = img.shape[:2] - im = np.copy(img) - return im[:w - w % sf, :h - h % sf, ...] - - -""" -# -------------------------------------------- -# anisotropic Gaussian kernels -# -------------------------------------------- -""" - - -def analytic_kernel(k): - """Calculate the X4 kernel from the X2 kernel (for proof see appendix in paper)""" - k_size = k.shape[0] - # Calculate the big kernels size - big_k = np.zeros((3 * k_size - 2, 3 * k_size - 2)) - # Loop over the small kernel to fill the big one - for r in range(k_size): - for c in range(k_size): - big_k[2 * r:2 * r + k_size, 2 * c:2 * c + k_size] += k[r, c] * k - # Crop the edges of the big kernel to ignore very small values and increase run time of SR - crop = k_size // 2 - cropped_big_k = big_k[crop:-crop, crop:-crop] - # Normalize to 1 - return cropped_big_k / cropped_big_k.sum() - - -def anisotropic_Gaussian(ksize=15, theta=np.pi, l1=6, l2=6): - """ generate an anisotropic Gaussian kernel - Args: - ksize : e.g., 15, kernel size - theta : [0, pi], rotation angle range - l1 : [0.1,50], scaling of eigenvalues - l2 : [0.1,l1], scaling of eigenvalues - If l1 = l2, will get an isotropic Gaussian kernel. - Returns: - k : kernel - """ - - v = np.dot(np.array([[np.cos(theta), -np.sin(theta)], [np.sin(theta), np.cos(theta)]]), np.array([1., 0.])) - V = np.array([[v[0], v[1]], [v[1], -v[0]]]) - D = np.array([[l1, 0], [0, l2]]) - Sigma = np.dot(np.dot(V, D), np.linalg.inv(V)) - k = gm_blur_kernel(mean=[0, 0], cov=Sigma, size=ksize) - - return k - - -def gm_blur_kernel(mean, cov, size=15): - center = size / 2.0 + 0.5 - k = np.zeros([size, size]) - for y in range(size): - for x in range(size): - cy = y - center + 1 - cx = x - center + 1 - k[y, x] = ss.multivariate_normal.pdf([cx, cy], mean=mean, cov=cov) - - k = k / np.sum(k) - return k - - -def shift_pixel(x, sf, upper_left=True): - """shift pixel for super-resolution with different scale factors - Args: - x: WxHxC or WxH - sf: scale factor - upper_left: shift direction - """ - h, w = x.shape[:2] - shift = (sf - 1) * 0.5 - xv, yv = np.arange(0, w, 1.0), np.arange(0, h, 1.0) - if upper_left: - x1 = xv + shift - y1 = yv + shift - else: - x1 = xv - shift - y1 = yv - shift - - x1 = np.clip(x1, 0, w - 1) - y1 = np.clip(y1, 0, h - 1) - - if x.ndim == 2: - x = interp2d(xv, yv, x)(x1, y1) - if x.ndim == 3: - for i in range(x.shape[-1]): - x[:, :, i] = interp2d(xv, yv, x[:, :, i])(x1, y1) - - return x - - -def blur(x, k): - ''' - x: image, NxcxHxW - k: kernel, Nx1xhxw - ''' - n, c = x.shape[:2] - p1, p2 = (k.shape[-2] - 1) // 2, (k.shape[-1] - 1) // 2 - x = torch.nn.functional.pad(x, pad=(p1, p2, p1, p2), mode='replicate') - k = k.repeat(1, c, 1, 1) - k = k.view(-1, 1, k.shape[2], k.shape[3]) - x = x.view(1, -1, x.shape[2], x.shape[3]) - x = torch.nn.functional.conv2d(x, k, bias=None, stride=1, padding=0, groups=n * c) - x = x.view(n, c, x.shape[2], x.shape[3]) - - return x - - -def gen_kernel(k_size=np.array([15, 15]), scale_factor=np.array([4, 4]), min_var=0.6, max_var=10., noise_level=0): - """" - # modified version of https://github.com/assafshocher/BlindSR_dataset_generator - # Kai Zhang - # min_var = 0.175 * sf # variance of the gaussian kernel will be sampled between min_var and max_var - # max_var = 2.5 * sf - """ - # Set random eigen-vals (lambdas) and angle (theta) for COV matrix - lambda_1 = min_var + np.random.rand() * (max_var - min_var) - lambda_2 = min_var + np.random.rand() * (max_var - min_var) - theta = np.random.rand() * np.pi # random theta - noise = -noise_level + np.random.rand(*k_size) * noise_level * 2 - - # Set COV matrix using Lambdas and Theta - LAMBDA = np.diag([lambda_1, lambda_2]) - Q = np.array([[np.cos(theta), -np.sin(theta)], - [np.sin(theta), np.cos(theta)]]) - SIGMA = Q @ LAMBDA @ Q.T - INV_SIGMA = np.linalg.inv(SIGMA)[None, None, :, :] - - # Set expectation position (shifting kernel for aligned image) - MU = k_size // 2 - 0.5 * (scale_factor - 1) # - 0.5 * (scale_factor - k_size % 2) - MU = MU[None, None, :, None] - - # Create meshgrid for Gaussian - [X, Y] = np.meshgrid(range(k_size[0]), range(k_size[1])) - Z = np.stack([X, Y], 2)[:, :, :, None] - - # Calcualte Gaussian for every pixel of the kernel - ZZ = Z - MU - ZZ_t = ZZ.transpose(0, 1, 3, 2) - raw_kernel = np.exp(-0.5 * np.squeeze(ZZ_t @ INV_SIGMA @ ZZ)) * (1 + noise) - - # shift the kernel so it will be centered - # raw_kernel_centered = kernel_shift(raw_kernel, scale_factor) - - # Normalize the kernel and return - # kernel = raw_kernel_centered / np.sum(raw_kernel_centered) - kernel = raw_kernel / np.sum(raw_kernel) - return kernel - - -def fspecial_gaussian(hsize, sigma): - hsize = [hsize, hsize] - siz = [(hsize[0] - 1.0) / 2.0, (hsize[1] - 1.0) / 2.0] - std = sigma - [x, y] = np.meshgrid(np.arange(-siz[1], siz[1] + 1), np.arange(-siz[0], siz[0] + 1)) - arg = -(x * x + y * y) / (2 * std * std) - h = np.exp(arg) - h[h < scipy.finfo(float).eps * h.max()] = 0 - sumh = h.sum() - if sumh != 0: - h = h / sumh - return h - - -def fspecial_laplacian(alpha): - alpha = max([0, min([alpha, 1])]) - h1 = alpha / (alpha + 1) - h2 = (1 - alpha) / (alpha + 1) - h = [[h1, h2, h1], [h2, -4 / (alpha + 1), h2], [h1, h2, h1]] - h = np.array(h) - return h - - -def fspecial(filter_type, *args, **kwargs): - ''' - python code from: - https://github.com/ronaldosena/imagens-medicas-2/blob/40171a6c259edec7827a6693a93955de2bd39e76/Aulas/aula_2_-_uniform_filter/matlab_fspecial.py - ''' - if filter_type == 'gaussian': - return fspecial_gaussian(*args, **kwargs) - if filter_type == 'laplacian': - return fspecial_laplacian(*args, **kwargs) - - -""" -# -------------------------------------------- -# degradation models -# -------------------------------------------- -""" - - -def bicubic_degradation(x, sf=3): - ''' - Args: - x: HxWxC image, [0, 1] - sf: down-scale factor - Return: - bicubicly downsampled LR image - ''' - x = util.imresize_np(x, scale=1 / sf) - return x - - -def srmd_degradation(x, k, sf=3): - ''' blur + bicubic downsampling - Args: - x: HxWxC image, [0, 1] - k: hxw, double - sf: down-scale factor - Return: - downsampled LR image - Reference: - @inproceedings{zhang2018learning, - title={Learning a single convolutional super-resolution network for multiple degradations}, - author={Zhang, Kai and Zuo, Wangmeng and Zhang, Lei}, - booktitle={IEEE Conference on Computer Vision and Pattern Recognition}, - pages={3262--3271}, - year={2018} - } - ''' - x = ndimage.filters.convolve(x, np.expand_dims(k, axis=2), mode='wrap') # 'nearest' | 'mirror' - x = bicubic_degradation(x, sf=sf) - return x - - -def dpsr_degradation(x, k, sf=3): - ''' bicubic downsampling + blur - Args: - x: HxWxC image, [0, 1] - k: hxw, double - sf: down-scale factor - Return: - downsampled LR image - Reference: - @inproceedings{zhang2019deep, - title={Deep Plug-and-Play Super-Resolution for Arbitrary Blur Kernels}, - author={Zhang, Kai and Zuo, Wangmeng and Zhang, Lei}, - booktitle={IEEE Conference on Computer Vision and Pattern Recognition}, - pages={1671--1681}, - year={2019} - } - ''' - x = bicubic_degradation(x, sf=sf) - x = ndimage.filters.convolve(x, np.expand_dims(k, axis=2), mode='wrap') - return x - - -def classical_degradation(x, k, sf=3): - ''' blur + downsampling - Args: - x: HxWxC image, [0, 1]/[0, 255] - k: hxw, double - sf: down-scale factor - Return: - downsampled LR image - ''' - x = ndimage.filters.convolve(x, np.expand_dims(k, axis=2), mode='wrap') - # x = filters.correlate(x, np.expand_dims(np.flip(k), axis=2)) - st = 0 - return x[st::sf, st::sf, ...] - - -def add_sharpening(img, weight=0.5, radius=50, threshold=10): - """USM sharpening. borrowed from real-ESRGAN - Input image: I; Blurry image: B. - 1. K = I + weight * (I - B) - 2. Mask = 1 if abs(I - B) > threshold, else: 0 - 3. Blur mask: - 4. Out = Mask * K + (1 - Mask) * I - Args: - img (Numpy array): Input image, HWC, BGR; float32, [0, 1]. - weight (float): Sharp weight. Default: 1. - radius (float): Kernel size of Gaussian blur. Default: 50. - threshold (int): - """ - if radius % 2 == 0: - radius += 1 - blur = cv2.GaussianBlur(img, (radius, radius), 0) - residual = img - blur - mask = np.abs(residual) * 255 > threshold - mask = mask.astype('float32') - soft_mask = cv2.GaussianBlur(mask, (radius, radius), 0) - - K = img + weight * residual - K = np.clip(K, 0, 1) - return soft_mask * K + (1 - soft_mask) * img - - -def add_blur(img, sf=4): - wd2 = 4.0 + sf - wd = 2.0 + 0.2 * sf - if random.random() < 0.5: - l1 = wd2 * random.random() - l2 = wd2 * random.random() - k = anisotropic_Gaussian(ksize=2 * random.randint(2, 11) + 3, theta=random.random() * np.pi, l1=l1, l2=l2) - else: - k = fspecial('gaussian', 2 * random.randint(2, 11) + 3, wd * random.random()) - img = ndimage.filters.convolve(img, np.expand_dims(k, axis=2), mode='mirror') - - return img - - -def add_resize(img, sf=4): - rnum = np.random.rand() - if rnum > 0.8: # up - sf1 = random.uniform(1, 2) - elif rnum < 0.7: # down - sf1 = random.uniform(0.5 / sf, 1) - else: - sf1 = 1.0 - img = cv2.resize(img, (int(sf1 * img.shape[1]), int(sf1 * img.shape[0])), interpolation=random.choice([1, 2, 3])) - img = np.clip(img, 0.0, 1.0) - - return img - - -# def add_Gaussian_noise(img, noise_level1=2, noise_level2=25): -# noise_level = random.randint(noise_level1, noise_level2) -# rnum = np.random.rand() -# if rnum > 0.6: # add color Gaussian noise -# img += np.random.normal(0, noise_level / 255.0, img.shape).astype(np.float32) -# elif rnum < 0.4: # add grayscale Gaussian noise -# img += np.random.normal(0, noise_level / 255.0, (*img.shape[:2], 1)).astype(np.float32) -# else: # add noise -# L = noise_level2 / 255. -# D = np.diag(np.random.rand(3)) -# U = orth(np.random.rand(3, 3)) -# conv = np.dot(np.dot(np.transpose(U), D), U) -# img += np.random.multivariate_normal([0, 0, 0], np.abs(L ** 2 * conv), img.shape[:2]).astype(np.float32) -# img = np.clip(img, 0.0, 1.0) -# return img - -def add_Gaussian_noise(img, noise_level1=2, noise_level2=25): - noise_level = random.randint(noise_level1, noise_level2) - rnum = np.random.rand() - if rnum > 0.6: # add color Gaussian noise - img = img + np.random.normal(0, noise_level / 255.0, img.shape).astype(np.float32) - elif rnum < 0.4: # add grayscale Gaussian noise - img = img + np.random.normal(0, noise_level / 255.0, (*img.shape[:2], 1)).astype(np.float32) - else: # add noise - L = noise_level2 / 255. - D = np.diag(np.random.rand(3)) - U = orth(np.random.rand(3, 3)) - conv = np.dot(np.dot(np.transpose(U), D), U) - img = img + np.random.multivariate_normal([0, 0, 0], np.abs(L ** 2 * conv), img.shape[:2]).astype(np.float32) - img = np.clip(img, 0.0, 1.0) - return img - - -def add_speckle_noise(img, noise_level1=2, noise_level2=25): - noise_level = random.randint(noise_level1, noise_level2) - img = np.clip(img, 0.0, 1.0) - rnum = random.random() - if rnum > 0.6: - img += img * np.random.normal(0, noise_level / 255.0, img.shape).astype(np.float32) - elif rnum < 0.4: - img += img * np.random.normal(0, noise_level / 255.0, (*img.shape[:2], 1)).astype(np.float32) - else: - L = noise_level2 / 255. - D = np.diag(np.random.rand(3)) - U = orth(np.random.rand(3, 3)) - conv = np.dot(np.dot(np.transpose(U), D), U) - img += img * np.random.multivariate_normal([0, 0, 0], np.abs(L ** 2 * conv), img.shape[:2]).astype(np.float32) - img = np.clip(img, 0.0, 1.0) - return img - - -def add_Poisson_noise(img): - img = np.clip((img * 255.0).round(), 0, 255) / 255. - vals = 10 ** (2 * random.random() + 2.0) # [2, 4] - if random.random() < 0.5: - img = np.random.poisson(img * vals).astype(np.float32) / vals - else: - img_gray = np.dot(img[..., :3], [0.299, 0.587, 0.114]) - img_gray = np.clip((img_gray * 255.0).round(), 0, 255) / 255. - noise_gray = np.random.poisson(img_gray * vals).astype(np.float32) / vals - img_gray - img += noise_gray[:, :, np.newaxis] - img = np.clip(img, 0.0, 1.0) - return img - - -def add_JPEG_noise(img): - quality_factor = random.randint(30, 95) - img = cv2.cvtColor(util.single2uint(img), cv2.COLOR_RGB2BGR) - result, encimg = cv2.imencode('.jpg', img, [int(cv2.IMWRITE_JPEG_QUALITY), quality_factor]) - img = cv2.imdecode(encimg, 1) - img = cv2.cvtColor(util.uint2single(img), cv2.COLOR_BGR2RGB) - return img - - -def random_crop(lq, hq, sf=4, lq_patchsize=64): - h, w = lq.shape[:2] - rnd_h = random.randint(0, h - lq_patchsize) - rnd_w = random.randint(0, w - lq_patchsize) - lq = lq[rnd_h:rnd_h + lq_patchsize, rnd_w:rnd_w + lq_patchsize, :] - - rnd_h_H, rnd_w_H = int(rnd_h * sf), int(rnd_w * sf) - hq = hq[rnd_h_H:rnd_h_H + lq_patchsize * sf, rnd_w_H:rnd_w_H + lq_patchsize * sf, :] - return lq, hq - - -def degradation_bsrgan(img, sf=4, lq_patchsize=72, isp_model=None): - """ - This is the degradation model of BSRGAN from the paper - "Designing a Practical Degradation Model for Deep Blind Image Super-Resolution" - ---------- - img: HXWXC, [0, 1], its size should be large than (lq_patchsizexsf)x(lq_patchsizexsf) - sf: scale factor - isp_model: camera ISP model - Returns - ------- - img: low-quality patch, size: lq_patchsizeXlq_patchsizeXC, range: [0, 1] - hq: corresponding high-quality patch, size: (lq_patchsizexsf)X(lq_patchsizexsf)XC, range: [0, 1] - """ - isp_prob, jpeg_prob, scale2_prob = 0.25, 0.9, 0.25 - sf_ori = sf - - h1, w1 = img.shape[:2] - img = img.copy()[:w1 - w1 % sf, :h1 - h1 % sf, ...] # mod crop - h, w = img.shape[:2] - - if h < lq_patchsize * sf or w < lq_patchsize * sf: - raise ValueError(f'img size ({h1}X{w1}) is too small!') - - hq = img.copy() - - if sf == 4 and random.random() < scale2_prob: # downsample1 - if np.random.rand() < 0.5: - img = cv2.resize(img, (int(1 / 2 * img.shape[1]), int(1 / 2 * img.shape[0])), - interpolation=random.choice([1, 2, 3])) - else: - img = util.imresize_np(img, 1 / 2, True) - img = np.clip(img, 0.0, 1.0) - sf = 2 - - shuffle_order = random.sample(range(7), 7) - idx1, idx2 = shuffle_order.index(2), shuffle_order.index(3) - if idx1 > idx2: # keep downsample3 last - shuffle_order[idx1], shuffle_order[idx2] = shuffle_order[idx2], shuffle_order[idx1] - - for i in shuffle_order: - - if i == 0: - img = add_blur(img, sf=sf) - - elif i == 1: - img = add_blur(img, sf=sf) - - elif i == 2: - a, b = img.shape[1], img.shape[0] - # downsample2 - if random.random() < 0.75: - sf1 = random.uniform(1, 2 * sf) - img = cv2.resize(img, (int(1 / sf1 * img.shape[1]), int(1 / sf1 * img.shape[0])), - interpolation=random.choice([1, 2, 3])) - else: - k = fspecial('gaussian', 25, random.uniform(0.1, 0.6 * sf)) - k_shifted = shift_pixel(k, sf) - k_shifted = k_shifted / k_shifted.sum() # blur with shifted kernel - img = ndimage.filters.convolve(img, np.expand_dims(k_shifted, axis=2), mode='mirror') - img = img[0::sf, 0::sf, ...] # nearest downsampling - img = np.clip(img, 0.0, 1.0) - - elif i == 3: - # downsample3 - img = cv2.resize(img, (int(1 / sf * a), int(1 / sf * b)), interpolation=random.choice([1, 2, 3])) - img = np.clip(img, 0.0, 1.0) - - elif i == 4: - # add Gaussian noise - img = add_Gaussian_noise(img, noise_level1=2, noise_level2=25) - - elif i == 5: - # add JPEG noise - if random.random() < jpeg_prob: - img = add_JPEG_noise(img) - - elif i == 6: - # add processed camera sensor noise - if random.random() < isp_prob and isp_model is not None: - with torch.no_grad(): - img, hq = isp_model.forward(img.copy(), hq) - - # add final JPEG compression noise - img = add_JPEG_noise(img) - - # random crop - img, hq = random_crop(img, hq, sf_ori, lq_patchsize) - - return img, hq - - -# todo no isp_model? -def degradation_bsrgan_variant(image, sf=4, isp_model=None): - """ - This is the degradation model of BSRGAN from the paper - "Designing a Practical Degradation Model for Deep Blind Image Super-Resolution" - ---------- - sf: scale factor - isp_model: camera ISP model - Returns - ------- - img: low-quality patch, size: lq_patchsizeXlq_patchsizeXC, range: [0, 1] - hq: corresponding high-quality patch, size: (lq_patchsizexsf)X(lq_patchsizexsf)XC, range: [0, 1] - """ - image = util.uint2single(image) - isp_prob, jpeg_prob, scale2_prob = 0.25, 0.9, 0.25 - sf_ori = sf - - h1, w1 = image.shape[:2] - image = image.copy()[:w1 - w1 % sf, :h1 - h1 % sf, ...] # mod crop - h, w = image.shape[:2] - - hq = image.copy() - - if sf == 4 and random.random() < scale2_prob: # downsample1 - if np.random.rand() < 0.5: - image = cv2.resize(image, (int(1 / 2 * image.shape[1]), int(1 / 2 * image.shape[0])), - interpolation=random.choice([1, 2, 3])) - else: - image = util.imresize_np(image, 1 / 2, True) - image = np.clip(image, 0.0, 1.0) - sf = 2 - - shuffle_order = random.sample(range(7), 7) - idx1, idx2 = shuffle_order.index(2), shuffle_order.index(3) - if idx1 > idx2: # keep downsample3 last - shuffle_order[idx1], shuffle_order[idx2] = shuffle_order[idx2], shuffle_order[idx1] - - for i in shuffle_order: - - if i == 0: - image = add_blur(image, sf=sf) - - elif i == 1: - image = add_blur(image, sf=sf) - - elif i == 2: - a, b = image.shape[1], image.shape[0] - # downsample2 - if random.random() < 0.75: - sf1 = random.uniform(1, 2 * sf) - image = cv2.resize(image, (int(1 / sf1 * image.shape[1]), int(1 / sf1 * image.shape[0])), - interpolation=random.choice([1, 2, 3])) - else: - k = fspecial('gaussian', 25, random.uniform(0.1, 0.6 * sf)) - k_shifted = shift_pixel(k, sf) - k_shifted = k_shifted / k_shifted.sum() # blur with shifted kernel - image = ndimage.filters.convolve(image, np.expand_dims(k_shifted, axis=2), mode='mirror') - image = image[0::sf, 0::sf, ...] # nearest downsampling - image = np.clip(image, 0.0, 1.0) - - elif i == 3: - # downsample3 - image = cv2.resize(image, (int(1 / sf * a), int(1 / sf * b)), interpolation=random.choice([1, 2, 3])) - image = np.clip(image, 0.0, 1.0) - - elif i == 4: - # add Gaussian noise - image = add_Gaussian_noise(image, noise_level1=2, noise_level2=25) - - elif i == 5: - # add JPEG noise - if random.random() < jpeg_prob: - image = add_JPEG_noise(image) - - # elif i == 6: - # # add processed camera sensor noise - # if random.random() < isp_prob and isp_model is not None: - # with torch.no_grad(): - # img, hq = isp_model.forward(img.copy(), hq) - - # add final JPEG compression noise - image = add_JPEG_noise(image) - image = util.single2uint(image) - example = {"image":image} - return example - - -# TODO incase there is a pickle error one needs to replace a += x with a = a + x in add_speckle_noise etc... -def degradation_bsrgan_plus(img, sf=4, shuffle_prob=0.5, use_sharp=True, lq_patchsize=64, isp_model=None): - """ - This is an extended degradation model by combining - the degradation models of BSRGAN and Real-ESRGAN - ---------- - img: HXWXC, [0, 1], its size should be large than (lq_patchsizexsf)x(lq_patchsizexsf) - sf: scale factor - use_shuffle: the degradation shuffle - use_sharp: sharpening the img - Returns - ------- - img: low-quality patch, size: lq_patchsizeXlq_patchsizeXC, range: [0, 1] - hq: corresponding high-quality patch, size: (lq_patchsizexsf)X(lq_patchsizexsf)XC, range: [0, 1] - """ - - h1, w1 = img.shape[:2] - img = img.copy()[:w1 - w1 % sf, :h1 - h1 % sf, ...] # mod crop - h, w = img.shape[:2] - - if h < lq_patchsize * sf or w < lq_patchsize * sf: - raise ValueError(f'img size ({h1}X{w1}) is too small!') - - if use_sharp: - img = add_sharpening(img) - hq = img.copy() - - if random.random() < shuffle_prob: - shuffle_order = random.sample(range(13), 13) - else: - shuffle_order = list(range(13)) - # local shuffle for noise, JPEG is always the last one - shuffle_order[2:6] = random.sample(shuffle_order[2:6], len(range(2, 6))) - shuffle_order[9:13] = random.sample(shuffle_order[9:13], len(range(9, 13))) - - poisson_prob, speckle_prob, isp_prob = 0.1, 0.1, 0.1 - - for i in shuffle_order: - if i == 0: - img = add_blur(img, sf=sf) - elif i == 1: - img = add_resize(img, sf=sf) - elif i == 2: - img = add_Gaussian_noise(img, noise_level1=2, noise_level2=25) - elif i == 3: - if random.random() < poisson_prob: - img = add_Poisson_noise(img) - elif i == 4: - if random.random() < speckle_prob: - img = add_speckle_noise(img) - elif i == 5: - if random.random() < isp_prob and isp_model is not None: - with torch.no_grad(): - img, hq = isp_model.forward(img.copy(), hq) - elif i == 6: - img = add_JPEG_noise(img) - elif i == 7: - img = add_blur(img, sf=sf) - elif i == 8: - img = add_resize(img, sf=sf) - elif i == 9: - img = add_Gaussian_noise(img, noise_level1=2, noise_level2=25) - elif i == 10: - if random.random() < poisson_prob: - img = add_Poisson_noise(img) - elif i == 11: - if random.random() < speckle_prob: - img = add_speckle_noise(img) - elif i == 12: - if random.random() < isp_prob and isp_model is not None: - with torch.no_grad(): - img, hq = isp_model.forward(img.copy(), hq) - else: - print('check the shuffle!') - - # resize to desired size - img = cv2.resize(img, (int(1 / sf * hq.shape[1]), int(1 / sf * hq.shape[0])), - interpolation=random.choice([1, 2, 3])) - - # add final JPEG compression noise - img = add_JPEG_noise(img) - - # random crop - img, hq = random_crop(img, hq, sf, lq_patchsize) - - return img, hq - - -if __name__ == '__main__': - print("hey") - img = util.imread_uint('utils/test.png', 3) - print(img) - img = util.uint2single(img) - print(img) - img = img[:448, :448] - h = img.shape[0] // 4 - print("resizing to", h) - sf = 4 - deg_fn = partial(degradation_bsrgan_variant, sf=sf) - for i in range(20): - print(i) - img_lq = deg_fn(img) - print(img_lq) - img_lq_bicubic = albumentations.SmallestMaxSize(max_size=h, interpolation=cv2.INTER_CUBIC)(image=img)["image"] - print(img_lq.shape) - print("bicubic", img_lq_bicubic.shape) - print(img_hq.shape) - lq_nearest = cv2.resize(util.single2uint(img_lq), (int(sf * img_lq.shape[1]), int(sf * img_lq.shape[0])), - interpolation=0) - lq_bicubic_nearest = cv2.resize(util.single2uint(img_lq_bicubic), (int(sf * img_lq.shape[1]), int(sf * img_lq.shape[0])), - interpolation=0) - img_concat = np.concatenate([lq_bicubic_nearest, lq_nearest, util.single2uint(img_hq)], axis=1) - util.imsave(img_concat, str(i) + '.png') - - diff --git a/ldm/modules/image_degradation/bsrgan_light.py b/ldm/modules/image_degradation/bsrgan_light.py deleted file mode 100644 index 9e1f8239..00000000 --- a/ldm/modules/image_degradation/bsrgan_light.py +++ /dev/null @@ -1,650 +0,0 @@ -# -*- coding: utf-8 -*- -import numpy as np -import cv2 -import torch - -from functools import partial -import random -from scipy import ndimage -import scipy -import scipy.stats as ss -from scipy.interpolate import interp2d -from scipy.linalg import orth -import albumentations - -import ldm.modules.image_degradation.utils_image as util - -""" -# -------------------------------------------- -# Super-Resolution -# -------------------------------------------- -# -# Kai Zhang (cskaizhang@gmail.com) -# https://github.com/cszn -# From 2019/03--2021/08 -# -------------------------------------------- -""" - - -def modcrop_np(img, sf): - ''' - Args: - img: numpy image, WxH or WxHxC - sf: scale factor - Return: - cropped image - ''' - w, h = img.shape[:2] - im = np.copy(img) - return im[:w - w % sf, :h - h % sf, ...] - - -""" -# -------------------------------------------- -# anisotropic Gaussian kernels -# -------------------------------------------- -""" - - -def analytic_kernel(k): - """Calculate the X4 kernel from the X2 kernel (for proof see appendix in paper)""" - k_size = k.shape[0] - # Calculate the big kernels size - big_k = np.zeros((3 * k_size - 2, 3 * k_size - 2)) - # Loop over the small kernel to fill the big one - for r in range(k_size): - for c in range(k_size): - big_k[2 * r:2 * r + k_size, 2 * c:2 * c + k_size] += k[r, c] * k - # Crop the edges of the big kernel to ignore very small values and increase run time of SR - crop = k_size // 2 - cropped_big_k = big_k[crop:-crop, crop:-crop] - # Normalize to 1 - return cropped_big_k / cropped_big_k.sum() - - -def anisotropic_Gaussian(ksize=15, theta=np.pi, l1=6, l2=6): - """ generate an anisotropic Gaussian kernel - Args: - ksize : e.g., 15, kernel size - theta : [0, pi], rotation angle range - l1 : [0.1,50], scaling of eigenvalues - l2 : [0.1,l1], scaling of eigenvalues - If l1 = l2, will get an isotropic Gaussian kernel. - Returns: - k : kernel - """ - - v = np.dot(np.array([[np.cos(theta), -np.sin(theta)], [np.sin(theta), np.cos(theta)]]), np.array([1., 0.])) - V = np.array([[v[0], v[1]], [v[1], -v[0]]]) - D = np.array([[l1, 0], [0, l2]]) - Sigma = np.dot(np.dot(V, D), np.linalg.inv(V)) - k = gm_blur_kernel(mean=[0, 0], cov=Sigma, size=ksize) - - return k - - -def gm_blur_kernel(mean, cov, size=15): - center = size / 2.0 + 0.5 - k = np.zeros([size, size]) - for y in range(size): - for x in range(size): - cy = y - center + 1 - cx = x - center + 1 - k[y, x] = ss.multivariate_normal.pdf([cx, cy], mean=mean, cov=cov) - - k = k / np.sum(k) - return k - - -def shift_pixel(x, sf, upper_left=True): - """shift pixel for super-resolution with different scale factors - Args: - x: WxHxC or WxH - sf: scale factor - upper_left: shift direction - """ - h, w = x.shape[:2] - shift = (sf - 1) * 0.5 - xv, yv = np.arange(0, w, 1.0), np.arange(0, h, 1.0) - if upper_left: - x1 = xv + shift - y1 = yv + shift - else: - x1 = xv - shift - y1 = yv - shift - - x1 = np.clip(x1, 0, w - 1) - y1 = np.clip(y1, 0, h - 1) - - if x.ndim == 2: - x = interp2d(xv, yv, x)(x1, y1) - if x.ndim == 3: - for i in range(x.shape[-1]): - x[:, :, i] = interp2d(xv, yv, x[:, :, i])(x1, y1) - - return x - - -def blur(x, k): - ''' - x: image, NxcxHxW - k: kernel, Nx1xhxw - ''' - n, c = x.shape[:2] - p1, p2 = (k.shape[-2] - 1) // 2, (k.shape[-1] - 1) // 2 - x = torch.nn.functional.pad(x, pad=(p1, p2, p1, p2), mode='replicate') - k = k.repeat(1, c, 1, 1) - k = k.view(-1, 1, k.shape[2], k.shape[3]) - x = x.view(1, -1, x.shape[2], x.shape[3]) - x = torch.nn.functional.conv2d(x, k, bias=None, stride=1, padding=0, groups=n * c) - x = x.view(n, c, x.shape[2], x.shape[3]) - - return x - - -def gen_kernel(k_size=np.array([15, 15]), scale_factor=np.array([4, 4]), min_var=0.6, max_var=10., noise_level=0): - """" - # modified version of https://github.com/assafshocher/BlindSR_dataset_generator - # Kai Zhang - # min_var = 0.175 * sf # variance of the gaussian kernel will be sampled between min_var and max_var - # max_var = 2.5 * sf - """ - # Set random eigen-vals (lambdas) and angle (theta) for COV matrix - lambda_1 = min_var + np.random.rand() * (max_var - min_var) - lambda_2 = min_var + np.random.rand() * (max_var - min_var) - theta = np.random.rand() * np.pi # random theta - noise = -noise_level + np.random.rand(*k_size) * noise_level * 2 - - # Set COV matrix using Lambdas and Theta - LAMBDA = np.diag([lambda_1, lambda_2]) - Q = np.array([[np.cos(theta), -np.sin(theta)], - [np.sin(theta), np.cos(theta)]]) - SIGMA = Q @ LAMBDA @ Q.T - INV_SIGMA = np.linalg.inv(SIGMA)[None, None, :, :] - - # Set expectation position (shifting kernel for aligned image) - MU = k_size // 2 - 0.5 * (scale_factor - 1) # - 0.5 * (scale_factor - k_size % 2) - MU = MU[None, None, :, None] - - # Create meshgrid for Gaussian - [X, Y] = np.meshgrid(range(k_size[0]), range(k_size[1])) - Z = np.stack([X, Y], 2)[:, :, :, None] - - # Calcualte Gaussian for every pixel of the kernel - ZZ = Z - MU - ZZ_t = ZZ.transpose(0, 1, 3, 2) - raw_kernel = np.exp(-0.5 * np.squeeze(ZZ_t @ INV_SIGMA @ ZZ)) * (1 + noise) - - # shift the kernel so it will be centered - # raw_kernel_centered = kernel_shift(raw_kernel, scale_factor) - - # Normalize the kernel and return - # kernel = raw_kernel_centered / np.sum(raw_kernel_centered) - kernel = raw_kernel / np.sum(raw_kernel) - return kernel - - -def fspecial_gaussian(hsize, sigma): - hsize = [hsize, hsize] - siz = [(hsize[0] - 1.0) / 2.0, (hsize[1] - 1.0) / 2.0] - std = sigma - [x, y] = np.meshgrid(np.arange(-siz[1], siz[1] + 1), np.arange(-siz[0], siz[0] + 1)) - arg = -(x * x + y * y) / (2 * std * std) - h = np.exp(arg) - h[h < scipy.finfo(float).eps * h.max()] = 0 - sumh = h.sum() - if sumh != 0: - h = h / sumh - return h - - -def fspecial_laplacian(alpha): - alpha = max([0, min([alpha, 1])]) - h1 = alpha / (alpha + 1) - h2 = (1 - alpha) / (alpha + 1) - h = [[h1, h2, h1], [h2, -4 / (alpha + 1), h2], [h1, h2, h1]] - h = np.array(h) - return h - - -def fspecial(filter_type, *args, **kwargs): - ''' - python code from: - https://github.com/ronaldosena/imagens-medicas-2/blob/40171a6c259edec7827a6693a93955de2bd39e76/Aulas/aula_2_-_uniform_filter/matlab_fspecial.py - ''' - if filter_type == 'gaussian': - return fspecial_gaussian(*args, **kwargs) - if filter_type == 'laplacian': - return fspecial_laplacian(*args, **kwargs) - - -""" -# -------------------------------------------- -# degradation models -# -------------------------------------------- -""" - - -def bicubic_degradation(x, sf=3): - ''' - Args: - x: HxWxC image, [0, 1] - sf: down-scale factor - Return: - bicubicly downsampled LR image - ''' - x = util.imresize_np(x, scale=1 / sf) - return x - - -def srmd_degradation(x, k, sf=3): - ''' blur + bicubic downsampling - Args: - x: HxWxC image, [0, 1] - k: hxw, double - sf: down-scale factor - Return: - downsampled LR image - Reference: - @inproceedings{zhang2018learning, - title={Learning a single convolutional super-resolution network for multiple degradations}, - author={Zhang, Kai and Zuo, Wangmeng and Zhang, Lei}, - booktitle={IEEE Conference on Computer Vision and Pattern Recognition}, - pages={3262--3271}, - year={2018} - } - ''' - x = ndimage.filters.convolve(x, np.expand_dims(k, axis=2), mode='wrap') # 'nearest' | 'mirror' - x = bicubic_degradation(x, sf=sf) - return x - - -def dpsr_degradation(x, k, sf=3): - ''' bicubic downsampling + blur - Args: - x: HxWxC image, [0, 1] - k: hxw, double - sf: down-scale factor - Return: - downsampled LR image - Reference: - @inproceedings{zhang2019deep, - title={Deep Plug-and-Play Super-Resolution for Arbitrary Blur Kernels}, - author={Zhang, Kai and Zuo, Wangmeng and Zhang, Lei}, - booktitle={IEEE Conference on Computer Vision and Pattern Recognition}, - pages={1671--1681}, - year={2019} - } - ''' - x = bicubic_degradation(x, sf=sf) - x = ndimage.filters.convolve(x, np.expand_dims(k, axis=2), mode='wrap') - return x - - -def classical_degradation(x, k, sf=3): - ''' blur + downsampling - Args: - x: HxWxC image, [0, 1]/[0, 255] - k: hxw, double - sf: down-scale factor - Return: - downsampled LR image - ''' - x = ndimage.filters.convolve(x, np.expand_dims(k, axis=2), mode='wrap') - # x = filters.correlate(x, np.expand_dims(np.flip(k), axis=2)) - st = 0 - return x[st::sf, st::sf, ...] - - -def add_sharpening(img, weight=0.5, radius=50, threshold=10): - """USM sharpening. borrowed from real-ESRGAN - Input image: I; Blurry image: B. - 1. K = I + weight * (I - B) - 2. Mask = 1 if abs(I - B) > threshold, else: 0 - 3. Blur mask: - 4. Out = Mask * K + (1 - Mask) * I - Args: - img (Numpy array): Input image, HWC, BGR; float32, [0, 1]. - weight (float): Sharp weight. Default: 1. - radius (float): Kernel size of Gaussian blur. Default: 50. - threshold (int): - """ - if radius % 2 == 0: - radius += 1 - blur = cv2.GaussianBlur(img, (radius, radius), 0) - residual = img - blur - mask = np.abs(residual) * 255 > threshold - mask = mask.astype('float32') - soft_mask = cv2.GaussianBlur(mask, (radius, radius), 0) - - K = img + weight * residual - K = np.clip(K, 0, 1) - return soft_mask * K + (1 - soft_mask) * img - - -def add_blur(img, sf=4): - wd2 = 4.0 + sf - wd = 2.0 + 0.2 * sf - - wd2 = wd2/4 - wd = wd/4 - - if random.random() < 0.5: - l1 = wd2 * random.random() - l2 = wd2 * random.random() - k = anisotropic_Gaussian(ksize=random.randint(2, 11) + 3, theta=random.random() * np.pi, l1=l1, l2=l2) - else: - k = fspecial('gaussian', random.randint(2, 4) + 3, wd * random.random()) - img = ndimage.filters.convolve(img, np.expand_dims(k, axis=2), mode='mirror') - - return img - - -def add_resize(img, sf=4): - rnum = np.random.rand() - if rnum > 0.8: # up - sf1 = random.uniform(1, 2) - elif rnum < 0.7: # down - sf1 = random.uniform(0.5 / sf, 1) - else: - sf1 = 1.0 - img = cv2.resize(img, (int(sf1 * img.shape[1]), int(sf1 * img.shape[0])), interpolation=random.choice([1, 2, 3])) - img = np.clip(img, 0.0, 1.0) - - return img - - -# def add_Gaussian_noise(img, noise_level1=2, noise_level2=25): -# noise_level = random.randint(noise_level1, noise_level2) -# rnum = np.random.rand() -# if rnum > 0.6: # add color Gaussian noise -# img += np.random.normal(0, noise_level / 255.0, img.shape).astype(np.float32) -# elif rnum < 0.4: # add grayscale Gaussian noise -# img += np.random.normal(0, noise_level / 255.0, (*img.shape[:2], 1)).astype(np.float32) -# else: # add noise -# L = noise_level2 / 255. -# D = np.diag(np.random.rand(3)) -# U = orth(np.random.rand(3, 3)) -# conv = np.dot(np.dot(np.transpose(U), D), U) -# img += np.random.multivariate_normal([0, 0, 0], np.abs(L ** 2 * conv), img.shape[:2]).astype(np.float32) -# img = np.clip(img, 0.0, 1.0) -# return img - -def add_Gaussian_noise(img, noise_level1=2, noise_level2=25): - noise_level = random.randint(noise_level1, noise_level2) - rnum = np.random.rand() - if rnum > 0.6: # add color Gaussian noise - img = img + np.random.normal(0, noise_level / 255.0, img.shape).astype(np.float32) - elif rnum < 0.4: # add grayscale Gaussian noise - img = img + np.random.normal(0, noise_level / 255.0, (*img.shape[:2], 1)).astype(np.float32) - else: # add noise - L = noise_level2 / 255. - D = np.diag(np.random.rand(3)) - U = orth(np.random.rand(3, 3)) - conv = np.dot(np.dot(np.transpose(U), D), U) - img = img + np.random.multivariate_normal([0, 0, 0], np.abs(L ** 2 * conv), img.shape[:2]).astype(np.float32) - img = np.clip(img, 0.0, 1.0) - return img - - -def add_speckle_noise(img, noise_level1=2, noise_level2=25): - noise_level = random.randint(noise_level1, noise_level2) - img = np.clip(img, 0.0, 1.0) - rnum = random.random() - if rnum > 0.6: - img += img * np.random.normal(0, noise_level / 255.0, img.shape).astype(np.float32) - elif rnum < 0.4: - img += img * np.random.normal(0, noise_level / 255.0, (*img.shape[:2], 1)).astype(np.float32) - else: - L = noise_level2 / 255. - D = np.diag(np.random.rand(3)) - U = orth(np.random.rand(3, 3)) - conv = np.dot(np.dot(np.transpose(U), D), U) - img += img * np.random.multivariate_normal([0, 0, 0], np.abs(L ** 2 * conv), img.shape[:2]).astype(np.float32) - img = np.clip(img, 0.0, 1.0) - return img - - -def add_Poisson_noise(img): - img = np.clip((img * 255.0).round(), 0, 255) / 255. - vals = 10 ** (2 * random.random() + 2.0) # [2, 4] - if random.random() < 0.5: - img = np.random.poisson(img * vals).astype(np.float32) / vals - else: - img_gray = np.dot(img[..., :3], [0.299, 0.587, 0.114]) - img_gray = np.clip((img_gray * 255.0).round(), 0, 255) / 255. - noise_gray = np.random.poisson(img_gray * vals).astype(np.float32) / vals - img_gray - img += noise_gray[:, :, np.newaxis] - img = np.clip(img, 0.0, 1.0) - return img - - -def add_JPEG_noise(img): - quality_factor = random.randint(80, 95) - img = cv2.cvtColor(util.single2uint(img), cv2.COLOR_RGB2BGR) - result, encimg = cv2.imencode('.jpg', img, [int(cv2.IMWRITE_JPEG_QUALITY), quality_factor]) - img = cv2.imdecode(encimg, 1) - img = cv2.cvtColor(util.uint2single(img), cv2.COLOR_BGR2RGB) - return img - - -def random_crop(lq, hq, sf=4, lq_patchsize=64): - h, w = lq.shape[:2] - rnd_h = random.randint(0, h - lq_patchsize) - rnd_w = random.randint(0, w - lq_patchsize) - lq = lq[rnd_h:rnd_h + lq_patchsize, rnd_w:rnd_w + lq_patchsize, :] - - rnd_h_H, rnd_w_H = int(rnd_h * sf), int(rnd_w * sf) - hq = hq[rnd_h_H:rnd_h_H + lq_patchsize * sf, rnd_w_H:rnd_w_H + lq_patchsize * sf, :] - return lq, hq - - -def degradation_bsrgan(img, sf=4, lq_patchsize=72, isp_model=None): - """ - This is the degradation model of BSRGAN from the paper - "Designing a Practical Degradation Model for Deep Blind Image Super-Resolution" - ---------- - img: HXWXC, [0, 1], its size should be large than (lq_patchsizexsf)x(lq_patchsizexsf) - sf: scale factor - isp_model: camera ISP model - Returns - ------- - img: low-quality patch, size: lq_patchsizeXlq_patchsizeXC, range: [0, 1] - hq: corresponding high-quality patch, size: (lq_patchsizexsf)X(lq_patchsizexsf)XC, range: [0, 1] - """ - isp_prob, jpeg_prob, scale2_prob = 0.25, 0.9, 0.25 - sf_ori = sf - - h1, w1 = img.shape[:2] - img = img.copy()[:w1 - w1 % sf, :h1 - h1 % sf, ...] # mod crop - h, w = img.shape[:2] - - if h < lq_patchsize * sf or w < lq_patchsize * sf: - raise ValueError(f'img size ({h1}X{w1}) is too small!') - - hq = img.copy() - - if sf == 4 and random.random() < scale2_prob: # downsample1 - if np.random.rand() < 0.5: - img = cv2.resize(img, (int(1 / 2 * img.shape[1]), int(1 / 2 * img.shape[0])), - interpolation=random.choice([1, 2, 3])) - else: - img = util.imresize_np(img, 1 / 2, True) - img = np.clip(img, 0.0, 1.0) - sf = 2 - - shuffle_order = random.sample(range(7), 7) - idx1, idx2 = shuffle_order.index(2), shuffle_order.index(3) - if idx1 > idx2: # keep downsample3 last - shuffle_order[idx1], shuffle_order[idx2] = shuffle_order[idx2], shuffle_order[idx1] - - for i in shuffle_order: - - if i == 0: - img = add_blur(img, sf=sf) - - elif i == 1: - img = add_blur(img, sf=sf) - - elif i == 2: - a, b = img.shape[1], img.shape[0] - # downsample2 - if random.random() < 0.75: - sf1 = random.uniform(1, 2 * sf) - img = cv2.resize(img, (int(1 / sf1 * img.shape[1]), int(1 / sf1 * img.shape[0])), - interpolation=random.choice([1, 2, 3])) - else: - k = fspecial('gaussian', 25, random.uniform(0.1, 0.6 * sf)) - k_shifted = shift_pixel(k, sf) - k_shifted = k_shifted / k_shifted.sum() # blur with shifted kernel - img = ndimage.filters.convolve(img, np.expand_dims(k_shifted, axis=2), mode='mirror') - img = img[0::sf, 0::sf, ...] # nearest downsampling - img = np.clip(img, 0.0, 1.0) - - elif i == 3: - # downsample3 - img = cv2.resize(img, (int(1 / sf * a), int(1 / sf * b)), interpolation=random.choice([1, 2, 3])) - img = np.clip(img, 0.0, 1.0) - - elif i == 4: - # add Gaussian noise - img = add_Gaussian_noise(img, noise_level1=2, noise_level2=8) - - elif i == 5: - # add JPEG noise - if random.random() < jpeg_prob: - img = add_JPEG_noise(img) - - elif i == 6: - # add processed camera sensor noise - if random.random() < isp_prob and isp_model is not None: - with torch.no_grad(): - img, hq = isp_model.forward(img.copy(), hq) - - # add final JPEG compression noise - img = add_JPEG_noise(img) - - # random crop - img, hq = random_crop(img, hq, sf_ori, lq_patchsize) - - return img, hq - - -# todo no isp_model? -def degradation_bsrgan_variant(image, sf=4, isp_model=None): - """ - This is the degradation model of BSRGAN from the paper - "Designing a Practical Degradation Model for Deep Blind Image Super-Resolution" - ---------- - sf: scale factor - isp_model: camera ISP model - Returns - ------- - img: low-quality patch, size: lq_patchsizeXlq_patchsizeXC, range: [0, 1] - hq: corresponding high-quality patch, size: (lq_patchsizexsf)X(lq_patchsizexsf)XC, range: [0, 1] - """ - image = util.uint2single(image) - isp_prob, jpeg_prob, scale2_prob = 0.25, 0.9, 0.25 - sf_ori = sf - - h1, w1 = image.shape[:2] - image = image.copy()[:w1 - w1 % sf, :h1 - h1 % sf, ...] # mod crop - h, w = image.shape[:2] - - hq = image.copy() - - if sf == 4 and random.random() < scale2_prob: # downsample1 - if np.random.rand() < 0.5: - image = cv2.resize(image, (int(1 / 2 * image.shape[1]), int(1 / 2 * image.shape[0])), - interpolation=random.choice([1, 2, 3])) - else: - image = util.imresize_np(image, 1 / 2, True) - image = np.clip(image, 0.0, 1.0) - sf = 2 - - shuffle_order = random.sample(range(7), 7) - idx1, idx2 = shuffle_order.index(2), shuffle_order.index(3) - if idx1 > idx2: # keep downsample3 last - shuffle_order[idx1], shuffle_order[idx2] = shuffle_order[idx2], shuffle_order[idx1] - - for i in shuffle_order: - - if i == 0: - image = add_blur(image, sf=sf) - - # elif i == 1: - # image = add_blur(image, sf=sf) - - if i == 0: - pass - - elif i == 2: - a, b = image.shape[1], image.shape[0] - # downsample2 - if random.random() < 0.8: - sf1 = random.uniform(1, 2 * sf) - image = cv2.resize(image, (int(1 / sf1 * image.shape[1]), int(1 / sf1 * image.shape[0])), - interpolation=random.choice([1, 2, 3])) - else: - k = fspecial('gaussian', 25, random.uniform(0.1, 0.6 * sf)) - k_shifted = shift_pixel(k, sf) - k_shifted = k_shifted / k_shifted.sum() # blur with shifted kernel - image = ndimage.filters.convolve(image, np.expand_dims(k_shifted, axis=2), mode='mirror') - image = image[0::sf, 0::sf, ...] # nearest downsampling - - image = np.clip(image, 0.0, 1.0) - - elif i == 3: - # downsample3 - image = cv2.resize(image, (int(1 / sf * a), int(1 / sf * b)), interpolation=random.choice([1, 2, 3])) - image = np.clip(image, 0.0, 1.0) - - elif i == 4: - # add Gaussian noise - image = add_Gaussian_noise(image, noise_level1=1, noise_level2=2) - - elif i == 5: - # add JPEG noise - if random.random() < jpeg_prob: - image = add_JPEG_noise(image) - # - # elif i == 6: - # # add processed camera sensor noise - # if random.random() < isp_prob and isp_model is not None: - # with torch.no_grad(): - # img, hq = isp_model.forward(img.copy(), hq) - - # add final JPEG compression noise - image = add_JPEG_noise(image) - image = util.single2uint(image) - example = {"image": image} - return example - - - - -if __name__ == '__main__': - print("hey") - img = util.imread_uint('utils/test.png', 3) - img = img[:448, :448] - h = img.shape[0] // 4 - print("resizing to", h) - sf = 4 - deg_fn = partial(degradation_bsrgan_variant, sf=sf) - for i in range(20): - print(i) - img_hq = img - img_lq = deg_fn(img)["image"] - img_hq, img_lq = util.uint2single(img_hq), util.uint2single(img_lq) - print(img_lq) - img_lq_bicubic = albumentations.SmallestMaxSize(max_size=h, interpolation=cv2.INTER_CUBIC)(image=img_hq)["image"] - print(img_lq.shape) - print("bicubic", img_lq_bicubic.shape) - print(img_hq.shape) - lq_nearest = cv2.resize(util.single2uint(img_lq), (int(sf * img_lq.shape[1]), int(sf * img_lq.shape[0])), - interpolation=0) - lq_bicubic_nearest = cv2.resize(util.single2uint(img_lq_bicubic), - (int(sf * img_lq.shape[1]), int(sf * img_lq.shape[0])), - interpolation=0) - img_concat = np.concatenate([lq_bicubic_nearest, lq_nearest, util.single2uint(img_hq)], axis=1) - util.imsave(img_concat, str(i) + '.png') diff --git a/ldm/modules/image_degradation/utils/test.png b/ldm/modules/image_degradation/utils/test.png deleted file mode 100644 index 4249b43d..00000000 Binary files a/ldm/modules/image_degradation/utils/test.png and /dev/null differ diff --git a/ldm/modules/image_degradation/utils_image.py b/ldm/modules/image_degradation/utils_image.py deleted file mode 100644 index 0175f155..00000000 --- a/ldm/modules/image_degradation/utils_image.py +++ /dev/null @@ -1,916 +0,0 @@ -import os -import math -import random -import numpy as np -import torch -import cv2 -from torchvision.utils import make_grid -from datetime import datetime -#import matplotlib.pyplot as plt # TODO: check with Dominik, also bsrgan.py vs bsrgan_light.py - - -os.environ["KMP_DUPLICATE_LIB_OK"]="TRUE" - - -''' -# -------------------------------------------- -# Kai Zhang (github: https://github.com/cszn) -# 03/Mar/2019 -# -------------------------------------------- -# https://github.com/twhui/SRGAN-pyTorch -# https://github.com/xinntao/BasicSR -# -------------------------------------------- -''' - - -IMG_EXTENSIONS = ['.jpg', '.JPG', '.jpeg', '.JPEG', '.png', '.PNG', '.ppm', '.PPM', '.bmp', '.BMP', '.tif'] - - -def is_image_file(filename): - return any(filename.endswith(extension) for extension in IMG_EXTENSIONS) - - -def get_timestamp(): - return datetime.now().strftime('%y%m%d-%H%M%S') - - -def imshow(x, title=None, cbar=False, figsize=None): - plt.figure(figsize=figsize) - plt.imshow(np.squeeze(x), interpolation='nearest', cmap='gray') - if title: - plt.title(title) - if cbar: - plt.colorbar() - plt.show() - - -def surf(Z, cmap='rainbow', figsize=None): - plt.figure(figsize=figsize) - ax3 = plt.axes(projection='3d') - - w, h = Z.shape[:2] - xx = np.arange(0,w,1) - yy = np.arange(0,h,1) - X, Y = np.meshgrid(xx, yy) - ax3.plot_surface(X,Y,Z,cmap=cmap) - #ax3.contour(X,Y,Z, zdim='z',offset=-2,cmap=cmap) - plt.show() - - -''' -# -------------------------------------------- -# get image pathes -# -------------------------------------------- -''' - - -def get_image_paths(dataroot): - paths = None # return None if dataroot is None - if dataroot is not None: - paths = sorted(_get_paths_from_images(dataroot)) - return paths - - -def _get_paths_from_images(path): - assert os.path.isdir(path), '{:s} is not a valid directory'.format(path) - images = [] - for dirpath, _, fnames in sorted(os.walk(path)): - for fname in sorted(fnames): - if is_image_file(fname): - img_path = os.path.join(dirpath, fname) - images.append(img_path) - assert images, '{:s} has no valid image file'.format(path) - return images - - -''' -# -------------------------------------------- -# split large images into small images -# -------------------------------------------- -''' - - -def patches_from_image(img, p_size=512, p_overlap=64, p_max=800): - w, h = img.shape[:2] - patches = [] - if w > p_max and h > p_max: - w1 = list(np.arange(0, w-p_size, p_size-p_overlap, dtype=np.int)) - h1 = list(np.arange(0, h-p_size, p_size-p_overlap, dtype=np.int)) - w1.append(w-p_size) - h1.append(h-p_size) -# print(w1) -# print(h1) - for i in w1: - for j in h1: - patches.append(img[i:i+p_size, j:j+p_size,:]) - else: - patches.append(img) - - return patches - - -def imssave(imgs, img_path): - """ - imgs: list, N images of size WxHxC - """ - img_name, ext = os.path.splitext(os.path.basename(img_path)) - - for i, img in enumerate(imgs): - if img.ndim == 3: - img = img[:, :, [2, 1, 0]] - new_path = os.path.join(os.path.dirname(img_path), img_name+str('_s{:04d}'.format(i))+'.png') - cv2.imwrite(new_path, img) - - -def split_imageset(original_dataroot, taget_dataroot, n_channels=3, p_size=800, p_overlap=96, p_max=1000): - """ - split the large images from original_dataroot into small overlapped images with size (p_size)x(p_size), - and save them into taget_dataroot; only the images with larger size than (p_max)x(p_max) - will be splitted. - Args: - original_dataroot: - taget_dataroot: - p_size: size of small images - p_overlap: patch size in training is a good choice - p_max: images with smaller size than (p_max)x(p_max) keep unchanged. - """ - paths = get_image_paths(original_dataroot) - for img_path in paths: - # img_name, ext = os.path.splitext(os.path.basename(img_path)) - img = imread_uint(img_path, n_channels=n_channels) - patches = patches_from_image(img, p_size, p_overlap, p_max) - imssave(patches, os.path.join(taget_dataroot,os.path.basename(img_path))) - #if original_dataroot == taget_dataroot: - #del img_path - -''' -# -------------------------------------------- -# makedir -# -------------------------------------------- -''' - - -def mkdir(path): - if not os.path.exists(path): - os.makedirs(path) - - -def mkdirs(paths): - if isinstance(paths, str): - mkdir(paths) - else: - for path in paths: - mkdir(path) - - -def mkdir_and_rename(path): - if os.path.exists(path): - new_name = path + '_archived_' + get_timestamp() - print('Path already exists. Rename it to [{:s}]'.format(new_name)) - os.rename(path, new_name) - os.makedirs(path) - - -''' -# -------------------------------------------- -# read image from path -# opencv is fast, but read BGR numpy image -# -------------------------------------------- -''' - - -# -------------------------------------------- -# get uint8 image of size HxWxn_channles (RGB) -# -------------------------------------------- -def imread_uint(path, n_channels=3): - # input: path - # output: HxWx3(RGB or GGG), or HxWx1 (G) - if n_channels == 1: - img = cv2.imread(path, 0) # cv2.IMREAD_GRAYSCALE - img = np.expand_dims(img, axis=2) # HxWx1 - elif n_channels == 3: - img = cv2.imread(path, cv2.IMREAD_UNCHANGED) # BGR or G - if img.ndim == 2: - img = cv2.cvtColor(img, cv2.COLOR_GRAY2RGB) # GGG - else: - img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) # RGB - return img - - -# -------------------------------------------- -# matlab's imwrite -# -------------------------------------------- -def imsave(img, img_path): - img = np.squeeze(img) - if img.ndim == 3: - img = img[:, :, [2, 1, 0]] - cv2.imwrite(img_path, img) - -def imwrite(img, img_path): - img = np.squeeze(img) - if img.ndim == 3: - img = img[:, :, [2, 1, 0]] - cv2.imwrite(img_path, img) - - - -# -------------------------------------------- -# get single image of size HxWxn_channles (BGR) -# -------------------------------------------- -def read_img(path): - # read image by cv2 - # return: Numpy float32, HWC, BGR, [0,1] - img = cv2.imread(path, cv2.IMREAD_UNCHANGED) # cv2.IMREAD_GRAYSCALE - img = img.astype(np.float32) / 255. - if img.ndim == 2: - img = np.expand_dims(img, axis=2) - # some images have 4 channels - if img.shape[2] > 3: - img = img[:, :, :3] - return img - - -''' -# -------------------------------------------- -# image format conversion -# -------------------------------------------- -# numpy(single) <---> numpy(unit) -# numpy(single) <---> tensor -# numpy(unit) <---> tensor -# -------------------------------------------- -''' - - -# -------------------------------------------- -# numpy(single) [0, 1] <---> numpy(unit) -# -------------------------------------------- - - -def uint2single(img): - - return np.float32(img/255.) - - -def single2uint(img): - - return np.uint8((img.clip(0, 1)*255.).round()) - - -def uint162single(img): - - return np.float32(img/65535.) - - -def single2uint16(img): - - return np.uint16((img.clip(0, 1)*65535.).round()) - - -# -------------------------------------------- -# numpy(unit) (HxWxC or HxW) <---> tensor -# -------------------------------------------- - - -# convert uint to 4-dimensional torch tensor -def uint2tensor4(img): - if img.ndim == 2: - img = np.expand_dims(img, axis=2) - return torch.from_numpy(np.ascontiguousarray(img)).permute(2, 0, 1).float().div(255.).unsqueeze(0) - - -# convert uint to 3-dimensional torch tensor -def uint2tensor3(img): - if img.ndim == 2: - img = np.expand_dims(img, axis=2) - return torch.from_numpy(np.ascontiguousarray(img)).permute(2, 0, 1).float().div(255.) - - -# convert 2/3/4-dimensional torch tensor to uint -def tensor2uint(img): - img = img.data.squeeze().float().clamp_(0, 1).cpu().numpy() - if img.ndim == 3: - img = np.transpose(img, (1, 2, 0)) - return np.uint8((img*255.0).round()) - - -# -------------------------------------------- -# numpy(single) (HxWxC) <---> tensor -# -------------------------------------------- - - -# convert single (HxWxC) to 3-dimensional torch tensor -def single2tensor3(img): - return torch.from_numpy(np.ascontiguousarray(img)).permute(2, 0, 1).float() - - -# convert single (HxWxC) to 4-dimensional torch tensor -def single2tensor4(img): - return torch.from_numpy(np.ascontiguousarray(img)).permute(2, 0, 1).float().unsqueeze(0) - - -# convert torch tensor to single -def tensor2single(img): - img = img.data.squeeze().float().cpu().numpy() - if img.ndim == 3: - img = np.transpose(img, (1, 2, 0)) - - return img - -# convert torch tensor to single -def tensor2single3(img): - img = img.data.squeeze().float().cpu().numpy() - if img.ndim == 3: - img = np.transpose(img, (1, 2, 0)) - elif img.ndim == 2: - img = np.expand_dims(img, axis=2) - return img - - -def single2tensor5(img): - return torch.from_numpy(np.ascontiguousarray(img)).permute(2, 0, 1, 3).float().unsqueeze(0) - - -def single32tensor5(img): - return torch.from_numpy(np.ascontiguousarray(img)).float().unsqueeze(0).unsqueeze(0) - - -def single42tensor4(img): - return torch.from_numpy(np.ascontiguousarray(img)).permute(2, 0, 1, 3).float() - - -# from skimage.io import imread, imsave -def tensor2img(tensor, out_type=np.uint8, min_max=(0, 1)): - ''' - Converts a torch Tensor into an image Numpy array of BGR channel order - Input: 4D(B,(3/1),H,W), 3D(C,H,W), or 2D(H,W), any range, RGB channel order - Output: 3D(H,W,C) or 2D(H,W), [0,255], np.uint8 (default) - ''' - tensor = tensor.squeeze().float().cpu().clamp_(*min_max) # squeeze first, then clamp - tensor = (tensor - min_max[0]) / (min_max[1] - min_max[0]) # to range [0,1] - n_dim = tensor.dim() - if n_dim == 4: - n_img = len(tensor) - img_np = make_grid(tensor, nrow=int(math.sqrt(n_img)), normalize=False).numpy() - img_np = np.transpose(img_np[[2, 1, 0], :, :], (1, 2, 0)) # HWC, BGR - elif n_dim == 3: - img_np = tensor.numpy() - img_np = np.transpose(img_np[[2, 1, 0], :, :], (1, 2, 0)) # HWC, BGR - elif n_dim == 2: - img_np = tensor.numpy() - else: - raise TypeError( - 'Only support 4D, 3D and 2D tensor. But received with dimension: {:d}'.format(n_dim)) - if out_type == np.uint8: - img_np = (img_np * 255.0).round() - # Important. Unlike matlab, numpy.unit8() WILL NOT round by default. - return img_np.astype(out_type) - - -''' -# -------------------------------------------- -# Augmentation, flipe and/or rotate -# -------------------------------------------- -# The following two are enough. -# (1) augmet_img: numpy image of WxHxC or WxH -# (2) augment_img_tensor4: tensor image 1xCxWxH -# -------------------------------------------- -''' - - -def augment_img(img, mode=0): - '''Kai Zhang (github: https://github.com/cszn) - ''' - if mode == 0: - return img - elif mode == 1: - return np.flipud(np.rot90(img)) - elif mode == 2: - return np.flipud(img) - elif mode == 3: - return np.rot90(img, k=3) - elif mode == 4: - return np.flipud(np.rot90(img, k=2)) - elif mode == 5: - return np.rot90(img) - elif mode == 6: - return np.rot90(img, k=2) - elif mode == 7: - return np.flipud(np.rot90(img, k=3)) - - -def augment_img_tensor4(img, mode=0): - '''Kai Zhang (github: https://github.com/cszn) - ''' - if mode == 0: - return img - elif mode == 1: - return img.rot90(1, [2, 3]).flip([2]) - elif mode == 2: - return img.flip([2]) - elif mode == 3: - return img.rot90(3, [2, 3]) - elif mode == 4: - return img.rot90(2, [2, 3]).flip([2]) - elif mode == 5: - return img.rot90(1, [2, 3]) - elif mode == 6: - return img.rot90(2, [2, 3]) - elif mode == 7: - return img.rot90(3, [2, 3]).flip([2]) - - -def augment_img_tensor(img, mode=0): - '''Kai Zhang (github: https://github.com/cszn) - ''' - img_size = img.size() - img_np = img.data.cpu().numpy() - if len(img_size) == 3: - img_np = np.transpose(img_np, (1, 2, 0)) - elif len(img_size) == 4: - img_np = np.transpose(img_np, (2, 3, 1, 0)) - img_np = augment_img(img_np, mode=mode) - img_tensor = torch.from_numpy(np.ascontiguousarray(img_np)) - if len(img_size) == 3: - img_tensor = img_tensor.permute(2, 0, 1) - elif len(img_size) == 4: - img_tensor = img_tensor.permute(3, 2, 0, 1) - - return img_tensor.type_as(img) - - -def augment_img_np3(img, mode=0): - if mode == 0: - return img - elif mode == 1: - return img.transpose(1, 0, 2) - elif mode == 2: - return img[::-1, :, :] - elif mode == 3: - img = img[::-1, :, :] - img = img.transpose(1, 0, 2) - return img - elif mode == 4: - return img[:, ::-1, :] - elif mode == 5: - img = img[:, ::-1, :] - img = img.transpose(1, 0, 2) - return img - elif mode == 6: - img = img[:, ::-1, :] - img = img[::-1, :, :] - return img - elif mode == 7: - img = img[:, ::-1, :] - img = img[::-1, :, :] - img = img.transpose(1, 0, 2) - return img - - -def augment_imgs(img_list, hflip=True, rot=True): - # horizontal flip OR rotate - hflip = hflip and random.random() < 0.5 - vflip = rot and random.random() < 0.5 - rot90 = rot and random.random() < 0.5 - - def _augment(img): - if hflip: - img = img[:, ::-1, :] - if vflip: - img = img[::-1, :, :] - if rot90: - img = img.transpose(1, 0, 2) - return img - - return [_augment(img) for img in img_list] - - -''' -# -------------------------------------------- -# modcrop and shave -# -------------------------------------------- -''' - - -def modcrop(img_in, scale): - # img_in: Numpy, HWC or HW - img = np.copy(img_in) - if img.ndim == 2: - H, W = img.shape - H_r, W_r = H % scale, W % scale - img = img[:H - H_r, :W - W_r] - elif img.ndim == 3: - H, W, C = img.shape - H_r, W_r = H % scale, W % scale - img = img[:H - H_r, :W - W_r, :] - else: - raise ValueError('Wrong img ndim: [{:d}].'.format(img.ndim)) - return img - - -def shave(img_in, border=0): - # img_in: Numpy, HWC or HW - img = np.copy(img_in) - h, w = img.shape[:2] - img = img[border:h-border, border:w-border] - return img - - -''' -# -------------------------------------------- -# image processing process on numpy image -# channel_convert(in_c, tar_type, img_list): -# rgb2ycbcr(img, only_y=True): -# bgr2ycbcr(img, only_y=True): -# ycbcr2rgb(img): -# -------------------------------------------- -''' - - -def rgb2ycbcr(img, only_y=True): - '''same as matlab rgb2ycbcr - only_y: only return Y channel - Input: - uint8, [0, 255] - float, [0, 1] - ''' - in_img_type = img.dtype - img.astype(np.float32) - if in_img_type != np.uint8: - img *= 255. - # convert - if only_y: - rlt = np.dot(img, [65.481, 128.553, 24.966]) / 255.0 + 16.0 - else: - rlt = np.matmul(img, [[65.481, -37.797, 112.0], [128.553, -74.203, -93.786], - [24.966, 112.0, -18.214]]) / 255.0 + [16, 128, 128] - if in_img_type == np.uint8: - rlt = rlt.round() - else: - rlt /= 255. - return rlt.astype(in_img_type) - - -def ycbcr2rgb(img): - '''same as matlab ycbcr2rgb - Input: - uint8, [0, 255] - float, [0, 1] - ''' - in_img_type = img.dtype - img.astype(np.float32) - if in_img_type != np.uint8: - img *= 255. - # convert - rlt = np.matmul(img, [[0.00456621, 0.00456621, 0.00456621], [0, -0.00153632, 0.00791071], - [0.00625893, -0.00318811, 0]]) * 255.0 + [-222.921, 135.576, -276.836] - if in_img_type == np.uint8: - rlt = rlt.round() - else: - rlt /= 255. - return rlt.astype(in_img_type) - - -def bgr2ycbcr(img, only_y=True): - '''bgr version of rgb2ycbcr - only_y: only return Y channel - Input: - uint8, [0, 255] - float, [0, 1] - ''' - in_img_type = img.dtype - img.astype(np.float32) - if in_img_type != np.uint8: - img *= 255. - # convert - if only_y: - rlt = np.dot(img, [24.966, 128.553, 65.481]) / 255.0 + 16.0 - else: - rlt = np.matmul(img, [[24.966, 112.0, -18.214], [128.553, -74.203, -93.786], - [65.481, -37.797, 112.0]]) / 255.0 + [16, 128, 128] - if in_img_type == np.uint8: - rlt = rlt.round() - else: - rlt /= 255. - return rlt.astype(in_img_type) - - -def channel_convert(in_c, tar_type, img_list): - # conversion among BGR, gray and y - if in_c == 3 and tar_type == 'gray': # BGR to gray - gray_list = [cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) for img in img_list] - return [np.expand_dims(img, axis=2) for img in gray_list] - elif in_c == 3 and tar_type == 'y': # BGR to y - y_list = [bgr2ycbcr(img, only_y=True) for img in img_list] - return [np.expand_dims(img, axis=2) for img in y_list] - elif in_c == 1 and tar_type == 'RGB': # gray/y to BGR - return [cv2.cvtColor(img, cv2.COLOR_GRAY2BGR) for img in img_list] - else: - return img_list - - -''' -# -------------------------------------------- -# metric, PSNR and SSIM -# -------------------------------------------- -''' - - -# -------------------------------------------- -# PSNR -# -------------------------------------------- -def calculate_psnr(img1, img2, border=0): - # img1 and img2 have range [0, 255] - #img1 = img1.squeeze() - #img2 = img2.squeeze() - if not img1.shape == img2.shape: - raise ValueError('Input images must have the same dimensions.') - h, w = img1.shape[:2] - img1 = img1[border:h-border, border:w-border] - img2 = img2[border:h-border, border:w-border] - - img1 = img1.astype(np.float64) - img2 = img2.astype(np.float64) - mse = np.mean((img1 - img2)**2) - if mse == 0: - return float('inf') - return 20 * math.log10(255.0 / math.sqrt(mse)) - - -# -------------------------------------------- -# SSIM -# -------------------------------------------- -def calculate_ssim(img1, img2, border=0): - '''calculate SSIM - the same outputs as MATLAB's - img1, img2: [0, 255] - ''' - #img1 = img1.squeeze() - #img2 = img2.squeeze() - if not img1.shape == img2.shape: - raise ValueError('Input images must have the same dimensions.') - h, w = img1.shape[:2] - img1 = img1[border:h-border, border:w-border] - img2 = img2[border:h-border, border:w-border] - - if img1.ndim == 2: - return ssim(img1, img2) - elif img1.ndim == 3: - if img1.shape[2] == 3: - ssims = [] - for i in range(3): - ssims.append(ssim(img1[:,:,i], img2[:,:,i])) - return np.array(ssims).mean() - elif img1.shape[2] == 1: - return ssim(np.squeeze(img1), np.squeeze(img2)) - else: - raise ValueError('Wrong input image dimensions.') - - -def ssim(img1, img2): - C1 = (0.01 * 255)**2 - C2 = (0.03 * 255)**2 - - img1 = img1.astype(np.float64) - img2 = img2.astype(np.float64) - kernel = cv2.getGaussianKernel(11, 1.5) - window = np.outer(kernel, kernel.transpose()) - - mu1 = cv2.filter2D(img1, -1, window)[5:-5, 5:-5] # valid - mu2 = cv2.filter2D(img2, -1, window)[5:-5, 5:-5] - mu1_sq = mu1**2 - mu2_sq = mu2**2 - mu1_mu2 = mu1 * mu2 - sigma1_sq = cv2.filter2D(img1**2, -1, window)[5:-5, 5:-5] - mu1_sq - sigma2_sq = cv2.filter2D(img2**2, -1, window)[5:-5, 5:-5] - mu2_sq - sigma12 = cv2.filter2D(img1 * img2, -1, window)[5:-5, 5:-5] - mu1_mu2 - - ssim_map = ((2 * mu1_mu2 + C1) * (2 * sigma12 + C2)) / ((mu1_sq + mu2_sq + C1) * - (sigma1_sq + sigma2_sq + C2)) - return ssim_map.mean() - - -''' -# -------------------------------------------- -# matlab's bicubic imresize (numpy and torch) [0, 1] -# -------------------------------------------- -''' - - -# matlab 'imresize' function, now only support 'bicubic' -def cubic(x): - absx = torch.abs(x) - absx2 = absx**2 - absx3 = absx**3 - return (1.5*absx3 - 2.5*absx2 + 1) * ((absx <= 1).type_as(absx)) + \ - (-0.5*absx3 + 2.5*absx2 - 4*absx + 2) * (((absx > 1)*(absx <= 2)).type_as(absx)) - - -def calculate_weights_indices(in_length, out_length, scale, kernel, kernel_width, antialiasing): - if (scale < 1) and (antialiasing): - # Use a modified kernel to simultaneously interpolate and antialias- larger kernel width - kernel_width = kernel_width / scale - - # Output-space coordinates - x = torch.linspace(1, out_length, out_length) - - # Input-space coordinates. Calculate the inverse mapping such that 0.5 - # in output space maps to 0.5 in input space, and 0.5+scale in output - # space maps to 1.5 in input space. - u = x / scale + 0.5 * (1 - 1 / scale) - - # What is the left-most pixel that can be involved in the computation? - left = torch.floor(u - kernel_width / 2) - - # What is the maximum number of pixels that can be involved in the - # computation? Note: it's OK to use an extra pixel here; if the - # corresponding weights are all zero, it will be eliminated at the end - # of this function. - P = math.ceil(kernel_width) + 2 - - # The indices of the input pixels involved in computing the k-th output - # pixel are in row k of the indices matrix. - indices = left.view(out_length, 1).expand(out_length, P) + torch.linspace(0, P - 1, P).view( - 1, P).expand(out_length, P) - - # The weights used to compute the k-th output pixel are in row k of the - # weights matrix. - distance_to_center = u.view(out_length, 1).expand(out_length, P) - indices - # apply cubic kernel - if (scale < 1) and (antialiasing): - weights = scale * cubic(distance_to_center * scale) - else: - weights = cubic(distance_to_center) - # Normalize the weights matrix so that each row sums to 1. - weights_sum = torch.sum(weights, 1).view(out_length, 1) - weights = weights / weights_sum.expand(out_length, P) - - # If a column in weights is all zero, get rid of it. only consider the first and last column. - weights_zero_tmp = torch.sum((weights == 0), 0) - if not math.isclose(weights_zero_tmp[0], 0, rel_tol=1e-6): - indices = indices.narrow(1, 1, P - 2) - weights = weights.narrow(1, 1, P - 2) - if not math.isclose(weights_zero_tmp[-1], 0, rel_tol=1e-6): - indices = indices.narrow(1, 0, P - 2) - weights = weights.narrow(1, 0, P - 2) - weights = weights.contiguous() - indices = indices.contiguous() - sym_len_s = -indices.min() + 1 - sym_len_e = indices.max() - in_length - indices = indices + sym_len_s - 1 - return weights, indices, int(sym_len_s), int(sym_len_e) - - -# -------------------------------------------- -# imresize for tensor image [0, 1] -# -------------------------------------------- -def imresize(img, scale, antialiasing=True): - # Now the scale should be the same for H and W - # input: img: pytorch tensor, CHW or HW [0,1] - # output: CHW or HW [0,1] w/o round - need_squeeze = True if img.dim() == 2 else False - if need_squeeze: - img.unsqueeze_(0) - in_C, in_H, in_W = img.size() - out_C, out_H, out_W = in_C, math.ceil(in_H * scale), math.ceil(in_W * scale) - kernel_width = 4 - kernel = 'cubic' - - # Return the desired dimension order for performing the resize. The - # strategy is to perform the resize first along the dimension with the - # smallest scale factor. - # Now we do not support this. - - # get weights and indices - weights_H, indices_H, sym_len_Hs, sym_len_He = calculate_weights_indices( - in_H, out_H, scale, kernel, kernel_width, antialiasing) - weights_W, indices_W, sym_len_Ws, sym_len_We = calculate_weights_indices( - in_W, out_W, scale, kernel, kernel_width, antialiasing) - # process H dimension - # symmetric copying - img_aug = torch.FloatTensor(in_C, in_H + sym_len_Hs + sym_len_He, in_W) - img_aug.narrow(1, sym_len_Hs, in_H).copy_(img) - - sym_patch = img[:, :sym_len_Hs, :] - inv_idx = torch.arange(sym_patch.size(1) - 1, -1, -1).long() - sym_patch_inv = sym_patch.index_select(1, inv_idx) - img_aug.narrow(1, 0, sym_len_Hs).copy_(sym_patch_inv) - - sym_patch = img[:, -sym_len_He:, :] - inv_idx = torch.arange(sym_patch.size(1) - 1, -1, -1).long() - sym_patch_inv = sym_patch.index_select(1, inv_idx) - img_aug.narrow(1, sym_len_Hs + in_H, sym_len_He).copy_(sym_patch_inv) - - out_1 = torch.FloatTensor(in_C, out_H, in_W) - kernel_width = weights_H.size(1) - for i in range(out_H): - idx = int(indices_H[i][0]) - for j in range(out_C): - out_1[j, i, :] = img_aug[j, idx:idx + kernel_width, :].transpose(0, 1).mv(weights_H[i]) - - # process W dimension - # symmetric copying - out_1_aug = torch.FloatTensor(in_C, out_H, in_W + sym_len_Ws + sym_len_We) - out_1_aug.narrow(2, sym_len_Ws, in_W).copy_(out_1) - - sym_patch = out_1[:, :, :sym_len_Ws] - inv_idx = torch.arange(sym_patch.size(2) - 1, -1, -1).long() - sym_patch_inv = sym_patch.index_select(2, inv_idx) - out_1_aug.narrow(2, 0, sym_len_Ws).copy_(sym_patch_inv) - - sym_patch = out_1[:, :, -sym_len_We:] - inv_idx = torch.arange(sym_patch.size(2) - 1, -1, -1).long() - sym_patch_inv = sym_patch.index_select(2, inv_idx) - out_1_aug.narrow(2, sym_len_Ws + in_W, sym_len_We).copy_(sym_patch_inv) - - out_2 = torch.FloatTensor(in_C, out_H, out_W) - kernel_width = weights_W.size(1) - for i in range(out_W): - idx = int(indices_W[i][0]) - for j in range(out_C): - out_2[j, :, i] = out_1_aug[j, :, idx:idx + kernel_width].mv(weights_W[i]) - if need_squeeze: - out_2.squeeze_() - return out_2 - - -# -------------------------------------------- -# imresize for numpy image [0, 1] -# -------------------------------------------- -def imresize_np(img, scale, antialiasing=True): - # Now the scale should be the same for H and W - # input: img: Numpy, HWC or HW [0,1] - # output: HWC or HW [0,1] w/o round - img = torch.from_numpy(img) - need_squeeze = True if img.dim() == 2 else False - if need_squeeze: - img.unsqueeze_(2) - - in_H, in_W, in_C = img.size() - out_C, out_H, out_W = in_C, math.ceil(in_H * scale), math.ceil(in_W * scale) - kernel_width = 4 - kernel = 'cubic' - - # Return the desired dimension order for performing the resize. The - # strategy is to perform the resize first along the dimension with the - # smallest scale factor. - # Now we do not support this. - - # get weights and indices - weights_H, indices_H, sym_len_Hs, sym_len_He = calculate_weights_indices( - in_H, out_H, scale, kernel, kernel_width, antialiasing) - weights_W, indices_W, sym_len_Ws, sym_len_We = calculate_weights_indices( - in_W, out_W, scale, kernel, kernel_width, antialiasing) - # process H dimension - # symmetric copying - img_aug = torch.FloatTensor(in_H + sym_len_Hs + sym_len_He, in_W, in_C) - img_aug.narrow(0, sym_len_Hs, in_H).copy_(img) - - sym_patch = img[:sym_len_Hs, :, :] - inv_idx = torch.arange(sym_patch.size(0) - 1, -1, -1).long() - sym_patch_inv = sym_patch.index_select(0, inv_idx) - img_aug.narrow(0, 0, sym_len_Hs).copy_(sym_patch_inv) - - sym_patch = img[-sym_len_He:, :, :] - inv_idx = torch.arange(sym_patch.size(0) - 1, -1, -1).long() - sym_patch_inv = sym_patch.index_select(0, inv_idx) - img_aug.narrow(0, sym_len_Hs + in_H, sym_len_He).copy_(sym_patch_inv) - - out_1 = torch.FloatTensor(out_H, in_W, in_C) - kernel_width = weights_H.size(1) - for i in range(out_H): - idx = int(indices_H[i][0]) - for j in range(out_C): - out_1[i, :, j] = img_aug[idx:idx + kernel_width, :, j].transpose(0, 1).mv(weights_H[i]) - - # process W dimension - # symmetric copying - out_1_aug = torch.FloatTensor(out_H, in_W + sym_len_Ws + sym_len_We, in_C) - out_1_aug.narrow(1, sym_len_Ws, in_W).copy_(out_1) - - sym_patch = out_1[:, :sym_len_Ws, :] - inv_idx = torch.arange(sym_patch.size(1) - 1, -1, -1).long() - sym_patch_inv = sym_patch.index_select(1, inv_idx) - out_1_aug.narrow(1, 0, sym_len_Ws).copy_(sym_patch_inv) - - sym_patch = out_1[:, -sym_len_We:, :] - inv_idx = torch.arange(sym_patch.size(1) - 1, -1, -1).long() - sym_patch_inv = sym_patch.index_select(1, inv_idx) - out_1_aug.narrow(1, sym_len_Ws + in_W, sym_len_We).copy_(sym_patch_inv) - - out_2 = torch.FloatTensor(out_H, out_W, in_C) - kernel_width = weights_W.size(1) - for i in range(out_W): - idx = int(indices_W[i][0]) - for j in range(out_C): - out_2[:, i, j] = out_1_aug[:, idx:idx + kernel_width, j].mv(weights_W[i]) - if need_squeeze: - out_2.squeeze_() - - return out_2.numpy() - - -if __name__ == '__main__': - print('---') -# img = imread_uint('test.bmp', 3) -# img = uint2single(img) -# img_bicubic = imresize_np(img, 1/4) \ No newline at end of file diff --git a/ldm/modules/losses/__init__.py b/ldm/modules/losses/__init__.py deleted file mode 100644 index 876d7c5b..00000000 --- a/ldm/modules/losses/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from ldm.modules.losses.contperceptual import LPIPSWithDiscriminator \ No newline at end of file diff --git a/ldm/modules/losses/contperceptual.py b/ldm/modules/losses/contperceptual.py deleted file mode 100644 index 672c1e32..00000000 --- a/ldm/modules/losses/contperceptual.py +++ /dev/null @@ -1,111 +0,0 @@ -import torch -import torch.nn as nn - -from taming.modules.losses.vqperceptual import * # TODO: taming dependency yes/no? - - -class LPIPSWithDiscriminator(nn.Module): - def __init__(self, disc_start, logvar_init=0.0, kl_weight=1.0, pixelloss_weight=1.0, - disc_num_layers=3, disc_in_channels=3, disc_factor=1.0, disc_weight=1.0, - perceptual_weight=1.0, use_actnorm=False, disc_conditional=False, - disc_loss="hinge"): - - super().__init__() - assert disc_loss in ["hinge", "vanilla"] - self.kl_weight = kl_weight - self.pixel_weight = pixelloss_weight - self.perceptual_loss = LPIPS().eval() - self.perceptual_weight = perceptual_weight - # output log variance - self.logvar = nn.Parameter(torch.ones(size=()) * logvar_init) - - self.discriminator = NLayerDiscriminator(input_nc=disc_in_channels, - n_layers=disc_num_layers, - use_actnorm=use_actnorm - ).apply(weights_init) - self.discriminator_iter_start = disc_start - self.disc_loss = hinge_d_loss if disc_loss == "hinge" else vanilla_d_loss - self.disc_factor = disc_factor - self.discriminator_weight = disc_weight - self.disc_conditional = disc_conditional - - def calculate_adaptive_weight(self, nll_loss, g_loss, last_layer=None): - if last_layer is not None: - nll_grads = torch.autograd.grad(nll_loss, last_layer, retain_graph=True)[0] - g_grads = torch.autograd.grad(g_loss, last_layer, retain_graph=True)[0] - else: - nll_grads = torch.autograd.grad(nll_loss, self.last_layer[0], retain_graph=True)[0] - g_grads = torch.autograd.grad(g_loss, self.last_layer[0], retain_graph=True)[0] - - d_weight = torch.norm(nll_grads) / (torch.norm(g_grads) + 1e-4) - d_weight = torch.clamp(d_weight, 0.0, 1e4).detach() - d_weight = d_weight * self.discriminator_weight - return d_weight - - def forward(self, inputs, reconstructions, posteriors, optimizer_idx, - global_step, last_layer=None, cond=None, split="train", - weights=None): - rec_loss = torch.abs(inputs.contiguous() - reconstructions.contiguous()) - if self.perceptual_weight > 0: - p_loss = self.perceptual_loss(inputs.contiguous(), reconstructions.contiguous()) - rec_loss = rec_loss + self.perceptual_weight * p_loss - - nll_loss = rec_loss / torch.exp(self.logvar) + self.logvar - weighted_nll_loss = nll_loss - if weights is not None: - weighted_nll_loss = weights*nll_loss - weighted_nll_loss = torch.sum(weighted_nll_loss) / weighted_nll_loss.shape[0] - nll_loss = torch.sum(nll_loss) / nll_loss.shape[0] - kl_loss = posteriors.kl() - kl_loss = torch.sum(kl_loss) / kl_loss.shape[0] - - # now the GAN part - if optimizer_idx == 0: - # generator update - if cond is None: - assert not self.disc_conditional - logits_fake = self.discriminator(reconstructions.contiguous()) - else: - assert self.disc_conditional - logits_fake = self.discriminator(torch.cat((reconstructions.contiguous(), cond), dim=1)) - g_loss = -torch.mean(logits_fake) - - if self.disc_factor > 0.0: - try: - d_weight = self.calculate_adaptive_weight(nll_loss, g_loss, last_layer=last_layer) - except RuntimeError: - assert not self.training - d_weight = torch.tensor(0.0) - else: - d_weight = torch.tensor(0.0) - - disc_factor = adopt_weight(self.disc_factor, global_step, threshold=self.discriminator_iter_start) - loss = weighted_nll_loss + self.kl_weight * kl_loss + d_weight * disc_factor * g_loss - - log = {"{}/total_loss".format(split): loss.clone().detach().mean(), "{}/logvar".format(split): self.logvar.detach(), - "{}/kl_loss".format(split): kl_loss.detach().mean(), "{}/nll_loss".format(split): nll_loss.detach().mean(), - "{}/rec_loss".format(split): rec_loss.detach().mean(), - "{}/d_weight".format(split): d_weight.detach(), - "{}/disc_factor".format(split): torch.tensor(disc_factor), - "{}/g_loss".format(split): g_loss.detach().mean(), - } - return loss, log - - if optimizer_idx == 1: - # second pass for discriminator update - if cond is None: - logits_real = self.discriminator(inputs.contiguous().detach()) - logits_fake = self.discriminator(reconstructions.contiguous().detach()) - else: - logits_real = self.discriminator(torch.cat((inputs.contiguous().detach(), cond), dim=1)) - logits_fake = self.discriminator(torch.cat((reconstructions.contiguous().detach(), cond), dim=1)) - - disc_factor = adopt_weight(self.disc_factor, global_step, threshold=self.discriminator_iter_start) - d_loss = disc_factor * self.disc_loss(logits_real, logits_fake) - - log = {"{}/disc_loss".format(split): d_loss.clone().detach().mean(), - "{}/logits_real".format(split): logits_real.detach().mean(), - "{}/logits_fake".format(split): logits_fake.detach().mean() - } - return d_loss, log - diff --git a/ldm/modules/losses/vqperceptual.py b/ldm/modules/losses/vqperceptual.py deleted file mode 100644 index f6998176..00000000 --- a/ldm/modules/losses/vqperceptual.py +++ /dev/null @@ -1,167 +0,0 @@ -import torch -from torch import nn -import torch.nn.functional as F -from einops import repeat - -from taming.modules.discriminator.model import NLayerDiscriminator, weights_init -from taming.modules.losses.lpips import LPIPS -from taming.modules.losses.vqperceptual import hinge_d_loss, vanilla_d_loss - - -def hinge_d_loss_with_exemplar_weights(logits_real, logits_fake, weights): - assert weights.shape[0] == logits_real.shape[0] == logits_fake.shape[0] - loss_real = torch.mean(F.relu(1. - logits_real), dim=[1,2,3]) - loss_fake = torch.mean(F.relu(1. + logits_fake), dim=[1,2,3]) - loss_real = (weights * loss_real).sum() / weights.sum() - loss_fake = (weights * loss_fake).sum() / weights.sum() - d_loss = 0.5 * (loss_real + loss_fake) - return d_loss - -def adopt_weight(weight, global_step, threshold=0, value=0.): - if global_step < threshold: - weight = value - return weight - - -def measure_perplexity(predicted_indices, n_embed): - # src: https://github.com/karpathy/deep-vector-quantization/blob/main/model.py - # eval cluster perplexity. when perplexity == num_embeddings then all clusters are used exactly equally - encodings = F.one_hot(predicted_indices, n_embed).float().reshape(-1, n_embed) - avg_probs = encodings.mean(0) - perplexity = (-(avg_probs * torch.log(avg_probs + 1e-10)).sum()).exp() - cluster_use = torch.sum(avg_probs > 0) - return perplexity, cluster_use - -def l1(x, y): - return torch.abs(x-y) - - -def l2(x, y): - return torch.pow((x-y), 2) - - -class VQLPIPSWithDiscriminator(nn.Module): - def __init__(self, disc_start, codebook_weight=1.0, pixelloss_weight=1.0, - disc_num_layers=3, disc_in_channels=3, disc_factor=1.0, disc_weight=1.0, - perceptual_weight=1.0, use_actnorm=False, disc_conditional=False, - disc_ndf=64, disc_loss="hinge", n_classes=None, perceptual_loss="lpips", - pixel_loss="l1"): - super().__init__() - assert disc_loss in ["hinge", "vanilla"] - assert perceptual_loss in ["lpips", "clips", "dists"] - assert pixel_loss in ["l1", "l2"] - self.codebook_weight = codebook_weight - self.pixel_weight = pixelloss_weight - if perceptual_loss == "lpips": - print(f"{self.__class__.__name__}: Running with LPIPS.") - self.perceptual_loss = LPIPS().eval() - else: - raise ValueError(f"Unknown perceptual loss: >> {perceptual_loss} <<") - self.perceptual_weight = perceptual_weight - - if pixel_loss == "l1": - self.pixel_loss = l1 - else: - self.pixel_loss = l2 - - self.discriminator = NLayerDiscriminator(input_nc=disc_in_channels, - n_layers=disc_num_layers, - use_actnorm=use_actnorm, - ndf=disc_ndf - ).apply(weights_init) - self.discriminator_iter_start = disc_start - if disc_loss == "hinge": - self.disc_loss = hinge_d_loss - elif disc_loss == "vanilla": - self.disc_loss = vanilla_d_loss - else: - raise ValueError(f"Unknown GAN loss '{disc_loss}'.") - print(f"VQLPIPSWithDiscriminator running with {disc_loss} loss.") - self.disc_factor = disc_factor - self.discriminator_weight = disc_weight - self.disc_conditional = disc_conditional - self.n_classes = n_classes - - def calculate_adaptive_weight(self, nll_loss, g_loss, last_layer=None): - if last_layer is not None: - nll_grads = torch.autograd.grad(nll_loss, last_layer, retain_graph=True)[0] - g_grads = torch.autograd.grad(g_loss, last_layer, retain_graph=True)[0] - else: - nll_grads = torch.autograd.grad(nll_loss, self.last_layer[0], retain_graph=True)[0] - g_grads = torch.autograd.grad(g_loss, self.last_layer[0], retain_graph=True)[0] - - d_weight = torch.norm(nll_grads) / (torch.norm(g_grads) + 1e-4) - d_weight = torch.clamp(d_weight, 0.0, 1e4).detach() - d_weight = d_weight * self.discriminator_weight - return d_weight - - def forward(self, codebook_loss, inputs, reconstructions, optimizer_idx, - global_step, last_layer=None, cond=None, split="train", predicted_indices=None): - if not exists(codebook_loss): - codebook_loss = torch.tensor([0.]).to(inputs.device) - #rec_loss = torch.abs(inputs.contiguous() - reconstructions.contiguous()) - rec_loss = self.pixel_loss(inputs.contiguous(), reconstructions.contiguous()) - if self.perceptual_weight > 0: - p_loss = self.perceptual_loss(inputs.contiguous(), reconstructions.contiguous()) - rec_loss = rec_loss + self.perceptual_weight * p_loss - else: - p_loss = torch.tensor([0.0]) - - nll_loss = rec_loss - #nll_loss = torch.sum(nll_loss) / nll_loss.shape[0] - nll_loss = torch.mean(nll_loss) - - # now the GAN part - if optimizer_idx == 0: - # generator update - if cond is None: - assert not self.disc_conditional - logits_fake = self.discriminator(reconstructions.contiguous()) - else: - assert self.disc_conditional - logits_fake = self.discriminator(torch.cat((reconstructions.contiguous(), cond), dim=1)) - g_loss = -torch.mean(logits_fake) - - try: - d_weight = self.calculate_adaptive_weight(nll_loss, g_loss, last_layer=last_layer) - except RuntimeError: - assert not self.training - d_weight = torch.tensor(0.0) - - disc_factor = adopt_weight(self.disc_factor, global_step, threshold=self.discriminator_iter_start) - loss = nll_loss + d_weight * disc_factor * g_loss + self.codebook_weight * codebook_loss.mean() - - log = {"{}/total_loss".format(split): loss.clone().detach().mean(), - "{}/quant_loss".format(split): codebook_loss.detach().mean(), - "{}/nll_loss".format(split): nll_loss.detach().mean(), - "{}/rec_loss".format(split): rec_loss.detach().mean(), - "{}/p_loss".format(split): p_loss.detach().mean(), - "{}/d_weight".format(split): d_weight.detach(), - "{}/disc_factor".format(split): torch.tensor(disc_factor), - "{}/g_loss".format(split): g_loss.detach().mean(), - } - if predicted_indices is not None: - assert self.n_classes is not None - with torch.no_grad(): - perplexity, cluster_usage = measure_perplexity(predicted_indices, self.n_classes) - log[f"{split}/perplexity"] = perplexity - log[f"{split}/cluster_usage"] = cluster_usage - return loss, log - - if optimizer_idx == 1: - # second pass for discriminator update - if cond is None: - logits_real = self.discriminator(inputs.contiguous().detach()) - logits_fake = self.discriminator(reconstructions.contiguous().detach()) - else: - logits_real = self.discriminator(torch.cat((inputs.contiguous().detach(), cond), dim=1)) - logits_fake = self.discriminator(torch.cat((reconstructions.contiguous().detach(), cond), dim=1)) - - disc_factor = adopt_weight(self.disc_factor, global_step, threshold=self.discriminator_iter_start) - d_loss = disc_factor * self.disc_loss(logits_real, logits_fake) - - log = {"{}/disc_loss".format(split): d_loss.clone().detach().mean(), - "{}/logits_real".format(split): logits_real.detach().mean(), - "{}/logits_fake".format(split): logits_fake.detach().mean() - } - return d_loss, log diff --git a/ldm/modules/x_transformer.py b/ldm/modules/x_transformer.py deleted file mode 100644 index 5fc15bf9..00000000 --- a/ldm/modules/x_transformer.py +++ /dev/null @@ -1,641 +0,0 @@ -"""shout-out to https://github.com/lucidrains/x-transformers/tree/main/x_transformers""" -import torch -from torch import nn, einsum -import torch.nn.functional as F -from functools import partial -from inspect import isfunction -from collections import namedtuple -from einops import rearrange, repeat, reduce - -# constants - -DEFAULT_DIM_HEAD = 64 - -Intermediates = namedtuple('Intermediates', [ - 'pre_softmax_attn', - 'post_softmax_attn' -]) - -LayerIntermediates = namedtuple('Intermediates', [ - 'hiddens', - 'attn_intermediates' -]) - - -class AbsolutePositionalEmbedding(nn.Module): - def __init__(self, dim, max_seq_len): - super().__init__() - self.emb = nn.Embedding(max_seq_len, dim) - self.init_() - - def init_(self): - nn.init.normal_(self.emb.weight, std=0.02) - - def forward(self, x): - n = torch.arange(x.shape[1], device=x.device) - return self.emb(n)[None, :, :] - - -class FixedPositionalEmbedding(nn.Module): - def __init__(self, dim): - super().__init__() - inv_freq = 1. / (10000 ** (torch.arange(0, dim, 2).float() / dim)) - self.register_buffer('inv_freq', inv_freq) - - def forward(self, x, seq_dim=1, offset=0): - t = torch.arange(x.shape[seq_dim], device=x.device).type_as(self.inv_freq) + offset - sinusoid_inp = torch.einsum('i , j -> i j', t, self.inv_freq) - emb = torch.cat((sinusoid_inp.sin(), sinusoid_inp.cos()), dim=-1) - return emb[None, :, :] - - -# helpers - -def exists(val): - return val is not None - - -def default(val, d): - if exists(val): - return val - return d() if isfunction(d) else d - - -def always(val): - def inner(*args, **kwargs): - return val - return inner - - -def not_equals(val): - def inner(x): - return x != val - return inner - - -def equals(val): - def inner(x): - return x == val - return inner - - -def max_neg_value(tensor): - return -torch.finfo(tensor.dtype).max - - -# keyword argument helpers - -def pick_and_pop(keys, d): - values = list(map(lambda key: d.pop(key), keys)) - return dict(zip(keys, values)) - - -def group_dict_by_key(cond, d): - return_val = [dict(), dict()] - for key in d.keys(): - match = bool(cond(key)) - ind = int(not match) - return_val[ind][key] = d[key] - return (*return_val,) - - -def string_begins_with(prefix, str): - return str.startswith(prefix) - - -def group_by_key_prefix(prefix, d): - return group_dict_by_key(partial(string_begins_with, prefix), d) - - -def groupby_prefix_and_trim(prefix, d): - kwargs_with_prefix, kwargs = group_dict_by_key(partial(string_begins_with, prefix), d) - kwargs_without_prefix = dict(map(lambda x: (x[0][len(prefix):], x[1]), tuple(kwargs_with_prefix.items()))) - return kwargs_without_prefix, kwargs - - -# classes -class Scale(nn.Module): - def __init__(self, value, fn): - super().__init__() - self.value = value - self.fn = fn - - def forward(self, x, **kwargs): - x, *rest = self.fn(x, **kwargs) - return (x * self.value, *rest) - - -class Rezero(nn.Module): - def __init__(self, fn): - super().__init__() - self.fn = fn - self.g = nn.Parameter(torch.zeros(1)) - - def forward(self, x, **kwargs): - x, *rest = self.fn(x, **kwargs) - return (x * self.g, *rest) - - -class ScaleNorm(nn.Module): - def __init__(self, dim, eps=1e-5): - super().__init__() - self.scale = dim ** -0.5 - self.eps = eps - self.g = nn.Parameter(torch.ones(1)) - - def forward(self, x): - norm = torch.norm(x, dim=-1, keepdim=True) * self.scale - return x / norm.clamp(min=self.eps) * self.g - - -class RMSNorm(nn.Module): - def __init__(self, dim, eps=1e-8): - super().__init__() - self.scale = dim ** -0.5 - self.eps = eps - self.g = nn.Parameter(torch.ones(dim)) - - def forward(self, x): - norm = torch.norm(x, dim=-1, keepdim=True) * self.scale - return x / norm.clamp(min=self.eps) * self.g - - -class Residual(nn.Module): - def forward(self, x, residual): - return x + residual - - -class GRUGating(nn.Module): - def __init__(self, dim): - super().__init__() - self.gru = nn.GRUCell(dim, dim) - - def forward(self, x, residual): - gated_output = self.gru( - rearrange(x, 'b n d -> (b n) d'), - rearrange(residual, 'b n d -> (b n) d') - ) - - return gated_output.reshape_as(x) - - -# feedforward - -class GEGLU(nn.Module): - def __init__(self, dim_in, dim_out): - super().__init__() - self.proj = nn.Linear(dim_in, dim_out * 2) - - def forward(self, x): - x, gate = self.proj(x).chunk(2, dim=-1) - return x * F.gelu(gate) - - -class FeedForward(nn.Module): - def __init__(self, dim, dim_out=None, mult=4, glu=False, dropout=0.): - super().__init__() - inner_dim = int(dim * mult) - dim_out = default(dim_out, dim) - project_in = nn.Sequential( - nn.Linear(dim, inner_dim), - nn.GELU() - ) if not glu else GEGLU(dim, inner_dim) - - self.net = nn.Sequential( - project_in, - nn.Dropout(dropout), - nn.Linear(inner_dim, dim_out) - ) - - def forward(self, x): - return self.net(x) - - -# attention. -class Attention(nn.Module): - def __init__( - self, - dim, - dim_head=DEFAULT_DIM_HEAD, - heads=8, - causal=False, - mask=None, - talking_heads=False, - sparse_topk=None, - use_entmax15=False, - num_mem_kv=0, - dropout=0., - on_attn=False - ): - super().__init__() - if use_entmax15: - raise NotImplementedError("Check out entmax activation instead of softmax activation!") - self.scale = dim_head ** -0.5 - self.heads = heads - self.causal = causal - self.mask = mask - - inner_dim = dim_head * heads - - self.to_q = nn.Linear(dim, inner_dim, bias=False) - self.to_k = nn.Linear(dim, inner_dim, bias=False) - self.to_v = nn.Linear(dim, inner_dim, bias=False) - self.dropout = nn.Dropout(dropout) - - # talking heads - self.talking_heads = talking_heads - if talking_heads: - self.pre_softmax_proj = nn.Parameter(torch.randn(heads, heads)) - self.post_softmax_proj = nn.Parameter(torch.randn(heads, heads)) - - # explicit topk sparse attention - self.sparse_topk = sparse_topk - - # entmax - #self.attn_fn = entmax15 if use_entmax15 else F.softmax - self.attn_fn = F.softmax - - # add memory key / values - self.num_mem_kv = num_mem_kv - if num_mem_kv > 0: - self.mem_k = nn.Parameter(torch.randn(heads, num_mem_kv, dim_head)) - self.mem_v = nn.Parameter(torch.randn(heads, num_mem_kv, dim_head)) - - # attention on attention - self.attn_on_attn = on_attn - self.to_out = nn.Sequential(nn.Linear(inner_dim, dim * 2), nn.GLU()) if on_attn else nn.Linear(inner_dim, dim) - - def forward( - self, - x, - context=None, - mask=None, - context_mask=None, - rel_pos=None, - sinusoidal_emb=None, - prev_attn=None, - mem=None - ): - b, n, _, h, talking_heads, device = *x.shape, self.heads, self.talking_heads, x.device - kv_input = default(context, x) - - q_input = x - k_input = kv_input - v_input = kv_input - - if exists(mem): - k_input = torch.cat((mem, k_input), dim=-2) - v_input = torch.cat((mem, v_input), dim=-2) - - if exists(sinusoidal_emb): - # in shortformer, the query would start at a position offset depending on the past cached memory - offset = k_input.shape[-2] - q_input.shape[-2] - q_input = q_input + sinusoidal_emb(q_input, offset=offset) - k_input = k_input + sinusoidal_emb(k_input) - - q = self.to_q(q_input) - k = self.to_k(k_input) - v = self.to_v(v_input) - - q, k, v = map(lambda t: rearrange(t, 'b n (h d) -> b h n d', h=h), (q, k, v)) - - input_mask = None - if any(map(exists, (mask, context_mask))): - q_mask = default(mask, lambda: torch.ones((b, n), device=device).bool()) - k_mask = q_mask if not exists(context) else context_mask - k_mask = default(k_mask, lambda: torch.ones((b, k.shape[-2]), device=device).bool()) - q_mask = rearrange(q_mask, 'b i -> b () i ()') - k_mask = rearrange(k_mask, 'b j -> b () () j') - input_mask = q_mask * k_mask - - if self.num_mem_kv > 0: - mem_k, mem_v = map(lambda t: repeat(t, 'h n d -> b h n d', b=b), (self.mem_k, self.mem_v)) - k = torch.cat((mem_k, k), dim=-2) - v = torch.cat((mem_v, v), dim=-2) - if exists(input_mask): - input_mask = F.pad(input_mask, (self.num_mem_kv, 0), value=True) - - dots = einsum('b h i d, b h j d -> b h i j', q, k) * self.scale - mask_value = max_neg_value(dots) - - if exists(prev_attn): - dots = dots + prev_attn - - pre_softmax_attn = dots - - if talking_heads: - dots = einsum('b h i j, h k -> b k i j', dots, self.pre_softmax_proj).contiguous() - - if exists(rel_pos): - dots = rel_pos(dots) - - if exists(input_mask): - dots.masked_fill_(~input_mask, mask_value) - del input_mask - - if self.causal: - i, j = dots.shape[-2:] - r = torch.arange(i, device=device) - mask = rearrange(r, 'i -> () () i ()') < rearrange(r, 'j -> () () () j') - mask = F.pad(mask, (j - i, 0), value=False) - dots.masked_fill_(mask, mask_value) - del mask - - if exists(self.sparse_topk) and self.sparse_topk < dots.shape[-1]: - top, _ = dots.topk(self.sparse_topk, dim=-1) - vk = top[..., -1].unsqueeze(-1).expand_as(dots) - mask = dots < vk - dots.masked_fill_(mask, mask_value) - del mask - - attn = self.attn_fn(dots, dim=-1) - post_softmax_attn = attn - - attn = self.dropout(attn) - - if talking_heads: - attn = einsum('b h i j, h k -> b k i j', attn, self.post_softmax_proj).contiguous() - - out = einsum('b h i j, b h j d -> b h i d', attn, v) - out = rearrange(out, 'b h n d -> b n (h d)') - - intermediates = Intermediates( - pre_softmax_attn=pre_softmax_attn, - post_softmax_attn=post_softmax_attn - ) - - return self.to_out(out), intermediates - - -class AttentionLayers(nn.Module): - def __init__( - self, - dim, - depth, - heads=8, - causal=False, - cross_attend=False, - only_cross=False, - use_scalenorm=False, - use_rmsnorm=False, - use_rezero=False, - rel_pos_num_buckets=32, - rel_pos_max_distance=128, - position_infused_attn=False, - custom_layers=None, - sandwich_coef=None, - par_ratio=None, - residual_attn=False, - cross_residual_attn=False, - macaron=False, - pre_norm=True, - gate_residual=False, - **kwargs - ): - super().__init__() - ff_kwargs, kwargs = groupby_prefix_and_trim('ff_', kwargs) - attn_kwargs, _ = groupby_prefix_and_trim('attn_', kwargs) - - dim_head = attn_kwargs.get('dim_head', DEFAULT_DIM_HEAD) - - self.dim = dim - self.depth = depth - self.layers = nn.ModuleList([]) - - self.has_pos_emb = position_infused_attn - self.pia_pos_emb = FixedPositionalEmbedding(dim) if position_infused_attn else None - self.rotary_pos_emb = always(None) - - assert rel_pos_num_buckets <= rel_pos_max_distance, 'number of relative position buckets must be less than the relative position max distance' - self.rel_pos = None - - self.pre_norm = pre_norm - - self.residual_attn = residual_attn - self.cross_residual_attn = cross_residual_attn - - norm_class = ScaleNorm if use_scalenorm else nn.LayerNorm - norm_class = RMSNorm if use_rmsnorm else norm_class - norm_fn = partial(norm_class, dim) - - norm_fn = nn.Identity if use_rezero else norm_fn - branch_fn = Rezero if use_rezero else None - - if cross_attend and not only_cross: - default_block = ('a', 'c', 'f') - elif cross_attend and only_cross: - default_block = ('c', 'f') - else: - default_block = ('a', 'f') - - if macaron: - default_block = ('f',) + default_block - - if exists(custom_layers): - layer_types = custom_layers - elif exists(par_ratio): - par_depth = depth * len(default_block) - assert 1 < par_ratio <= par_depth, 'par ratio out of range' - default_block = tuple(filter(not_equals('f'), default_block)) - par_attn = par_depth // par_ratio - depth_cut = par_depth * 2 // 3 # 2 / 3 attention layer cutoff suggested by PAR paper - par_width = (depth_cut + depth_cut // par_attn) // par_attn - assert len(default_block) <= par_width, 'default block is too large for par_ratio' - par_block = default_block + ('f',) * (par_width - len(default_block)) - par_head = par_block * par_attn - layer_types = par_head + ('f',) * (par_depth - len(par_head)) - elif exists(sandwich_coef): - assert sandwich_coef > 0 and sandwich_coef <= depth, 'sandwich coefficient should be less than the depth' - layer_types = ('a',) * sandwich_coef + default_block * (depth - sandwich_coef) + ('f',) * sandwich_coef - else: - layer_types = default_block * depth - - self.layer_types = layer_types - self.num_attn_layers = len(list(filter(equals('a'), layer_types))) - - for layer_type in self.layer_types: - if layer_type == 'a': - layer = Attention(dim, heads=heads, causal=causal, **attn_kwargs) - elif layer_type == 'c': - layer = Attention(dim, heads=heads, **attn_kwargs) - elif layer_type == 'f': - layer = FeedForward(dim, **ff_kwargs) - layer = layer if not macaron else Scale(0.5, layer) - else: - raise Exception(f'invalid layer type {layer_type}') - - if isinstance(layer, Attention) and exists(branch_fn): - layer = branch_fn(layer) - - if gate_residual: - residual_fn = GRUGating(dim) - else: - residual_fn = Residual() - - self.layers.append(nn.ModuleList([ - norm_fn(), - layer, - residual_fn - ])) - - def forward( - self, - x, - context=None, - mask=None, - context_mask=None, - mems=None, - return_hiddens=False - ): - hiddens = [] - intermediates = [] - prev_attn = None - prev_cross_attn = None - - mems = mems.copy() if exists(mems) else [None] * self.num_attn_layers - - for ind, (layer_type, (norm, block, residual_fn)) in enumerate(zip(self.layer_types, self.layers)): - is_last = ind == (len(self.layers) - 1) - - if layer_type == 'a': - hiddens.append(x) - layer_mem = mems.pop(0) - - residual = x - - if self.pre_norm: - x = norm(x) - - if layer_type == 'a': - out, inter = block(x, mask=mask, sinusoidal_emb=self.pia_pos_emb, rel_pos=self.rel_pos, - prev_attn=prev_attn, mem=layer_mem) - elif layer_type == 'c': - out, inter = block(x, context=context, mask=mask, context_mask=context_mask, prev_attn=prev_cross_attn) - elif layer_type == 'f': - out = block(x) - - x = residual_fn(out, residual) - - if layer_type in ('a', 'c'): - intermediates.append(inter) - - if layer_type == 'a' and self.residual_attn: - prev_attn = inter.pre_softmax_attn - elif layer_type == 'c' and self.cross_residual_attn: - prev_cross_attn = inter.pre_softmax_attn - - if not self.pre_norm and not is_last: - x = norm(x) - - if return_hiddens: - intermediates = LayerIntermediates( - hiddens=hiddens, - attn_intermediates=intermediates - ) - - return x, intermediates - - return x - - -class Encoder(AttentionLayers): - def __init__(self, **kwargs): - assert 'causal' not in kwargs, 'cannot set causality on encoder' - super().__init__(causal=False, **kwargs) - - - -class TransformerWrapper(nn.Module): - def __init__( - self, - *, - num_tokens, - max_seq_len, - attn_layers, - emb_dim=None, - max_mem_len=0., - emb_dropout=0., - num_memory_tokens=None, - tie_embedding=False, - use_pos_emb=True - ): - super().__init__() - assert isinstance(attn_layers, AttentionLayers), 'attention layers must be one of Encoder or Decoder' - - dim = attn_layers.dim - emb_dim = default(emb_dim, dim) - - self.max_seq_len = max_seq_len - self.max_mem_len = max_mem_len - self.num_tokens = num_tokens - - self.token_emb = nn.Embedding(num_tokens, emb_dim) - self.pos_emb = AbsolutePositionalEmbedding(emb_dim, max_seq_len) if ( - use_pos_emb and not attn_layers.has_pos_emb) else always(0) - self.emb_dropout = nn.Dropout(emb_dropout) - - self.project_emb = nn.Linear(emb_dim, dim) if emb_dim != dim else nn.Identity() - self.attn_layers = attn_layers - self.norm = nn.LayerNorm(dim) - - self.init_() - - self.to_logits = nn.Linear(dim, num_tokens) if not tie_embedding else lambda t: t @ self.token_emb.weight.t() - - # memory tokens (like [cls]) from Memory Transformers paper - num_memory_tokens = default(num_memory_tokens, 0) - self.num_memory_tokens = num_memory_tokens - if num_memory_tokens > 0: - self.memory_tokens = nn.Parameter(torch.randn(num_memory_tokens, dim)) - - # let funnel encoder know number of memory tokens, if specified - if hasattr(attn_layers, 'num_memory_tokens'): - attn_layers.num_memory_tokens = num_memory_tokens - - def init_(self): - nn.init.normal_(self.token_emb.weight, std=0.02) - - def forward( - self, - x, - return_embeddings=False, - mask=None, - return_mems=False, - return_attn=False, - mems=None, - **kwargs - ): - b, n, device, num_mem = *x.shape, x.device, self.num_memory_tokens - x = self.token_emb(x) - x += self.pos_emb(x) - x = self.emb_dropout(x) - - x = self.project_emb(x) - - if num_mem > 0: - mem = repeat(self.memory_tokens, 'n d -> b n d', b=b) - x = torch.cat((mem, x), dim=1) - - # auto-handle masking after appending memory tokens - if exists(mask): - mask = F.pad(mask, (num_mem, 0), value=True) - - x, intermediates = self.attn_layers(x, mask=mask, mems=mems, return_hiddens=True, **kwargs) - x = self.norm(x) - - mem, x = x[:, :num_mem], x[:, num_mem:] - - out = self.to_logits(x) if not return_embeddings else x - - if return_mems: - hiddens = intermediates.hiddens - new_mems = list(map(lambda pair: torch.cat(pair, dim=-2), zip(mems, hiddens))) if exists(mems) else hiddens - new_mems = list(map(lambda t: t[..., -self.max_mem_len:, :].detach(), new_mems)) - return out, new_mems - - if return_attn: - attn_maps = list(map(lambda t: t.post_softmax_attn, intermediates.attn_intermediates)) - return out, attn_maps - - return out - -- cgit v1.2.1