FCN图像语义分割
FCN全卷积网络,舍弃了传统的全连接层,仅使用卷积和池化等操作完成,是end-to-end的像素级预测网络。
网络特点:
- 不含全连接层(fc)的全卷积(fully conv)网络,可适应任意尺寸输入。
- 增大数据尺寸的反卷积(deconv)层,能够输出精细的结果。
- 结合不同深度层结果的跳级(skip)结构,同时确保鲁棒性和准确性。
语义分割
图像语义分割(semantic segmentation)是图像处理和机器视觉技术中关于图像理解的重要一环,也是AI领域中一个重要分支,常被应用于人脸识别、物体检测、医学影像、卫星图像分析、自动驾驶感知等领域。
这个就不用过多赘述了,目标检测是画框,语义分割就是把目标从图里面抠出来。
1. 数据处理
首先下载数据
# Download and extract the FCN-8s training dataset archive.
from download import download

url = "https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/datasets/dataset_fcn8s.tar"
download(url, "./dataset", kind="tar", replace=True)
复制代码 数据预处理
- import numpy as np
- import cv2
- import mindspore.dataset as ds
class SegDataset:
    """Training dataset wrapper for FCN-8s with random scale/crop/flip augmentation.

    Reads MindRecord samples, decodes JPEG/PNG bytes with OpenCV, applies
    random rescaling, mean/std normalization, padding, random cropping and
    random horizontal flipping, and returns batched (data, label) pairs.
    """

    def __init__(self,
                 image_mean,
                 image_std,
                 data_file='',
                 batch_size=32,
                 crop_size=512,
                 max_scale=2.0,
                 min_scale=0.5,
                 ignore_label=255,
                 num_classes=21,
                 num_readers=2,
                 num_parallel_calls=4):
        # Dataset parameters: file path, batch size, crop size, scale range,
        # ignore label, number of classes, reader threads and parallel map calls.
        self.data_file = data_file
        self.batch_size = batch_size
        self.crop_size = crop_size
        self.image_mean = np.array(image_mean, dtype=np.float32)
        self.image_std = np.array(image_std, dtype=np.float32)
        self.max_scale = max_scale
        self.min_scale = min_scale
        self.ignore_label = ignore_label
        self.num_classes = num_classes
        self.num_readers = num_readers
        self.num_parallel_calls = num_parallel_calls
        # BUG FIX: the original had a bare no-op expression `max_scale > min_scale`
        # (evaluated and discarded). Validate the scale range explicitly.
        if self.max_scale <= self.min_scale:
            raise ValueError('max_scale must be greater than min_scale')

    def preprocess_dataset(self, image, label):
        """Decode, randomly scale, normalize, pad, crop and flip one sample.

        Args:
            image: raw encoded image bytes.
            label: raw encoded grayscale label-map bytes.

        Returns:
            (image_out, label_out): float32 CHW image and int32 HW label map,
            both of spatial size crop_size x crop_size.
        """
        image_out = cv2.imdecode(np.frombuffer(image, dtype=np.uint8), cv2.IMREAD_COLOR)
        label_out = cv2.imdecode(np.frombuffer(label, dtype=np.uint8), cv2.IMREAD_GRAYSCALE)
        # Random scale in [min_scale, max_scale); nearest-neighbor for labels
        # to avoid creating interpolated (invalid) class ids.
        sc = np.random.uniform(self.min_scale, self.max_scale)
        new_h, new_w = int(sc * image_out.shape[0]), int(sc * image_out.shape[1])
        image_out = cv2.resize(image_out, (new_w, new_h), interpolation=cv2.INTER_CUBIC)
        label_out = cv2.resize(label_out, (new_w, new_h), interpolation=cv2.INTER_NEAREST)
        # Standardize: subtract per-channel mean, divide by per-channel std.
        image_out = (image_out - self.image_mean) / self.image_std
        # Pad up to at least crop_size; padded label pixels get ignore_label so
        # the loss skips them.
        out_h, out_w = max(new_h, self.crop_size), max(new_w, self.crop_size)
        pad_h, pad_w = out_h - new_h, out_w - new_w
        if pad_h > 0 or pad_w > 0:
            image_out = cv2.copyMakeBorder(image_out, 0, pad_h, 0, pad_w,
                                           cv2.BORDER_CONSTANT, value=0)
            label_out = cv2.copyMakeBorder(label_out, 0, pad_h, 0, pad_w,
                                           cv2.BORDER_CONSTANT, value=self.ignore_label)
        # Random crop_size x crop_size crop.
        offset_h = np.random.randint(0, out_h - self.crop_size + 1)
        offset_w = np.random.randint(0, out_w - self.crop_size + 1)
        image_out = image_out[offset_h: offset_h + self.crop_size,
                              offset_w: offset_w + self.crop_size, :]
        label_out = label_out[offset_h: offset_h + self.crop_size,
                              offset_w: offset_w + self.crop_size]
        # Random horizontal flip with probability 0.5.
        if np.random.uniform(0.0, 1.0) > 0.5:
            image_out = image_out[:, ::-1, :]
            label_out = label_out[:, ::-1]
        # HWC -> CHW for the network; copy to make the strided views contiguous.
        image_out = image_out.transpose((2, 0, 1))
        image_out = image_out.copy()
        label_out = label_out.copy()
        label_out = label_out.astype("int32")
        return image_out, label_out

    def get_dataset(self):
        """Build the MindSpore pipeline: read MindRecord, map, shuffle, batch."""
        ds.config.set_numa_enable(True)
        dataset = ds.MindDataset(self.data_file, columns_list=["data", "label"],
                                 shuffle=True, num_parallel_workers=self.num_readers)
        transforms_list = self.preprocess_dataset
        dataset = dataset.map(operations=transforms_list, input_columns=["data", "label"],
                              output_columns=["data", "label"],
                              num_parallel_workers=self.num_parallel_calls)
        dataset = dataset.shuffle(buffer_size=self.batch_size * 10)
        dataset = dataset.batch(self.batch_size, drop_remainder=True)
        return dataset
