diff --git a/python/packages/core/agent_framework/_workflows/_agent.py b/python/packages/core/agent_framework/_workflows/_agent.py index b5c723d405..47936e0d73 100644 --- a/python/packages/core/agent_framework/_workflows/_agent.py +++ b/python/packages/core/agent_framework/_workflows/_agent.py @@ -315,14 +315,14 @@ def _convert_workflow_event_to_agent_update( return update return None - case WorkflowOutputEvent(data=data, source_executor_id=source_executor_id): + case WorkflowOutputEvent(data=data, executor_id=executor_id): # Convert workflow output to an agent response update. # Handle different data types appropriately. # Skip AgentResponse from AgentExecutor with output_response=True # since streaming events already surfaced the content. if isinstance(data, AgentResponse): - executor = self.workflow.executors.get(source_executor_id) + executor = self.workflow.executors.get(executor_id) if isinstance(executor, AgentExecutor) and executor.output_response: return None @@ -332,7 +332,7 @@ def _convert_workflow_event_to_agent_update( return AgentResponseUpdate( contents=list(data.contents), role=data.role, - author_name=data.author_name or source_executor_id, + author_name=data.author_name or executor_id, response_id=response_id, message_id=str(uuid.uuid4()), created_at=datetime.now(tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ"), @@ -344,7 +344,7 @@ def _convert_workflow_event_to_agent_update( return AgentResponseUpdate( contents=contents, role=Role.ASSISTANT, - author_name=source_executor_id, + author_name=executor_id, response_id=response_id, message_id=str(uuid.uuid4()), created_at=datetime.now(tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ"), diff --git a/python/packages/core/agent_framework/_workflows/_events.py b/python/packages/core/agent_framework/_workflows/_events.py index 2eff8f04a3..dcd6ab5866 100644 --- a/python/packages/core/agent_framework/_workflows/_events.py +++ b/python/packages/core/agent_framework/_workflows/_events.py @@ -278,20 +278,20 @@ class WorkflowOutputEvent(WorkflowEvent): def __init__( self, data: Any, - source_executor_id: str, + executor_id: str, ): """Initialize the workflow output event. Args: data: The output yielded by the executor. - source_executor_id: ID of the executor that yielded the output. + executor_id: ID of the executor that yielded the output. """ super().__init__(data) - self.source_executor_id = source_executor_id + self.executor_id = executor_id def __repr__(self) -> str: """Return a string representation of the workflow output event.""" - return f"{self.__class__.__name__}(data={self.data}, source_executor_id={self.source_executor_id})" + return f"{self.__class__.__name__}(data={self.data}, executor_id={self.executor_id})" class SuperStepEvent(WorkflowEvent): diff --git a/python/packages/core/agent_framework/_workflows/_workflow_context.py b/python/packages/core/agent_framework/_workflows/_workflow_context.py index 053221712c..893f0ccfe9 100644 --- a/python/packages/core/agent_framework/_workflows/_workflow_context.py +++ b/python/packages/core/agent_framework/_workflows/_workflow_context.py @@ -358,7 +358,7 @@ async def yield_output(self, output: T_W_Out) -> None: self._yielded_outputs.append(copy.deepcopy(output)) with _framework_event_origin(): - event = WorkflowOutputEvent(data=output, source_executor_id=self._executor_id) + event = WorkflowOutputEvent(data=output, executor_id=self._executor_id) await self._runner_context.add_event(event) async def add_event(self, event: WorkflowEvent) -> None: diff --git a/python/packages/devui/agent_framework_devui/_mapper.py b/python/packages/devui/agent_framework_devui/_mapper.py index e86eeac18d..71bbda9b85 100644 --- a/python/packages/devui/agent_framework_devui/_mapper.py +++ b/python/packages/devui/agent_framework_devui/_mapper.py @@ -881,7 +881,7 @@ async def _convert_workflow_event(self, event: Any, context: dict[str, Any]) -> # Handle WorkflowOutputEvent separately to preserve output data if event_class == "WorkflowOutputEvent": output_data = getattr(event, "data", None) - source_executor_id = getattr(event, "source_executor_id", "unknown") + executor_id = getattr(event, "executor_id", "unknown") if output_data is not None: # Import required types @@ -942,7 +942,7 @@ async def _convert_workflow_event(self, event: Any, context: dict[str, Any]) -> # Emit output_item.added for each yield_output logger.debug( f"WorkflowOutputEvent converted to output_item.added " - f"(executor: {source_executor_id}, length: {len(text)})" + f"(executor: {executor_id}, length: {len(text)})" ) return [ ResponseOutputItemAddedEvent( diff --git a/python/packages/devui/frontend/src/types/agent-framework.ts b/python/packages/devui/frontend/src/types/agent-framework.ts index 98f73c1841..a41b0ce9a1 100644 --- a/python/packages/devui/frontend/src/types/agent-framework.ts +++ b/python/packages/devui/frontend/src/types/agent-framework.ts @@ -269,8 +269,7 @@ export interface AgentThread { export interface WorkflowEvent { type?: string; // Event class name like "WorkflowOutputEvent", "WorkflowCompletedEvent", "ExecutorInvokedEvent", etc. data?: unknown; - executor_id?: string; // Present for executor-related events - source_executor_id?: string; // Present for WorkflowOutputEvent + executor_id?: string; // Present for executor-related events and WorkflowOutputEvent } export interface WorkflowStartedEvent extends WorkflowEvent { @@ -286,7 +285,7 @@ export interface WorkflowCompletedEvent extends WorkflowEvent { export interface WorkflowOutputEvent extends WorkflowEvent { // Event-specific data for workflow output (new) readonly event_type: "workflow_output"; - source_executor_id: string; // ID of executor that yielded the output + executor_id: string; // ID of executor that yielded the output } export interface WorkflowWarningEvent extends WorkflowEvent { diff --git a/python/packages/devui/tests/test_mapper.py b/python/packages/devui/tests/test_mapper.py index b1762f79b9..faf0c831d8 100644 --- a/python/packages/devui/tests/test_mapper.py +++ b/python/packages/devui/tests/test_mapper.py @@ -585,7 +585,7 @@ async def test_workflow_output_event(mapper: MessageMapper, test_request: AgentF """Test WorkflowOutputEvent is converted to output_item.added.""" from agent_framework._workflows._events import WorkflowOutputEvent - event = WorkflowOutputEvent(data="Final workflow output", source_executor_id="final_executor") + event = WorkflowOutputEvent(data="Final workflow output", executor_id="final_executor") events = await mapper.convert_event(event, test_request) # WorkflowOutputEvent should emit output_item.added @@ -607,7 +607,7 @@ async def test_workflow_output_event_with_list_data(mapper: MessageMapper, test_ ChatMessage(role=Role.USER, contents=[TextContent(text="Hello")]), ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text="World")]), ] - event = WorkflowOutputEvent(data=messages, source_executor_id="complete") + event = WorkflowOutputEvent(data=messages, executor_id="complete") events = await mapper.convert_event(event, test_request) assert len(events) == 1 diff --git a/python/samples/getting_started/workflows/control-flow/sequential_streaming.py b/python/samples/getting_started/workflows/control-flow/sequential_streaming.py index 3030d4ff44..ce7bc92758 100644 --- a/python/samples/getting_started/workflows/control-flow/sequential_streaming.py +++ b/python/samples/getting_started/workflows/control-flow/sequential_streaming.py @@ -77,7 +77,7 @@ async def main(): Event: ExecutorCompletedEvent(executor_id=upper_case_executor) Event: ExecutorInvokedEvent(executor_id=reverse_text_executor) Event: ExecutorCompletedEvent(executor_id=reverse_text_executor) - Event: WorkflowOutputEvent(data='DLROW OLLEH', source_executor_id=reverse_text_executor) + Event: WorkflowOutputEvent(data='DLROW OLLEH', executor_id=reverse_text_executor) Workflow completed with result: DLROW OLLEH """