Skip to content

Commit

Permalink
Merge pull request #10 from helxplatform/develop
Browse files Browse the repository at this point in the history
release
  • Loading branch information
YaphetKG authored Oct 22, 2024
2 parents 33cce37 + d93bc40 commit c100ff8
Show file tree
Hide file tree
Showing 17 changed files with 139 additions and 50 deletions.
17 changes: 10 additions & 7 deletions src/avalon/mainoperations.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ def get_files(local_path: str,
branch: str,
lake_fs_client: LakeFsWrapper,
changes_only: bool,
changes_from: str = None):
changes_from: str = None,
changes_to: str = None):
all_repos = [r.Id for r in lake_fs_client.list_repo()]

if not os.path.exists(local_path):
Expand All @@ -45,15 +46,16 @@ def get_files(local_path: str,
if changes_only:
try:
# if commit_id is empty nonexistant then we will get all files, not just changed ones
filelist = lake_fs_client.get_changes(repository=repo, branch=branch, remote_path=remote_path, from_commit_id=changes_from)
filelist = lake_fs_client.get_changes(repository=repo, branch=branch, remote_path=remote_path,
from_commit_id=changes_from, to_commit_id=changes_to)
except NotFoundException:
filelist = lake_fs_client.get_filelist(repository=repo, branch=branch, remote_path=remote_path)
else:
filelist = lake_fs_client.get_filelist(repository=repo, branch=branch, remote_path=remote_path)

try:
logger.info("Trying to download files from LakeFS")
lake_fs_client.download_files(remote_files=filelist, local_path=local_path, repository=repo, branch=branch)
lake_fs_client.download_files(remote_files=filelist, local_path=local_path, repository=repo, branch_or_commit_id=branch)
logger.info("Downloading files from LakeFS completed")
except Exception as ex:
logger.info("Failed to download files from LakeFS ")
Expand All @@ -72,8 +74,9 @@ def put_files(local_path: str,
task_args,
lake_fs_client: LakeFsWrapper,
s3storage: bool,
commit_id: str):
_create_repositry_branch_IfNotExists(branch, lake_fs_client, repo, s3storage)
commit_id: str,
source_branch_name: None):
_create_repositry_branch_IfNotExists(branch, lake_fs_client, repo, s3storage, source_branch_name)

