Commit

Updated chart data, S3 aggregate parquet moved
dogversioning committed Oct 15, 2024
1 parent 7cd824f commit eade41e
Showing 10 changed files with 188 additions and 107 deletions.
112 changes: 60 additions & 52 deletions docs/dashboard_api.prod.yaml
@@ -57,40 +57,6 @@ paths:
content: {}
security:
- api_key: []
/chart-data/{subscription_name}:
get:
parameters:
- name: "subscription_name"
in: "path"
required: true
schema:
type: "string"
security:
- api_key: []
options:
summary: "CORS support"
parameters:
- name: "subscription_name"
in: "path"
required: true
schema:
type: "string"
responses:
"200":
description: "Default response for CORS method"
headers:
Access-Control-Allow-Origin:
schema:
type: "string"
Access-Control-Allow-Methods:
schema:
type: "string"
Access-Control-Allow-Headers:
schema:
type: "string"
content: {}
security:
- api_key: []
/aggregates:
get:
security:
@@ -351,6 +317,48 @@ paths:
content: {}
security:
- api_key: []
/data_packages/{data_package_id}/chart:
get:
parameters:
- name: "data_package_id"
in: "path"
required: true
schema:
type: "string"
- name: "column"
in: "query"
required: true
schema:
type: "string"
- name: "filters"
in: "query"
schema:
type: "string"
security:
- api_key: []
options:
parameters:
- name: "data_package_id"
in: "path"
required: true
schema:
type: "string"
responses:
"200":
description: "Default response for CORS method"
headers:
Access-Control-Allow-Origin:
schema:
type: "string"
Access-Control-Allow-Methods:
schema:
type: "string"
Access-Control-Allow-Headers:
schema:
type: "string"
content: {}
security:
- api_key: []
/last_valid/{study}/{data_package}/{site}/{version}/{filename}:
get:
parameters:
@@ -513,19 +521,20 @@ paths:
content: {}
security:
- api_key: []
/data_packages/{id}:
/metadata/{site}:
get:
parameters:
- name: "id"
- name: "site"
in: "path"
required: true
schema:
type: "string"
security:
- api_key: []
options:
summary: "CORS support"
parameters:
- name: "id"
- name: "site"
in: "path"
required: true
schema:
@@ -546,24 +555,12 @@
content: {}
security:
- api_key: []
/metadata/{site}:
/study-periods:
get:
parameters:
- name: "site"
in: "path"
required: true
schema:
type: "string"
security:
- api_key: []
options:
summary: "CORS support"
parameters:
- name: "site"
in: "path"
required: true
schema:
type: "string"
responses:
"200":
description: "Default response for CORS method"
@@ -580,12 +577,23 @@
content: {}
security:
- api_key: []
/study-periods:
/data_packages/{data_package_id}:
get:
parameters:
- name: "data_package_id"
in: "path"
required: true
schema:
type: "string"
security:
- api_key: []
options:
summary: "CORS support"
parameters:
- name: "data_package_id"
in: "path"
required: true
schema:
type: "string"
responses:
"200":
description: "Default response for CORS method"
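
For orientation, a minimal client-side sketch of calling the new chart endpoint defined above; the base URL, the x-api-key header name, and the data package id, column, and filter values are illustrative assumptions, not part of this commit:

import requests

BASE_URL = "https://dashboard-api.example.com"  # placeholder base URL
resp = requests.get(
    f"{BASE_URL}/data_packages/core__count_patient_099/chart",
    params={"column": "gender", "filters": "gender:strEq:female"},  # hypothetical values
    headers={"x-api-key": "YOUR_API_KEY"},  # assumed header for the api_key scheme
    timeout=30,
)
resp.raise_for_status()
print(resp.json())
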
48 changes: 48 additions & 0 deletions scripts/migrations/migration.004.s3_name_with_id.py
@@ -0,0 +1,48 @@
"""Removes unexpected root nodes/templates/misspelled keys from transaction log."""

import argparse
import io
import json

import boto3


def _get_s3_data(key: str, bucket_name: str, client) -> dict:
"""Convenience class for retrieving a dict from S3"""
try:
bytes_buffer = io.BytesIO()
client.download_fileobj(Bucket=bucket_name, Key=key, Fileobj=bytes_buffer)
return json.loads(bytes_buffer.getvalue().decode())
except Exception: # pylint: disable=broad-except
return {}


def _put_s3_data(key: str, bucket_name: str, client, data: dict) -> None:
"""Convenience class for writing a dict to S3"""
b_data = io.BytesIO(json.dumps(data).encode())
client.upload_fileobj(Bucket=bucket_name, Key=key, Fileobj=b_data)


def s3_name_with_id(bucket: str):
    client = boto3.client("s3")
    res = client.list_objects_v2(Bucket=bucket)
    contents = res["Contents"]
    moved_files = 0
    for s3_file in contents:
        if s3_file["Key"].split("/")[0] == "aggregates":
            key = s3_file["Key"]
            key_array = key.split("/")
            if len(key_array[3]) == 3:
                key_array[3] = f"{key_array[2]}_{key_array[3]}"
                new_key = "/".join(key_array)
                client.copy({"Bucket": bucket, "Key": key}, bucket, new_key)
                client.delete_object(Bucket=bucket, Key=key)
                moved_files += 1
    print(f"Updated {moved_files} aggregates")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="""Changes lowest directory in S3 to file id""")
    parser.add_argument("-b", "--bucket", help="bucket name")
    args = parser.parse_args()
    s3_name_with_id(args.bucket)
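
A quick sketch of the key rewrite this migration performs; the example key below is hypothetical, only the transformation mirrors the script above:

# Hypothetical aggregate key before the migration
key = "aggregates/core/core__count_patient/099/core__count_patient__aggregate.parquet"
key_array = key.split("/")
if len(key_array[3]) == 3:  # a bare three-character version directory, e.g. "099"
    key_array[3] = f"{key_array[2]}_{key_array[3]}"
print("/".join(key_array))
# aggregates/core/core__count_patient/core__count_patient_099/core__count_patient__aggregate.parquet
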
67 changes: 40 additions & 27 deletions src/handlers/dashboard/get_chart_data.py
@@ -2,18 +2,22 @@
This is intended to provide an implementation of the logic described in docs/api.md
"""

import logging
import os

import awswrangler
import boto3
import pandas

from src.handlers.dashboard.filter_config import get_filter_string
from src.handlers.shared.enums import BucketPath
from src.handlers.shared.functions import get_latest_data_package_version, http_response
from src.handlers.dashboard import filter_config
from src.handlers.shared import decorators, enums, errors, functions

log_level = os.environ.get("LAMBDA_LOG_LEVEL", "INFO")
logger = logging.getLogger()
logger.setLevel(log_level)

def _get_table_cols(table_name: str, version: str | None = None) -> list:

def _get_table_cols(dp_id: str, version: str | None = None) -> list:
"""Returns the columns associated with a table.
Since running an athena query takes a decent amount of time due to queueing
Expand All @@ -22,24 +26,26 @@ def _get_table_cols(table_name: str, version: str | None = None) -> list:
"""

s3_bucket_name = os.environ.get("BUCKET_NAME")
prefix = f"{BucketPath.CSVAGGREGATE.value}/{table_name.split('__')[0]}/{table_name}"
prefix = f"{enums.BucketPath.CSVAGGREGATE.value}/{dp_id.split('__')[0]}/{dp_id[:-4]}"
if version is None:
version = get_latest_data_package_version(s3_bucket_name, prefix)
print(f"{prefix}/{version}/{table_name}__aggregate.csv")
s3_key = f"{prefix}/{version}/{table_name}__aggregate.csv"
version = functions.get_latest_data_package_version(s3_bucket_name, prefix)
s3_key = f"{prefix}/{version}/{dp_id[:-4]}__aggregate.csv"
s3_client = boto3.client("s3")
s3_iter = s3_client.get_object(
Bucket=s3_bucket_name,
Key=s3_key,
)["Body"].iter_lines()
return next(s3_iter).decode().split(",")
try:
s3_iter = s3_client.get_object(
Bucket=s3_bucket_name,
Key=s3_key,
)["Body"].iter_lines()
return next(s3_iter).decode().split(",")
except Exception:
raise errors.AggregatorS3Error


def _build_query(query_params: dict, filters: list, path_params: dict) -> str:
"""Creates a query from the dashboard API spec"""
table = path_params["data_package"]
columns = _get_table_cols(table)
filter_str = get_filter_string(filters)
dp_id = path_params["data_package_id"]
columns = _get_table_cols(dp_id)
filter_str = filter_config.get_filter_string(filters)
if filter_str != "":
filter_str = f"AND {filter_str}"
count_col = next(c for c in columns if c.startswith("cnt"))
Expand All @@ -60,11 +66,12 @@ def _build_query(query_params: dict, filters: list, path_params: dict) -> str:
coalesce_str = "WHERE"
query_str = (
f"SELECT {select_str} " # nosec # noqa: S608
f"FROM \"{os.environ.get('GLUE_DB_NAME')}\".\"{table}\" "
f"FROM \"{os.environ.get('GLUE_DB_NAME')}\".\"{dp_id}\" "
f"{coalesce_str} "
f"{query_params['column']} IS NOT Null {filter_str} "
f"GROUP BY {group_str}"
)
logging.debug(query_str)
return query_str


@@ -91,21 +98,27 @@ def _format_payload(df: pandas.DataFrame, query_params: dict, filters: list) ->
    return payload


# @generic_error_handler(msg="Error retrieving chart data")
@decorators.generic_error_handler(msg="Error retrieving chart data")
def chart_data_handler(event, context):
"""manages event from dashboard api call and retrieves data"""
del context
query_params = event["queryStringParameters"]
filters = event["multiValueQueryStringParameters"].get("filter", [])
path_params = event["pathParameters"]
boto3.setup_default_session(region_name="us-east-1")
query = _build_query(query_params, filters, path_params)
df = awswrangler.athena.read_sql_query(
query,
database=os.environ.get("GLUE_DB_NAME"),
s3_output=f"s3://{os.environ.get('BUCKET_NAME')}/awswrangler",
workgroup=os.environ.get("WORKGROUP_NAME"),
)
res = _format_payload(df, query_params, filters)
res = http_response(200, res)
try:
query = _build_query(query_params, filters, path_params)
df = awswrangler.athena.read_sql_query(
query,
database=os.environ.get("GLUE_DB_NAME"),
s3_output=f"s3://{os.environ.get('BUCKET_NAME')}/awswrangler",
workgroup=os.environ.get("WORKGROUP_NAME"),
)
res = _format_payload(df, query_params, filters)
res = functions.http_response(200, res)
except errors.AggregatorS3Error:
res = functions.http_response(
404, f"Aggregate for {path_params['data_package_id']} not found"
)

return res
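
For quick local reasoning about chart_data_handler above, a sketch of the API Gateway proxy event shape it expects; the identifiers and filter value are placeholders, and actually invoking the handler requires the Glue/Athena environment variables it reads:

# Hypothetical Lambda proxy event for chart_data_handler
event = {
    "queryStringParameters": {"column": "gender"},
    "multiValueQueryStringParameters": {"filter": ["gender:strEq:female"]},
    "pathParameters": {"data_package_id": "core__count_patient_099"},
}
# chart_data_handler(event, context=None) builds an Athena query from this event
# and returns an HTTP-style response dict via functions.http_response().
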
2 changes: 1 addition & 1 deletion src/handlers/dashboard/get_data_packages.py
@@ -26,7 +26,7 @@ def data_packages_handler(event, context):
    elif event.get("pathParameters"):
        found = None
        for package in data_packages:
            if event["pathParameters"]["id"] == package["id"]:
            if event["pathParameters"]["data_package_id"] == package["data_package_id"]:
                found = package
        if found:
            payload = found
2 changes: 2 additions & 0 deletions src/handlers/shared/errors.py
@@ -0,0 +1,2 @@
class AggregatorS3Error(Exception):
"""Errors related to accessing files in S3"""
8 changes: 5 additions & 3 deletions src/handlers/site_upload/powerset_merge.py
Expand Up @@ -44,7 +44,7 @@ def __init__(self, event):
        self.study = s3_key_array[1]
        self.data_package = s3_key_array[2].split("__")[1]
        self.site = s3_key_array[3]
        self.version = s3_key_array[4]
        self.version = s3_key_array[4][-3:]
        self.metadata = functions.read_metadata(self.s3_client, self.s3_bucket_name)
        self.types_metadata = functions.read_metadata(
            self.s3_client,
@@ -53,7 +53,8 @@
        )
        self.csv_aggerate_path = (
            f"s3://{self.s3_bucket_name}/{enums.BucketPath.CSVAGGREGATE.value}/"
            f"{self.study}/{self.study}__{self.data_package}/{self.version}/"
            f"{self.study}/{self.study}__{self.data_package}/"
            f"{self.version}/"
            f"{self.study}__{self.data_package}__aggregate.csv"
        )

@@ -85,7 +86,8 @@ def write_parquet(self, df: pandas.DataFrame, is_new_data_package: bool) -> None
        """writes dataframe as parquet to s3 and sends an SNS notification if new"""
        parquet_aggregate_path = (
            f"s3://{self.s3_bucket_name}/{enums.BucketPath.AGGREGATE.value}/"
            f"{self.study}/{self.study}__{self.data_package}/{self.version}/"
            f"{self.study}/{self.study}__{self.data_package}/"
            f"{self.study}__{self.data_package}_{self.version}/"
            f"{self.study}__{self.data_package}__aggregate.parquet"
        )
        awswrangler.s3.to_parquet(df, parquet_aggregate_path, index=False)
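
To make the parquet path change concrete, a sketch of the before/after keys, using hypothetical study, data package, and version values and assuming enums.BucketPath.AGGREGATE.value is "aggregates", as the migration script above suggests:

study, data_package, version = "core", "count_patient", "099"  # hypothetical values
old_key = (
    f"aggregates/{study}/{study}__{data_package}/{version}/"
    f"{study}__{data_package}__aggregate.parquet"
)
new_key = (
    f"aggregates/{study}/{study}__{data_package}/"
    f"{study}__{data_package}_{version}/"
    f"{study}__{data_package}__aggregate.parquet"
)
print(old_key)  # aggregates/core/core__count_patient/099/core__count_patient__aggregate.parquet
print(new_key)  # aggregates/core/core__count_patient/core__count_patient_099/core__count_patient__aggregate.parquet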
