diff --git a/snowflake_cybersyn_demo/agent_services/financial_and_economic_essentials/time_series_getter_agent.py b/snowflake_cybersyn_demo/agent_services/financial_and_economic_essentials/time_series_getter_agent.py index 65a9ca3..f88380f 100644 --- a/snowflake_cybersyn_demo/agent_services/financial_and_economic_essentials/time_series_getter_agent.py +++ b/snowflake_cybersyn_demo/agent_services/financial_and_economic_essentials/time_series_getter_agent.py @@ -94,8 +94,7 @@ def get_time_series_of_good(good: str) -> str: # process results = [ - {"good": str(el[1]), "date": str(el[0]), "price": str(el[2])} - for el in results + {"good": str(el[1]), "date": str(el[0]), "price": str(el[2])} for el in results ] results_str = json.dumps(results, indent=4) @@ -121,7 +120,7 @@ def perform_price_aggregation(json_str: str) -> str: for date, prices in new_time_series_data.items() ] - return json.dumps(reduced_time_series_data, indent=4) + return reduced_time_series_data goods_getter_tool = FunctionTool.from_defaults( diff --git a/snowflake_cybersyn_demo/apps/streamlit.py b/snowflake_cybersyn_demo/apps/streamlit.py index 0fa738b..fac80ad 100644 --- a/snowflake_cybersyn_demo/apps/streamlit.py +++ b/snowflake_cybersyn_demo/apps/streamlit.py @@ -15,6 +15,9 @@ HumanRequest, HumanService, ) +from snowflake_cybersyn_demo.agent_services.financial_and_economic_essentials.time_series_getter_agent import ( + perform_price_aggregation, +) from snowflake_cybersyn_demo.apps.controller import Controller from snowflake_cybersyn_demo.apps.final_task_consumer import FinalTaskConsumer @@ -29,15 +32,13 @@ @st.cache_resource -def startup() -> ( - Tuple[ - Controller, - queue.Queue[TaskResult], - FinalTaskConsumer, - queue.Queue[HumanRequest], - queue.Queue[str], - ] -): +def startup() -> Tuple[ + Controller, + queue.Queue[TaskResult], + FinalTaskConsumer, + queue.Queue[HumanRequest], + queue.Queue[str], +]: from snowflake_cybersyn_demo.additional_services.human_in_the_loop import ( human_input_request_queue, human_input_result_queue, @@ -60,9 +61,7 @@ async def start_consuming_human_tasks(hs: HumanService) -> None: ) ) - consuming_callable = await message_queue.register_consumer( - hs.as_consumer() - ) + consuming_callable = await message_queue.register_consumer(hs.as_consumer()) ht_task = asyncio.create_task(consuming_callable()) # noqa: F841 @@ -225,22 +224,20 @@ def task_df() -> None: st.text_input( "Provide human input", key="human_input", - on_change=controller.get_human_input_handler( - human_input_result_queue - ), + on_change=controller.get_human_input_handler(human_input_result_queue), ) - sidebar_enabled = ( + show_task_res = ( len(event.selection["rows"]) > 0 and st.session_state.current_task.status == "completed" ) - if sidebar_enabled: + if show_task_res: if task_res := controller.get_task_result( st.session_state.current_task.task_id ): try: - timeseries_data = json.loads(task_res.result) + timeseries_data = perform_price_aggregation(task_res.result) except json.JSONDecodeError: logger.info("Could not decode task_res") pass @@ -249,9 +246,9 @@ def task_df() -> None: "dates": [el["date"] for el in timeseries_data], "price": [el["price"] for el in timeseries_data], } - st.header(title) - logger.info(timeseries_data) - st.line_chart(data=timeseries_data, x="dates", y="price") + with st.container(height=500): + st.header(title) + st.bar_chart(data=timeseries_data, x="dates", y="price", height=400) task_df() @@ -267,9 +264,7 @@ def process_completed_tasks(completed_queue: queue.Queue) -> None: logger.info("task result queue is empty.") if task_res: - controller.update_associated_task_to_completed_status( - task_res=task_res - ) + controller.update_associated_task_to_completed_status(task_res=task_res) process_completed_tasks(completed_queue=completed_tasks_queue) @@ -287,9 +282,7 @@ def process_human_input_requests( logger.info("human request queue is empty.") if human_req: - controller.update_associated_task_to_human_required_status( - human_req=human_req - ) + controller.update_associated_task_to_human_required_status(human_req=human_req) process_human_input_requests(human_requests_queue=human_input_request_queue)