diff --git a/src/uipath/_cli/_evals/_runtime.py b/src/uipath/_cli/_evals/_runtime.py
index 60202d12d..0a06c9d8e 100644
--- a/src/uipath/_cli/_evals/_runtime.py
+++ b/src/uipath/_cli/_evals/_runtime.py
@@ -192,6 +192,8 @@ class UiPathEvalContext:
     enable_mocker_cache: bool = False
     report_coverage: bool = False
     model_settings_id: str = "default"
+    resume: bool = False
+    job_id: str | None = None


 class UiPathEvalRuntime:
@@ -218,7 +220,8 @@ def __init__(
         self.trace_manager.tracer_provider.add_span_processor(span_processor)
         self.logs_exporter: ExecutionLogsExporter = ExecutionLogsExporter()
-        self.execution_id = str(uuid.uuid4())
+        # Use job_id if available (for single runtime runs), otherwise generate UUID
+        self.execution_id = context.job_id or str(uuid.uuid4())
         self.coverage = coverage.Coverage(branch=True)

     async def __aenter__(self) -> "UiPathEvalRuntime":
@@ -296,6 +299,17 @@ async def initiate_evaluation(
         )

     async def execute(self) -> UiPathRuntimeResult:
+        logger.info("=" * 80)
+        logger.info("EVAL RUNTIME: Starting evaluation execution")
+        logger.info(f"EVAL RUNTIME: Execution ID: {self.execution_id}")
+        logger.info(f"EVAL RUNTIME: Job ID: {self.context.job_id}")
+        logger.info(f"EVAL RUNTIME: Resume mode: {self.context.resume}")
+        if self.context.resume:
+            logger.info(
+                "🟢 EVAL RUNTIME: RESUME MODE ENABLED - Will resume from suspended state"
+            )
+        logger.info("=" * 80)
+
         # Configure model settings override before creating runtime
         await self._configure_model_settings_override()
@@ -381,9 +395,41 @@ async def execute(self) -> UiPathRuntimeResult:
                 wait_for_completion=False,
             )

+            # Collect triggers from all evaluation runs (pass-through from inner runtime)
+            logger.info("=" * 80)
+            logger.info(
+                "EVAL RUNTIME: Collecting triggers from all evaluation runs"
+            )
+            all_triggers = []
+            for eval_run_result in results.evaluation_set_results:
+                if (
+                    eval_run_result.agent_execution_output
+                    and eval_run_result.agent_execution_output.result
+                ):
+                    runtime_result = (
+                        eval_run_result.agent_execution_output.result
+                    )
+                    if runtime_result.trigger:
+                        all_triggers.append(runtime_result.trigger)
+                    if runtime_result.triggers:
+                        all_triggers.extend(runtime_result.triggers)
+
+            if all_triggers:
+                logger.info(
+                    f"EVAL RUNTIME: ✅ Passing through {len(all_triggers)} trigger(s) to top-level result"
+                )
+                for i, trigger in enumerate(all_triggers, 1):
+                    logger.info(
+                        f"EVAL RUNTIME: Pass-through trigger {i}: {trigger.model_dump(by_alias=True)}"
+                    )
+            else:
+                logger.info("EVAL RUNTIME: No triggers to pass through")
+            logger.info("=" * 80)
+
             result = UiPathRuntimeResult(
                 output={**results.model_dump(by_alias=True)},
                 status=UiPathRuntimeStatus.SUCCESSFUL,
+                triggers=all_triggers if all_triggers else None,
             )
             return result
         except Exception as e:
@@ -448,6 +494,14 @@ async def _execute_eval(
             agent_execution_output = await self.execute_runtime(
                 eval_item, execution_id, runtime
             )
+
+            logger.info(
+                f"DEBUG: Agent execution result status: {agent_execution_output.result.status}"
+            )
+            logger.info(
+                f"DEBUG: Agent execution result trigger: {agent_execution_output.result.trigger}"
+            )
+
         except Exception as e:
             if self.context.verbose:
                 if isinstance(e, EvaluationRuntimeException):
@@ -483,6 +537,69 @@ async def _execute_eval(
                 )
             raise

+        # Check if execution was suspended (e.g., waiting for RPA job completion)
+        if (
+            agent_execution_output.result.status
+            == UiPathRuntimeStatus.SUSPENDED
+        ):
+            # For suspended executions, we don't run evaluators yet
+            # The serverless executor should save the triggers and resume later
logger.info("=" * 80) + logger.info( + f"🔴 EVAL RUNTIME: DETECTED SUSPENSION for eval '{eval_item.name}' (id: {eval_item.id})" + ) + logger.info("EVAL RUNTIME: Agent returned SUSPENDED status") + + # Extract triggers from result + triggers = [] + if agent_execution_output.result.trigger: + triggers.append(agent_execution_output.result.trigger) + if agent_execution_output.result.triggers: + triggers.extend(agent_execution_output.result.triggers) + + logger.info( + f"EVAL RUNTIME: Extracted {len(triggers)} trigger(s) from suspended execution" + ) + for i, trigger in enumerate(triggers, 1): + logger.info( + f"EVAL RUNTIME: Trigger {i}: {trigger.model_dump(by_alias=True)}" + ) + logger.info("=" * 80) + + # IMPORTANT: Always include execution output with triggers when suspended + # This ensures triggers are visible in the output JSON for serverless executor + evaluation_run_results.agent_execution_output = ( + convert_eval_execution_output_to_serializable( + agent_execution_output + ) + ) + + # Publish suspended status event + await self.event_bus.publish( + EvaluationEvents.UPDATE_EVAL_RUN, + EvalRunUpdatedEvent( + execution_id=execution_id, + eval_item=eval_item, + eval_results=[], + success=True, # Not failed, just suspended + agent_output={ + "status": "suspended", + "triggers": [ + t.model_dump(by_alias=True) for t in triggers + ], + }, + agent_execution_time=agent_execution_output.execution_time, + spans=agent_execution_output.spans, + logs=agent_execution_output.logs, + exception_details=None, + ), + wait_for_completion=False, + ) + + # Return partial results with trigger information + # The evaluation will be completed when resumed + return evaluation_run_results + if self.context.verbose: evaluation_run_results.agent_execution_output = ( convert_eval_execution_output_to_serializable( diff --git a/src/uipath/_cli/cli_eval.py b/src/uipath/_cli/cli_eval.py index 570832b47..b7e667337 100644 --- a/src/uipath/_cli/cli_eval.py +++ b/src/uipath/_cli/cli_eval.py @@ -106,6 +106,12 @@ def setup_reporting_prereq(no_report: bool) -> bool: type=click.Path(exists=False), help="File path where traces will be written in JSONL format", ) +@click.option( + "--resume", + is_flag=True, + default=False, + help="Resume execution from a previous suspended state", +) def eval( entrypoint: str | None, eval_set: str | None, @@ -118,6 +124,7 @@ def eval( report_coverage: bool, model_settings_id: str, trace_file: str | None, + resume: bool, ) -> None: """Run an evaluation set against the agent. 
@@ -131,6 +138,7 @@ def eval(
         enable_mocker_cache: Enable caching for LLM mocker responses
         report_coverage: Report evaluation coverage
         model_settings_id: Model settings ID to override agent settings
+        resume: Resume execution from a previous suspended state
     """
     should_register_progress_reporter = setup_reporting_prereq(no_report)
@@ -166,6 +174,7 @@ def eval(
     eval_context.eval_ids = eval_ids
     eval_context.report_coverage = report_coverage
     eval_context.model_settings_id = model_settings_id
+    eval_context.resume = resume

     try:
@@ -189,6 +198,9 @@ async def execute_eval():
             trace_manager=trace_manager,
             command="eval",
         ) as ctx:
+            # Set job_id in eval context for single runtime runs
+            eval_context.job_id = ctx.job_id
+
             if ctx.job_id:
                 trace_manager.add_span_exporter(LlmOpsHttpExporter())
diff --git a/src/uipath/functions/runtime.py b/src/uipath/functions/runtime.py
index 3267216fd..3867fde8f 100644
--- a/src/uipath/functions/runtime.py
+++ b/src/uipath/functions/runtime.py
@@ -22,6 +22,11 @@
     UiPathErrorContract,
     UiPathRuntimeError,
 )
+from uipath.runtime.resumable.trigger import (
+    UiPathResumeTrigger,
+    UiPathResumeTriggerName,
+    UiPathResumeTriggerType,
+)
 from uipath.runtime.schema import UiPathRuntimeSchema

 from .schema_gen import get_type_schema
@@ -124,6 +129,71 @@ async def _execute_function(
         return convert_from_class(result) if result is not None else {}

+    def _detect_langgraph_interrupt(
+        self, output: dict[str, Any]
+    ) -> UiPathResumeTrigger | None:
+        """Detect LangGraph __interrupt__ field and extract InvokeProcess trigger.
+
+        LangGraph's interrupt() creates an __interrupt__ field in the output dict:
+        {
+            "query": "...",
+            "final_result": "",
+            "__interrupt__": [Interrupt(value=InvokeProcess(...), id="...")]
+        }
+
+        We extract the InvokeProcess from the interrupt and convert it to a UiPath trigger.
+ """ + try: + if not isinstance(output, dict): + return None + + # Check for LangGraph's __interrupt__ field + if "__interrupt__" not in output: + return None + + interrupts = output["__interrupt__"] + if not interrupts or not isinstance(interrupts, list): + logger.warning("__interrupt__ field exists but is not a list") + return None + + # Extract first interrupt + interrupt_obj = interrupts[0] + if not hasattr(interrupt_obj, "value"): + logger.warning("Interrupt object missing 'value' attribute") + return None + + invoke_process = interrupt_obj.value + + # Check if it's an InvokeProcess object (has name and input_arguments) + if not ( + hasattr(invoke_process, "name") + and hasattr(invoke_process, "input_arguments") + ): + logger.warning( + f"Interrupt value is not InvokeProcess (type: {type(invoke_process)})" + ) + return None + + logger.info( + f"Detected LangGraph interrupt - suspending execution for process: {invoke_process.name}" + ) + + # Convert InvokeProcess to UiPath trigger + return UiPathResumeTrigger( + trigger_type=UiPathResumeTriggerType.JOB, + trigger_name=UiPathResumeTriggerName.JOB, + item_key=f"job-{uuid.uuid4()}", # Generate unique job key + folder_path=getattr(invoke_process, "process_folder_path", "Shared"), + payload={ + "process_name": invoke_process.name, + "input_arguments": invoke_process.input_arguments or {}, + "folder_key": getattr(invoke_process, "process_folder_key", None), + }, + ) + except Exception as e: + logger.warning(f"Failed to detect LangGraph interrupt: {e}") + return None + async def execute( self, input: dict[str, Any] | None = None, @@ -134,6 +204,23 @@ async def execute( func = self._load_function() output = await self._execute_function(func, input or {}) + logger.info( + f"Output type: {type(output)}, has __interrupt__: {'__interrupt__' in output if isinstance(output, dict) else False}" + ) + + # Check if output represents a LangGraph interrupt (suspend) + trigger = self._detect_langgraph_interrupt(output) + logger.info(f"Trigger detected: {trigger}") + if trigger: + logger.info( + f"Detected LangGraph interrupt - suspending execution with trigger: {trigger.item_key}" + ) + return UiPathRuntimeResult( + output=None, # No final output yet (suspended) + status=UiPathRuntimeStatus.SUSPENDED, + trigger=trigger, + ) + return UiPathRuntimeResult( output=output, status=UiPathRuntimeStatus.SUCCESSFUL,