mirror of
https://github.com/aladdinpersson/Machine-Learning-Collection.git
synced 2026-04-10 12:33:44 +00:00
update readmes, added pix2pix
This commit is contained in:
205
ML/Pytorch/GANs/ProGAN/model.py
Normal file
205
ML/Pytorch/GANs/ProGAN/model.py
Normal file
@@ -0,0 +1,205 @@
|
||||
"""
|
||||
Implementation of ProGAN generator and discriminator with the key
|
||||
contributions from the paper. We have tried to make the implementation
|
||||
compact but a goal is also to keep it readable and understandable.
|
||||
Specifically the key points implemented are:
|
||||
|
||||
1) Progressive growing (of model and layers)
|
||||
2) Minibatch std on Discriminator
|
||||
3) Normalization with PixelNorm
|
||||
4) Equalized Learning Rate (here I cheated and only did it on Conv layers)
|
||||
"""
|
||||
|
||||
import torch
|
||||
import torch.nn as nn
|
||||
import torch.nn.functional as F
|
||||
from math import log2
|
||||
|
||||
"""
|
||||
Factors is used in Discriminator and Generator for how much
|
||||
the channels should be multiplied and expanded for each layer,
|
||||
so specifically the first 5 layers the channels stay the same,
|
||||
whereas when we increase the img_size (towards the later layers)
|
||||
we decrease the number of channels by 1/2, 1/4, etc.
|
||||
"""
|
||||
# Indexed by progression block: block `idx` works with
# int(in_channels * factors[idx]) channels in both Generator and Discriminator.
factors = [1, 1, 1, 1, 1/2, 1/4, 1/4, 1/8, 1/16]
|
||||
|
||||
|
||||
class WSConv2d(nn.Module):
    """
    Conv2d with weight scaling (the "equalized learning rate" trick).

    The weights are drawn from N(0, 1) and the bias starts at zero. Rather
    than rescaling the weights on every call, the *input* is multiplied by
    sqrt(gain / fan_in) before the convolution; since the convolution is
    linear in its input this gives the same result as scaling the weights.

    Inspired by:
    https://github.com/nvnbny/progressive_growing_of_gans/blob/master/modelUtils.py
    """

    def __init__(
        self, in_channels, out_channels, kernel_size=3, stride=1, padding=1, gain=2
    ):
        super().__init__()
        self.conv = nn.Conv2d(
            in_channels,
            out_channels,
            kernel_size=kernel_size,
            stride=stride,
            padding=padding,
        )

        # fan_in: number of weights feeding a single output channel
        fan_in = self.conv.weight[0].numel()
        self.scale = (gain / fan_in) ** 0.5

        # initialize conv layer
        nn.init.normal_(self.conv.weight)
        nn.init.zeros_(self.conv.bias)

    def forward(self, x):
        scaled_input = x * self.scale
        return self.conv(scaled_input)
|
||||
|
||||
|
||||
class PixelNorm(nn.Module):
    """Divide each pixel's feature vector (dim=1) by its root-mean-square."""

    def __init__(self):
        super().__init__()
        # keeps the denominator away from zero
        self.epsilon = 1e-8

    def forward(self, x):
        mean_square = torch.mean(x ** 2, dim=1, keepdim=True)
        return x / torch.sqrt(mean_square + self.epsilon)
|
||||
|
||||
|
||||
class ConvBlock(nn.Module):
    """
    Two weight-scaled convolutions, each followed by LeakyReLU(0.2) and,
    when ``use_pixelnorm`` is set, by PixelNorm.
    """

    def __init__(self, in_channels, out_channels, use_pixelnorm=True):
        super().__init__()
        self.use_pn = use_pixelnorm
        self.conv1 = WSConv2d(in_channels, out_channels)
        self.conv2 = WSConv2d(out_channels, out_channels)
        self.leaky = nn.LeakyReLU(0.2)
        self.pn = PixelNorm()

    def forward(self, x):
        # conv -> activation -> (optional) pixel norm, applied twice
        for conv in (self.conv1, self.conv2):
            x = self.leaky(conv(x))
            if self.use_pn:
                x = self.pn(x)
        return x
