diff --git a/src/UmetaFlowTOPPWorkflow.py b/src/UmetaFlowTOPPWorkflow.py index 2a93889..6c3bc54 100644 --- a/src/UmetaFlowTOPPWorkflow.py +++ b/src/UmetaFlowTOPPWorkflow.py @@ -91,7 +91,6 @@ def configure(self) -> None: with t[4]: self.ui.input_TOPP( "FeatureLinkerUnlabeledKD", - display_full_parameter_names=True, ) with tabs[1]: self.ui.input_widget( @@ -180,6 +179,7 @@ def configure(self) -> None: help="Password from a valid SIRIUS account. **Not encrypted**, will be stored in **plain text** in parameters and show up in log files.", widget_type="password", ) + st.markdown("**SIRIUS**") self.ui.input_widget( "run-sirius", False, diff --git a/src/workflow/StreamlitUI.py b/src/workflow/StreamlitUI.py index 945708f..37c769f 100644 --- a/src/workflow/StreamlitUI.py +++ b/src/workflow/StreamlitUI.py @@ -14,7 +14,13 @@ from datetime import datetime -from src.common.common import OS_PLATFORM, TK_AVAILABLE, tk_directory_dialog, tk_file_dialog +from src.common.common import ( + OS_PLATFORM, + TK_AVAILABLE, + tk_directory_dialog, + tk_file_dialog, +) + class StreamlitUI: """ @@ -37,7 +43,7 @@ def upload_widget( key: str, file_types: Union[str, List[str]], name: str = "", - fallback: Union[List, str] = None + fallback: Union[List, str] = None, ) -> None: """ Handles file uploads through the Streamlit interface, supporting both direct @@ -55,12 +61,13 @@ def upload_widget( # create the files dir files_dir.mkdir(exist_ok=True, parents=True) - # check if only fallback files are in files_dir, if yes, reset the directory before adding new files - if [Path(f).name for f in Path(files_dir).iterdir()] == [ - Path(f).name for f in fallback - ]: - shutil.rmtree(files_dir) - files_dir.mkdir() + if fallback is not None: + # check if only fallback files are in files_dir, if yes, reset the directory before adding new files + if [Path(f).name for f in Path(files_dir).iterdir()] == [ + Path(f).name for f in fallback + ]: + shutil.rmtree(files_dir) + files_dir.mkdir() if not name: name = key.replace("-", " ") @@ -71,14 +78,19 @@ def upload_widget( if st.session_state.location == "local": c2_text, c2_checkbox = c2.columns([1.5, 1], gap="large") c2_text.markdown("**OR add files from local folder**") - use_copy = c2_checkbox.checkbox("Make a copy of files", key=f"{key}-copy_files", value=True, help="Create a copy of files in workspace.") + use_copy = c2_checkbox.checkbox( + "Make a copy of files", + key=f"{key}-copy_files", + value=True, + help="Create a copy of files in workspace.", + ) else: use_copy = True # Convert file_types to a list if it's a string if isinstance(file_types, str): file_types = [file_types] - + if use_copy: with c1.form(f"{key}-upload", clear_on_submit=True): # Streamlit file uploader accepts file types as a list or None @@ -99,9 +111,9 @@ def upload_widget( files = [files] for f in files: # Check if file type is in the list of accepted file types - if f.name not in [f.name for f in files_dir.iterdir()] and any( - f.name.endswith(ft) for ft in file_types - ): + if f.name not in [ + f.name for f in files_dir.iterdir() + ] and any(f.name.endswith(ft) for ft in file_types): with open(Path(files_dir, f.name), "wb") as fh: fh.write(f.getbuffer()) st.success("Successfully added uploaded files!") @@ -123,7 +135,7 @@ def upload_widget( help="Browse for your local MS data files.", disabled=not TK_AVAILABLE, ) - + # Tk file dialog requires file types to be a list of tuples if isinstance(file_types, str): tk_file_types = [(f"{file_types}", f"*.{file_types}")] @@ -131,8 +143,7 @@ def upload_widget( tk_file_types = [(f"{ft}", f"*.{ft}") for ft in file_types] else: raise ValueError("'file_types' must be either of type str or list") - - + if dialog_button: local_files = tk_file_dialog( "Select your local MS data files", @@ -146,7 +157,7 @@ def upload_widget( f_handle.write(f"{f}\n") my_bar.empty() st.success("Successfully added files!") - + st.session_state["previous_dir"] = Path(local_files[0]).parent # Local file upload option: via directory path @@ -159,15 +170,32 @@ def upload_widget( with st_cols[0]: st.write("\n") st.write("\n") - dialog_button = st.button("📁", key='local_browse', help="Browse for your local directory with MS data.", disabled=not TK_AVAILABLE) + dialog_button = st.button( + "📁", + key=f"local_browse_{key}", + help="Browse for your local directory with MS data.", + disabled=not TK_AVAILABLE, + ) if dialog_button: - st.session_state["local_dir"] = tk_directory_dialog("Select directory with your MS data", st.session_state["previous_dir"]) + st.session_state["local_dir"] = tk_directory_dialog( + "Select directory with your MS data", + st.session_state["previous_dir"], + ) st.session_state["previous_dir"] = st.session_state["local_dir"] with st_cols[1]: - local_dir = st.text_input(f"path to folder with **{name}** files", value=st.session_state["local_dir"]) + local_dir = st.text_input( + f"path to folder with **{name}** files", + key=f"path_to_folder_{key}", + value=st.session_state["local_dir"], + ) - if c2.button(f"Add **{name}** files from local folder", use_container_width=True): + if c2.button( + f"Add **{name}** files from local folder", + use_container_width=True, + key=f"add_files_from_local_{key}", + help="Add files from local directory.", + ): files = [] local_dir = Path( local_dir @@ -193,7 +221,9 @@ def upload_widget( if os.path.isfile(f): shutil.copy(f, Path(files_dir, f.name)) elif os.path.isdir(f): - shutil.copytree(f, Path(files_dir, f.name), dirs_exist_ok=True) + shutil.copytree( + f, Path(files_dir, f.name), dirs_exist_ok=True + ) else: # Write the path to the local directories to the file with open(external_files, "a") as f_handle: @@ -202,16 +232,18 @@ def upload_widget( st.success("Successfully copied files!") if not TK_AVAILABLE: - c2.warning("**Warning**: Failed to import tkinter, either it is not installed, or this is being called from a cloud context. " "This function is not available in a Streamlit Cloud context. " - "You will have to manually enter the path to the folder with the MS files." - ) + c2.warning( + "**Warning**: Failed to import tkinter, either it is not installed, or this is being called from a cloud context. " + "This function is not available in a Streamlit Cloud context. " + "You will have to manually enter the path to the folder with the MS files." + ) if not use_copy: c2.warning( - "**Warning**: You have deselected the `Make a copy of files` option. " - "This **_assumes you know what you are doing_**. " - "This means that the original files will be used instead. " - ) + "**Warning**: You have deselected the `Make a copy of files` option. " + "This **_assumes you know what you are doing_**. " + "This means that the original files will be used instead. " + ) if fallback and not any(Path(files_dir).iterdir()): if isinstance(fallback, str): @@ -228,16 +260,26 @@ def upload_widget( ] else: if files_dir.exists(): - current_files = [f.name for f in files_dir.iterdir() if "external_files.txt" not in f.name] + current_files = [ + f.name + for f in files_dir.iterdir() + if "external_files.txt" not in f.name + ] # Check if local files are available - external_files = Path(self.workflow_dir, "input-files", key, "external_files.txt") + external_files = Path( + self.workflow_dir, "input-files", key, "external_files.txt" + ) if external_files.exists(): with open(external_files, "r") as f: external_files_list = f.read().splitlines() # Only make files available that still exist - current_files += [f"(local) {Path(f).name}" for f in external_files_list if os.path.exists(f)] + current_files += [ + f"(local) {Path(f).name}" + for f in external_files_list + if os.path.exists(f) + ] else: current_files = [] @@ -288,7 +330,9 @@ def select_input_file( options = [str(f) for f in path.iterdir() if "external_files.txt" not in str(f)] # Check if local files are available - external_files = Path(self.workflow_dir, "input-files", key, "external_files.txt") + external_files = Path( + self.workflow_dir, "input-files", key, "external_files.txt" + ) if external_files.exists(): with open(external_files, "r") as f: @@ -475,7 +519,9 @@ def format_files(input: Any) -> List[str]: help=help, ) elif isinstance(value, bool): - self.input_widget(key, value, widget_type="checkbox", name=name, help=help) + self.input_widget( + key, value, widget_type="checkbox", name=name, help=help + ) else: self.input_widget(key, value, widget_type="text", name=name, help=help) @@ -488,8 +534,8 @@ def input_TOPP( num_cols: int = 4, exclude_parameters: List[str] = [], include_parameters: List[str] = [], - display_full_parameter_names: bool = False, - display_subsections: bool = False, + display_tool_name: bool = True, + display_subsections: bool = True, custom_defaults: dict = {}, ) -> None: """ @@ -502,14 +548,18 @@ def input_TOPP( num_cols (int, optional): Number of columns to use for the layout. Defaults to 3. exclude_parameters (List[str], optional): List of parameter names to exclude from the widget. Defaults to an empty list. include_parameters (List[str], optional): List of parameter names to include in the widget. Defaults to an empty list. - display_full_parameter_names (bool, optional): Whether to display the full parameter names. Defaults to False. - display_subsections (bool, optional): Whether to split parameters into subsections based on the prefix (disables display_full_parameter_names). Defaults to False. + display_tool_name (bool, optional): Whether to display the TOPP tool name. Defaults to True. + display_subsections (bool, optional): Whether to split parameters into subsections based on the prefix. Defaults to True. custom_defaults (dict, optional): Dictionary of custom defaults to use. Defaults to an empty dict. """ # write defaults ini files ini_file_path = Path(self.parameter_manager.ini_dir, f"{topp_tool_name}.ini") if not ini_file_path.exists(): - subprocess.call([topp_tool_name, "-write_ini", str(ini_file_path)]) + try: + subprocess.call([topp_tool_name, "-write_ini", str(ini_file_path)]) + except FileNotFoundError: + st.error(f"TOPP tool **'{topp_tool_name}'** not found.") + return # update custom defaults if necessary if custom_defaults: param = poms.Param() @@ -524,7 +574,11 @@ def input_TOPP( param = poms.Param() poms.ParamXMLFile().load(str(ini_file_path), param) if include_parameters: - valid_keys = [key for key in param.keys() if any([k.encode() in key for k in include_parameters])] + valid_keys = [ + key + for key in param.keys() + if any([k.encode() in key for k in include_parameters]) + ] else: excluded_keys = [ "log", @@ -535,27 +589,40 @@ def input_TOPP( "version", "test", ] + exclude_parameters - valid_keys = [key for key in param.keys() if not (b"input file" in param.getTags(key) - or b"output file" in param.getTags(key) - or any([k.encode() in key for k in excluded_keys]))] - params_decoded = [] + valid_keys = [ + key + for key in param.keys() + if not ( + b"input file" in param.getTags(key) + or b"output file" in param.getTags(key) + or any([k.encode() in key for k in excluded_keys]) + ) + ] + params = [] for key in valid_keys: entry = param.getEntry(key) - tmp = { + p = { "name": entry.name.decode(), "key": key, "value": entry.value, "valid_strings": [v.decode() for v in entry.valid_strings], "description": entry.description.decode(), "advanced": (b"advanced" in param.getTags(key)), - "section_description": param.getSectionDescription(':'.join(key.decode().split(':')[:-1])) + "section_description": param.getSectionDescription( + ":".join(key.decode().split(":")[:-1]) + ), } - params_decoded.append(tmp) + # Parameter sections and subsections as string (e.g. "section:subsection") + if display_subsections: + p["sections"] = ":".join( + p["key"].decode().split(":1:")[1].split(":")[:-1] + ) + params.append(p) # for each parameter in params_decoded # if a parameter with custom default value exists, use that value # else check if the parameter is already in self.params, if yes take the value from self.params - for p in params_decoded: + for p in params: name = p["key"].decode().split(":1:")[1] if topp_tool_name in self.params: if name in self.params[topp_tool_name]: @@ -565,97 +632,122 @@ def input_TOPP( elif name in custom_defaults: p["value"] = custom_defaults[name] - # show input widgets - section_description = None - cols = st.columns(num_cols) - i = 0 - - for p in params_decoded: - # skip avdanced parameters if not selected - if not st.session_state["advanced"] and p["advanced"]: - continue - - key = f"{self.parameter_manager.topp_param_prefix}{p['key'].decode()}" - if display_subsections: - name = p["name"] - if section_description is None: - section_description = p['section_description'] - - elif section_description != p['section_description']: - section_description = p['section_description'] - st.markdown(f"**{section_description}**") - cols = st.columns(num_cols) - i = 0 - elif display_full_parameter_names: - name = key.split(":1:")[1].replace("algorithm:", "").replace(":", " : ") - else: + # Split into subsections if required + param_sections = {} + section_descriptions = {} + if display_subsections: + for p in params: + # Skip adavnaced parameters if not selected + if not st.session_state["advanced"] and p["advanced"]: + continue + # Add section description to section_descriptions dictionary if it exists + if p["section_description"]: + section_descriptions[p["sections"]] = p["section_description"] + # Add parameter to appropriate section in param_sections dictionary + if p["sections"] in param_sections: + param_sections[p["sections"]].append(p) + else: + param_sections[p["sections"]] = [p] + else: + # Simply put all parameters in "all" section if no subsections required + param_sections["all"] = params + + # Display tool name if required + if display_tool_name: + st.markdown(f"**{topp_tool_name}**") + + # Show input widgets + for section, params in param_sections.items(): + # Display section name and help text (section description) if required + if section and display_subsections: + parts = section.split(":") + st.markdown( + ":".join(parts[:-1]) + + (":" if len(parts) > 1 else "") + + f"**{parts[-1]}**", + help=( + section_descriptions[section] + if section in section_descriptions + else None + ), + ) + cols = st.columns(num_cols) + i = 0 + for p in params: + # get key and name + key = f"{self.parameter_manager.topp_param_prefix}{p['key'].decode()}" name = p["name"] - try: - # # sometimes strings with newline, handle as list - if isinstance(p["value"], str) and "\n" in p["value"]: - p["value"] = p["value"].split("\n") - # bools - if isinstance(p["value"], bool): - cols[i].markdown("##") - cols[i].checkbox( - name, - value=(p["value"] == "true") if type(p["value"]) == str else p["value"], - help=p["description"], - key=key, - ) - - # strings - elif isinstance(p["value"], str): - # string options - if p["valid_strings"]: - cols[i].selectbox( + try: + # sometimes strings with newline, handle as list + if isinstance(p["value"], str) and "\n" in p["value"]: + p["value"] = p["value"].split("\n") + # bools + if isinstance(p["value"], bool): + cols[i].markdown("##") + cols[i].checkbox( name, - options=p["valid_strings"], - index=p["valid_strings"].index(p["value"]), + value=( + (p["value"] == "true") + if type(p["value"]) == str + else p["value"] + ), help=p["description"], key=key, ) - else: - cols[i].text_input( - name, value=p["value"], help=p["description"], key=key - ) - # ints - elif isinstance(p["value"], int): - cols[i].number_input( - name, value=int(p["value"]), help=p["description"], key=key - ) + # strings + elif isinstance(p["value"], str): + # string options + if p["valid_strings"]: + cols[i].selectbox( + name, + options=p["valid_strings"], + index=p["valid_strings"].index(p["value"]), + help=p["description"], + key=key, + ) + else: + cols[i].text_input( + name, value=p["value"], help=p["description"], key=key + ) + + # ints + elif isinstance(p["value"], int): + cols[i].number_input( + name, value=int(p["value"]), help=p["description"], key=key + ) - # floats - elif isinstance(p["value"], float): - cols[i].number_input( - name, - value=float(p["value"]), - step=1.0, - help=p["description"], - key=key, - ) + # floats + elif isinstance(p["value"], float): + cols[i].number_input( + name, + value=float(p["value"]), + step=1.0, + help=p["description"], + key=key, + ) - # lists - elif isinstance(p["value"], list): - p["value"] = [ - v.decode() if isinstance(v, bytes) else v for v in p["value"] - ] - cols[i].text_area( - name, - value="\n".join([str(val) for val in p["value"]]), - help=p["description"], - key=key, - ) + # lists + elif isinstance(p["value"], list): + p["value"] = [ + v.decode() if isinstance(v, bytes) else v + for v in p["value"] + ] + cols[i].text_area( + name, + value="\n".join([str(val) for val in p["value"]]), + help=p["description"], + key=key, + ) - # increment number of columns, create new cols object if end of line is reached - i += 1 - if i == num_cols: - i = 0 - cols = st.columns(num_cols) - except Exception as e: - cols[i].error(f"Error in parameter **{p['name']}**.") - print("Error parsing \""+ p['name'] + "\": " + str(e)) + # increment number of columns, create new cols object if end of line is reached + i += 1 + if i == num_cols: + i = 0 + cols = st.columns(num_cols) + except Exception as e: + cols[i].error(f"Error in parameter **{p['name']}**.") + print('Error parsing "' + p["name"] + '": ' + str(e)) def input_python( self, @@ -663,22 +755,22 @@ def input_python( num_cols: int = 3, ) -> None: """ - Dynamically generates and displays input widgets based on the DEFAULTS - dictionary defined in a specified Python script file. - - For each entry in the DEFAULTS dictionary, an input widget is displayed, - allowing the user to specify values for the parameters defined in the - script. The widgets are arranged in a grid with a specified number of - columns. Parameters can be marked as hidden or advanced within the DEFAULTS - dictionary; hidden parameters are not displayed, and advanced parameters - are displayed only if the user has selected to view advanced options. - - Args: - script_file (str): The file name or path to the Python script containing - the DEFAULTS dictionary. If the path is omitted, the method searches in - src/python-tools/'. - num_cols (int, optional): The number of columns to use for displaying input widgets. Defaults to 3. - """ + Dynamically generates and displays input widgets based on the DEFAULTS + dictionary defined in a specified Python script file. + + For each entry in the DEFAULTS dictionary, an input widget is displayed, + allowing the user to specify values for the parameters defined in the + script. The widgets are arranged in a grid with a specified number of + columns. Parameters can be marked as hidden or advanced within the DEFAULTS + dictionary; hidden parameters are not displayed, and advanced parameters + are displayed only if the user has selected to view advanced options. + + Args: + script_file (str): The file name or path to the Python script containing + the DEFAULTS dictionary. If the path is omitted, the method searches in + src/python-tools/'. + num_cols (int, optional): The number of columns to use for displaying input widgets. Defaults to 3. + """ # Check if script file exists (can be specified without path and extension) # default location: src/python-tools/script_file @@ -779,7 +871,9 @@ def zip_and_download_files(self, directory: str): with zipfile.ZipFile(bytes_io, "w", zipfile.ZIP_DEFLATED) as zip_file: for i, file_path in enumerate(files): - if file_path.is_file(): # Ensure we're only adding files, not directories + if ( + file_path.is_file() + ): # Ensure we're only adding files, not directories # Preserve directory structure relative to the original directory zip_file.write(file_path, file_path.relative_to(directory.parent)) my_bar.progress((i + 1) / n_files) # Update progress bar @@ -793,7 +887,7 @@ def zip_and_download_files(self, directory: str): data=bytes_io, file_name="input-files.zip", mime="application/zip", - use_container_width=True + use_container_width=True, ) def file_upload_section(self, custom_upload_function) -> None: @@ -837,23 +931,25 @@ def execution_section(self, start_workflow_function) -> None: summary_text += f""" {key}: **{value}** -""" +""" elif value: summary_text += f""" **{key}**: -""" +""" for k, v in value.items(): summary_text += f""" {key}: **{v}** -""" +""" with st.expander("**Parameter Summary**"): st.markdown(summary_text) c1, c2 = st.columns(2) # Select log level, this can be changed at run time or later without re-running the workflow - log_level = c1.selectbox("log details", ["minimal", "commands and run times", "all"], key="log_level") + log_level = c1.selectbox( + "log details", ["minimal", "commands and run times", "all"], key="log_level" + ) if self.executor.pid_dir.exists(): if c1.button("Stop Workflow", type="primary", use_container_width=True): self.executor.stop() @@ -870,7 +966,9 @@ def execution_section(self, start_workflow_function) -> None: time.sleep(2) st.rerun() else: - st.markdown(f"**Workflow log file: {datetime.fromtimestamp(log_path.stat().st_ctime).strftime('%Y-%m-%d %H:%M')} CET**") + st.markdown( + f"**Workflow log file: {datetime.fromtimestamp(log_path.stat().st_ctime).strftime('%Y-%m-%d %H:%M')} CET**" + ) with open(log_path, "r", encoding="utf-8") as f: content = f.read() # Check if workflow finished successfully