Skip to content

Commit

Permalink
Make dataframe from events
Browse files Browse the repository at this point in the history
  • Loading branch information
berland committed Sep 26, 2024
1 parent 93f011d commit 8fd0806
Showing 1 changed file with 39 additions and 9 deletions.
48 changes: 39 additions & 9 deletions res/dash_gui/app.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
import json
import threading
import websocket

import dash
import dash_bootstrap_components as dbc
import dash_table
import pandas as pd
from communication.experiments import fetch_experiments_data, get_data, set_data
import websocket
from communication.experiments import fetch_experiments_data
from dash import dcc, html
from dash.dependencies import Input, Output, State

# Initialize the Dash app with a Bootstrap theme
app = dash.Dash(__name__, external_stylesheets=[dbc.themes.CYBORG])
global old_id
old_id = None
global ws_messages
global dataframe_rows
ws_messages = [] # Clear the global message log after displaying

dataframe_rows = []

# Define the layout of the app
app.layout = dbc.Container(
Expand Down Expand Up @@ -84,13 +87,39 @@
)


FM_STATES = pd.DataFrame(
columns=[
"experiment_id",
"iteration",
"realization",
"fm_id",
"current_memory_usage",
]
)


def event_to_row(event: dict, experiment_id):
if event["event_type"] in ["SnapshotUpdateEvent", "FullSnapshotEvent"]:
for real_id, real in event["snapshot"].get("reals", {}).items():
for fm_id, fm in real["forward_models"].items():
return {
"experiment_id": experiment_id,
"iteration": event["current_iteration"],
"realization": real_id,
"fm_id": fm_id,
"current_memory_usage": fm["current_memory_usage"],
}


# Callback to update the table with new data
@app.callback(
Output("experiments-table", "data"),
[Input("refresh-button", "n_clicks"), Input("interval-component", "n_intervals")],
)
def update_table(n_clicks, n_intervals):
global dataframe_rows
data = fetch_experiments_data()
print(pd.DataFrame(dataframe_rows))
if data:
# Convert to DataFrame and return as dict records
df = pd.DataFrame(data)
Expand All @@ -100,10 +129,13 @@ def update_table(n_clicks, n_intervals):

# WebSocket client handling function
def on_message(ws, message):
global ws_messages
global ws_messages, dataframe_rows, old_id
x = json.loads(message)
ws_messages.append(x)
if x['event_type'] == "EndEvent":
event_as_dict = event_to_row(x, old_id)
if event_as_dict:
dataframe_rows.append(event_as_dict)
if x["event_type"] == "EndEvent":
ws.close()
print(f"WebSocket message received: {x['event_type']}")

Expand Down Expand Up @@ -140,18 +172,16 @@ def start_websocket(experiment_id):
[State("websocket-events", "children")],
)
def connect_to_websocket(selected_experiment_id, n_intervals, existing_logs):
data = get_data()
old_id = data.get("selected_experiment_id", None)
global old_id
if selected_experiment_id and selected_experiment_id["row_id"] == old_id:
global ws_messages
new_logs = "\n".join([json.dumps(msg) for msg in ws_messages])
ws_messages = [] # Clear the global message log after displaying
return (existing_logs or "") + "\n" + new_logs

elif selected_experiment_id and selected_experiment_id["row_id"] != old_id:
data["selected_experiment_id"] = selected_experiment_id["row_id"]
set_data(data)
# Start WebSocket connection in a new thread
old_id = selected_experiment_id["row_id"]
ws_thread = threading.Thread(
target=start_websocket, args=(selected_experiment_id["row_id"],)
)
Expand Down

0 comments on commit 8fd0806

Please sign in to comment.