files = get_filepaths(local_path)
dest_paths = get_dest_filepaths(files, local_path, remote_path)
Expand Down Expand Up @@ -137,7 +140,7 @@ def get_commit_id_by_input_commit_id(branch, lake_fs_client, remote_path, repo,
return commit_id


def _create_repositry_branch_IfNotExists(branch, lake_fs_client, repo, s3storage):
def _create_repositry_branch_IfNotExists(branch, lake_fs_client, repo, s3storage, source_branch_name=None):
all_repos = [r.Id for r in lake_fs_client.list_repo()]

if not repo in all_repos:
Expand All @@ -146,5 +149,5 @@ def _create_repositry_branch_IfNotExists(branch, lake_fs_client, repo, s3storage
r = Repository(repo, f"s3://{repo}/")
lake_fs_client.create_repository(r)
if branch != "main":
lake_fs_client.create_branch(branch, repo)
lake_fs_client.create_branch(branch, repo, source_branch=source_branch_name)

68 changes: 50 additions & 18 deletions src/avalon/operations/LakeFsWrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import requests

from lakefs_sdk.client import LakeFSClient
from lakefs_sdk import Configuration, CommitCreation, BranchCreation, exceptions
from lakefs_sdk import Configuration, CommitCreation, BranchCreation, exceptions, TagCreation

from typing import List

Expand Down Expand Up @@ -42,6 +42,13 @@ def create_repository(self, repo: Repository) -> None:
self._client.repositories_api.create_repository(
repository_creation=RepositoryCreation(name=repo.Id, storage_namespace=repo.StorageNamespace))

def delete_repository(self, repo: Repository) -> None:
"""
deletes repository
:param repo: repository name
"""
self._client.repositories_api.delete_repository(repository=repo.Id, storage_namespace=repo.StorageNamespace)

def list_branches(self, repository_name: str):
"""
List branches in a repo
Expand Down Expand Up @@ -139,13 +146,14 @@ def get_filelist(self, branch: str, repository: str, remote_path: str) -> List[s
matching_files = list(filter(lambda f: f.startswith(remote_path) or remote_path == '*', paths))
return matching_files

def get_changes(self, branch: str, repository: str, remote_path: str, from_commit_id: str) -> List[str]:
def get_changes(self, branch: str, repository: str, remote_path: str, from_commit_id: str, to_commit_id: str = None) -> List[str]:
"""
Returns list of remote paths that were changed since specified commit
:param branch: branch name
:param repository: repository name
:param remote_path: path as in Lakefs
:param from_commit_id: id of a commit
:param to_commit_id: id of a commit (optional)
:return: list ot remote paths in LakeFs
"""
files_changed = []
Expand All @@ -155,17 +163,21 @@ def get_changes(self, branch: str, repository: str, remote_path: str, from_commi
commits = self.list_commits(repository_name=repository, branch_name=branch, path=remote_path.split("/")).results
commits_to_proc = []

for commit in commits:
if commit.id != from_commit_id:
commits_to_proc.append(commit)
else:
break
if to_commit_id is None:

if len(commits_to_proc) == 0:
raise NotFoundException()
for commit in commits:
if commit.id != from_commit_id:
commits_to_proc.append(commit)
else:
break

if len(commits_to_proc) == 0:
raise NotFoundException()

files = self._client.refs_api.diff_refs(repository=repository, right_ref=commits_to_proc[0].id, left_ref=from_commit_id).results
else:
files = self._client.refs_api.diff_refs(repository=repository, right_ref=to_commit_id, left_ref=from_commit_id).results

# for commit in commits_to_proc:
files = self._client.refs_api.diff_refs(repository=repository, right_ref=commits_to_proc[0].id, left_ref=from_commit_id).results
for file in files:
if file.type == 'added':
files_added.append(file.path)
Expand All @@ -181,13 +193,13 @@ def get_changes(self, branch: str, repository: str, remote_path: str, from_commi
matching_files = list(filter(lambda f: f.startswith(remote_path), paths))
return matching_files

def download_files(self, remote_files: List[str], local_path: str, repository: str, branch: str) -> None:
def download_files(self, remote_files: List[str], local_path: str, repository: str, branch_or_commit_id: str) -> None:
"""
Downloads files from LakeFs
:param remote_files: list ot remote paths in LakeFs
:param local_path: local path, destination for files
:param repository: repository name
:param branch: branch name
:param branch_or_commit_id: branch name or commit_id
:return: None
"""
dirs = set(map(lambda x: os.path.join(local_path, os.path.dirname(x)), remote_files))
Expand All @@ -197,11 +209,11 @@ def download_files(self, remote_files: List[str], local_path: str, repository: s
dir_name = os.path.dirname(location)
dest_path = os.path.join(local_path, dir_name, file_name)

self.download_file(dest_path, branch, location, repository)
self.download_file(dest_path, branch_or_commit_id, location, repository)

def download_file(self, dest_path, branch, location, repository):
logging.info("Downloading file: {0}, {1}, {2}".format(branch, location, repository))
file_info = self._client.objects_api.stat_object(repository=repository, ref=branch, path=location)
def download_file(self, dest_path, branch_or_commit_id, location, repository):
logging.info("Downloading file: {0}, {1}, {2}".format(branch_or_commit_id, location, repository))
file_info = self._client.objects_api.stat_object(repository=repository, ref=branch_or_commit_id, path=location)
file_size = file_info.size_bytes
logging.info("File size: {0}".format(file_size))
chunk = 32 * 1024 * 1024
Expand All @@ -213,7 +225,7 @@ def download_file(self, dest_path, branch, location, repository):
to_bytes = min(current_pos + chunk, file_size - 1)
logging.info("Downloading bytes: {0} - {1}".format(from_bytes, to_bytes))
obj_bytes = self._client.objects_api.get_object(repository=repository,
ref=branch,
ref=branch_or_commit_id,
path=location,
range="bytes={0}-{1}".format(from_bytes, to_bytes))
f.write(obj_bytes)
Expand All @@ -233,3 +245,23 @@ def create_branch(self, branch_name: str, repository_name: str, source_branch: s
commit_id = self._client.branches_api.create_branch(repository=repository_name,
branch_creation=branch_creation)
return {"commit_id": commit_id, "id": branch_name}


def create_tag(self, repository_name: str, commit_id: str, tag_name: str):
"""
Creates a new tag
"""
logging.info("Creating new tag: {0}".format(tag_name))
self._client.tags_api.create_tag(repository=repository_name, tag_creation=TagCreation(id=tag_name, ref=commit_id))
logging.info("Creating of new tag completed")


def get_tags(self, repository_name: str) -> dict[str, str]:
"""
Returns list of tags for a given repository.
"""
resp = self._client.tags_api.list_tags(repository=repository_name)
res = dict[str, str]()
for tag in resp.results:
res[tag.id] = tag.commit_id
return res
1 change: 1 addition & 0 deletions tst/data/test2/dir1/file2.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
2222 ver1
1 change: 1 addition & 0 deletions tst/data/test2/dir1/file3.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
3333 ver 1
1 change: 1 addition & 0 deletions tst/data/test2/file1.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
TESTSTESTTESTSTEST ver 1
1 change: 1 addition & 0 deletions tst/data/test3/dir1/file2.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
2222 ver 2
1 change: 1 addition & 0 deletions tst/data/test3/dir1/file3.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
3333 ver 2
1 change: 1 addition & 0 deletions tst/data/test3/dir1/file4.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
444 ver 1
1 change: 1 addition & 0 deletions tst/data/test3/file1.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
TESTSTESTTESTSTEST ver2
1 change: 1 addition & 0 deletions tst/data/test_get_changes/commit_1/f1.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
v1
1 change: 1 addition & 0 deletions tst/data/test_get_changes/commit_2/f2.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
v1
1 change: 1 addition & 0 deletions tst/data/test_get_changes/commit_3/f2.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
v2
1 change: 1 addition & 0 deletions tst/data/test_get_changes/commit_3/f3.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
v3
1 change: 1 addition & 0 deletions tst/data/test_get_changes/commit_4/f4.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
v1
49 changes: 43 additions & 6 deletions tst/operations/LakeFsWrapperTests.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def test_CreateRepository(self):

def test_Upload(self):
lfs = LakeFsWrapper(configuration=self.get_config())
rootpath = "../data/test1/"
rootpath = "./data/test1/"
files = get_filepaths(rootpath)
root_destpath = "20001212/"
dest_paths = get_dest_filepaths(files, rootpath, root_destpath)
Expand Down Expand Up @@ -87,7 +87,7 @@ def test_GetFiles(self):

def test_FileIntegrityTest_Upload(self):
lfs = LakeFsWrapper(configuration=self.get_config())
rootpath = "../data/test4/"
rootpath = "./data/test4/"
files = get_filepaths(rootpath)
root_destpath = "integrity/"
dest_paths = get_dest_filepaths(files, rootpath, root_destpath)
Expand Down Expand Up @@ -124,11 +124,48 @@ def test_GetNonExistingFiles(self):
files = lfs.get_filelist("main", BIGPIPELINEOPERATION, "20001212")
lfs.download_files(['filenotexist.txt'], LOCALTEMPPATH, BIGPIPELINEOPERATION, "main")

def test_GetChanges(self):

def _put_files(self, repo: str, path_to_commit: str, dest_path: str):
lfs = LakeFsWrapper(configuration=self.get_config())

files = get_filepaths(path_to_commit)
dest_paths = get_dest_filepaths(files, path_to_commit, dest_path)

cmt = Commit(
message=f"commit pushed by task: task_name_1, pipeline: pipeline_1",
repo=repo,
branch="main",
metadata=CommitMetaData(pipeline_id="pipeline_1", task_name="task_name_1", task_image="docker_image_1"),
files_added=files,
committer="avalon",
commit_date=datetime.now()
)
lfs.upload_files(cmt.branch, cmt.repo, files, dest_paths)
lfs.commit_files(cmt)

def test_GetChanges_2(self):
repo = "getchangesrepo"
path_in_repo = "test_changes"
lfs = LakeFsWrapper(configuration=self.get_config())
lfs.create_repository(Repository(repo, f"local://{repo}/"))
self._put_files(repo, "./data/test_get_changes/commit_1/", "test_changes")
self._put_files(repo, "./data/test_get_changes/commit_2/", "test_changes")
self._put_files(repo, "./data/test_get_changes/commit_3/", "test_changes")
self._put_files(repo, "./data/test_get_changes/commit_4/", "test_changes")

lfs = LakeFsWrapper(configuration=self.get_config())
files = lfs.get_changes("main", BIGPIPELINEOPERATION, "20001212",
"e5759e1ea49a81bdfcd8acfef186dfb04458e5baaca03c38fca58d79f662d2ac")
print(files)

commits = lfs.list_commits(repo, "main", path_in_repo.split("/"))
files = lfs.get_changes("main", repo, path_in_repo, from_commit_id=commits.results[2].id)
# it should get all files between commit_2 and commit_4
self.assertListEqual(files, ['test_changes/f2.txt', 'test_changes/f3.txt', 'test_changes/f4.txt'])

# it should get all files between commit_2 and commit_3
files = lfs.get_changes("main", repo, path_in_repo, from_commit_id=commits.results[2].id, to_commit_id=commits.results[1].id)
self.assertListEqual(files, ['test_changes/f2.txt', 'test_changes/f3.txt'])

lfs.download_files(['test_changes/f2.txt'], LOCALTEMPPATH, repo, commits.results[0].id)


def test_ListFiles(self):
lfs = LakeFsWrapper(configuration=self.get_config())
Expand Down
Loading

0 comments on commit c100ff8

Please sign in to comment.