# Dataset-creation parameters (ImageNet-style BGR mean/std).
IMAGE_MEAN = [103.53, 116.28, 123.675]
IMAGE_STD = [57.375, 57.120, 58.395]
DATA_FILE = "dataset/dataset_fcn8s/mindname.mindrecord"

# Model training parameters.
train_batch_size = 4
crop_size = 512
min_scale = 0.5
max_scale = 2.0
ignore_label = 255
num_classes = 21

# Instantiate the dataset wrapper, then materialize the pipeline.
dataset = SegDataset(image_mean=IMAGE_MEAN,
                     image_std=IMAGE_STD,
                     data_file=DATA_FILE,
                     batch_size=train_batch_size,
                     crop_size=crop_size,
                     max_scale=max_scale,
                     min_scale=min_scale,
                     ignore_label=ignore_label,
                     num_classes=num_classes,
                     num_readers=2,
                     num_parallel_calls=4)
dataset = dataset.get_dataset()
复制代码 2. 构建网络
- import mindspore.nn as nn
class FCN8s(nn.Cell):
    """FCN-8s semantic segmentation network with a VGG-16 style backbone.

    Skip connections fuse the pool3 (1/8) and pool4 (1/16) feature maps with
    the stride-32 class scores; three transposed convolutions (2x, 2x, 8x)
    upsample the fused scores back to the input resolution.
    """

    @staticmethod
    def _conv_block(channel_pairs):
        """Build a SequentialCell of Conv3x3 -> BatchNorm -> ReLU stages.

        `channel_pairs` is a sequence of (in_channels, out_channels) tuples,
        one per convolution stage.
        """
        stages = []
        for cin, cout in channel_pairs:
            stages.append(nn.Conv2d(in_channels=cin, out_channels=cout,
                                    kernel_size=3, weight_init='xavier_uniform'))
            stages.append(nn.BatchNorm2d(cout))
            stages.append(nn.ReLU())
        return nn.SequentialCell(stages)

    def __init__(self, n_class):
        super().__init__()
        self.n_class = n_class
        # VGG-16 backbone: five conv blocks, each followed by 2x max pooling.
        self.conv1 = self._conv_block([(3, 64), (64, 64)])
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = self._conv_block([(64, 128), (128, 128)])
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv3 = self._conv_block([(128, 256), (256, 256), (256, 256)])
        self.pool3 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv4 = self._conv_block([(256, 512), (512, 512), (512, 512)])
        self.pool4 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv5 = self._conv_block([(512, 512), (512, 512), (512, 512)])
        self.pool5 = nn.MaxPool2d(kernel_size=2, stride=2)
        # fc6/fc7 of VGG turned into convolutions (7x7 and 1x1).
        self.conv6 = nn.SequentialCell(
            nn.Conv2d(in_channels=512, out_channels=4096,
                      kernel_size=7, weight_init='xavier_uniform'),
            nn.BatchNorm2d(4096),
            nn.ReLU(),
        )
        self.conv7 = nn.SequentialCell(
            nn.Conv2d(in_channels=4096, out_channels=4096,
                      kernel_size=1, weight_init='xavier_uniform'),
            nn.BatchNorm2d(4096),
            nn.ReLU(),
        )
        # Per-pixel class scoring plus upsampling / skip-fusion layers.
        self.score_fr = nn.Conv2d(in_channels=4096, out_channels=self.n_class,
                                  kernel_size=1, weight_init='xavier_uniform')
        self.upscore2 = nn.Conv2dTranspose(in_channels=self.n_class, out_channels=self.n_class,
                                           kernel_size=4, stride=2, weight_init='xavier_uniform')
        self.score_pool4 = nn.Conv2d(in_channels=512, out_channels=self.n_class,
                                     kernel_size=1, weight_init='xavier_uniform')
        self.upscore_pool4 = nn.Conv2dTranspose(in_channels=self.n_class, out_channels=self.n_class,
                                                kernel_size=4, stride=2, weight_init='xavier_uniform')
        self.score_pool3 = nn.Conv2d(in_channels=256, out_channels=self.n_class,
                                     kernel_size=1, weight_init='xavier_uniform')
        self.upscore8 = nn.Conv2dTranspose(in_channels=self.n_class, out_channels=self.n_class,
                                           kernel_size=16, stride=8, weight_init='xavier_uniform')

    def construct(self, x):
        """Forward pass: returns per-class score maps at input resolution."""
        feat1 = self.conv1(x)
        down1 = self.pool1(feat1)
        feat2 = self.conv2(down1)
        down2 = self.pool2(feat2)
        feat3 = self.conv3(down2)
        down3 = self.pool3(feat3)   # 1/8 resolution, skip source for fusion
        feat4 = self.conv4(down3)
        down4 = self.pool4(feat4)   # 1/16 resolution, skip source for fusion
        feat5 = self.conv5(down4)
        down5 = self.pool5(feat5)   # 1/32 resolution
        fc6 = self.conv6(down5)
        fc7 = self.conv7(fc6)
        score = self.score_fr(fc7)
        # 2x upsample, fuse with pool4 scores; 2x again, fuse with pool3
        # scores; final 8x upsample back to input size.
        up_2x = self.upscore2(score)
        fused_16 = self.score_pool4(down4) + up_2x
        up_4x = self.upscore_pool4(fused_16)
        fused_8 = self.score_pool3(down3) + up_4x
        return self.upscore8(fused_8)
