diff --git a/.github/workflows/unit_tests.yaml b/.github/workflows/unit_tests.yaml new file mode 100644 index 00000000..d79255f3 --- /dev/null +++ b/.github/workflows/unit_tests.yaml @@ -0,0 +1,76 @@ +name: unit-tests +on: + push: + branches: + - "develop" + - "main" + paths-ignore: + - "cmds/**" + - "**.md" + pull_request: + branches: + - "develop" + - "main" + paths-ignore: + - "cmds/**" + - "**.md" +env: + WORKSPACE_PREFIX: $(echo $GITHUB_WORKSPACE |cut -d '/' -f 1-4) + SLURM_PARTITION: llm_t + +jobs: + check-requirements: + runs-on: [t_cluster] + steps: + - name: mask env + run: | + echo "::add-mask::${{env.WORKSPACE_PREFIX}}" + - uses: actions/checkout@v3 + with: + fetch-depth: 2 + - name: check-requirements + run: | + changed_files=$(git diff --name-only -r HEAD^1 HEAD) + echo $changed_files + if [[ $changed_files =~ "runtime.txt" ]]; then + pip install -r requirements/runtime.txt + fi + + if [[ $changed_files =~ "torch.txt" ]]; then + pip install -r requirements/torch.txt + fi + + + unit_tests_core_pipeline: + if: ${{ always() }} + needs: check-requirements + runs-on: [t_cluster] + timeout-minutes: 20 + steps: + - name: mask env + run: | + echo "::add-mask::${{env.WORKSPACE_PREFIX}}" + - uses: actions/checkout@v3 + + - name: core_pipeline + run: | + source /mnt/petrelfs/share_data/llm_env/env/llm-flash2.0 + export PYTHONPATH=$PWD:$PYTHONPATH + srun -p ${SLURM_PARTITION} --job-name=internlm-ut-${GITHUB_RUN_ID}-${GITHUB_JOB} --quotatype=spot -N 1 -n 1 --gres=gpu:8 python -m pytest -s -v ./tests/test_core/test_pipeline.py + + unit_tests_utils_storage_manager: + if: ${{ always() }} + needs: check-requirements + runs-on: [t_cluster] + timeout-minutes: 20 + steps: + - name: mask env + run: | + echo "::add-mask::${{env.WORKSPACE_PREFIX}}" + - uses: actions/checkout@v3 + + - name: utils_storage_manager + run: | + source /mnt/petrelfs/share_data/llm_env/env/llm-flash2.0 + export PYTHONPATH=$PWD:$PYTHONPATH + srun -p ${SLURM_PARTITION} --job-name=internlm-ut-${GITHUB_RUN_ID}-${GITHUB_JOB} --quotatype=spot -N 1 -n 1 --gres=gpu:8 python -m pytest -s -v ./tests/test_utils/test_storage_manager.py diff --git a/.github/workflows/weekly_test.yaml b/.github/workflows/weekly_test.yaml index 62514596..880d0970 100644 --- a/.github/workflows/weekly_test.yaml +++ b/.github/workflows/weekly_test.yaml @@ -3,6 +3,7 @@ on: push: branches: - "main" + - "develop" env: SLURM_PARTITION: llm_s diff --git a/.gitignore b/.gitignore index 04367e3d..9bdc7ec7 100644 --- a/.gitignore +++ b/.gitignore @@ -149,5 +149,9 @@ memory_trace 13b_train*/ 30b_train*/ fstp_logs/ +configs/7B_train/* +configs/13B_train/* +configs/30B_train/* + atb pip diff --git a/configs/13B_template.py b/configs/13B_template.py index cd33a0e8..d7f33249 100644 --- a/configs/13B_template.py +++ b/configs/13B_template.py @@ -1,7 +1,7 @@ DO_ALERT = False -SEQ_LEN = 4096 -JOB_NAME = "13b_train_" + str({micro_bsz}) + "_" + str({sp}) + "_" + str({checkpoint}) +SEQ_LEN = {seq_len} +JOB_NAME = "13b_train_" + str(SEQ_LEN) + "_" + str({sp}) + "_" + str({intern_overlap}) + "_" + str({checkpoint}) HIDDEN_SIZE = 5120 NUM_ATTENTION_HEAD = 40 MLP_RATIO = 8 / 3 diff --git a/configs/30B_template.py b/configs/30B_template.py index 2334fe9c..ecbf77ba 100644 --- a/configs/30B_template.py +++ b/configs/30B_template.py @@ -1,7 +1,7 @@ DO_ALERT = False -SEQ_LEN = 4096 -JOB_NAME = "7b_train_" + str({micro_bsz}) + "_" + str({sp}) + "_" + str({checkpoint}) +SEQ_LEN = {seq_len} +JOB_NAME = "7b_train_" + str(SEQ_LEN) + "_" + str({sp}) + "_" + str({intern_overlap}) + "_" + 
str({checkpoint}) HIDDEN_SIZE = 6144 NUM_ATTENTION_HEAD = 48 MLP_RATIO = 8 / 3 diff --git a/configs/7B_MoE4_sft.py b/configs/7B_MoE4_sft.py index e865e792..92a93d09 100644 --- a/configs/7B_MoE4_sft.py +++ b/configs/7B_MoE4_sft.py @@ -4,7 +4,7 @@ SEQ_LEN = 2048 HIDDEN_SIZE = 4096 NUM_ATTENTION_HEAD = 32 -MLP_RATIO = 8 / 3 +MLP_RATIO = 4 / 3 NUM_LAYER = 32 VOCAB_SIZE = 103168 @@ -30,6 +30,14 @@ # 2. the 'content‘ means what states will be loaded, support: "model", "sampler", "optimizer", "scheduler", "all" # 3. the ’ckpt_type‘ means the type of checkpoint to be loaded, now only 'normal' type is supported. load_ckpt_info=dict(path=MODEL_ONLY_FOLDER, content=("model",), ckpt_type="internlm"), + # 'auto_resume' is designed to automatically load the latest checkpoint from 'save_ckpt_folder' when encountering + # training interruptions/hangs caused by hardware failures, using a scheduling system (such as k8s/slurm) + # with an automatic restart mechanism upon training reboot. + # Please be aware that if `auto_resume` is not set (its default value is True), it will not load the checkpoint + # path specified in `load_ckpt_info` by default. + # If you want to initialize your model weights from another model, you must set `auto_resume` to False. + # If you want to train from scratch, please set `auto_resume` to False and 'load_ckpt_info' to None. + auto_resume=True, checkpoint_every=CHECKPOINT_EVERY, async_upload=True, # async ckpt upload. (only work for boto3 ckpt) async_upload_tmp_folder="/dev/shm/internlm_tmp_ckpt/", # path for temporarily files during asynchronous upload. @@ -43,7 +51,7 @@ # micro_num means the number of micro_batch contained in one gradient update micro_num=4, # packed_length = micro_bsz * SEQ_LEN - micro_bsz=1, + micro_bsz=2, # defaults to the value of micro_num valid_micro_num=4, # defaults to 0, means disable evaluate @@ -81,8 +89,8 @@ hybrid_zero_optimizer = dict( # Enable low_level_optimzer overlap_communication - overlap_sync_grad=True, - overlap_sync_param=True, + overlap_sync_grad=False, + overlap_sync_param=False, # bucket size for nccl communication params reduce_bucket_size=512 * 1024 * 1024, # grad clipping @@ -133,7 +141,7 @@ layer_norm_epsilon=1e-5, use_flash_attn=True, num_chunks=1, # if num_chunks > 1, interleaved pipeline scheduler is used. - num_experts=4, + num_experts=8, moe_use_residual=False, moe_gate_k=2, ) @@ -150,8 +158,8 @@ tensor parallel: tensor parallel size, usually the number of GPUs per node. 
""" parallel = dict( - zero1=-1, - tensor=2, + zero1=dict(size=-1, fsdp=False), + tensor=1, pipeline=dict(size=1, interleaved_overlap=True), sequence_parallel=False, ) diff --git a/configs/7B_template.py b/configs/7B_template.py index d27f74b9..837a7899 100644 --- a/configs/7B_template.py +++ b/configs/7B_template.py @@ -1,8 +1,8 @@ # JOB_NAME = "7b_train" DO_ALERT = False -SEQ_LEN = 4096 -JOB_NAME = "7b_train_" + str({micro_bsz}) + "_" + str({sp}) + "_" + str({checkpoint}) +SEQ_LEN = {seq_len} +JOB_NAME = "7b_train_" + str(SEQ_LEN) + "_" + str({sp}) + "_" + str({intern_overlap}) + "_" + str({checkpoint}) HIDDEN_SIZE = 4096 NUM_ATTENTION_HEAD = 32 MLP_RATIO = 8 / 3 diff --git a/configs/generate.py b/configs/generate.py index 2b7d9ce5..2db5eba8 100644 --- a/configs/generate.py +++ b/configs/generate.py @@ -5,9 +5,9 @@ name = "./configs/" root_names = ["7B_train_", "13B_train_", "30B_train_"] model_size = ["7B", "13B", "30B"] -micro_bsz = [1, 2, 4, 8, 16, 32, 64] -sp = ["none", "megatron", "flash-attn", "intern"] -intern_overlap = [False, False, False, True] +seq_length = [4096, 8192, 16384, 32768, 65536, 131072, 262144] +sp = ["none", "megatron", "flash-attn", "intern", "intern"] +intern_overlap = [False, False, False, True, False] checkpoint = [False, True] for idx, root_name in enumerate(root_names): @@ -31,13 +31,22 @@ line = line.replace("{sp}", f'"{sp_mode}"') line = line.replace("{intern_overlap}", str(intern_overlap[i])) line = line.replace("{checkpoint}", str(ckpt)) - output_file_name = str(mb) + "_" + str(sp_mode) + "_ckpt_" + str(ckpt) + ".py" + output_file_name = ( + str(seq) + + "_" + + str(sp_mode) + + "_overlap_" + + str(intern_overlap[i]) + + "_ckpt_" + + str(ckpt) + + ".py" + ) write_file = folder_path + "/" + output_file_name with open(write_file, "w") as file: file.write(line) log_name = root_name + "_" + output_file_name[:-3] - command = f"srun -p llm_s -N 8 -n 64 --ntasks-per-node=8 --gpus-per-task=1 --time=10 python train.py --config {write_file} --profiling 2>&1 | tee ./fstp_logs/{log_name}.log" + command = f"srun -p llm_s -N 8 -n 64 --ntasks-per-node=8 --gpus-per-task=1 --time=30 python train.py --config {write_file} --profiling 2>&1 | tee ./fstp_logs/{log_name}.log" process = subprocess.Popen(command, shell=True, executable="/bin/bash") process.wait() diff --git a/doc/code-docs/locales/en/LC_MESSAGES/index.po b/doc/code-docs/locales/en/LC_MESSAGES/index.po index d7a1bd3c..52920296 100644 --- a/doc/code-docs/locales/en/LC_MESSAGES/index.po +++ b/doc/code-docs/locales/en/LC_MESSAGES/index.po @@ -7,7 +7,7 @@ msgid "" msgstr "" "Project-Id-Version: InternLM \n" "Report-Msgid-Bugs-To: \n" -"POT-Creation-Date: 2023-09-07 10:56+0800\n" +"POT-Creation-Date: 2023-10-10 17:48+0800\n" "PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n" "Last-Translator: FULL NAME \n" "Language: en\n" @@ -16,7 +16,7 @@ msgstr "" "MIME-Version: 1.0\n" "Content-Type: text/plain; charset=utf-8\n" "Content-Transfer-Encoding: 8bit\n" -"Generated-By: Babel 2.12.1\n" +"Generated-By: Babel 2.13.0\n" #: ../../source/index.rst:8 11e029810acf410180311a3c63eb01f4 msgid "InternLM" @@ -46,38 +46,42 @@ msgstr "Parallel Training" msgid "混合精度" msgstr "Mixed Precision" -#: ../../source/index.rst:59 9234725f3c464731993d73607608c874 +#: ../../source/index.rst:59 +msgid "混合专家模型" +msgstr "Mixture-of-Experts" + +#: ../../source/index.rst:67 9234725f3c464731993d73607608c874 msgid "模型备份" msgstr "Model Checkpointing" -#: ../../source/index.rst:67 8e4ce037017f4510b2892a66003877fa +#: ../../source/index.rst:75 
8e4ce037017f4510b2892a66003877fa msgid "性能分析" msgstr "Profiler" -#: ../../source/index.rst:75 a36e02819ecd4b448a8cb4ebbecb6600 +#: ../../source/index.rst:83 a36e02819ecd4b448a8cb4ebbecb6600 msgid "训练监控" msgstr "Monitor" -#: ../../source/index.rst:83 b912e292486f455c8b5cdd75962e8ac2 +#: ../../source/index.rst:91 b912e292486f455c8b5cdd75962e8ac2 msgid "训练样例" msgstr "Example" -#: ../../source/index.rst:91 ea9e9281720941a1830e5df7a2badf7a +#: ../../source/index.rst:99 ea9e9281720941a1830e5df7a2badf7a msgid "常见问题" msgstr "Q&A" -#: ../../source/index.rst:99 e08edc5aa1c74965b10084b393b88fae +#: ../../source/index.rst:107 e08edc5aa1c74965b10084b393b88fae msgid "索引和表格" msgstr "Indices and tables" -#: ../../source/index.rst:101 f3fdca059caa49dcad09aa44be7f02d6 +#: ../../source/index.rst:109 f3fdca059caa49dcad09aa44be7f02d6 msgid ":ref:`genindex`" msgstr "" -#: ../../source/index.rst:102 b3791e811315435097bb507edc3f4b9b +#: ../../source/index.rst:110 b3791e811315435097bb507edc3f4b9b msgid ":ref:`modindex`" msgstr "" -#: ../../source/index.rst:103 a164b772960f4ab8b18c7e8820f69f55 +#: ../../source/index.rst:111 a164b772960f4ab8b18c7e8820f69f55 msgid ":ref:`search`" msgstr "" diff --git a/doc/code-docs/locales/en/LC_MESSAGES/moe.po b/doc/code-docs/locales/en/LC_MESSAGES/moe.po new file mode 100644 index 00000000..37654470 --- /dev/null +++ b/doc/code-docs/locales/en/LC_MESSAGES/moe.po @@ -0,0 +1,208 @@ +# SOME DESCRIPTIVE TITLE. +# Copyright (C) 2023, InternLM Team +# This file is distributed under the same license as the InternLM package. +# FIRST AUTHOR , 2023. +# +#, fuzzy +msgid "" +msgstr "" +"Project-Id-Version: InternLM \n" +"Report-Msgid-Bugs-To: \n" +"POT-Creation-Date: 2023-10-10 19:25+0800\n" +"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n" +"Last-Translator: FULL NAME \n" +"Language: en\n" +"Language-Team: en \n" +"Plural-Forms: nplurals=2; plural=(n != 1);\n" +"MIME-Version: 1.0\n" +"Content-Type: text/plain; charset=utf-8\n" +"Content-Transfer-Encoding: 8bit\n" +"Generated-By: Babel 2.12.1\n" + +#: ../../source/moe.rst:2 +msgid "混合专家模型" +msgstr "Mixture-of-Experts" + +#: ../../source/moe.rst:3 +msgid "" +"混合专家模型(Mixture-of-Experts, MoE)是一种特殊的模型结构。 " +"混合专家模型将模型拆分为一系列称为“专家”的子模型,每个“专家” 具有唯一的权重。 " +"混合专家模型可以针对每个输入标记仅激活一个或少量的专家参与运算。 例如,图 :ref:`switch_transformer` 是 `Switch" +" Transformer `_ " +"提出的稀疏混合专家模型结构,其中的前向神经网络(FFN)被分解为多个子网络,在计算时仅有少部分的模型参数参与计算,以实现更有效的计算和资源分配。" +msgstr "" +"Mixture-of-Experts (MoE) is a special model structure. MoE partitions the model into a series of sub-models called \"experts\", " +"each with unique parameters. MoE only activates one or a small number of experts for each input token. For example, the figure :ref:`switch_transformer` " +" shows the sparse MoE architecture proposed by `Switch Transformer `_ . " +"The Forward Neural Network (FFN) is decomposed into multiple sub-networks, and only a small number of model parameters " +"are involved in the calculation to achieve more efficient calculation and resource allocation. " + +#: ../../source/moe.rst:8 +msgid "" +"稀疏混合专家模型通常还包含一个门控(gating)机制,例如图 :ref:`switch_transformer` " +"中的Router网络。门控网络负责选择激活哪些专家参与计算并组合不同专家的预测结果。" +msgstr "" +"Sparse MoE usually also includes a gating mechanism, such as the Router in Figure :ref:`switch_transformer` . " +"The gating network is responsible for selecting which experts to activate and combining the prediction results of " +"different experts." 
+ +#: ../../source/moe.rst:16 +msgid "switch transformer" +msgstr "switch transformer" + +#: ../../source/moe.rst:19 +msgid "参数配置" +msgstr "Parameter Settings" + +#: ../../source/moe.rst:20 +msgid "如果在启动训练时要使用混合专家模型,可进行如下相关配置:" +msgstr "" +"If MoE is expected to be used in the training, please make the following settings in the configuration file:" + +#: ../../source/moe.rst:22 +msgid "模型相关配置" +msgstr "Model related settings" + +#: ../../source/moe.rst:31 +msgid "num_experts:专家网络个数。在InternLM中,每个专家有着相同的网络结构但维护着不同的训练参数。" +msgstr "" +"num_experts: The number of expert networks. In InternLM, each expert has " +"the same network structure but maintains different training parameters." + +#: ../../source/moe.rst:32 +msgid "" +"moe_gate_k:门控策略。决定如何将输入标记路由到不同的专家进行计算。目前InternLM支持top1gating和top2gating两种门控策略。关于这些门控策略的详细的信息可以参考" +" `GShard `_。" +msgstr "" +"moe_gate_k: Gating strategy. Determines how to route input tokens to " +"different experts for calculation. Currently, InternLM supports top1gating" +" and top2gating strategies. For detailed information about " +"these gating strategies, please refer to `GShard `_." + +#: ../../source/moe.rst:34 +msgid "" +"注意:在目前的InternLM中,每个专家都是根据配置文件中HIDDEN_SIZE和MLP_RATIO构造的一个 `SwiGLU网络 `_,同时支持张量并行。用户可以根据需要构造自己的专家网络。" + +msgstr "" +"Note: In the current version of InternLM, each expert is a `SwiGLU network `_ based on " +"HIDDEN_SIZE and MLP_RATIO in the configuration file, and supports tensor parallelism. Users can construct their own expert networks as needed." + +#: ../../source/moe.rst:37 +msgid "损失相关配置" +msgstr "Loss related settings" + +#: ../../source/moe.rst:46 +msgid "" +"在top1gating和top2gating门控策略中,不同的专家处理的标记数量存在差异。为了提高模型效果,应尽量保证输入标记被均匀地路由到不同的专家上。" +"InternLM采用 `GShard `_ 提出的负载平衡损失优化门控策略。 " +"Moe_loss_coeff项决定着负载平衡损失项将如何添加到最终的损失项中( :math:`l=l_{nll}+k·l_{moe}` )。" +"关于该部分的详细信息可以进一步参考 `GShard `_。" +msgstr "" +"In top1gating and top2gating strategies, the number of tokens to process may be different for different experts. " +"In order to improve the model effect, the input tokens should be evenly routed to different experts. " +"InternLM adopts the balancing loss to optimize the gating network proposed by GShard. " +"The moe_loss_coeff determines how the balancing loss should be added to the final loss ( :math:`l=l_{nll}+k·l_{moe}` ). " +"The details can be found in `GShard `_. " + +#: ../../source/moe.rst:49 +msgid "注意:这些参数需要和其他参数一起使用,具体请参考 :doc:`/usage` “训练配置”相关章节的内容。" +msgstr "Note: These parameters need to be used together with other parameters, please refer to :doc:`/usage`: Training Configuration" + +#: ../../source/moe.rst:52 +msgid "模型训练" +msgstr "Model Training" + +#: ../../source/moe.rst:54 +msgid "" +"internlm.model.modeling_moe提供了一个标准的混合专家模型的实现,该模型的网络结构和图 :ref:`switch_transformer` " +"一致,其中使用到internlm.model.moe.MoE实现MoE网络。用户在配置文件中指定模型类型:" +msgstr "" +"internlm.model.modeling_moe provides an implementation of a standard MoE. " +"The model structure is consistent with Figure :ref:`switch_transformer` ," +" which uses internlm.model.moe.MoE to implement the MoE network. " +"To use moe model, specify the model type in the configuration file:" + +#: ../../source/moe.rst:60 +msgid "并配置好稀疏专家网络的相关参数后,就可以像正常启动InternLM一样进行混合专家模型的分布式训练,具体请参考 :doc:`/usage` “启动训练”相关章节的内容。" +msgstr "" +"After configuring the relevant parameters of the sparse MoE, " +"the distributed training can start as the normal training process. please refer to :doc:`/usage`: Start Training" + +#: internlm.model.moe.MoE:1 of +msgid "Initialize an MoE layer." 
+msgstr "" + +#: internlm.model.moe.MoE of +msgid "参数" +msgstr "parameter" + +#: internlm.model.moe.MoE:3 of +msgid "" +"the hidden dimension of the model, importantly this is also the input and" +" output dimension." +msgstr "" + +#: internlm.model.moe.MoE:5 of +msgid "default=1, the total number of experts per layer." +msgstr "" + +#: internlm.model.moe.MoE:7 of +msgid "default=1, number of ranks in the expert parallel world or group." +msgstr "" + +#: internlm.model.moe.MoE:9 of +msgid "default=1, top-k gating value, only supports k=1 or k=2." +msgstr "" + +#: internlm.model.moe.MoE:11 of +msgid "default=1.0, the capacity of the expert at training time." +msgstr "" + +#: internlm.model.moe.MoE:13 of +msgid "default=1.0, the capacity of the expert at eval time." +msgstr "" + +#: internlm.model.moe.MoE:15 of +msgid "" +"default=4, the minimum capacity per expert regardless of the " +"capacity_factor." +msgstr "" + +#: internlm.model.moe.MoE:17 of +msgid "" +"default=None, noisy gate policy, valid options are 'Jitter', 'RSample' or" +" 'None'." +msgstr "" + +#: internlm.model.moe.MoE:20 of +msgid "default=True, whether to use the default MoE layer." +msgstr "" + +#: internlm.model.moe.MoE:22 of +msgid "" +"default=True, whether to drop tokens - (setting to False is equivalent to" +" infinite capacity)." +msgstr "" + +#: internlm.model.moe.MoE:25 of +msgid "default=True, whether to use Random Token Selection." +msgstr "" + +#: internlm.model.moe.MoE:27 of +msgid "" +"default=False, make this MoE layer a Residual MoE " +"(https://arxiv.org/abs/2201.05596) layer." +msgstr "" + +#: internlm.model.moe.MoE:30 of +msgid "default=None, the torch module that defines the residual MLP." +msgstr "" + +#: ../../source/moe.rst:64 +msgid "注意:InternLM支持用户定义自己的MoE结构。internlm.model.moe.MoE是定义MoE网络的接口,目前使用SwiGLU网络实现了专家模型并支持top1gating和top2gating两种门控策略。用户可以在MoE接口中对专家网络和门控策略进行扩展。" +msgstr "" +"Note: InternLM supports users to define their own MoE structure. " +"internlm.model.moe.MoE is the interface that defines the MoE network. " +"Currently, the SwiGLU network is used to implement the experts and " +"supports two gating strategies: top1gating and top2gating. Users can " +"extend the expert network and gating strategy in the MoE interface as needed." diff --git a/doc/code-docs/source/checkpoint.rst b/doc/code-docs/source/checkpoint.rst index cd9b7553..de0c4cb3 100644 --- a/doc/code-docs/source/checkpoint.rst +++ b/doc/code-docs/source/checkpoint.rst @@ -39,7 +39,7 @@ CheckpointManager load_ckpt_folder=dict(path="local:/mnt/mfs/ckpt", content=["all",], ckpt_type="internlm"), auto_resume=False, # disable auto-resume, internlm will load model checkpoint from the path of 'load_ckpt_folder'. checkpoint_every=CHECKPOINT_EVERY, - async_upload=True, # async ckpt upload. (only work for boto3 ckpt) + async_upload=True, # async ckpt upload. (only work for boto3 and volc ckpt) async_upload_tmp_folder="/dev/shm/internlm_tmp_ckpt/", # path for temporarily files during asynchronous upload. oss_snapshot_freq=int(CHECKPOINT_EVERY / 2), # snapshot ckpt save frequency. ) @@ -67,7 +67,9 @@ InternLM对config中出现的所有存储路径都遵循以下的路径格式约 1. 如果需要使用boto3的路径,需要在运行前提前导入 ``S3_ACCESS_KEY_ID`` 和 ``S3_SECRET_ACCESS_KEY_ID`` 这两个环境变量。 -2. bucket的endpoint一般分为Inside IP和Outside IP,如果可以尽量使用inside IP,会获得更佳的存储速度。 +2. 如果需要使用volc的路径,需要在运行前提前导入 ``VOLC_ACCESS_KEY_ID`` 和 ``VOLC_SECRET_ACCESS_KEY_ID`` 这两个环境变量。 + +3. 
bucket的endpoint一般分为Inside IP和Outside IP,如果可以尽量使用inside IP,会获得更佳的存储速度。 @@ -114,7 +116,7 @@ config.ckpt 中相关的参数: - ``async_upload_tmp_folder``: 异步上传临时存储路径。参数类型 ``str/None``, 默认值为 ``/dev/shm/{JOB_NAME}_tmp_ckpt/``。 -需要注意的是,异步上传功能仅在backend为boto3时才会有效果,bcakend为local时只支持同步存储。 +需要注意的是,异步上传功能仅在backend为boto3或volc时才会有效果,bcakend为local时只支持同步存储。 ``async_upload_tmp_folder`` 设置的的原则为尽量设置为计算节点的local目录,这样才可以获得最佳的异步上传速度,一般来说建议为 ``/dev/shm`` 或 ``/nvme`` 下的路径,如果使用同步上传,则该路径可不给。 diff --git a/doc/code-docs/source/conf.py b/doc/code-docs/source/conf.py index c752047b..7a434637 100644 --- a/doc/code-docs/source/conf.py +++ b/doc/code-docs/source/conf.py @@ -9,6 +9,8 @@ import os import sys +import torch # noqa # pylint: disable=unused-import + project = "InternLM" copyright = "2023, InternLM Team" author = "InternLM Team" @@ -94,6 +96,10 @@ "apex", "torch", "numpy", + "flash_attn", + "rotary_emb", + "einops", + "torch_scatter", ] # support multi-language docs diff --git a/doc/code-docs/source/index.rst b/doc/code-docs/source/index.rst index 8811af23..da7812ef 100644 --- a/doc/code-docs/source/index.rst +++ b/doc/code-docs/source/index.rst @@ -55,6 +55,14 @@ InternLM mixed_precision +混合专家模型 +------------------- + +.. toctree:: + :maxdepth: 2 + + moe + 模型备份 -------------------- diff --git a/doc/code-docs/source/mixed_precision.rst b/doc/code-docs/source/mixed_precision.rst index fdf1d221..2817fa32 100644 --- a/doc/code-docs/source/mixed_precision.rst +++ b/doc/code-docs/source/mixed_precision.rst @@ -1,5 +1,5 @@ 混合精度 ------------------ +============ 混合精度是指在模型训练的过程中同时使用16位和32位浮点数类型,是一种在最小化精度损失的前提下加速模型训练的方法。 混合精度通过让模型的某些部分使用32位浮点数以保持数值稳定性,并在其余部分利用半精度浮点数加速训练并可以减少内存使用,在评估指标(如准确率)方面仍可以获得同等的训练效果。 @@ -22,10 +22,10 @@ InternLM默认将模型转换为16位浮点数类型进行训练(在配置文 super().__init__() self.linear1 = nn.Linear(4, 1, bias=False) self.linear2 = nn.Linear(1, 4, bias=False) + # set model.linear2 as fp32 module + set_fp32_attr_to_module(model.linear2) model = MlpModel() - # set model.linear2 as fp32 module - set_fp32_attr_to_module(model.linear2) # apply mixed precision model = NaiveAMPModel( @@ -78,4 +78,3 @@ InternLM支持使用TF32训练模型,允许用户在config文件中将 ``dtype torch.backends.cudnn.allow_tf32 = True torch.backends.cuda.matmul.allow_tf32 = True - diff --git a/doc/code-docs/source/moe.rst b/doc/code-docs/source/moe.rst new file mode 100644 index 00000000..29b43a51 --- /dev/null +++ b/doc/code-docs/source/moe.rst @@ -0,0 +1,65 @@ +混合专家模型 +============== +混合专家模型(Mixture-of-Experts, MoE)是一种特殊的模型结构。 +混合专家模型将模型拆分为一系列称为“专家”的子模型,每个“专家” 具有唯一的权重。 +混合专家模型可以针对每个输入标记仅激活一个或少量的专家参与运算。 +例如,图 :ref:`switch_transformer` 是 `Switch Transformer `_ 提出的稀疏混合专家模型结构,其中的前向神经网络(FFN)被分解为多个子网络,在计算时仅有少部分的模型参数参与计算,以实现更有效的计算和资源分配。 + +稀疏混合专家模型通常还包含一个门控(gating)机制,例如图 :ref:`switch_transformer` 中的Router网络。门控网络负责选择激活哪些专家参与计算并组合不同专家的预测结果。 + +.. _switch_transformer: + +.. figure:: ../../imgs/switch_transformer.png + :scale: 40% + :class: with-border + :align: center + + switch transformer + +参数配置 +---------------- +如果在启动训练时要使用混合专家模型,可进行如下相关配置: + +1. 模型相关配置 + +.. code-block:: python + + model = dict( + num_experts=16, + moe_gate_k=1, + ) + +* num_experts:专家网络个数。在InternLM中,每个专家有着相同的网络结构但维护着不同的训练参数。 +* moe_gate_k:门控策略。决定如何将输入标记路由到不同的专家进行计算。目前InternLM支持top1gating和top2gating两种门控策略。关于这些门控策略的详细的信息可以参考 `GShard `_。 + +注意:在目前的InternLM中,每个专家都是根据配置文件中HIDDEN_SIZE和MLP_RATIO构造的一个 `SwiGLU网络 `_,同时支持张量并行。用户可以根据需要构造自己的专家网络。 + + +2. 损失相关配置 + +.. 
code-block:: python + + loss = dict( + moe_loss_coeff=0.1, + ) + + +在top1gating和top2gating门控策略中,不同的专家处理的标记数量存在差异。为了提高模型效果,应尽量保证输入标记被均匀地路由到不同的专家上。InternLM采用 `GShard `_ 提出的负载平衡损失优化门控策略。 +Moe_loss_coeff项决定着负载平衡损失项将如何添加到最终的损失项中( :math:`l=l_{nll}+k·l_{moe}` )。关于该部分的详细信息可以进一步参考 `GShard `_。 + +注意:这些参数需要和其他参数一起使用,具体请参考 :doc:`/usage` “训练配置”相关章节的内容。 + +模型训练 +---------------- + +internlm.model.modeling_moe提供了一个标准的混合专家模型的实现,该模型的网络结构和图 :ref:`switch_transformer` 一致,其中使用到internlm.model.moe.MoE实现MoE网络。用户在配置文件中指定模型类型: + +.. code-block:: python + + model_type = "INTERNLM_MoE" + +并配置好稀疏专家网络的相关参数后,就可以像正常启动InternLM一样进行混合专家模型的分布式训练,具体请参考 :doc:`/usage` “启动训练”相关章节的内容。 + +.. autoclass:: internlm.model.moe.MoE + +注意:InternLM支持用户定义自己的MoE结构。internlm.model.moe.MoE是定义MoE网络的接口,目前使用SwiGLU网络实现了专家模型并支持top1gating和top2gating两种门控策略。用户可以在MoE接口中对专家网络和门控策略进行扩展。 diff --git a/doc/imgs/ckpt_path_format_CN.png b/doc/imgs/ckpt_path_format_CN.png index 0307d22f..0b21f54b 100644 Binary files a/doc/imgs/ckpt_path_format_CN.png and b/doc/imgs/ckpt_path_format_CN.png differ diff --git a/doc/imgs/switch_transformer.png b/doc/imgs/switch_transformer.png new file mode 100644 index 00000000..cbeb7aa7 Binary files /dev/null and b/doc/imgs/switch_transformer.png differ diff --git a/internlm/core/context/__init__.py b/internlm/core/context/__init__.py index 5cbb8328..6f1142cb 100644 --- a/internlm/core/context/__init__.py +++ b/internlm/core/context/__init__.py @@ -1,4 +1,5 @@ from .parallel_context import ( + IS_SEQUENCE_PARALLEL, IS_TENSOR_PARALLEL, Config, ParallelContext, @@ -29,6 +30,7 @@ __all__ = [ "Config", "IS_TENSOR_PARALLEL", + "IS_SEQUENCE_PARALLEL", "global_context", "ParallelContext", "ParallelMode", diff --git a/internlm/core/context/parallel_context.py b/internlm/core/context/parallel_context.py index 997bd464..633dfe40 100644 --- a/internlm/core/context/parallel_context.py +++ b/internlm/core/context/parallel_context.py @@ -25,6 +25,7 @@ from .random import add_seed, get_seeds, set_mode IS_TENSOR_PARALLEL = "is_tensor_parallel" +IS_SEQUENCE_PARALLEL = "is_sequence_parallel" logger = get_logger(__file__) @@ -329,7 +330,8 @@ def is_pipeline_last_stage(self, ignore_virtual=False): return self.is_last_rank(ParallelMode.PIPELINE) def is_no_pp_or_last_stage(self): - return not self.is_initialized(ParallelMode.PIPELINE) or self.is_pipeline_last_stage() + # NOTICE!!!, this will ignore virutal stage + return not self.is_initialized(ParallelMode.PIPELINE) or self.is_last_rank(ParallelMode.PIPELINE) def get_world_size(self, parallel_mode: ParallelMode): """Returns the world size for `parallel_mode`. diff --git a/internlm/core/scheduler/pipeline_scheduler.py b/internlm/core/scheduler/pipeline_scheduler.py index 6f2558c9..efc9187a 100644 --- a/internlm/core/scheduler/pipeline_scheduler.py +++ b/internlm/core/scheduler/pipeline_scheduler.py @@ -977,11 +977,12 @@ def _run_warmup_loop( if gpc.is_pipeline_last_stage(): output_obj = None + assert output_obj is None or output_obj.dtype == self.dtype + # Send and receive tensors as appropriate (send tensors computed # in this iteration; receive tensors for next iteration). 
if k != (num_warmup_microsteps - 1) or not receive_extra_backward: # Normal warm-up communication process, or no need to prepare backward input for the 1F1B stage - assert output_obj.dtype == self.dtype input_obj = comm.send_forward_recv_forward( output_obj, input_shape, @@ -993,7 +994,6 @@ def _run_warmup_loop( if self._communication_overlap: # In this case, we should handle forward and backward communication separately, consistent with the # overlap version of the 1F1B stage - assert output_obj.dtype == self.dtype input_obj = comm.send_forward_recv_forward( output_obj, input_shape, @@ -1010,7 +1010,6 @@ def _run_warmup_loop( else: # In this case, we should handle forward and backward communication together, consistent with the # non-overlap version of the 1F1B stage - assert output_obj.dtype == self.dtype input_obj, output_obj_grad = comm.send_forward_backward_recv_forward_backward( output_obj, None, # no backward grad to send @@ -1082,6 +1081,7 @@ def _run_1f1b_loop_with_overlap( else: input_obj_shape = self._input_obj_shapes[next_forward_chunk_id] + assert output_obj is None or output_obj.dtype == self.dtype forward_async_communicator = comm.AsynCommunicator( output_obj, input_obj_shape, @@ -1203,7 +1203,7 @@ def _run_1f1b_loop_without_overlap( output_shape = self._output_obj_shapes[next_backward_chunk_id] if recv_next else None # Communicate objs. - assert output_obj.dtype == self.dtype + assert output_obj is None or output_obj.dtype == self.dtype input_obj, output_obj_grad = comm.send_forward_backward_recv_forward_backward( output_obj, input_obj_grad, diff --git a/internlm/initialize/launch.py b/internlm/initialize/launch.py index 0e74f76b..4eef4ded 100644 --- a/internlm/initialize/launch.py +++ b/internlm/initialize/launch.py @@ -365,7 +365,7 @@ def args_sanity_check(): assert ( not optim_ckpt.overlap_sync_grad & optim_ckpt.overlap_sync_param ), "not support overlap and moe at the same time" - assert gpc.config.parallel.zero1 == -1, "moe only support zero1, set zero1=-1 can fix this" + assert gpc.config.parallel.zero1.size == -1, "moe only support zero1, set zero1=dict(size=-1,...) 
can fix this" def launch( diff --git a/internlm/model/modeling_internlm.py b/internlm/model/modeling_internlm.py index 228e1e1c..bd335c1a 100644 --- a/internlm/model/modeling_internlm.py +++ b/internlm/model/modeling_internlm.py @@ -9,7 +9,7 @@ from flash_attn.modules.mlp import ParallelFusedMLP from torch import nn -from internlm.core.context import IS_TENSOR_PARALLEL, ParallelMode +from internlm.core.context import IS_SEQUENCE_PARALLEL, IS_TENSOR_PARALLEL, ParallelMode from internlm.core.context.parallel_context import global_context as gpc from internlm.initialize.initialize_tensor import normal_, scaled_init_method_normal from internlm.model.embedding import Embedding1D @@ -142,6 +142,12 @@ def __init__( for _, param in self.mlp.named_parameters(): if gpc.get_world_size(ParallelMode.TENSOR) > 1: setattr(param, IS_TENSOR_PARALLEL, True) + for param in self.norm1.parameters(): + if gpc.config.parallel.sequence_parallel is True: + setattr(param, IS_SEQUENCE_PARALLEL, True) + for param in self.norm2.parameters(): + if gpc.config.parallel.sequence_parallel is True: + setattr(param, IS_SEQUENCE_PARALLEL, True) self.dropout2 = nn.Dropout(drop_rate) self.use_swiglu = use_swiglu @@ -374,6 +380,10 @@ def __init__( normal_(std=0.0052)(param) if gpc.get_world_size(ParallelMode.TENSOR) > 1: setattr(param, IS_TENSOR_PARALLEL, True) + for param in self.norm.parameters(): + if gpc.config.parallel.sequence_parallel is True: + setattr(param, IS_SEQUENCE_PARALLEL, True) + self.parallel_output = parallel_output def forward(self, hidden_states=None, cu_seqlens=None, input_ids=None, indexes=None, inference_params=None): diff --git a/internlm/model/moe.py b/internlm/model/moe.py index 0865097f..173daea9 100644 --- a/internlm/model/moe.py +++ b/internlm/model/moe.py @@ -18,7 +18,6 @@ class MoE(torch.nn.Module): Arguments: hidden_size (int): the hidden dimension of the model, importantly this is also the input and output dimension. - expert (torch.nn.Module): the torch module that defines the expert (e.g., MLP, torch.linear). num_experts (int, optional): default=1, the total number of experts per layer. ep_size (int, optional): default=1, number of ranks in the expert parallel world or group. k (int, optional): default=1, top-k gating value, only supports k=1 or k=2. @@ -26,10 +25,10 @@ class MoE(torch.nn.Module): eval_capacity_factor (float, optional): default=1.0, the capacity of the expert at eval time. min_capacity (int, optional): default=4, the minimum capacity per expert regardless of the capacity_factor. noisy_gate_policy (str, optional): default=None, noisy gate policy, valid options are 'Jitter', 'RSample' - or 'None'. + or 'None'. using_default_moe (bool, optional): default=True, whether to use the default MoE layer. drop_tokens (bool, optional): default=True, whether to drop tokens - (setting to False is equivalent to - infinite capacity). + infinite capacity). use_rts (bool, optional): default=True, whether to use Random Token Selection. moe_use_residual (bool, optional): default=False, make this MoE layer a Residual MoE (https://arxiv.org/abs/2201.05596) layer. 
@@ -53,6 +52,7 @@ def __init__( device=None, dtype=None, ): + super().__init__() assert ( @@ -72,7 +72,6 @@ def __init__( gpc.expert_parallel_group_names.append(expert_group_name) experts = torch.nn.ModuleList( [ - # TODO have trouble when use internlm.model.linear.FeedForward FeedForward( hidden_size, int(hidden_size * gpc.config.model.mlp_ratio), diff --git a/internlm/monitor/monitor.py b/internlm/monitor/monitor.py index 6a3b9dc4..8c3943db 100644 --- a/internlm/monitor/monitor.py +++ b/internlm/monitor/monitor.py @@ -9,7 +9,7 @@ from internlm.monitor.alert import send_feishu_msg_with_webhook from internlm.utils.common import SingletonMeta -from .utils import get_job_key, set_env_var +from .utils import get_job_key, set_env_var, try_import_send_exception def send_alert_message(address: str = None, title: str = None, message: str = None): @@ -132,6 +132,7 @@ def __init__(self, loss_spike_limit: float = 1.5) -> None: self.monitor_thread = None self.loss_spike_limit = loss_spike_limit self.last_step_loss = -1 + self.send_exception = try_import_send_exception() def monitor_loss_spike(self, alert_address: str = None, step_count: int = 0, cur_step_loss: float = 0.0): """Check loss value, if loss spike occurs, send alert message to Feishu.""" @@ -154,6 +155,8 @@ def monitor_exception(self, alert_address: str = None, excp_info: str = None): format_trace = "" for line in filtered_trace: format_trace += "\n" + line + if self.send_exception: + self.send_exception(format_trace, gpc.get_global_rank()) send_alert_message( address=alert_address, message=f"Catch Exception from {socket.gethostname()} with rank id {gpc.get_global_rank()}:{format_trace}", @@ -165,9 +168,12 @@ def handle_sigterm(self, alert_address: str = None): def sigterm_handler(sys_signal, frame): print("receive frame: ", frame) print("receive signal: ", sys_signal) + message = f"Process received signal {sys_signal} and exited."
+ if self.send_exception: + self.send_exception(message, gpc.get_global_rank()) send_alert_message( address=alert_address, - message=f"Process received signal {signal} and exited.", + message=message, ) signal.signal(signal.SIGTERM, sigterm_handler) diff --git a/internlm/monitor/utils.py b/internlm/monitor/utils.py index 34360b52..08818978 100644 --- a/internlm/monitor/utils.py +++ b/internlm/monitor/utils.py @@ -32,3 +32,16 @@ def get_job_name(): def get_job_key(): return f"{get_job_id()}_{get_job_name()}" + + +def try_import_send_exception(): + """ + Try import send_exception from uniscale_monitoring, if failed, return None + + """ + try: + from uniscale_monitoring import send_exception_msg as send_exception + + return send_exception + except ImportError: + return None diff --git a/internlm/solver/optimizer/fsdp_optimizer.py b/internlm/solver/optimizer/fsdp_optimizer.py index 60001856..ab15917e 100644 --- a/internlm/solver/optimizer/fsdp_optimizer.py +++ b/internlm/solver/optimizer/fsdp_optimizer.py @@ -79,6 +79,10 @@ def backward(self, loss, retain_graph=False): def _compute_norm_with_fsdp_flatten(self, group_id): params = [p for p in self._fp16_param_groups[group_id] if p.untyped_storage().size() != 0] gradients = [p.grad for p in params if p.untyped_storage().size() != 0] + + norm_group = 0 + if len(params) <= 0 or len(gradients) <= 0: + return norm_group norm_group = compute_norm(gradients=gradients, parameters=params, last_stage=True) return norm_group @@ -126,6 +130,8 @@ def step(self): # create gradient for fp32 params for group_idx in range(len(self.param_groups)): + if len(self._fp32_param_tensor_groups[group_idx]) <= 0: + continue dtype = self._fp32_param_tensor_groups[group_idx][0].dtype fp16_params = [p for p in self._fp16_param_groups[group_idx] if p.untyped_storage().size() != 0] grad_fp32 = [p.grad.to(dtype) for p in fp16_params] diff --git a/internlm/solver/optimizer/hybrid_zero_optim.py b/internlm/solver/optimizer/hybrid_zero_optim.py index 247f8212..8b63fa04 100644 --- a/internlm/solver/optimizer/hybrid_zero_optim.py +++ b/internlm/solver/optimizer/hybrid_zero_optim.py @@ -9,7 +9,7 @@ import torch.distributed as dist from torch.optim import Optimizer -from internlm.core.context import Config, ParallelMode +from internlm.core.context import IS_SEQUENCE_PARALLEL, Config, ParallelMode from internlm.core.context import global_context as gpc from internlm.monitor import send_alert_message from internlm.solver.optimizer.store import ( @@ -35,7 +35,7 @@ from internlm.utils.timeout import llm_timeout from .base_optimizer import BaseOptimizer -from .utils import compute_norm +from .utils import compute_layer_norm, compute_norm, compute_param_norm inf = math.inf logger = get_logger(__file__) @@ -309,6 +309,14 @@ def _define_and_attach(param, reduce_rank=None): param=param, reduce_rank=reduce_rank, ) + def reduction_sp_func(): + handle = reduce_tensor( + param.grad, + dtype=None, + dst_rank=reduce_rank, + parallel_mode=ParallelMode.TENSOR, + ) + handle.wait() # define hook # NOT IMPORTANT BUT GOOD TO KNOW: @@ -320,6 +328,18 @@ def reduce_grad_hook(*args): # pylint: disable=W0613 if self.skip_grad_reduce is False: reduction_func() + # define hook for sequence_parallel + def reduce_grad_hook_sp(*args): # pylint: disable=W0613 + if self.skip_grad_reduce is False: + reduction_sp_func() + + # if sequence_parallel is True, + # the grad of norm should be all-reduce across the tp process group + if gpc.config.parallel.sequence_parallel is True: + if hasattr(param, IS_SEQUENCE_PARALLEL) 
and getattr(param, IS_SEQUENCE_PARALLEL) is True: + accum_grad_obj_sp = get_grad_accumulate_object(param) + accum_grad_obj_sp.register_hook(reduce_grad_hook_sp) + accum_grad_obj.register_hook(reduce_grad_hook) _define_and_attach(param, reduce_rank) @@ -569,6 +589,29 @@ def _compute_norm_with_stage( return norm + def _compute_param_norm_stage( + self, group_id: int = 0, last_bucket: bool = False, last_stage: bool = False, previous_param_norms=None + ): + # compute norm for gradients that have been reduced + params, grads = self._param_store.get_reduced_param_for_compute_norm(group_id=group_id, last_bucket=last_bucket) + + total_param_norms = {} + if len(params) == 0: + dtype = self.param_groups[group_id]["dtype"] + grads = [self.padding_grad.to(dtype)] + params = [self.padding_tensor.to(dtype)] + + if self._clip_grad_norm > 0: + total_param_norms = compute_param_norm( + grads, + params, + last_stage=last_stage, + previous_param_norms=previous_param_norms, + zero_mode=self._broadcast_parallel_mode[group_id], + is_moe_group=self._is_moe_group(self.optim.param_groups[group_id]), + ) + return total_param_norms + @llm_timeout(func_name="optim_step") def step(self, closure=None): """Performs a single optimization step. @@ -600,8 +643,11 @@ def step(self, closure=None): # compute norm for gradients in the before bucket groups_norms = [] + groups_param_norms = [] for group_id in range(self.num_param_groups): groups_norms.append(self._compute_norm_with_stage(group_id=group_id)) + if gpc.config.get("grad_norm_profiling", False): + groups_param_norms.append(self._compute_param_norm_stage(group_id=group_id)) # clear reduced grads # grads in the last bucket is reduced @@ -613,6 +659,8 @@ def step(self, closure=None): self._param_store.clear_grads_of_previous_reduced_params() # compute norm for gradients in the last bucket total_norms = {} + total_param_norms = {} + total_layer_norms = {} for group_id in range(self.num_param_groups): group_name = self.param_groups[group_id]["name"] if "name" in self.param_groups[group_id] else "default" group_name = f"{group_id}_{group_name}" @@ -622,6 +670,16 @@ def step(self, closure=None): last_stage=True, previous_norm=groups_norms[group_id], ) + if gpc.config.get("grad_norm_profiling", False): + param_norms = self._compute_param_norm_stage( + group_id=group_id, + last_bucket=True, + last_stage=True, + previous_param_norms=groups_param_norms[group_id], + ) + total_layer_norms[group_name], total_param_norms[group_name] = compute_layer_norm( + param_norms=param_norms, loss_scale=self.loss_scale.item() + ) # Need to allreduce(avg) the norms across different ranks because moe params will not be synced # during allreduce @@ -636,9 +694,12 @@ def step(self, closure=None): self._sync_grad() timer("sync_grad").stop() - res = self._step(closure=closure, norms=total_norms) + state, global_norms = self._step(closure=closure, norms=total_norms) + if gpc.config.get("grad_norm_profiling", False): + global_norms["layer_norms"] = total_layer_norms + global_norms["param_norms"] = total_param_norms - return res + return state, global_norms def _step(self, closure=None, norms=None): assert closure is None, "closure is not supported by step()" diff --git a/internlm/solver/optimizer/utils.py b/internlm/solver/optimizer/utils.py index f4816a7d..982a2466 100644 --- a/internlm/solver/optimizer/utils.py +++ b/internlm/solver/optimizer/utils.py @@ -15,7 +15,7 @@ from internlm.core.context import ParallelMode from internlm.core.context import global_context as gpc from internlm.core.naive_amp 
import NaiveAMPModel -from internlm.utils.common import get_tensor_norm, move_norm_to_cuda +from internlm.utils.common import get_current_device, get_tensor_norm, move_norm_to_cuda from internlm.utils.logger import get_logger from internlm.utils.parallel import is_model_parallel_parameter @@ -209,6 +209,49 @@ def calc_lp(grads, norm_type): return norm +def reduce_grads(gradients, parameters, fine_grained=False): + parallel_grads = [] + if fine_grained: + parallel_grads = {} + + def append_grad(g, p): + if fine_grained: + param_name = p.param_name if hasattr(p, "param_name") else "unknown-padding" + if param_name not in parallel_grads: + parallel_grads[param_name] = [] + parallel_grads[param_name].append(g.data.float()) + else: + parallel_grads.append(g.data.float()) + + for g, p in zip(gradients, parameters): + # TODO: consider the pipeline shared parameter + if ( + gpc.is_initialized(ParallelMode.PIPELINE) + and hasattr(p, "pipeline_shared_module_pg") + and dist.get_rank(p.pipeline_shared_module_pg) == 0 + ): # if shared between different pipe, only count o + append_grad(g, p) + elif ( + gpc.is_initialized(ParallelMode.PIPELINE) + and hasattr(p, "pipeline_shared_module_pg") + and dist.get_rank(p.pipeline_shared_module_pg) != 0 + ): + continue + elif ( + gpc.is_initialized(ParallelMode.TENSOR) + and not is_model_parallel_parameter(p) + and gpc.get_local_rank(ParallelMode.TENSOR) == 0 + ): # if not used in each chunk, such as layernorm + append_grad(g, p) + elif is_model_parallel_parameter(p): + append_grad(g, p) + elif gpc.get_local_rank(ParallelMode.TENSOR) != 0: + continue + else: + raise RuntimeError("Should not arrive here") + return parallel_grads + + def compute_norm( gradients, parameters, last_stage=False, previous_norm=None, norm_type=2, zero_mode=ParallelMode.ZERO1 ): @@ -247,33 +290,7 @@ def compute_norm( ) total_norm = total_norm_cuda[0].item() else: - tensor_parallel_grads = [] - for g, p in zip(gradients, parameters): - # TODO: consider the pipeline shared parameter - if ( - gpc.is_initialized(ParallelMode.PIPELINE) - and hasattr(p, "pipeline_shared_module_pg") - and dist.get_rank(p.pipeline_shared_module_pg) == 0 - ): # if shared between different pipe, only count o - tensor_parallel_grads.append(g.data.float()) - elif ( - gpc.is_initialized(ParallelMode.PIPELINE) - and hasattr(p, "pipeline_shared_module_pg") - and dist.get_rank(p.pipeline_shared_module_pg) != 0 - ): - continue - elif ( - gpc.is_initialized(ParallelMode.TENSOR) - and not is_model_parallel_parameter(p) - and gpc.get_local_rank(ParallelMode.TENSOR) == 0 - ): # if not used in each chunk, such as layernorm - tensor_parallel_grads.append(g.data.float()) - elif is_model_parallel_parameter(p): - tensor_parallel_grads.append(g.data.float()) - elif gpc.get_local_rank(ParallelMode.TENSOR) != 0: - continue - else: - raise RuntimeError("Should not arrive here") + tensor_parallel_grads = reduce_grads(gradients, parameters) if norm_type == 2.0 and enable_cuda_kernels: tensor_parallel_norm = calc_l2_norm(tensor_parallel_grads) ** norm_type @@ -319,6 +336,124 @@ def compute_norm( return total_norm +def compute_param_norm( + gradients, + parameters, + last_stage=False, + previous_param_norms=None, + norm_type=2, + zero_mode=ParallelMode.ZERO1, + is_moe_group=False, +): + """Get the norm of params + Arguments: + gradients (Iterable[Tensor]): The gradient value. + parameters (Iterable[Tensor]): The parameter each gradient corresponds to. + norm_type (float or int): type of the used p-norm. Can be ``'inf'`` for + infinity norm. 
+ + Returns: + The norm of the parameters. + """ + enable_cuda_kernels = gradients[0].device.type == "cuda" + # Norm parameters. + norm_type = float(norm_type) + total_param_norms = {} + + param_grads = reduce_grads(gradients, parameters, fine_grained=True) + + param_norms = {} + for param_name, grads in param_grads.items(): + if norm_type == inf: + param_norm = max(g.data.abs().max() for g in grads) + elif norm_type == 2.0 and enable_cuda_kernels: + param_norm = calc_l2_norm(grads) ** norm_type + else: + param_norm = calc_lp(grads, norm_type) + param_norms[param_name] = param_norm.item() if torch.is_tensor(param_norm) else param_norm + + if last_stage is False: + return param_norms + + if previous_param_norms is not None: + for key, value in previous_param_norms.items(): + if key not in param_norms: + param_norms[key] = value + continue + + if norm_type == inf: + param_norms[key] = max(param_norms[key], value) + else: + param_norms[key] += value + + # model parallel + model_parallel_param_norms = {} + if gpc.is_initialized(ParallelMode.MODEL): + parallel_param_norms = [None for _ in range(gpc.get_world_size(ParallelMode.MODEL))] + dist.all_gather_object(parallel_param_norms, param_norms, group=gpc.get_group(ParallelMode.MODEL)) + for local_param_norm in parallel_param_norms: + for param_name, param_norm in local_param_norm.items(): + if param_name not in model_parallel_param_norms: + model_parallel_param_norms[param_name] = 0.0 + if norm_type == inf: + model_parallel_param_norms[param_name] = max(model_parallel_param_norms[param_name], param_norm) + else: + model_parallel_param_norms[param_name] += param_norm + + # zero parallel + zero_param_norms = [None for _ in range(gpc.get_world_size(zero_mode))] + dist.all_gather_object(zero_param_norms, model_parallel_param_norms, group=gpc.get_group(zero_mode)) + for local_param_norm in zero_param_norms: + for param_name, param_norm in local_param_norm.items(): + if param_name not in total_param_norms: + total_param_norms[param_name] = 0.0 + if norm_type == inf: + total_param_norms[param_name] = max(total_param_norms[param_name], param_norm) + else: + total_param_norms[param_name] += param_norm + + # moe + if is_moe_group: + pg = gpc.get_group(ParallelMode.EXPERT) + scaled_param_norm = torch.cuda.FloatTensor(list(total_param_norms.values()), device=get_current_device()) + scaled_param_norm = scaled_param_norm / float(gpc.get_world_size(ParallelMode.EXPERT)) + dist.all_reduce(scaled_param_norm, group=pg) + for i, param_name in enumerate(total_param_norms.keys()): + total_param_norms[param_name] = scaled_param_norm[i].item() + + # scale + for param_name, param_norm in total_param_norms.items(): + if param_norm in (inf, -inf): + total_param_norms[param_name] = -1 + elif math.isnan(param_norm): + total_param_norms[param_name] = -2 + + return total_param_norms + + +def compute_layer_norm(param_norms, loss_scale): + """ + compute layer norm by parameter norms + """ + param_norms_groupby_layer = {} + layer_norms = {} + + for param_name, param_norm in param_norms.items(): + layer_name, param_key = param_name.split("-") + if layer_name not in param_norms_groupby_layer: + param_norms_groupby_layer[layer_name] = {} + if layer_name not in layer_norms: + layer_norms[layer_name] = 0.0 + + if param_norm not in (-1, -2): + param_norm = param_norm**0.5 / loss_scale + + param_norms_groupby_layer[layer_name][param_key] = param_norm + layer_norms[layer_name] += param_norm + + return layer_norms, param_norms_groupby_layer + + class BaseGradScaler(ABC): """A base class 
for the gradient scaler. diff --git a/internlm/train/training_internlm.py b/internlm/train/training_internlm.py index 0b605e53..df3fa88d 100644 --- a/internlm/train/training_internlm.py +++ b/internlm/train/training_internlm.py @@ -1,6 +1,7 @@ #!/usr/bin/env python # -*- encoding: utf-8 -*- +import copy import functools import time from functools import partial @@ -53,7 +54,11 @@ from internlm.utils.common import DummyProfile from internlm.utils.logger import get_logger from internlm.utils.megatron_timers import megatron_timer as timer -from internlm.utils.parallel import sync_model_param, sync_model_param_within_tp +from internlm.utils.parallel import ( + set_model_params_layer_name, + sync_model_param, + sync_model_param_within_tp, +) from internlm.utils.registry import MODEL_INITIALIZER from internlm.utils.timeout import llm_timeout @@ -159,6 +164,10 @@ def initialize_optimizer(model: Union[nn.Module, nn.ModuleList]): Returns: A tuple of (optimizer, beta2_scheduler, lr_scheduler). """ + if gpc.config.get("grad_norm_profiling", False): + # set the layer name as an attribute of the model parameters + set_model_params_layer_name(model) + if gpc.config.hybrid_zero_optimizer.overlap_sync_param: param_bcast_sync_handler = ParamBcastSyncHandler(model) else: @@ -528,6 +537,21 @@ def record_current_batch_training_metrics( for key, value in acc_perplex.items(): infos[key] = value + if gpc.config.get("grad_norm_profiling", False): + layer_norms = copy.deepcopy(grad_norm["layer_norms"]) + param_norms = copy.deepcopy(grad_norm["param_norms"]) + for group_name, value in layer_norms.items(): + if value: + title = f"layer_norm_group_{group_name}" + writer.add_scalars(key=title, value=value, step=train_state.step_count) + for group_name, layer_group in param_norms.items(): + if layer_group: + for layer_name, param_group in layer_group.items(): + title = f"param_norm_{layer_name}_{group_name}" + writer.add_scalars(key=title, value=param_group, step=train_state.step_count) + del grad_norm["layer_norms"] + del grad_norm["param_norms"] + line = "" for key, value in infos.items(): line += f"{key}={value} " diff --git a/internlm/utils/model_checkpoint.py b/internlm/utils/model_checkpoint.py index d63ed7a3..4b3f7d5b 100644 --- a/internlm/utils/model_checkpoint.py +++ b/internlm/utils/model_checkpoint.py @@ -556,6 +556,18 @@ def load_optimizer_checkpoint(folder, optim): f"Please check whether loading ckpts are saved with the HybridZeroOptimizer."
) + # compatible with old code that only has one param group; need to align with both parameter groups + if len(states["base_optim_states"]["param_groups"]) == 1: + for group in optim.param_groups: + # for the newly added empty group, since it has no params, just create it as a placeholder + if len(group["params"]) == 0: + states["base_optim_states"]["param_groups"].append(group) + # for the original group, create the attributes newly added in recent updates + else: + saved_group = states["base_optim_states"]["param_groups"][0] + saved_group["dp_mode"] = group["dp_mode"] + saved_group["dtype"] = group["dtype"] + optim.load_state_dict(states) del states torch.cuda.empty_cache() @@ -598,6 +610,10 @@ def load_scheduler(ckpt_path: str, lr_scheduler, optimizer, train_state: TrainSt lr_scheduler.load_state_dict(scheduler_states) lr_scheduler.last_epoch = train_state.step_count + 1 + # compatible with old code that only has one param group + if len(base_lrs) == 1: + base_lrs = base_lrs * len(optimizer.param_groups) + ratios = [learning_rate / lr for lr in base_lrs] for idx, param_group in enumerate(optimizer.param_groups): param_group["lr"] = param_group["lr"] * ratios[idx] diff --git a/internlm/utils/parallel.py b/internlm/utils/parallel.py index 3029af59..6e5384f5 100644 --- a/internlm/utils/parallel.py +++ b/internlm/utils/parallel.py @@ -2,9 +2,11 @@ # -*- encoding: utf-8 -*- import torch.distributed as dist +from torch import nn from internlm.core.context import IS_TENSOR_PARALLEL, ParallelMode from internlm.core.context import global_context as gpc +from internlm.core.naive_amp import NaiveAMPModel def is_model_parallel_parameter(p): @@ -61,3 +63,31 @@ def get_parallel_log_file_name(): f"tp={gpc.get_local_rank(ParallelMode.TENSOR)}_pp={gpc.get_local_rank(ParallelMode.PIPELINE)}" ) return log_file_name + + +def set_model_params_layer_name(model): + r"""Set the layer name as an attribute of the model parameters. + Args: + model (:class:`torch.nn.Module`): The PyTorch model whose parameters will be tagged with layer names.
+ """ + if not isinstance(model, nn.ModuleList): + model = [model] + + for _chunk in model: + if isinstance(_chunk, NaiveAMPModel): + _chunk = _chunk.model + # Create a unique layer name based on the block's class name and index + for _, children in _chunk.named_children(): + if isinstance(children, nn.ModuleList): + for idx, block in enumerate(children): + for param_name, param in block.named_parameters(): + layer_name = f"{block.__class__.__name__}Block{idx}" + layer_param_name = f"{layer_name}-{param_name}" + param.__setattr__("layer_name", layer_name) + param.__setattr__("param_name", layer_param_name) + else: + for param_name, param in children.named_parameters(): + layer_name = f"{children.__class__.__name__}" + layer_param_name = f"{layer_name}-{param_name}" + param.__setattr__("layer_name", layer_name) + param.__setattr__("param_name", f"{layer_name}-{param_name}") diff --git a/internlm/utils/simple_memory_profiler.py b/internlm/utils/simple_memory_profiler.py index 9caf0a2b..8a688edf 100644 --- a/internlm/utils/simple_memory_profiler.py +++ b/internlm/utils/simple_memory_profiler.py @@ -424,7 +424,9 @@ def _inner_activation_trace_hook( layer_name, output.element_size() * output.nelement(), flush=False ) - def _activation_trace_hook_forward(self, chunk_id: int, model: Any, inputs: Any, output: torch.Tensor) -> None: + def _activation_trace_hook_forward( + self, chunk_id: int, model: Any, inputs: Any, output: Any # pylint: disable=W0613 + ) -> None: """ Hook function to trace the activation memory usage for a forward pass. @@ -437,7 +439,6 @@ def _activation_trace_hook_forward(self, chunk_id: int, model: Any, inputs: Any, None """ del model, inputs - assert isinstance(output, torch.Tensor), f"invalid output type: {type(output)}" if self._stoped: return diff --git a/internlm/utils/storage_manager.py b/internlm/utils/storage_manager.py index a3f91227..89f8023f 100644 --- a/internlm/utils/storage_manager.py +++ b/internlm/utils/storage_manager.py @@ -25,6 +25,7 @@ try: import boto3 import botocore + import tos except ImportError: pass @@ -32,6 +33,7 @@ logger = get_logger(__file__) boto3_url_re = re.compile(r"([^\.]+)\.([\d\.]+)") +volc_url_re = re.compile(r"^(.*?)\.(.*)$") MB = 1024**2 @@ -122,6 +124,47 @@ def unpack_boto3_nosave_meta(meta): return meta.client, meta.bucket_name, meta.file_path +class VolcMetaInfo: + """Volc meta info for save/load etc.""" + + def __init__( + self, + is_async, + handler: StorageClient, + bucket_name: str, + endpoint: str, + region: str, + file_path: str, + async_upload_fn: callable, + local_nvme_path=None, + ) -> None: + # all need info. + self.client = handler + self.bucket_name = bucket_name + self.file_path = file_path + # only save need info. 
+ self.local_nvme_path = local_nvme_path + self.is_async = is_async + self.endpoint = endpoint + self.region = region + self.async_upload_fn = async_upload_fn + + def __str__(self) -> str: + return f"is_async: {self.is_async}, bucket_name:{self.bucket_name}, endpoint:{self.endpoint}, \ +region:{self.region}, local_nvme_path: {self.local_nvme_path}" + + @staticmethod + def unpack_volc_save_meta(meta): + if meta.is_async: + return meta.client, meta.bucket_name, meta.file_path, meta.local_nvme_path + else: + return meta.client, meta.bucket_name, meta.file_path + + @staticmethod + def unpack_volc_nosave_meta(meta): + return meta.client, meta.bucket_name, meta.file_path + + class LocalMetaInfo: """Local meta info for save/load etc.""" @@ -139,18 +182,22 @@ def unpack_local_nosave_meta(meta): return (meta.file_path,) -def unpack_save_meta(meta: Union[Boto3MetaInfo, LocalMetaInfo]): +def unpack_save_meta(meta: Union[Boto3MetaInfo, VolcMetaInfo, LocalMetaInfo]): if isinstance(meta, Boto3MetaInfo): return Boto3MetaInfo.unpack_boto3_save_meta(meta) + elif isinstance(meta, VolcMetaInfo): + return VolcMetaInfo.unpack_volc_save_meta(meta) elif isinstance(meta, LocalMetaInfo): return LocalMetaInfo.unpack_local_save_meta(meta) else: raise ValueError(f"unkonwn meta info: {type(meta)}") -def unpack_nosave_meta(meta: Union[Boto3MetaInfo, LocalMetaInfo]): +def unpack_nosave_meta(meta: Union[Boto3MetaInfo, VolcMetaInfo, LocalMetaInfo]): if isinstance(meta, Boto3MetaInfo): return Boto3MetaInfo.unpack_boto3_nosave_meta(meta) + elif isinstance(meta, VolcMetaInfo): + return VolcMetaInfo.unpack_volc_nosave_meta(meta) elif isinstance(meta, LocalMetaInfo): return LocalMetaInfo.unpack_local_nosave_meta(meta) else: @@ -170,6 +217,10 @@ def try_get_storage_backend(path: str): if gpc.is_rank_for_log(): logger.warning(f"path: '{path}' not start with backend prefix, guess it is the backend of boto3.") return "boto3", path + elif path.startswith("vc:"): + if gpc.is_rank_for_log(): + logger.warning(f"path: '{path}' not start with backend prefix, guess it is the backend of volc.") + return "volc", path else: sre = path.split(":", maxsplit=1) if len(sre) == 1: @@ -312,6 +363,143 @@ def delete_obj(handler, fp: str): raise NotImplementedError("boto3 not support delete_obj") +class VolcClient(StorageClient): + """ + VolcClient + """ + + def __init__( + self, + endpoint: str, + region: str, + ) -> None: + """Volc object/file storage management class + + Args: + access_key (str): Volc access key ID. + secret_key (str): Volc secret access key. + endpoint (str): Volc tos endpoint. + region (str): Volc tos region. + + """ + super().__init__(tos) + + try: + access_key = os.environ["VOLC_ACCESS_KEY_ID"] + secret_key = os.environ["VOLC_SECRET_ACCESS_KEY_ID"] + except KeyError as exc: + raise RuntimeError( + "Please set 'VOLC_ACCESS_KEY_ID' and 'VOLC_SECRET_ACCESS_KEY_ID'", + "using environment variable!", + ) from exc + + self.client = self.handler.TosClientV2(access_key, secret_key, endpoint, region) + + @staticmethod + def sync_upload_fileobj(handler, bucket_name: str, fp: str, saved_obj=None, **kwargs): + assert saved_obj is not None, "saved_obj is None!" 
+        # serialize into an in-memory buffer and upload it with a single put_object call
+        try:
+            with io.BytesIO() as f:
+                torch.save(saved_obj, f, **kwargs)
+                f.seek(0)
+                handler.client.put_object(bucket_name, fp, content=f)
+        except handler.handler.exceptions.TosClientError as exc:
+            raise RuntimeError(
+                f"Volc Network Error: fail with client error, message:{exc.message}, cause: {exc.cause}"
+            ) from exc
+        except handler.handler.exceptions.TosServerError as exc:
+            raise RuntimeError(
+                f"Volc Network Error: fail with server error, code: {exc.code}",
+                f"error with request id: {exc.request_id}",
+                f"error with message: {exc.message}",
+                f"error with http code: {exc.status_code}",
+            ) from exc
+
+    @staticmethod
+    def load(handler, bucket_name: str, fp: str, **kwargs) -> Dict:
+        """
+        Args:
+            fp (str): Path to load, e.g. vc://opennlplab/model_weights/xxx/ddd.pt
+        """
+        try:
+            object_stream = handler.client.get_object(bucket_name, fp)
+            buffer = io.BytesIO(object_stream.read())
+            states = torch.load(buffer, **kwargs)
+        except handler.handler.exceptions.TosClientError as exc:
+            raise RuntimeError(
+                f"Volc Network Error: fail with client error, message:{exc.message}, cause: {exc.cause}"
+            ) from exc
+        except handler.handler.exceptions.TosServerError as exc:
+            raise RuntimeError(
+                f"Volc Network Error: fail with server error, code: {exc.code}",
+                f"error with request id: {exc.request_id}",
+                f"error with message: {exc.message}",
+                f"error with http code: {exc.status_code}",
+            ) from exc
+
+        return states
+
+    @staticmethod
+    def assert_fp_exists(handler, bucket_name: str, fp: str):  # pylint: disable=W0613
+        assert len(list(handler.client.list_objects_type2(bucket_name, prefix=fp).contents)) > 0, fp
+
+    @staticmethod
+    def is_fp_exists(handler, bucket_name: str, fp: str):  # pylint: disable=W0613
+        resp = handler.client.list_objects_type2(bucket_name, prefix=fp)
+        if hasattr(resp, "contents"):
+            return len(list(resp.contents)) > 0
+        else:
+            return False
+
+    @staticmethod
+    def get_fns(handler, bucket_name: str, fp: str):
+        if VolcClient.is_fp_exists(handler, bucket_name, fp):
+            folder_name_list = []
+            result = handler.client.list_objects_type2(bucket_name, prefix=fp)
+            if hasattr(result, "contents"):
+                for item in result.contents:
+                    pth = item.key
+                    folder_name_list.append(pth.split(fp, maxsplit=1)[1].strip("/").split("/", maxsplit=1)[0])
+
+            # page through truncated listings until every key under the prefix is seen
+            while result.is_truncated:
+                result = handler.client.list_objects_type2(
+                    bucket_name, prefix=fp, continuation_token=result.next_continuation_token
+                )
+                if hasattr(result, "contents"):
+                    for item in result.contents:
+                        pth = item.key
+                        folder_name_list.append(pth.split(fp, maxsplit=1)[1].strip("/").split("/", maxsplit=1)[0])
+
+            return list(set(folder_name_list))
+
+        else:
+            if gpc.is_rank_for_log():
+                logger.warning(f"'{fp}' not found!")
+            return None
+
+    @staticmethod
+    def async_upload_fileobj(handler, bucket_name: str, fp: str, local_nvme_path: str):
+        try:
+            handler.client.put_object_from_file(bucket_name, fp, local_nvme_path)
+        except handler.handler.exceptions.TosClientError as exc:
+            raise RuntimeError(
+                f"Volc Network Error: fail with client error, message:{exc.message}, cause: {exc.cause}"
+            ) from exc
+        except handler.handler.exceptions.TosServerError as exc:
+            raise RuntimeError(
+                f"Volc Network Error: fail with server error, code: {exc.code}",
+                f"error with request id: {exc.request_id}",
+                f"error with message: {exc.message}",
+                f"error with http code: {exc.status_code}",
+            ) from exc
+        except Exception as e:
+            raise e
+
+    @staticmethod
+    def delete_obj(handler, fp: str):
+        raise NotImplementedError("volc not support
delete_obj") + + class LocalClient(StorageClient): """ Storage Client for local NFS. @@ -388,8 +576,35 @@ def get_boto3_meta(fp: str, tmp_local_folder: str, is_async: bool) -> Boto3MetaI ) +def get_volc_meta(fp: str, tmp_local_folder: str, is_async: bool) -> VolcMetaInfo: + assert fp.startswith("vc://"), f"Path '{fp}' is not a volc url" + parts = fp.lstrip("vc://").split(os.path.sep) + match = volc_url_re.match(parts[0]) + assert match is not None, f"url '{fp}' is not a valid volc url" + bucket_name, endpoint = match.group(1), match.group(2) + temp_part = endpoint.split(".") + endpoint = ".".join(temp_part[1:]) + region = temp_part[1].split("-") + region = "-".join(region[1:]) + + if is_async: + tmp_step_file = get_tmp_file_name(tmp_local_folder, fp) + else: + tmp_step_file = None + return VolcMetaInfo( + is_async=is_async, + handler=None, + bucket_name=bucket_name, + endpoint=endpoint, + region=region, + file_path=os.path.sep.join(parts[1:]), + async_upload_fn=VolcClient.async_upload_fileobj, + local_nvme_path=tmp_step_file, + ) + + def get_local_meta(fp: str) -> LocalMetaInfo: - assert not fp.startswith("s3://"), f"Path '{fp}' is not a local path" + assert not fp.startswith("s3://") and not fp.startswith("vc://"), f"Path '{fp}' is not a local path" return LocalMetaInfo(fp) @@ -430,10 +645,11 @@ class StorageManager(metaclass=SingletonMeta): TODO: add a thread to poll the asynchronous storage state. """ - BACKEND_TYPE = {"boto3", "local"} + BACKEND_TYPE = {"boto3", "local", "volc"} BACKEND_INIT_METHOD = { "boto3": Boto3Client, "local": LocalClient, + "volc": VolcClient, } CLI_DICT = {} @@ -476,11 +692,12 @@ def __init__(self, enable_save, tmp_local_folder="/dev/shm/test/", async_mode=Tr logger.error(f'tmp_local_folder only have "{free_size}" GB free space, less then 100 GB!') raise RuntimeError(f"Insufficient temporary storage space on {socket.gethostname()}") - def _get_client(self, path: str, async_mode: bool = False) -> Union[Boto3MetaInfo, LocalMetaInfo]: + def _get_client(self, path: str, async_mode: bool = False) -> Union[Boto3MetaInfo, VolcMetaInfo, LocalMetaInfo]: """ example: local:/path/to/checkpoint boto3:s3://model_weights/0331/120bi + volc:vc://model_weights/0331/120bi Args: path (str): _description_ @@ -507,10 +724,29 @@ def _get_client(self, path: str, async_mode: bool = False) -> Union[Boto3MetaInf the proxy may make boto3 unavailable or affect performance." ) self.has_warning = True + elif backend == "volc": + meta_info = get_volc_meta(path, self.tmp_local_folder, async_mode) + backend_key = backend + ":" + meta_info.endpoint + init_args = ( + meta_info.endpoint, + meta_info.region, + ) + if ( + "http_proxy" in os.environ + or "https_proxy" in os.environ + or "HTTP_PROXY" in os.environ + or "HTTPS_PROXY" in os.environ + ): + if not self.has_warning and gpc.is_rank_for_log(): + logger.warning( + "HTTP/HTTPS proxy is detected when using volc, incorrectly setting \ + the proxy may make volc unavailable or affect performance." + ) + self.has_warning = True assert backend in StorageManager.BACKEND_TYPE, f"Unkown backend: {backend}" - # boto3 backend need special treatment. + # boto3 and volc backend need special treatment. 
if backend_key not in StorageManager.CLI_DICT: StorageManager.CLI_DICT.update({backend_key: StorageManager.BACKEND_INIT_METHOD[backend](*init_args)}) @@ -527,11 +763,10 @@ def get_fns(self, folder) -> List[str]: return meta.client.get_fns(*unpack_nosave_meta(meta)) def save(self, save_path: str, to_save_obj: Any, async_upload=None, **kwargs): - if async_upload is None: async_upload = self.async_mode - if not save_path.startswith("boto3:"): + if not save_path.startswith("boto3:") and not save_path.startswith("volc:"): async_upload = False meta = self._get_client(save_path, async_upload) @@ -554,6 +789,7 @@ def save(self, save_path: str, to_save_obj: Any, async_upload=None, **kwargs): def load(self, load_path: str, **kwargs) -> Any: self.wait() meta = self._get_client(path=load_path) + return meta.client.load(*unpack_nosave_meta(meta), **kwargs) def delete_obj(self, fp: str): diff --git a/internlm/utils/writer.py b/internlm/utils/writer.py index b519b954..fb41fe5f 100644 --- a/internlm/utils/writer.py +++ b/internlm/utils/writer.py @@ -42,8 +42,8 @@ def init_tb_writer( # dir of the last task by 'make_launch_script.sh'. # If we load ckpt, 'resume_tb_folder' will be overwritten as the # reloaded 'train_state.resume_tb_folder'.s - if resume_tb_folder is not None: - assert len(resume_tb_folder) > 0 and resume_tb_folder != "/" + if resume_tb_folder is not None and len(resume_tb_folder) > 0: + assert resume_tb_folder != "/" if not os.path.exists(resume_tb_folder): logger.error( f"Can't found resume_tb_folder{resume_tb_folder}, \ diff --git a/tests/test_core/test_pipeline.py b/tests/test_core/test_pipeline.py new file mode 100644 index 00000000..72bc52f7 --- /dev/null +++ b/tests/test_core/test_pipeline.py @@ -0,0 +1,323 @@ +import multiprocessing as mp +import random + +import numpy as np +import pytest +import torch +from torch import nn +from torch.testing import assert_close + +import internlm +from internlm.core.context import ParallelMode +from internlm.core.context import global_context as gpc +from internlm.core.context.parallel_context import Config +from internlm.core.engine import Engine +from internlm.core.gradient_handler import PipelineSharedModuleGradientHandler +from internlm.core.scheduler import ( + InterleavedPipelineScheduler, + PipelineScheduler, + SchedulerMetricHook, +) +from internlm.solver.pipeline_utils import partition_uniform +from internlm.train import initialize_optimizer + + +class MlpModel(nn.Module): + """ + Custom model + """ + + def __init__(self, start, end, model_type=None): + super().__init__() + self.part = [start, end] + self.blocks = nn.ModuleList([nn.Linear(8, 8, bias=False) for lid in range(end - start)]) + self.model_type = model_type + + def forward(self, hidden_states=None, input_ids=None): + if self.model_type != "torch" and self.part[0] != 0: + input_ids = hidden_states + + for i in range(self.part[1] - self.part[0]): + input_ids = self.blocks[i](input_ids) + return input_ids + + +class MyLoss(nn.Module): + """ + Custom loss + """ + + def __init__(self): + super().__init__() + + def forward(self, logits, labels): + loss = torch.nn.MSELoss(reduction="sum") + return loss(logits, labels) + + +config = Config( + dict( + gradient_handler=[dict(type="PipelineSharedModuleGradientHandler")], + parallel=dict( + zero1=dict(size=1, fsdp=False), + pipeline=dict(size=8, interleaved_overlap=False), + sequence_parallel=False, + tensor=1, + ), + model_type="INTERNLM", + data=dict(seq_len=8, micro_num=16, micro_bsz=1, pack_sample_into_one=False, min_length=0, 
total_steps=9999), + model=dict( + dtype=torch.bfloat16, + num_chunks=2, + use_flash_attn=True, + ), + resume_tb_folder="", + tensorboard_folder="", + alert_address=None, + monitor=dict(alert=dict(enable_feishu_alert=False, feishu_alert_address=None, light_monitor_address=None)), + grad_scaler=dict( + fp16=dict( + initial_scale=1, + min_scale=1, + growth_interval=1, + ), + growth_factor=1.1, + backoff_factor=0.9, + max_scale=1, + hysteresis=1, + ), + adam=dict( + lr=1e-4, + adam_beta1=0.9, + adam_beta2=0.95, + adam_beta2_c=0, + adam_eps=1e-8, + weight_decay=0.01, + ), + hybrid_zero_optimizer=dict( + overlap_sync_grad=False, + overlap_sync_param=False, + reduce_bucket_size=512 * 1024 * 1024, + clip_grad_norm=1.0, + ), + beta2_scheduler=dict( + init_beta2=0.95, + c=0, + cur_iter=-1, + ), + lr_scheduler=dict( + total_steps=100, + init_steps=0, + warmup_ratio=0.01, + eta_min=1e-5, + last_epoch=-1, + ), + ) +) + + +def build_environment(rank, world_size): + import os + + os.environ["RANK"] = str(rank) + os.environ["LOCAL_RANK"] = str(rank) + os.environ["WORLD_SIZE"] = str(world_size) + os.environ["MASTER_ADDR"] = "127.0.0.1" + os.environ["MASTER_PORT"] = "33333" + torch.cuda.empty_cache() + # launcher="torch" + internlm.launch_from_torch(config=config, seed=1024) + + +def loose_close(a, b, dtype: torch.dtype = torch.float32): + + if dtype is torch.float32: + rtol = 1.3e-6 + atol = 1e-5 + elif dtype is torch.bfloat16: + rtol = 2e-2 + atol = 2e-2 + + if isinstance(a, torch.Tensor): + a = a.detach().to(dtype) + b = b.detach().to(dtype) + + assert_close(a, b, rtol=rtol, atol=atol) + + +def seed_all(seed, cuda_deterministic=False): + random.seed(seed) + np.random.seed(seed) + torch.manual_seed(seed) + if torch.cuda.is_available(): + torch.cuda.manual_seed(seed) + torch.cuda.manual_seed_all(seed) + if cuda_deterministic: # slower, more reproducible + torch.backends.cudnn.deterministic = True + torch.backends.cudnn.benchmark = False + else: + torch.backends.cudnn.deterministic = False + torch.backends.cudnn.benchmark = True + + +def _build_generic_model_1d(num_layers, num_chunks): + pipeline_size = gpc.get_world_size(ParallelMode.PIPELINE) + pipeline_rank = gpc.get_local_rank(ParallelMode.PIPELINE) + + all_parts = partition_uniform(num_layers, pipeline_size, num_chunks) + parts = all_parts[pipeline_rank] + if gpc.is_rank_for_log(): + print(f"The layer sharding is {all_parts}.", flush=True) + + models = [] + for start, end in parts: + models.append(MlpModel(start, end).cuda()) + torch.distributed.barrier() + if len(models) == 1: + model = models[0] + else: + model = nn.ModuleList(models) + + return model + + +def exam_pipeline_parallel(args): + # init + rank, world_size, micro_num, num_chunks, interleaved_overlap = args + config.data.micro_num = micro_num + config.model.num_chunks = num_chunks + config.parallel.pipeline.interleaved_overlap = interleaved_overlap + + build_environment(rank, world_size) + + device = torch.device(f"cuda:{rank}") + dtype = config.model["dtype"] + + # set seed + seed_all(1024) + + # pp model + pp_model = _build_generic_model_1d(num_layers=32, num_chunks=num_chunks) + pp_model = pp_model.to(dtype) + + # pp scheduler + scheduler_hooks = [ + SchedulerMetricHook(skip=True), + ] + + seq_len = gpc.config.data.seq_len + gpc.config.NUM_MICRO_BATCHES = micro_num + communication_overlap = interleaved_overlap + + if num_chunks == 1: + # noninterleaved pp + scheduler = PipelineScheduler( + data_process_func=None, + num_microbatches=micro_num, + dtype=dtype, + tensor_shape=[1, 8], + 
scatter_gather_tensors=False, + scheduler_hooks=scheduler_hooks, + ) + else: + # interleaved pp + if micro_num < gpc.get_world_size(ParallelMode.PIPELINE): + try: + scheduler = InterleavedPipelineScheduler( + num_microbatches=micro_num, + num_chunks=gpc.config.model.num_chunks, + dtype=dtype, + tensor_shape=[1, 8], + scatter_gather_tensors=False, + scheduler_hooks=scheduler_hooks, + communication_overlap=communication_overlap, + ) + except AssertionError: + return + else: + raise RuntimeError("Error: AssertionError should occur when micro_num < Pipeline parrallel world size") + else: + scheduler = InterleavedPipelineScheduler( + num_microbatches=micro_num, + num_chunks=gpc.config.model.num_chunks, + dtype=dtype, + tensor_shape=[1, 8], + scatter_gather_tensors=False, + scheduler_hooks=scheduler_hooks, + communication_overlap=communication_overlap, + ) + + # pp optimizer and engine + optimizer, beta2_scheduler, lr_scheduler = initialize_optimizer(model=pp_model) + engine = Engine( + model=pp_model, + optimizer=optimizer, + lr_scheduler=lr_scheduler, + beta2_scheduler=beta2_scheduler, + criterion=MyLoss().to(dtype), + gradient_handlers=[PipelineSharedModuleGradientHandler(model=pp_model, optimizer=optimizer)], + clip_grad_norm=gpc.config.hybrid_zero_optimizer.get("clip_grad_norm", 0.0), + ) + + scheduler.pre_processing(engine) + engine.train() + + # create input + x_list = [] + y_list = [] + for _ in range(micro_num): + x_list.append(list(range(seq_len))) + y_list.append(list(range(seq_len))) + xs = torch.tensor(x_list).to(device).to(dtype) + yx = torch.tensor(y_list).to(device).to(dtype) + + input_list = [{"input_ids": xs}, yx] + + # pp forward and backward + output, _, loss = scheduler.forward_backward_step( + engine, input_list, forward_only=False, return_loss=True, return_output_label=True + ) + + engine.step() + + # torch related + if gpc.is_last_rank(ParallelMode.PIPELINE): + torch_xs = torch.tensor(x_list).to(device).to(torch.float32) + torch_ys = torch.tensor(y_list).to(device).to(torch.float32) + torch_model = MlpModel(0, 32, "torch").to(device) + torch_optimizer = torch.optim.AdamW( + params=[{"params": torch_model.parameters(), "weight_decay": config.adam.weight_decay}], + lr=config.adam.lr, + betas=(config.adam.adam_beta1, config.adam.adam_beta2), + eps=config.adam.adam_eps, + ) + + # check output + torch_output = torch_model(input_ids=torch_xs) # pylint: disable=E1102 + loose_close(torch_output, output, dtype=dtype) + + torch_criterion = MyLoss().to(torch.float32) + torch_loss = torch_criterion(torch_output, torch_ys) / micro_num # pylint: disable=E1102 + torch_loss.backward() + torch_optimizer.step() + + # check loss + loose_close(torch_loss, loss[0], dtype=dtype) + + +@pytest.mark.parametrize("micro_num", [4, 8, 16]) +@pytest.mark.parametrize("num_chunks", [1, 2, 4]) +@pytest.mark.parametrize("interleaved_overlap", [True, False]) +def test_pipeline_parallel(micro_num, num_chunks, interleaved_overlap): + ctx = mp.get_context("spawn") + with ctx.Pool(processes=8) as pool: + pool.map( + exam_pipeline_parallel, + [[rank, 8, micro_num, num_chunks, interleaved_overlap] for rank in range(8)], + ) + pool.close() + pool.join() + + +if __name__ == "__main__": + pytest.main(["-s", "-q", "test_pipeline.py"]) diff --git a/tests/test_model/test_model_internlm.py b/tests/test_model/test_model_internlm.py index fb9c6783..c002a96a 100644 --- a/tests/test_model/test_model_internlm.py +++ b/tests/test_model/test_model_internlm.py @@ -16,7 +16,12 @@ config = Config( dict( - parallel=dict(zero1=1, 
pipeline=dict(size=1, interleaved_overlap=False), sequence_parallel=False, tensor=1), + parallel=dict( + zero1=dict(size=1, fsdp=False), + pipeline=dict(size=1, interleaved_overlap=False), + sequence_parallel=False, + tensor=1, + ), model_type="INTERNLM", data=dict(seq_len=2048, micro_num=1, micro_bsz=1, pack_sample_into_one=False, min_length=0, total_steps=9999), model=dict( diff --git a/tests/test_solver/test_optimizer.py b/tests/test_solver/test_optimizer.py index 6a227975..569b7b40 100644 --- a/tests/test_solver/test_optimizer.py +++ b/tests/test_solver/test_optimizer.py @@ -10,7 +10,7 @@ from torch.testing import assert_close import internlm -from internlm.core.context.parallel_context import Config +from internlm.core.context.parallel_context import Config, ParallelMode from internlm.solver.optimizer import HybridZeroOptimizer from internlm.solver.optimizer.utils import ParamBcastSyncHandler @@ -29,7 +29,12 @@ def forward(self, x): config = Config( dict( - parallel=dict(zero1=1, pipeline=dict(size=1, interleaved_overlap=False), sequence_parallel=False, tensor=1), + parallel=dict( + zero1=dict(size=1, fsdp=False), + pipeline=dict(size=1, interleaved_overlap=False), + sequence_parallel=False, + tensor=1, + ), model_type="INTERNLM", data=dict(seq_len=2048, micro_num=1, micro_bsz=1, pack_sample_into_one=False, min_length=0, total_steps=9999), model=dict( @@ -103,14 +108,22 @@ def init_optimizer_grouped_parameters(check_group, model): { "params": list(model.parameters())[:2], "weight_decay": config.adam.weight_decay, + "dp_mode": ParallelMode.DATA, }, { "params": list(model.parameters())[2:], "weight_decay": config.adam.weight_decay, + "dp_mode": ParallelMode.DATA, }, ] else: - optimizer_grouped_parameters = [{"params": model.parameters(), "weight_decay": config.adam.weight_decay}] + optimizer_grouped_parameters = [ + { + "params": model.parameters(), + "weight_decay": config.adam.weight_decay, + "dp_mode": ParallelMode.DATA, + } + ] return optimizer_grouped_parameters @@ -137,7 +150,7 @@ def exam_hybrid_zero_optim_with_ddp(args): # ParamBcastSyncHandler does not consider paramters in different optimizer group currently if overlap_sync_param and check_group: return - config.parallel.zero1 = zero_parallel + config.parallel.zero1.size = zero_parallel config.hybrid_zero_optimizer.overlap_sync_param = overlap_sync_param config.hybrid_zero_optimizer.overlap_sync_grad = overlap_sync_grad config.data.micro_num = micro_num @@ -253,7 +266,7 @@ def exam_hybrid_zero_optim_with_ddp(args): def exam_hybrid_zero_optim_with_ckpt_load_save(args): # init rank, world_size, zero_parallel, check_group, dtype = args - config.parallel.zero1 = zero_parallel + config.parallel.zero1.size = zero_parallel config.parallel.dtype = dtype build_environment(rank, world_size) diff --git a/tests/test_utils/common_fixture.py b/tests/test_utils/common_fixture.py index 379a3e05..5d6d7daf 100644 --- a/tests/test_utils/common_fixture.py +++ b/tests/test_utils/common_fixture.py @@ -10,15 +10,18 @@ from internlm.solver.optimizer.hybrid_zero_optim import HybridZeroOptimizer from internlm.utils.common import SingletonMeta -OSS_NAME = os.environ["OSS_BUCKET_NAME"] -OSS_IP = os.environ["OSS_IP"] -USER = os.environ["USER"] +OSS_NAME = os.environ.get("OSS_BUCKET_NAME") +OSS_IP = os.environ.get("OSS_IP") +USER = os.environ.get("USER") JOB_NAME = "CI_TEST" LOCAL_SAVE_PATH = "local:local_ckpt" BOTO_SAVE_PATH = f"boto3:s3://{OSS_NAME}.{OSS_IP}/{USER}/{JOB_NAME}" BOTO_SAVE_PATH_NO_PRFIX = f"s3://{OSS_NAME}.{OSS_IP}/{USER}/{JOB_NAME}/" 
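
For reference, a short sketch (not from the patch itself) of how the volc-style save paths defined next are resolved to a backend; the expected results mirror the new parametrized cases in tests/test_utils/test_storage_manager.py:

    from internlm.utils.storage_manager import try_get_storage_backend

    backend, raw_path = try_get_storage_backend("volc:vc://oss_bucket/")
    # -> ("volc", "vc://oss_bucket/")

    # A bare "vc://" path is also guessed as the volc backend (this branch logs a
    # warning through gpc, so it assumes a launched InternLM context).
    backend, raw_path = try_get_storage_backend("vc://oss_bucket/")
    # -> ("volc", "vc://oss_bucket/")
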
+VOLC_SAVE_PATH = f"volc:vc://{OSS_NAME}.{OSS_IP}/{USER}/{JOB_NAME}" +VOLC_SAVE_PATH_NO_PRFIX = f"vc://{OSS_NAME}.{OSS_IP}/{USER}/{JOB_NAME}/" + ASYNC_TMP_FOLDER = "./async_tmp_folder" @@ -172,13 +175,25 @@ def del_tmp_file(): except FileNotFoundError: pass - try: - cmd = r"/mnt/petrelfs/share/sensesync --dryrun --deleteSrc cp " + BOTO_SAVE_PATH_NO_PRFIX + " / " - with Popen(cmd, stdout=PIPE, stderr=STDOUT, shell=True) as output: - results, presults = "", "" - for line in iter(output.stdout.readline, b""): - results += str(line.rstrip()) - presults += line.rstrip().decode() + "\n" - print(presults, flush=True) - except: # noqa # pylint: disable=bare-except - pass + if OSS_NAME is not None: + try: + cmd = r"/mnt/petrelfs/share/sensesync --dryrun --deleteSrc cp " + BOTO_SAVE_PATH_NO_PRFIX + " / " + with Popen(cmd, stdout=PIPE, stderr=STDOUT, shell=True) as output: + results, presults = "", "" + for line in iter(output.stdout.readline, b""): + results += str(line.rstrip()) + presults += line.rstrip().decode() + "\n" + print(presults, flush=True) + except: # noqa # pylint: disable=bare-except + pass + + try: + cmd = r"/mnt/petrelfs/share/sensesync --dryrun --deleteSrc cp " + VOLC_SAVE_PATH_NO_PRFIX + " / " + with Popen(cmd, stdout=PIPE, stderr=STDOUT, shell=True) as output: + results, presults = "", "" + for line in iter(output.stdout.readline, b""): + results += str(line.rstrip()) + presults += line.rstrip().decode() + "\n" + print(presults, flush=True) + except: # noqa # pylint: disable=bare-except + pass diff --git a/tests/test_utils/test_storage_manager.py b/tests/test_utils/test_storage_manager.py index e5f60c44..e96374e6 100644 --- a/tests/test_utils/test_storage_manager.py +++ b/tests/test_utils/test_storage_manager.py @@ -6,24 +6,15 @@ from internlm.core.context.parallel_context import Config from internlm.initialize.launch import get_config_value from tests.test_utils.common_fixture import ( # noqa # pylint: disable=unused-import - ASYNC_TMP_FOLDER, BOTO_SAVE_PATH, LOCAL_SAVE_PATH, + VOLC_SAVE_PATH, del_tmp_file, - init_dist_and_model, reset_singletons, ) ASYNC_TMP_FOLDER = "./async_tmp_folder" ckpt_config_list = [ - # async boto - dict( - enable_save_ckpt=True, - async_upload_tmp_folder=ASYNC_TMP_FOLDER, - async_upload=True, - save_folder=BOTO_SAVE_PATH, - test_id=0, - ), # sync local dict( enable_save_ckpt=True, @@ -32,21 +23,45 @@ save_folder=LOCAL_SAVE_PATH, test_id=1, ), + # async local + dict( + enable_save_ckpt=True, + async_upload_tmp_folder=ASYNC_TMP_FOLDER, + async_upload=True, + save_folder=LOCAL_SAVE_PATH, + test_id=2, + ), + # async boto + dict( + enable_save_ckpt=True, + async_upload_tmp_folder=ASYNC_TMP_FOLDER, + async_upload=True, + save_folder=BOTO_SAVE_PATH, + test_id=3, + ), # sync boto dict( enable_save_ckpt=True, async_upload_tmp_folder=None, async_upload=False, save_folder=BOTO_SAVE_PATH, - test_id=2, + test_id=4, ), - # async local + # async volc dict( enable_save_ckpt=True, async_upload_tmp_folder=ASYNC_TMP_FOLDER, async_upload=True, - save_folder=LOCAL_SAVE_PATH, - test_id=3, + save_folder=VOLC_SAVE_PATH, + test_id=5, + ), + # sync volc + dict( + enable_save_ckpt=True, + async_upload_tmp_folder=None, + async_upload=False, + save_folder=VOLC_SAVE_PATH, + test_id=6, ), ] @@ -61,7 +76,7 @@ def del_tmp(): @pytest.mark.usefixtures("del_tmp") @pytest.mark.usefixtures("reset_singletons") @pytest.mark.parametrize("ckpt_config", ckpt_config_list) -def test_storage_mm_save_load(ckpt_config, init_dist_and_model): # noqa # pylint: disable=unused-argument +def 
test_storage_mm_save_load(ckpt_config): # noqa # pylint: disable=unused-argument from internlm.utils.storage_manager import ( check_folder, get_fns, @@ -72,6 +87,11 @@ def test_storage_mm_save_load(ckpt_config, init_dist_and_model): # noqa # pylin ) ckpt_config = Config(ckpt_config) + if os.environ.get("OSS_BUCKET_NAME") is None: + if ckpt_config.test_id > 2: + print("Pass boto3 and volc", flush=True) + return + enable_save_ckpt = get_config_value(ckpt_config, "enable_save_ckpt", False) async_upload_tmp_folder = get_config_value(ckpt_config, "async_upload_tmp_folder", False) async_upload = get_config_value(ckpt_config, "async_upload", False) @@ -97,6 +117,9 @@ def test_storage_mm_save_load(ckpt_config, init_dist_and_model): # noqa # pylin ("/mnt/ckpt/", "local", "/mnt/ckpt/"), ("./ckpt/", "local", "./ckpt/"), ("s3://oss_bucket/", "boto3", "s3://oss_bucket/"), + ("volc:vc://oss_bucket/", "volc", "vc://oss_bucket/"), + ("volc:oss_bucket/", "volc", "oss_bucket/"), + ("vc://oss_bucket/", "volc", "vc://oss_bucket/"), ]
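
Putting the storage changes together, the new test cases above boil down to a save/load round trip like the hedged sketch below. It uses the local backend so it runs without credentials; the same calls route through VolcClient when the path is a VOLC_SAVE_PATH-style "volc:vc://..." string (the file name here is a placeholder):

    import torch

    from internlm.utils.storage_manager import StorageManager

    manager = StorageManager(enable_save=True, tmp_local_folder="/dev/shm/test/", async_mode=False)
    save_path = "local:local_ckpt/model.pt"  # swap in a "volc:vc://..." path for TOS object storage
    manager.save(save_path, {"weight": torch.zeros(2)}, async_upload=False)
    manager.wait()  # block until any pending asynchronous uploads have finished
    state = manager.load(save_path)
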