From b4b1686fecd2805a98a89cd7813ac2fa4957837f Mon Sep 17 00:00:00 2001 From: noob_wcy <55313095+vocaltract@users.noreply.github.com> Date: Mon, 26 Aug 2024 06:43:20 +0800 Subject: [PATCH] feat: add nccl stream fetch api and add dependency version limit (#48) 1. add nccl stream fetch api in pytorch patches 2. add dependency version limit about numpy and pytest in torch_patch and vescale requirements --- patches/patched_pytorch_v2.2.1_rc3.patch | 363 +++++++++++++++++++++-- requirements.txt | 2 +- 2 files changed, 340 insertions(+), 25 deletions(-) diff --git a/patches/patched_pytorch_v2.2.1_rc3.patch b/patches/patched_pytorch_v2.2.1_rc3.patch index 747f3b9..03f8d93 100644 --- a/patches/patched_pytorch_v2.2.1_rc3.patch +++ b/patches/patched_pytorch_v2.2.1_rc3.patch @@ -1,5 +1,5 @@ diff --git a/aten/src/ATen/FunctionalInverses.cpp b/aten/src/ATen/FunctionalInverses.cpp -index af0e5af..9896f16 100644 +index af0e5af3be..9896f16a84 100644 --- a/aten/src/ATen/FunctionalInverses.cpp +++ b/aten/src/ATen/FunctionalInverses.cpp @@ -151,6 +151,12 @@ Tensor FunctionalInverses::expand_copy_inverse(const Tensor& base, const Tensor& @@ -16,7 +16,7 @@ index af0e5af..9896f16 100644 return at::functionalization::permute_copy_inverse(mutated_view, dims, reapply_views); } diff --git a/aten/src/ATen/cuda/CUDAGeneratorImpl.cpp b/aten/src/ATen/cuda/CUDAGeneratorImpl.cpp -index b8004ec..45869fe 100644 +index b8004ec7e7..45869fe392 100644 --- a/aten/src/ATen/cuda/CUDAGeneratorImpl.cpp +++ b/aten/src/ATen/cuda/CUDAGeneratorImpl.cpp @@ -137,6 +137,32 @@ uint64_t CUDAGeneratorImpl::get_offset() const { @@ -137,7 +137,7 @@ index b8004ec..45869fe 100644 } diff --git a/aten/src/ATen/cuda/CUDAGeneratorImpl.h b/aten/src/ATen/cuda/CUDAGeneratorImpl.h -index 2fe8a6f..874ef15 100644 +index 2fe8a6f6c8..874ef15015 100644 --- a/aten/src/ATen/cuda/CUDAGeneratorImpl.h +++ b/aten/src/ATen/cuda/CUDAGeneratorImpl.h @@ -87,6 +87,13 @@ namespace at { @@ -183,7 +183,7 @@ index 2fe8a6f..874ef15 100644 std::atomic_flag no_reset_rnn_state_; }; diff --git a/aten/src/ATen/functorch/BatchRulesDecompositions.cpp b/aten/src/ATen/functorch/BatchRulesDecompositions.cpp -index 1b179a5..b1beaa6 100644 +index 1b179a505e..b1beaa67ae 100644 --- a/aten/src/ATen/functorch/BatchRulesDecompositions.cpp +++ b/aten/src/ATen/functorch/BatchRulesDecompositions.cpp @@ -296,7 +296,7 @@ TORCH_LIBRARY_IMPL(aten, FuncTorchBatchedDecomposition, m) { @@ -196,7 +196,7 @@ index 1b179a5..b1beaa6 100644 OP_DECOMPOSE2(var, dim); OP_DECOMPOSE(var_mean); diff --git a/aten/src/ATen/native/Onehot.cpp b/aten/src/ATen/native/Onehot.cpp -index 41b7a69..26fd097 100644 +index 41b7a69618..26fd0979c3 100644 --- a/aten/src/ATen/native/Onehot.cpp +++ b/aten/src/ATen/native/Onehot.cpp @@ -5,7 +5,9 @@ @@ -228,7 +228,7 @@ index 41b7a69..26fd097 100644 // empty tensor could be converted to one hot representation, diff --git a/aten/src/ATen/native/ReduceOps.cpp b/aten/src/ATen/native/ReduceOps.cpp -index 7a47490..a2c54db 100644 +index 7a47490c67..a2c54db942 100644 --- a/aten/src/ATen/native/ReduceOps.cpp +++ b/aten/src/ATen/native/ReduceOps.cpp @@ -2228,26 +2228,21 @@ bool cpu_equal(const Tensor& self, const Tensor& other) { @@ -262,7 +262,7 @@ index 7a47490..a2c54db 100644 auto indices_ = indices.unsqueeze(dim); return inplace_scatter_if_not_tensor_subclass(grad_, indices_); diff --git a/aten/src/ATen/native/TensorShape.cpp b/aten/src/ATen/native/TensorShape.cpp -index 0a018fb..a5e4643 100644 +index 0a018fbc8d..a5e4643ae5 100644 --- a/aten/src/ATen/native/TensorShape.cpp +++ b/aten/src/ATen/native/TensorShape.cpp @@ -109,6 +109,7 @@ @@ -297,7 +297,7 @@ index 0a018fb..a5e4643 100644 Tensor sum_to_size_symint(const Tensor& self, SymIntArrayRef size) { diff --git a/aten/src/ATen/native/cuda/DistributionTemplates.h b/aten/src/ATen/native/cuda/DistributionTemplates.h -index 5f38e36..aa95680 100644 +index 5f38e36e07..aa95680ee7 100644 --- a/aten/src/ATen/native/cuda/DistributionTemplates.h +++ b/aten/src/ATen/native/cuda/DistributionTemplates.h @@ -62,32 +62,47 @@ std::tuple calc_execution_policy(int64_t total_elements) { @@ -447,7 +447,7 @@ index 5f38e36..aa95680 100644 C10_CUDA_KERNEL_LAUNCH_CHECK(); } diff --git a/aten/src/ATen/native/cuda/Dropout.cu b/aten/src/ATen/native/cuda/Dropout.cu -index 67ea3e4..938a90a 100644 +index 67ea3e4f83..938a90a83a 100644 --- a/aten/src/ATen/native/cuda/Dropout.cu +++ b/aten/src/ATen/native/cuda/Dropout.cu @@ -56,13 +56,10 @@ fused_dropout_kernel_vec(at::cuda::detail::TensorInfo a, @@ -719,7 +719,7 @@ index 67ea3e4..938a90a 100644 return std::tuple(ret, mask); } diff --git a/aten/src/ATen/native/native_functions.yaml b/aten/src/ATen/native/native_functions.yaml -index 35a1049..604f53a 100644 +index 35a1049e20..604f53ac73 100644 --- a/aten/src/ATen/native/native_functions.yaml +++ b/aten/src/ATen/native/native_functions.yaml @@ -2595,6 +2595,8 @@ @@ -759,8 +759,21 @@ index 35a1049..604f53a 100644 - func: permute_copy(Tensor self, int[] dims) -> Tensor variants: function dispatch: +diff --git a/requirements.txt b/requirements.txt +index 0acf5091cf..899b8a95cc 100644 +--- a/requirements.txt ++++ b/requirements.txt +@@ -2,7 +2,7 @@ + astunparse + expecttest + hypothesis +-numpy ++numpy<2.0.0 + psutil + pyyaml + requests diff --git a/test/distributed/_tensor/test_dtensor.py b/test/distributed/_tensor/test_dtensor.py -index a83efe5..e190c5b 100644 +index a83efe539e..e190c5b97d 100644 --- a/test/distributed/_tensor/test_dtensor.py +++ b/test/distributed/_tensor/test_dtensor.py @@ -109,6 +109,16 @@ class DTensorTest(DTensorTestBase): @@ -780,8 +793,134 @@ index a83efe5..e190c5b 100644 @with_comms def test_modules_w_meta_dtensor(self): model = DummyMLP("meta") +diff --git a/test/distributed/test_c10d_nccl.py b/test/distributed/test_c10d_nccl.py +index 0c630c27a2..caad63856c 100644 +--- a/test/distributed/test_c10d_nccl.py ++++ b/test/distributed/test_c10d_nccl.py +@@ -73,6 +73,50 @@ BFLOAT16_AVAILABLE = ( + ) + ) + ++ ++def get_nccl_p2p_stream(pg: "torch.distributed.ProcessGroup", ++ device, peer, is_batched): ++ nccl_backend = pg._get_backend(device) ++ stream_id = nccl_backend.get_p2p_cuda_stream_id(device.index, peer, is_batched) ++ if stream_id < 0: ++ return None ++ _CUDA_DEVICE = 1 ++ nccl_stream = torch.cuda.Stream(stream_id=stream_id, ++ device_index=device.index, device_type=_CUDA_DEVICE) ++ msg = f"[{pg.rank()}] Retrieved nccl p2p stream id={stream_id} device={device} stream={nccl_stream}" ++ print(msg, flush=True) ++ return nccl_stream ++ ++ ++def get_nccl_coll_stream(pg: "torch.distributed.ProcessGroup", tensor: torch.Tensor): ++ device = tensor.device ++ nccl_backend = pg._get_backend(device) ++ stream_id = nccl_backend.get_coll_cuda_stream_id([tensor]) ++ if stream_id < 0: ++ return None ++ _CUDA_DEVICE = 1 ++ nccl_stream = torch.cuda.Stream(stream_id=stream_id, ++ device_index=device.index, device_type=_CUDA_DEVICE) ++ msg = f"[{pg.rank()}] Retrieved nccl coll stream id={stream_id} device={device} stream={nccl_stream}" ++ print(msg, flush=True) ++ return nccl_stream ++ ++ ++def time_with_stream(stream, fn, *args, **kwargs): ++ torch.cuda.synchronize() ++ start = torch.cuda.Event(enable_timing=True) ++ start.record(stream=stream) ++ fn(*args, **kwargs) ++ end = torch.cuda.Event(enable_timing=True) ++ end.record(stream=stream) ++ # Blocks until nccl stream has completed all operations ++ # Doing a time.sleep has the same effect ++ # import time; time.sleep(10) ++ stream.synchronize() ++ duration = start.elapsed_time(end) ++ return duration ++ ++ + class RendezvousEnvTest(TestCase): + @retry_on_connect_failures + @requires_nccl() +@@ -365,6 +409,12 @@ class ProcessGroupNCCLTest(MultiProcessTestCase): + tensors = [torch.tensor([self.rank + 1.]).cuda(local_device_id)] + + allreduce(tensors, c10d.ReduceOp.AVG) ++ ++ # byted tests for nccl stream event ++ nccl_stream = get_nccl_coll_stream(pg, tensors[0]) ++ duration = time_with_stream(nccl_stream, allreduce, tensors, c10d.ReduceOp.SUM) ++ assert duration > 0 ++ + ndev = self.world_size + self.assertEqual( + torch.tensor([ndev * (ndev + 1.) / (2. * ndev)]), +@@ -576,6 +626,11 @@ class ProcessGroupNCCLTest(MultiProcessTestCase): + + # Verification + self.assertEqual(output_tensors, expected_output) ++ # byted tests for nccl stream event ++ nccl_stream = get_nccl_coll_stream(pg, tensors[0]) ++ duration = time_with_stream(nccl_stream, allgather, output_tensors, tensors) ++ assert duration > 0 ++ + + @requires_nccl() + @skip_but_pass_in_sandcastle_if(torch.cuda.device_count() < 2, "NCCL test requires 2+ GPUs") +@@ -597,6 +652,10 @@ class ProcessGroupNCCLTest(MultiProcessTestCase): + + # Verification + self.assertEqual(torch.arange(self.world_size), output_t) ++ nccl_stream = get_nccl_coll_stream(pg, tensor) ++ duration = time_with_stream(nccl_stream, allgather_base, output_t, tensor) ++ assert duration > 0 ++ + + @requires_nccl() + @skip_but_pass_in_sandcastle_if(torch.cuda.device_count() < 2, "NCCL test requires 2+ GPUs") +@@ -1042,6 +1101,11 @@ class ProcessGroupNCCLTest(MultiProcessTestCase): + reduce_scatter(output_ref, tensor_lists_ref, c10d.ReduceOp.SUM) + self.assertEqual(output_ref, output) + ++ nccl_stream = get_nccl_coll_stream(pg, output_tensor) ++ duration = time_with_stream(nccl_stream, pg.reduce_scatter, output_tensor, input_list, c10d.ReduceOp.SUM) ++ assert duration > 0 ++ ++ + @requires_nccl() + @skip_but_pass_in_sandcastle_if(torch.cuda.device_count() < 2, "NCCL test requires 2+ GPUs") + def test_reduce_scatter_base_ops(self): +@@ -1101,7 +1165,7 @@ class ProcessGroupNCCLTest(MultiProcessTestCase): + @skip_but_pass_in_sandcastle_if(torch.cuda.device_count() < 2, "NCCL test requires 2+ GPUs") + def test_send_recv(self): + store = c10d.FileStore(self.file_name, self.world_size) +- self._create_process_group_nccl(store, self.opts()) ++ pg = self._create_process_group_nccl(store, self.opts()) + device = self.rank_to_GPU[self.rank][0] + + # Generate the same random tensor +@@ -1109,9 +1173,13 @@ class ProcessGroupNCCLTest(MultiProcessTestCase): + send_tensor = torch.rand(10, 10, device=device) + if self.rank == 0: + dist.send(send_tensor, 1) ++ fn = dist.send ++ tensor = send_tensor + if self.rank == 1: + recv_tensor = torch.rand(10, 10, device=device) + dist.recv(recv_tensor, 0) ++ fn = dist.recv ++ tensor = recv_tensor + self.assertEqual(send_tensor, recv_tensor) + + @requires_nccl() diff --git a/tools/autograd/derivatives.yaml b/tools/autograd/derivatives.yaml -index 2c6886a..6d65124 100644 +index 2c6886a36c..6d65124935 100644 --- a/tools/autograd/derivatives.yaml +++ b/tools/autograd/derivatives.yaml @@ -892,7 +892,7 @@ @@ -855,7 +994,7 @@ index 2c6886a..6d65124 100644 values: gather(self_t, dim, indices) diff --git a/tools/autograd/gen_inplace_or_view_type.py b/tools/autograd/gen_inplace_or_view_type.py -index ee1075c..fea1c39 100644 +index ee1075cbed..fea1c39901 100644 --- a/tools/autograd/gen_inplace_or_view_type.py +++ b/tools/autograd/gen_inplace_or_view_type.py @@ -315,6 +315,7 @@ def get_view_info(f: NativeFunction) -> Optional[str]: @@ -875,7 +1014,7 @@ index ee1075c..fea1c39 100644 # [NOTE] [Nested Arg Types] # This is temporary. Nested tensors will be migrating to use SymInts and diff --git a/torch/_dynamo/variables/distributed.py b/torch/_dynamo/variables/distributed.py -index 54ad1cd..47605d9 100644 +index 54ad1cdf9b..47605d96ae 100644 --- a/torch/_dynamo/variables/distributed.py +++ b/torch/_dynamo/variables/distributed.py @@ -24,9 +24,7 @@ class DistributedVariable(VariableTracker): @@ -1006,7 +1145,7 @@ index 54ad1cd..47605d9 100644 return super().var_getattr(tx, name) diff --git a/torch/_dynamo/variables/misc.py b/torch/_dynamo/variables/misc.py -index e5cf6f6..755e28f 100644 +index e5cf6f6673..755e28f331 100644 --- a/torch/_dynamo/variables/misc.py +++ b/torch/_dynamo/variables/misc.py @@ -266,6 +266,64 @@ class NewGlobalVariable(VariableTracker): @@ -1131,7 +1270,7 @@ index e5cf6f6..755e28f 100644 def produce_trampoline_autograd_fwd(fn_cls): diff --git a/torch/_functorch/_aot_autograd/dispatch_and_compile_graph.py b/torch/_functorch/_aot_autograd/dispatch_and_compile_graph.py -index 16eef07..ce82a26 100644 +index 16eef07af0..ce82a2675d 100644 --- a/torch/_functorch/_aot_autograd/dispatch_and_compile_graph.py +++ b/torch/_functorch/_aot_autograd/dispatch_and_compile_graph.py @@ -102,9 +102,10 @@ def aot_dispatch_base_graph( @@ -1149,7 +1288,7 @@ index 16eef07..ce82a26 100644 return fw_module, list(updated_flat_args_subclasses_desugared), maybe_subclass_meta diff --git a/torch/_functorch/_aot_autograd/subclass_utils.py b/torch/_functorch/_aot_autograd/subclass_utils.py -index 0514c1c..4d813fe 100644 +index 0514c1c4d5..4d813fe64b 100644 --- a/torch/_functorch/_aot_autograd/subclass_utils.py +++ b/torch/_functorch/_aot_autograd/subclass_utils.py @@ -16,6 +16,27 @@ from .utils import strict_zip @@ -1181,7 +1320,7 @@ index 0514c1c..4d813fe 100644 def requires_subclass_dispatch(args, fw_metadata: ViewAndMutationMeta) -> bool: args_flattened = pytree.arg_tree_leaves(*args) diff --git a/torch/_functorch/aot_autograd.py b/torch/_functorch/aot_autograd.py -index 837fe2a..b38b2c2 100644 +index 837fe2ab4b..b38b2c2bed 100644 --- a/torch/_functorch/aot_autograd.py +++ b/torch/_functorch/aot_autograd.py @@ -511,6 +511,8 @@ def create_aot_dispatcher_function( @@ -1204,7 +1343,7 @@ index 837fe2a..b38b2c2 100644 aot_export is not currently supported with traceable tensor subclass. If you need this feature, please comment on """) diff --git a/torch/_guards.py b/torch/_guards.py -index 69912b1..4f00d53 100644 +index 69912b1531..4f00d53b88 100644 --- a/torch/_guards.py +++ b/torch/_guards.py @@ -817,8 +817,16 @@ def detect_fake_mode(inputs: Any = None): @@ -1225,7 +1364,7 @@ index 69912b1..4f00d53 100644 if fake_modes: fake_mode, desc1, i1 = fake_modes[0] diff --git a/torch/_tensor.py b/torch/_tensor.py -index 3aa0cee..dd76e76 100644 +index 3aa0cee639..dd76e76e84 100644 --- a/torch/_tensor.py +++ b/torch/_tensor.py @@ -107,6 +107,7 @@ class Tensor(torch._C.TensorBase): @@ -1237,7 +1376,7 @@ index 3aa0cee..dd76e76 100644 new_tensor = self.clone() if type(new_tensor) is not type(self): diff --git a/torch/csrc/autograd/python_variable.cpp b/torch/csrc/autograd/python_variable.cpp -index ba0e913..0335434 100644 +index ba0e913896..0335434fbe 100644 --- a/torch/csrc/autograd/python_variable.cpp +++ b/torch/csrc/autograd/python_variable.cpp @@ -656,9 +656,9 @@ static PyObject* THPVariable_make_wrapper_subclass( @@ -1277,8 +1416,184 @@ index ba0e913..0335434 100644 -} +} \ No newline at end of file +diff --git a/torch/csrc/distributed/c10d/ProcessGroupNCCL.cpp b/torch/csrc/distributed/c10d/ProcessGroupNCCL.cpp +index 6652a991d7..af9ed64f72 100644 +--- a/torch/csrc/distributed/c10d/ProcessGroupNCCL.cpp ++++ b/torch/csrc/distributed/c10d/ProcessGroupNCCL.cpp +@@ -3895,6 +3895,117 @@ c10::intrusive_ptr ProcessGroupNCCL::_allgather_base( + avoidRecordStreams); + } + ++int64_t ProcessGroupNCCL::getCollCudaStreamId( ++ std::vector& inputs) { ++ const auto devices = getDeviceList(inputs); ++ const bool is_single_tensor = (inputs.size() == 1); ++ TORCH_INTERNAL_ASSERT( ++ is_single_tensor, ++ "Expected only one input tensor, multiple tensors are given." ++ ); ++ const auto key = getKeyFromDevices(devices); ++ ++ LOG(INFO) << "getCollCudaStreamId: keys: ("; ++ for (const auto& pair : ncclStreams_) { ++ LOG(INFO) <<"<"<< pair.first << ">, "; ++ } ++ LOG(INFO) << ")" << std::endl; ++ LOG(INFO) << "the key to look for: <" << key << ">" << std::endl; ++ if (ncclStreams_.find(key) == ncclStreams_.end()) { ++ // not found ++ return -1; ++ } ++ ++ auto& ncclStreams = ncclStreams_[key]; ++ at::cuda::CUDAStream& ncclStream = ncclStreams[0]; ++ return ncclStream.id(); ++} ++ ++uint64_t ProcessGroupNCCL::_getCollCudaStream( ++ std::vector& inputs) { ++ const auto devices = getDeviceList(inputs); ++ const bool is_single_tensor = (inputs.size() == 1); ++ TORCH_INTERNAL_ASSERT( ++ is_single_tensor, ++ "Expected only one input tensor, multiple tensors are given." ++ ); ++ const auto key = getKeyFromDevices(devices); ++ ++ LOG(INFO) << "_getCollCudaStream: keys: ("; ++ for (const auto& pair : ncclStreams_) { ++ LOG(INFO) <<"<"<< pair.first << ">, "; ++ } ++ LOG(INFO) << ")" << std::endl; ++ LOG(INFO) << "key to loo for: <" << key << ">" << std::endl; ++ ++ if (ncclStreams_.find(key) == ncclStreams_.end()) { ++ // not found. return nullptr ++ return 0; ++ } ++ ++ auto& ncclStreams = ncclStreams_[key]; ++ at::cuda::CUDAStream& ncclStream = ncclStreams[0]; ++ return reinterpret_cast(ncclStream.stream()); ++} ++ ++int64_t ProcessGroupNCCL::getP2PCudaStreamId( ++ int device_idx, ++ int peer, ++ int is_batched) { ++ const auto device = getDeviceForRank(device_idx); ++ std::vector devices = {device}; ++ std::string key; ++ if (is_batched == 1) { ++ key = getKeyFromDevices(devices); ++ } else { ++ key = getKeySendRecv(rank_, peer); ++ } ++ // print existing keys in ncclStreams_ ++ LOG(INFO) << "_getP2PCudaStreamId: keys: ("; ++ for (const auto& pair : ncclStreams_) { ++ LOG(INFO) <<"<"<< pair.first << ">, "; ++ } ++ LOG(INFO) << ")" << std::endl; ++ LOG(INFO) << "P2P is_batched: " << is_batched << ", the key to look for: <" ++ << key << ">" << std::endl; ++ if (ncclStreams_.find(key) == ncclStreams_.end()) { ++ // not found ++ return -1; ++ } ++ auto& ncclStreams = ncclStreams_[key]; ++ at::cuda::CUDAStream& ncclStream = ncclStreams[0]; ++ return ncclStream.id(); ++} ++ ++uint64_t ProcessGroupNCCL::_getP2PCudaStream( ++ int device_idx, ++ int peer, ++ int is_batched) { ++ const auto device = getDeviceForRank(device_idx); ++ std::vector devices = {device}; ++ std::string key; ++ if (is_batched == 1) { ++ key = getKeyFromDevices(devices); ++ } else { ++ key = getKeySendRecv(rank_, peer); ++ } ++ LOG(INFO) << "_getP2PCudaStream: keys: ("; ++ for (const auto& pair : ncclStreams_) { ++ LOG(INFO) <<"<"<< pair.first << ">, "; ++ } ++ LOG(INFO) << ")"; ++ LOG(INFO) << "P2P is_batched: " << is_batched << ", the key to look for: <" ++ << key << ">"; ++ if (ncclStreams_.find(key) == ncclStreams_.end()) { ++ // not found ++ return -1; ++ } ++ auto& ncclStreams = ncclStreams_[key]; ++ at::cuda::CUDAStream& ncclStream = ncclStreams[0]; ++ return reinterpret_cast(ncclStream.stream()); ++} ++ ++ + } // namespace c10d + + #endif // USE_C10D_NCCL +diff --git a/torch/csrc/distributed/c10d/ProcessGroupNCCL.hpp b/torch/csrc/distributed/c10d/ProcessGroupNCCL.hpp +index 3567cb3572..2f3995bfc3 100644 +--- a/torch/csrc/distributed/c10d/ProcessGroupNCCL.hpp ++++ b/torch/csrc/distributed/c10d/ProcessGroupNCCL.hpp +@@ -540,6 +540,14 @@ class TORCH_API ProcessGroupNCCL : public Backend { + // instead of relying on ProcessGroupNCCL destructor. + void abort(c10::optional abortReason = c10::nullopt); + ++ int64_t getCollCudaStreamId(std::vector& inputs); ++ ++ int64_t getP2PCudaStreamId(int device_idx, int peer, int is_batched); ++ ++ uint64_t _getCollCudaStream(std::vector& inputs); ++ ++ uint64_t _getP2PCudaStream(int device_idx, int peer, int is_batched); ++ + void shutdown(); + + protected: +diff --git a/torch/csrc/distributed/c10d/init.cpp b/torch/csrc/distributed/c10d/init.cpp +index 3296bd3754..c1b3c7eab5 100644 +--- a/torch/csrc/distributed/c10d/init.cpp ++++ b/torch/csrc/distributed/c10d/init.cpp +@@ -2288,6 +2288,30 @@ options :class:`~torch.distributed.ProcessGroupNCCL.Options`). + py::call_guard()) + .def("_group_start", &::c10d::ProcessGroupNCCL::groupStart) + .def("_group_end", &::c10d::ProcessGroupNCCL::groupEnd) ++ .def( ++ "_get_coll_cuda_stream", ++ &::c10d::ProcessGroupNCCL::_getCollCudaStream, ++ py::arg("tensor"), ++ py::call_guard()) ++ .def( ++ "_get_p2p_cuda_stream", ++ &::c10d::ProcessGroupNCCL::_getP2PCudaStream, ++ py::arg("device_idx"), ++ py::arg("peer"), ++ py::arg("is_batched"), ++ py::call_guard()) ++ .def( ++ "get_coll_cuda_stream_id", ++ &::c10d::ProcessGroupNCCL::getCollCudaStreamId, ++ py::arg("tensor"), ++ py::call_guard()) ++ .def( ++ "get_p2p_cuda_stream_id", ++ &::c10d::ProcessGroupNCCL::getP2PCudaStreamId, ++ py::arg("device_idx"), ++ py::arg("peer"), ++ py::arg("is_batched"), ++ py::call_guard()) + .def( + "comm_split_count", + &::c10d::ProcessGroupNCCL::getCommSplitCounter) diff --git a/torch/distributed/_functional_collectives.py b/torch/distributed/_functional_collectives.py -index a0e0229..f76fded 100644 +index a0e02292cf..f76fded484 100644 --- a/torch/distributed/_functional_collectives.py +++ b/torch/distributed/_functional_collectives.py @@ -128,6 +128,62 @@ def wait_tensor(tensor): @@ -1380,7 +1695,7 @@ index a0e0229..f76fded 100644 "all_reduce(Tensor self, str reduceOp, str tag, int[] ranks, int group_size) -> Tensor", "all_reduce_coalesced(Tensor[] self, str reduceOp, str tag, int[] ranks, int group_size) -> Tensor[]", diff --git a/torch/distributed/_functional_collectives_impl.py b/torch/distributed/_functional_collectives_impl.py -index f14ad5b..0444565 100644 +index f14ad5b067..04445656e7 100644 --- a/torch/distributed/_functional_collectives_impl.py +++ b/torch/distributed/_functional_collectives_impl.py @@ -138,6 +138,37 @@ def _str_to_reduce_op(reduceOp: str) -> dist.ReduceOp: @@ -1422,7 +1737,7 @@ index f14ad5b..0444565 100644 """ Kernel implementations (for eager runtime only) - should never be traced by torch.compile diff --git a/torch/distributed/_tensor/api.py b/torch/distributed/_tensor/api.py -index 068bc8b..5a57704 100644 +index 068bc8b9af..5a57704624 100644 --- a/torch/distributed/_tensor/api.py +++ b/torch/distributed/_tensor/api.py @@ -233,6 +233,7 @@ class DTensor(torch.Tensor): # pyre-ignore[13]: pyre is bad at __new__ diff --git a/requirements.txt b/requirements.txt index 82b132a..450e8fe 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,7 +2,7 @@ --pre expecttest hypothesis -pytest +pytest<8.0.0 tqdm optree accelerate