复制代码 3. 训练准备
下载预训练VGG-16模型
- from download import download
- from mindspore import load_checkpoint, load_param_into_net
# Fetch the pretrained VGG-16 checkpoint used to initialize the backbone.
url = "https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/datasets/fcn8s_vgg16_pretrain.ckpt"
download(url, "fcn8s_vgg16_pretrain.ckpt", replace=True)
def load_vgg16(network=None):
    """Load pretrained VGG-16 weights into `network`.

    Args:
        network: target Cell to load parameters into. Defaults to the
            module-level `net` for backward compatibility with the original
            zero-argument call sites (the original silently depended on that
            global).
    """
    ckpt_vgg16 = "fcn8s_vgg16_pretrain.ckpt"
    param_vgg = load_checkpoint(ckpt_vgg16)
    if network is None:
        # NOTE(review): falls back to the module-level `net`, which must be
        # defined before this function is called.
        network = net
    load_param_into_net(network, param_vgg)
复制代码 评估函数定义
- import numpy as np
- import mindspore as ms
- import mindspore.nn as nn
- import mindspore.train as train
class PixelAccuracy(train.Metric):
    """Pixel accuracy (PA): correctly classified pixels / all valid pixels."""

    def __init__(self, num_class=21):
        super(PixelAccuracy, self).__init__()
        self.num_class = num_class
        # Initialize the accumulator so the metric is well-defined even if
        # clear() has not been called yet.
        self.confusion_matrix = np.zeros((self.num_class,) * 2)

    def _generate_matrix(self, gt_image, pre_image):
        # Keep only valid pixels: the ignore label (255) falls outside
        # [0, num_class) and is masked out.
        mask = (gt_image >= 0) & (gt_image < self.num_class)
        label = self.num_class * gt_image[mask].astype('int') + pre_image[mask]
        count = np.bincount(label, minlength=self.num_class**2)
        confusion_matrix = count.reshape(self.num_class, self.num_class)
        return confusion_matrix

    def clear(self):
        """Reset the accumulated confusion matrix."""
        self.confusion_matrix = np.zeros((self.num_class,) * 2)

    def update(self, *inputs):
        """Accumulate one batch; inputs[0] is logits (N, C, H, W), inputs[1] labels."""
        y_pred = inputs[0].asnumpy().argmax(axis=1)
        # GENERALIZATION: reshape labels to match the prediction shape instead
        # of hard-coding (4, 512, 512), so any batch/crop size works.
        y = inputs[1].asnumpy().reshape(y_pred.shape)
        self.confusion_matrix += self._generate_matrix(y, y_pred)

    def eval(self):
        """Return overall pixel accuracy from the accumulated matrix."""
        pixel_accuracy = np.diag(self.confusion_matrix).sum() / self.confusion_matrix.sum()
        return pixel_accuracy
