LLM基础学习02:分布式训练核心架构与多级并行策略详解——DDP/FSDP/ZeRO实 ...

打印 上一主题 下一主题

主题 978|帖子 978|积分 2934

希望尽可能将本身学习过程中参考过的资料进行系统的整理,方便背面的初学者更快的找到本身想要的资料!这系列笔记的代码都在**Basic-LLM-Learning**里面,欢迎Star!
一些大略的参考,更详细的参考会在对应的小节里面提到。
Part 1: Welcome to the Distributed Data Parallel (DDP) Tutorial Series
Efficient Training of Large Language Models on Distributed…
动画明白Pytorch 大模子分布式训练技术 DP,DDP,DeepSpeed ZeRO技术_哔哩哔哩_bilibili
ZeRO & DeepSpeed: New system optimizations enable training models with over 100 billion parameters - Microsoft Research
简要先容

并行方案包括:Hybrid Parallelism(混合并行), Auto Parallelism(主动并行), and Heterogeneous Parallelism(异构并行),目前以混合并行最为常见,也是最成熟最容易插入到本身的代码中实现即插即用的,所以背面主要是对混合并行进行简要的原理先容和详细的代码实现,别的并行技术则会贴上一些先容的比力好的博客或者原论文以供阅读
Hybrid Parallelism(混合并行):混合并行通常结合多种手工制作的并行化策略来分别LLM的不同并行化维度,又可以细分为:


  • data parallelism:数据并行,将训练数据集分别为多个子集,每个GPU复制完备模子参数或分别模子参数(扩展优化技术),独立处理不同数据,终极同步梯度或参数。实用于模子参数量适中(单卡可容纳),但数据量大的场景(下面先容到的DDP、FSDP、ZeRO都属于数据并行和相关的优化技术,是目前最常用,最方便“即插即用”的方法,所以这里会进行侧重先容)。
  • tensor parallelism:张量并行,按模子内部张量的维度拆分盘算(如矩阵的行或列),分配到不同GPU,通讯整合结果。解决超大参数模子(如千亿参数模子)的单卡内存瓶颈。
  • pipeline parallelism:流水线并行,将模子按层切分到不同GPU,数据以"微批次(micro-batch)"流经各阶段(如PipeDream、GPipe)。模子层数较多(如深层次Transformer),可横向扩展至多装备。
  • sequence parallelism:将长序列样本(如文本token)拆分到不同GPU,独立处理子序列并协同盘算注意力机制,处理超长序列(如数万token的文本),降低单卡内存占用。
  • expert parallelism:混合专家模子(MoE)中,将不同的专家子网络分配到不同GPU,通过路由机制动态分配样本
假如用一个表格进行简单的比力:
并行方式拆分维度通讯开销实用场景数据并行数据批次低数据量大,模子小张量并行模子张量(行/列)高单层参数量过大(如矩阵乘法)流水线并行模子层中模子深但层参数量小序列并行输入序列长度中-高超长序列处理(如文本tokens)专家并行模子希罕路径动态分配万亿参数规模的希罕模子 DDP(Distributed Data Parallel)

单机多卡

   数据并行,程序会为每个GPU创建一个进程,每个进程中有对应的模子和优化器本地副本,而且每个优化器上不但模子参数雷同(有相关的扩展优化技术对模子参数进行分片,如FSDP,背面会进行先容),优化器的随机数种子也雷同。DDP会在训练过程内部维持这种同步。每个不同的进程中会继承到不同的数据集样本采样(即Data Parallel,数据并行)。
  更加详细的原理部分请参考引用链接中的B站视频部分。
