目录
Unet++网络
Dense connection
deep supervision
模型复现
Unet++
数据集准备
模型训练
训练结果
Unet++:《UNet++: A Nested U-Net Architecture for Medical Image Segmentation》
作者对Unet和Unet++的理解:研习U-Net
延续前文:语义分割系列2-Unet(pytorch实现)
本文将介绍Unet++网络,在pytorch框架上复现Unet++,并在Camvid数据集上进行训练。
Unet++网络
Dense connection
Unet++继承了Unet的结构,同时又借鉴了DenseNet的稠密连接方式(图1中各种分支)。
作者通过稠密连接把各层互相连接起来,就像DenseNet那样,前前后后每一个模块互相作用,每一个模块都能看到彼此;模块之间彼此熟悉,分割效果自然就会变好。
在实际分割中,一次次的下采样自然会丢掉一些细节特征,在Unet中是使用skip connection来恢复这些细节,但能否做得更好呢?Unet++就给出了答案:这种稠密连接的方式让每一层都尽量多地保存细节信息和全局信息,层与层之间架起桥梁互相沟通,最后共享给最后一层,实现全局信息和局部信息的保留和重构。
deep supervision
当然,简单地将各个模块连接起来就能实现很好的效果。而我们又能发现,一个Unet++其实是很多个不同深度的Unet的叠加。那么,每一个深度的Unet是不是都可以输出一个loss?答案自然是可以的。
所以,作者提出了deep supervision,也就是监督每一个深度的Unet的输出,通过一定的方式来叠加Loss(比如加权的方式),这样就得到了一个由1、2、3、4层深度的Unet输出加权而成的Loss(图2 不同深度Unet融合)。
那么,deep supervision又有什么用呢?-剪枝
既然Unet++由多个不同深度的Unet叠加而成,那么在推理(只有前向传播)时去掉最深的一层,并不会影响其余较浅分支的输出。因此,如果你发现Unet++第三个输出的效果和第四个输出的效果差不多,就可以毫不犹豫地删去4层深度的Unet。比如,直接删去图3中棕色部分,就可以实现剪枝。这样,就得到了更加轻量化的网络。
模型复现
Unet++
为了更直观一些,我把代码中的所有符号都和网络结构中对应上了。
- import torch
- import torch.nn as nn
class ContinusParalleConv(nn.Module):
    """Two stacked 3x3 convolutions with BatchNorm and ReLU.

    Two layer orderings are available:

    * ``pre_Batch_Norm=True``:  BN -> ReLU -> Conv -> BN -> ReLU -> Conv
    * ``pre_Batch_Norm=False``: Conv -> BN -> ReLU -> Conv -> BN -> ReLU

    Every convolution uses ``kernel_size=3`` with ``padding=1``.

    Args:
        in_channels: number of channels of the input feature map.
        out_channels: number of channels produced by both convolutions.
        pre_Batch_Norm: selects the layer ordering described above.
    """

    def __init__(self, in_channels, out_channels, pre_Batch_Norm = True):
        super().__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels

        # Layers are created in the same order they are applied so that the
        # Sequential indices (and therefore state_dict keys) stay stable.
        if pre_Batch_Norm:
            layers = [
                nn.BatchNorm2d(in_channels),
                nn.ReLU(),
                nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
                nn.BatchNorm2d(out_channels),
                nn.ReLU(),
                nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            ]
        else:
            layers = [
                nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
                nn.BatchNorm2d(out_channels),
                nn.ReLU(),
                nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
                nn.BatchNorm2d(out_channels),
                nn.ReLU(),
            ]
        self.Conv_forward = nn.Sequential(*layers)

    def forward(self, x):
        """Apply the convolution stack to ``x`` and return the result."""
        return self.Conv_forward(x)
