Skip to content

Commit

Permalink
#114 Added path builder function (#115)
Browse files Browse the repository at this point in the history
* #114 Added build_path function prototype.

* #114 Added the first integration test for the path builder.

* #114 Added two more integration tests for the path builder.

* #114 Addressed review comments.

* #114 Tried to fix the integration test error.

* #114 Used specific exceptions.

* #114 Understanding the lock on deleted files.

* #114 Understanding the lock on deleted files.

* #114 Fixed the integration tests and improved the build_path documentation.

* #114 Fixed the integration tests.

* #114 Added the service_name to buckets; made MountedBucket R/W

* #114 Added the service_name to buckets; made MountedBucket R/W

* #114 Added the service_name to buckets; made MountedBucket R/W

* #114 Added the service_name to buckets; made MountedBucket R/W

* #114 Made path a sub-module

* #114 Added as_udf_path method

* #114 Fixed issues found in a review
  • Loading branch information
ahsimb authored Apr 24, 2024
1 parent 631ed3f commit 2303a7b
Show file tree
Hide file tree
Showing 10 changed files with 453 additions and 94 deletions.
1 change: 1 addition & 0 deletions doc/changes/unreleased.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
Added the PathLike protocol as described in the [design document](../design/bucketpath.rst).
Extracted bucket interface into BucketLike protocol.
Implemented PathLike for buckets based on BucketLike protocol.
Added a path factory function.


## Internal
Expand Down
4 changes: 4 additions & 0 deletions exasol/bucketfs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
from __future__ import annotations

from exasol.bucketfs._buckets import (
BucketLike,
Bucket,
MappedBucket,
)
Expand All @@ -55,14 +56,17 @@
as_hash,
as_string,
)
from exasol.bucketfs import _path as path
from exasol.bucketfs._error import BucketFsError
from exasol.bucketfs._service import Service

__all__ = [
"Service",
"BucketLike",
"Bucket",
"MappedBucket",
"BucketFsError",
"path",
"as_bytes",
"as_string",
"as_file",
Expand Down
140 changes: 140 additions & 0 deletions exasol/bucketfs/_buckets.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,13 @@
Iterable,
Iterator,
Protocol,
Optional,
)
import os
from io import IOBase
import shutil
import errno
from pathlib import Path

import requests
from requests import HTTPError
Expand All @@ -27,6 +33,18 @@ class BucketLike(Protocol):
It is compatible with both on-premises and SaaS BucketFS systems.
"""

@property
def name(self) -> str:
    """
    Returns the name of the bucket.
    """

@property
def udf_path(self) -> str:
    """
    Returns the path to the bucket's base directory, as it's seen from a UDF.

    The value is the root under which the bucket's files are visible to a UDF,
    not the location of any particular file.
    """

@property
def files(self) -> Iterable[str]:
"""
Expand Down Expand Up @@ -110,6 +128,7 @@ def __init__(
username: str,
password: str,
verify: bool | str = True,
service_name: Optional[str] = None
):
"""
Create a new bucket instance.
Expand All @@ -127,12 +146,15 @@ def __init__(
Either a boolean, in which case it controls whether we verify
the server's TLS certificate, or a string, in which case it must be a path
to a CA bundle to use. Defaults to ``True``.
service_name:
Optional name of the BucketFS service.
"""
self._name = name
self._service = _parse_service_url(service)
self._username = username
self._password = password
self._verify = verify
self._service_name = service_name

def __str__(self):
return f"Bucket<{self.name} | on: {self._service}>"
Expand All @@ -141,6 +163,13 @@ def __str__(self):
def name(self) -> str:
return self._name

@property
def udf_path(self) -> str:
    """
    Returns the bucket's base directory as seen from a UDF, in the form
    ``/buckets/<service_name>/<bucket_name>``.

    Raises:
        BucketFsError: if the bucket was created without a service name.
    """
    service_name = self._service_name
    if service_name is not None:
        return f'/buckets/{service_name}/{self._name}'
    raise BucketFsError('The bucket cannot provide its udf_path '
                        'as the service name is unknown.')

@property
def _auth(self) -> HTTPBasicAuth:
return HTTPBasicAuth(username=self._username, password=self._password)
Expand Down Expand Up @@ -223,6 +252,117 @@ def download(self, path: str, chunk_size: int = 8192) -> Iterable[ByteString]:
yield from response.iter_content(chunk_size=chunk_size)


