Skip to content

Commit

Permalink
Move setup code from cloud-storage notebooks to lib (#41)
Browse files Browse the repository at this point in the history
* Github get version and dl url

* Requests types for types checking

* Retrieve jar function and tests

* Small comment

* BFS upload function, no tests

* BFS basic test

* BFS bucket mocking

* Unused import

* Cloud storage extension scripts

* Make lint happy
  • Loading branch information
Shmuma authored Jan 9, 2024
1 parent 589c8a7 commit f4e45f0
Show file tree
Hide file tree
Showing 7 changed files with 301 additions and 1 deletion.
30 changes: 30 additions & 0 deletions exasol/bfs_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
"""
Bucketfs-related functions.
"""
import pathlib
import logging
import exasol.bucketfs as bfs # type: ignore


_logger = logging.getLogger(__name__)


def put_file(bucket: bfs.Bucket, file_path: pathlib.Path,
             skip_if_exists: bool = True) -> pathlib.Path:
    """
    Uploads the given file into BucketFS.

    :param bucket: bucket to use
    :param file_path: local path of the file to upload. The file has to exist.
    :param skip_if_exists: do not upload if a file with the same name is
        already present in the bucketfs.
    :return: path of the uploaded file inside the bucketfs.
    :raises ValueError: if the local file does not exist.
    """
    if not file_path.exists():
        raise ValueError(f"Local file doesn't exist: {file_path}")
    local_name = file_path.name
    # iterating the bucket yields the names of the files it contains
    if skip_if_exists and local_name in list(bucket):
        _logger.info("File %s is already present in the bucketfs", local_name)
    else:
        _logger.info("Uploading file %s to bucketfs", local_name)
        with file_path.open("rb") as file:
            bucket.upload(local_name, file)
    # NOTE(review): assumes the default BucketFS service name "bfsdefault" —
    # confirm whether other service names must be supported.
    return pathlib.Path("/buckets/bfsdefault/") / bucket.name / local_name
48 changes: 48 additions & 0 deletions exasol/cloud_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import pyexasol # type: ignore


_SETUP_SQL = [
"OPEN SCHEMA {schema!i}",
"""
--/
CREATE OR REPLACE JAVA SET SCRIPT IMPORT_PATH(...) EMITS (...) AS
%scriptclass com.exasol.cloudetl.scriptclasses.FilesImportQueryGenerator;
%jar {jar_path!r};
/
""",
"""
--/
CREATE OR REPLACE JAVA SCALAR SCRIPT IMPORT_METADATA(...)
EMITS (
filename VARCHAR(2000),
partition_index VARCHAR(100),
start_index DECIMAL(36, 0),
end_index DECIMAL(36, 0)
) AS
%scriptclass com.exasol.cloudetl.scriptclasses.FilesMetadataReader;
%jar {jar_path!r};
/
""",
"""
--/
CREATE OR REPLACE JAVA SET SCRIPT IMPORT_FILES(...) EMITS (...) AS
%scriptclass com.exasol.cloudetl.scriptclasses.FilesDataImporter;
%jar {jar_path!r};
/
"""
]


def setup_scripts(db_connection: pyexasol.ExaConnection, schema_name: str,
                  bucketfs_jar_path: str) -> None:
    """
    Performs initialization of scripts for the cloud-storage-extension.

    Opens the given schema and (re)creates the Java UDF scripts there,
    pointing them at the provided jar.

    :param db_connection: DB connection
    :param schema_name: name of the schema to be used
    :param bucketfs_jar_path: path to the cloud-storage-extension jar in BucketFS
    """
    for sql in _SETUP_SQL:
        db_connection.execute(sql, query_params={
            "schema": schema_name,
            "jar_path": bucketfs_jar_path,
        })
78 changes: 78 additions & 0 deletions exasol/github.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
"""
Github-related utility functions - check for latest release of
project, retrieval of artefacts, etc.
"""
import enum
import requests
import pathlib
import logging
from typing import Tuple, Optional

_logger = logging.getLogger(__name__)


class Project(enum.Enum):
    """
    Github projects whose releases can be looked up.

    Each member's value is the repository name under the ``exasol``
    github organisation and is used verbatim to build API URLs.
    """
    CLOUD_STORAGE_EXTENSION = "cloud-storage-extension"
    KAFKA_CONNECTOR_EXTENSION = "kafka-connector-extension"


def get_latest_version_and_jar_url(project: Project) -> Tuple[str, str]:
    """
    Retrieves the latest version of stable project release
    and url with jar file from the release.

    :param project: name of the project
    :return: tuple with version and url to retrieve the artefact.
    :raises RuntimeError: if the github request fails, the release has
        no tag, or no matching jar asset is found.
    """
    req = requests.get(f"https://api.github.com/repos/exasol/{project.value}"
                       f"/releases/latest", timeout=10)
    if req.status_code != 200:
        # f-string keeps the message identical to the old %-format version
        # while matching the formatting style used in the rest of the module
        raise RuntimeError(f"Error sending request to the github, "
                           f"code: {req.status_code}")
    data = req.json()
    version = data.get('tag_name')
    if version is None:
        raise RuntimeError(f"The latest version of {project.value} "
                           f"has no tag, something is wrong")
    # the release jar is the asset whose name ends with "<version>.jar";
    # this also skips e.g. "-javadoc.jar" companions
    for asset in data.get('assets', []):
        name = asset['name']
        if name.endswith(f"{version}.jar"):
            dl_url = asset['browser_download_url']
            return version, dl_url
    raise RuntimeError("Could not find proper jar url for the latest release")


def retrieve_jar(project: Project, use_local_cache: bool = True,
                 storage_path: Optional[pathlib.Path] = None) -> pathlib.Path:
    """
    Returns latest jar file for the project, possibly using local cache.

    :param project: project to be used
    :param use_local_cache: should local cache be used or file always retrieved anew
    :param storage_path: path to be used for downloading.
        If None, current directory will be used.
    :return: path to the jar file on the local filesystem
    :raises ValueError: if storage_path is given but does not exist
    :raises RuntimeError: if the download fails
    """
    version, jar_url = get_latest_version_and_jar_url(project)
    _, local_jar_name = jar_url.rsplit('/', maxsplit=1)
    local_jar_path = pathlib.Path(local_jar_name)
    if storage_path is not None:
        if not storage_path.exists():
            raise ValueError(f"Local storage path doesn't exist: {storage_path}")
        local_jar_path = storage_path / local_jar_path

    if use_local_cache and local_jar_path.exists():
        _logger.info("Jar for version %s already exists in %s, skip downloading",
                     version, local_jar_path)
    else:
        _logger.info("Fetching jar for version %s from %s...", version, jar_url)
        # no stream=True: the whole body is read via .content anyway
        req = requests.get(jar_url, timeout=10)
        try:
            # fail loudly instead of silently saving an error page as the jar
            if req.status_code != 200:
                raise RuntimeError(f"Error downloading jar from {jar_url}, "
                                   f"code: {req.status_code}")
            count_bytes = local_jar_path.write_bytes(req.content)
            _logger.info("Saved %d bytes in %s", count_bytes, local_jar_path)
        finally:
            req.close()
    return local_jar_path
16 changes: 15 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ sqlcipher3 = {version = ">=0.5.0", platform = "darwin"}
sqlalchemy-exasol = ">=4.6.0"
pyexasol = ">=0.24.0"
exasol-bucketfs = ">=0.6.0"
requests = "^2.31.0"
types-requests = "^2.31.0.10"


[build-system]
Expand Down
46 changes: 46 additions & 0 deletions test/unit/test_bfs_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import pytest
from unittest import mock
from typing import Generator
import pathlib
from exasol import bfs_utils


# bucket name and file name used by the mocked BucketFS bucket in the
# tests and fixtures below
MOCKED_BUCKET = "bucket"
MOCKED_FILE_NAME = "bfs.file"


@mock.patch("exasol.bucketfs.Bucket")
def test_put_file_basic(bfs_bucket: mock.MagicMock):
    """Uploading a non-existing local file has to raise ValueError."""
    missing_file = pathlib.Path("non/existent/local.file")
    with pytest.raises(ValueError, match="Local file doesn't exist"):
        bfs_utils.put_file(bfs_bucket, missing_file)


@pytest.fixture
@mock.patch("exasol.bucketfs.Bucket")
def bucket_with_file(bfs_bucket: mock.MagicMock):
    """
    Mocked bucket whose listing already contains MOCKED_FILE_NAME.

    mock.patch injects the Bucket mock as the first positional argument;
    pytest then exposes the configured mock as a fixture.
    """
    bfs_bucket.name = MOCKED_BUCKET
    # NOTE(review): iter() produces a single-use iterator — only the first
    # listing of the bucket sees MOCKED_FILE_NAME; later iterations yield
    # nothing. The current tests only list once, but keep this in mind.
    bfs_bucket.__iter__.return_value = iter([MOCKED_FILE_NAME])
    bfs_bucket.upload.return_value = None
    return bfs_bucket


@pytest.fixture
def temp_file(tmp_path) -> Generator[pathlib.Path, None, None]:
    """Creates a real temporary file named MOCKED_FILE_NAME with some data."""
    # tmp_path is already a pathlib.Path (pytest fixture), no conversion needed
    path = tmp_path / MOCKED_FILE_NAME
    path.write_text("data")
    yield path
    path.unlink()


def test_put_file_exists(caplog, bucket_with_file, temp_file):
    """put_file skips the upload for a listed file unless asked not to."""
    caplog.set_level("INFO")
    expected = f"/buckets/bfsdefault/{MOCKED_BUCKET}/{MOCKED_FILE_NAME}"

    # file is already listed in the bucket -> upload is skipped
    result = bfs_utils.put_file(bucket_with_file, temp_file)
    assert str(result) == expected
    assert "already present in the bucketfs" in caplog.text
    assert not bucket_with_file.upload.called

    # with skip_if_exists disabled the upload happens unconditionally
    caplog.clear()
    result = bfs_utils.put_file(bucket_with_file, temp_file,
                                skip_if_exists=False)
    assert str(result) == expected
    assert bucket_with_file.upload.called
    assert "Uploading file" in caplog.text
82 changes: 82 additions & 0 deletions test/unit/test_github.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import os
import pytest
import pathlib
import requests
from unittest import mock
from exasol import github

# download url returned by the mocked github "latest release" response
CSE_MOCK_URL = "https://github.com/some_path/exasol-cloud-storage-extension-2.7.8.jar"

# canned github API payload: two assets, of which only the second one
# matches the "<version>.jar" suffix the code looks for
MOCKED_RELEASES_RESULT = {
    "tag_name": "2.7.8",
    "assets": [
        {
            "name": "cloud-storage-extension-2.7.8-javadoc.jar",
            "browser_download_url": "should_not_be_used",
        },
        {
            "name": "exasol-cloud-storage-extension-2.7.8.jar",
            "browser_download_url": CSE_MOCK_URL,
        }
    ]
}


def mocked_requests_get(*args, **_):
    """
    Stand-in for requests.get returning canned responses.

    Serves the mocked release metadata for the cloud-storage-extension,
    a 500 error for the kafka-connector-extension (error-handling test)
    and fake binary content for the jar download url; anything else is 404.
    """
    url = args[0]
    response = mock.create_autospec(requests.Response)
    response.status_code = 404
    if url.endswith("/releases/latest"):
        if github.Project.CLOUD_STORAGE_EXTENSION.value in url:
            response.status_code = 200
            response.json = mock.MagicMock(return_value=MOCKED_RELEASES_RESULT)
        elif github.Project.KAFKA_CONNECTOR_EXTENSION.value in url:
            # used to test error handling
            response.status_code = 500
    elif url == CSE_MOCK_URL:
        response.status_code = 200
        response.content = b'binary data'
    return response


@mock.patch("requests.get", side_effect=mocked_requests_get)
def test_get_latest_version_and_jar_url(_):
    """Happy path returns (version, jar url); API errors raise RuntimeError."""
    version, url = github.get_latest_version_and_jar_url(
        github.Project.CLOUD_STORAGE_EXTENSION)
    assert (version, url) == ("2.7.8", CSE_MOCK_URL)

    with pytest.raises(RuntimeError, match="Error sending request"):
        github.get_latest_version_and_jar_url(
            github.Project.KAFKA_CONNECTOR_EXTENSION)


@mock.patch("requests.get", side_effect=mocked_requests_get)
def test_retrieve_jar(_, tmpdir, caplog):
    """
    Checks retrieve_jar against mocked requests.get: initial download,
    cache bypass, cache hit and an explicit storage_path.
    """
    # need this as retrieve_jar works with current directory in some cases
    os.chdir(tmpdir)

    # fetch for the first time, local dir
    jar_path = github.retrieve_jar(github.Project.CLOUD_STORAGE_EXTENSION)
    assert jar_path.exists()
    assert jar_path.read_bytes() == b'binary data'

    # ensure file is recreated without cache
    # NOTE(review): comparing st_ctime may be flaky on filesystems with
    # coarse (e.g. 1s) timestamp resolution — st_ctime_ns as below would
    # be slightly safer; confirm on the CI filesystem.
    old_ts = jar_path.lstat().st_ctime
    jar_path = github.retrieve_jar(github.Project.CLOUD_STORAGE_EXTENSION, use_local_cache=False)
    assert jar_path.exists()
    assert old_ts < jar_path.lstat().st_ctime

    # but with enabled cache, file is preserved
    caplog.set_level("INFO")
    caplog.clear()
    old_ts = jar_path.lstat().st_ctime_ns
    jar_path = github.retrieve_jar(github.Project.CLOUD_STORAGE_EXTENSION, use_local_cache=True)
    assert jar_path.lstat().st_ctime_ns == old_ts
    assert "skip downloading" in caplog.text

    # test storage path specification
    caplog.clear()
    stg_path = pathlib.Path(tmpdir.mkdir("sub"))
    jar_path_sub = github.retrieve_jar(github.Project.CLOUD_STORAGE_EXTENSION,
                                       use_local_cache=True, storage_path=stg_path)
    assert jar_path_sub.exists()
    assert jar_path != jar_path_sub
    assert "Fetching jar" in caplog.text

0 comments on commit f4e45f0

Please sign in to comment.