通过


agent_framework 包

a2a
ag_ui
anthropic
azure
chatkit
declarative
devui
lab
mem0
microsoft
ollama
openai
redis

模块

exceptions
observability

AIFunction

包装 Python 函数以使其可由 AI 模型调用的工具。

此类包装 Python 函数,使其可由 AI 模型使用自动参数验证和 JSON 架构生成来调用。

初始化 AIFunction。

AgentExecutor

用于包装代理以处理消息的内置执行程序。

AgentExecutor 根据工作流执行模式调整其行为:

  • run_stream(): 当代理生成令牌时发出增量 AgentRunUpdateEvent 事件
  • run(): 发出包含完整响应的单个 AgentRunEvent

执行程序通过 WorkflowContext.is_streaming() 自动检测模式。

使用唯一标识符初始化执行程序。

AgentExecutorRequest

对代理执行程序的请求。

AgentExecutorResponse

代理执行程序的响应。

AgentInputRequest

在高级生成器工作流中运行代理之前请求人工输入。

当工作流在执行代理之前暂停时,通过 RequestInfoEvent 发出。 响应作为用户消息注入到会话中,以引导代理的行为。

这是在 SequentialBuilder、ConcurrentBuilder、GroupChatBuilder 和 HandoffBuilder 上使用 .with_request_info() 使用的标准请求类型。

AgentMiddleware

可以截获代理调用的代理中间件的抽象基类。

代理中间件允许在执行前后截获和修改代理调用。 可以检查消息、修改上下文、替代结果或提前终止执行。

注释

AgentMiddleware 是一个抽象基类。 必须对其进行子类化并实现

用于创建自定义代理中间件的 process() 方法。

AgentProtocol

可以调用的代理的协议。

此协议定义所有代理必须实现的接口,包括标识的属性和执行方法。

注释

协议使用结构子类型(鸭子键入)。 类不需要

如果显式继承自此协议,则被视为兼容。

这样,无需使用,即可创建完全自定义的代理

任何 Agent Framework 基类。

AgentRunContext

代理中间件调用的上下文对象。

此上下文通过代理中间件管道传递,并包含有关代理调用的所有信息。

初始化 AgentRunContext。

AgentRunEvent

代理运行完成后触发的事件。

初始化代理运行事件。

AgentRunResponse

表示对代理运行请求的响应。

提供有关响应的一个或多个响应消息和元数据。 典型的响应将包含单个消息,但在涉及函数调用、RAG 检索或复杂逻辑的方案中可能包含多个消息。

初始化 AgentRunResponse。

AgentRunResponseUpdate

表示代理中的单个流式处理响应区块。

初始化 AgentRunResponseUpdate。

AgentRunUpdateEvent

代理流式处理消息时触发的事件。

初始化代理流式处理事件。

AgentThread

代理线程类,它可以表示本地托管线程或由服务管理的线程。

维护 AgentThread 代理交互的会话状态和消息历史记录。 它可以使用服务管理的线程(通过 service_thread_id)或本地消息存储(通过 message_store),但不能同时使用两者。

初始化 AgentThread,请勿手动使用此方法,始终使用: agent.get_new_thread()

注释

可以设置service_thread_id或message_store,但不能同时设置两者。

AggregateContextProvider

包含多个上下文提供程序的 ContextProvider。

它将事件委托给多个上下文提供程序,并在返回之前聚合这些事件的响应。 这样,就可以将多个上下文提供程序合并到单个提供程序中。

注释

传递单个上下文时,会自动创建 AggregateContextProvider

提供程序或代理构造函数的上下文提供程序序列。

使用上下文提供程序初始化 AggregateContextProvider。

BaseAgent

所有 Agent Framework 代理的基类。

此类为代理实现提供核心功能,包括上下文提供程序、中间件支持和线程管理。

注释

无法直接实例化 BaseAgent,因为它不实现

run()、run_stream()和 AgentProtocol 所需的其他方法。

使用 ChatAgent 等具体实现或创建子类。

初始化 BaseAgent 实例。

BaseAnnotation

所有 AI 批注类型的基类。

初始化 BaseAnnotation。

BaseChatClient

聊天客户端的基类。

此抽象基类为聊天客户端实现提供核心功能,包括中间件支持、消息准备和工具规范化。

注释

不能将 BaseChatClient 直接实例化为抽象基类。

子类必须实现_inner_get_response()和_inner_get_streaming_response()。

初始化 BaseChatClient 实例。

BaseContent

表示 AI 服务使用的内容。

初始化 BaseContent。

Case

运行时包装器将开关大小写谓词与其目标组合在一起。

每个 Case 将布尔谓词与执行程序耦合,该执行器应在谓词的计算结果为 True 时处理消息。 运行时将此轻型容器与可序列化 的 SwitchCaseEdgeGroupCase 分开,以便执行可以使用实时可调用对象运行,而不会污染持久状态。

ChatAgent

聊天客户端代理。

这是使用聊天客户端与语言模型交互的主要代理实现。 它支持工具、上下文提供程序、中间件以及流式处理和非流式处理响应。

初始化 ChatAgent 实例。

注释

使用从frequency_penalty到request_kwargs的参数集

调用聊天客户端。 还可以将它们传递给这两个运行方法。

设置两者时,传递给运行方法的优先方法。

ChatClientProtocol

可以生成响应的聊天客户端的协议。

此协议定义所有聊天客户端必须实现的接口,包括生成流式处理和非流式处理响应的方法。

注释

协议使用结构子类型(鸭子键入)。 类不需要

如果显式继承自此协议,则被视为兼容。

ChatContext

聊天中间件调用的上下文对象。

此上下文通过聊天中间件管道传递,其中包含有关聊天请求的所有信息。

初始化 ChatContext。

ChatMessage

表示聊天消息。

初始化 ChatMessage。

ChatMessageStore

ChatMessageStoreProtocol 的内存中实现,用于将消息存储在列表中。

此实现为聊天消息提供了一个简单的基于列表的存储,支持序列化和反序列化。 它实现协议的所有必需方法 ChatMessageStoreProtocol

该存储在内存中维护消息,并提供用于序列化和反序列化状态的方法,以实现持久性目的。

创建用于线程的 ChatMessageStore。

ChatMessageStoreProtocol

定义用于存储和检索与特定线程关联的聊天消息的方法。

此协议的实现负责管理聊天消息的存储,包括在必要时截断或汇总消息来处理大量数据。

ChatMiddleware

可以截获聊天客户端请求的聊天中间件的抽象基类。

聊天中间件允许在执行前后截获和修改聊天客户端请求。 可以修改消息、添加系统提示、记录请求或替代聊天响应。

注释

ChatMiddleware 是一个抽象基类。 必须对其进行子类化并实现

用于创建自定义聊天中间件的 process() 方法。

ChatOptions

AI 服务的常见请求设置。

初始化 ChatOptions。

ChatResponse

表示对聊天请求的响应。

使用提供的参数初始化 ChatResponse。

ChatResponseUpdate

表示 ChatClient 中的单个流式处理响应区块。

使用提供的参数初始化 ChatResponseUpdate。

CheckpointStorage

检查点存储后端的协议。

CitationAnnotation

表示引文批注。

初始化 CitationAnnotation。

ConcurrentBuilder

并发代理工作流的高级生成器。

  • 参与者(...) 接受 AgentProtocol(推荐)或执行程序的列表。

  • register_participants(...) 接受 AgentProtocol 的工厂列表(建议)

    或执行程序工厂

  • build() 线路:调度程序 -> 扇出 -> 参与者 -> 扇入 -> 聚合器。

  • with_aggregator(...) 使用执行程序或回调替代默认聚合器。

  • register_aggregator(...) 接受执行程序的工厂作为自定义聚合器。

用法:


   from agent_framework import ConcurrentBuilder

   # Minimal: use default aggregator (returns list[ChatMessage])
   workflow = ConcurrentBuilder().participants([agent1, agent2, agent3]).build()

   # With agent factories
   workflow = ConcurrentBuilder().register_participants([create_agent1, create_agent2, create_agent3]).build()


   # Custom aggregator via callback (sync or async). The callback receives
   # list[AgentExecutorResponse] and its return value becomes the workflow's output.
   def summarize(results: list[AgentExecutorResponse]) -> str:
       return " | ".join(r.agent_run_response.messages[-1].text for r in results)


   workflow = ConcurrentBuilder().participants([agent1, agent2, agent3]).with_aggregator(summarize).build()


   # Custom aggregator via a factory
   class MyAggregator(Executor):
       @handler
       async def aggregate(self, results: list[AgentExecutorResponse], ctx: WorkflowContext[Never, str]) -> None:
           await ctx.yield_output(" | ".join(r.agent_run_response.messages[-1].text for r in results))


   workflow = (
       ConcurrentBuilder()
       .register_participants([create_agent1, create_agent2, create_agent3])
       .register_aggregator(lambda: MyAggregator(id="my_aggregator"))
       .build()
   )


   # Enable checkpoint persistence so runs can resume
   workflow = ConcurrentBuilder().participants([agent1, agent2, agent3]).with_checkpointing(storage).build()

   # Enable request info before aggregation
   workflow = ConcurrentBuilder().participants([agent1, agent2]).with_request_info().build()
