agent_framework Package

Packages

a2a
ag_ui
anthropic
azure
chatkit
declarative
devui
lab
mem0
microsoft
ollama
openai
redis

Modules

exceptions
observability

Classes

AIFunction

A tool that wraps a Python function to make it callable by AI models.

This class wraps a Python function with automatic parameter validation and JSON schema generation derived from the function's signature.

Initialize the AIFunction.
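A minimal sketch of creating one via the ai_function decorator (referenced under ToolProtocol below), assuming the JSON schema is inferred from the function's type hints and docstring:


    from agent_framework import ai_function

    @ai_function
    def get_weather(city: str) -> str:
        """Return a short weather description for the given city."""
        # Hypothetical tool body; a real tool would call a weather service
        return f"It is sunny in {city}."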

AgentExecutor

Built-in executor that wraps an agent for handling messages.

AgentExecutor adapts its behavior based on the workflow execution mode:

  • run_stream(): Emits incremental AgentRunUpdateEvent events as the agent produces tokens
  • run(): Emits a single AgentRunEvent containing the complete response

The executor automatically detects the mode via WorkflowContext.is_streaming().

Initialize the executor with a unique identifier.

AgentExecutorRequest

A request to an agent executor.

AgentExecutorResponse

A response from an agent executor.

AgentInputRequest

Request for human input before an agent runs in high-level builder workflows.

Emitted via RequestInfoEvent when a workflow pauses before an agent executes. The response is injected into the conversation as a user message to steer the agent's behavior.

This is the standard request type used by .with_request_info() on SequentialBuilder, ConcurrentBuilder, GroupChatBuilder, and HandoffBuilder.

AgentMiddleware

Abstract base class for agent middleware that can intercept agent invocations.

Agent middleware allows you to intercept and modify agent invocations before and after execution. You can inspect messages, modify context, override results, or terminate execution early.

Note

AgentMiddleware is an abstract base class. You must subclass it and implement the process() method to create custom agent middleware.
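A minimal sketch of custom agent middleware, assuming process() receives the run context plus a next callable and that the context exposes the incoming messages:


    from agent_framework import AgentMiddleware, AgentRunContext

    class LoggingAgentMiddleware(AgentMiddleware):
        async def process(self, context: AgentRunContext, next) -> None:
            # Inspect the invocation before the agent runs
            # (context.messages is an assumption about the context's attributes)
            print(f"Invoking agent with {len(context.messages)} messages")
            await next(context)
            # The result can be inspected or overridden here after execution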

AgentProtocol

A protocol for an agent that can be invoked.

This protocol defines the interface that all agents must implement, including properties for identification and methods for execution.

Note

Protocols use structural subtyping (duck typing). Classes don't need to explicitly inherit from this protocol to be considered compatible. This allows you to create completely custom agents without using any Agent Framework base classes.

AgentRunContext

Context object for agent middleware invocations.

This context is passed through the agent middleware pipeline and contains all information about the agent invocation.

Initialize the AgentRunContext.

AgentRunEvent

Event triggered when an agent run is completed.

Initialize the agent run event.

AgentRunResponse

Represents the response to an Agent run request.

Provides one or more response messages and metadata about the response. A typical response will contain a single message, but may contain multiple messages in scenarios involving function calls, RAG retrievals, or complex logic.

Initialize an AgentRunResponse.

AgentRunResponseUpdate

Represents a single streaming response chunk from an Agent.

Initialize an AgentRunResponseUpdate.

AgentRunUpdateEvent

Event triggered when an agent is streaming messages.

Initialize the agent streaming event.

AgentThread

The agent thread class; it can represent either a locally managed thread or a thread managed by the service.

An AgentThread maintains the conversation state and message history for an agent interaction. It can either use a service-managed thread (via service_thread_id) or a local message store (via message_store), but not both.

Initialize an AgentThread. Do not call this method directly; always use agent.get_new_thread().

Note

Either service_thread_id or message_store may be set, but not both.
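A short sketch of typical thread usage; passing thread= to run() is an assumption consistent with this API surface:


    thread = agent.get_new_thread()
    first = await agent.run("What is the capital of France?", thread=thread)
    # Reusing the same thread carries the conversation history forward
    followup = await agent.run("And what is its population?", thread=thread)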

AggregateContextProvider

A ContextProvider that contains multiple context providers.

It delegates events to multiple context providers and aggregates responses from those events before returning. This allows you to combine multiple context providers into a single provider.

Note

An AggregateContextProvider is created automatically when you pass a single context

provider or a sequence of context providers to the agent constructor.

Initialize the AggregateContextProvider with context providers.

BaseAgent

Base class for all Agent Framework agents.

This class provides core functionality for agent implementations, including context providers, middleware support, and thread management.

Note

BaseAgent cannot be instantiated directly as it doesn't implement the run(), run_stream(), and other methods required by AgentProtocol. Use a concrete implementation like ChatAgent or create a subclass.

Initialize a BaseAgent instance.

BaseAnnotation

Base class for all AI Annotation types.

Initialize BaseAnnotation.

BaseChatClient

Base class for chat clients.

This abstract base class provides core functionality for chat client implementations, including middleware support, message preparation, and tool normalization.

Note

BaseChatClient cannot be instantiated directly as it's an abstract base class. Subclasses must implement _inner_get_response() and _inner_get_streaming_response().

Initialize a BaseChatClient instance.

BaseContent

Represents content used by AI services.

Initialize BaseContent.

Case

Runtime wrapper combining a switch-case predicate with its target.

Each Case couples a boolean predicate with the executor that should handle the message when the predicate evaluates to True. The runtime keeps this lightweight container separate from the serialisable SwitchCaseEdgeGroupCase so that execution can operate with live callables without polluting persisted state.

ChatAgent

A Chat Client Agent.

This is the primary agent implementation that uses a chat client to interact with language models. It supports tools, context providers, middleware, and both streaming and non-streaming responses.

Initialize a ChatAgent instance.

Note

The set of parameters from frequency_penalty to request_kwargs is used to call the chat client. They can also be passed to both run methods; when both are set, the ones passed to the run methods take precedence.
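A minimal sketch of constructing and running a ChatAgent; the constructor keywords and the response's text property are assumptions consistent with the examples elsewhere on this page:


    from agent_framework import ChatAgent
    from agent_framework.openai import OpenAIChatClient

    agent = ChatAgent(
        chat_client=OpenAIChatClient(),
        instructions="You are a helpful assistant.",
        name="helper",
    )
    response = await agent.run("Summarize the Pregel model in one sentence.")
    print(response.text)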

ChatClientProtocol

A protocol for a chat client that can generate responses.

This protocol defines the interface that all chat clients must implement, including methods for generating both streaming and non-streaming responses.

Note

Protocols use structural subtyping (duck typing). Classes don't need to explicitly inherit from this protocol to be considered compatible.

ChatContext

Context object for chat middleware invocations.

This context is passed through the chat middleware pipeline and contains all information about the chat request.

Initialize the ChatContext.

ChatMessage

Represents a chat message.

Initialize ChatMessage.
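A minimal construction sketch, assuming role and text keyword arguments:


    from agent_framework import ChatMessage, Role

    message = ChatMessage(role=Role.USER, text="Hello!")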

ChatMessageStore

An in-memory implementation of ChatMessageStoreProtocol that stores messages in a list.

This implementation provides simple, list-based storage for chat messages with support for serialization and deserialization. It implements all the methods required by ChatMessageStoreProtocol.

The store maintains messages in memory and provides methods to serialize and deserialize the state for persistence purposes.

Create a ChatMessageStore for use in a thread.

ChatMessageStoreProtocol

Defines methods for storing and retrieving chat messages associated with a specific thread.

Implementations of this protocol are responsible for managing the storage of chat messages, including handling large volumes of data by truncating or summarizing messages as necessary.

ChatMiddleware

Abstract base class for chat middleware that can intercept chat client requests.

Chat middleware allows you to intercept and modify chat client requests before and after execution. You can modify messages, add system prompts, log requests, or override chat responses.

Note

ChatMiddleware is an abstract base class. You must subclass it and implement the process() method to create custom chat middleware.

ChatOptions

Common request settings for AI services.

Initialize ChatOptions.

ChatResponse

Represents the response to a chat request.

Initializes a ChatResponse with the provided parameters.

ChatResponseUpdate

Represents a single streaming response chunk from a ChatClient.

Initializes a ChatResponseUpdate with the provided parameters.

CheckpointStorage

Protocol for checkpoint storage backends.

CitationAnnotation

Represents a citation annotation.

Initialize CitationAnnotation.

ConcurrentBuilder

High-level builder for concurrent agent workflows.

  • participants([...]) accepts a list of AgentProtocol (recommended) or Executor instances.
  • register_participants([...]) accepts a list of factories for AgentProtocol (recommended) or Executor factories.
  • build() wires: dispatcher -> fan-out -> participants -> fan-in -> aggregator.
  • with_aggregator(...) overrides the default aggregator with an Executor or callback.
  • register_aggregator(...) accepts a factory for an Executor as a custom aggregator.

Usage:


   from agent_framework import ConcurrentBuilder

   # Minimal: use default aggregator (returns list[ChatMessage])
   workflow = ConcurrentBuilder().participants([agent1, agent2, agent3]).build()

   # With agent factories
   workflow = ConcurrentBuilder().register_participants([create_agent1, create_agent2, create_agent3]).build()


   # Custom aggregator via callback (sync or async). The callback receives
   # list[AgentExecutorResponse] and its return value becomes the workflow's output.
   def summarize(results: list[AgentExecutorResponse]) -> str:
       return " | ".join(r.agent_run_response.messages[-1].text for r in results)


   workflow = ConcurrentBuilder().participants([agent1, agent2, agent3]).with_aggregator(summarize).build()


   # Custom aggregator via a factory
   class MyAggregator(Executor):
       @handler
       async def aggregate(self, results: list[AgentExecutorResponse], ctx: WorkflowContext[Never, str]) -> None:
           await ctx.yield_output(" | ".join(r.agent_run_response.messages[-1].text for r in results))


   workflow = (
       ConcurrentBuilder()
       .register_participants([create_agent1, create_agent2, create_agent3])
       .register_aggregator(lambda: MyAggregator(id="my_aggregator"))
       .build()
   )


   # Enable checkpoint persistence so runs can resume
   workflow = ConcurrentBuilder().participants([agent1, agent2, agent3]).with_checkpointing(storage).build()

   # Enable request info before aggregation
   workflow = ConcurrentBuilder().participants([agent1, agent2]).with_request_info().build()

Context

A class containing any context that should be provided to the AI model as supplied by a ContextProvider.

Each ContextProvider can supply its own context for each invocation. The Context class holds the additional context supplied by a ContextProvider; it is combined with context from other providers before being passed to the AI model. This context is per invocation and is not stored as part of the chat history.

Create a new Context object.

ContextProvider

Base class for all context providers.

A context provider is a component that can be used to enhance the AI's context management. It can listen to changes in the conversation and provide additional context to the AI model just before invocation.

Note

ContextProvider is an abstract base class. You must subclass it and implement the invoking() method to create a custom context provider. Ideally, you should also implement the invoked() and thread_created() methods to track conversation state, but these are optional.
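A minimal sketch of a custom provider; the invoking() signature and the Context(instructions=...) keyword are assumptions:


    from datetime import datetime, timezone

    from agent_framework import Context, ContextProvider

    class ClockContextProvider(ContextProvider):
        async def invoking(self, messages, **kwargs) -> Context:
            # Supply per-invocation context; it is not stored in the chat history
            now = datetime.now(timezone.utc)
            return Context(instructions=f"The current UTC time is {now:%Y-%m-%d %H:%M}.")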

DataContent

Represents binary data content with an associated media type (also known as a MIME type).

Important

This is for binary data that is represented as a data URI, not for online resources. Use UriContent for online resources.

Initializes a DataContent instance.


Default

Runtime representation of the default branch in a switch-case group.

The default branch is invoked only when no other case predicates match. In practice it is guaranteed to exist so that routing never produces an empty target.

Edge

Model a directed, optionally-conditional hand-off between two executors.

Each Edge captures the minimal metadata required to move a message from one executor to another inside the workflow graph. It optionally embeds a boolean predicate that decides if the edge should be taken at runtime. By serialising the edge down to primitives we can reconstruct the topology of a workflow irrespective of the original Python process.

Initialize a fully-specified edge between two workflow executors.

EdgeDuplicationError

Exception raised when duplicate edges are detected in the workflow.

ErrorContent

Represents an error.

Remarks: Typically used for non-fatal errors, where something went wrong as part of the operation, but the operation was still able to continue.

Initializes an ErrorContent instance.

Executor

Base class for all workflow executors that process messages and perform computations.

Overview

Executors are the fundamental building blocks of workflows, representing individual processing units that receive messages, perform operations, and produce outputs. Each executor is uniquely identified and can handle specific message types through decorated handler methods.

Type System

Executors have a rich type system that defines their capabilities:

Input Types

The types of messages an executor can process, discovered from handler method signatures:


    class MyExecutor(Executor):
        @handler
        async def handle_string(self, message: str, ctx: WorkflowContext) -> None:
            # This executor can handle 'str' input types
            ...

Access via the input_types property.

Output Types

The types of messages an executor can send to other executors via ctx.send_message():


    class MyExecutor(Executor):
        @handler
        async def handle_data(self, message: str, ctx: WorkflowContext[int | bool]) -> None:
            # This executor can send 'int' or 'bool' messages
            ...

Access via the output_types property.

Workflow Output Types

The types of data an executor can emit as workflow-level outputs via ctx.yield_output():


    class MyExecutor(Executor):
        @handler
        async def process(self, message: str, ctx: WorkflowContext[int, str]) -> None:
            # Can send 'int' messages AND yield 'str' workflow outputs
            ...

Access via the workflow_output_types property.

Handler Discovery

Executors discover their capabilities through decorated methods:

@handler Decorator

Marks methods that process incoming messages:


   class MyExecutor(Executor):
       @handler
       async def handle_text(self, message: str, ctx: WorkflowContext[str]) -> None:
           await ctx.send_message(message.upper())

Sub-workflow Request Interception

Use @handler methods to intercept sub-workflow requests:


   class ParentExecutor(Executor):
       @handler
       async def handle_subworkflow_request(
           self,
           request: SubWorkflowRequestMessage,
           ctx: WorkflowContext[SubWorkflowResponseMessage],
       ) -> None:
           if self.is_allowed(request.domain):
               response = request.create_response(data=True)
               await ctx.send_message(response, target_id=request.executor_id)
           else:
               await ctx.request_info(request.source_event, response_type=request.source_event.response_type)

Context Types

Handler methods receive different WorkflowContext variants based on their type annotations:

WorkflowContext (no type parameters)

For handlers that only perform side effects without sending messages or yielding outputs:


   class LoggingExecutor(Executor):
       @handler
       async def log_message(self, msg: str, ctx: WorkflowContext) -> None:
           print(f"Received: {msg}")  # Only logging, no outputs

WorkflowContext[T_Out]

Enables sending messages of type T_Out via ctx.send_message():


   class ProcessorExecutor(Executor):
       @handler
       async def handler(self, msg: str, ctx: WorkflowContext[int]) -> None:
           await ctx.send_message(42)  # Can send int messages

WorkflowContext[T_Out, T_W_Out]

Enables both sending messages (T_Out) and yielding workflow outputs (T_W_Out):


   class DualOutputExecutor(Executor):
       @handler
       async def handler(self, msg: str, ctx: WorkflowContext[int, str]) -> None:
           await ctx.send_message(42)  # Send int message
           await ctx.yield_output("done")  # Yield str workflow output

Function Executors

Simple functions can be converted to executors using the @executor decorator:


   @executor
   async def process_text(text: str, ctx: WorkflowContext[str]) -> None:
       await ctx.send_message(text.upper())


   # Or with custom ID:
   @executor(id="text_processor")
   def sync_process(text: str, ctx: WorkflowContext[str]) -> None:
       ctx.send_message(text.lower())  # Sync functions run in thread pool

Sub-workflow Composition

Executors can contain sub-workflows using WorkflowExecutor. Sub-workflows can make requests that parent workflows can intercept. See WorkflowExecutor documentation for details on workflow composition patterns and request/response handling.

State Management

Executors can contain states that persist across workflow runs and checkpoints. Override the on_checkpoint_save and on_checkpoint_restore methods to implement custom state serialization and restoration logic.

Implementation Notes

  • Do not call execute() directly - it's invoked by the workflow engine
  • Do not override execute() - define handlers using decorators instead
  • Each executor must have at least one @handler method
  • Handler method signatures are validated at initialization time

Initialize the executor with a unique identifier.

ExecutorCompletedEvent

Event triggered when an executor handler is completed.

Initialize the executor event with an executor ID and optional data.

ExecutorEvent

Base class for executor events.

Initialize the executor event with an executor ID and optional data.

ExecutorFailedEvent

Event triggered when an executor handler raises an error.

ExecutorInvokedEvent

Event triggered when an executor handler is invoked.

Initialize the executor event with an executor ID and optional data.

FanInEdgeGroup

Represent a converging set of edges that feed a single downstream executor.

Fan-in groups are typically used when multiple upstream stages independently produce messages that should all arrive at the same downstream processor.

Build a fan-in mapping that merges several sources into one target.

FanOutEdgeGroup

Represent a broadcast-style edge group with optional selection logic.

A fan-out forwards a message produced by a single source executor to one or more downstream executors. At runtime we may further narrow the targets by executing a selection_func that inspects the payload and returns the subset of ids that should receive the message.

Create a fan-out mapping from a single source to many targets.

FileCheckpointStorage

File-based checkpoint storage for persistence.

Initialize the file storage.
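A small sketch wiring file-based checkpoints into a builder; the positional directory argument is an assumption:


    from agent_framework import FileCheckpointStorage, WorkflowBuilder

    storage = FileCheckpointStorage("./checkpoints")  # directory argument assumed
    builder = WorkflowBuilder().with_checkpointing(storage)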

FinishReason

Represents the reason a chat response completed.

Initialize FinishReason with a value.

FunctionApprovalRequestContent

Represents a request for user approval of a function call.

Initializes a FunctionApprovalRequestContent instance.

FunctionApprovalResponseContent

Represents a response for user approval of a function call.

Initializes a FunctionApprovalResponseContent instance.

FunctionCallContent

Represents a function call request.

Initializes a FunctionCallContent instance.

FunctionExecutor

Executor that wraps a user-defined function.

This executor allows users to define simple functions (both sync and async) and use them as workflow executors without needing to create full executor classes.

Synchronous functions are executed in a thread pool using asyncio.to_thread() to avoid blocking the event loop.

Initialize the FunctionExecutor with a user-defined function.

FunctionInvocationConfiguration

Configuration for function invocation in chat clients.

This class is created automatically on every chat client that supports function invocation. This means that in most cases you can just alter the attributes on the instance, rather than creating a new one.

Initialize FunctionInvocationConfiguration.

FunctionInvocationContext

Context object for function middleware invocations.

This context is passed through the function middleware pipeline and contains all information about the function invocation.

Initialize the FunctionInvocationContext.

FunctionMiddleware

Abstract base class for function middleware that can intercept function invocations.

Function middleware allows you to intercept and modify function/tool invocations before and after execution. You can validate arguments, cache results, log invocations, or override function execution.

Note

FunctionMiddleware is an abstract base class. You must subclass it and implement the process() method to create custom function middleware.
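A minimal timing sketch, assuming the same process(context, next) shape as the other middleware types:


    import time

    from agent_framework import FunctionInvocationContext, FunctionMiddleware

    class TimingFunctionMiddleware(FunctionMiddleware):
        async def process(self, context: FunctionInvocationContext, next) -> None:
            start = time.perf_counter()
            await next(context)
            # Attribute access on the context is illustrative only
            print(f"Tool call completed in {time.perf_counter() - start:.3f}s")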

FunctionResultContent

Represents the result of a function call.

Initializes a FunctionResultContent instance.

GraphConnectivityError

Exception raised when graph connectivity issues are detected.

GroupChatBuilder

High-level builder for manager-directed group chat workflows with dynamic orchestration.

GroupChat coordinates multi-agent conversations using a manager that selects which participant speaks next. The manager can be a simple Python function (set_select_speakers_func) or an agent-based selector via set_manager. These two approaches are mutually exclusive.

Core Workflow:

  1. Define participants: a list of agents (uses their .name) or a dict mapping names to agents
  2. Configure speaker selection: set_select_speakers_func OR set_manager (not both)
  3. Optional: set round limits, checkpointing, termination conditions
  4. Build and run the workflow

Speaker Selection Patterns:

Pattern 1: Simple function-based selection (recommended)


   from agent_framework import GroupChatBuilder, GroupChatStateSnapshot


   def select_next_speaker(state: GroupChatStateSnapshot) -> str | None:
       # state contains: task, participants, conversation, history, round_index
       if state["round_index"] >= 5:
           return None  # Finish
       last_speaker = state["history"][-1].speaker if state["history"] else None
       if last_speaker == "researcher":
           return "writer"
       return "researcher"


   workflow = (
       GroupChatBuilder()
       .set_select_speakers_func(select_next_speaker)
       .participants([researcher_agent, writer_agent])  # Uses agent.name
       .build()
   )

Pattern 2: LLM-based selection


    from agent_framework.azure import AzureOpenAIChatClient

   manager_agent = AzureOpenAIChatClient().create_agent(
       instructions="Coordinate the conversation and pick the next speaker.",
       name="Coordinator",
       temperature=0.3,
       seed=42,
       max_tokens=500,
   )

   workflow = (
       GroupChatBuilder()
       .set_manager(manager_agent, display_name="Coordinator")
       .participants([researcher, writer])  # Or use dict: researcher=r, writer=w
       .with_max_rounds(10)
       .build()
   )

Pattern 3: Request info for mid-conversation feedback


   from agent_framework import GroupChatBuilder

   # Pause before all participants
   workflow = (
       GroupChatBuilder()
       .set_select_speakers_func(select_next_speaker)
       .participants([researcher, writer])
       .with_request_info()
       .build()
   )

   # Pause only before specific participants
   workflow = (
       GroupChatBuilder()
       .set_select_speakers_func(select_next_speaker)
       .participants([researcher, writer, editor])
       .with_request_info(agents=[editor])  # Only pause before editor responds
       .build()
   )

Participant Specification:

Two ways to specify participants:

  • List form: [agent1, agent2] - uses agent.name attribute for participant names
  • Dict form: {name1: agent1, name2: agent2} - explicit name control
  • Keyword form: participants(name1=agent1, name2=agent2) - explicit name control

State Snapshot Structure:

The GroupChatStateSnapshot passed to set_select_speakers_func contains:

  • task: ChatMessage - Original user task
  • participants: dict[str, str] - Mapping of participant names to descriptions
  • conversation: tuple[ChatMessage, ...] - Full conversation history
  • history: tuple[GroupChatTurn, ...] - Turn-by-turn record with speaker attribution
  • round_index: int - Number of manager selection rounds so far
  • pending_agent: str | None - Name of agent currently processing (if any)

Important Constraints:

Initialize the GroupChatBuilder.

GroupChatDirective

Instruction emitted by a group chat manager implementation.

HandoffBuilder

Fluent builder for conversational handoff workflows with coordinator and specialist agents.

The handoff pattern enables a coordinator agent to route requests to specialist agents. Interaction mode controls whether the workflow requests user input after each agent response or completes autonomously once agents finish responding. A termination condition determines when the workflow should stop requesting input and complete.

Routing Patterns:

Single-Tier (Default): Only the coordinator can hand off to specialists. By default, after any specialist responds, control returns to the user for more input. This creates a cyclical flow: user -> coordinator -> [optional specialist] -> user -> coordinator -> ... Use with_interaction_mode("autonomous") to skip requesting additional user input and yield the final conversation when an agent responds without delegating.

Multi-Tier (Advanced): Specialists can hand off to other specialists using .add_handoff(). This provides more flexibility for complex workflows but is less controllable than the single-tier pattern. Users lose real-time visibility into intermediate steps during specialist-to-specialist handoffs (though the full conversation history including all handoffs is preserved and can be inspected afterward).

Key Features:

  • Automatic handoff detection: The coordinator invokes a handoff tool whose arguments (for example {"handoff_to": "shipping_agent"}) identify the specialist to receive control.

  • Auto-generated tools: By default the builder synthesizes handoff_to_<agent> tools for the coordinator, so you don't manually define placeholder functions.

  • Full conversation history: The entire conversation (including any ChatMessage.additional_properties) is preserved and passed to each agent.

  • Termination control: By default, terminates after 10 user messages. Override with .with_termination_condition(lambda conv: ...) for custom logic (e.g., detect "goodbye").

  • Interaction modes: Choose human_in_loop (default) to prompt users between agent turns, or autonomous to continue routing back to agents without prompting for user input until a handoff occurs or a termination/turn limit is reached (default autonomous turn limit: 50).

  • Checkpointing: Optional persistence for resumable workflows.

Usage (Single-Tier):


   from agent_framework import HandoffBuilder
   from agent_framework.openai import OpenAIChatClient

   chat_client = OpenAIChatClient()

   # Create coordinator and specialist agents
   coordinator = chat_client.create_agent(
       instructions=(
           "You are a frontline support agent. Assess the user's issue and decide "
           "whether to hand off to 'refund_agent' or 'shipping_agent'. When delegation is "
           "required, call the matching handoff tool (for example `handoff_to_refund_agent`)."
       ),
       name="coordinator_agent",
   )

   refund = chat_client.create_agent(
       instructions="You handle refund requests. Ask for order details and process refunds.",
       name="refund_agent",
   )

   shipping = chat_client.create_agent(
       instructions="You resolve shipping issues. Track packages and update delivery status.",
       name="shipping_agent",
   )

   # Build the handoff workflow - default single-tier routing
   workflow = (
       HandoffBuilder(
           name="customer_support",
           participants=[coordinator, refund, shipping],
       )
       .set_coordinator(coordinator)
       .build()
   )

   # Run the workflow
    events = workflow.run_stream("My package hasn't arrived yet")
   async for event in events:
       if isinstance(event, RequestInfoEvent):
           # Request user input
           user_response = input("You: ")
           await workflow.send_response(event.data.request_id, user_response)

Multi-Tier Routing with .add_handoff():


   # Enable specialist-to-specialist handoffs with fluent API
   workflow = (
       HandoffBuilder(participants=[coordinator, replacement, delivery, billing])
       .set_coordinator(coordinator)
       .add_handoff(coordinator, [replacement, delivery, billing])  # Coordinator routes to all
       .add_handoff(replacement, [delivery, billing])  # Replacement delegates to delivery/billing
       .add_handoff(delivery, billing)  # Delivery escalates to billing
       .build()
   )

   # Flow: User → Coordinator → Replacement → Delivery → Back to User
   # (Replacement hands off to Delivery without returning to user)

Use Participant Factories for State Isolation:

Custom Termination Condition:


   # Terminate when user says goodbye or after 5 exchanges
   workflow = (
       HandoffBuilder(participants=[coordinator, refund, shipping])
       .set_coordinator(coordinator)
       .with_termination_condition(
           lambda conv: (
               sum(1 for msg in conv if msg.role.value == "user") >= 5
               or any("goodbye" in msg.text.lower() for msg in conv[-2:])
           )
       )
       .build()
   )

Checkpointing:


   from agent_framework import InMemoryCheckpointStorage

   storage = InMemoryCheckpointStorage()
   workflow = (
       HandoffBuilder(participants=[coordinator, refund, shipping])
       .set_coordinator(coordinator)
       .with_checkpointing(storage)
       .build()
   )

Initialize a HandoffBuilder for creating conversational handoff workflows.

The builder starts in an unconfigured state and requires you to call:

  1. .participants([...]) or .participant_factories({...}) - Register agents or agent/executor factories
  2. .set_coordinator(...) - Designate which agent receives initial user input
  3. .build() - Construct the final Workflow

Optional configuration methods allow you to customize context management, termination logic, and persistence.

Note

Participants must have stable names/ids because the workflow maps the handoff tool arguments to these identifiers. Agent names should match the strings emitted by the coordinator's handoff tool (e.g., a tool that outputs {"handoff_to": "billing"} requires an agent named billing).

HandoffUserInputRequest

Request message emitted when the workflow needs fresh user input.

Note: The conversation field is intentionally excluded from checkpoint serialization to prevent duplication. The conversation is preserved in the coordinator's state and will be reconstructed on restore. See issue #2667.

HostedCodeInterpreterTool

Represents a hosted tool that can be specified to an AI service to enable it to execute generated code.

This tool does not implement code interpretation itself. It serves as a marker to inform a service that it is allowed to execute generated code if the service is capable of doing so.

Initialize the HostedCodeInterpreterTool.
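A short sketch of attaching the marker tool to an agent, reusing a chat_client like those created in the builder examples; the tools= keyword is an assumption:


    from agent_framework import HostedCodeInterpreterTool

    agent = chat_client.create_agent(
        instructions="Use code execution for numeric questions.",
        name="math_agent",
        tools=[HostedCodeInterpreterTool()],  # tools= keyword assumed
    )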

HostedFileContent

Represents hosted file content.

Initializes a HostedFileContent instance.

HostedFileSearchTool

Represents a file search tool that can be specified to an AI service to enable it to perform file searches.

Initialize a FileSearchTool.

HostedMCPSpecificApproval

Represents the specific mode for a hosted tool.

When using this mode, the user must specify which tools always or never require approval. This is represented as a dictionary with two optional keys:

HostedMCPTool

Represents an MCP tool that is managed and executed by the service.

Create a hosted MCP tool.

HostedVectorStoreContent

Represents hosted vector store content.

Initializes a HostedVectorStoreContent instance.

HostedWebSearchTool

Represents a web search tool that can be specified to an AI service to enable it to perform web searches.

Initialize a HostedWebSearchTool.

InMemoryCheckpointStorage

In-memory checkpoint storage for testing and development.

Initialize the memory storage.

InProcRunnerContext

In-process execution context for local execution and optional checkpointing.

Initialize the in-process execution context.

MCPStdioTool

MCP tool for connecting to stdio-based MCP servers.

This class connects to MCP servers that communicate via standard input/output, typically used for local processes.

Initialize the MCP stdio tool.

Note

The arguments are used to create a StdioServerParameters object, which is then used to create a stdio client. See mcp.client.stdio.stdio_client and mcp.client.stdio.stdio_server_parameters for more details.
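A minimal sketch, assuming the constructor forwards command and args to StdioServerParameters:


    from agent_framework import MCPStdioTool

    mcp_tool = MCPStdioTool(
        name="filesystem",
        command="npx",
        args=["-y", "@modelcontextprotocol/server-filesystem", "/tmp"],
    )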

MCPStreamableHTTPTool

MCP tool for connecting to HTTP-based MCP servers.

This class connects to MCP servers that communicate via streamable HTTP/SSE.

Initialize the MCP streamable HTTP tool.

Note

The arguments are used to create a streamable HTTP client. See mcp.client.streamable_http.streamablehttp_client for more details. Any extra arguments passed to the constructor will be passed to the streamable HTTP client constructor.

MCPWebsocketTool

MCP tool for connecting to WebSocket-based MCP servers.

This class connects to MCP servers that communicate via WebSocket.

Initialize the MCP WebSocket tool.

Note

The arguments are used to create a WebSocket client. See mcp.client.websocket.websocket_client for more details. Any extra arguments passed to the constructor will be passed to the WebSocket client constructor.

MagenticBuilder

Fluent builder for creating Magentic One multi-agent orchestration workflows.

Magentic One workflows use an LLM-powered manager to coordinate multiple agents through dynamic task planning, progress tracking, and adaptive replanning. The manager creates plans, selects agents, monitors progress, and determines when to replan or complete.

The builder provides a fluent API for configuring participants, the manager, optional plan review, checkpointing, and event callbacks.

Human-in-the-loop Support: Magentic provides specialized HITL mechanisms via:

  • .with_plan_review() - Review and approve/revise plans before execution

  • .with_human_input_on_stall() - Intervene when workflow stalls

  • Tool approval via FunctionApprovalRequestContent - Approve individual tool calls

These emit MagenticHumanInterventionRequest events that provide structured decision options (APPROVE, REVISE, CONTINUE, REPLAN, GUIDANCE) appropriate for Magentic's planning-based orchestration.

Usage:


    from agent_framework import MagenticBuilder
   from azure.ai.projects.aio import AIProjectClient

   # Create manager with LLM client
   project_client = AIProjectClient.from_connection_string(...)
   chat_client = project_client.inference.get_chat_completions_client()

   # Build Magentic workflow with agents
   workflow = (
       MagenticBuilder()
       .participants(researcher=research_agent, writer=writing_agent, coder=coding_agent)
       .with_standard_manager(chat_client=chat_client, max_round_count=20, max_stall_count=3)
       .with_plan_review(enable=True)
       .with_checkpointing(checkpoint_storage)
       .build()
   )

    # Execute the workflow, streaming events as they occur
    async for event in workflow.run_stream("Research and write article about AI agents"):
        print(event)

With custom manager:


   # Create custom manager subclass
   class MyCustomManager(MagenticManagerBase):
       async def plan(self, context: MagenticContext) -> ChatMessage:
           # Custom planning logic
           ...


   manager = MyCustomManager()
   workflow = MagenticBuilder().participants(agent1=agent1, agent2=agent2).with_standard_manager(manager).build()

MagenticContext

Context for the Magentic manager.

MagenticManagerBase

Base class for the Magentic One manager.

ManagerDirectiveModel

Pydantic model for structured manager directive output.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

ManagerSelectionRequest

Request sent to manager agent for next speaker selection.

This dataclass packages the full conversation state and task context for the manager agent to analyze and make a speaker selection decision.

ManagerSelectionResponse

Response from manager agent with speaker selection decision.

The manager agent must produce this structure (or compatible dict/JSON) to communicate its decision back to the orchestrator.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Message

A class representing a message in the workflow.

OrchestrationState

Unified state container for orchestrator checkpointing.

This dataclass standardizes checkpoint serialization across all three group chat patterns while allowing pattern-specific extensions via metadata.

Common attributes cover shared orchestration concerns (task, conversation, round tracking). Pattern-specific state goes in the metadata dict.

RequestInfoEvent

Event triggered when a workflow executor requests external information.

Initialize the request info event.

RequestInfoInterceptor

Internal executor that pauses workflow for human input before agent runs.

This executor is inserted into the workflow graph by builders when .with_request_info() is called. It intercepts AgentExecutorRequest messages BEFORE the agent runs and pauses the workflow via ctx.request_info() with an AgentInputRequest.

When a response is received, the response handler injects the input as a user message into the conversation and forwards the request to the agent.

The optional agent_filter parameter allows limiting which agents trigger the pause. If the target agent's ID is not in the filter set, the request is forwarded without pausing.

Initialize the request info interceptor executor.

Role

Describes the intended purpose of a message within a chat interaction.

Properties: SYSTEM: The role that instructs or sets the behavior of the AI system. USER: The role that provides user input for chat interactions. ASSISTANT: The role that provides responses to system-instructed, user-prompted input. TOOL: The role that provides additional information and references in response to tool use requests.

Initialize Role with a value.

Runner

A class to run a workflow in Pregel supersteps.

Initialize the runner with edges, shared state, and context.

RunnerContext

Protocol for the execution context used by the runner.

A single context that supports messaging, events, and optional checkpointing. If checkpoint storage is not configured, checkpoint methods may raise.

SequentialBuilder

High-level builder for sequential agent/executor workflows with shared context.

  • participants([...]) accepts a list of AgentProtocol (recommended) or Executor instances.
  • register_participants([...]) accepts a list of factories for AgentProtocol (recommended) or Executor factories.
  • Executors must define a handler that consumes a list[ChatMessage] and sends out a list[ChatMessage].
  • The workflow wires participants in order, passing a list[ChatMessage] down the chain.
  • Agents append their assistant messages to the conversation.
  • Custom executors can transform/summarize and return a list[ChatMessage].
  • The final output is the conversation produced by the last participant.

Usage:


   from agent_framework import SequentialBuilder

   # With agent instances
   workflow = SequentialBuilder().participants([agent1, agent2, summarizer_exec]).build()

   # With agent factories
   workflow = (
       SequentialBuilder().register_participants([create_agent1, create_agent2, create_summarizer_exec]).build()
   )

   # Enable checkpoint persistence
   workflow = SequentialBuilder().participants([agent1, agent2]).with_checkpointing(storage).build()

   # Enable request info for mid-workflow feedback (pauses before each agent)
   workflow = SequentialBuilder().participants([agent1, agent2]).with_request_info().build()

   # Enable request info only for specific agents
   workflow = (
       SequentialBuilder()
       .participants([agent1, agent2, agent3])
       .with_request_info(agents=[agent2])  # Only pause before agent2
       .build()
   )

SharedState

A class to manage shared state in a workflow.

SharedState provides thread-safe access to workflow state data that needs to be shared across executors during workflow execution.

Reserved Keys: The following keys are reserved for internal framework use and should not be modified by user code:

  • _executor_state: Stores executor state for checkpointing (managed by Runner)

Warning

Do not use keys starting with underscore (_) as they may be reserved for internal framework operations.

Initialize the shared state.

SingleEdgeGroup

Convenience wrapper for a solitary edge, keeping the group API uniform.

Create a one-to-one edge group between two executors.

StandardMagenticManager

Standard Magentic manager that performs real LLM calls via a ChatAgent.

The manager constructs prompts that mirror the original Magentic One orchestration:

  • Facts gathering
  • Plan creation
  • Progress ledger in JSON
  • Facts update and plan update on reset
  • Final answer synthesis

Initialize the Standard Magentic Manager.

SubWorkflowRequestMessage

Message sent from a sub-workflow to an executor in the parent workflow to request information.

This message wraps a RequestInfoEvent emitted by the executor in the sub-workflow.

SubWorkflowResponseMessage

Message sent from a parent workflow to a sub-workflow via WorkflowExecutor to provide requested information.

This message wraps the response data along with the original RequestInfoEvent emitted by the sub-workflow executor.

SuperStepCompletedEvent

Event triggered when a superstep ends.

Initialize the superstep event.

SuperStepStartedEvent

Event triggered when a superstep starts.

Initialize the superstep event.

SwitchCaseEdgeGroup

Fan-out variant that mimics a traditional switch/case control flow.

Each case inspects the message payload and decides whether it should handle the message. Exactly one case (or the default branch) returns a target at runtime, preserving single-dispatch semantics.

Configure a switch/case routing structure for a single source executor.

SwitchCaseEdgeGroupCase

Persistable description of a single conditional branch in a switch-case.

Unlike the runtime Case object this serialisable variant stores only the target identifier and a descriptive name for the predicate. When the underlying callable is unavailable during deserialisation we substitute a proxy placeholder that fails loudly, ensuring the missing dependency is immediately visible.

Record the routing metadata for a conditional case branch.

SwitchCaseEdgeGroupDefault

Persistable descriptor for the fallback branch of a switch-case group.

The default branch is guaranteed to exist and is invoked when every other case predicate fails to match the payload.

Point the default branch toward the given executor identifier.

TextContent

Represents text content in a chat.

Initializes a TextContent instance.

TextReasoningContent

Represents text reasoning content in a chat.

Remarks: This class and TextContent are superficially similar, but distinct.

Initializes a TextReasoningContent instance.

TextSpanRegion

Represents a region of text that has been annotated.

Initialize TextSpanRegion.

ToolMode

Defines if and how tools are used in a chat request.

Initialize ToolMode.

ToolProtocol

Represents a generic tool.

This protocol defines the interface that all tools must implement to be compatible with the agent framework. It is implemented by various tool classes such as HostedMCPTool, HostedWebSearchTool, and AIFunction. An AIFunction is usually created by the ai_function decorator.

Since each connector needs to parse tools differently, users can pass a dict to specify a service-specific tool when no abstraction is available.

TypeCompatibilityError

Exception raised when type incompatibility is detected between connected executors.

UriContent

Represents URI content.

Important

This is used for content that is identified by a URI, such as an image or a file. For (binary) data URIs, use DataContent instead.

Initializes a UriContent instance.


UsageContent

Represents usage information associated with a chat request and response.

Initializes a UsageContent instance.

UsageDetails

Provides usage details about a request/response.

Initializes the UsageDetails instance.

Workflow

A graph-based execution engine that orchestrates connected executors.

Overview

A workflow executes a directed graph of executors connected via edge groups using a Pregel-like model, running in supersteps until the graph becomes idle. Workflows are created using the WorkflowBuilder class - do not instantiate this class directly.

Execution Model

Executors run in synchronized supersteps where each executor:

  • Is invoked when it receives messages from connected edge groups
  • Can send messages to downstream executors via ctx.send_message()
  • Can yield workflow-level outputs via ctx.yield_output()
  • Can emit custom events via ctx.add_event()

Messages between executors are delivered at the end of each superstep and are not visible in the event stream. Only workflow-level events (outputs, custom events) and status events are observable to callers.

Input/Output Types

Workflow types are discovered at runtime by inspecting:

  • Input types: From the start executor's input types
  • Output types: Union of all executors' workflow output types

Access these via the input_types and output_types properties.

Execution Methods

The workflow provides two primary execution APIs, each supporting multiple scenarios:

  • run(): Execute to completion, returns WorkflowRunResult with all events

  • run_stream(): Returns async generator yielding events as they occur

Both methods support:

  • Initial workflow runs: Provide message parameter
  • Checkpoint restoration: Provide checkpoint_id (and optionally checkpoint_storage)
  • HIL continuation: Provide responses to continue after RequestInfoExecutor requests
  • Runtime checkpointing: Provide checkpoint_storage to enable/override checkpointing for this run

State Management

Workflow instances contain state, and that state is preserved across calls to run and run_stream. To execute multiple independent runs, create separate Workflow instances via WorkflowBuilder.

External Input Requests

Executors within a workflow can request external input using ctx.request_info():

  1. Executor calls ctx.request_info() to request input
  2. Executor implements response_handler() to process the response
  3. Requests are emitted as RequestInfoEvent instances in the event stream
  4. Workflow enters IDLE_WITH_PENDING_REQUESTS state
  5. Caller handles requests and provides responses via the send_responses or send_responses_streaming methods
  6. Responses are routed to the requesting executors and response handlers are invoked
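A minimal sketch of this cycle, assuming RequestInfoEvent exposes a request_id and that send_responses() accepts a mapping from request id to response value:


    result = await workflow.run("start the task")
    pending = result.get_request_info_events()
    if pending:
        # request_id attribute and send_responses() return value are assumptions
        responses = {event.request_id: "approved" for event in pending}
        result = await workflow.send_responses(responses)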

Checkpointing

Checkpointing can be configured at build time or runtime:

Build-time (via WorkflowBuilder): workflow = WorkflowBuilder().with_checkpointing(storage).build()

Runtime (via run/run_stream parameters): result = await workflow.run(message, checkpoint_storage=runtime_storage)

When enabled, checkpoints are created at the end of each superstep, capturing:

  • Executor states
  • Messages in transit
  • Shared state

Workflows can be paused and resumed across process restarts using checkpoint storage.

Composition

Workflows can be nested using WorkflowExecutor, which wraps a child workflow as an executor. The nested workflow's input/output types become part of the WorkflowExecutor's types. When invoked, the WorkflowExecutor runs the nested workflow to completion and processes its outputs.

Initialize the workflow with a list of edges.

WorkflowAgent

An Agent subclass that wraps a workflow and exposes it as an agent.

Initialize the WorkflowAgent.

WorkflowBuilder

A builder class for constructing workflows.

This class provides a fluent API for defining workflow graphs by connecting executors with edges and configuring execution parameters. Call build to create an immutable Workflow instance.

Initialize the WorkflowBuilder with an empty list of edges and no starting executor.
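A minimal sketch of the fluent API, assuming the set_start_executor and add_edge method names and reusing the @executor style shown under Executor:


    from typing import Never

    from agent_framework import WorkflowBuilder, WorkflowContext, executor

    @executor(id="upper")
    async def upper(text: str, ctx: WorkflowContext[str]) -> None:
        await ctx.send_message(text.upper())

    @executor(id="emit")
    async def emit(text: str, ctx: WorkflowContext[Never, str]) -> None:
        await ctx.yield_output(text)

    workflow = WorkflowBuilder().set_start_executor(upper).add_edge(upper, emit).build()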

WorkflowCheckpoint

Represents a complete checkpoint of workflow state.

Checkpoints capture the full execution state of a workflow at a specific point, enabling workflows to be paused and resumed.

Note

The shared_state dict may contain reserved keys managed by the framework. See the SharedState class documentation for details on reserved keys.

WorkflowCheckpointSummary

Human-readable summary of a workflow checkpoint.

WorkflowContext

Execution context that enables executors to interact with workflows and other executors.

Overview

WorkflowContext provides a controlled interface for executors to send messages, yield outputs, manage state, and interact with the broader workflow ecosystem. It enforces type safety through generic parameters while preventing direct access to internal runtime components.

Type Parameters

The context is parameterized to enforce type safety for different operations:

WorkflowContext (no parameters)

For executors that only perform side effects without sending messages or yielding outputs:


   async def log_handler(message: str, ctx: WorkflowContext) -> None:
       print(f"Received: {message}")  # Only side effects

WorkflowContext[T_Out]

Enables sending messages of type T_Out to other executors:


   async def processor(message: str, ctx: WorkflowContext[int]) -> None:
       result = len(message)
       await ctx.send_message(result)  # Send int to downstream executors

WorkflowContext[T_Out, T_W_Out]

Enables both sending messages (T_Out) and yielding workflow outputs (T_W_Out):


   async def dual_output(message: str, ctx: WorkflowContext[int, str]) -> None:
       await ctx.send_message(42)  # Send int message
       await ctx.yield_output("complete")  # Yield str workflow output

Union Types

Multiple types can be specified using union notation:


   async def flexible(message: str, ctx: WorkflowContext[int | str, bool | dict]) -> None:
       await ctx.send_message("text")  # or send 42
       await ctx.yield_output(True)  # or yield {"status": "done"}

Initialize the executor context with the given workflow context.

WorkflowErrorDetails

Structured error information to surface in error events/results.

WorkflowEvent

Base class for workflow events.

Initialize the workflow event with optional data.

WorkflowExecutor

An executor that wraps a workflow to enable hierarchical workflow composition.

Overview

WorkflowExecutor makes a workflow behave as a single executor within a parent workflow, enabling nested workflow architectures. It handles the complete lifecycle of sub-workflow execution including event processing, output forwarding, and request/response coordination between parent and child workflows.

Execution Model

When invoked, WorkflowExecutor:

  1. Starts the wrapped workflow with the input message
  2. Runs the sub-workflow to completion or until it needs external input
  3. Processes the sub-workflow's complete event stream after execution
  4. Forwards outputs to the parent workflow as messages
  5. Handles external requests by routing them to the parent workflow
  6. Accumulates responses and resumes sub-workflow execution

Event Stream Processing

WorkflowExecutor processes events after sub-workflow completion:

Output Forwarding

All outputs from the sub-workflow are automatically forwarded to the parent:

When allow_direct_output is False (default):


   # An executor in the sub-workflow yields outputs
   await ctx.yield_output("sub-workflow result")

   # WorkflowExecutor forwards to parent via ctx.send_message()
   # Parent receives the output as a regular message

When allow_direct_output is True:

Request/Response Coordination

When sub-workflows need external information:


   # An executor in the sub-workflow makes request
   request = MyDataRequest(query="user info")

   # WorkflowExecutor captures RequestInfoEvent and wraps it in a SubWorkflowRequestMessage
   # then send it to the receiving executor in parent workflow. The executor in parent workflow
   # can handle the request locally or forward it to an external source.
   # The WorkflowExecutor tracks the pending request, and implements a response handler.
   # When the response is received, it executes the response handler to accumulate responses
   # and resume the sub-workflow when all expected responses are received.
   # The response handler expects a SubWorkflowResponseMessage wrapping the response data.

State Management

WorkflowExecutor maintains execution state across request/response cycles:

  • Tracks pending requests by request_id
  • Accumulates responses until all expected responses are received
  • Resumes sub-workflow execution with complete response batch
  • Handles concurrent executions and multiple pending requests

Type System Integration

WorkflowExecutor inherits its type signature from the wrapped workflow:

Input Types

Matches the wrapped workflow's start executor input types:


   # If sub-workflow accepts str, WorkflowExecutor accepts str
   workflow_executor = WorkflowExecutor(my_workflow, id="wrapper")
   assert workflow_executor.input_types == my_workflow.input_types

Output Types

Combines sub-workflow outputs with request coordination types:


   # Includes all sub-workflow output types
   # Plus SubWorkflowRequestMessage if sub-workflow can make requests
   output_types = workflow.output_types + [SubWorkflowRequestMessage]  # if applicable

Error Handling

WorkflowExecutor propagates sub-workflow failures:

  • Captures WorkflowFailedEvent from sub-workflow
  • Converts to WorkflowErrorEvent in parent context
  • Provides detailed error information including sub-workflow ID

Concurrent Execution Support

WorkflowExecutor fully supports multiple concurrent sub-workflow executions:

Per-Execution State Isolation

Each sub-workflow invocation creates an isolated ExecutionContext:


   # Multiple concurrent invocations are supported
   workflow_executor = WorkflowExecutor(my_workflow, id="concurrent_executor")

   # Each invocation gets its own execution context
   # Execution 1: processes input_1 independently
   # Execution 2: processes input_2 independently
   # No state interference between executions

Request/Response Coordination

Responses are correctly routed to the originating execution:

  • Each execution tracks its own pending requests and expected responses
  • Request-to-execution mapping ensures responses reach the correct sub-workflow
  • Response accumulation is isolated per execution
  • Automatic cleanup when execution completes

Memory Management

  • Unlimited concurrent executions supported
  • Each execution has unique UUID-based identification
  • Cleanup of completed execution contexts
  • Thread-safe state management for concurrent access

Important Considerations

Shared Workflow Instance: All concurrent executions use the same underlying workflow instance. For proper isolation, ensure that the wrapped workflow and its executors are stateless.


   # Avoid: Stateful executor with instance variables
   class StatefulExecutor(Executor):
       def __init__(self):
           super().__init__(id="stateful")
           self.data = []  # This will be shared across concurrent executions!

Integration with Parent Workflows

Parent workflows can intercept sub-workflow requests:

Implementation Notes

  • Sub-workflows run to completion before processing their results
  • Event processing is atomic - all outputs are forwarded before requests
  • Response accumulation ensures sub-workflows receive complete response batches
  • Execution state is maintained for proper resumption after external requests
  • Concurrent executions are fully isolated and do not interfere with each other

Initialize the WorkflowExecutor.

WorkflowFailedEvent

Built-in lifecycle event emitted when a workflow run terminates with an error.

WorkflowOutputEvent

Event triggered when a workflow executor yields output.

Initialize the workflow output event.

WorkflowRunResult

Container for events generated during non-streaming workflow execution.

Overview

Represents the complete execution results of a workflow run, containing all events generated from start to idle state. Workflows produce outputs incrementally through ctx.yield_output() calls during execution.

Event Structure

Maintains separation between data-plane and control-plane events:

  • Data-plane events: Executor invocations, completions, outputs, and requests (in main list)
  • Control-plane events: Status timeline accessible via status_timeline() method

Key Methods

  • get_outputs(): Extract all workflow outputs from the execution
  • get_request_info_events(): Retrieve external input requests made during execution
  • get_final_state(): Get the final workflow state (IDLE, IDLE_WITH_PENDING_REQUESTS, etc.)
  • status_timeline(): Access the complete status event history
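
A minimal consumption sketch using the methods listed above (assumes a workflow built elsewhere whose non-streaming run returns a WorkflowRunResult):


   # Hedged sketch: inspecting the result of a non-streaming run.
   result = await workflow.run("some input")
   for output in result.get_outputs():
       print(output)
   print(result.get_final_state())  # e.g. WorkflowRunState.IDLE
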
WorkflowStartedEvent

Built-in lifecycle event emitted when a workflow run begins.

Initialize the workflow event with optional data.

WorkflowStatusEvent

Built-in lifecycle event emitted for workflow run state transitions.

Initialize the workflow status event with a new state and optional data.

WorkflowValidationError

Base exception for workflow validation errors.

WorkflowViz

A class for visualizing workflows using graphviz and Mermaid.

Initialize the WorkflowViz with a workflow.

Enums

MagenticHumanInterventionDecision

Decision options for human intervention responses.

MagenticHumanInterventionKind

The kind of human intervention being requested.

ValidationTypeEnum

Enumeration of workflow validation types.

WorkflowEventSource

Identifies whether a workflow event came from the framework or an executor.

Use FRAMEWORK for events emitted by built-in orchestration paths—even when the code that raises them lives in runner-related modules—and EXECUTOR for events surfaced by developer-provided executor implementations.

WorkflowRunState

Run-level state of a workflow execution.

Semantics:

  • STARTED: Run has been initiated and the workflow context has been created. This is an initial state before any meaningful work is performed. In this codebase we emit a dedicated WorkflowStartedEvent for telemetry, and typically advance the status directly to IN_PROGRESS. Consumers may still rely on STARTED for state machines that need an explicit pre-work phase.

  • IN_PROGRESS: The workflow is actively executing (e.g., the initial message has been delivered to the start executor or a superstep is running). This status is emitted at the beginning of a run and can be followed by other statuses as the run progresses.

  • IN_PROGRESS_PENDING_REQUESTS: Active execution while one or more request-for-information operations are outstanding. New work may still be scheduled while requests are in flight.

  • IDLE: The workflow is quiescent with no outstanding requests and no more work to do. This is the normal terminal state for workflows that have finished executing, potentially having produced outputs along the way.

  • IDLE_WITH_PENDING_REQUESTS: The workflow is paused awaiting external input (e.g., emitted a RequestInfoEvent). This is a non-terminal state; the workflow can resume when responses are supplied.

  • FAILED: Terminal state indicating an error surfaced. Accompanied by a WorkflowFailedEvent with structured error details.

  • CANCELLED: Terminal state indicating the run was cancelled by a caller or orchestrator. Not currently emitted by default runner paths but included for integrators/orchestrators that support cancellation.
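
A small sketch distinguishing the resumable and terminal states described above (assumes `result` is a WorkflowRunResult from a non-streaming run):


   # Hedged sketch: branching on the final run state.
   state = result.get_final_state()
   if state == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS:
       ...  # supply responses to the pending requests and resume
   elif state == WorkflowRunState.FAILED:
       ...  # inspect the accompanying WorkflowFailedEvent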

Functions

agent_middleware

Decorator to mark a function as agent middleware.

This decorator explicitly identifies a function as agent middleware, which processes AgentRunContext objects.

agent_middleware(func: Callable[[AgentRunContext, Callable[[AgentRunContext], Awaitable[None]]], Awaitable[None]]) -> Callable[[AgentRunContext, Callable[[AgentRunContext], Awaitable[None]]], Awaitable[None]]

Parameters

Name Description
func
Required

The middleware function to mark as agent middleware.

Returns

Type Description

The same function with agent middleware marker.

Examples


   from agent_framework import agent_middleware, AgentRunContext, ChatAgent


   @agent_middleware
   async def logging_middleware(context: AgentRunContext, next):
       print(f"Before: {context.agent.name}")
       await next(context)
       print(f"After: {context.result}")


   # Use with an agent
   agent = ChatAgent(chat_client=client, name="assistant", middleware=logging_middleware)

ai_function

Decorate a function to turn it into an AIFunction that can be passed to models and executed automatically.

This decorator creates a Pydantic model from the function's signature, which will be used to validate the arguments passed to the function and to generate the JSON schema for the function's parameters.

To add descriptions to parameters, use the Annotated type from typing with a string description as the second argument. You can also use Pydantic's Field class for more advanced configuration.

Note

When approval_mode is set to "always_require", the function will not be executed until explicit approval is given; this applies only to the auto-invocation flow. Also note that if the model returns multiple function calls, some requiring approval and others not, approval will be requested for all of them.

ai_function(func: Callable[[...], ReturnT | Awaitable[ReturnT]] | None = None, *, name: str | None = None, description: str | None = None, approval_mode: Literal['always_require', 'never_require'] | None = None, max_invocations: int | None = None, max_invocation_exceptions: int | None = None, additional_properties: dict[str, Any] | None = None) -> AIFunction[Any, ReturnT] | Callable[[Callable[[...], ReturnT | Awaitable[ReturnT]]], AIFunction[Any, ReturnT]]

Parameters

Name Description
func
Callable[[...], ReturnT | Awaitable[ReturnT]] | None

The function to decorate.

Default value: None

Keyword-Only Parameters

Name Description
name
str | None

The name of the function. If not provided, the function's __name__ attribute will be used.

Default value: None
description
str | None

A description of the function. If not provided, the function's docstring will be used.

Default value: None
approval_mode
Literal['always_require', 'never_require'] | None

Whether approval is required to run this tool. By default, approval is not needed.

Default value: None
max_invocations
int | None

The maximum number of times this function can be invoked; if set, must be at least 1. If None, there is no limit.

Default value: None
max_invocation_exceptions
int | None

The maximum number of exceptions allowed during invocations; if set, must be at least 1. If None, there is no limit.

Default value: None
additional_properties
dict[str, Any] | None

Additional properties to set on the function.

Default value: None

Returns

Type Description
AIFunction[Any, ReturnT] | Callable[[Callable[[...], ReturnT | Awaitable[ReturnT]]], AIFunction[Any, ReturnT]]

Examples


   from agent_framework import ai_function
   from typing import Annotated


   @ai_function
   def ai_function_example(
       arg1: Annotated[str, "The first argument"],
       arg2: Annotated[int, "The second argument"],
   ) -> str:
       # An example function that takes two arguments and returns a string.
       return f"arg1: {arg1}, arg2: {arg2}"


   # the same function but with approval required to run
   @ai_function(approval_mode="always_require")
   def ai_function_example(
       arg1: Annotated[str, "The first argument"],
       arg2: Annotated[int, "The second argument"],
   ) -> str:
       # An example function that takes two arguments and returns a string.
       return f"arg1: {arg1}, arg2: {arg2}"


   # With custom name and description
   @ai_function(name="custom_weather", description="Custom weather function")
   def another_weather_func(location: str) -> str:
       return f"Weather in {location}"


   # Async functions are also supported
   @ai_function
   async def async_get_weather(location: str) -> str:
       '''Get weather asynchronously.'''
       # Simulate async operation
       return f"Weather in {location}"

chat_middleware

Decorator to mark a function as chat middleware.

This decorator explicitly identifies a function as chat middleware, which processes ChatContext objects.

chat_middleware(func: Callable[[ChatContext, Callable[[ChatContext], Awaitable[None]]], Awaitable[None]]) -> Callable[[ChatContext, Callable[[ChatContext], Awaitable[None]]], Awaitable[None]]

Parameters

Name Description
func
Required

The middleware function to mark as chat middleware.

Returns

Type Description

The same function with chat middleware marker.

Examples


   from agent_framework import chat_middleware, ChatContext, ChatAgent


   @chat_middleware
   async def logging_middleware(context: ChatContext, next):
       print(f"Messages: {len(context.messages)}")
       await next(context)
       print(f"Response: {context.result}")


   # Use with an agent
   agent = ChatAgent(chat_client=client, name="assistant", middleware=logging_middleware)

create_edge_runner

Factory function to create the appropriate edge runner for an edge group.

create_edge_runner(edge_group: EdgeGroup, executors: dict[str, Executor]) -> EdgeRunner

Parameters

Name Description
edge_group
Required
EdgeGroup

The edge group to create a runner for.

executors
Required

Map of executor IDs to executor instances.

Returns

Type Description
EdgeRunner

The appropriate EdgeRunner instance.

executor

Decorator that converts a standalone function into a FunctionExecutor instance.

The @executor decorator is designed for standalone module-level functions only. For class-based executors, use the Executor base class with @handler on instance methods.

Supports both synchronous and asynchronous functions. Synchronous functions are executed in a thread pool to avoid blocking the event loop.

Important

  • Use @executor for standalone functions (module-level or local functions).
  • Do NOT use @executor with staticmethod or classmethod.
  • For class-based executors, subclass Executor and use @handler on instance methods.

Usage:


   # Standalone async function (RECOMMENDED):
   @executor(id="upper_case")
   async def to_upper(text: str, ctx: WorkflowContext[str]):
       await ctx.send_message(text.upper())


   # Standalone sync function (runs in thread pool):
   @executor
   def process_data(data: str):
       return data.upper()


   # For class-based executors, use @handler instead:
   class MyExecutor(Executor):
       def __init__(self):
           super().__init__(id="my_executor")

       @handler
       async def process(self, data: str, ctx: WorkflowContext[str]):
           await ctx.send_message(data.upper())

executor(func: Callable[[...], Any] | None = None, *, id: str | None = None) -> Callable[[Callable[[...], Any]], FunctionExecutor] | FunctionExecutor

Parameters

Name Description
func
Callable[[...], Any] | None

The function to decorate (when used without parentheses).

Default value: None

Keyword-Only Parameters

Name Description
id
str | None

Optional custom ID for the executor. If None, the function name is used.

Default value: None

Returns

Type Description

A FunctionExecutor instance that can be wired into a Workflow.

Exceptions

Type Description

If used with staticmethod or classmethod (unsupported pattern)

function_middleware

Decorator to mark a function as function middleware.

This decorator explicitly identifies a function as function middleware, which processes FunctionInvocationContext objects.

function_middleware(func: Callable[[FunctionInvocationContext, Callable[[FunctionInvocationContext], Awaitable[None]]], Awaitable[None]]) -> Callable[[FunctionInvocationContext, Callable[[FunctionInvocationContext], Awaitable[None]]], Awaitable[None]]

Parameters

Name Description
func
Required

The middleware function to mark as function middleware.

Returns

Type Description

The same function with function middleware marker.

Examples


   from agent_framework import function_middleware, FunctionInvocationContext, ChatAgent


   @function_middleware
   async def logging_middleware(context: FunctionInvocationContext, next):
       print(f"Calling: {context.function.name}")
       await next(context)
       print(f"Result: {context.result}")


   # Use with an agent
   agent = ChatAgent(chat_client=client, name="assistant", middleware=logging_middleware)

get_checkpoint_summary

Produce a summary of a workflow checkpoint.

get_checkpoint_summary(checkpoint: WorkflowCheckpoint) -> WorkflowCheckpointSummary

Parameters

Name Description
checkpoint
Required
WorkflowCheckpoint

The checkpoint to summarize.

Returns

Type Description
WorkflowCheckpointSummary

A summary of the checkpoint.

get_logger

Get a logger with the specified name, defaulting to 'agent_framework'.

get_logger(name: str = 'agent_framework') -> Logger

Parameters

Name Description
name
str

The name of the logger. Defaults to 'agent_framework'.

Default value: "agent_framework"

Returns

Type Description

The configured logger instance.
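
Examples


   from agent_framework import get_logger

   # A minimal sketch based on the signature above; the default logger
   # name is 'agent_framework'.
   logger = get_logger()
   logger.info("Logger ready")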

handler

Decorator to register a handler for an executor.

handler(func: Callable[[ExecutorT, Any, ContextT], Awaitable[Any]]) -> Callable[[ExecutorT, Any, ContextT], Awaitable[Any]]

Parameters

Name Description
func
Required
Callable[[ExecutorT, Any, ContextT], Awaitable[Any]]

The function to decorate.

Returns

Type Description
Callable[[ExecutorT, Any, ContextT], Awaitable[Any]]

The decorated function with handler metadata.

Examples

   @handler
   async def handle_string(self, message: str, ctx: WorkflowContext[str]) -> None:
       ...


   @handler
   async def handle_data(self, message: dict, ctx: WorkflowContext[str | int]) -> None:
       ...

prepare_function_call_results

Prepare the values of the function call results.

prepare_function_call_results(content: TextContent | DataContent | TextReasoningContent | UriContent | FunctionCallContent | FunctionResultContent | ErrorContent | UsageContent | HostedFileContent | HostedVectorStoreContent | FunctionApprovalRequestContent | FunctionApprovalResponseContent | Any | list[TextContent | DataContent | TextReasoningContent | UriContent | FunctionCallContent | FunctionResultContent | ErrorContent | UsageContent | HostedFileContent | HostedVectorStoreContent | FunctionApprovalRequestContent | FunctionApprovalResponseContent | Any]) -> str

Parameters

Name Description
content
Required

Returns

Type Description
str

prepend_agent_framework_to_user_agent

Prepend "agent-framework" to the User-Agent in the headers.

When user agent telemetry is disabled through the AGENT_FRAMEWORK_USER_AGENT_DISABLED environment variable, the User-Agent header will not include the agent-framework information; the headers are returned as-is, or as an empty dict when None is passed.

prepend_agent_framework_to_user_agent(headers: dict[str, Any] | None = None) -> dict[str, Any]

Parameters

Name Description
headers

The existing headers dictionary.

Default value: None

Returns

Type Description

A new dict with "User-Agent" set to "agent-framework-python/{version}" if headers is None; otherwise the headers dictionary with "agent-framework-python/{version}" prepended to the existing User-Agent value.

Examples


   from agent_framework import prepend_agent_framework_to_user_agent

   # Add agent-framework to new headers
   headers = prepend_agent_framework_to_user_agent()
   print(headers["User-Agent"])  # "agent-framework-python/0.1.0"

   # Prepend to existing headers
   existing = {"User-Agent": "my-app/1.0"}
   headers = prepend_agent_framework_to_user_agent(existing)
   print(headers["User-Agent"])  # "agent-framework-python/0.1.0 my-app/1.0"

response_handler

Decorator to register a handler to handle responses for a request.

response_handler(func: Callable[[ExecutorT, Any, Any, ContextT], Awaitable[None]]) -> Callable[[ExecutorT, Any, Any, ContextT], Awaitable[None]]

Parameters

Name Description
func
Required
Callable[[ExecutorT, Any, Any, ContextT], Awaitable[None]]

The function to decorate.

Returns

Type Description
Callable[[ExecutorT, Any, Any, ContextT], Awaitable[None]]

The decorated function with handler metadata.

Examples


   @handler
   async def run(self, message: int, context: WorkflowContext[str]) -> None:
       # Example of a handler that sends a request
       ...
       # Send a request with a `CustomRequest` payload and expect a `str` response.
       await context.request_info(CustomRequest(...), str)


   @response_handler
   async def handle_response(
       self,
       original_request: CustomRequest,
       response: str,
       context: WorkflowContext[str],
   ) -> None:
       # Example of a response handler for the above request
       ...


   @response_handler
   async def handle_response(
       self,
       original_request: CustomRequest,
       response: dict,
       context: WorkflowContext[int],
   ) -> None:
       # Example of a response handler for a request expecting a dict response
       ...

setup_logging

Set up the logging configuration for the agent framework.

setup_logging() -> None

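A minimal usage sketch; setup_logging takes no arguments:


   from agent_framework import setup_logging

   # Hedged sketch: configure the framework's logging once at startup.
   setup_logging()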

use_agent_middleware

Class decorator that adds middleware support to an agent class.

This decorator adds middleware functionality to any agent class. It wraps the run() and run_stream() methods to provide middleware execution.

The middleware execution can be terminated at any point by setting the context.terminate property to True. Once set, the pipeline will stop executing further middleware as soon as control returns to the pipeline.

Note

This decorator is already applied to built-in agent classes. You only need to use it if you're creating custom agent implementations.

use_agent_middleware(agent_class: type[TAgent]) -> type[TAgent]

Parameters

Name Description
agent_class
Required
type[TAgent]

The agent class to add middleware support to.

Returns

Type Description
type[TAgent]

The modified agent class with middleware support.

Examples


   from agent_framework import use_agent_middleware


   @use_agent_middleware
   class CustomAgent:
       async def run(self, messages, **kwargs):
           # Agent implementation
           pass

       async def run_stream(self, messages, **kwargs):
           # Streaming implementation
           pass

use_chat_middleware

Class decorator that adds middleware support to a chat client class.

This decorator adds middleware functionality to any chat client class. It wraps the get_response() and get_streaming_response() methods to provide middleware execution.

Note

This decorator is already applied to built-in chat client classes. You only need to use it if you're creating custom chat client implementations.

use_chat_middleware(chat_client_class: type[TChatClient]) -> type[TChatClient]

Parameters

Name Description
chat_client_class
Required
type[TChatClient]

The chat client class to add middleware support to.

Returns

Type Description
type[TChatClient]

The modified chat client class with middleware support.

Examples


   from agent_framework import use_chat_middleware


   @use_chat_middleware
   class CustomChatClient:
       async def get_response(self, messages, **kwargs):
           # Chat client implementation
           pass

       async def get_streaming_response(self, messages, **kwargs):
           # Streaming implementation
           pass

use_function_invocation

Class decorator that enables tool calling for a chat client.

This decorator wraps the get_response and get_streaming_response methods to automatically handle function calls from the model, execute them, and return the results back to the model for further processing.

use_function_invocation(chat_client: type[TChatClient]) -> type[TChatClient]

Parameters

Name Description
chat_client
Required
type[TChatClient]

The chat client class to decorate.

Returns

Type Description
type[TChatClient]

The decorated chat client class with function invocation enabled.

Exceptions

Type Description

If the chat client does not have the required methods.

Examples


   from agent_framework import use_function_invocation, BaseChatClient


   @use_function_invocation
   class MyCustomClient(BaseChatClient):
       async def get_response(self, messages, **kwargs):
           # Implementation here
           pass

       async def get_streaming_response(self, messages, **kwargs):
           # Implementation here
           pass


   # The client now automatically handles function calls
   client = MyCustomClient()

validate_workflow_graph

Convenience function to validate a workflow graph.

validate_workflow_graph(edge_groups: Sequence[EdgeGroup], executors: dict[str, Executor], start_executor: Executor) -> None

Parameters

Name Description
edge_groups
Required
Sequence[EdgeGroup]

List of edge groups in the workflow.

executors
Required
dict[str, Executor]

Map of executor IDs to executor instances.

start_executor
Required
Executor

The starting executor (can be an instance or an ID).

Exceptions

Type Description
WorkflowValidationError

If any validation fails.