Small improvements #17

Merged: 2 commits, Sep 10, 2024
Changes from all commits
2 changes: 1 addition & 1 deletion dbxio/__init__.py
@@ -4,4 +4,4 @@
from dbxio.utils import * # noqa: F403
from dbxio.volume import * # noqa: F403

__version__ = '0.4.3' # single source of truth
__version__ = '0.4.4' # single source of truth
15 changes: 15 additions & 0 deletions dbxio/core/client.py
@@ -10,6 +10,9 @@
from dbxio.sql.results import _FutureBaseResult
from dbxio.sql.sql_driver import SQLDriver, get_sql_driver
from dbxio.utils.databricks import ClusterType
from dbxio.utils.logging import get_logger

logger = get_logger()


@attrs.define(slots=True)
@@ -41,6 +44,18 @@ class DbxIOClient:

session_configuration: Optional[Dict[str, Any]] = None

def __attrs_post_init__(self):
"""
This method is used only for logging
"""
logger.info(
'Client is created with the following settings: %s; cluster settings: http-path: %s, server-hostname: %s',
self.settings,
self._cluster_credentials.http_path,
self._cluster_credentials.server_hostname,
)
logger.info('Auth provider: %s', self.credential_provider.__class__.__name__)

def clear_cache(self):
self.credential_provider.clear_cache()

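Reviewer note: `__attrs_post_init__` is the attrs hook that runs right after all fields are initialised, so the client now logs its configuration exactly once, at construction time. A minimal sketch of the pattern in isolation (the `ExampleClient` class and its fields are illustrative stand-ins, not part of dbxio):

```python
import logging

import attrs

logger = logging.getLogger(__name__)


@attrs.define(slots=True)
class ExampleClient:
    # Illustrative fields standing in for dbxio's cluster credentials.
    http_path: str
    server_hostname: str

    def __attrs_post_init__(self):
        # attrs calls this hook once every field is set, which makes it a
        # convenient place for one-off "client created" logging.
        logger.info(
            'Client is created; http-path: %s, server-hostname: %s',
            self.http_path,
            self.server_hostname,
        )


ExampleClient(http_path='/sql/1.0/warehouses/abc', server_hostname='example.cloud.databricks.com')
```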
7 changes: 4 additions & 3 deletions dbxio/delta/table.py
@@ -102,9 +102,10 @@ def safe_table_identifier(self):
"""
Returns table identifier with special characters replaced with underscores and wrapped in backticks.
"""
trunc_ti = self.table_identifier.translate(
str.maketrans('!"#$%&\'()*+,/:;<=>?@[\\]^`{|}~', '_____________________________')
)
translations = {ord(char): ord('_') for char in '!"#$%&\'()*+,/:;<=>?@[\\]^{|}~'}
translations[ord('`')] = None

trunc_ti = self.table_identifier.translate(translations)
return '.'.join([f'`{ti_part}`' for ti_part in trunc_ti.split('.')])

@property
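The new mapping replaces unsafe characters with underscores and maps the backtick to `None`, which `str.translate` treats as "delete this character"; that is what makes an already-quoted identifier (see the new test further down) come out the same as an unquoted one. A quick sketch of the behaviour, reusing the character set from the diff:

```python
# Sketch of the translation used by safe_table_identifier: special characters
# become underscores, backticks are removed (a None value deletes the char).
specials = '!"#$%&\'()*+,/:;<=>?@[\\]^{|}~'
translations = {ord(char): ord('_') for char in specials}
translations[ord('`')] = None

print('database!.schema.table+'.translate(translations))
# -> database_.schema.table_
print('`database!`.`schema`.`table+`'.translate(translations))
# -> database_.schema.table_  (same result, so quoting stays idempotent)
```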
13 changes: 13 additions & 0 deletions dbxio/sql/types.py
@@ -662,6 +662,19 @@ def deserialize(self, obj):
raise DbxIOTypeError(f'Cannot cast value `{obj}` to JSON')


class VariantType(BaseType):
def fit(self, obj) -> bool:
return True

@nullable
def serialize(self, obj, unsafe: bool = False):
return str(obj)

@nullable
def deserialize(self, obj):
return obj


class GroupsPrimaryDataTypes:
INTEGER = (IntType, BigIntType, DecimalType)
FLOAT = (FloatType, DoubleType)
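A rough usage sketch of the new `VariantType`, based only on the definition above: `fit` accepts anything, `serialize` falls back to `str()`, and `deserialize` returns the value untouched. This assumes the `@nullable` decorator simply passes non-null values through, and the `from dbxio.sql import types` import path is inferred from the file layout:

```python
from dbxio.sql import types

vt = types.VariantType()

assert vt.fit({'a': 1})                       # fit() accepts any value
assert vt.serialize({'a': 1}) == "{'a': 1}"   # serialize() is just str(obj)
assert vt.deserialize('raw') == 'raw'         # deserialize() returns obj as-is
```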
9 changes: 8 additions & 1 deletion dbxio/utils/retries.py
@@ -1,5 +1,11 @@
import logging

from databricks.sdk.errors.platform import PermissionDenied
from tenacity import RetryCallState, retry, retry_if_exception_type, stop_after_attempt, wait_exponential
from tenacity import RetryCallState, after_log, retry, retry_if_exception_type, stop_after_attempt, wait_exponential

from dbxio.utils.logging import get_logger

logger = get_logger()


def _clear_client_cache(call_state: RetryCallState) -> None:
@@ -28,4 +34,5 @@ def _clear_client_cache(call_state: RetryCallState) -> None:
retry=retry_if_exception_type((PermissionDenied,)),
reraise=True,
before=_clear_client_cache,
after=after_log(logger, log_level=logging.INFO),
)
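`after_log` is a stock tenacity helper that emits a log record after every attempt, so retries triggered by `PermissionDenied` now leave a trace instead of failing silently until the final error. A self-contained sketch of the mechanism (the `flaky_call` function and its `ValueError` failure are illustrative, not dbxio code):

```python
import logging

from tenacity import after_log, retry, retry_if_exception_type, stop_after_attempt, wait_exponential

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=0.1),
    retry=retry_if_exception_type(ValueError),
    reraise=True,
    after=after_log(logger, log_level=logging.INFO),  # log after every attempt
)
def flaky_call():
    raise ValueError('transient failure')


try:
    flaky_call()
except ValueError:
    pass  # each of the three attempts was reported by after_log
```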
17 changes: 10 additions & 7 deletions dbxio/volume/volume_commands.py
@@ -3,7 +3,7 @@
from typing import TYPE_CHECKING, Optional, Union

import attrs
from databricks.sdk.errors.platform import NotFound
from databricks.sdk.errors.platform import NotFound, ResourceDoesNotExist
from databricks.sdk.service.catalog import VolumeType

from dbxio.blobs.block_upload import upload_file
@@ -98,7 +98,7 @@ def is_external(self):

@dbxio_retry
def create_volume(volume: Volume, client: 'DbxIOClient', skip_if_exists: bool = True) -> None:
if skip_if_exists and _exists_volume(volume.catalog, volume.schema, volume.name, client):
if skip_if_exists and exists_volume(volume.catalog, volume.schema, volume.name, client):
logger.info(f'Volume {volume.safe_full_name} already exists, skipping creation.')
return

@@ -113,7 +113,7 @@ def create_volume(volume: Volume, client: 'DbxIOClient', skip_if_exists: bool =


@dbxio_retry
def _exists_volume(catalog_name: str, schema_name: str, volume_name: str, client: 'DbxIOClient') -> bool:
def exists_volume(catalog_name: str, schema_name: str, volume_name: str, client: 'DbxIOClient') -> bool:
for v in client.workspace_api.volumes.list(catalog_name=catalog_name, schema_name=schema_name):
if v.name == volume_name:
return True
@@ -223,7 +223,7 @@ def _write_external_volume(
create_volume_if_not_exists: bool,
force: bool,
):
volume_exists = _exists_volume(catalog_name, schema_name, volume_name, client)
volume_exists = exists_volume(catalog_name, schema_name, volume_name, client)
if volume_exists:
volume = Volume.from_url(get_volume_url(catalog_name, schema_name, volume_name), client=client)
assert volume.storage_location, f'External volume must have a storage location, got {volume=}'
@@ -454,7 +454,7 @@ def get_comment_on_volume(volume: Volume, client: 'DbxIOClient') -> Union[str, None]:


@dbxio_retry
def drop_volume(volume: Volume, client: 'DbxIOClient') -> None:
def drop_volume(volume: Volume, client: 'DbxIOClient', force: bool = False) -> None:
"""
Deletes a volume in Databricks.
If the volume is external, it will also delete all blobs in the storage location.
@@ -467,6 +467,9 @@ def drop_volume(volume: Volume, client: 'DbxIOClient') -> None:
logger.debug(f'Blob {blob.name} was successfully deleted.')

logger.info(f'External volume {volume.safe_full_name} was successfully cleaned up.')

client.workspace_api.volumes.delete(volume.full_name)
try:
client.workspace_api.volumes.delete(volume.full_name)
except ResourceDoesNotExist as e:
if not force:
raise e
logger.info(f'Volume {volume.safe_full_name} was successfully dropped.')
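With the new `force` flag, `drop_volume` swallows `ResourceDoesNotExist` from the Databricks SDK, so dropping a volume that is already gone becomes a no-op instead of an error. A hedged usage sketch (assuming `volume` and `client` are already-configured dbxio objects and that `drop_volume` is importable from `dbxio.volume.volume_commands`):

```python
from databricks.sdk.errors.platform import ResourceDoesNotExist

from dbxio.volume.volume_commands import drop_volume

drop_volume(volume, client, force=True)  # no error if the volume does not exist

try:
    drop_volume(volume, client)          # default behaviour still raises
except ResourceDoesNotExist:
    ...
```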
9 changes: 9 additions & 0 deletions tests/test_primary_types.py
@@ -367,3 +367,12 @@ def test_json():
assert types.JSONType().deserialize('1') == 1
assert types.JSONType().deserialize('"a"') == 'a'
assert types.JSONType().deserialize('NULL') is None


def test_variant():
assert types.VariantType().fit(1)
assert types.VariantType().fit('a')
assert types.VariantType().fit(None)
assert types.VariantType().fit(datetime.datetime(2014, 1, 1, 0, 0, 0))
assert types.VariantType().fit([1, 2, 3])
assert types.VariantType().fit({'a': 1})
5 changes: 5 additions & 0 deletions tests/test_table.py
@@ -48,3 +48,8 @@ def test_safe_table_identifier(self):
dbxio.Table(table_identifier='database!.schema.table+').safe_table_identifier,
'`database_`.`schema`.`table_`',
)

self.assertEqual(
dbxio.Table(table_identifier='`database!`.`schema`.`table+`').safe_table_identifier,
'`database_`.`schema`.`table_`',
)
20 changes: 16 additions & 4 deletions tests/test_volume_commands.py
@@ -5,6 +5,7 @@
from unittest.mock import patch

import pytest
from databricks.sdk.errors.platform import ResourceDoesNotExist
from databricks.sdk.service.catalog import VolumesAPI, VolumeType
from databricks.sdk.service.files import DirectoryEntry, FilesAPI

@@ -101,8 +102,8 @@


@patch.object(VolumesAPI, 'create', return_value=None)
@patch('dbxio.volume.volume_commands._exists_volume', return_value=False)
def test_create_volume(mock_exists_volume, mock_volume_create):
@patch('dbxio.volume.volume_commands.exists_volume', return_value=False)
def test_create_volume(mock_exists_volume, mock_volume_create):
volume = Volume(catalog='catalog', schema='schema', name='volume')
create_volume(volume, client)
mock_volume_create.assert_called_once_with(
@@ -115,8 +116,8 @@ def test_create_volume(mock_exists_volume, mock_volume_create):


@patch.object(VolumesAPI, 'create', return_value=None)
@patch('dbxio.volume.volume_commands._exists_volume', return_value=True)
def test_create_volume__volume_exists(mock_exists_volume, mock_volume_create):
@patch('dbxio.volume.volume_commands.exists_volume', return_value=True)
def test_create_volume__volume_exists(mock_exists_volume, mock_volume_create):
volume = Volume(catalog='catalog', schema='schema', name='volume')
create_volume(volume, client)
mock_volume_create.assert_not_called()
@@ -407,3 +408,14 @@ def test_drop_volume__external(
mock_volume_delete.assert_called_once_with(volume.full_name)

assert mock_try_delete_blob.call_count == 2


@patch.object(VolumesAPI, 'delete', side_effect=ResourceDoesNotExist())
def test_drop_volume_force(mock_volume_delete):
volume = Volume(
catalog='catalog',
schema='schema',
name='volume',
volume_type=VolumeType.MANAGED,
)
drop_volume(volume, client, force=True)