Skip to content

Commit

Permalink
argh
Browse files Browse the repository at this point in the history
  • Loading branch information
nerdai committed Jul 26, 2024
1 parent 1e793a2 commit 878ff60
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 35 deletions.
4 changes: 1 addition & 3 deletions snowflake_cybersyn_demo/apps/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,7 @@ def __init__(
self._final_task_consumer = CallableMessageConsumer(
message_type="human", handler=self._process_completed_task_messages
)
self._completed_tasks_queue: asyncio.Queue[
TaskResult
] = asyncio.Queue()
self._completed_tasks_queue: asyncio.Queue[TaskResult] = asyncio.Queue()

async def _process_completed_task_messages(
self, message: QueueMessage, **kwargs: Any
Expand Down
63 changes: 31 additions & 32 deletions snowflake_cybersyn_demo/apps/streamlit.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,8 @@
for m in st.session_state.messages
]
)
response = st.write_stream(
controller._llama_index_stream_wrapper(stream)
)
st.session_state.messages.append(
{"role": "assistant", "content": response}
)
response = st.write_stream(controller._llama_index_stream_wrapper(stream))
st.session_state.messages.append({"role": "assistant", "content": response})

bottom = st.container()
with bottom:
Expand Down Expand Up @@ -168,14 +164,10 @@ def remove_from_list_closure(
over to the completed list.
"""
ix, task = next(
(ix, t)
for ix, t in enumerate(task_list)
if t.task_id == task_res.task_id
(ix, t) for ix, t in enumerate(task_list) if t.task_id == task_res.task_id
)
task.status = TaskStatus.COMPLETED
task.chat_history.append(
ChatMessage(role="assistant", content=task_res.result)
)
task.chat_history.append(ChatMessage(role="assistant", content=task_res.result))
del task_list[ix]
st.session_state.completed_tasks.append(task)

Expand All @@ -188,9 +180,7 @@ def remove_from_list_closure(
try:
task_res: TaskResult = controller._completed_tasks_queue.get_nowait()
logger.info("got new completed task result")
if task_res.task_id in [
t.task_id for t in st.session_state.submitted_tasks
]:
if task_res.task_id in [t.task_id for t in st.session_state.submitted_tasks]:
remove_from_list_closure(
st.session_state.submitted_tasks, TaskStatus.SUBMITTED
)
Expand All @@ -202,9 +192,7 @@ def remove_from_list_closure(
TaskStatus.HUMAN_REQUIRED,
)
else:
raise ValueError(
"Completed task not in submitted or human_needed lists."
)
raise ValueError("Completed task not in submitted or human_needed lists.")
except asyncio.QueueEmpty:
logger.info("completed task queue is empty.")
pass
Expand All @@ -214,23 +202,34 @@ def remove_from_list_closure(
continuously_check_for_completed_tasks()


async def launch() -> None:
start_consuming_callable = (
await controller._human_service.message_queue.register_consumer(
controller._human_service.as_consumer()
@st.cache_resource
def get_consuming_callables() -> None:
async def launch() -> None:
start_consuming_callable = (
await controller._human_service.message_queue.register_consumer(
controller._human_service.as_consumer()
)
)
)
h_task = asyncio.create_task(start_consuming_callable()) # noqa: F841

final_task_consuming_callable = (
await controller._human_service.message_queue.register_consumer(
controller._final_task_consumer
final_task_consuming_callable = (
await controller._human_service.message_queue.register_consumer(
controller._final_task_consumer
)
)
)
f_task = asyncio.create_task(final_task_consuming_callable()) # noqa: F841

await asyncio.Future()
return start_consuming_callable, final_task_consuming_callable

return asyncio.run(launch())


start_consuming_callable, final_task_consuming_callable = get_consuming_callables()


async def listening_to_queue() -> None:
h_task = asyncio.create_task(start_consuming_callable()) # noqa: F841
f_task = asyncio.create_task(final_task_consuming_callable()) # noqa: F841
while True:
await asyncio.sleep(0.1)


if __name__ == "__main__":
asyncio.run(launch())
asyncio.run(listening_to_queue())

0 comments on commit 878ff60

Please sign in to comment.