class PixelAccuracyClass(train.Metric):
    """Mean pixel accuracy (MPA): per-class accuracy averaged over classes."""

    def __init__(self, num_class=21):
        super(PixelAccuracyClass, self).__init__()
        self.num_class = num_class
        # Initialize the accumulator so the metric is well-defined even if
        # clear() has not been called yet.
        self.confusion_matrix = np.zeros((self.num_class,) * 2)

    def _generate_matrix(self, gt_image, pre_image):
        # Keep only valid pixels: the ignore label (255) falls outside
        # [0, num_class) and is masked out.
        mask = (gt_image >= 0) & (gt_image < self.num_class)
        label = self.num_class * gt_image[mask].astype('int') + pre_image[mask]
        count = np.bincount(label, minlength=self.num_class**2)
        confusion_matrix = count.reshape(self.num_class, self.num_class)
        return confusion_matrix

    def update(self, *inputs):
        """Accumulate one batch; inputs[0] is logits (N, C, H, W), inputs[1] labels."""
        y_pred = inputs[0].asnumpy().argmax(axis=1)
        # GENERALIZATION: reshape labels to match the prediction shape instead
        # of hard-coding (4, 512, 512), so any batch/crop size works.
        y = inputs[1].asnumpy().reshape(y_pred.shape)
        self.confusion_matrix += self._generate_matrix(y, y_pred)

    def clear(self):
        """Reset the accumulated confusion matrix."""
        self.confusion_matrix = np.zeros((self.num_class,) * 2)

    def eval(self):
        """Return the class-averaged pixel accuracy (NaN classes are skipped)."""
        mean_pixel_accuracy = np.diag(self.confusion_matrix) / self.confusion_matrix.sum(axis=1)
        mean_pixel_accuracy = np.nanmean(mean_pixel_accuracy)
        return mean_pixel_accuracy
