diff --git a/app.py b/app.py index 337d6cbb..5588a0b4 100644 --- a/app.py +++ b/app.py @@ -25,20 +25,20 @@ def main(): if "local" in sys.argv: main() + # If not in local mode, assume it's hosted/online mode else: - - # WORK LIKE MULTIPAGE APP + # If captcha control is not in session state or set to False if 'controllo' not in st.session_state or st.session_state['controllo'] == False: - #delete pages - delete_page("app", "File_Upload") - delete_page("app", "Analyze") - delete_page("app", "Result_View") - #apply captcha + # hide app pages as long as captcha not solved + #delete_all_pages("app") + + # Apply captcha control to verify the user captcha_control() - else: - #run main + + else: + # Run the main function main() - #add all pages back - add_page("app", "File_Upload") - add_page("app", "Analyze") - add_page("app", "Result_View") \ No newline at end of file + + # Restore all pages (assuming "app" is the main page) + #restore_all_pages("app") + \ No newline at end of file diff --git "a/pages/0_\360\237\223\201_File_Upload.py" "b/pages/0_\360\237\223\201_File_Upload.py" index e6bdd502..916d44bf 100755 --- "a/pages/0_\360\237\223\201_File_Upload.py" +++ "b/pages/0_\360\237\223\201_File_Upload.py" @@ -7,16 +7,10 @@ params = page_setup() -#if local no need captcha -if st.session_state.location == "local": - params["controllo"] = True - st.session_state["controllo"] = True - -#if controllo is false means not captcha applied +# If run in hosted mode, show captcha as long as it has not been solved if 'controllo' not in st.session_state or params["controllo"] == False: - #apply captcha - captcha_control() - + # Apply captcha by calling the captcha_control function + captcha_control() ### main content of page diff --git "a/pages/1_\342\232\231\357\270\217_Analyze.py" "b/pages/1_\342\232\231\357\270\217_Analyze.py" index 83da6f2f..235510b7 100755 --- "a/pages/1_\342\232\231\357\270\217_Analyze.py" +++ "b/pages/1_\342\232\231\357\270\217_Analyze.py" @@ -9,19 +9,14 @@ from src.ini2dec import * import threading from src.captcha_ import * +from src.run_subprocess import * params = page_setup() -#if local no need captcha -if st.session_state.location == "local": - params["controllo"] = True - st.session_state["controllo"] = True - -#if controllo is false means not captcha applied +# If run in hosted mode, show captcha as long as it has not been solved if 'controllo' not in st.session_state or params["controllo"] == False: - #apply captcha - captcha_control() - + # Apply captcha by calling the captcha_control function + captcha_control() ### main content of page @@ -190,57 +185,6 @@ result_dict["success"] = False result_dict["log"] = " " -def run_subprocess(args, variables, result_dict): - """ - run subprocess e-g: NuXL command - - Args: - args: command with args - variables: variable if any - result_dict: contain success (success flag) and log (capture long log) - should contain result_dict["success"], result_dict["log"] - - Returns: - None - """ - #st.write("inside run_subprocess") - #process = subprocess.Popen(args + list(variables), stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, text=True) - # run subprocess and get every line of executable log in same time - process = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) - - stdout_ = [] - stderr_ = [] - - while True: - output = process.stdout.readline() - if output == '' and process.poll() is not None: - break - if output: - #print every line of exec on page - st.text(output.strip()) - #append line to store log - stdout_.append(output.strip()) - - while True: - error = process.stderr.readline() - if error == '' and process.poll() is not None: - break - if error: - #print every line of exec on page even error - st.error(error.strip()) - #append line to store log of error - stderr_.append(error.strip()) - - #check if process run successfully - if process.returncode == 0: - result_dict["success"] = True - #save in to log all lines - result_dict["log"] = " ".join(stdout_) - else: - result_dict["success"] = False - #save in to log all lines even process cause error - result_dict["log"] = " ".join(stderr_) - #create terminate flag from even function terminate_flag = threading.Event() terminate_flag.set() diff --git "a/pages/2_\360\237\223\212_Result_View.py" "b/pages/2_\360\237\223\212_Result_View.py" index 2e46cfa4..56a0899f 100755 --- "a/pages/2_\360\237\223\212_Result_View.py" +++ "b/pages/2_\360\237\223\212_Result_View.py" @@ -8,14 +8,9 @@ params = page_setup() -#if local no need captcha -if st.session_state.location == "local": - params["controllo"] = True - st.session_state["controllo"] = True - -#if controllo is false means not captcha applied +# If run in hosted mode, show captcha as long as it has not been solved if 'controllo' not in st.session_state or params["controllo"] == False: - #apply captcha + # Apply captcha by calling the captcha_control function captcha_control() ### main content of page diff --git a/src/captcha_.py b/src/captcha_.py index 90f29062..5e7d0bc9 100644 --- a/src/captcha_.py +++ b/src/captcha_.py @@ -1,7 +1,6 @@ +import os from pathlib import Path import streamlit as st -from captcha.image import ImageCaptcha -import random, string from streamlit.source_util import ( page_icon_and_name, calc_md5, @@ -9,70 +8,180 @@ _on_pages_changed ) -def delete_page(main_script_path_str, page_name): +from captcha.image import ImageCaptcha +import random, string + +def delete_all_pages(main_script_path_str: str) -> None: """ - delete page from app + Delete all pages except the main page from an app's configuration. Args: - main_script_path_str: main page (e-g app) - page_name: Page want to delete (e-g Analyze) + main_script_path_str (str): The name of the main page, typically the app's name. Returns: None + """ - #get all pages + # Get all pages from the app's configuration current_pages = get_pages(main_script_path_str) + + # Create a list to store keys pages to delete + keys_to_delete = [] + + # Iterate over all pages and add keys to delete list if the desired page is found + for key, value in current_pages.items(): + if value['page_name'] != main_script_path_str: + keys_to_delete.append(key) + + # Delete the keys from current pages + for key in keys_to_delete: + del current_pages[key] + + # Refresh the pages configuration + _on_pages_changed.send() + +def delete_page(main_script_path_str: str, page_name: str) -> None: + """ + Delete a specific page from an app's configuration. + + Args: + main_script_path_str (str): The name of the main page, typically the app's name. + page_name (str): The name of the page to be deleted. + + Returns: + None + """ + # Get all pages + current_pages= get_pages(main_script_path_str) - #iterate over all pages and del if desire page found + # Iterate over all pages and delete the desired page if found for key, value in current_pages.items(): if value['page_name'] == page_name: del current_pages[key] - break - else: - pass - #refresh the pages config + # Refresh the pages configuration _on_pages_changed.send() - -def add_page(main_script_path_str, page_name): + + +def restore_all_pages(main_script_path_str: str) -> None: """ - add page in app + restore all pages found in the "pages" directory to an app's configuration. Args: - main_script_path_str: main page (e-g app) - page_name: Page want to delete (e-g Analyze) + main_script_path_str (str): The name of the main page, typically the app's name. Returns: None """ - #get all pages + # Get all pages pages = get_pages(main_script_path_str) + # Obtain the path to the main script main_script_path = Path(main_script_path_str) + + # Define the directory where pages are stored pages_dir = main_script_path.parent / "pages" + + # To store the pages for later, to add in ascending order + pages_temp = [] + + # Iterate over all .py files in the "pages" directory + for script_path in pages_dir.glob("*.py"): + + # append path with file name + script_path_str = str(script_path.resolve()) + + # Calculate the MD5 hash of the script path + psh = calc_md5(script_path_str) + + # Obtain the page icon and name + pi, pn = page_icon_and_name(script_path) + + # Extract the index from the page name + index = int(os.path.basename(script_path.stem).split("_")[0]) + + # Add the page data to the temporary list + pages_temp.append((index, { + "page_script_hash": psh, + "page_name": pn, + "icon": pi, + "script_path": script_path_str, + })) + + # Sort the pages_temp list by index in ascending order as defined in pages folder e-g 0_, 1_ etc + pages_temp.sort(key=lambda x: x[0]) + + # Add pages + for index, page_data in pages_temp: + # Add the new page configuration + pages[page_data['page_script_hash']] = { + "page_script_hash": page_data['page_script_hash'], + "page_name": page_data['page_name'], + "icon": page_data['icon'], + "script_path": page_data['script_path'], + } + + # Refresh the page configuration + _on_pages_changed.send() + + +def add_page(main_script_path_str: str, page_name: str) -> None: + """ + Add a new page to an app's configuration. + + Args: + main_script_path_str (str): The name of the main page, typically the app's name. + page_name (str): The name of the page to be added. + + Returns: + None + """ + # Get all pages + pages = get_pages(main_script_path_str) + + # Obtain the path to the main script + main_script_path = Path(main_script_path_str) + + # Define the directory where pages are stored + pages_dir = main_script_path.parent / "pages" + + # Find the script path corresponding to the new page script_path = [f for f in pages_dir.glob("*.py") if f.name.find(page_name) != -1][0] script_path_str = str(script_path.resolve()) - pi, pn = page_icon_and_name(script_path) + # Calculate the MD5 hash of the script path psh = calc_md5(script_path_str) - #add new page config + + # Obtain the page icon and name + pi, pn = page_icon_and_name(script_path) + + # Add the new page configuration pages[psh] = { "page_script_hash": psh, "page_name": pn, "icon": pi, "script_path": script_path_str, } - #refresh the page config + + # Refresh the page configuration _on_pages_changed.send() -# define the costant length_captcha = 5 width = 400 height = 180 # define the function for the captcha control -def captcha_control(): +def captcha_control() -> None: + """ + Captcha control function to verify if the user is not a robot. + + This function displays a captcha image and allows the user to enter the captcha text. + It verifies whether the entered captcha text matches the generated captcha. + + Returns: + None + """ #control if the captcha is correct if 'controllo' not in st.session_state or st.session_state['controllo'] == False: st.title("Makesure you are not a robot🤖") @@ -84,30 +193,30 @@ def captcha_control(): # define the session state for the captcha text because it doesn't change during refreshes if 'Captcha' not in st.session_state: st.session_state['Captcha'] = ''.join(random.choices(string.ascii_uppercase + string.digits, k=length_captcha)) - + #setup the captcha widget image = ImageCaptcha(width=width, height=height) data = image.generate(st.session_state['Captcha']) col1.image(data) - capta2_text = col2.text_area('Enter captcha text', height=10, placeholder="captcha text") - - col3, col4 = st.columns(2) - with col4: - if st.button("Verify the code"): - capta2_text = capta2_text.replace(" ", "") - # if the captcha is correct, the controllo session state is set to True - if st.session_state['Captcha'].lower() == capta2_text.lower().strip(): - del st.session_state['Captcha'] - col1.empty() - col2.empty() - st.session_state['controllo'] = True - st.experimental_rerun() - else: - # if the captcha is wrong, the controllo session state is set to False and the captcha is regenerated - st.error("🚨 Captcha is wrong") - del st.session_state['Captcha'] - del st.session_state['controllo'] - st.experimental_rerun() + capta2_text = col2.text_area('Enter captcha text', height=20) + + + if st.button("Verify the code"): + capta2_text = capta2_text.replace(" ", "") + # if the captcha is correct, the controllo session state is set to True + if st.session_state['Captcha'].lower() == capta2_text.lower().strip(): + del st.session_state['Captcha'] + col1.empty() + col2.empty() + st.session_state['controllo'] = True + st.rerun() else: - #wait for the button click - st.stop() \ No newline at end of file + # if the captcha is wrong, the controllo session state is set to False and the captcha is regenerated + st.error("🚨 Captch is wrong") + del st.session_state['Captcha'] + del st.session_state['controllo'] + st.rerun() + else: + #wait for the button click + st.stop() + \ No newline at end of file diff --git a/src/common.py b/src/common.py index 474b1f78..d21724fb 100644 --- a/src/common.py +++ b/src/common.py @@ -127,6 +127,8 @@ def page_setup(page: str = "") -> dict[str, Any]: # If running locally, use the default workspace if "local" in sys.argv: st.session_state.workspace = Path(workspaces_dir, "default") + # not any captcha so, controllo should be true + st.session_state['controllo'] = True # If running online, create a new workspace with a random UUID else: diff --git a/src/run_subprocess.py b/src/run_subprocess.py new file mode 100644 index 00000000..60218104 --- /dev/null +++ b/src/run_subprocess.py @@ -0,0 +1,54 @@ +import streamlit as st +import subprocess + +def run_subprocess(args: list[str], variables: list[str], result_dict: dict) -> None: + """ + Run a subprocess and capture its output. + + Args: + args (list[str]): The command and its arguments as a list of strings. + variables (list[str]): Additional variables needed for the subprocess (not used in this code). + result_dict dict: A dictionary to store the success status (bool) and the captured log (str). + + Returns: + None + """ + + # Run the subprocess and capture its output + process = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) + + # Lists to store the captured standard output and standard error + stdout_ = [] + stderr_ = [] + + # Capture the standard output of the subprocess + while True: + output = process.stdout.readline() + if output == '' and process.poll() is not None: + break + if output: + # Print every line of standard output on the Streamlit page + st.text(output.strip()) + # Append the line to store in the log + stdout_.append(output.strip()) + + # Capture the standard error of the subprocess + while True: + error = process.stderr.readline() + if error == '' and process.poll() is not None: + break + if error: + # Print every line of standard error on the Streamlit page, marking it as an error + st.error(error.strip()) + # Append the line to store in the log of errors + stderr_.append(error.strip()) + + # Check if the subprocess ran successfully (return code 0) + if process.returncode == 0: + result_dict["success"] = True + # Save all lines from standard output to the log + result_dict["log"] = " ".join(stdout_) + else: + result_dict["success"] = False + # Save all lines from standard error to the log, even if the process encountered an error + result_dict["log"] = " ".join(stderr_)