|
||||
|
||||
|
||||
class Generator(nn.Module):
    """
    Progressively grown generator.

    ``initial`` turns a (N, z_dim, 1, 1) latent into a 4x4 feature map. For
    every progression index there is one ``ConvBlock`` in ``prog_blocks`` and
    one 1x1 to-RGB layer in ``rgb_layers``; ``forward`` runs blocks
    0..``steps``, doubling the spatial size before each block after the first.
    """

    def __init__(self, z_dim, in_channels, img_size, img_channels=3):
        super().__init__()
        self.prog_blocks = nn.ModuleList([])
        self.rgb_layers = nn.ModuleList([])

        # initial takes 1x1 -> 4x4
        self.initial = nn.Sequential(
            nn.ConvTranspose2d(z_dim, in_channels, 4, 1, 0),
            nn.LeakyReLU(0.2),
            PixelNorm(),
        )

        # One block for the initial 4x4 plus one per doubling up to img_size.
        num_blocks = int(log2(img_size / 4)) + 1
        prev_channels = in_channels
        for idx in range(num_blocks):
            block_channels = int(in_channels * factors[idx])
            self.prog_blocks.append(ConvBlock(prev_channels, block_channels))
            self.rgb_layers.append(
                WSConv2d(block_channels, img_channels, kernel_size=1, stride=1, padding=0)
            )
            prev_channels = block_channels

    def fade_in(self, alpha, upscaled, generated):
        """Blend the two RGB images with weight ``alpha`` on ``generated``, then apply tanh."""
        blended = alpha * generated + (1 - alpha) * upscaled
        return torch.tanh(blended)

    def forward(self, x, alpha, steps):
        upscaled = self.initial(x)
        out = self.prog_blocks[0](upscaled)

        # Lowest resolution: nothing to fade in, and no tanh is applied here.
        if steps == 0:
            return self.rgb_layers[0](out)

        for step in range(1, steps + 1):
            upscaled = F.interpolate(out, scale_factor=2, mode="nearest")
            out = self.prog_blocks[step](upscaled)

        # `upscaled` still has the channel count of block steps-1, whereas
        # `out` went through block `steps`, so each needs its own to-RGB layer.
        rgb_from_upscaled = self.rgb_layers[steps - 1](upscaled)
        rgb_from_block = self.rgb_layers[steps](out)
        return self.fade_in(alpha, rgb_from_upscaled, rgb_from_block)
