马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
×
- from __future__ import absolute_import
- from __future__ import division
- from __future__ import print_function
- import math
- import numpy as np
- import torch
- import torch.nn as nn
- import torch._utils
- import torch.nn.functional as F
- import torch.nn.init as init
- import torch.optim as optim
- from Lib.config import config
- import random
- import scipy.io as scio
- from torch.utils.data import TensorDataset, DataLoader
- import csv
- import matplotlib.pyplot as plt
# Padded convolution factory with He-normal (Kaiming) weight initialisation.
def regularized_padded_conv(in_channels, out_channels, kernel_size, stride=1):
    """Create a bias-free ``nn.Conv2d`` padded by half the kernel size.

    Args:
        in_channels: number of input channels.
        out_channels: number of output channels.
        kernel_size: integer kernel side length; the padding is
            ``kernel_size // 2`` on every side.
        stride: convolution stride.

    Returns:
        The ``nn.Conv2d`` layer, with its weight drawn by
        ``kaiming_normal_`` (``fan_out`` mode, ``leaky_relu`` gain).
    """
    half_kernel = kernel_size // 2
    layer = nn.Conv2d(
        in_channels=in_channels,
        out_channels=out_channels,
        kernel_size=kernel_size,
        stride=stride,
        padding=half_kernel,
        bias=False,
    )
    # Overwrite the default initialisation with Kaiming-normal weights.
    nn.init.kaiming_normal_(layer.weight, mode='fan_out', nonlinearity='leaky_relu')
    return layer
