ToB企服应用市场:ToB评测及商务社交产业平台

标题: 【课程总结】Day18:Seq2Seq的深入了解 [打印本页]

作者: 梦见你的名字    时间: 2024-8-4 15:12
标题: 【课程总结】Day18:Seq2Seq的深入了解
前言

在上一章【课程总结】Day17(下):初始Seq2Seq模型中,我们开端了解了Seq2Seq模型的根本情况及代码运行效果,本章内容将深入了解Seq2Seq模型的代码,梳理代码的框架图、各部分组成部分以及运行流程。
框架图

工程目次结构

查看项目目次结构如下:
  1. seq2seq_demo/
  2. ├── data.txt                     # 原始数据文件,包含训练或测试数据
  3. ├── dataloader.py                # 数据加载器,负责读取和预处理数据
  4. ├── decoder.py                   # 解码器实现,用于生成输出序列
  5. ├── encoder.py                   # 编码器实现,将输入序列编码为上下文向量
  6. ├── main.py                      # 主程序入口,执行模型训练和推理
  7. ├── seq2seq.py                   # seq2seq 模型的实现,整合编码器和解码器
  8. └── tokenizer.py                 # 分词器实现,将文本转换为模型可处理的格式
复制代码
查看各个py文件整理关系图结构如下:


核心逻辑

初始化过程



Build_dict()

  1. def build_dict(self):
  2.         """
  3.         构建字典
  4.         """
  5.         if os.path.exists(self.saved_dict):
  6.             self.load()
  7.             print("加载本地字典成功")
  8.             return
  9.         input_words = {"<UNK>", "<PAD>"}
  10.         output_words = {"<UNK>", "<PAD>", "<SOS>", "<EOS>"}
  11.         with open(file=self.data_file, mode="r", encoding="utf8") as f:
  12.             for line in tqdm(f.readlines()):
  13.                 if line:
  14.                     input_sentence, output_sentence = line.strip().split("\t")
  15.                     input_sentence_words = self.split_input(input_sentence)
  16.                     output_sentence_words = self.split_output(output_sentence)
  17.                     input_words = input_words.union(set(input_sentence_words))
  18.                     output_words = output_words.union(set(output_sentence_words))
  19.         # 输入字典
  20.         self.input_word2idx = {word: idx for idx, word in enumerate(input_words)}
  21.         self.input_idx2word = {idx: word for word, idx in self.input_word2idx.items()}
  22.         self.input_dict_len = len(self.input_word2idx)
  23.         # 输出字典
  24.         self.output_word2idx = {word: idx for idx, word in enumerate(output_words)}
  25.         self.output_idx2word = {idx: word for word, idx in self.output_word2idx.items()}
  26.         self.output_dict_len = len(self.output_word2idx)
  27.         # 保存
  28.         self.save()
  29.         print("保存字典成功")
复制代码
代码解析:

encoder

  1. import torch
  2. from torch import nn
  3. class Encoder(nn.Module):
  4.     """
  5.         定义一个 编码器
  6.     """
  7.     def __init__(self, tokenizer):
  8.         super(Encoder, self).__init__()
  9.         self.tokenizer = tokenizer
  10.         # 嵌入层
  11.         self.embed = nn.Embedding(num_embeddings=self.tokenizer.input_dict_len,
  12.                                   embedding_dim=self.tokenizer.input_embed_dim,
  13.                                   padding_idx=self.tokenizer.input_word2idx.get("<PAD>"))
  14.         # GRU单元
  15.         self.gru = nn.GRU(input_size=self.tokenizer.input_embed_dim,
  16.                           hidden_size=self.tokenizer.input_hidden_size,
  17.                           batch_first=False)
  18.    
  19.     def forward(self, x, x_len):
  20.         # [seq_len, batch_size] --> [seq_len, batch_size, embed_dim]
  21.         x = self.embed(x)
  22.         # 压紧被填充的序列
  23.         x = nn.utils.rnn.pack_padded_sequence(input=x,
  24.                                               lengths=x_len,
  25.                                               batch_first=False)
  26.         out, hn = self.gru(x)
  27.         # 填充被压紧的序列
  28.         out, out_len = nn.utils.rnn.pad_packed_sequence(sequence=out,
  29.                                                         batch_first=False,
  30.                                                         padding_value=self.tokenizer.input_word2idx.get("<PAD>"))
  31.         # out: [seq_len, batch_size, hidden_size]
  32.         # hn: [1, batch_size, hidden_size]
  33.         return out, hn
复制代码
代码解析:

