| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| import os |
| import time |
| import random |
| import argparse |
| import numpy as np |
|
|
| import torch |
| import torch.nn.functional as F |
| from torch.backends import cudnn |
| import torch.utils.tensorboard as tensorboard |
|
|
| from util import util |
| from util.plot import plot_batch |
|
|
| from models.projected_model import fsModel |
| from data.data_loader_Swapping import GetLoader |
|
|
| def str2bool(v): |
| return v.lower() in ('true') |
|
|
| class TrainOptions: |
| def __init__(self): |
| self.parser = argparse.ArgumentParser() |
| self.initialized = False |
| |
| def initialize(self): |
| self.parser.add_argument('--name', type=str, default='simswap', help='name of the experiment. It decides where to store samples and models') |
| self.parser.add_argument('--gpu_ids', default='0') |
| self.parser.add_argument('--checkpoints_dir', type=str, default='./checkpoints', help='models are saved here') |
| self.parser.add_argument('--isTrain', type=str2bool, default='True') |
|
|
| |
| self.parser.add_argument('--batchSize', type=int, default=4, help='input batch size') |
|
|
| |
| self.parser.add_argument('--use_tensorboard', type=str2bool, default='False') |
|
|
| |
| self.parser.add_argument('--dataset', type=str, default="/path/to/VGGFace2", help='path to the face swapping dataset') |
| self.parser.add_argument('--continue_train', type=str2bool, default='False', help='continue training: load the latest model') |
| self.parser.add_argument('--load_pretrain', type=str, default='./checkpoints/simswap224_test', help='load the pretrained model from the specified location') |
| self.parser.add_argument('--which_epoch', type=str, default='10000', help='which epoch to load? set to latest to use latest cached model') |
| self.parser.add_argument('--phase', type=str, default='train', help='train, val, test, etc') |
| self.parser.add_argument('--niter', type=int, default=10000, help='# of iter at starting learning rate') |
| self.parser.add_argument('--niter_decay', type=int, default=10000, help='# of iter to linearly decay learning rate to zero') |
| self.parser.add_argument('--beta1', type=float, default=0.0, help='momentum term of adam') |
| self.parser.add_argument('--lr', type=float, default=0.0004, help='initial learning rate for adam') |
| self.parser.add_argument('--Gdeep', type=str2bool, default='False') |
|
|
| |
| self.parser.add_argument('--lambda_feat', type=float, default=10.0, help='weight for feature matching loss') |
| self.parser.add_argument('--lambda_id', type=float, default=30.0, help='weight for id loss') |
| self.parser.add_argument('--lambda_rec', type=float, default=10.0, help='weight for reconstruction loss') |
|
|
| self.parser.add_argument("--Arc_path", type=str, default='arcface_model/arcface_checkpoint.tar', help="run ONNX model via TRT") |
| self.parser.add_argument("--total_step", type=int, default=1000000, help='total training step') |
| self.parser.add_argument("--log_frep", type=int, default=200, help='frequence for printing log information') |
| self.parser.add_argument("--sample_freq", type=int, default=1000, help='frequence for sampling') |
| self.parser.add_argument("--model_freq", type=int, default=10000, help='frequence for saving the model') |
|
|
| |
|
|
|
|
| self.isTrain = True |
| |
| def parse(self, save=True): |
| if not self.initialized: |
| self.initialize() |
| self.opt = self.parser.parse_args() |
| self.opt.isTrain = self.isTrain |
|
|
| args = vars(self.opt) |
|
|
| print('------------ Options -------------') |
| for k, v in sorted(args.items()): |
| print('%s: %s' % (str(k), str(v))) |
| print('-------------- End ----------------') |
|
|
| |
| if self.opt.isTrain: |
| expr_dir = os.path.join(self.opt.checkpoints_dir, self.opt.name) |
| util.mkdirs(expr_dir) |
| if save and not self.opt.continue_train: |
| file_name = os.path.join(expr_dir, 'opt.txt') |
| with open(file_name, 'wt') as opt_file: |
| opt_file.write('------------ Options -------------\n') |
| for k, v in sorted(args.items()): |
| opt_file.write('%s: %s\n' % (str(k), str(v))) |
| opt_file.write('-------------- End ----------------\n') |
| return self.opt |
|
|
|
|
| if __name__ == '__main__': |
|
|
| opt = TrainOptions().parse() |
| iter_path = os.path.join(opt.checkpoints_dir, opt.name, 'iter.txt') |
|
|
| sample_path = os.path.join(opt.checkpoints_dir, opt.name, 'samples') |
|
|
| if not os.path.exists(sample_path): |
| os.makedirs(sample_path) |
| |
| log_path = os.path.join(opt.checkpoints_dir, opt.name, 'summary') |
|
|
| if not os.path.exists(log_path): |
| os.makedirs(log_path) |
|
|
| if opt.continue_train: |
| try: |
| start_epoch, epoch_iter = np.loadtxt(iter_path , delimiter=',', dtype=int) |
| except: |
| start_epoch, epoch_iter = 1, 0 |
| print('Resuming from epoch %d at iteration %d' % (start_epoch, epoch_iter)) |
| else: |
| start_epoch, epoch_iter = 1, 0 |
|
|
| os.environ['CUDA_VISIBLE_DEVICES'] = str(opt.gpu_ids) |
| print("GPU used : ", str(opt.gpu_ids)) |
|
|
| |
| cudnn.benchmark = True |
|
|
| |
|
|
| model = fsModel() |
|
|
| model.initialize(opt) |
|
|
| |
| if opt.use_tensorboard: |
| tensorboard_writer = tensorboard.SummaryWriter(log_path) |
| logger = tensorboard_writer |
| |
| log_name = os.path.join(opt.checkpoints_dir, opt.name, 'loss_log.txt') |
|
|
| with open(log_name, "a") as log_file: |
| now = time.strftime("%c") |
| log_file.write('================ Training Loss (%s) ================\n' % now) |
|
|
| optimizer_G, optimizer_D = model.optimizer_G, model.optimizer_D |
|
|
| loss_avg = 0 |
| refresh_count = 0 |
| imagenet_std = torch.Tensor([0.229, 0.224, 0.225]).view(3,1,1) |
| imagenet_mean = torch.Tensor([0.485, 0.456, 0.406]).view(3,1,1) |
|
|
| train_loader = GetLoader(opt.dataset,opt.batchSize,8,1234) |
|
|
| randindex = [i for i in range(opt.batchSize)] |
| random.shuffle(randindex) |
|
|
| if not opt.continue_train: |
| start = 0 |
| else: |
| start = int(opt.which_epoch) |
| total_step = opt.total_step |
| import datetime |
| print("Start to train at %s"%(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) |
| |
| from util.logo_class import logo_class |
| logo_class.print_start_training() |
| model.netD.feature_network.requires_grad_(False) |
|
|
| |
| for step in range(start, total_step): |
| model.netG.train() |
| for interval in range(2): |
| random.shuffle(randindex) |
| src_image1, src_image2 = train_loader.next() |
| |
| if step%2 == 0: |
| img_id = src_image2 |
| else: |
| img_id = src_image2[randindex] |
|
|
| img_id_112 = F.interpolate(img_id,size=(112,112), mode='bicubic') |
| latent_id = model.netArc(img_id_112) |
| latent_id = F.normalize(latent_id, p=2, dim=1) |
| if interval: |
| |
| img_fake = model.netG(src_image1, latent_id) |
| gen_logits,_ = model.netD(img_fake.detach(), None) |
| loss_Dgen = (F.relu(torch.ones_like(gen_logits) + gen_logits)).mean() |
|
|
| real_logits,_ = model.netD(src_image2,None) |
| loss_Dreal = (F.relu(torch.ones_like(real_logits) - real_logits)).mean() |
|
|
| loss_D = loss_Dgen + loss_Dreal |
| optimizer_D.zero_grad() |
| loss_D.backward() |
| optimizer_D.step() |
| else: |
| |
| |
| img_fake = model.netG(src_image1, latent_id) |
| |
| gen_logits,feat = model.netD(img_fake, None) |
| |
| loss_Gmain = (-gen_logits).mean() |
| img_fake_down = F.interpolate(img_fake, size=(112,112), mode='bicubic') |
| latent_fake = model.netArc(img_fake_down) |
| latent_fake = F.normalize(latent_fake, p=2, dim=1) |
| loss_G_ID = (1 - model.cosin_metric(latent_fake, latent_id)).mean() |
| real_feat = model.netD.get_feature(src_image1) |
| feat_match_loss = model.criterionFeat(feat["3"],real_feat["3"]) |
| loss_G = loss_Gmain + loss_G_ID * opt.lambda_id + feat_match_loss * opt.lambda_feat |
| |
|
|
| if step%2 == 0: |
| |
| loss_G_Rec = model.criterionRec(img_fake, src_image1) * opt.lambda_rec |
| loss_G += loss_G_Rec |
|
|
| optimizer_G.zero_grad() |
| loss_G.backward() |
| optimizer_G.step() |
| |
|
|
| |
| |
| |
| if (step + 1) % opt.log_frep == 0: |
| |
| errors = { |
| "G_Loss":loss_Gmain.item(), |
| "G_ID":loss_G_ID.item(), |
| "G_Rec":loss_G_Rec.item(), |
| "G_feat_match":feat_match_loss.item(), |
| "D_fake":loss_Dgen.item(), |
| "D_real":loss_Dreal.item(), |
| "D_loss":loss_D.item() |
| } |
| if opt.use_tensorboard: |
| for tag, value in errors.items(): |
| logger.add_scalar(tag, value, step) |
| message = '( step: %d, ) ' % (step) |
| for k, v in errors.items(): |
| message += '%s: %.3f ' % (k, v) |
|
|
| print(message) |
| with open(log_name, "a") as log_file: |
| log_file.write('%s\n' % message) |
|
|
| |
| if (step + 1) % opt.sample_freq == 0: |
| model.netG.eval() |
| with torch.no_grad(): |
| imgs = list() |
| zero_img = (torch.zeros_like(src_image1[0,...])) |
| imgs.append(zero_img.cpu().numpy()) |
| save_img = ((src_image1.cpu())* imagenet_std + imagenet_mean).numpy() |
| for r in range(opt.batchSize): |
| imgs.append(save_img[r,...]) |
| arcface_112 = F.interpolate(src_image2,size=(112,112), mode='bicubic') |
| id_vector_src1 = model.netArc(arcface_112) |
| id_vector_src1 = F.normalize(id_vector_src1, p=2, dim=1) |
|
|
| for i in range(opt.batchSize): |
| |
| imgs.append(save_img[i,...]) |
| image_infer = src_image1[i, ...].repeat(opt.batchSize, 1, 1, 1) |
| img_fake = model.netG(image_infer, id_vector_src1).cpu() |
| |
| img_fake = img_fake * imagenet_std |
| img_fake = img_fake + imagenet_mean |
| img_fake = img_fake.numpy() |
| for j in range(opt.batchSize): |
| imgs.append(img_fake[j,...]) |
| print("Save test data") |
| imgs = np.stack(imgs, axis = 0).transpose(0,2,3,1) |
| plot_batch(imgs, os.path.join(sample_path, 'step_'+str(step+1)+'.jpg')) |
|
|
| |
| if (step+1) % opt.model_freq==0: |
| print('saving the latest model (steps %d)' % (step+1)) |
| model.save(step+1) |
| np.savetxt(iter_path, (step+1, total_step), delimiter=',', fmt='%d') |
| wandb.finish() |