Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add support for loading NeMo 2.0 checkpoints #412

Merged
merged 4 commits into from
Dec 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/_run_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ jobs:
--env HF_HOME=/home/TestData/aligner/hf_home \
--env ALIGNER_CI_DIR=/home/TestData/aligner \
--env ALIGNER_REPO_DIR=/opt/NeMo-Aligner \
--volume /mnt/datadrive/TestData/aligner/nlp-copy:/home/TestData/aligner/nlp-copy \
--volume /mnt/datadrive/TestData/aligner/checkpoints:/home/TestData/aligner/checkpoints:ro \
--volume /mnt/datadrive/TestData/aligner/hf_home/hub:/home/TestData/aligner/hf_home/hub:ro \
nemoci.azurecr.io/nemo_aligner_container:${{ github.run_id }} \
Expand Down
3 changes: 2 additions & 1 deletion .github/workflows/cicd-main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,10 @@ jobs:
- sft-llama3
- sft-llama3-cp
- rm-llama3
- e2e-nemo2
with:
RUNNER: self-hosted-azure
# Fairly aggresive timeout that all functional tests should try to adhere to
TIMEOUT: 8
TIMEOUT: 10
SCRIPT: |
bash /opt/NeMo-Aligner/tests/functional/test_cases/${{ matrix.test_case }}
8 changes: 7 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ RUN git clone https://github.com/NVIDIA/NeMo.git && \
pip install -e ".[nlp]" && \
cd nemo/collections/nlp/data/language_modeling/megatron && make

# TODO: Allow installing from the default branch, but introduce a build
# arg if compatibility starts breaking
RUN pip install --no-cache-dir git+https://github.com/NVIDIA/NeMo-Run.git

# MLM
ARG MLM_TAG
RUN pip uninstall -y megatron-core && \
Expand All @@ -115,7 +119,9 @@ RUN pip uninstall -y megatron-core && \
fi && \
pip install -e .

RUN pip install --no-cache-dir lightning # can remove this when NEMO_TAG is bumped to include lightning install
# TODO: This is redundant since NeMo installs this as of 24.12, but keep
# it until 25.03 to give folks enough time to transition.
RUN pip install --no-cache-dir lightning

COPY --from=aligner-bump /opt/NeMo-Aligner /opt/NeMo-Aligner
RUN cd /opt/NeMo-Aligner && \
Expand Down
145 changes: 137 additions & 8 deletions nemo_aligner/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from copy import deepcopy
from dataclasses import replace
from functools import partial, wraps
from typing import Iterator, List
from typing import Any, Iterator, List, Optional
from unittest.mock import patch