Context

包含应由 ContextProvider 提供的 AI 模型提供的任何上下文的类。

每个 ContextProvider 都能够为每个调用提供自己的上下文。 Context 类包含 ContextProvider 提供的其他上下文。 在传递给 AI 模型之前,此上下文将与其他提供程序提供的上下文组合在一起。 此上下文是按调用的,不会存储为聊天历史记录的一部分。

创建新的 Context 对象。

ContextProvider

所有上下文提供程序的基类。

上下文提供程序是可用于增强 AI 上下文管理的组件。 它可以侦听对话中的更改,并在调用之前为 AI 模型提供其他上下文。

注释

ContextProvider 是一个抽象基类。 必须对其进行子类化并实现

用于创建自定义上下文提供程序的调用方法。 理想情况下,应

还实现用于跟踪聊天的已调用和 thread_created() 方法

状态,但这些是可选的。

DataContent

表示具有关联媒体类型的二进制数据内容(也称为 MIME 类型)。

重要

这适用于表示为数据 URI 的二进制数据,不适用于联机资源。

将 UriContent 用于联机资源。

初始化 DataContent 实例。

重要

这适用于表示为数据 URI 的二进制数据,不适用于联机资源。

将 UriContent 用于联机资源。

Default

开关大小写组中默认分支的运行时表示形式。

仅当没有其他事例谓词匹配时,才会调用默认分支。 实际上,它保证存在,以便路由永远不会生成空目标。

Edge

在两个执行程序之间为定向的有条件的交割建模。

每个 Edge 捕获将消息从一个执行程序移动到工作流图中的另一个执行程序所需的最小元数据。 它(可选)嵌入一个布尔谓词,该谓词决定是否应在运行时获取边缘。 通过将边缘序列化为基元,无论原始 Python 进程如何,都可以重新构造工作流的拓扑。

初始化两个工作流执行程序之间的完全指定的边缘。

EdgeDuplicationError

在工作流中检测到重复边缘时引发异常。

ErrorContent

表示错误。

备注:通常用于非致命错误,作过程中出现问题,但作仍能够继续。

初始化 ErrorContent 实例。

Executor

处理消息和执行计算的所有工作流执行程序的基类。

概述

执行程序是工作流的基本构建基块,表示接收消息、执行作和生成输出的各个处理单元。 每个执行程序都是唯一标识的,可以通过修饰的处理程序方法处理特定消息类型。

类型系统

执行程序具有定义其功能的丰富类型系统:

输入类型

执行程序可以处理的消息类型,从处理程序方法签名中发现:


   class MyExecutor(Executor):
       @handler
       async def handle_string(self, message: str, ctx: WorkflowContext) -> None:
           # This executor can handle 'str' input types

通过 input_types 属性进行访问。

输出类型

执行程序可以通过 ctx.send_message()发送到其他执行程序的消息类型:


   class MyExecutor(Executor):
       @handler
       async def handle_data(self, message: str, ctx: WorkflowContext[int | bool]) -> None:
           # This executor can send 'int' or 'bool' messages

通过 output_types 属性进行访问。

工作流输出类型

执行程序可以通过 ctx.yield_output()作为工作流级输出发出的数据类型:


   class MyExecutor(Executor):
       @handler
       async def process(self, message: str, ctx: WorkflowContext[int, str]) -> None:
           # Can send 'int' messages AND yield 'str' workflow outputs

通过 workflow_output_types 属性进行访问。

处理程序发现

执行程序通过修饰的方法发现其功能:

@handler 装饰

标记处理传入消息的方法:


   class MyExecutor(Executor):
       @handler
       async def handle_text(self, message: str, ctx: WorkflowContext[str]) -> None:
           await ctx.send_message(message.upper())

子工作流请求截获

使用 @handler 方法截获子工作流请求:


   class ParentExecutor(Executor):
       @handler
       async def handle_subworkflow_request(
           self,
           request: SubWorkflowRequestMessage,
           ctx: WorkflowContext[SubWorkflowResponseMessage],
       ) -> None:
           if self.is_allowed(request.domain):
               response = request.create_response(data=True)
               await ctx.send_message(response, target_id=request.executor_id)
           else:
               await ctx.request_info(request.source_event, response_type=request.source_event.response_type)

上下文类型

处理程序方法根据其类型注释接收不同的 WorkflowContext 变体:

WorkflowContext (无类型参数)

对于仅执行副作用且不发送消息或生成输出的处理程序:


   class LoggingExecutor(Executor):
       @handler
       async def log_message(self, msg: str, ctx: WorkflowContext) -> None:
           print(f"Received: {msg}")  # Only logging, no outputs

WorkflowContext[T_Out]

允许通过 ctx.send_message()发送类型为T_Out的消息:


   class ProcessorExecutor(Executor):
       @handler
       async def handler(self, msg: str, ctx: WorkflowContext[int]) -> None:
           await ctx.send_message(42)  # Can send int messages

WorkflowContext[T_Out, T_W_Out]

启用发送消息(T_Out)和生成工作流输出(T_W_Out):


   class DualOutputExecutor(Executor):
       @handler
       async def handler(self, msg: str, ctx: WorkflowContext[int, str]) -> None:
           await ctx.send_message(42)  # Send int message
           await ctx.yield_output("done")  # Yield str workflow output

函数执行程序

可以使用 @executor 修饰器将简单函数转换为执行程序:


   @executor
   async def process_text(text: str, ctx: WorkflowContext[str]) -> None:
       await ctx.send_message(text.upper())


   # Or with custom ID:
   @executor(id="text_processor")
   def sync_process(text: str, ctx: WorkflowContext[str]) -> None:
       ctx.send_message(text.lower())  # Sync functions run in thread pool

子工作流组合

执行程序可以使用 WorkflowExecutor 包含子工作流。 子工作流可以发出父工作流可以截获的请求。 有关工作流组合模式和请求/响应处理的详细信息,请参阅 WorkflowExecutor 文档。

状态管理

执行程序可以包含跨工作流运行和检查点保留的状态。 重写 on_checkpoint_saveon_checkpoint_restore 方法来实现自定义状态序列化和还原逻辑。

实现说明

  • 不要直接调用 execute() - 它由工作流引擎调用
  • 不要替代 execute() - 改用修饰器定义处理程序
  • 每个执行程序必须至少有一 个@handler 方法
  • 在初始化时验证处理程序方法签名

使用唯一标识符初始化执行程序。

ExecutorCompletedEvent

执行程序处理程序完成时触发的事件。

使用执行程序 ID 和可选数据初始化执行程序事件。

ExecutorEvent

执行程序事件的基类。

使用执行程序 ID 和可选数据初始化执行程序事件。

ExecutorFailedEvent

执行程序处理程序引发错误时触发的事件。

ExecutorInvokedEvent

调用执行程序处理程序时触发的事件。

使用执行程序 ID 和可选数据初始化执行程序事件。

FanInEdgeGroup

表示向单个下游执行程序馈送的聚合边缘集。

当多个上游阶段独立生成应全部到达同一下游处理器的消息时,通常会使用扇入组。

生成将多个源合并到一个目标的扇入映射。

FanOutEdgeGroup

表示具有可选选择逻辑的广播样式边缘组。

扇出将单个源执行程序生成的消息转发到一个或多个下游执行程序。 在运行时,我们可以通过执行检查有效负载并返回应接收消息的 ID 子集 selection_func 进一步缩小目标范围。

创建从单个源到多个目标的扇出映射。

FileCheckpointStorage

用于持久性的基于文件的检查点存储。

初始化文件存储。

FinishReason

表示聊天响应完成的原因。

使用值初始化 FinishReason。

FunctionApprovalRequestContent

表示对函数调用的用户批准请求。

初始化 FunctionApprovalRequestContent 实例。

FunctionApprovalResponseContent

表示用户批准函数调用的响应。

初始化 FunctionApprovalResponseContent 实例。

FunctionCallContent

表示函数调用请求。

初始化 FunctionCallContent 实例。

FunctionExecutor

包装用户定义函数的执行程序。

此执行程序允许用户定义简单的函数(同步和异步),并将它们用作工作流执行程序,而无需创建完整的执行程序类。

使用 asyncio.to_thread() 在线程池中执行同步函数,以避免阻止事件循环。

使用用户定义的函数初始化 FunctionExecutor。

FunctionInvocationConfiguration

