LLMs之minimind:minimind源码解读(pretrain.py)——实现基于Transformer架 ...

打印 上一主题 下一主题

主题 1001|帖子 1001|积分 3003

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
LLMs之minimind:minimind源码解读(pretrain.py)——实现基于Transformer架构的大规模语言模型预练习及wandb监控—支持余弦退火学习率调度/分布式预练习/自动肴杂精度优化/梯度累积/梯度裁剪/定期生存模型

目录
minimind源码解读(pretrain.py)——实现基于Transformer架构的大规模语言模型预练习及wandb监控—支持余弦退火学习率调度/分布式预练习/自动肴杂精度优化/梯度累积/梯度裁剪/定期生存模型



minimind源码解读(pretrain.py)——实现基于Transformer架构的大规模语言模型预练习及wandb监控—支持余弦退火学习率调度/分布式预练习/自动肴杂精度优化/梯度累积/梯度裁剪/定期生存模型

  1. # LLMs之minimind:minimind源码解读(pretrain.py)——实现基于Transformer架构的大规模语言模型预训练及wandb监控—支持余弦退火学习率调度/分布式预训练/自动混合精度优化/梯度累积/梯度裁剪/定期保存模型
  2. '''
  3. 源代码地址:https://github.com/jingyaogong/minimind/blob/master/1-pretrain.py
  4. 这段代码主要实现了基于 Transformer 架构的语言模型的预训练流程。它支持单机多卡以及跨机器的分布式训练。通过精心设计的数据加载机制、
  5. 灵活的学习率调整策略、高效的优化算法以及混合精度训练等技术手段,该脚本能够高效稳定地完成大规模语言模型的训练任务。
  6. 同时,它还提供了对训练过程的监控和支持实验管理的功能。
  7. 该Py文件主要实现了一个基于Transformer模型的分布式预训练任务。文件的实现流程包括:
  8. 模型与分布式环境的初始化。
  9. 使用余弦退火学习率调度和**自动混合精度训练(AMP)**提升训练效率。
  10. 通过torch.nn.parallel.DistributedDataParallel实现模型的分布式训练,加速训练过程。
  11. 实现了包括梯度累积、梯度裁剪和定期保存模型等细节,确保模型训练的稳定性和可扩展性。
  12. 支持可选的wandb(Weights and Biases)日志记录,用于实验管理与可视化。
  13. '''
  14. # torch的深度学习库和相关分布式训练模块(torch.distributed、DistributedDataParallel等
  15. # 核心技术点:PyTorch分布式训练与自动混合精度训练(AMP)
  16. import os
  17. import platform
  18. import time
  19. import math
  20. import warnings
  21. import torch
  22. import torch.distributed as dist
  23. from torch import optim
  24. from torch.nn.parallel import DistributedDataParallel
  25. from torch.optim.lr_scheduler import CosineAnnealingLR
  26. from torch.utils.data import DataLoader, DistributedSampler
  27. from contextlib import nullcontext
  28. from model.model import Transformer
  29. from model.LMConfig import LMConfig
  30. from model.dataset import PretrainDataset
  31. # 过滤不必要的警告信息(warnings.filterwarnings('ignore'))
  32. warnings.filterwarnings('ignore')
  33. # 定义Logger日志记录函数,负责记录日志输出。在分布式训练(DDP)中,只有主进程会输出日志信息。
  34. def Logger(content):
  35.     if not ddp or dist.get_rank() == 0:
  36.         print(content)
  37. # 实现了get_lr学习率调度器函数,根据当前的迭代次数调整学习率。主要使用余弦退火学习率调度策略,允许动态调整学习率
  38. def get_lr(it, all):
  39.     warmup_iters = 0
  40.     lr_decay_iters = all
  41.     min_lr = learning_rate / 10
  42.     if it < warmup_iters:
  43.         return learning_rate * it / warmup_iters
  44.     if it > lr_decay_iters:
  45.         return min_lr
  46.     decay_ratio = (it - warmup_iters) / (lr_decay_iters - warmup_iters)
  47.     assert 0 <= decay_ratio <= 1
  48.     coeff = 0.5 * (1.0 + math.cos(math.pi * decay_ratio))
  49.     return min_lr + coeff * (learning_rate - min_lr)
  50. # 训练单个Epoch的实现
  51. # 负责一个完整的训练周期的操作:数据加载和模型训练→计算当前的学习率,更新优化器的参数→使用torch.cuda.amp进行自动混合精度训练,进行前向传播与梯度回传→每经过设定的步数后,记录日志并保存模型的状态
  52. # 核心技术点:自动混合精度(AMP)、梯度累积、梯度裁剪、分布式数据并行、模型保存
  53. def train_epoch(epoch, wandb, accumulation_steps=8):
  54.     start_time = time.time()
  55.     for step, (X, Y) in enumerate(train_loader):
  56.         X = X.to(device)
  57.         Y = Y.to(device)
  58.         lr = get_lr(epoch * iter_per_epoch + step, epochs * iter_per_epoch)
  59.         for param_group in optimizer.param_groups:
  60.             param_group['lr'] = lr
  61.         with ctx:
  62.             out = model(X, Y)
  63.             loss = out.last_loss / accumulation_steps
  64.         scaler.scale(loss).backward()
  65.         if (step + 1) % accumulation_steps == 0:
  66.             scaler.unscale_(optimizer)
  67.             torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
  68.             scaler.step(optimizer)
  69.             scaler.update()
  70.             optimizer.zero_grad(set_to_none=True)
  71.         if step % 100 == 0:
  72.             spend_time = time.time() - start_time
  73.             Logger(
  74.                 'Epoch:[{}/{}]({}/{}) loss:{:.3f} lr:{:.7f} epoch_Time:{}min:'.format(
  75.                     epoch,
  76.                     epochs,
  77.                     step,
  78.                     iter_per_epoch,
  79.                     loss.item() * accumulation_steps,
  80.                     optimizer.param_groups[-1]['lr'],
  81.                     spend_time / (step + 1) * iter_per_epoch // 60 - spend_time // 60))
  82.             if (wandb is not None) and (not ddp or dist.get_rank() == 0):
  83.                 wandb.log({"loss": loss.item() * accumulation_steps,
  84.                            "lr": optimizer.param_groups[-1]['lr'],
  85.                            "epoch_Time": spend_time / (step + 1) * iter_per_epoch // 60 - spend_time // 60})
  86.         if (step + 1) % 1000 == 0 and (not ddp or dist.get_rank() == 0):
  87.             model.eval()
  88.             # torch.save(model.state_dict(), '{}/iter_{}.pth'.format(save_dir, int(step + epoch * iter_per_epoch)))
  89.             moe_path = '_moe' if lm_config.use_moe else ''
  90.             ckp = f'{save_dir}/pretrain_{lm_config.dim}{moe_path}.pth'
  91.             if isinstance(model, torch.nn.parallel.DistributedDataParallel):
  92.                 state_dict = model.module.state_dict()
  93.             else:
  94.                 state_dict = model.state_dict()
  95.             torch.save(state_dict, ckp)
  96.             model.train()
  97. # 模型初始化: init_model函数用于初始化Transformer模型,计算并输出模型的参数数量
  98. def init_model():
  99.     def count_parameters(model):
  100.         return sum(p.numel() for p in model.parameters() if p.requires_grad)
  101.     # model init
  102.     model = Transformer(lm_config).to(device)
  103.     moe_path = '_moe' if lm_config.use_moe else ''
  104.     # ckp = f'{save_dir}/pretrain_{lm_config.dim}{moe_path}.pth'
  105.     #
  106.     # state_dict = torch.load(ckp, map_location=device)
  107.     # unwanted_prefix = '_orig_mod.'
  108.     # for k, v in list(state_dict.items()):
  109.     #     if k.startswith(unwanted_prefix):
  110.     #         state_dict[k[len(unwanted_prefix):]] = state_dict.pop(k)
  111.     # model.load_state_dict(state_dict, strict=False)
  112.     Logger(f'LLM总参数量:{count_parameters(model) / 1e6:.3f} 百万')
  113.     return model
  114. # 分布式模式初始化: init_distributed_mode函数用于初始化分布式训练模式,包括设置设备编号、进程组初始化和NCCL后端的配置。
  115. # 核心技术点:分布式训练模式的设置(NCCL、DDP)。
  116. def init_distributed_mode():
  117.     if not ddp: return
  118.     global ddp_local_rank, DEVICE
  119.     dist.init_process_group(backend="nccl")
  120.     ddp_rank = int(os.environ["RANK"])
  121.     ddp_local_rank = int(os.environ["LOCAL_RANK"])
  122.     ddp_world_size = int(os.environ["WORLD_SIZE"])
  123.     DEVICE = f"cuda:{ddp_local_rank}"
  124.     torch.cuda.set_device(DEVICE)
  125. # torchrun --nproc_per_node 2 1-pretrain.py
  126. # I/O
  127. if __name__ == "__main__":
  128.     # -----------------------------------------------------------------------------
  129.     # 1、配置环境(GPU设备、随机种子、数据集路径等)
  130.     lm_config = LMConfig()
  131.     max_seq_len = lm_config.max_seq_len
  132.     out_dir = 'out'
  133.     epochs = 20
  134.     batch_size = 64
  135.     learning_rate = 2e-4
  136.     device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
  137.     dtype = 'bfloat16'
  138.     save_dir = os.path.join(out_dir)
  139.     os.makedirs(save_dir, exist_ok=True)
  140.     os.makedirs(out_dir, exist_ok=True)
  141.     tokens_per_iter = batch_size * max_seq_len
  142.     torch.manual_seed(1337)
  143.     device_type = device if "cuda" in device else "cpu"
  144.     use_wandb = False  # 是否使用wandb
  145.     wandb_project = "MiniMind-Pretrain"
  146.     wandb_run_name = f"MiniMind-Pretrain-Epoch-{epochs}-BatchSize-{batch_size}-LearningRate-{learning_rate}"
  147.     if use_wandb:
  148.         import wandb
  149.         wandb.init(project=wandb_project, name=wandb_run_name)
  150.     else:
  151.         wandb = None
  152.     ctx = (
  153.         nullcontext()
  154.         if device_type == "cpu"
  155.         else torch.cuda.amp.autocast()
  156.     )
  157.     ddp = int(os.environ.get("RANK", -1)) != -1  # is this a ddp run?
  158.     ddp_local_rank, DEVICE = 0, "cuda:0"
  159.     if ddp:
  160.         init_distributed_mode()
  161.         device = torch.device(DEVICE)
  162.     # -----------------------------------------------------------------------------
  163.     # 2、训练数据加载器的配置:初始化训练数据集与数据加载器,使用PretrainDataset处理数据,并根据是否为分布式训练设置数据采样器DistributedSampler。
  164.     # 核心技术点:数据加载器与分布式数据采样器的配置。
  165.     # -----init dataloader------
  166.     data_path_list = ['./dataset/pretrain_data.bin']
  167.     train_ds = PretrainDataset(data_path_list, max_length=max_seq_len, memmap=True)
  168.     train_sampler = DistributedSampler(train_ds) if ddp else None
  169.     num_workers = 8  # 可以根据系统的 CPU 核心数来调整
  170.     train_loader = DataLoader(
  171.         train_ds,
  172.         batch_size=batch_size,
  173.         pin_memory=True,
  174.         drop_last=False,
  175.         shuffle=False,
  176.         num_workers=num_workers,
  177.         sampler=train_sampler
  178.     )
  179.     # 3、初始化模型与优化器,选择自动混合精度训练和分布式训练模式
  180.     # init model
  181.     model = init_model()
  182.     scaler = torch.cuda.amp.GradScaler(enabled=(dtype == dtype))
  183.     # optimizer
  184.     optimizer = optim.Adam(model.parameters(), lr=learning_rate)
  185.     # compile the model
  186.     if False and platform.system() != 'Windows' and float(torch.__version__.split('.')[0]) >= 2:
  187.         Logger("compiling the model... (takes a ~minute)")
  188.         unoptimized_model = model
  189.         model = torch.compile(model)
  190.     if ddp:
  191.         # Ignore the freqs_cis buffer so that DDP does not broadcast it at
  192.         # construction time since NCCL does not support ComplexFloat
  193.         model._ddp_params_and_buffers_to_ignore = {"pos_cis"}
  194.         model = DistributedDataParallel(model, device_ids=[ddp_local_rank])
  195.     # 4、通过for循环控制训练多个Epoch,每个Epoch调用train_epoch函数进行训练
  196.     # training loop
  197.     iter_per_epoch = len(train_loader)
  198.     for epoch in range(epochs):
  199.         train_epoch(epoch, wandb)
复制代码







免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

万有斥力

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表