-
class UnetPlusPlus(nn.Module):
    """UNet++ (nested U-Net) segmentation network with optional deep supervision.

    Node ``x_i_j`` is the j-th block on down-sampling level ``i``. The
    encoder column is ``x_0_0 .. x_4_0``. Every other node concatenates
    the up-sampled output of a node one level below with all earlier nodes
    on its own level, then applies a ``ContinusParalleConv`` block.

    The encoder halves the resolution four times with ``MaxPool2d(2)`` and
    each transposed convolution (kernel 4, stride 2, padding 1) exactly
    doubles it, so the input height and width must be multiples of 16 for
    the concatenations to line up.

    Args:
        num_classes: number of output channels of each segmentation head.
        deep_supervision: if True, ``forward`` returns the outputs of all
            four level-0 heads; otherwise only the deepest one.
        in_channels: number of channels of the input image. Defaults to 3,
            which reproduces the previously hard-coded behaviour.
    """

    def __init__(self, num_classes, deep_supervision=False, in_channels=3):
        super().__init__()
        self.num_classes = num_classes
        self.deep_supervision = deep_supervision
        self.in_channels = in_channels
        self.filters = [64, 128, 256, 512, 1024]
        f0, f1, f2, f3, f4 = self.filters

        # Decoder / nested blocks. The input width is the per-level width
        # times the number of concatenated feature maps.
        self.CONV3_1 = ContinusParalleConv(f3 * 2, f3, pre_Batch_Norm=True)

        self.CONV2_2 = ContinusParalleConv(f2 * 3, f2, pre_Batch_Norm=True)
        self.CONV2_1 = ContinusParalleConv(f2 * 2, f2, pre_Batch_Norm=True)

        self.CONV1_1 = ContinusParalleConv(f1 * 2, f1, pre_Batch_Norm=True)
        self.CONV1_2 = ContinusParalleConv(f1 * 3, f1, pre_Batch_Norm=True)
        self.CONV1_3 = ContinusParalleConv(f1 * 4, f1, pre_Batch_Norm=True)

        self.CONV0_1 = ContinusParalleConv(f0 * 2, f0, pre_Batch_Norm=True)
        self.CONV0_2 = ContinusParalleConv(f0 * 3, f0, pre_Batch_Norm=True)
        self.CONV0_3 = ContinusParalleConv(f0 * 4, f0, pre_Batch_Norm=True)
        self.CONV0_4 = ContinusParalleConv(f0 * 5, f0, pre_Batch_Norm=True)

        # Encoder column (x_0_0 .. x_4_0).
        self.stage_0 = ContinusParalleConv(in_channels, f0, pre_Batch_Norm=False)
        self.stage_1 = ContinusParalleConv(f0, f1, pre_Batch_Norm=False)
        self.stage_2 = ContinusParalleConv(f1, f2, pre_Batch_Norm=False)
        self.stage_3 = ContinusParalleConv(f2, f3, pre_Batch_Norm=False)
        self.stage_4 = ContinusParalleConv(f3, f4, pre_Batch_Norm=False)

        self.pool = nn.MaxPool2d(2)

        # One transposed convolution per decoder node; upsample_i_j feeds x_i_j.
        self.upsample_3_1 = self._upsample(f4, f3)

        self.upsample_2_1 = self._upsample(f3, f2)
        self.upsample_2_2 = self._upsample(f3, f2)

        self.upsample_1_1 = self._upsample(f2, f1)
        self.upsample_1_2 = self._upsample(f2, f1)
        self.upsample_1_3 = self._upsample(f2, f1)

        self.upsample_0_1 = self._upsample(f1, f0)
        self.upsample_0_2 = self._upsample(f1, f0)
        self.upsample_0_3 = self._upsample(f1, f0)
        self.upsample_0_4 = self._upsample(f1, f0)

        # Segmentation heads, one per level-0 decoder node.
        self.final_super_0_1 = self._head(f0, self.num_classes)
        self.final_super_0_2 = self._head(f0, self.num_classes)
        self.final_super_0_3 = self._head(f0, self.num_classes)
        self.final_super_0_4 = self._head(f0, self.num_classes)

    @staticmethod
    def _upsample(in_channels, out_channels):
        """Build a transposed convolution that doubles the spatial size."""
        return nn.ConvTranspose2d(in_channels=in_channels, out_channels=out_channels,
                                  kernel_size=4, stride=2, padding=1)

    @staticmethod
    def _head(in_channels, num_classes):
        """Build a BN -> ReLU -> 3x3 Conv segmentation head."""
        return nn.Sequential(
            nn.BatchNorm2d(in_channels),
            nn.ReLU(),
            nn.Conv2d(in_channels, num_classes, 3, padding=1),
        )

    def forward(self, x):
        """Run the nested U-Net.

        Returns:
            With ``deep_supervision`` enabled, a list with the outputs of
            the heads attached to ``x_0_1``, ``x_0_2``, ``x_0_3`` and
            ``x_0_4`` (in that order); otherwise only the ``x_0_4`` output.
        """
        # Encoder column.
        x_0_0 = self.stage_0(x)
        x_1_0 = self.stage_1(self.pool(x_0_0))
        x_2_0 = self.stage_2(self.pool(x_1_0))
        x_3_0 = self.stage_3(self.pool(x_2_0))
        x_4_0 = self.stage_4(self.pool(x_3_0))

        # First nested column.
        x_0_1 = self.CONV0_1(torch.cat([self.upsample_0_1(x_1_0), x_0_0], 1))
        x_1_1 = self.CONV1_1(torch.cat([self.upsample_1_1(x_2_0), x_1_0], 1))
        x_2_1 = self.CONV2_1(torch.cat([self.upsample_2_1(x_3_0), x_2_0], 1))
        x_3_1 = self.CONV3_1(torch.cat([self.upsample_3_1(x_4_0), x_3_0], 1))

        # Deeper nested nodes; each also sees every earlier node on its level.
        x_2_2 = self.CONV2_2(torch.cat([self.upsample_2_2(x_3_1), x_2_0, x_2_1], 1))

        x_1_2 = self.CONV1_2(torch.cat([self.upsample_1_2(x_2_1), x_1_0, x_1_1], 1))
        x_1_3 = self.CONV1_3(torch.cat([self.upsample_1_3(x_2_2), x_1_0, x_1_1, x_1_2], 1))

        x_0_2 = self.CONV0_2(torch.cat([self.upsample_0_2(x_1_1), x_0_0, x_0_1], 1))
        x_0_3 = self.CONV0_3(torch.cat([self.upsample_0_3(x_1_2), x_0_0, x_0_1, x_0_2], 1))
        x_0_4 = self.CONV0_4(
            torch.cat([self.upsample_0_4(x_1_3), x_0_0, x_0_1, x_0_2, x_0_3], 1))

        if self.deep_supervision:
            return [
                self.final_super_0_1(x_0_1),
                self.final_super_0_2(x_0_2),
                self.final_super_0_3(x_0_3),
                self.final_super_0_4(x_0_4),
            ]
        return self.final_super_0_4(x_0_4)
