Merge pull request #7 from helxplatform/develop

avalon release

YaphetKG authored Apr 18, 2024
2 parents 84419ad + 849ac13 commit 6b366a0
Showing 6 changed files with 112 additions and 17 deletions.
5 changes: 3 additions & 2 deletions requirements.txt
@@ -1,6 +1,7 @@
-lakefs-sdk==1.8.0
+lakefs-sdk==1.12
 pydantic
 pyyaml
 httpx
 boto3
-retrying
+retrying
+requests
3 changes: 2 additions & 1 deletion setup.cfg
@@ -21,12 +21,13 @@ packages = find:
 python_requires = >=3.10
 include_package_data = true
 install_requires =
-    lakefs-sdk==1.8.0
+    lakefs-sdk==1.12
     pydantic
     pyyaml
     httpx
     boto3
     retrying
+    requests

 [options.entry_points]
 console_scripts =
52 changes: 41 additions & 11 deletions src/avalon/operations/LakeFsWrapper.py
@@ -1,6 +1,9 @@
 import json
 import logging
 import os
+import urllib.parse
+import requests
+
 import boto3
 from lakefs_sdk.client import LakeFSClient
 from lakefs_sdk import Configuration, CommitCreation, BranchCreation, exceptions
@@ -18,10 +21,6 @@ def __init__(self, configuration: Configuration):
         os.environ.get('')
         self._config = configuration
         self._client = LakeFSClient(configuration=configuration)
-        self._s3_client = boto3.client('s3',
-                                       endpoint_url=self._config.host,
-                                       aws_access_key_id=self._config.username,
-                                       aws_secret_access_key=self._config.password)

     def list_repo(self) -> list[Repository]:
         """
@@ -81,11 +80,24 @@ def upload_files(self, branch: str, repository: str, files: List[str], dest_paths: List[str]):
         """
         This function uploads files
         """
+        login_cookie = self._get_login_cookie()
         for i in range(len(files)):
-            self._client.objects_api.upload_object(repository=repository,
-                                                   branch=branch,
-                                                   path=dest_paths[i],
-                                                   content=files[i])
+            with open(files[i], 'rb') as f:
+                url = f'{self._config.host}/repositories/{urllib.parse.quote_plus(repository)}/branches/{urllib.parse.quote_plus(branch)}/objects?path={urllib.parse.quote_plus(dest_paths[i])}'
+                res = requests.post(url, data=f, cookies=login_cookie)
+                if res.status_code != 201:
+                    raise Exception(f"Failed to upload file to lakefs: {res.text}")
+                logging.info(f'Upload file result: {res.text}')
+
+    def _get_login_cookie(self):
+        login_url = f"{self._config.host}/auth/login"
+        auth_resp = requests.post(login_url, json={"access_key_id": self._config.username,
+                                                   "secret_access_key": self._config.password})
+        if auth_resp.status_code != 200:
+            raise Exception(f"Authentication to lakefs failed: {auth_resp.status_code}")
+
+        return auth_resp.cookies

     def upload_file(self, branch: str, repository: str, content: str, dest_path: str):
         """
@@ -188,9 +200,27 @@ def download_files(self, remote_files: List[str], local_path: str, repository: str, branch: str):
             self.download_file(dest_path, branch, location, repository)

     def download_file(self, dest_path, branch, location, repository):
-        obj_bytes = self._client.objects_api.get_object(repository=repository, ref=branch, path=location)
+        logging.info("Downloading file: {0}, {1}, {2}".format(branch, location, repository))
+        file_info = self._client.objects_api.stat_object(repository=repository, ref=branch, path=location)
+        file_size = file_info.size_bytes
+        logging.info("File size: {0}".format(file_size))
+        chunk = 32 * 1024 * 1024
+        current_pos = 0

         with open(dest_path, 'wb') as f:
-            f.write(obj_bytes)
+            while current_pos < file_size:
+                from_bytes = current_pos
+                to_bytes = min(current_pos + chunk, file_size - 1)
+                logging.info("Downloading bytes: {0} - {1}".format(from_bytes, to_bytes))
+                obj_bytes = self._client.objects_api.get_object(repository=repository,
+                                                                ref=branch,
+                                                                path=location,
+                                                                range="bytes={0}-{1}".format(from_bytes, to_bytes))
+                f.write(obj_bytes)
+                current_pos = to_bytes + 1
+
+        logging.info("Downloading completed: {0}".format(current_pos - 1))

     def create_branch(self, branch_name: str, repository_name: str, source_branch: str = "main"):
         """
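The rewritten download_file streams objects in 32 MiB ranges: it stats the object for its size, then issues inclusive byte-range reads until the file is exhausted. A self-contained sketch of the same range arithmetic, using a hypothetical 70 MiB object size, shows exactly which Range headers the loop produces; note that because both endpoints are inclusive, every non-final request actually spans chunk + 1 bytes:

# Reproduce the byte-range schedule of the new download_file();
# no lakeFS connection needed, the object size is a stand-in.
CHUNK = 32 * 1024 * 1024           # 32 MiB, as in the diff
file_size = 70 * 1024 * 1024       # pretend stat_object reported ~70 MiB

current_pos = 0
while current_pos < file_size:
    from_bytes = current_pos
    # to_bytes is an inclusive offset, clamped to the last byte of the file.
    to_bytes = min(current_pos + CHUNK, file_size - 1)
    print(f"Range: bytes={from_bytes}-{to_bytes}")
    current_pos = to_bytes + 1

# Printed schedule:
# Range: bytes=0-33554432
# Range: bytes=33554433-67108865
# Range: bytes=67108866-73400319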
Binary file added tst/data/test4/biggerfile1.zip
Binary file not shown.
11 changes: 11 additions & 0 deletions tst/data/test4/file1.txt
@@ -0,0 +1,11 @@
+Testing started at 6:20 PM ...
+Launching unittests with arguments python -m unittest LakeFsWrapperTests.LakeFsWrapperTests.test_FileIntegrityTest in /home/admin2/Documents/avalon/tst/operations
+
+INFO:root:Upload file result: {"checksum":"00cedcf91beffa9ee69f6cfe23a4602d","content_type":"application/octet-stream","metadata":{},"mtime":1707175217,"path":"integrity/file1.txt","path_type":"object","physical_address":"local:///home/lakefs/lakefs/data/block/bigpipelineoperation3/data/gi6pdsdhjao0477k2hn0/cn0mqcdhjao0477k2hs0","size_bytes":10}
+
+Upload file result: {"checksum":"00cedcf91beffa9ee69f6cfe23a4602d","content_type":"application/octet-stream","metadata":{},"mtime":1707175217,"path":"integrity/file1.txt","path_type":"object","physical_address":"local:///home/lakefs/lakefs/data/block/bigpipelineoperation3/data/gi6pdsdhjao0477k2hn0/cn0mqcdhjao0477k2hs0","size_bytes":10}
+
+INFO:root:Upload file result: {"checksum":"a1763560d8361f8f2c3ce792895f4e7c","content_type":"application/octet-stream","metadata":{},"mtime":1707175221,"path":"integrity/biggerfile1.zip","path_type":"object","physical_address":"local:///home/lakefs/lakefs/data/block/bigpipelineoperation3/data/gi6pdsdhjao0477k2hn0/cn0mqd5hjao0477k2hsg","size_bytes":41140776}
+
+Upload file result: {"checksum":"a1763560d8361f8f2c3ce792895f4e7c","content_type":"application/octet-stream","metadata":{},"mtime":1707175221,"path":"integrity/biggerfile1.zip","path_type":"object","physical_address":"local:///home/lakefs/lakefs/data/block/bigpipelineoperation3/data/gi6pdsdhjao0477k2hn0/cn0mqd5hjao0477k2hsg","size_bytes":41140776}
+
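Each upload response logged above carries a checksum and size_bytes for the stored object, which makes a client-side sanity check possible. A hedged sketch follows; verify_upload is a hypothetical helper, and the assumption that the 32-character checksum is an MD5 digest depends on the lakeFS block adapter in use, so adjust if your backend reports something else:

import hashlib
import json
import os

def verify_upload(local_path: str, upload_response_text: str) -> None:
    """Compare a local file against the checksum and size lakeFS returned."""
    info = json.loads(upload_response_text)
    if os.path.getsize(local_path) != info["size_bytes"]:
        raise ValueError("size mismatch")
    digest = hashlib.md5()  # assumption: the adapter reports an MD5 checksum
    with open(local_path, "rb") as f:
        while chunk := f.read(8192):
            digest.update(chunk)
    if digest.hexdigest() != info["checksum"]:
        raise ValueError("checksum mismatch")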
58 changes: 55 additions & 3 deletions tst/operations/LakeFsWrapperTests.py
@@ -1,4 +1,6 @@
 import os.path
+import hashlib
+import logging
 import sys
 import unittest
 from datetime import datetime
@@ -21,6 +23,12 @@ class LakeFsWrapperTests(unittest.TestCase):
     @classmethod
     def setUpClass(cls):
         create_dirs(["temp"])
+        logging.basicConfig(
+            level=logging.INFO,
+        )
+        handler = logging.StreamHandler(sys.stdout)
+        logging.getLogger().addHandler(handler)
+

     def get_config(self) -> Configuration:
         config = lakefs_sdk.Configuration(host='http://localhost:8001/api/v1',
@@ -34,6 +42,11 @@ def get_config(self) -> Configuration:
         config.temp_folder_path = LOCALTEMPPATH
         return config

+    def test_login(self):
+        lfs = LakeFsWrapper(configuration=self.get_config())
+        cookies = lfs._get_login_cookie()
+        self.assertIsNotNone(cookies)
+
     def test_ListRepositories(self):
         lfs = LakeFsWrapper(configuration=self.get_config())
         repos = lfs.list_repo()
@@ -65,15 +78,46 @@ def test_Upload(self):
             commit_date=datetime.now()
         )
         lfs.upload_files(cmt.branch, cmt.repo, files, dest_paths)
         lfs.upload_file(cmt.branch, cmt.repo, "TEST111", os.path.join(root_destpath, "lastrun.roger"))

         lfs.commit_files(cmt)

     def test_GetFiles(self):
         lfs = LakeFsWrapper(configuration=self.get_config())
         files = lfs.get_filelist("main", BIGPIPELINEOPERATION, "20001212")
         lfs.download_files(files, LOCALTEMPPATH, BIGPIPELINEOPERATION, "main")

+    def test_FileIntegrityTest_Upload(self):
+        lfs = LakeFsWrapper(configuration=self.get_config())
+        rootpath = "../data/test4/"
+        files = get_filepaths(rootpath)
+        root_destpath = "integrity/"
+        dest_paths = get_dest_filepaths(files, rootpath, root_destpath)
+
+        cmt_meta = CommitMetaData(pipeline_id="pipeline_1", task_name="task_name_1", task_image="docker_image_1")
+        cmt = Commit(
+            message="test4_fileintegritytest",
+            repo=BIGPIPELINEOPERATION,
+            branch="main",
+            metadata=cmt_meta,
+            files_added=files,
+            committer="avalon",
+            commit_date=datetime.now()
+        )
+        lfs.upload_files(cmt.branch, cmt.repo, files, dest_paths)
+        lfs.commit_files(cmt)
+
+        files = lfs.get_filelist("main", BIGPIPELINEOPERATION, "integrity")
+        lfs.download_files(files, LOCALTEMPPATH, BIGPIPELINEOPERATION, "main")

+    def test_FileIntegrityTest_Download(self):
+        lfs = LakeFsWrapper(configuration=self.get_config())
+        files = lfs.get_filelist("main", BIGPIPELINEOPERATION, "integrity")
+        lfs.download_files(files, LOCALTEMPPATH, BIGPIPELINEOPERATION, "main")
+
+        file1_hash = self.calculateHash(LOCALTEMPPATH + "/integrity/" + "file1.txt")
+        biggerfile_hash = self.calculateHash(LOCALTEMPPATH + "/integrity/" + "biggerfile1.zip")
+        self.assertEqual('7fac80f5d8c2d0baf6fca348bfca7ac2696b580b012481d40e171e2cfcc2d26b86f11b6046903e710e51aba474cf3e0347542d2fa50e4e2fa5194ed520148b81', file1_hash)
+        self.assertEqual('b7c3db9e3ff6e3fe7b6d0f01a95d491fcf1d5f947878ab053c36dfd2549146ee40a3ed1ddd122b12410c11361cd3c94e1a959a11ce70888b2e31decee6388f33', biggerfile_hash)

     # expected to fail
     def test_GetNonExistingFiles(self):
         lfs = LakeFsWrapper(configuration=self.get_config())
@@ -90,6 +134,14 @@ def test_ListFiles(self):
         lfs = LakeFsWrapper(configuration=self.get_config())
         files = lfs.get_filelist("main", BIGPIPELINEOPERATION, "20001212")

+    def calculateHash(self, path: str) -> str:
+        with open(path, "rb") as f:
+            file_hash = hashlib.sha512()
+            while chunk := f.read(8192):
+                file_hash.update(chunk)
+
+        return file_hash.hexdigest()


 if __name__ == '__main__':
     unittest.main()

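The two SHA-512 digests asserted in test_FileIntegrityTest_Download pin the round-trip to the exact bytes of the fixtures under tst/data/test4/. If those fixtures ever change, the expected values can be regenerated with the same chunked-hashing pattern as calculateHash; a short sketch, with fixture paths assumed relative to tst/operations/ as in the tests:

import hashlib

def sha512_hex(path: str) -> str:
    # Same incremental pattern as LakeFsWrapperTests.calculateHash
    h = hashlib.sha512()
    with open(path, "rb") as f:
        while chunk := f.read(8192):
            h.update(chunk)
    return h.hexdigest()

if __name__ == "__main__":
    for name in ("file1.txt", "biggerfile1.zip"):
        print(name, sha512_hex(f"../data/test4/{name}"))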