From d19dce6034f8763c4d952a9a1379dde5d9262c41 Mon Sep 17 00:00:00 2001 From: xiaohanzhangcmu Date: Tue, 5 Nov 2024 15:00:39 -0800 Subject: [PATCH 1/4] Add upper bound for prefix_int --- streaming/base/constant.py | 3 +++ streaming/base/shared/prefix.py | 8 +++++++- tests/test_shared.py | 13 +++++++++++++ 3 files changed, 23 insertions(+), 1 deletion(-) diff --git a/streaming/base/constant.py b/streaming/base/constant.py index f26f638fb..12ac5eeb3 100644 --- a/streaming/base/constant.py +++ b/streaming/base/constant.py @@ -34,3 +34,6 @@ # Default download timeout DEFAULT_TIMEOUT = 60.0 + +# Maximum prefix integers +MAX_PREFIX_INT = 1000 diff --git a/streaming/base/shared/prefix.py b/streaming/base/shared/prefix.py index b25743df2..689802c3c 100644 --- a/streaming/base/shared/prefix.py +++ b/streaming/base/shared/prefix.py @@ -16,7 +16,7 @@ import numpy as np from torch import distributed as dist -from streaming.base.constant import BARRIER_FILELOCK, CACHE_FILELOCK, LOCALS, SHM_TO_CLEAN, TICK +from streaming.base.constant import BARRIER_FILELOCK, CACHE_FILELOCK, LOCALS, SHM_TO_CLEAN, TICK, MAX_PREFIX_INT from streaming.base.shared import SharedMemory from streaming.base.world import World @@ -113,6 +113,12 @@ def _check_and_find(streams_local: list[str], streams_remote: list[Union[str, No for prefix_int in _each_prefix_int(): + if prefix_int >= MAX_PREFIX_INT: + raise ValueError(f"prefix_int exceeds {MAX_PREFIX_INT}. This may happen " + + f"when you mock os.path.exists or os.stat so the filelock " + + f"checks always returns ``True`` " + + f"you need to clean up TMPDIR.") + name = _get_path(prefix_int, shm_name) # Check if any shared memory filelocks exist for the current prefix diff --git a/tests/test_shared.py b/tests/test_shared.py index d1914617d..77379c4b0 100644 --- a/tests/test_shared.py +++ b/tests/test_shared.py @@ -190,3 +190,16 @@ def test_shared_memory_permission_error(mock_shared_memory_class: MagicMock): with patch('os.path.exists', return_value=False): next_prefix = _check_and_find(['local'], [None], LOCALS) assert next_prefix == 1 + + +@pytest.mark.usefixtures('local_remote_dir') +def test_shared_memory_infinity_exception(local_remote_dir: tuple[str, str]): + local, remote = local_remote_dir + with patch('os.path.exists', return_value=True): + with pytest.raises(ValueError, match='prefix_int exceeds .*clean up TMPDIR.'): + _, _ = get_shm_prefix(streams_local=[local], + streams_remote=[remote], + world=World.detect()) + + + From 0f57f00e38de7ce94f04cd38afe3ea4062ff2e45 Mon Sep 17 00:00:00 2001 From: xiaohanzhangcmu Date: Wed, 6 Nov 2024 09:19:33 -0800 Subject: [PATCH 2/4] update --- streaming/base/shared/prefix.py | 10 +++++----- tests/test_shared.py | 3 --- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/streaming/base/shared/prefix.py b/streaming/base/shared/prefix.py index 689802c3c..50923053e 100644 --- a/streaming/base/shared/prefix.py +++ b/streaming/base/shared/prefix.py @@ -16,7 +16,8 @@ import numpy as np from torch import distributed as dist -from streaming.base.constant import BARRIER_FILELOCK, CACHE_FILELOCK, LOCALS, SHM_TO_CLEAN, TICK, MAX_PREFIX_INT +from streaming.base.constant import (BARRIER_FILELOCK, CACHE_FILELOCK, LOCALS, MAX_PREFIX_INT, + SHM_TO_CLEAN, TICK) from streaming.base.shared import SharedMemory from streaming.base.world import World @@ -114,10 +115,9 @@ def _check_and_find(streams_local: list[str], streams_remote: list[Union[str, No for prefix_int in _each_prefix_int(): if prefix_int >= MAX_PREFIX_INT: - raise ValueError(f"prefix_int exceeds {MAX_PREFIX_INT}. This may happen " + - f"when you mock os.path.exists or os.stat so the filelock " + - f"checks always returns ``True`` " + - f"you need to clean up TMPDIR.") + raise ValueError(f'prefix_int exceeds {MAX_PREFIX_INT}. This may happen ' + + f'when you mock os.path.exists or os.stat so the filelock ' + + f'checks always returns ``True`` ' + f'you need to clean up TMPDIR.') name = _get_path(prefix_int, shm_name) diff --git a/tests/test_shared.py b/tests/test_shared.py index 77379c4b0..d73c6c811 100644 --- a/tests/test_shared.py +++ b/tests/test_shared.py @@ -200,6 +200,3 @@ def test_shared_memory_infinity_exception(local_remote_dir: tuple[str, str]): _, _ = get_shm_prefix(streams_local=[local], streams_remote=[remote], world=World.detect()) - - - From 3d9815ec5bff85cefb7d672fbd80444ad4a09e2e Mon Sep 17 00:00:00 2001 From: Xiaohan Zhang Date: Tue, 12 Nov 2024 22:59:34 -0800 Subject: [PATCH 3/4] Update streaming/base/shared/prefix.py Co-authored-by: Saaketh Narayan --- streaming/base/shared/prefix.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/base/shared/prefix.py b/streaming/base/shared/prefix.py index 50923053e..dda6a7038 100644 --- a/streaming/base/shared/prefix.py +++ b/streaming/base/shared/prefix.py @@ -116,7 +116,7 @@ def _check_and_find(streams_local: list[str], streams_remote: list[Union[str, No if prefix_int >= MAX_PREFIX_INT: raise ValueError(f'prefix_int exceeds {MAX_PREFIX_INT}. This may happen ' + - f'when you mock os.path.exists or os.stat so the filelock ' + + f'when you mock os.path.exists or os.stat functions so the filelock ' + f'checks always returns ``True`` ' + f'you need to clean up TMPDIR.') name = _get_path(prefix_int, shm_name) From 328b8ce996d4f8ea4ea4cec22f9ff54be8dc928a Mon Sep 17 00:00:00 2001 From: Xiaohan Zhang Date: Tue, 12 Nov 2024 22:59:40 -0800 Subject: [PATCH 4/4] Update streaming/base/shared/prefix.py Co-authored-by: Saaketh Narayan --- streaming/base/shared/prefix.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/base/shared/prefix.py b/streaming/base/shared/prefix.py index dda6a7038..b6b03d865 100644 --- a/streaming/base/shared/prefix.py +++ b/streaming/base/shared/prefix.py @@ -117,7 +117,7 @@ def _check_and_find(streams_local: list[str], streams_remote: list[Union[str, No if prefix_int >= MAX_PREFIX_INT: raise ValueError(f'prefix_int exceeds {MAX_PREFIX_INT}. This may happen ' + f'when you mock os.path.exists or os.stat functions so the filelock ' + - f'checks always returns ``True`` ' + f'you need to clean up TMPDIR.') + f'checks always returns `True` ' + f'you need to clean up TMPDIR.') name = _get_path(prefix_int, shm_name)