-
-
if __name__ == "__main__":
    # Smoke test: push one random 224x224 RGB image through the network
    # on CPU and print the output shape(s) for both supervision modes.
    device = torch.device('cpu')
    inputs = torch.randn(1, 3, 224, 224).to(device)

    # Single-output mode: forward returns one tensor.
    print("deep_supervision: False")
    deep_supervision = False
    model = UnetPlusPlus(num_classes=3, deep_supervision=deep_supervision).to(device)
    outputs = model(inputs)
    print(outputs.shape)

    # Deep-supervision mode: forward returns a list of tensors.
    print("deep_supervision: True")
    deep_supervision = True
    model = UnetPlusPlus(num_classes=3, deep_supervision=deep_supervision).to(device)
    outputs = model(inputs)
    for out in outputs:
        print(out.shape)
-
-
测试结果如下
数据集准备
数据集使用Camvid数据集,可在CamVid数据集的创建和使用-pytorch中参考构建方法。
- # 导入库
- import os
- os.environ['CUDA_VISIBLE_DEVICES'] = '0'
-
- import torch
- import torch.nn as nn
- import torch.optim as optim
- import torch.nn.functional as F
- from torch import optim
- from torch.utils.data import Dataset, DataLoader, random_split
- from tqdm import tqdm
- import warnings
- warnings.filterwarnings("ignore")
- import os.path as osp
- import matplotlib.pyplot as plt
- from PIL import Image
- import numpy as np
- import albumentations as A
- from albumentations.pytorch.transforms import ToTensorV2
-
-
- torch.manual_seed(17)
- # 自定义数据集CamVidDataset
class CamVidDataset(torch.utils.data.Dataset):
    """CamVid dataset that yields ``(image, mask)`` pairs.

    Images and masks are paired by file name: every entry of ``images_dir``
    is expected to have a file with the same name in ``masks_dir``.

    Each sample is resized to 224x224, randomly flipped horizontally and
    vertically, normalized and converted to a tensor by a fixed
    albumentations pipeline.

    Args:
        images_dir (str): path to the images folder.
        masks_dir (str): path to the segmentation masks folder.
    """

    def __init__(self, images_dir, masks_dir):
        self.transform = A.Compose([
            A.Resize(224, 224),
            A.HorizontalFlip(),
            A.VerticalFlip(),
            A.Normalize(),
            ToTensorV2(),
        ])
        # os.listdir() returns entries in arbitrary order; sort them so the
        # index -> sample mapping is the same on every run and platform.
        self.ids = sorted(os.listdir(images_dir))
        self.images_fps = [os.path.join(images_dir, image_id) for image_id in self.ids]
        self.masks_fps = [os.path.join(masks_dir, image_id) for image_id in self.ids]

    def __getitem__(self, i):
        """Load, transform and return the ``i``-th ``(image, mask)`` pair."""
        # Context managers close the underlying files as soon as the pixel
        # data has been copied into numpy arrays.
        with Image.open(self.images_fps[i]) as img:
            image = np.array(img.convert('RGB'))
        with Image.open(self.masks_fps[i]) as msk:
            mask = np.array(msk.convert('RGB'))

        sample = self.transform(image=image, mask=mask)

        # Only the first channel of the mask is returned. The indexing
        # assumes the transformed mask is still laid out as H x W x C.
        return sample['image'], sample['mask'][:, :, 0]

    def __len__(self):
        """Return the number of files found in ``images_dir``."""
        return len(self.ids)
-
-
# Dataset root; adjust to your own directory layout.
# Built with os.path.join so the path also works on non-Windows systems
# (a literal backslash is not a path separator there).
DATA_DIR = os.path.join('dataset', 'camvid')
x_train_dir = os.path.join(DATA_DIR, 'train_images')
y_train_dir = os.path.join(DATA_DIR, 'train_labels')
x_valid_dir = os.path.join(DATA_DIR, 'valid_images')
y_valid_dir = os.path.join(DATA_DIR, 'valid_labels')

train_dataset = CamVidDataset(
    x_train_dir,
    y_train_dir,
)
val_dataset = CamVidDataset(
    x_valid_dir,
    y_valid_dir,
)

# Training batches are shuffled, and the last incomplete batch is dropped.
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True, drop_last=True)
# Validation keeps every sample in a fixed order so the reported accuracy
# always covers the same, complete set of samples.
val_loader = DataLoader(val_dataset, batch_size=8, shuffle=False, drop_last=False)
模型训练
from d2l import torch as d2l
from tqdm import tqdm
import pandas as pd

model = UnetPlusPlus(num_classes=33).cuda()
# Optionally resume from a saved checkpoint:
#model.load_state_dict(torch.load(r"checkpoints/Unet++_25.pth"),strict=False)

# Multi-class cross-entropy loss; targets equal to 255 are ignored.
lossf = nn.CrossEntropyLoss(ignore_index=255)
# Plain SGD optimizer with a learning rate of 0.1.
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
# Multiply the learning rate by 0.1 every 50 scheduler steps.
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=50, gamma=0.1)

