Updated chart data, S3 aggregate parquet moved #124

Merged (3 commits, Oct 15, 2024)
112 changes: 60 additions & 52 deletions docs/dashboard_api.prod.yaml
@@ -57,40 +57,6 @@ paths:
content: {}
security:
- api_key: []
/chart-data/{subscription_name}:
get:
parameters:
- name: "subscription_name"
in: "path"
required: true
schema:
type: "string"
security:
- api_key: []
options:
summary: "CORS support"
parameters:
- name: "subscription_name"
in: "path"
required: true
schema:
type: "string"
responses:
"200":
description: "Default response for CORS method"
headers:
Access-Control-Allow-Origin:
schema:
type: "string"
Access-Control-Allow-Methods:
schema:
type: "string"
Access-Control-Allow-Headers:
schema:
type: "string"
content: {}
security:
- api_key: []
/aggregates:
get:
security:
@@ -351,6 +317,48 @@ paths:
content: {}
security:
- api_key: []
/data_packages/{data_package_id}/chart:
get:
parameters:
- name: "data_package_id"
in: "path"
required: true
schema:
type: "string"
- name: "column"
in: "query"
required: true
schema:
type: "string"
- name: "filters"
in: "query"
schema:
type: "string"
security:
- api_key: []
options:
parameters:
- name: "data_package_id"
in: "path"
required: true
schema:
type: "string"
responses:
"200":
description: "Default response for CORS method"
headers:
Access-Control-Allow-Origin:
schema:
type: "string"
Access-Control-Allow-Methods:
schema:
type: "string"
Access-Control-Allow-Headers:
schema:
type: "string"
content: {}
security:
- api_key: []
/last_valid/{study}/{data_package}/{site}/{version}/{filename}:
get:
parameters:
Expand Down Expand Up @@ -513,19 +521,20 @@ paths:
content: {}
security:
- api_key: []
/data_packages/{id}:
/metadata/{site}:
get:
parameters:
- name: "id"
- name: "site"
in: "path"
required: true
schema:
type: "string"
security:
- api_key: []
options:
summary: "CORS support"
parameters:
- name: "id"
- name: "site"
in: "path"
required: true
schema:
@@ -546,24 +555,12 @@
content: {}
security:
- api_key: []
/metadata/{site}:
/study-periods:
get:
parameters:
- name: "site"
in: "path"
required: true
schema:
type: "string"
security:
- api_key: []
options:
summary: "CORS support"
parameters:
- name: "site"
in: "path"
required: true
schema:
type: "string"
responses:
"200":
description: "Default response for CORS method"
@@ -580,12 +577,23 @@
content: {}
security:
- api_key: []
/study-periods:
/data_packages/{data_package_id}:
Contributor: nit: this is so lonely at the end, separated from its child API

Contributor (author): This is autogenerated from API gateway, I have no control over it.

get:
parameters:
- name: "data_package_id"
in: "path"
required: true
schema:
type: "string"
security:
- api_key: []
options:
summary: "CORS support"
parameters:
- name: "data_package_id"
in: "path"
required: true
schema:
type: "string"
responses:
"200":
description: "Default response for CORS method"
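For reference, a client call against the new chart path might look like the sketch below; the host, data package id, API key header, and filter string are illustrative assumptions, not values taken from this PR:

    import requests

    # Hypothetical request to the new chart endpoint (api_key security is
    # assumed to map to API Gateway's x-api-key header).
    resp = requests.get(
        "https://example.execute-api.us-east-1.amazonaws.com/prod"
        "/data_packages/core__count_patient_099/chart",
        params={"column": "gender", "filters": "gender:strEq:female"},
        headers={"x-api-key": "YOUR_KEY"},
    )
    print(resp.json())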
48 changes: 48 additions & 0 deletions scripts/migrations/migration.004.s3_name_with_id.py
@@ -0,0 +1,48 @@
"""Removes unexpected root nodes/templates/misspelled keys from transaction log."""

import argparse
import io
import json

import boto3


def _get_s3_data(key: str, bucket_name: str, client) -> dict:
"""Convenience class for retrieving a dict from S3"""
try:
bytes_buffer = io.BytesIO()
client.download_fileobj(Bucket=bucket_name, Key=key, Fileobj=bytes_buffer)
return json.loads(bytes_buffer.getvalue().decode())
except Exception: # pylint: disable=broad-except
return {}


def _put_s3_data(key: str, bucket_name: str, client, data: dict) -> None:
"""Convenience class for writing a dict to S3"""
b_data = io.BytesIO(json.dumps(data).encode())
client.upload_fileobj(Bucket=bucket_name, Key=key, Fileobj=b_data)


def s3_name_with_id(bucket: str):
client = boto3.client("s3")
res = client.list_objects_v2(Bucket=bucket)
contents = res["Contents"]
moved_files = 0
for s3_file in contents:
if s3_file["Key"].split("/")[0] == "aggregates":
key = s3_file["Key"]
key_array = key.split("/")
if len(key_array[3]) == 3:
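                # Bare three-character version directories (e.g. "099") are
                # prefixed with the data package directory name, producing
                # e.g. "study__package_099" to match the new layout.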
key_array[3] = f"{key_array[2]}_{key_array[3]}"
new_key = "/".join(key_array)
client.copy({"Bucket": bucket, "Key": key}, bucket, new_key)
client.delete_object(Bucket=bucket, Key=key)
moved_files += 1
print(f"Updated {moved_files} aggregates")


if __name__ == "__main__":
parser = argparse.ArgumentParser(description="""Changes lowest directory in S3 to file id""")
parser.add_argument("-b", "--bucket", help="bucket name")
args = parser.parse_args()
s3_name_with_id(args.bucket)
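As a concrete sketch of the rename this migration performs on a single key (study and package names invented for illustration):

    key = "aggregates/core/core__count_patient/099/core__count_patient__aggregate.parquet"
    parts = key.split("/")
    if len(parts[3]) == 3:  # bare version directory, pre-migration layout
        parts[3] = f"{parts[2]}_{parts[3]}"
    print("/".join(parts))
    # aggregates/core/core__count_patient/core__count_patient_099/core__count_patient__aggregate.parquet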
67 changes: 40 additions & 27 deletions src/handlers/dashboard/get_chart_data.py
@@ -2,18 +2,22 @@
This is intended to provide an implementation of the logic described in docs/api.md
"""

import logging
import os

import awswrangler
import boto3
import pandas

from src.handlers.dashboard.filter_config import get_filter_string
from src.handlers.shared.enums import BucketPath
from src.handlers.shared.functions import get_latest_data_package_version, http_response
from src.handlers.dashboard import filter_config
from src.handlers.shared import decorators, enums, errors, functions

log_level = os.environ.get("LAMBDA_LOG_LEVEL", "INFO")
logger = logging.getLogger()
logger.setLevel(log_level)

def _get_table_cols(table_name: str, version: str | None = None) -> list:

def _get_table_cols(dp_id: str, version: str | None = None) -> list:
"""Returns the columns associated with a table.

Since running an athena query takes a decent amount of time due to queueing
@@ -22,24 +26,26 @@ def _get_table_cols(table_name: str, version: str | None = None) -> list:
"""

s3_bucket_name = os.environ.get("BUCKET_NAME")
prefix = f"{BucketPath.CSVAGGREGATE.value}/{table_name.split('__')[0]}/{table_name}"
prefix = f"{enums.BucketPath.CSVAGGREGATE.value}/{dp_id.split('__')[0]}/{dp_id[:-4]}"
if version is None:
version = get_latest_data_package_version(s3_bucket_name, prefix)
print(f"{prefix}/{version}/{table_name}__aggregate.csv")
s3_key = f"{prefix}/{version}/{table_name}__aggregate.csv"
version = functions.get_latest_data_package_version(s3_bucket_name, prefix)
s3_key = f"{prefix}/{version}/{dp_id[:-4]}__aggregate.csv"
s3_client = boto3.client("s3")
s3_iter = s3_client.get_object(
Bucket=s3_bucket_name,
Key=s3_key,
)["Body"].iter_lines()
return next(s3_iter).decode().split(",")
try:
s3_iter = s3_client.get_object(
Bucket=s3_bucket_name,
Key=s3_key,
)["Body"].iter_lines()
return next(s3_iter).decode().split(",")
except Exception:
raise errors.AggregatorS3Error


def _build_query(query_params: dict, filters: list, path_params: dict) -> str:
"""Creates a query from the dashboard API spec"""
table = path_params["data_package"]
columns = _get_table_cols(table)
filter_str = get_filter_string(filters)
dp_id = path_params["data_package_id"]
columns = _get_table_cols(dp_id)
filter_str = filter_config.get_filter_string(filters)
if filter_str != "":
filter_str = f"AND {filter_str}"
count_col = next(c for c in columns if c.startswith("cnt"))
@@ -60,11 +66,12 @@ def _build_query(query_params: dict, filters: list, path_params: dict) -> str:
coalesce_str = "WHERE"
query_str = (
f"SELECT {select_str} " # nosec # noqa: S608
f"FROM \"{os.environ.get('GLUE_DB_NAME')}\".\"{table}\" "
f"FROM \"{os.environ.get('GLUE_DB_NAME')}\".\"{dp_id}\" "
f"{coalesce_str} "
f"{query_params['column']} IS NOT Null {filter_str} "
f"GROUP BY {group_str}"
)
    logger.debug(query_str)
return query_str


@@ -91,21 +98,27 @@ def _format_payload(df: pandas.DataFrame, query_params: dict, filters: list) ->
return payload


# @generic_error_handler(msg="Error retrieving chart data")
@decorators.generic_error_handler(msg="Error retrieving chart data")
def chart_data_handler(event, context):
"""manages event from dashboard api call and retrieves data"""
del context
query_params = event["queryStringParameters"]
filters = event["multiValueQueryStringParameters"].get("filter", [])
path_params = event["pathParameters"]
boto3.setup_default_session(region_name="us-east-1")
query = _build_query(query_params, filters, path_params)
df = awswrangler.athena.read_sql_query(
query,
database=os.environ.get("GLUE_DB_NAME"),
s3_output=f"s3://{os.environ.get('BUCKET_NAME')}/awswrangler",
workgroup=os.environ.get("WORKGROUP_NAME"),
)
res = _format_payload(df, query_params, filters)
res = http_response(200, res)
try:
query = _build_query(query_params, filters, path_params)
df = awswrangler.athena.read_sql_query(
query,
database=os.environ.get("GLUE_DB_NAME"),
s3_output=f"s3://{os.environ.get('BUCKET_NAME')}/awswrangler",
workgroup=os.environ.get("WORKGROUP_NAME"),
)
res = _format_payload(df, query_params, filters)
res = functions.http_response(200, res)
except errors.AggregatorS3Error:
res = functions.http_response(
404, f"Aggregate for {path_params['data_package_id']} not found"
        )

Contributor: nit: that's a very specific conclusion to draw from a generic "something happened error". Might be helpful to log the original error message and/or include it here too.

Contributor (author): The error should get logged to cloudwatch, security asked us to have no stack traces in responses and to generally have these error messages be unhelpful while the API is externally accessible.

Contributor: Maybe add a comment to that effect? Like... if a third developer comes across this in 2 years, they won't have that context. Something like "# don't be too informative about internal state in our error messages" (but in Matt voice) above this generic response.

Contributor (author): ok, added a brief note.

return res
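For context, a minimal sketch of the API Gateway event this handler consumes, based only on the keys read above; the values (and the filter string format, which lives in filter_config) are invented:

    event = {
        "pathParameters": {"data_package_id": "core__count_patient_099"},
        "queryStringParameters": {"column": "gender"},
        # repeated ?filter= params arrive here as a list
        "multiValueQueryStringParameters": {"filter": ["gender:strEq:female"]},
    }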
2 changes: 1 addition & 1 deletion src/handlers/dashboard/get_data_packages.py
@@ -26,7 +26,7 @@ def data_packages_handler(event, context):
elif event.get("pathParameters"):
found = None
for package in data_packages:
if event["pathParameters"]["id"] == package["id"]:
if event["pathParameters"]["data_package_id"] == package["id"]:
found = package
if found:
payload = found
2 changes: 2 additions & 0 deletions src/handlers/shared/errors.py
@@ -0,0 +1,2 @@
class AggregatorS3Error(Exception):
"""Errors related to accessing files in S3"""
8 changes: 5 additions & 3 deletions src/handlers/site_upload/powerset_merge.py
@@ -44,7 +44,7 @@ def __init__(self, event):
self.study = s3_key_array[1]
self.data_package = s3_key_array[2].split("__")[1]
self.site = s3_key_array[3]
self.version = s3_key_array[4]
self.version = s3_key_array[4][-3:]
self.metadata = functions.read_metadata(self.s3_client, self.s3_bucket_name)
self.types_metadata = functions.read_metadata(
self.s3_client,
Expand All @@ -53,7 +53,8 @@ def __init__(self, event):
)
self.csv_aggerate_path = (
f"s3://{self.s3_bucket_name}/{enums.BucketPath.CSVAGGREGATE.value}/"
f"{self.study}/{self.study}__{self.data_package}/{self.version}/"
f"{self.study}/{self.study}__{self.data_package}/"
f"{self.version}/"
f"{self.study}__{self.data_package}__aggregate.csv"
)

Expand Down Expand Up @@ -85,7 +86,8 @@ def write_parquet(self, df: pandas.DataFrame, is_new_data_package: bool) -> None
"""writes dataframe as parquet to s3 and sends an SNS notification if new"""
parquet_aggregate_path = (
f"s3://{self.s3_bucket_name}/{enums.BucketPath.AGGREGATE.value}/"
f"{self.study}/{self.study}__{self.data_package}/{self.version}/"
f"{self.study}/{self.study}__{self.data_package}/"
f"{self.study}__{self.data_package}_{self.version}/"
f"{self.study}__{self.data_package}__aggregate.parquet"
)
awswrangler.s3.to_parquet(df, parquet_aggregate_path, index=False)
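To make the layout change concrete, the aggregate parquet key moves roughly as follows (study, package, and version values invented; this matches the rename migration.004 backfills for existing aggregates):

    before: aggregates/core/core__count_patient/099/core__count_patient__aggregate.parquet
    after:  aggregates/core/core__count_patient/core__count_patient_099/core__count_patient__aggregate.parquet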