|
||||
|
||||
|
||||
class Discriminator(nn.Module):
    """
    Progressively grown discriminator (critic).

    For every progression index ``idx`` there is a 1x1 from-RGB layer
    (``rgb_layers[idx]``) and a ``ConvBlock`` without PixelNorm
    (``prog_blocks[idx]``). The network ends with a minibatch-std feature,
    a 4x4 weight-scaled conv and a linear layer that yields one score per
    sample.

    NOTE: ``prog_blocks[0]`` is created but never used by ``forward``; it is
    kept so that the parameter layout (and saved state dicts) stay unchanged.
    """

    def __init__(self, img_size, z_dim, in_channels, img_channels=3):
        super().__init__()
        self.prog_blocks, self.rgb_layers = nn.ModuleList([]), nn.ModuleList([])

        # Create progression blocks and rgb layers. Block `idx` maps the
        # channel count of resolution `idx` to that of resolution `idx - 1`.
        channels = in_channels
        for idx in range(int(log2(img_size / 4)) + 1):
            conv_in = int(in_channels * factors[idx])
            conv_out = channels
            self.rgb_layers.append(
                WSConv2d(img_channels, conv_in, kernel_size=1, stride=1, padding=0)
            )
            self.prog_blocks.append(ConvBlock(conv_in, conv_out, use_pixelnorm=False))
            channels = conv_in

        self.avg_pool = nn.AvgPool2d(kernel_size=2, stride=2)
        # +1 to in_channels because we concatenate from minibatch std
        self.conv = WSConv2d(in_channels + 1, z_dim, kernel_size=4, stride=1, padding=0)
        self.linear = nn.Linear(z_dim, 1)

    def fade_in(self, alpha, downscaled, out):
        """Used to fade in downscaled using avgpooling and output from CNN"""
        return alpha * out + (1 - alpha) * downscaled

    def minibatch_std(self, x):
        """
        Append one constant channel holding the mean (over channels and
        pixels) of the per-position standard deviation across the batch.

        With a single sample the (unbiased) std is undefined and torch.std
        returns NaN, which would poison every later layer; a batch of one has
        no spread, so the statistic is set to 0 in that case.
        """
        if x.shape[0] > 1:
            statistic = torch.std(x, dim=0).mean()
        else:
            statistic = x.new_zeros(())
        batch_statistics = statistic.repeat(x.shape[0], 1, x.shape[2], x.shape[3])
        return torch.cat([x, batch_statistics], dim=1)

    def _score(self, out):
        """Shared head: minibatch std -> 4x4 conv -> linear (one score per sample)."""
        out = self.minibatch_std(out)
        out = self.conv(out)
        return self.linear(out.view(-1, out.shape[1]))

    def forward(self, x, alpha, steps):
        out = self.rgb_layers[steps](x)  # convert from rgb as initial step

        if steps == 0:  # i.e, image is 4x4
            return self._score(out)

        # "Reverse" fade-in: blend the pooled raw image (through the previous
        # from-RGB layer) with the pooled output of the newest block.
        downscaled = self.rgb_layers[steps - 1](self.avg_pool(x))
        out = self.avg_pool(self.prog_blocks[steps](out))
        out = self.fade_in(alpha, downscaled, out)

        # Remaining blocks, from high to low resolution; stops after block 1.
        for step in range(steps - 1, 0, -1):
            downscaled = self.avg_pool(out)
            out = self.prog_blocks[step](downscaled)

        return self._score(out)
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Smoke test: build both networks at 512x512, push one random batch
    # through them and print rough wall-clock timings.
    import time

    Z_DIM = 100
    IN_CHANNELS = 16
    img_size = 512
    num_steps = int(log2(img_size / 4))
    x = torch.randn((5, Z_DIM, 1, 1))
    gen = Generator(Z_DIM, IN_CHANNELS, img_size=img_size)
    disc = Discriminator(img_size, Z_DIM, IN_CHANNELS)

    start = time.time()
    # Only ask for CUDA timings when a GPU is actually available; the models
    # and inputs above live on the CPU either way.
    with torch.autograd.profiler.profile(use_cuda=torch.cuda.is_available()) as prof:
        z = gen(x, alpha=0.5, steps=num_steps)
    # Stop the clock before printing so the profile dump is not counted.
    gen_time = time.time() - start
    print(prof)

    t = time.time()
    out = disc(z, 0.01, num_steps)
    disc_time = time.time() - t
    print(gen_time, disc_time)
|
||||
5
ML/Pytorch/GANs/ProGAN/test.py
Normal file
5
ML/Pytorch/GANs/ProGAN/test.py
Normal file
@@ -0,0 +1,5 @@
|
||||
def func(x=1, y=2, **kwargs):
    """Print ``x`` and ``y``; any extra keyword arguments are accepted and ignored."""
    print(x, y)


# func has no return statement, so this prints "3 4" followed by "None".
result = func(x=3, y=4)
print(result)
|
||||
165
ML/Pytorch/GANs/ProGAN/train.py
Normal file
165
ML/Pytorch/GANs/ProGAN/train.py
Normal file
@@ -0,0 +1,165 @@
|
||||
""" Training of ProGAN using WGAN-GP loss"""
|
||||
|
||||
import torch
|
||||
import torch.nn as nn
|
||||
import torch.optim as optim
|
||||
import torchvision
|
||||
import torchvision.datasets as datasets
|
||||
import torchvision.transforms as transforms
|
||||
from torch.utils.data import DataLoader
|
||||
from torch.utils.tensorboard import SummaryWriter
|
||||
from utils import gradient_penalty, plot_to_tensorboard, save_checkpoint, load_checkpoint
|
||||
from model import Discriminator, Generator
|
||||
from math import log2
|
||||
from tqdm import tqdm
|
||||
import time
|
||||
|
||||
# The cuDNN autotuner flag is `benchmark` (singular); the previous spelling
# `benchmarks` just created an unused attribute and enabled nothing.
torch.backends.cudnn.benchmark = True
torch.manual_seed(0)