代码来源(对应的文件路径为distributed/ddp-tutorial-series/multigpu.py):
GitHub - pytorch/examples: A set of examples around pytorch in Vision, Text, Reinforcement Learning, etc.
颠末处理的带注释的所有文件在:
Basic-LLM-Learning/Distributed training/DDP at master · CYRYGBG/Basic-LLM-Learning
假如要对单卡训练的代码进行修改,使用DDP进行训练,代码中有几个部分需要修改:

  • 导入DDP需要用到的库

  1. # 1.导入DDP需要用到的库
  2. # 用于支持多进程操作
  3. import torch.multiprocessing as mp
  4. # 用于在分布式训练中对数据集进行采样,确保每个进程看到不同的数据子集
  5. from torch.utils.data.distributed import DistributedSampler
  6. # 用于将模型分布到多个 GPU 上并行训练
  7. from torch.nn.parallel import DistributedDataParallel as DDP
  8. # 分别用于初始化分布式训练组和清理分布式训练组
  9. from torch.distributed import init_process_group, destroy_process_group
  10. import os
复制代码

  • 增加DDP相关参数设置的函数
  1. # 2.增加DDP相关参数设置的函数
  2. def ddp_setup(rank, world_size):
  3.     """
  4.     设置分布式数据并行(DDP)环境,在训练前需要调用完成设置
  5.     Args:
  6.         rank (int): 当前进程的唯一标识符(通常为 0 到 world_size-1)。
  7.         world_size (int): 参与分布式训练的总进程数(通常等于 GPU 的数量)。
  8.     """
  9.     # 设置主节点的地址,这里使用本地主机(localhost)作为主节点
  10.     os.environ["MASTER_ADDR"] = "localhost"
  11.     # 设置主节点的端口号,确保所有进程使用相同的端口进行通信
  12.     # 主节点(MASTER_ADDR 和 MASTER_PORT)负责协调所有进程之间的通信,
  13.     # 分布式训练需要所有进程能够互相通信,因此这些设置必须一致
  14.     os.environ["MASTER_PORT"] = "12355"
  15.     # 设置当前进程使用的 GPU 设备,rank 通常对应 GPU 的索引
  16.     torch.cuda.set_device(rank)
  17.     # 初始化进程组,用于分布式训练
  18.     # - backend="nccl": 使用 NCCL 后端,NCCL 是 NVIDIA 提供的用于多 GPU 通信的高性能库
  19.     # - rank=rank: 当前进程的标识符
  20.     # - world_size=world_size: 总进程数
  21.     init_process_group(backend="nccl", rank=rank, world_size=world_size)
复制代码

  • 将模子封装到DDP中,用于背面对应GPU进程中模子参数的分发

  1. class Trainer:
  2.     def __init__(
  3.         self,
  4.         model: torch.nn.Module,
  5.         train_data: DataLoader,
  6.         optimizer: torch.optim.Optimizer,
  7.         gpu_id: int,
  8.         save_every: int,
  9.     ) -> None:
  10.         self.gpu_id = gpu_id
  11.         self.model = model.to(gpu_id)
  12.         self.train_data = train_data
  13.         self.optimizer = optimizer
  14.         self.save_every = save_every
  15.         # 3.将模型封装到DDP中,用于后面对应GPU进程中模型参数的分发
  16.         self.model = DDP(model, device_ids=[gpu_id])
复制代码

  • 由于model如今是DDP的封装,不能直接通过model.state_dict()来直接获取模子的参数了

  1. def _save_checkpoint(self, epoch):
  2.         # 4.由于model现在是DDP的封装,不能直接通过model.state_dict()来直接获取模型的参数了
  3.         ckp = self.model.module.state_dict()  
  4.         PATH = "checkpoint.pt"
  5.         torch.save(ckp, PATH)
  6.         print(f"Epoch {epoch} | Training checkpoint saved at {PATH}")
复制代码

  • 由于每个进程中的模子参数是一样的,所以只需要保存一个进程上的模子参数

  1. def train(self, max_epochs: int):
  2.         for epoch in range(max_epochs):
  3.             self._run_epoch(epoch)
  4.             # 5.由于每个进程中的模型参数是一样的,所以只需要保存一个进程上的模型参数
  5.             if self.gpu_id == 0 and epoch % self.save_every == 0:
  6.                 self._save_checkpoint(epoch)