import torch
Expand All @@ -46,16 +46,19 @@ class CustomSaveRestoreConnector(NLPSaveRestoreConnector):
the rm head if load_base_model_only is True
"""

def __init__(self, *args, load_base_model_only=False, **kwargs):
def __init__(self, *args, load_base_model_only=False, replace_sharded_tensor_key: Optional[str] = None, **kwargs):
super().__init__(*args, **kwargs)
self.__load_base_model_only = load_base_model_only
self.__replace_sharded_tensor_key = replace_sharded_tensor_key

def restore_from(self, *args, **kwargs):
if not self.__load_base_model_only:
return super().restore_from(*args, **kwargs)
return super().restore_from(*args, replace_sharded_tensor_key=self.__replace_sharded_tensor_key, **kwargs)

with patch.object(GPTRewardModel, "return_rm_head_in_state_dict", False):
output = super().restore_from(*args, **kwargs)
output = super().restore_from(
*args, replace_sharded_tensor_key=self.__replace_sharded_tensor_key, **kwargs
)

return output

Expand Down Expand Up @@ -87,9 +90,21 @@ def load_from_nemo(
):
"""load a model using nemo checkpoint
"""
connector = CustomSaveRestoreConnector(load_base_model_only=load_base_model_only)
assert os.path.exists(restore_path), f"tried to load from {restore_path=} but it does not exist"

is_2_0_ckpt = load_2_0_checkpoint_model_config(restore_path) is not None
if is_2_0_ckpt:
replace_sharded_tensor_key = "module"
else:
replace_sharded_tensor_key = None

connector = CustomSaveRestoreConnector(
load_base_model_only=load_base_model_only, replace_sharded_tensor_key=replace_sharded_tensor_key
)

if is_2_0_ckpt:
connector.model_weights_ckpt = "weights"

# if we gave it a directory, then load as if it was extracted already
if os.path.isdir(restore_path):
connector.model_extracted_dir = restore_path
Expand All @@ -107,6 +122,10 @@ def load_from_nemo(
save_restore_connector=connector,
strict=strict,
)

if is_2_0_ckpt:
connector.model_weights_ckpt = "model_weights.ckpt"

return (model, model_cfg) if return_updated_cfg else model


Expand All @@ -131,11 +150,121 @@ def load_checkpoint_model_config(restore_path):
return cfg


def load_2_0_checkpoint_model_config(restore_path: str):
try:
from nemo.lightning import io
from nemo.lightning.ckpt_utils import ckpt_to_context_subdir
from nemo.lightning.io.pl import ckpt_to_weights_subdir

if (
os.path.isdir(ckpt_to_context_subdir(restore_path))
and os.path.isdir(ckpt_to_weights_subdir(restore_path, is_saving=False))
and os.path.isfile(os.path.join(ckpt_to_context_subdir(restore_path), "io.json"))
):
config = io.load_context(restore_path, subpath="model.config")
tokenizer_cfg = OmegaConf.load(os.path.join(ckpt_to_context_subdir(restore_path), "model.yaml")).tokenizer

def get_tokenizer_args(tokenizer_cfg):
if "AutoTokenizer" in tokenizer_cfg._target_:
tokenizer_type = "huggingface"
tokenizer_name = (
tokenizer_cfg.pretrained_model_name
if isinstance(tokenizer_cfg.pretrained_model_name, str)
else tokenizer_cfg.pretrained_model_name.attr
)
if os.path.isfile(
os.path.join(ckpt_to_context_subdir(restore_path), tokenizer_name)
) or os.path.isdir(os.path.join(ckpt_to_context_subdir(restore_path), tokenizer_name)):
tokenizer_name = os.path.join(ckpt_to_context_subdir(restore_path), tokenizer_name)

args = {
"library": tokenizer_type,
"type": tokenizer_name,
"use_fast": True,
}
if tokenizer_cfg.get("vocab_file", None):
args["vocab_file"] = os.path.join(
ckpt_to_context_subdir(restore_path), tokenizer_cfg.vocab_file
)
if tokenizer_cfg.get("merges_file", None):
args["merges_file"] = os.path.join(
ckpt_to_context_subdir(restore_path), tokenizer_cfg.merges_file
)

return args
elif "SentencePieceTokenizer" in tokenizer_cfg._target_:
tokenizer_type = "sentencepiece"
tokenizer_name = tokenizer_cfg.model_path
if os.path.isfile(
os.path.join(ckpt_to_context_subdir(restore_path), tokenizer_name)
) or os.path.isdir(os.path.join(ckpt_to_context_subdir(restore_path), tokenizer_name)):
tokenizer_name = os.path.join(ckpt_to_context_subdir(restore_path), tokenizer_name)
elif not os.path.isfile(tokenizer_name):
raise FileNotFoundError(f"Tokenizer file {tokenizer_name} not found")

return {"library": tokenizer_type, "type": None, "model": tokenizer_name}
else:
raise ValueError(f"Unknown tokenizer type: {tokenizer_cfg}")

tokenizer_args = get_tokenizer_args(tokenizer_cfg)

config_dict = {}
for k, v in config.__dict__.items():
if isinstance(v, (float, int, str, bool)):
config_dict[k] = v
elif k == "activation_func":
config_dict["activation"] = v.__name__

if config_dict["activation"] == "silu":
config_dict["activation"] = "fast-swiglu"

config_dict["encoder_seq_length"] = config_dict["seq_length"]

config_dict["mcore_gpt"] = True
config_dict["max_position_embeddings"] = config_dict.get("seq_length")
config_dict["tokenizer"] = tokenizer_args
config_dict["bias"] = config_dict.get("add_bias_linear", True)
config_dict["qkv_bias"] = config_dict.get("add_qkv_bias", False)

try:
strategy: dict[str, Any] = io.load_context(restore_path, subpath="trainer.strategy").__dict__
config_dict["gradient_as_bucket_view"] = strategy.get("gradient_as_bucket_view", True)
# TODO: Add any other parameters required from strategy here
except Exception:
# Default to True based on default values in https://github.com/NVIDIA/NeMo/tree/main/nemo/collections/llm/recipes
config_dict["gradient_as_bucket_view"] = True

try:
precision_plugin: dict[str, Any] = io.load_context(restore_path, subpath="trainer.plugins").__dict__
config_dict["fp16"] = precision_plugin.get("fp16", False)
config_dict["bf16"] = precision_plugin.get("bf16", True)
# TODO: Add any other parameters required from precision plugin here
except Exception:
# Default to True based on default values in https://github.com/NVIDIA/NeMo/tree/main/nemo/collections/llm/recipes
config_dict["fp16"] = False
config_dict["bf16"] = True

if not os.path.isfile(os.path.join(restore_path, "model_config.yaml")):
OmegaConf.save(config=OmegaConf.create(config_dict), f=os.path.join(restore_path, "model_config.yaml"))

return config_dict
except Exception:
# If there's a failure loading the path as a NeMo 2.0 checkpoint,
# return None and continue loading NeMo 1.0 checkpoint.
return None

return None


def load_and_override_model_config(restore_path, model_cfg_to_overwrite, remove_meta_info=True):
"""load the config in the model checkpoint and then overwrite it
with whatever is provided
"""
checkpoint_cfg = load_checkpoint_model_config(restore_path)
checkpoint_cfg_2_0 = load_2_0_checkpoint_model_config(restore_path)
if checkpoint_cfg_2_0 is not None:
checkpoint_cfg = checkpoint_cfg_2_0
else:
checkpoint_cfg = load_checkpoint_model_config(restore_path)

if remove_meta_info:
checkpoint_cfg.pop("target", None)
Expand Down Expand Up @@ -473,11 +602,11 @@ def convert_to_amp_o2_format(state_dict):
def get_iterator_k_split_list(batch: List[str], num_microbatches: int) -> Iterator:
"""
Generate an iterator to split a list into microbatches of equal size.