####################### Channel attention ##########################
class ChannelAttention(nn.Module):
    """Channel attention gate built from average- and max-pooled descriptors.

    Both pooled descriptors go through the same two 1x1 convolutions
    (squeeze to ``in_planes // ratio`` channels, then expand back), are
    summed, and are squashed with a sigmoid.

    Args:
        in_planes: number of channels of the feature map to be gated.
        ratio: channel reduction factor of the bottleneck.

    Shape:
        - Input: ``(N, in_planes, H, W)``
        - Output: ``(N, in_planes, 1, 1)`` weights in ``(0, 1)``; the caller
          multiplies them with the feature map.
    """

    def __init__(self, in_planes, ratio=16):
        super(ChannelAttention, self).__init__()
        self.avg_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.max_pool = nn.AdaptiveMaxPool2d((1, 1))
        # Keep at least one bottleneck channel: with in_planes < ratio the
        # integer division yields 0 and the gate would no longer see its input.
        compressed_channels = max(in_planes // ratio, 1)
        self.conv1 = nn.Conv2d(in_planes, compressed_channels, kernel_size=1, stride=1, padding=0)
        self.conv2 = nn.Conv2d(compressed_channels, in_planes, kernel_size=1, stride=1, padding=0)
        self.leaky_relu = nn.LeakyReLU(negative_slope=0.1, inplace=True)

    def forward(self, inputs):
        """Return per-channel attention weights for ``inputs``."""
        avg_descriptor = self.avg_pool(inputs)
        max_descriptor = self.max_pool(inputs)
        # The same squeeze/expand convolutions are shared by both descriptors.
        avg_score = self.conv2(self.leaky_relu(self.conv1(avg_descriptor)))
        max_score = self.conv2(self.leaky_relu(self.conv1(max_descriptor)))
        return torch.sigmoid(avg_score + max_score)
########################### Spatial attention ###########################
class SpatialAttention(nn.Module):
    """Spatial attention gate computed from channel-wise mean and max maps.

    Args:
        kernel_size: side length of the convolution that mixes the two maps.

    Shape:
        - Input: ``(N, C, H, W)``
        - Output: one-channel sigmoid map ``(N, 1, H_out, W_out)``; the
          caller multiplies it with the feature map.
    """

    def __init__(self, kernel_size=7):
        super(SpatialAttention, self).__init__()
        # Two input planes (mean and max over channels) -> one attention plane.
        self.conv1 = regularized_padded_conv(2, 1, kernel_size, stride=1)
        self.sigmoid = nn.Sigmoid()
        # Created for parity with the other blocks; forward() does not use it.
        self.leaky_relu = nn.LeakyReLU(negative_slope=0.1, inplace=True)

    def forward(self, inputs):
        """Return the spatial attention map for ``inputs``."""
        channel_mean = inputs.mean(dim=1, keepdim=True)
        channel_max = inputs.max(dim=1, keepdim=True).values
        stacked = torch.cat((channel_mean, channel_max), dim=1)
        return self.sigmoid(self.conv1(stacked))
#################################### CSC layer ####################################
class elasnet_prox(nn.Module):
    r"""Elastic-net proximal operator.

    Evaluates the minimiser of

        \lambda ||x||_1 + \mu / 2 ||x||_2^2 + 0.5 ||x - input||_2^2

    which is a soft-shrinkage of the input after rescaling by
    ``1 / (1 + mu)``. NOTE: with ``mu = 0.0`` this reduces to the plain
    ell_1 proximal operator (soft thresholding).

    Args:
        lambd: the :math:`\lambda` weight on the ell_1 penalty. Default: 0.5
        mu: the :math:`\mu` weight on the ell_2 penalty. Default: 0.0

    Shape:
        - Input: :math:`(N, *)` where `*` means any number of additional
          dimensions
        - Output: :math:`(N, *)`, same shape as the input
    """

    def __init__(self, lambd=0.5, mu=0.0):
        super(elasnet_prox, self).__init__()
        self.lambd = lambd
        # Stored as the reciprocal so forward() only multiplies.
        self.scaling_mu = 1.0 / (1.0 + mu)

    def forward(self, input):
        """Apply the operator element-wise to ``input``."""
        rescaled = input * self.scaling_mu
        threshold = self.lambd * self.scaling_mu
        return F.softshrink(rescaled, threshold)

    def extra_repr(self):
        """Show the threshold and the ell_2 scaling in the module repr."""
        return f'{self.lambd} {self.scaling_mu}'
class DictBlock(nn.Module):
    """Convolutional sparse-coding layer unrolled into a fixed number of FISTA-style steps.

    For an input ``x`` the layer approximately solves

        c = argmin_c lmbd * ||c||_1 + mu / 2 * ||c||_2^2 + 1 / 2 * ||x - D c||_2^2

    where ``D c`` is computed as ``F.conv_transpose2d(c, weight)`` and the
    adjoint as ``F.conv2d(r, weight)``.

    Args:
        n_channel: channels of the input ``x``.
        dict_size: number of dictionary atoms, i.e. channels of the code ``c``.
        mu: weight of the ell_2 penalty on the code.
        lmbd: weight of the ell_1 penalty on the code.
        n_dict: the input is tiled ``n_dict`` times along the channel axis
            before coding; the reconstruction loss is divided by it.
        non_negative: when true, a leaky ReLU (slope 0.1) is applied to the
            code after every step.
        stride, kernel_size, padding: geometry of the dictionary convolution.
        share_weight: accepted for interface compatibility; not used here.
        square_noise: when true, use the plain residual in the update;
            otherwise use the per-sample ell_2-normalised residual, scaled by 0.5.
        n_steps: number of unrolled iterations (must be >= 1 when coding).
        step_size_fixed: accepted for interface compatibility; not used here.
        step_size: initial step size, stored as a buffer.
        w_norm: when true, atoms are renormalised to unit ell_2 norm on
            every forward call.
        padding_mode: validated against the supported names but otherwise
            not used by this implementation.
    """

    def __init__(self, n_channel, dict_size, mu=0.0, lmbd=0.0, n_dict=1, non_negative=True,
                 stride=1, kernel_size=3, padding=1, share_weight=True, square_noise=True,
                 n_steps=10, step_size_fixed=True, step_size=0.1, w_norm=True,
                 padding_mode="constant"):
        super(DictBlock, self).__init__()
        self.mu = mu
        self.lmbd = lmbd  # LAMBDA
        self.n_dict = n_dict
        self.stride = stride
        self.kernel_size = (kernel_size, kernel_size)
        self.padding = padding
        self.padding_mode = padding_mode
        assert self.padding_mode in ['constant', 'reflect', 'replicate', 'circular']
        self.groups = 1
        self.n_steps = n_steps
        # A strided conv_transpose2d needs one extra output row/column to undo the conv2d.
        self.conv_transpose_output_padding = 0 if stride == 1 else 1
        self.w_norm = w_norm
        self.non_negative = non_negative
        self.v_max = None       # cached dominant vector from power_iteration()
        self.v_max_error = 0.
        self.xsize = None       # (C, H, W) of the first input seen by forward()
        self.zsize = None       # (C, H, W) of the first code produced by forward()
        self.lmbd_ = None       # set once the adaptive lambda has been computed
        self.square_noise = square_noise

        # torch.empty replaces the legacy torch.Tensor(...) constructor; the
        # values are overwritten by the initialiser right below.
        self.weight = nn.Parameter(torch.empty(dict_size, self.n_dict * n_channel, kernel_size, kernel_size))
        with torch.no_grad():
            init.kaiming_uniform_(self.weight)

        self.nonlinear = elasnet_prox(self.lmbd * step_size, self.mu * step_size)
        self.register_buffer('step_size', torch.tensor(step_size, dtype=torch.float))

    def _descent_direction(self, x_rep, c, weight):
        """Return ``D^T`` applied to the (optionally normalised) residual ``x_rep - D c``."""
        xp = F.conv_transpose2d(c, weight, bias=None, stride=self.stride, padding=self.padding,
                                output_padding=self.conv_transpose_output_padding)
        r = x_rep - xp
        if self.square_noise:
            return F.conv2d(r, weight, bias=None, stride=self.stride, padding=self.padding)
        # Non-squared noise model: scale each sample's residual to unit ell_2
        # norm; the norm is detached so no gradient flows through it.
        w = r.view(r.size(0), -1)
        normw = w.norm(p=2, dim=1, keepdim=True).clamp_min(1e-12).expand_as(w).detach()
        w = (w / normw).view(r.size())
        return F.conv2d(w, weight, bias=None, stride=self.stride, padding=self.padding) * 0.5

    def fista_forward(self, x):
        """Run ``n_steps`` unrolled iterations and return ``(code, weight)``.

        Raises:
            ValueError: if ``n_steps`` is smaller than 1 (no code would be produced).
        """
        if self.n_steps < 1:
            raise ValueError("n_steps must be at least 1, got {}".format(self.n_steps))

        weight = self.weight
        step_size = self.step_size
        # The tiled input is the same in every iteration, so build it once.
        x_rep = x.repeat(1, self.n_dict, 1, 1)

        for i in range(self.n_steps):
            if i == 0:
                # Start from a zero code: the first update is step_size * D^T x.
                c = step_size * F.conv2d(x_rep, weight, bias=None, stride=self.stride,
                                         padding=self.padding)
                c = self.nonlinear(c)
            elif i == 1:
                c_pre = c
                c = c + step_size * self._descent_direction(x_rep, c, weight)
                c = self.nonlinear(c)
                t = (math.sqrt(5.0) + 1.0) / 2.0
            else:
                t_pre = t
                t = (math.sqrt(1.0 + 4.0 * t_pre * t_pre) + 1) / 2.0
                # Momentum point built from the two most recent codes.
                a = (t_pre + t - 1.0) / t * c + (1.0 - t_pre) / t * c_pre
                c_pre = c
                # NOTE: the update direction is evaluated at c, not at a.
                c = a + step_size * self._descent_direction(x_rep, c, weight)
                c = self.nonlinear(c)

            if self.non_negative:
                c = F.leaky_relu(c, negative_slope=0.1)

        return c, weight

    def forward(self, x):
        """Encode ``x`` and return ``(c, (r_loss, c_loss))``.

        ``r_loss`` is the summed squared reconstruction residual divided by
        ``n_dict``; ``c_loss`` is the elastic-net penalty of the code. The
        first call records the input and code sizes; later calls assert that
        they are unchanged.
        """
        if self.xsize is None:
            self.xsize = (x.size(-3), x.size(-2), x.size(-1))
            print(self.xsize)
        else:
            assert self.xsize[-3] == x.size(-3) and self.xsize[-2] == x.size(-2) and self.xsize[-1] == x.size(-1)

        if self.w_norm:
            self.normalize_weight()

        c, weight = self.fista_forward(x)

        # Compute loss
        xp = F.conv_transpose2d(c, weight, bias=None, stride=self.stride, padding=self.padding,
                                output_padding=self.conv_transpose_output_padding)
        r = x.repeat(1, self.n_dict, 1, 1) - xp
        r_loss = torch.sum(torch.pow(r, 2)) / self.n_dict
        c_loss = self.lmbd * torch.sum(torch.abs(c)) + self.mu / 2. * torch.sum(torch.pow(c, 2))

        if self.zsize is None:
            self.zsize = (c.size(-3), c.size(-2), c.size(-1))
            print(self.zsize)
        else:
            assert self.zsize[-3] == c.size(-3) and self.zsize[-2] == c.size(-2) and self.zsize[-1] == c.size(-1)

        if self.lmbd_ is None and config.MODEL.ADAPTIVELAMBDA:
            # Rescale lambda once by the input-to-code element ratio.
            # self.nonlinear keeps its old threshold until update_stepsize() runs.
            self.lmbd_ = self.lmbd * self.xsize[-3] * self.xsize[-2] * self.xsize[-1] / (
                self.zsize[-3] * self.zsize[-2] * self.zsize[-1])
            self.lmbd = self.lmbd_
            print("======")
            print("xsize", self.xsize)
            print("zsize", self.zsize)
            print("new lmbd: ", self.lmbd)

        return c, (r_loss, c_loss)

    def update_stepsize(self):
        """Set the step size to ``0.9 / lambda_max`` and refresh the proximal operator."""
        step_size = 0.9 / self.power_iteration(self.weight)
        # Builds a new buffer tensor rather than writing into the old one.
        self.step_size = self.step_size * 0. + step_size
        self.nonlinear.lambd = self.lmbd * step_size
        self.nonlinear.scaling_mu = 1.0 / (1.0 + self.mu * step_size)

    def normalize_weight(self):
        """Rescale every dictionary atom to unit ell_2 norm, outside of autograd."""
        with torch.no_grad():
            w = self.weight.view(self.weight.size(0), -1)
            normw = w.norm(p=2, dim=1, keepdim=True).clamp_min(1e-12).expand_as(w)
            w = (w / normw).view(self.weight.size())
            self.weight.data = w.data

    def power_iteration(self, weight):
        """Estimate the largest eigenvalue of ``D^T D`` by power iteration.

        Runs at most 50 iterations, warm-starting from the vector cached by
        the previous call when there is one.

        Raises:
            RuntimeError: if no cached vector exists and the code size is
                still unknown because ``forward`` has not been called.
        """
        if self.v_max is None and self.zsize is None:
            raise RuntimeError("power_iteration needs the code size; call forward() once first.")

        max_iteration = 50
        v_max_error = 1.0e5
        tol = 1.0e-5
        k = 0

        with torch.no_grad():
            if self.v_max is None:
                c = weight.shape[0]
                v = torch.randn(size=(1, c, self.zsize[-2], self.zsize[-1])).to(weight.device)
            else:
                v = self.v_max.clone()

            while k < max_iteration and v_max_error > tol:
                tmp = F.conv_transpose2d(
                    v, weight, bias=None, stride=self.stride, padding=self.padding,
                    output_padding=self.conv_transpose_output_padding
                )
                v_ = F.conv2d(tmp, weight, bias=None, stride=self.stride, padding=self.padding)
                v_ = F.normalize(v_.view(-1), dim=0, p=2).view(v.size())
                v_max_error = torch.sum((v_ - v) ** 2)
                k += 1
                v = v_

            v_max = v.clone()
            Dv_max = F.conv_transpose2d(
                v_max, weight, bias=None, stride=self.stride, padding=self.padding,
                output_padding=self.conv_transpose_output_padding
            )  # Dv
            lambda_max = torch.sum(Dv_max ** 2).item()  # vTDTDv / vTv, ignore the vTv since vTv = 1

        self.v_max = v_max
        return lambda_max
- ################################# SDNet ################################################################
- from Lib.config import config as _cfg
- cfg = _cfg
class DictConv2d(nn.Module):
    """Conv2d-like wrapper around ``DictBlock`` configured from ``cfg['MODEL']``.

    Args:
        in_channels: channels of the input feature map.
        out_channels: number of dictionary atoms (output channels).
        kernel_size, stride, padding: forwarded to ``DictBlock``.
        bias: accepted so the signature resembles ``nn.Conv2d``; not used.
    """

    def __init__(self, in_channels, out_channels, kernel_size, stride=1, padding=0, bias=True):
        super(DictConv2d, self).__init__()
        model_cfg = cfg['MODEL']
        self.dn = DictBlock(
            in_channels,
            out_channels,
            stride=stride,
            kernel_size=kernel_size,
            padding=padding,
            mu=model_cfg['MU'],
            lmbd=model_cfg['LAMBDA'][0],
            square_noise=model_cfg['SQUARE_NOISE'],
            n_dict=model_cfg['EXPANSION_FACTOR'],
            non_negative=model_cfg['NONEGATIVE'],
            n_steps=model_cfg['NUM_LAYERS'],
            w_norm=model_cfg['WNORM'],
        )
        self.rc = None      # (r_loss, c_loss) of the most recent forward pass
        self.r_loss = []    # per-sample reconstruction loss collected in eval mode

    def get_rc(self):
        """Return the loss pair stored by the last ``forward`` call.

        Raises:
            ValueError: if ``forward`` has not been called yet.
        """
        if self.rc is not None:
            return self.rc
        raise ValueError("should call forward first.")

    def forward(self, x):
        """Encode ``x``, remember the losses, and return the code tensor."""
        code, losses = self.dn(x)
        self.rc = losses
        if not self.training:
            # Record the batch-average reconstruction loss once per sample.
            batch = len(x)
            self.r_loss.extend([self.rc[0].item() / batch] * batch)
        return code
######### Model definition ###############
class SDNet_model(nn.Module):
    """CNN classifier with one sparse-dictionary layer and channel/spatial attention.

    Layer order: two conv/batch-norm/max-pool stages, ``Dropout2d``, a
    ``DictConv2d`` layer, a conv with batch norm, channel and spatial
    attention, a conv, a third max-pool, ``Dropout2d``, and three fully
    connected layers followed by a sigmoid.

    Args:
        dropout1: drop probability of the ``Dropout2d`` before the dictionary layer.
        dropout2: drop probability of the ``Dropout2d`` before flattening.
        num_classes: size of the output vector.
        input_size: ``(height, width)`` of the single-channel input. The first
            fully connected layer is sized from it. The previous hard-coded
            value ``256 * 12 * 75`` did not match the 12x37 map that a 100x300
            input produces; pass ``input_size=(100, 600)`` to rebuild that
            legacy layer shape.
    """

    def __init__(self, dropout1, dropout2, num_classes=2, input_size=(100, 300)):
        super(SDNet_model, self).__init__()
        self.conv0 = nn.Conv2d(1, 64, kernel_size=(3, 3), padding=(1, 1))
        self.bn0 = nn.BatchNorm2d(64)
        self.pool0 = nn.MaxPool2d(kernel_size=(2, 2))
        self.conv1 = nn.Conv2d(64, 128, kernel_size=(3, 3), padding=(1, 1))
        self.bn1 = nn.BatchNorm2d(128)
        self.pool1 = nn.MaxPool2d(kernel_size=(2, 2))
        self.dropout1 = nn.Dropout2d(p=dropout1)
        self.layer0 = nn.Sequential(
            DictConv2d(128, 256, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(256),
            nn.LeakyReLU(inplace=True),
        )
        self.conv2 = nn.Conv2d(256, 256, kernel_size=(3, 3), padding=(1, 1))
        self.bn2 = nn.BatchNorm2d(256)
        self.ca = ChannelAttention(256)
        self.sa = SpatialAttention()
        self.conv3 = nn.Conv2d(256, 256, kernel_size=(3, 3), padding=(1, 1))
        self.pool2 = nn.MaxPool2d(kernel_size=(2, 2))
        self.dropout2 = nn.Dropout2d(p=dropout2)
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(self._flattened_features(input_size), 512)
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, num_classes)
        self.leaky_relu = nn.LeakyReLU(negative_slope=0.1, inplace=True)
        self.sigmoid = nn.Sigmoid()

    @staticmethod
    def _flattened_features(input_size):
        """Return the number of features reaching ``fc1`` for an ``(H, W)`` input.

        Every convolution in the network is 3x3 with padding 1 and stride 1,
        so only the three 2x2 max-pools change the spatial size; each halves
        it with floor rounding.

        Raises:
            ValueError: if the input is too small to survive the three poolings.
        """
        height, width = input_size
        for _ in range(3):
            height //= 2
            width //= 2
        if height < 1 or width < 1:
            raise ValueError("input_size {} is too small for three 2x2 poolings".format(tuple(input_size)))
        return 256 * height * width

    def update_stepsize(self):
        """Refresh the step size of every ``DictBlock`` inside the model."""
        for m in self.modules():
            if isinstance(m, DictBlock):
                m.update_stepsize()

    def get_rc(self):
        """Collect the ``(r_loss, c_loss)`` pair of every ``DictConv2d`` layer."""
        rc_list = []
        for m in self.modules():
            if isinstance(m, DictConv2d):
                rc_list.append(m.get_rc())
        return rc_list

    def forward(self, x):
        """Map a batch of single-channel inputs to sigmoid class scores."""
        x = self.conv0(x)
        x = self.bn0(x)
        x = self.pool0(x)
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.pool1(x)
        x = self.dropout1(x)
        x = self.layer0(x)
        x = self.conv2(x)
        x = self.bn2(x)
        # Attention weights are applied multiplicatively: channels, then space.
        x = self.ca(x) * x
        x = self.sa(x) * x
        x = self.conv3(x)
        x = self.pool2(x)
        x = self.dropout2(x)
        x = self.flatten(x)
        x = self.leaky_relu(self.fc1(x))
        x = self.fc2(x)
        x = self.leaky_relu(x)
        x = self.fc3(x)
        x = self.sigmoid(x)
        return x
def SDCNN_model(num_classes, dropout1, dropout2):
    """Build an ``SDNet_model`` with the given class count and dropout rates."""
    return SDNet_model(dropout1, dropout2, num_classes=num_classes)
# Seed NumPy, PyTorch and Python's random module with one shared value
# so that repeated runs draw the same random numbers.
randomSeed = 1
np.random.seed(randomSeed)
torch.manual_seed(randomSeed)
random.seed(randomSeed)
def _load_mat_splits(data_file):
    """Load the train/validate/test arrays from a MATLAB file as tensors.

    Inputs are reshaped to ``(-1, 1, 100, 300)`` float32; outputs are
    converted to float so they can be passed to ``nn.BCELoss``.
    """
    data = scio.loadmat(data_file)

    def as_images(key):
        return torch.from_numpy(data[key].reshape(-1, 1, 100, 300).astype('float32'))

    def as_targets(key):
        return torch.from_numpy(data[key]).float()

    return (as_images('train_input'), as_targets('train_output'),
            as_images('validate_input'), as_targets('validate_output'),
            as_images('test_input'))


def _train_one_epoch(model, loader, criterion, optimizer):
    """Run one optimisation pass over ``loader`` and return the summed batch loss."""
    model.train()
    running_loss = 0.0
    for inputs, targets in loader:
        optimizer.zero_grad()
        loss = criterion(model(inputs), targets)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    return running_loss


def _validation_accuracy(model, loader):
    """Return the percentage of samples whose arg-max prediction matches the arg-max target."""
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, targets in loader:
            predicted = torch.argmax(model(inputs), dim=1)
            expected = torch.argmax(targets, dim=1)
            correct += (predicted == expected).sum().item()
            total += targets.size(0)
    return 100 * correct / total


def _validation_loss(model, loader, criterion):
    """Return the mean batch loss of ``model`` over ``loader`` in eval mode."""
    model.eval()
    summed = 0.0
    with torch.no_grad():
        for inputs, targets in loader:
            summed += criterion(model(inputs), targets).item()
    return summed / len(loader)


def _init_conv_linear_weights(m):
    """Kaiming-normal initialise Conv2d/Linear weights and zero their biases."""
    if isinstance(m, (nn.Conv2d, nn.Linear)):
        nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='leaky_relu')
        if m.bias is not None:
            nn.init.constant_(m.bias, 0)