复制代码

  • 由于已经指定了采样器,所以将shuffle设置为false
  • 包管数据聚会会议被切分到不同的进程中,而且不会产生重复样本

  1. def prepare_dataloader(dataset: Dataset, batch_size: int):
  2.     return DataLoader(
  3.         dataset,
  4.         batch_size=batch_size,
  5.         pin_memory=True,
  6.         shuffle=False,  # 6.由于已经指定了采样器,所以将shuffle设置为false
  7.         sampler=DistributedSampler(dataset)  # 7.保证数据集会被切分到不同的进程中,并且不会产生重复样本
  8.     )
复制代码

  • 需要初始化进程组
  • 确保在训练完成后精确关闭分布式进程组,释放资源并停止后台进程

  1. def main(rank: int, world_size: int, save_every: int, total_epochs: int, batch_size: int):
  2.     ddp_setup(rank, world_size)  # 8.需要初始化进程组
  3.     dataset, model, optimizer = load_train_objs()
  4.     train_data = prepare_dataloader(dataset, batch_size)
  5.     trainer = Trainer(model, train_data, optimizer, rank, save_every)
  6.     trainer.train(total_epochs)
  7.     destroy_process_group()  # 9.确保在训练完成后正确关闭分布式进程组,释放资源并停止后台进程
复制代码

  • 训练部分需要使用mp.spawn完成各下层参数的通报

  1. world_size = torch.cuda.device_count()  # 获取GPU数量
  2. mp.spawn(main,  # 每个进程执行的函数
  3.        args=(world_size, args.save_every, args.total_epochs, args.batch_size),  # 传递给 main 的参数,其中参数rank会被自动分配
  4.        nprocs=world_size  # 进程数
  5.       )
复制代码
With torchrun

   torchrun 是 PyTorch 自带的分布式训练下令行工具,简化了多进程启动的复杂性。
  在DDP训练代码的基础上(前一小节),进行以下修改就可以使用torchrun了:

  • 不再需要手动设置环境变量,只需要指定当前的GPU

  1. # 1. 不再需要手动设置环境变量,只需要指定当前的GPU
  2. def ddp_setup():
  3.     torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
  4.     init_process_group(backend="nccl")
复制代码

  • 增加快照保存路径,假如快照存在,则先进行读取
  • torchrun会设置环境变量,只需要获取对应变量就可以获取当前GPU编号

  1. class Trainer:
  2.     def __init__(
  3.         self,
  4.         model: torch.nn.Module,
  5.         train_data: DataLoader,
  6.         optimizer: torch.optim.Optimizer,
  7.         save_every: int,
  8.         snapshot_path: str,  # 2. 增加快照保存路径,如果快照存在,则先进行读取
  9.     ) -> None:
  10.         # 3.torchrun会设置环境变量,只需要获取对应变量就可以获取当前GPU编号
  11.         self.gpu_id = int(os.environ["LOCAL_RANK"])  
  12.         self.model = model.to(self.gpu_id)
  13.         self.train_data = train_data
  14.         self.optimizer = optimizer
  15.         self.save_every = save_every
  16.         self.epochs_run = 0
  17.         self.snapshot_path = snapshot_path  
  18.         if os.path.exists(snapshot_path):
  19.             print("Loading snapshot")
  20.             self._load_snapshot(snapshot_path)
  21.         self.model = DDP(self.model, device_ids=[self.gpu_id])
复制代码

  • 增加_load_snapshot函数,用于读取当前存在的snapshot参数
  1. # 4. 增加_load_snapshot函数,用于读取当前存在的snapshot参数
  2. def _load_snapshot(self, snapshot_path):
  3.     loc = f"cuda:{self.gpu_id}"
  4.     snapshot = torch.load(snapshot_path, map_location=loc)
  5.     self.model.load_state_dict(snapshot["MODEL_STATE"])
  6.     self.epochs_run = snapshot["EPOCHS_RUN"]
  7.     print(f"Resuming training from snapshot at Epoch {self.epochs_run}")
复制代码

  • 增加_save_snapshot函数,保存当前训练的状态和模子参数
  1. # 5. 增加_save_snapshot函数,保存当前训练的状态和模型参数
  2. def _save_snapshot(self, epoch):
  3.     snapshot = {
  4.         "MODEL_STATE": self.model.module.state_dict(),
  5.         "EPOCHS_RUN": epoch,
  6.     }
  7.     torch.save(snapshot, self.snapshot_path)
  8.     print(f"Epoch {epoch} | Training snapshot saved at {self.snapshot_path}")
复制代码

  • 从snapshot读取参数后,按照对应的训练步骤开始训练,不用重新开始训练
  • checkpoint保存修改为snapshot保存

  1. def train(self, max_epochs: int):
  2.         # 6. 从snapshot读取参数后,按照对应的训练步骤开始训练,不用从头开始训练
  3.         for epoch in range(self.epochs_run, max_epochs):
  4.             self._run_epoch(epoch)
  5.             if self.gpu_id == 0 and epoch % self.save_every == 0:
  6.                 self._save_snapshot(epoch)  # 7.checkpoint保存修改为snapshot保存
复制代码

  • rank和world_size参数由torchrun处理,这里只需要在Trainer参数中传入snapshot_path

  1. # 8.rank和world_size参数由torchrun处理,这里只需要在Trainer参数中传入snapshot_path
  2. def main(save_every: int, total_epochs: int, batch_size: int, snapshot_path: str = "snapshot.pt"):
  3.     ddp_setup()
  4.     dataset, model, optimizer = load_train_objs()
  5.     train_data = prepare_dataloader(dataset, batch_size)
  6.     trainer = Trainer(model, train_data, optimizer, save_every, snapshot_path)
  7.     trainer.train(total_epochs)
  8.     destroy_process_group()
复制代码

  • 只需要调用main就行

  1. # 9.只需要调用main就行
  2. main(args.save_every, args.total_epochs, args.batch_size)
复制代码

  • 下令行运行代码torchrun --nproc_per_node=gpu multigpu.py 50 10
多机多卡

由于当前本身基本全是在使用单机多卡,所以详细代码不作纪录,仅对相关教程链接进行引用。
Part 5: Multinode DDP Training with Torchrun (code walkthrough)
FSDP(Fully Sharded Data Parallel)

FSDP 是将模子参数、优化器状态和梯度都进行切分并分配到不同的GPU上。在前向流传中,FSDP会网络所有装备上的模子参数来完成一次完备的模子前向流传,随后释放来自别的GPU的参数;在反向流传中,FSDP也会从别的GPU中网络完备的模子参数进行反向流传,在完成梯度同步后再释放所网络的参数。
更加详细的原理部分请参考引用链接中的B站视频部分。
如今继续以Pytorch Example中的mnist代码作为基础来进行修改。最紧张的修改的点包括:损失值等结构需要聚合后再盘算输出、数据集的分片处理和模子的分片处理,同时在所有保存信息或者输出信息的代码中必须包管是主进程,避免重复保存或输出。下面的代码以在终端中使用torchrun下令运行为基础。
修改后的带有注释的完备代码在以下链接的Distributed training/FSDP/FSDP_torchrun.py文件中,main.py文件则为未经FSDP训练修改的原代码。
GitHub - CYRYGBG/Basic-LLM-Learning

  • 增加须要的包引入
  1. # 1. 增加包引入
  2. import os
  3. import functools
  4. import torch.distributed as dist  # 用于多线程数据的处理
  5. import torch.multiprocessing as mp
  6. from torch.utils.data.distributed import DistributedSampler
  7. from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
  8. from torch.distributed.fsdp.fully_sharded_data_parallel import (
  9.     CPUOffload,
  10.     BackwardPrefetch,
  11. )
  12. from torch.distributed.fsdp.wrap import (
  13.     size_based_auto_wrap_policy,
  14.     enable_wrap,
  15.     wrap,
  16. )
复制代码

  • 增加FSDP相关参数设置的函数
  1. # 2. 增加FSDP相关参数设置的函数
  2. def setup(rank, world_size):
  3.     dist.init_process_group("nccl", rank=rank, world_size=world_size)
  4. def cleanup():
  5.     dist.destroy_process_group()