# Train for 50 epochs.
epochs_num = 50
def train_ch13(net, train_iter, test_iter, loss, trainer, num_epochs,scheduler,
               devices=d2l.try_all_gpus()):
    """Train ``net``, logging metrics and saving periodic checkpoints.

    After every epoch the accumulated per-epoch metrics are written to
    ``savefile/Unet++_camvid1.xlsx``. Every fifth epoch the weights are
    saved to ``checkpoints/Unet++_{epoch}.pth``.

    Args:
        net: the model to train; it is wrapped in ``nn.DataParallel``.
        train_iter: iterable of ``(features, labels)`` training batches.
        test_iter: iterable passed to ``d2l.evaluate_accuracy_gpu``.
        loss: loss function forwarded to ``d2l.train_batch_ch13``.
        trainer: optimizer forwarded to ``d2l.train_batch_ch13``.
        num_epochs: number of epochs to run.
        scheduler: learning-rate scheduler, stepped once per epoch.
        devices: devices for ``nn.DataParallel``; the first one hosts the
            model.
    """
    import os

    timer, num_batches = d2l.Timer(), len(train_iter)
    animator = d2l.Animator(xlabel='epoch', xlim=[1, num_epochs], ylim=[0, 1],
                            legend=['train loss', 'train acc', 'test acc'])
    net = nn.DataParallel(net, device_ids=devices).to(devices[0])

    # Create the output directories up front so the first save cannot fail
    # after a whole epoch of training has already been spent.
    os.makedirs("savefile", exist_ok=True)
    os.makedirs("checkpoints", exist_ok=True)

    # Update the plot about five times per epoch. max() guards against a
    # modulo-by-zero when there are fewer than five batches.
    plot_every = max(1, num_batches // 5)

    loss_list = []
    train_acc_list = []
    test_acc_list = []
    epochs_list = []
    time_list = []
    for epoch in range(num_epochs):
        # Sum of training loss, sum of training accuracy, no. of examples,
        # no. of predictions
        metric = d2l.Accumulator(4)
        for i, (features, labels) in enumerate(train_iter):
            timer.start()
            l, acc = d2l.train_batch_ch13(
                net, features, labels.long(), loss, trainer, devices)
            metric.add(l, acc, labels.shape[0], labels.numel())
            timer.stop()
            if (i + 1) % plot_every == 0 or i == num_batches - 1:
                animator.add(epoch + (i + 1) / num_batches,
                             (metric[0] / metric[2], metric[1] / metric[3],
                              None))
        test_acc = d2l.evaluate_accuracy_gpu(net, test_iter)
        animator.add(epoch + 1, (None, None, test_acc))
        scheduler.step()

        epoch_loss = metric[0] / metric[2]
        epoch_train_acc = metric[1] / metric[3]
        print(f"epoch {epoch+1} --- loss {epoch_loss:.3f} --- train acc {epoch_train_acc:.3f} --- test acc {test_acc:.3f} --- cost time {timer.sum()}")

        # --------- save the training history ---------
        loss_list.append(epoch_loss)
        train_acc_list.append(epoch_train_acc)
        test_acc_list.append(test_acc)
        epochs_list.append(epoch)
        time_list.append(timer.sum())

        # The sheet is rewritten every epoch so an interrupted run still
        # leaves the history collected so far on disk.
        df = pd.DataFrame({
            'epoch': epochs_list,
            'loss': loss_list,
            'train_acc': train_acc_list,
            'test_acc': test_acc_list,
            'time': time_list,
        })
        df.to_excel("savefile/Unet++_camvid1.xlsx")

        # --------- save a checkpoint every 5 epochs ---------
        if (epoch + 1) % 5 == 0:
            # Save the network that was passed in, not a module-level global.
            # ``net.module`` is the model wrapped by DataParallel, so the
            # keys carry no ``module.`` prefix.
            torch.save(net.module.state_dict(), f'checkpoints/Unet++_{epoch+1}.pth')
开始训练
# Start training: fit `model` on `train_loader`, evaluate on `val_loader`
# after each epoch, for `epochs_num` epochs.
train_ch13(model, train_loader, val_loader, lossf, optimizer, epochs_num,scheduler)