From e076e865b090fb2ff0bb76408e3766841edbcfbe Mon Sep 17 00:00:00 2001
From: Matt Garber
Date: Wed, 13 Sep 2023 16:19:40 -0400
Subject: [PATCH] test & cleanup

---
 .../migration.001.transaction_cleanup.py  |  5 +--
 src/handlers/site_upload/powerset_merge.py |  3 +-
 src/handlers/site_upload/process_upload.py | 11 +-----
 tests/site_upload/test_powerset_merge.py   | 17 ++++++--
 tests/site_upload/test_process_upload.py   | 37 ++++++++++++------
 tests/test_data/other_schema.csv           |  8 ++++
 tests/test_data/other_schema.parquet       | Bin 0 -> 2848 bytes
 7 files changed, 51 insertions(+), 30 deletions(-)
 create mode 100644 tests/test_data/other_schema.csv
 create mode 100644 tests/test_data/other_schema.parquet

diff --git a/scripts/migrations/migration.001.transaction_cleanup.py b/scripts/migrations/migration.001.transaction_cleanup.py
index fcd27d4..52a69ae 100644
--- a/scripts/migrations/migration.001.transaction_cleanup.py
+++ b/scripts/migrations/migration.001.transaction_cleanup.py
@@ -45,12 +45,11 @@ def transaction_cleanup(bucket: str):
     for site in transactions:
         if site in EXPECTED_UPLOADERS:
             site_dict = transactions[site]
-            if "template" in site_dict:
-                site_dict.pop("template")
-            print(site_dict)
+            site_dict.pop("template", None)
             new_t[site] = site_dict
 
     # updating incorrectly spelled keys
+    # for future migrations, start from scratch with items rather than ignoring
     for site in new_t:  # pylint: disable=consider-using-dict-items
         for study in new_t[site]:
             for dp in new_t[site][study]:
diff --git a/src/handlers/site_upload/powerset_merge.py b/src/handlers/site_upload/powerset_merge.py
index 25e0460..716183b 100644
--- a/src/handlers/site_upload/powerset_merge.py
+++ b/src/handlers/site_upload/powerset_merge.py
@@ -47,7 +47,6 @@ def __init__(self, event):
         self.site = s3_key_array[3]
         self.version = s3_key_array[4]
         self.metadata = read_metadata(self.s3_client, self.s3_bucket_name)
-        print(s3_key_array)
 
     # S3 Filesystem operations
     def get_data_package_list(self, path) -> list:
@@ -216,7 +215,7 @@ def merge_powersets(manager: S3Manager) -> None:
         try:
             if not any(x.endswith(site_specific_name) for x in latest_file_list):
                 df = expand_and_concat_sets(df, last_valid_path, last_valid_site)
-                manager.update_local_metadata("last_upload", site=last_valid_site)
+                manager.update_local_metadata("last_aggregation", site=last_valid_site)
         except MergeError as e:
             # This is expected to trigger if there's an issue in expand_and_concat_sets;
             # this usually means there's a data problem.
diff --git a/src/handlers/site_upload/process_upload.py b/src/handlers/site_upload/process_upload.py
index 1b81ec1..af1bc91 100644
--- a/src/handlers/site_upload/process_upload.py
+++ b/src/handlers/site_upload/process_upload.py
@@ -32,18 +32,9 @@ def process_upload(s3_client, sns_client, s3_bucket_name: str, s3_key: str) -> None:
     # If someone runs an upload on the template study, we'll just move it
     # to archive - we don't care about metadata for this, but can look there to
     # verify transmission if it's a connectivity test
-    if s3_key.startswith("template__"):
+    if study == "template":
         new_key = f"{BucketPath.ARCHIVE.value}/{s3_key.split('/', 1)[-1]}"
         move_s3_file(s3_client, s3_bucket_name, s3_key, new_key)
-        metadata = update_metadata(
-            metadata,
-            site,
-            study,
-            data_package,
-            version,
-            "last_upload",
-            last_uploaded_date,
-        )
     elif s3_key.endswith(".parquet"):
         if "__meta_" in s3_key:
             new_key = f"{BucketPath.STUDY_META.value}/{s3_key.split('/', 1)[-1]}"