聊天客户端中函数调用的配置。

此类在支持函数调用的每个聊天客户端上自动创建。 这意味着在大多数情况下,只需更改实例上的属性,然后创建一个新属性。

初始化 FunctionInvocationConfiguration。

FunctionInvocationContext

函数中间件调用的上下文对象。

此上下文通过函数中间件管道传递,并包含有关函数调用的所有信息。

初始化 FunctionInvocationContext。

FunctionMiddleware

可以截获函数调用的函数中间件的抽象基类。

函数中间件允许在执行前后截获和修改函数/工具调用。 可以验证参数、缓存结果、日志调用或重写函数执行。

注释

FunctionMiddleware 是一个抽象基类。 必须对其进行子类化并实现

用于创建自定义函数中间件的 process() 方法。

FunctionResultContent

表示函数调用的结果。

初始化 FunctionResultContent 实例。

GraphConnectivityError

检测到图形连接问题时引发的异常。

GroupChatBuilder

使用动态业务流程实现经理定向的群组聊天工作流的高级生成器。

GroupChat 使用选择下一个参与者发言的经理协调多代理对话。 管理器可以是简单的 Python 函数(set_select_speakers_func)或基于代理的选择器。set_manager 这两种方法是相互排斥的。

核心工作流:

  1. 定义参与者:代理列表(使用其 .name)或听写将名称映射到代理

  2. 配置扬声器选择: set_select_speakers_func

    set_manager (不是两者)

  3. 可选:设置轮次限制、检查点、终止条件

  4. 生成并运行工作流

说话人选择模式:

模式 1:基于函数的简单选择(建议)


   from agent_framework import GroupChatBuilder, GroupChatStateSnapshot


   def select_next_speaker(state: GroupChatStateSnapshot) -> str | None:
       # state contains: task, participants, conversation, history, round_index
       if state["round_index"] >= 5:
           return None  # Finish
       last_speaker = state["history"][-1].speaker if state["history"] else None
       if last_speaker == "researcher":
           return "writer"
       return "researcher"


   workflow = (
       GroupChatBuilder()
       .set_select_speakers_func(select_next_speaker)
       .participants([researcher_agent, writer_agent])  # Uses agent.name
       .build()
   )

模式 2:基于 LLM 的选择


   from agent_framework import ChatAgent
   from agent_framework.azure import AzureOpenAIChatClient

   manager_agent = AzureOpenAIChatClient().create_agent(
       instructions="Coordinate the conversation and pick the next speaker.",
       name="Coordinator",
       temperature=0.3,
       seed=42,
       max_tokens=500,
   )

   workflow = (
       GroupChatBuilder()
       .set_manager(manager_agent, display_name="Coordinator")
       .participants([researcher, writer])  # Or use dict: researcher=r, writer=w
       .with_max_rounds(10)
       .build()
   )

模式 3:请求中对话反馈信息


   from agent_framework import GroupChatBuilder

   # Pause before all participants
   workflow = (
       GroupChatBuilder()
       .set_select_speakers_func(select_next_speaker)
       .participants([researcher, writer])
       .with_request_info()
       .build()
   )

   # Pause only before specific participants
   workflow = (
       GroupChatBuilder()
       .set_select_speakers_func(select_next_speaker)
       .participants([researcher, writer, editor])
       .with_request_info(agents=[editor])  # Only pause before editor responds
       .build()
   )

参与者规范:

指定参与者的两种方法:

  • 列表表单: [agent1,agent2] - 对参与者名称使用 agent.name 属性
  • 听写形式: {name1: agent1, name2: agent2} - 显式名称控件
  • 关键字表单: 参与者(name1=agent1,name2=agent2) - 显式名称控件

状态快照结构:

传递给set_select_speakers_func的 GroupChatStateSnapshot 包含:

  • 任务:ChatMessage - 原始用户任务
  • 参与者:dict[str, str] - 参与者名称与说明的映射
  • conversation: tuple[ChatMessage, ...] - 完整对话历史记录
  • history: tuple[GroupChatTurn, ...] - 带说话人归属的轮次记录
  • round_index:int - 到目前为止经理选择轮数
  • pending_agent:str |无 - 当前正在处理的代理的名称(如果有)

重要约束:

初始化 GroupChatBuilder。

GroupChatDirective

群聊管理器实现发出的指令。

HandoffBuilder

Fluent builder for conversational handoff workflows with coordinator and specialist agents.

切换模式使协调器代理能够将请求路由到专家代理。 交互模式控制工作流是在每次代理响应后请求用户输入,还是在代理完成响应后自动完成。 终止条件确定工作流何时应停止请求输入并完成。

路由模式:

Single-Tier(默认): 只有协调员才能交给专家。 默认情况下,在任何专家做出响应后,控件将返回给用户以获取更多输入。 这将创建循环流:用户 -> 协调器 -> [可选专家] -> 用户 -> 协调器 -> ...使用 with_interaction_mode(“自治”) 跳过请求其他用户输入,并在代理不委派的情况下做出响应时生成最终会话。

多层(高级): 专家可以使用 .add_handoff()交给其他专家。 这为复杂的工作流提供了更大的灵活性,但比单层模式更可控。 用户会在专家到专家交接期间实时了解中间步骤(尽管包括所有交接的完整对话历史记录都保留,之后可以检查)。

关键功能:

  • 自动交接检测:协调器调用其移交工具

    参数(例如 {“handoff_to”: “shipping_agent”}) 标识接受控制的专家。

  • 自动生成的工具:默认情况下,生成器会合成协调器 handoff_to_<代理> 工具,因此你不会手动定义占位符函数。

  • 完整对话历史记录:将保留整个会话(包括任何 ChatMessage.additional_properties),并将其传递给每个代理。

  • 终止控制:默认情况下,在 10 条用户消息之后终止。 使用 .with_termination_condition(lambda conv: ...) 替代自定义逻辑(例如,检测“再见”)。

  • 交互模式:选择 human_in_loop (默认)以在代理轮次之间提示用户,或者 自主 地继续路由回代理,而无需提示用户输入,直到发生交接或达到终止/轮次限制(默认自治轮次限制:50)。

  • 检查点:可恢复工作流的可选持久性。

用法(Single-Tier):


   from agent_framework import HandoffBuilder
   from agent_framework.openai import OpenAIChatClient

   chat_client = OpenAIChatClient()

   # Create coordinator and specialist agents
   coordinator = chat_client.create_agent(
       instructions=(
           "You are a frontline support agent. Assess the user's issue and decide "
           "whether to hand off to 'refund_agent' or 'shipping_agent'. When delegation is "
           "required, call the matching handoff tool (for example `handoff_to_refund_agent`)."
       ),
       name="coordinator_agent",
   )

   refund = chat_client.create_agent(
       instructions="You handle refund requests. Ask for order details and process refunds.",
       name="refund_agent",
   )

   shipping = chat_client.create_agent(
       instructions="You resolve shipping issues. Track packages and update delivery status.",
       name="shipping_agent",
   )

   # Build the handoff workflow - default single-tier routing
   workflow = (
       HandoffBuilder(
           name="customer_support",
           participants=[coordinator, refund, shipping],
       )
       .set_coordinator(coordinator)
       .build()
   )

   # Run the workflow
   events = await workflow.run_stream("My package hasn't arrived yet")
   async for event in events:
       if isinstance(event, RequestInfoEvent):
           # Request user input
           user_response = input("You: ")
           await workflow.send_response(event.data.request_id, user_response)

具有.add_handoff的多层路由():


   # Enable specialist-to-specialist handoffs with fluent API
   workflow = (
       HandoffBuilder(participants=[coordinator, replacement, delivery, billing])
       .set_coordinator(coordinator)
       .add_handoff(coordinator, [replacement, delivery, billing])  # Coordinator routes to all
       .add_handoff(replacement, [delivery, billing])  # Replacement delegates to delivery/billing
       .add_handoff(delivery, billing)  # Delivery escalates to billing
       .build()
   )

   # Flow: User → Coordinator → Replacement → Delivery → Back to User
   # (Replacement hands off to Delivery without returning to user)

使用参与者工厂进行状态隔离:

自定义终止条件:


   # Terminate when user says goodbye or after 5 exchanges
   workflow = (
       HandoffBuilder(participants=[coordinator, refund, shipping])
       .set_coordinator(coordinator)
       .with_termination_condition(
           lambda conv: (
               sum(1 for msg in conv if msg.role.value == "user") >= 5
               or any("goodbye" in msg.text.lower() for msg in conv[-2:])
           )
       )
       .build()
   )

检查点:


   from agent_framework import InMemoryCheckpointStorage

   storage = InMemoryCheckpointStorage()
   workflow = (
       HandoffBuilder(participants=[coordinator, refund, shipping])
       .set_coordinator(coordinator)
       .with_checkpointing(storage)
       .build()
   )