复制代码

  • 修改训练函数(仅训练一轮)使其符合FSDP的使用

  1. def train(args, model, rank, world_size, train_loader, optimizer, epoch, sampler=None):
  2.     model.train()
  3.     # 新增用来存储分布式训练损失统计(损失总和+样本数)
  4.     ddp_loss = torch.zeros(2).to(rank)
  5.     # 设置sampler的epoch,保证分布式训练中shuffle的正确性
  6.     if sampler:  
  7.         sampler.set_epoch(epoch)
  8.     for batch_idx, (data, target) in enumerate(train_loader):
  9.         # 将数据转移到指定的设备上
  10.         data, target = data.to(rank), target.to(rank)
  11.         optimizer.zero_grad()
  12.         output = model(data)
  13.         loss = F.nll_loss(output, target, reduction='sum')
  14.         loss.backward()
  15.         optimizer.step()
  16.         ddp_loss[0] += loss.item()
  17.         ddp_loss[1] += len(data)
  18.     # 将不同进程中中的损失值和样本数进行相加以计算后续的平均损失
  19.     dist.all_reduce(ddp_loss, op=dist.ReduceOp.SUM)
  20.     # 仅在主进程进行该操作
  21.     if rank == 0:
  22.         print('Train Epoch: {} \tLoss: {:.6f}'.format(epoch, ddp_loss[0] / ddp_loss[1]))
复制代码

  • 修改测试函数使其符合FSDP的使用

  1. # 4. 修改测试函数使其符合FSDP的使用
  2. def test(model, rank, world_size, test_loader):
  3.     model.eval()
  4.     correct = 0
  5.     # 与train函数中的部分类似,记录损失值、预测正确的样本数和总样本数
  6.     ddp_loss = torch.zeros(3).to(rank)
  7.     with torch.no_grad():
  8.         for data, target in test_loader:
  9.             data, target = data.to(rank), target.to(rank)
  10.             output = model(data)
  11.             ddp_loss[0] += F.nll_loss(output, target, reduction='sum').item()  # 所有损失值求和
  12.             pred = output.argmax(dim=1, keepdim=True)  # 获取预测标签
  13.             ddp_loss[1] += pred.eq(target.view_as(pred)).sum().item()  # 预测正确的样本数
  14.             ddp_loss[2] += len(data)  # 样本数
  15.     # 与train中功能类似
  16.     dist.all_reduce(ddp_loss, op=dist.ReduceOp.SUM)
  17.     if rank == 0:
  18.         test_loss = ddp_loss[0] / ddp_loss[2]
  19.         print('Test set: Average loss: {:.4f}, Accuracy: {}/{} ({:.2f}%)\n'.format(
  20.             test_loss, int(ddp_loss[1]), int(ddp_loss[2]),
  21.             100. * ddp_loss[1] / ddp_loss[2]))