diff --git a/tests/site_upload/test_powerset_merge.py b/tests/site_upload/test_powerset_merge.py
index e8b2168..74ee1da 100644
--- a/tests/site_upload/test_powerset_merge.py
+++ b/tests/site_upload/test_powerset_merge.py
@@ -196,13 +196,13 @@ def test_powerset_merge_single_upload(
             assert item["Key"].startswith(BucketPath.META.value)
             metadata = read_metadata(s3_client, TEST_BUCKET)
             if res["statusCode"] == 200:
-                print(metadata[site][study])
                 assert (
                     metadata[site][study][data_package.split("__")[1]][version][
                         "last_aggregation"
                     ]
                     == datetime.now(timezone.utc).isoformat()
                 )
+
             else:
                 assert (
                     metadata["princeton_plainsboro_teaching_hospital"]["study"][
@@ -212,6 +212,15 @@ def test_powerset_merge_single_upload(
                         "study"
                     ]["encounter"]["099"]["last_aggregation"]
                 )
+            if upload_file is not None:
+                # checking to see that merge powerset didn't touch last upload
+                assert (
+                    metadata[site][study][data_package.split("__")[1]][version][
+                        "last_upload"
+                    ]
+                    != datetime.now(timezone.utc).isoformat()
+                )
+
         elif item["Key"].startswith(BucketPath.LAST_VALID.value):
             assert item["Key"] == (f"{BucketPath.LAST_VALID.value}{upload_path}")
         else:
@@ -236,8 +245,8 @@ def test_powerset_merge_single_upload(
     "upload_file,archives,expected_errors",
     [
         ("./tests/test_data/count_synthea_patient.parquet", False, 0),
-        ("./tests/test_data/cube_simple_example.parquet", False, 1),
-        ("./tests/test_data/cube_simple_example.parquet", True, 1),
+        ("./tests/test_data/other_schema.parquet", False, 1),
+        ("./tests/test_data/other_schema.parquet", True, 1),
     ],
 )
 @mock.patch.dict(os.environ, MOCK_ENV)
@@ -293,7 +302,7 @@ def test_powerset_merge_join_study_data(
     for item in s3_res["Contents"]:
         if item["Key"].startswith(BucketPath.ERROR.value):
             errors += 1
-        if item["Key"].startswith(f"{BucketPath.AGGREGATE.value}/study"):
+        elif item["Key"].startswith(f"{BucketPath.AGGREGATE.value}/study"):
             agg_df = awswrangler.s3.read_parquet(f"s3://{TEST_BUCKET}/{item['Key']}")
     # if a file cant be merged and there's no fallback, we expect
     # [, site_name], otherwise, [, site_name, uploading_site_name]
diff --git a/tests/site_upload/test_process_upload.py b/tests/site_upload/test_process_upload.py
index 4296da0..eca394c 100644
--- a/tests/site_upload/test_process_upload.py
+++ b/tests/site_upload/test_process_upload.py
@@ -64,6 +64,15 @@
             200,
             ITEM_COUNT + 1,
         ),
+        (  # Upload of the template study
+            "./tests/test_data/cube_simple_example.parquet",
+            f"/template/{NEW_DATA_P}/{EXISTING_SITE}"
+            f"/{EXISTING_VERSION}/document.parquet",
+            f"/template/{NEW_DATA_P}/{EXISTING_SITE}"
f"/{EXISTING_VERSION}/document.parquet", + 200, + ITEM_COUNT + 1, + ), ( # Non-parquet file "./tests/test_data/cube_simple_example.csv", f"/{EXISTING_STUDY}/{NEW_DATA_P}/{EXISTING_SITE}" @@ -125,6 +134,7 @@ def test_process_upload( assert res["statusCode"] == status s3_res = s3_client.list_objects_v2(Bucket=TEST_BUCKET) assert len(s3_res["Contents"]) == expected_contents + found_archive = False for item in s3_res["Contents"]: if item["Key"].endswith("aggregate.parquet"): assert item["Key"].startswith(BucketPath.AGGREGATE.value) @@ -132,19 +142,22 @@ def test_process_upload( assert item["Key"].startswith(BucketPath.CSVAGGREGATE.value) elif item["Key"].endswith("transactions.json"): assert item["Key"].startswith(BucketPath.META.value) - metadata = read_metadata(s3_client, TEST_BUCKET) - if upload_file is not None and upload_path is not None: - path_params = upload_path.split("/") - study = path_params[1] - data_package = path_params[2] - site = path_params[3] - version = path_params[4] - assert ( - metadata[site][study][data_package][version]["last_upload"] - == datetime.now(timezone.utc).isoformat() - ) + if "template" not in upload_path: + metadata = read_metadata(s3_client, TEST_BUCKET) + if upload_file is not None and upload_path is not None: + path_params = upload_path.split("/") + study = path_params[1] + data_package = path_params[2] + site = path_params[3] + version = path_params[4] + assert ( + metadata[site][study][data_package][version]["last_upload"] + == datetime.now(timezone.utc).isoformat() + ) elif item["Key"].startswith(BucketPath.STUDY_META.value): assert "_meta_" in item["Key"] + elif item["Key"].startswith(BucketPath.ARCHIVE.value): + found_archive = True else: assert ( item["Key"].startswith(BucketPath.LATEST.value) @@ -154,3 +167,5 @@ def test_process_upload( or item["Key"].startswith(BucketPath.CACHE.value) or item["Key"].endswith("study_periods.json") ) + if found_archive: + assert "template" in upload_path diff --git a/tests/test_data/other_schema.csv b/tests/test_data/other_schema.csv new file mode 100644 index 0000000..9a1ed15 --- /dev/null +++ b/tests/test_data/other_schema.csv @@ -0,0 +1,8 @@ +cnt,code,code_display +30,, +20,A-01, +20,,Code A +20,A-01,Code A +10,B-02, +10,,Code B +10,B-02,Code B \ No newline at end of file diff --git a/tests/test_data/other_schema.parquet b/tests/test_data/other_schema.parquet new file mode 100644 index 0000000000000000000000000000000000000000..c316722a07b44318f924191d500e463f24f72d46 GIT binary patch literal 2848 zcmcImOK;*<6gCNil2H^zqZk!IUq%Kx4RGfmOkG?S(`=`Apf#F@31freRou!&MTOQc}-{M8DKo1vA} z^UqR$h2nHLmfNBtF=kb@U3w+D8T~yJ{UelWq}HAS97_Xrie8~$*n)|oV4t90Rj?S2 zvUMB=8QRUTc+T%;vS9dvj!sW;7}*J9ITOZKV6>jVFso8%btCm_I8JW_M8j%N(+Sh> zLeU>WsXtd!Kdr_W+?Vp%q0kO;-6OoFc>~&-gQ%@-i{6zavv#8<^Ln zW(*ut9us~)hoZlPQr|>UzdghJ7NM4Lj8$?J6B(!--ID1DB=PkMsSHJLz=|CGHC2fN z4I-F}87QKgv3TtGf35AOsK=+yUG&>UhcCe+?TS z?~;J)0qi`=m7$y3<-E+Nb?J^B94_hE2EI3hfoD16iK3@;b=AdkOLt{Wc4aWXOX@b5 z8+IG}UYr+qKTqDzkeO$ZgqFqtb2J`=3p@D zUS^L?LV_Nm&nowFSVu7%sL}W!zVjQM`DgtN{tnwG2%$RsznVpok1 zMqbg!<-RQDuK8}S&D-UUVreFCjM}`F>nP#{w(&IaW>6_0L)~w5RJ#r`Gz#`ZwZI?p zC+dJNly5ao7-5`hlhv@kl|340BI`A?er%~ZiRraO*2K1oEewepb zrGX)dxvr9JIEtkbJV9u1mGic7wZL5+gMHD3{5|l(=gMXL94&I5(b;*(CFF~p`cMAyezt4V*hg6f!Nb(b2f~lNwx|joMJJN$z;7Hk2SA*j}|k*^OFo4_nRJEC*Jx%;`C-$2n87<&TnB3v$Yu8dn62MrU%jO2J*+FPk7A zs)k>Gg)kfccXfaylUa#hPN~Lu1$Gb!dT6k zLj_|N_?t@i!mFN@kD%)~+@Rox*S9 Fe*y2v>nQ*L literal 0 HcmV?d00001