diff --git a/chatbot_ui_biomania/components/Chat/ChatMessage.tsx b/chatbot_ui_biomania/components/Chat/ChatMessage.tsx
index 13b9cc8..6c20586 100644
--- a/chatbot_ui_biomania/components/Chat/ChatMessage.tsx
+++ b/chatbot_ui_biomania/components/Chat/ChatMessage.tsx
@@ -18,6 +18,8 @@ import rehypeMathjax from 'rehype-mathjax';
 import remarkGfm from 'remark-gfm';
 import remarkMath from 'remark-math';
 import ProgressCardController from "@/components/Chat/ProgressCards/ProgressCardController";
+import { libImages } from '@/components/Chat/LibCardSelect';
+
 export interface Props {
   message: Message;
   messageIndex: number;
@@ -100,6 +102,13 @@ export const ChatMessage: FC = memo(({ message, messageIndex, onEdit}) =>
       textareaRef.current.style.height = `${textareaRef.current.scrollHeight}px`;
     }
   }, [isEditing]);
+  // const libIconSrc = selectedConversation && selectedConversation.Lib && libImages[selectedConversation.Lib] ? libImages[selectedConversation.Lib] : IconRobot;
+  const [libIconSrc, setLibIconSrc] = useState(IconRobot);
+  useEffect(() => {
+    if (selectedConversation && selectedConversation.Lib && libImages[selectedConversation.Lib]) {
+      setLibIconSrc(libImages[selectedConversation.Lib]);
+    }
+  }, []);
   return (
     <>
@@ ... @@ export const ChatMessage: FC = memo(({ message, messageIndex, onEdit}) =>
       {message.role === 'assistant' ? (
-
+        {selectedConversation?.Lib
       ) : (
       )}
diff --git a/src/deploy/model.py b/src/deploy/model.py
index 35a675e..3b00408 100644
--- a/src/deploy/model.py
+++ b/src/deploy/model.py
@@ -3,6 +3,8 @@
 from sentence_transformers import SentenceTransformer
 import multiprocessing
 import numpy as np, matplotlib.pyplot as plt
+from functools import lru_cache
+import asyncio, aiofiles
 from ..deploy.ServerEventCallback import ServerEventCallback
 from ..gpt.utils import get_all_api_json, correct_pred, load_json, save_json, get_retrieved_prompt
@@ -15,7 +17,7 @@
 from ..prompt.summary import prepare_summary_prompt, prepare_summary_prompt_full
 from ..configs.Lib_cheatsheet import CHEATSHEET as LIB_CHEATSHEET
 from ..deploy.utils import basic_types, generate_api_calling, download_file_from_google_drive, download_data, save_decoded_file, correct_bool_values, convert_bool_values, infer, dataframe_to_markdown, convert_image_to_base64, change_format, parse_json_safely, post_process_parsed_params, special_types, io_types, io_param_names
-from ..models.dialog_classifier import Dialog_Gaussian_classificaiton
+from ..models.dialog_classifier import Dialog_Gaussian_classification
 
 def make_execution_correction_prompt(user_input, history_record, error_code, error_message, variables, LIB):
     prompt = f"Your task is to correct a Python code snippet based on the provided information. The user's inquiry is represented by '{user_input}'. The history of successful executions is provided in '{history_record}', and variables in the namespace are supplied in a dictionary '{variables}'. Execute the error code snippet '{error_code}' and capture the error message '{error_message}'. Analyze the error to determine its root cause. Then correct the code, using the full API name in the format '{LIB}.xx.yy' instead of an abbreviation. Ensure any new variables created are named with the prefix 'result_' followed by digits, without reusing existing variable names. If you feel it is necessary to perform attribute operations similar to 'result_1.var_names.intersection(result_2.var_names)', or to subset an AnnData object by columns like 'adata_ref = adata_ref[:, var_names]', go ahead. If you feel it is necessary to import some libraries, go ahead. Maintain consistency with the style of previously executed code. Ensure that the corrected code, given the variables in the namespace, can be executed directly without errors. Return the corrected code snippet in the format: '\"\"\"your_corrected_code\"\"\"'. Do not include additional descriptions."
@@ -29,12 +31,11 @@ def make_GPT_prompt(user_input, history_record, variables, LIB):
 class Model:
     def __init__(self, logger, device, model_llm_type="gpt-3.5-turbo-0125"): # llama3
         print('start initialization!')
-        self.retry_execution_limit = 3
+        self.retry_execution_limit = 2
         self.retry_execution_count = 0
         self.path_info_list = ['path','Path','PathLike']
         self.model_llm_type = model_llm_type
         self.logger = logger
-        self.logger.debug("Initializing...")
         self.device=device
         self.indexxxx = 1
         self.inuse = False
@@ -60,14 +61,10 @@ def __init__(self, logger, device, model_llm_type="gpt-3.5-turbo-0125"): # llama
         self.user_states = "run_pipeline"
         self.parameters_info_list = None
         self.image_folder = "./tmp/images/"
-        if not os.path.exists(self.image_folder):
-            os.makedirs(self.image_folder, exist_ok=True)
-        if not os.path.exists("./tmp"):
-            os.makedirs("./tmp", exist_ok=True)
-        if not os.path.exists("./tmp/states/"):
-            os.makedirs("./tmp/states/", exist_ok=True)
-        if not os.path.exists("./tmp/sessions/"):
-            os.makedirs("./tmp/sessions/", exist_ok=True)
+        os.makedirs(self.image_folder, exist_ok=True)
+        os.makedirs("./tmp", exist_ok=True)
+        os.makedirs("./tmp/states/", exist_ok=True)
+        os.makedirs("./tmp/sessions/", exist_ok=True)
         self.image_file_list = []
         self.image_file_list = self.update_image_file_list()
         #with open(f'./data/standard_process/{self.LIB}/vectorizer.pkl', 'rb') as f:
@@ -75,8 +72,11 @@ def __init__(self, logger, device, model_llm_type="gpt-3.5-turbo-0125"): # llama
         with open(f'./data/standard_process/{self.LIB}/centroids.pkl', 'rb') as f:
             self.centroids = pickle.load(f)
         self.retrieve_query_mode = "random"
-        self.all_apis, self.all_apis_json = get_all_api_json(f"./data/standard_process/{self.LIB}/API_init.json", mode='single')
+        self.get_all_api_json_cache(f"./data/standard_process/{self.LIB}/API_init.json", mode='single')
         print("Server ready")
+    @lru_cache(maxsize=10)
+    def get_all_api_json_cache(self,path,mode):
+        self.all_apis, self.all_apis_json = get_all_api_json(path, mode)
     def load_multiple_corpus_in_namespace(self, ):
         #self.executor.execute_api_call(f"from data.standard_process.{self.LIB}.Composite_API import *", "import")
         # pyteomics tutorial needs these import libs
@@ -87,7 +87,8 @@ def load_multiple_corpus_in_namespace(self, ):
         self.executor.execute_api_call(f"np.seterr(under='ignore')", "import")
         self.executor.execute_api_call(f"import warnings", "import")
         self.executor.execute_api_call(f"warnings.filterwarnings('ignore')", "import")
-    def load_bert_model(self, load_mode='unfinetuned_bert'):
+    @lru_cache(maxsize=10)
+    def load_bert_model_cache(self, load_mode='unfinetuned_bert'):
         self.bert_model = SentenceTransformer('all-MiniLM-L6-v2', device=self.device) if load_mode=='unfinetuned_bert' else SentenceTransformer(f"./hugging_models/retriever_model_finetuned/{self.LIB}/assigned", device=self.device)
     def compute_dialog_metrics(self,):
         annotate_path = f'data/standard_process/{self.LIB}/API_inquiry_annotate.json'
@@ -96,13 +97,13 @@ def compute_dialog_metrics(self,):
         LIB_ALIAS = info_json['LIB_ALIAS']
         self.dialog_p_threshold = 0.05
         data_source = "single_query_train"
-        self.dialog_classifer = Dialog_Gaussian_classificaiton(threshold=self.dialog_p_threshold)
+        self.dialog_classifer = Dialog_Gaussian_classification(threshold=self.dialog_p_threshold)
         scores_train, outliers = self.dialog_classifer.compute_accuracy_filter_compositeAPI(self.LIB, self.retriever, annotate_data, self.args_top_k, name=data_source, LIB_ALIAS=LIB_ALIAS)
         self.dialog_mean, self.dialog_std = self.dialog_classifer.fit_gaussian(scores_train['rank_1'])
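A note on the `@lru_cache` conversions in this hunk: `functools.lru_cache` memoizes by argument value, so a second call with the same `(path, mode)` skips the JSON re-parse entirely. On a bound method the cache key also includes `self`, which additionally keeps the `Model` instance alive for the lifetime of the cache. A minimal sketch of the pattern on a free function (`load_api_json` and the path are illustrative, not the repo's API):

```python
from functools import lru_cache
import json

@lru_cache(maxsize=10)
def load_api_json(path: str, mode: str) -> dict:
    # Parsed once per distinct (path, mode); later calls are cache hits
    # that return the very same dict object.
    with open(path) as f:
        return json.load(f)

apis = load_api_json("./data/standard_process/scanpy/API_init.json", "single")
assert apis is load_api_json("./data/standard_process/scanpy/API_init.json", "single")
```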
    def reset_lib(self, lib_name):
+        self.initialize_tool()
         #lib_name = lib_name.strip()
-        self.logger.debug("================")
-        self.logger.debug("==>Start reset the Lib {}!", lib_name)
+        self.logger.info("==>Start reset the Lib {}!", lib_name)
         # reset and reload all the LIB-related data/models
         # suppose that all data&model are prepared already in their path
         try:
@@ -111,12 +112,9 @@ def reset_lib(self, lib_name):
             self.ambiguous_pair = find_similar_two_pairs(f"./data/standard_process/{lib_name}/API_init.json")
             self.ambiguous_api = list(set(api for api_pair in self.ambiguous_pair for api in api_pair))
             self.load_data(f"./data/standard_process/{lib_name}/API_composite.json")
-            self.logger.info("==>loaded API json done")
-            self.load_bert_model()
-            self.logger.info("==>loaded finetuned bert for chitchat")
-            #self.load_composite_code(lib_name)
+            self.load_bert_model_cache()
+            #self.load_composite_code_cache(lib_name)
             t1 = time.time()
-            self.logger.info('==>Start loading model!')
             retrieval_model_path = self.args_retrieval_model_path
             parts = retrieval_model_path.split('/')
             if len(parts)>=3: # only work for path containing LIB, otherwise, please reenter the path in script
@@ -125,25 +123,20 @@
                 parts[-2]= lib_name
             new_path = '/'.join(parts)
             retrieval_model_path = new_path
-            self.logger.info("load retrieval_model_path in: {}", retrieval_model_path)
             self.retriever = ToolRetriever(LIB=lib_name,corpus_tsv_path=f"./data/standard_process/{lib_name}/retriever_train_data/corpus.tsv", model_path=retrieval_model_path, add_base=False)
-            self.logger.info('loaded retriever!')
             self.load_multiple_corpus_in_namespace()
             self.executor.execute_api_call(f"import {lib_name}", "import")
-            self.all_apis, self.all_apis_json = get_all_api_json(f"./data/standard_process/{lib_name}/API_init.json", mode='single')
+            self.get_all_api_json_cache(f"./data/standard_process/{lib_name}/API_init.json", mode='single')
             with open(f'./data/standard_process/{self.LIB}/centroids.pkl', 'rb') as f:
                 self.centroids = pickle.load(f)
-            self.logger.info('==>Successfully loading model!')
-            self.logger.info("loading model cost: {} s", str(time.time()-t1))
+            self.logger.info("==>Successfully loading model! loading model cost: {} s", str(time.time()-t1))
             reset_result = "Success"
             self.LIB = lib_name
             # compute the dialog metrics
             self.compute_dialog_metrics()
         except Exception as e:
-            self.logger.error("at least one data or model is not ready, please install lib first!")
             self.logger.error("Error: {}", e)
             reset_result = "Fail"
-            self.initialize_tool()
             self.callback_func('log', f"Something wrong with loading data and model!\n{e}", "Setting error")
         return reset_result
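The retrieval-path rewrite in `reset_lib` swaps the second-to-last path segment for the newly selected library. The same trick as a standalone sketch (`retarget_model_path` is a hypothetical helper, not code from the PR):

```python
def retarget_model_path(path: str, lib_name: str) -> str:
    # Replace the second-to-last segment (the lib folder) with the new lib.
    parts = path.split('/')
    if len(parts) >= 3:  # only paths that actually embed a lib folder
        parts[-2] = lib_name
    return '/'.join(parts)

# './hugging_models/retriever_model_finetuned/scanpy/assigned'
# -> './hugging_models/retriever_model_finetuned/squidpy/assigned'
print(retarget_model_path('./hugging_models/retriever_model_finetuned/scanpy/assigned', 'squidpy'))
```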
\n{e}", "Setting error") return reset_result def install_lib(self,lib_name, lib_alias, api_html=None, github_url=None, doc_url=None): @@ -320,7 +313,8 @@ def save_state_enviro(self): self.save_state() def update_image_file_list(self): return [f for f in os.listdir(self.image_folder) if f.endswith(".webp")] - def load_composite_code(self, lib_name): + @lru_cache(maxsize=10) + def load_composite_code_cache(self, lib_name): # deprecated module_name = f"data.standard_process.{lib_name}.Composite_API" module = importlib.import_module(module_name) @@ -335,16 +329,16 @@ def load_composite_code(self, lib_name): def initialize_executor(self): self.executor = CodeExecutor(self.logger) self.executor.callbacks = self.callbacks - self.executor.variables={} - self.executor.execute_code=[] + #self.executor.variables={} + #self.executor.execute_code=[] self.clear_globals_with_prefix('result_') def clear_globals_with_prefix(self, prefix): global_vars = list(globals().keys()) for var in global_vars: if var.startswith(prefix): del globals()[var] + @lru_cache(maxsize=10) def load_data(self, API_file): - # fix 231227, add API_base.json data = load_json(API_file) base_data = load_json("./data/standard_process/base/API_composite.json") self.API_composite = data @@ -382,16 +376,23 @@ def callback_func(self, type_task, task, task_title, color="", tableData=None, i callback.on_agent_action(**kwargs) if enhance_indexxxx: # sometimes we want to deprecate it, when running something slowly self.indexxxx += 1 - def loading_data(self, files, verbose=False): - for ids, file_path in enumerate(files): - file_extension = os.path.splitext(file_path)[1] - loading_code = self.generate_file_loading_code(file_path, file_extension) - self.executor.execute_api_call(loading_code, "code") + async def load_single_file(self, file_path): + file_extension = os.path.splitext(file_path)[1] + loading_code = self.generate_file_loading_code(file_path, file_extension) + self.executor.execute_api_call(loading_code, "code") + return file_path + async def loading_data_async(self, files, verbose=False): + tasks = [self.load_single_file(file_path) for file_path in files] + for ids, task in enumerate(asyncio.as_completed(tasks)): + file_path = await task if verbose: - self.callback_func('installation', "uploading files..."+str(ids+1)+'/'+str(len(files)), str(int((ids+1)/len(files)*100))) + self.callback_func('installation', f"uploading files...{ids+1}/{len(files)}", str(int((ids+1)/len(files)*100))) self.logger.info("loading data finished!") if verbose: self.callback_func('installation', "uploading files finished!", "100") + def loading_data(self, files, verbose=False): + asyncio.run(self.loading_data_async(files, verbose)) + def save_state(self): a = str(self.session_id) file_name = f"./tmp/states/{a}_state.pkl" @@ -399,14 +400,13 @@ def save_state(self): with open(file_name, 'wb') as file: pickle.dump(state, file) self.logger.info("State saved to {}", file_name) + @lru_cache(maxsize=10) def load_state(self, session_id): a = str(session_id) file_name = f"./tmp/states/{a}_state.pkl" with open(file_name, 'rb') as file: state = pickle.load(file) - print('before loadstate', self.executor.counter) self.__dict__.update(state) - print('after loadstate', self.executor.counter) self.logger.info("State loaded from {}", file_name) def run_pipeline(self, user_input, lib, top_k=3, files=[],conversation_started=True,session_id=""): self.indexxxx = 2 @@ -415,16 +415,10 @@ def run_pipeline(self, user_input, lib, top_k=3, files=[],conversation_started=T 
     def run_pipeline(self, user_input, lib, top_k=3, files=[],conversation_started=True,session_id=""):
         self.indexxxx = 2
@@ -415,16 +415,10 @@
         self.session_id = session_id
         try:
             self.load_state(session_id)
-            self.logger.info(f"Current folder path: {os.getcwd()}")
-            self.logger.info(f"subfolderpath: {os.listdir('.')}")
-            self.logger.info('load state successfully!')
             a = str(self.session_id)
             self.executor.load_environment(f"./tmp/sessions/{a}_environment.pkl")
-            self.logger.info('load environment successfully!')
-            self.logger.info(self.executor.counter)
         except Exception as e:
             self.logger.error(e)
-            self.logger.error('no local session_id environment exist! start from scratch')
             self.initialize_executor()
             pass
         # only reset lib when changing lib
@@ -432,11 +426,11 @@
             reset_result = self.reset_lib(lib)
             if reset_result=='Fail':
                 self.logger.error('Reset lib fail! Exit the dialog!')
-                print('Reset lib fail! Exit the dialog!')
                 return
             self.args_retrieval_model_path = f'./hugging_models/retriever_model_finetuned/{lib}/assigned'
             self.LIB = lib
-        elif lib=='GPT':
+        elif lib=='GPT' and self.last_user_states != "run_pipeline_asking_GPT":
+            self.user_query = user_input
             self.update_user_state("run_pipeline_asking_GPT")
         # only clear namespace when starting new conversations
         if conversation_started in ["True", True]:
@@ -469,7 +463,6 @@
         else:
             pass
         # dialog prediction
-        self.logger.info('start predicting whether inquiry is a complex task description or simple one!')
         pred_class = self.dialog_classifer.single_prediction(user_input, self.retriever, self.args_top_k)
         self.logger.info('----query inferred as {}----', pred_class)
         if pred_class not in ['single']:
@@ -518,7 +511,8 @@
                 response = correct_pred(response, self.LIB)
                 response = response.strip()
                 #self.logger.info('self.all_apis_json keys: {}', self.all_apis_json.keys())
-                self.logger.info('response in self.all_apis_json: {}', response in self.all_apis_json)
+                if len(response.split(','))>1:
+                    response = response.split(',')[0]
                 self.all_apis_json[response]
                 self.predicted_api_name = response
                 success = True
@@ -529,7 +523,6 @@
                 self.initialize_tool()
                 self.callback_func('log', "LLM can not return valid API name prediction, please redesign your prompt.", "LLM predict Error")
                 return
-            self.logger.info('length of ambiguous api list: {}',len(self.ambiguous_api))
             # if the predicted API is in ambiguous API list, then show those API and select one from them
             if self.predicted_api_name in self.ambiguous_api:
                 filtered_pairs = [api_pair for api_pair in self.ambiguous_pair if self.predicted_api_name in api_pair]
@@ -555,7 +548,6 @@
         elif self.user_states == "run_pipeline_after_ambiguous":
             ans = self.run_pipeline_after_ambiguous(user_input)
             if ans in ['break']:
-                self.logger.info('break the loop! Restart the inquiry')
                 return
             self.run_pipeline_after_fixing_API_selection(user_input)
         elif self.user_states in ["run_pipeline_after_doublechecking_execution_code", "run_pipeline_after_entering_params", "run_select_basic_params", "run_pipeline_after_select_special_params", "run_select_special_params", "run_pipeline_after_doublechecking_API_selection", "run_pipeline_asking_GPT"]:
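The @@ -518 hunk tolerates a model that returns several comma-separated API names by keeping only the first candidate, then indexes `all_apis_json` so that anything unknown raises immediately and trips the retry path. The same idea as a sketch (`pick_api` is an illustrative name, not the repo's API):

```python
def pick_api(response: str, known_apis: dict) -> str:
    # Models sometimes answer "a.b.c, a.b.d" even when asked for one API;
    # keep the first candidate and fail loudly on anything unknown.
    name = response.split(',')[0].strip()
    if name not in known_apis:
        raise KeyError(f"LLM returned an unknown API name: {name!r}")
    return name

print(pick_api("scanpy.pp.log1p, scanpy.pp.scale", {"scanpy.pp.log1p": {}}))
```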
@@ -564,27 +556,29 @@
             self.logger.error('Unknown user state: {}', self.user_states)
             raise ValueError
     def run_pipeline_asking_GPT(self,user_input):
+        self.initialize_tool()
         self.logger.info('==>run_pipeline_asking_GPT')
         self.retry_execution_count +=1
         if self.retry_execution_count>self.retry_execution_limit:
             self.logger.error('retry_execution_count exceed the limit! Exit the dialog!')
-            self.initialize_tool()
             self.callback_func('log', 'code generation using GPT has exceed the limit! Please choose other lib and re-enter the inquiry! You can use GPT again once you have executed code successfully through our tool!', 'Error')
             self.update_user_state('run_pipeline')
             self.save_state_enviro()
             return
-        self.initialize_tool()
-        self.logger.info('==>start asking GPT for code generation!')
         prompt = make_GPT_prompt(self.user_query, str(self.executor.execute_code), str(self.executor.variables), self.LIB)
         response, _ = LLM_response(prompt, self.model_llm_type, history=[], kwargs={})
-        newer_code = response.replace('\"\"\"', '')
+        if '```python' in response and response.endswith('```'):
+            response = response.split('```python')[1].split('```')[0]
+        if '\"\"\"' in response:
+            response = response.replace('\"\"\"', '')
+        newer_code = response
         self.execution_code = newer_code
         self.callback_func('code', self.execution_code, "Executed code")
         # LLM response
         summary_prompt = prepare_summary_prompt_full(self.user_query, self.predicted_api_name, self.API_composite[self.predicted_api_name]['description'], self.API_composite[self.predicted_api_name]['Parameters'],self.API_composite[self.predicted_api_name]['Returns'], self.execution_code)
         response, _ = LLM_response(summary_prompt, self.model_llm_type, history=[], kwargs={})
         self.callback_func('log', response, "Task summary before execution")
-        self.callback_func('log', "Could you confirm whether this task is what you aimed for, and the code should be executed? Please enter y/n.\nIf you press n, then we will re-direct to the parameter input step", "Double Check")
+        self.callback_func('log', "Could you confirm whether this task is what you aimed for, and the code should be executed? Please enter y/n.\nIf you press n, we will re-direct to the parameter input step\nIf you press r, we will restart another turn", "Double Check")
         self.update_user_state("run_pipeline_after_doublechecking_execution_code")
         self.save_state_enviro()
         return
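`run_pipeline_asking_GPT` now strips a ```` ```python ```` fence before falling back to triple-quote removal. A regex variant of the same extraction, slightly more tolerant of prose around the fence (`extract_code` is an illustrative name, not code from the PR):

```python
import re

def extract_code(response: str) -> str:
    # Prefer a fenced ```python block anywhere in the reply; otherwise
    # strip triple-quote wrappers, mirroring the two cases handled above.
    m = re.search(r"```python\s*(.*?)```", response, re.DOTALL)
    if m:
        return m.group(1).strip()
    return response.replace('"""', '').strip()

print(extract_code("Sure!\n```python\nresult_1 = 1 + 1\n```\nDone."))
```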
@@ -597,30 +591,28 @@ def handle_state_transition(self, user_input):
         return method(user_input)
     def run_pipeline_after_ambiguous(self,user_input):
+        self.initialize_tool()
         self.logger.info('==>run_pipeline_after_ambiguous')
         user_input = user_input.strip()
-        self.initialize_tool()
         try:
-            int(user_input)
-        except:
+            user_index = int(user_input)
+        except ValueError:
             self.callback_func('log', "Error: the input is not a number.\nPlease re-enter the index", "Index Error")
             self.update_user_state("run_pipeline_after_ambiguous")
             return 'break'
+        if user_index==-1:
+            self.update_user_state("run_pipeline")
+            self.callback_func('log', "We will start another round. Could you re-enter your inquiry?", "Start another round")
+            self.save_state_enviro()
+            return 'break'
         try:
-            if int(user_input)==-1:
-                self.update_user_state("run_pipeline")
-                self.callback_func('log', "We will start another round. Could you re-enter your inquiry?", "Start another round")
-                self.logger.info("user state updated to run_pipeline")
-                self.save_state_enviro()
-                return 'break'
-            else:
-                self.filtered_api[int(user_input)-1]
-        except:
+            self.filtered_api[user_index-1]
+        except IndexError:
             self.callback_func('log', "Error: the input index exceed the maximum length of ambiguous API list\nPlease re-enter the index", "Index Error")
             self.update_user_state("run_pipeline_after_ambiguous")
             return 'break'
         self.update_user_state("run_pipeline_after_fixing_API_selection")
-        self.predicted_api_name = self.filtered_api[int(user_input)-1]
+        self.predicted_api_name = self.filtered_api[user_index-1]
         self.save_state_enviro()
     def process_api_info(self, api_info, single_api_name):
         relevant_apis = api_info.get(single_api_name, {}).get("relevant APIs")
@@ -657,67 +649,50 @@ def update_user_state(self, new_state):
         self.user_states = new_state
         #print(f"Updated state from {self.last_user_states} to {self.user_states}")
     def run_pipeline_after_fixing_API_selection(self,user_input):
+        self.initialize_tool()
         self.logger.info('==>run_pipeline_after_fixing_API_selection')
         # check if composite API/class method API, return the relevant APIs
         self.relevant_api_list = self.process_api_info(self.API_composite, self.predicted_api_name) # only contains predicted API
-        self.logger.info('self.relevant_api_list: {}', self.relevant_api_list)
         self.api_name_json = self.check_and_insert_class_apis(self.API_composite, self.relevant_api_list)# also contains class API
-        self.logger.info('self.api_name_json: {}', json.dumps(self.api_name_json))
         self.update_user_state("run_pipeline")
         api_description = self.API_composite[self.predicted_api_name]['description']
         # summary task
         summary_prompt = prepare_summary_prompt(user_input, self.predicted_api_name, api_description, self.API_composite[self.predicted_api_name]['Parameters'],self.API_composite[self.predicted_api_name]['Returns'])
-        self.logger.info('summary_prompt: {}', summary_prompt)
         response, _ = LLM_response(summary_prompt, self.model_llm_type, history=[], kwargs={})
-        self.logger.info('summary_prompt response: {}', response)
-        self.initialize_tool()
+        self.logger.info(f'summary_prompt: {summary_prompt}, summary_prompt response: {response}')
         self.callback_func('log', response, f"Predicted API: {self.predicted_api_name}")
         self.callback_func('log', "Could you confirm whether this API should be called? Please enter y/n.", "Double Check")
         self.update_user_state("run_pipeline_after_doublechecking_API_selection")
         self.save_state_enviro()
     def run_pipeline_after_doublechecking_API_selection(self, user_input):
+        self.initialize_tool()
         self.logger.info('==>run_pipeline_after_doublechecking_API_selection')
         user_input = str(user_input)
-        if user_input in ['y', 'n']:
-            if user_input == 'n':
-                self.logger.info("user input is n")
-                self.update_user_state("run_pipeline")
-                self.logger.info("user state updated to run_pipeline")
-                self.initialize_tool()
-                self.logger.info("user tool initialized")
-                self.callback_func('log', "We will start another round. Could you re-enter your inquiry?", "Start another round")
-                self.save_state_enviro()
-                return
-            else:
-                self.logger.info("user input is y")
-                pass
+        if user_input == 'n':
+            self.update_user_state("run_pipeline")
+            self.callback_func('log', "We will start another round. Could you re-enter your inquiry?", "Start another round")
+            self.save_state_enviro()
+            return
+        elif user_input == 'y':
+            pass
         else:
-            self.logger.info('input is not y or n')
-            self.initialize_tool()
             self.callback_func('log', "The input was not y or n, please enter the correct value.", "Index Error")
             self.save_state_enviro()
             # user_states didn't change
             return
-        self.logger.info("==>Need to collect all parameters for a composite API")
+        #self.logger.info("==>Need to collect all parameters for a composite API")
         combined_params = {}
         self.logger.info('self.api_name_json: {}', self.api_name_json)
         # if the class API has already been initialized, then skip it
+        executor_variables = {var_name: var_info["value"] for var_name, var_info in self.executor.variables.items() if str(var_info["value"]) not in ["None"]}
         for api in self.api_name_json:
             maybe_class_name = api.split('.')[-1]
             maybe_instance_name = maybe_class_name.lower() + "_instance" # 240520: modified, support for variable with none xx_instance name
             if self.API_composite[api]['api_type'] in ['class', 'unknown']:
-                executor_variables = {}
-                for var_name, var_info in self.executor.variables.items():
-                    var_value = var_info["value"]
-                    if str(var_value) not in ["None"]:
-                        executor_variables[var_name] = var_value
-                self.logger.info('executor_variables: {}', executor_variables)
-                self.logger.info("api: {}", api)
                 matching_instance, is_match = find_matching_instance(api, executor_variables)
-                self.logger.info('matching_instance: {}', matching_instance)
-                self.logger.info('is_match: {}', is_match)
+                self.logger.info(f'executor_variables: {executor_variables}, api: {api}, matching_instance: {matching_instance}, is_match: {is_match}')
                 if is_match:
                     maybe_instance_name = matching_instance
                     continue
@@ -754,7 +729,6 @@
             tmp_api_parameters_information = self.API_composite[apis_name]['Parameters']
             api_docstring = json_to_docstring(apis_name, apis_description, tmp_api_parameters_information) ###TODO: here only works for one API; if we add composite or class APIs in the future, we need to build up parameter selection for multiple APIs!!!
             parameters_prompt = prepare_parameters_prompt(self.user_query, api_docstring, parameters_name_list)
-            self.logger.info('parameters_prompt: {}', parameters_prompt)
         except Exception as e:
             self.logger.error('error for parameters: {}', e)
         if len(parameters_name_list)==0:
@@ -774,12 +748,10 @@
             except Exception as e:
                 self.logger.error('error during parameters prediction: {}', e)
                 pass
-        self.logger.info('success or not: {}', success)
         if not success:
             self.callback_func('log', "LLM can not return valid parameters prediction, please redesign prompt in backend if you want to predict parameters. We will skip parameters prediction currently", "LLM predict Error")
             response = "{}"
             predicted_parameters = {}
-        self.logger.info('predicted_parameters: {}', predicted_parameters)
         # filter predicted_parameters
         required_param_list = [param_name for param_name, param_info in self.API_composite[apis_name]['Parameters'].items() if param_info['type'] in special_types or param_info['type'] in io_types or param_name in io_param_names]
         predicted_parameters = {key: value for key, value in predicted_parameters.items() if value not in [None, "None", "null"] or key in required_param_list}
@@ -787,16 +759,11 @@
         # generate api_calling
         self.predicted_api_name, api_calling, self.parameters_info_list = generate_api_calling(self.predicted_api_name, self.API_composite[self.predicted_api_name], predicted_parameters)
         self.logger.info('parameters_info_list: {}', self.parameters_info_list)
-        self.logger.info('finished generate api calling')
         if len(self.api_name_json)> len(self.relevant_api_list):
-            self.logger.info('len(self.api_name_json)> len(self.relevant_api_list)')
             #assume_class_API = list(set(list(self.api_name_json.keys()))-set(self.relevant_api_list))[0]
             assume_class_API = '.'.join(self.predicted_api_name.split('.')[:-1])
             tmp_class_predicted_api_name, tmp_class_api_calling, tmp_class_parameters_info_list = generate_api_calling(assume_class_API, self.API_composite[assume_class_API], predicted_parameters)
-            self.logger.info('assume_class_API: {}', assume_class_API)
-            self.logger.info('tmp_class_predicted_api_name: {}', tmp_class_predicted_api_name)
-            self.logger.info('tmp_class_api_calling: {}', tmp_class_api_calling)
-            self.logger.info('tmp_class_parameters_info_list: {}', tmp_class_parameters_info_list)
+            self.logger.info(f'assume_class_API: {assume_class_API}, tmp_class_predicted_api_name: {tmp_class_predicted_api_name}, tmp_class_api_calling: {tmp_class_api_calling}, tmp_class_parameters_info_list: {tmp_class_parameters_info_list}')
             fix_update = True
             for api in self.api_name_json:
                 maybe_class_name = api.split('.')[-1]
@@ -813,8 +780,7 @@
                         matching_instance, is_match = find_matching_instance(api, executor_variables)
                     except Exception as e:
                         self.logger.error('error during matching_instance: {}', e)
-                    self.logger.info('matching_instance: {}', matching_instance)
-                    self.logger.info('is_match: {}', is_match)
+                    self.logger.info(f'matching_instance: {matching_instance}, is_match: {is_match}')
                     if is_match:
                         maybe_instance_name = matching_instance
                         fix_update = False
@@ -823,7 +789,6 @@
             if fix_update:
                 self.logger.info('fix_update')
                 self.parameters_info_list['parameters'].update(tmp_class_parameters_info_list['parameters'])
-        self.logger.info('start inferring parameters')
         ####### infer parameters
         # $ param
         self.selected_params = self.executor.select_parameters(self.parameters_info_list['parameters'])
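The filtering step above drops None-like predicted values unless the parameter's type or name marks it as IO/special, in which case it must survive for later resolution from loaded data. A self-contained sketch, with illustrative stand-ins for the `special_types`/`io_types`/`io_param_names` sets imported from `deploy.utils`:

```python
special_types = {"AnnData"}   # illustrative stand-ins, not the real sets
io_types = {"PathLike"}
io_param_names = {"filename"}

def filter_predicted(predicted: dict, param_specs: dict) -> dict:
    required = {name for name, spec in param_specs.items()
                if spec["type"] in special_types
                or spec["type"] in io_types
                or name in io_param_names}
    # Drop None-ish predictions unless the parameter is IO/special.
    return {k: v for k, v in predicted.items()
            if v not in (None, "None", "null") or k in required}

print(filter_predicted({"n_top_genes": "None", "filename": None},
                       {"n_top_genes": {"type": "int"}, "filename": {"type": "PathLike"}}))
# -> {'filename': None}
```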
@@ -832,8 +797,6 @@
         none_dollar_value_params = [param_name for param_name, param_info in self.selected_params.items() if param_info["value"] in ['$']]
         self.logger.info('none_dollar_value_params: {}', json.dumps(none_dollar_value_params))
         if none_dollar_value_params:
-            #self.logger.info(self.user_states)
-            self.initialize_tool()
             self.callback_func('log', "However, there are still some parameters with special type undefined. Please start from uploading data, or check your parameter type in json files.", "Missing Parameters: special type")
             self.update_user_state("run_pipeline")
             self.save_state_enviro()
@@ -842,7 +805,6 @@
         multiple_dollar_value_params = [param_name for param_name, param_info in self.selected_params.items() if ('list' in str(type(param_info["value"]))) and (len(param_info["value"])>1)]
         self.filtered_params = {key: value for key, value in self.parameters_info_list['parameters'].items() if (key in multiple_dollar_value_params)}
         if multiple_dollar_value_params:
-            self.logger.info('==>There exist multiple choice for a special type parameters, start selecting parameters')
             self.callback_func('log', "There are many variables match the expected type. Please determine which one to choose", "Choosing Parameters: special type")
             tmp_input_para = ""
             for idx, api in enumerate(self.filtered_params):
@@ -863,10 +825,10 @@ def get_success_code_with_val(self, val):
                 return i['code']
         self.callback_func('log', "Can not find the executed code corresponding to the expected parameters", "Error Enter Parameters: special type", "red")
     def run_select_special_params(self, user_input):
+        self.initialize_tool()
         self.logger.info('==>run_select_special_params')
         if self.last_user_states == "run_select_special_params":
             self.selected_params = self.executor.makeup_for_missing_single_parameter_type_special(params = self.selected_params, param_name_to_update=self.last_param_name, user_input = user_input)
-        self.initialize_tool()
         #print('self.filtered_params: {}', json.dumps(self.filtered_params))
         if len(self.filtered_params)>1:
             self.last_param_name = list(self.filtered_params.keys())[0]
@@ -900,10 +862,9 @@ def run_pipeline_after_select_special_params(self,user_input):
         if self.last_user_states == "run_select_special_params":
             self.selected_params = self.executor.makeup_for_missing_single_parameter_type_special(params = self.selected_params, param_name_to_update=self.last_param_name, user_input = user_input)
         # @ param
-        self.logger.info('starting entering basic params')
+        self.logger.info('==> run_pipeline_after_select_special_params')
         none_at_value_params = [param_name for param_name, param_info in self.selected_params.items() if (param_info["value"] in ['@']) and (param_name not in self.path_info_list)]
         self.filtered_params = {key: value for key, value in self.parameters_info_list['parameters'].items() if (value["value"] in ['@']) and (key not in self.path_info_list)}
-        self.filtered_pathlike_params = {}
         self.filtered_pathlike_params = {key: value for key, value in self.parameters_info_list['parameters'].items() if (value["value"] in ['@']) and (key in self.path_info_list)}
         # TODO: add condition later: if uploading data files,
         # avoid asking Path params, assign it as './tmp'
@@ -923,10 +884,10 @@ def run_pipeline_after_select_special_params(self,user_input):
             self.run_pipeline_after_entering_params(user_input)
     def run_select_basic_params(self, user_input):
+        self.initialize_tool()
         self.logger.info('==>run_select_basic_params')
         if self.last_user_states == "run_select_basic_params":
             self.selected_params = self.executor.makeup_for_missing_single_parameter(params = self.selected_params, param_name_to_update=self.last_param_name, user_input = user_input)
-        self.initialize_tool()
         self.logger.info('self.filtered_params: {}', json.dumps(self.filtered_params))
         if len(self.filtered_params)>1:
             self.last_param_name = list(self.filtered_params.keys())[0]
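The `extract_parameters` hunk below accumulates each API's parameter specs into `combined_params`, with the first occurrence of a name winning. An equivalent first-wins merge can be written with `setdefault` (data below is illustrative):

```python
def merge_first_wins(dicts: list[dict]) -> dict:
    # First occurrence of a parameter name wins, matching the
    # `if param_name not in combined_params` guard in extract_parameters.
    combined: dict = {}
    for d in dicts:
        for name, info in d.items():
            combined.setdefault(name, info)
    return combined

print(merge_first_wins([{"n_comps": 50}, {"n_comps": 30, "copy": False}]))
# -> {'n_comps': 50, 'copy': False}
```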
@@ -986,10 +947,13 @@ def extract_parameters(self, api_name_json, api_info, selected_params):
             for param_name, param_info in api_params.items():
                 if param_name not in combined_params:
                     combined_params[param_name] = param_info
+            combined_params = {param_name: param_info for param_name, param_info in api_params.items() if param_name not in combined_params}
+            parameters_combined.append(combined_params)
         return parameters_combined
     def run_pipeline_after_entering_params(self, user_input):
+        self.initialize_tool()
         if self.last_user_states == "run_select_basic_params":
             self.selected_params = self.executor.makeup_for_missing_single_parameter(params = self.selected_params, param_name_to_update=self.last_param_name, user_input = user_input)
         self.logger.info('==>run pipeline after entering parameters')
@@ -997,73 +961,68 @@
         self.image_file_list = self.update_image_file_list()
         if self.filtered_pathlike_params:
             # add 'tmp'
-            for key in self.filtered_pathlike_params:
-                param_info = self.filtered_pathlike_params[key]
-                self.selected_params[key] = {
+            self.selected_params.update({
+                key: {
                     "type": param_info["type"],
                     "value": "./tmp",
                     "valuefrom": 'userinput',
-                    "optional": param_info["optional"],
+                    "optional": param_info["optional"]
                 }
-            self.logger.info('self.selected_params:')
-            self.logger.info(json.dumps(self.selected_params))
+                for key, param_info in self.filtered_pathlike_params.items()
+            })
+        self.logger.info('self.selected_params: {}', json.dumps(self.selected_params))
         # split parameters according to multiple API, or class/method API
         parameters_list = self.extract_parameters(self.api_name_json, self.API_composite, self.selected_params)
         self.logger.info('==>parameters_list: {}', json.dumps(parameters_list))
         extracted_params = self.split_params(self.selected_params, parameters_list)
-        self.logger.info('==>self.api_name_json: {}, parameters_list: {}', self.api_name_json, parameters_list)
-        self.logger.info('==>extracted_params: {}', extracted_params)
         extracted_params_dict = {api_name: extracted_param for api_name, extracted_param in zip(self.api_name_json, extracted_params)}
-        self.logger.info('extracted_params_dict: {}', json.dumps(extracted_params_dict))
+        self.logger.info('==>self.api_name_json: {}, parameters_list: {}, extracted_params: {}, extracted_params_dict: {}', str(self.api_name_json), str(parameters_list), str(extracted_params), str(extracted_params_dict))
         api_params_list = []
         for idx, api_name in enumerate(self.api_name_json):
-            if True:
-                #if self.api_name_json[api_name]['type'] in ['class', 'unknown']: # !
-                #print('==>assume not start with class API: {}', api_name)
-                class_selected_params = {}
-                fake_class_api = '.'.join(api_name.split('.')[:-1])
-                if fake_class_api in self.api_name_json:
-                    if self.api_name_json[fake_class_api]['type'] in ['class', 'unknown']:
-                        class_selected_params = extracted_params_dict[fake_class_api]
-                # two patches for pandas type data / squidpy parameters
-                if ('inplace' in self.API_composite[api_name]['Parameters']) and (api_name.startswith('scanpy') or api_name.startswith('squidpy')):
-                    extracted_params[idx]['inplace'] = {
-                        "type": self.API_composite[api_name]['Parameters']['inplace']['type'],
-                        "value": True,
-                        "valuefrom": 'value',
-                        "optional": True,
-                    }
-                if 'shape' in self.API_composite[api_name]['Parameters'] and 'pl.spatial_scatter' in api_name:
-                    extracted_params[idx]['shape'] = {
-                        "type": self.API_composite[api_name]['Parameters']['shape']['type'],
-                        "value": "None",
-                        "valuefrom": 'value',
-                        "optional": True,
-                    }
-                # don't include class API, just include class.attribute API
-                if self.API_composite[api_name]['api_type'] not in ['class', 'unknown']:
-                    # when using class.attribute API, only include the API's information.
-                    api_params_list.append({"api_name":api_name,
-                                            "parameters":extracted_params[idx],
-                                            "return_type":self.API_composite[api_name]['Returns']['type'],
-                                            "class_selected_params":class_selected_params,
-                                            "api_type":self.API_composite[api_name]['api_type']})
-                else: # ==`class`
-                    if len(self.api_name_json)==1:
-                        # When using class API, only include class API's
-                        api_params_list.append({"api_name":api_name,
-                                                "parameters":extracted_params[idx],
-                                                "return_type":self.API_composite[api_name]['Returns']['type'],
-                                                "class_selected_params":extracted_params[idx],
-                                                "api_type":self.API_composite[api_name]['api_type']})
-                    else:
-                        pass
-        self.logger.info('==>api_params_list: {}', json.dumps(api_params_list))
+            #if self.api_name_json[api_name]['type'] in ['class', 'unknown']: # !
+            #print('==>assume not start with class API: {}', api_name)
+            class_selected_params = {}
+            fake_class_api = '.'.join(api_name.split('.')[:-1])
+            if fake_class_api in self.api_name_json:
+                if self.api_name_json[fake_class_api]['type'] in ['class', 'unknown']:
+                    class_selected_params = extracted_params_dict[fake_class_api]
+            api_data_single = self.API_composite[api_name]
+            # two patches for pandas type data / squidpy parameters
+            if ('inplace' in api_data_single['Parameters']) and (api_name.startswith('scanpy') or api_name.startswith('squidpy')):
+                extracted_params[idx]['inplace'] = {
+                    "type": api_data_single['Parameters']['inplace']['type'],
+                    "value": True,
+                    "valuefrom": 'value',
+                    "optional": True,
+                }
+            if 'shape' in api_data_single['Parameters'] and 'pl.spatial_scatter' in api_name:
+                extracted_params[idx]['shape'] = {
+                    "type": api_data_single['Parameters']['shape']['type'],
+                    "value": "None",
+                    "valuefrom": 'value',
+                    "optional": True,
+                }
+            # don't include class API, just include class.attribute API
+            if api_data_single['api_type'] not in ['class', 'unknown']:
+                # when using class.attribute API, only include the API's information.
+                api_params_list.append({"api_name":api_name,
+                                        "parameters":extracted_params[idx],
+                                        "return_type":api_data_single['Returns']['type'],
+                                        "class_selected_params":class_selected_params,
+                                        "api_type":api_data_single['api_type']})
+            else: # ==`class`
+                if len(self.api_name_json)==1:
+                    # When using class API, only include class API's
+                    api_params_list.append({"api_name":api_name,
+                                            "parameters":extracted_params[idx],
+                                            "return_type":api_data_single['Returns']['type'],
+                                            "class_selected_params":extracted_params[idx],
+                                            "api_type":api_data_single['api_type']})
+                else:
+                    pass
         # add optional cards
-        optional_param = {key: value for key, value in self.API_composite[api_name]['Parameters'].items() if value['optional']}
-        self.logger.info('==>optional_param: {}', json.dumps(optional_param))
-        self.logger.info('len(optional_param) {}', len(optional_param))
-        self.initialize_tool()
+        optional_param = {key: value for key, value in api_data_single['Parameters'].items() if value['optional']}
+        self.logger.info('==>api_params_list: {}, ==>optional_param: {}, len(optional_param) {}', json.dumps(api_params_list), json.dumps(optional_param), len(optional_param))
         if False: # TODO: if True, to debug the optional card showing
             if len(optional_param)>0:
                 self.logger.info('producing optional param card')
@@ -1072,42 +1031,43 @@
             else:
                 pass
         # TODO: real time adjusting execution_code according to optionalcard
-        self.logger.info('api_params_list: {}', api_params_list)
         self.execution_code = self.executor.generate_execution_code(api_params_list)
-        self.logger.info('==>execution_code: {}', self.execution_code)
-        self.initialize_tool()
+        self.logger.info('==>api_params_list: {}, execution_code: {}', api_params_list, self.execution_code)
         self.callback_func('code', self.execution_code, "Executed code")
         # LLM response
-        summary_prompt = prepare_summary_prompt_full(user_input, self.predicted_api_name, self.API_composite[self.predicted_api_name]['description'], self.API_composite[self.predicted_api_name]['Parameters'],self.API_composite[self.predicted_api_name]['Returns'], self.execution_code)
+        api_data_single = self.API_composite[self.predicted_api_name]
+        summary_prompt = prepare_summary_prompt_full(user_input, self.predicted_api_name, api_data_single['description'], api_data_single['Parameters'],api_data_single['Returns'], self.execution_code)
         response, _ = LLM_response(summary_prompt, self.model_llm_type, history=[], kwargs={})
         self.callback_func('log', response, "Task summary before execution")
-        self.callback_func('log', "Could you confirm whether this task is what you aimed for, and the code should be executed? Please enter y/n.\nIf you press n, then we will re-direct to the parameter input step", "Double Check")
+        self.callback_func('log', "Could you confirm whether this task is what you aimed for, and the code should be executed? Please enter y/n.\nIf you press n, we will re-direct to the parameter input step\nIf you press r, we will restart another turn", "Double Check")
         self.update_user_state("run_pipeline_after_doublechecking_execution_code")
         self.save_state_enviro()
     def run_pipeline_after_doublechecking_execution_code(self, user_input):
         self.initialize_tool()
+        self.logger.info('==> run_pipeline_after_doublechecking_execution_code')
         # if check, back to the last iteration and status
-        if user_input in ['y', 'n']:
+        if user_input in ['y', 'n', 'r']:
             if user_input == 'n':
-                self.logger.info('input n')
-                #self.user_states = "run_pipeline"
                 self.update_user_state("run_pipeline_after_doublechecking_API_selection") #TODO: check if exist issue
                 self.callback_func('log', "We will redirect to the parameters input", "Re-enter the parameters")
                 self.save_state_enviro()
                 self.run_pipeline_after_doublechecking_API_selection('y')
                 return
+            elif user_input == 'r':
+                self.update_user_state("run_pipeline")
+                self.callback_func('log', "We will start another round. Could you re-enter your inquiry?", "Start another round")
+                self.save_state_enviro()
+                return
             else:
-                self.logger.info('input y')
+                pass
         else:
-            self.logger.info('input not y or n')
-            self.callback_func('log', "The input was not y or n, please enter the correct value.", "Index Error")
+            self.callback_func('log', "The input was not y or n or r, please enter the correct value.", "Index Error")
            self.save_state_enviro()
            # user_states didn't change
            return
         # else, continue
         execution_code_list = self.execution_code.split('\n')
-        self.logger.info('execute and obtain figures')
         self.plt_status = plt.get_fignums()
         temp_output_file = "./sub_process_execution.txt"
         process = multiprocessing.Process(target=self.run_pipeline_execution_code_list, args=(execution_code_list, temp_output_file))
@@ -1133,11 +1093,7 @@
             self.last_execute_code = self.get_last_execute_code(code)
         else:
             self.last_execute_code = {"code":"", 'success':"False"}
-            self.logger.info('Something wrong with generating code with new API!')
-            self.logger.info('self.executor.variables:')
-            self.logger.info(json.dumps(list(self.executor.variables.keys())))
-            self.logger.info('self.executor.execute_code:')
-            self.logger.info(json.dumps(self.executor.execute_code))
+        self.logger.info('self.executor.variables: {}, self.executor.execute_code: {}, self.last_execute_code: {}', json.dumps(list(self.executor.variables.keys())), json.dumps(self.executor.execute_code), str(self.last_execute_code))
         try:
             content = '\n'.join(output_list)
         except:
@@ -1146,14 +1102,12 @@
             except:
                 content = ""
         self.logger.info('content: {}', content)
-        self.logger.info('self.last_execute_code: {}', str(self.last_execute_code))
         # show the new variable
-        if self.last_execute_code['success']=='True':
+        if self.last_execute_code['code'] and self.last_execute_code['success']=='True':
             # if execute, visualize value
             code = self.last_execute_code['code']
             vari = [i.strip() for i in code.split('(')[0].split('=')]
-            self.logger.info('-----code: {}', code)
-            self.logger.info('-----vari: {}', vari)
+            self.logger.info('-----code: {} -----vari: {}', code, vari)
             tips_for_execution_success = True
             if len(vari)>1:
                 #if self.executor.variables[vari[0]]['value'] is not None:
@@ -1200,7 +1154,6 @@
             new_file_list = set(new_img_list)-set(self.image_file_list)
             if new_file_list:
                 for new_img in new_file_list:
-                    self.logger.info('send image to frontend')
                     base64_image = convert_image_to_base64(os.path.join(self.image_folder, new_img))
                     if base64_image:
                         self.callback_func('log', "We visualize the obtained figure. Try to zoom in or out the figure.", "Executed results [Success]", imageData=base64_image)
@@ -1211,7 +1164,10 @@
             self.retry_execution_count = 0
         else:
             self.logger.info('Execution Error: {}', content)
-            self.callback_func('log', content, "Executed results [Fail]")
+            try:
+                self.callback_func('log', "\n".join(list(set(output_list))), "Executed results [Fail]")
+            except:
+                self.callback_func('log', content, "Executed results [Fail]")
             if self.retry_execution_count < self.retry_execution_limit:
[... remainder of this hunk, and the header of the executor diff, lost in extraction ...]
                 self.logger.info('==>start split tuple variables!')
                 length = len(result_variable['value'])
+                self.counter = max(self.counter, self.get_newest_counter_from_namespace())
                 new_variables = [f"result_{self.counter + i + 1}" for i in range(length)]
                 #self.counter+=1
                 new_code = ', '.join(new_variables) + f" = {result_name_tuple}"
@@ -519,13 +524,15 @@ def execute_api_call(self, api_call_code, code_type, output_file=None):
             #self.logger.info('self.execute_code', self.execute_code)
             return captured_output_value
         except Exception as e:
-            error = f"{e}"
+            error_info = traceback.format_exc()
+            error = f"{error_info}"
             self.logger.info('==?error in execute api call:', error)
             self.execute_code.append({'code':api_call_code,'code_type':code_type, 'success':'False', 'error': error})
             return error
     def save_variables_to_json(self, ):
         save_data = {name: details["type"] for name, details in self.variables.items()}
         save_json(os.path.join(self.save_directory,f"{self.session_id}_variables.json"), save_data)
+    @lru_cache(maxsize=10)
     def load_variables_to_json(self):
         saved_vars = load_json(os.path.join(self.save_directory,f"{self.session_id}_variables.json"))
         variables = {}
diff --git a/src/models/dialog_classifier.py b/src/models/dialog_classifier.py
index 852f587..fc0ebb9 100644
--- a/src/models/dialog_classifier.py
+++ b/src/models/dialog_classifier.py
@@ -17,7 +17,7 @@
 from src.gpt.utils import load_json, save_json
 
-class Dialog_Gaussian_classificaiton:
+class Dialog_Gaussian_classification:
     def __init__(self, threshold=0.05):
         self.threshold = threshold
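The `execute_api_call` hunk above replaces `str(e)` with `traceback.format_exc()`, so the stored error (and any correction prompt built from it) sees the failing line and call chain rather than just the exception message. A minimal sketch of the difference (`run_safely` is an illustrative name, not the executor's API):

```python
import traceback

def run_safely(code: str) -> str | None:
    # format_exc() keeps the failing line and the call chain, which gives
    # an LLM-driven repair step far more context than the bare message.
    try:
        exec(compile(code, "<api_call>", "exec"), {})
        return None
    except Exception:
        return traceback.format_exc()

print(run_safely("result_1 = 1/0"))  # prints the full ZeroDivisionError traceback
```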