From e6d8a09d09f50af3e1ae2b58a9a3d0e31a4f0a5c Mon Sep 17 00:00:00 2001 From: Evan Mattson Date: Mon, 12 Jan 2026 12:00:21 +0900 Subject: [PATCH] fix(workflows): rename WorkflowOutputEvent.source_executor_id to executor_id for API consistency --- python/packages/core/agent_framework/_workflows/_agent.py | 8 ++++---- .../packages/core/agent_framework/_workflows/_events.py | 8 ++++---- .../core/agent_framework/_workflows/_workflow_context.py | 2 +- python/packages/devui/agent_framework_devui/_mapper.py | 4 ++-- .../packages/devui/frontend/src/types/agent-framework.ts | 5 ++--- python/packages/devui/tests/test_mapper.py | 4 ++-- .../workflows/control-flow/sequential_streaming.py | 2 +- 7 files changed, 16 insertions(+), 17 deletions(-) diff --git a/python/packages/core/agent_framework/_workflows/_agent.py b/python/packages/core/agent_framework/_workflows/_agent.py index 7eec2472f0..f328b607ef 100644 --- a/python/packages/core/agent_framework/_workflows/_agent.py +++ b/python/packages/core/agent_framework/_workflows/_agent.py @@ -315,14 +315,14 @@ def _convert_workflow_event_to_agent_update( return update return None - case WorkflowOutputEvent(data=data, source_executor_id=source_executor_id): + case WorkflowOutputEvent(data=data, executor_id=executor_id): # Convert workflow output to an agent response update. # Handle different data types appropriately. # Skip AgentRunResponse from AgentExecutor with output_response=True # since streaming events already surfaced the content. if isinstance(data, AgentRunResponse): - executor = self.workflow.executors.get(source_executor_id) + executor = self.workflow.executors.get(executor_id) if isinstance(executor, AgentExecutor) and executor.output_response: return None @@ -332,7 +332,7 @@ def _convert_workflow_event_to_agent_update( return AgentRunResponseUpdate( contents=list(data.contents), role=data.role, - author_name=data.author_name or source_executor_id, + author_name=data.author_name or executor_id, response_id=response_id, message_id=str(uuid.uuid4()), created_at=datetime.now(tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ"), @@ -344,7 +344,7 @@ def _convert_workflow_event_to_agent_update( return AgentRunResponseUpdate( contents=contents, role=Role.ASSISTANT, - author_name=source_executor_id, + author_name=executor_id, response_id=response_id, message_id=str(uuid.uuid4()), created_at=datetime.now(tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ"), diff --git a/python/packages/core/agent_framework/_workflows/_events.py b/python/packages/core/agent_framework/_workflows/_events.py index 57c600519d..405fee356d 100644 --- a/python/packages/core/agent_framework/_workflows/_events.py +++ b/python/packages/core/agent_framework/_workflows/_events.py @@ -278,20 +278,20 @@ class WorkflowOutputEvent(WorkflowEvent): def __init__( self, data: Any, - source_executor_id: str, + executor_id: str, ): """Initialize the workflow output event. Args: data: The output yielded by the executor. - source_executor_id: ID of the executor that yielded the output. + executor_id: ID of the executor that yielded the output. """ super().__init__(data) - self.source_executor_id = source_executor_id + self.executor_id = executor_id def __repr__(self) -> str: """Return a string representation of the workflow output event.""" - return f"{self.__class__.__name__}(data={self.data}, source_executor_id={self.source_executor_id})" + return f"{self.__class__.__name__}(data={self.data}, executor_id={self.executor_id})" class SuperStepEvent(WorkflowEvent): diff --git a/python/packages/core/agent_framework/_workflows/_workflow_context.py b/python/packages/core/agent_framework/_workflows/_workflow_context.py index cffeb02aa0..8233e0b0a0 100644 --- a/python/packages/core/agent_framework/_workflows/_workflow_context.py +++ b/python/packages/core/agent_framework/_workflows/_workflow_context.py @@ -344,7 +344,7 @@ async def yield_output(self, output: T_W_Out) -> None: self._yielded_outputs.append(copy.deepcopy(output)) with _framework_event_origin(): - event = WorkflowOutputEvent(data=output, source_executor_id=self._executor_id) + event = WorkflowOutputEvent(data=output, executor_id=self._executor_id) await self._runner_context.add_event(event) async def add_event(self, event: WorkflowEvent) -> None: diff --git a/python/packages/devui/agent_framework_devui/_mapper.py b/python/packages/devui/agent_framework_devui/_mapper.py index 021a4a4549..05c568d9e6 100644 --- a/python/packages/devui/agent_framework_devui/_mapper.py +++ b/python/packages/devui/agent_framework_devui/_mapper.py @@ -881,7 +881,7 @@ async def _convert_workflow_event(self, event: Any, context: dict[str, Any]) -> # Handle WorkflowOutputEvent separately to preserve output data if event_class == "WorkflowOutputEvent": output_data = getattr(event, "data", None) - source_executor_id = getattr(event, "source_executor_id", "unknown") + executor_id = getattr(event, "executor_id", "unknown") if output_data is not None: # Import required types @@ -942,7 +942,7 @@ async def _convert_workflow_event(self, event: Any, context: dict[str, Any]) -> # Emit output_item.added for each yield_output logger.debug( f"WorkflowOutputEvent converted to output_item.added " - f"(executor: {source_executor_id}, length: {len(text)})" + f"(executor: {executor_id}, length: {len(text)})" ) return [ ResponseOutputItemAddedEvent( diff --git a/python/packages/devui/frontend/src/types/agent-framework.ts b/python/packages/devui/frontend/src/types/agent-framework.ts index 6d12a4cc8a..9a493445aa 100644 --- a/python/packages/devui/frontend/src/types/agent-framework.ts +++ b/python/packages/devui/frontend/src/types/agent-framework.ts @@ -269,8 +269,7 @@ export interface AgentThread { export interface WorkflowEvent { type?: string; // Event class name like "WorkflowOutputEvent", "WorkflowCompletedEvent", "ExecutorInvokedEvent", etc. data?: unknown; - executor_id?: string; // Present for executor-related events - source_executor_id?: string; // Present for WorkflowOutputEvent + executor_id?: string; // Present for executor-related events and WorkflowOutputEvent } export interface WorkflowStartedEvent extends WorkflowEvent { @@ -286,7 +285,7 @@ export interface WorkflowCompletedEvent extends WorkflowEvent { export interface WorkflowOutputEvent extends WorkflowEvent { // Event-specific data for workflow output (new) readonly event_type: "workflow_output"; - source_executor_id: string; // ID of executor that yielded the output + executor_id: string; // ID of executor that yielded the output } export interface WorkflowWarningEvent extends WorkflowEvent { diff --git a/python/packages/devui/tests/test_mapper.py b/python/packages/devui/tests/test_mapper.py index 2de788257b..effe811189 100644 --- a/python/packages/devui/tests/test_mapper.py +++ b/python/packages/devui/tests/test_mapper.py @@ -587,7 +587,7 @@ async def test_workflow_output_event(mapper: MessageMapper, test_request: AgentF """Test WorkflowOutputEvent is converted to output_item.added.""" from agent_framework._workflows._events import WorkflowOutputEvent - event = WorkflowOutputEvent(data="Final workflow output", source_executor_id="final_executor") + event = WorkflowOutputEvent(data="Final workflow output", executor_id="final_executor") events = await mapper.convert_event(event, test_request) # WorkflowOutputEvent should emit output_item.added @@ -609,7 +609,7 @@ async def test_workflow_output_event_with_list_data(mapper: MessageMapper, test_ ChatMessage(role=Role.USER, contents=[TextContent(text="Hello")]), ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text="World")]), ] - event = WorkflowOutputEvent(data=messages, source_executor_id="complete") + event = WorkflowOutputEvent(data=messages, executor_id="complete") events = await mapper.convert_event(event, test_request) assert len(events) == 1 diff --git a/python/samples/getting_started/workflows/control-flow/sequential_streaming.py b/python/samples/getting_started/workflows/control-flow/sequential_streaming.py index 3030d4ff44..ce7bc92758 100644 --- a/python/samples/getting_started/workflows/control-flow/sequential_streaming.py +++ b/python/samples/getting_started/workflows/control-flow/sequential_streaming.py @@ -77,7 +77,7 @@ async def main(): Event: ExecutorCompletedEvent(executor_id=upper_case_executor) Event: ExecutorInvokedEvent(executor_id=reverse_text_executor) Event: ExecutorCompletedEvent(executor_id=reverse_text_executor) - Event: WorkflowOutputEvent(data='DLROW OLLEH', source_executor_id=reverse_text_executor) + Event: WorkflowOutputEvent(data='DLROW OLLEH', executor_id=reverse_text_executor) Workflow completed with result: DLROW OLLEH """