初始化 HandoffBuilder 以创建对话式交接工作流。

生成器以未配置状态启动,并要求你调用:

  1. .participants(...) - 注册代理
  2. .participant_factories({...}) - 注册代理/执行程序工厂
  3. .set_coordinator(...) - 指定哪个代理接收初始用户输入
  4. .build() - 构造最终工作流

可选的配置方法允许自定义上下文管理、终止逻辑和持久性。

注释

参与者必须具有稳定的名称/ID,因为工作流会映射

向这些标识符移交工具参数。 代理名称应匹配

协调器移交工具发出的字符串(例如,一个工具)

outputs {“handoff_to”: “billing”} 需要名为 billing 的代理。

HandoffUserInputRequest

当工作流需要新的用户输入时发出的请求消息。

注意:将会话字段有意从检查点序列化中排除,以防止重复。 对话保留在协调器的状态中,将在还原时重新构造。 请参阅问题 #2667。

HostedCodeInterpreterTool

表示可指定给 AI 服务的托管工具,使它能够执行生成的代码。

此工具不实现代码解释本身。 它充当标记,以通知服务,如果服务能够执行此作,则允许它执行生成的代码。

初始化 HostedCodeInterpreterTool。

HostedFileContent

表示托管文件内容。

初始化 HostedFileContent 实例。

HostedFileSearchTool

表示可指定给 AI 服务的文件搜索工具,使它能够执行文件搜索。

初始化 FileSearchTool。

HostedMCPSpecificApproval

表示托管工具的特定模式。

使用此模式时,用户必须始终或永远不需要审批哪些工具。 这表示为具有两个可选键的字典:

HostedMCPTool

表示由服务管理和执行的 MCP 工具。

创建托管 MCP 工具。

HostedVectorStoreContent

表示托管矢量存储内容。

初始化 HostedVectorStoreContent 实例。

HostedWebSearchTool

表示可指定给 AI 服务的 Web 搜索工具,使它能够执行 Web 搜索。

初始化 HostedWebSearchTool。

InMemoryCheckpointStorage

用于测试和开发的内存中检查点存储。

初始化内存存储。

InProcRunnerContext

用于本地执行和可选检查点的进程内执行上下文。

初始化进程内执行上下文。

MCPStdioTool

用于连接到基于 stdio 的 MCP 服务器的 MCP 工具。

此类连接到通过标准输入/输出进行通信的 MCP 服务器,通常用于本地进程。

初始化 MCP stdio 工具。

注释

参数用于创建 StdioServerParameters 对象,

然后,用于创建 stdio 客户端。 请参阅mcp.client.stdio.stdio_client

和mcp.client.stdio.stdio_server_parameters以获取更多详细信息。

MCPStreamableHTTPTool

用于连接到基于 HTTP 的 MCP 服务器的 MCP 工具。

此类连接到通过可流 HTTP/SSE 通信的 MCP 服务器。

初始化 MCP 可流式传输 HTTP 工具。

注释

参数用于创建可流式传输的 HTTP 客户端。

有关更多详细信息,请参阅 mcp.client.streamable_http.streamablehttp_client。

传递给构造函数的任何额外参数都将传递给

可流式传输的 HTTP 客户端构造函数。

MCPWebsocketTool

用于连接到基于 WebSocket 的 MCP 服务器的 MCP 工具。

此类连接到通过 WebSocket 通信的 MCP 服务器。

初始化 MCP WebSocket 工具。

注释

参数用于创建 WebSocket 客户端。

有关详细信息,请参阅mcp.client.websocket.websocket_client。

传递给构造函数的任何额外参数都将传递给

WebSocket 客户端构造函数。

MagenticBuilder

用于创建 Magentic One 多代理业务流程工作流的 Fluent 生成器。

Magentic One 工作流使用 LLM 驱动的管理器通过动态任务规划、进度跟踪和自适应重新规划协调多个代理。 经理创建计划、选择代理、监视进度并确定何时重新计划或完成。

生成器提供了一个 Fluent API,用于配置参与者、经理、可选计划评审、检查点和事件回调。

人机循环支持:Magentic 通过以下方法提供专用的 HITL 机制:

  • .with_plan_review() - 在执行前查看和批准/修订计划

  • .with_human_input_on_stall() - 工作流停止时进行干预

  • 通过 FunctionApprovalRequestContent 进行工具审批 - 批准单个工具调用

这些事件发出 MagenticHumanInterventionRequest 事件,这些事件提供适用于 Magentic 基于规划的业务流程的结构化决策选项(批准、修订、继续、重新计划、指导)。

用法:


   from agent_framework import MagenticBuilder, StandardMagenticManager
   from azure.ai.projects.aio import AIProjectClient

   # Create manager with LLM client
   project_client = AIProjectClient.from_connection_string(...)
   chat_client = project_client.inference.get_chat_completions_client()

   # Build Magentic workflow with agents
   workflow = (
       MagenticBuilder()
       .participants(researcher=research_agent, writer=writing_agent, coder=coding_agent)
       .with_standard_manager(chat_client=chat_client, max_round_count=20, max_stall_count=3)
       .with_plan_review(enable=True)
       .with_checkpointing(checkpoint_storage)
       .build()
   )

   # Execute workflow
   async for message in workflow.run("Research and write article about AI agents"):
       print(message.text)

使用自定义管理器:


   # Create custom manager subclass
   class MyCustomManager(MagenticManagerBase):
       async def plan(self, context: MagenticContext) -> ChatMessage:
           # Custom planning logic
           ...


   manager = MyCustomManager()
   workflow = MagenticBuilder().participants(agent1=agent1, agent2=agent2).with_standard_manager(manager).build()
MagenticContext

Magentic 管理器的上下文。

MagenticManagerBase

Magentic One 管理器的基类。

ManagerDirectiveModel

结构化管理器指令输出的 Pydantic 模型。

通过分析和验证关键字参数中的输入数据来创建新模型。

引发 [ValidationError][pydantic_core。如果无法验证输入数据以形成有效的模型,则 ValidationError]。

self 是显式位置仅允许 自己 作为字段名称。

ManagerSelectionRequest

请求发送到经理代理进行下一次演讲者选择。

此数据类打包管理器代理的完整聊天状态和任务上下文,以便分析和做出演讲者选择决策。

ManagerSelectionResponse

使用说话人选择决策从经理代理做出响应。

管理器代理必须生成此结构(或兼容的听写/JSON),才能将其决策传回业务流程协调程序。

通过分析和验证关键字参数中的输入数据来创建新模型。

引发 [ValidationError][pydantic_core。如果无法验证输入数据以形成有效的模型,则 ValidationError]。

self 是显式位置仅允许 自己 作为字段名称。

Message

表示工作流中的消息的类。

OrchestrationState

用于业务流程协调程序检查点的统一状态容器。

此数据类标准化所有三种群组聊天模式的检查点序列化,同时允许通过元数据进行模式特定的扩展。

常见属性涵盖共享业务流程问题(任务、对话、轮询跟踪)。 模式特定的状态进入元数据听写。

RequestInfoEvent

工作流执行程序请求外部信息时触发的事件。

初始化请求信息事件。

RequestInfoInterceptor

在代理运行之前暂停人工输入工作流的内部执行程序。

调用 .with_request_info() 时,生成器会将此执行程序插入到工作流图中。 它会在代理运行之前截获 AgentExecutorRequest 消息,并使用 AgentInputRequest 通过 ctx.request_info() 暂停工作流。

收到响应时,响应处理程序会将输入作为用户消息注入到会话中,并将请求转发到代理。

可选 agent_filter 参数允许限制哪些代理触发暂停。 如果目标代理的 ID 不在筛选器集中,则会在不暂停的情况下转发请求。

初始化请求信息侦听器执行程序。

Role

描述聊天交互中消息的预期用途。

属性:SYSTEM:指示或设置 AI 系统行为的角色。 USER:为聊天交互提供用户输入的角色。 助手:提供对系统指示的用户提示输入的响应的角色。 工具:提供其他信息和引用以响应工具使用请求的角色。

使用值初始化角色。

Runner

在 Pregel 超级步骤中运行工作流的类。

使用边缘、共享状态和上下文初始化运行程序。

RunnerContext

运行程序使用的执行上下文的协议。

支持消息传送、事件和可选检查点的单个上下文。 如果未配置检查点存储,则可能会引发检查点方法。

SequentialBuilder

具有共享上下文的顺序代理/执行程序工作流的高级生成器。

  • 参与者(...) 接受 AgentProtocol(推荐)或执行程序实例的列表

  • register_participants(...) 接受 AgentProtocol 的工厂列表(建议)

    或执行程序工厂

  • 执行程序必须定义使用 list[ChatMessage] 并发送列表[ChatMessage] 的处理程序

  • 工作流按顺序连接参与者,将列表[ChatMessage] 向下传递链

  • 代理将助手消息追加到对话

  • 自定义执行程序可以转换/汇总并返回列表[ChatMessage]

  • 最终输出是最后一个参与者生成的对话

