Megatron 源码分析
程序启动
/Megatron-LM/examples/pretrain_gpt_distributed.sh
使用 torchrun
启动多进程
pretrain_gpt.py
torchrun
预训练
pretrain_gpt.py
调用 pretrain
函数进行预训练。
1 2 3 4 5 6
| if __name__ == "__main__": pretrain(train_valid_test_datasets_provider, model_provider, ModelType.encoder_or_decoder, forward_step, args_defaults={'tokenizer_type': 'GPT2BPETokenizer'})
|
model_provider
提供一个并行化的 gpt 模型
1 2 3 4 5 6 7 8 9 10 11 12 13
| def model_provider(pre_process=True, post_process=True): """Build the model."""
print_rank_0('building GPT model ...') config = core_transformer_config_from_args(get_args()) model = GPTModel( config, num_tokentypes=0, parallel_output=True, pre_process=pre_process, post_process=post_process ) return model
|
在这里可以按照需要修改模型并行时的 Stage
TODO
train_valid_test_datasets_provider
构建
train/valid/test
数据集
forward_step
定义了模型的前向传播
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| def forward_step(data_iterator, model): """Forward step.""" args = get_args() timers = get_timers()
timers('batch-generator', log_level=2).start() tokens, labels, loss_mask, attention_mask, position_ids = get_batch( data_iterator) timers('batch-generator').stop()
output_tensor = model(tokens, position_ids, attention_mask, labels=labels)
return output_tensor, partial(loss_func, loss_mask)
|
get_batch
从数据集中获取一个 batch 的数据,调用
broadcast_data
将数据广播到各个进程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| def get_batch(data_iterator): """Generate a batch""" args = get_args() tokenizer = get_tokenizer()
keys = ['text'] datatype = torch.int64
if data_iterator is not None: data = next(data_iterator) else: data = None data_b = tensor_parallel.broadcast_data(keys, data, datatype)
tokens_ = data_b['text'].long() labels = tokens_[:, 1:].contiguous() tokens = tokens_[:, :-1].contiguous()
attention_mask, loss_mask, position_ids = get_ltor_masks_and_position_ids( tokens, tokenizer.eod, args.reset_position_ids, args.reset_attention_mask, args.eod_mask_loss)
return tokens, labels, loss_mask, attention_mask, position_ids
|
pretrain
调用 megatron/training.py
中的
pretrain
函数进行训练
- 调用
initialize_megatron
初始化
megatron
,包括
timers
,tensorboard_writer
等,重点
- 设置
JIT
,不太重要
- 设置时间戳,使用
all_reduce
进行同步
- 使用
setup_model_and_optimizer
初始化模型,优化器和优化策略(warnup
, lr
,
...)
- 使用
train
调用 forward_step_func
进行训练
初始化
megatron.initialize.initialize_megatron
初始化
megatron
set_global_variables
,设置
timers
,tensorboard_writer
等
_initialize_distributed
- 调用
torch.distributed.init_process_group
初始化分布式环境
- 调用
mpu.initialize_model_parallel
初始化模型并行,数据并行的进程组
- 这里提供了具体的分配方式,默认分配方式比较简单,直接通过张量并行大小,流水线并行大小,虚拟流水线大小(决定每张卡有几个连续
stage 段),和流水线并行分割等级(略)
问题
在哪里设置数据的发送和接收?(流水线并行)