deepspeed/runtime/engine.py

确定检查点保存的配置

1
2
3
4
5
6
7
8
def _save_checkpoint(self, save_dir, tag, client_state={}, exclude_frozen_parameters=False):

save_path = self._get_ckpt_name(save_dir, tag)

zero_optimizer_state = self.zero_optimization() or self.bfloat16_enabled()

save_frozen_param = self.zero_optimization_partition_gradients() and not exclude_frozen_parameters

  • **save_path = self._get_ckpt_name(save_dir, tag)**根据save_dir和tag生成完整的检查点保存路径。
  • **zero_optimizer_state = self.zero_optimization() or self.bfloat16_enabled()**若启用 Zero Optimization 或 bfloat16,则需要保存优化器状态。
  • save_frozen_param: 若启用 Zero Optimization 的梯度分区 且未排除冻结参数,则需要保存冻结参数。

收集模型的当前状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
self._curr_ckpt_path = os.path.join(save_dir, tag)
module = self.module_state_dict(exclude_frozen_parameters=exclude_frozen_parameters)
self._curr_ckpt_path = None
state = dict(module=module,
buffer_names=self._get_buffer_names(),
optimizer=self.optimizer.state_dict() if self.optimizer and not zero_optimizer_state else None,
param_shapes=self._get_zero_param_shapes() if self.optimizer and zero_optimizer_state else None,
frozen_param_shapes=self._get_zero_frozen_param_attributes(self._get_param_shape_func)
if save_frozen_param else None,
shared_params=self._get_shared_params() if self.optimizer and zero_optimizer_state else None,
frozen_param_fragments=self._get_zero_frozen_param_attributes(self._get_param_fragment_func)
if save_frozen_param else None,
lr_scheduler=self.lr_scheduler.state_dict() if self.lr_scheduler is not None else None,
data_sampler=self.training_dataloader.data_sampler.state_dict() if
(self.training_dataloader is not None and self.curriculum_learning_enabled()) else None,
random_ltd=self.random_ltd_scheduler.state_dict() if self.random_ltd_enabled() else None,
sparse_tensor_module_names=self.sparse_tensor_module_names,
skipped_steps=self.skipped_steps,
global_steps=self.global_steps,
global_samples=self.global_samples,
dp_world_size=self.seq_dp_world_size,
mp_world_size=self.mp_world_size,
ds_config=self.config,
ds_version=version)
state.update(client_state)

在使用 Pipeline Parallelism(流水线并行) 的训练模式时,
默认行为被重写,导致直接调用 module_state_dict() 返回 None,因为在流水线并行训练模式下,各个模块可能并没有直接获取到完整的状态数据。在调用 module_state_dict() 之前,临时设置一个保存路径变量 self._curr_ckpt_path。
PipelineEngine 的 module_state_dict() 实现会检查 self._curr_ckpt_path 是否存在,并利用它作为保存路径。
在完成 module_state_dict() 的调用之后,将 self._curr_ckpt_path 重置为 None,以免影响其他模块的逻辑。

  • **self._curr_ckpt_path = os.path.join(save_dir, tag)**将检查点保存路径设置为 save_dir 和 tag 的组合路径。 这是为了解决流水线并行模式下 module_state_dict() 依赖 self._curr_ckpt_path 的问题,确保其能够正确返回模型状态字典。
  • module = self.module_state_dict(exclude_frozen_parameters=exclude_frozen_parameters)调用 module_state_dict: 获取模型的状态字典。参数 exclude_frozen_parameters 用于指定是否排除冻结参数(例如固定不更新的权重)。
  • self._curr_ckpt_path = None 将 self._curr_ckpt_path 重置为 None,恢复初始状态。
  • **state = dict()**构造一个字典 state,保存与模型和训练相关的各种状态信息:
    module: 模型的状态字典,由 module_state_dict 提供。
    buffer_names: 模型中所有 buffer 的名称,调用 _get_buffer_names() 获取。
    optimizer: 优化器状态,通过 self.optimizer.state_dict() 提取。仅在启用了优化器且不使用 zero_optimizer_state 时保存。
    param_shapes: 参数形状。仅在启用 zero_optimizer_state 时保存,调用 _get_zero_param_shapes()。
    frozen_param_shapes: 冻结参数的形状。仅在启用 save_frozen_param 时保存,调用 _get_zero_frozen_param_attributes(self._get_param_shape_func)。
    shared_params: 共享参数信息,调用 _get_shared_params()。
    frozen_param_fragments: 冻结参数片段。类似于冻结参数形状,但数据片段级别,调用 _get_zero_frozen_param_attributes(self._get_param_fragment_func)。
    lr_scheduler: 学习率调度器状态字典,由 self.lr_scheduler.state_dict() 提供。
    data_sampler: 数据采样器状态字典。如果启用了课程学习(curriculum_learning_enabled())并且有训练数据加载器,则调用 self.training_dataloader.data_sampler.state_dict()。
    random_ltd: 随机 LTD 调度器状态字典(用于随机剪枝或模型压缩),通过 self.random_ltd_scheduler.state_dict() 获取。
    sparse_tensor_module_names: 稀疏张量模块的名称。
    skipped_steps: 当前训练中跳过的步骤数。
    global_steps: 全局训练步数。
    global_samples: 全局训练样本数。
    dp_world_size: 数据并行的全局工作大小。
    mp_world_size: 模型并行的全局工作大小。
    ds_config: DeepSpeed 的配置。
    ds_version: DeepSpeed 的版本信息。
  • **state.update(client_state)**将外部传入的 client_state 添加到 state 字典中,扩展其内容。