decoder

  1. import torch
  2. from torch import nn
  3. import random
  4. device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
  5. class Decoder(nn.Module):
  6.     def __init__(self, tokenizer):
  7.         super(Decoder, self).__init__()
  8.         self.tokenizer = tokenizer
  9.         # 嵌入
  10.         self.embed = nn.Embedding(
  11.             num_embeddings=self.tokenizer.output_dict_len,
  12.             embedding_dim=self.tokenizer.output_embed_dim,
  13.             padding_idx=self.tokenizer.output_word2idx.get("<PAD>"),
  14.         )
  15.         # 抽取特征
  16.         self.gru = nn.GRU(
  17.             input_size=self.tokenizer.output_embed_dim,
  18.             hidden_size=self.tokenizer.output_hidden_size,
  19.             batch_first=False,
  20.         )
  21.         # 转换维度,做概率输出
  22.         self.fc = nn.Linear(
  23.             in_features=self.tokenizer.output_hidden_size,
  24.             out_features=self.tokenizer.output_dict_len,
  25.         )
  26.    
  27.     def forward_step(self, decoder_input, decoder_hidden):
  28.         """
  29.         单步解码:
  30.             decoder_input: [1, batch_size]
  31.             decoder_hidden: [1, batch_size, hidden_size]
  32.         """
  33.         # [1, batch_size] --> [1, batch_size, embedding_dim]
  34.         decoder_input = self.embed(decoder_input)
  35.         # 输入:[1, batch_size, embedding_dim] [1, batch_size, hidden_size]
  36.         # 输出:[1, batch_size, hidden_size] [1, batch_size, hidden_size]
  37.         # 因为只有1步,所以 out 跟 decoder_hidden是一样的
  38.         out, decoder_hidden = self.gru(decoder_input, decoder_hidden)
  39.         # [batch_size, hidden_size]
  40.         out = out.squeeze(dim=0)
  41.         # [batch_size, dict_len]
  42.         out = self.fc(out)
  43.         # out: [batch_size, dict_len]
  44.         # decoder_hidden: [1, batch_size, hidden_size]
  45.         return out, decoder_hidden
  46.     def forward(self, encoder_hidden, y, y_len):
  47.         """
  48.         训练时的正向传播
  49.             - encoder_hidden: [1, batch_size, hidden_size]
  50.             - y: [seq_len, batch_size]
  51.             - y_len: [batch_size]
  52.         """
  53.         # 计算输出的最大长度(本批数据的最大长度)
  54.         output_max_len = max(y_len.tolist()) + 1
  55.         # 本批数据的批量大小
  56.         batch_size = encoder_hidden.size(1)
  57.         # 输入信号 SOS  读取第0步,启动信号
  58.         # decoder_input: [1, batch_size]
  59.         # 输入信号 SOS [1, batch_size]
  60.         decoder_input = torch.LongTensor(
  61.             [[self.tokenizer.output_word2idx.get("<SOS>")] * batch_size]
  62.         ).to(device=device)
  63.         # 收集所有的预测结果
  64.         # decoder_outputs: [seq_len, batch_size, dict_len]
  65.         decoder_outputs = torch.zeros(
  66.             output_max_len, batch_size, self.tokenizer.output_dict_len
  67.         )
  68.         # 隐藏状态 [1, batch_size, hidden_size]
  69.         decoder_hidden = encoder_hidden
  70.         # 手动循环
  71.         for t in range(output_max_len):
  72.             # 输入:decoder_input: [batch_size, dict_len], decoder_hidden: [1, batch_size, hidden_size]
  73.             # 返回值:decoder_output_t: [batch_size, dict_len], decoder_hidden: [1, batch_size, hidden_size]
  74.             decoder_output_t, decoder_hidden = self.forward_step(
  75.                 decoder_input, decoder_hidden
  76.             )
  77.             # 填充结果张量 [seq_len, batch_size, dict_len]
  78.             decoder_outputs[t, :, :] = decoder_output_t
  79.             # teacher forcing 教师强迫机制
  80.             use_teacher_forcing = random.random() > 0.5
  81.             # 0.5 概率 实行教师强迫
  82.             if use_teacher_forcing:
  83.                 # [1, batch_size] 取标签中的下一个词
  84.                 decoder_input = y[t, :].unsqueeze(0)
  85.             else:
  86.                 # 取出上一步的推理结果 [1, batch_size]
  87.                 decoder_input = decoder_output_t.argmax(dim=-1).unsqueeze(0)
  88.         # decoder_outputs: [seq_len, batch_size, dict_len]
  89.         return decoder_outputs
  90.     # ...(其他函数暂略)
复制代码
代码解析:

训练过程



调用collate_fn

  1. def collate_fn(batch, tokenizer):
  2.     # 根据 x 的长度来 倒序排列
  3.     batch = sorted(batch, key=lambda ele: ele[1], reverse=True)
  4.     # 合并整个批量的每一部分
  5.     input_sentences, input_sentence_lens, output_sentences, output_sentence_lens = zip(
  6.         *batch
  7.     )
  8.     # 转索引【按本批量最大长度来填充】
  9.     input_sentence_len = input_sentence_lens[0]
  10.     input_idxes = []
  11.     for input_sentence in input_sentences:
  12.         input_idxes.append(tokenizer.encode_input(input_sentence, input_sentence_len))
  13.     # 转索引【按本批量最大长度来填充】
  14.     output_sentence_len = max(output_sentence_lens)
  15.     output_idxes = []
  16.     for output_sentence in output_sentences:
  17.         output_idxes.append(
  18.             tokenizer.encode_output(output_sentence, output_sentence_len)
  19.         )
  20.     # 转张量 [seq_len, batch_size]
  21.     input_idxes = torch.LongTensor(input_idxes).t()
  22.     output_idxes = torch.LongTensor(output_idxes).t()
  23.     input_sentence_lens = torch.LongTensor(input_sentence_lens)
  24.     output_sentence_lens = torch.LongTensor(output_sentence_lens)
  25.     return input_idxes, input_sentence_lens, output_idxes, output_sentence_lens
