From 70499630a45f4151e7e761cd1134a8c74012e03f Mon Sep 17 00:00:00 2001
From: Matt Garber
Date: Mon, 23 Oct 2023 11:21:07 -0400
Subject: [PATCH] explicit unit test of parquet->csv

---
 src/handlers/site_upload/powerset_merge.py | 45 +++++++++++++---------
 tests/site_upload/test_powerset_merge.py   | 22 +++++++++++
 2 files changed, 49 insertions(+), 18 deletions(-)

diff --git a/src/handlers/site_upload/powerset_merge.py b/src/handlers/site_upload/powerset_merge.py
index b5273d0..e65a41f 100644
--- a/src/handlers/site_upload/powerset_merge.py
+++ b/src/handlers/site_upload/powerset_merge.py
@@ -194,6 +194,29 @@ def expand_and_concat_sets(
     return agg_df
 
 
+def generate_csv_from_parquet(bucket_name: str, bucket_root: str, subbucket_path: str):
+    """Convenience function for generating csvs for dashboard upload
+
+    TODO: Remove on dashboard parquet/API support"""
+    last_valid_df = awswrangler.s3.read_parquet(
+        f"s3://{bucket_name}/{bucket_root}" f"/{subbucket_path}"
+    )
+    last_valid_df = last_valid_df.apply(
+        lambda x: x.strip() if isinstance(x, str) else x
+    ).replace('""', nan)
+    # Here we are removing internal commas from fields so we get a valid unquoted CSV
+    last_valid_df = last_valid_df.replace(to_replace=",", value="", regex=True)
+    awswrangler.s3.to_csv(
+        last_valid_df,
+        (
+            f"s3://{bucket_name}/{bucket_root}/"
+            f"{subbucket_path}".replace(".parquet", ".csv")
+        ),
+        index=False,
+        quoting=csv.QUOTE_NONE,
+    )
+
+
 def merge_powersets(manager: S3Manager) -> None:
     """Creates an aggregate powerset from all files with a given s3 prefix"""
     # TODO: this should be memory profiled for large datasets. We can use
@@ -258,29 +281,15 @@ def merge_powersets(manager: S3Manager) -> None:
                 f"{BucketPath.LATEST.value}/{subbucket_path}",
                 f"{BucketPath.LAST_VALID.value}/{subbucket_path}",
             )
+            ####################
             # For now, we'll create a csv of the file we just put in last valid.
-            # This is occasionally useful for uploading to the dashboard.
+            # This is used for uploading to the dashboard.
             # TODO: remove as soon as we support either parquet upload or
             # the API is supported by the dashboard
-            last_valid_df = awswrangler.s3.read_parquet(
-                f"s3://{manager.s3_bucket_name}/{BucketPath.LAST_VALID.value}"
-                f"/{subbucket_path}"
+            generate_csv_from_parquet(
+                manager.s3_bucket_name, BucketPath.LAST_VALID.value, subbucket_path
             )
-            last_valid_df = last_valid_df.apply(
-                lambda x: x.strip() if isinstance(x, str) else x
-            ).replace('""', nan)
-            last_valid_df = last_valid_df.replace(to_replace=r",", value="", regex=True)
-            awswrangler.s3.to_csv(
-                last_valid_df,
-                (
-                    f"s3://{manager.s3_bucket_name}/{BucketPath.LAST_VALID.value}/"
-                    f"{subbucket_path}".replace(".parquet", ".csv")
-                ),
-                index=False,
-                quoting=csv.QUOTE_NONE,
-            )
-            ####################
 
             latest_site = site_specific_name.split("/", maxsplit=1)[0]
diff --git a/tests/site_upload/test_powerset_merge.py b/tests/site_upload/test_powerset_merge.py
index eedffc1..52fbed8 100644
--- a/tests/site_upload/test_powerset_merge.py
+++ b/tests/site_upload/test_powerset_merge.py
@@ -15,6 +15,7 @@
 from src.handlers.site_upload.powerset_merge import (
     MergeError,
     expand_and_concat_sets,
+    generate_csv_from_parquet,
     powerset_merge_handler,
 )
 from tests.utils import (
@@ -347,3 +348,24 @@ def test_expand_and_concat(mock_bucket, upload_file, load_empty, raises):
         s3_path,
     )
     expand_and_concat_sets(df, f"s3://{TEST_BUCKET}/{s3_path}", EXISTING_STUDY)
+
+
+def test_parquet_to_csv(mock_bucket):
+    bucket_root = "test"
+    subbucket_path = "/uploaded.parquet"
+    s3_client = boto3.client("s3", region_name="us-east-1")
+    s3_client.upload_file(
+        "./tests/test_data/cube_strings_with_commas.parquet",
+        TEST_BUCKET,
+        f"{bucket_root}/{subbucket_path}",
+    )
+    generate_csv_from_parquet(TEST_BUCKET, bucket_root, subbucket_path)
+    df = awswrangler.s3.read_csv(
+        f"s3://{TEST_BUCKET}/{bucket_root}/{subbucket_path.replace('.parquet','.csv')}"
+    )
+    assert list(df["race"].dropna().unique()) == [
+        "White",
+        "Black or African American",
+        "Asian",
+        "American Indian or Alaska Native",
+    ]
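Note, outside the patch itself: below is a minimal local sketch of the comma-stripping behavior that generate_csv_from_parquet relies on, written with plain pandas and no S3 or awswrangler access. The sample values are invented for illustration; the point is that writing with csv.QUOTE_NONE raises an error if a delimiter character survives inside a field, which is why the helper strips commas before writing the CSV.

# Local illustration only; mirrors the replace()/to_csv() calls in the patch.
# The DataFrame contents here are made-up sample data.
import csv
import io

import pandas
from numpy import nan

df = pandas.DataFrame({"race": ["White", "Asian, Other", '""', None]})
# Treat empty quoted strings as missing values, as the helper does.
df = df.replace('""', nan)
# Strip internal commas so the unquoted CSV stays parseable.
df = df.replace(to_replace=",", value="", regex=True)

buffer = io.StringIO()
# QUOTE_NONE would raise "need to escape" if any comma were left in a field.
df.to_csv(buffer, index=False, quoting=csv.QUOTE_NONE)
print(buffer.getvalue())
# race
# White
# Asian Other
# (followed by two blank rows for the missing values)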