根本介绍
今天要学习的内容是计算机视觉范畴中的目标检测任务。与图像分类相比,目标检测更难,因为目标检测不但要检测出图片中的物体的类别,还要检测出该物体的位置。现主流的目标检测算法大致可分为两种,一种是基于CNN的,另一种是基于Transformer的。基于CNN的还可以细分为三种,以Faster R-CNN为代表的一阶段目标检测,以Yolo为代表的二阶段目标检测,以及Anchor-free算法。本文会先简单介绍一下SSD模型,然后在MindSpore框架下,使用COCO2017数据集训练SSD模型,并进行模型评估。
SSD模型简介
SSD与Yolo一样,都是一阶段目标检测算法,是直接通过主干网络给出类别位置信息,不须要地区天生。此外,SSD通过卷积神经网络进行特征提取,取不同的特征层进行检测输出,所以SSD是一种多尺度的检测方法,在须要检测的特征层,直接使用一个3 ×× 3卷积,进行通道的变换。SSD的框架图如下图所示:
SSD采用VGG16作为底子模型,然后在VGG16的底子上新增了卷积层来获得更多的特征图以用于检测。如果VGG16,也可以使用ResNet19,ResNet50等作为底子模型。SSD的网络布局如下图所示
可大致分为四个模块:VGG base Layer,Extra Feature Layer,Detection Layer、NMS
如下图所示,VGG base Layer也就是backbone layer
输入图像颠末预处理后巨细固定为300×300,起首颠末backbone,本案例中使用的是VGG16网络的前13个卷积层,然后分别将VGG16的全连接层fc6和fc7转换成3 ×× 3卷积层block6和1 ×× 1卷积层block7,进一步提取特征。 在block6中,使用了空洞数为6的空洞卷积,其padding也为6,这样做同样也是为了增加感受野的同时保持参数目与特征图尺寸的不变
Extra Feature Layer是在VGG16的底子增加的特征提取层,用于提取更高层的语义信息,详细布局如下:
block8-11,用于更高语义信息的提取。block8的通道数为512,而block9、block10与block11的通道数都为256。从block7到block11,这5个卷积后输出特征图的尺寸依次为19×19、10×10、5×5、3×3和1×1。为了低落参数目,使用了1×1卷积先低落通道数为该层输出通道数的一半,再利用3×3卷积进行特征提取
Detection Layer负责类别和位置推测,这须要借助anchor实现。SSD模型一共有6个推测特征图,对于此中一个尺寸为m*n,通道为p的推测特征图,假设其每个像素点会产生k个anchor,每个anchor会对应c个类别和4个回归偏移量,使用(4+c)k个尺寸为3x3,通道为p的卷积核对该推测特征图进行卷积操作,得到尺寸为m*n,通道为(4+c)m*k的输出特征图,它包含了推测特征图上所产生的每个anchor的回归偏移量和各类别概率分数。
NMS即非极大抑制法,训练过程不用,只用在推理过程。其算法流程如下
SSD代码实践
数据集预备
我们将会使用COCO2017数据集进行训练,COCO2017数据集很轻易下载,加载到内存的操作也很简单,我们将重点放在SSD模型的数据增强方法。为了使模型对于各种输入对象巨细和形状更加鲁棒,SSD算法每个训练图像通过以下选项之一随机采样:
- 使用整个原始输入图像
- 采样一个地区,使采样地区和原始图片最小的交并比重叠为0.1,0.3,0.5,0.7或0.9
- 随机采样一个地区
每个采样地区的巨细为原始图像巨细的[0.3,1],长宽比在1/2和2之间。如果真实标签框中央在采样地区内,则保存两者重叠部门作为新图片的真实标注框。在上述采样步调之后,将每个采样地区巨细调整为固定巨细,并以0.5的概率程度翻转。其代码实现如下:
- import cv2
- import numpy as np
- def _rand(a=0., b=1.):
- return np.random.rand() * (b - a) + a
- def intersect(box_a, box_b):
- """Compute the intersect of two sets of boxes."""
- max_yx = np.minimum(box_a[:, 2:4], box_b[2:4])
- min_yx = np.maximum(box_a[:, :2], box_b[:2])
- inter = np.clip((max_yx - min_yx), a_min=0, a_max=np.inf)
- return inter[:, 0] * inter[:, 1]
- def jaccard_numpy(box_a, box_b):
- """Compute the jaccard overlap of two sets of boxes."""
- inter = intersect(box_a, box_b)
- area_a = ((box_a[:, 2] - box_a[:, 0]) *
- (box_a[:, 3] - box_a[:, 1]))
- area_b = ((box_b[2] - box_b[0]) *
- (box_b[3] - box_b[1]))
- union = area_a + area_b - inter
- return inter / union
- def random_sample_crop(image, boxes):
- """Crop images and boxes randomly."""
- height, width, _ = image.shape
- min_iou = np.random.choice([None, 0.1, 0.3, 0.5, 0.7, 0.9])
- if min_iou is None:
- return image, boxes
- for _ in range(50):
- image_t = image
- w = _rand(0.3, 1.0) * width
- h = _rand(0.3, 1.0) * height
- # aspect ratio constraint b/t .5 & 2
- if h / w < 0.5 or h / w > 2:
- continue
- left = _rand() * (width - w)
- top = _rand() * (height - h)
- rect = np.array([int(top), int(left), int(top + h), int(left + w)])
- overlap = jaccard_numpy(boxes, rect)
- # dropout some boxes
- drop_mask = overlap > 0
- if not drop_mask.any():
- continue
- if overlap[drop_mask].min() < min_iou and overlap[drop_mask].max() > (min_iou + 0.2):
- continue
- image_t = image_t[rect[0]:rect[2], rect[1]:rect[3], :]
- centers = (boxes[:, :2] + boxes[:, 2:4]) / 2.0
- m1 = (rect[0] < centers[:, 0]) * (rect[1] < centers[:, 1])
- m2 = (rect[2] > centers[:, 0]) * (rect[3] > centers[:, 1])
- # mask in that both m1 and m2 are true
- mask = m1 * m2 * drop_mask
- # have any valid boxes? try again if not
- if not mask.any():
- continue
- # take only matching gt boxes
- boxes_t = boxes[mask, :].copy()
- boxes_t[:, :2] = np.maximum(boxes_t[:, :2], rect[:2])
- boxes_t[:, :2] -= rect[:2]
- boxes_t[:, 2:4] = np.minimum(boxes_t[:, 2:4], rect[2:4])
- boxes_t[:, 2:4] -= rect[:2]
- return image_t, boxes_t
- return image, boxes
- def ssd_bboxes_encode(boxes):
- """Labels anchors with ground truth inputs."""
- def jaccard_with_anchors(bbox):
- """Compute jaccard score a box and the anchors."""
- # Intersection bbox and volume.
- ymin = np.maximum(y1, bbox[0])
- xmin = np.maximum(x1, bbox[1])
- ymax = np.minimum(y2, bbox[2])
- xmax = np.minimum(x2, bbox[3])
- w = np.maximum(xmax - xmin, 0.)
- h = np.maximum(ymax - ymin, 0.)
- # Volumes.
- inter_vol = h * w
- union_vol = vol_anchors + (bbox[2] - bbox[0]) * (bbox[3] - bbox[1]) - inter_vol
- jaccard = inter_vol / union_vol
- return np.squeeze(jaccard)
- pre_scores = np.zeros((8732), dtype=np.float32)
- t_boxes = np.zeros((8732, 4), dtype=np.float32)
- t_label = np.zeros((8732), dtype=np.int64)
- for bbox in boxes:
- label = int(bbox[4])
- scores = jaccard_with_anchors(bbox)
- idx = np.argmax(scores)
- scores[idx] = 2.0
- mask = (scores > matching_threshold)
- mask = mask & (scores > pre_scores)
- pre_scores = np.maximum(pre_scores, scores * mask)
- t_label = mask * label + (1 - mask) * t_label
- for i in range(4):
- t_boxes[:, i] = mask * bbox[i] + (1 - mask) * t_boxes[:, i]
- index = np.nonzero(t_label)
- # Transform to tlbr.
- bboxes = np.zeros((8732, 4), dtype=np.float32)
- bboxes[:, [0, 1]] = (t_boxes[:, [0, 1]] + t_boxes[:, [2, 3]]) / 2
- bboxes[:, [2, 3]] = t_boxes[:, [2, 3]] - t_boxes[:, [0, 1]]
- # Encode features.
- bboxes_t = bboxes[index]
- default_boxes_t = default_boxes[index]
- bboxes_t[:, :2] = (bboxes_t[:, :2] - default_boxes_t[:, :2]) / (default_boxes_t[:, 2:] * 0.1)
- tmp = np.maximum(bboxes_t[:, 2:4] / default_boxes_t[:, 2:4], 0.000001)
- bboxes_t[:, 2:4] = np.log(tmp) / 0.2
- bboxes[index] = bboxes_t
- num_match = np.array([len(np.nonzero(t_label)[0])], dtype=np.int32)
- return bboxes, t_label.astype(np.int32), num_match
- def preprocess_fn(img_id, image, box, is_training):
- """Preprocess function for dataset."""
- cv2.setNumThreads(2)
- def _infer_data(image, input_shape):
- img_h, img_w, _ = image.shape
- input_h, input_w = input_shape
- image = cv2.resize(image, (input_w, input_h))
- # When the channels of image is 1
- if len(image.shape) == 2:
- image = np.expand_dims(image, axis=-1)
- image = np.concatenate([image, image, image], axis=-1)
- return img_id, image, np.array((img_h, img_w), np.float32)
- def _data_aug(image, box, is_training, image_size=(300, 300)):
- ih, iw, _ = image.shape
- h, w = image_size
- if not is_training:
- return _infer_data(image, image_size)
- # Random crop
- box = box.astype(np.float32)
- image, box = random_sample_crop(image, box)
- ih, iw, _ = image.shape
- # Resize image
- image = cv2.resize(image, (w, h))
- # Flip image or not
- flip = _rand() < .5
- if flip:
- image = cv2.flip(image, 1, dst=None)
- # When the channels of image is 1
- if len(image.shape) == 2:
- image = np.expand_dims(image, axis=-1)
- image = np.concatenate([image, image, image], axis=-1)
- box[:, [0, 2]] = box[:, [0, 2]] / ih
- box[:, [1, 3]] = box[:, [1, 3]] / iw
- if flip:
- box[:, [1, 3]] = 1 - box[:, [3, 1]]
- box, label, num_match = ssd_bboxes_encode(box)
- return image, box, label, num_match
- return _data_aug(image, box, is_training, image_size=[300, 300])
复制代码 模型搭建
借助MindSpore可以很快搭建出模型,模型代码如下:
- class SSD300Vgg16(nn.Cell):
- """SSD300Vgg16 module."""
- def __init__(self):
- super(SSD300Vgg16, self).__init__()
- # VGG16 backbone: block1~5
- self.backbone = Vgg16()
- # SSD blocks: block6~7
- self.b6_1 = nn.Conv2d(in_channels=512, out_channels=1024, kernel_size=3, padding=6, dilation=6, pad_mode='pad')
- self.b6_2 = nn.Dropout(p=0.5)
- self.b7_1 = nn.Conv2d(in_channels=1024, out_channels=1024, kernel_size=1)
- self.b7_2 = nn.Dropout(p=0.5)
- # Extra Feature Layers: block8~11
- self.b8_1 = nn.Conv2d(in_channels=1024, out_channels=256, kernel_size=1, padding=1, pad_mode='pad')
- self.b8_2 = nn.Conv2d(in_channels=256, out_channels=512, kernel_size=3, stride=2, pad_mode='valid')
- self.b9_1 = nn.Conv2d(in_channels=512, out_channels=128, kernel_size=1, padding=1, pad_mode='pad')
- self.b9_2 = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, stride=2, pad_mode='valid')
- self.b10_1 = nn.Conv2d(in_channels=256, out_channels=128, kernel_size=1)
- self.b10_2 = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, pad_mode='valid')
- self.b11_1 = nn.Conv2d(in_channels=256, out_channels=128, kernel_size=1)
- self.b11_2 = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, pad_mode='valid')
- # boxes
- self.multi_box = MultiBox()
- def construct(self, x):
- # VGG16 backbone: block1~5
- block4, x = self.backbone(x)
- # SSD blocks: block6~7
- x = self.b6_1(x) # 1024
- x = self.b6_2(x)
- x = self.b7_1(x) # 1024
- x = self.b7_2(x)
- block7 = x
- # Extra Feature Layers: block8~11
- x = self.b8_1(x) # 256
- x = self.b8_2(x) # 512
- block8 = x
- x = self.b9_1(x) # 128
- x = self.b9_2(x) # 256
- block9 = x
- x = self.b10_1(x) # 128
- x = self.b10_2(x) # 256
- block10 = x
- x = self.b11_1(x) # 128
- x = self.b11_2(x) # 256
- block11 = x
- # boxes
- multi_feature = (block4, block7, block8, block9, block10, block11)
- pred_loc, pred_label = self.multi_box(multi_feature)
- if not self.training:
- pred_label = ops.sigmoid(pred_label)
- pred_loc = pred_loc.astype(ms.float32)
- pred_label = pred_label.astype(ms.float32)
- return pred_loc, pred_label
复制代码 模型训练
模型训练时,使用上述所说的数据增强方式,丧失韩式是类别丧失函数和位置丧失函数的加权和,设置模型训练的epoch次数为60,然后通过create_ssd_dataset类创建了训练集和验证集。batch_size巨细为5,图像尺寸同一调整为300×300。丧失函数使用位置丧失函数和置信度丧失函数的加权和,优化器使用Momentum,并设置初始学习率为0.001。回调函数方面使用了LossMonitor和TimeMonitor来监控训练过程中每个epoch结束后,丧失值Loss的变化情况以及每个epoch、每个step的运行时间。设置每训练10个epoch保存一次模型。详细代码如下:
- dataset = create_ssd_dataset(mindrecord_file, batch_size=5, rank=0, use_multiprocessing=True)
- dataset_size = dataset.get_dataset_size()
- image, get_loc, gt_label, num_matched_boxes = next(dataset.create_tuple_iterator())
- # Network definition and initialization
- network = SSD300Vgg16()
- init_net_param(network)
- # Define the learning rate
- lr = Tensor(get_lr(global_step=0 * dataset_size,
- lr_init=0.001, lr_end=0.001 * 0.05, lr_max=0.05,
- warmup_epochs=2, total_epochs=60, steps_per_epoch=dataset_size))
- # Define the optimizer
- opt = nn.Momentum(filter(lambda x: x.requires_grad, network.get_parameters()), lr,
- 0.9, 0.00015, float(1024))
- # Define the forward procedure
- def forward_fn(x, gt_loc, gt_label, num_matched_boxes):
- pred_loc, pred_label = network(x)
- mask = ops.less(0, gt_label).astype(ms.float32)
- num_matched_boxes = ops.sum(num_matched_boxes.astype(ms.float32))
- # Positioning loss
- mask_loc = ops.tile(ops.expand_dims(mask, -1), (1, 1, 4))
- smooth_l1 = nn.SmoothL1Loss()(pred_loc, gt_loc) * mask_loc
- loss_loc = ops.sum(ops.sum(smooth_l1, -1), -1)
- # Category loss
- loss_cls = class_loss(pred_label, gt_label)
- loss_cls = ops.sum(loss_cls, (1, 2))
- return ops.sum((loss_cls + loss_loc) / num_matched_boxes)
- grad_fn = ms.value_and_grad(forward_fn, None, opt.parameters, has_aux=False)
- loss_scaler = DynamicLossScaler(1024, 2, 1000)
- # Gradient updates
- def train_step(x, gt_loc, gt_label, num_matched_boxes):
- loss, grads = grad_fn(x, gt_loc, gt_label, num_matched_boxes)
- opt(grads)
- return loss
- print("=================== Starting Training =====================")
- for epoch in range(60):
- network.set_train(True)
- begin_time = time.time()
- for step, (image, get_loc, gt_label, num_matched_boxes) in enumerate(dataset.create_tuple_iterator()):
- loss = train_step(image, get_loc, gt_label, num_matched_boxes)
- end_time = time.time()
- times = end_time - begin_time
- print(f"Epoch:[{int(epoch + 1)}/{int(60)}], "
- f"loss:{loss} , "
- f"time:{times}s ")
- ms.save_checkpoint(network, "ssd-60_9.ckpt")
- print("=================== Training Success =====================")
复制代码 模型评估
训练好自然就要进行模型评估,本次使用的评估指标是目标检测范畴的经典指标Average Precision、Average Recall和mAP。评估结果如下:
可以看出:好像各个评价指标的表现都很一样平常,我个人以为有两个原因,一个是测试数据集太少了,只有9张图片好像;另一个是,SSD模型对中小物体的检测本领本来就比较弱,所以差一些。
总结
今天所学习的SSD是有些难度的,固然我之前接触目标检测算法比较多,但还是第一接触SSD算法。不外得益于之前积聚的经验,今天的很多东西能比较快速理解。今天运行的SSD模型的结果比最新的Yolo差很多,但有其优点。此外,本人在这里只是回顾一些官方文档中的一些重要部门,SSD更详细的解说和代码解说还是要看官方文档的。
Jupyter在线运行情况
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |