diff --git a/piperider_cli/cli.py b/piperider_cli/cli.py index 6b7b0c394..09540d28a 100644 --- a/piperider_cli/cli.py +++ b/piperider_cli/cli.py @@ -395,6 +395,6 @@ def reconcile(**kwargs): if __name__ == '__main__': - # reconciler = Reconciler(engine=None) - # reconciler.reconcile(project='pipeline_v2') - compare_reports() \ No newline at end of file + reconciler = Reconciler(engine=None) + reconciler.reconcile(project='test_mock') + # compare_reports() \ No newline at end of file diff --git a/piperider_cli/configuration.py b/piperider_cli/configuration.py index 36dc2693b..e9a556bb1 100644 --- a/piperider_cli/configuration.py +++ b/piperider_cli/configuration.py @@ -218,7 +218,8 @@ def load(cls, piperider_config_path=PIPERIDER_CONFIG_PATH, piperider_reconcile_p if os.path.exists(piperider_reconcile_path): # logger.info(f'Loading reconcile rules from {piperider_reconcile_path}') with open(piperider_reconcile_path, 'r') as frecon: - r_config = yaml.safe_load(frecon) + yml=yaml.YAML(typ='safe', pure=True) + r_config = yml.load(frecon) r_projects = r_config.get('Reconciles') if r_projects: diff --git a/piperider_cli/reconciler/reconcile_rule.py b/piperider_cli/reconciler/reconcile_rule.py index 87f5f0a19..18e29ef31 100644 --- a/piperider_cli/reconciler/reconcile_rule.py +++ b/piperider_cli/reconciler/reconcile_rule.py @@ -23,12 +23,6 @@ class ReconcileSuite: target_join_key: str column_reconcile_rules: List[ColumnReconcileRule] description: str = None - # base_source: str - # target_source: str - # base_table: str - # target_table: str - # base_join_key: str - # target_join_key: str def validate(self): reasons = [] diff --git a/piperider_cli/reconciler/reconciler.py b/piperider_cli/reconciler/reconciler.py index c47fb02d2..5cedfd7d5 100644 --- a/piperider_cli/reconciler/reconciler.py +++ b/piperider_cli/reconciler/reconciler.py @@ -6,6 +6,7 @@ from rich.console import Console, escape from sqlalchemy.engine import Connection, Engine, create_engine from sqlalchemy import Table, Column, text +from snowflake.sqlalchemy.custom_types import TIMESTAMP_LTZ, TIMESTAMP_NTZ, TIMESTAMP_TZ from piperider_cli import datetime_to_str from piperider_cli.profiler.profiler import * @@ -79,8 +80,6 @@ def reconcile(self, output=None, report_dir=None, project=None) -> dict: # Validating console.rule("Validating") - # TODO: starts with single reconciliation if there are multiple rules - if project: r_project: ReconcileProject = configuration.get_reconcile_project(project) else: @@ -89,10 +88,7 @@ def reconcile(self, output=None, report_dir=None, project=None) -> dict: project_name = r_project.name project_description = r_project.description suites = r_project.suites - # rule_base_table = r_project.base_table - # rule_base_column = r_project.base_join_key - # rule_target_table = r_project.target_table - # rule_target_column = r_project.target_join_key + result["project"] = project_name result["description"] = project_description @@ -146,6 +142,7 @@ def reconcile(self, output=None, report_dir=None, project=None) -> dict: tables_dict = {} tables_for_profiling = [] + # To enable target and source from different database/schema if is_same_source: engine = create_engine(bds.to_database_url(), **bds.engine_args()) @@ -157,10 +154,13 @@ def reconcile(self, output=None, report_dir=None, project=None) -> dict: tables_for_profiling.append(base_table) tables_for_profiling.append(target_table) + base_table = tables_to_profile.get("base") + target_table = tables_to_profile.get("target") + profiler = Profiler( - engine, RichProfilerEventHandler(tables_to_profile.get("base").extend(tables_to_profile.get("target"))) + engine, RichProfilerEventHandler(base_table + target_table) ) - # try: + schema = inspect(engine).default_schema_name base_tables = [ProfileSubject(t, schema, t) for t in tables_to_profile.get("base")] @@ -175,10 +175,6 @@ def reconcile(self, output=None, report_dir=None, project=None) -> dict: tables_dict["base"] = [profiler._fetch_table_metadata(ProfileSubject(t, schema, t), reflecting_cache={}) for t in tables_to_profile.get("base")] tables_dict["target"] = [profiler._fetch_table_metadata(ProfileSubject(t, schema, t), reflecting_cache={}) for t in tables_to_profile.get("target")] - # tables_dict["target_table"] = profiler._fetch_table_metadata( - # ProfileSubject(target_table, schema, target_table), reflecting_cache={} - # ) - else: bengine = create_engine(bds.to_database_url(), **bds.engine_args()) tengine = create_engine(tds.to_database_url(), **tds.engine_args()) @@ -195,7 +191,6 @@ def reconcile(self, output=None, report_dir=None, project=None) -> dict: t_profiler_result = tprofilers.profile(target_tables) result["profiling"]["base"] = b_profiler_result.get("tables") result["profiling"]["target"] = t_profiler_result.get("tables") - # base_table, target_table = self._get_table_list(r_project) # TODO: differentiat table name with schema/database tables_dict["base"] = [bprofilers._fetch_table_metadata(t, reflecting_cache={}) for t in base_tables] @@ -207,13 +202,16 @@ def reconcile(self, output=None, report_dir=None, project=None) -> dict: # Reconciling console.rule("Reconciling") - - for suite in suites: + for idx, suite in enumerate(suites, 1): base_table: Table = list(filter(lambda x: x.name == suite.base_table, tables_dict.get('base')))[0] target_table: Table = list(filter(lambda x: x.name == suite.target_table, tables_dict.get('target')))[0] - base_col = base_table.columns[suite.base_join_key] - target_col = target_table.columns[suite.target_join_key] + # concatenation if it's a list + + console.print(f'[{idx}/{len(suites)}] suites: {suite.name}') + + base_col = suite.base_join_key + target_col = suite.target_join_key suite_result = { "name": suite.name, @@ -222,30 +220,13 @@ def reconcile(self, output=None, report_dir=None, project=None) -> dict: "description": suite.description, "base_table": base_table.name, "target_table": target_table.name, - "base_column": base_col.name, - "target_column:": target_col.name, + "base_column": base_col, + "target_column:": target_col, }, "tables": {}, "columns": {}, } - # tables_dict = {} - # for table in tables: - # tables_dict[table] = profiler._fetch_table_metadata(ProfileSubject(table, schema, table), reflecting_cache={}) - - # Use sqlalchmey Table type generated by Profiles - # base_table: Table = tables_dict["base_table"] - # target_table: Table = tables_dict["target_table"] - # base_col = base_table.columns[r_project.base_join_key] - # target_col = target_table.columns[r_project.target_join_key] - # rules = r_project.column_reconcile_rules - - # base_database = bds.database - # target_database = tds.database - # base_schema = bds.schema - # target_schema = tds.schema - - console.rule("Reconciling Tables Stats") trecon = self._reconcile_table( base_table, target_table, @@ -253,11 +234,8 @@ def reconcile(self, output=None, report_dir=None, project=None) -> dict: target_col, ) - # result["reconcile"]["tables"] = trecon suite_result["tables"].update(trecon) - console.rule("Reconciling Columns") - # rules = suite.column_reconcile_rules crecon = self._reconcile_column( base_table, target_table, @@ -270,26 +248,6 @@ def reconcile(self, output=None, report_dir=None, project=None) -> dict: result["reconcile"].append(suite_result) - # result["reconcile"]["columns"] = crecon - # result["reconcile"]["metadata"] = { - # "name": project_name, - # "description": project_description, - # "base_table": rule_base_table, - # "base_column": rule_base_column, - # "target_table": rule_target_table, - # "target_column": rule_target_column, - # "base_source": base_source, - # "target_source": target_source, - # # "base_database": base_database, - # # "target_database": target_database, - # # "base_schema": base_schema, - # # "target_schema": target_schema, - # } - # result["reconcile"]["name"] = project_name - # result["created_at"] = created_at.isoformat() - # result["id"] = run_id - # decorate_with_metadata(result["profiling"]) - console.rule("Summary") # TODO: Implement summary presentation funciton @@ -301,7 +259,6 @@ def reconcile(self, output=None, report_dir=None, project=None) -> dict: if output: clone_directory(output_path, output) - console.print_json() console.print(f"Reconcile report: {output_file}") with open(output_file, "w") as f: @@ -313,8 +270,8 @@ def _reconcile_table( self, base_table: Table, target_table: Table, - base_col: Column, - target_col: Column, + base_col: str, + target_col: str, base_database: str = None, target_database: str = None, ): @@ -344,8 +301,8 @@ def _reconcile_column( self, base_table: Table, target_table: Table, - base_column: Column, - target_column: Column, + base_column: str, + target_column: str, rules: List[ColumnReconcileRule], ): result = {} @@ -358,7 +315,7 @@ def _reconcile_column( index = 0 for rule in rules: # TODO: implement progress bar - print(f"[{index+1}/{len(rules)}] {rule.name}") + console.print(f" [{index+1}/{len(rules)}] {rule.name}") # Loop through base and target column data type to dispath to specific column reconciler @@ -424,9 +381,8 @@ def _reconcile_column( base_db, target_db, ) - elif isinstance(base_compare_key.type, (Date, DateTime)) and isinstance( - target_compare_key.type, (Date, DateTime) - ): + elif isinstance(base_compare_key.type, (Date, DateTime, TIMESTAMP_TZ, TIMESTAMP_LTZ, TIMESTAMP_NTZ)) and isinstance( + target_compare_key.type, (Date, DateTime, TIMESTAMP_TZ, TIMESTAMP_LTZ, TIMESTAMP_NTZ)): # DATE # DATETIME generic_type = "datetime" @@ -460,7 +416,7 @@ def _reconcile_column( target_db, ) elif base_compare_key.type != target_compare_key.type: - console.print(f'base type {base_compare_key.type} is mismatched with target type {target_compare_key.type}') + generic_type = f"mismatch: {base_compare_key.type} vs {target_compare_key.type}" reconciler = MismatchDataTypeColumnReconciler( self.bengine, base_table, @@ -473,7 +429,7 @@ def _reconcile_column( base_db, target_db, ) - else: + else: raise UnhandableColumnTypeError _result = reconciler.reconcile() @@ -498,8 +454,8 @@ def __init__( bengine: Engine, base_table: Table, target_table: Table, - base_col: Column, - target_col: Column, + base_col: str, + target_col: str, base_compare_col: Column, target_compare_col: Column, name: str, @@ -593,8 +549,8 @@ def __init__( engine: Engine, base_table: Table, target_table: Table, - base_col: Column, - target_col: Column, + base_col: str, + target_col: str, base_compare_col: Column, target_compare_col: Column, name: str, @@ -620,8 +576,8 @@ def reconcile(self) -> dict: # case insenstive: lower(bc) = lower(tc) # trim space: trim(lower(bc)) = trim(lower(bc)) - bkey = self.base_col.name - tkey = self.target_col.name + bkey = self.base_col + tkey = self.target_col bcompkey = self.base_compare_col.name tcompkey = self.target_compare_col.name @@ -657,9 +613,9 @@ def reconcile(self) -> dict: result = { "name": self.name, "base_table": self.base_table.name, - "base_column": self.base_col.name, + "base_column": self.base_col, "target_table": self.target_table.name, - "target_column": self.target_col.name, + "target_column": self.target_col, "base_compare_key": self.base_compare_col.name, "target_compare_key": self.target_compare_col.name, "total": _total, @@ -669,10 +625,10 @@ def reconcile(self) -> dict: "not_comparable": _not_comparable, "equal_case_insensitive": _equal_case_insensitive, "equal_trim_whitespace": _equal_trim_whitespace, - "equal_percentage": dtostr(round(_equal / _common, 4)), - "not_equal_percentage": dtostr(round(_not_equal / _common, 4)), - "equal_case_insensitive_percentage": dtostr(round(_equal_case_insensitive / _common, 4)), - "equal_trim_whitespace_percentage": dtostr(round(_equal_trim_whitespace / _common, 4)) + "equal_percentage": dtostr(round(_equal / _common, 4)) if _common != 0 else 0, + "not_equal_percentage": dtostr(round(_not_equal / _common, 4)) if _common != 0 else 0, + "equal_case_insensitive_percentage": dtostr(round(_equal_case_insensitive / _common, 4)) if _common != 0 else 0, + "equal_trim_whitespace_percentage": dtostr(round(_equal_trim_whitespace / _common, 4)) if _common != 0 else 0 } return result @@ -684,8 +640,8 @@ def __init__( engine: Engine, base_table: Table, target_table: Table, - base_col: Column, - target_col: Column, + base_col: str, + target_col: str, base_compare_col: Column, target_compare_col: Column, name: str, @@ -706,8 +662,8 @@ def __init__( ) def reconcile(self) -> dict: - bkey = self.base_col.name - tkey = self.target_col.name + bkey = self.base_col + tkey = self.target_col bcompkey = self.base_compare_col.name tcompkey = self.target_compare_col.name cte = super()._base_cte(bkey, bcompkey, tkey, tcompkey) @@ -753,9 +709,9 @@ def reconcile(self) -> dict: result = { "name": self.name, "base_table": self.base_table.name, - "base_column": self.base_col.name, + "base_column": self.base_col, "target_table": self.target_table.name, - "target_column": self.target_col.name, + "target_column": self.target_col, "base_compare_key": self.base_compare_col.name, "target_compare_key": self.target_compare_col.name, "total": _total, @@ -765,10 +721,10 @@ def reconcile(self) -> dict: "not_comparable": _not_comparable, "equal_within_5_difference": _equal_within_5_difference, "equal_within_10_difference": _equal_within_10_difference, - "equal_percentage": dtostr(round(_equal / _common, 4)), - "not_equal_percentage": dtostr(round(_not_equal / _common, 4)), - "equal_within_5_difference_percentage": dtostr(round(_equal_within_5_difference / _common, 4)), - "equal_within_10_difference_percentage": dtostr(round(_equal_within_10_difference / _common, 4)), + "equal_percentage": dtostr(round(_equal / _common, 4)) if _common != 0 else 0, + "not_equal_percentage": dtostr(round(_not_equal / _common, 4)) if _common != 0 else 0, + "equal_within_5_difference_percentage": dtostr(round(_equal_within_5_difference / _common, 4)) if _common != 0 else 0, + "equal_within_10_difference_percentage": dtostr(round(_equal_within_10_difference / _common, 4)) if _common != 0 else 0, } return result @@ -780,8 +736,8 @@ def __init__( engine: Engine, base_table: Table, target_table: Table, - base_col: Column, - target_col: Column, + base_col: str, + target_col: str, base_compare_col: Column, target_compare_col: Column, name: str, @@ -802,8 +758,8 @@ def __init__( ) def reconcile(self) -> dict: - bkey = self.base_col.name - tkey = self.target_col.name + bkey = self.base_col + tkey = self.target_col bcompkey = self.base_compare_col.name tcompkey = self.target_compare_col.name cte = super()._base_cte(bkey, bcompkey, tkey, tcompkey) @@ -850,9 +806,9 @@ def reconcile(self) -> dict: result = { "name": self.name, "base_table": self.base_table.name, - "base_column": self.base_col.name, + "base_column": self.base_col, "target_table": self.target_table.name, - "target_column": self.target_col.name, + "target_column": self.target_col, "base_compare_key": self.base_compare_col.name, "target_compare_key": self.target_compare_col.name, "total": _total, @@ -860,14 +816,14 @@ def reconcile(self) -> dict: "equal": _equal, "not_equal": _not_equal, "not_comparable": _not_comparable, - "equal_percentage": dtostr(round(_equal / _common, 4)), - "not_equal_percentage": dtostr(round(1 - _equal / _common, 4)), + "equal_percentage": dtostr(round(_equal / _common, 4)) if _common != 0 else 0, + "not_equal_percentage": dtostr(round(1 - _equal / _common, 4)) if _common != 0 else 0, "equal_within_1_day_difference": equal_within_1_day_difference, "equal_within_1_week_difference": equal_within_1_week_difference, "equal_within_1_month_difference": equal_within_1_month_difference, - "equal_within_1_day_difference_percentage": dtostr(round(equal_within_1_day_difference / _common, 4)), - "equal_within_1_week_difference_percentage": dtostr(round(equal_within_1_week_difference / _common, 4)), - "equal_within_1_month_difference_percentage": dtostr(round(equal_within_1_month_difference / _common, 4)), + "equal_within_1_day_difference_percentage": dtostr(round(equal_within_1_day_difference / _common, 4)) if _common != 0 else 0, + "equal_within_1_week_difference_percentage": dtostr(round(equal_within_1_week_difference / _common, 4)) if _common != 0 else 0, + "equal_within_1_month_difference_percentage": dtostr(round(equal_within_1_month_difference / _common, 4)) if _common != 0 else 0, } return result @@ -878,8 +834,8 @@ def __init__( engine: Engine, base_table: Table, target_table: Table, - base_col: Column, - target_col: Column, + base_col: str, + target_col: str, base_compare_col: Column, target_compare_col: Column, name: str, @@ -900,8 +856,8 @@ def __init__( ) def reconcile(self) -> dict: - bkey = self.base_col.name - tkey = self.target_col.name + bkey = self.base_col + tkey = self.target_col bcompkey = self.base_compare_col.name tcompkey = self.target_compare_col.name cte = super()._base_cte(bkey, bcompkey, tkey, tcompkey) @@ -926,17 +882,17 @@ def reconcile(self) -> dict: result = { "name": self.name, "base_table": self.base_table.name, - "base_column": self.base_col.name, + "base_column": self.base_col, "target_table": self.target_table.name, - "target_column": self.target_col.name, + "target_column": self.target_col, "base_compare_key": self.base_compare_col.name, "target_compare_key": self.target_compare_col.name, "total": _total, "equal": _equal, "not_equal": _not_equal, "not_comparable": _not_comparable, - "equal_percentage": dtostr(round(_equal / _common, 4)), - "not_equal_percentage": dtostr(round(1 - _equal / _common, 4)), + "equal_percentage": dtostr(round(_equal / _common, 4)) if _common != 0 else 0, + "not_equal_percentage": dtostr(round(1 - _equal / _common, 4)) if _common != 0 else 0, } return result @@ -952,8 +908,8 @@ def __init__( engine: Engine, base_table: Table, target_table: Table, - base_col: Column, - target_col: Column, + base_col: str, + target_col: str, base_compare_col: Column, target_compare_col: Column, name: str, @@ -1029,8 +985,8 @@ def _base_cte(self, bkey, bcompkey, tkey, tcompkey) -> str: def reconcile(self) -> dict: # Warning("Different type comparison not implemented yet") - bkey = self.base_col.name - tkey = self.target_col.name + bkey = self.base_col + tkey = self.target_col bcompkey = self.base_compare_col.name tcompkey = self.target_compare_col.name cte = super()._base_cte(bkey, bcompkey, tkey, tcompkey) @@ -1060,8 +1016,6 @@ def reconcile(self) -> dict: ) select * from stats """ - # console = Console() - # console.print(query) result = conn.execute(text(query)).fetchone() _total, _common, _equal, _not_equal, _not_comparable = result @@ -1069,17 +1023,17 @@ def reconcile(self) -> dict: result = { "name": self.name, "base_table": self.base_table.name, - "base_column": self.base_col.name, + "base_column": self.base_col, "target_table": self.target_table.name, - "target_column": self.target_col.name, + "target_column": self.target_col, "base_compare_key": self.base_compare_col.name, "target_compare_key": self.target_compare_col.name, "total": _total, "equal": _equal, "not_equal": _not_equal, "not_comparable": _not_comparable, - "equal_percentage": dtostr(round(_equal / _common, 4)), - "not_equal_percentage": dtostr(round(1 - _equal / _common, 4)), + "equal_percentage": dtostr(round(_equal / _common, 4)) if _common != 0 else 0, + "not_equal_percentage": dtostr(round(1 - _equal / _common, 4)) if _common != 0 else 0, } return result @@ -1088,8 +1042,8 @@ def reconcile_table_counts( engine: Engine, base_table: Table, target_table: Table, - base_col: Column, - target_col: Column, + base_col: str, + target_col: str, base_database: str = None, target_database: str = None, ) -> dict: @@ -1108,11 +1062,11 @@ def reconcile_table_counts( query = f""" with b as ( - select {base_col.name} as bid + select {base_col} as bid from {_base_table} ), t as ( - select {target_col.name} as tid + select {target_col} as tid from {_target_table} ), fjoin as ( diff --git a/tests/reconciler/mock.sql b/tests/reconciler/mock.sql new file mode 100644 index 000000000..4ea6dbb4d --- /dev/null +++ b/tests/reconciler/mock.sql @@ -0,0 +1,42 @@ +select current_role(); +use role data_dept_property_dev; + +create or replace table zz_chenxuan_rong_dev.sandbox.test_mock_1 ( + event_id VARCHAR(255), + customer_id VARCHAR(255), + event_date TIMESTAMP_LTZ, + state VARCHAR(255), + purchase_amount INT +); + +create or replace table zz_chenxuan_rong_dev.sandbox.test_mock_2 ( + event_id VARCHAR(255), + customer_id VARCHAR(255), + event_date TIMESTAMP_LTZ, + state VARCHAR(255), + purchase_amount INT +); + +-- Inserting mock data into the events table +INSERT INTO zz_chenxuan_rong_dev.sandbox.test_mock_1 (event_id, customer_id, event_date, state, purchase_amount) VALUES +('E00001', 'C00001', '2023-10-01 08:15:23', 'NY', 150), +('E00002', 'C00002', '2023-10-01 09:20:45', 'CA', 200), +('E00003', 'C00003', '2023-10-01 10:30:12', 'TX', 75), +('E00004', 'C00004', '2023-10-01 11:45:30', 'FL', 300), +('E00100', 'C00100', '2023-10-15 21:55:42', 'CA', 250); + + +INSERT INTO zz_chenxuan_rong_dev.sandbox.test_mock_2 (event_id, customer_id, event_date, state, purchase_amount) VALUES +('E00001', 'C00001', '2023-10-16 08:15:23', 'TX', 150), +('E00002', 'C00002', '2023-10-16 09:20:45', 'NY', 200), +('E00003', 'C00003', '2023-10-16 10:30:12', 'CA', 90), +('E00004', 'C00004', '2023-10-16 11:45:30', 'FL', 220), +('E00005', 'C00005', '2023-10-17 13:20:18', 'TX', 200), +('E00001', 'C00001', '2023-10-18 14:35:45', 'NY', 170), +('E00002', 'C00007', '2023-10-19 15:40:27', 'CA', 110), +('E00008', 'C00008', '2023-10-20 16:55:33', 'FL', 250), +('E01009', 'C00009', '2023-10-21 18:10:12', 'TX', 130), +('E01010', 'C00010', '2023-10-22 19:25:46', 'NY', 190); + +select * from zz_chenxuan_rong_dev.sandbox.test_mock_1; +select * from zz_chenxuan_rong_dev.sandbox.test_mock_2; diff --git a/tests/reconciler/mock_reconcile.yml b/tests/reconciler/mock_reconcile.yml index f9d4dfcd5..a05258427 100644 --- a/tests/reconciler/mock_reconcile.yml +++ b/tests/reconciler/mock_reconcile.yml @@ -9,10 +9,10 @@ Reconciles: description: Compare table in test1 vs test2 base: table: test1 - join_key: user_id + join_key: user_id || activity_id target: table: test2 - join_key: user_id + join_key: user_id || event_id rules: - name: user_name base_column: user_name diff --git a/tests/reconciler/test_reconciler.py b/tests/reconciler/test_reconciler.py index 9f458ab0c..1f2ab99ab 100644 --- a/tests/reconciler/test_reconciler.py +++ b/tests/reconciler/test_reconciler.py @@ -72,8 +72,8 @@ def test_load_reconcile_yml(self) -> None: assert r_suite.description == "Compare table in test1 vs test2" assert r_suite.base_table == "test1" assert r_suite.target_table == "test2" - assert r_suite.base_join_key == "user_id" - assert r_suite.target_join_key == "user_id" + assert r_suite.base_join_key == "user_id || activity_id" + assert r_suite.target_join_key == "user_id || event_id" r_rules = r_suite.column_reconcile_rules assert isinstance(r_rules, List) @@ -251,7 +251,7 @@ def test_reconcile_with_empty_string(self): assert result["equal_percentage"] == "50.00%" - @skip("Sqlite not implemented") + # @skip("Sqlite not implemented") def test_reconciler_e2e(self): reconciler = self.reconciler res = reconciler.reconcile(project="unittest")