class SaaSBucket:
    """
    Skeleton of the bucket interface for a SaaS BucketFS.

    Only ``name``, ``udf_path`` and the string representation are functional;
    listing, deleting, uploading and downloading files raise
    ``NotImplementedError`` until they are provided.

    Arguments:
        url:
            Url of the SaaS service. Currently only used in ``__str__``.
        account_id:
            Account identifier. Stored, not used yet.
        database_id:
            Database identifier. Stored, not used yet.
        pat:
            Access token (presumably a personal access token - to be confirmed).
            Stored, not used yet.
    """

    def __init__(self, url: str, account_id: str, database_id: str, pat: str) -> None:
        self._url = url
        self._account_id = account_id
        # Left public (unlike its siblings) so that existing readers of
        # ``database_id`` keep working.
        self.database_id = database_id
        self._pat = pat

    @property
    def name(self) -> str:
        """Returns the bucket name, which is always 'default'."""
        return 'default'

    @property
    def udf_path(self) -> str:
        """Returns the bucket's base directory as seen from a UDF."""
        return f'/buckets/uploads/{self.name}'

    @property
    def files(self) -> Iterable[str]:
        """
        To be provided.

        Declared as a property to match the other bucket implementations,
        where ``files`` is accessed as an attribute rather than called.
        """
        raise NotImplementedError()

    def delete(self, path: str) -> None:
        """To be provided"""
        raise NotImplementedError()

    def upload(self, path: str, data: ByteString | BinaryIO) -> None:
        """To be provided"""
        raise NotImplementedError()

    def download(self, path: str, chunk_size: int = 8192) -> Iterable[ByteString]:
        """To be provided"""
        raise NotImplementedError()

    def __str__(self):
        return f"SaaSBucket<{self.name} | on: {self._url}>"


class MountedBucket:
    """
    Implementation of the Bucket interface backed by a normal file system.
    The targeted use case is the access to the BucketFS files from a UDF.

    Arguments:
        service_name:
            Name of the BucketFS service (not a service url). Defaults to 'bfsdefault'.
        bucket_name:
            Name of the bucket. Defaults to 'default'.
        base_path:
            Instead of specifying the names of the service and the bucket, one can provide
            a full path to the root directory. This can be a useful option for testing when
            the backend is a local file system.
            If this parameter is not provided the root directory is set to
            /buckets/<service_name>/<bucket_name>.
    """

    def __init__(self,
                 service_name: str = 'bfsdefault',
                 bucket_name: str = 'default',
                 base_path: Optional[str] = None):
        self._name = bucket_name
        # Kept because __str__ reports the service the bucket belongs to.
        self._service_name = service_name
        if base_path:
            self.root = Path(base_path)
        else:
            self.root = Path('/buckets') / service_name / bucket_name

    @property
    def name(self) -> str:
        """Returns the bucket name."""
        return self._name

    @property
    def udf_path(self) -> str:
        """Returns the root directory of the bucket as a string."""
        return str(self.root)

    @property
    def files(self) -> list[str]:
        """
        Returns the paths of all files under the root directory, recursively,
        relative to the root. Directories are not listed.
        """
        # Match every entry and keep regular files only; a '*.*' pattern would
        # skip files without a dot in the name and could match dotted directories.
        return [
            str(pth.relative_to(self.root))
            for pth in self.root.rglob('*')
            if pth.is_file()
        ]

    def delete(self, path: str) -> None:
        """
        Deletes the file at ``path`` (relative to the root directory).

        A missing file is not an error. A path pointing to a directory is
        deliberately left untouched where the platform reports it with
        IsADirectoryError.
        """
        full_path = self.root / path
        try:
            full_path.unlink(missing_ok=True)
        except IsADirectoryError:
            pass

    def upload(self, path: str, data: ByteString | BinaryIO) -> None:
        """
        Writes ``data`` to the file at ``path`` (relative to the root directory),
        creating missing parent directories and overwriting an existing file.

        Arguments:
            path: Destination path relative to the root directory.
            data: Either a bytes-like object or a binary file-like object.

        Raises:
            ValueError: if ``data`` is of neither supported kind. In that case
                nothing is created or overwritten on disk.
        """
        is_stream = isinstance(data, IOBase)
        # Validate before touching the file system, so that a bad call neither
        # truncates an existing file nor leaves an empty one behind.
        if not (is_stream or isinstance(data, ByteString)):
            raise ValueError('upload called with unrecognised data type. '
                             'A valid data should be either ByteString or BinaryIO')
        full_path = self.root / path
        full_path.parent.mkdir(parents=True, exist_ok=True)
        with full_path.open('wb') as f:
            if is_stream:
                shutil.copyfileobj(data, f)
            else:
                f.write(data)

    def download(self, path: str, chunk_size: int = 8192) -> Iterable[ByteString]:
        """
        Yields the content of the file at ``path`` in chunks of at most
        ``chunk_size`` bytes.

        This is a generator: the file is checked and opened when the iteration
        starts, not when the method is called.

        Raises:
            FileNotFoundError: if ``path`` does not point to an existing file.
        """
        full_path = self.root / path
        # is_file() is also False for a path that does not exist.
        if not full_path.is_file():
            raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), str(path))
        with full_path.open('rb') as f:
            while True:
                data = f.read(chunk_size)
                if not data:
                    break
                yield data

    def __str__(self):
        return f"MountedBucket<{self.name} | on: {self._service_name}>"


class MappedBucket:
"""
Wraps a bucket and provides various convenience features to it (e.g. index based access).
Expand Down
Loading

0 comments on commit 2303a7b

Please sign in to comment.