diff --git a/docs/se_diagrams/flow.png b/docs/se_diagrams/flow.png index dbff58f..ac8d6c3 100644 Binary files a/docs/se_diagrams/flow.png and b/docs/se_diagrams/flow.png differ diff --git a/spark_expectations/examples/base_setup.py b/spark_expectations/examples/base_setup.py index 89081b0..4a78d7c 100644 --- a/spark_expectations/examples/base_setup.py +++ b/spark_expectations/examples/base_setup.py @@ -27,18 +27,18 @@ RULES_DATA = """ - ("your_product", "dq_spark_local.customer_order", "row_dq", "sales_greater_than_zero", "sales", "sales > 2000", "ignore", "accuracy", "sales value should be greater than zero", false, true, true, false, 0,null, null) + ("your_product", "dq_spark_dev.customer_order", "row_dq", "sales_greater_than_zero", "sales", "sales > 2", "ignore", "accuracy", "sales value should be greater than zero", false, true, true, false, 0,null, null) ,("your_product", "dq_spark_{env}.customer_order", "row_dq", "discount_threshold", "discount", "discount*100 < 60","drop", "validity", "discount should be less than 40", true, true, true, false, 0,null, null) ,("your_product", "dq_spark_{env}.customer_order", "row_dq", "ship_mode_in_set", "ship_mode", "lower(trim(ship_mode)) in('second class', 'standard class', 'standard class')", "drop", "validity", "ship_mode mode belongs in the sets", true, true, true, false, 0,null, null) - ,("your_product", "dq_spark_{env}.customer_order", "row_dq", "profit_threshold", "profit", "profit>0", "ignore", "validity", "profit threshold should be greater tahn 0", false, true, true, true, 0,null, null) - ,("your_product", "dq_spark_local.customer_order", "agg_dq", "sum_of_sales_range type 1", "sales", "sum(sales)>99 and sum(sales)<99999", "ignore", "validity", "regex format validation for quantity", true, true, true, false, 0, null, true) - ,("your_product", "dq_spark_local.customer_order", "agg_dq", "sum_of_sales_range type 2", "sales", "sum(sales) between 100 and 10000 ", "ignore", "validity", "regex format validation for quantity", true, true, true, false, 0, null, true) - ,("your_product", "dq_spark_local.customer_order", "agg_dq", "sum_of_sales", "sales", "sum(sales)>10000", "ignore", "validity", "regex format validation for quantity", true, true, true, false, 0,null, null) - ,("your_product", "dq_spark_local.customer_order", "agg_dq", "sum_of_quantity", "quantity", "sum(quantity)>10000", "ignore", "validity", "regex format validation for quantity", true, true, true, false, 0,null, null) - ,("your_product", "dq_spark_local.customer_order", "query_dq", "product_missing_count_threshold", "*", "((select count(*) from ({source_f1}) a) - (select count(*) from ({target_f1}) b) ) < 3$source_f1$select distinct product_id,order_id from order_source$target_f1$select distinct product_id,order_id from order_target", "ignore", "validity", "row count threshold", true, true, true, false, 0,null, true) - ,("your_product", "dq_spark_local.customer_order", "query_dq", "customer_missing_count_threshold","*", "((select count(*) from ({source_f1}) a join ({source_f2}) b on a.customer_id = b.customer_id) - (select count(*) from ({target_f1}) a join ({target_f2}) b on a.customer_id = b.customer_id)) > ({target_f3})$source_f1$select customer_id, count(*) from customer_source group by customer_id$source_f2$select customer_id, count(*) from order_source group by customer_id$target_f1$select customer_id, count(*) from customer_target group by customer_id$target_f2$select customer_id, count(*) from order_target group by customer_id$target_f3$select count(*) from order_source", "ignore", "validity", "customer count threshold", true, true, true, false, 0,null, true) + ,("your_product", "dq_spark_{env}.customer_order", "row_dq", "profit_threshold", "profit", "profit>0", "ignore", "validity", "profit threshold should be greater tahn 0", false, true, false, true, 0,null, null) + ,("your_product", "dq_spark_dev.customer_order", "agg_dq", "sum_of_sales_range type 1", "sales", "sum(sales)>99 and sum(sales)<99999", "ignore", "validity", "regex format validation for quantity", true, true, true, false, 0, null, true) + ,("your_product", "dq_spark_dev.customer_order", "agg_dq", "sum_of_sales_range type 2", "sales", "sum(sales) between 100 and 10000 ", "ignore", "validity", "regex format validation for quantity", true, true, true, false, 0, null, true) + ,("your_product", "dq_spark_dev.customer_order", "agg_dq", "sum_of_sales", "sales", "sum(sales)>10000", "ignore", "validity", "regex format validation for quantity", true, true, true, false, 0,null, null) + ,("your_product", "dq_spark_dev.customer_order", "agg_dq", "sum_of_quantity", "quantity", "sum(quantity)>10000", "ignore", "validity", "regex format validation for quantity", true, true, true, false, 0,null, null) + ,("your_product", "dq_spark_dev.customer_order", "query_dq", "product_missing_count_threshold", "*", "((select count(*) from ({source_f1}) a) - (select count(*) from ({target_f1}) b) ) < 3@source_f1@select distinct product_id,order_id from order_source@target_f1@select distinct product_id,order_id from order_target", "ignore", "validity", "row count threshold", true, true, true, false, 0,null, true) + ,("your_product", "dq_spark_dev.customer_order", "query_dq", "customer_missing_count_threshold","*", "((select count(*) from ({source_f1}) a join ({source_f2}) b on a.customer_id = b.customer_id) - (select count(*) from ({target_f1}) a join ({target_f2}) b on a.customer_id = b.customer_id)) > ({target_f3})@source_f1@select customer_id, count(*) from customer_source group by customer_id@source_f2@select customer_id, count(*) from order_source group by customer_id@target_f1@select customer_id, count(*) from customer_target group by customer_id@target_f2@select customer_id, count(*) from order_target group by customer_id@target_f3@select count(*) from order_source", "ignore", "validity", "customer count threshold", true, true, true, false, 0,null, true) ,("your_product", "dq_spark_dev.customer_order", "query_dq", "order_count_validity", "*", "({source_f1}) > 10@source_f1@select count(*) from order_source", "ignore", "validity", "row count threshold", true, true, true, false, 0, "@", true) - ,("your_product", "dq_spark_local.customer_order", "query_dq", "order_count_validity_check", "*", "(select count(*) from order_source) > 10", "ignore", "validity", "row count threshold", true, true, true, false, 0, null, true) + ,("your_product", "dq_spark_dev.customer_order", "query_dq", "order_count_validity_check", "*", "(select count(*) from order_source) > 10", "ignore", "validity", "row count threshold", true, true, true, false, 0, null, true) ,("your_product", "dq_spark_{env}.customer_order", "query_dq", "product_category", "*", "(select count(distinct category) from {table}) < 5", "ignore", "validity", "distinct product category", true, true, true, false, 0,null, true) ,("your_product", "dq_spark_{env}.customer_order", "agg_dq", "distinct_of_ship_mode", "ship_mode", "count(distinct ship_mode) <= 3", "ignore", "validity", "regex format validation for quantity", true, true, true, false, 0,null, null) diff --git a/spark_expectations/sinks/utils/writer.py b/spark_expectations/sinks/utils/writer.py index bd3787c..67d93c5 100644 --- a/spark_expectations/sinks/utils/writer.py +++ b/spark_expectations/sinks/utils/writer.py @@ -855,8 +855,8 @@ def write_error_records_final( ) _error_count = error_df.count() - if _error_count > 0: - self.generate_summarized_row_dq_res(error_df, rule_type) + # if _error_count > 0: + self.generate_summarized_row_dq_res(error_df, rule_type) _log.info("_write_error_records_final ended") return _error_count, df