用法:


   from agent_framework import SequentialBuilder

   # With agent instances
   workflow = SequentialBuilder().participants([agent1, agent2, summarizer_exec]).build()

   # With agent factories
   workflow = (
       SequentialBuilder().register_participants([create_agent1, create_agent2, create_summarizer_exec]).build()
   )

   # Enable checkpoint persistence
   workflow = SequentialBuilder().participants([agent1, agent2]).with_checkpointing(storage).build()

   # Enable request info for mid-workflow feedback (pauses before each agent)
   workflow = SequentialBuilder().participants([agent1, agent2]).with_request_info().build()

   # Enable request info only for specific agents
   workflow = (
       SequentialBuilder()
       .participants([agent1, agent2, agent3])
       .with_request_info(agents=[agent2])  # Only pause before agent2
       .build()
   )
SharedState

用于管理工作流中共享状态的类。

SharedState 提供对工作流状态数据(需要在工作流执行过程中跨执行程序共享)的线程安全访问。

保留密钥:以下密钥保留供内部框架使用,不应由用户代码修改:

  • _executor_state:存储检查点的执行程序状态(由运行者管理)

警告

不要使用以下划线开头的键(_),因为它们可能保留

内部框架作。

初始化共享状态。

SingleEdgeGroup

单独边缘的便利包装器,使组 API 保持一致。

在两个执行程序之间创建一对一边缘组。

StandardMagenticManager

标准 Magentic 管理器,通过 ChatAgent 执行真正的 LLM 调用。

管理器构造反映原始 Magentic One 业务流程的提示:

  • 事实收集
  • 计划创建
  • JSON 中的进度账本
  • 重置时的事实更新和计划更新
  • 最终答案合成

初始化标准 Magentic Manager。

SubWorkflowRequestMessage

从子工作流发送到父工作流中的执行程序以请求信息的消息。

此消息包装子工作流中执行程序发出的 RequestInfoEvent。

SubWorkflowResponseMessage

通过 WorkflowExecutor 将父工作流发送到子工作流的消息,以提供请求的信息。

此消息包装响应数据以及子工作流执行程序发出的原始 RequestInfoEvent。

SuperStepCompletedEvent

在超级步骤结束时触发的事件。

初始化 superstep 事件。

SuperStepStartedEvent

在超级步骤启动时触发的事件。

初始化 superstep 事件。

SwitchCaseEdgeGroup

模拟传统开关/案例控制流的扇出变体。

每个事例都会检查消息有效负载,并确定它是否应对消息进行处理。 恰好一个事例或默认分支在运行时返回目标,保留单调度语义。

为单个源执行程序配置交换机/大小写路由结构。

SwitchCaseEdgeGroupCase

切换案例中单个条件分支的可持久说明。

与运行时 Case 对象不同,此可序列化变体仅存储谓词的目标标识符和描述性名称。 当基础可调用对象在反序列化期间不可用时,我们替换了一个大声失败的代理占位符,确保缺少的依赖项立即可见。

记录条件事例分支的路由元数据。

SwitchCaseEdgeGroupDefault

开关大小写组回退分支的可持久描述符。

默认分支可以保证存在,并且当其他每个事例谓词都不匹配有效负载时调用。

将默认分支指向给定的执行程序标识符。

TextContent

表示聊天中的文本内容。

初始化 TextContent 实例。

TextReasoningContent

表示聊天中文本推理内容。

备注:此类和 TextContent 在表面上相似,但不同。

初始化 TextReasoningContent 实例。

TextSpanRegion

表示已批注的文本区域。

初始化 TextSpanRegion。

ToolMode

定义聊天请求中是否以及如何使用工具。

初始化 ToolMode。

ToolProtocol

表示通用工具。

此协议定义所有工具必须实现的接口才能与代理框架兼容。 它由各种工具类实现,例如 HostedMCPTool、HostedWebSearchTool 和 AIFunction。 AIFunction 通常由 ai_function 修饰器创建。

由于每个连接器都需要以不同的方式分析工具,因此用户可以传递一个听写来指定服务特定的工具(如果没有抽象)。

TypeCompatibilityError

在连接执行程序之间检测到类型不兼容时引发异常。

UriContent

表示 URI 内容。

重要

这用于 URI 标识的内容,例如图像或文件。

对于(二进制)数据 URI,请改用 DataContent。

初始化 UriContent 实例。

备注:这用于 URI 标识的内容,例如图像或文件。 对于(二进制)数据 URI,请改用 DataContent

UsageContent

表示与聊天请求和响应关联的使用情况信息。

初始化 UsageContent 实例。

UsageDetails

提供有关请求/响应的使用情况详细信息。

初始化 UsageDetails 实例。

Workflow

一个基于图形的执行引擎,用于协调连接的执行程序。

概述

工作流使用类似于 Pregel 的模型执行通过边缘组连接的定向执行程序图,在超级步骤中运行,直到图形处于空闲状态。 使用 WorkflowBuilder 类创建工作流 - 不直接实例化此类。

执行模型

执行程序在同步的超级步骤中运行,其中每个执行程序:

  • 从连接的边缘组接收消息时调用
  • 可以通过 ctx.send_message() 将消息发送到下游执行程序
  • 可以通过 ctx.yield_output() 生成工作流级输出
  • 可以通过 ctx.add_event() 发出自定义事件

执行程序之间的消息在每个超级步骤结束时传递,事件流中不可见。 只有工作流级事件(输出、自定义事件)和状态事件才能被调用方观察到。

输入/输出类型

通过检查在运行时发现工作流类型:

  • 输入类型:从开始执行程序的输入类型
  • 输出类型:所有执行器的工作流输出类型的联合通过input_types和output_types属性访问这些类型。

执行方法

工作流提供两个主要执行 API,每个 API 支持多个方案:

  • run():执行到完成,返回包含所有事件的 WorkflowRunResult

  • run_stream():在发生事件时返回异步生成器生成事件

这两种方法都支持:

  • 初始工作流运行:提供 消息 参数
  • 检查点还原:提供 checkpoint_id (可选 )checkpoint_storage
  • HIL 延续:在 RequestInfoExecutor 请求后提供继续的响应
  • 运行时检查点:提供 checkpoint_storage 来为此运行启用/替代检查点

状态管理

工作流实例包含状态和状态, 在调用中 保留运行和 run_stream。 若要执行多个独立运行,请通过 WorkflowBuilder 创建单独的工作流实例。

外部输入请求

工作流中的执行程序可以使用 ctx.request_info()请求外部输入:

  1. 执行程序调用 ctx.request_info() 请求输入
  2. 执行程序实现 response_handler() 来处理响应
  3. 请求作为事件流中的 RequestInfoEvent 实例发出
  4. 工作流进入IDLE_WITH_PENDING_REQUESTS状态
  5. 调用方通过 send_responsessend_responses_streaming 方法处理请求并提供响应
  6. 响应将路由到请求的执行程序,并调用响应处理程序

检查点

可以在生成时或运行时配置检查点:

生成时(通过 WorkflowBuilder):工作流 = WorkflowBuilder().with_checkpointing(storage)。build()

运行时(通过 run/run_stream 参数):result = await workflow.run(message, checkpoint_storage=runtime_storage)

启用后,会在每个超级步骤结束时创建检查点,捕获:

  • 执行程序状态
  • 传输中的消息
  • 可以使用检查点存储在进程重启过程中暂停和恢复共享状态工作流。

组成

可以使用 WorkflowExecutor 嵌套工作流,该工作流将子工作流包装为执行程序。 嵌套工作流的输入/输出类型将成为 WorkflowExecutor 类型的一部分。 调用时,WorkflowExecutor 运行嵌套工作流以完成并处理其输出。

使用边缘列表初始化工作流。

WorkflowAgent

包装工作流并将其公开为代理的 代理 子类。

初始化 WorkflowAgent。

WorkflowBuilder

用于构造工作流的生成器类。

此类通过连接执行程序与边缘和配置执行参数来定义工作流图形的 Fluent API。 调用 build 以创建不可变 Workflow 实例。

使用空的边缘列表初始化 WorkflowBuilder,而无需启动执行程序。

WorkflowCheckpoint

表示工作流状态的完整检查点。

检查点捕获特定点工作流的完整执行状态,使工作流能够暂停和恢复。

注释

shared_state听写可能包含由框架管理的保留密钥。

有关保留密钥的详细信息,请参阅 SharedState 类文档。

WorkflowCheckpointSummary

工作流检查点的人工可读摘要。

WorkflowContext

