From 46e8198097074381f3788c20fcb7b56e42eeebb5 Mon Sep 17 00:00:00 2001
From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Date: Mon, 23 Sep 2024 09:55:27 +0000
Subject: [PATCH] [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci
---
 tests/pytorch/distributed/run_numerics.py  | 154 +++++++++++++--------
 tests/pytorch/distributed/test_numerics.py |  12 +-
 2 files changed, 102 insertions(+), 64 deletions(-)

diff --git a/tests/pytorch/distributed/run_numerics.py b/tests/pytorch/distributed/run_numerics.py
index a332276fe7..fb169d93bd 100644
--- a/tests/pytorch/distributed/run_numerics.py
+++ b/tests/pytorch/distributed/run_numerics.py
@@ -42,7 +42,7 @@ def main(argv=None, namespace=None):
 
     parser = argparse.ArgumentParser()
     parser.add_argument("-l", "--layer-type", type=str)
-    parser.add_argument("--fp8", action='store_true', default=False)
+    parser.add_argument("--fp8", action="store_true", default=False)
     args = parser.parse_args(argv, namespace)
 
     test_dict = {
@@ -50,7 +50,7 @@ def main(argv=None, namespace=None):
         "layernorm": test_layernorm,
         "layernorm_linear": test_layernorm_linear,
         "layernorm_mlp": test_layernorm_mlp,
-        "transformer_layer": test_transformer_layer
+        "transformer_layer": test_transformer_layer,
     }
 
     assert args.layer_type in test_dict, f"Argument --layer-type must be in {test_dict.keys()}"
@@ -61,6 +61,7 @@ def main(argv=None, namespace=None):
     dist.destroy_process_group()
     return output_value
 
+
 def run_distributed_test(test_name=None):
     def decorator(func):
         @wraps(func)
@@ -75,14 +76,18 @@ def wrapper(*args, **kwargs):
 
             dist.barrier()
             dist_print(f"Passed test {name}")
+
         return wrapper
+
     return decorator
+
 def _gather(tensor, dim=0):
     """
-    Gathers tensors and concats them. Since torch.distributed.nn.functional.all_gather 
-    multiplies gradients by WORLD_SIZE, those gradiedts are rescaled. 
+    Gathers tensors and concats them. Since torch.distributed.nn.functional.all_gather
+    multiplies gradients by WORLD_SIZE, those gradients are rescaled.
""" + class HalfGradient(torch.autograd.Function): @staticmethod def forward(ctx, input): @@ -97,27 +102,32 @@ def backward(ctx, grad_output): gathered = torch.distributed.nn.functional.all_gather(tensor, group=NCCL_WORLD) return torch.cat(gathered, dim=dim) + def _constant(tensor): return nn.init.constant_(tensor, 0.5) + def dist_print(msg, src=None, end="\n", error=False): stream = sys.stderr if error else sys.stdout if WORLD_RANK == (0 if src is None else src): stream.write(f"[rank{WORLD_RANK}] {msg}{end}\n") dist.barrier() + def _check_outputs(output_single_node, output_distributed): numerics_failed = torch.tensor([0], dtype=torch.uint8, device="cuda") rtol = 0.125 if FP8 else 0.025 atol = 0.0625 if FP8 else 0.00125 output_failed, output_info = _compare_tensors( - "outputs", output_distributed, output_single_node, rtol, atol) + "outputs", output_distributed, output_single_node, rtol, atol + ) dist_print(output_info, src=WORLD_RANK, error=output_failed) numerics_failed[0] = int(output_failed) dist.all_reduce(numerics_failed, dist.ReduceOp.MAX, NCCL_WORLD) if bool(numerics_failed.item()): sys.exit(1) + def _match_param_sizes(dist_param, single_param): """ Adjust single_param to match the shape of dist_param @@ -155,9 +165,11 @@ def _match_param_sizes(dist_param, single_param): return to_output + def _check_gradients(model_distributed, model_single, main_grad_check=False): - for i, ((name, param_d), param_s) in \ - enumerate(zip(model_distributed.named_parameters(), model_single.parameters())): + for i, ((name, param_d), param_s) in enumerate( + zip(model_distributed.named_parameters(), model_single.parameters()) + ): dist_print(i) dist_print(name) numerics_failed = torch.tensor([0], dtype=torch.uint8, device="cuda") @@ -167,11 +179,13 @@ def _check_gradients(model_distributed, model_single, main_grad_check=False): if main_grad_check: param_s_grad = _match_param_sizes(param_d.main_grad, param_s.main_grad) grad_failed, grad_info = _compare_tensors( - str(i), param_d.main_grad, param_s_grad, rtol, atol) + str(i), param_d.main_grad, param_s_grad, rtol, atol + ) else: param_s_grad = _match_param_sizes(param_d.grad, param_s.grad) grad_failed, grad_info = _compare_tensors( - str(i), param_d.grad, param_s_grad, rtol, atol) + str(i), param_d.grad, param_s_grad, rtol, atol + ) dist_print(grad_info, src=WORLD_RANK, error=grad_failed) numerics_failed[0] = int(grad_failed) @@ -179,44 +193,47 @@ def _check_gradients(model_distributed, model_single, main_grad_check=False): if bool(numerics_failed.item()): sys.exit(1) + def _copy_params(model_distributed, model_single): - for dist_param, single_param in \ - zip(model_distributed.parameters(), model_single.parameters()): + for dist_param, single_param in zip(model_distributed.parameters(), model_single.parameters()): with torch.no_grad(): to_copy = single_param for dim, _ in enumerate(dist_param.shape): if dist_param.shape[dim] != single_param.shape[dim]: src_slice = slice( - WORLD_RANK * dist_param.shape[dim], - (WORLD_RANK + 1) * dist_param.shape[dim] + WORLD_RANK * dist_param.shape[dim], (WORLD_RANK + 1) * dist_param.shape[dim] ) - indices = [slice(None)] * max( - min(dim, len(dist_param.shape) - 1), 0) + indices = [slice(None)] * max(min(dim, len(dist_param.shape) - 1), 0) indices.append(src_slice) if dim < len(dist_param.shape) - 1: indices.append(slice(None)) to_copy = single_param[tuple(indices)] dist_param.copy_(to_copy) -def _apply_models(model_single_node, model_distributed, - input_single_node, input_distributed, **kwargs): - 
_alloc_main_grad(model_single_node, model_distributed) # for fuse_wgrad_accumulation=True + +def _apply_models( + model_single_node, model_distributed, input_single_node, input_distributed, **kwargs +): + _alloc_main_grad(model_single_node, model_distributed) # for fuse_wgrad_accumulation=True with te.fp8_autocast(enabled=FP8, fp8_recipe=fp8_recipe): output_single_node = model_single_node(input_single_node, **kwargs) with te.fp8_autocast(enabled=FP8, fp8_recipe=fp8_recipe, fp8_group=NCCL_WORLD): output_distributed = model_distributed(input_distributed, **kwargs) return output_single_node, output_distributed + def _loss_backward(output_single_node, output_distributed): target = torch.randn_like(output_single_node) LOSS_FN(output_single_node, target).backward() LOSS_FN(output_distributed, target).backward() + def _alloc_main_grad(model_single_node, model_distributed): for model in [model_single_node, model_distributed]: for param in model.parameters(): param.main_grad = torch.zeros_like(param, dtype=torch.float32) + ############################################ # Linear # ############################################ @@ -241,7 +258,7 @@ def _test_linear(parallel_mode=None, sequence_parallel=False, **kwargs): tp_group=NCCL_WORLD, parallel_mode=parallel_mode, sequence_parallel=sequence_parallel, - **kwargs + **kwargs, ) # Synchronize parameters between models @@ -259,8 +276,9 @@ def _test_linear(parallel_mode=None, sequence_parallel=False, **kwargs): elif parallel_mode == "column": if sequence_parallel: # Duplicate input for sequence parallelism - input_single_node = torch.empty( - (WORLD_SIZE * BATCH_SIZE, HIDDEN_SIZE)).cuda().to(params_dtype) + input_single_node = ( + torch.empty((WORLD_SIZE * BATCH_SIZE, HIDDEN_SIZE)).cuda().to(params_dtype) + ) input_distributed = torch.randn((BATCH_SIZE, HIDDEN_SIZE)).cuda().to(params_dtype) input_single_node[:BATCH_SIZE, :] = input_distributed input_single_node[BATCH_SIZE:, :] = input_distributed @@ -274,11 +292,10 @@ def _test_linear(parallel_mode=None, sequence_parallel=False, **kwargs): model_single_node, model_distributed, input_single_node, input_distributed ) - if "return_bias" in kwargs: output_single_node, bias_s = output_single_node output_distributed, bias_d = output_distributed - if parallel_mode=="column": + if parallel_mode == "column": bias_d = _gather(bias_d) _check_outputs(bias_s, bias_d) @@ -297,9 +314,10 @@ def _test_linear(parallel_mode=None, sequence_parallel=False, **kwargs): _check_gradients( model_distributed, model_single_node, - main_grad_check=("fuse_wgrad_accumulation" in kwargs) + main_grad_check=("fuse_wgrad_accumulation" in kwargs), ) + def test_linear(): """Run linear layer tests with various configurations.""" kwargs_list = [ @@ -315,10 +333,12 @@ def test_linear(): for sequence_parallel in [False, True]: _test_linear(parallel_mode, sequence_parallel, **kwargs) + ############################################ # LayerNorm # ############################################ + @run_distributed_test() def _test_layernorm(kwargs): """Test LayerNorm and RMSNorm with given arguments. @@ -327,9 +347,9 @@ def _test_layernorm(kwargs): kwargs (dict): Contains 'norm', 'basic_args', and 'distributed_args'. 
""" # Extract parameters - norm = kwargs['norm'] - basic_args = kwargs['basic_args'] - distributed_args = kwargs['distributed_args'] + norm = kwargs["norm"] + basic_args = kwargs["basic_args"] + distributed_args = kwargs["distributed_args"] params_dtype = basic_args.get("params_dtype", torch.float32) # Create models @@ -355,6 +375,7 @@ def _test_layernorm(kwargs): _check_outputs(output_single_node, output_distributed) _check_gradients(model_distributed, model_single_node) + def test_layernorm(): """Run LayerNorm and RMSNorm tests with various configurations.""" norms = [te.LayerNorm, te.RMSNorm] @@ -376,16 +397,18 @@ def test_layernorm(): for basic_args in basic_args_list: for distributed_args in distributed_args_list: kwargs = { - 'norm': norm, - 'basic_args': basic_args, - 'distributed_args': distributed_args, + "norm": norm, + "basic_args": basic_args, + "distributed_args": distributed_args, } _test_layernorm(kwargs) + ############################################ # LayerNormLinear # ############################################ + @run_distributed_test() def _test_layernorm_linear(parallel_mode=None, sequence_parallel=False, **kwargs): """Test the linear layer with specified parallel mode and sequence parallelization. @@ -407,7 +430,7 @@ def _test_layernorm_linear(parallel_mode=None, sequence_parallel=False, **kwargs tp_group=NCCL_WORLD, parallel_mode=parallel_mode, sequence_parallel=sequence_parallel, - **kwargs + **kwargs, ) # Synchronize parameters between models @@ -416,7 +439,6 @@ def _test_layernorm_linear(parallel_mode=None, sequence_parallel=False, **kwargs # Prepare input tensors input_single_node = torch.randn((SEQ_LEN, HIDDEN_SIZE)).cuda().to(params_dtype) - if sequence_parallel: # Duplicate input for sequence parallelism input_single_node = torch.empty((WORLD_SIZE * SEQ_LEN, HIDDEN_SIZE)).cuda().to(params_dtype) @@ -440,7 +462,7 @@ def _test_layernorm_linear(parallel_mode=None, sequence_parallel=False, **kwargs if "return_bias" in kwargs: output_single_node, bias_s = output_single_node output_distributed, bias_d = output_distributed - if parallel_mode=="column": + if parallel_mode == "column": bias_d = _gather(bias_d) _check_outputs(bias_s, bias_d) @@ -459,7 +481,7 @@ def _test_layernorm_linear(parallel_mode=None, sequence_parallel=False, **kwargs _check_gradients( model_distributed, model_single_node, - main_grad_check=("fuse_wgrad_accumulation" in kwargs) + main_grad_check=("fuse_wgrad_accumulation" in kwargs), ) @@ -479,6 +501,7 @@ def test_layernorm_linear(): for sequence_parallel in [False, True]: _test_layernorm_linear(parallel_mode, sequence_parallel, **kwargs) + ############################################ # LayerNormMLP # ############################################ @@ -505,7 +528,7 @@ def _test_layernorm_mlp(set_parallel_mode=None, sequence_parallel=False, **kwarg tp_group=NCCL_WORLD, set_parallel_mode=set_parallel_mode, sequence_parallel=sequence_parallel, - **kwargs + **kwargs, ) # Synchronize parameters between models @@ -514,7 +537,6 @@ def _test_layernorm_mlp(set_parallel_mode=None, sequence_parallel=False, **kwarg # Prepare input tensors input_single_node = torch.randn((SEQ_LEN, HIDDEN_SIZE)).cuda().to(params_dtype) - if sequence_parallel: # Duplicate input for sequence parallelism input_single_node = torch.empty((WORLD_SIZE * SEQ_LEN, HIDDEN_SIZE)).cuda().to(params_dtype) @@ -528,7 +550,6 @@ def _test_layernorm_mlp(set_parallel_mode=None, sequence_parallel=False, **kwarg model_single_node, model_distributed, input_single_node, input_distributed ) - if 
"return_layernorm_output" in kwargs: output_single_node, norm_s = output_single_node output_distributed, norm_d = output_distributed @@ -555,22 +576,23 @@ def _test_layernorm_mlp(set_parallel_mode=None, sequence_parallel=False, **kwarg _check_gradients( model_distributed, model_single_node, - main_grad_check=("fuse_wgrad_accumulation" in kwargs) + main_grad_check=("fuse_wgrad_accumulation" in kwargs), ) + def test_layernorm_mlp(): kwargs_list = [ {}, {"init_method": _constant}, {"output_layer_init_method": _constant}, - {"normalization": 'RMSNorm'}, + {"normalization": "RMSNorm"}, {"zero_centered_gamma": True}, {"bias": False}, {"params_dtype": torch.float16}, - {"activation": 'relu'}, + {"activation": "relu"}, {"fuse_wgrad_accumulation": True}, {"return_bias": True}, - {"return_layernorm_output": True} + {"return_layernorm_output": True}, ] for kwargs in kwargs_list: for set_parallel_mode in [True]: @@ -582,32 +604,43 @@ def test_layernorm_mlp(): # TransformerLayer # ############################################ + @run_distributed_test() def _test_transformer_layer_parallel(sequence_parallel=False, **kwargs): params_dtype = kwargs.get("params_dtype", torch.float32) model_single_node = te.TransformerLayer( - HIDDEN_SIZE, FFN_HIDDEN_SIZE, NR_HEADS, - attention_dropout=0, hidden_dropout=0, **kwargs) + HIDDEN_SIZE, FFN_HIDDEN_SIZE, NR_HEADS, attention_dropout=0, hidden_dropout=0, **kwargs + ) model_distributed = te.TransformerLayer( - HIDDEN_SIZE, FFN_HIDDEN_SIZE, NR_HEADS, tp_size=WORLD_SIZE, - tp_group=NCCL_WORLD, set_parallel_mode=True, sequence_parallel=sequence_parallel, + HIDDEN_SIZE, + FFN_HIDDEN_SIZE, + NR_HEADS, + tp_size=WORLD_SIZE, + tp_group=NCCL_WORLD, + set_parallel_mode=True, + sequence_parallel=sequence_parallel, seq_length=WORLD_SIZE * SEQ_LEN if sequence_parallel else None, - attention_dropout=0, hidden_dropout=0, **kwargs) + attention_dropout=0, + hidden_dropout=0, + **kwargs, + ) _copy_params(model_distributed, model_single_node) - _alloc_main_grad(model_single_node, model_distributed) # for fuse_wgrad_accumulation=True + _alloc_main_grad(model_single_node, model_distributed) # for fuse_wgrad_accumulation=True - input_single_node = torch.randn( - (WORLD_SIZE * SEQ_LEN, BATCH_SIZE, HIDDEN_SIZE)).cuda().to(params_dtype) + input_single_node = ( + torch.randn((WORLD_SIZE * SEQ_LEN, BATCH_SIZE, HIDDEN_SIZE)).cuda().to(params_dtype) + ) if sequence_parallel: # Duplicate input for sequence parallelism - input_single_node = torch.randn( - (WORLD_SIZE * SEQ_LEN, BATCH_SIZE, HIDDEN_SIZE)).cuda().to(params_dtype) + input_single_node = ( + torch.randn((WORLD_SIZE * SEQ_LEN, BATCH_SIZE, HIDDEN_SIZE)).cuda().to(params_dtype) + ) if WORLD_RANK == 0: input_distributed = input_single_node[:SEQ_LEN, :, :] else: - input_distributed = input_single_node[SEQ_LEN:, :, :] + input_distributed = input_single_node[SEQ_LEN:, :, :] else: input_distributed = input_single_node.clone().cuda() @@ -615,10 +648,12 @@ def _test_transformer_layer_parallel(sequence_parallel=False, **kwargs): if "layer_type" in kwargs: encoder_output = torch.randn((SEQ_LEN, BATCH_SIZE, HIDDEN_SIZE)).cuda() - output_single_node, output_distributed = _apply_models( - model_single_node, model_distributed, input_single_node, - input_distributed, encoder_output=encoder_output + model_single_node, + model_distributed, + input_single_node, + input_distributed, + encoder_output=encoder_output, ) if sequence_parallel: @@ -632,7 +667,7 @@ def _test_transformer_layer_parallel(sequence_parallel=False, **kwargs): _check_gradients( 
model_distributed,
         model_single_node,
-        main_grad_check=("fuse_wgrad_accumulation" in kwargs)
+        main_grad_check=("fuse_wgrad_accumulation" in kwargs),
     )
 
 
@@ -645,9 +680,9 @@ def test_transformer_layer():
         {"apply_residual_connection_post_layernorm": True},
         {"output_layernorm": True},
         {"parallel_attention_mlp": True},
-        {"layer_type": 'decoder'},
+        {"layer_type": "decoder"},
         {"window_size": (2, 2)},
-        {"normalization": 'RMSNorm'},
+        {"normalization": "RMSNorm"},
         {"zero_centered_gamma": True},
         {"fuse_qkv_params": True},
         {"fuse_qkv_params": True, "fuse_wgrad_accumulation": True},
@@ -655,11 +690,12 @@ def test_transformer_layer():
         {"bias": False},
         {"params_dtype": torch.float16},
         {"fuse_qkv_params": True},
-        {"activation": 'relu'},
+        {"activation": "relu"},
     ]
     for kwargs in kwargs_list:
         for sequence_parallel in [False, True]:
             _test_transformer_layer_parallel(sequence_parallel, **kwargs)
 
+
 if __name__ == "__main__":
     sys.exit(main())
diff --git a/tests/pytorch/distributed/test_numerics.py b/tests/pytorch/distributed/test_numerics.py
index bb1edc7451..dc406d7ad3 100644
--- a/tests/pytorch/distributed/test_numerics.py
+++ b/tests/pytorch/distributed/test_numerics.py
@@ -23,7 +23,6 @@
 """
 
-
 if torch.cuda.device_count() < 2:
     pytest.skip("Distributed training needs at least 2 GPUs.")
 
@@ -33,12 +32,10 @@
 NUM_PROCS: int = min(torch.cuda.device_count(), 4)
 LAUNCH_CMD = ["torchrun", f"--nproc_per_node={NUM_PROCS}"]
 
+
 def _run_test(layer, fp8):
     test_path = TEST_ROOT / "run_numerics.py"
-    test_cmd = LAUNCH_CMD + [
-        str(test_path),
-        f"--layer-type={layer}"
-    ]
+    test_cmd = LAUNCH_CMD + [str(test_path), f"--layer-type={layer}"]
 
     if fp8:
         test_cmd += ["--fp8"]
@@ -54,30 +51,35 @@
 all_boolean = [True, False]
 
+
 @pytest.mark.parametrize("fp8", all_boolean)
 def test_linear(fp8):
     if fp8 and not fp8_available:
         pytest.skip(reason_for_no_fp8)
     _run_test("linear", fp8)
 
+
 @pytest.mark.parametrize("fp8", all_boolean)
 def test_layernorm(fp8):
     if fp8 and not fp8_available:
         pytest.skip(reason_for_no_fp8)
     _run_test("layernorm", fp8)
 
+
 @pytest.mark.parametrize("fp8", all_boolean)
 def test_layernorm_linear(fp8):
     if fp8 and not fp8_available:
         pytest.skip(reason_for_no_fp8)
     _run_test("layernorm_linear", fp8)
 
+
 @pytest.mark.parametrize("fp8", all_boolean)
 def test_layernorm_mlp(fp8):
     if fp8 and not fp8_available:
         pytest.skip(reason_for_no_fp8)
     _run_test("layernorm_mlp", fp8)
 
+
 @pytest.mark.parametrize("fp8", all_boolean)
 def test_transformer_layer(fp8):
     if fp8 and not fp8_available:
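
Notes on the patterns exercised by this patch follow; the code below is an illustrative sketch, not part of the diff.

The `_gather` helper relies on a gradient-rescaling trick: torch.distributed.nn.functional.all_gather accumulates gradient contributions from every rank, so the input is first routed through an autograd identity whose backward divides by the world size. The `HalfGradient` bodies are only partially visible in the hunk above, so this sketch reconstructs the likely behavior under that assumption (names are illustrative):

    import torch
    import torch.distributed as dist
    import torch.distributed.nn.functional  # differentiable collectives

    class RescaleGrad(torch.autograd.Function):
        """Identity in forward; divides the incoming gradient by WORLD_SIZE."""

        @staticmethod
        def forward(ctx, input):
            return input

        @staticmethod
        def backward(ctx, grad_output):
            # all_gather sums gradients from all ranks; undo that scaling.
            return grad_output / dist.get_world_size()

    def gather_along_dim(tensor, dim=0, group=None):
        # Rescale gradients, then gather every rank's shard and concatenate.
        tensor = RescaleGrad.apply(tensor)
        gathered = torch.distributed.nn.functional.all_gather(tensor, group=group)
        return torch.cat(gathered, dim=dim)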
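
Parameter synchronization in `_copy_params` works by slicing each rank's shard out of the full single-GPU parameter along whichever dimension the tensor-parallel shape is smaller. A simplified restatement with a hypothetical helper name and a small CPU example:

    import torch

    def shard_for_rank(full_param, shard_shape, rank):
        """Return the contiguous block of `full_param` owned by `rank`.

        For each dimension where the distributed shape is smaller than the
        full shape, take the slice [rank * d, (rank + 1) * d); keep the
        other dimensions whole.
        """
        indices = []
        for dim, d in enumerate(shard_shape):
            if d != full_param.shape[dim]:
                indices.append(slice(rank * d, (rank + 1) * d))
            else:
                indices.append(slice(None))
        return full_param[tuple(indices)]

    # An (8, 4) weight split row-wise across 2 ranks into (4, 4) shards:
    full = torch.arange(32.0).reshape(8, 4)
    assert torch.equal(shard_for_rank(full, (4, 4), rank=1), full[4:8, :])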
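
`_check_outputs` and `_check_gradients` share one failure-propagation pattern: each rank sets a local pass/fail flag, a MAX all-reduce spreads any failure to every rank, and only then do the ranks exit. This keeps the processes in lockstep, instead of one rank calling sys.exit(1) while its peers hang in the next collective. Distilled into a standalone helper (illustrative name):

    import sys

    import torch
    import torch.distributed as dist

    def fail_together(local_failed, group=None):
        """Exit with status 1 on every rank if any rank failed."""
        flag = torch.tensor([int(local_failed)], dtype=torch.uint8, device="cuda")
        dist.all_reduce(flag, dist.ReduceOp.MAX, group=group)
        if bool(flag.item()):
            sys.exit(1)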
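
On the launcher side, `test_numerics.py` shells out to torchrun with one process per GPU. Note that `--fp8` is declared with action="store_true" in run_numerics.py, so it must be passed as a bare flag, as in `_run_test` above; argparse rejects the `--fp8=true` spelling for store_true options. A usage sketch with an illustrative path and process count:

    import subprocess

    NUM_PROCS = 2  # the test suite uses min(torch.cuda.device_count(), 4)
    cmd = [
        "torchrun",
        f"--nproc_per_node={NUM_PROCS}",
        "tests/pytorch/distributed/run_numerics.py",
        "--layer-type=linear",
        "--fp8",  # bare flag; action="store_true" takes no value
    ]
    result = subprocess.run(cmd, check=False)
    assert result.returncode == 0, "distributed numerics test failed"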