def _save_feature_maps(feature_maps, layer_names):
    """Save up to 64 channels of the first sample of each named layer as an 8x8 image grid."""
    for layer_name in layer_names:
        act = feature_maps.get(layer_name)
        if act is None:
            continue
        act = act.cpu().numpy()
        num_channels = act.shape[1]
        plt.figure(figsize=(20, 10))
        for i in range(min(num_channels, 64)):
            plt.subplot(8, 8, i + 1)
            plt.imshow(act[0, i, :, :], cmap='viridis')
            plt.axis('off')
        plt.suptitle(f'Feature Maps of {layer_name}')
        plt.savefig(f'feature_maps_{layer_name}.png')
        plt.close()


def main(data_file=r'C:\Users\sun\Desktop\SDNET\SDNet-main\data\python_energy_T.mat',
         predict_label_file=r'C:\Users\sun\Desktop\SDNET\SDNet-main\predict_label.csv',
         probabilities_file=r'C:\Users\sun\Desktop\SDNET\SDNet-main\pr.csv',
         model_file=r'C:\Users\sun\Desktop\SDNET\SDNet-main\sdnet_model.pth'):
    """Random-search hyperparameters, retrain with the best ones, and export results.

    Steps: load the data splits, run ten random-search trials scored by
    validation accuracy, retrain a fresh model with the best trial's
    settings, save feature-map images for one validation sample, then write
    the test-set labels, the test-set scores and the model ``state_dict``.

    Args:
        data_file: MATLAB file holding the ``train_*``, ``validate_*`` and
            ``test_input`` arrays.
        predict_label_file: CSV receiving one predicted label per row.
        probabilities_file: CSV receiving the two output scores per row.
        model_file: path for the saved ``state_dict``.
    """
    # Load data
    train_input, train_output, validate_input, validate_output, test_input = _load_mat_splits(data_file)

    # Hyperparameter search space
    epochs = range(50, 201)
    batch_sizes = [64, 128, 256]
    dropouts1 = [0.1, 0.3, 0.5]
    dropouts2 = [0.1, 0.3, 0.5]

    best_hyperparams = {'epoch': None, 'batch_size': None, 'dropout1': None, 'dropout2': None}
    # Start below any reachable accuracy so the first trial is always recorded,
    # even if it scores 0%.
    best_accuracy = -1.0
    num_iterations = 10

    # Datasets and loss do not depend on the trial, so build them once.
    criterion = nn.BCELoss()
    train_dataset = TensorDataset(train_input, train_output)
    valid_dataset = TensorDataset(validate_input, validate_output)

    # Random search
    for i in range(num_iterations):
        epoch = random.choice(epochs)
        batch_size = random.choice(batch_sizes)
        dropout1 = random.choice(dropouts1)
        dropout2 = random.choice(dropouts2)
        print(f"Iteration {i+1}/{num_iterations}: epoch={epoch}, batch_size={batch_size}, dropout1={dropout1}, dropout2={dropout2}")

        model = SDCNN_model(num_classes=2, dropout1=dropout1, dropout2=dropout2)
        optimizer = optim.Adam(model.parameters(), lr=0.001)
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        valid_loader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False)
        # Divide the learning rate by 10 every 50 epochs.
        scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=50, gamma=0.1)

        for _ in range(epoch):
            _train_one_epoch(model, train_loader, criterion, optimizer)
            scheduler.step()

        accuracy = _validation_accuracy(model, valid_loader)
        print(f"Iteration {i+1}: Accuracy={accuracy:.2f}%")

        if accuracy > best_accuracy:
            best_hyperparams = {'epoch': epoch, 'batch_size': batch_size,
                                'dropout1': dropout1, 'dropout2': dropout2}
            best_accuracy = accuracy
            print(f"New best accuracy: {best_accuracy:.2f}% with hyperparameters {best_hyperparams}")

    # Final training with the best hyperparameters found
    best_epoch = best_hyperparams['epoch']
    best_batch_size = best_hyperparams['batch_size']
    best_dropout1 = best_hyperparams['dropout1']
    best_dropout2 = best_hyperparams['dropout2']

    # Build a fresh model so no weights carry over from the search.
    model = SDCNN_model(num_classes=2, dropout1=best_dropout1, dropout2=best_dropout2)
    model.apply(_init_conv_linear_weights)
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    train_loader = DataLoader(train_dataset, batch_size=best_batch_size, shuffle=True)
    valid_loader = DataLoader(valid_dataset, batch_size=best_batch_size, shuffle=False)
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=50, gamma=0.1)

    for e in range(best_epoch):
        running_loss = _train_one_epoch(model, train_loader, criterion, optimizer)
        scheduler.step()
        print(f'Epoch {e + 1}/{best_epoch}, Loss: {running_loss / len(train_loader):.4f}')
        validation_loss = _validation_loss(model, valid_loader, criterion)
        print(f'Validation Loss: {validation_loss:.4f}')

    # Feature visualisation: hooks are attached only for the single sample
    # pass below, so training does not store every activation.
    feature_maps = {}

    def get_activation(name):
        def hook(module, inputs, output):
            feature_maps[name] = output.detach()
        return hook

    handles = []
    dict_layer_names = []
    for name, layer in model.named_modules():
        if isinstance(layer, (nn.Conv2d, DictConv2d)):
            handles.append(layer.register_forward_hook(get_activation(name)))
        if isinstance(layer, DictConv2d):
            # Hooks are keyed by module path (e.g. a Sequential child), not by
            # class name, so remember the actual path for the lookup below.
            dict_layer_names.append(name)

    model.eval()
    with torch.no_grad():
        model(validate_input[:1])
    for handle in handles:
        handle.remove()

    layers_to_visualize = ['conv0', 'conv1', *dict_layer_names, 'conv2', 'conv3']
    _save_feature_maps(feature_maps, layers_to_visualize)

    # Test-set predictions
    model.eval()
    with torch.no_grad():
        probabilities = model(test_input.float())
    predicted_labels = torch.argmax(probabilities, dim=1)
    predict = predicted_labels.cpu().numpy()
    print(predict)

    with open(predict_label_file, 'w', newline='') as label_file:
        writer = csv.writer(label_file)
        writer.writerows([label] for label in predict)

    with open(probabilities_file, 'w+') as pr_file:
        pr_file.write("\n".join(f"{row[0]},{row[1]}" for row in probabilities))

    # Only the state_dict is saved; loading requires rebuilding the model first.
    torch.save(model.state_dict(), model_file)
    print(f"Complete model saved as {model_file}")
# Run the pipeline only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!qidao123.com:ToB企服之家,中国第一个企服评测及软件市场,开放入驻,技术点评得现金 |