处理冻结参数形状

1
2
3
4
frozen_param_shapes = state['frozen_param_shapes']
frozen_param_shapes_str = str(frozen_param_shapes)
frozen_param_shapes_dict = {key: list(value) if isinstance(value, torch.Size) else value.tolist() for key, value in frozen_param_shapes.items()}

  • frozen_param_shapes = state[‘frozen_param_shapes’]
    **frozen_param_shapes_str = str(frozen_param_shapes)**提取冻结参数的形状信息,并转换为字符串格式以便后续处理。
  • frozen_param_shapes_dict = {
    key: list(value) if isinstance(value, torch.Size) else value.tolist()
    for key, value in frozen_param_shapes.items()
    }遍历字典中的每个键值对将 frozen_param_shapes 中的内容格式化为适合保存为 JSON 文件的形式。
    如果是 torch.Size类型(torch.Size 是 PyTorch 中表示张量形状的类,但它是不可直接序列化为 JSON 的)调用 list(value) 将其转换为 Python 的列表(如 (3, 4) 转为 [3, 4]),张量(torch.Tensor)也不可直接序列化为 JSON。调用 .tolist() 将张量内容转换为 Python 的列表。
    例如,形如 torch.tensor([1, 2, 3]) 会转换为 [1, 2, 3]。

总结

  • 确定检查点保存的配置:包括路径、状态和所需部分。
    确定保存路径:使用 self._get_ckpt_name(save_dir, tag) 方法生成保存路径 save_path。
    确定保存状态:zero_optimizer_state:是否保存优化器的非零状态(由 Zero Optimization 和 bfloat16 支持决定)。save_frozen_param:是否保存冻结参数状态(需要 Zero Optimization 的分区梯度功能并且没有排除冻结参数)。
  • 收集模型的当前状态:构建保存字典(state)
    保存模块状态:通过设置 self._curr_ckpt_path,解决 Pipeline Parallelism 中 module_state_dict() 的路径依赖问题,获取模块状态。
    构建状态字典 (**state)**:包含模型和训练相关的多种状态:
    模型权重(module),优化器状态(optimizer),参数形状(param_shapes)
    冻结参数形状及片段(frozen_param_shapes 和 frozen_param_fragments),学习率调度器状态(lr_scheduler),数据采样器状态(data_sampler)
    随机梯度裁剪(random_ltd),其它全局状态(例如步数、样本数、配置等)
  • 处理冻结参数形状:将 frozen_param_shapes 转换为字典格式(从 torch.Size 转为列表)。
  • 完整保存检查点:用于模型后续的加载和恢复。

frozen_param_shapes

目的:遍历模型的参数,检查哪些参数是冻结的(即不需要梯度更新的),并统计它们分布在 CPU 和 GPU 上的数量,同时通过给定的 attr_func 获取每个冻结参数的属性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def _get_zero_frozen_param_attributes(self, attr_func):
frozen_param_fragments = OrderedDict()
cpu = 0
gpu = 0
for param in self.module.parameters():
if param.requires_grad:
continue
if param not in self.param_names:
raise ValueError(f"failed to find frozen {param} in named params")
name = self.param_names[param]
#pdb.set_trace()

if param.ds_tensor.device == torch.device('cpu'):
cpu+=1
# print(f"Device of param {param}: {param.ds_tensor.device}")
else :
pdb.set_trace()
gpu+=1
#print(param.ds_tensor.device)

frozen_param_fragments[name] = attr_func(param)
print(f"cpu次数: {cpu}")
print(f"gpu次数: {gpu}")
return frozen_param_fragments
  • frozen_param_fragments:存储冻结参数及其通过 attr_func 提取的属性,使用 OrderedDict 按插入顺序存储键值对。
  • for param in self.module.parameters():遍历模型中所有的参数。
  • if param.requires_grad:continue 这里param.requires_grad=True 表示该参数是可训练的,即需要进行梯度更新,因此跳过这些参数,只处理冻结的参数(requires_grad=False)。
  • if param not in self.param_names: param 必须在 self.param_names 中有对应的名称。如果没有找到,抛出一个 ValueError 错误,表示无法找到该参数的名称。
  • name = self.param_names[param]:获取冻结参数的名称,存储在 name 变量中。这个名称通常用于在 frozen_param_fragments 字典中标识每个参数。
  • if param.ds_tensor.device == torch.device(‘cpu’):
    cpu += 1
    else:
    gpu += 1区分 CPU 和 GPU 中的冻结参数,并分别统计它们的数量。
  • **frozen_param_fragments[name] = attr_func(param)**:对每个冻结参数,调用 attr_func(param) 来获取其相关属性,并将该属性存储在 frozen_param_fragments 字典中。字典的键是参数的名称 name,值是通过 attr_func 获取的属性。

总结

  1. 初始化数据结构:创建存储冻结参数的字典 frozen_param_fragments 和设备计数器 cpu、gpu。
  2. 遍历模型参数:遍历模型中所有的参数,筛选出冻结参数。检查参数名称:确保冻结参数在 self.param_names 中有对应的名称。
  3. 统计设备信息:判断冻结参数所在的设备,并统计 CPU 和 GPU 上冻结参数的数量。提取属性:通过调用 attr_func 提取冻结参数的属性。
  4. 打印统计信息:打印 CPU 和 GPU 上冻结参数的数量。
  5. 返回结果:返回包含冻结参数名称和属性的字典。

deepspeed/runtime/engine.py
http://sjx.com/2024/11/27/deepspeed-runtime-engine-py/
作者
sjx
发布于
2024年11月27日
许可协议