forked from exasol/bucketfs-utils-python
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
#114 Added two more integration tests for the path builder.
- Loading branch information
Showing
1 changed file
with
87 additions
and
7 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,25 +1,105 @@ | ||
from exasol.bucketfs._path import build_path, SYSTEM_TYPE_ONPREM | ||
from typing import ByteString | ||
import pytest | ||
from exasol.bucketfs._path import PathLike, build_path, SYSTEM_TYPE_ONPREM | ||
from integration.conftest import delete_file | ||
|
||
|
||
def test_write_read_back(test_config): | ||
@pytest.fixture | ||
def children_poem() -> ByteString: | ||
poem_text = \ | ||
b"Twinkle twinkle little star." \ | ||
b"How I wonder what you are." \ | ||
b"Up above the world so high." \ | ||
b"Like a diamond in the sky." | ||
return poem_text | ||
|
||
|
||
@pytest.fixture | ||
def classic_poem() -> ByteString: | ||
poem_text = \ | ||
b"My heart's in the Highlands, my heart is not here," \ | ||
b"My heart's in the Highlands, a-chasing the deer;" \ | ||
b"Chasing the wild-deer, and following the roe," \ | ||
b"My heart's in the Highlands, wherever I go." | ||
return poem_text | ||
|
||
|
||
def _collect_all_names(path: PathLike) -> set[str]: | ||
all_names = [] | ||
for _, dirs, files in path.walk(): | ||
all_names.extend(dirs) | ||
all_names.extend(files) | ||
return set(all_names) | ||
|
||
|
||
def test_write_read_back_onprem(test_config, children_poem): | ||
|
||
file_name = 'my_poems/little_star.txt' | ||
data = b'twinkle twinkle little star how i wonder what you are' | ||
base_path = build_path(system=SYSTEM_TYPE_ONPREM, url=test_config.url, | ||
username=test_config.username, password=test_config.password) | ||
file_name = 'my_poems/little_star.txt' | ||
poem_path = base_path / file_name | ||
|
||
try: | ||
poem_path.write(data) | ||
poem_path.write(children_poem) | ||
data_back = b''.join(poem_path.read(20)) | ||
assert data_back == data | ||
assert data_back == children_poem | ||
finally: | ||
# cleanup | ||
delete_file( | ||
test_config.url, | ||
'default', | ||
test_config.username, | ||
test_config.password, | ||
file_name, | ||
file_name | ||
) | ||
|
||
|
||
def test_write_list_files_onprem(test_config, children_poem, classic_poem): | ||
|
||
base_path = build_path(system=SYSTEM_TYPE_ONPREM, url=test_config.url, path='my_poems', | ||
username=test_config.username, password=test_config.password) | ||
poem_path1 = base_path / 'children/little_star.txt' | ||
poem_path2 = base_path / 'classic/highlands.txt' | ||
|
||
try: | ||
poem_path1.write(children_poem) | ||
poem_path2.write(classic_poem) | ||
expected_names = {'my_poems', 'children', 'classic', | ||
'little_star.txt', 'highlands.txt'} | ||
assert _collect_all_names(base_path) == expected_names | ||
finally: | ||
# cleanup | ||
for poem_path in [poem_path1, poem_path2]: | ||
delete_file( | ||
test_config.url, | ||
'default', | ||
test_config.username, | ||
test_config.password, | ||
str(poem_path) | ||
) | ||
|
||
|
||
def test_write_delete_onprem(test_config, children_poem, classic_poem): | ||
|
||
base_path = build_path(system=SYSTEM_TYPE_ONPREM, url=test_config.url, | ||
username=test_config.username, password=test_config.password) | ||
poems_root = base_path / 'my_poems' | ||
poem_path1 = poems_root / 'children/little_star.txt' | ||
poem_path2 = poems_root / 'classic/highlands.txt' | ||
|
||
try: | ||
poem_path1.write(children_poem) | ||
poem_path2.write(classic_poem) | ||
poem_path1.rm() | ||
expected_names = {'my_poems', 'classic', 'highlands.txt'} | ||
assert _collect_all_names(poems_root) == expected_names | ||
finally: | ||
# cleanup | ||
for poem_path in [poem_path1, poem_path2]: | ||
delete_file( | ||
test_config.url, | ||
'default', | ||
test_config.username, | ||
test_config.password, | ||
str(poem_path) | ||
) |