explicit unit test of parquet->csv
dogversioning committed Oct 23, 2023
1 parent d4b78eb commit 7049963
Showing 2 changed files with 49 additions and 18 deletions.
45 changes: 27 additions & 18 deletions src/handlers/site_upload/powerset_merge.py
@@ -194,6 +194,29 @@ def expand_and_concat_sets(
return agg_df


def generate_csv_from_parquet(bucket_name: str, bucket_root: str, subbucket_path: str):
"""Convenience function for generating csvs for dashboard upload
TODO: Remove on dashboard parquet/API support"""
last_valid_df = awswrangler.s3.read_parquet(
f"s3://{bucket_name}/{bucket_root}" f"/{subbucket_path}"
)
last_valid_df = last_valid_df.apply(
lambda x: x.strip() if isinstance(x, str) else x
).replace('""', nan)
# Here we are removing internal commas from fields so we get a valid unquoted CSV
last_valid_df = last_valid_df.replace(to_replace=",", value="", regex=True)
awswrangler.s3.to_csv(
last_valid_df,
(
f"s3://{bucket_name}/{bucket_root}/"
f"{subbucket_path}".replace(".parquet", ".csv")
),
index=False,
quoting=csv.QUOTE_NONE,
)
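
The pairing above matters: commas are stripped precisely because csv.QUOTE_NONE forbids quoting, so any surviving delimiter would break the output. A minimal local sketch of the same sanitization, assuming plain pandas on a made-up DataFrame (no S3 involved):

import csv
import io

import pandas
from numpy import nan

# Made-up frame standing in for a parquet file's contents.
df = pandas.DataFrame({"race": ["  White ", '""', "Asian, Other"]})

# Element-wise strip; '""' placeholders become real NaNs.
df = df.applymap(lambda x: x.strip() if isinstance(x, str) else x).replace('""', nan)

# Remove internal commas: with quoting=csv.QUOTE_NONE, a surviving comma
# would make the csv writer raise an escaping error.
df = df.replace(to_replace=",", value="", regex=True)

buffer = io.StringIO()
df.to_csv(buffer, index=False, quoting=csv.QUOTE_NONE)
print(buffer.getvalue())
# Prints:
#   race
#   White
#               (the '""' cell, now NaN, serializes as an empty line)
#   Asian Other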


def merge_powersets(manager: S3Manager) -> None:
"""Creates an aggregate powerset from all files with a given s3 prefix"""
# TODO: this should be memory profiled for large datasets. We can use
@@ -258,29 +281,15 @@ def merge_powersets(manager: S3Manager) -> None:
f"{BucketPath.LATEST.value}/{subbucket_path}",
f"{BucketPath.LAST_VALID.value}/{subbucket_path}",
)

####################
# For now, we'll create a csv of the file we just put in last valid.
# This is occasionally useful for uploading to the dashboard.
# This is used for uploading to the dashboard.
# TODO: remove as soon as we support either parquet upload or
# the API is supported by the dashboard
last_valid_df = awswrangler.s3.read_parquet(
f"s3://{manager.s3_bucket_name}/{BucketPath.LAST_VALID.value}"
f"/{subbucket_path}"
generate_csv_from_parquet(
manager.s3_bucket_name, BucketPath.LAST_VALID.value, subbucket_path
)
last_valid_df = last_valid_df.apply(
lambda x: x.strip() if isinstance(x, str) else x
).replace('""', nan)
last_valid_df = last_valid_df.replace(to_replace=r",", value="", regex=True)
awswrangler.s3.to_csv(
last_valid_df,
(
f"s3://{manager.s3_bucket_name}/{BucketPath.LAST_VALID.value}/"
f"{subbucket_path}".replace(".parquet", ".csv")
),
index=False,
quoting=csv.QUOTE_NONE,
)

####################

latest_site = site_specific_name.split("/", maxsplit=1)[0]
22 changes: 22 additions & 0 deletions tests/site_upload/test_powerset_merge.py
@@ -15,6 +15,7 @@
from src.handlers.site_upload.powerset_merge import (
MergeError,
expand_and_concat_sets,
generate_csv_from_parquet,
powerset_merge_handler,
)
from tests.utils import (
@@ -347,3 +348,24 @@ def test_expand_and_concat(mock_bucket, upload_file, load_empty, raises):
s3_path,
)
expand_and_concat_sets(df, f"s3://{TEST_BUCKET}/{s3_path}", EXISTING_STUDY)


def test_parquet_to_csv(mock_bucket):
bucket_root = "test"
subbucket_path = "/uploaded.parquet"
s3_client = boto3.client("s3", region_name="us-east-1")
s3_client.upload_file(
"./tests/test_data/cube_strings_with_commas.parquet",
TEST_BUCKET,
f"{bucket_root}/{subbucket_path}",
)
generate_csv_from_parquet(TEST_BUCKET, bucket_root, subbucket_path)
df = awswrangler.s3.read_csv(
f"s3://{TEST_BUCKET}/{bucket_root}/{subbucket_path.replace('.parquet','.csv')}"
)
assert list(df["race"].dropna().unique()) == [
"White",
"Black or African American",
"Asian",
"American Indian or Alaska Native",
]
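
The parquet fixture itself is not part of the diff. A hypothetical sketch of how a file like cube_strings_with_commas.parquet could be produced, with the column name and values inferred from the assertions above (assumptions, not the real fixture):

import pandas
from numpy import nan

# Assumed contents: the file name suggests at least one value carries a comma
# for generate_csv_from_parquet to strip, e.g.
# "American Indian, or Alaska Native" -> "American Indian or Alaska Native".
df = pandas.DataFrame(
    {
        "race": [
            "White",
            "Black or African American",
            "Asian",
            "American Indian, or Alaska Native",
            nan,  # the test's dropna() implies at least one missing value
        ],
        "cnt": [10, 20, 30, 40, 50],  # hypothetical counts column
    }
)
df.to_parquet("./tests/test_data/cube_strings_with_commas.parquet", index=False)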