执行器能够与工作流和其他执行程序交互的执行上下文。

概述

WorkflowContext 为执行程序提供了一个受控接口,用于发送消息、生成输出、管理状态以及与更广泛的工作流生态系统交互。 它通过泛型参数强制实施类型安全性,同时防止直接访问内部运行时组件。

类型参数

上下文已参数化,以对不同的作强制实施类型安全性:

WorkflowContext (无参数)

对于仅在不发送消息或生成输出的情况下执行副作用的执行程序:


   async def log_handler(message: str, ctx: WorkflowContext) -> None:
       print(f"Received: {message}")  # Only side effects

WorkflowContext[T_Out]

允许将类型T_Out的消息发送到其他执行程序:


   async def processor(message: str, ctx: WorkflowContext[int]) -> None:
       result = len(message)
       await ctx.send_message(result)  # Send int to downstream executors

WorkflowContext[T_Out, T_W_Out]

启用发送消息(T_Out)和生成工作流输出(T_W_Out):


   async def dual_output(message: str, ctx: WorkflowContext[int, str]) -> None:
       await ctx.send_message(42)  # Send int message
       await ctx.yield_output("complete")  # Yield str workflow output

联合类型

可以使用联合表示法指定多个类型:


   async def flexible(message: str, ctx: WorkflowContext[int | str, bool | dict]) -> None:
       await ctx.send_message("text")  # or send 42
       await ctx.yield_output(True)  # or yield {"status": "done"}

使用给定的工作流上下文初始化执行程序上下文。

WorkflowErrorDetails

结构化错误信息,以在错误事件/结果中显示。

WorkflowEvent

工作流事件的基类。

使用可选数据初始化工作流事件。

WorkflowExecutor

包装工作流以启用分层工作流组合的执行程序。

概述

WorkflowExecutor 使工作流的行为在父工作流中充当单个执行程序,从而启用嵌套工作流体系结构。 它处理子工作流执行的完整生命周期,包括事件处理、输出转发以及父工作流和子工作流之间的请求/响应协调。

执行模型

调用时,WorkflowExecutor:

  1. 使用输入消息启动包装的工作流
  2. 运行子工作流以完成或直到需要外部输入
  3. 执行后处理子工作流的完整事件流
  4. 将输出作为消息转发到父工作流
  5. 通过将外部请求路由到父工作流来处理外部请求
  6. 累积响应并恢复子工作流执行

事件流处理

WorkflowExecutor 在子工作流完成后处理事件:

输出转发

子工作流中的所有输出都会自动转发到父级:

当allow_direct_output为 False 时(默认值):


   # An executor in the sub-workflow yields outputs
   await ctx.yield_output("sub-workflow result")

   # WorkflowExecutor forwards to parent via ctx.send_message()
   # Parent receives the output as a regular message

当allow_direct_output为 True 时:

请求/响应协调

当子工作流需要外部信息时:


   # An executor in the sub-workflow makes request
   request = MyDataRequest(query="user info")

   # WorkflowExecutor captures RequestInfoEvent and wraps it in a SubWorkflowRequestMessage
   # then send it to the receiving executor in parent workflow. The executor in parent workflow
   # can handle the request locally or forward it to an external source.
   # The WorkflowExecutor tracks the pending request, and implements a response handler.
   # When the response is received, it executes the response handler to accumulate responses
   # and resume the sub-workflow when all expected responses are received.
   # The response handler expects a SubWorkflowResponseMessage wrapping the response data.

状态管理

WorkflowExecutor 跨请求/响应周期维护执行状态:

  • 通过request_id跟踪挂起的请求
  • 累积响应,直到收到所有预期响应
  • 使用完整的响应批处理恢复子工作流执行
  • 处理并发执行和多个挂起的请求

类型系统集成

WorkflowExecutor 从包装的工作流继承其类型签名:

输入类型

匹配包装的工作流的启动执行程序输入类型:


   # If sub-workflow accepts str, WorkflowExecutor accepts str
   workflow_executor = WorkflowExecutor(my_workflow, id="wrapper")
   assert workflow_executor.input_types == my_workflow.input_types

输出类型

将子工作流输出与请求协调类型相结合:


   # Includes all sub-workflow output types
   # Plus SubWorkflowRequestMessage if sub-workflow can make requests
   output_types = workflow.output_types + [SubWorkflowRequestMessage]  # if applicable

错误处理

WorkflowExecutor 传播子工作流失败:

  • 从子工作流捕获 WorkflowFailedEvent
  • 在父上下文中转换为 WorkflowErrorEvent
  • 提供详细的错误信息,包括子工作流 ID

并发执行支持

WorkflowExecutor 完全支持多个并发子工作流执行:

Per-Execution 状态隔离

每个子工作流调用都会创建独立的 ExecutionContext:


   # Multiple concurrent invocations are supported
   workflow_executor = WorkflowExecutor(my_workflow, id="concurrent_executor")

   # Each invocation gets its own execution context
   # Execution 1: processes input_1 independently
   # Execution 2: processes input_2 independently
   # No state interference between executions

请求/响应协调

响应正确路由到原始执行:

  • 每个执行跟踪其自己的挂起请求和预期响应
  • 请求到执行映射可确保响应到达正确的子工作流
  • 响应累积是按执行隔离的
  • 执行完成时自动清理

内存管理

  • 支持无限并发执行
  • 每个执行都具有唯一的基于 UUID 的标识
  • 清理已完成的执行上下文
  • 并发访问的线程安全状态管理

重要注意事项

共享工作流实例:所有并发执行都使用相同的基础工作流实例。 若要进行适当的隔离,请确保包装的工作流及其执行程序是无状态的。


   # Avoid: Stateful executor with instance variables
   class StatefulExecutor(Executor):
       def __init__(self):
           super().__init__(id="stateful")
           self.data = []  # This will be shared across concurrent executions!

与父工作流集成

父工作流可以截获子工作流请求:

实现说明

  • 子工作流在处理结果之前运行到完成
  • 事件处理是原子的 - 所有输出在请求之前转发
  • 响应累积可确保子工作流接收完整的响应批处理
  • 在执行状态是为了在外部请求后正确恢复而保留
  • 并发执行是完全隔离的,不会相互干扰

初始化 WorkflowExecutor。

WorkflowFailedEvent

工作流运行终止并出现错误时发出的内置生命周期事件。

WorkflowOutputEvent

工作流执行程序生成输出时触发的事件。

初始化工作流输出事件。

WorkflowRunResult

非流式处理工作流执行期间生成的事件的容器。

概述

表示工作流运行的完整执行结果,其中包含从开始到空闲状态生成的所有事件。 工作流在执行期间通过 ctx.yield_output() 调用以增量方式生成输出。

事件结构

维护数据平面事件与控制平面事件之间的分离:

  • 数据平面事件:执行程序调用、完成、输出和请求(在主列表中)
  • 控制平面事件:状态时间线可通过 status_timeline() 方法访问

键方法

  • get_outputs(): 从执行中提取所有工作流输出
  • get_request_info_events(): 检索执行期间发出的外部输入请求
  • get_final_state(): 获取最终工作流状态(IDLE、IDLE_WITH_PENDING_REQUESTS 等)
  • status_timeline(): 访问完整的状态事件历史记录
WorkflowStartedEvent

工作流运行开始时发出的内置生命周期事件。

使用可选数据初始化工作流事件。

WorkflowStatusEvent

为工作流运行状态转换发出的内置生命周期事件。

使用新的状态和可选数据初始化工作流状态事件。

WorkflowValidationError

工作流验证错误的基例外。

WorkflowViz

用于使用 graphviz 和 Mermaid 可视化工作流的类。

使用工作流初始化 WorkflowViz。

枚举

MagenticHumanInterventionDecision

人工干预响应的决策选项。

MagenticHumanInterventionKind

需要这种人工干预。

ValidationTypeEnum

工作流验证类型的枚举。

WorkflowEventSource

标识工作流事件是来自框架还是执行程序。

FRAMEWORK 用于由内置业务流程路径发出的事件(即使引发它们的代码驻留在运行程序相关模块中),对于由开发人员提供的执行程序实现显示的事件 ,也使用 EXECUTOR

WorkflowRunState

工作流执行的运行级别状态。

