Skip to content

Commit

Permalink
Proof of concept files for running as lambda
Browse files Browse the repository at this point in the history
  • Loading branch information
jcadam14 committed Aug 20, 2024
1 parent 1cdf338 commit 98c5dee
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 6 deletions.
12 changes: 12 additions & 0 deletions Lambda_Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Base image: AWS Lambda Python 3.12 runtime. --platform pins amd64 so the
# image matches the Lambda execution environment even when built on arm64
# hosts (e.g. Apple Silicon).
FROM --platform=linux/amd64 public.ecr.aws/lambda/python:3.12

# git is required so pip can resolve VCS requirements
# (e.g. sbl_filing_api@git+https://...).
RUN dnf install -y git

# NOTE(review): this COPY expects requirements.txt in the build context, but
# the requirements file added in this change is named lambda_requirements.txt —
# confirm which file the build context actually provides.
COPY requirements.txt ${LAMBDA_TASK_ROOT}/requirements.txt

# Install dependencies into the task root so the Lambda runtime can import them.
RUN pip install -r requirements.txt --target "${LAMBDA_TASK_ROOT}"

# Application source code.
COPY src/ ${LAMBDA_TASK_ROOT}

# Pass the name of the function handler as an argument to the runtime
CMD [ "regtech_data_validator.lambda_wrapper.lambda_handler" ]
13 changes: 13 additions & 0 deletions lambda_requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Runtime dependencies for the validator Lambda image.
# NOTE(review): nothing here is version-pinned, so image builds are not
# reproducible — consider pinning (e.g. pip-compile) before production use.
polars
awslambdaric
pandera
ujson
boto3
tabulate
fsspec
s3fs
sqlalchemy
pydantic
psycopg2-binary
pyarrow
# Installed straight from the default branch — a tag/commit pin would make
# builds deterministic.
sbl_filing_api@git+https://github.com/cfpb/sbl-filing-api
10 changes: 4 additions & 6 deletions src/regtech_data_validator/data_formatters.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import ujson
import pandas as pd
import polars as pl

from tabulate import tabulate
Expand All @@ -17,7 +16,7 @@ def find_check(group_name, checks):
# which corresponds to severity, error/warning code, name of error/warning, row number in sblar, UID, fig link,
# error/warning description (markdown formatted), single/multi/register, and the fields and values associated with the error/warning.
# Each row in the final dataframe represents all data for that one finding.
def format_findings(df: pd.DataFrame, checks):
def format_findings(df: pl.DataFrame, checks):
final_df = pl.DataFrame()

