Skip to content

Commit

Permalink
fix stats fulfiller agent to produce output from sql
Browse files (browse the repository at this point in the history)
  • Loading branch information
nerdai committed Aug 6, 2024
1 parent 3c91ef6 commit a7286bb
Showing 1 changed file with 16 additions and 8 deletions.
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,6 @@
import asyncio
import json
import logging
from typing import Dict, List

import uvicorn
Expand All @@ -11,8 +12,6 @@
from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine, text

import logging

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -65,8 +64,10 @@


SQL_QUERY_TEMPLATE = """
SELECT ts.date as date,
ts.value as value
SELECT
ts.date as date,
ts.variable_name,
ts.value as value
FROM cybersyn.datacommons_timeseries AS ts
JOIN cybersyn.geography_index AS geo ON (ts.geo_id = geo.geo_id)
WHERE geo.geo_name = '{city}'
Expand All @@ -77,7 +78,9 @@
"""


def get_time_series_of_statistic_variable(city: str, stats_variable: str) -> str:
def get_time_series_of_statistic_variable(
city: str, stats_variable: str
) -> str:
"""Create a time series of a specified stats variable."""
query = SQL_QUERY_TEMPLATE.format(city=city, stats_variable=stats_variable)
url = URL(
Expand All @@ -94,15 +97,16 @@ def get_time_series_of_statistic_variable(city: str, stats_variable: str) -> str
try:
connection = engine.connect()
results = connection.execute(text(query))
except Exception as e:
except Exception:
logger.debug("Failed to execute query")
raise
finally:
connection.close()

# process
results = [
{"good": str(el[1]), "date": str(el[0]), "price": str(el[2])} for el in results
{"variable": str(el[1]), "date": str(el[0]), "value": str(el[2])}
for el in results
]
results_str = json.dumps(results, indent=4)

Expand All @@ -124,7 +128,11 @@ def perform_date_value_aggregation(json_str: str) -> str:
new_time_series_data[date] = [float(value)]

reduced_time_series_data = [
{"variable": variable, "date": date, "value": sum(values) / len(values)}
{
"variable": variable,
"date": date,
"value": sum(values) / len(values),
}
for date, values in new_time_series_data.items()
]

Expand Down

0 comments on commit a7286bb

Please sign in to comment.