语义学:

  • STARTED:已启动运行,并且已创建工作流上下文。 在执行任何有意义的工作之前,这是一个初始状态。 在此代码库中,我们将发出专用 WorkflowStartedEvent 进行遥测,通常将状态直接提升为 IN_PROGRESS。 对于需要显式工作前阶段的状态计算机,使用者仍可能依赖 STARTED

  • IN_PROGRESS:工作流正在积极执行(例如,初始消息已传递到启动执行程序或超级步骤正在运行)。 此状态在运行开始时发出,可以在运行过程中后跟其他状态。

  • IN_PROGRESS_PENDING_REQUESTS:在一个或多个信息请求作未完成时活动执行。 在请求处于运行状态时,仍可能会计划新工作。

  • IDLE:工作流是静止的,没有未完成的请求,也没有更多工作要做。 这是已完成执行的工作流的正常终端状态,可能一路生成输出。

  • IDLE_WITH_PENDING_REQUESTS:工作流暂停等待外部输入(例如发出 RequestInfoEvent)。 这是非终端状态;提供响应时,工作流可以恢复。

  • FAILED:指示显示错误的终端状态。 带有结构化错误详细信息的 WorkflowFailedEvent

  • 已取消:指示运行已由调用方或业务流程协调程序取消的终端状态。 当前未默认发出运行程序路径,但包括支持取消的集成器/业务流程协调程序。

函数

agent_middleware

修饰器将函数标记为代理中间件。

此修饰器将函数显式标识为代理中间件,该中间件处理 AgentRunContext 对象。

agent_middleware(func: Callable[[AgentRunContext, Callable[[AgentRunContext], Awaitable[None]]], Awaitable[None]]) -> Callable[[AgentRunContext, Callable[[AgentRunContext], Awaitable[None]]], Awaitable[None]]

参数

名称 说明
func
必需

要标记为代理中间件的中间件函数。

返回

类型 说明

与代理中间件标记相同的函数。

示例


   from agent_framework import agent_middleware, AgentRunContext, ChatAgent


   @agent_middleware
   async def logging_middleware(context: AgentRunContext, next):
       print(f"Before: {context.agent.name}")
       await next(context)
       print(f"After: {context.result}")


   # Use with an agent
   agent = ChatAgent(chat_client=client, name="assistant", middleware=logging_middleware)

ai_function

修饰函数,将其转换为 AIFunction,可以传递给模型并自动执行。

此修饰器从函数的签名创建 Pydantic 模型,该模型将用于验证传递给函数的参数,并为函数的参数生成 JSON 架构。

若要向参数添加说明,请使用 Annotated 字符串 typing 说明中的类型作为第二个参数。 还可以使用 Pydantic 的 Field 类进行更高级的配置。

注释

当approval_mode设置为“always_require”时,将不会执行该函数

在给出显式批准之前,这仅适用于自动调用流。

另请务必注意,如果模型返回多个函数调用,则需要审批的一些调用

和其他不这样做的人,它将要求批准他们所有人。

ai_function(func: Callable[[...], ReturnT | Awaitable[ReturnT]] | None = None, *, name: str | None = None, description: str | None = None, approval_mode: Literal['always_require', 'never_require'] | None = None, max_invocations: int | None = None, max_invocation_exceptions: int | None = None, additional_properties: dict[str, Any] | None = None) -> AIFunction[Any, ReturnT] | Callable[[Callable[[...], ReturnT | Awaitable[ReturnT]]], AIFunction[Any, ReturnT]]

参数

名称 说明
func
Callable[[...], <xref:agent_framework._tools.ReturnT> | Awaitable[<xref:agent_framework._tools.ReturnT>]] | None

要修饰的函数。

默认值: None
name
必需
str | None
description
必需
str | None
approval_mode
必需
Literal['always_require', 'never_require'] | None
max_invocations
必需
int | None
max_invocation_exceptions
必需
int | None
additional_properties
必需

仅限关键字的参数

名称 说明
name

函数的名称。 如果未提供,将使用函数 __name__ 的属性。

默认值: None
description

函数的说明。 如果未提供,将使用函数的 docstring。

默认值: None
approval_mode

是否需要批准才能运行此工具。 默认值为不需要审批。

默认值: None
max_invocations

可以调用此函数的最大次数。 如果没有限制,则至少应为 1。

默认值: None
max_invocation_exceptions

调用期间允许的最大异常数。 如果没有限制,则至少应为 1。

默认值: None
additional_properties

要对函数设置的其他属性。

默认值: None

返回

类型 说明
AIFunction[Any, <xref:agent_framework._tools.ReturnT>] | Callable[[Callable[[…], <xref:agent_framework._tools.ReturnT> | Awaitable[<xref:agent_framework._tools.ReturnT>]]], AIFunction[Any, <xref:agent_framework._tools.ReturnT>]]

示例


   from agent_framework import ai_function
   from typing import Annotated


   @ai_function
   def ai_function_example(
       arg1: Annotated[str, "The first argument"],
       arg2: Annotated[int, "The second argument"],
   ) -> str:
       # An example function that takes two arguments and returns a string.
       return f"arg1: {arg1}, arg2: {arg2}"


   # the same function but with approval required to run
   @ai_function(approval_mode="always_require")
   def ai_function_example(
       arg1: Annotated[str, "The first argument"],
       arg2: Annotated[int, "The second argument"],
   ) -> str:
       # An example function that takes two arguments and returns a string.
       return f"arg1: {arg1}, arg2: {arg2}"


   # With custom name and description
   @ai_function(name="custom_weather", description="Custom weather function")
   def another_weather_func(location: str) -> str:
       return f"Weather in {location}"


   # Async functions are also supported
   @ai_function
   async def async_get_weather(location: str) -> str:
       '''Get weather asynchronously.'''
       # Simulate async operation
       return f"Weather in {location}"

chat_middleware

修饰器将函数标记为聊天中间件。

此修饰器将函数显式标识为聊天中间件,该中间件处理 ChatContext 对象。

chat_middleware(func: Callable[[ChatContext, Callable[[ChatContext], Awaitable[None]]], Awaitable[None]]) -> Callable[[ChatContext, Callable[[ChatContext], Awaitable[None]]], Awaitable[None]]

参数

名称 说明
func
必需

要标记为聊天中间件的中间件的中间件函数。

返回

类型 说明

与聊天中间件标记相同的函数。

示例


   from agent_framework import chat_middleware, ChatContext, ChatAgent


   @chat_middleware
   async def logging_middleware(context: ChatContext, next):
       print(f"Messages: {len(context.messages)}")
       await next(context)
       print(f"Response: {context.result}")


   # Use with an agent
   agent = ChatAgent(chat_client=client, name="assistant", middleware=logging_middleware)

create_edge_runner

工厂函数,用于为边缘组创建适当的边缘运行程序。

create_edge_runner(edge_group: EdgeGroup, executors: dict[str, Executor]) -> EdgeRunner

参数

名称 说明
edge_group
必需
<xref:agent_framework._workflows._edge.EdgeGroup>

要为其创建运行程序的边缘组。

executors
必需

执行程序 ID 到执行程序实例的映射。

返回

类型 说明
<xref:agent_framework._workflows._edge_runner.EdgeRunner>

相应的 EdgeRunner 实例。

executor

将独立函数转换为 FunctionExecutor 实例的修饰器。

@executor修饰器仅用于独立模块级函数。 对于基于类的执行程序,对实例方法使用执行器基类 @handler 。

支持同步和异步函数。 同步函数在线程池中执行,以避免阻止事件循环。

重要

用于 @executor 独立函数(模块级或本地函数)

请勿与@executorstaticmethodclassmethod

对于基于类的执行程序,子类执行程序并在 @handler 实例方法上使用

用法:


   # Standalone async function (RECOMMENDED):
   @executor(id="upper_case")
   async def to_upper(text: str, ctx: WorkflowContext[str]):
       await ctx.send_message(text.upper())


   # Standalone sync function (runs in thread pool):
   @executor
   def process_data(data: str):
       return data.upper()


   # For class-based executors, use @handler instead:
   class MyExecutor(Executor):
       def __init__(self):
           super().__init__(id="my_executor")

       @handler
       async def process(self, data: str, ctx: WorkflowContext[str]):
           await ctx.send_message(data.upper())
executor(func: Callable[[...], Any] | None = None, *, id: str | None = None) -> Callable[[Callable[[...], Any]], FunctionExecutor] | FunctionExecutor

参数

名称 说明
func
Callable[[...], Any] | None

要修饰的函数(使用时不使用括号)

默认值: None
id
必需
str | None

执行程序的可选自定义 ID。 如果为 None,则使用函数名称。

仅限关键字的参数

名称 说明
id
默认值: None

返回

类型 说明

可连接到工作流的 FunctionExecutor 实例。

例外

类型 说明

staticmethod如果使用或classmethod(不支持的模式)

function_middleware

修饰器将函数标记为函数中间件。

此修饰器将函数显式标识为函数中间件,该中间件处理 FunctionInvocationContext 对象。

function_middleware(func: Callable[[FunctionInvocationContext, Callable[[FunctionInvocationContext], Awaitable[None]]], Awaitable[None]]) -> Callable[[FunctionInvocationContext, Callable[[FunctionInvocationContext], Awaitable[None]]], Awaitable[None]]

参数

名称 说明
func
必需

