Skip to content

Commit

Permalink
#113 Added implementation for the SaaS Bucket.
Browse files Browse the repository at this point in the history
  • Loading branch information
ahsimb committed Apr 30, 2024
1 parent 2303a7b commit 37cb761
Show file tree
Hide file tree
Showing 8 changed files with 498 additions and 217 deletions.
1 change: 1 addition & 0 deletions doc/changes/unreleased.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
Extracted bucket interface into BucketLike protocol.
Implemented PathLike for buckets based on BucketLike protocol.
Added a path factory function.
Added implementation of the BucketLike for the SaaS BucketFS.


## Internal
Expand Down
4 changes: 4 additions & 0 deletions exasol/bucketfs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
from exasol.bucketfs._buckets import (
BucketLike,
Bucket,
SaaSBucket,
MountedBucket,
MappedBucket,
)
from exasol.bucketfs._convert import (
Expand All @@ -64,6 +66,8 @@
"Service",
"BucketLike",
"Bucket",
"SaaSBucket",
"MountedBucket",
"MappedBucket",
"BucketFsError",
"path",
Expand Down
94 changes: 84 additions & 10 deletions exasol/bucketfs/_buckets.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@
from requests import HTTPError
from requests.auth import HTTPBasicAuth

from exasol.saas.client.openapi.client import AuthenticatedClient as SaasAuthenticatedClient
from exasol.saas.client.openapi.models.file import File as SaasFile
from exasol.saas.client.openapi.api.files.list_files import sync as saas_list_files
from exasol.saas.client.openapi.api.files.delete_file import sync_detailed as saas_delete_file
from exasol.saas.client.openapi.api.files.upload_file import sync_detailed as saas_upload_file
from exasol.saas.client.openapi.api.files.download_file import sync_detailed as saas_download_file

from exasol.bucketfs._error import BucketFsError
from exasol.bucketfs._logging import LOGGER
from exasol.bucketfs._shared import (
Expand Down Expand Up @@ -221,6 +228,7 @@ def delete(self, path) -> None:
url = _build_url(service_url=self._service, bucket=self.name, path=path)
LOGGER.info(f"Deleting {path} from bucket {self.name}.")
response = requests.delete(url, auth=self._auth, verify=self._verify)

try:
response.raise_for_status()
except HTTPError as ex:
Expand Down Expand Up @@ -252,12 +260,16 @@ def download(self, path: str, chunk_size: int = 8192) -> Iterable[ByteString]:
yield from response.iter_content(chunk_size=chunk_size)


def _to_path_in_url(path: str) -> str:
return path.replace('/', '%2F')


class SaaSBucket:

def __init__(self, url: str, account_id: str, database_id: str, pat: str) -> None:
self._url = url
self._account_id = account_id
self.database_id = database_id
self._database_id = database_id
self._pat = pat

@property
Expand All @@ -268,24 +280,86 @@ def name(self) -> str:
def udf_path(self) -> str:
return f'/buckets/uploads/{self.name}'

@property
def files(self) -> Iterable[str]:
"""To be provided"""
raise NotImplementedError()
LOGGER.info("Retrieving the bucket listing.")
with SaasAuthenticatedClient(base_url=self._url,
token=self._pat,
raise_on_unexpected_status=True) as client:
content = saas_list_files(account_id=self._account_id,
database_id=self._database_id,
client=client)

file_list: list[str] = []

def recursive_file_collector(node: SaasFile) -> None:
if node.children:
for child in node.children:
recursive_file_collector(child)
else:
file_list.append(node.path)

for root_node in content:
recursive_file_collector(root_node)

return file_list

def delete(self, path: str) -> None:
"""To be provided"""
raise NotImplementedError()
LOGGER.info(f"Deleting {path} from the bucket.")
with SaasAuthenticatedClient(base_url=self._url,
token=self._pat,
raise_on_unexpected_status=True) as client:
saas_delete_file(account_id=self._account_id,
database_id=self._database_id,
key=_to_path_in_url(path),
client=client)

def upload(self, path: str, data: ByteString | BinaryIO) -> None:
"""To be provided"""
raise NotImplementedError()
LOGGER.info(f"Uploading {path} to the bucket.")
# Q. The service can handle any characters in the path.
# Do we need to check this path for presence of characters deemed
# invalid in the BucketLike protocol?
with SaasAuthenticatedClient(base_url=self._url,
token=self._pat,
raise_on_unexpected_status=False) as client:
response = saas_upload_file(account_id=self._account_id,
database_id=self._database_id,
key=_to_path_in_url(path),
client=client)
if response.status_code >= 400:
# Q. Is it the right type of exception?
raise RuntimeError(f'Request for a presigned url to upload the file {path} '
f'failed with the status code {response.status_code}')
upload_url = response.parsed.url.replace(r'\u0026', '&')

response = requests.put(upload_url, data=data)
response.raise_for_status()

def download(self, path: str, chunk_size: int = 8192) -> Iterable[ByteString]:
"""To be provided"""
raise NotImplementedError()
LOGGER.info(f"Downloading {path} from the bucket.")
with SaasAuthenticatedClient(base_url=self._url,
token=self._pat,
raise_on_unexpected_status=False) as client:
response = saas_download_file(account_id=self._account_id,
database_id=self._database_id,
key=_to_path_in_url(path),
client=client)
if response.status_code == 404:
raise BucketFsError("The file {path} doesn't exist in the SaaS BucketFs.")
elif response.status_code >= 400:
# Q. Is it the right type of exception?
raise RuntimeError(f'Request for a presigned url to download the file {path} '
f'failed with the status code {response.status_code}')
download_url = response.parsed.url.replace(r'\u0026', '&')

response = requests.get(download_url, stream=True)
response.raise_for_status()
for chunk in response.iter_content(chunk_size=chunk_size):
if chunk:
yield chunk

def __str__(self):
return f"SaaSBucket<{self.name} | on: {self._url}>"
return f"SaaSBucket<account id: {self._account_id}, database id: {self._database_id}>"


class MountedBucket:
Expand Down
Loading

0 comments on commit 37cb761

Please sign in to comment.