diff --git a/doc/changes/unreleased.md b/doc/changes/unreleased.md index eaea2280..2e2efb37 100644 --- a/doc/changes/unreleased.md +++ b/doc/changes/unreleased.md @@ -26,6 +26,7 @@ Added the PathLike protocol as described in the [design document](../design/bucketpath.rst). Extracted bucket interface into BucketLike protocol. Implemented PathLike for buckets based on BucketLike protocol. + Added a path factory function. ## Internal diff --git a/exasol/bucketfs/_buckets.py b/exasol/bucketfs/_buckets.py index 7aaba344..2f44eec3 100644 --- a/exasol/bucketfs/_buckets.py +++ b/exasol/bucketfs/_buckets.py @@ -7,6 +7,9 @@ Iterator, Protocol, ) +import os +import errno +from pathlib import Path import requests from requests import HTTPError @@ -27,6 +30,12 @@ class BucketLike(Protocol): It is compatible with both on-premises an SaaS BucketFS systems. """ + @property + def name(self) -> str: + """ + Returns the bucket name. + """ + @property def files(self) -> Iterable[str]: """ @@ -223,6 +232,82 @@ def download(self, path: str, chunk_size: int = 8192) -> Iterable[ByteString]: yield from response.iter_content(chunk_size=chunk_size) +class SaaSBucket: + + def __init__(self, url: str, account_id: str, database_id: str, pat: str) -> None: + self._url = url + self._account_id = account_id + self.database_id = database_id + self._pat = pat + + def name(self) -> str: + # TODO: Find out the name of the bucket in SaaS + return 'default' + + def files(self) -> Iterable[str]: + """To be provided""" + raise NotImplementedError() + + def delete(self, path: str) -> None: + """To be provided""" + raise NotImplementedError() + + def upload(self, path: str, data: ByteString | BinaryIO) -> None: + """To be provided""" + raise NotImplementedError() + + def download(self, path: str, chunk_size: int = 8192) -> Iterable[ByteString]: + """To be provided""" + raise NotImplementedError() + + +class MountedBucket: + """ + Implementation of the Bucket interface backed by a normal file system in 
read-only mode. + The targeted use case is the read-only access to the BucketFS files from a UDF. + + Q. What exception should be raised when the user attempts to download a non-existing file? + A. + + Q. What exception should be raised if the user tries to delete or upload a file? + A. + """ + + def __init__(self, + service_name: str, + bucket_name: str): + self._name = bucket_name + self.root = Path(service_name) / bucket_name + + @property + def name(self) -> str: + return self._name + + @property + def files(self) -> list[str]: + root_length = len(str(self.root)) + if self.root != self.root.root: + root_length += 1 + return [str(pth)[root_length:] for pth in self.root.rglob('*.*')] + + def delete(self, path: str) -> None: + raise PermissionError('File deletion is not allowed.') + + def upload(self, path: str, data: ByteString | BinaryIO) -> None: + raise PermissionError('Uploading a file is not allowed.') + + def download(self, path: str, chunk_size: int) -> Iterable[ByteString]: + full_path = self.root / path + if (not full_path.exists()) or (not full_path.is_file()): + raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), str(path)) + with full_path.open('rb') as f: + while True: + data = f.read(chunk_size) + if not data: + break + yield data + + class MappedBucket: """ Wraps a bucket and provides various convenience features to it (e.g. index based access). 
diff --git a/exasol/bucketfs/_path.py b/exasol/bucketfs/_path.py index 4f60b65b..5e0d788c 100644 --- a/exasol/bucketfs/_path.py +++ b/exasol/bucketfs/_path.py @@ -4,7 +4,12 @@ import errno import os from io import IOBase -from exasol.bucketfs._buckets import BucketLike +from exasol.bucketfs._buckets import BucketLike, SaaSBucket, MountedBucket +from exasol.bucketfs._service import Service + +SYSTEM_TYPE_ONPREM = 'onprem' +SYSTEM_TYPE_SAAS = 'saas' +SYSTEM_TYPE_MOUNTED = 'mounted' class PathLike(Protocol): @@ -73,7 +78,7 @@ def read(self, chunk_size: int = 8192) -> Iterable[ByteString]: IsADirectoryError: if the pathlike object points to a directory. """ - def write(self, data: ByteString | BinaryIO | Iterable[ByteString]): + def write(self, data: ByteString | BinaryIO | Iterable[ByteString]) -> None: """ Writes data to this path. @@ -90,7 +95,7 @@ def write(self, data: ByteString | BinaryIO | Iterable[ByteString]): NotAFileError: if the pathlike object is not a file path. """ - def rm(self): + def rm(self) -> None: """ Remove this file. @@ -102,7 +107,7 @@ def rm(self): FileNotFoundError: If the file does not exist. """ - def rmdir(self, recursive: bool = False): + def rmdir(self, recursive: bool = False) -> None: """ Removes this directory. @@ -126,7 +131,7 @@ def joinpath(self, *path_segments) -> "PathLike": A new pathlike object pointing the combined path. """ - def walk(self) -> Generator[tuple["PathLike", list[str], list[str]], None, None]: + def walk(self, top_down: bool = True) -> Generator[tuple["PathLike", list[str], list[str]], None, None]: """ Generate the file names in a directory tree by walking the tree either top-down or bottom-up. @@ -320,7 +325,7 @@ def _rmdir_recursive(self, node: _BucketFile): if node.is_file: self._bucket_api.delete(node.path) - def joinpath(self, *path_segments) -> "PathLike": + def joinpath(self, *path_segments) -> PathLike: # The path segments can be of either this type or an os.PathLike. 
cls = type(self) seg_paths = [seg._path if isinstance(seg, cls) else seg for seg in path_segments] @@ -376,3 +381,79 @@ def __truediv__(self, other): def __str__(self): return str(self._path) + + +def create_onprem_bucket(**kwargs) -> BucketLike: + """ + Creates an on-prem bucket using the arguments in kwargs. + + Q. What exception should be thrown if an essential argument is missing? + A. + + Q. Do any default username and password make any sense? + A. + """ + url = kwargs.get('url') + if url is None: + raise ValueError('BucketFS service url is not specified') + verify_ca = bool(kwargs.get('verify_ca', True)) + username = kwargs.get('user') or kwargs.get('username') + password = kwargs.get('password') + if (not username) or (not password): + raise ValueError('BucketFS credentials are not provided') + bucket_name = kwargs.get('bucket', 'default') + credentials = {bucket_name: {'username': username, 'password': password}} + service = Service(url, credentials, verify_ca) + buckets = service.buckets + if bucket_name not in buckets: + raise ValueError(f'Bucket {bucket_name} does not exist.') + return buckets[bucket_name] + + +def create_saas_bucket(**kwargs) -> BucketLike: + """ + Creates a SaaS bucket using the arguments in kwargs. + + Q. What exception should be thrown if an essential argument is missing? + A. + """ + url = kwargs.get('url', 'https://cloud.exasol.com') + account_id = kwargs.get('account_id') + if account_id is None: + raise ValueError('account_id is not specified.') + database_id = kwargs.get('database_id') + if database_id is None: + raise ValueError('database_id is not specified.') + pat = kwargs.get('pat') + if pat is None: + raise ValueError('pat (Personal Access Token) is not provided.') + return SaaSBucket(url=url, account_id=account_id, database_id=database_id, pat=pat) + + +def create_mounted_bucket(**kwargs) -> BucketLike: + """ + Creates a bucket mounted to a UDF + + Q. Should we check that the service and bucket exist? + A. 
+ """ + service_name = kwargs.get('service', 'bfsdefault') + bucket_name = kwargs.get('bucket', 'default') + return MountedBucket(service_name, bucket_name) + + +def build_path(**kwargs) -> PathLike: + + system_type = kwargs.get('system', SYSTEM_TYPE_ONPREM).lower() + if system_type == SYSTEM_TYPE_ONPREM: + bucket = create_onprem_bucket(**kwargs) + elif system_type == SYSTEM_TYPE_SAAS: + bucket = create_saas_bucket(**kwargs) + elif system_type == SYSTEM_TYPE_MOUNTED: + bucket = create_mounted_bucket(**kwargs) + else: + raise ValueError(f'Unknown BucketFS system type {system_type}. ' + 'Valid values are: ' + f'"{SYSTEM_TYPE_ONPREM}", "{SYSTEM_TYPE_SAAS}", "{SYSTEM_TYPE_MOUNTED}".') + path = kwargs.get('path', '') + return BucketPath(path, bucket) diff --git a/test/unit/conftest.py b/test/unit/conftest.py index 797e3e1e..1dd31912 100644 --- a/test/unit/conftest.py +++ b/test/unit/conftest.py @@ -1,5 +1,5 @@ from __future__ import annotations -from typing import Iterable, ByteString, BinaryIO +from typing import ByteString, BinaryIO import os from io import IOBase import shutil @@ -7,35 +7,29 @@ from pathlib import Path import pytest +from exasol.bucketfs._buckets import MountedBucket -class BucketFake: + +class BucketFake(MountedBucket): """ Implementation of the Bucket API backed by the normal file system. 
""" - def __init__(self, root: str): - self.root = Path(root) + def __init__(self, base_dir: str): + super().__init__('', '') + self.root = Path(base_dir) if not self.root.is_dir(): - raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), str(root)) - - def _get_full_path(self, path: str | Path): - return self.root / path - - @property - def files(self) -> list[str]: - root_length = len(str(self.root)) - if self.root != self.root.root: - root_length += 1 - return [str(pth)[root_length:] for pth in self.root.rglob('*.*')] + raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), str(self.root)) def delete(self, path: str) -> None: try: - self._get_full_path(path).unlink(missing_ok=True) + full_path = self.root / path + full_path.unlink(missing_ok=True) except IsADirectoryError: pass def upload(self, path: str, data: ByteString | BinaryIO) -> None: - full_path = self._get_full_path(path) + full_path = self.root / path if not full_path.parent.exists(): full_path.parent.mkdir(parents=True) with full_path.open('wb') as f: @@ -47,17 +41,6 @@ def upload(self, path: str, data: ByteString | BinaryIO) -> None: raise ValueError('upload_file called with unrecognised data type. ' 'A valid data should be either ByteString or BinaryIO') - def download(self, path: str, chunk_size: int) -> Iterable[ByteString]: - full_path = self._get_full_path(path) - if (not full_path.exists()) or (not full_path.is_file()): - raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), str(path)) - with full_path.open('rb') as f: - while True: - data = f.read(chunk_size) - if not data: - break - yield data - @pytest.fixture def bucket_fake(tmpdir) -> BucketFake: