diff --git a/skyvern/forge/agent.py b/skyvern/forge/agent.py index 33deda2ade..3d5170e491 100644 --- a/skyvern/forge/agent.py +++ b/skyvern/forge/agent.py @@ -342,6 +342,23 @@ async def execute_step( detailed_output, ) = await self._initialize_execution_state(task, step, workflow_run, browser_session_id) + if not task.navigation_goal and not task.data_extraction_goal: + # This is most likely a GOTO_URL task block + # mark step as completed and mark task as completed + step = await self.update_step( + step, status=StepStatus.completed, is_last=True, output=AgentStepOutput(action_results=[]) + ) + task = await self.update_task(task, status=TaskStatus.completed) + await self.clean_up_task( + task=task, + last_step=step, + api_key=api_key, + need_call_webhook=True, + close_browser_on_completion=close_browser_on_completion, + browser_session_id=browser_session_id, + ) + return step, detailed_output, None + if page := await browser_state.get_working_page(): await self.register_async_operations(organization, task, page) diff --git a/skyvern/forge/sdk/models.py b/skyvern/forge/sdk/models.py index 556bd22251..e8e460025f 100644 --- a/skyvern/forge/sdk/models.py +++ b/skyvern/forge/sdk/models.py @@ -18,7 +18,7 @@ class StepStatus(StrEnum): def can_update_to(self, new_status: StepStatus) -> bool: allowed_transitions: dict[StepStatus, set[StepStatus]] = { - StepStatus.created: {StepStatus.running, StepStatus.failed, StepStatus.canceled}, + StepStatus.created: {StepStatus.running, StepStatus.failed, StepStatus.canceled, StepStatus.completed}, StepStatus.running: {StepStatus.completed, StepStatus.failed, StepStatus.canceled}, StepStatus.failed: set(), StepStatus.completed: set(), @@ -80,9 +80,6 @@ def validate_update( if self.output is not None and output is not None: raise ValueError(f"cant_override_output({self.step_id})") - if is_last and not self.status.is_terminal(): - raise ValueError(f"is_last_but_status_not_terminal({self.status},{self.step_id})") - if is_last is False: raise ValueError(f"cant_set_is_last_to_false({self.step_id})") diff --git a/skyvern/forge/sdk/services/observer_service.py b/skyvern/forge/sdk/services/observer_service.py index 06a7e2fae7..e8e0bd07b1 100644 --- a/skyvern/forge/sdk/services/observer_service.py +++ b/skyvern/forge/sdk/services/observer_service.py @@ -34,6 +34,7 @@ ForLoopBlock, NavigationBlock, TaskBlock, + UrlBlock, ) from skyvern.forge.sdk.workflow.models.parameter import PARAMETER_TYPE, ContextParameter from skyvern.forge.sdk.workflow.models.workflow import ( @@ -51,6 +52,7 @@ ForLoopBlockYAML, NavigationBlockYAML, TaskBlockYAML, + UrlBlockYAML, WorkflowCreateYAMLRequest, WorkflowDefinitionYAML, ) @@ -348,159 +350,175 @@ async def run_observer_task_helper( max_iterations = int_max_iterations_override or DEFAULT_MAX_ITERATIONS for i in range(max_iterations): LOG.info(f"Observer iteration i={i}", workflow_run_id=workflow_run_id, url=url) - try: - browser_state = await app.BROWSER_MANAGER.get_or_create_for_workflow_run( - workflow_run=workflow_run, + task_type = "" + plan = "" + block: BlockTypeVar | None = None + task_history_record: dict[str, Any] = {} + context = skyvern_context.ensure_context() + + if i == 0: + # The first iteration is always a GOTO_URL task + task_type = "goto_url" + plan = f"Go to this website: {url}" + task_history_record = {"type": task_type, "task": plan} + block, block_yaml_list, parameter_yaml_list = await _generate_goto_url_task( + workflow_id=workflow_id, url=url, - browser_session_id=browser_session_id, ) - scraped_page = await scrape_website( - browser_state, - url, - app.AGENT_FUNCTION.cleanup_element_tree_factory(), - scrape_exclude=app.scrape_exclude, + task_history_record = {"type": task_type, "task": plan} + else: + try: + browser_state = await app.BROWSER_MANAGER.get_or_create_for_workflow_run( + workflow_run=workflow_run, + url=url, + browser_session_id=browser_session_id, + ) + scraped_page = await scrape_website( + browser_state, + url, + app.AGENT_FUNCTION.cleanup_element_tree_factory(), + scrape_exclude=app.scrape_exclude, + ) + element_tree_in_prompt: str = scraped_page.build_element_tree(ElementTreeFormat.HTML) + page = await browser_state.get_working_page() + except Exception: + LOG.exception( + "Failed to get browser state or scrape website in observer iteration", iteration=i, url=url + ) + continue + current_url = str( + await SkyvernFrame.evaluate(frame=page, expression="() => document.location.href") if page else url ) - element_tree_in_prompt: str = scraped_page.build_element_tree(ElementTreeFormat.HTML) - page = await browser_state.get_working_page() - except Exception: - LOG.exception("Failed to get browser state or scrape website in observer iteration", iteration=i, url=url) - continue - current_url = str( - await SkyvernFrame.evaluate(frame=page, expression="() => document.location.href") if page else url - ) - context = skyvern_context.ensure_context() - observer_prompt = prompt_engine.load_prompt( - "observer", - current_url=current_url, - elements=element_tree_in_prompt, - user_goal=user_prompt, - task_history=task_history, - local_datetime=datetime.now(context.tz_info).isoformat(), - ) - observer_thought = await app.DATABASE.create_observer_thought( - observer_cruise_id=observer_cruise_id, - organization_id=organization_id, - workflow_run_id=workflow_run.workflow_run_id, - workflow_id=workflow.workflow_id, - workflow_permanent_id=workflow.workflow_permanent_id, - observer_thought_type=ObserverThoughtType.plan, - observer_thought_scenario=ObserverThoughtScenario.generate_plan, - ) - observer_response = await app.LLM_API_HANDLER( - prompt=observer_prompt, - screenshots=scraped_page.screenshots, - observer_thought=observer_thought, - ) - LOG.info( - "Observer response", - observer_response=observer_response, - iteration=i, - current_url=current_url, - workflow_run_id=workflow_run_id, - ) - # see if the user goal has achieved or not - user_goal_achieved = observer_response.get("user_goal_achieved", False) - observation = observer_response.get("page_info", "") - thoughts: str = observer_response.get("thoughts", "") - plan: str = observer_response.get("plan", "") - task_type: str = observer_response.get("task_type", "") - # Create and save observer thought - await app.DATABASE.update_observer_thought( - observer_thought_id=observer_thought.observer_thought_id, - organization_id=organization_id, - thought=thoughts, - observation=observation, - answer=plan, - output={"task_type": task_type, "user_goal_achieved": user_goal_achieved}, - ) - - if user_goal_achieved is True: + observer_prompt = prompt_engine.load_prompt( + "observer", + current_url=current_url, + elements=element_tree_in_prompt, + user_goal=user_prompt, + task_history=task_history, + local_datetime=datetime.now(context.tz_info).isoformat(), + ) + observer_thought = await app.DATABASE.create_observer_thought( + observer_cruise_id=observer_cruise_id, + organization_id=organization_id, + workflow_run_id=workflow_run.workflow_run_id, + workflow_id=workflow.workflow_id, + workflow_permanent_id=workflow.workflow_permanent_id, + observer_thought_type=ObserverThoughtType.plan, + observer_thought_scenario=ObserverThoughtScenario.generate_plan, + ) + observer_response = await app.LLM_API_HANDLER( + prompt=observer_prompt, + screenshots=scraped_page.screenshots, + observer_thought=observer_thought, + ) LOG.info( - "User goal achieved. Workflow run will complete. Observer is stopping", + "Observer response", + observer_response=observer_response, iteration=i, + current_url=current_url, workflow_run_id=workflow_run_id, ) - observer_task = await _summarize_observer_task( - observer_task=observer_task, - task_history=task_history, - context=context, - screenshots=scraped_page.screenshots, + # see if the user goal has achieved or not + user_goal_achieved = observer_response.get("user_goal_achieved", False) + observation = observer_response.get("page_info", "") + thoughts: str = observer_response.get("thoughts", "") + plan = observer_response.get("plan", "") + task_type = observer_response.get("task_type", "") + # Create and save observer thought + await app.DATABASE.update_observer_thought( + observer_thought_id=observer_thought.observer_thought_id, + organization_id=organization_id, + thought=thoughts, + observation=observation, + answer=plan, + output={"task_type": task_type, "user_goal_achieved": user_goal_achieved}, ) - break - if not plan: - LOG.warning("No plan found in observer response", observer_response=observer_response) - continue + if user_goal_achieved is True: + LOG.info( + "User goal achieved. Workflow run will complete. Observer is stopping", + iteration=i, + workflow_run_id=workflow_run_id, + ) + observer_task = await _summarize_observer_task( + observer_task=observer_task, + task_history=task_history, + context=context, + screenshots=scraped_page.screenshots, + ) + break - # parse observer repsonse and run the next task - if not task_type: - LOG.error("No task type found in observer response", observer_response=observer_response) - await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed( - workflow_run_id=workflow_run_id, - failure_reason="Skyvern failed to generate a task. Please try again later.", - ) - break + if not plan: + LOG.warning("No plan found in observer response", observer_response=observer_response) + continue - block: BlockTypeVar | None = None - task_history_record: dict[str, Any] = {} - if task_type == "extract": - block, block_yaml_list, parameter_yaml_list = await _generate_extraction_task( - observer_cruise=observer_task, - workflow_id=workflow_id, - workflow_permanent_id=workflow.workflow_permanent_id, - workflow_run_id=workflow_run_id, - current_url=current_url, - element_tree_in_prompt=element_tree_in_prompt, - data_extraction_goal=plan, - task_history=task_history, - ) - task_history_record = {"type": task_type, "task": plan} - elif task_type == "navigate": - original_url = url if i == 0 else None - navigation_goal = MINI_GOAL_TEMPLATE.format(main_goal=user_prompt, mini_goal=plan) - block, block_yaml_list, parameter_yaml_list = await _generate_navigation_task( - workflow_id=workflow_id, - workflow_permanent_id=workflow.workflow_permanent_id, - workflow_run_id=workflow_run_id, - original_url=original_url, - navigation_goal=navigation_goal, - totp_verification_url=observer_task.totp_verification_url, - totp_identifier=observer_task.totp_identifier, - ) - task_history_record = {"type": task_type, "task": plan} - elif task_type == "loop": - try: - block, block_yaml_list, parameter_yaml_list, extraction_obj, inner_task = await _generate_loop_task( + # parse observer repsonse and run the next task + if not task_type: + LOG.error("No task type found in observer response", observer_response=observer_response) + await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed( + workflow_run_id=workflow_run_id, + failure_reason="Skyvern failed to generate a task. Please try again later.", + ) + break + + if task_type == "extract": + block, block_yaml_list, parameter_yaml_list = await _generate_extraction_task( observer_cruise=observer_task, workflow_id=workflow_id, workflow_permanent_id=workflow.workflow_permanent_id, workflow_run_id=workflow_run_id, - plan=plan, - browser_state=browser_state, - original_url=url, - scraped_page=scraped_page, + current_url=current_url, + element_tree_in_prompt=element_tree_in_prompt, + data_extraction_goal=plan, + task_history=task_history, ) - task_history_record = { - "type": task_type, - "task": plan, - "loop_over_values": extraction_obj.get("loop_values"), - "task_inside_the_loop": inner_task, - } - except Exception: - LOG.exception("Failed to generate loop task") + task_history_record = {"type": task_type, "task": plan} + elif task_type == "navigate": + original_url = url if i == 0 else None + navigation_goal = MINI_GOAL_TEMPLATE.format(main_goal=user_prompt, mini_goal=plan) + block, block_yaml_list, parameter_yaml_list = await _generate_navigation_task( + workflow_id=workflow_id, + workflow_permanent_id=workflow.workflow_permanent_id, + workflow_run_id=workflow_run_id, + original_url=original_url, + navigation_goal=navigation_goal, + totp_verification_url=observer_task.totp_verification_url, + totp_identifier=observer_task.totp_identifier, + ) + task_history_record = {"type": task_type, "task": plan} + elif task_type == "loop": + try: + block, block_yaml_list, parameter_yaml_list, extraction_obj, inner_task = await _generate_loop_task( + observer_cruise=observer_task, + workflow_id=workflow_id, + workflow_permanent_id=workflow.workflow_permanent_id, + workflow_run_id=workflow_run_id, + plan=plan, + browser_state=browser_state, + original_url=url, + scraped_page=scraped_page, + ) + task_history_record = { + "type": task_type, + "task": plan, + "loop_over_values": extraction_obj.get("loop_values"), + "task_inside_the_loop": inner_task, + } + except Exception: + LOG.exception("Failed to generate loop task") + await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed( + workflow_run_id=workflow_run_id, + failure_reason="Failed to generate the loop.", + ) + break + else: + LOG.info("Unsupported task type", task_type=task_type) await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed( workflow_run_id=workflow_run_id, - failure_reason="Failed to generate the loop.", + failure_reason=f"Unsupported task block type gets generated: {task_type}", ) break - else: - LOG.info("Unsupported task type", task_type=task_type) - await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed( - workflow_run_id=workflow_run_id, - failure_reason=f"Unsupported task block type gets generated: {task_type}", - ) - break # generate the extraction task block_result = await block.execute_safe( @@ -562,6 +580,11 @@ async def run_observer_task_helper( if block_result.success is True: completion_screenshots = [] try: + browser_state = await app.BROWSER_MANAGER.get_or_create_for_workflow_run( + workflow_run=workflow_run, + url=url, + browser_session_id=browser_session_id, + ) scraped_page = await scrape_website( browser_state, url, @@ -1059,6 +1082,34 @@ async def _generate_navigation_task( ) +async def _generate_goto_url_task( + workflow_id: str, + url: str, +) -> tuple[UrlBlock, list[BLOCK_YAML_TYPES], list[PARAMETER_YAML_TYPES]]: + LOG.info("Generating goto url task", url=url) + # create OutputParameter for the data_extraction block + label = f"goto_url_{_generate_random_string()}" + + url_block_yaml = UrlBlockYAML( + label=label, + url=url, + ) + output_parameter = await app.WORKFLOW_SERVICE.create_output_parameter_for_block( + workflow_id=workflow_id, + block_yaml=url_block_yaml, + ) + # create UrlBlock + return ( + UrlBlock( + label=label, + url=url, + output_parameter=output_parameter, + ), + [url_block_yaml], + [], + ) + + def _generate_random_string(length: int = 5) -> str: # Use the current timestamp as the seed random.seed(os.urandom(16)) diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index a1cba6ce41..cd722119e9 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -52,6 +52,7 @@ TaskV2Block, TextPromptBlock, UploadToS3Block, + UrlBlock, ValidationBlock, WaitBlock, ) @@ -1751,6 +1752,12 @@ async def block_yaml_to_block( max_iterations=block_yaml.max_iterations, output_parameter=output_parameter, ) + elif block_yaml.block_type == BlockType.GOTO_URL: + return UrlBlock( + label=block_yaml.label, + url=block_yaml.url, + output_parameter=output_parameter, + ) raise ValueError(f"Invalid block type {block_yaml.block_type}")