diff --git a/llmfoundry/command_utils/data_prep/convert_delta_to_json.py b/llmfoundry/command_utils/data_prep/convert_delta_to_json.py
index acf7086a12..1a0e575850 100644
--- a/llmfoundry/command_utils/data_prep/convert_delta_to_json.py
+++ b/llmfoundry/command_utils/data_prep/convert_delta_to_json.py
@@ -706,6 +706,16 @@ def fetch_DT(
             dbsql,
         )
     except (grpc.RpcError, spark_errors.SparkConnectGrpcException) as e:
+        # The matched substring says the cluster is not a Shared or Single User
+        # cluster; raise FaultyDataPrepCluster with guidance instead of the raw error.
+        if isinstance(
+            e,
+            spark_errors.SparkConnectGrpcException,
+        ) and 'is not Shared or Single User Cluster' in str(e):
+            raise FaultyDataPrepCluster(
+                message=
+                f'The cluster you have provided: {cluster_id} does not have data governance enabled. Please use a cluster with a data security mode other than NONE. {e}',
+            ) from e
         if isinstance(
             e,
             spark_errors.SparkConnectGrpcException,
diff --git a/tests/a_scripts/data_prep/test_convert_delta_to_json.py b/tests/a_scripts/data_prep/test_convert_delta_to_json.py
index bb5b3f93d1..95610f00b6 100644
--- a/tests/a_scripts/data_prep/test_convert_delta_to_json.py
+++ b/tests/a_scripts/data_prep/test_convert_delta_to_json.py
@@ -750,3 +750,50 @@ def test_fetch_malformed_table_error(
 
         # Verify that get_total_rows was called
         mock_gtr.assert_called_once()
+
+    @patch(
+        'llmfoundry.command_utils.data_prep.convert_delta_to_json.fetch',
+    )
+    @patch(
+        'llmfoundry.command_utils.data_prep.convert_delta_to_json.validate_and_get_cluster_info',
+    )
+    def test_non_shared_single_user_cluster_error(
+        self,
+        mock_validate_cluster_info: MagicMock,
+        mock_fetch: MagicMock,
+    ):
+        """fetch_DT turns a 'not Shared or Single User Cluster' error into FaultyDataPrepCluster."""
+        mock_validate_cluster_info.return_value = ('dbconnect', None, None)
+
+        exception_message = 'Cluster is not Shared or Single User Cluster'
+        spark_exception = SparkConnectGrpcException(exception_message)
+
+        mock_fetch.side_effect = spark_exception
+
+        # Define test inputs
+        delta_table_name = 'test_table'
+        json_output_folder = '/tmp/to/jsonl'
+        http_path = None
+        cluster_id = 'test-cluster-id'
+        use_serverless = False
+        DATABRICKS_HOST = 'https://test-host'
+        DATABRICKS_TOKEN = 'test-token'
+
+        # Act & Assert
+        with self.assertRaises(FaultyDataPrepCluster) as context:
+            fetch_DT(
+                delta_table_name=delta_table_name,
+                json_output_folder=json_output_folder,
+                http_path=http_path,
+                cluster_id=cluster_id,
+                use_serverless=use_serverless,
+                DATABRICKS_HOST=DATABRICKS_HOST,
+                DATABRICKS_TOKEN=DATABRICKS_TOKEN,
+            )
+
+        self.assertIn(
+            f'The cluster you have provided: {cluster_id} does not have data governance enabled. Please use a cluster with a data security mode other than NONE.',
+            str(context.exception),
+        )
+
+        mock_fetch.assert_called()