Skip to content

Commit

Permalink
Add tests for the template
Browse files Browse the repository at this point in the history
  • Loading branch information
sbuldeev committed Nov 27, 2024
1 parent 98c0a3d commit 077c082
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 47 deletions.
Original file line number Diff line number Diff line change
@@ -1,63 +1,51 @@
#TODO Align with the upsert tempalte
# Copyright 2023-2024 Broadcom
# SPDX-License-Identifier: Apache-2.0
"""
Load example input data for an scd1 template test.
"""
from vdk.api.job_input import IJobInput


def run(job_input: IJobInput) -> None:
target_schema = job_input.get_arguments().get("target_schema")
target_table = job_input.get_arguments().get("target_table")
source_schema = job_input.get_arguments().get("source_schema")
source_view = job_input.get_arguments().get("source_view")
# Step 1: create a table that represents the current state

source_composite_name = f'"{source_schema}"."{source_view}"'
target_composite_name = f'"{target_schema}"."{target_table}"'
source_data_composite_name = f'"{source_schema}"."{source_view}_data"'

# Step 1: create a new table that represents the current state
job_input.execute_query(
f"""
DROP TABLE IF EXISTS {target_composite_name}
"""
DROP TABLE IF EXISTS {target_schema}.{target_table}
"""
)
job_input.execute_query(
f"""
CREATE TABLE IF NOT EXISTS {target_composite_name} (
"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
org_id INT,
org_name VARCHAR,
org_type VARCHAR,
company_name VARCHAR,
sddc_limit INT,
org_host_limit INT
)
"""
"""
)
job_input.execute_query(
f"""
INSERT INTO {target_composite_name} VALUES
"""
INSERT INTO {target_schema}.{target_table} VALUES
(2, '[email protected]' , 'CUSTOMER_POC' , 'VMware' , 1, 6 ),
(3, '[email protected]', 'CUSTOMER' , 'Goofy''s' , 2, 16),
(3, '[email protected]', 'CUSTOMER' , 'Goofy''s' , 2, 16),
(4, '[email protected]' , 'PARTNER_SISO' , 'Uncanny Company' , 2, 16),
(5, '[email protected]' , 'CUSTOMER' , 'VMware' , 2, 32),
(6, '[email protected]' , 'CUSTOMER' , 'PharmaMed' , 1, 32),
(7, '[email protected]' , 'PARTNER_SISO' , 'ACME' , 1, 32),
(8, '[email protected]' , 'INTERNAL_CORE' , 'VMware' , 4, 32)
"""
"""
)

# Step 2: create a new table that represents the next state
# Step 2: create a table that represents the data that will be upserted

job_input.execute_query(
f"""
DROP TABLE IF EXISTS {source_composite_name}
"""
"""
DROP TABLE IF EXISTS {source_schema}.{source_view}
"""
)

job_input.execute_query(
f"""
CREATE TABLE IF NOT EXISTS {source_data_composite_name} (
"""
CREATE TABLE IF NOT EXISTS {source_schema}.{source_view} (
org_id INT,
org_name VARCHAR,
org_type VARCHAR,
Expand All @@ -68,27 +56,44 @@ def run(job_input: IJobInput) -> None:
"""
)
job_input.execute_query(
f"""
INSERT INTO {source_data_composite_name} VALUES
(1, '[email protected]' , 'CUSTOMER_MSP_TENANT', 'actual Master Org', 2, 32),
(2, '[email protected]' , 'CUSTOMER_POC' , 'VMware' , 1, 6 ),
(3, '[email protected]', 'CUSTOMER' , 'Goofy''s' , 2, 32),
(4, '[email protected]' , 'PARTNER_SISO' , 'Uncanny Company' , 2, 32),
(5, '[email protected]' , 'CUSTOMER' , 'VMware' , 2, 32),
(6, '[email protected]' , 'CUSTOMER' , 'PharmaMed' , 2, 32),
(7, '[email protected]' , 'PARTNER_SISO' , 'ACME' , 2, 32),
(8, '[email protected]' , 'INTERNAL_CORE' , 'VMware' , 2, 32)
"""
"""INSERT INTO {source_schema}.{source_view} VALUES
(7, '[email protected]' , 'CUSTOMER' , 'ACME' , 1, 32),
(8, '[email protected]' , 'CUSTOMER' , 'VMware' , 4, 32),
(9, '[email protected]' , 'CUSTOMER_POC' , 'VMware' , 1, 6 ),
(10, '[email protected]' , 'CUSTOMER' , 'Goofy''s' , 2, 16)
"""
)

# Step 3: Create a table containing the state expected after upserting the target table with the source table data

job_input.execute_query(
f"""
DROP VIEW IF EXISTS {source_composite_name}
"""
DROP TABLE IF EXISTS {expect_schema}.{expect_table}
"""
)
job_input.execute_query(
"""
CREATE TABLE IF NOT EXISTS {expect_schema}.{expect_table} (
org_id INT,
org_name VARCHAR,
org_type VARCHAR,
company_name VARCHAR,
sddc_limit INT,
org_host_limit INT
)
"""
)

job_input.execute_query(
f"""
CREATE VIEW {source_composite_name} AS (SELECT * FROM {source_data_composite_name})
"""
INSERT INTO {expect_schema}.{expect_table} VALUES
(2, '[email protected]' , 'CUSTOMER_POC' , 'VMware' , 1, 6 ),
(3, '[email protected]', 'CUSTOMER' , 'Goofy''s' , 2, 16),
(4, '[email protected]' , 'PARTNER_SISO' , 'Uncanny Company' , 2, 16),
(5, '[email protected]' , 'CUSTOMER' , 'VMware' , 2, 32),
(6, '[email protected]' , 'CUSTOMER' , 'PharmaMed' , 1, 32),
(7, '[email protected]' , 'CUSTOMER' , 'ACME' , 1, 32),
(8, '[email protected]' , 'CUSTOMER' , 'VMware' , 4, 32),
(9, '[email protected]' , 'CUSTOMER_POC' , 'VMware' , 1, 6 ),
(10, '[email protected]', 'CUSTOMER' , 'Goofy''s' , 2, 16)
"""
)
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

def run(job_input: IJobInput) -> None:
result = job_input.execute_template(
template_name="scd1",
template_name="scd1_upsert",
template_args=job_input.get_arguments(),
database="trino",
)
Expand Down
83 changes: 82 additions & 1 deletion projects/vdk-plugins/vdk-trino/tests/test_vdk_templates.py
Original file line number Diff line number Diff line change
Expand Up @@ -631,8 +631,89 @@ def test_insert_clean_staging(self) -> None:
second_exec,
f"Clean up of staging table - {staging_table_name} is not made properly. Different data was found in the table after consecutive executions.",
)


##TODO Add testing logic for upsert template here
def test_scd_upsert(self) -> None:
test_schema = self.__schema
source_view = "vw_dim_test_scd_upsert"
target_table = "dw_dim_test_scd_upsert"
expect_table = "ex_dim_test_scd_upsert"
id_column = "org_id"

res = self.__scd_upsert_execute(
test_schema, source_view, target_table, expect_table, id_column
)

cli_assert(not res.exception, res)

actual_rs = self.__trino_query(f"SELECT * FROM {test_schema}.{target_table}")
expected_rs = self.__trino_query(f"SELECT * FROM {test_schema}.{expect_table}")
assert actual_rs.output and expected_rs.output

actual = {x for x in actual_rs.output.split("\n")}
expected = {x for x in expected_rs.output.split("\n")}

self.assertSetEqual(
actual, expected, f"Elements in {expect_table} and {target_table} differ."
)


def __scd_upsert_execute(
self,
test_schema,
source_view,
target_table,
expect_table,
id_column,
check=False,
staging_schema=None,
):
if check != False and staging_schema is not None:
return self.__runner.invoke(
[
"run",
get_test_job_path(
pathlib.Path(os.path.dirname(os.path.abspath(__file__))),
"load_dimension_scd_upsert_template_job",
),
"--arguments",
json.dumps(
{
"source_schema": test_schema,
"source_view": source_view,
"target_schema": test_schema,
"target_table": target_table,
"expect_schema": test_schema,
"expect_table": expect_table,
"id_column": id_column,
"check": check,
"staging_schema": staging_schema,
}
),
]
)
else:
return self.__runner.invoke(
[
"run",
get_test_job_path(
pathlib.Path(os.path.dirname(os.path.abspath(__file__))),
"load_dimension_scd_upsert_template_job",
),
"--arguments",
json.dumps(
{
"source_schema": test_schema,
"source_view": source_view,
"target_schema": test_schema,
"target_table": target_table,
"expect_schema": test_schema,
"expect_table": expect_table,
"id_column": id_column,
}
),
]
)

def __fact_insert_template_execute(
self,
Expand Down

0 comments on commit 077c082

Please sign in to comment.