diff --git a/docs/dashboard_api.prod.yaml b/docs/dashboard_api.prod.yaml index b2a20e2..e9eae66 100644 --- a/docs/dashboard_api.prod.yaml +++ b/docs/dashboard_api.prod.yaml @@ -8,13 +8,8 @@ servers: basePath: default: "/" paths: - /data_packages: + /metadata: get: - parameters: - - name: "name" - in: "query" - schema: - type: "string" security: - api_key: [] options: @@ -35,15 +30,14 @@ paths: content: {} security: - api_key: [] - /metadata: + /last-valid: get: security: - api_key: [] options: - summary: "CORS support" responses: "200": - description: "Default response for CORS method" + description: "200 response" headers: Access-Control-Allow-Origin: schema: @@ -57,27 +51,35 @@ paths: content: {} security: - api_key: [] - /chart-data/{subscription_name}: + /data-packages/{data_package_id}/chart: get: parameters: - - name: "subscription_name" + - name: "data_package_id" in: "path" required: true schema: type: "string" + - name: "column" + in: "query" + required: true + schema: + type: "string" + - name: "filters" + in: "query" + schema: + type: "string" security: - api_key: [] options: - summary: "CORS support" parameters: - - name: "subscription_name" + - name: "data_package_id" in: "path" required: true schema: type: "string" responses: "200": - description: "Default response for CORS method" + description: "200 response" headers: Access-Control-Allow-Origin: schema: @@ -113,28 +115,6 @@ paths: content: {} security: - api_key: [] - /last_valid: - get: - security: - - api_key: [] - options: - summary: "CORS support" - responses: - "200": - description: "Default response for CORS method" - headers: - Access-Control-Allow-Origin: - schema: - type: "string" - Access-Control-Allow-Methods: - schema: - type: "string" - Access-Control-Allow-Headers: - schema: - type: "string" - content: {} - security: - - api_key: [] /metadata/{site}/{study}: get: parameters: @@ -351,7 +331,33 @@ paths: content: {} security: - api_key: [] - /last_valid/{study}/{data_package}/{site}/{version}/{filename}: + /data-packages: + get: + parameters: + - name: "name" + in: "query" + schema: + type: "string" + security: + - api_key: [] + options: + responses: + "200": + description: "200 response" + headers: + Access-Control-Allow-Origin: + schema: + type: "string" + Access-Control-Allow-Methods: + schema: + type: "string" + Access-Control-Allow-Headers: + schema: + type: "string" + content: {} + security: + - api_key: [] + /last-valid/{study}/{data_package}/{site}/{version}/{filename}: get: parameters: - name: "filename" @@ -382,7 +388,6 @@ paths: security: - api_key: [] options: - summary: "CORS support" parameters: - name: "filename" in: "path" @@ -411,7 +416,7 @@ paths: type: "string" responses: "200": - description: "Default response for CORS method" + description: "200 response" headers: Access-Control-Allow-Origin: schema: @@ -425,10 +430,10 @@ paths: content: {} security: - api_key: [] - /study-periods/{site}: + /data-packages/{data_package_id}: get: parameters: - - name: "site" + - name: "data_package_id" in: "path" required: true schema: @@ -436,16 +441,15 @@ paths: security: - api_key: [] options: - summary: "CORS support" parameters: - - name: "site" + - name: "data_package_id" in: "path" required: true schema: type: "string" responses: "200": - description: "Default response for CORS method" + description: "200 response" headers: Access-Control-Allow-Origin: schema: @@ -459,19 +463,9 @@ paths: content: {} security: - api_key: [] - /metadata/{site}/{study}/{data_package}: + /study-periods/{site}: get: 
parameters: - - name: "data_package" - in: "path" - required: true - schema: - type: "string" - - name: "study" - in: "path" - required: true - schema: - type: "string" - name: "site" in: "path" required: true @@ -482,16 +476,6 @@ paths: options: summary: "CORS support" parameters: - - name: "data_package" - in: "path" - required: true - schema: - type: "string" - - name: "study" - in: "path" - required: true - schema: - type: "string" - name: "site" in: "path" required: true @@ -513,10 +497,20 @@ paths: content: {} security: - api_key: [] - /data_packages/{id}: + /metadata/{site}/{study}/{data_package}: get: parameters: - - name: "id" + - name: "data_package" + in: "path" + required: true + schema: + type: "string" + - name: "study" + in: "path" + required: true + schema: + type: "string" + - name: "site" in: "path" required: true schema: @@ -524,8 +518,19 @@ paths: security: - api_key: [] options: + summary: "CORS support" parameters: - - name: "id" + - name: "data_package" + in: "path" + required: true + schema: + type: "string" + - name: "study" + in: "path" + required: true + schema: + type: "string" + - name: "site" in: "path" required: true schema: diff --git a/scripts/migrations/migration.004.s3_name_with_id.py b/scripts/migrations/migration.004.s3_name_with_id.py new file mode 100644 index 0000000..21db0f5 --- /dev/null +++ b/scripts/migrations/migration.004.s3_name_with_id.py @@ -0,0 +1,47 @@ +"""Removes unexpected root nodes/templates/misspelled keys from transaction log.""" + +import argparse +import io +import json + +import boto3 + + +def _get_s3_data(key: str, bucket_name: str, client) -> dict: + """Convenience class for retrieving a dict from S3""" + try: + bytes_buffer = io.BytesIO() + client.download_fileobj(Bucket=bucket_name, Key=key, Fileobj=bytes_buffer) + return json.loads(bytes_buffer.getvalue().decode()) + except Exception: # pylint: disable=broad-except + return {} + + +def _put_s3_data(key: str, bucket_name: str, client, data: dict) -> None: + """Convenience class for writing a dict to S3""" + b_data = io.BytesIO(json.dumps(data).encode()) + client.upload_fileobj(Bucket=bucket_name, Key=key, Fileobj=b_data) + + +def s3_name_with_id(bucket: str): + client = boto3.client("s3") + res = client.list_objects_v2(Bucket=bucket) + contents = res["Contents"] + moved_files = 0 + for s3_file in contents: + key = s3_file["Key"] + key_array = key.split("/") + if key_array[0] == "aggregates" and len(key_array[3]) == 3: + key_array[3] = f"{key_array[2]}__{key_array[3]}" + new_key = "/".join(key_array) + client.copy({"Bucket": bucket, "Key": key}, bucket, new_key) + client.delete_object(Bucket=bucket, Key=key) + moved_files += 1 + print(f"Updated {moved_files} aggregates") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="""Changes lowest directory in S3 to file id""") + parser.add_argument("-b", "--bucket", help="bucket name") + args = parser.parse_args() + s3_name_with_id(args.bucket) diff --git a/src/handlers/dashboard/get_chart_data.py b/src/handlers/dashboard/get_chart_data.py index 2e63d71..4da4f61 100644 --- a/src/handlers/dashboard/get_chart_data.py +++ b/src/handlers/dashboard/get_chart_data.py @@ -2,18 +2,22 @@ This is intended to provide an implementation of the logic described in docs/api.md """ +import logging import os import awswrangler import boto3 import pandas -from src.handlers.dashboard.filter_config import get_filter_string -from src.handlers.shared.enums import BucketPath -from src.handlers.shared.functions import 
get_latest_data_package_version, http_response +from src.handlers.dashboard import filter_config +from src.handlers.shared import decorators, enums, errors, functions +log_level = os.environ.get("LAMBDA_LOG_LEVEL", "INFO") +logger = logging.getLogger() +logger.setLevel(log_level) -def _get_table_cols(table_name: str, version: str | None = None) -> list: + +def _get_table_cols(dp_id: str, version: str | None = None) -> list: """Returns the columns associated with a table. Since running an athena query takes a decent amount of time due to queueing @@ -22,24 +26,27 @@ def _get_table_cols(table_name: str, version: str | None = None) -> list: """ s3_bucket_name = os.environ.get("BUCKET_NAME") - prefix = f"{BucketPath.CSVAGGREGATE.value}/{table_name.split('__')[0]}/{table_name}" + dp_name = dp_id.rsplit("__", 1)[0] + prefix = f"{enums.BucketPath.CSVAGGREGATE.value}/{dp_id.split('__')[0]}/{dp_name}" if version is None: - version = get_latest_data_package_version(s3_bucket_name, prefix) - print(f"{prefix}/{version}/{table_name}__aggregate.csv") - s3_key = f"{prefix}/{version}/{table_name}__aggregate.csv" + version = functions.get_latest_data_package_version(s3_bucket_name, prefix) + s3_key = f"{prefix}/{version}/{dp_name}__aggregate.csv" s3_client = boto3.client("s3") - s3_iter = s3_client.get_object( - Bucket=s3_bucket_name, - Key=s3_key, - )["Body"].iter_lines() - return next(s3_iter).decode().split(",") + try: + s3_iter = s3_client.get_object( + Bucket=s3_bucket_name, + Key=s3_key, + )["Body"].iter_lines() + return next(s3_iter).decode().split(",") + except Exception: + raise errors.AggregatorS3Error def _build_query(query_params: dict, filters: list, path_params: dict) -> str: """Creates a query from the dashboard API spec""" - table = path_params["data_package"] - columns = _get_table_cols(table) - filter_str = get_filter_string(filters) + dp_id = path_params["data_package_id"] + columns = _get_table_cols(dp_id) + filter_str = filter_config.get_filter_string(filters) if filter_str != "": filter_str = f"AND {filter_str}" count_col = next(c for c in columns if c.startswith("cnt")) @@ -60,11 +67,12 @@ def _build_query(query_params: dict, filters: list, path_params: dict) -> str: coalesce_str = "WHERE" query_str = ( f"SELECT {select_str} " # nosec # noqa: S608 - f"FROM \"{os.environ.get('GLUE_DB_NAME')}\".\"{table}\" " + f"FROM \"{os.environ.get('GLUE_DB_NAME')}\".\"{dp_id}\" " f"{coalesce_str} " f"{query_params['column']} IS NOT Null {filter_str} " f"GROUP BY {group_str}" ) + logging.debug(query_str) return query_str @@ -91,7 +99,7 @@ def _format_payload(df: pandas.DataFrame, query_params: dict, filters: list) -> return payload -# @generic_error_handler(msg="Error retrieving chart data") +@decorators.generic_error_handler(msg="Error retrieving chart data") def chart_data_handler(event, context): """manages event from dashboard api call and retrieves data""" del context @@ -99,13 +107,21 @@ def chart_data_handler(event, context): filters = event["multiValueQueryStringParameters"].get("filter", []) path_params = event["pathParameters"] boto3.setup_default_session(region_name="us-east-1") - query = _build_query(query_params, filters, path_params) - df = awswrangler.athena.read_sql_query( - query, - database=os.environ.get("GLUE_DB_NAME"), - s3_output=f"s3://{os.environ.get('BUCKET_NAME')}/awswrangler", - workgroup=os.environ.get("WORKGROUP_NAME"), - ) - res = _format_payload(df, query_params, filters) - res = http_response(200, res) + try: + query = _build_query(query_params, filters, path_params) 
+ df = awswrangler.athena.read_sql_query( + query, + database=os.environ.get("GLUE_DB_NAME"), + s3_output=f"s3://{os.environ.get('BUCKET_NAME')}/awswrangler", + workgroup=os.environ.get("WORKGROUP_NAME"), + ) + res = _format_payload(df, query_params, filters) + res = functions.http_response(200, res) + except errors.AggregatorS3Error: + # while the API is publicly accessible, we've been asked to not pass + # helpful error messages back. revisit when dashboard is in AWS. + res = functions.http_response( + 404, f"Aggregate for {path_params['data_package_id']} not found" + ) + return res diff --git a/src/handlers/dashboard/get_data_packages.py b/src/handlers/dashboard/get_data_packages.py index e47a67b..1399a39 100644 --- a/src/handlers/dashboard/get_data_packages.py +++ b/src/handlers/dashboard/get_data_packages.py @@ -26,7 +26,7 @@ def data_packages_handler(event, context): elif event.get("pathParameters"): found = None for package in data_packages: - if event["pathParameters"]["id"] == package["id"]: + if event["pathParameters"]["data_package_id"] == package["id"]: found = package if found: payload = found diff --git a/src/handlers/shared/errors.py b/src/handlers/shared/errors.py new file mode 100644 index 0000000..a33b967 --- /dev/null +++ b/src/handlers/shared/errors.py @@ -0,0 +1,2 @@ +class AggregatorS3Error(Exception): + """Errors related to accessing files in S3""" diff --git a/src/handlers/site_upload/powerset_merge.py b/src/handlers/site_upload/powerset_merge.py index 7f33dd5..18b6786 100644 --- a/src/handlers/site_upload/powerset_merge.py +++ b/src/handlers/site_upload/powerset_merge.py @@ -44,7 +44,7 @@ def __init__(self, event): self.study = s3_key_array[1] self.data_package = s3_key_array[2].split("__")[1] self.site = s3_key_array[3] - self.version = s3_key_array[4] + self.version = s3_key_array[4].split("__")[-1] self.metadata = functions.read_metadata(self.s3_client, self.s3_bucket_name) self.types_metadata = functions.read_metadata( self.s3_client, @@ -53,7 +53,8 @@ def __init__(self, event): ) self.csv_aggerate_path = ( f"s3://{self.s3_bucket_name}/{enums.BucketPath.CSVAGGREGATE.value}/" - f"{self.study}/{self.study}__{self.data_package}/{self.version}/" + f"{self.study}/{self.study}__{self.data_package}/" + f"{self.version}/" f"{self.study}__{self.data_package}__aggregate.csv" ) @@ -85,7 +86,8 @@ def write_parquet(self, df: pandas.DataFrame, is_new_data_package: bool) -> None """writes dataframe as parquet to s3 and sends an SNS notification if new""" parquet_aggregate_path = ( f"s3://{self.s3_bucket_name}/{enums.BucketPath.AGGREGATE.value}/" - f"{self.study}/{self.study}__{self.data_package}/{self.version}/" + f"{self.study}/{self.study}__{self.data_package}/" + f"{self.study}__{self.data_package}__{self.version}/" f"{self.study}__{self.data_package}__aggregate.parquet" ) awswrangler.s3.to_parquet(df, parquet_aggregate_path, index=False) diff --git a/template.yaml b/template.yaml index ea10209..3d8c21e 100644 --- a/template.yaml +++ b/template.yaml @@ -336,7 +336,7 @@ Resources: LoggingConfig: ApplicationLogLevel: !Ref LogLevel LogFormat: !Ref LogFormat - LogGroup: !Sub "/aws/lambda/CumulusAggDashboardGetChartDataFunction-${DeployStage}" + LogGroup: !Sub "/aws/lambda/CumulusAggDashboardGetChartData-${DeployStage}" MemorySize: 2048 Timeout: 100 Description: Retrieve data for chart display in Cumulus Dashboard @@ -350,8 +350,13 @@ Resources: Type: Api Properties: RestApiId: !Ref DashboardApiGateway - Path: /chart-data/{subscription_name} + Path: 
/data-packages/{data_package_id}/chart Method: GET + RequestParameters: + - method.request.querystring.column: + Required: true + - method.request.querystring.filters: + Required: false Policies: - S3CrudPolicy: BucketName: !Ref AggregatorBucket @@ -412,7 +417,7 @@ Resources: Type: Api Properties: RestApiId: !Ref DashboardApiGateway - Path: /last_valid/{study}/{data_package}/{site}/{version}/{filename} + Path: /last-valid/{study}/{data_package}/{site}/{version}/{filename} Method: GET Policies: - S3ReadPolicy: @@ -458,7 +463,7 @@ Resources: Type: Api Properties: RestApiId: !Ref DashboardApiGateway - Path: /last_valid/ + Path: /last-valid/ Method: GET Policies: - S3ReadPolicy: @@ -566,7 +571,7 @@ Resources: Type: Api Properties: RestApiId: !Ref DashboardApiGateway - Path: /data_packages + Path: /data-packages Method: GET RequestParameters: - method.request.querystring.name: @@ -575,7 +580,7 @@ Resources: Type: Api Properties: RestApiId: !Ref DashboardApiGateway - Path: /data_packages/{id} + Path: /data-packages/{data_package_id} Method: GET # TODO: it :should: be possible to move these policies to a central role/policy # set that can be referenced in multiple places; see @@ -735,7 +740,7 @@ Resources: CatalogId: !Ref AWS::AccountId DatabaseInput: Name: !Sub '${GlueNameParameter}-${DeployStage}' - Description: Database for serving data to Cumulus Dashboard + Description: Database for serving data to the Cumulus Dashboard GlueCrawler: Type: AWS::Glue::Crawler @@ -743,11 +748,11 @@ Resources: Name: !Sub '${GlueNameParameter}-crawler-${DeployStage}' DatabaseName: !Ref GlueDB Role: !GetAtt CrawlerRole.Arn - Configuration: "{\"Version\":1.0,\"Grouping\":{\"TableLevelConfiguration\":4}}" + Configuration: "{\"Version\":1.0,\"Grouping\":{\"TableLevelConfiguration\":5}}" RecrawlPolicy: RecrawlBehavior: CRAWL_EVERYTHING SchemaChangePolicy: - DeleteBehavior: DEPRECATE_IN_DATABASE + DeleteBehavior: DELETE_FROM_DATABASE UpdateBehavior: UPDATE_IN_DATABASE Schedule: ScheduleExpression: "cron(0 22 ? 
* SUN *)" diff --git a/tests/conftest.py b/tests/conftest.py index 34b84cf..a7abc6e 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -49,7 +49,8 @@ def _init_mock_data(s3_client, bucket, study, data_package, version): "./tests/test_data/count_synthea_patient_agg.parquet", bucket, f"{enums.BucketPath.AGGREGATE.value}/{study}/" - f"{study}__{data_package}/{version}/{study}__{data_package}__aggregate.parquet", + f"{study}__{data_package}/{study}__{data_package}__{version}/" + f"{study}__{data_package}__aggregate.parquet", ) s3_client.upload_file( "./tests/test_data/count_synthea_patient_agg.csv", diff --git a/tests/dashboard/test_get_chart_data.py b/tests/dashboard/test_get_chart_data.py index 8a6febc..7acdbdf 100644 --- a/tests/dashboard/test_get_chart_data.py +++ b/tests/dashboard/test_get_chart_data.py @@ -9,6 +9,7 @@ from tests.mock_utils import ( EXISTING_DATA_P, EXISTING_STUDY, + EXISTING_VERSION, MOCK_ENV, TEST_GLUE_DB, ) @@ -33,7 +34,7 @@ def mock_data_frame(filter_param): ( {"column": "gender"}, [], - {"data_package": "test_study"}, + {"data_package_id": "test_study"}, f'SELECT gender, sum(cnt) as cnt FROM "{TEST_GLUE_DB}"."test_study" ' "WHERE COALESCE (race) IS NOT Null AND gender IS NOT Null " "GROUP BY gender", @@ -41,7 +42,7 @@ def mock_data_frame(filter_param): ( {"column": "gender", "stratifier": "race"}, [], - {"data_package": "test_study"}, + {"data_package_id": "test_study"}, f'SELECT race, gender, sum(cnt) as cnt FROM "{TEST_GLUE_DB}"."test_study" ' "WHERE gender IS NOT Null " "GROUP BY race, gender", @@ -49,7 +50,7 @@ def mock_data_frame(filter_param): ( {"column": "gender"}, ["gender:strEq:female"], - {"data_package": "test_study"}, + {"data_package_id": "test_study"}, f'SELECT gender, sum(cnt) as cnt FROM "{TEST_GLUE_DB}"."test_study" ' "WHERE COALESCE (race) IS NOT Null AND gender IS NOT Null " "AND gender LIKE 'female' " @@ -58,7 +59,7 @@ def mock_data_frame(filter_param): ( {"column": "gender", "stratifier": "race"}, ["gender:strEq:female"], - {"data_package": "test_study"}, + {"data_package_id": "test_study"}, f'SELECT race, gender, sum(cnt) as cnt FROM "{TEST_GLUE_DB}"."test_study" ' "WHERE gender IS NOT Null " "AND gender LIKE 'female' " @@ -103,8 +104,8 @@ def test_format_payload(query_params, filters, expected_payload): def test_get_data_cols(mock_bucket): - table_name = f"{EXISTING_STUDY}__{EXISTING_DATA_P}" - res = get_chart_data._get_table_cols(table_name) + table_id = f"{EXISTING_STUDY}__{EXISTING_DATA_P}__{EXISTING_VERSION}" + res = get_chart_data._get_table_cols(table_id) cols = pandas.read_csv("./tests/test_data/count_synthea_patient_agg.csv").columns assert res == list(cols) diff --git a/tests/dashboard/test_get_data_packages.py b/tests/dashboard/test_get_data_packages.py index 016dbdc..20e1e96 100644 --- a/tests/dashboard/test_get_data_packages.py +++ b/tests/dashboard/test_get_data_packages.py @@ -22,11 +22,13 @@ def test_get_data_packages(mock_bucket): assert 2 == len(json.loads(res["body"])) for item in data: assert item["name"] == "encounter" - res = data_packages_handler({"pathParameters": {"id": "other_study__document__100"}}, {}) + res = data_packages_handler( + {"pathParameters": {"data_package_id": "other_study__document__100"}}, {} + ) data = json.loads(res["body"]) assert res["statusCode"] == 200 assert 9 == len(data) assert data["id"] == "other_study__document__100" - res = data_packages_handler({"pathParameters": {"id": "not_an_id"}}, {}) + res = data_packages_handler({"pathParameters": {"data_package_id": "not_an_id"}}, {}) data = 
json.loads(res["body"]) assert res["statusCode"] == 404 diff --git a/tests/site_upload/test_powerset_merge.py b/tests/site_upload/test_powerset_merge.py index 09dcf9f..106c4da 100644 --- a/tests/site_upload/test_powerset_merge.py +++ b/tests/site_upload/test_powerset_merge.py @@ -95,10 +95,10 @@ ( # ensuring that a data package that is a substring does not get # merged by substr match "./tests/test_data/count_synthea_patient.parquet", - f"/{EXISTING_STUDY}/{EXISTING_STUDY}__{EXISTING_DATA_P[0:-2]}/" - f"{EXISTING_SITE}/{EXISTING_VERSION}/encount.parquet", - f"/{EXISTING_STUDY}/{EXISTING_STUDY}__{EXISTING_DATA_P[0:-2]}/" - f"{EXISTING_SITE}/{EXISTING_VERSION}/encount.parquet", + f"/{EXISTING_STUDY}/{EXISTING_STUDY}__{EXISTING_DATA_P[0:-2]}/{EXISTING_SITE}/" + f"{EXISTING_VERSION}/encount.parquet", + f"/{EXISTING_STUDY}/{EXISTING_STUDY}__{EXISTING_DATA_P[0:-2]}/{EXISTING_SITE}/" + f"{EXISTING_VERSION}/encount.parquet", False, 200, ITEM_COUNT + 4, @@ -164,7 +164,7 @@ def test_powerset_merge_single_upload( ] } # This array looks like: - # ['', 'study', 'package', 'site', 'file'] + # ['', 'study', 'package', 'site', 'version','file'] event_list = event_key.split("/") study = event_list[1] data_package = event_list[2]
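The notes below are editor-added sketches illustrating a few of the changes recorded in the diff above; none of them come from the patch itself. First, the OpenAPI and template changes rename the chart endpoint from /chart-data/{subscription_name} to /data-packages/{data_package_id}/chart and declare a required column query parameter plus an optional filters parameter. A minimal client sketch, assuming a hypothetical base URL and assuming the api_key security scheme maps to an x-api-key header (neither is stated in the diff):

import requests

BASE_URL = "https://example.execute-api.us-east-1.amazonaws.com"  # hypothetical stage URL
API_KEY = "replace-me"                                            # value not part of this diff

def get_chart_data(data_package_id: str, column: str, filters: str | None = None) -> dict:
    """Fetch chart data for one data package via the renamed endpoint."""
    params = {"column": column}      # required query parameter per the new spec
    if filters:
        params["filters"] = filters  # optional filter expression per the new spec
    resp = requests.get(
        f"{BASE_URL}/data-packages/{data_package_id}/chart",
        params=params,
        headers={"x-api-key": API_KEY},  # assumed header name for the api_key scheme
        timeout=30,
    )
    resp.raise_for_status()
    return resp.json()

# Old route (removed): GET /chart-data/{subscription_name}
# New route:           GET /data-packages/{data_package_id}/chart?column=gender

Note that the handler in the diff reads event["multiValueQueryStringParameters"].get("filter", []), while the spec and template declare filters, so the query parameter name used above follows the spec and should be treated as an assumption.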
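The migration script (migration.004.s3_name_with_id.py) renames the lowest directory of each aggregate key so it carries the full data-package id rather than a bare version. A sketch of just that key rewrite, assuming keys look like aggregates/<study>/<study>__<package>/<version>/<file> with a three-character version segment, which is what the script's length check implies:

def rewrite_aggregate_key(key: str) -> str | None:
    """Return the new S3 key with the version directory expanded to
    <study>__<package>__<version>, or None if the key is left unchanged."""
    parts = key.split("/")
    # only touch aggregate keys whose fourth segment is a bare 3-character version
    if parts[0] != "aggregates" or len(parts) < 4 or len(parts[3]) != 3:
        return None
    parts[3] = f"{parts[2]}__{parts[3]}"
    return "/".join(parts)

# Example:
#   aggregates/study/study__encounter/100/study__encounter__aggregate.parquet
#   -> aggregates/study/study__encounter/study__encounter__100/study__encounter__aggregate.parquet
assert rewrite_aggregate_key(
    "aggregates/study/study__encounter/100/study__encounter__aggregate.parquet"
) == "aggregates/study/study__encounter/study__encounter__100/study__encounter__aggregate.parquet"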
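In get_chart_data.py, _get_table_cols now takes a data-package id (study__package__version) and derives both the package name and the CSV key from it. A worked sketch of that derivation in isolation; the "csv_aggregates" prefix stands in for enums.BucketPath.CSVAGGREGATE.value and is an assumption, as is passing the version explicitly instead of calling get_latest_data_package_version:

def csv_aggregate_key(dp_id: str, version: str) -> str:
    """Build the S3 key of the aggregate CSV for a data-package id such as
    'test_study__encounter__100'."""
    dp_name = dp_id.rsplit("__", 1)[0]   # study__package
    study = dp_id.split("__")[0]         # study
    prefix = f"csv_aggregates/{study}/{dp_name}"  # bucket-path prefix is assumed
    return f"{prefix}/{version}/{dp_name}__aggregate.csv"

# Example:
#   csv_aggregate_key("test_study__encounter__100", "100")
#   -> "csv_aggregates/test_study/test_study__encounter/100/test_study__encounter__aggregate.csv"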
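Finally, powerset_merge.py now writes parquet aggregates under an extra study__package__version directory, which appears to be why the Glue crawler's TableLevelConfiguration moves from 4 to 5 (the crawler counts the bucket itself as level 1, so level 5 should land on that new directory). A sketch of the new layout, with "aggregates" assumed to be enums.BucketPath.AGGREGATE.value:

def parquet_aggregate_path(bucket: str, study: str, data_package: str, version: str) -> str:
    """Mirror of the aggregate parquet layout written by the updated powerset_merge;
    the study__package__version directory is the level the crawler now groups tables on."""
    return (
        f"s3://{bucket}/aggregates/"
        f"{study}/{study}__{data_package}/"
        f"{study}__{data_package}__{version}/"
        f"{study}__{data_package}__aggregate.parquet"
    )

# parquet_aggregate_path("my-bucket", "study", "encounter", "100")
# -> s3://my-bucket/aggregates/study/study__encounter/study__encounter__100/study__encounter__aggregate.parquet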