Skip to content

Commit

Permalink
got it working
Browse files Browse the repository at this point in the history
  • Loading branch information
nerdai committed Jul 29, 2024
1 parent b0d87ec commit b287ff0
Showing 1 changed file with 1 addition and 117 deletions.
118 changes: 1 addition & 117 deletions snowflake_cybersyn_demo/apps/streamlit.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ async def start_consuming_finalized_tasks(final_task_consumer):
st.session_state.messages.append({"role": "assistant", "content": response})


@st.experimental_fragment(run_every=30)
@st.experimental_fragment(run_every="30s")
def task_df():
st.text("Task Status")
st.button("Refresh")
Expand All @@ -154,50 +154,6 @@ def task_df():
task_df()


@st.cache_resource
def start_consuming(final_task_consumer: FinalTaskConsumer):
if st.session_state.consuming:
return

import time
import threading

# async def write_to_queue(queue):
# task = TaskModel(
# task_id="111",
# input="Test task",
# chat_history=[
# ChatMessage(role="user", content="Test task"),
# ],
# status=TaskStatus.COMPLETED,
# )
# queue.put(task)

async def start_consuming_finalized_tasks(final_task_consumer):
final_task_consuming_callable = (
await final_task_consumer.register_to_message_queue()
)

await final_task_consuming_callable()

# server thread will remain active as long as streamlit thread is running, or is manually shutdown
thread = threading.Thread(
name="Consuming thread",
target=asyncio.run,
args=(start_consuming_finalized_tasks(final_task_consumer),),
daemon=False,
)
thread.start()

time.sleep(5)
st.session_state.consuming = True
logger.info("Started consuming.")
return thread


# _thread = start_consuming(final_task_consumer=final_task_consumer)


@st.experimental_fragment(run_every=5)
def process_completed_tasks(completed_queue: queue.Queue):
task_res: Optional[TaskResult] = None
Expand Down Expand Up @@ -229,75 +185,3 @@ def process_completed_tasks(completed_queue: queue.Queue):


process_completed_tasks(completed_queue=completed_tasks_queue)


# @st.cache_resource
# def get_consuming_callables() -> None:
# async def launch() -> None:
# start_consuming_callable = (
# await controller._human_service.message_queue.register_consumer(
# controller._human_service.as_consumer()
# )
# )

# final_task_consuming_callable = (
# await controller._human_service.message_queue.register_consumer(
# controller._final_task_consumer
# )
# )

# return start_consuming_callable, final_task_consuming_callable

# return asyncio.run(launch())


# (
# start_consuming_callable,
# final_task_consuming_callable,
# ) = get_consuming_callables()


# def remove_from_list_closure(
# task_list: List[TaskModel],
# task_status: TaskStatus,
# task_res: TaskResult,
# # current_task: Tuple[int, TaskStatus] = current_task,
# ) -> None:
# """Closure depending on the task list/status.

# Returns a function used to move the task from the incumbent list/status
# over to the completed list.
# """
# ix, task = next(
# (ix, t) for ix, t in enumerate(task_list) if t.task_id == task_res.task_id
# )
# task.status = TaskStatus.COMPLETED
# task.chat_history.append(ChatMessage(role="assistant", content=task_res.result))
# del task_list[ix]

# if current_task:
# current_task_ix, current_task_status = current_task
# if current_task_status == task_status and current_task_ix == ix:
# # current task is the task that is being moved to completed
# current_task = (len(completed) - 1, TaskStatus.COMPLETED)


# async def listening_to_queue() -> None:
# logger.info("🤖 LISTENING")
# h_task = asyncio.create_task(start_consuming_callable()) # noqa: F841
# f_task = asyncio.create_task(final_task_consuming_callable()) # noqa: F841

# human_required_tasks = []
# while True:
# logger.info(f"submitted: {submitted_tasks._list}")

# try:
# new_task: TaskModel = controller._submitted_tasks_queue.get_nowait()
# await submitted_tasks.append(new_task)
# logger.info("got new submitted task")
# logger.info(f"submitted: {submitted_tasks}")
# except asyncio.QueueEmpty:
# logger.info("task completion queue is empty")


# asyncio.run(listening_to_queue(bottom))

0 comments on commit b287ff0

Please sign in to comment.