复制代码
代码解析:

   比方:
I’m a student.
I’m OK.
Here is your change.
  
具体训练过程

  1.     # (其他部分代码略)
  2.         # 训练过程
  3.         is_complete = False
  4.         for epoch in range(self.epochs):
  5.             self.model.train()
  6.             for batch_idx, (x, x_len, y, y_len) in enumerate(train_dataloader):
  7.                 x = x.to(device=self.device)
  8.                 y = y.to(device=self.device)
  9.                 results = self.model(x, x_len, y, y_len)
  10.                 loss = self.get_loss(decoder_outputs=results, y=y)
  11.                 # 简单判定一下,如果损失小于0.5,则训练提前完成
  12.                 if loss.item() < 0.3:
  13.                     is_complete = True
  14.                     print(f"训练提前完成, 本批次损失为:{loss.item()}")
  15.                     break
  16.                 loss.backward()
  17.                 self.optimizer.step()
  18.                 self.optimizer.zero_grad()
  19.                 # 过程监控
  20.                 with torch.no_grad():
  21.                     if batch_idx % 100 == 0:
  22.                         print(
  23.                             f"第 {epoch + 1} 轮 {batch_idx + 1} 批, 当前批次损失: {loss.item()}"
  24.                         )
  25.                         x_true = self.get_real_input(x)
  26.                         y_pred = self.model.batch_infer(x, x_len)
  27.                         y_true = self.get_real_output(y)
  28.                         samples = random.sample(population=range(x.size(1)), k=2)
  29.                         for idx in samples:
  30.                             print("\t真实输入:", x_true[idx])
  31.                             print("\t真实结果:", y_true[idx])
  32.                             print("\t预测结果:", y_pred[idx])
  33.                             print(
  34.                                 "\t----------------------------------------------------------"
  35.                             )
  36.             # 外层提前退出
  37.             if is_complete:
  38.                 # print("训练提前完成")
  39.                 break
  40.         # 保存模型
  41.         torch.save(obj=self.model.state_dict(), f="./model.pt")
复制代码
手动循环进行正向推理

  1.     #(其他部分略)
  2.     def batch_infer(self, encoder_hidden):
  3.         """
  4.         推理时的正向传播
  5.             - encoder_hidden: [1, batch_size, hidden_size]
  6.         """
  7.         # 推理时,设定一个最大的固定长度
  8.         output_max_len = self.tokenizer.output_max_len
  9.         # 获取批量大小
  10.         batch_size = encoder_hidden.size(1)
  11.         # 输入信号 SOS [1, batch_size]
  12.         decoder_input = torch.LongTensor(
  13.             [[self.tokenizer.output_word2idx.get("<SOS>")] * batch_size]
  14.         ).to(device=device)
  15.         # print(decoder_input)
  16.         results = []
  17.         # 隐藏状态
  18.         # encoder_hidden: [1, batch_size, hidden_size]
  19.         decoder_hidden = encoder_hidden
  20.         with torch.no_grad():
  21.             # 手动循环
  22.             for t in range(output_max_len):
  23.                 # decoder_input: [1, batch_size]
  24.                 # decoder_hidden: [1, batch_size, hidden_size]
  25.                 decoder_output_t, decoder_hidden = self.forward_step(
  26.                     decoder_input, decoder_hidden
  27.                 )
  28.                 # 取出结果 [1, batch_size]
  29.                 decoder_input = decoder_output_t.argmax(dim=-1).unsqueeze(0)
  30.                 results.append(decoder_input)
  31.             # [seq_len, batch_size]
  32.             results = torch.cat(tensors=results, dim=0)
  33.         return results
复制代码
代码解析:

补充知识

tqdm

定义

tqdm 是一个用于在 Python 中表现进度条的库,非常得当在长时间运行的循环中使用。
安装方法

  1. pip install tqdm
复制代码
使用方法

  1. from tqdm import tqdm
  2. import time
  3. # 示例:在一个简单的循环中使用 tqdm
  4. for i in tqdm(range(10)):
  5.     time.sleep(1)  # 模拟某个耗时操作
复制代码
运行结果:

OpenCC

定义

OpenCC(Open Chinese Convert)是一个用于简体中文和繁体中文之间转换的工具
安装方法

  1. pip install OpenCC
复制代码
使用方法

  1. import opencc
  2. # 创建转换器,使用简体到繁体的配置
  3. converter = opencc.OpenCC('s2t')  # s2t: 简体到繁体
  4. # 输入简体中文
  5. simplified_text = "我爱编程"
  6. # 进行转换
  7. traditional_text = converter.convert(simplified_text)
  8. print(traditional_text)  
  9. # 输出结果:我愛編程
复制代码
内容小结


参考资料

(暂无)

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4