diff --git a/src/validator/create_schemas.py b/src/validator/create_schemas.py index e582747b..725608f2 100644 --- a/src/validator/create_schemas.py +++ b/src/validator/create_schemas.py @@ -1,6 +1,8 @@ """Creates two DataFrameSchema objects by rendering the schema template with validations listed in phase 1 and phase 2.""" +import pandas as pd +from checks import SBLCheck from pandera import DataFrameSchema from pandera.errors import SchemaErrors from phase_validations import get_phase_1_and_2_validations_for_lei @@ -21,26 +23,68 @@ def get_schema_by_phase_for_lei(template: dict, phase: str, lei: str = None): def print_schema_errors(errors: SchemaErrors, phase: str): - for error in errors.schema_errors: - # Name of the column in the dataframe being checked - schema_error = error["error"] + findings = [] + print("Validation failed for phase: " + phase + ":") + for schema_error in errors.schema_errors: + error = schema_error["error"] + check: SBLCheck = error.check + column_name = error.schema.name check_id = "n/a" - # built in checks such as unique=True are different than custom - # checks unfortunately so the name needs to be accessed differently - try: - check_name = schema_error.check.name - check_id = schema_error.check.id + fields: list[str] = [column_name] + + if hasattr(check, "name"): + check_name: str = check.name + + if check.groupby: + fields += check.groupby # type: ignore + # This will either be a boolean series or a single bool - check_output = schema_error.check_output - except AttributeError: - check_name = schema_error.check - # this is just a string that we'd need to parse manually - check_output = schema_error.args[0] - - print(f"{phase} Validation `{check_name}` with id: `{check_id}` failed for column `{{column_name}}`") - print(check_output) - print("") + check_output = error.check_output + else: + # This means this check's column has unique set to True. + # we shouldn't be using Unique flag as it doesn't return series of + # validation result . it returns just a printout result string/txt + raise AttributeError(f"{str(check)}") + if hasattr(check, "id"): + check_id = schema_error.check.id + + # Remove duplicates, but keep as `list` for JSON-friendliness + fields = list(set(fields)) + + if check_output is not None: + # `check_output` must be sorted so its index lines up with `df`'s index + check_output.sort_index(inplace=True) + + # Filter records using Pandas's boolean indexing, where all False values + # get filtered out. The `~` does the inverse since it's actually the + # False values we want to keep. + # http://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#boolean-indexing + failed_check_fields_df = df[~check_output][fields].fillna("") + + # Create list of dicts representing the failed validations and the + # associated field data for each invalid record. + records = [] + for idx, row in failed_check_fields_df.iterrows(): + record = {"number": idx + 1, "field_values": {}} + for field in fields: + record["field_values"][field] = row[field] + records.append(record) + + validation_findings = { + "validation": { + "id": check_id, + "name": check_name, + "description": check.description, + "fields": fields, + "severity": "warning" if check.warning else "error", + }, + "records": records, + } + + findings.append(validation_findings) + + return findings def get_phase_1_schema_for_lei(lei: str = None):