本頁提供如何在 Microsoft Agent Framework 工作流程中使用 代理程式 的概觀。
概觀
若要將智慧新增至工作流程,您可以利用 AI 代理程式作為工作流程執行的一部分。 AI 代理可以輕鬆整合到工作流程中,使您能夠創建以前難以實現的複雜、智慧的解決方案。
將代理程式直接新增至工作流程
您可以透過邊緣將代理程式新增至工作流程:
using Microsoft.Agents.Workflows;
using Microsoft.Extensions.AI;
using Microsoft.Agents.AI;
// Create the agents first
AIAgent agentA = new ChatClientAgent(chatClient, instructions);
AIAgent agentB = new ChatClientAgent(chatClient, instructions);
// Build a workflow with the agents
WorkflowBuilder builder = new(agentA);
builder.AddEdge(agentA, agentB);
Workflow<ChatMessage> workflow = builder.Build<ChatMessage>();
執行工作流程
在上面建立的工作流程中,代理程式實際上包裝在執行程式內,該執行程式處理代理程式與工作流程其他部分的通訊。 執行程式可以處理三種訊息類型:
-
ChatMessage:單一聊天訊息 -
List<ChatMessage>:聊天訊息清單 -
TurnToken:表示新回合開始的回合令牌
執行程式不會觸發代理程式回應,直到收到 TurnToken. 在接收到TurnToken之前收到的任何訊息都會被緩衝,並在收到TurnToken後傳送至代理。
StreamingRun run = await InProcessExecution.StreamAsync(workflow, new ChatMessage(ChatRole.User, "Hello World!"));
// Must send the turn token to trigger the agents. The agents are wrapped as executors.
// When they receive messages, they will cache the messages and only start processing
// when they receive a TurnToken. The turn token will be passed from one agent to the next.
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
// The agents will run in streaming mode and an AgentRunUpdateEvent
// will be emitted as new chunks are generated.
if (evt is AgentRunUpdateEvent agentRunUpdate)
{
Console.WriteLine($"{agentRunUpdate.ExecutorId}: {agentRunUpdate.Data}");
}
}
使用內建代理執行器
您可以透過邊緣將代理程式新增至工作流程:
from agent_framework import WorkflowBuilder
from agent_framework.azure import AzureChatClient
from azure.identity import AzureCliCredential
# Create the agents first
chat_client = AzureChatClient(credential=AzureCliCredential())
writer_agent: ChatAgent = chat_client.create_agent(
instructions=(
"You are an excellent content writer. You create new content and edit contents based on the feedback."
),
name="writer_agent",
)
reviewer_agent = chat_client.create_agent(
instructions=(
"You are an excellent content reviewer."
"Provide actionable feedback to the writer about the provided content."
"Provide the feedback in the most concise manner possible."
),
name="reviewer_agent",
)
# Build a workflow with the agents
builder = WorkflowBuilder()
builder.set_start_executor(writer_agent)
builder.add_edge(writer_agent, reviewer_agent)
workflow = builder.build()
執行工作流程
在上面建立的工作流程中,代理程式實際上包裝在執行程式內,該執行程式處理代理程式與工作流程其他部分的通訊。 執行程式可以處理三種訊息類型:
-
str:字串格式的單一聊天訊息 -
ChatMessage:單一聊天訊息 -
List<ChatMessage>:聊天訊息清單
每當執行器收到其中一種類型的訊息時,它就會觸發代理程式回應,回應類型將是 AgentExecutorResponse 物件。 此類別包含有關代理程式回應的有用資訊,包括:
-
executor_id:產生此回應的執行程式識別碼 -
agent_run_response:客服專員的完整回應 -
full_conversation:到目前為止的完整對話記錄
執行工作流程時,可能會發出與代理程式回應相關的兩種可能事件類型:
-
AgentRunUpdateEvent包含代理程式回應區塊,因為它們是在串流模式下產生的。 -
AgentRunEvent包含來自非串流模式下代理程式的完整回應。
依預設,代理程式會包裝在以串流模式執行的執行程式中。 您可以建立自訂執行程式來自訂此行為。 有關更多詳細信息,請參閱下一節。
last_executor_id = None
async for event in workflow.run_streaming("Write a short blog post about AI agents."):
if isinstance(event, AgentRunUpdateEvent):
if event.executor_id != last_executor_id:
if last_executor_id is not None:
print()
print(f"{event.executor_id}:", end=" ", flush=True)
last_executor_id = event.executor_id
print(event.data, end="", flush=True)
使用自訂代理執行環境
有時您可能想要自訂 AI 代理程式整合到工作流程中的方式。 您可以透過建立自訂執行程式來達成此目的。 這可讓您控制:
- 代理程式的叫用:串流或非串流
- 代理程式將處理的訊息類型,包括自訂訊息類型
- 代理程式的生命週期,包括初始化和清理
- 代理程式執行緒和其他資源的使用方式
- 代理程式執行期間發出的其他事件,包括自訂事件
- 與其他工作流程功能集成,例如共享狀態和請求/響應
internal sealed class CustomAgentExecutor : Executor<CustomInput, CustomOutput>("CustomAgentExecutor")
{
private readonly AIAgent _agent;
/// <summary>
/// Creates a new instance of the <see cref="CustomAgentExecutor"/> class.
/// </summary>
/// <param name="agent">The AI agent used for custom processing</param>
public CustomAgentExecutor(AIAgent agent) : base("CustomAgentExecutor")
{
this._agent = agent;
}
public async ValueTask<CustomOutput> HandleAsync(CustomInput message, IWorkflowContext context)
{
// Retrieve any shared states if needed
var sharedState = await context.ReadStateAsync<SharedStateType>("sharedStateId", scopeName: "SharedStateScope");
// Render the input for the agent
var agentInput = RenderInput(message, sharedState);
// Invoke the agent
// Assume the agent is configured with structured outputs with type `CustomOutput`
var response = await this._agent.RunAsync(agentInput);
var customOutput = JsonSerializer.Deserialize<CustomOutput>(response.Text);
return customOutput;
}
}
from agent_framework import (
ChatAgent,
ChatMessage,
Executor,
WorkflowContext,
handler
)
class Writer(Executor):
agent: ChatAgent
def __init__(self, chat_client: AzureChatClient, id: str = "writer"):
# Create a domain specific agent using your configured AzureChatClient.
agent = chat_client.create_agent(
instructions=(
"You are an excellent content writer. You create new content and edit contents based on the feedback."
),
)
# Associate the agent with this executor node. The base Executor stores it on self.agent.
super().__init__(agent=agent, id=id)
@handler
async def handle(self, message: ChatMessage, ctx: WorkflowContext[list[ChatMessage]]) -> None:
"""Handles a single chat message and forwards the accumulated messages to the next executor in the workflow."""
# Invoke the agent with the incoming message and get the response
messages: list[ChatMessage] = [message]
response = await self.agent.run(messages)
# Accumulate messages and send them to the next executor in the workflow.
messages.extend(response.messages)
await ctx.send_message(messages)