From 8b9e18e5428681f98591bd2d8492866efc47d4d4 Mon Sep 17 00:00:00 2001 From: Irene Dea Date: Fri, 2 Feb 2024 17:03:25 -0800 Subject: [PATCH] Modify UCObjectStore.list_objects to lists all files recursively (#2959) --- .../utils/object_store/uc_object_store.py | 39 +++++++++++----- .../object_store/test_uc_object_store.py | 46 +++++++++++++++++-- 2 files changed, 71 insertions(+), 14 deletions(-) diff --git a/composer/utils/object_store/uc_object_store.py b/composer/utils/object_store/uc_object_store.py index 2d9a467d76..2578e7151b 100644 --- a/composer/utils/object_store/uc_object_store.py +++ b/composer/utils/object_store/uc_object_store.py @@ -219,10 +219,6 @@ def get_object_size(self, object_name: str) -> int: def list_objects(self, prefix: Optional[str]) -> List[str]: """List all objects in the object store with the given prefix. - .. note:: - - This function removes the directories from the returned list. - Args: prefix (str): The prefix to search for. @@ -234,14 +230,35 @@ def list_objects(self, prefix: Optional[str]) -> List[str]: from databricks.sdk.core import DatabricksError try: - data = json.dumps({'path': self._get_object_path(prefix)}) # NOTE: This API is in preview and should not be directly used outside of this instance - resp = self.client.api_client.do(method='GET', - path=self._UC_VOLUME_LIST_API_ENDPOINT, - data=data, - headers={'Source': 'mosaicml/composer'}) - assert isinstance(resp, dict) - return [f['path'] for f in resp.get('files', []) if not f['is_dir']] + logging.warn('UCObjectStore.list_objects is experimental.') + + # Iteratively get all UC Volume files with `prefix`. + stack = [prefix] + all_files = [] + + while len(stack) > 0: + current_path = stack.pop() + + # Note: Databricks SDK handles HTTP errors and retries. + # See https://github.com/databricks/databricks-sdk-py/blob/v0.18.0/databricks/sdk/core.py#L125 and + # https://github.com/databricks/databricks-sdk-py/blob/v0.18.0/databricks/sdk/retries.py#L33 . + resp = self.client.api_client.do(method='GET', + path=self._UC_VOLUME_LIST_API_ENDPOINT, + data=json.dumps({'path': self._get_object_path(current_path)}), + headers={'Source': 'mosaicml/composer'}) + + assert isinstance(resp, dict), 'Response is not a dictionary' + + for f in resp.get('files', []): + fpath = f['path'] + if f['is_dir']: + stack.append(fpath) + else: + all_files.append(fpath) + + return all_files + except DatabricksError as e: _wrap_errors(self.get_uri(prefix), e) return [] diff --git a/tests/utils/object_store/test_uc_object_store.py b/tests/utils/object_store/test_uc_object_store.py index 1f84143186..0ca3dbbbd3 100644 --- a/tests/utils/object_store/test_uc_object_store.py +++ b/tests/utils/object_store/test_uc_object_store.py @@ -160,6 +160,49 @@ def generate_dummy_file(_): raise NotImplementedError(f'Test for result={result} is not implemented.') +def test_list_objects_nested_folders(ws_client, uc_object_store): + expected_files = [ + '/Volumes/catalog/volume/schema/path/to/folder/file1.txt', + '/Volumes/catalog/volume/schema/path/to/folder/file2.txt', + '/Volumes/catalog/volume/schema/path/to/folder/subdir/file1.txt', + '/Volumes/catalog/volume/schema/path/to/folder/subdir/file2.txt', + ] + uc_list_api_responses = [{ + 'files': [{ + 'path': '/Volumes/catalog/volume/schema/path/to/folder/file1.txt', + 'is_dir': False + }, { + 'path': '/Volumes/catalog/volume/schema/path/to/folder/file2.txt', + 'is_dir': False + }, { + 'path': '/Volumes/catalog/volume/schema/path/to/folder/subdir', + 'is_dir': True + }] + }, { + 'files': [{ + 'path': '/Volumes/catalog/volume/schema/path/to/folder/subdir/file1.txt', + 'is_dir': False + }, { + 'path': '/Volumes/catalog/volume/schema/path/to/folder/subdir/file2.txt', + 'is_dir': False + }] + }] + + prefix = 'Volumes/catalog/schema/volume/path/to/folder' + + ws_client.api_client.do = MagicMock(side_effect=[uc_list_api_responses[0], uc_list_api_responses[1]]) + actual_files = uc_object_store.list_objects(prefix=prefix) + + assert actual_files == expected_files + + ws_client.api_client.do.assert_called_with(method='GET', + path=uc_object_store._UC_VOLUME_LIST_API_ENDPOINT, + data='{"path": "/Volumes/catalog/volume/schema/path/to/folder/subdir"}', + headers={'Source': 'mosaicml/composer'}) + + assert ws_client.api_client.do.call_count == 2 + + @pytest.mark.parametrize('result', ['success', 'prefix_none', 'not_found', 'error']) def test_list_objects(ws_client, uc_object_store, result): expected_files = [ @@ -173,9 +216,6 @@ def test_list_objects(ws_client, uc_object_store, result): }, { 'path': '/Volumes/catalog/volume/schema/path/to/folder/file2.txt', 'is_dir': False - }, { - 'path': '/Volumes/catalog/volume/schema/path/to/folder/samples/', - 'is_dir': True }] }