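In this tutorial, you learn how to build a workflow with branching logic using Agent Framework. Branching logic lets the workflow make decisions based on specific conditions, which enables more complex and dynamic behavior.
Conditional edges
Conditional edges let the workflow make routing decisions based on the content or properties of the messages flowing through it. This enables dynamic branching, where different execution paths are taken depending on runtime conditions.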
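As a rough, framework-independent illustration of the idea, the sketch below models a conditional edge as a predicate paired with a target name. The `Edge` class, the `route` function, and the target names are hypothetical and exist only for this example; they are not part of Agent Framework.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class Edge:
    """Hypothetical conditional edge: deliver to `target` when `condition` accepts the message."""
    target: str
    condition: Callable[[Any], bool]


def route(message: Any, edges: list[Edge]) -> list[str]:
    """Return the targets of all edges whose condition accepts the message."""
    return [edge.target for edge in edges if edge.condition(message)]


# Two outgoing edges from the same source, each guarded by a different condition.
edges = [
    Edge("email_assistant", lambda m: m.get("is_spam") is False),
    Edge("spam_handler", lambda m: m.get("is_spam") is True),
]

spam_targets = route({"is_spam": True}, edges)
legit_targets = route({"is_spam": False}, edges)
print(spam_targets, legit_targets)
```

Each edge is evaluated independently, so a message that satisfies no condition is delivered nowhere. The switch-case pattern later in this tutorial addresses that with a default case.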
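What you'll build
You'll create an email-processing workflow that demonstrates conditional routing:
- A spam detection agent that analyzes incoming emails and returns structured JSON.
- Conditional edges that route emails to different handlers based on their classification.
- A legitimate-email handler that drafts a professional response.
- A spam handler that flags suspicious emails.
- Shared state management that persists email data between workflow steps.
Concepts covered
Prerequisites
- .NET 8.0 SDK or later.
- An Azure OpenAI service endpoint and deployment, already configured.
- Azure CLI installed and authenticated (for Azure credential authentication).
- A basic understanding of C# and asynchronous programming.
- A new console application.
Install NuGet packages
First, install the packages required for your .NET project: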
dotnet add package Azure.AI.OpenAI --prerelease
dotnet add package Azure.Identity
dotnet add package Microsoft.Agents.AI.Workflows --prerelease
dotnet add package Microsoft.Extensions.AI.OpenAI --prerelease
Define data models
Start by defining the data structures that flow through the workflow:
using System.Text.Json.Serialization;
/// <summary>
/// Represents the result of spam detection.
/// </summary>
public sealed class DetectionResult
{
[JsonPropertyName("is_spam")]
public bool IsSpam { get; set; }
[JsonPropertyName("reason")]
public string Reason { get; set; } = string.Empty;
// Email ID is generated by the executor, not the agent
[JsonIgnore]
public string EmailId { get; set; } = string.Empty;
}
/// <summary>
/// Represents an email.
/// </summary>
internal sealed class Email
{
[JsonPropertyName("email_id")]
public string EmailId { get; set; } = string.Empty;
[JsonPropertyName("email_content")]
public string EmailContent { get; set; } = string.Empty;
}
/// <summary>
/// Represents the response from the email assistant.
/// </summary>
public sealed class EmailResponse
{
[JsonPropertyName("response")]
public string Response { get; set; } = string.Empty;
}
/// <summary>
/// Constants for shared state scopes.
/// </summary>
internal static class EmailStateConstants
{
public const string EmailStateScope = "EmailState";
}
Create the condition function
The condition function evaluates the spam detection result to determine which path the workflow takes:
/// <summary>
/// Creates a condition for routing messages based on the expected spam detection result.
/// </summary>
/// <param name="expectedResult">The expected spam detection result</param>
/// <returns>A function that evaluates whether a message meets the expected result</returns>
private static Func<object?, bool> GetCondition(bool expectedResult) =>
detectionResult => detectionResult is DetectionResult result && result.IsSpam == expectedResult;
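This condition function:
- Takes a bool expectedResult parameter (true for spam, false for non-spam)
- Returns a function that can be used as an edge condition
- Safely checks whether the message is a DetectionResult and compares its IsSpam property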
Create AI agents
Set up the AI agents that handle spam detection and email assistance:
using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Agents.AI;
using Microsoft.Extensions.AI;
/// <summary>
/// Creates a spam detection agent.
/// </summary>
/// <returns>A ChatClientAgent configured for spam detection</returns>
private static ChatClientAgent GetSpamDetectionAgent(IChatClient chatClient) =>
new(chatClient, new ChatClientAgentOptions(instructions: "You are a spam detection assistant that identifies spam emails.")
{
ChatOptions = new()
{
ResponseFormat = ChatResponseFormat.ForJsonSchema(AIJsonUtilities.CreateJsonSchema(typeof(DetectionResult)))
}
});
/// <summary>
/// Creates an email assistant agent.
/// </summary>
/// <returns>A ChatClientAgent configured for email assistance</returns>
private static ChatClientAgent GetEmailAssistantAgent(IChatClient chatClient) =>
new(chatClient, new ChatClientAgentOptions(instructions: "You are an email assistant that helps users draft professional responses to emails.")
{
ChatOptions = new()
{
ResponseFormat = ChatResponseFormat.ForJsonSchema(AIJsonUtilities.CreateJsonSchema(typeof(EmailResponse)))
}
});
Implement executors
Create the workflow executors that handle the different stages of email processing:
using Microsoft.Agents.AI.Workflows;
using System.Text.Json;
/// <summary>
/// Executor that detects spam using an AI agent.
/// </summary>
internal sealed class SpamDetectionExecutor : Executor<ChatMessage, DetectionResult>
{
private readonly AIAgent _spamDetectionAgent;
public SpamDetectionExecutor(AIAgent spamDetectionAgent) : base("SpamDetectionExecutor")
{
this._spamDetectionAgent = spamDetectionAgent;
}
public override async ValueTask<DetectionResult> HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// Generate a random email ID and store the email content to shared state
var newEmail = new Email
{
EmailId = Guid.NewGuid().ToString("N"),
EmailContent = message.Text
};
await context.QueueStateUpdateAsync(newEmail.EmailId, newEmail, scopeName: EmailStateConstants.EmailStateScope);
// Invoke the agent for spam detection
var response = await this._spamDetectionAgent.RunAsync(message);
var detectionResult = JsonSerializer.Deserialize<DetectionResult>(response.Text);
detectionResult!.EmailId = newEmail.EmailId;
return detectionResult;
}
}
/// <summary>
/// Executor that assists with email responses using an AI agent.
/// </summary>
internal sealed class EmailAssistantExecutor : Executor<DetectionResult, EmailResponse>
{
private readonly AIAgent _emailAssistantAgent;
public EmailAssistantExecutor(AIAgent emailAssistantAgent) : base("EmailAssistantExecutor")
{
this._emailAssistantAgent = emailAssistantAgent;
}
public override async ValueTask<EmailResponse> HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
if (message.IsSpam)
{
throw new ArgumentException("This executor should only handle non-spam messages.");
}
// Retrieve the email content from shared state
var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope)
?? throw new InvalidOperationException("Email not found.");
// Invoke the agent to draft a response
var response = await this._emailAssistantAgent.RunAsync(email.EmailContent);
var emailResponse = JsonSerializer.Deserialize<EmailResponse>(response.Text);
return emailResponse!;
}
}
/// <summary>
/// Executor that sends emails.
/// </summary>
internal sealed class SendEmailExecutor : Executor<EmailResponse>
{
public SendEmailExecutor() : base("SendEmailExecutor") { }
public override async ValueTask HandleAsync(EmailResponse message, IWorkflowContext context, CancellationToken cancellationToken = default) =>
await context.YieldOutputAsync($"Email sent: {message.Response}");
}
/// <summary>
/// Executor that handles spam messages.
/// </summary>
internal sealed class HandleSpamExecutor : Executor<DetectionResult>
{
public HandleSpamExecutor() : base("HandleSpamExecutor") { }
public override async ValueTask HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
if (message.IsSpam)
{
await context.YieldOutputAsync($"Email marked as spam: {message.Reason}");
}
else
{
throw new ArgumentException("This executor should only handle spam messages.");
}
}
}
Build the workflow with conditional edges
Now create the main program that builds and runs the workflow:
using Microsoft.Extensions.AI;
public static class Program
{
private static async Task Main()
{
// Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT")
?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential())
.GetChatClient(deploymentName).AsIChatClient();
// Create agents
AIAgent spamDetectionAgent = GetSpamDetectionAgent(chatClient);
AIAgent emailAssistantAgent = GetEmailAssistantAgent(chatClient);
// Create executors
var spamDetectionExecutor = new SpamDetectionExecutor(spamDetectionAgent);
var emailAssistantExecutor = new EmailAssistantExecutor(emailAssistantAgent);
var sendEmailExecutor = new SendEmailExecutor();
var handleSpamExecutor = new HandleSpamExecutor();
// Build the workflow with conditional edges
var workflow = new WorkflowBuilder(spamDetectionExecutor)
// Non-spam path: route to email assistant when IsSpam = false
.AddEdge(spamDetectionExecutor, emailAssistantExecutor, condition: GetCondition(expectedResult: false))
.AddEdge(emailAssistantExecutor, sendEmailExecutor)
// Spam path: route to spam handler when IsSpam = true
.AddEdge(spamDetectionExecutor, handleSpamExecutor, condition: GetCondition(expectedResult: true))
.WithOutputFrom(handleSpamExecutor, sendEmailExecutor)
.Build();
// Execute the workflow with sample spam email
string emailContent = "Congratulations! You've won $1,000,000! Click here to claim your prize now!";
StreamingRun run = await InProcessExecution.StreamAsync(workflow, new ChatMessage(ChatRole.User, emailContent));
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is WorkflowOutputEvent outputEvent)
{
Console.WriteLine($"{outputEvent}");
}
}
}
}
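How it works
- Workflow entry: The workflow starts with spamDetectionExecutor receiving a ChatMessage.
- Spam analysis: The spam detection agent analyzes the email and returns a structured DetectionResult with IsSpam and Reason properties.
- Conditional routing: Based on the IsSpam value:
  - If spam (IsSpam = true): routes to HandleSpamExecutor using GetCondition(true)
  - If legitimate (IsSpam = false): routes to EmailAssistantExecutor using GetCondition(false)
- Response generation: For legitimate emails, the email assistant drafts a professional reply.
- Final output: The workflow either yields a spam notification or sends the drafted email response.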
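Key features of conditional edges
- Type-safe conditions: The GetCondition method creates reusable condition functions that safely evaluate message content.
- Multiple paths: A single executor can have multiple outgoing edges with different conditions, enabling complex branching logic.
- Shared state: Email data persists across executors through scoped state management, so downstream executors can access the original content.
- Error handling: Executors validate their input and throw meaningful exceptions when they receive unexpected message types.
- Clean architecture: Each executor has a single responsibility, which keeps the workflow maintainable and testable.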
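The shared-state feature can be pictured as a key-value store partitioned by scope name. The sketch below is a plain-Python analogy under that assumption, not the framework's implementation; `ScopedState` and its `write` and `read` methods are hypothetical names, and the email ID and content are made-up sample values.

```python
from typing import Any, Optional


class ScopedState:
    """Hypothetical in-memory store whose entries are keyed by (scope, key)."""

    def __init__(self) -> None:
        self._data: dict[tuple[str, str], Any] = {}

    def write(self, key: str, value: Any, scope: str) -> None:
        # The same key can exist in several scopes without colliding.
        self._data[(scope, key)] = value

    def read(self, key: str, scope: str) -> Optional[Any]:
        # Missing entries return None so the caller decides how to fail.
        return self._data.get((scope, key))


state = ScopedState()

# The upstream step stores the full email once under a generated ID.
state.write("abc123", {"email_content": "Hello"}, scope="EmailState")

# A downstream step receives only the ID and looks the content up again.
found = state.read("abc123", scope="EmailState")
missing = state.read("abc123", scope="OtherScope")
print(found, missing)
```

Passing a small ID between steps and reading the content on demand keeps the routed messages light, which is the reason the tutorial's DetectionResult carries an EmailId rather than the email body.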
Run the example
When you run this workflow with the sample spam email, the output looks like this:
Email marked as spam: This email contains common spam indicators including monetary prizes, urgency tactics, and suspicious links that are typical of phishing attempts.
Try changing the email content to something legitimate:
string emailContent = "Hi, I wanted to follow up on our meeting yesterday and get your thoughts on the project proposal.";
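The workflow then routes the message to the email assistant, which generates a professional reply.
This conditional routing pattern is the foundation for building sophisticated workflows that handle complex decision trees and business logic.
Complete implementation
For the complete working implementation, see this sample in the Agent Framework repository.
What you'll build
You'll create an email-processing workflow that demonstrates conditional routing:
- A spam detection agent that analyzes incoming emails
- Conditional edges that route emails to different handlers based on their classification
- A legitimate-email handler that drafts a professional reply
- A spam handler that flags suspicious emails
Concepts covered
Prerequisites
- Python 3.10 or later
- Agent Framework installed: pip install agent-framework-core --pre
- Azure OpenAI service configured with the appropriate environment variables
- Azure CLI authentication: az login
Step 1: Import required dependencies
Start by importing the components needed for the conditional workflow: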
import asyncio
import os
from dataclasses import dataclass
from typing import Any, Literal
from uuid import uuid4

from typing_extensions import Never

from agent_framework import (
    AgentExecutor,
    AgentExecutorRequest,
    AgentExecutorResponse,
    ChatMessage,
    Role,
    WorkflowBuilder,
    WorkflowContext,
    executor,
    Case,
    Default,
)
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential
from pydantic import BaseModel
Step 2: Define data models
Create Pydantic models for structured data exchange between workflow components:
class DetectionResult(BaseModel):
    """Represents the result of spam detection."""

    # is_spam drives the routing decision taken by edge conditions
    is_spam: bool
    # Human readable rationale from the detector
    reason: str
    # The agent must include the original email so downstream agents can operate without reloading content
    email_content: str


class EmailResponse(BaseModel):
    """Represents the response from the email assistant."""

    # The drafted reply that a user could copy or send
    response: str
Step 3: Create the condition function
Define the condition function that drives the routing decisions:
def get_condition(expected_result: bool):
    """Create a condition callable that routes based on DetectionResult.is_spam."""

    # The returned function will be used as an edge predicate.
    # It receives whatever the upstream executor produced.
    def condition(message: Any) -> bool:
        # Defensive guard. If a non AgentExecutorResponse appears, let the edge pass to avoid dead ends.
        if not isinstance(message, AgentExecutorResponse):
            return True

        try:
            # Prefer parsing a structured DetectionResult from the agent JSON text.
            # Using model_validate_json ensures type safety and raises if the shape is wrong.
            detection = DetectionResult.model_validate_json(message.agent_run_response.text)
            # Route only when the spam flag matches the expected path.
            return detection.is_spam == expected_result
        except Exception:
            # Fail closed on parse errors so we do not accidentally route to the wrong path.
            # Returning False prevents this edge from activating.
            return False

    return condition
Step 4: Create handler executors
Define the executors that handle the different routing outcomes:
@executor(id="send_email")
async def handle_email_response(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
    """Handle legitimate emails by drafting a professional response."""
    # Downstream of the email assistant. Parse a validated EmailResponse and yield the workflow output.
    email_response = EmailResponse.model_validate_json(response.agent_run_response.text)
    await ctx.yield_output(f"Email sent:\n{email_response.response}")


@executor(id="handle_spam")
async def handle_spam_classifier_response(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
    """Handle spam emails by marking them appropriately."""
    # Spam path. Confirm the DetectionResult and yield the workflow output. Guard against accidental non spam input.
    detection = DetectionResult.model_validate_json(response.agent_run_response.text)
    if detection.is_spam:
        await ctx.yield_output(f"Email marked as spam: {detection.reason}")
    else:
        # This indicates the routing predicate and executor contract are out of sync.
        raise RuntimeError("This executor should only handle spam messages.")


@executor(id="to_email_assistant_request")
async def to_email_assistant_request(
    response: AgentExecutorResponse, ctx: WorkflowContext[AgentExecutorRequest]
) -> None:
    """Transform spam detection response into a request for the email assistant."""
    # Parse the detection result and extract the email content for the assistant
    detection = DetectionResult.model_validate_json(response.agent_run_response.text)

    # Create a new request for the email assistant with the original email content
    request = AgentExecutorRequest(
        messages=[ChatMessage(Role.USER, text=detection.email_content)],
        should_respond=True
    )
    await ctx.send_message(request)
Step 5: Create AI agents
Set up the Azure OpenAI agents with structured output formats:
async def main() -> None:
    # Create agents
    # AzureCliCredential uses your current az login. This avoids embedding secrets in code.
    chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())

    # Agent 1. Classifies spam and returns a DetectionResult object.
    # response_format enforces that the LLM returns parsable JSON for the Pydantic model.
    spam_detection_agent = AgentExecutor(
        chat_client.create_agent(
            instructions=(
                "You are a spam detection assistant that identifies spam emails. "
                "Always return JSON with fields is_spam (bool), reason (string), and email_content (string). "
                "Include the original email content in email_content."
            ),
            response_format=DetectionResult,
        ),
        id="spam_detection_agent",
    )

    # Agent 2. Drafts a professional reply. Also uses structured JSON output for reliability.
    email_assistant_agent = AgentExecutor(
        chat_client.create_agent(
            instructions=(
                "You are an email assistant that helps users draft professional responses to emails. "
                "Your input might be a JSON object that includes 'email_content'; base your reply on that content. "
                "Return JSON with a single field 'response' containing the drafted reply."
            ),
            response_format=EmailResponse,
        ),
        id="email_assistant_agent",
    )
Step 6: Build the conditional workflow
Create a workflow whose conditional edges route on the spam detection result:
    # Build the workflow graph.
    # Start at the spam detector.
    # If not spam, hop to a transformer that creates a new AgentExecutorRequest,
    # then call the email assistant, then finalize.
    # If spam, go directly to the spam handler and finalize.
    workflow = (
        WorkflowBuilder()
        .set_start_executor(spam_detection_agent)
        # Not spam path: transform response -> request for assistant -> assistant -> send email
        .add_edge(spam_detection_agent, to_email_assistant_request, condition=get_condition(False))
        .add_edge(to_email_assistant_request, email_assistant_agent)
        .add_edge(email_assistant_agent, handle_email_response)
        # Spam path: send to spam handler
        .add_edge(spam_detection_agent, handle_spam_classifier_response, condition=get_condition(True))
        .build()
    )
Step 7: Run the workflow
Run the workflow with sample email content:
    # Read Email content from the sample resource file.
    # This keeps the sample deterministic since the model sees the same email every run.
    email_path = os.path.join(os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "resources", "email.txt")

    with open(email_path) as email_file:  # noqa: ASYNC230
        email = email_file.read()

    # Execute the workflow. Since the start is an AgentExecutor, pass an AgentExecutorRequest.
    # The workflow completes when it becomes idle (no more work to do).
    request = AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=email)], should_respond=True)
    events = await workflow.run(request)
    outputs = events.get_outputs()
    if outputs:
        print(f"Workflow output: {outputs[0]}")


if __name__ == "__main__":
    asyncio.run(main())
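How conditional edges work
- Condition functions: The get_condition() function creates a predicate that inspects the message content and returns True or False to determine whether the edge should be traversed.
- Message inspection: Conditions can inspect any aspect of the message, including structured data parsed from agent responses with Pydantic models.
- Defensive programming: The condition function includes error handling to prevent routing failures when parsing structured data.
- Dynamic routing: Based on the spam detection result, emails are automatically routed to either the email assistant (for legitimate emails) or the spam handler (for suspicious emails).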
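The fail-closed behavior described above can be tried without the framework or a model call. The sketch below mirrors the shape of get_condition using only the standard library; `FakeAgentResponse` is a hypothetical stand-in for the agent response type, and parsing with `json` replaces the Pydantic validation used in the tutorial.

```python
import json
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class FakeAgentResponse:
    """Hypothetical stand-in for an agent response that carries JSON text."""
    text: str


def get_condition(expected_result: bool) -> Callable[[Any], bool]:
    """Return a predicate that matches when the parsed is_spam flag equals expected_result."""

    def condition(message: Any) -> bool:
        # Unknown message types pass through, as in the tutorial's guard.
        if not isinstance(message, FakeAgentResponse):
            return True
        try:
            is_spam = json.loads(message.text)["is_spam"]
        except (json.JSONDecodeError, KeyError, TypeError):
            # Fail closed: malformed output activates neither edge.
            return False
        # Reject values that are not real booleans (for example the string "true").
        return isinstance(is_spam, bool) and is_spam == expected_result

    return condition


spam_edge = get_condition(True)
legit_edge = get_condition(False)

good = FakeAgentResponse('{"is_spam": true, "reason": "prize scam"}')
bad = FakeAgentResponse("not json at all")

print(spam_edge(good), legit_edge(good), spam_edge(bad), legit_edge(bad))
```

Note the consequence of failing closed: a malformed response matches neither edge, so the message goes nowhere. That is safer than misrouting, but it is also one motivation for the default case in the switch-case pattern.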
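Key concepts
- Edge conditions: Boolean predicates that determine whether an edge should be traversed
- Structured outputs: Using Pydantic models with response_format ensures reliable data parsing
- Defensive routing: Condition functions handle edge cases to prevent workflow dead ends
- Message transformation: Executors can transform message types between workflow steps
Complete implementation
For the complete working implementation, see the edge_condition.py sample in the Agent Framework repository.
Switch-case edges
Building on conditional edges
The previous conditional edges example demonstrated two-way routing (spam versus legitimate email). However, many real-world scenarios require more sophisticated decision trees. When you need to route to multiple destinations based on different conditions, switch-case edges provide a cleaner, more maintainable solution.
What you'll build with switch-case
You'll extend the email-processing workflow to handle three decision paths:
- NotSpam → Email assistant → Send email
- Spam → Handle spam executor
- Uncertain → Handle uncertain executor (default case)
The key improvement is using the SwitchBuilder pattern instead of multiple individual conditional edges, which makes the workflow easier to understand and maintain as decision complexity grows.
Concepts covered
Data models for switch-case
Update your data models to support the three-way classification: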
/// <summary>
/// Represents the possible decisions for spam detection.
/// </summary>
public enum SpamDecision
{
NotSpam,
Spam,
Uncertain
}
/// <summary>
/// Represents the result of spam detection with enhanced decision support.
/// </summary>
public sealed class DetectionResult
{
[JsonPropertyName("spam_decision")]
[JsonConverter(typeof(JsonStringEnumConverter))]
public SpamDecision spamDecision { get; set; }
[JsonPropertyName("reason")]
public string Reason { get; set; } = string.Empty;
// Email ID is generated by the executor, not the agent
[JsonIgnore]
public string EmailId { get; set; } = string.Empty;
}
/// <summary>
/// Represents an email stored in shared state.
/// </summary>
internal sealed class Email
{
[JsonPropertyName("email_id")]
public string EmailId { get; set; } = string.Empty;
[JsonPropertyName("email_content")]
public string EmailContent { get; set; } = string.Empty;
}
/// <summary>
/// Represents the response from the email assistant.
/// </summary>
public sealed class EmailResponse
{
[JsonPropertyName("response")]
public string Response { get; set; } = string.Empty;
}
/// <summary>
/// Constants for shared state scopes.
/// </summary>
internal static class EmailStateConstants
{
public const string EmailStateScope = "EmailState";
}
Condition factory for switch-case
Create a reusable condition factory that generates a predicate for each spam decision:
/// <summary>
/// Creates a condition for routing messages based on the expected spam detection result.
/// </summary>
/// <param name="expectedDecision">The expected spam detection decision</param>
/// <returns>A function that evaluates whether a message meets the expected result</returns>
private static Func<object?, bool> GetCondition(SpamDecision expectedDecision) =>
detectionResult => detectionResult is DetectionResult result && result.spamDecision == expectedDecision;
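This factory approach:
- Reduces code duplication: One function generates all the condition predicates
- Ensures consistency: All conditions follow the same pattern
- Simplifies maintenance: Changes to the condition logic happen in one place
Enhanced AI agents
Update the spam detection agent to be less confident in its assessments and to return a three-way classification: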
/// <summary>
/// Creates a spam detection agent with enhanced uncertainty handling.
/// </summary>
/// <returns>A ChatClientAgent configured for three-way spam detection</returns>
private static ChatClientAgent GetSpamDetectionAgent(IChatClient chatClient) =>
new(chatClient, new ChatClientAgentOptions(instructions: "You are a spam detection assistant that identifies spam emails. Be less confident in your assessments.")
{
ChatOptions = new()
{
ResponseFormat = ChatResponseFormat.ForJsonSchema<DetectionResult>()
}
});
/// <summary>
/// Creates an email assistant agent (unchanged from conditional edges example).
/// </summary>
/// <returns>A ChatClientAgent configured for email assistance</returns>
private static ChatClientAgent GetEmailAssistantAgent(IChatClient chatClient) =>
new(chatClient, new ChatClientAgentOptions(instructions: "You are an email assistant that helps users draft responses to emails with professionalism.")
{
ChatOptions = new()
{
ResponseFormat = ChatResponseFormat.ForJsonSchema<EmailResponse>()
}
});
Workflow executors with enhanced routing
Implement executors that handle the three-way routing with shared state management:
/// <summary>
/// Executor that detects spam using an AI agent with three-way classification.
/// </summary>
internal sealed class SpamDetectionExecutor : Executor<ChatMessage, DetectionResult>
{
private readonly AIAgent _spamDetectionAgent;
public SpamDetectionExecutor(AIAgent spamDetectionAgent) : base("SpamDetectionExecutor")
{
this._spamDetectionAgent = spamDetectionAgent;
}
public override async ValueTask<DetectionResult> HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// Generate a random email ID and store the email content in shared state
var newEmail = new Email
{
EmailId = Guid.NewGuid().ToString("N"),
EmailContent = message.Text
};
await context.QueueStateUpdateAsync(newEmail.EmailId, newEmail, scopeName: EmailStateConstants.EmailStateScope);
// Invoke the agent for enhanced spam detection
var response = await this._spamDetectionAgent.RunAsync(message);
var detectionResult = JsonSerializer.Deserialize<DetectionResult>(response.Text);
detectionResult!.EmailId = newEmail.EmailId;
return detectionResult;
}
}
/// <summary>
/// Executor that assists with email responses using an AI agent.
/// </summary>
internal sealed class EmailAssistantExecutor : Executor<DetectionResult, EmailResponse>
{
private readonly AIAgent _emailAssistantAgent;
public EmailAssistantExecutor(AIAgent emailAssistantAgent) : base("EmailAssistantExecutor")
{
this._emailAssistantAgent = emailAssistantAgent;
}
public override async ValueTask<EmailResponse> HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
if (message.spamDecision == SpamDecision.Spam)
{
throw new ArgumentException("This executor should only handle non-spam messages.");
}
// Retrieve the email content from shared state
var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);
// Invoke the agent to draft a response
var response = await this._emailAssistantAgent.RunAsync(email!.EmailContent);
var emailResponse = JsonSerializer.Deserialize<EmailResponse>(response.Text);
return emailResponse!;
}
}
/// <summary>
/// Executor that sends emails.
/// </summary>
internal sealed class SendEmailExecutor : Executor<EmailResponse>
{
public SendEmailExecutor() : base("SendEmailExecutor") { }
public override async ValueTask HandleAsync(EmailResponse message, IWorkflowContext context, CancellationToken cancellationToken = default) =>
await context.YieldOutputAsync($"Email sent: {message.Response}").ConfigureAwait(false);
}
/// <summary>
/// Executor that handles spam messages.
/// </summary>
internal sealed class HandleSpamExecutor : Executor<DetectionResult>
{
public HandleSpamExecutor() : base("HandleSpamExecutor") { }
public override async ValueTask HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
if (message.spamDecision == SpamDecision.Spam)
{
await context.YieldOutputAsync($"Email marked as spam: {message.Reason}").ConfigureAwait(false);
}
else
{
throw new ArgumentException("This executor should only handle spam messages.");
}
}
}
/// <summary>
/// Executor that handles uncertain emails requiring manual review.
/// </summary>
internal sealed class HandleUncertainExecutor : Executor<DetectionResult>
{
public HandleUncertainExecutor() : base("HandleUncertainExecutor") { }
public override async ValueTask HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
if (message.spamDecision == SpamDecision.Uncertain)
{
var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);
await context.YieldOutputAsync($"Email marked as uncertain: {message.Reason}. Email content: {email?.EmailContent}");
}
else
{
throw new ArgumentException("This executor should only handle uncertain spam decisions.");
}
}
}
Build the workflow with the switch-case pattern
Replace the multiple conditional edges with a cleaner switch-case structure:
public static class Program
{
private static async Task Main()
{
// Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential()).GetChatClient(deploymentName).AsIChatClient();
// Create agents
AIAgent spamDetectionAgent = GetSpamDetectionAgent(chatClient);
AIAgent emailAssistantAgent = GetEmailAssistantAgent(chatClient);
// Create executors
var spamDetectionExecutor = new SpamDetectionExecutor(spamDetectionAgent);
var emailAssistantExecutor = new EmailAssistantExecutor(emailAssistantAgent);
var sendEmailExecutor = new SendEmailExecutor();
var handleSpamExecutor = new HandleSpamExecutor();
var handleUncertainExecutor = new HandleUncertainExecutor();
// Build the workflow using switch-case for cleaner three-way routing
WorkflowBuilder builder = new(spamDetectionExecutor);
builder.AddSwitch(spamDetectionExecutor, switchBuilder =>
switchBuilder
.AddCase(
GetCondition(expectedDecision: SpamDecision.NotSpam),
emailAssistantExecutor
)
.AddCase(
GetCondition(expectedDecision: SpamDecision.Spam),
handleSpamExecutor
)
.WithDefault(
handleUncertainExecutor
)
)
// After the email assistant writes a response, it will be sent to the send email executor
.AddEdge(emailAssistantExecutor, sendEmailExecutor)
.WithOutputFrom(handleSpamExecutor, sendEmailExecutor, handleUncertainExecutor);
var workflow = builder.Build();
// Read an email from a text file (use ambiguous content for demonstration)
string email = Resources.Read("ambiguous_email.txt");
// Execute the workflow
StreamingRun run = await InProcessExecution.StreamAsync(workflow, new ChatMessage(ChatRole.User, email));
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is WorkflowOutputEvent outputEvent)
{
Console.WriteLine($"{outputEvent}");
}
}
}
}
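Switch-case benefits
- Cleaner syntax: SwitchBuilder provides a more readable alternative to multiple conditional edges
- Ordered evaluation: Cases are evaluated in order, stopping at the first match
- Guaranteed routing: The WithDefault() method ensures that messages never get stuck
- Better maintainability: Adding a new case requires minimal changes to the workflow structure
- Type safety: Each executor validates its input to catch routing errors early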
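The ordered, first-match evaluation with a default fallback described in this list can be sketched in a few lines of plain Python. This is an illustration of the semantics only, not the framework's code; `switch_route` and the target names are hypothetical.

```python
from typing import Any, Callable

Predicate = Callable[[Any], bool]


def switch_route(message: Any, cases: list[tuple[Predicate, str]], default: str) -> str:
    """Return the target of the first case whose predicate matches, else the default."""
    for predicate, target in cases:
        if predicate(message):
            # First match wins; later cases are never evaluated.
            return target
    # No case matched, so the message falls through to the default target.
    return default


cases = [
    (lambda decision: decision == "NotSpam", "email_assistant"),
    (lambda decision: decision == "Spam", "handle_spam"),
]

for decision in ("NotSpam", "Spam", "Uncertain", "SomethingUnexpected"):
    print(decision, "->", switch_route(decision, cases, default="handle_uncertain"))
```

Unlike independent conditional edges, exactly one target is chosen for every message, including values that no case anticipated.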
Pattern comparison
Before (conditional edges):
var workflow = new WorkflowBuilder(spamDetectionExecutor)
.AddEdge(spamDetectionExecutor, emailAssistantExecutor, condition: GetCondition(expectedResult: false))
.AddEdge(spamDetectionExecutor, handleSpamExecutor, condition: GetCondition(expectedResult: true))
// No clean way to handle a third case
.WithOutputFrom(handleSpamExecutor, sendEmailExecutor)
.Build();
After (switch-case):
WorkflowBuilder builder = new(spamDetectionExecutor);
builder.AddSwitch(spamDetectionExecutor, switchBuilder =>
switchBuilder
.AddCase(GetCondition(SpamDecision.NotSpam), emailAssistantExecutor)
.AddCase(GetCondition(SpamDecision.Spam), handleSpamExecutor)
.WithDefault(handleUncertainExecutor) // Clean default case
)
// Continue building the rest of the workflow
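The switch-case pattern scales much better as the number of routing decisions grows, and the default case provides a safety net for unexpected values.
Run the example
When you run this workflow with ambiguous email content, the output looks like this: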
Email marked as uncertain: This email contains promotional language but might be from a legitimate business contact, requiring human review for proper classification.
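Try changing the email content to something clearly spam or clearly legitimate to see the different routing paths in action.
Complete implementation
For the complete working implementation, see this sample in the Agent Framework repository.
Building on conditional edges
The previous conditional edges example demonstrated two-way routing (spam versus legitimate email). However, many real-world scenarios require more sophisticated decision trees. When you need to route to multiple destinations based on different conditions, switch-case edges provide a cleaner, more maintainable solution.
What you'll build next
You'll extend the email-processing workflow to handle three decision paths:
- NotSpam → Email assistant → Send email
- Spam → Mark as spam
- Uncertain → Flag for manual review (default case)
The key improvement is using a single switch-case edge group instead of multiple individual conditional edges, which makes the workflow easier to understand and maintain as decision complexity grows.
Concepts covered
Enhanced data models
Update your data models to support the three-way classification: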
from typing import Literal


class DetectionResultAgent(BaseModel):
    """Structured output returned by the spam detection agent."""

    # The agent classifies the email into one of three categories
    spam_decision: Literal["NotSpam", "Spam", "Uncertain"]
    reason: str


class EmailResponse(BaseModel):
    """Structured output returned by the email assistant agent."""

    response: str


@dataclass
class DetectionResult:
    """Internal typed payload used for routing and downstream handling."""

    spam_decision: str
    reason: str
    email_id: str


@dataclass
class Email:
    """In memory record of the email content stored in shared state."""

    email_id: str
    email_content: str
Switch-case condition factory
Create a reusable condition factory that generates a predicate for each spam decision:
def get_case(expected_decision: str):
    """Factory that returns a predicate matching a specific spam_decision value."""

    def condition(message: Any) -> bool:
        # Only match when the upstream payload is a DetectionResult with the expected decision
        return isinstance(message, DetectionResult) and message.spam_decision == expected_decision

    return condition
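This factory approach:
- Reduces code duplication: One function generates all the condition predicates
- Ensures consistency: All conditions follow the same pattern
- Simplifies maintenance: Changes to the condition logic happen in one place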
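To see how the generated predicates behave, the self-contained sketch below repeats the factory alongside a minimal copy of the DetectionResult dataclass and calls it on sample values. The sample reason and email ID are made up for illustration.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class DetectionResult:
    """Minimal copy of the tutorial's routing payload."""
    spam_decision: str
    reason: str
    email_id: str


def get_case(expected_decision: str) -> Callable[[Any], bool]:
    """Factory that returns a predicate matching a specific spam_decision value."""

    def condition(message: Any) -> bool:
        # Match only typed DetectionResult payloads with the expected decision.
        return isinstance(message, DetectionResult) and message.spam_decision == expected_decision

    return condition


# One factory call per decision yields three independent predicates.
predicates = {name: get_case(name) for name in ("NotSpam", "Spam", "Uncertain")}

result = DetectionResult(spam_decision="Spam", reason="Prize scam", email_id="1")
matches = {name: predicate(result) for name, predicate in predicates.items()}
print(matches)

# A dict with the right fields is not a DetectionResult, so no predicate matches it.
untyped_matches = predicates["Spam"]({"spam_decision": "Spam"})
print(untyped_matches)
```

Because each predicate captures its own expected_decision in a closure, adding a fourth decision means adding one more name rather than writing another function.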
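Workflow executors with shared state
Implement executors that use shared state, so that large email content doesn't have to be passed through every workflow step: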
EMAIL_STATE_PREFIX = "email:"
CURRENT_EMAIL_ID_KEY = "current_email_id"
@executor(id="store_email")
async def store_email(email_text: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
"""Store email content once and pass around a lightweight ID reference."""
# Persist the raw email content in shared state
new_email = Email(email_id=str(uuid4()), email_content=email_text)
await ctx.set_shared_state(f"{EMAIL_STATE_PREFIX}{new_email.email_id}", new_email)
await ctx.set_shared_state(CURRENT_EMAIL_ID_KEY, new_email.email_id)
# Forward email to spam detection agent
await ctx.send_message(
AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=new_email.email_content)], should_respond=True)
)
@executor(id="to_detection_result")
async def to_detection_result(response: AgentExecutorResponse, ctx: WorkflowContext[DetectionResult]) -> None:
"""Transform agent response into a typed DetectionResult with email ID."""
# Parse the agent's structured JSON output
parsed = DetectionResultAgent.model_validate_json(response.agent_run_response.text)
email_id: str = await ctx.get_shared_state(CURRENT_EMAIL_ID_KEY)
# Create typed message for switch-case routing
await ctx.send_message(DetectionResult(
spam_decision=parsed.spam_decision,
reason=parsed.reason,
email_id=email_id
))
@executor(id="submit_to_email_assistant")
async def submit_to_email_assistant(detection: DetectionResult, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
    """Handle NotSpam emails by forwarding to the email assistant."""
    # Guard against misrouting
    if detection.spam_decision != "NotSpam":
        raise RuntimeError("This executor should only handle NotSpam messages.")
    # Retrieve original email content from shared state
    email: Email = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{detection.email_id}")
    await ctx.send_message(
        AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=email.email_content)], should_respond=True)
    )

@executor(id="finalize_and_send")
async def finalize_and_send(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
    """Parse email assistant response and yield final output."""
    parsed = EmailResponse.model_validate_json(response.agent_run_response.text)
    await ctx.yield_output(f"Email sent: {parsed.response}")

@executor(id="handle_spam")
async def handle_spam(detection: DetectionResult, ctx: WorkflowContext[Never, str]) -> None:
    """Handle confirmed spam emails."""
    if detection.spam_decision == "Spam":
        await ctx.yield_output(f"Email marked as spam: {detection.reason}")
    else:
        raise RuntimeError("This executor should only handle Spam messages.")

@executor(id="handle_uncertain")
async def handle_uncertain(detection: DetectionResult, ctx: WorkflowContext[Never, str]) -> None:
    """Handle uncertain classifications that need manual review."""
    if detection.spam_decision == "Uncertain":
        # Include original content for human review
        email: Email | None = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{detection.email_id}")
        await ctx.yield_output(
            f"Email marked as uncertain: {detection.reason}. Email content: {getattr(email, 'email_content', '')}"
        )
    else:
        raise RuntimeError("This executor should only handle Uncertain messages.")
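The `to_detection_result` executor relies on Pydantic's `model_validate_json` to parse the agent's structured output before attaching the email ID. As a rough, framework-free illustration of that step, the sketch below uses only the standard library. `parse_detection` and `VALID_DECISIONS` are hypothetical names introduced here, not part of the Agent Framework, and the `DetectionResult` dataclass is a local stand-in for the typed message used in this tutorial.

```python
import json
from dataclasses import dataclass

# Assumption: these three labels mirror the values the agent is instructed to return.
VALID_DECISIONS = {"NotSpam", "Spam", "Uncertain"}


@dataclass
class DetectionResult:
    """Local stand-in for the tutorial's typed routing message."""
    spam_decision: str
    reason: str
    email_id: str


def parse_detection(raw_json: str, email_id: str) -> DetectionResult:
    """Hypothetical helper: validate the agent's JSON and attach the email ID."""
    payload = json.loads(raw_json)
    decision = payload.get("spam_decision")
    if decision not in VALID_DECISIONS:
        # Reject unknown labels early so routing never sees a malformed decision.
        raise ValueError(f"Unexpected spam_decision: {decision!r}")
    return DetectionResult(
        spam_decision=decision,
        reason=str(payload.get("reason", "")),
        email_id=email_id,
    )


result = parse_detection('{"spam_decision": "Uncertain", "reason": "vague offer"}', "abc123")
print(result.spam_decision, result.email_id)
```

Validating the label before building the message means a malformed agent reply fails at the parsing step rather than inside one of the guarded handlers.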
Create the Enhanced AI Agents
Update the spam detection agent so that it is less confident in its assessments and returns a three-way classification:
async def main():
    chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
    # Enhanced spam detection agent with three-way classification
    spam_detection_agent = AgentExecutor(
        chat_client.create_agent(
            instructions=(
                "You are a spam detection assistant that identifies spam emails. "
                "Be less confident in your assessments. "
                "Always return JSON with fields 'spam_decision' (one of NotSpam, Spam, Uncertain) "
                "and 'reason' (string)."
            ),
            response_format=DetectionResultAgent,
        ),
        id="spam_detection_agent",
    )
    # Email assistant remains the same
    email_assistant_agent = AgentExecutor(
        chat_client.create_agent(
            instructions=(
                "You are an email assistant that helps users draft responses to emails with professionalism."
            ),
            response_format=EmailResponse,
        ),
        id="email_assistant_agent",
    )
Build the Workflow with a Switch-Case Edge Group
Replace the multiple conditional edges with a single switch-case edge group:
    # Build workflow using switch-case for cleaner three-way routing
    workflow = (
        WorkflowBuilder()
        .set_start_executor(store_email)
        .add_edge(store_email, spam_detection_agent)
        .add_edge(spam_detection_agent, to_detection_result)
        .add_switch_case_edge_group(
            to_detection_result,
            [
                # Explicit cases for specific decisions
                Case(condition=get_case("NotSpam"), target=submit_to_email_assistant),
                Case(condition=get_case("Spam"), target=handle_spam),
                # Default case catches anything that doesn't match above
                Default(target=handle_uncertain),
            ],
        )
        .add_edge(submit_to_email_assistant, email_assistant_agent)
        .add_edge(email_assistant_agent, finalize_and_send)
        .build()
    )
Run and Test
Run the workflow with ambiguous email content to demonstrate the three-way routing:
    # Use ambiguous email content that might trigger uncertain classification
    email = (
"Hey there, I noticed you might be interested in our latest offer—no pressure, but it expires soon. "
"Let me know if you'd like more details."
    )
    # Execute and display results
    events = await workflow.run(email)
    outputs = events.get_outputs()
    if outputs:
        for output in outputs:
            print(f"Workflow output: {output}")
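Key Benefits of Switch-Case Edges
- Cleaner syntax: one edge group instead of multiple conditional edges
- Ordered evaluation: cases are checked in sequence, and evaluation stops at the first match
- Guaranteed routing: the default case ensures that messages never get stuck
- Better maintainability: adding a new case requires minimal changes
- Type safety: each executor validates its input to catch routing errors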
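The ordered, first-match behavior described in this list can be sketched in plain Python. This is only an illustration of the semantics, not the framework's implementation: `SketchCase`, `SketchDefault`, and `route` are hypothetical names, and messages are simplified to plain strings.

```python
from dataclasses import dataclass
from typing import Callable, Sequence


@dataclass
class SketchCase:
    """Illustrative stand-in for a switch-case branch (not the framework's Case)."""
    condition: Callable[[str], bool]
    target: str


@dataclass
class SketchDefault:
    """Illustrative stand-in for the fallback branch."""
    target: str


def route(message: str, cases: Sequence[SketchCase], default: SketchDefault) -> str:
    """Return the target of the first matching case, or the default target."""
    for case in cases:
        if case.condition(message):
            return case.target  # Stop at the first match; later cases are not evaluated
    return default.target  # Nothing matched, so the message still has a destination


cases = [
    SketchCase(condition=lambda m: m == "NotSpam", target="submit_to_email_assistant"),
    SketchCase(condition=lambda m: m == "Spam", target="handle_spam"),
]
default = SketchDefault(target="handle_uncertain")

for decision in ("NotSpam", "Spam", "Uncertain", "SomethingElse"):
    print(decision, "->", route(decision, cases, default))
```

Because unmatched values fall through to the default, an unexpected label such as `SomethingElse` ends up in the same handler as `Uncertain` instead of being dropped.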
Comparison: Conditional Edges vs. Switch-Case
Before (conditional edges):
.add_edge(detector, handler_a, condition=lambda x: x.result == "A")
.add_edge(detector, handler_b, condition=lambda x: x.result == "B")
.add_edge(detector, handler_c, condition=lambda x: x.result == "C")
After (switch-case):
.add_switch_case_edge_group(
    detector,
    [
        Case(condition=lambda x: x.result == "A", target=handler_a),
        Case(condition=lambda x: x.result == "B", target=handler_b),
        Default(target=handler_c),  # Catches everything else
    ],
)
The switch-case pattern scales much better as the number of routing decisions grows, and the default case provides a safety net for unexpected values.
Switch-Case Sample Code
For a complete working implementation, see the switch_case_edge_group.py sample in the Agent Framework repository.
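Multi-Selection Edges
Beyond Switch-Case: Multi-Selection Routing
While switch-case edges route each message to exactly one destination, real-world workflows often need to trigger multiple parallel operations based on the characteristics of the data. Partitioned edges (implemented as fan-out edges with a partitioner) enable sophisticated fan-out patterns in which a single message can activate multiple downstream executors at the same time.
Advanced Email Processing Workflow
Building on the switch-case example, you will create an enhanced email processing system that demonstrates sophisticated routing logic:
- Spam emails → a single spam handler (as in switch-case)
- Legitimate emails → always trigger the email assistant, and additionally trigger the summarizer for long emails
- Uncertain emails → a single uncertain handler (as in switch-case)
- Database persistence → triggered for short emails and for long emails once they have been summarized
This pattern enables parallel processing pipelines that adapt to the characteristics of the content.
Concepts Covered
Data Models for Multi-Selection
Extend the data models to support email length analysis and summarization: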
/// <summary>
/// Represents the result of enhanced email analysis with additional metadata.
/// </summary>
public sealed class AnalysisResult
{
[JsonPropertyName("spam_decision")]
[JsonConverter(typeof(JsonStringEnumConverter))]
public SpamDecision spamDecision { get; set; }
[JsonPropertyName("reason")]
public string Reason { get; set; } = string.Empty;
// Additional properties for sophisticated routing
[JsonIgnore]
public int EmailLength { get; set; }
[JsonIgnore]
public string EmailSummary { get; set; } = string.Empty;
[JsonIgnore]
public string EmailId { get; set; } = string.Empty;
}
/// <summary>
/// Represents the response from the email assistant.
/// </summary>
public sealed class EmailResponse
{
[JsonPropertyName("response")]
public string Response { get; set; } = string.Empty;
}
/// <summary>
/// Represents the response from the email summary agent.
/// </summary>
public sealed class EmailSummary
{
[JsonPropertyName("summary")]
public string Summary { get; set; } = string.Empty;
}
/// <summary>
/// A custom workflow event for database operations.
/// </summary>
internal sealed class DatabaseEvent(string message) : WorkflowEvent(message) { }
/// <summary>
/// Constants for email processing thresholds.
/// </summary>
public static class EmailProcessingConstants
{
public const int LongEmailThreshold = 100;
}
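Target Assigner Function: The Heart of Multi-Selection
The target assigner function determines which executors should receive each message: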
/// <summary>
/// Creates a target assigner for routing messages based on the analysis result.
/// </summary>
/// <returns>A function that takes an analysis result and returns the target partitions.</returns>
private static Func<AnalysisResult?, int, IEnumerable<int>> GetTargetAssigner()
{
    return (analysisResult, targetCount) =>
    {
        if (analysisResult is not null)
        {
            if (analysisResult.spamDecision == SpamDecision.Spam)
            {
                return [0]; // Route only to spam handler (index 0)
            }
            else if (analysisResult.spamDecision == SpamDecision.NotSpam)
            {
                // Always route to email assistant (index 1)
                List<int> targets = [1];
                // Conditionally add summarizer for long emails (index 2)
                if (analysisResult.EmailLength > EmailProcessingConstants.LongEmailThreshold)
                {
                    targets.Add(2);
                }
                return targets;
            }
            else // Uncertain
            {
                return [3]; // Route only to uncertain handler (index 3)
            }
        }
        throw new ArgumentException("Invalid analysis result.");
    };
}
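Key Features of the Target Assigner Function
- Dynamic target selection: returns the list of executor indices to activate
- Content-aware routing: makes decisions based on message properties such as email length
- Parallel processing: multiple targets can execute simultaneously
- Conditional logic: complex branching based on multiple criteria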
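Because the assigner returns positions rather than executor references, the order of the `targets` list is part of its contract. The following Python sketch mirrors the C# logic to make that index contract easy to experiment with. `assign_targets` is a hypothetical name, the spam decision is simplified to a string, and the bounds check is an addition that the C# version does not perform.

```python
LONG_EMAIL_THRESHOLD = 100  # Mirrors EmailProcessingConstants.LongEmailThreshold

# Assumed target order: 0 = spam handler, 1 = email assistant,
# 2 = summarizer, 3 = uncertain handler.


def assign_targets(spam_decision: str, email_length: int, target_count: int) -> list[int]:
    """Hypothetical Python mirror of the index-based target assigner."""
    if spam_decision == "Spam":
        targets = [0]
    elif spam_decision == "NotSpam":
        targets = [1]  # Always route to the email assistant
        if email_length > LONG_EMAIL_THRESHOLD:
            targets.append(2)  # Long emails also go to the summarizer
    else:
        targets = [3]
    # Extra safety check (not in the original): every index must refer to a real target.
    if any(index >= target_count for index in targets):
        raise IndexError(f"Target index out of range for {target_count} targets: {targets}")
    return targets


print(assign_targets("NotSpam", 250, 4))  # long legitimate email
print(assign_targets("NotSpam", 40, 4))   # short legitimate email
```

If the `targets` list is ever reordered, the indices in the assigner have to change with it, which is one reason to keep the two definitions close together.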
Enhanced Workflow Executors
Implement the executors that handle the advanced analysis and routing:
/// <summary>
/// Executor that analyzes emails using an AI agent with enhanced analysis.
/// </summary>
internal sealed class EmailAnalysisExecutor : Executor<ChatMessage, AnalysisResult>
{
private readonly AIAgent _emailAnalysisAgent;
public EmailAnalysisExecutor(AIAgent emailAnalysisAgent) : base("EmailAnalysisExecutor")
{
this._emailAnalysisAgent = emailAnalysisAgent;
}
public override async ValueTask<AnalysisResult> HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// Generate a random email ID and store the email content
var newEmail = new Email
{
EmailId = Guid.NewGuid().ToString("N"),
EmailContent = message.Text
};
await context.QueueStateUpdateAsync(newEmail.EmailId, newEmail, scopeName: EmailStateConstants.EmailStateScope);
// Invoke the agent for enhanced analysis
var response = await this._emailAnalysisAgent.RunAsync(message);
var analysisResult = JsonSerializer.Deserialize<AnalysisResult>(response.Text);
// Enrich with metadata for routing decisions
analysisResult!.EmailId = newEmail.EmailId;
analysisResult.EmailLength = newEmail.EmailContent.Length;
return analysisResult;
}
}
/// <summary>
/// Executor that assists with email responses using an AI agent.
/// </summary>
internal sealed class EmailAssistantExecutor : Executor<AnalysisResult, EmailResponse>
{
private readonly AIAgent _emailAssistantAgent;
public EmailAssistantExecutor(AIAgent emailAssistantAgent) : base("EmailAssistantExecutor")
{
this._emailAssistantAgent = emailAssistantAgent;
}
public override async ValueTask<EmailResponse> HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
if (message.spamDecision == SpamDecision.Spam)
{
throw new ArgumentException("This executor should only handle non-spam messages.");
}
// Retrieve the email content from shared state
var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);
// Invoke the agent to draft a response
var response = await this._emailAssistantAgent.RunAsync(email!.EmailContent);
var emailResponse = JsonSerializer.Deserialize<EmailResponse>(response.Text);
return emailResponse!;
}
}
/// <summary>
/// Executor that summarizes emails using an AI agent for long emails.
/// </summary>
internal sealed class EmailSummaryExecutor : Executor<AnalysisResult, AnalysisResult>
{
private readonly AIAgent _emailSummaryAgent;
public EmailSummaryExecutor(AIAgent emailSummaryAgent) : base("EmailSummaryExecutor")
{
this._emailSummaryAgent = emailSummaryAgent;
}
public override async ValueTask<AnalysisResult> HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// Read the email content from shared state
var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);
// Generate summary for long emails
var response = await this._emailSummaryAgent.RunAsync(email!.EmailContent);
var emailSummary = JsonSerializer.Deserialize<EmailSummary>(response.Text);
// Enrich the analysis result with the summary
message.EmailSummary = emailSummary!.Summary;
return message;
}
}
/// <summary>
/// Executor that sends emails.
/// </summary>
internal sealed class SendEmailExecutor : Executor<EmailResponse>
{
public SendEmailExecutor() : base("SendEmailExecutor") { }
public override async ValueTask HandleAsync(EmailResponse message, IWorkflowContext context, CancellationToken cancellationToken = default) =>
await context.YieldOutputAsync($"Email sent: {message.Response}");
}
/// <summary>
/// Executor that handles spam messages.
/// </summary>
internal sealed class HandleSpamExecutor : Executor<AnalysisResult>
{
public HandleSpamExecutor() : base("HandleSpamExecutor") { }
public override async ValueTask HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
if (message.spamDecision == SpamDecision.Spam)
{
await context.YieldOutputAsync($"Email marked as spam: {message.Reason}");
}
else
{
throw new ArgumentException("This executor should only handle spam messages.");
}
}
}
/// <summary>
/// Executor that handles uncertain messages requiring manual review.
/// </summary>
internal sealed class HandleUncertainExecutor : Executor<AnalysisResult>
{
public HandleUncertainExecutor() : base("HandleUncertainExecutor") { }
public override async ValueTask HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
if (message.spamDecision == SpamDecision.Uncertain)
{
var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);
await context.YieldOutputAsync($"Email marked as uncertain: {message.Reason}. Email content: {email?.EmailContent}");
}
else
{
throw new ArgumentException("This executor should only handle uncertain spam decisions.");
}
}
}
/// <summary>
/// Executor that handles database access with custom events.
/// </summary>
internal sealed class DatabaseAccessExecutor : Executor<AnalysisResult>
{
public DatabaseAccessExecutor() : base("DatabaseAccessExecutor") { }
public override async ValueTask HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// Simulate database operations
await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);
await Task.Delay(100); // Simulate database access delay
// Emit custom database event for monitoring
await context.AddEventAsync(new DatabaseEvent($"Email {message.EmailId} saved to database."));
}
}
Enhanced AI Agents
Create the agents for analysis, assistance, and summarization:
/// <summary>
/// Create an enhanced email analysis agent.
/// </summary>
/// <returns>A ChatClientAgent configured for comprehensive email analysis</returns>
private static ChatClientAgent GetEmailAnalysisAgent(IChatClient chatClient) =>
new(chatClient, new ChatClientAgentOptions(instructions: "You are a spam detection assistant that identifies spam emails.")
{
ChatOptions = new()
{
ResponseFormat = ChatResponseFormat.ForJsonSchema<AnalysisResult>()
}
});
/// <summary>
/// Creates an email assistant agent.
/// </summary>
/// <returns>A ChatClientAgent configured for email assistance</returns>
private static ChatClientAgent GetEmailAssistantAgent(IChatClient chatClient) =>
new(chatClient, new ChatClientAgentOptions(instructions: "You are an email assistant that helps users draft responses to emails with professionalism.")
{
ChatOptions = new()
{
ResponseFormat = ChatResponseFormat.ForJsonSchema<EmailResponse>()
}
});
/// <summary>
/// Creates an agent that summarizes emails.
/// </summary>
/// <returns>A ChatClientAgent configured for email summarization</returns>
private static ChatClientAgent GetEmailSummaryAgent(IChatClient chatClient) =>
new(chatClient, new ChatClientAgentOptions(instructions: "You are an assistant that helps users summarize emails.")
{
ChatOptions = new()
{
ResponseFormat = ChatResponseFormat.ForJsonSchema<EmailSummary>()
}
});
Multi-Selection Workflow Construction
Build the workflow with sophisticated routing and parallel processing:
public static class Program
{
private static async Task Main()
{
// Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential()).GetChatClient(deploymentName).AsIChatClient();
// Create agents
AIAgent emailAnalysisAgent = GetEmailAnalysisAgent(chatClient);
AIAgent emailAssistantAgent = GetEmailAssistantAgent(chatClient);
AIAgent emailSummaryAgent = GetEmailSummaryAgent(chatClient);
// Create executors
var emailAnalysisExecutor = new EmailAnalysisExecutor(emailAnalysisAgent);
var emailAssistantExecutor = new EmailAssistantExecutor(emailAssistantAgent);
var emailSummaryExecutor = new EmailSummaryExecutor(emailSummaryAgent);
var sendEmailExecutor = new SendEmailExecutor();
var handleSpamExecutor = new HandleSpamExecutor();
var handleUncertainExecutor = new HandleUncertainExecutor();
var databaseAccessExecutor = new DatabaseAccessExecutor();
// Build the workflow with multi-selection fan-out
WorkflowBuilder builder = new(emailAnalysisExecutor);
builder.AddFanOutEdge(
emailAnalysisExecutor,
targets: [
handleSpamExecutor, // Index 0: Spam handler
emailAssistantExecutor, // Index 1: Email assistant (always for NotSpam)
emailSummaryExecutor, // Index 2: Summarizer (conditionally for long NotSpam)
handleUncertainExecutor, // Index 3: Uncertain handler
],
targetSelector: GetTargetAssigner()
)
// Email assistant branch
.AddEdge(emailAssistantExecutor, sendEmailExecutor)
// Database persistence: conditional routing
.AddEdge<AnalysisResult>(
emailAnalysisExecutor,
databaseAccessExecutor,
condition: analysisResult => analysisResult?.EmailLength <= EmailProcessingConstants.LongEmailThreshold) // Short emails
.AddEdge(emailSummaryExecutor, databaseAccessExecutor) // Long emails with summary
.WithOutputFrom(handleUncertainExecutor, handleSpamExecutor, sendEmailExecutor);
var workflow = builder.Build();
// Read a moderately long email to trigger both assistant and summarizer
string email = Resources.Read("email.txt");
// Execute the workflow with custom event handling
StreamingRun run = await InProcessExecution.StreamAsync(workflow, new ChatMessage(ChatRole.User, email));
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is WorkflowOutputEvent outputEvent)
{
Console.WriteLine($"Output: {outputEvent}");
}
if (evt is DatabaseEvent databaseEvent)
{
Console.WriteLine($"Database: {databaseEvent}");
}
}
}
}
Pattern Comparison: Multi-Selection vs. Switch-Case
Switch-case pattern (previous):
// One input → exactly one output
builder.AddSwitch(spamDetectionExecutor, switchBuilder =>
    switchBuilder
        .AddCase(GetCondition(SpamDecision.NotSpam), emailAssistantExecutor)
        .AddCase(GetCondition(SpamDecision.Spam), handleSpamExecutor)
        .WithDefault(handleUncertainExecutor)
)
Multi-selection pattern:
// One input → one or more outputs (dynamic fan-out)
builder.AddFanOutEdge(
    emailAnalysisExecutor,
    targets: [handleSpamExecutor, emailAssistantExecutor, emailSummaryExecutor, handleUncertainExecutor],
    targetSelector: GetTargetAssigner() // Returns list of target indices
)
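Key Benefits of Multi-Selection Edges
- Parallel processing: multiple branches can execute simultaneously
- Conditional fan-out: the number of targets varies with the content
- Content-aware routing: decisions are based on message properties, not just message type
- Efficient resource usage: only the necessary branches are activated
- Complex business logic: supports sophisticated routing scenarios
Running the Multi-Selection Example
When you run this workflow with a long email: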
Output: Email sent: [Professional response generated by AI]
Database: Email abc123 saved to database.
When you run it with a short email, the summarizer is skipped:
Output: Email sent: [Professional response generated by AI]
Database: Email def456 saved to database.
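Real-World Use Cases
- Email systems: route to a reply assistant + archive + analytics (conditionally)
- Content processing: trigger transcription + translation + analysis (based on content type)
- Order processing: route to fulfillment + billing + notifications (based on order properties)
- Data pipelines: trigger different analytics flows based on data characteristics
Complete Multi-Selection Implementation
For a complete working implementation, see this sample in the Agent Framework repository.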
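Beyond Switch-Case: Multi-Selection Routing
While switch-case edges route each message to exactly one destination, real-world workflows often need to trigger multiple parallel operations based on the characteristics of the data. Partitioned edges (implemented as multi-selection edge groups) enable sophisticated fan-out patterns in which a single message can activate multiple downstream executors at the same time.
Advanced Email Processing Workflow
Building on the switch-case example, you will create an enhanced email processing system that demonstrates sophisticated routing logic:
- Spam emails → a single spam handler (as in switch-case)
- Legitimate emails → always trigger the email assistant, and additionally trigger the summarizer for long emails
- Uncertain emails → a single uncertain handler (as in switch-case)
- Database persistence → triggered for short emails and for long emails once they have been summarized
This pattern enables parallel processing pipelines that adapt to the characteristics of the content.
Concepts Covered
Enhanced Data Models for Multi-Selection
Extend the data models to support email length analysis and summarization: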
class AnalysisResultAgent(BaseModel):
    """Enhanced structured output from email analysis agent."""
    spam_decision: Literal["NotSpam", "Spam", "Uncertain"]
    reason: str

class EmailResponse(BaseModel):
    """Response from email assistant."""
    response: str

class EmailSummaryModel(BaseModel):
    """Summary generated by email summary agent."""
    summary: str

@dataclass
class AnalysisResult:
    """Internal analysis result with email metadata for routing decisions."""
    spam_decision: str
    reason: str
    email_length: int  # Used for conditional routing
    email_summary: str  # Populated by summary agent
    email_id: str

@dataclass
class Email:
    """Email content stored in shared state."""
    email_id: str
    email_content: str

# Custom event for database operations
class DatabaseEvent(WorkflowEvent):
    """Custom event for tracking database operations."""
    pass
Selection Function: The Heart of Multi-Selection
The selection function determines which executors should receive each message:
LONG_EMAIL_THRESHOLD = 100

def select_targets(analysis: AnalysisResult, target_ids: list[str]) -> list[str]:
    """Intelligent routing based on spam decision and email characteristics."""
    # Target order: [handle_spam, submit_to_email_assistant, summarize_email, handle_uncertain]
    handle_spam_id, submit_to_email_assistant_id, summarize_email_id, handle_uncertain_id = target_ids
    if analysis.spam_decision == "Spam":
        # Route only to spam handler
        return [handle_spam_id]
    elif analysis.spam_decision == "NotSpam":
        # Always route to email assistant
        targets = [submit_to_email_assistant_id]
        # Conditionally add summarizer for long emails
        if analysis.email_length > LONG_EMAIL_THRESHOLD:
            targets.append(summarize_email_id)
        return targets
    else:  # Uncertain
        # Route only to uncertain handler
        return [handle_uncertain_id]
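Key Features of Selection Functions
- Dynamic target selection: returns the list of executor IDs to activate
- Content-aware routing: makes decisions based on message properties
- Parallel processing: multiple targets can execute simultaneously
- Conditional logic: complex branching based on multiple criteria
Multi-Selection Workflow Executors
Implement the executors that handle the enhanced analysis and routing: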
EMAIL_STATE_PREFIX = "email:"
CURRENT_EMAIL_ID_KEY = "current_email_id"

@executor(id="store_email")
async def store_email(email_text: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
    """Store email and initiate analysis."""
    new_email = Email(email_id=str(uuid4()), email_content=email_text)
    await ctx.set_shared_state(f"{EMAIL_STATE_PREFIX}{new_email.email_id}", new_email)
    await ctx.set_shared_state(CURRENT_EMAIL_ID_KEY, new_email.email_id)
    await ctx.send_message(
        AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=new_email.email_content)], should_respond=True)
    )

@executor(id="to_analysis_result")
async def to_analysis_result(response: AgentExecutorResponse, ctx: WorkflowContext[AnalysisResult]) -> None:
    """Transform agent response into enriched analysis result."""
    parsed = AnalysisResultAgent.model_validate_json(response.agent_run_response.text)
    email_id: str = await ctx.get_shared_state(CURRENT_EMAIL_ID_KEY)
    email: Email = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{email_id}")
    # Create enriched analysis result with email length for routing decisions
    await ctx.send_message(
        AnalysisResult(
            spam_decision=parsed.spam_decision,
            reason=parsed.reason,
            email_length=len(email.email_content),  # Key for conditional routing
            email_summary="",
            email_id=email_id,
        )
    )

@executor(id="submit_to_email_assistant")
async def submit_to_email_assistant(analysis: AnalysisResult, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
    """Handle legitimate emails by forwarding to email assistant."""
    if analysis.spam_decision != "NotSpam":
        raise RuntimeError("This executor should only handle NotSpam messages.")
    email: Email = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{analysis.email_id}")
    await ctx.send_message(
        AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=email.email_content)], should_respond=True)
    )

@executor(id="finalize_and_send")
async def finalize_and_send(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
    """Final step for email assistant branch."""
    parsed = EmailResponse.model_validate_json(response.agent_run_response.text)
    await ctx.yield_output(f"Email sent: {parsed.response}")

@executor(id="summarize_email")
async def summarize_email(analysis: AnalysisResult, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
    """Generate summary for long emails (parallel branch)."""
    # Only called for long NotSpam emails by selection function
    email: Email = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{analysis.email_id}")
    await ctx.send_message(
        AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=email.email_content)], should_respond=True)
    )

@executor(id="merge_summary")
async def merge_summary(response: AgentExecutorResponse, ctx: WorkflowContext[AnalysisResult]) -> None:
    """Merge summary back into analysis result for database persistence."""
    summary = EmailSummaryModel.model_validate_json(response.agent_run_response.text)
    email_id: str = await ctx.get_shared_state(CURRENT_EMAIL_ID_KEY)
    email: Email = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{email_id}")
    # Create analysis result with summary for database storage
    await ctx.send_message(
        AnalysisResult(
            spam_decision="NotSpam",
            reason="",
            email_length=len(email.email_content),
            email_summary=summary.summary,  # Now includes summary
            email_id=email_id,
        )
    )

@executor(id="handle_spam")
async def handle_spam(analysis: AnalysisResult, ctx: WorkflowContext[Never, str]) -> None:
    """Handle spam emails (single target like switch-case)."""
    if analysis.spam_decision == "Spam":
        await ctx.yield_output(f"Email marked as spam: {analysis.reason}")
    else:
        raise RuntimeError("This executor should only handle Spam messages.")

@executor(id="handle_uncertain")
async def handle_uncertain(analysis: AnalysisResult, ctx: WorkflowContext[Never, str]) -> None:
    """Handle uncertain emails (single target like switch-case)."""
    if analysis.spam_decision == "Uncertain":
        email: Email | None = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{analysis.email_id}")
        await ctx.yield_output(
            f"Email marked as uncertain: {analysis.reason}. Email content: {getattr(email, 'email_content', '')}"
        )
    else:
        raise RuntimeError("This executor should only handle Uncertain messages.")

@executor(id="database_access")
async def database_access(analysis: AnalysisResult, ctx: WorkflowContext[Never, str]) -> None:
    """Simulate database persistence with custom events."""
    await asyncio.sleep(0.05)  # Simulate DB operation
    await ctx.add_event(DatabaseEvent(f"Email {analysis.email_id} saved to database."))
Enhanced AI Agents
Create the agents for analysis, assistance, and summarization:
async def main() -> None:
    chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
    # Enhanced analysis agent
    email_analysis_agent = AgentExecutor(
        chat_client.create_agent(
            instructions=(
                "You are a spam detection assistant that identifies spam emails. "
                "Always return JSON with fields 'spam_decision' (one of NotSpam, Spam, Uncertain) "
                "and 'reason' (string)."
            ),
            response_format=AnalysisResultAgent,
        ),
        id="email_analysis_agent",
    )
    # Email assistant (same as before)
    email_assistant_agent = AgentExecutor(
        chat_client.create_agent(
            instructions=(
                "You are an email assistant that helps users draft responses to emails with professionalism."
            ),
            response_format=EmailResponse,
        ),
        id="email_assistant_agent",
    )
    # New: Email summary agent for long emails
    email_summary_agent = AgentExecutor(
        chat_client.create_agent(
            instructions="You are an assistant that helps users summarize emails.",
            response_format=EmailSummaryModel,
        ),
        id="email_summary_agent",
    )
Build the Multi-Selection Workflow
Build the workflow with sophisticated routing and parallel processing:
    workflow = (
        WorkflowBuilder()
        .set_start_executor(store_email)
        .add_edge(store_email, email_analysis_agent)
        .add_edge(email_analysis_agent, to_analysis_result)
        # Multi-selection edge group: intelligent fan-out based on content
        .add_multi_selection_edge_group(
            to_analysis_result,
            [handle_spam, submit_to_email_assistant, summarize_email, handle_uncertain],
            selection_func=select_targets,
        )
        # Email assistant branch (always for NotSpam)
        .add_edge(submit_to_email_assistant, email_assistant_agent)
        .add_edge(email_assistant_agent, finalize_and_send)
        # Summary branch (only for long NotSpam emails)
        .add_edge(summarize_email, email_summary_agent)
        .add_edge(email_summary_agent, merge_summary)
        # Database persistence: conditional routing
        .add_edge(to_analysis_result, database_access,
                  condition=lambda r: r.email_length <= LONG_EMAIL_THRESHOLD)  # Short emails
        .add_edge(merge_summary, database_access)  # Long emails with summary
        .build()
    )
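The two edges into `database_access` give each email at most one persistence path. As the edges are written, the direct edge's condition checks only the length, so a short email reaches the database whatever its spam decision, while the summary path exists only for long NotSpam emails. The sketch below restates that reading in plain Python; `database_paths` is a hypothetical helper, and the path strings are only labels.

```python
LONG_EMAIL_THRESHOLD = 100


def database_paths(spam_decision: str, email_length: int) -> list[str]:
    """Hypothetical helper listing which edges deliver a message to database_access."""
    paths = []
    # Direct conditional edge: depends only on the length, not on the spam decision.
    if email_length <= LONG_EMAIL_THRESHOLD:
        paths.append("to_analysis_result -> database_access")
    # Summary branch: the selection function adds the summarizer only for long NotSpam emails.
    if spam_decision == "NotSpam" and email_length > LONG_EMAIL_THRESHOLD:
        paths.append("summarize_email -> email_summary_agent -> merge_summary -> database_access")
    return paths


for decision, length in [("NotSpam", 40), ("NotSpam", 250), ("Spam", 40), ("Spam", 250)]:
    print(decision, length, database_paths(decision, length))
```

If short spam should not be persisted, the edge condition would also need to check `spam_decision`; whether that is desirable depends on the application.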
Run with Event Streaming
Run the workflow and observe the parallel execution through custom events:
    # Use a moderately long email to trigger both assistant and summarizer
    email = """
Hello team, here are the updates for this week:
1. Project Alpha is on track and we should have the first milestone completed by Friday.
2. The client presentation has been scheduled for next Tuesday at 2 PM.
3. Please review the Q4 budget allocation and provide feedback by Wednesday.
Let me know if you have any questions or concerns.
Best regards,
Alex
"""
    # Stream events to see parallel execution
    async for event in workflow.run_stream(email):
        if isinstance(event, DatabaseEvent):
            print(f"Database: {event}")
        elif isinstance(event, WorkflowOutputEvent):
            print(f"Output: {event.data}")
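The consumer side of this loop is ordinary type-based filtering over an asynchronous stream. The following standalone sketch reproduces that pattern with stand-in event classes and a fake stream so it can run without the framework. Every name in it (`SketchEvent`, `SketchOutputEvent`, `SketchDatabaseEvent`, `SketchStatusEvent`, `fake_stream`, `collect_lines`) is hypothetical.

```python
import asyncio
from typing import AsyncIterator


class SketchEvent:
    """Illustrative base event carrying a payload (not the framework's WorkflowEvent)."""

    def __init__(self, data: str) -> None:
        self.data = data


class SketchOutputEvent(SketchEvent):
    """Stand-in for a workflow output event."""


class SketchDatabaseEvent(SketchEvent):
    """Stand-in for the custom database event."""


class SketchStatusEvent(SketchEvent):
    """Stand-in for any other event type that the consumer ignores."""


async def fake_stream() -> AsyncIterator[SketchEvent]:
    """Yield a fixed sequence of events in place of a real workflow run."""
    yield SketchStatusEvent("executor started")
    yield SketchOutputEvent("Email sent: Thanks for the update.")
    yield SketchDatabaseEvent("Email abc123 saved to database.")


async def collect_lines(stream: AsyncIterator[SketchEvent]) -> list[str]:
    """Format only the event types of interest, skipping everything else."""
    lines = []
    async for event in stream:
        if isinstance(event, SketchDatabaseEvent):
            lines.append(f"Database: {event.data}")
        elif isinstance(event, SketchOutputEvent):
            lines.append(f"Output: {event.data}")
    return lines


lines = asyncio.run(collect_lines(fake_stream()))
for line in lines:
    print(line)
```

Events of other types pass through the loop untouched, which is why adding a custom event class does not disturb existing consumers.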
Multi-Selection vs. Switch-Case Comparison
Switch-case pattern (previous):
# One input → exactly one output
.add_switch_case_edge_group(
    source,
    [
        Case(condition=lambda x: x.result == "A", target=handler_a),
        Case(condition=lambda x: x.result == "B", target=handler_b),
        Default(target=handler_c),
    ],
)
Multi-selection pattern:
# One input → one or more outputs (dynamic fan-out)
.add_multi_selection_edge_group(
    source,
    [handler_a, handler_b, handler_c, handler_d],
    selection_func=intelligent_router,  # Returns list of target IDs
)
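Key Benefits of Multi-Selection
- Parallel processing: multiple branches can execute simultaneously
- Conditional fan-out: the number of targets varies with the content
- Content-aware routing: decisions are based on message properties, not just message type
- Efficient resource usage: only the necessary branches are activated
- Complex business logic: supports sophisticated routing scenarios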
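To give a feel for what simultaneous branches mean in practice, the sketch below runs two simulated branches concurrently with `asyncio.gather`. It illustrates the general idea only; how the workflow runtime actually schedules executors is not described here, and `run_branch`, `fan_out`, and the delays are all hypothetical.

```python
import asyncio


async def run_branch(name: str, delay: float, finished: list[str]) -> str:
    """Simulate one downstream branch that takes `delay` seconds."""
    await asyncio.sleep(delay)
    finished.append(name)  # Record the order in which branches complete
    return name


async def fan_out(selected: dict[str, float]) -> tuple[list[str], list[str]]:
    """Run every selected branch concurrently; return (results, completion order)."""
    finished: list[str] = []
    results = await asyncio.gather(
        *(run_branch(name, delay, finished) for name, delay in selected.items())
    )
    return list(results), finished


# The slower assistant branch is listed first but finishes last.
results, finished = asyncio.run(fan_out({"email_assistant": 0.2, "summarizer": 0.05}))
print("results:", results)
print("completion order:", finished)
```

`asyncio.gather` returns results in the order the branches were submitted, while the `finished` list shows that the shorter branch completed first, so the total wait is close to the slowest branch rather than the sum of both.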
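Real-World Applications
- Email systems: route to a reply assistant + archive + analytics (conditionally)
- Content processing: trigger transcription + translation + analysis (based on content type)
- Order processing: route to fulfillment + billing + notifications (based on order properties)
- Data pipelines: trigger different analytics flows based on data characteristics
Multi-Selection Sample Code
For a complete working implementation, see the multi_selection_edge_group.py sample in the Agent Framework repository.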