class MeanIntersectionOverUnion(train.Metric):
    """Mean intersection over union (mIoU) across classes."""

    def __init__(self, num_class=21):
        super(MeanIntersectionOverUnion, self).__init__()
        self.num_class = num_class
        # Initialize the accumulator so the metric is well-defined even if
        # clear() has not been called yet.
        self.confusion_matrix = np.zeros((self.num_class,) * 2)

    def _generate_matrix(self, gt_image, pre_image):
        # Keep only valid pixels: the ignore label (255) falls outside
        # [0, num_class) and is masked out.
        mask = (gt_image >= 0) & (gt_image < self.num_class)
        label = self.num_class * gt_image[mask].astype('int') + pre_image[mask]
        count = np.bincount(label, minlength=self.num_class**2)
        confusion_matrix = count.reshape(self.num_class, self.num_class)
        return confusion_matrix

    def update(self, *inputs):
        """Accumulate one batch; inputs[0] is logits (N, C, H, W), inputs[1] labels."""
        y_pred = inputs[0].asnumpy().argmax(axis=1)
        # GENERALIZATION: reshape labels to match the prediction shape instead
        # of hard-coding (4, 512, 512), so any batch/crop size works.
        y = inputs[1].asnumpy().reshape(y_pred.shape)
        self.confusion_matrix += self._generate_matrix(y, y_pred)

    def clear(self):
        """Reset the accumulated confusion matrix."""
        self.confusion_matrix = np.zeros((self.num_class,) * 2)

    def eval(self):
        """Return mIoU: diag / (row_sum + col_sum - diag), NaN classes skipped."""
        mean_iou = np.diag(self.confusion_matrix) / (
            np.sum(self.confusion_matrix, axis=1) + np.sum(self.confusion_matrix, axis=0) -
            np.diag(self.confusion_matrix))
        mean_iou = np.nanmean(mean_iou)
        return mean_iou
class FrequencyWeightedIntersectionOverUnion(train.Metric):
    """Frequency-weighted IoU (FWIoU): per-class IoU weighted by class frequency."""

    def __init__(self, num_class=21):
        super(FrequencyWeightedIntersectionOverUnion, self).__init__()
        self.num_class = num_class
        # Initialize the accumulator so the metric is well-defined even if
        # clear() has not been called yet.
        self.confusion_matrix = np.zeros((self.num_class,) * 2)

    def _generate_matrix(self, gt_image, pre_image):
        # Keep only valid pixels: the ignore label (255) falls outside
        # [0, num_class) and is masked out.
        mask = (gt_image >= 0) & (gt_image < self.num_class)
        label = self.num_class * gt_image[mask].astype('int') + pre_image[mask]
        count = np.bincount(label, minlength=self.num_class**2)
        confusion_matrix = count.reshape(self.num_class, self.num_class)
        return confusion_matrix

    def update(self, *inputs):
        """Accumulate one batch; inputs[0] is logits (N, C, H, W), inputs[1] labels."""
        y_pred = inputs[0].asnumpy().argmax(axis=1)
        # GENERALIZATION: reshape labels to match the prediction shape instead
        # of hard-coding (4, 512, 512), so any batch/crop size works.
        y = inputs[1].asnumpy().reshape(y_pred.shape)
        self.confusion_matrix += self._generate_matrix(y, y_pred)

    def clear(self):
        """Reset the accumulated confusion matrix."""
        self.confusion_matrix = np.zeros((self.num_class,) * 2)

    def eval(self):
        """Return FWIoU: sum over classes of frequency * IoU (empty classes skipped)."""
        freq = np.sum(self.confusion_matrix, axis=1) / np.sum(self.confusion_matrix)
        iu = np.diag(self.confusion_matrix) / (
            np.sum(self.confusion_matrix, axis=1) + np.sum(self.confusion_matrix, axis=0) -
            np.diag(self.confusion_matrix))
        frequency_weighted_iou = (freq[freq > 0] * iu[freq > 0]).sum()
        return frequency_weighted_iou