Args:
batch (List[str]): The list to be split into microbatches.
num_microbatches (int): The number of microbatches to split the list into.

Returns:
Iterator: An iterator that yields the microbatches.
"""
Expand Down
9 changes: 5 additions & 4 deletions tests/functional/dpo.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#!/bin/bash

DATA_DIR=${DATA_DIR}
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
cd $SCRIPT_DIR
set -eoux pipefail

export NCCL_ALGO=Tree
Expand All @@ -11,7 +12,7 @@ GBS=${GBS:-4}
PRETRAINED_CHECKPOINT_NEMO_FILE=${PRETRAINED_CHECKPOINT_NEMO_FILE}


TRAIN_DATA_PATH=${TRAIN_DATA_PATH:-"${DATA_DIR}/dummy-dpo.jsonl"}
TRAIN_DATA_PATH=${TRAIN_DATA_PATH:-./test_data/dummy-dpo.jsonl}
VALID_DATA_PATH=$TRAIN_DATA_PATH

NAME=${NAME:-"dpo_test"}
Expand Down Expand Up @@ -49,14 +50,14 @@ torchrun --nproc-per-node 2 ${GPFS}/examples/nlp/gpt/train_gpt_dpo.py \
++model.dpo.preference_loss_weight=1.0 \
pretrained_checkpoint.restore_from_path=${PRETRAINED_CHECKPOINT_NEMO_FILE} \
"model.data.data_prefix={train: [${TRAIN_DATA_PATH}], validation: [${VALID_DATA_PATH}], test: [${VALID_DATA_PATH}]}" \
exp_manager.create_checkpoint_callback=False \
exp_manager.create_checkpoint_callback=${CREATE_CHECKPOINT_CALLBACK:-False} \
model.data.num_workers=2 \
++model.tensor_model_parallel_size=1 \
++model.pipeline_model_parallel_size=1 \
trainer.dpo.max_steps=${MAX_STEPS:-3} \
trainer.dpo.val_check_interval=${MAX_STEPS:-3} \
trainer.dpo.limit_val_batches=8 \
trainer.dpo.save_interval=0 \
trainer.dpo.save_interval=${SAVE_INTERVAL:-0} \
exp_manager.explicit_log_dir=${RESULTS_DIR} \
++model.activations_checkpoint_granularity=full \
++model.activations_checkpoint_method=uniform \
Expand Down
1 change: 0 additions & 1 deletion tests/functional/test_cases/dpo-llama3
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
# limitations under the License.

SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
export DATA_DIR=$SCRIPT_DIR/../test_data
cd $SCRIPT_DIR

set -eoux pipefail
Expand Down
73 changes: 73 additions & 0 deletions tests/functional/test_cases/e2e-nemo2
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
#!/bin/bash
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
cd $SCRIPT_DIR

set -eoux pipefail

# This is an e2e test testing the following workflow:
#
# 1. train a dummy llama3 nemo 2.0 checkpoint
# 2. perform dpo on it and save nemo 1.0 checkpoint
# 3. convert nemo 1.0 checkpoint to 2.0
# 4. perform eval on this newly convert 2.0 checkpoint

PRETRAINING_PATH=/tmp/$(basename $0)-ckpt-dir
PRETRAINING_CHECKPOINT_PATH=/tmp/$(basename $0)-ckpt-path
CONVERTED_PATH=/tmp/$(basename $0)-convert-dir
rm -rf $PRETRAINING_PATH $PRETRAINING_CHECKPOINT_PATH $CONVERTED_PATH

if ! pip show nemo-run &>/dev/null; then
echo "[ERROR] nemo-run is needed for this test. Please visit installation instructions: https://github.com/NVIDIA/NeMo-Run?tab=readme-ov-file#install-nemo-run"
exit 1
fi


####################
# Step 1: Pretrain #
####################
python /opt/NeMo/tests/collections/llm/megatron_gpt_pretraining.py \
--devices=1 \
--max-steps=3 \
--experiment-dir=${PRETRAINING_PATH} \
--vocab-path=${ALIGNER_CI_DIR}/nlp-copy/megatron_gpt/data/gpt/vocab.json \
--merges-path=${ALIGNER_CI_DIR}/nlp-copy/megatron_gpt/data/gpt/merges.txt \
--data-path=${ALIGNER_CI_DIR}/nlp-copy/megatron_gpt/data/gpt/simple_wiki_gpt_preproc_text_document \
--index-mapping-dir=tests/collections/llm/gpt_index_mappings \
--no-masked-softmax-fusion


#################
# Step 2: Align #
#################
NEMO2_CHECKPOINT=$(find $PRETRAINING_PATH/default/checkpoints -maxdepth 1 -type d -name '*-last' | head -n 1)
mv $NEMO2_CHECKPOINT $PRETRAINING_CHECKPOINT_PATH
export PRETRAINED_CHECKPOINT_NEMO_FILE=$PRETRAINING_CHECKPOINT_PATH
CREATE_CHECKPOINT_CALLBACK=True SAVE_INTERVAL=3 \
bash ../dpo.sh

###################
# Step 3: Convert #
###################
export DPO_CHECKPOINT="/tmp/dpo_test/checkpoints/megatron_gpt.nemo"
python /opt/NeMo/scripts/checkpoint_converters/convert_nemo1_to_nemo2.py --input_path $DPO_CHECKPOINT --output_path $CONVERTED_PATH --model_id ${PRETRAINED_CHECKPOINT_NEMO_FILE}

################
# Step 4: Eval #
################
python /opt/NeMo/scripts/llm/generate.py --model_path $CONVERTED_PATH

echo "[Finished] $0"
Loading