复制代码

  • 将模子封装在FSDP中并进行分布式训练,同时将参数通报部分全部转移到背面的运行部分中
  1. # 5. 将模型封装在FSDP中并进行分布式训练
  2. def fsdp_main(args):
  3.     # 获取线程信息
  4.     rank = int(os.environ['RANK'])
  5.     world_size = int(os.environ['WORLD_SIZE'])
  6.     local_rank = int(os.environ['LOCAL_RANK'])
  7.     setup(rank, world_size)
  8.     # -----------数据处理和下载部分没有变化-----------
  9.     transform=transforms.Compose([
  10.         transforms.ToTensor(),
  11.         transforms.Normalize((0.1307,), (0.3081,))
  12.     ])
  13.     dataset1 = datasets.MNIST('../data', train=True, download=True,
  14.                         transform=transform)
  15.     dataset2 = datasets.MNIST('../data', train=False,
  16.                         transform=transform)
  17.     # -----------数据处理和下载部分没有变化-----------
  18.     # 与分布式
  19.     sampler1 = DistributedSampler(dataset1,  # 用于采样的数据集
  20.                                   rank=rank,  # 当前进程
  21.                                   num_replicas=world_size,  # 参与分布式训练的进程的数量
  22.                                   shuffle=True)
  23.     sampler2 = DistributedSampler(dataset2,
  24.                                   rank=rank,
  25.                                   num_replicas=world_size)
  26.     train_kwargs = {'batch_size': args.batch_size,
  27.                     'sampler': sampler1}
  28.     test_kwargs = {'batch_size': args.test_batch_size,
  29.                    'sampler': sampler2}
  30.     cuda_kwargs = { 'num_workers': 2,       # 数据加载子进程数
  31.                     'pin_memory': True,     # 启用锁页内存(加速GPU传输)
  32.                     'shuffle': False}       # 分布式采样器已处理shuffle,此处必须禁用
  33.     # 合并到训练和测试参数中
  34.     train_kwargs.update(cuda_kwargs)
  35.     test_kwargs.update(cuda_kwargs)
  36.     train_loader = torch.utils.data.DataLoader(dataset1,**train_kwargs)
  37.     test_loader = torch.utils.data.DataLoader(dataset2, **test_kwargs)
  38.     # FSDP自动包装策略
  39.     my_auto_wrap_policy = functools.partial(
  40.         size_based_auto_wrap_policy,    # 基于参数数量的自动分片策略
  41.         min_num_params=100              # 参数超过100的模块会被分片
  42.     )
  43.     # 绑定当前进程到对应GPU
  44.     torch.cuda.set_device(local_rank)
  45.     # 记录初始化开始时间
  46.     init_start_event = torch.cuda.Event(enable_timing=True)
  47.     # 记录初始化结束时间
  48.     init_end_event = torch.cuda.Event(enable_timing=True)
  49.     # 创建模型并移至当前GPU
  50.     model = Net().to(rank)
  51.     # 使用FSDP封装模型并使用自定义的自动分片策略
  52.     model = FSDP(model, auto_wrap_policy=my_auto_wrap_policy)
  53.     optimizer = optim.Adadelta(model.parameters(), lr=args.lr)
  54.     scheduler = StepLR(optimizer, step_size=1, gamma=args.gamma)
  55.     init_start_event.record()  # 开始记录初始化时间
  56.     for epoch in range(1, args.epochs + 1):
  57.         train(args, model, rank, world_size, train_loader, optimizer, epoch, sampler=sampler1)
  58.         test(model, rank, world_size, test_loader)
  59.         scheduler.step()
  60.     init_end_event.record()  # 时间记录和输出
  61.     if rank == 0:
  62.         init_end_event.synchronize()
  63.         print(f"CUDA event elapsed time: {init_start_event.elapsed_time(init_end_event) / 1000}sec")
  64.         print(f"{model}")
  65.     if args.save_model:
  66.         # 同步所有进程,确保训练完成
  67.         # 避免rank 0在保存时其他进程还在运行
  68.         dist.barrier()
  69.         # 获取全量模型参数(会自动聚合分片)
  70.         states = model.state_dict()
  71.         # 仅主进程保存
  72.         if rank == 0:
  73.             torch.save(states, "mnist_cnn.pt")
  74.     # 销毁进程组,释放资源
  75.     cleanup()
复制代码

  • 修改后的参数通报和运行部分
  1. # 6. 修改后的参数传递和运行部分
  2. if __name__ == '__main__':
  3.     # Training settings
  4.     parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
  5.     parser.add_argument('--batch-size', type=int, default=64, metavar='N',
  6.                         help='input batch size for training (default: 64)')
  7.     parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',
  8.                         help='input batch size for testing (default: 1000)')
  9.     parser.add_argument('--epochs', type=int, default=14, metavar='N',
  10.                         help='number of epochs to train (default: 14)')
  11.     parser.add_argument('--lr', type=float, default=1.0, metavar='LR',
  12.                         help='learning rate (default: 1.0)')
  13.     parser.add_argument('--gamma', type=float, default=0.7, metavar='M',
  14.                         help='Learning rate step gamma (default: 0.7)')
  15.     parser.add_argument('--no-cuda', action='store_true', default=False,
  16.                         help='disables CUDA training')
  17.     parser.add_argument('--seed', type=int, default=1, metavar='S',
  18.                         help='random seed (default: 1)')
  19.     parser.add_argument('--save-model', action='store_true', default=False,
  20.                         help='For Saving the current Model')
  21.     args = parser.parse_args()
  22.     torch.manual_seed(args.seed)
  23.     fsdp_main(args)