sorted_df = df.with_columns(pl.col('validation_id').cast(pl.Categorical(ordering='lexical'))).sort('validation_id')
Expand All @@ -39,8 +38,8 @@ def format_findings(df: pd.DataFrame, checks):
)
df_pivot.columns = [
(
col.replace('field_name_field_number_', 'field_').replace('field_value_field_number_', 'value_')
if ('field_name_field_number_' in col or 'field_value_field_number_' in col)
col.replace('field_name_', 'field_').replace('field_value_', 'value_')
if ('field_name_' in col or 'field_value_' in col)
else col
)
for col in df_pivot.columns
Expand All @@ -65,7 +64,6 @@ def format_findings(df: pd.DataFrame, checks):
field_columns = [col for col in df_pivot.columns if col.startswith('field_')]
value_columns = [col for col in df_pivot.columns if col.startswith('value_')]
sorted_columns = [col for pair in zip(field_columns, value_columns) for col in pair]

# swap two-field errors/warnings to keep order of FIG
if len(field_columns) == 2:
df_pivot = df_pivot.with_columns(
Expand Down Expand Up @@ -94,7 +92,7 @@ def format_findings(df: pd.DataFrame, checks):
return final_df


def df_to_download(df: pd.DataFrame, report_name: str = "download_report.csv"):
def df_to_download(df: pl.DataFrame, report_name: str = "download_report.csv"):
if df.is_empty():
        # return headers of csv for 'empty' report
pl.DataFrame(
Expand Down
7 changes: 7 additions & 0 deletions src/regtech_data_validator/lambda_wrapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from regtech_data_validator.service import service_validate

def lambda_handler(event, context):
    """Entry point for the S3-triggered validation Lambda.

    Args:
        event: S3 event notification payload; each record carries the bucket
            name and object key of an uploaded submission file.
        context: Lambda context object (unused).
    """
    # S3 event object keys are URL-encoded (spaces arrive as '+', slashes and
    # unicode as %XX) — decode before handing the key to the filesystem layer.
    from urllib.parse import unquote_plus

    # An S3 event can batch multiple records; the original only processed
    # Records[0] and silently dropped the rest.
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        file = unquote_plus(record['s3']['object']['key'])
        service_validate(bucket, file)
89 changes: 89 additions & 0 deletions src/regtech_data_validator/service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import logging
import os
from urllib import parse

import polars as pl
from fsspec import AbstractFileSystem, filesystem
from pydantic import PostgresDsn
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from regtech_data_validator.validator import validate_batch_csv, ValidationSchemaError
from sbl_filing_api.entities.models.dao import SubmissionDAO, UserActionDAO
from sbl_filing_api.entities.models.model_enums import SubmissionState, UserActionType


# Root logger so library logs inherit the level; Lambda forwards it to CloudWatch.
logger = logging.getLogger()
logger.setLevel("INFO")

def get_validation_db_connection():
    """Create a SQLAlchemy engine for the validation-findings database.

    Connection settings come from the environment (VALIDATION_DB_USER,
    VALIDATION_DB_PWD, VALIDATION_DB_HOST, VALIDATION_DB_NAME). The original
    committed live credentials to source control — those must be rotated and
    never re-committed.

    Returns:
        A SQLAlchemy Engine bound to the findings database.

    Raises:
        KeyError: if any required environment variable is missing.
    """
    postgres_dsn = PostgresDsn.build(
        scheme="postgresql+psycopg2",
        username=os.environ["VALIDATION_DB_USER"],
        # quote so special characters in the password survive DSN parsing
        password=parse.quote(os.environ["VALIDATION_DB_PWD"], safe=""),
        host=os.environ["VALIDATION_DB_HOST"],
        path=os.environ["VALIDATION_DB_NAME"],
    )
    return create_engine(str(postgres_dsn))


def service_validate(bucket, file):
    """Validate an uploaded submission CSV and persist findings + state.

    Expects an object key of the form ".../<lei>/<submission_id>.csv"
    (lei at index 2, submission id at index 3 — TODO confirm against the
    uploader's key layout). Marks the submission VALIDATION_IN_PROGRESS,
    streams the file through the batch validator, writes findings to the
    validation DB, and records the final submission state.

    Args:
        bucket: S3 bucket name from the triggering event.
        file: S3 object key of the submission CSV.
    """
    lei = file.split("/")[2]
    submission_id = file.split("/")[3].split(".csv")[0]

    filing_conn = None
    try:
        filing_conn = sessionmaker(bind=get_filing_db_connection())()
        submission = filing_conn.query(SubmissionDAO).filter_by(id=submission_id).first()
        submission.state = SubmissionState.VALIDATION_IN_PROGRESS
        filing_conn.commit()

        try:
            s3_path = f"{bucket}/{file}"
            # /tmp is the only writable path in Lambda; filecache pulls the
            # object down once so the validator can read a local file.
            fs: AbstractFileSystem = filesystem("filecache", target_protocol='s3', cache_storage='/tmp/files/')
            # One engine for all batches (the original built a new engine per batch).
            findings_engine = get_validation_db_connection()
            with fs.open(s3_path, "r") as f:
                final_state = SubmissionState.VALIDATION_SUCCESSFUL
                for findings, phase in validate_batch_csv(f.name, {lei: lei}, batch_size=50000, batch_count=1):
                    # Skip empty batches: the original flipped the state to
                    # VALIDATION_WITH_WARNINGS even when a batch had no findings.
                    if findings.is_empty():
                        continue
                    findings = findings.with_columns(
                        phase=pl.lit(phase),
                        submission_id=pl.lit(submission_id),
                    )
                    # Errors dominate warnings; once WITH_ERRORS, stay there.
                    if final_state != SubmissionState.VALIDATION_WITH_ERRORS:
                        has_errors = findings.filter(pl.col('validation_type') == 'Error').height > 0
                        final_state = (
                            SubmissionState.VALIDATION_WITH_ERRORS
                            if has_errors
                            else SubmissionState.VALIDATION_WITH_WARNINGS
                        )
                    findings.write_database(
                        table_name="findings", connection=findings_engine, if_table_exists="append"
                    )
            submission.state = final_state
            filing_conn.commit()

        except ValidationSchemaError:
            logger.exception("The file is malformed.")
            _record_submission_state(filing_conn, submission_id, SubmissionState.SUBMISSION_UPLOAD_MALFORMED)

    except Exception:
        # Single generic handler — the original duplicated this block inside
        # and outside the setup try, with identical behavior.
        logger.exception("Error processing submission file.")
        if filing_conn:
            _record_submission_state(filing_conn, submission_id, SubmissionState.VALIDATION_ERROR)
    finally:
        # The original leaked the session; close it regardless of outcome.
        if filing_conn:
            filing_conn.close()


def _record_submission_state(filing_conn, submission_id, state):
    """Re-fetch the submission and persist a terminal state."""
    submission = filing_conn.query(SubmissionDAO).filter_by(id=submission_id).first()
    submission.state = state
    filing_conn.commit()



def get_filing_db_connection():
    """Create a SQLAlchemy engine for the filing (submissions) database.

    Connection settings come from the environment (FILING_DB_USER,
    FILING_DB_PWD, FILING_DB_HOST, FILING_DB_NAME). The original committed
    live credentials to source control — those must be rotated and never
    re-committed.

    Returns:
        A SQLAlchemy Engine bound to the filing database.

    Raises:
        KeyError: if any required environment variable is missing.
    """
    postgres_dsn = PostgresDsn.build(
        scheme="postgresql+psycopg2",
        username=os.environ["FILING_DB_USER"],
        # quote so special characters in the password survive DSN parsing
        password=parse.quote(os.environ["FILING_DB_PWD"], safe=""),
        host=os.environ["FILING_DB_HOST"],
        path=os.environ["FILING_DB_NAME"],
    )
    return create_engine(str(postgres_dsn))

0 comments on commit 98c5dee

Please sign in to comment.