# Hyperparameters etc.
device = "cuda" if torch.cuda.is_available() else "cpu"
LEARNING_RATE = 1e-4
# Batch size per progression step (index = log2(image_size / 4)).
BATCH_SIZES = [128, 128, 64, 16, 8, 4, 2, 2, 1]
IMAGE_SIZE = 128
CHANNELS_IMG = 3
Z_DIM = 128
IN_CHANNELS = 128
CRITIC_ITERATIONS = 1
LAMBDA_GP = 10
# Number of progression steps: 4x4 up to IMAGE_SIZE, doubling each step.
NUM_STEPS = int(log2(IMAGE_SIZE / 4)) + 1
# Epochs per progression step. A doubling schedule (2 ** i per step) used to
# be assigned here and was immediately overwritten by this flat schedule.
PROGRESSIVE_EPOCHS = [8] * NUM_STEPS
fixed_noise = torch.randn(8, Z_DIM, 1, 1).to(device)
NUM_WORKERS = 4
|
||||
|
||||
def get_loader(image_size):
    """
    Build the data pipeline for one resolution.

    Images from the ``celeb_dataset`` folder are resized to
    ``image_size`` x ``image_size``, converted to tensors and normalized with
    mean 0.5 / std 0.5 per channel. The batch size is looked up in
    ``BATCH_SIZES`` by ``log2(image_size / 4)``.

    Returns:
        (loader, dataset): the shuffling DataLoader and the underlying dataset.
    """
    mean = [0.5] * CHANNELS_IMG
    std = [0.5] * CHANNELS_IMG
    preprocessing = transforms.Compose(
        [
            transforms.Resize((image_size, image_size)),
            transforms.ToTensor(),
            transforms.Normalize(mean, std),
        ]
    )

    resolution_idx = int(log2(image_size / 4))
    batch_size = BATCH_SIZES[resolution_idx]

    dataset = datasets.ImageFolder(root="celeb_dataset", transform=preprocessing)
    loader = DataLoader(
        dataset,
        batch_size=batch_size,
        shuffle=True,
        num_workers=NUM_WORKERS,
        pin_memory=True,
    )
    return loader, dataset
|
||||
|
||||
def train_fn(
    critic,
    gen,
    loader,
    dataset,
    step,
    alpha,
    opt_critic,
    opt_gen,
    tensorboard_step,
    writer,
):
    """
    Run one epoch of WGAN-GP training at progression step ``step``.

    For every batch the critic is updated ``CRITIC_ITERATIONS`` times, then
    the generator once, and ``alpha`` (the fade-in weight) is increased so
    that it reaches 1 after half of ``PROGRESSIVE_EPOCHS[step]`` epochs.
    Every 300 batches, images from ``fixed_noise`` and the latest losses are
    sent to ``plot_to_tensorboard``.

    Returns:
        (tensorboard_step, alpha): the updated logging step and fade-in weight.
    """
    start = time.time()
    total_time = 0
    training = tqdm(loader, leave=True)
    for batch_idx, (real, _) in enumerate(training):
        real = real.to(device)
        cur_batch_size = real.shape[0]
        model_start = time.time()

        # Train Critic: max E[critic(real)] - E[critic(fake)]
        # which is equivalent to minimizing the negative of the expression
        for _ in range(CRITIC_ITERATIONS):
            critic.zero_grad()
            noise = torch.randn(cur_batch_size, Z_DIM, 1, 1).to(device)
            fake = gen(noise, alpha, step)
            critic_real = critic(real, alpha, step).reshape(-1)
            # Detach: the critic update needs no gradients w.r.t. the
            # generator, so do not back-propagate into it from this term.
            critic_fake = critic(fake.detach(), alpha, step).reshape(-1)
            # `fake` itself is handed to gradient_penalty unchanged.
            gp = gradient_penalty(critic, real, fake, alpha, step, device=device)
            loss_critic = (
                -(torch.mean(critic_real) - torch.mean(critic_fake))
                + LAMBDA_GP * gp
            )
            # No retain_graph: the generator step below recomputes `fake`,
            # so this graph is never needed again and can be freed.
            loss_critic.backward()
            opt_critic.step()

        # Train Generator: max E[critic(gen_fake)] <-> min -E[critic(gen_fake)]
        gen.zero_grad()
        fake = gen(noise, alpha, step)
        gen_fake = critic(fake, alpha, step).reshape(-1)
        loss_gen = -torch.mean(gen_fake)
        loss_gen.backward()
        opt_gen.step()

        # Update alpha and ensure less than 1
        alpha += cur_batch_size / (
            (PROGRESSIVE_EPOCHS[step] * 0.5) * len(dataset)
        )
        alpha = min(alpha, 1)
        total_time += time.time() - model_start

        if batch_idx % 300 == 0:
            with torch.no_grad():
                fixed_fakes = gen(fixed_noise, alpha, step)
            plot_to_tensorboard(
                writer, loss_critic, loss_gen, real, fixed_fakes, tensorboard_step
            )
            tensorboard_step += 1

    print(f'Fraction spent on model training: {total_time/(time.time()-start)}')
    return tensorboard_step, alpha
