From 80ef663df30936304e60f9bea08d7093163b3ee0 Mon Sep 17 00:00:00 2001
From: matt garber
Date: Mon, 23 Oct 2023 11:34:31 -0400
Subject: [PATCH] copy parquet to csv in last_valid (#108)

* copy parquet to csv in last_valid

* Clarification, stricter validation

* explicit unit test of parquet->csv
---
 src/handlers/site_upload/powerset_merge.py | 34 ++++++++++++++++++
 tests/site_upload/test_powerset_merge.py   | 41 ++++++++++++++++++----
 2 files changed, 68 insertions(+), 7 deletions(-)

diff --git a/src/handlers/site_upload/powerset_merge.py b/src/handlers/site_upload/powerset_merge.py
index 4cfc25c..e65a41f 100644
--- a/src/handlers/site_upload/powerset_merge.py
+++ b/src/handlers/site_upload/powerset_merge.py
@@ -194,6 +194,29 @@ def expand_and_concat_sets(
     return agg_df
 
 
+def generate_csv_from_parquet(bucket_name: str, bucket_root: str, subbucket_path: str):
+    """Convenience function for generating csvs for dashboard upload
+
+    TODO: Remove on dashboard parquet/API support"""
+    last_valid_df = awswrangler.s3.read_parquet(
+        f"s3://{bucket_name}/{bucket_root}" f"/{subbucket_path}"
+    )
+    last_valid_df = last_valid_df.apply(
+        lambda x: x.strip() if isinstance(x, str) else x
+    ).replace('""', nan)
+    # Here we are removing internal commas from fields so we get a valid unquoted CSV
+    last_valid_df = last_valid_df.replace(to_replace=",", value="", regex=True)
+    awswrangler.s3.to_csv(
+        last_valid_df,
+        (
+            f"s3://{bucket_name}/{bucket_root}/"
+            f"{subbucket_path}".replace(".parquet", ".csv")
+        ),
+        index=False,
+        quoting=csv.QUOTE_NONE,
+    )
+
+
 def merge_powersets(manager: S3Manager) -> None:
     """Creates an aggregate powerset from all files with a given s3 prefix"""
     # TODO: this should be memory profiled for large datasets. We can use
@@ -258,6 +281,17 @@ def merge_powersets(manager: S3Manager) -> None:
                 f"{BucketPath.LATEST.value}/{subbucket_path}",
                 f"{BucketPath.LAST_VALID.value}/{subbucket_path}",
             )
+
+            ####################
+            # For now, we'll create a csv of the file we just put in last valid.
+            # This is used for uploading to the dashboard.
+            # TODO: remove as soon as we support either parquet upload or
+            # the API is supported by the dashboard
+            generate_csv_from_parquet(
+                manager.s3_bucket_name, BucketPath.LAST_VALID.value, subbucket_path
+            )
+            ####################
+
             latest_site = site_specific_name.split("/", maxsplit=1)[0]
             manager.update_local_metadata(
                 TransactionKeys.LAST_DATA_UPDATE.value, site=latest_site
diff --git a/tests/site_upload/test_powerset_merge.py b/tests/site_upload/test_powerset_merge.py
index 74ee1da..52fbed8 100644
--- a/tests/site_upload/test_powerset_merge.py
+++ b/tests/site_upload/test_powerset_merge.py
@@ -15,6 +15,7 @@
 from src.handlers.site_upload.powerset_merge import (
     MergeError,
     expand_and_concat_sets,
+    generate_csv_from_parquet,
     powerset_merge_handler,
 )
 from tests.utils import (
@@ -47,7 +48,7 @@
             f"{EXISTING_VERSION}/encounter.parquet",
             False,
             200,
-            ITEM_COUNT + 3,
+            ITEM_COUNT + 4,
         ),
         (  # Adding a new data package to a site without uploads
             "./tests/test_data/count_synthea_patient.parquet",
@@ -57,7 +58,7 @@
             f"/{EXISTING_VERSION}/encounter.parquet",
             False,
             200,
-            ITEM_COUNT + 3,
+            ITEM_COUNT + 4,
         ),
         (  # Updating an existing data package
             "./tests/test_data/count_synthea_patient.parquet",
@@ -67,7 +68,7 @@
             f"/{EXISTING_VERSION}/encounter.parquet",
             True,
             200,
-            ITEM_COUNT + 2,
+            ITEM_COUNT + 3,
         ),
         (  # New version of existing data package
             "./tests/test_data/count_synthea_patient.parquet",
@@ -77,7 +78,7 @@
             f"/{NEW_VERSION}/encounter.parquet",
             True,
             200,
-            ITEM_COUNT + 4,
+            ITEM_COUNT + 5,
         ),
         (  # Invalid parquet file
             "./tests/site_upload/test_powerset_merge.py",
@@ -97,7 +98,7 @@
             f"/{EXISTING_VERSION}/encounter.parquet",
             False,
             200,
-            ITEM_COUNT + 3,
+            ITEM_COUNT + 4,
         ),
         (  # ensuring that a data package that is a substring does not get
             # merged by substr match
@@ -108,7 +109,7 @@
             f"{EXISTING_SITE}/{EXISTING_VERSION}/encount.parquet",
             False,
             200,
-            ITEM_COUNT + 3,
+            ITEM_COUNT + 4,
         ),
         (  # Empty file upload
             None,
@@ -222,7 +223,12 @@ def test_powerset_merge_single_upload(
                 )
 
         elif item["Key"].startswith(BucketPath.LAST_VALID.value):
-            assert item["Key"] == (f"{BucketPath.LAST_VALID.value}{upload_path}")
+            if item["Key"].endswith(".parquet"):
+                assert item["Key"] == (f"{BucketPath.LAST_VALID.value}{upload_path}")
+            elif item["Key"].endswith(".csv"):
+                assert f"{upload_path.replace('.parquet','.csv')}" in item["Key"]
+            else:
+                raise Exception("Invalid csv found at " f"{item['Key']}")
         else:
             assert (
                 item["Key"].startswith(BucketPath.ARCHIVE.value)
@@ -342,3 +348,24 @@ def test_expand_and_concat(mock_bucket, upload_file, load_empty, raises):
             s3_path,
         )
         expand_and_concat_sets(df, f"s3://{TEST_BUCKET}/{s3_path}", EXISTING_STUDY)
+
+
+def test_parquet_to_csv(mock_bucket):
+    bucket_root = "test"
+    subbucket_path = "/uploaded.parquet"
+    s3_client = boto3.client("s3", region_name="us-east-1")
+    s3_client.upload_file(
+        "./tests/test_data/cube_strings_with_commas.parquet",
+        TEST_BUCKET,
+        f"{bucket_root}/{subbucket_path}",
+    )
+    generate_csv_from_parquet(TEST_BUCKET, bucket_root, subbucket_path)
+    df = awswrangler.s3.read_csv(
+        f"s3://{TEST_BUCKET}/{bucket_root}/{subbucket_path.replace('.parquet','.csv')}"
+    )
+    assert list(df["race"].dropna().unique()) == [
+        "White",
+        "Black or African American",
+        "Asian",
+        "American Indian or Alaska Native",
+    ]
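
Note on the approach, separate from the committed patch: generate_csv_from_parquet reads the file back out of last_valid, blanks out quoted-empty strings, strips internal commas, and writes the result with csv.QUOTE_NONE so the dashboard receives an unquoted CSV. The sketch below reproduces that flow locally, without S3 or awswrangler; the parquet_to_unquoted_csv helper, the file names, and the sample data are illustrative assumptions, and pandas plus a parquet engine such as pyarrow are assumed to be installed.

import csv

import pandas
from numpy import nan


def parquet_to_unquoted_csv(parquet_path: str, csv_path: str) -> None:
    """Local sketch of the parquet -> unquoted csv conversion (not the handler)."""
    df = pandas.read_parquet(parquet_path)
    # Treat quoted-empty strings as missing values, as the handler does.
    df = df.replace('""', nan)
    # Strip internal commas so QUOTE_NONE still yields a parseable CSV.
    df = df.replace(to_replace=",", value="", regex=True)
    df.to_csv(csv_path, index=False, quoting=csv.QUOTE_NONE)


if __name__ == "__main__":
    # Hypothetical sample data; not the cube_strings_with_commas.parquet fixture.
    source = pandas.DataFrame({"race": ["White", "Black, or African American"]})
    source.to_parquet("example.parquet")
    parquet_to_unquoted_csv("example.parquet", "example.csv")
    print(open("example.csv").read())
    # race
    # White
    # Black or African American

Writing with QUOTE_NONE only stays valid because the commas are removed first; once the dashboard accepts quoted CSVs or parquet directly, both the comma stripping and the conversion can be dropped, as the TODOs in the patch note.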