copy parquet to csv in last_valid (#108)
* copy parquet to csv in last_valid

* Clarification, stricter validation

* explicit unit test of parquet->csv
dogversioning authored Oct 23, 2023
1 parent 0eb3a1d commit 80ef663
Showing 2 changed files with 68 additions and 7 deletions.
34 changes: 34 additions & 0 deletions src/handlers/site_upload/powerset_merge.py
@@ -194,6 +194,29 @@ def expand_and_concat_sets(
    return agg_df


def generate_csv_from_parquet(bucket_name: str, bucket_root: str, subbucket_path: str):
    """Convenience function for generating csvs for dashboard upload
    TODO: Remove on dashboard parquet/API support"""
    last_valid_df = awswrangler.s3.read_parquet(
        f"s3://{bucket_name}/{bucket_root}" f"/{subbucket_path}"
    )
    last_valid_df = last_valid_df.apply(
        lambda x: x.strip() if isinstance(x, str) else x
    ).replace('""', nan)
    # Here we are removing internal commas from fields so we get a valid unquoted CSV
    last_valid_df = last_valid_df.replace(to_replace=",", value="", regex=True)
    awswrangler.s3.to_csv(
        last_valid_df,
        (
            f"s3://{bucket_name}/{bucket_root}/"
            f"{subbucket_path}".replace(".parquet", ".csv")
        ),
        index=False,
        quoting=csv.QUOTE_NONE,
    )


def merge_powersets(manager: S3Manager) -> None:
    """Creates an aggregate powerset from all files with a given s3 prefix"""
    # TODO: this should be memory profiled for large datasets. We can use
@@ -258,6 +281,17 @@ def merge_powersets(manager: S3Manager) -> None:
                f"{BucketPath.LATEST.value}/{subbucket_path}",
                f"{BucketPath.LAST_VALID.value}/{subbucket_path}",
            )

            ####################
            # For now, we'll create a csv of the file we just put in last valid.
            # This is used for uploading to the dashboard.
            # TODO: remove as soon as we support either parquet upload or
            # the API is supported by the dashboard
            generate_csv_from_parquet(
                manager.s3_bucket_name, BucketPath.LAST_VALID.value, subbucket_path
            )
            ####################

            latest_site = site_specific_name.split("/", maxsplit=1)[0]
            manager.update_local_metadata(
                TransactionKeys.LAST_DATA_UPDATE.value, site=latest_site
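The comma stripping in generate_csv_from_parquet is what makes quoting=csv.QUOTE_NONE safe: with quoting disabled and no escape character configured, the underlying csv writer rejects any field that still contains the delimiter. Below is a minimal local sketch of that behavior, using plain pandas and an illustrative DataFrame rather than the project's S3-backed code:

import csv
import io

import pandas as pd

df = pd.DataFrame({"race": ["Black, or African American", "White"], "cnt": [1, 2]})

# With QUOTE_NONE and no escapechar, a field containing the delimiter
# cannot be serialized; the csv writer raises csv.Error.
try:
    df.to_csv(io.StringIO(), index=False, quoting=csv.QUOTE_NONE)
except csv.Error as err:
    print(f"rejected: {err}")

# Stripping internal commas first, as the new function does, yields a CSV
# that is valid without any quoting.
cleaned = df.replace(to_replace=",", value="", regex=True)
print(cleaned.to_csv(index=False, quoting=csv.QUOTE_NONE))

Quoting the affected fields would also produce valid CSV; the change instead opts for an unquoted file (per the inline comment), so the commas are dropped.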
41 changes: 34 additions & 7 deletions tests/site_upload/test_powerset_merge.py
@@ -15,6 +15,7 @@
from src.handlers.site_upload.powerset_merge import (
    MergeError,
    expand_and_concat_sets,
    generate_csv_from_parquet,
    powerset_merge_handler,
)
from tests.utils import (
@@ -47,7 +48,7 @@
            f"{EXISTING_VERSION}/encounter.parquet",
            False,
            200,
-            ITEM_COUNT + 3,
+            ITEM_COUNT + 4,
        ),
        ( # Adding a new data package to a site without uploads
            "./tests/test_data/count_synthea_patient.parquet",
@@ -57,7 +58,7 @@
            f"/{EXISTING_VERSION}/encounter.parquet",
            False,
            200,
-            ITEM_COUNT + 3,
+            ITEM_COUNT + 4,
        ),
        ( # Updating an existing data package
            "./tests/test_data/count_synthea_patient.parquet",
@@ -67,7 +68,7 @@
            f"/{EXISTING_VERSION}/encounter.parquet",
            True,
            200,
-            ITEM_COUNT + 2,
+            ITEM_COUNT + 3,
        ),
        ( # New version of existing data package
            "./tests/test_data/count_synthea_patient.parquet",
@@ -77,7 +78,7 @@
            f"/{NEW_VERSION}/encounter.parquet",
            True,
            200,
-            ITEM_COUNT + 4,
+            ITEM_COUNT + 5,
        ),
        ( # Invalid parquet file
            "./tests/site_upload/test_powerset_merge.py",
@@ -97,7 +98,7 @@
            f"/{EXISTING_VERSION}/encounter.parquet",
            False,
            200,
-            ITEM_COUNT + 3,
+            ITEM_COUNT + 4,
        ),
        ( # ensuring that a data package that is a substring does not get
            # merged by substr match
@@ -108,7 +109,7 @@
            f"{EXISTING_SITE}/{EXISTING_VERSION}/encount.parquet",
            False,
            200,
-            ITEM_COUNT + 3,
+            ITEM_COUNT + 4,
        ),
        ( # Empty file upload
            None,
@@ -222,7 +223,12 @@ def test_powerset_merge_single_upload(
            )

        elif item["Key"].startswith(BucketPath.LAST_VALID.value):
-            assert item["Key"] == (f"{BucketPath.LAST_VALID.value}{upload_path}")
+            if item["Key"].endswith(".parquet"):
+                assert item["Key"] == (f"{BucketPath.LAST_VALID.value}{upload_path}")
+            elif item["Key"].endswith(".csv"):
+                assert f"{upload_path.replace('.parquet','.csv')}" in item["Key"]
+            else:
+                raise Exception("Invalid csv found at " f"{item['Key']}")
        else:
            assert (
                item["Key"].startswith(BucketPath.ARCHIVE.value)
@@ -342,3 +348,24 @@ def test_expand_and_concat(mock_bucket, upload_file, load_empty, raises):
            s3_path,
        )
        expand_and_concat_sets(df, f"s3://{TEST_BUCKET}/{s3_path}", EXISTING_STUDY)


def test_parquet_to_csv(mock_bucket):
    bucket_root = "test"
    subbucket_path = "/uploaded.parquet"
    s3_client = boto3.client("s3", region_name="us-east-1")
    s3_client.upload_file(
        "./tests/test_data/cube_strings_with_commas.parquet",
        TEST_BUCKET,
        f"{bucket_root}/{subbucket_path}",
    )
    generate_csv_from_parquet(TEST_BUCKET, bucket_root, subbucket_path)
    df = awswrangler.s3.read_csv(
        f"s3://{TEST_BUCKET}/{bucket_root}/{subbucket_path.replace('.parquet','.csv')}"
    )
    assert list(df["race"].dropna().unique()) == [
        "White",
        "Black or African American",
        "Asian",
        "American Indian or Alaska Native",
    ]
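
The fixture ./tests/test_data/cube_strings_with_commas.parquet is not included in this diff. For reference, here is a hypothetical script that would produce a compatible fixture; the column names and values below are assumptions, chosen so that comma removal yields the four race strings the test asserts:

import pandas as pd

# Hypothetical fixture: a small cube-style table whose string column contains
# embedded commas for the CSV conversion to strip. Writing parquet requires
# pyarrow (or fastparquet).
fixture = pd.DataFrame(
    {
        "cnt": [10, 20, 30, 40],
        "race": [
            "White",
            "Black, or African American",
            "Asian",
            "American Indian, or Alaska Native",
        ],
    }
)
fixture.to_parquet("./tests/test_data/cube_strings_with_commas.parquet", index=False)

Any parquet whose string values reduce to the asserted list after commas are removed would satisfy the assertion; the dropna() call in the test also tolerates fixtures that include null rows.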