复制代码 模型训练
- import mindspore
- from mindspore import Tensor
- import mindspore.nn as nn
- from mindspore.train import ModelCheckpoint, CheckpointConfig, LossMonitor, TimeMonitor, Model
# Run in PyNative mode on the configured device.
device_target = "Ascend"
mindspore.set_context(mode=mindspore.PYNATIVE_MODE, device_target=device_target)

train_batch_size = 4
num_classes = 21

# Build the network and load the pretrained VGG-16 backbone weights.
net = FCN8s(n_class=21)
load_vgg16()

# Learning-rate schedule: cosine decay from base_lr to min_lr; the tutorial
# then trains with the constant final value of the schedule.
min_lr = 0.0005
base_lr = 0.05
train_epochs = 1
iters_per_epoch = dataset.get_dataset_size()
total_step = iters_per_epoch * train_epochs

lr_scheduler = mindspore.nn.cosine_decay_lr(min_lr,
                                            base_lr,
                                            total_step,
                                            iters_per_epoch,
                                            decay_epoch=2)
lr = Tensor(lr_scheduler[-1])

# Loss: per-pixel cross-entropy, skipping the padding/void label 255.
loss = nn.CrossEntropyLoss(ignore_index=255)

# Optimizer.
optimizer = nn.Momentum(params=net.trainable_params(), learning_rate=lr,
                        momentum=0.9, weight_decay=0.0001)

# Dynamic loss scaling for mixed-precision training on Ascend.
scale_factor = 4
scale_window = 3000
# BUG FIX: the original called `ms.amp.DynamicLossScaleManager(...)` but only
# `import mindspore` is in scope (`ms` was undefined -> NameError).
loss_scale_manager = mindspore.amp.DynamicLossScaleManager(scale_factor, scale_window)

# Initialize the Model; only one branch runs, so sharing the metric instances
# between branches is equivalent to the original duplicated dicts.
train_metrics = {"pixel accuracy": PixelAccuracy(),
                 "mean pixel accuracy": PixelAccuracyClass(),
                 "mean IoU": MeanIntersectionOverUnion(),
                 "frequency weighted IoU": FrequencyWeightedIntersectionOverUnion()}
if device_target == "Ascend":
    model = Model(net, loss_fn=loss, optimizer=optimizer,
                  loss_scale_manager=loss_scale_manager, metrics=train_metrics)
else:
    model = Model(net, loss_fn=loss, optimizer=optimizer, metrics=train_metrics)

# Monitoring and checkpoint callbacks.
time_callback = TimeMonitor(data_size=iters_per_epoch)
loss_callback = LossMonitor()
callbacks = [time_callback, loss_callback]
save_steps = 330
keep_checkpoint_max = 5
# CONSISTENCY FIX: the original defined save_steps = 330 but hard-coded
# save_checkpoint_steps=10; use the named constant.
config_ckpt = CheckpointConfig(save_checkpoint_steps=save_steps,
                               keep_checkpoint_max=keep_checkpoint_max)
ckpt_callback = ModelCheckpoint(prefix="FCN8s",
                                directory="./ckpt",
                                config=config_ckpt)