复制代码
ZeRO & DeepSpeed

一些链接

详细的原明白释:
ZeRO: Memory Optimizations Toward Training Trillion Parameter Models
论文精度视频:
Zero 论文精读【论文精读】_哔哩哔哩_bilibili
ds_config.json中相关参数的解释
DeepSpeed Configuration JSON
原理

ZeRO有三个阶段:


  • ZeRO Stage 1:仅对优化器状态进行分割,每个GPU中仍有完备的模子参数和梯度数据
  • ZeRO Stage 2:对优化器状态和梯度进行分割
  • ZeRO Stage 3:对优化器状态、梯度和模子参数全部进行分割
特性Stage 1Stage 2Stage 3优化器状态分区存储分区存储分区存储梯度完备存储分区存储分区存储模子参数完备存储完备存储分区存储 代码

deepspeed库的方法和别的方法最大的差别是不需要对本身的代码进行太大的改动就可以使用分布式训练。

  • 导入须要的库

  1. # 1. 导入必要的库
  2. from torch.optim.lr_scheduler import StepLR, LambdaLR
  3. import deepspeed
  4. import os
复制代码

  • 修改train函数,这部分最紧张的是将梯度清零, 前向流传,反向流传和梯度更新的部分全部修改为封装后的model_engine完成

  1. # 2. 修改train函数,主要的修改包括使用deepspeed的封装进行前向
  2. #    和反向传播过程
  3. def train(args, model_engine, train_loader, epoch):
  4.     model_engine.train()
  5.     local_rank = int(os.getenv('LOCAL_RANK', '0'))
  6.     for batch_idx, (data, target) in enumerate(train_loader):
  7.         # 数据传递到model_engine.device自动进行设备分配
  8.         data = data.to(model_engine.device)
  9.         target = target.to(model_engine.device)
  10.         # 使用model_engine.device进行反向传播相关处理
  11.         # 进行修改的包括:梯度清零, 前向传播,反向传播和梯度更新
  12.         model_engine.zero_grad()  # 原 optimizer.zero_grad()
  13.         output = model_engine(data)  # 原 output = model(data)
  14.         loss = F.nll_loss(output, target)
  15.         model_engine.backward(loss)  # 原 loss.backward()
  16.         model_engine.step()  # 原 optimizer.step()
  17.         if batch_idx % args.log_interval == 0 and local_rank == 0:
  18.             print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
  19.                 epoch, batch_idx * len(data), len(train_loader.dataset),
  20.                 100. * batch_idx / len(train_loader), loss.item()))
  21.             if args.dry_run:
  22.                 break
