
Commit

test & cleanup
dogversioning committed Sep 13, 2023
1 parent ae2f48b commit e076e86
Showing 7 changed files with 51 additions and 30 deletions.
5 changes: 2 additions & 3 deletions scripts/migrations/migration.001.transaction_cleanup.py
@@ -45,12 +45,11 @@ def transaction_cleanup(bucket: str):
     for site in transactions:
         if site in EXPECTED_UPLOADERS:
             site_dict = transactions[site]
-            if "template" in site_dict:
-                site_dict.pop("template")
-            print(site_dict)
+            site_dict.pop("template", None)
             new_t[site] = site_dict
 
     # updating incorrectly spelled keys
+    # for future migrations, start from scratch with items rather than ignoring
     for site in new_t:  # pylint: disable=consider-using-dict-items
         for study in new_t[site]:
             for dp in new_t[site][study]:
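The one-line replacement above leans on dict.pop's optional default; here is a minimal standalone sketch of the pattern (generic data, not the project's transaction layout):

# Illustration only - a generic dict, not the real transaction structure.
site_dict = {"study": {"encounter": {}}}  # no "template" key present

# Old pattern: guard against KeyError, then pop.
if "template" in site_dict:
    site_dict.pop("template")

# New pattern: pop() with a default never raises, so the guard
# (and the stray debug print) can go away.
site_dict.pop("template", None)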
3 changes: 1 addition & 2 deletions src/handlers/site_upload/powerset_merge.py
@@ -47,7 +47,6 @@ def __init__(self, event):
         self.site = s3_key_array[3]
         self.version = s3_key_array[4]
         self.metadata = read_metadata(self.s3_client, self.s3_bucket_name)
-        print(s3_key_array)
 
     # S3 Filesystem operations
     def get_data_package_list(self, path) -> list:
@@ -216,7 +215,7 @@ def merge_powersets(manager: S3Manager) -> None:
         try:
             if not any(x.endswith(site_specific_name) for x in latest_file_list):
                 df = expand_and_concat_sets(df, last_valid_path, last_valid_site)
-                manager.update_local_metadata("last_upload", site=last_valid_site)
+                manager.update_local_metadata("last_aggregation", site=last_valid_site)
         except MergeError as e:
             # This is expected to trigger if there's an issue in expand_and_concat_sets;
             # this usually means there's a data problem.
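For context on the one-line swap above: re-merging a previously valid file is an aggregation event, so it should stamp last_aggregation rather than last_upload. A sketch of that split (only the update_local_metadata call shape is taken from this hunk; the wrapper functions here are hypothetical):

# Hypothetical wrappers illustrating which timestamp each event should touch;
# only the update_local_metadata call shape comes from the diff above.

def record_aggregation(manager, site):
    # powerset_merge re-merges a previously valid file for this site:
    # that is an aggregation event, so stamp last_aggregation.
    manager.update_local_metadata("last_aggregation", site=site)

def record_upload(manager, site):
    # a fresh submission from a site would instead stamp last_upload
    # (handled elsewhere, in process_upload).
    manager.update_local_metadata("last_upload", site=site)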
11 changes: 1 addition & 10 deletions src/handlers/site_upload/process_upload.py
@@ -32,18 +32,9 @@ def process_upload(s3_client, sns_client, s3_bucket_name: str, s3_key: str) -> N
     # If someone runs an upload on the template study, we'll just move it
     # to archive - we don't care about metadata for this, but can look there to
     # verify transmission if it's a connectivity test
-    if s3_key.startswith("template__"):
+    if study == "template":
         new_key = f"{BucketPath.ARCHIVE.value}/{s3_key.split('/', 1)[-1]}"
         move_s3_file(s3_client, s3_bucket_name, s3_key, new_key)
-        metadata = update_metadata(
-            metadata,
-            site,
-            study,
-            data_package,
-            version,
-            "last_upload",
-            last_uploaded_date,
-        )
     elif s3_key.endswith(".parquet"):
         if "__meta_" in s3_key:
             new_key = f"{BucketPath.STUDY_META.value}/{s3_key.split('/', 1)[-1]}"
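Taken together, the surviving lines of this hunk route a template-study upload straight to the archive and skip the metadata write. A self-contained sketch of that routing (the prefix values and the route_upload helper are stand-ins for illustration, not the real handler):

# Hypothetical, trimmed-down sketch of the surviving branch structure above.
# ARCHIVE_PREFIX / STUDY_META_PREFIX stand in for BucketPath values; the real
# handler also moves files in S3 and handles several more cases.
ARCHIVE_PREFIX = "archive"
STUDY_META_PREFIX = "study_metadata"


def route_upload(study: str, s3_key: str) -> str:
    """Return the destination prefix an uploaded key would be moved to."""
    if study == "template":
        # Connectivity-test uploads go straight to the archive; no
        # transaction metadata is recorded for them anymore.
        return f"{ARCHIVE_PREFIX}/{s3_key.split('/', 1)[-1]}"
    if s3_key.endswith(".parquet") and "__meta_" in s3_key:
        return f"{STUDY_META_PREFIX}/{s3_key.split('/', 1)[-1]}"
    return s3_key


# e.g. a template upload lands under the archive prefix:
print(route_upload("template", "template/encounter/site_a/100/document.parquet"))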
17 changes: 13 additions & 4 deletions tests/site_upload/test_powerset_merge.py
@@ -196,13 +196,13 @@ def test_powerset_merge_single_upload(
             assert item["Key"].startswith(BucketPath.META.value)
             metadata = read_metadata(s3_client, TEST_BUCKET)
             if res["statusCode"] == 200:
-                print(metadata[site][study])
                 assert (
                     metadata[site][study][data_package.split("__")[1]][version][
                         "last_aggregation"
                     ]
                     == datetime.now(timezone.utc).isoformat()
                 )
+
             else:
                 assert (
                     metadata["princeton_plainsboro_teaching_hospital"]["study"][
@@ -212,6 +212,15 @@
                         "study"
                     ]["encounter"]["099"]["last_aggregation"]
                 )
+            if upload_file is not None:
+                # checking to see that merge powerset didn't touch last upload
+                assert (
+                    metadata[site][study][data_package.split("__")[1]][version][
+                        "last_upload"
+                    ]
+                    != datetime.now(timezone.utc).isoformat()
+                )
+
         elif item["Key"].startswith(BucketPath.LAST_VALID.value):
             assert item["Key"] == (f"{BucketPath.LAST_VALID.value}{upload_path}")
         else:
@@ -236,8 +245,8 @@ def test_powerset_merge_single_upload(
     "upload_file,archives,expected_errors",
     [
         ("./tests/test_data/count_synthea_patient.parquet", False, 0),
-        ("./tests/test_data/cube_simple_example.parquet", False, 1),
-        ("./tests/test_data/cube_simple_example.parquet", True, 1),
+        ("./tests/test_data/other_schema.parquet", False, 1),
+        ("./tests/test_data/other_schema.parquet", True, 1),
     ],
 )
 @mock.patch.dict(os.environ, MOCK_ENV)
@@ -293,7 +302,7 @@ def test_powerset_merge_join_study_data(
     for item in s3_res["Contents"]:
         if item["Key"].startswith(BucketPath.ERROR.value):
             errors += 1
-        if item["Key"].startswith(f"{BucketPath.AGGREGATE.value}/study"):
+        elif item["Key"].startswith(f"{BucketPath.AGGREGATE.value}/study"):
             agg_df = awswrangler.s3.read_parquet(f"s3://{TEST_BUCKET}/{item['Key']}")
             # if a file cant be merged and there's no fallback, we expect
             # [<NA>, site_name], otherwise, [<NA>, site_name, uploading_site_name]
37 changes: 26 additions & 11 deletions tests/site_upload/test_process_upload.py
@@ -64,6 +64,15 @@
             200,
             ITEM_COUNT + 1,
         ),
+        (  # Upload of the template study
+            "./tests/test_data/cube_simple_example.parquet",
+            f"/template/{NEW_DATA_P}/{EXISTING_SITE}"
+            f"/{EXISTING_VERSION}/document.parquet",
+            f"/template/{NEW_DATA_P}/{EXISTING_SITE}"
+            f"/{EXISTING_VERSION}/document.parquet",
+            200,
+            ITEM_COUNT + 1,
+        ),
         (  # Non-parquet file
             "./tests/test_data/cube_simple_example.csv",
             f"/{EXISTING_STUDY}/{NEW_DATA_P}/{EXISTING_SITE}"
@@ -125,26 +134,30 @@ def test_process_upload(
     assert res["statusCode"] == status
     s3_res = s3_client.list_objects_v2(Bucket=TEST_BUCKET)
     assert len(s3_res["Contents"]) == expected_contents
+    found_archive = False
     for item in s3_res["Contents"]:
         if item["Key"].endswith("aggregate.parquet"):
             assert item["Key"].startswith(BucketPath.AGGREGATE.value)
         elif item["Key"].endswith("aggregate.csv"):
             assert item["Key"].startswith(BucketPath.CSVAGGREGATE.value)
         elif item["Key"].endswith("transactions.json"):
             assert item["Key"].startswith(BucketPath.META.value)
-            metadata = read_metadata(s3_client, TEST_BUCKET)
-            if upload_file is not None and upload_path is not None:
-                path_params = upload_path.split("/")
-                study = path_params[1]
-                data_package = path_params[2]
-                site = path_params[3]
-                version = path_params[4]
-                assert (
-                    metadata[site][study][data_package][version]["last_upload"]
-                    == datetime.now(timezone.utc).isoformat()
-                )
+            if "template" not in upload_path:
+                metadata = read_metadata(s3_client, TEST_BUCKET)
+                if upload_file is not None and upload_path is not None:
+                    path_params = upload_path.split("/")
+                    study = path_params[1]
+                    data_package = path_params[2]
+                    site = path_params[3]
+                    version = path_params[4]
+                    assert (
+                        metadata[site][study][data_package][version]["last_upload"]
+                        == datetime.now(timezone.utc).isoformat()
+                    )
         elif item["Key"].startswith(BucketPath.STUDY_META.value):
             assert "_meta_" in item["Key"]
+        elif item["Key"].startswith(BucketPath.ARCHIVE.value):
+            found_archive = True
         else:
             assert (
                 item["Key"].startswith(BucketPath.LATEST.value)
@@ -154,3 +167,5 @@
                 or item["Key"].startswith(BucketPath.CACHE.value)
                 or item["Key"].endswith("study_periods.json")
             )
+    if found_archive:
+        assert "template" in upload_path
8 changes: 8 additions & 0 deletions tests/test_data/other_schema.csv
@@ -0,0 +1,8 @@
cnt,code,code_display
30,,
20,A-01,
20,,Code A
20,A-01,Code A
10,B-02,
10,,Code B
10,B-02,Code B
Binary file added tests/test_data/other_schema.parquet
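The new parquet fixture itself cannot be rendered in a diff. Assuming it mirrors the CSV above (the commit does not show how it was generated), a fixture of this shape could be rebuilt with pandas plus pyarrow; a sketch:

# Hypothetical regeneration of the fixture from the CSV shown above; the
# commit does not state how other_schema.parquet was actually produced.
import pandas as pd  # to_parquet requires pyarrow (or fastparquet)

df = pd.read_csv("tests/test_data/other_schema.csv")
print(df.dtypes)  # cnt comes in as int64; code/code_display as object
df.to_parquet("tests/test_data/other_schema.parquet", index=False)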
