发布于 

Megatron 源码分析

程序启动

/Megatron-LM/examples/pretrain_gpt_distributed.sh

使用 torchrun 启动多进程 pretrain_gpt.py

torchrun

预训练

pretrain_gpt.py 调用 pretrain 函数进行预训练。

1
2
3
4
5
6
if __name__ == "__main__":
pretrain(train_valid_test_datasets_provider,
model_provider,
ModelType.encoder_or_decoder,
forward_step,
args_defaults={'tokenizer_type': 'GPT2BPETokenizer'})

model_provider 提供一个并行化的 gpt 模型

1
2
3
4
5
6
7
8
9
10
11
12
13
def model_provider(pre_process=True, post_process=True):
"""Build the model."""

print_rank_0('building GPT model ...')
config = core_transformer_config_from_args(get_args())
model = GPTModel(
config,
num_tokentypes=0,
parallel_output=True,
pre_process=pre_process,
post_process=post_process
)
return model

在这里可以按照需要修改模型并行时的 Stage

TODO

train_valid_test_datasets_provider 构建 train/valid/test 数据集

forward_step 定义了模型的前向传播

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def forward_step(data_iterator, model):
"""Forward step."""
args = get_args()
timers = get_timers()

# Get the batch.
timers('batch-generator', log_level=2).start()
tokens, labels, loss_mask, attention_mask, position_ids = get_batch(
data_iterator)
timers('batch-generator').stop()

output_tensor = model(tokens, position_ids, attention_mask,
labels=labels)

return output_tensor, partial(loss_func, loss_mask)

get_batch 从数据集中获取一个 batch 的数据,调用 broadcast_data 将数据广播到各个进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def get_batch(data_iterator):
"""Generate a batch"""
args = get_args()
tokenizer = get_tokenizer()

# Items and their type.
keys = ['text']
datatype = torch.int64

# Broadcast data.
if data_iterator is not None:
data = next(data_iterator)
else:
data = None
data_b = tensor_parallel.broadcast_data(keys, data, datatype)

# Unpack.
tokens_ = data_b['text'].long()
labels = tokens_[:, 1:].contiguous()
tokens = tokens_[:, :-1].contiguous()

# Get the masks and postition ids.
attention_mask, loss_mask, position_ids = get_ltor_masks_and_position_ids(
tokens,
tokenizer.eod,
args.reset_position_ids,
args.reset_attention_mask,
args.eod_mask_loss)

return tokens, labels, loss_mask, attention_mask, position_ids

pretrain 调用 megatron/training.py 中的 pretrain 函数进行训练

  1. 调用 initialize_megatron 初始化 megatron,包括 timers,tensorboard_writer 等,重点
  2. 设置 JIT,不太重要
  3. 设置时间戳,使用 all_reduce 进行同步
  4. 使用 setup_model_and_optimizer 初始化模型,优化器和优化策略(warnup, lr, ...)
  5. 使用 train 调用 forward_step_func 进行训练

初始化

megatron.initialize.initialize_megatron 初始化 megatron

  1. set_global_variables,设置 timers,tensorboard_writer
  2. _initialize_distributed
    1. 调用 torch.distributed.init_process_group 初始化分布式环境
    2. 调用 mpu.initialize_model_parallel 初始化模型并行,数据并行的进程组
      1. 这里提供了具体的分配方式,默认分配方式比较简单,直接通过张量并行大小,流水线并行大小,虚拟流水线大小(决定每张卡有几个连续 stage 段),和流水线并行分割等级(略)

问题

在哪里设置数据的发送和接收?(流水线并行)