diff --git a/python/packages/core/agent_framework/_workflows/_concurrent.py b/python/packages/core/agent_framework/_workflows/_concurrent.py index 033946afff..4204c8cd6d 100644 --- a/python/packages/core/agent_framework/_workflows/_concurrent.py +++ b/python/packages/core/agent_framework/_workflows/_concurrent.py @@ -292,9 +292,7 @@ class MyCustomExecutor(Executor): ... wf2 = ConcurrentBuilder().register_participants([create_researcher, MyCustomExecutor]).build() """ if self._participants: - raise ValueError( - "Cannot mix .participants([...]) and .register_participants() in the same builder instance." - ) + raise ValueError("Cannot mix .participants() and .register_participants() in the same builder instance.") if self._participant_factories: raise ValueError("register_participants() has already been called on this builder instance.") @@ -330,9 +328,7 @@ def participants(self, participants: Sequence[AgentProtocol | Executor]) -> "Con wf2 = ConcurrentBuilder().participants([researcher_agent, my_custom_executor]).build() """ if self._participant_factories: - raise ValueError( - "Cannot mix .participants([...]) and .register_participants() in the same builder instance." - ) + raise ValueError("Cannot mix .participants() and .register_participants() in the same builder instance.") if self._participants: raise ValueError("participants() has already been called on this builder instance.") @@ -498,6 +494,10 @@ def with_request_info( def _resolve_participants(self) -> list[Executor]: """Resolve participant instances into Executor objects.""" + if not self._participants and not self._participant_factories: + raise ValueError("No participants provided. Call .participants() or .register_participants() first.") + # We don't need to check if both are set since that is handled in the respective methods + participants: list[Executor | AgentProtocol] = [] if self._participant_factories: # Resolve the participant factories now. This doesn't break the factory pattern @@ -549,11 +549,6 @@ def build(self) -> Workflow: workflow = ConcurrentBuilder().participants([agent1, agent2]).build() """ - if not self._participants and not self._participant_factories: - raise ValueError( - "No participants provided. Call .participants([...]) or .register_participants([...]) first." - ) - # Internal nodes dispatcher = _DispatchToAllParticipants(id="dispatcher") aggregator = ( diff --git a/python/packages/core/agent_framework/_workflows/_group_chat.py b/python/packages/core/agent_framework/_workflows/_group_chat.py index d75b805514..83c531b77a 100644 --- a/python/packages/core/agent_framework/_workflows/_group_chat.py +++ b/python/packages/core/agent_framework/_workflows/_group_chat.py @@ -24,7 +24,7 @@ from collections import OrderedDict from collections.abc import Awaitable, Callable, Sequence from dataclasses import dataclass -from typing import Any, ClassVar, cast +from typing import Any, ClassVar, cast, overload from pydantic import BaseModel, Field from typing_extensions import Never @@ -519,9 +519,11 @@ class GroupChatBuilder: def __init__(self) -> None: """Initialize the GroupChatBuilder.""" self._participants: dict[str, AgentProtocol | Executor] = {} + self._participant_factories: list[Callable[[], AgentProtocol | Executor]] = [] # Orchestrator related members self._orchestrator: BaseGroupChatOrchestrator | None = None + self._orchestrator_factory: Callable[[], ChatAgent | BaseGroupChatOrchestrator] | None = None self._selection_func: GroupChatSelectionFunction | None = None self._agent_orchestrator: ChatAgent | None = None self._termination_condition: TerminationCondition | None = None @@ -535,139 +537,193 @@ def __init__(self) -> None: self._request_info_enabled: bool = False self._request_info_filter: set[str] = set() - def with_orchestrator(self, orchestrator: BaseGroupChatOrchestrator) -> "GroupChatBuilder": - """Set the orchestrator for this group chat workflow. - - An group chat orchestrator is responsible for managing the flow of conversation, making - sure all participants are synced and picking the next speaker according to the defined logic - until the termination conditions are met. + @overload + def with_orchestrator(self, *, agent: ChatAgent) -> "GroupChatBuilder": + """Set the orchestrator for this group chat workflow using a ChatAgent. Args: - orchestrator: An instance of BaseGroupChatOrchestrator to manage the group chat. + agent: An instance of ChatAgent to manage the group chat. Returns: Self for fluent chaining. + """ + ... - Raises: - ValueError: If an orchestrator has already been set + @overload + def with_orchestrator( + self, + *, + selection_func: GroupChatSelectionFunction, + orchestrator_name: str | None = None, + ) -> "GroupChatBuilder": + """Set the orchestrator for this group chat workflow using a selection function. - Example: - .. code-block:: python + Args: + selection_func: Callable that receives the current GroupChatState and returns + the name of the next participant to speak, or None to finish. + orchestrator_name: Optional display name for the orchestrator in the workflow. + If not provided, defaults to `GroupChatBuilder.DEFAULT_ORCHESTRATOR_ID`. - from agent_framework import GroupChatBuilder + Returns: + Self for fluent chaining. + """ + ... + @overload + def with_orchestrator(self, *, orchestrator: BaseGroupChatOrchestrator) -> "GroupChatBuilder": + """Set the orchestrator for this group chat workflow using a custom orchestrator. - orchestrator = CustomGroupChatOrchestrator(...) - workflow = GroupChatBuilder().with_orchestrator(orchestrator).participants([agent1, agent2]).build() - """ - if self._orchestrator is not None: - raise ValueError("An orchestrator has already been configured. Call with_orchestrator(...) at most once.") - if self._agent_orchestrator is not None: - raise ValueError( - "An agent orchestrator has already been configured. " - "Call only one of with_orchestrator(...) or with_agent_orchestrator(...)." - ) - if self._selection_func is not None: - raise ValueError( - "A selection function has already been configured. " - "Call only one of with_orchestrator(...) or with_select_speaker_func(...)." - ) + Args: + orchestrator: An instance of BaseGroupChatOrchestrator to manage the group chat. - self._orchestrator = orchestrator - return self + Returns: + Self for fluent chaining. - def with_agent_orchestrator(self, agent: ChatAgent) -> "GroupChatBuilder": - """Set an agent-based orchestrator for this group chat workflow. + Note: + When using a custom orchestrator that implements `BaseGroupChatOrchestrator`, setting + `termination_condition` and `max_rounds` on the builder will have no effect since the + orchestrator is already fully defined. + """ + ... - An agent-based group chat orchestrator uses a ChatAgent to select the next speaker - intelligently based on the conversation context. + @overload + def with_orchestrator( + self, *, orchestrator_factory: Callable[[], ChatAgent | BaseGroupChatOrchestrator] + ) -> "GroupChatBuilder": + """Set the orchestrator for this group chat workflow using a factory. Args: - agent: An instance of ChatAgent to manage the group chat. + orchestrator_factory: A callable that produces either a ChatAgent or BaseGroupChatOrchestrator + when invoked. Returns: Self for fluent chaining. - Raises: - ValueError: If an orchestrator has already been set + Note: + When using a custom orchestrator that implements `BaseGroupChatOrchestrator`, setting + `termination_condition` and `max_rounds` on the builder will have no effect since the + orchestrator is already fully defined. """ - if self._agent_orchestrator is not None: - raise ValueError( - "Agent orchestrator has already been configured. Call with_agent_orchestrator(...) at most once." - ) - if self._orchestrator is not None: - raise ValueError( - "An orchestrator has already been configured. " - "Call only one of with_agent_orchestrator(...) or with_orchestrator(...)." - ) - if self._selection_func is not None: - raise ValueError( - "A selection function has already been configured. " - "Call only one of with_agent_orchestrator(...) or with_select_speaker_func(...)." - ) - - self._agent_orchestrator = agent - return self + ... - def with_select_speaker_func( + def with_orchestrator( self, - selection_func: GroupChatSelectionFunction, *, + agent: ChatAgent | None = None, + selection_func: GroupChatSelectionFunction | None = None, + orchestrator: BaseGroupChatOrchestrator | None = None, + orchestrator_factory: Callable[[], ChatAgent | BaseGroupChatOrchestrator] | None = None, orchestrator_name: str | None = None, ) -> "GroupChatBuilder": - """Define a custom function to select the next speaker in the group chat. + """Set the orchestrator for this group chat workflow. + + An group chat orchestrator is responsible for managing the flow of conversation, making + sure all participants are synced and picking the next speaker according to the defined logic + until the termination conditions are met. - This is a quick way to implement simple orchestration logic without needing a full - GroupChatOrchestrator. The provided function receives the current state of - the group chat and returns the name of the next participant to speak. + There are a few ways to configure the orchestrator: + 1. Provide a ChatAgent instance to use an agent-based orchestrator that selects the next speaker intelligently + 2. Provide a selection function to use that picks the next speaker based on the function logic + 3. Provide a BaseGroupChatOrchestrator instance to use a custom orchestrator + 4. Provide an orchestrator factory to create either a ChatAgent or BaseGroupChatOrchestrator at build time + + You can only use one of the above methods to configure the orchestrator. Args: + agent: An instance of ChatAgent to manage the group chat. selection_func: Callable that receives the current GroupChatState and returns the name of the next participant to speak, or None to finish. - orchestrator_name: Optional display name for the orchestrator in the workflow. - If not provided, defaults to `GroupChatBuilder.DEFAULT_ORCHESTRATOR_ID`. + orchestrator: An instance of BaseGroupChatOrchestrator to manage the group chat. + orchestrator_factory: A callable that produces either a ChatAgent or + BaseGroupChatOrchestrator when invoked. + orchestrator_name: Optional display name for the orchestrator in the workflow if + using a selection function. If not provided, defaults to + `GroupChatBuilder.DEFAULT_ORCHESTRATOR_ID`. This parameter is + ignored if using an agent or custom orchestrator. Returns: - Self for fluent chaining + Self for fluent chaining. Raises: - ValueError: If an orchestrator has already been set + ValueError: If an orchestrator has already been set or if none or multiple + of the parameters are provided. + + Note: + When using a custom orchestrator that implements `BaseGroupChatOrchestrator`, either + via the `orchestrator` or `orchestrator_factory` parameters, setting `termination_condition` + and `max_rounds` on the builder will have no effect since the orchestrator is already + fully defined. Example: .. code-block:: python - from agent_framework import GroupChatBuilder, GroupChatState - - - async def round_robin_selector(state: GroupChatState) -> str: - # Simple round-robin selection among participants - return state.participants[state.current_round % len(state.participants)] + from agent_framework import GroupChatBuilder - workflow = ( - GroupChatBuilder() - .with_select_speaker_func(round_robin_selector, orchestrator_name="Coordinator") - .participants([agent1, agent2]) - .build() - ) + orchestrator = CustomGroupChatOrchestrator(...) + workflow = GroupChatBuilder().with_orchestrator(orchestrator).participants([agent1, agent2]).build() """ - if self._selection_func is not None: + if self._agent_orchestrator is not None: raise ValueError( - "select_speakers_func has already been configured. Call with_select_speakers_func(...) at most once." + "An agent orchestrator has already been configured. Call with_orchestrator(...) once only." ) + + if self._selection_func is not None: + raise ValueError("A selection function has already been configured. Call with_orchestrator(...) once only.") + if self._orchestrator is not None: + raise ValueError("An orchestrator has already been configured. Call with_orchestrator(...) once only.") + + if self._orchestrator_factory is not None: raise ValueError( - "An orchestrator has already been configured. " - "Call only one of with_select_speaker_func(...) or with_orchestrator(...)." + "An orchestrator factory has already been configured. Call with_orchestrator(...) once only." ) - if self._agent_orchestrator is not None: + + if sum(x is not None for x in [agent, selection_func, orchestrator, orchestrator_factory]) != 1: raise ValueError( - "An agent orchestrator has already been configured. " - "Call only one of with_select_speaker_func(...) or with_agent_orchestrator(...)." + "Exactly one of agent, selection_func, orchestrator, or orchestrator_factory must be provided." ) - self._selection_func = selection_func - self._orchestrator_name = orchestrator_name + if agent is not None: + self._agent_orchestrator = agent + elif selection_func is not None: + self._selection_func = selection_func + self._orchestrator_name = orchestrator_name + elif orchestrator is not None: + self._orchestrator = orchestrator + else: + self._orchestrator_factory = orchestrator_factory + + return self + + def register_participants( + self, + participant_factories: Sequence[Callable[[], AgentProtocol | Executor]], + ) -> "GroupChatBuilder": + """Register participant factories for this group chat workflow. + + Args: + participant_factories: Sequence of callables that produce participant definitions + when invoked. Each callable should return either an AgentProtocol instance + (auto-wrapped as AgentExecutor) or an Executor instance. + + Returns: + Self for fluent chaining + + Raises: + ValueError: If participant_factories is empty, or participants + or participant factories are already set + """ + if self._participants: + raise ValueError("Cannot mix .participants() and .register_participants() in the same builder instance.") + + if self._participant_factories: + raise ValueError("register_participants() has already been called on this builder instance.") + + if not participant_factories: + raise ValueError("participant_factories cannot be empty") + + self._participant_factories = list(participant_factories) return self def participants(self, participants: Sequence[AgentProtocol | Executor]) -> "GroupChatBuilder": @@ -682,7 +738,8 @@ def participants(self, participants: Sequence[AgentProtocol | Executor]) -> "Gro Self for fluent chaining Raises: - ValueError: If participants are empty, names are duplicated, or already set + ValueError: If participants are empty, names are duplicated, or participants + or participant factories are already set TypeError: If any participant is not AgentProtocol or Executor instance Example: @@ -693,13 +750,16 @@ def participants(self, participants: Sequence[AgentProtocol | Executor]) -> "Gro workflow = ( GroupChatBuilder() - .with_select_speaker_func(my_selection_function) + .with_orchestrator(selection_func=my_selection_function) .participants([agent1, agent2, custom_executor]) .build() ) """ + if self._participant_factories: + raise ValueError("Cannot mix .participants() and .register_participants() in the same builder instance.") + if self._participants: - raise ValueError("participants have already been set. Call participants(...) at most once.") + raise ValueError("participants have already been set. Call participants() at most once.") if not participants: raise ValueError("participants cannot be empty.") @@ -752,13 +812,13 @@ def stop_after_two_calls(conversation: list[ChatMessage]) -> bool: specialist_agent = ... workflow = ( GroupChatBuilder() - .with_select_speaker_func(my_selection_function) + .with_orchestrator(selection_func=my_selection_function) .participants([agent1, specialist_agent]) .with_termination_condition(stop_after_two_calls) .build() ) """ - if self._orchestrator is not None: + if self._orchestrator is not None or self._orchestrator_factory is not None: logger.warning( "Orchestrator has already been configured; setting termination condition on builder has no effect." ) @@ -778,6 +838,9 @@ def with_max_rounds(self, max_rounds: int | None) -> "GroupChatBuilder": Returns: Self for fluent chaining """ + if self._orchestrator is not None or self._orchestrator_factory is not None: + logger.warning("Orchestrator has already been configured; setting max rounds on builder has no effect.") + self._max_rounds = max_rounds return self @@ -802,7 +865,7 @@ def with_checkpointing(self, checkpoint_storage: CheckpointStorage) -> "GroupCha storage = MemoryCheckpointStorage() workflow = ( GroupChatBuilder() - .with_select_speaker_func(my_selection_function) + .with_orchestrator(selection_func=my_selection_function) .participants([agent1, agent2]) .with_checkpointing(storage) .build() @@ -846,15 +909,22 @@ def _resolve_orchestrator(self, participants: Sequence[Executor]) -> Executor: Args: participants: List of resolved participant executors """ - if self._orchestrator is not None: - return self._orchestrator + if all( + x is None + for x in [self._agent_orchestrator, self._selection_func, self._orchestrator, self._orchestrator_factory] + ): + raise ValueError("No orchestrator has been configured. Call with_orchestrator() to set one.") + # We don't need to check if multiple are set since that is handled in with_orchestrator() - if self._agent_orchestrator is not None and self._selection_func is not None: - raise ValueError( - "Both agent-based orchestrator and selection function are configured; only one can be used at a time." + if self._agent_orchestrator: + return AgentBasedGroupChatOrchestrator( + agent=self._agent_orchestrator, + participant_registry=ParticipantRegistry(participants), + max_rounds=self._max_rounds, + termination_condition=self._termination_condition, ) - if self._selection_func is not None: + if self._selection_func: return GroupChatOrchestrator( id=self.DEFAULT_ORCHESTRATOR_ID, participant_registry=ParticipantRegistry(participants), @@ -864,23 +934,44 @@ def _resolve_orchestrator(self, participants: Sequence[Executor]) -> Executor: termination_condition=self._termination_condition, ) - if self._agent_orchestrator is not None: - return AgentBasedGroupChatOrchestrator( - agent=self._agent_orchestrator, - participant_registry=ParticipantRegistry(participants), - max_rounds=self._max_rounds, - termination_condition=self._termination_condition, + if self._orchestrator: + return self._orchestrator + + if self._orchestrator_factory: + orchestrator_instance = self._orchestrator_factory() + if isinstance(orchestrator_instance, ChatAgent): + return AgentBasedGroupChatOrchestrator( + agent=orchestrator_instance, + participant_registry=ParticipantRegistry(participants), + max_rounds=self._max_rounds, + termination_condition=self._termination_condition, + ) + if isinstance(orchestrator_instance, BaseGroupChatOrchestrator): + return orchestrator_instance + raise TypeError( + f"Orchestrator factory must return ChatAgent or BaseGroupChatOrchestrator instance. " + f"Got {type(orchestrator_instance).__name__}." ) - raise RuntimeError( - "Orchestrator could not be resolved. Please provide one via with_orchestrator(), " - "with_agent_orchestrator(), or with_select_speaker_func()." - ) + # This should never be reached due to the checks above + raise RuntimeError("Orchestrator could not be resolved. Please provide one via with_orchestrator()") def _resolve_participants(self) -> list[Executor]: """Resolve participant instances into Executor objects.""" + if not self._participants and not self._participant_factories: + raise ValueError("No participants provided. Call .participants() or .register_participants() first.") + # We don't need to check if both are set since that is handled in the respective methods + + participants: list[Executor | AgentProtocol] = [] + if self._participant_factories: + for factory in self._participant_factories: + participant = factory() + participants.append(participant) + else: + participants = list(self._participants.values()) + executors: list[Executor] = [] - for participant in self._participants.values(): + for participant in participants: if isinstance(participant, Executor): executors.append(participant) elif isinstance(participant, AgentProtocol): @@ -908,9 +999,6 @@ def build(self) -> Workflow: Returns: Validated Workflow instance ready for execution """ - if not self._participants: - raise ValueError("participants must be configured before build()") - # Resolve orchestrator and participants to executors participants: list[Executor] = self._resolve_participants() orchestrator: Executor = self._resolve_orchestrator(participants) diff --git a/python/packages/core/agent_framework/_workflows/_handoff.py b/python/packages/core/agent_framework/_workflows/_handoff.py index 8d329b618d..f07bc47f9b 100644 --- a/python/packages/core/agent_framework/_workflows/_handoff.py +++ b/python/packages/core/agent_framework/_workflows/_handoff.py @@ -603,7 +603,7 @@ def __init__( self._participant_factories: dict[str, Callable[[], AgentProtocol]] = {} self._start_id: str | None = None if participant_factories: - self.participant_factories(participant_factories) + self.register_participants(participant_factories) if participants: self.participants(participants) @@ -623,7 +623,7 @@ def __init__( # Termination related members self._termination_condition: Callable[[list[ChatMessage]], bool | Awaitable[bool]] | None = None - def participant_factories( + def register_participants( self, participant_factories: Mapping[str, Callable[[], AgentProtocol]] ) -> "HandoffBuilder": """Register factories that produce agents for the handoff workflow. @@ -641,7 +641,7 @@ def participant_factories( Self for method chaining. Raises: - ValueError: If participant_factories is empty or `.participants(...)` or `.participant_factories(...)` + ValueError: If participant_factories is empty or `.participants(...)` or `.register_participants(...)` has already been called. Example: @@ -670,17 +670,14 @@ def create_billing_agent() -> ChatAgent: # Handoff will be created automatically unless specified otherwise # The default creates a mesh topology where all agents can handoff to all others - builder = HandoffBuilder().participant_factories(factories) + builder = HandoffBuilder().register_participants(factories) builder.with_start_agent("triage") """ if self._participants: - raise ValueError( - "Cannot mix .participants([...]) and .participant_factories() in the same builder instance." - ) + raise ValueError("Cannot mix .participants() and .register_participants() in the same builder instance.") if self._participant_factories: - raise ValueError("participant_factories() has already been called on this builder instance.") - + raise ValueError("register_participants() has already been called on this builder instance.") if not participant_factories: raise ValueError("participant_factories cannot be empty") @@ -698,8 +695,8 @@ def participants(self, participants: Sequence[AgentProtocol]) -> "HandoffBuilder Self for method chaining. Raises: - ValueError: If participants is empty, contains duplicates, or `.participants(...)` or - `.participant_factories(...)` has already been called. + ValueError: If participants is empty, contains duplicates, or `.participants()` or + `.register_participants()` has already been called. TypeError: If participants are not AgentProtocol instances. Example: @@ -718,9 +715,7 @@ def participants(self, participants: Sequence[AgentProtocol]) -> "HandoffBuilder builder.with_start_agent(triage) """ if self._participant_factories: - raise ValueError( - "Cannot mix .participants([...]) and .participant_factories() in the same builder instance." - ) + raise ValueError("Cannot mix .participants() and .register_participants() in the same builder instance.") if self._participants: raise ValueError("participants have already been assigned") @@ -900,7 +895,7 @@ def with_start_agent(self, agent: str | AgentProtocol) -> "HandoffBuilder": if agent not in self._participant_factories: raise ValueError(f"Start agent factory name '{agent}' is not in the participant_factories list") else: - raise ValueError("Call participant_factories(...) before with_start_agent(...)") + raise ValueError("Call register_participants(...) before with_start_agent(...)") self._start_id = agent elif isinstance(agent, AgentProtocol): resolved_id = self._resolve_to_id(agent) @@ -1043,15 +1038,6 @@ def build(self) -> Workflow: ValueError: If participants or coordinator were not configured, or if required configuration is invalid. """ - if not self._participants and not self._participant_factories: - raise ValueError( - "No participants or participant_factories have been configured. " - "Call participants(...) or participant_factories(...) first." - ) - - if self._start_id is None: - raise ValueError("Must call with_start_agent(...) before building the workflow.") - # Resolve agents (either from instances or factories) # The returned map keys are either executor IDs or factory names, which is need to resolve handoff configs resolved_agents = self._resolve_agents() @@ -1062,6 +1048,8 @@ def build(self) -> Workflow: executors = self._resolve_executors(resolved_agents, resolved_handoffs) # Build the workflow graph + if self._start_id is None: + raise ValueError("Must call with_start_agent(...) before building the workflow.") start_executor = executors[self._resolve_to_id(resolved_agents[self._start_id])] builder = WorkflowBuilder( name=self._name, @@ -1100,8 +1088,9 @@ def _resolve_agents(self) -> dict[str, AgentProtocol]: Returns: Map of executor IDs or factory names to `AgentProtocol` instances """ - if self._participants and self._participant_factories: - raise ValueError("Cannot have both executors and participant_factories configured") + if not self._participants and not self._participant_factories: + raise ValueError("No participants provided. Call .participants() or .register_participants() first.") + # We don't need to check if both are set since that is handled in the respective methods if self._participants: return self._participants diff --git a/python/packages/core/agent_framework/_workflows/_magentic.py b/python/packages/core/agent_framework/_workflows/_magentic.py index 052a59766f..2e0e101051 100644 --- a/python/packages/core/agent_framework/_workflows/_magentic.py +++ b/python/packages/core/agent_framework/_workflows/_magentic.py @@ -7,10 +7,10 @@ import re import sys from abc import ABC, abstractmethod -from collections.abc import Sequence +from collections.abc import Callable, Sequence from dataclasses import dataclass, field from enum import Enum -from typing import Any, ClassVar, TypeVar, cast +from typing import Any, ClassVar, TypeVar, cast, overload from typing_extensions import Never @@ -1376,11 +1376,47 @@ class MagenticBuilder: """ def __init__(self) -> None: + """Initialize the Magentic workflow builder.""" self._participants: dict[str, AgentProtocol | Executor] = {} + self._participant_factories: list[Callable[[], AgentProtocol | Executor]] = [] + + # Manager related members self._manager: MagenticManagerBase | None = None + self._manager_factory: Callable[[], MagenticManagerBase] | None = None + self._manager_agent_factory: Callable[[], AgentProtocol] | None = None + self._standard_manager_options: dict[str, Any] = {} self._enable_plan_review: bool = False + self._checkpoint_storage: CheckpointStorage | None = None + def register_participants( + self, + participant_factories: Sequence[Callable[[], AgentProtocol | Executor]], + ) -> "MagenticBuilder": + """Register participant factories for this Magentic workflow. + + Args: + participant_factories: Sequence of callables that return AgentProtocol or Executor instances. + + Returns: + Self for method chaining + + Raises: + ValueError: If participant_factories is empty, or participants + or participant factories are already set + """ + if self._participants: + raise ValueError("Cannot mix .participants() and .register_participants() in the same builder instance.") + + if self._participant_factories: + raise ValueError("register_participants() has already been called on this builder instance.") + + if not participant_factories: + raise ValueError("participant_factories cannot be empty") + + self._participant_factories = list(participant_factories) + return self + def participants(self, participants: Sequence[AgentProtocol | Executor]) -> Self: """Define participants for this Magentic workflow. @@ -1393,7 +1429,8 @@ def participants(self, participants: Sequence[AgentProtocol | Executor]) -> Self Self for method chaining Raises: - ValueError: If participants are empty, names are duplicated, or already set + ValueError: If participants are empty, names are duplicated, or participants + or participant factories are already set TypeError: If any participant is not AgentProtocol or Executor instance Example: @@ -1403,7 +1440,7 @@ def participants(self, participants: Sequence[AgentProtocol | Executor]) -> Self workflow = ( MagenticBuilder() .participants([research_agent, writing_agent, coding_agent, review_agent]) - .with_standard_manager(agent=manager_agent) + .with_manager(agent=manager_agent) .build() ) @@ -1412,6 +1449,9 @@ def participants(self, participants: Sequence[AgentProtocol | Executor]) -> Self - Agent descriptions (if available) are extracted and provided to the manager - Can be called multiple times to add participants incrementally """ + if self._participant_factories: + raise ValueError("Cannot mix .participants() and .register_participants() in the same builder instance.") + if self._participants: raise ValueError("participants have already been set. Call participants(...) at most once.") @@ -1468,7 +1508,7 @@ def with_plan_review(self, enable: bool = True) -> "MagenticBuilder": workflow = ( MagenticBuilder() .participants(agent1=agent1) - .with_standard_manager(agent=manager_agent) + .with_manager(agent=manager_agent) .with_plan_review(enable=True) .build() ) @@ -1515,7 +1555,7 @@ def with_checkpointing(self, checkpoint_storage: CheckpointStorage) -> "Magentic workflow = ( MagenticBuilder() .participants([agent1]) - .with_standard_manager(agent=manager_agent) + .with_manager(agent=manager_agent) .with_checkpointing(storage) .build() ) @@ -1537,10 +1577,142 @@ def with_checkpointing(self, checkpoint_storage: CheckpointStorage) -> "Magentic self._checkpoint_storage = checkpoint_storage return self - def with_standard_manager( + @overload + def with_manager(self, *, manager: MagenticManagerBase) -> Self: + """Configure the workflow with a pre-defined Magentic manager instance. + + Args: + manager: A custom manager instance (subclass of MagenticManagerBase) + + Returns: + Self for method chaining + """ + ... + + @overload + def with_manager(self, *, manager_factory: Callable[[], MagenticManagerBase]) -> Self: + """Configure the workflow with a factory for creating custom Magentic manager instances. + + Args: + manager_factory: Callable that returns a new MagenticManagerBase instance + + Returns: + Self for method chaining + """ + ... + + @overload + def with_manager( self, - manager: MagenticManagerBase | None = None, *, + agent: AgentProtocol, + task_ledger: _MagenticTaskLedger | None = None, + # Prompt overrides + task_ledger_facts_prompt: str | None = None, + task_ledger_plan_prompt: str | None = None, + task_ledger_full_prompt: str | None = None, + task_ledger_facts_update_prompt: str | None = None, + task_ledger_plan_update_prompt: str | None = None, + progress_ledger_prompt: str | None = None, + final_answer_prompt: str | None = None, + # Limits + max_stall_count: int = 3, + max_reset_count: int | None = None, + max_round_count: int | None = None, + ) -> Self: + """Configure the workflow with an agent for creating a standard manager. + + This will create a StandardMagenticManager using the provided agent. + + Args: + agent: AgentProtocol instance for the standard magentic manager + (`StandardMagenticManager`) + task_ledger: Optional custom task ledger implementation for specialized + prompting or structured output requirements + task_ledger_facts_prompt: Custom prompt template for extracting facts from + task description + task_ledger_plan_prompt: Custom prompt template for generating initial plan + task_ledger_full_prompt: Custom prompt template for complete task ledger + (facts + plan combined) + task_ledger_facts_update_prompt: Custom prompt template for updating facts + based on agent progress + task_ledger_plan_update_prompt: Custom prompt template for replanning when + needed + progress_ledger_prompt: Custom prompt template for assessing progress and + determining next actions + final_answer_prompt: Custom prompt template for synthesizing final response + when task is complete + max_stall_count: Maximum consecutive rounds without progress before triggering + replan (default 3). Set to 0 to disable stall detection. + max_reset_count: Maximum number of complete resets allowed before failing. + None means unlimited resets. + max_round_count: Maximum total coordination rounds before stopping with + partial result. None means unlimited rounds. + + Returns: + Self for method chaining + """ + ... + + @overload + def with_manager( + self, + *, + agent_factory: Callable[[], AgentProtocol], + task_ledger: _MagenticTaskLedger | None = None, + # Prompt overrides + task_ledger_facts_prompt: str | None = None, + task_ledger_plan_prompt: str | None = None, + task_ledger_full_prompt: str | None = None, + task_ledger_facts_update_prompt: str | None = None, + task_ledger_plan_update_prompt: str | None = None, + progress_ledger_prompt: str | None = None, + final_answer_prompt: str | None = None, + # Limits + max_stall_count: int = 3, + max_reset_count: int | None = None, + max_round_count: int | None = None, + ) -> Self: + """Configure the workflow with a factory for creating the manager agent. + + This will create a StandardMagenticManager using the provided agent factory. + + Args: + agent_factory: Callable that returns a new AgentProtocol instance for the standard + magentic manager (`StandardMagenticManager`) + task_ledger: Optional custom task ledger implementation for specialized + prompting or structured output requirements + task_ledger_facts_prompt: Custom prompt template for extracting facts from + task description + task_ledger_plan_prompt: Custom prompt template for generating initial plan + task_ledger_full_prompt: Custom prompt template for complete task ledger + (facts + plan combined) + task_ledger_facts_update_prompt: Custom prompt template for updating facts + based on agent progress + task_ledger_plan_update_prompt: Custom prompt template for replanning when + needed + progress_ledger_prompt: Custom prompt template for assessing progress and + determining next actions + final_answer_prompt: Custom prompt template for synthesizing final response + when task is complete + max_stall_count: Maximum consecutive rounds without progress before triggering + replan (default 3). Set to 0 to disable stall detection. + max_reset_count: Maximum number of complete resets allowed before failing. + None means unlimited resets. + max_round_count: Maximum total coordination rounds before stopping with + partial result. None means unlimited rounds. + + Returns: + Self for method chaining + """ + ... + + def with_manager( + self, + *, + manager: MagenticManagerBase | None = None, + manager_factory: Callable[[], MagenticManagerBase] | None = None, + agent_factory: Callable[[], AgentProtocol] | None = None, # Constructor args for StandardMagenticManager when manager is not provided agent: AgentProtocol | None = None, task_ledger: _MagenticTaskLedger | None = None, @@ -1560,17 +1732,21 @@ def with_standard_manager( """Configure the workflow manager for task planning and agent coordination. The manager is responsible for creating plans, selecting agents, tracking progress, - and deciding when to replan or complete. This method supports two usage patterns: + and deciding when to replan or complete. This method supports four usage patterns: 1. **Provide existing manager**: Pass a pre-configured manager instance (custom or standard) for full control over behavior - 2. **Auto-create with agent**: Pass an agent to automatically create a - StandardMagenticManager that uses the agent's configured instructions and - options (temperature, seed, etc.) + 2. **Factory for custom manager**: Pass a callable that returns a new manager + instance for more advanced scenarios so that the builder can be reused + 3. **Factory for agent**: Pass a callable that returns a new agent instance to + automatically create a `StandardMagenticManager` + 4. **Auto-create with agent**: Pass an agent to automatically create a `StandardMagenticManager` Args: - manager: Pre-configured manager instance (StandardMagenticManager or custom - MagenticManagerBase subclass). If provided, all other arguments are ignored. + manager: Pre-configured manager instance (`StandardMagenticManager` or custom + `MagenticManagerBase` subclass). If provided, all other arguments are ignored. + manager_factory: Callable that returns a new manager instance. + agent_factory: Callable that returns a new agent instance. agent: Agent instance for generating plans and decisions. The agent's configured instructions and options (temperature, seed, etc.) will be applied. @@ -1620,7 +1796,7 @@ def with_standard_manager( workflow = ( MagenticBuilder() .participants(agent1=agent1, agent2=agent2) - .with_standard_manager( + .with_manager( agent=manager_agent, max_round_count=20, max_stall_count=3, @@ -1639,7 +1815,7 @@ async def plan(self, context: MagenticContext) -> ChatMessage: manager = MyManager() - workflow = MagenticBuilder().participants(agent1=agent1).with_standard_manager(manager).build() + workflow = MagenticBuilder().participants(agent1=agent1).with_manager(manager).build() Usage with prompt customization: @@ -1648,7 +1824,7 @@ async def plan(self, context: MagenticContext) -> ChatMessage: workflow = ( MagenticBuilder() .participants(coder=coder_agent, reviewer=reviewer_agent) - .with_standard_manager( + .with_manager( agent=manager_agent, task_ledger_plan_prompt="Create a detailed step-by-step plan...", progress_ledger_prompt="Assess progress and decide next action...", @@ -1664,27 +1840,68 @@ async def plan(self, context: MagenticContext) -> ChatMessage: - Stall detection helps prevent infinite loops in stuck scenarios - The agent's instructions are used as system instructions for all manager prompts """ + if any([self._manager, self._manager_factory, self._manager_agent_factory]): + raise ValueError("with_manager() has already been called on this builder instance.") + + if sum(x is not None for x in [manager, agent, manager_factory, agent_factory]) != 1: + raise ValueError("Exactly one of manager, agent, manager_factory, or agent_factory must be provided.") + + def _log_warning_if_constructor_args_provided() -> None: + if any( + arg is not None + for arg in [ + task_ledger, + task_ledger_facts_prompt, + task_ledger_plan_prompt, + task_ledger_full_prompt, + task_ledger_facts_update_prompt, + task_ledger_plan_update_prompt, + progress_ledger_prompt, + final_answer_prompt, + max_stall_count, + max_reset_count, + max_round_count, + ] + ): + logger.warning("Customer manager provided; all other with_manager() arguments will be ignored.") + if manager is not None: self._manager = manager - return self - - if agent is None: - raise ValueError("agent is required when manager is not provided: with_standard_manager(agent=...)") - - self._manager = StandardMagenticManager( - agent=agent, - task_ledger=task_ledger, - task_ledger_facts_prompt=task_ledger_facts_prompt, - task_ledger_plan_prompt=task_ledger_plan_prompt, - task_ledger_full_prompt=task_ledger_full_prompt, - task_ledger_facts_update_prompt=task_ledger_facts_update_prompt, - task_ledger_plan_update_prompt=task_ledger_plan_update_prompt, - progress_ledger_prompt=progress_ledger_prompt, - final_answer_prompt=final_answer_prompt, - max_stall_count=max_stall_count, - max_reset_count=max_reset_count, - max_round_count=max_round_count, - ) + _log_warning_if_constructor_args_provided() + elif agent is not None: + self._manager = StandardMagenticManager( + agent=agent, + task_ledger=task_ledger, + task_ledger_facts_prompt=task_ledger_facts_prompt, + task_ledger_plan_prompt=task_ledger_plan_prompt, + task_ledger_full_prompt=task_ledger_full_prompt, + task_ledger_facts_update_prompt=task_ledger_facts_update_prompt, + task_ledger_plan_update_prompt=task_ledger_plan_update_prompt, + progress_ledger_prompt=progress_ledger_prompt, + final_answer_prompt=final_answer_prompt, + max_stall_count=max_stall_count, + max_reset_count=max_reset_count, + max_round_count=max_round_count, + ) + elif manager_factory is not None: + self._manager_factory = manager_factory + _log_warning_if_constructor_args_provided() + elif agent_factory is not None: + self._manager_agent_factory = agent_factory + self._standard_manager_options = { + "task_ledger": task_ledger, + "task_ledger_facts_prompt": task_ledger_facts_prompt, + "task_ledger_plan_prompt": task_ledger_plan_prompt, + "task_ledger_full_prompt": task_ledger_full_prompt, + "task_ledger_facts_update_prompt": task_ledger_facts_update_prompt, + "task_ledger_plan_update_prompt": task_ledger_plan_update_prompt, + "progress_ledger_prompt": progress_ledger_prompt, + "final_answer_prompt": final_answer_prompt, + "max_stall_count": max_stall_count, + "max_reset_count": max_reset_count, + "max_round_count": max_round_count, + } + return self def _resolve_orchestrator(self, participants: Sequence[Executor]) -> Executor: @@ -1693,19 +1910,46 @@ def _resolve_orchestrator(self, participants: Sequence[Executor]) -> Executor: Args: participants: List of resolved participant executors """ - if self._manager is None: - raise ValueError("No manager configured. Call with_standard_manager(...) before building the orchestrator.") + if all(x is None for x in [self._manager, self._manager_factory, self._manager_agent_factory]): + raise ValueError("No manager configured. Call with_manager(...) before building the orchestrator.") + # We don't need to check if multiple are set since that is handled in with_orchestrator() + + if self._manager: + manager = self._manager + elif self._manager_factory: + manager = self._manager_factory() + elif self._manager_agent_factory: + agent_instance = self._manager_agent_factory() + manager = StandardMagenticManager( + agent=agent_instance, + **self._standard_manager_options, + ) + else: + # This should never be reached due to the checks above + raise RuntimeError("Manager could not be resolved. Please set the manager properly with with_manager().") return MagenticOrchestrator( - manager=self._manager, + manager=manager, participant_registry=ParticipantRegistry(participants), require_plan_signoff=self._enable_plan_review, ) def _resolve_participants(self) -> list[Executor]: """Resolve participant instances into Executor objects.""" + if not self._participants and not self._participant_factories: + raise ValueError("No participants provided. Call .participants() or .register_participants() first.") + # We don't need to check if both are set since that is handled in the respective methods + + participants: list[Executor | AgentProtocol] = [] + if self._participant_factories: + for factory in self._participant_factories: + participant = factory() + participants.append(participant) + else: + participants = list(self._participants.values()) + executors: list[Executor] = [] - for participant in self._participants.values(): + for participant in participants: if isinstance(participant, Executor): executors.append(participant) elif isinstance(participant, AgentProtocol): @@ -1719,12 +1963,6 @@ def _resolve_participants(self) -> list[Executor]: def build(self) -> Workflow: """Build a Magentic workflow with the orchestrator and all agent executors.""" - if not self._participants: - raise ValueError("No participants added to Magentic workflow") - - if self._manager is None: - raise ValueError("No manager configured. Call with_standard_manager(...) before build().") - logger.info(f"Building Magentic workflow with {len(self._participants)} participants") participants: list[Executor] = self._resolve_participants() diff --git a/python/packages/core/agent_framework/_workflows/_sequential.py b/python/packages/core/agent_framework/_workflows/_sequential.py index 11c123d153..663e85c9dd 100644 --- a/python/packages/core/agent_framework/_workflows/_sequential.py +++ b/python/packages/core/agent_framework/_workflows/_sequential.py @@ -160,9 +160,7 @@ def register_participants( ) -> "SequentialBuilder": """Register participant factories for this sequential workflow.""" if self._participants: - raise ValueError( - "Cannot mix .participants([...]) and .register_participants() in the same builder instance." - ) + raise ValueError("Cannot mix .participants() and .register_participants() in the same builder instance.") if self._participant_factories: raise ValueError("register_participants() has already been called on this builder instance.") @@ -180,9 +178,7 @@ def participants(self, participants: Sequence[AgentProtocol | Executor]) -> "Seq Raises if empty or duplicates are provided for clarity. """ if self._participant_factories: - raise ValueError( - "Cannot mix .participants([...]) and .register_participants() in the same builder instance." - ) + raise ValueError("Cannot mix .participants() and .register_participants() in the same builder instance.") if self._participants: raise ValueError("participants() has already been called on this builder instance.") @@ -248,6 +244,10 @@ def with_request_info( def _resolve_participants(self) -> list[Executor]: """Resolve participant instances into Executor objects.""" + if not self._participants and not self._participant_factories: + raise ValueError("No participants provided. Call .participants() or .register_participants() first.") + # We don't need to check if both are set since that is handled in the respective methods + participants: list[Executor | AgentProtocol] = [] if self._participant_factories: # Resolve the participant factories now. This doesn't break the factory pattern @@ -287,18 +287,6 @@ def build(self) -> Workflow: - Else (custom Executor): pass conversation directly to the executor - _EndWithConversation yields the final conversation and the workflow becomes idle """ - if not self._participants and not self._participant_factories: - raise ValueError( - "No participants or participant factories provided to the builder. " - "Use .participants([...]) or .register_participants([...])." - ) - - if self._participants and self._participant_factories: - # Defensive strategy: this should never happen due to checks in respective methods - raise ValueError( - "Cannot mix .participants([...]) and .register_participants() in the same builder instance." - ) - # Internal nodes input_conv = _InputToConversation(id="input-conversation") end = _EndWithConversation(id="end") diff --git a/python/packages/core/tests/workflow/test_group_chat.py b/python/packages/core/tests/workflow/test_group_chat.py index c65f19d599..298b9349d5 100644 --- a/python/packages/core/tests/workflow/test_group_chat.py +++ b/python/packages/core/tests/workflow/test_group_chat.py @@ -1,6 +1,6 @@ # Copyright (c) Microsoft. All rights reserved. -from collections.abc import AsyncIterable, Callable +from collections.abc import AsyncIterable, Callable, Sequence from typing import Any, cast import pytest @@ -40,7 +40,7 @@ def __init__(self, agent_name: str, reply_text: str, **kwargs: Any) -> None: async def run( # type: ignore[override] self, - messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None, + messages: str | ChatMessage | Sequence[str | ChatMessage] | None = None, *, thread: AgentThread | None = None, **kwargs: Any, @@ -50,7 +50,7 @@ async def run( # type: ignore[override] def run_stream( # type: ignore[override] self, - messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None, + messages: str | ChatMessage | Sequence[str | ChatMessage] | None = None, *, thread: AgentThread | None = None, **kwargs: Any, @@ -66,9 +66,7 @@ async def _stream() -> AsyncIterable[AgentResponseUpdate]: class MockChatClient: """Mock chat client that raises NotImplementedError for all methods.""" - @property - def additional_properties(self) -> dict[str, Any]: - return {} + additional_properties: dict[str, Any] async def get_response(self, messages: Any, **kwargs: Any) -> ChatResponse: raise NotImplementedError @@ -84,7 +82,7 @@ def __init__(self) -> None: async def run( self, - messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None, + messages: str | ChatMessage | Sequence[str | ChatMessage] | None = None, *, thread: AgentThread | None = None, **kwargs: Any, @@ -130,7 +128,7 @@ async def run( def run_stream( self, - messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None, + messages: str | ChatMessage | Sequence[str | ChatMessage] | None = None, *, thread: AgentThread | None = None, **kwargs: Any, @@ -230,7 +228,7 @@ async def test_group_chat_builder_basic_flow() -> None: workflow = ( GroupChatBuilder() - .with_select_speaker_func(selector, orchestrator_name="manager") + .with_orchestrator(selection_func=selector, orchestrator_name="manager") .participants([alpha, beta]) .with_max_rounds(2) # Limit rounds to prevent infinite loop .build() @@ -257,7 +255,7 @@ async def test_group_chat_as_agent_accepts_conversation() -> None: workflow = ( GroupChatBuilder() - .with_select_speaker_func(selector, orchestrator_name="manager") + .with_orchestrator(selection_func=selector, orchestrator_name="manager") .participants([alpha, beta]) .with_max_rounds(2) # Limit rounds to prevent infinite loop .build() @@ -285,7 +283,9 @@ def test_build_without_manager_raises_error(self) -> None: builder = GroupChatBuilder().participants([agent]) - with pytest.raises(RuntimeError, match="Orchestrator could not be resolved"): + with pytest.raises( + ValueError, match=r"No orchestrator has been configured\. Call with_orchestrator\(\) to set one\." + ): builder.build() def test_build_without_participants_raises_error(self) -> None: @@ -294,9 +294,12 @@ def test_build_without_participants_raises_error(self) -> None: def selector(state: GroupChatState) -> str: return "agent" - builder = GroupChatBuilder().with_select_speaker_func(selector) + builder = GroupChatBuilder().with_orchestrator(selection_func=selector) - with pytest.raises(ValueError, match="participants must be configured before build"): + with pytest.raises( + ValueError, + match=r"No participants provided\. Call \.participants\(\) or \.register_participants\(\) first\.", + ): builder.build() def test_duplicate_manager_configuration_raises_error(self) -> None: @@ -305,10 +308,13 @@ def test_duplicate_manager_configuration_raises_error(self) -> None: def selector(state: GroupChatState) -> str: return "agent" - builder = GroupChatBuilder().with_select_speaker_func(selector) + builder = GroupChatBuilder().with_orchestrator(selection_func=selector) - with pytest.raises(ValueError, match="select_speakers_func has already been configured"): - builder.with_select_speaker_func(selector) + with pytest.raises( + ValueError, + match=r"A selection function has already been configured\. Call with_orchestrator\(\.\.\.\) once only\.", + ): + builder.with_orchestrator(selection_func=selector) def test_empty_participants_raises_error(self) -> None: """Test that empty participants list raises ValueError.""" @@ -316,7 +322,7 @@ def test_empty_participants_raises_error(self) -> None: def selector(state: GroupChatState) -> str: return "agent" - builder = GroupChatBuilder().with_select_speaker_func(selector) + builder = GroupChatBuilder().with_orchestrator(selection_func=selector) with pytest.raises(ValueError, match="participants cannot be empty"): builder.participants([]) @@ -329,7 +335,7 @@ def test_duplicate_participant_names_raises_error(self) -> None: def selector(state: GroupChatState) -> str: return "agent" - builder = GroupChatBuilder().with_select_speaker_func(selector) + builder = GroupChatBuilder().with_orchestrator(selection_func=selector) with pytest.raises(ValueError, match="Duplicate participant name 'test'"): builder.participants([agent1, agent2]) @@ -357,7 +363,7 @@ async def _stream() -> AsyncIterable[AgentResponseUpdate]: def selector(state: GroupChatState) -> str: return "agent" - builder = GroupChatBuilder().with_select_speaker_func(selector) + builder = GroupChatBuilder().with_orchestrator(selection_func=selector) with pytest.raises(ValueError, match="AgentProtocol participants must have a non-empty name"): builder.participants([agent]) @@ -369,7 +375,7 @@ def test_empty_participant_name_raises_error(self) -> None: def selector(state: GroupChatState) -> str: return "agent" - builder = GroupChatBuilder().with_select_speaker_func(selector) + builder = GroupChatBuilder().with_orchestrator(selection_func=selector) with pytest.raises(ValueError, match="AgentProtocol participants must have a non-empty name"): builder.participants([agent]) @@ -391,7 +397,7 @@ def selector(state: GroupChatState) -> str: workflow = ( GroupChatBuilder() - .with_select_speaker_func(selector) + .with_orchestrator(selection_func=selector) .participants([agent]) .with_max_rounds(2) # Limit to 2 rounds .build() @@ -426,7 +432,7 @@ def termination_condition(conversation: list[ChatMessage]) -> bool: workflow = ( GroupChatBuilder() - .with_select_speaker_func(selector) + .with_orchestrator(selection_func=selector) .participants([agent]) .with_termination_condition(termination_condition) .build() @@ -454,7 +460,7 @@ async def test_termination_condition_agent_manager_finalizes(self) -> None: workflow = ( GroupChatBuilder() - .with_agent_orchestrator(manager) + .with_orchestrator(agent=manager) .participants([worker]) .with_termination_condition(lambda conv: any(msg.author_name == "agent" for msg in conv)) .build() @@ -480,7 +486,7 @@ def selector(state: GroupChatState) -> str: agent = StubAgent("agent", "response") - workflow = GroupChatBuilder().with_select_speaker_func(selector).participants([agent]).build() + workflow = GroupChatBuilder().with_orchestrator(selection_func=selector).participants([agent]).build() with pytest.raises(RuntimeError, match="Selection function returned unknown participant 'unknown_agent'"): async for _ in workflow.run_stream("test task"): @@ -501,7 +507,7 @@ def selector(state: GroupChatState) -> str: workflow = ( GroupChatBuilder() - .with_select_speaker_func(selector) + .with_orchestrator(selection_func=selector) .participants([agent]) .with_max_rounds(1) .with_checkpointing(storage) @@ -530,7 +536,11 @@ def selector(state: GroupChatState) -> str: agent = StubAgent("agent", "response") workflow = ( - GroupChatBuilder().with_select_speaker_func(selector).participants([agent]).with_max_rounds(1).build() + GroupChatBuilder() + .with_orchestrator(selection_func=selector) + .participants([agent]) + .with_max_rounds(1) + .build() ) with pytest.raises(ValueError, match="At least one ChatMessage is required to start the group chat workflow."): @@ -550,7 +560,11 @@ def selector(state: GroupChatState) -> str: agent = StubAgent("agent", "response") workflow = ( - GroupChatBuilder().with_select_speaker_func(selector).participants([agent]).with_max_rounds(1).build() + GroupChatBuilder() + .with_orchestrator(selection_func=selector) + .participants([agent]) + .with_max_rounds(1) + .build() ) outputs: list[list[ChatMessage]] = [] @@ -575,7 +589,11 @@ def selector(state: GroupChatState) -> str: agent = StubAgent("agent", "response") workflow = ( - GroupChatBuilder().with_select_speaker_func(selector).participants([agent]).with_max_rounds(1).build() + GroupChatBuilder() + .with_orchestrator(selection_func=selector) + .participants([agent]) + .with_max_rounds(1) + .build() ) outputs: list[list[ChatMessage]] = [] @@ -603,7 +621,11 @@ def selector(state: GroupChatState) -> str: agent = StubAgent("agent", "response") workflow = ( - GroupChatBuilder().with_select_speaker_func(selector).participants([agent]).with_max_rounds(1).build() + GroupChatBuilder() + .with_orchestrator(selection_func=selector) + .participants([agent]) + .with_max_rounds(1) + .build() ) outputs: list[list[ChatMessage]] = [] @@ -632,7 +654,7 @@ def selector(state: GroupChatState) -> str: workflow = ( GroupChatBuilder() - .with_select_speaker_func(selector) + .with_orchestrator(selection_func=selector) .participants([agent]) .with_max_rounds(1) # Very low limit .build() @@ -667,7 +689,7 @@ def selector(state: GroupChatState) -> str: workflow = ( GroupChatBuilder() - .with_select_speaker_func(selector) + .with_orchestrator(selection_func=selector) .participants([agent]) .with_max_rounds(1) # Hit limit after first response .build() @@ -700,7 +722,7 @@ async def test_group_chat_checkpoint_runtime_only() -> None: wf = ( GroupChatBuilder() .participants([agent_a, agent_b]) - .with_select_speaker_func(selector) + .with_orchestrator(selection_func=selector) .with_max_rounds(2) .build() ) @@ -738,7 +760,7 @@ async def test_group_chat_checkpoint_runtime_overrides_buildtime() -> None: wf = ( GroupChatBuilder() .participants([agent_a, agent_b]) - .with_select_speaker_func(selector) + .with_orchestrator(selection_func=selector) .with_max_rounds(2) .with_checkpointing(buildtime_storage) .build() @@ -783,7 +805,7 @@ async def selector(state: GroupChatState) -> str: workflow = ( GroupChatBuilder() - .with_select_speaker_func(selector, orchestrator_name="manager") + .with_orchestrator(selection_func=selector, orchestrator_name="manager") .participants([alpha, beta]) .with_max_rounds(2) .with_request_info(agents=["beta"]) # Only pause before beta runs @@ -835,7 +857,7 @@ async def selector(state: GroupChatState) -> str: workflow = ( GroupChatBuilder() - .with_select_speaker_func(selector, orchestrator_name="manager") + .with_orchestrator(selection_func=selector, orchestrator_name="manager") .participants([alpha]) .with_max_rounds(1) .with_request_info() # No filter - pause for all @@ -864,3 +886,455 @@ def test_group_chat_builder_with_request_info_returns_self(): builder2 = GroupChatBuilder() result2 = builder2.with_request_info(agents=["test"]) assert result2 is builder2 + + +# region Participant Factory Tests + + +def test_group_chat_builder_rejects_empty_participant_factories(): + """Test that GroupChatBuilder rejects empty participant_factories list.""" + + def selector(state: GroupChatState) -> str: + return list(state.participants.keys())[0] + + with pytest.raises(ValueError, match=r"participant_factories cannot be empty"): + GroupChatBuilder().register_participants([]) + + with pytest.raises( + ValueError, + match=r"No participants provided\. Call \.participants\(\) or \.register_participants\(\) first\.", + ): + GroupChatBuilder().with_orchestrator(selection_func=selector).build() + + +def test_group_chat_builder_rejects_mixing_participants_and_factories(): + """Test that mixing .participants() and .register_participants() raises an error.""" + alpha = StubAgent("alpha", "reply from alpha") + + # Case 1: participants first, then register_participants + with pytest.raises(ValueError, match="Cannot mix .participants"): + GroupChatBuilder().participants([alpha]).register_participants([lambda: StubAgent("beta", "reply from beta")]) + + # Case 2: register_participants first, then participants + with pytest.raises(ValueError, match="Cannot mix .participants"): + GroupChatBuilder().register_participants([lambda: alpha]).participants([StubAgent("beta", "reply from beta")]) + + +def test_group_chat_builder_rejects_multiple_calls_to_register_participants(): + """Test that multiple calls to .register_participants() raises an error.""" + with pytest.raises( + ValueError, match=r"register_participants\(\) has already been called on this builder instance." + ): + ( + GroupChatBuilder() + .register_participants([lambda: StubAgent("alpha", "reply from alpha")]) + .register_participants([lambda: StubAgent("beta", "reply from beta")]) + ) + + +def test_group_chat_builder_rejects_multiple_calls_to_participants(): + """Test that multiple calls to .participants() raises an error.""" + with pytest.raises(ValueError, match="participants have already been set"): + ( + GroupChatBuilder() + .participants([StubAgent("alpha", "reply from alpha")]) + .participants([StubAgent("beta", "reply from beta")]) + ) + + +async def test_group_chat_with_participant_factories(): + """Test workflow creation using participant_factories.""" + call_count = 0 + + def create_alpha() -> StubAgent: + nonlocal call_count + call_count += 1 + return StubAgent("alpha", "reply from alpha") + + def create_beta() -> StubAgent: + nonlocal call_count + call_count += 1 + return StubAgent("beta", "reply from beta") + + selector = make_sequence_selector() + + workflow = ( + GroupChatBuilder() + .register_participants([create_alpha, create_beta]) + .with_orchestrator(selection_func=selector) + .with_max_rounds(2) + .build() + ) + + # Factories should be called during build + assert call_count == 2 + + outputs: list[WorkflowOutputEvent] = [] + async for event in workflow.run_stream("coordinate task"): + if isinstance(event, WorkflowOutputEvent): + outputs.append(event) + + assert len(outputs) == 1 + + +async def test_group_chat_participant_factories_reusable_builder(): + """Test that the builder can be reused to build multiple workflows with factories.""" + call_count = 0 + + def create_alpha() -> StubAgent: + nonlocal call_count + call_count += 1 + return StubAgent("alpha", "reply from alpha") + + def create_beta() -> StubAgent: + nonlocal call_count + call_count += 1 + return StubAgent("beta", "reply from beta") + + selector = make_sequence_selector() + + builder = ( + GroupChatBuilder() + .register_participants([create_alpha, create_beta]) + .with_orchestrator(selection_func=selector) + .with_max_rounds(2) + ) + + # Build first workflow + wf1 = builder.build() + assert call_count == 2 + + # Build second workflow + wf2 = builder.build() + assert call_count == 4 + + # Verify that the two workflows have different agent instances + assert wf1.executors["alpha"] is not wf2.executors["alpha"] + assert wf1.executors["beta"] is not wf2.executors["beta"] + + +async def test_group_chat_participant_factories_with_checkpointing(): + """Test checkpointing with participant_factories.""" + storage = InMemoryCheckpointStorage() + + def create_alpha() -> StubAgent: + return StubAgent("alpha", "reply from alpha") + + def create_beta() -> StubAgent: + return StubAgent("beta", "reply from beta") + + selector = make_sequence_selector() + + workflow = ( + GroupChatBuilder() + .register_participants([create_alpha, create_beta]) + .with_orchestrator(selection_func=selector) + .with_checkpointing(storage) + .with_max_rounds(2) + .build() + ) + + outputs: list[WorkflowOutputEvent] = [] + async for event in workflow.run_stream("checkpoint test"): + if isinstance(event, WorkflowOutputEvent): + outputs.append(event) + + assert outputs, "Should have workflow output" + + checkpoints = await storage.list_checkpoints() + assert checkpoints, "Checkpoints should be created during workflow execution" + + +# endregion + +# region Orchestrator Factory Tests + + +def test_group_chat_builder_rejects_multiple_orchestrator_configurations(): + """Test that configuring multiple orchestrators raises ValueError.""" + + def selector(state: GroupChatState) -> str: + return list(state.participants.keys())[0] + + def orchestrator_factory() -> ChatAgent: + return cast(ChatAgent, StubManagerAgent()) + + builder = GroupChatBuilder().with_orchestrator(selection_func=selector) + + # Already has a selection_func, should fail on second call + with pytest.raises(ValueError, match=r"A selection function has already been configured"): + builder.with_orchestrator(selection_func=selector) + + # Test with orchestrator_factory + builder2 = GroupChatBuilder().with_orchestrator(orchestrator_factory=orchestrator_factory) + with pytest.raises(ValueError, match=r"An orchestrator factory has already been configured"): + builder2.with_orchestrator(orchestrator_factory=orchestrator_factory) + + +def test_group_chat_builder_requires_exactly_one_orchestrator_option(): + """Test that exactly one orchestrator option must be provided.""" + + def selector(state: GroupChatState) -> str: + return list(state.participants.keys())[0] + + def orchestrator_factory() -> ChatAgent: + return cast(ChatAgent, StubManagerAgent()) + + # No options provided + with pytest.raises(ValueError, match="Exactly one of"): + GroupChatBuilder().with_orchestrator() # type: ignore + + # Multiple options provided + with pytest.raises(ValueError, match="Exactly one of"): + GroupChatBuilder().with_orchestrator(selection_func=selector, orchestrator_factory=orchestrator_factory) # type: ignore + + +async def test_group_chat_with_orchestrator_factory_returning_chat_agent(): + """Test workflow creation using orchestrator_factory that returns ChatAgent.""" + factory_call_count = 0 + + class DynamicManagerAgent(ChatAgent): + """Manager agent that dynamically selects from available participants.""" + + def __init__(self) -> None: + super().__init__(chat_client=MockChatClient(), name="dynamic_manager", description="Dynamic manager") + self._call_count = 0 + + async def run( + self, + messages: str | ChatMessage | Sequence[str | ChatMessage] | None = None, + *, + thread: AgentThread | None = None, + **kwargs: Any, + ) -> AgentResponse: + if self._call_count == 0: + self._call_count += 1 + payload = { + "terminate": False, + "reason": "Selecting alpha", + "next_speaker": "alpha", + "final_message": None, + } + return AgentResponse( + messages=[ + ChatMessage( + role=Role.ASSISTANT, + text=( + '{"terminate": false, "reason": "Selecting alpha", ' + '"next_speaker": "alpha", "final_message": null}' + ), + author_name=self.name, + ) + ], + value=payload, + ) + + payload = { + "terminate": True, + "reason": "Task complete", + "next_speaker": None, + "final_message": "dynamic manager final", + } + return AgentResponse( + messages=[ + ChatMessage( + role=Role.ASSISTANT, + text=( + '{"terminate": true, "reason": "Task complete", ' + '"next_speaker": null, "final_message": "dynamic manager final"}' + ), + author_name=self.name, + ) + ], + value=payload, + ) + + def orchestrator_factory() -> ChatAgent: + nonlocal factory_call_count + factory_call_count += 1 + return cast(ChatAgent, DynamicManagerAgent()) + + alpha = StubAgent("alpha", "reply from alpha") + beta = StubAgent("beta", "reply from beta") + + workflow = ( + GroupChatBuilder() + .participants([alpha, beta]) + .with_orchestrator(orchestrator_factory=orchestrator_factory) + .build() + ) + + # Factory should be called during build + assert factory_call_count == 1 + + outputs: list[WorkflowOutputEvent] = [] + async for event in workflow.run_stream("coordinate task"): + if isinstance(event, WorkflowOutputEvent): + outputs.append(event) + + assert len(outputs) == 1 + # The DynamicManagerAgent terminates after second call with final_message + final_messages = outputs[0].data + assert isinstance(final_messages, list) + assert any( + msg.text == "dynamic manager final" + for msg in cast(list[ChatMessage], final_messages) + if msg.author_name == "dynamic_manager" + ) + + +def test_group_chat_with_orchestrator_factory_returning_base_orchestrator(): + """Test that orchestrator_factory returning BaseGroupChatOrchestrator is used as-is.""" + factory_call_count = 0 + selector = make_sequence_selector() + + def orchestrator_factory() -> BaseGroupChatOrchestrator: + nonlocal factory_call_count + factory_call_count += 1 + from agent_framework._workflows._base_group_chat_orchestrator import ParticipantRegistry + from agent_framework._workflows._group_chat import GroupChatOrchestrator + + # Create a custom orchestrator; when returning BaseGroupChatOrchestrator, + # the builder uses it as-is without modifying its participant registry + return GroupChatOrchestrator( + id="custom_orchestrator", + participant_registry=ParticipantRegistry([]), + selection_func=selector, + max_rounds=2, + ) + + alpha = StubAgent("alpha", "reply from alpha") + + workflow = ( + GroupChatBuilder().participants([alpha]).with_orchestrator(orchestrator_factory=orchestrator_factory).build() + ) + + # Factory should be called during build + assert factory_call_count == 1 + # Verify the custom orchestrator is in the workflow + assert "custom_orchestrator" in workflow.executors + + +async def test_group_chat_orchestrator_factory_reusable_builder(): + """Test that the builder can be reused to build multiple workflows with orchestrator factory.""" + factory_call_count = 0 + + def orchestrator_factory() -> ChatAgent: + nonlocal factory_call_count + factory_call_count += 1 + return cast(ChatAgent, StubManagerAgent()) + + alpha = StubAgent("alpha", "reply from alpha") + beta = StubAgent("beta", "reply from beta") + + builder = ( + GroupChatBuilder().participants([alpha, beta]).with_orchestrator(orchestrator_factory=orchestrator_factory) + ) + + # Build first workflow + wf1 = builder.build() + assert factory_call_count == 1 + + # Build second workflow + wf2 = builder.build() + assert factory_call_count == 2 + + # Verify that the two workflows have different orchestrator instances + assert wf1.executors["manager_agent"] is not wf2.executors["manager_agent"] + + +def test_group_chat_orchestrator_factory_invalid_return_type(): + """Test that orchestrator_factory raising error for invalid return type.""" + + def invalid_factory() -> Any: + return "invalid type" + + alpha = StubAgent("alpha", "reply from alpha") + + with pytest.raises( + TypeError, + match=r"Orchestrator factory must return ChatAgent or BaseGroupChatOrchestrator instance", + ): + (GroupChatBuilder().participants([alpha]).with_orchestrator(orchestrator_factory=invalid_factory).build()) + + +def test_group_chat_with_both_participant_and_orchestrator_factories(): + """Test workflow creation using both participant_factories and orchestrator_factory.""" + participant_factory_call_count = 0 + orchestrator_factory_call_count = 0 + + def create_alpha() -> StubAgent: + nonlocal participant_factory_call_count + participant_factory_call_count += 1 + return StubAgent("alpha", "reply from alpha") + + def create_beta() -> StubAgent: + nonlocal participant_factory_call_count + participant_factory_call_count += 1 + return StubAgent("beta", "reply from beta") + + def orchestrator_factory() -> ChatAgent: + nonlocal orchestrator_factory_call_count + orchestrator_factory_call_count += 1 + return cast(ChatAgent, StubManagerAgent()) + + workflow = ( + GroupChatBuilder() + .register_participants([create_alpha, create_beta]) + .with_orchestrator(orchestrator_factory=orchestrator_factory) + .build() + ) + + # All factories should be called during build + assert participant_factory_call_count == 2 + assert orchestrator_factory_call_count == 1 + + # Verify all executors are present in the workflow + assert "alpha" in workflow.executors + assert "beta" in workflow.executors + assert "manager_agent" in workflow.executors + + +async def test_group_chat_factories_reusable_for_multiple_workflows(): + """Test that both factories are reused correctly for multiple workflow builds.""" + participant_factory_call_count = 0 + orchestrator_factory_call_count = 0 + + def create_alpha() -> StubAgent: + nonlocal participant_factory_call_count + participant_factory_call_count += 1 + return StubAgent("alpha", "reply from alpha") + + def create_beta() -> StubAgent: + nonlocal participant_factory_call_count + participant_factory_call_count += 1 + return StubAgent("beta", "reply from beta") + + def orchestrator_factory() -> ChatAgent: + nonlocal orchestrator_factory_call_count + orchestrator_factory_call_count += 1 + return cast(ChatAgent, StubManagerAgent()) + + builder = ( + GroupChatBuilder() + .register_participants([create_alpha, create_beta]) + .with_orchestrator(orchestrator_factory=orchestrator_factory) + ) + + # Build first workflow + wf1 = builder.build() + assert participant_factory_call_count == 2 + assert orchestrator_factory_call_count == 1 + + # Build second workflow + wf2 = builder.build() + assert participant_factory_call_count == 4 + assert orchestrator_factory_call_count == 2 + + # Verify that the workflows have different agent and orchestrator instances + assert wf1.executors["alpha"] is not wf2.executors["alpha"] + assert wf1.executors["beta"] is not wf2.executors["beta"] + assert wf1.executors["manager_agent"] is not wf2.executors["manager_agent"] + + +# endregion diff --git a/python/packages/core/tests/workflow/test_handoff.py b/python/packages/core/tests/workflow/test_handoff.py index 268f89d513..af9cdeeb04 100644 --- a/python/packages/core/tests/workflow/test_handoff.py +++ b/python/packages/core/tests/workflow/test_handoff.py @@ -208,7 +208,9 @@ def test_build_fails_without_start_agent(): def test_build_fails_without_participants(): """Verify that build() raises ValueError when no participants are provided.""" - with pytest.raises(ValueError, match="No participants or participant_factories have been configured."): + with pytest.raises( + ValueError, match=r"No participants provided\. Call \.participants\(\) or \.register_participants\(\) first." + ): HandoffBuilder().build() @@ -272,7 +274,7 @@ async def mock_get_response(messages: Any, options: dict[str, Any] | None = None agent = ChatAgent( chat_client=mock_client, name="test_agent", - default_options={"tool_choice": {"mode": "required"}}, + default_options={"tool_choice": {"mode": "required"}}, # type: ignore ) # Run the agent @@ -292,9 +294,11 @@ def test_handoff_builder_rejects_empty_participant_factories(): """Test that HandoffBuilder rejects empty participant_factories dictionary.""" # Empty factories are rejected immediately when calling participant_factories() with pytest.raises(ValueError, match=r"participant_factories cannot be empty"): - HandoffBuilder().participant_factories({}) + HandoffBuilder().register_participants({}) - with pytest.raises(ValueError, match=r"No participants or participant_factories have been configured"): + with pytest.raises( + ValueError, match=r"No participants provided\. Call \.participants\(\) or \.register_participants\(\) first\." + ): HandoffBuilder(participant_factories={}).build() @@ -311,7 +315,7 @@ def test_handoff_builder_rejects_mixing_participants_and_participant_factories_m # Case 1: participants first, then participant_factories with pytest.raises(ValueError, match="Cannot mix .participants"): - HandoffBuilder(participants=[triage]).participant_factories({ + HandoffBuilder(participants=[triage]).register_participants({ "specialist": lambda: MockHandoffAgent(name="specialist") }) @@ -323,13 +327,13 @@ def test_handoff_builder_rejects_mixing_participants_and_participant_factories_m # Case 3: participants(), then participant_factories() with pytest.raises(ValueError, match="Cannot mix .participants"): - HandoffBuilder().participants([triage]).participant_factories({ + HandoffBuilder().participants([triage]).register_participants({ "specialist": lambda: MockHandoffAgent(name="specialist") }) # Case 4: participant_factories(), then participants() with pytest.raises(ValueError, match="Cannot mix .participants"): - HandoffBuilder().participant_factories({"triage": lambda: triage}).participants([ + HandoffBuilder().register_participants({"triage": lambda: triage}).participants([ MockHandoffAgent(name="specialist") ]) @@ -342,11 +346,13 @@ def test_handoff_builder_rejects_mixing_participants_and_participant_factories_m def test_handoff_builder_rejects_multiple_calls_to_participant_factories(): """Test that multiple calls to .participant_factories() raises an error.""" - with pytest.raises(ValueError, match=r"participant_factories\(\) has already been called"): + with pytest.raises( + ValueError, match=r"register_participants\(\) has already been called on this builder instance." + ): ( HandoffBuilder() - .participant_factories({"agent1": lambda: MockHandoffAgent(name="agent1")}) - .participant_factories({"agent2": lambda: MockHandoffAgent(name="agent2")}) + .register_participants({"agent1": lambda: MockHandoffAgent(name="agent1")}) + .register_participants({"agent2": lambda: MockHandoffAgent(name="agent2")}) ) @@ -385,7 +391,7 @@ def test_handoff_builder_rejects_factory_name_coordinator_with_instances(): triage = MockHandoffAgent(name="triage") specialist = MockHandoffAgent(name="specialist") - with pytest.raises(ValueError, match="Call participant_factories.*before with_start_agent"): + with pytest.raises(ValueError, match=r"Call register_participants\(...\) before with_start_agent\(...\)"): ( HandoffBuilder(participants=[triage, specialist]).with_start_agent( "triage" diff --git a/python/packages/core/tests/workflow/test_magentic.py b/python/packages/core/tests/workflow/test_magentic.py index 7e4a5bb48e..bb37131c57 100644 --- a/python/packages/core/tests/workflow/test_magentic.py +++ b/python/packages/core/tests/workflow/test_magentic.py @@ -1,7 +1,7 @@ # Copyright (c) Microsoft. All rights reserved. import sys -from collections.abc import AsyncIterable +from collections.abc import AsyncIterable, Sequence from dataclasses import dataclass from typing import Any, ClassVar, cast @@ -155,7 +155,7 @@ def __init__(self, agent_name: str, reply_text: str, **kwargs: Any) -> None: async def run( # type: ignore[override] self, - messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None, + messages: str | ChatMessage | Sequence[str | ChatMessage] | None = None, *, thread: AgentThread | None = None, **kwargs: Any, @@ -165,7 +165,7 @@ async def run( # type: ignore[override] def run_stream( # type: ignore[override] self, - messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None, + messages: str | ChatMessage | Sequence[str | ChatMessage] | None = None, *, thread: AgentThread | None = None, **kwargs: Any, @@ -193,7 +193,7 @@ async def test_magentic_builder_returns_workflow_and_runs() -> None: manager = FakeManager() agent = StubAgent(manager.next_speaker_name, "first draft") - workflow = MagenticBuilder().participants([agent]).with_standard_manager(manager).build() + workflow = MagenticBuilder().participants([agent]).with_manager(manager=manager).build() assert isinstance(workflow, Workflow) @@ -219,7 +219,7 @@ async def test_magentic_as_agent_does_not_accept_conversation() -> None: manager = FakeManager() writer = StubAgent(manager.next_speaker_name, "summary response") - workflow = MagenticBuilder().participants([writer]).with_standard_manager(manager).build() + workflow = MagenticBuilder().participants([writer]).with_manager(manager=manager).build() agent = workflow.as_agent(name="magentic-agent") conversation = [ @@ -247,7 +247,7 @@ async def test_standard_manager_plan_and_replan_combined_ledger(): async def test_magentic_workflow_plan_review_approval_to_completion(): manager = FakeManager() - wf = MagenticBuilder().participants([DummyExec("agentA")]).with_standard_manager(manager).with_plan_review().build() + wf = MagenticBuilder().participants([DummyExec("agentA")]).with_manager(manager=manager).with_plan_review().build() req_event: RequestInfoEvent | None = None async for ev in wf.run_stream("do work"): @@ -288,7 +288,7 @@ async def replan(self, magentic_context: MagenticContext) -> ChatMessage: # typ wf = ( MagenticBuilder() .participants([DummyExec(name=manager.next_speaker_name)]) - .with_standard_manager(manager) + .with_manager(manager=manager) .with_plan_review() .build() ) @@ -333,7 +333,7 @@ async def test_magentic_orchestrator_round_limit_produces_partial_result(): wf = ( MagenticBuilder() .participants([DummyExec(name=manager.next_speaker_name)]) - .with_standard_manager(manager) + .with_manager(manager=manager) .build() ) @@ -363,7 +363,7 @@ async def test_magentic_checkpoint_resume_round_trip(): wf = ( MagenticBuilder() .participants([DummyExec(name=manager1.next_speaker_name)]) - .with_standard_manager(manager1) + .with_manager(manager=manager1) .with_plan_review() .with_checkpointing(storage) .build() @@ -386,7 +386,7 @@ async def test_magentic_checkpoint_resume_round_trip(): wf_resume = ( MagenticBuilder() .participants([DummyExec(name=manager2.next_speaker_name)]) - .with_standard_manager(manager2) + .with_manager(manager=manager2) .with_plan_review() .with_checkpointing(storage) .build() @@ -422,7 +422,7 @@ class StubManagerAgent(BaseAgent): async def run( self, - messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None, + messages: str | ChatMessage | Sequence[str | ChatMessage] | None = None, *, thread: Any = None, **kwargs: Any, @@ -431,7 +431,7 @@ async def run( def run_stream( self, - messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None, + messages: str | ChatMessage | Sequence[str | ChatMessage] | None = None, *, thread: Any = None, **kwargs: Any, @@ -575,7 +575,7 @@ async def run(self, messages=None, *, thread=None, **kwargs): # type: ignore[ov async def _collect_agent_responses_setup(participant: AgentProtocol) -> list[ChatMessage]: captured: list[ChatMessage] = [] - wf = MagenticBuilder().participants([participant]).with_standard_manager(InvokeOnceManager()).build() + wf = MagenticBuilder().participants([participant]).with_manager(manager=InvokeOnceManager()).build() # Run a bounded stream to allow one invoke and then completion events: list[WorkflowEvent] = [] @@ -623,7 +623,7 @@ async def test_magentic_checkpoint_resume_inner_loop_superstep(): workflow = ( MagenticBuilder() .participants([StubThreadAgent()]) - .with_standard_manager(InvokeOnceManager()) + .with_manager(manager=InvokeOnceManager()) .with_checkpointing(storage) .build() ) @@ -638,7 +638,7 @@ async def test_magentic_checkpoint_resume_inner_loop_superstep(): resumed = ( MagenticBuilder() .participants([StubThreadAgent()]) - .with_standard_manager(InvokeOnceManager()) + .with_manager(manager=InvokeOnceManager()) .with_checkpointing(storage) .build() ) @@ -661,7 +661,7 @@ async def test_magentic_checkpoint_resume_from_saved_state(): workflow = ( MagenticBuilder() .participants([StubThreadAgent()]) - .with_standard_manager(manager) + .with_manager(manager=manager) .with_checkpointing(storage) .build() ) @@ -678,7 +678,7 @@ async def test_magentic_checkpoint_resume_from_saved_state(): resumed_workflow = ( MagenticBuilder() .participants([StubThreadAgent()]) - .with_standard_manager(InvokeOnceManager()) + .with_manager(manager=InvokeOnceManager()) .with_checkpointing(storage) .build() ) @@ -699,7 +699,7 @@ async def test_magentic_checkpoint_resume_rejects_participant_renames(): workflow = ( MagenticBuilder() .participants([StubThreadAgent()]) - .with_standard_manager(manager) + .with_manager(manager=manager) .with_plan_review() .with_checkpointing(storage) .build() @@ -719,7 +719,7 @@ async def test_magentic_checkpoint_resume_rejects_participant_renames(): renamed_workflow = ( MagenticBuilder() .participants([StubThreadAgent(name="renamedAgent")]) - .with_standard_manager(InvokeOnceManager()) + .with_manager(manager=InvokeOnceManager()) .with_plan_review() .with_checkpointing(storage) .build() @@ -759,7 +759,7 @@ async def prepare_final_answer(self, magentic_context: MagenticContext) -> ChatM async def test_magentic_stall_and_reset_reach_limits(): manager = NotProgressingManager(max_round_count=10, max_stall_count=0, max_reset_count=1) - wf = MagenticBuilder().participants([DummyExec("agentA")]).with_standard_manager(manager).build() + wf = MagenticBuilder().participants([DummyExec("agentA")]).with_manager(manager=manager).build() events: list[WorkflowEvent] = [] async for ev in wf.run_stream("test limits"): @@ -784,7 +784,7 @@ async def test_magentic_checkpoint_runtime_only() -> None: storage = InMemoryCheckpointStorage() manager = FakeManager(max_round_count=10) - wf = MagenticBuilder().participants([DummyExec("agentA")]).with_standard_manager(manager).build() + wf = MagenticBuilder().participants([DummyExec("agentA")]).with_manager(manager=manager).build() baseline_output: ChatMessage | None = None async for ev in wf.run_stream("runtime checkpoint test", checkpoint_storage=storage): @@ -819,7 +819,7 @@ async def test_magentic_checkpoint_runtime_overrides_buildtime() -> None: wf = ( MagenticBuilder() .participants([DummyExec("agentA")]) - .with_standard_manager(manager) + .with_manager(manager=manager) .with_checkpointing(buildtime_storage) .build() ) @@ -874,7 +874,7 @@ async def test_magentic_checkpoint_restore_no_duplicate_history(): wf = ( MagenticBuilder() .participants([DummyExec("agentA")]) - .with_standard_manager(manager) + .with_manager(manager=manager) .with_checkpointing(storage) .build() ) @@ -927,3 +927,374 @@ async def test_magentic_checkpoint_restore_no_duplicate_history(): # endregion + +# region Participant Factory Tests + + +def test_magentic_builder_rejects_empty_participant_factories(): + """Test that MagenticBuilder rejects empty participant_factories list.""" + with pytest.raises(ValueError, match=r"participant_factories cannot be empty"): + MagenticBuilder().register_participants([]) + + with pytest.raises( + ValueError, + match=r"No participants provided\. Call \.participants\(\) or \.register_participants\(\) first\.", + ): + MagenticBuilder().with_manager(manager=FakeManager()).build() + + +def test_magentic_builder_rejects_mixing_participants_and_factories(): + """Test that mixing .participants() and .register_participants() raises an error.""" + agent = StubAgent("agentA", "reply from agentA") + + # Case 1: participants first, then register_participants + with pytest.raises(ValueError, match="Cannot mix .participants"): + MagenticBuilder().participants([agent]).register_participants([lambda: StubAgent("agentB", "reply")]) + + # Case 2: register_participants first, then participants + with pytest.raises(ValueError, match="Cannot mix .participants"): + MagenticBuilder().register_participants([lambda: agent]).participants([StubAgent("agentB", "reply")]) + + +def test_magentic_builder_rejects_multiple_calls_to_register_participants(): + """Test that multiple calls to .register_participants() raises an error.""" + with pytest.raises( + ValueError, match=r"register_participants\(\) has already been called on this builder instance." + ): + ( + MagenticBuilder() + .register_participants([lambda: StubAgent("agentA", "reply from agentA")]) + .register_participants([lambda: StubAgent("agentB", "reply from agentB")]) + ) + + +def test_magentic_builder_rejects_multiple_calls_to_participants(): + """Test that multiple calls to .participants() raises an error.""" + with pytest.raises(ValueError, match="participants have already been set"): + ( + MagenticBuilder() + .participants([StubAgent("agentA", "reply from agentA")]) + .participants([StubAgent("agentB", "reply from agentB")]) + ) + + +async def test_magentic_with_participant_factories(): + """Test workflow creation using participant_factories.""" + call_count = 0 + + def create_agent() -> StubAgent: + nonlocal call_count + call_count += 1 + return StubAgent("agentA", "reply from agentA") + + manager = FakeManager() + workflow = MagenticBuilder().register_participants([create_agent]).with_manager(manager=manager).build() + + # Factory should be called during build + assert call_count == 1 + + outputs: list[WorkflowOutputEvent] = [] + async for event in workflow.run_stream("test task"): + if isinstance(event, WorkflowOutputEvent): + outputs.append(event) + + assert len(outputs) == 1 + + +async def test_magentic_participant_factories_reusable_builder(): + """Test that the builder can be reused to build multiple workflows with factories.""" + call_count = 0 + + def create_agent() -> StubAgent: + nonlocal call_count + call_count += 1 + return StubAgent("agentA", "reply from agentA") + + builder = MagenticBuilder().register_participants([create_agent]).with_manager(manager=FakeManager()) + + # Build first workflow + wf1 = builder.build() + assert call_count == 1 + + # Build second workflow + wf2 = builder.build() + assert call_count == 2 + + # Verify that the two workflows have different agent instances + assert wf1.executors["agentA"] is not wf2.executors["agentA"] + + +async def test_magentic_participant_factories_with_checkpointing(): + """Test checkpointing with participant_factories.""" + storage = InMemoryCheckpointStorage() + + def create_agent() -> StubAgent: + return StubAgent("agentA", "reply from agentA") + + manager = FakeManager() + workflow = ( + MagenticBuilder() + .register_participants([create_agent]) + .with_manager(manager=manager) + .with_checkpointing(storage) + .build() + ) + + outputs: list[WorkflowOutputEvent] = [] + async for event in workflow.run_stream("checkpoint test"): + if isinstance(event, WorkflowOutputEvent): + outputs.append(event) + + assert outputs, "Should have workflow output" + + checkpoints = await storage.list_checkpoints() + assert checkpoints, "Checkpoints should be created during workflow execution" + + +# endregion + +# region Manager Factory Tests + + +def test_magentic_builder_rejects_multiple_manager_configurations(): + """Test that configuring multiple managers raises ValueError.""" + manager = FakeManager() + + builder = MagenticBuilder().with_manager(manager=manager) + + with pytest.raises(ValueError, match=r"with_manager\(\) has already been called"): + builder.with_manager(manager=manager) + + +def test_magentic_builder_requires_exactly_one_manager_option(): + """Test that exactly one manager option must be provided.""" + manager = FakeManager() + + def manager_factory() -> MagenticManagerBase: + return FakeManager() + + # No options provided + with pytest.raises(ValueError, match="Exactly one of"): + MagenticBuilder().with_manager() # type: ignore + + # Multiple options provided + with pytest.raises(ValueError, match="Exactly one of"): + MagenticBuilder().with_manager(manager=manager, manager_factory=manager_factory) # type: ignore + + +async def test_magentic_with_manager_factory(): + """Test workflow creation using manager_factory.""" + factory_call_count = 0 + + def manager_factory() -> MagenticManagerBase: + nonlocal factory_call_count + factory_call_count += 1 + return FakeManager() + + agent = StubAgent("agentA", "reply from agentA") + workflow = MagenticBuilder().participants([agent]).with_manager(manager_factory=manager_factory).build() + + # Factory should be called during build + assert factory_call_count == 1 + + outputs: list[WorkflowOutputEvent] = [] + async for event in workflow.run_stream("test task"): + if isinstance(event, WorkflowOutputEvent): + outputs.append(event) + + assert len(outputs) == 1 + + +async def test_magentic_with_agent_factory(): + """Test workflow creation using agent_factory for StandardMagenticManager.""" + factory_call_count = 0 + + def agent_factory() -> AgentProtocol: + nonlocal factory_call_count + factory_call_count += 1 + return cast(AgentProtocol, StubManagerAgent()) + + participant = StubAgent("agentA", "reply from agentA") + workflow = ( + MagenticBuilder() + .participants([participant]) + .with_manager(agent_factory=agent_factory, max_round_count=1) + .build() + ) + + # Factory should be called during build + assert factory_call_count == 1 + + # Verify workflow can be started (may not complete successfully due to stub behavior) + event_count = 0 + async for _ in workflow.run_stream("test task"): + event_count += 1 + if event_count > 10: + break + + assert event_count > 0 + + +async def test_magentic_manager_factory_reusable_builder(): + """Test that the builder can be reused to build multiple workflows with manager factory.""" + factory_call_count = 0 + + def manager_factory() -> MagenticManagerBase: + nonlocal factory_call_count + factory_call_count += 1 + return FakeManager() + + agent = StubAgent("agentA", "reply from agentA") + builder = MagenticBuilder().participants([agent]).with_manager(manager_factory=manager_factory) + + # Build first workflow + wf1 = builder.build() + assert factory_call_count == 1 + + # Build second workflow + wf2 = builder.build() + assert factory_call_count == 2 + + # Verify that the two workflows have different orchestrator instances + orchestrator1 = next(e for e in wf1.executors.values() if isinstance(e, MagenticOrchestrator)) + orchestrator2 = next(e for e in wf2.executors.values() if isinstance(e, MagenticOrchestrator)) + assert orchestrator1 is not orchestrator2 + + +def test_magentic_with_both_participant_and_manager_factories(): + """Test workflow creation using both participant_factories and manager_factory.""" + participant_factory_call_count = 0 + manager_factory_call_count = 0 + + def create_agent() -> StubAgent: + nonlocal participant_factory_call_count + participant_factory_call_count += 1 + return StubAgent("agentA", "reply from agentA") + + def manager_factory() -> MagenticManagerBase: + nonlocal manager_factory_call_count + manager_factory_call_count += 1 + return FakeManager() + + workflow = ( + MagenticBuilder().register_participants([create_agent]).with_manager(manager_factory=manager_factory).build() + ) + + # All factories should be called during build + assert participant_factory_call_count == 1 + assert manager_factory_call_count == 1 + + # Verify executor is present in the workflow + assert "agentA" in workflow.executors + + +async def test_magentic_factories_reusable_for_multiple_workflows(): + """Test that both factories are reused correctly for multiple workflow builds.""" + participant_factory_call_count = 0 + manager_factory_call_count = 0 + + def create_agent() -> StubAgent: + nonlocal participant_factory_call_count + participant_factory_call_count += 1 + return StubAgent("agentA", "reply from agentA") + + def manager_factory() -> MagenticManagerBase: + nonlocal manager_factory_call_count + manager_factory_call_count += 1 + return FakeManager() + + builder = MagenticBuilder().register_participants([create_agent]).with_manager(manager_factory=manager_factory) + + # Build first workflow + wf1 = builder.build() + assert participant_factory_call_count == 1 + assert manager_factory_call_count == 1 + + # Build second workflow + wf2 = builder.build() + assert participant_factory_call_count == 2 + assert manager_factory_call_count == 2 + + # Verify that the workflows have different agent and orchestrator instances + assert wf1.executors["agentA"] is not wf2.executors["agentA"] + + orchestrator1 = next(e for e in wf1.executors.values() if isinstance(e, MagenticOrchestrator)) + orchestrator2 = next(e for e in wf2.executors.values() if isinstance(e, MagenticOrchestrator)) + assert orchestrator1 is not orchestrator2 + + +def test_magentic_agent_factory_with_standard_manager_options(): + """Test that agent_factory properly passes through standard manager options.""" + factory_call_count = 0 + + def agent_factory() -> AgentProtocol: + nonlocal factory_call_count + factory_call_count += 1 + return cast(AgentProtocol, StubManagerAgent()) + + # Custom options to verify they are passed through + custom_max_stall_count = 5 + custom_max_reset_count = 2 + custom_max_round_count = 10 + custom_facts_prompt = "Custom facts prompt: {task}" + custom_plan_prompt = "Custom plan prompt: {team}" + custom_full_prompt = "Custom full prompt: {task} {team} {facts} {plan}" + custom_facts_update_prompt = "Custom facts update: {task} {old_facts}" + custom_plan_update_prompt = "Custom plan update: {team}" + custom_progress_prompt = "Custom progress: {task} {team} {names}" + custom_final_prompt = "Custom final: {task}" + + # Create a custom task ledger + from agent_framework._workflows._magentic import _MagenticTaskLedger + + custom_task_ledger = _MagenticTaskLedger( + facts=ChatMessage(role=Role.ASSISTANT, text="Custom facts"), + plan=ChatMessage(role=Role.ASSISTANT, text="Custom plan"), + ) + + participant = StubAgent("agentA", "reply from agentA") + workflow = ( + MagenticBuilder() + .participants([participant]) + .with_manager( + agent_factory=agent_factory, + task_ledger=custom_task_ledger, + max_stall_count=custom_max_stall_count, + max_reset_count=custom_max_reset_count, + max_round_count=custom_max_round_count, + task_ledger_facts_prompt=custom_facts_prompt, + task_ledger_plan_prompt=custom_plan_prompt, + task_ledger_full_prompt=custom_full_prompt, + task_ledger_facts_update_prompt=custom_facts_update_prompt, + task_ledger_plan_update_prompt=custom_plan_update_prompt, + progress_ledger_prompt=custom_progress_prompt, + final_answer_prompt=custom_final_prompt, + ) + .build() + ) + + # Factory should be called during build + assert factory_call_count == 1 + + # Get the orchestrator and verify the manager has the custom options + orchestrator = next(e for e in workflow.executors.values() if isinstance(e, MagenticOrchestrator)) + manager = orchestrator._manager # type: ignore[reportPrivateUsage] + + # Verify the manager is a StandardMagenticManager with the expected options + from agent_framework import StandardMagenticManager + + assert isinstance(manager, StandardMagenticManager) + assert manager.task_ledger is custom_task_ledger + assert manager.max_stall_count == custom_max_stall_count + assert manager.max_reset_count == custom_max_reset_count + assert manager.max_round_count == custom_max_round_count + assert manager.task_ledger_facts_prompt == custom_facts_prompt + assert manager.task_ledger_plan_prompt == custom_plan_prompt + assert manager.task_ledger_full_prompt == custom_full_prompt + assert manager.task_ledger_facts_update_prompt == custom_facts_update_prompt + assert manager.task_ledger_plan_update_prompt == custom_plan_update_prompt + assert manager.progress_ledger_prompt == custom_progress_prompt + assert manager.final_answer_prompt == custom_final_prompt + + +# endregion diff --git a/python/packages/core/tests/workflow/test_workflow_kwargs.py b/python/packages/core/tests/workflow/test_workflow_kwargs.py index 75c34f9d95..4d665c5516 100644 --- a/python/packages/core/tests/workflow/test_workflow_kwargs.py +++ b/python/packages/core/tests/workflow/test_workflow_kwargs.py @@ -1,6 +1,6 @@ # Copyright (c) Microsoft. All rights reserved. -from collections.abc import AsyncIterable +from collections.abc import AsyncIterable, Sequence from typing import Annotated, Any import pytest @@ -51,7 +51,7 @@ def __init__(self, name: str = "test_agent") -> None: async def run( self, - messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None, + messages: str | ChatMessage | Sequence[str | ChatMessage] | None = None, *, thread: AgentThread | None = None, **kwargs: Any, @@ -61,7 +61,7 @@ async def run( async def run_stream( self, - messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None, + messages: str | ChatMessage | Sequence[str | ChatMessage] | None = None, *, thread: AgentThread | None = None, **kwargs: Any, @@ -187,7 +187,7 @@ def simple_selector(state: GroupChatState) -> str: workflow = ( GroupChatBuilder() .participants([agent1, agent2]) - .with_select_speaker_func(simple_selector) + .with_orchestrator(selection_func=simple_selector) .with_max_rounds(2) # Limit rounds to prevent infinite loop .build() ) @@ -408,7 +408,7 @@ async def prepare_final_answer(self, magentic_context: MagenticContext) -> ChatM agent = _KwargsCapturingAgent(name="agent1") manager = _MockManager() - workflow = MagenticBuilder().participants([agent]).with_standard_manager(manager=manager).build() + workflow = MagenticBuilder().participants([agent]).with_manager(manager=manager).build() custom_data = {"session_id": "magentic123"} @@ -457,7 +457,7 @@ async def prepare_final_answer(self, magentic_context: MagenticContext) -> ChatM agent = _KwargsCapturingAgent(name="agent1") manager = _MockManager() - magentic_workflow = MagenticBuilder().participants([agent]).with_standard_manager(manager=manager).build() + magentic_workflow = MagenticBuilder().participants([agent]).with_manager(manager=manager).build() # Use MagenticWorkflow.run_stream() which goes through the kwargs attachment path custom_data = {"magentic_key": "magentic_value"} diff --git a/python/samples/autogen-migration/orchestrations/02_selector_group_chat.py b/python/samples/autogen-migration/orchestrations/02_selector_group_chat.py index c48c9882e4..f8c170cbef 100644 --- a/python/samples/autogen-migration/orchestrations/02_selector_group_chat.py +++ b/python/samples/autogen-migration/orchestrations/02_selector_group_chat.py @@ -86,12 +86,11 @@ async def run_agent_framework() -> None: workflow = ( GroupChatBuilder() .participants([python_expert, javascript_expert, database_expert]) - .set_manager( - manager=client.as_agent( + .with_orchestrator( + agent=client.as_agent( name="selector_manager", instructions="Based on the conversation, select the most appropriate expert to respond next.", ), - display_name="SelectorManager", ) .with_max_rounds(1) .build() diff --git a/python/samples/autogen-migration/orchestrations/04_magentic_one.py b/python/samples/autogen-migration/orchestrations/04_magentic_one.py index 48de809a95..11be3b7bdb 100644 --- a/python/samples/autogen-migration/orchestrations/04_magentic_one.py +++ b/python/samples/autogen-migration/orchestrations/04_magentic_one.py @@ -6,6 +6,16 @@ """ import asyncio +import json +from typing import cast + +from agent_framework import ( + AgentRunUpdateEvent, + ChatMessage, + MagenticOrchestratorEvent, + MagenticProgressLedger, + WorkflowOutputEvent, +) async def run_autogen() -> None: @@ -57,13 +67,7 @@ async def run_autogen() -> None: async def run_agent_framework() -> None: """Agent Framework's MagenticBuilder for orchestrated collaboration.""" - from agent_framework import ( - MagenticAgentDeltaEvent, - MagenticAgentMessageEvent, - MagenticBuilder, - MagenticFinalResultEvent, - MagenticOrchestratorMessageEvent, - ) + from agent_framework import MagenticBuilder from agent_framework.openai import OpenAIChatClient client = OpenAIChatClient(model_id="gpt-4.1-mini") @@ -90,9 +94,13 @@ async def run_agent_framework() -> None: # Create Magentic workflow workflow = ( MagenticBuilder() - .participants(researcher=researcher, coder=coder, reviewer=reviewer) - .with_standard_manager( - chat_client=client, + .participants([researcher, coder, reviewer]) + .with_manager( + agent=client.create_agent( + name="magentic_manager", + instructions="You coordinate a team to complete complex tasks efficiently.", + description="Orchestrator for team coordination", + ), max_round_count=20, max_stall_count=3, max_reset_count=1, @@ -101,41 +109,46 @@ async def run_agent_framework() -> None: ) # Run complex task + last_message_id: str | None = None + output_event: WorkflowOutputEvent | None = None print("[Agent Framework] Magentic conversation:") - last_stream_agent_id: str | None = None - stream_line_open: bool = False - async for event in workflow.run_stream("Research Python async patterns and write a simple example"): - if isinstance(event, MagenticOrchestratorMessageEvent): - if stream_line_open: - print() - stream_line_open = False - print(f"---------- Orchestrator:{event.kind} ----------") - print(getattr(event.message, "text", "")) - elif isinstance(event, MagenticAgentDeltaEvent): - if last_stream_agent_id != event.agent_id or not stream_line_open: - if stream_line_open: - print() - print(f"---------- {event.agent_id} ----------") - last_stream_agent_id = event.agent_id - stream_line_open = True - if event.text: - print(event.text, end="", flush=True) - elif isinstance(event, MagenticAgentMessageEvent): - if stream_line_open: - print() - stream_line_open = False - elif isinstance(event, MagenticFinalResultEvent): - if stream_line_open: - print() - stream_line_open = False - print("---------- Final Result ----------") - if event.message is not None: - print(event.message.text) - - if stream_line_open: - print() - print() # Final newline after conversation + if isinstance(event, AgentRunUpdateEvent): + message_id = event.data.message_id + if message_id != last_message_id: + if last_message_id is not None: + print("\n") + print(f"- {event.executor_id}:", end=" ", flush=True) + last_message_id = message_id + print(event.data, end="", flush=True) + + elif isinstance(event, MagenticOrchestratorEvent): + print(f"\n[Magentic Orchestrator Event] Type: {event.event_type.name}") + if isinstance(event.data, ChatMessage): + print(f"Please review the plan:\n{event.data.text}") + elif isinstance(event.data, MagenticProgressLedger): + print(f"Please review progress ledger:\n{json.dumps(event.data.to_dict(), indent=2)}") + else: + print(f"Unknown data type in MagenticOrchestratorEvent: {type(event.data)}") + + # Block to allow user to read the plan/progress before continuing + # Note: this is for demonstration only and is not the recommended way to handle human interaction. + # Please refer to `with_plan_review` for proper human interaction during planning phases. + await asyncio.get_event_loop().run_in_executor(None, input, "Press Enter to continue...") + + elif isinstance(event, WorkflowOutputEvent): + output_event = event + + if not output_event: + raise RuntimeError("Workflow did not produce a final output event.") + print("\n\nWorkflow completed!") + print("Final Output:") + # The output of the Magentic workflow is a list of ChatMessages with only one final message + # generated by the orchestrator. + output_messages = cast(list[ChatMessage], output_event.data) + if output_messages: + output = output_messages[-1].text + print(output) async def main() -> None: diff --git a/python/samples/getting_started/workflows/README.md b/python/samples/getting_started/workflows/README.md index 8ca5e0f4bc..2be4bca03e 100644 --- a/python/samples/getting_started/workflows/README.md +++ b/python/samples/getting_started/workflows/README.md @@ -115,7 +115,7 @@ For additional observability samples in Agent Framework, see the [observability | Concurrent Orchestration (Custom Aggregator) | [orchestration/concurrent_custom_aggregator.py](./orchestration/concurrent_custom_aggregator.py) | Override aggregator via callback; summarize results with an LLM | | Concurrent Orchestration (Custom Agent Executors) | [orchestration/concurrent_custom_agent_executors.py](./orchestration/concurrent_custom_agent_executors.py) | Child executors own ChatAgents; concurrent fan-out/fan-in via ConcurrentBuilder | | Concurrent Orchestration (Participant Factory) | [orchestration/concurrent_participant_factory.py](./orchestration/concurrent_participant_factory.py) | Use participant factories for state isolation between workflow instances | -| Group Chat with Agent Manager | [orchestration/group_chat_agent_manager.py](./orchestration/group_chat_agent_manager.py) | Agent-based manager using `with_agent_orchestrator()` to select next speaker | +| Group Chat with Agent Manager | [orchestration/group_chat_agent_manager.py](./orchestration/group_chat_agent_manager.py) | Agent-based manager using `with_orchestrator(agent=)` to select next speaker | | Group Chat Philosophical Debate | [orchestration/group_chat_philosophical_debate.py](./orchestration/group_chat_philosophical_debate.py) | Agent manager moderates long-form, multi-round debate across diverse participants | | Group Chat with Simple Function Selector | [orchestration/group_chat_simple_selector.py](./orchestration/group_chat_simple_selector.py) | Group chat with a simple function selector for next speaker | | Handoff (Simple) | [orchestration/handoff_simple.py](./orchestration/handoff_simple.py) | Single-tier routing: triage agent routes to specialists, control returns to user after each specialist response | diff --git a/python/samples/getting_started/workflows/agents/group_chat_workflow_as_agent.py b/python/samples/getting_started/workflows/agents/group_chat_workflow_as_agent.py index 83873fd96f..c6ec8ec3b0 100644 --- a/python/samples/getting_started/workflows/agents/group_chat_workflow_as_agent.py +++ b/python/samples/getting_started/workflows/agents/group_chat_workflow_as_agent.py @@ -34,8 +34,8 @@ async def main() -> None: workflow = ( GroupChatBuilder() - .with_agent_orchestrator( - OpenAIChatClient().as_agent( + .with_orchestrator( + agent=OpenAIChatClient().as_agent( name="Orchestrator", instructions="You coordinate a team conversation to solve the user's task.", ) diff --git a/python/samples/getting_started/workflows/agents/magentic_workflow_as_agent.py b/python/samples/getting_started/workflows/agents/magentic_workflow_as_agent.py index f4e5b38e86..3badeae78a 100644 --- a/python/samples/getting_started/workflows/agents/magentic_workflow_as_agent.py +++ b/python/samples/getting_started/workflows/agents/magentic_workflow_as_agent.py @@ -55,7 +55,7 @@ async def main() -> None: workflow = ( MagenticBuilder() .participants([researcher_agent, coder_agent]) - .with_standard_manager( + .with_manager( agent=manager_agent, max_round_count=10, max_stall_count=3, diff --git a/python/samples/getting_started/workflows/human-in-the-loop/group_chat_request_info.py b/python/samples/getting_started/workflows/human-in-the-loop/group_chat_request_info.py index c2cbb7dd00..b963e5a9a0 100644 --- a/python/samples/getting_started/workflows/human-in-the-loop/group_chat_request_info.py +++ b/python/samples/getting_started/workflows/human-in-the-loop/group_chat_request_info.py @@ -89,7 +89,7 @@ async def main() -> None: # Using agents= filter to only pause before pragmatist speaks (not every turn) workflow = ( GroupChatBuilder() - .with_agent_orchestrator(orchestrator) + .with_orchestrator(agent=orchestrator) .participants([optimist, pragmatist, creative]) .with_max_rounds(6) .with_request_info(agents=[pragmatist]) # Only pause before pragmatist speaks diff --git a/python/samples/getting_started/workflows/orchestration/group_chat_agent_manager.py b/python/samples/getting_started/workflows/orchestration/group_chat_agent_manager.py index 12475205d3..ea89f4d230 100644 --- a/python/samples/getting_started/workflows/orchestration/group_chat_agent_manager.py +++ b/python/samples/getting_started/workflows/orchestration/group_chat_agent_manager.py @@ -68,7 +68,7 @@ async def main() -> None: # Build the group chat workflow workflow = ( GroupChatBuilder() - .with_agent_orchestrator(orchestrator_agent) + .with_orchestrator(agent=orchestrator_agent) .participants([researcher, writer]) # Set a hard termination condition: stop after 4 assistant messages # The agent orchestrator will intelligently decide when to end before this limit but just in case diff --git a/python/samples/getting_started/workflows/orchestration/group_chat_philosophical_debate.py b/python/samples/getting_started/workflows/orchestration/group_chat_philosophical_debate.py index a26b9df4d0..c8506bc5e6 100644 --- a/python/samples/getting_started/workflows/orchestration/group_chat_philosophical_debate.py +++ b/python/samples/getting_started/workflows/orchestration/group_chat_philosophical_debate.py @@ -211,7 +211,7 @@ async def main() -> None: workflow = ( GroupChatBuilder() - .with_agent_orchestrator(moderator) + .with_orchestrator(agent=moderator) .participants([farmer, developer, teacher, activist, spiritual_leader, artist, immigrant, doctor]) .with_termination_condition(lambda messages: sum(1 for msg in messages if msg.role == Role.ASSISTANT) >= 10) .build() diff --git a/python/samples/getting_started/workflows/orchestration/group_chat_simple_selector.py b/python/samples/getting_started/workflows/orchestration/group_chat_simple_selector.py index 517ae313f3..1047cd6f22 100644 --- a/python/samples/getting_started/workflows/orchestration/group_chat_simple_selector.py +++ b/python/samples/getting_started/workflows/orchestration/group_chat_simple_selector.py @@ -17,7 +17,7 @@ Sample: Group Chat with a round-robin speaker selector What it does: -- Demonstrates the with_select_speaker_func() API for GroupChat orchestration +- Demonstrates the with_orchestrator() API for GroupChat orchestration - Uses a pure Python function to control speaker selection based on conversation state Prerequisites: @@ -84,7 +84,7 @@ async def main() -> None: workflow = ( GroupChatBuilder() .participants([expert, verifier, clarifier, skeptic]) - .with_select_speaker_func(round_robin_selector) + .with_orchestrator(selection_func=round_robin_selector) # Set a hard termination condition: stop after 6 messages (user task + one full rounds + 1) # One round is expert -> verifier -> clarifier -> skeptic, after which the expert gets to respond again. # This will end the conversation after the expert has spoken 2 times (one iteration loop) diff --git a/python/samples/getting_started/workflows/orchestration/magentic.py b/python/samples/getting_started/workflows/orchestration/magentic.py index 8e71d09a42..60746bc113 100644 --- a/python/samples/getting_started/workflows/orchestration/magentic.py +++ b/python/samples/getting_started/workflows/orchestration/magentic.py @@ -80,7 +80,7 @@ async def main() -> None: workflow = ( MagenticBuilder() .participants([researcher_agent, coder_agent]) - .with_standard_manager( + .with_manager( agent=manager_agent, max_round_count=10, max_stall_count=3, diff --git a/python/samples/getting_started/workflows/orchestration/magentic_checkpoint.py b/python/samples/getting_started/workflows/orchestration/magentic_checkpoint.py index 6fc284a9ab..2dd6a1a170 100644 --- a/python/samples/getting_started/workflows/orchestration/magentic_checkpoint.py +++ b/python/samples/getting_started/workflows/orchestration/magentic_checkpoint.py @@ -83,7 +83,7 @@ def build_workflow(checkpoint_storage: FileCheckpointStorage): MagenticBuilder() .participants([researcher, writer]) .with_plan_review() - .with_standard_manager( + .with_manager( agent=manager_agent, max_round_count=10, max_stall_count=3, diff --git a/python/samples/getting_started/workflows/orchestration/magentic_human_plan_review.py b/python/samples/getting_started/workflows/orchestration/magentic_human_plan_review.py index 37a53020e7..1050463d01 100644 --- a/python/samples/getting_started/workflows/orchestration/magentic_human_plan_review.py +++ b/python/samples/getting_started/workflows/orchestration/magentic_human_plan_review.py @@ -63,7 +63,7 @@ async def main() -> None: workflow = ( MagenticBuilder() .participants([researcher_agent, analyst_agent]) - .with_standard_manager( + .with_manager( agent=manager_agent, max_round_count=10, max_stall_count=1, diff --git a/python/samples/getting_started/workflows/tool-approval/group_chat_builder_tool_approval.py b/python/samples/getting_started/workflows/tool-approval/group_chat_builder_tool_approval.py index ae893e05ae..dc5c9e49a9 100644 --- a/python/samples/getting_started/workflows/tool-approval/group_chat_builder_tool_approval.py +++ b/python/samples/getting_started/workflows/tool-approval/group_chat_builder_tool_approval.py @@ -118,8 +118,7 @@ async def main() -> None: # 4. Build a group chat workflow with the selector function workflow = ( GroupChatBuilder() - # Optionally, use `.set_manager(...)` to customize the group chat manager - .with_select_speaker_func(select_next_speaker) + .with_orchestrator(selection_func=select_next_speaker) .participants([qa_engineer, devops_engineer]) # Set a hard limit to 4 rounds # First round: QAEngineer speaks diff --git a/python/samples/semantic-kernel-migration/orchestrations/group_chat.py b/python/samples/semantic-kernel-migration/orchestrations/group_chat.py index b43840eb49..4ce31f3a04 100644 --- a/python/samples/semantic-kernel-migration/orchestrations/group_chat.py +++ b/python/samples/semantic-kernel-migration/orchestrations/group_chat.py @@ -233,11 +233,8 @@ async def run_agent_framework_example(task: str) -> str: workflow = ( GroupChatBuilder() - .set_manager( - manager=AzureOpenAIChatClient(credential=credential).as_agent(), - display_name="Coordinator", - ) - .participants(researcher=researcher, planner=planner) + .with_orchestrator(agent=AzureOpenAIChatClient(credential=credential).as_agent()) + .participants([researcher, planner]) .build() ) diff --git a/python/samples/semantic-kernel-migration/orchestrations/magentic.py b/python/samples/semantic-kernel-migration/orchestrations/magentic.py index 87094a2047..3d9aa67ea8 100644 --- a/python/samples/semantic-kernel-migration/orchestrations/magentic.py +++ b/python/samples/semantic-kernel-migration/orchestrations/magentic.py @@ -144,12 +144,7 @@ async def run_agent_framework_example(prompt: str) -> str | None: chat_client=OpenAIChatClient(), ) - workflow = ( - MagenticBuilder() - .participants(researcher=researcher, coder=coder) - .with_standard_manager(agent=manager_agent) - .build() - ) + workflow = MagenticBuilder().participants([researcher, coder]).with_manager(agent=manager_agent).build() final_text: str | None = None async for event in workflow.run_stream(prompt):