-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
added pytest #36
added pytest #36
Changes from 17 commits
d024cbc
78580e0
d424a4e
1218700
e09e564
7e22484
d296829
93972c6
61c0fce
4f61fd3
1efbda9
e8c42d2
3da1db0
5c77c91
04d6ad9
a35a727
33c78d9
8262468
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,64 @@ | ||||||
import pyspark | ||||||
from pyspark.sql import SparkSession | ||||||
from pyspark.conf import SparkConf | ||||||
|
||||||
import pytest | ||||||
|
||||||
import lakefs_sdk | ||||||
from lakefs_sdk.client import LakeFSClient | ||||||
from lakefs_sdk.models import * | ||||||
|
||||||
LAKEFS_ACCESS_KEY = 'AKIAIOSFODNN7EXAMPLE' | ||||||
LAKEFS_SECRET_KEY = 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY' | ||||||
MOCK_EMAIL = "[email protected]" | ||||||
|
||||||
def pytest_addoption(parser): | ||||||
parser.addoption( | ||||||
'--storage_namespace', action='store', default='local://' | ||||||
) | ||||||
parser.addoption( | ||||||
'--repository', action='store', default='example' | ||||||
) | ||||||
|
||||||
|
||||||
@pytest.fixture | ||||||
def lakefs_repo(request): | ||||||
return request.config.getoption('--repository') | ||||||
|
||||||
|
||||||
@pytest.fixture(scope="session") | ||||||
def spark(pytestconfig): | ||||||
repo_name = pytestconfig.getoption('--repository') | ||||||
spark_config = SparkConf() | ||||||
spark_config.set("spark.sql.catalog.lakefs", "org.apache.iceberg.spark.SparkCatalog") | ||||||
spark_config.set("spark.sql.catalog.lakefs.catalog-impl", "io.lakefs.iceberg.LakeFSCatalog") | ||||||
spark_config.set("spark.sql.catalog.lakefs.warehouse", f"lakefs://{repo_name}") | ||||||
spark_config.set("spark.sql.catalog.lakefs.cache-enabled", "false") | ||||||
spark_config.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") | ||||||
spark_config.set("spark.hadoop.fs.s3a.endpoint", "http://localhost:8000") | ||||||
spark_config.set("spark.hadoop.fs.s3a.access.key", LAKEFS_ACCESS_KEY) | ||||||
spark_config.set("spark.hadoop.fs.s3a.secret.key", LAKEFS_SECRET_KEY) | ||||||
spark_config.set("spark.hadoop.fs.s3a.path.style.access", "true") | ||||||
spark_config.set("spark.jars.packages", "io.lakefs:lakefs-iceberg:1.0-SNAPSHOT,org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.0,org.apache.hadoop:hadoop-aws:3.3.4,org.apache.hadoop:hadoop-client-api:3.3.4") | ||||||
|
||||||
spark = SparkSession.builder.config(conf=spark_config).getOrCreate() | ||||||
yield spark | ||||||
spark.stop() | ||||||
|
||||||
|
||||||
@pytest.fixture(scope="session") | ||||||
def lfs_client(pytestconfig): | ||||||
lfs_client = LakeFSClient( | ||||||
lakefs_sdk.Configuration(username=LAKEFS_ACCESS_KEY, | ||||||
password=LAKEFS_SECRET_KEY, | ||||||
host='http://localhost:8000')) | ||||||
|
||||||
# Setup lakeFS | ||||||
repo_name = pytestconfig.getoption('--repository') | ||||||
storage_namespace = pytestconfig.getoption('--storage_namespace') | ||||||
lfs_client.internal_api.setup_comm_prefs(CommPrefsInput(feature_updates=False, security_updates=False, email=MOCK_EMAIL)) | ||||||
lfs_client.internal_api.setup(Setup(username="lynn", | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
key=AccessKeyCredentials(access_key_id=LAKEFS_ACCESS_KEY, secret_access_key=LAKEFS_SECRET_KEY))) | ||||||
lfs_client.repositories_api.create_repository( | ||||||
RepositoryCreation(name=repo_name, storage_namespace=storage_namespace)) | ||||||
return lfs_client | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add line break at the end |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,3 @@ | ||
lakefs_client==0.104.0 | ||
pyspark==3.3.2 | ||
lakefs_sdk==1.1.0 | ||
pyspark==3.3.2 | ||
pytest==7.4.0 |
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
import lakefs_sdk.client | ||
from lakefs_sdk.models import * | ||
from pyspark.sql.types import StructType, StructField, StringType, IntegerType | ||
|
||
|
||
def get_data(spark): | ||
data_set = [(1, "James", "Smith", 32, "M"), | ||
(2, "Michael","Rose", 35, "M"), | ||
(3, "Robert", "Williams", 41, "M"), | ||
(4, "Maria", "Jones", 36, "F"), | ||
(5, "Jen","Brown", 44, "F"), | ||
(6, "Monika","Geller", 31, "F")] | ||
|
||
schema = StructType([StructField("id", StringType(), True), | ||
StructField("firstname", StringType(), True), | ||
StructField("lastname", StringType(), True), | ||
StructField("age", IntegerType(), True), | ||
StructField("gender", StringType(), True), | ||
]) | ||
df = spark.createDataFrame(data=data_set,schema=schema) | ||
return df | ||
|
||
|
||
def test_diff_two_same_branches(spark, lfs_client: lakefs_sdk.client.LakeFSClient, lakefs_repo): | ||
df = get_data(spark) | ||
df.write.saveAsTable("lakefs.main.company.workers") | ||
lfs_client.commits_api.commit(lakefs_repo, "main", CommitCreation(message="Initial data load")) | ||
|
||
#Create a new branch, check that the tables are the same | ||
lfs_client.branches_api.create_branch(lakefs_repo, BranchCreation(name="dev", source="main")) | ||
df_main = spark.read.table("lakefs.main.company.workers") | ||
df_dev = spark.read.table("lakefs.dev.company.workers") | ||
assert (df_main.schema == df_dev.schema) and (set(df_main.collect()) == set(df_dev.collect())), "main and dev tables should be equal" | ||
|
||
|
||
def test_delete_on_dev_and_merge(spark, lfs_client: lakefs_sdk.client.LakeFSClient, lakefs_repo): | ||
lfs_client.branches_api.create_branch(lakefs_repo, BranchCreation(name="test1", source="main")) | ||
lfs_client.branches_api.create_branch(lakefs_repo, BranchCreation(name="test2", source="test1")) | ||
spark.sql("DELETE FROM lakefs.test2.company.workers WHERE id = 6") | ||
lfs_client.commits_api.commit(lakefs_repo, "test2", CommitCreation(message="delete one row")) | ||
lfs_client.refs_api.merge_into_branch(lakefs_repo, "test2", "test1") | ||
df_source = spark.read.table("lakefs.test1.company.workers") | ||
df_dest = spark.read.table("lakefs.test2.company.workers") | ||
assert (df_source.schema == df_dest.schema) and (set(df_source.collect()) == set(df_dest.collect())), "test1 and test2 tables should be equal" | ||
|
||
|
||
def test_multiple_changes_and_merge(spark, lfs_client: lakefs_sdk.client.LakeFSClient, lakefs_repo): | ||
lfs_client.branches_api.create_branch(lakefs_repo, BranchCreation(name="test3", source="main")) | ||
lfs_client.branches_api.create_branch(lakefs_repo, BranchCreation(name="test4", source="test3")) | ||
spark.sql("DELETE FROM lakefs.test4.company.workers WHERE id = 6") | ||
spark.sql("DELETE FROM lakefs.test4.company.workers WHERE id = 5") | ||
spark.sql("INSERT INTO lakefs.test4.company.workers VALUES (7, 'Jhon', 'Smith', 33, 'M')") | ||
spark.sql("DELETE FROM lakefs.test4.company.workers WHERE id = 4") | ||
spark.sql("INSERT INTO lakefs.test4.company.workers VALUES (8, 'Marta', 'Green', 31, 'F')") | ||
lfs_client.commits_api.commit(lakefs_repo, "test4", CommitCreation(message="Some changes")) | ||
lfs_client.refs_api.merge_into_branch(lakefs_repo, "test4", "test3") | ||
df_source = spark.read.table("lakefs.test3.company.workers") | ||
df_dest = spark.read.table("lakefs.test4.company.workers") | ||
assert (df_source.schema == df_dest.schema) and (set(df_source.collect()) == set(df_dest.collect())), "test3 and test4 tables should be equal" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. linebreak |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove spaces