|
||||
|
||||
|
||||
def main():
    """
    Train ProGAN progressively, one resolution (step) at a time.

    If the checkpoint file ``celeba_wgan_gp.pth.tar`` exists, generator and
    critic weights are restored from it and training continues at step 3,
    exactly as before. Without a checkpoint (previously a crash in
    ``torch.load``) training starts from step 0. A checkpoint is written
    after every epoch.
    """
    import os  # local import: only needed for the checkpoint existence check

    # initialize gen and disc, note: discriminator should be called critic,
    # according to WGAN paper (since it no longer outputs between [0, 1])
    gen = Generator(Z_DIM, IN_CHANNELS, img_size=IMAGE_SIZE, img_channels=CHANNELS_IMG).to(device)
    critic = Discriminator(IMAGE_SIZE, Z_DIM, IN_CHANNELS, img_channels=CHANNELS_IMG).to(device)

    # initialize optimizers
    opt_gen = optim.Adam(gen.parameters(), lr=LEARNING_RATE, betas=(0.0, 0.99))
    opt_critic = optim.Adam(critic.parameters(), lr=LEARNING_RATE, betas=(0.0, 0.99))

    # for tensorboard plotting
    writer = SummaryWriter("logs/gan")

    checkpoint_file = "celeba_wgan_gp.pth.tar"
    resuming = os.path.isfile(checkpoint_file)
    if resuming:
        # map_location lets a checkpoint saved on GPU be loaded on a CPU-only
        # machine (and vice versa).
        load_checkpoint(torch.load(checkpoint_file, map_location=device), gen, critic)
    # NOTE(review): the original always skipped steps 0-2, presumably because
    # the checkpoint already covers them -- confirm before changing.
    first_step = 3 if resuming else 0

    gen.train()
    critic.train()

    tensorboard_step = 0
    for step, num_epochs in enumerate(PROGRESSIVE_EPOCHS):
        alpha = 0.01
        if step < first_step:
            continue

        if step == 4:
            print(f"Img size is: {4*2**step}")

        loader, dataset = get_loader(4 * 2 ** step)
        for epoch in range(num_epochs):
            print(f"Epoch [{epoch+1}/{num_epochs}]")
            tensorboard_step, alpha = train_fn(
                critic,
                gen,
                loader,
                dataset,
                step,
                alpha,
                opt_critic,
                opt_gen,
                tensorboard_step,
                writer,
            )

            checkpoint = {
                'gen': gen.state_dict(),
                'critic': critic.state_dict(),
                'opt_gen': opt_gen.state_dict(),
                'opt_critic': opt_critic.state_dict(),
            }
            save_checkpoint(checkpoint)


if __name__ == "__main__":
    main()
