代码
- import torch
- import numpy as np
- class MaxState(torch.nn.Module):
- def __init__(self, hidden_dim, heads, win):
- super(MaxState, self).__init__()
- assert hidden_dim % heads == 0, "Hidden size must be divisible by the number of heads."
- self.head_size = hidden_dim // heads
- self.head = torch.nn.Linear(hidden_dim, hidden_dim, bias=False)
- self.state = torch.nn.Linear(hidden_dim, hidden_dim, bias=False)
- self.head_num = heads
- self.win = win
- self.hidden = hidden_dim
- self.mask = torch.triu(torch.ones([win, win])).to("cuda")
- self.layer_nor = torch.nn.LayerNorm(hidden_dim)
- def forward(self, input_data, state=None):
- # self.head.to("cuda")
- b, s, k, h, w = input_data.shape[0], input_data.shape[1], self.head_num, self.head_size, self.win
- window = torch.ones([1, w]).to("cuda")
- out = self.head(input_data)
- out = out.unsqueeze(-1) @ window
- out = out.permute([0, 2, 1, 3])
- one_list = []
- if state is None:
- state = torch.ones([out.shape[0], out.shape[1], 1, 1]) * float("-inf")
- state = state.to("cuda")
- for i in range(0, s, w):
- state.reshape([state.shape[0], -1])
- j = w + i
- one = out[:, :, i:j]
- _, _, r, c = one.shape
- if r != self.win:
- one = torch.where(self.mask[:r, :] == 1, one, torch.Tensor([-float('inf')]).to("cuda"))
- else:
- one = torch.where(self.mask == 1, one, torch.Tensor([-float('inf')]).to("cuda"))
- if i == 0:
- one = torch.concat([one, state @ window], axis=2)
- state, _ = torch.max(one, axis=2, keepdim=True)
- else:
- state1, _ = torch.max(one, axis=2, keepdim=True)
- # state = torch.sin(self.state(state1.reshape([state1.shape[0], -1]))*state.reshape([state.shape[0], -1]))
- state1 = self.state(state1.permute([0, 3, 1, 2]).reshape([state1.shape[0], -1, state1.shape[1]]))
- state = state1.permute([0, 2, 1]).unsqueeze(-2) + state
- # state = state.reshape(state1.shape)
- one = torch.concat([one, state], axis=2)
- state, _ = torch.max(one, axis=2, keepdim=True)
- one = state.reshape([b, k, h, w])
- state = state[..., -1:]
- if r != self.win:
- one = one[..., :r]
- one = one.permute([0, 3, 1, 2])
- one_list.append(one)
- out = torch.concat(one_list, 1)
- out = out.reshape([b, s, -1])
- return out, state
- class FeedForward(torch.nn.Module):
- def __init__(self, hidden_size):
- super(FeedForward, self).__init__()
- self.ffn1 = torch.nn.Linear(hidden_size, hidden_size * 2)
- self.ffn2 = torch.nn.Linear(hidden_size * 2, hidden_size)
- self.gate = torch.nn.Linear(hidden_size, hidden_size * 2)
- self.relu = torch.nn.ReLU()
- def forward(self, x):
- x1 = self.ffn1(x)
- x2 = self.relu(self.gate(x))
- x = x1 * x2
- x = self.ffn2(x)
- return x
- class DecoderLayer(torch.nn.Module):
- def __init__(self, hidden_size, num_heads):
- super(DecoderLayer, self).__init__()
- # self.self_attention = MaskMultiHeadAttention(hidden_size, num_heads)
- self.self_attention = MaxState(hidden_size, num_heads, 8)
- self.ffn = FeedForward(hidden_size)
- self.layer_norm = torch.nn.LayerNorm(hidden_size)
- def forward(self, x, state=None, seq_len=None):
- x1, state = self.self_attention(x, state)
- x = self.layer_norm(self.ffn(x1) + x) # Feed-Forward with residual connection
- return x, state
- class SamOut(torch.nn.Module):
- def __init__(self, voc_size, hidden_size, num_heads, num_layers):
- super(SamOut, self).__init__()
- self.em = torch.nn.Embedding(voc_size, hidden_size, padding_idx=3)
- self.pos = torch.nn.Embedding(1024, hidden_size)
- self.decoder_layers = torch.nn.ModuleList([DecoderLayer(hidden_size, num_heads) for _ in range(num_layers)])
- self.head = torch.nn.Linear(hidden_size, voc_size)
- self.head_state = torch.nn.Linear(hidden_size, num_layers)
- self.layer_nor=torch.nn.LayerNorm(hidden_size)
- self.down=torch.nn.ModuleList([torch.nn.Linear(2*hidden_size,hidden_size) for _ in range(num_layers)])
- def forward(self, x, state=None, seq_len=None):
- x = self.em(x)
- if x.shape[1] >= 1024:
- pos = self.pos(torch.range(0, x.shape[1]-1).long() // 1024).unsqueeze(0)
- pos = self.pos(torch.range(0, x.shape[1]-1).long() % 1024).unsqueeze(0) + pos
- else:
- pos = self.pos(torch.range(0, x.shape[1]-1).long().to("cuda")).unsqueeze(0)
- if state is None:
- state = [None] * len(self.decoder_layers)
- i = 0
- for decoder_layer in self.decoder_layers:
- x1, state[i] = decoder_layer(self.down[i](torch.concat([torch.zeros([x.shape[0],1,1]).to("cuda")+pos , x],-1)), state[i])
- x = x1 + x
- i += 1
- state_data = self.head_state((torch.concat(state, -1).squeeze(-2)).permute([0, 2, 1]))
- return self.head(x), state, state_data
- if __name__ == '__main__':
- net = SamOut(235, 256, 16, 4)
- net(torch.randint(0, 200, [2, 3000]))
复制代码 解释
这段代码定义了一个基于PyTorch的神经网络模型,该模型包罗自定义的解码器层和输出层,用于处置惩罚序列数据。下面是代码的逐行解析:
- import torch
- import numpy as np
复制代码
- class MaxState(torch.nn.Module):
- def __init__(self, hidden_dim, heads, win):
- super(MaxState, self).__init__()
复制代码
- 定义一个名为MaxState的类,继续自torch.nn.Module。这是自定义的一个模块,用于处置惩罚状态的最大化。
- assert hidden_dim % heads == 0, "Hidden size must be divisible by the number of heads."
复制代码
- 断言查抄,确保隐蔽层的维度可以被注意力头的数目整除。
- self.head_size = hidden_dim // heads
- self.head = torch.nn.Linear(hidden_dim, hidden_dim, bias=False)
- self.state = torch.nn.Linear(hidden_dim, hidden_dim, bias=False)
- self.head_num = heads
- self.win = win
- self.hidden = hidden_dim
- self.mask = torch.triu(torch.ones([win, win])).to("cuda")
- self.layer_nor = torch.nn.LayerNorm(hidden_dim)
复制代码
- 初始化类成员变量,包罗线性层、注意力头数目、窗口巨细、掩码和层归一化。
- def forward(self, input_data, state=None):
- # self.head.to("cuda")
- b, s, k, h, w = input_data.shape[0], input_data.shape[1], self.head_num, self.head_size, self.win
复制代码
- window = torch.ones([1, w]).to("cuda")
复制代码
- out = self.head(input_data)
复制代码
- out = out.unsqueeze(-1) @ window
复制代码
- out = out.permute([0, 2, 1, 3])
复制代码
- one_list = []
- if state is None:
- state = torch.ones([out.shape[0], out.shape[1], 1, 1]) * float("-inf")
- state = state.to("cuda")
复制代码
- 初始化状态张量,如果状态为None,则创建一个初始状态。
- for i in range(0, s, w):
- # ... (省略中间代码)
复制代码
- 返回处置惩罚后的输出和状态。
接下来是FeedForward、DecoderLayer和SamOut类的定义,这些类分别实现了前馈网络、解码器层和整个模型的输出部分。代码结构与MaxState类雷同,包罗了初始化和前向传播方法。
- if __name__ == '__main__':
- net = SamOut(235, 256, 16, 4)
- net(torch.randint(0, 200, [2, 3000]))
复制代码
- 如果当前脚本作为主程序运行,创建一个SamOut模型实例,并使用随机整数张量作为输入举行测试。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |