Skip to content

Commit

Permalink
#114 Added build_path function prototype.
Browse files Browse the repository at this point in the history
  • Loading branch information
ahsimb committed Apr 19, 2024
1 parent 631ed3f commit ea77009
Show file tree
Hide file tree
Showing 4 changed files with 184 additions and 34 deletions.
1 change: 1 addition & 0 deletions doc/changes/unreleased.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
Added the PathLike protocol as described in the [design document](../design/bucketpath.rst).
Extracted bucket interface into BucketLike protocol.
Implemented PathLike for buckets based on BucketLike protocol.
Added a path factory function.


## Internal
Expand Down
85 changes: 85 additions & 0 deletions exasol/bucketfs/_buckets.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
Iterator,
Protocol,
)
import os
import errno
from pathlib import Path

import requests
from requests import HTTPError
Expand All @@ -27,6 +30,12 @@ class BucketLike(Protocol):
It is compatible with both on-premises and SaaS BucketFS systems.
"""

@property
def name(self) -> str:
    """
    Returns the bucket name.

    Declared as a property, so implementations are expected to expose the
    name through attribute access (``bucket.name``) rather than a call.
    """

@property
def files(self) -> Iterable[str]:
"""
Expand Down Expand Up @@ -223,6 +232,82 @@ def download(self, path: str, chunk_size: int = 8192) -> Iterable[ByteString]:
yield from response.iter_content(chunk_size=chunk_size)


class SaaSBucket:

def __init__(self, url: str, account_id: str, database_id: str, pat: str) -> None:
self._url = url
self._account_id = account_id
self.database_id = database_id
self._pat = pat

def name(self) -> str:
# TODO: Find out the name of the bucket in SaaS
return 'default'

def files(self) -> Iterable[str]:
"""To be provided"""
raise NotImplementedError()

def delete(self, path: str) -> None:
"""To be provided"""
raise NotImplementedError()

def upload(self, path: str, data: ByteString | BinaryIO) -> None:
"""To be provided"""
raise NotImplementedError()

def download(self, path: str, chunk_size: int = 8192) -> Iterable[ByteString]:
"""To be provided"""
raise NotImplementedError()


class MountedBucket:
"""
Implementation of the Bucket interface backed by a normal file system in read-only mode.
The targeted use case is the read-only access to the BucketFS files from a UDF.
Q. What exception should be raised when the user attempts to download non-existing file?
A.
Q. What exception should be raised if the user tries to delete or upload a file?
A.
"""

def __init__(self,
service_name: str,
bucket_name: str):
self._name = bucket_name
self.root = Path(service_name) / bucket_name

@property
def name(self) -> str:
return self._name

@property
def files(self) -> list[str]:
root_length = len(str(self.root))
if self.root != self.root.root:
root_length += 1
return [str(pth)[root_length:] for pth in self.root.rglob('*.*')]

def delete(self, path: str) -> None:
raise PermissionError('File deletion is not allowed.')

def upload(self, path: str, data: ByteString | BinaryIO) -> None:
raise PermissionError('Uploading a file is not allowed.')

def download(self, path: str, chunk_size: int) -> Iterable[ByteString]:
full_path = self.root / path
if (not full_path.exists()) or (not full_path.is_file()):
raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), str(path))
with full_path.open('rb') as f:
while True:
data = f.read(chunk_size)
if not data:
break
yield data


class MappedBucket:
"""
Wraps a bucket and provides various convenience features to it (e.g. index based access).
Expand Down
93 changes: 87 additions & 6 deletions exasol/bucketfs/_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@
import errno
import os
from io import IOBase
from exasol.bucketfs._buckets import BucketLike
from exasol.bucketfs._buckets import BucketLike, SaaSBucket, MountedBucket
from exasol.bucketfs._service import Service

# Supported BucketFS system types. build_path() compares the lower-cased
# 'system' keyword argument against these values to pick a bucket factory.
SYSTEM_TYPE_ONPREM = 'onprem'
SYSTEM_TYPE_SAAS = 'saas'
SYSTEM_TYPE_MOUNTED = 'mounted'


class PathLike(Protocol):
Expand Down Expand Up @@ -73,7 +78,7 @@ def read(self, chunk_size: int = 8192) -> Iterable[ByteString]:
IsADirectoryError: if the pathlike object points to a directory.
"""

def write(self, data: ByteString | BinaryIO | Iterable[ByteString]):
def write(self, data: ByteString | BinaryIO | Iterable[ByteString]) -> None:
"""
Writes data to this path.
Expand All @@ -90,7 +95,7 @@ def write(self, data: ByteString | BinaryIO | Iterable[ByteString]):
NotAFileError: if the pathlike object is not a file path.
"""

def rm(self):
def rm(self) -> None:
"""
Remove this file.
Expand All @@ -102,7 +107,7 @@ def rm(self):
FileNotFoundError: If the file does not exist.
"""

def rmdir(self, recursive: bool = False):
def rmdir(self, recursive: bool = False) -> None:
"""
Removes this directory.
Expand All @@ -126,7 +131,7 @@ def joinpath(self, *path_segments) -> "PathLike":
A new pathlike object pointing to the combined path.
"""

def walk(self) -> Generator[tuple["PathLike", list[str], list[str]], None, None]:
def walk(self, top_down: bool = True) -> Generator[tuple["PathLike", list[str], list[str]], None, None]:
"""
Generate the file names in a directory tree by walking the tree either top-down or bottom-up.
Expand Down Expand Up @@ -320,7 +325,7 @@ def _rmdir_recursive(self, node: _BucketFile):
if node.is_file:
self._bucket_api.delete(node.path)

def joinpath(self, *path_segments) -> "PathLike":
def joinpath(self, *path_segments) -> PathLike:
# The path segments can be of either this type or an os.PathLike.
cls = type(self)
seg_paths = [seg._path if isinstance(seg, cls) else seg for seg in path_segments]
Expand Down Expand Up @@ -376,3 +381,79 @@ def __truediv__(self, other):

def __str__(self) -> str:
    """Returns the string form of the wrapped ``_path`` object."""
    return str(self._path)


def create_onprem_bucket(**kwargs) -> BucketLike:
    """
    Builds a bucket object for an on-premises BucketFS service.

    Recognised keyword arguments:
        url: Address of the BucketFS service. Mandatory.
        user / username: Login name; ``user`` is used when it is truthy,
            otherwise ``username``. Mandatory and must not be empty.
        password: Password. Mandatory and must not be empty.
        bucket: Bucket name, ``'default'`` if omitted.
        verify_ca: Converted with ``bool()`` (``True`` if omitted) and handed
            to ``Service`` as its third argument.

    Returns:
        The entry of ``Service.buckets`` registered under the bucket name.

    Raises:
        ValueError: if the url is missing, if the user name or the password
            is missing or empty, or if the service has no such bucket.
    """
    service_url = kwargs.get('url')
    if service_url is None:
        raise ValueError('BucketFS service url is not specified')

    check_certificate = bool(kwargs.get('verify_ca', True))

    login = kwargs.get('user') or kwargs.get('username')
    secret = kwargs.get('password')
    if not (login and secret):
        raise ValueError('BucketFS credentials are not provided')

    name = kwargs.get('bucket', 'default')
    service = Service(
        service_url,
        {name: {'username': login, 'password': secret}},
        check_certificate,
    )

    available = service.buckets
    if name in available:
        return available[name]
    raise ValueError(f'Bucket {name} does not exist.')


def create_saas_bucket(**kwargs) -> BucketLike:
    """
    Builds a ``SaaSBucket`` from keyword arguments.

    Recognised keyword arguments:
        url: Service URL, ``'https://cloud.exasol.com'`` if omitted.
        account_id: Mandatory.
        database_id: Mandatory.
        pat: Personal Access Token. Mandatory.

    Raises:
        ValueError: for the first of ``account_id``, ``database_id`` and
            ``pat`` (checked in this order) that is missing or ``None``.
    """
    service_url = kwargs.get('url', 'https://cloud.exasol.com')

    # Mandatory arguments in validation order, each with its own error message.
    mandatory = {
        'account_id': 'account_id is not specified.',
        'database_id': 'database_id is not specified.',
        'pat': 'pat (Personal Access Token) is not provided.',
    }
    collected = {}
    for key, complaint in mandatory.items():
        value = kwargs.get(key)
        if value is None:
            raise ValueError(complaint)
        collected[key] = value

    return SaaSBucket(url=service_url, **collected)


def create_mounted_bucket(**kwargs) -> BucketLike:
    """
    Builds a ``MountedBucket`` from keyword arguments.

    Recognised keyword arguments:
        service: Service name, ``'bfsdefault'`` if omitted.
        bucket: Bucket name, ``'default'`` if omitted.

    No check is made here that the service or the bucket actually exists.
    """
    return MountedBucket(
        kwargs.get('service', 'bfsdefault'),
        kwargs.get('bucket', 'default'),
    )


def build_path(**kwargs) -> PathLike:
    """
    Creates a ``BucketPath`` for one of the supported BucketFS system types.

    Recognised keyword arguments:
        system: System type, compared case-insensitively with the
            ``SYSTEM_TYPE_*`` constants; ``SYSTEM_TYPE_ONPREM`` if omitted.
        path: Path inside the bucket, ``''`` if omitted.

    All keyword arguments are also forwarded unchanged to the bucket factory
    selected by the system type.

    Raises:
        ValueError: if the system type is not one of the known values.
    """
    system_type = kwargs.get('system', SYSTEM_TYPE_ONPREM).lower()

    # One bucket factory per supported system type.
    factories = {
        SYSTEM_TYPE_ONPREM: create_onprem_bucket,
        SYSTEM_TYPE_SAAS: create_saas_bucket,
        SYSTEM_TYPE_MOUNTED: create_mounted_bucket,
    }
    make_bucket = factories.get(system_type)
    if make_bucket is None:
        raise ValueError(f'Unknown BucketFS system type {system_type}. '
                         'Valid values are: '
                         f'"{SYSTEM_TYPE_ONPREM}", "{SYSTEM_TYPE_SAAS}", "{SYSTEM_TYPE_MOUNTED}".')

    bucket = make_bucket(**kwargs)
    return BucketPath(kwargs.get('path', ''), bucket)
39 changes: 11 additions & 28 deletions test/unit/conftest.py
Original file line number Diff line number Diff line change
@@ -1,41 +1,35 @@
from __future__ import annotations
from typing import Iterable, ByteString, BinaryIO
from typing import ByteString, BinaryIO
import os
from io import IOBase
import shutil
import errno
from pathlib import Path
import pytest

from exasol.bucketfs._buckets import MountedBucket

class BucketFake:

class BucketFake(MountedBucket):
"""
Implementation of the Bucket API backed by the normal file system.
"""

def __init__(self, root: str):
self.root = Path(root)
def __init__(self, base_dir: str):
super().__init__('', '')
self.root = Path(base_dir)
if not self.root.is_dir():
raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), str(root))

def _get_full_path(self, path: str | Path):
return self.root / path

@property
def files(self) -> list[str]:
root_length = len(str(self.root))
if self.root != self.root.root:
root_length += 1
return [str(pth)[root_length:] for pth in self.root.rglob('*.*')]
raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), str(self.root))

def delete(self, path: str) -> None:
try:
self._get_full_path(path).unlink(missing_ok=True)
full_path = self.root / path
full_path.unlink(missing_ok=True)
except IsADirectoryError:
pass

def upload(self, path: str, data: ByteString | BinaryIO) -> None:
full_path = self._get_full_path(path)
full_path = self.root / path
if not full_path.parent.exists():
full_path.parent.mkdir(parents=True)
with full_path.open('wb') as f:
Expand All @@ -47,17 +41,6 @@ def upload(self, path: str, data: ByteString | BinaryIO) -> None:
raise ValueError('upload_file called with unrecognised data type. '
'A valid data should be either ByteString or BinaryIO')

def download(self, path: str, chunk_size: int) -> Iterable[ByteString]:
full_path = self._get_full_path(path)
if (not full_path.exists()) or (not full_path.is_file()):
raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), str(path))
with full_path.open('rb') as f:
while True:
data = f.read(chunk_size)
if not data:
break
yield data


@pytest.fixture
def bucket_fake(tmpdir) -> BucketFake:
Expand Down

0 comments on commit ea77009

Please sign in to comment.