|
||||
54
ML/Pytorch/GANs/ProGAN/utils.py
Normal file
54
ML/Pytorch/GANs/ProGAN/utils.py
Normal file
@@ -0,0 +1,54 @@
|
||||
import torch
|
||||
import torchvision
|
||||
import torch.nn as nn
|
||||
|
||||
# Print losses occasionally and print to tensorboard
|
||||
def plot_to_tensorboard(
    writer, loss_critic, loss_gen, real, fake, tensorboard_step
):
    """
    Log both losses and a grid of real / fake images at ``tensorboard_step``.

    ``loss_gen`` used to be accepted but never written; it is now logged
    under the tag "Loss Gen" next to "Loss Critic".
    """
    writer.add_scalar("Loss Critic", loss_critic, global_step=tensorboard_step)
    writer.add_scalar("Loss Gen", loss_gen, global_step=tensorboard_step)

    with torch.no_grad():
        # take out (up to) 8 examples
        img_grid_real = torchvision.utils.make_grid(real[:8], normalize=True)
        img_grid_fake = torchvision.utils.make_grid(fake[:8], normalize=True)
        writer.add_image("Real", img_grid_real, global_step=tensorboard_step)
        writer.add_image("Fake", img_grid_fake, global_step=tensorboard_step)
|
||||
|
||||
|
||||
def gradient_penalty(critic, real, fake, alpha, train_step, device="cpu"):
    """
    WGAN-GP penalty: mean over the batch of (||grad critic(x_hat)||_2 - 1)^2,
    where x_hat is a random per-sample interpolation between ``real`` and
    ``fake``.

    The interpolated images are detached and marked as requiring grad, so the
    function works whether or not ``fake`` (or ``real``) carries autograd
    history; before, ``torch.autograd.grad`` raised when neither input
    required grad. The penalty's gradient w.r.t. the critic's parameters is
    unaffected; it simply no longer flows back into whatever produced ``fake``.

    Args:
        critic: callable ``critic(images, alpha, train_step)`` returning scores.
        real, fake: image batches of identical 4-D shape (N, C, H, W).
        alpha, train_step: forwarded to ``critic`` unchanged.
        device: device on which the interpolation weights are placed.

    Returns:
        Scalar tensor holding the penalty (differentiable w.r.t. the critic).
    """
    batch_size, _, _, _ = real.shape
    # One interpolation weight per sample, broadcast over C, H and W.
    beta = torch.rand((batch_size, 1, 1, 1)).to(device)
    interpolated_images = (real * beta + fake * (1 - beta)).detach().requires_grad_(True)

    # Calculate critic scores
    mixed_scores = critic(interpolated_images, alpha, train_step)

    # Take the gradient of the scores with respect to the images
    gradient = torch.autograd.grad(
        inputs=interpolated_images,
        outputs=mixed_scores,
        grad_outputs=torch.ones_like(mixed_scores),
        create_graph=True,
        retain_graph=True,
    )[0]
    gradient = gradient.view(gradient.shape[0], -1)
    gradient_norm = gradient.norm(2, dim=1)
    penalty = torch.mean((gradient_norm - 1) ** 2)
    return penalty
|
||||
|
||||
|
||||
def save_checkpoint(state, filename="celeba_wgan_gp.pth.tar"):
    """Announce the save on stdout, then serialize ``state`` to ``filename`` via torch.save."""
    print("=> Saving checkpoint")
    torch.save(obj=state, f=filename)
|
||||
|
||||
def load_checkpoint(checkpoint, gen, disc, opt_gen=None, opt_disc=None):
    """
    Restore model (and optionally optimizer) state from ``checkpoint``.

    ``checkpoint`` is a mapping with the keys 'gen' and 'critic', plus
    'opt_gen' / 'opt_critic' when the corresponding optimizer is passed.
    Each optimizer is restored independently; previously both had to be
    supplied or neither was loaded.
    """
    print("=> Loading checkpoint")
    gen.load_state_dict(checkpoint['gen'])
    disc.load_state_dict(checkpoint['critic'])

    if opt_gen is not None:
        opt_gen.load_state_dict(checkpoint['opt_gen'])
    if opt_disc is not None:
        opt_disc.load_state_dict(checkpoint['opt_critic'])
|
||||
|
||||
|
||||
Reference in New Issue
Block a user