复制代码

  • 与前面类似,将参数通报部分放到背面的运行部分,main函数**仅保存数据处理、模子封装和训练循环部分。特别注意:**代码修改前的scheduler是在每轮训练完成后对学习率进行调整,但是如今全部由model_engine继承后,需要手动调整scheduler的实现来完成和原来雷同的功能(每次调用model_engine.step()后都会默认调用一次scheduler),下面特别标出


  1. def main(args):
  2.     train_kwargs = {'batch_size': args.batch_size}
  3.     test_kwargs = {'batch_size': args.test_batch_size}
  4.     cuda_kwargs = {'num_workers': 1,
  5.                     'pin_memory': True,
  6.                     'shuffle': True}
  7.     train_kwargs.update(cuda_kwargs)
  8.     test_kwargs.update(cuda_kwargs)
  9.     transform=transforms.Compose([
  10.         transforms.ToTensor(),
  11.         transforms.Normalize((0.1307,), (0.3081,))
  12.         ])
  13.     dataset1 = datasets.MNIST('../data', train=True, download=True,
  14.                        transform=transform)
  15.     dataset2 = datasets.MNIST('../data', train=False,
  16.                        transform=transform)
  17.     train_loader = torch.utils.data.DataLoader(dataset1,**train_kwargs)
  18.     test_loader = torch.utils.data.DataLoader(dataset2, **test_kwargs)
  19.     model = Net()
  20.    
  21.     # 定义好优化器和调度器之后再传递给deepspeed.initialize进行接管
  22.     # 这里要不就完全自己指定在传递给deepspeed,要不就完全在config中指定好,不能进行混用
  23.     # 另外,在代码中指定的话会覆盖config中的设置
  24.     optimizer = optim.Adadelta(model.parameters(), lr=args.lr)  # default=1.0
  25.     # 计算每个epoch的batch数量
  26.     total_batches_per_epoch = len(train_loader)
  27.     # 定义基于batch的调度逻辑
  28.     def lr_lambda(current_step):
  29.         # 每隔 total_batches_per_epoch * step_size 个batch更新一次
  30.         return 1.0 if current_step % (total_batches_per_epoch * args.step_size) !=0 else args.gamma
  31.     scheduler = LambdaLR(optimizer, lr_lambda)  
  32.     # scheduler = StepLR(optimizer, step_size=1, gamma=args.gamma)  # default=0.7
  33.     # 初始化DeepSpeed
  34.     model_engine, optimizer, _, _ = deepspeed.initialize(
  35.                                                         args=args,
  36.                                                         model=model,
  37.                                                         model_parameters=model.parameters(),
  38.                                                         optimizer=optimizer,
  39.                                                         lr_scheduler=scheduler,
  40.                                                         config=args.deepspeed_config
  41.                                                         )
  42.    
  43.     for epoch in range(1, args.epochs + 1):
  44.         train(args, model_engine, train_loader, epoch)
  45.         # 测试函数传递的是当前的模型
  46.         test(model_engine.module, model_engine.device, test_loader)
  47.         # 不需要再进行显式调用,会在model_engine.step()中进行统一处理
  48.         # scheduler.step()
  49.     if args.save_model:
  50.         torch.save(model_engine.module.state_dict(), "mnist_cnn.pt")
复制代码
Tensor Parallelism(张量并行)

Tensor Parallelism(TP)张量并行,可以明白为当输入张量与第一个权重张量进行矩阵乘法时,本质上等效于以下操纵:将权重张量按列方向切分到不同GPU上,每个子权重张量分别与输入张量进行独立运算后,各个GPU再将盘算结果传输汇总,并通过拼接操纵形成终极的输出结果,可以参考下图:

Efficient Training on Multiple GPUs
Efficient Large-Scale Language Model Training on GPU Clusters…
Pipeline Parallelism(流水线并行)

流水线并行将模子的层分别为多个阶段,此中每个阶段有模子中一连的几个层组成并放到一个GPU上进行处理。

deepspeed库中有关于流水线并行的实现,但是要求模子必须表示为一连的一系列层(假如有些层进行了重复使用,这种模子就无法使用流水线并行)。
Pipeline Parallelism
Sequence Parallelism(序列并行)

这个方法好像有来自不同论文的不同的实现方法,但是也没有一个通用的库来完成通用的实现,所以这里贴一个感觉总结的比力好的文章的链接。
【分布式训练技术分享四】聊聊序列并行Sequence parallelism
Expert Parallelism(专家并行)

专家并行是混合专家模子(MoE)中使用的一种并行方法,用于将不同的专家模子分配到不同的GPU上。

有一个将混合专家模子和专家并行先容的比力好的文章贴在这里。
混合专家模子(MoE)详解
关于专家混合模子想要了解更多和追追热门的话可以看一下Deepseek的一篇MoE的论文:
DeepSeekMoE: Towards Ultimate Expert Specialization in…


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

怀念夏天

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表