callbacks.append(ckpt_callback)
model.train(train_epochs, dataset, callbacks=callbacks)
复制代码 4. 模型评估
# Evaluation uses the same normalization constants and data file as training.
IMAGE_MEAN = [103.53, 116.28, 123.675]
IMAGE_STD = [57.375, 57.120, 58.395]
DATA_FILE = "dataset/dataset_fcn8s/mindname.mindrecord"

# Download the fully trained FCN-8s weights and load them into a fresh network.
url = "https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/datasets/FCN8s.ckpt"
download(url, "FCN8s.ckpt", replace=True)

net = FCN8s(n_class=num_classes)
ckpt_file = "FCN8s.ckpt"
param_dict = load_checkpoint(ckpt_file)
load_param_into_net(net, param_dict)

# Rebuild the Model with the segmentation metrics; only one branch executes.
if device_target == "Ascend":
    model = Model(net, loss_fn=loss, optimizer=optimizer,
                  loss_scale_manager=loss_scale_manager,
                  metrics={"pixel accuracy": PixelAccuracy(),
                           "mean pixel accuracy": PixelAccuracyClass(),
                           "mean IoU": MeanIntersectionOverUnion(),
                           "frequency weighted IoU": FrequencyWeightedIntersectionOverUnion()})
else:
    model = Model(net, loss_fn=loss, optimizer=optimizer,
                  metrics={"pixel accuracy": PixelAccuracy(),
                           "mean pixel accuracy": PixelAccuracyClass(),
                           "mean IoU": MeanIntersectionOverUnion(),
                           "frequency weighted IoU": FrequencyWeightedIntersectionOverUnion()})

# Instantiate the evaluation dataset pipeline and run evaluation.
dataset = SegDataset(image_mean=IMAGE_MEAN,
                     image_std=IMAGE_STD,
                     data_file=DATA_FILE,
                     batch_size=train_batch_size,
                     crop_size=crop_size,
                     max_scale=max_scale,
                     min_scale=min_scale,
                     ignore_label=ignore_label,
                     num_classes=num_classes,
                     num_readers=2,
                     num_parallel_calls=4)
dataset_eval = dataset.get_dataset()
model.eval(dataset_eval)
复制代码 5. 模型推理
- import cv2
- import matplotlib.pyplot as plt
# Load the trained weights into a fresh network for inference.
net = FCN8s(n_class=num_classes)
ckpt_file = "FCN8s.ckpt"
param_dict = load_checkpoint(ckpt_file)
load_param_into_net(net, param_dict)

eval_batch_size = 4
img_lst = []
mask_lst = []
res_lst = []

# Show inference results (top row: input images, bottom row: predicted masks).
plt.figure(figsize=(8, 5))
show_data = next(dataset_eval.create_dict_iterator())
show_images = show_data["data"].asnumpy()
# GENERALIZATION FIX: derive the label reshape from eval_batch_size and the
# global crop_size instead of hard-coding (4, 512, 512).
mask_images = show_data["label"].reshape([eval_batch_size, crop_size, crop_size])
# Clamp normalized images into the displayable [0, 1] range.
show_images = np.clip(show_images, 0, 1)
for i in range(eval_batch_size):
    img_lst.append(show_images[i])
    mask_lst.append(mask_images[i])
# Per-pixel class prediction: argmax over the class dimension.
res = net(show_data["data"]).asnumpy().argmax(axis=1)
for i in range(eval_batch_size):
    plt.subplot(2, 4, i + 1)
    plt.imshow(img_lst[i].transpose(1, 2, 0))
    plt.axis("off")
    plt.subplots_adjust(wspace=0.05, hspace=0.02)
    plt.subplot(2, 4, i + 5)
    plt.imshow(res[i])
    plt.axis("off")
    plt.subplots_adjust(wspace=0.05, hspace=0.02)
plt.show()
复制代码 打卡:
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |