diff --git a/Lambda_Dockerfile b/Lambda_Dockerfile
new file mode 100644
index 0000000..bd53938
--- /dev/null
+++ b/Lambda_Dockerfile
@@ -0,0 +1,13 @@
+FROM --platform=linux/amd64 public.ecr.aws/lambda/python:3.12
+
+RUN dnf install -y git
+
+# Install from the lambda-specific requirements file added in this change
+COPY lambda_requirements.txt ${LAMBDA_TASK_ROOT}/requirements.txt
+
+RUN pip install -r requirements.txt --target "${LAMBDA_TASK_ROOT}"
+
+COPY src/ ${LAMBDA_TASK_ROOT}
+
+# Pass the name of the function handler as an argument to the runtime
+CMD [ "regtech_data_validator.lambda_wrapper.lambda_handler" ]
diff --git a/lambda_requirements.txt b/lambda_requirements.txt
new file mode 100644
index 0000000..50bdb1c
--- /dev/null
+++ b/lambda_requirements.txt
@@ -0,0 +1,13 @@
+polars
+awslambdaric
+pandera
+ujson
+boto3
+tabulate
+fsspec
+s3fs
+sqlalchemy
+pydantic
+psycopg2-binary
+pyarrow
+sbl_filing_api@git+https://github.com/cfpb/sbl-filing-api
diff --git a/src/regtech_data_validator/data_formatters.py b/src/regtech_data_validator/data_formatters.py
index 9faa705..ec10425 100644
--- a/src/regtech_data_validator/data_formatters.py
+++ b/src/regtech_data_validator/data_formatters.py
@@ -1,5 +1,4 @@
 import ujson
-import pandas as pd
 import polars as pl
 
 from tabulate import tabulate
@@ -17,7 +16,7 @@ def find_check(group_name, checks):
 # which corresponds to severity, error/warning code, name of error/warning, row number in sblar, UID, fig link,
 # error/warning description (markdown formatted), single/multi/register, and the fields and values associated with the error/warning.
 # Each row in the final dataframe represents all data for that one finding.
-def format_findings(df: pd.DataFrame, checks):
+def format_findings(df: pl.DataFrame, checks):
     final_df = pl.DataFrame()
 
     sorted_df = df.with_columns(pl.col('validation_id').cast(pl.Categorical(ordering='lexical'))).sort('validation_id')
@@ -39,8 +38,8 @@ def format_findings(df: pl.DataFrame, checks):
         )
         df_pivot.columns = [
             (
-                col.replace('field_name_field_number_', 'field_').replace('field_value_field_number_', 'value_')
-                if ('field_name_field_number_' in col or 'field_value_field_number_' in col)
+                col.replace('field_name_', 'field_').replace('field_value_', 'value_')
+                if ('field_name_' in col or 'field_value_' in col)
                 else col
             )
             for col in df_pivot.columns
@@ -65,7 +64,6 @@ def format_findings(df: pl.DataFrame, checks):
         field_columns = [col for col in df_pivot.columns if col.startswith('field_')]
         value_columns = [col for col in df_pivot.columns if col.startswith('value_')]
         sorted_columns = [col for pair in zip(field_columns, value_columns) for col in pair]
-
         # swap two-field errors/warnings to keep order of FIG
         if len(field_columns) == 2:
             df_pivot = df_pivot.with_columns(
@@ -94,7 +92,7 @@ def format_findings(df: pl.DataFrame, checks):
     return final_df
 
 
-def df_to_download(df: pd.DataFrame, report_name: str = "download_report.csv"):
+def df_to_download(df: pl.DataFrame, report_name: str = "download_report.csv"):
     if df.is_empty():
         # return headers of csv for 'emtpy' report
         pl.DataFrame(
diff --git a/src/regtech_data_validator/lambda_wrapper.py b/src/regtech_data_validator/lambda_wrapper.py
new file mode 100644
index 0000000..32d61f9
--- /dev/null
+++ b/src/regtech_data_validator/lambda_wrapper.py
@@ -0,0 +1,12 @@
+from urllib.parse import unquote_plus
+
+from regtech_data_validator.service import service_validate
+
+
+def lambda_handler(event, context):
+    """Entry point for the S3-upload-triggered validation lambda."""
+    record = event['Records'][0]['s3']
+    bucket = record['bucket']['name']
+    # S3 event notification keys are URL-encoded (spaces arrive as '+'); decode before use
+    file = unquote_plus(record['object']['key'])
+    service_validate(bucket, file)
diff --git a/src/regtech_data_validator/service.py b/src/regtech_data_validator/service.py
new file mode 100644
index 0000000..cafa33d
--- /dev/null
+++ b/src/regtech_data_validator/service.py
@@ -0,0 +1,88 @@
+import logging
+import os
+
+import polars as pl
+from fsspec import AbstractFileSystem, filesystem
+from pydantic import PostgresDsn
+from sqlalchemy import create_engine
+from sqlalchemy.orm import sessionmaker
+from urllib import parse
+
+from regtech_data_validator.validator import validate_batch_csv, ValidationSchemaError
+from sbl_filing_api.entities.models.dao import SubmissionDAO
+from sbl_filing_api.entities.models.model_enums import SubmissionState
+
+
+logger = logging.getLogger()
+logger.setLevel("INFO")
+
+
+def get_validation_db_connection():
+    # Credentials must come from the environment (or a secrets manager) --
+    # never hard-code them in source control.
+    postgres_dsn = PostgresDsn.build(
+        scheme="postgresql+psycopg2",
+        username=os.environ["VALIDATION_DB_USER"],
+        password=parse.quote(os.environ["VALIDATION_DB_PWD"], safe=""),
+        host=os.environ["VALIDATION_DB_HOST"],
+        path=os.environ.get("VALIDATION_DB_NAME", "validation_findings"),
+    )
+    return create_engine(str(postgres_dsn))
+
+
+def get_filing_db_connection():
+    # Credentials must come from the environment (or a secrets manager) --
+    # never hard-code them in source control.
+    postgres_dsn = PostgresDsn.build(
+        scheme="postgresql+psycopg2",
+        username=os.environ["FILING_DB_USER"],
+        password=parse.quote(os.environ["FILING_DB_PWD"], safe=""),
+        host=os.environ["FILING_DB_HOST"],
+        path=os.environ.get("FILING_DB_NAME", "filing_db"),
+    )
+    return create_engine(str(postgres_dsn))
+
+
+def service_validate(bucket, file):
+    # assumes key layout <root>/<period>/<lei>/<submission_id>.csv -- confirm against uploader
+    lei = file.split("/")[2]
+    submission_id = file.split("/")[3].split(".csv")[0]
+
+    filing_conn = None
+    try:
+        filing_conn = sessionmaker(bind=get_filing_db_connection())()
+        submission = filing_conn.query(SubmissionDAO).filter_by(id=submission_id).first()
+        submission.state = SubmissionState.VALIDATION_IN_PROGRESS
+        filing_conn.commit()
+
+        try:
+            s3_path = f"{bucket}/{file}"
+            fs: AbstractFileSystem = filesystem("filecache", target_protocol='s3', cache_storage='/tmp/files/')
+            with fs.open(s3_path, "r") as f:
+                final_state = SubmissionState.VALIDATION_SUCCESSFUL
+                # Build the findings engine once, not once per batch.
+                findings_engine = get_validation_db_connection()
+                for findings, phase in validate_batch_csv(f.name, {lei: lei}, batch_size=50000, batch_count=1):
+                    findings = findings.with_columns(phase=pl.lit(phase), submission_id=pl.lit(submission_id))
+                    if final_state != SubmissionState.VALIDATION_WITH_ERRORS:
+                        has_errors = findings.filter(pl.col('validation_type') == 'Error').height > 0
+                        final_state = SubmissionState.VALIDATION_WITH_ERRORS if has_errors else SubmissionState.VALIDATION_WITH_WARNINGS
+                    findings.write_database(table_name="findings", connection=findings_engine, if_table_exists="append")
+            submission.state = final_state
+            filing_conn.commit()
+
+        except ValidationSchemaError:
+            logger.exception("The file is malformed.")
+            submission = filing_conn.query(SubmissionDAO).filter_by(id=submission_id).first()
+            submission.state = SubmissionState.SUBMISSION_UPLOAD_MALFORMED
+            filing_conn.commit()
+
+    except Exception:
+        logger.exception("Error processing submission file.")
+        if filing_conn:
+            submission = filing_conn.query(SubmissionDAO).filter_by(id=submission_id).first()
+            submission.state = SubmissionState.VALIDATION_ERROR
+            filing_conn.commit()
+    finally:
+        if filing_conn:
+            filing_conn.close()