From 93972c6fee0d1e4da8d47945fd9b76f87dfb94e2 Mon Sep 17 00:00:00 2001
From: Lynn Rozen
Date: Thu, 2 Nov 2023 14:34:06 +0200
Subject: [PATCH] changed to sdk client, create repo for every test

---
 test/conftest.py      | 36 ++++++-------------
 test/requirements.txt |  4 +--
 test/test_iceberg.py  | 84 ++++++++++++++++++++-----------------------
 3 files changed, 51 insertions(+), 73 deletions(-)

diff --git a/test/conftest.py b/test/conftest.py
index f0f86d4..442fc3e 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -4,13 +4,9 @@
 
 import pytest
 
-import lakefs_client
-from lakefs_client.client import LakeFSClient
-from lakefs_client.models import *
-from lakefs_client.model.access_key_credentials import AccessKeyCredentials
-from lakefs_client.model.comm_prefs_input import CommPrefsInput
-from lakefs_client.model.setup import Setup
-from lakefs_client.model.repository_creation import RepositoryCreation
+import lakefs_sdk
+from lakefs_sdk.client import LakeFSClient
+from lakefs_sdk.models import *
 
 LAKEFS_ACCESS_KEY = 'AKIAIOSFODNN7EXAMPLE'
 LAKEFS_SECRET_KEY = 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'
@@ -29,15 +25,9 @@ def pytest_addoption(parser):
 def lakefs_repo(request):
     return request.config.getoption('--repository')
 
-# @pytest.fixture
-# def lakeFS_args(request):
-#     args = {}
-#     args['storage_namespace'] = request.config.getoption('--storage_namespace')
-#     args['repository'] = request.config.getoption('--repository')
-#     args['lakefs_access_key'] = 'AKIAIOSFODNN7EXAMPLE'
-#     args['lakefs_secret_key'] = 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'
-#     return args
-
+@pytest.fixture
+def storage_namespace(request):
+    return request.config.getoption('--storage_namespace')
 
 @pytest.fixture(scope="session")
 def spark(pytestconfig):
@@ -46,6 +36,7 @@ def spark(pytestconfig):
     spark_config.set("spark.sql.catalog.lakefs", "org.apache.iceberg.spark.SparkCatalog")
     spark_config.set("spark.sql.catalog.lakefs.catalog-impl", "io.lakefs.iceberg.LakeFSCatalog")
     spark_config.set("spark.sql.catalog.lakefs.warehouse", f"lakefs://{repo_name}")
+    spark_config.set("spark.sql.catalog.lakefs.cache-enabled", "false")
     spark_config.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
     spark_config.set("spark.hadoop.fs.s3a.endpoint", "http://localhost:8000")
     spark_config.set("spark.hadoop.fs.s3a.access.key", LAKEFS_ACCESS_KEY)
@@ -60,17 +51,12 @@ def spark(pytestconfig):
 @pytest.fixture(scope="session")
 def lfs_client(pytestconfig):
     lfs_client = LakeFSClient(
-        lakefs_client.Configuration(username=LAKEFS_ACCESS_KEY,
+        lakefs_sdk.Configuration(username=LAKEFS_ACCESS_KEY,
                                  password=LAKEFS_SECRET_KEY,
                                  host='http://localhost:8000'))
     # Setup lakeFS
-    repo_name = pytestconfig.getoption('--repository')
-    storage_namespace = pytestconfig.getoption('--storage_namespace')
-    lfs_client.config.setup_comm_prefs(CommPrefsInput(feature_updates=False, security_updates=False, email=MOCK_EMAIL))
-    lfs_client.config.setup(Setup(username="lynn",
-                                  key=AccessKeyCredentials(access_key_id=LAKEFS_ACCESS_KEY, secret_access_key=LAKEFS_SECRET_KEY)))
-
-    lfs_client.repositories.create_repository(
-        RepositoryCreation(name=repo_name, storage_namespace=storage_namespace))
+    lfs_client.internal_api.setup_comm_prefs(CommPrefsInput(feature_updates=False, security_updates=False, email=MOCK_EMAIL))
+    # lfs_client.internal_api.setup(Setup(username="lynn",
+    #     key=AccessKeyCredentials(access_key_id=LAKEFS_ACCESS_KEY, secret_access_key=LAKEFS_SECRET_KEY)))
 
     return lfs_client
\ No newline at end of file
diff --git a/test/requirements.txt b/test/requirements.txt
index be19ac2..6dfaec4 100644
--- a/test/requirements.txt
+++ b/test/requirements.txt
@@ -1,3 +1,3 @@
-lakefs_client==0.104.0
+lakefs_sdk==1.1.0
 pyspark==3.3.2
-pytest==7.4.0
\ No newline at end of file
+pytest==7.4.0
diff --git a/test/test_iceberg.py b/test/test_iceberg.py
index 0c78303..e798646 100644
--- a/test/test_iceberg.py
+++ b/test/test_iceberg.py
@@ -1,15 +1,5 @@
-import pytest
-
-import lakefs_client
-from lakefs_client.client import LakeFSClient
-from lakefs_client.models import *
-from lakefs_client.model.access_key_credentials import AccessKeyCredentials
-from lakefs_client.model.comm_prefs_input import CommPrefsInput
-from lakefs_client.model.setup import Setup
-from lakefs_client.model.repository_creation import RepositoryCreation
-import pyspark
-from pyspark.sql import SparkSession
-from pyspark.conf import SparkConf
+import lakefs_sdk.client
+from lakefs_sdk.models import *
 from pyspark.sql.types import StructType, StructField, StringType, IntegerType
 
 
@@ -28,52 +18,54 @@ def get_data(spark):
         StructField("gender", StringType(), True),
     ])
     df = spark.createDataFrame(data=data_set,schema=schema)
-    df.printSchema()
-    df.show(truncate=False)
     return df
 
-def test_diff_two_same_branches(spark, lfs_client, lakefs_repo):
-    print("repo name ", lakefs_repo)
+
+def initiate_repo_with_data(spark, lfs_client: lakefs_sdk.client.LakeFSClient, repo_name, storage_namespace, test_name):
+    storage_namespace = f"{storage_namespace}/{test_name}"
+    lfs_client.repositories_api.create_repository(
+        RepositoryCreation(name=repo_name, storage_namespace=storage_namespace))
     df = get_data(spark)
     df.write.saveAsTable("lakefs.main.company.workers")
+    lfs_client.commits_api.commit(repo_name, "main", CommitCreation(message="Initial data load"))
+
 
-    #Commit, create a new branch, check that the tables are the same
-    lfs_client.commits.commit(lakefs_repo, "main", CommitCreation(message="Initial data load"))
-    lfs_client.branches.create_branch(lakefs_repo, BranchCreation(name="dev", source="main"))
+def test_diff_two_same_branches(spark, lfs_client: lakefs_sdk.client.LakeFSClient, lakefs_repo, storage_namespace):
+    repo_name = f"{lakefs_repo}_test1"
+    initiate_repo_with_data(spark, lfs_client, repo_name, storage_namespace, "test1")
+
+    #Create a new branch, check that the tables are the same
+    lfs_client.branches_api.create_branch(repo_name, BranchCreation(name="dev", source="main"))
     df_main = spark.read.table("lakefs.main.company.workers")
     df_dev = spark.read.table("lakefs.dev.company.workers")
     assert (df_main.schema == df_dev.schema) and (df_main.collect() == df_dev.collect()), "main and dev tables should be equal"
 
-def test_delete_on_dev_and_merge(spark, lfs_client, lakefs_repo):
-    lfs_client.branches.create_branch(lakefs_repo, BranchCreation(name="test1", source="main"))
-    print("1")
-    spark.sql("SELECT * FROM lakefs.test1.company.workers").show()
-    spark.sql("DELETE FROM lakefs.test1.company.workers WHERE id = 6")
-    print("2")
-    spark.sql("SELECT * FROM lakefs.test1.company.workers").show()
-    lfs_client.commits.commit(lakefs_repo, "test1", CommitCreation(message="delete one row"))
-    merge_output = lfs_client.refs.merge_into_branch(lakefs_repo, "test1", "main")
-    print(merge_output)
-    print("3")
-    spark.sql("SELECT * FROM lakefs.main.company.workers").show()
+
+def test_delete_on_dev_and_merge(spark, lfs_client: lakefs_sdk.client.LakeFSClient, lakefs_repo, storage_namespace):
+    repo_name = f"{lakefs_repo}_test2"
+    initiate_repo_with_data(spark, lfs_client, repo_name, storage_namespace, "test2")
+
+    lfs_client.branches_api.create_branch(repo_name, BranchCreation(name="test2", source="main"))
+    spark.sql("DELETE FROM lakefs.test2.company.workers WHERE id = 6")
+    lfs_client.commits_api.commit(repo_name, "test2", CommitCreation(message="delete one row"))
+    lfs_client.refs_api.merge_into_branch(repo_name, "test2", "main")
     df_main = spark.read.table("lakefs.main.company.workers")
-    df_dev = spark.read.table("lakefs.test1.company.workers")
-    assert (df_main.schema == df_dev.schema) and (df_main.collect() == df_dev.collect()), "main and test1 tables should be equal"
+    df_dev = spark.read.table("lakefs.test2.company.workers")
+    assert (df_main.schema == df_dev.schema) and (df_main.collect() == df_dev.collect()), "main and test2 tables should be equal"
 
-def test_multiple_changes_and_merge(spark, lfs_client, lakefs_repo):
-    df = get_data(spark)
-    df.write.saveAsTable("lakefs.main.company.workers")
+def test_multiple_changes_and_merge(spark, lfs_client: lakefs_sdk.client.LakeFSClient, lakefs_repo, storage_namespace):
+    repo_name = f"{lakefs_repo}_test3"
+    initiate_repo_with_data(spark, lfs_client, repo_name, storage_namespace, "test3")
 
-    lfs_client.commits.commit(lakefs_repo, "main", CommitCreation(message="Initial data load"))
-    lfs_client.branches.create_branch(lakefs_repo, BranchCreation(name="dev", source="main"))
-    spark.sql("DELETE FROM lakefs.dev.company.workers WHERE id = 6")
-    spark.sql("DELETE FROM lakefs.dev.company.workers WHERE id = 5")
-    spark.sql("INSERT INTO lakefs.dev.company.workers VALUES (7, 'Jhon', 'Smith', 33, 'M')")
-    spark.sql("DELETE FROM lakefs.dev.company.workers WHERE id = 4")
-    spark.sql("INSERT INTO lakefs.dev.company.workers VALUES (8, 'Marta', 'Green', 31, 'F')")
-    lfs_client.commits.commit(lakefs_repo, "dev", CommitCreation(message="Some changes"))
-    lfs_client.refs.merge_into_branch(lakefs_repo, "dev", "main")
+    lfs_client.branches_api.create_branch(repo_name, BranchCreation(name="test3", source="main"))
+    spark.sql("DELETE FROM lakefs.test3.company.workers WHERE id = 6")
+    spark.sql("DELETE FROM lakefs.test3.company.workers WHERE id = 5")
+    spark.sql("INSERT INTO lakefs.test3.company.workers VALUES (7, 'Jhon', 'Smith', 33, 'M')")
+    spark.sql("DELETE FROM lakefs.test3.company.workers WHERE id = 4")
+    spark.sql("INSERT INTO lakefs.test3.company.workers VALUES (8, 'Marta', 'Green', 31, 'F')")
+    lfs_client.commits_api.commit(repo_name, "test3", CommitCreation(message="Some changes"))
+    lfs_client.refs_api.merge_into_branch(repo_name, "test3", "main")
     df_main = spark.read.table("lakefs.main.company.workers")
-    df_dev = spark.read.table("lakefs.dev.company.workers")
+    df_dev = spark.read.table("lakefs.test3.company.workers")
     assert (df_main.schema == df_dev.schema) and (df_main.collect() == df_dev.collect()), "main and dev tables should be equal"
\ No newline at end of file
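
Note: the sketch below is not part of the patch. It only illustrates the per-test
repository pattern this change adopts, using the same lakefs_sdk 1.x calls that
appear in the diff (repositories_api.create_repository, commits_api.commit). The
repository name, storage namespace, and helper name are hypothetical.

import lakefs_sdk
from lakefs_sdk.client import LakeFSClient
from lakefs_sdk.models import RepositoryCreation, CommitCreation

# Assumed: a lakeFS server on localhost:8000 with the test credentials from conftest.py.
client = LakeFSClient(
    lakefs_sdk.Configuration(username='AKIAIOSFODNN7EXAMPLE',
                             password='wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY',
                             host='http://localhost:8000'))

def create_test_repo(client: LakeFSClient, base_repo: str, base_namespace: str, test_name: str) -> str:
    # Mirrors initiate_repo_with_data: one isolated repository per test,
    # each with its own storage namespace suffix, so tests cannot interfere.
    repo_name = f"{base_repo}_{test_name}"
    client.repositories_api.create_repository(
        RepositoryCreation(name=repo_name, storage_namespace=f"{base_namespace}/{test_name}"))
    return repo_name

# Hypothetical names, for illustration only.
repo = create_test_repo(client, 'iceberg-tests', 's3://example-bucket/namespaces', 'test1')
client.commits_api.commit(repo, 'main', CommitCreation(message='Initial data load'))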