Skip to content

Commit

Permalink
[Unified Checkpoint] Support sharding_comm_overlap (PaddlePaddle#9392)
Browse files Browse the repository at this point in the history
* update
  • Loading branch information
DesmonDay committed Nov 14, 2024
1 parent 5a03c3b commit e2c5765
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 11 deletions.
14 changes: 12 additions & 2 deletions paddlenlp/trainer/trainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@
set_seed,
should_skip_data,
speed_metrics,
split_parallel_config,
)
from .training_args import TrainingArguments
from .unified_checkpoint import UnifiedCheckpointHandler
Expand Down Expand Up @@ -1988,7 +1989,12 @@ def get_expected_keys(inputs, keys):
self.optimizer = mix_precision_utils.MixPrecisionOptimizer(self.optimizer)
self.optimizer = fleet.distributed_optimizer(self.optimizer)

if hasattr(self.args, "enable_sharding_comm_overlap") and self.args.enable_sharding_comm_overlap:
if (
hasattr(self.args, "enable_sharding_comm_overlap")
and self.args.enable_sharding_comm_overlap
and self.args.unified_checkpoint
and "split_param" in split_parallel_config(self.args.sharding_parallel_config)
):
model.register_sharding_comm_overlap_hook(self.optimizer)

# No pipeline mode, sharding only
Expand Down Expand Up @@ -2765,7 +2771,11 @@ def _load_optimizer_and_scheduler(self, checkpoint):
opt_state_dict = None
else:
model = self.model
if hasattr(self.args, "enable_sharding_comm_overlap") and self.args.enable_sharding_comm_overlap:
if (
hasattr(self.args, "enable_sharding_comm_overlap")
and self.args.enable_sharding_comm_overlap
and "split_param" in split_parallel_config(self.args.sharding_parallel_config)
):
model = self.model_wrapped
opt_state_dict = self.unified_checkpoint_handler.load_unified_optimizer(
model=model,
Expand Down
8 changes: 8 additions & 0 deletions paddlenlp/trainer/trainer_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1126,3 +1126,11 @@ def should_skip_data(global_step, skip_data_intervals):
skip_flag = True
break
return skip_flag


def split_parallel_config(parallel_config):
if "," in parallel_config:
parallel_config = set(parallel_config.split(","))
else:
parallel_config = set(parallel_config.split(" "))
return parallel_config
8 changes: 1 addition & 7 deletions paddlenlp/trainer/training_args.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
OptimizerNames,
SchedulerType,
ShardingOption,
split_parallel_config,
)

__all__ = [
Expand Down Expand Up @@ -1069,13 +1070,6 @@ def __post_init__(self):
logger.warning("set amp_master_grad to false since amp is disabled.")
self.amp_master_grad = False

def split_parallel_config(parallel_config):
if "," in parallel_config:
parallel_config = set(parallel_config.split(","))
else:
parallel_config = set(parallel_config.split(" "))
return parallel_config

# use_hybrid_parallel
if self.use_hybrid_parallel:

Expand Down
3 changes: 2 additions & 1 deletion paddlenlp/trainer/unified_checkpoint/check_completion.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ def check_unified_optimizer(args, model, optimizer, resume_from_checkpoint, safe
sharding_group = hcg.get_sharding_parallel_group()
sharding_rank = sharding_group.rank
dp_rank = dp_group.rank if dp_group.nranks > 1 else 0
struct2static_name_mappings = {k: v.name for k, v in model.state_dict().items()}
model_state_dict = get_expected_state_dict(model)
struct2static_name_mappings = {k: v.name for k, v in model_state_dict.items()}

if is_sharding_split_param_mode(args):
# We do not check optimizer files completion for split_param, since it is very complicated. Directly support local resume.
Expand Down
2 changes: 1 addition & 1 deletion paddlenlp/trainer/unified_checkpoint/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ def get_expected_keys(args, sharded_metadata, model, optimizer, is_master_weight
params2rank = optimizer._param2rank

model_state_dict = get_expected_state_dict(model)
struct2static_name_mappings = {k: v.name for k, v in get_expected_state_dict(model).items()}
struct2static_name_mappings = {k: v.name for k, v in model_state_dict.items()}

expected_keys = []
for key in list(sharded_metadata["all_optimizer_keys"]):
Expand Down

0 comments on commit e2c5765

Please sign in to comment.