要标记为函数中间件的中间件的中间件函数。

返回

类型 说明

具有函数中间件标记的相同函数。

示例


   from agent_framework import function_middleware, FunctionInvocationContext, ChatAgent


   @function_middleware
   async def logging_middleware(context: FunctionInvocationContext, next):
       print(f"Calling: {context.function.name}")
       await next(context)
       print(f"Result: {context.result}")


   # Use with an agent
   agent = ChatAgent(chat_client=client, name="assistant", middleware=logging_middleware)

get_checkpoint_summary

get_checkpoint_summary(checkpoint: WorkflowCheckpoint) -> WorkflowCheckpointSummary

参数

名称 说明
checkpoint
必需

返回

类型 说明

get_logger

获取具有指定名称的记录器,默认为“agent_framework”。

get_logger(name: str = 'agent_framework') -> Logger

参数

名称 说明
name
str

记录器的名称。 默认为“agent_framework”。

默认值: "agent_framework"

返回

类型 说明

配置的记录器实例。

handler

修饰器用于为执行程序注册处理程序。

handler(func: Callable[[ExecutorT, Any, ContextT], Awaitable[Any]]) -> Callable[[ExecutorT, Any, ContextT], Awaitable[Any]]

参数

名称 说明
func
必需
Callable[[<xref:agent_framework._workflows._executor.ExecutorT>, Any, <xref:agent_framework._workflows._executor.ContextT>], Awaitable[Any]]

要修饰的函数。 在没有参数的情况下使用时可为 None。

返回

类型 说明
Callable[[<xref:agent_framework._workflows._executor.ExecutorT>, Any, <xref:agent_framework._workflows._executor.ContextT>], Awaitable[Any]]

带有处理程序元数据的修饰函数。

示例

@handler async def handle_string(self, message: str, ctx: WorkflowContext[str]) -> None:

...

@handler async def handle_data(self, message: dict, ctx: WorkflowContext[str | int]) -> None:

...

prepare_function_call_results

准备函数调用结果的值。

prepare_function_call_results(content: TextContent | DataContent | TextReasoningContent | UriContent | FunctionCallContent | FunctionResultContent | ErrorContent | UsageContent | HostedFileContent | HostedVectorStoreContent | FunctionApprovalRequestContent | FunctionApprovalResponseContent | Any | list[TextContent | DataContent | TextReasoningContent | UriContent | FunctionCallContent | FunctionResultContent | ErrorContent | UsageContent | HostedFileContent | HostedVectorStoreContent | FunctionApprovalRequestContent | FunctionApprovalResponseContent | Any]) -> str

参数

名称 说明
content
必需

返回

类型 说明
str

prepend_agent_framework_to_user_agent

将“agent-framework”追加到标头中的 User-Agent。

通过 AGENT_FRAMEWORK_USER_AGENT_DISABLED 环境变量禁用用户代理遥测时,User-Agent 标头将不包括代理框架信息。 它将按原样发送回,或在传递 None 时作为空听写发送。

prepend_agent_framework_to_user_agent(headers: dict[str, Any] | None = None) -> dict[str, Any]

参数

名称 说明
headers

现有标头字典。

默认值: None

返回

类型 说明

如果标头为 None,则“User-Agent”的新听写设置为“agent-framework-python/{version}”。 用户代理前面有“agent-framework-python/{version}”的修改标头字典。

示例


   from agent_framework import prepend_agent_framework_to_user_agent

   # Add agent-framework to new headers
   headers = prepend_agent_framework_to_user_agent()
   print(headers["User-Agent"])  # "agent-framework-python/0.1.0"

   # Prepend to existing headers
   existing = {"User-Agent": "my-app/1.0"}
   headers = prepend_agent_framework_to_user_agent(existing)
   print(headers["User-Agent"])  # "agent-framework-python/0.1.0 my-app/1.0"

response_handler

修饰器用于注册处理程序来处理请求的响应。

response_handler(func: Callable[[ExecutorT, Any, Any, ContextT], Awaitable[None]]) -> Callable[[ExecutorT, Any, Any, ContextT], Awaitable[None]]

参数

名称 说明
func
必需
Callable[[<xref:agent_framework._workflows._request_info_mixin.ExecutorT>, Any, Any, <xref:agent_framework._workflows._request_info_mixin.ContextT>], Awaitable[None]]

要修饰的函数。

返回

类型 说明
Callable[[<xref:agent_framework._workflows._request_info_mixin.ExecutorT>, Any, Any, <xref:agent_framework._workflows._request_info_mixin.ContextT>], Awaitable[None]]

带有处理程序元数据的修饰函数。

示例


   @handler
   async def run(self, message: int, context: WorkflowContext[str]) -> None:
       # Example of a handler that sends a request
       ...
       # Send a request with a `CustomRequest` payload and expect a `str` response.
       await context.request_info(CustomRequest(...), str)


   @response_handler
   async def handle_response(
       self,
       original_request: CustomRequest,
       response: str,
       context: WorkflowContext[str],
   ) -> None:
       # Example of a response handler for the above request
       ...


   @response_handler
   async def handle_response(
       self,
       original_request: CustomRequest,
       response: dict,
       context: WorkflowContext[int],
   ) -> None:
       # Example of a response handler for a request expecting a dict response
       ...

setup_logging

设置代理框架的日志记录配置。

setup_logging() -> None

返回

类型 说明

use_agent_middleware

向代理类添加中间件支持的类修饰器。

此修饰器向任何代理类添加中间件功能。 它包装提供 run() 中间件执行的方法和 run_stream() 方法。

中间件执行可以通过将 context.terminate 属性设置为 True 随时终止。 设置后,管道将在控制返回到管道后立即停止执行进一步中间件。

注释

此修饰器已应用于内置代理类。 只需使用

如果要创建自定义代理实现,则为它。

use_agent_middleware(agent_class: type[TAgent]) -> type[TAgent]

参数

名称 说明
agent_class
必需
type[<xref:TAgent>]

要向其添加中间件支持的代理类。

返回

类型 说明
type[~<xref:TAgent>]

具有中间件支持的修改代理类。

示例


   from agent_framework import use_agent_middleware


   @use_agent_middleware
   class CustomAgent:
       async def run(self, messages, **kwargs):
           # Agent implementation
           pass

       async def run_stream(self, messages, **kwargs):
           # Streaming implementation
           pass

use_chat_middleware

向聊天客户端类添加中间件支持的类修饰器。

此修饰器向任何聊天客户端类添加中间件功能。 它包装提供 get_response() 中间件执行的方法和 get_streaming_response() 方法。

注释

此修饰器已应用于内置聊天客户端类。 只需使用

如果要创建自定义聊天客户端实现,则为它。

use_chat_middleware(chat_client_class: type[TChatClient]) -> type[TChatClient]

参数

名称 说明
chat_client_class
必需
type[<xref:TChatClient>]

要向其添加中间件支持的聊天客户端类。

返回

类型 说明
type[~<xref:TChatClient>]

具有中间件支持的修改后的聊天客户端类。

示例


   from agent_framework import use_chat_middleware


   @use_chat_middleware
   class CustomChatClient:
       async def get_response(self, messages, **kwargs):
           # Chat client implementation
           pass

       async def get_streaming_response(self, messages, **kwargs):
           # Streaming implementation
           pass

use_function_invocation

用于为聊天客户端启用工具调用的类修饰器。

此修饰器包装 get_responseget_streaming_response 方法,以自动处理来自模型的函数调用,执行它们,并将结果返回给模型以供进一步处理。

use_function_invocation(chat_client: type[TChatClient]) -> type[TChatClient]

参数

名称 说明
chat_client
必需
type[<xref:TChatClient>]

要修饰的聊天客户端类。

返回

类型 说明
type[~<xref:TChatClient>]

启用了函数调用的修饰聊天客户端类。

例外

类型 说明

如果聊天客户端没有所需的方法。

示例


   from agent_framework import use_function_invocation, BaseChatClient


   @use_function_invocation
   class MyCustomClient(BaseChatClient):
       async def get_response(self, messages, **kwargs):
           # Implementation here
           pass

       async def get_streaming_response(self, messages, **kwargs):
           # Implementation here
           pass


   # The client now automatically handles function calls
   client = MyCustomClient()

validate_workflow_graph

用于验证工作流图的便利函数。

validate_workflow_graph(edge_groups: Sequence[EdgeGroup], executors: dict[str, Executor], start_executor: Executor) -> None

参数

名称 说明
edge_groups
必需
Sequence[<xref:agent_framework._workflows._edge.EdgeGroup>]

工作流中的边缘组列表

executors
必需

执行程序 ID 到执行程序实例的映射

start_executor
必需

起始执行程序(可以是实例或 ID)

返回

类型 说明

例外

类型 说明

如果任何验证失败