diff --git a/src/workflow/StreamlitUI.py b/src/workflow/StreamlitUI.py index 270aaaf3..392f5926 100644 --- a/src/workflow/StreamlitUI.py +++ b/src/workflow/StreamlitUI.py @@ -3,7 +3,7 @@ from pathlib import Path import shutil import subprocess -from typing import Any, Union, List +from typing import Any, Union, List, Literal import json import os import sys @@ -14,7 +14,13 @@ from datetime import datetime -from src.common.common import OS_PLATFORM, TK_AVAILABLE, tk_directory_dialog, tk_file_dialog +from src.common.common import ( + OS_PLATFORM, + TK_AVAILABLE, + tk_directory_dialog, + tk_file_dialog, +) + class StreamlitUI: """ @@ -37,7 +43,7 @@ def upload_widget( key: str, file_types: Union[str, List[str]], name: str = "", - fallback: Union[List, str] = None + fallback: Union[List, str] = None, ) -> None: """ Handles file uploads through the Streamlit interface, supporting both direct @@ -72,14 +78,19 @@ def upload_widget( if st.session_state.location == "local": c2_text, c2_checkbox = c2.columns([1.5, 1], gap="large") c2_text.markdown("**OR add files from local folder**") - use_copy = c2_checkbox.checkbox("Make a copy of files", key=f"{key}-copy_files", value=True, help="Create a copy of files in workspace.") + use_copy = c2_checkbox.checkbox( + "Make a copy of files", + key=f"{key}-copy_files", + value=True, + help="Create a copy of files in workspace.", + ) else: use_copy = True # Convert file_types to a list if it's a string if isinstance(file_types, str): file_types = [file_types] - + if use_copy: with c1.form(f"{key}-upload", clear_on_submit=True): # Streamlit file uploader accepts file types as a list or None @@ -100,9 +111,9 @@ def upload_widget( files = [files] for f in files: # Check if file type is in the list of accepted file types - if f.name not in [f.name for f in files_dir.iterdir()] and any( - f.name.endswith(ft) for ft in file_types - ): + if f.name not in [ + f.name for f in files_dir.iterdir() + ] and any(f.name.endswith(ft) for ft in file_types): with open(Path(files_dir, f.name), "wb") as fh: fh.write(f.getbuffer()) st.success("Successfully added uploaded files!") @@ -124,7 +135,7 @@ def upload_widget( help="Browse for your local MS data files.", disabled=not TK_AVAILABLE, ) - + # Tk file dialog requires file types to be a list of tuples if isinstance(file_types, str): tk_file_types = [(f"{file_types}", f"*.{file_types}")] @@ -132,8 +143,7 @@ def upload_widget( tk_file_types = [(f"{ft}", f"*.{ft}") for ft in file_types] else: raise ValueError("'file_types' must be either of type str or list") - - + if dialog_button: local_files = tk_file_dialog( "Select your local MS data files", @@ -147,7 +157,7 @@ def upload_widget( f_handle.write(f"{f}\n") my_bar.empty() st.success("Successfully added files!") - + st.session_state["previous_dir"] = Path(local_files[0]).parent # Local file upload option: via directory path @@ -160,15 +170,32 @@ def upload_widget( with st_cols[0]: st.write("\n") st.write("\n") - dialog_button = st.button("📁", key=f'local_browse_{key}', help="Browse for your local directory with MS data.", disabled=not TK_AVAILABLE) + dialog_button = st.button( + "📁", + key=f"local_browse_{key}", + help="Browse for your local directory with MS data.", + disabled=not TK_AVAILABLE, + ) if dialog_button: - st.session_state["local_dir"] = tk_directory_dialog("Select directory with your MS data", st.session_state["previous_dir"]) + st.session_state["local_dir"] = tk_directory_dialog( + "Select directory with your MS data", + st.session_state["previous_dir"], + ) st.session_state["previous_dir"] = st.session_state["local_dir"] with st_cols[1]: - local_dir = st.text_input(f"path to folder with **{name}** files", key=f"path_to_folder_{key}", value=st.session_state["local_dir"]) + local_dir = st.text_input( + f"path to folder with **{name}** files", + key=f"path_to_folder_{key}", + value=st.session_state["local_dir"], + ) - if c2.button(f"Add **{name}** files from local folder", use_container_width=True, key=f"add_files_from_local_{key}", help="Add files from local directory."): + if c2.button( + f"Add **{name}** files from local folder", + use_container_width=True, + key=f"add_files_from_local_{key}", + help="Add files from local directory.", + ): files = [] local_dir = Path( local_dir @@ -194,7 +221,9 @@ def upload_widget( if os.path.isfile(f): shutil.copy(f, Path(files_dir, f.name)) elif os.path.isdir(f): - shutil.copytree(f, Path(files_dir, f.name), dirs_exist_ok=True) + shutil.copytree( + f, Path(files_dir, f.name), dirs_exist_ok=True + ) else: # Write the path to the local directories to the file with open(external_files, "a") as f_handle: @@ -203,16 +232,18 @@ def upload_widget( st.success("Successfully copied files!") if not TK_AVAILABLE: - c2.warning("**Warning**: Failed to import tkinter, either it is not installed, or this is being called from a cloud context. " "This function is not available in a Streamlit Cloud context. " - "You will have to manually enter the path to the folder with the MS files." - ) + c2.warning( + "**Warning**: Failed to import tkinter, either it is not installed, or this is being called from a cloud context. " + "This function is not available in a Streamlit Cloud context. " + "You will have to manually enter the path to the folder with the MS files." + ) if not use_copy: c2.warning( - "**Warning**: You have deselected the `Make a copy of files` option. " - "This **_assumes you know what you are doing_**. " - "This means that the original files will be used instead. " - ) + "**Warning**: You have deselected the `Make a copy of files` option. " + "This **_assumes you know what you are doing_**. " + "This means that the original files will be used instead. " + ) if fallback and not any(Path(files_dir).iterdir()): if isinstance(fallback, str): @@ -229,16 +260,26 @@ def upload_widget( ] else: if files_dir.exists(): - current_files = [f.name for f in files_dir.iterdir() if "external_files.txt" not in f.name] + current_files = [ + f.name + for f in files_dir.iterdir() + if "external_files.txt" not in f.name + ] # Check if local files are available - external_files = Path(self.workflow_dir, "input-files", key, "external_files.txt") + external_files = Path( + self.workflow_dir, "input-files", key, "external_files.txt" + ) if external_files.exists(): with open(external_files, "r") as f: external_files_list = f.read().splitlines() # Only make files available that still exist - current_files += [f"(local) {Path(f).name}" for f in external_files_list if os.path.exists(f)] + current_files += [ + f"(local) {Path(f).name}" + for f in external_files_list + if os.path.exists(f) + ] else: current_files = [] @@ -289,7 +330,9 @@ def select_input_file( options = [str(f) for f in path.iterdir() if "external_files.txt" not in str(f)] # Check if local files are available - external_files = Path(self.workflow_dir, "input-files", key, "external_files.txt") + external_files = Path( + self.workflow_dir, "input-files", key, "external_files.txt" + ) if external_files.exists(): with open(external_files, "r") as f: @@ -476,7 +519,9 @@ def format_files(input: Any) -> List[str]: help=help, ) elif isinstance(value, bool): - self.input_widget(key, value, widget_type="checkbox", name=name, help=help) + self.input_widget( + key, value, widget_type="checkbox", name=name, help=help + ) else: self.input_widget(key, value, widget_type="text", name=name, help=help) @@ -489,8 +534,9 @@ def input_TOPP( num_cols: int = 4, exclude_parameters: List[str] = [], include_parameters: List[str] = [], - display_full_parameter_names: bool = False, - display_subsections: bool = False, + display_tool_name: bool = True, + display_subsections: bool = True, + display_subsection_tabs: bool = False, custom_defaults: dict = {}, ) -> None: """ @@ -503,14 +549,25 @@ def input_TOPP( num_cols (int, optional): Number of columns to use for the layout. Defaults to 3. exclude_parameters (List[str], optional): List of parameter names to exclude from the widget. Defaults to an empty list. include_parameters (List[str], optional): List of parameter names to include in the widget. Defaults to an empty list. - display_full_parameter_names (bool, optional): Whether to display the full parameter names. Defaults to False. - display_subsections (bool, optional): Whether to split parameters into subsections based on the prefix (disables display_full_parameter_names). Defaults to False. + display_tool_name (bool, optional): Whether to display the TOPP tool name. Defaults to True. + display_subsections (bool, optional): Whether to split parameters into subsections based on the prefix. Defaults to True. + display_subsection_tabs (bool, optional): Whether to display main subsections in separate tabs (if more than one main section). Defaults to False. custom_defaults (dict, optional): Dictionary of custom defaults to use. Defaults to an empty dict. """ + + if not display_subsections: + display_subsection_tabs = False + if display_subsection_tabs: + display_subsections = True + # write defaults ini files ini_file_path = Path(self.parameter_manager.ini_dir, f"{topp_tool_name}.ini") if not ini_file_path.exists(): - subprocess.call([topp_tool_name, "-write_ini", str(ini_file_path)]) + try: + subprocess.call([topp_tool_name, "-write_ini", str(ini_file_path)]) + except FileNotFoundError: + st.error(f"TOPP tool **'{topp_tool_name}'** not found.") + return # update custom defaults if necessary if custom_defaults: param = poms.Param() @@ -525,7 +582,11 @@ def input_TOPP( param = poms.Param() poms.ParamXMLFile().load(str(ini_file_path), param) if include_parameters: - valid_keys = [key for key in param.keys() if any([k.encode() in key for k in include_parameters])] + valid_keys = [ + key + for key in param.keys() + if any([k.encode() in key for k in include_parameters]) + ] else: excluded_keys = [ "log", @@ -536,27 +597,40 @@ def input_TOPP( "version", "test", ] + exclude_parameters - valid_keys = [key for key in param.keys() if not (b"input file" in param.getTags(key) - or b"output file" in param.getTags(key) - or any([k.encode() in key for k in excluded_keys]))] - params_decoded = [] + valid_keys = [ + key + for key in param.keys() + if not ( + b"input file" in param.getTags(key) + or b"output file" in param.getTags(key) + or any([k.encode() in key for k in excluded_keys]) + ) + ] + params = [] for key in valid_keys: entry = param.getEntry(key) - tmp = { + p = { "name": entry.name.decode(), "key": key, "value": entry.value, "valid_strings": [v.decode() for v in entry.valid_strings], "description": entry.description.decode(), "advanced": (b"advanced" in param.getTags(key)), - "section_description": param.getSectionDescription(':'.join(key.decode().split(':')[:-1])) + "section_description": param.getSectionDescription( + ":".join(key.decode().split(":")[:-1]) + ), } - params_decoded.append(tmp) + # Parameter sections and subsections as string (e.g. "section:subsection") + if display_subsections: + p["sections"] = ":".join( + p["key"].decode().split(":1:")[1].split(":")[:-1] + ) + params.append(p) # for each parameter in params_decoded # if a parameter with custom default value exists, use that value # else check if the parameter is already in self.params, if yes take the value from self.params - for p in params_decoded: + for p in params: name = p["key"].decode().split(":1:")[1] if topp_tool_name in self.params: if name in self.params[topp_tool_name]: @@ -566,97 +640,144 @@ def input_TOPP( elif name in custom_defaults: p["value"] = custom_defaults[name] - # show input widgets - section_description = None - cols = st.columns(num_cols) - i = 0 - - for p in params_decoded: - # skip avdanced parameters if not selected - if not st.session_state["advanced"] and p["advanced"]: - continue + # Split into subsections if required + param_sections = {} + section_descriptions = {} + if display_subsections: + for p in params: + # Skip adavnaced parameters if not selected + if not st.session_state["advanced"] and p["advanced"]: + continue + # Add section description to section_descriptions dictionary if it exists + if p["section_description"]: + section_descriptions[p["sections"]] = p["section_description"] + # Add parameter to appropriate section in param_sections dictionary + if not p["sections"]: + p["sections"] = "General" + if p["sections"] in param_sections: + param_sections[p["sections"]].append(p) + else: + param_sections[p["sections"]] = [p] + else: + # Simply put all parameters in "all" section if no subsections required + param_sections["all"] = params + + # Display tool name if required + if display_tool_name: + st.markdown(f"**{topp_tool_name}**") + + tab_names = [k for k in param_sections.keys() if ":" not in k] + tabs = None + if tab_names and display_subsection_tabs: + tabs = st.tabs([k for k in param_sections.keys() if ":" not in k]) + + # Show input widgets + def show_subsection_header(section: str, display_subsections: bool): + # Display section name and help text (section description) if required + if section and display_subsections: + parts = section.split(":") + st.markdown( + ":".join(parts[:-1]) + + (":" if len(parts) > 1 else "") + + f"**{parts[-1]}**", + help=( + section_descriptions[section] + if section in section_descriptions + else None + ), + ) - key = f"{self.parameter_manager.topp_param_prefix}{p['key'].decode()}" - if display_subsections: + def display_TOPP_params(params: dict, num_cols): + """Displays individual TOPP parameters in given number of columns""" + cols = st.columns(num_cols) + i = 0 + for p in params: + # get key and name + key = f"{self.parameter_manager.topp_param_prefix}{p['key'].decode()}" name = p["name"] - if section_description is None: - section_description = p['section_description'] + try: + # sometimes strings with newline, handle as list + if isinstance(p["value"], str) and "\n" in p["value"]: + p["value"] = p["value"].split("\n") + # bools + if isinstance(p["value"], bool): + cols[i].markdown("##") + cols[i].checkbox( + name, + value=( + (p["value"] == "true") + if type(p["value"]) == str + else p["value"] + ), + help=p["description"], + key=key, + ) - elif section_description != p['section_description']: - section_description = p['section_description'] - st.markdown(f"**{section_description}**") - cols = st.columns(num_cols) - i = 0 - elif display_full_parameter_names: - name = key.split(":1:")[1].replace("algorithm:", "").replace(":", " : ") - else: - name = p["name"] - try: - # # sometimes strings with newline, handle as list - if isinstance(p["value"], str) and "\n" in p["value"]: - p["value"] = p["value"].split("\n") - # bools - if isinstance(p["value"], bool): - cols[i].markdown("##") - cols[i].checkbox( - name, - value=(p["value"] == "true") if type(p["value"]) == str else p["value"], - help=p["description"], - key=key, - ) + # strings + elif isinstance(p["value"], str): + # string options + if p["valid_strings"]: + cols[i].selectbox( + name, + options=p["valid_strings"], + index=p["valid_strings"].index(p["value"]), + help=p["description"], + key=key, + ) + else: + cols[i].text_input( + name, value=p["value"], help=p["description"], key=key + ) + + # ints + elif isinstance(p["value"], int): + cols[i].number_input( + name, value=int(p["value"]), help=p["description"], key=key + ) - # strings - elif isinstance(p["value"], str): - # string options - if p["valid_strings"]: - cols[i].selectbox( + # floats + elif isinstance(p["value"], float): + cols[i].number_input( name, - options=p["valid_strings"], - index=p["valid_strings"].index(p["value"]), + value=float(p["value"]), + step=1.0, help=p["description"], key=key, ) - else: - cols[i].text_input( - name, value=p["value"], help=p["description"], key=key + + # lists + elif isinstance(p["value"], list): + p["value"] = [ + v.decode() if isinstance(v, bytes) else v + for v in p["value"] + ] + cols[i].text_area( + name, + value="\n".join([str(val) for val in p["value"]]), + help=p["description"], + key=key, ) - # ints - elif isinstance(p["value"], int): - cols[i].number_input( - name, value=int(p["value"]), help=p["description"], key=key - ) + # increment number of columns, create new cols object if end of line is reached + i += 1 + if i == num_cols: + i = 0 + cols = st.columns(num_cols) + except Exception as e: + cols[i].error(f"Error in parameter **{p['name']}**.") + print('Error parsing "' + p["name"] + '": ' + str(e)) - # floats - elif isinstance(p["value"], float): - cols[i].number_input( - name, - value=float(p["value"]), - step=1.0, - help=p["description"], - key=key, - ) - # lists - elif isinstance(p["value"], list): - p["value"] = [ - v.decode() if isinstance(v, bytes) else v for v in p["value"] - ] - cols[i].text_area( - name, - value="\n".join([str(val) for val in p["value"]]), - help=p["description"], - key=key, - ) - - # increment number of columns, create new cols object if end of line is reached - i += 1 - if i == num_cols: - i = 0 - cols = st.columns(num_cols) - except Exception as e: - cols[i].error(f"Error in parameter **{p['name']}**.") - print("Error parsing \""+ p['name'] + "\": " + str(e)) + for section, params in param_sections.items(): + if tabs is None: + show_subsection_header(section, display_subsections) + display_TOPP_params(params, num_cols) + else: + tab_name = section.split(":")[0] + with tabs[tab_names.index(tab_name)]: + show_subsection_header(section, display_subsections) + display_TOPP_params(params, num_cols) + def input_python( self, @@ -664,22 +785,22 @@ def input_python( num_cols: int = 3, ) -> None: """ - Dynamically generates and displays input widgets based on the DEFAULTS - dictionary defined in a specified Python script file. - - For each entry in the DEFAULTS dictionary, an input widget is displayed, - allowing the user to specify values for the parameters defined in the - script. The widgets are arranged in a grid with a specified number of - columns. Parameters can be marked as hidden or advanced within the DEFAULTS - dictionary; hidden parameters are not displayed, and advanced parameters - are displayed only if the user has selected to view advanced options. - - Args: - script_file (str): The file name or path to the Python script containing - the DEFAULTS dictionary. If the path is omitted, the method searches in - src/python-tools/'. - num_cols (int, optional): The number of columns to use for displaying input widgets. Defaults to 3. - """ + Dynamically generates and displays input widgets based on the DEFAULTS + dictionary defined in a specified Python script file. + + For each entry in the DEFAULTS dictionary, an input widget is displayed, + allowing the user to specify values for the parameters defined in the + script. The widgets are arranged in a grid with a specified number of + columns. Parameters can be marked as hidden or advanced within the DEFAULTS + dictionary; hidden parameters are not displayed, and advanced parameters + are displayed only if the user has selected to view advanced options. + + Args: + script_file (str): The file name or path to the Python script containing + the DEFAULTS dictionary. If the path is omitted, the method searches in + src/python-tools/'. + num_cols (int, optional): The number of columns to use for displaying input widgets. Defaults to 3. + """ # Check if script file exists (can be specified without path and extension) # default location: src/python-tools/script_file @@ -780,7 +901,9 @@ def zip_and_download_files(self, directory: str): with zipfile.ZipFile(bytes_io, "w", zipfile.ZIP_DEFLATED) as zip_file: for i, file_path in enumerate(files): - if file_path.is_file(): # Ensure we're only adding files, not directories + if ( + file_path.is_file() + ): # Ensure we're only adding files, not directories # Preserve directory structure relative to the original directory zip_file.write(file_path, file_path.relative_to(directory.parent)) my_bar.progress((i + 1) / n_files) # Update progress bar @@ -794,7 +917,7 @@ def zip_and_download_files(self, directory: str): data=bytes_io, file_name="input-files.zip", mime="application/zip", - use_container_width=True + use_container_width=True, ) def file_upload_section(self, custom_upload_function) -> None: @@ -838,23 +961,25 @@ def execution_section(self, start_workflow_function) -> None: summary_text += f""" {key}: **{value}** -""" +""" elif value: summary_text += f""" **{key}**: -""" +""" for k, v in value.items(): summary_text += f""" {key}: **{v}** -""" +""" with st.expander("**Parameter Summary**"): st.markdown(summary_text) c1, c2 = st.columns(2) # Select log level, this can be changed at run time or later without re-running the workflow - log_level = c1.selectbox("log details", ["minimal", "commands and run times", "all"], key="log_level") + log_level = c1.selectbox( + "log details", ["minimal", "commands and run times", "all"], key="log_level" + ) if self.executor.pid_dir.exists(): if c1.button("Stop Workflow", type="primary", use_container_width=True): self.executor.stop() @@ -871,7 +996,9 @@ def execution_section(self, start_workflow_function) -> None: time.sleep(2) st.rerun() else: - st.markdown(f"**Workflow log file: {datetime.fromtimestamp(log_path.stat().st_ctime).strftime('%Y-%m-%d %H:%M')} CET**") + st.markdown( + f"**Workflow log file: {datetime.fromtimestamp(log_path.stat().st_ctime).strftime('%Y-%m-%d %H:%M')} CET**" + ) with open(log_path, "r", encoding="utf-8") as f: content = f.read() # Check if workflow finished successfully