Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Bu öğreticide, Agent Framework kullanarak dallanma mantığıyla iş akışı oluşturmayı öğreneceksiniz. Dallanma mantığı, iş akışınızın belirli koşullara göre kararlar alarak daha karmaşık ve dinamik bir davranışa olanak tanır.
Koşullu Kenarlar
Koşullu kenarlar, iş akışınızın iş akışı üzerinden akan iletilerin içeriğine veya özelliklerine göre yönlendirme kararları vermesine olanak tanır. Bu, çalışma zamanı koşullarına göre farklı yürütme yollarının alındığı dinamik dallanmayı etkinleştirir.
Neler Oluşturacaksınız
Koşullu yönlendirmeyi gösteren bir e-posta işleme iş akışı oluşturacaksınız:
- Gelen e-postaları analiz eden ve yapılandırılmış JSON döndüren bir istenmeyen posta algılama aracısı.
- E-postaları sınıflandırmaya göre farklı işleyicilere yönlendiren koşullu kenarlar.
- Profesyonel yanıtlar taslağı hazırlayan meşru bir e-posta işleyicisi.
- Şüpheli e-postaları işaretleyen bir istenmeyen posta işleyicisi.
- İş akışı adımları arasında e-posta verilerini kalıcı hale getirmek için paylaşılan durum yönetimi.
Ele Alınan Kavramlar
Önkoşullar
- .NET 8.0 SDK veya üzeri.
- Azure OpenAI hizmet uç noktası ve dağıtımı yapılandırıldı.
- Azure CLI yüklü ve Azure kimlik bilgisi için kimlik doğrulaması yapılmıştır.
- C# ve zaman uyumsuz programlama hakkında temel bilgiler.
- Yeni bir konsol uygulaması.
NuGet paketlerini yükleme
İlk olarak, .NET projeniz için gerekli paketleri yükleyin:
dotnet add package Azure.AI.OpenAI --prerelease
dotnet add package Azure.Identity
dotnet add package Microsoft.Agents.AI.Workflows --prerelease
dotnet add package Microsoft.Extensions.AI.OpenAI --prerelease
Veri Modellerini Tanımlama
İş akışınızda akacak veri yapılarını tanımlayarak başlayın:
using System.Text.Json.Serialization;
/// <summary>
/// Represents the result of spam detection.
/// </summary>
public sealed class DetectionResult
{
[JsonPropertyName("is_spam")]
public bool IsSpam { get; set; }
[JsonPropertyName("reason")]
public string Reason { get; set; } = string.Empty;
// Email ID is generated by the executor, not the agent
[JsonIgnore]
public string EmailId { get; set; } = string.Empty;
}
/// <summary>
/// Represents an email.
/// </summary>
internal sealed class Email
{
[JsonPropertyName("email_id")]
public string EmailId { get; set; } = string.Empty;
[JsonPropertyName("email_content")]
public string EmailContent { get; set; } = string.Empty;
}
/// <summary>
/// Represents the response from the email assistant.
/// </summary>
public sealed class EmailResponse
{
[JsonPropertyName("response")]
public string Response { get; set; } = string.Empty;
}
/// <summary>
/// Constants for shared state scopes.
/// </summary>
internal static class EmailStateConstants
{
public const string EmailStateScope = "EmailState";
}
Koşul İşlevleri Oluşturma
Condition işlevi, iş akışının hangi yolu izlemesi gerektiğini belirlemek için istenmeyen posta algılama sonucunu değerlendirir:
/// <summary>
/// Creates a condition for routing messages based on the expected spam detection result.
/// </summary>
/// <param name="expectedResult">The expected spam detection result</param>
/// <returns>A function that evaluates whether a message meets the expected result</returns>
private static Func<object?, bool> GetCondition(bool expectedResult) =>
detectionResult => detectionResult is DetectionResult result && result.IsSpam == expectedResult;
Bu koşul fonksiyonu:
- Bir
bool expectedResultparametre alır (spam için true, spam olmayan için false) - Kenar koşulu olarak kullanılabilecek bir işlev döndürür
- İletinin güvenli bir şekilde bir
DetectionResultolup olmadığını denetler veIsSpamözelliğini karşılaştırır.
Yapay zeka aracıları oluşturma
İstenmeyen posta algılamayı ve e-posta yardımını işleyecek yapay zeka aracılarını ayarlayın:
using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Agents.AI;
using Microsoft.Extensions.AI;
/// <summary>
/// Creates a spam detection agent.
/// </summary>
/// <returns>A ChatClientAgent configured for spam detection</returns>
private static ChatClientAgent GetSpamDetectionAgent(IChatClient chatClient) =>
new(chatClient, new ChatClientAgentOptions(instructions: "You are a spam detection assistant that identifies spam emails.")
{
ChatOptions = new()
{
ResponseFormat = ChatResponseFormat.ForJsonSchema(AIJsonUtilities.CreateJsonSchema(typeof(DetectionResult)))
}
});
/// <summary>
/// Creates an email assistant agent.
/// </summary>
/// <returns>A ChatClientAgent configured for email assistance</returns>
private static ChatClientAgent GetEmailAssistantAgent(IChatClient chatClient) =>
new(chatClient, new ChatClientAgentOptions(instructions: "You are an email assistant that helps users draft professional responses to emails.")
{
ChatOptions = new()
{
ResponseFormat = ChatResponseFormat.ForJsonSchema(AIJsonUtilities.CreateJsonSchema(typeof(EmailResponse)))
}
});
Yürütücüleri Uygulama
E-posta işlemenin farklı aşamalarını işleyen iş akışı yürütücüleri oluşturun:
using Microsoft.Agents.AI.Workflows;
using System.Text.Json;
/// <summary>
/// Executor that detects spam using an AI agent.
/// </summary>
internal sealed class SpamDetectionExecutor : Executor<ChatMessage, DetectionResult>
{
private readonly AIAgent _spamDetectionAgent;
public SpamDetectionExecutor(AIAgent spamDetectionAgent) : base("SpamDetectionExecutor")
{
this._spamDetectionAgent = spamDetectionAgent;
}
public override async ValueTask<DetectionResult> HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// Generate a random email ID and store the email content to shared state
var newEmail = new Email
{
EmailId = Guid.NewGuid().ToString("N"),
EmailContent = message.Text
};
await context.QueueStateUpdateAsync(newEmail.EmailId, newEmail, scopeName: EmailStateConstants.EmailStateScope);
// Invoke the agent for spam detection
var response = await this._spamDetectionAgent.RunAsync(message);
var detectionResult = JsonSerializer.Deserialize<DetectionResult>(response.Text);
detectionResult!.EmailId = newEmail.EmailId;
return detectionResult;
}
}
/// <summary>
/// Executor that assists with email responses using an AI agent.
/// </summary>
internal sealed class EmailAssistantExecutor : Executor<DetectionResult, EmailResponse>
{
private readonly AIAgent _emailAssistantAgent;
public EmailAssistantExecutor(AIAgent emailAssistantAgent) : base("EmailAssistantExecutor")
{
this._emailAssistantAgent = emailAssistantAgent;
}
public override async ValueTask<EmailResponse> HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
if (message.IsSpam)
{
throw new ArgumentException("This executor should only handle non-spam messages.");
}
// Retrieve the email content from shared state
var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope)
?? throw new InvalidOperationException("Email not found.");
// Invoke the agent to draft a response
var response = await this._emailAssistantAgent.RunAsync(email.EmailContent);
var emailResponse = JsonSerializer.Deserialize<EmailResponse>(response.Text);
return emailResponse!;
}
}
/// <summary>
/// Executor that sends emails.
/// </summary>
internal sealed class SendEmailExecutor : Executor<EmailResponse>
{
public SendEmailExecutor() : base("SendEmailExecutor") { }
public override async ValueTask HandleAsync(EmailResponse message, IWorkflowContext context, CancellationToken cancellationToken = default) =>
await context.YieldOutputAsync($"Email sent: {message.Response}");
}
/// <summary>
/// Executor that handles spam messages.
/// </summary>
internal sealed class HandleSpamExecutor : Executor<DetectionResult>
{
public HandleSpamExecutor() : base("HandleSpamExecutor") { }
public override async ValueTask HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
if (message.IsSpam)
{
await context.YieldOutputAsync($"Email marked as spam: {message.Reason}");
}
else
{
throw new ArgumentException("This executor should only handle spam messages.");
}
}
}
Koşullu Kenarlarla İş Akışı Oluşturma
Şimdi iş akışını oluşturan ve yürüten ana programı oluşturun:
using Microsoft.Extensions.AI;
public static class Program
{
private static async Task Main()
{
// Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT")
?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential())
.GetChatClient(deploymentName).AsIChatClient();
// Create agents
AIAgent spamDetectionAgent = GetSpamDetectionAgent(chatClient);
AIAgent emailAssistantAgent = GetEmailAssistantAgent(chatClient);
// Create executors
var spamDetectionExecutor = new SpamDetectionExecutor(spamDetectionAgent);
var emailAssistantExecutor = new EmailAssistantExecutor(emailAssistantAgent);
var sendEmailExecutor = new SendEmailExecutor();
var handleSpamExecutor = new HandleSpamExecutor();
// Build the workflow with conditional edges
var workflow = new WorkflowBuilder(spamDetectionExecutor)
// Non-spam path: route to email assistant when IsSpam = false
.AddEdge(spamDetectionExecutor, emailAssistantExecutor, condition: GetCondition(expectedResult: false))
.AddEdge(emailAssistantExecutor, sendEmailExecutor)
// Spam path: route to spam handler when IsSpam = true
.AddEdge(spamDetectionExecutor, handleSpamExecutor, condition: GetCondition(expectedResult: true))
.WithOutputFrom(handleSpamExecutor, sendEmailExecutor)
.Build();
// Execute the workflow with sample spam email
string emailContent = "Congratulations! You've won $1,000,000! Click here to claim your prize now!";
StreamingRun run = await InProcessExecution.StreamAsync(workflow, new ChatMessage(ChatRole.User, emailContent));
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is WorkflowOutputEvent outputEvent)
{
Console.WriteLine($"{outputEvent}");
}
}
}
}
Nasıl Çalışır?
İş Akışı Girişi: İş akışı bir
spamDetectionExecutoralma ileChatMessagebaşlar.İstenmeyen Posta Analizi: İstenmeyen posta algılama aracısı e-postayı analiz eder ve ve
DetectionResultözellikleriyleIsSpamyapılandırılmışReasonbir sonuç döndürür.Koşullu Yönlendirme:
IsSpamdeğerine göre:-
İstenmeyen postaysa (
IsSpam = true):HandleSpamExecutorGetCondition(true)kullanarak yönlendirilir -
Geçerliyse (
IsSpam = false):EmailAssistantExecutoriçinGetCondition(false)kullanarak yollar
-
İstenmeyen postaysa (
Yanıt Oluşturma: Yasal e-postalar için e-posta yardımcısı profesyonel bir yanıt hazırlar.
Son Çıkış: İş akışı bir istenmeyen posta bildirimi verir veya taslak e-posta yanıtını gönderir.
Koşullu Kenarların Temel Özellikleri
Type-Safe Koşul:
GetConditionyöntemi, ileti içeriğini güvenli bir şekilde değerlendiren, yeniden kullanılabilir koşul işlevleri oluşturur.Birden Çok Yol: Tek bir yürütücü, karmaşık dallanma mantığını etkinleştirerek farklı koşullara sahip birden çok giden kenara sahip olabilir.
Paylaşılan Durum: E-posta verileri, kapsamlı durum yönetimi kullanılarak yürütücüler arasında kalır ve aşağı akış yürütücülerinin özgün içeriğe erişmesine olanak sağlar.
Hata İşleme: Yürütücüler girişlerini doğrular ve beklenmeyen ileti türlerini alırken anlamlı özel durumlar oluşturur.
Temiz Mimari: Her yürütücü tek bir sorumluluğa sahiptir ve bu da iş akışının sürdürülebilir ve test edilebilir olmasını sağlar.
Örneği Çalıştırma
Bu iş akışını örnek istenmeyen posta e-postasıyla çalıştırdığınızda:
Email marked as spam: This email contains common spam indicators including monetary prizes, urgency tactics, and suspicious links that are typical of phishing attempts.
E-posta içeriğini yasal bir şeyle değiştirmeyi deneyin:
string emailContent = "Hi, I wanted to follow up on our meeting yesterday and get your thoughts on the project proposal.";
İş akışı e-posta yardımcısına yönlendirilir ve bunun yerine profesyonel bir yanıt oluşturur.
Bu koşullu yönlendirme düzeni, karmaşık karar ağaçlarını ve iş mantığını işleyebilen gelişmiş iş akışları oluşturmanın temelini oluşturur.
Uygulamayı Tamamla
Tam çalışma uygulaması için Agent Framework deposundaki bu örneğe bakın.
Neler Oluşturacaksınız
Koşullu yönlendirmeyi gösteren bir e-posta işleme iş akışı oluşturacaksınız:
- Gelen e-postaları analiz eden bir istenmeyen posta algılama aracısı
- E-postaları sınıflandırmaya göre farklı işleyicilere yönlendiren koşullu kenarlar
- Profesyonel yanıtlar hazırlayan meşru bir e-posta işleyicisi
- Şüpheli e-postaları işaretleyen istenmeyen posta işleyicisi
Ele Alınan Kavramlar
Önkoşullar
- Python 3.10 veya üzeri
- Agent Framework yüklendi:
pip install agent-framework-core --pre - Uygun ortam değişkenleriyle yapılandırılmış Azure OpenAI hizmeti
- Azure CLI kimlik doğrulaması:
az login
1. Adım: Gerekli Bağımlılıkları İçeri Aktarma
Koşullu iş akışları için gerekli bileşenleri içeri aktararak başlayın:
import asyncio
import os
from dataclasses import dataclass
from typing import Any, Literal
from uuid import uuid4
from typing_extensions import Never
from agent_framework import (
AgentExecutor,
AgentExecutorRequest,
AgentExecutorResponse,
ChatMessage,
Role,
WorkflowBuilder,
WorkflowContext,
executor,
Case,
Default,
)
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential
from pydantic import BaseModel
2. Adım: Veri Modellerini Tanımlama
İş akışı bileşenleri arasında yapılandırılmış veri alışverişi için Pydantic modelleri oluşturun:
class DetectionResult(BaseModel):
"""Represents the result of spam detection."""
# is_spam drives the routing decision taken by edge conditions
is_spam: bool
# Human readable rationale from the detector
reason: str
# The agent must include the original email so downstream agents can operate without reloading content
email_content: str
class EmailResponse(BaseModel):
"""Represents the response from the email assistant."""
# The drafted reply that a user could copy or send
response: str
3. Adım: Koşul İşlevleri Oluşturma
Yönlendirme kararlarını belirleyecek koşul işlevlerini tanımlayın:
def get_condition(expected_result: bool):
"""Create a condition callable that routes based on DetectionResult.is_spam."""
# The returned function will be used as an edge predicate.
# It receives whatever the upstream executor produced.
def condition(message: Any) -> bool:
# Defensive guard. If a non AgentExecutorResponse appears, let the edge pass to avoid dead ends.
if not isinstance(message, AgentExecutorResponse):
return True
try:
# Prefer parsing a structured DetectionResult from the agent JSON text.
# Using model_validate_json ensures type safety and raises if the shape is wrong.
detection = DetectionResult.model_validate_json(message.agent_run_response.text)
# Route only when the spam flag matches the expected path.
return detection.is_spam == expected_result
except Exception:
# Fail closed on parse errors so we do not accidentally route to the wrong path.
# Returning False prevents this edge from activating.
return False
return condition
4. Adım: Yürütme İşleyicilerini Oluşturma
Farklı yönlendirme sonuçlarını işlemek için yürütücüleri tanımlayın:
@executor(id="send_email")
async def handle_email_response(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
"""Handle legitimate emails by drafting a professional response."""
# Downstream of the email assistant. Parse a validated EmailResponse and yield the workflow output.
email_response = EmailResponse.model_validate_json(response.agent_run_response.text)
await ctx.yield_output(f"Email sent:\n{email_response.response}")
@executor(id="handle_spam")
async def handle_spam_classifier_response(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
"""Handle spam emails by marking them appropriately."""
# Spam path. Confirm the DetectionResult and yield the workflow output. Guard against accidental non spam input.
detection = DetectionResult.model_validate_json(response.agent_run_response.text)
if detection.is_spam:
await ctx.yield_output(f"Email marked as spam: {detection.reason}")
else:
# This indicates the routing predicate and executor contract are out of sync.
raise RuntimeError("This executor should only handle spam messages.")
@executor(id="to_email_assistant_request")
async def to_email_assistant_request(
response: AgentExecutorResponse, ctx: WorkflowContext[AgentExecutorRequest]
) -> None:
"""Transform spam detection response into a request for the email assistant."""
# Parse the detection result and extract the email content for the assistant
detection = DetectionResult.model_validate_json(response.agent_run_response.text)
# Create a new request for the email assistant with the original email content
request = AgentExecutorRequest(
messages=[ChatMessage(Role.USER, text=detection.email_content)],
should_respond=True
)
await ctx.send_message(request)
5. Adım: Yapay zeka aracıları oluşturma
Yapılandırılmış çıkış biçimlendirmesi ile Azure OpenAI aracılarını ayarlayın:
async def main() -> None:
# Create agents
# AzureCliCredential uses your current az login. This avoids embedding secrets in code.
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
# Agent 1. Classifies spam and returns a DetectionResult object.
# response_format enforces that the LLM returns parsable JSON for the Pydantic model.
spam_detection_agent = AgentExecutor(
chat_client.create_agent(
instructions=(
"You are a spam detection assistant that identifies spam emails. "
"Always return JSON with fields is_spam (bool), reason (string), and email_content (string). "
"Include the original email content in email_content."
),
response_format=DetectionResult,
),
id="spam_detection_agent",
)
# Agent 2. Drafts a professional reply. Also uses structured JSON output for reliability.
email_assistant_agent = AgentExecutor(
chat_client.create_agent(
instructions=(
"You are an email assistant that helps users draft professional responses to emails. "
"Your input might be a JSON object that includes 'email_content'; base your reply on that content. "
"Return JSON with a single field 'response' containing the drafted reply."
),
response_format=EmailResponse,
),
id="email_assistant_agent",
)
6. Adım: Koşullu İş Akışı Oluşturma
İstenmeyen posta algılama sonuçlarına göre yönlendiren koşullu kenarlara sahip bir iş akışı oluşturun:
# Build the workflow graph.
# Start at the spam detector.
# If not spam, hop to a transformer that creates a new AgentExecutorRequest,
# then call the email assistant, then finalize.
# If spam, go directly to the spam handler and finalize.
workflow = (
WorkflowBuilder()
.set_start_executor(spam_detection_agent)
# Not spam path: transform response -> request for assistant -> assistant -> send email
.add_edge(spam_detection_agent, to_email_assistant_request, condition=get_condition(False))
.add_edge(to_email_assistant_request, email_assistant_agent)
.add_edge(email_assistant_agent, handle_email_response)
# Spam path: send to spam handler
.add_edge(spam_detection_agent, handle_spam_classifier_response, condition=get_condition(True))
.build()
)
7. Adım: İş Akışını Yürütme
İş akışını örnek e-posta içeriğiyle çalıştırın:
# Read Email content from the sample resource file.
# This keeps the sample deterministic since the model sees the same email every run.
email_path = os.path.join(os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "resources", "email.txt")
with open(email_path) as email_file: # noqa: ASYNC230
email = email_file.read()
# Execute the workflow. Since the start is an AgentExecutor, pass an AgentExecutorRequest.
# The workflow completes when it becomes idle (no more work to do).
request = AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=email)], should_respond=True)
events = await workflow.run(request)
outputs = events.get_outputs()
if outputs:
print(f"Workflow output: {outputs[0]}")
if __name__ == "__main__":
asyncio.run(main())
Koşullu Kenarlar Nasıl Çalışır?
Koşul İşlevleri:
get_condition()işlevi, ileti içeriğini inceleyen ve kenardan geçilip geçilmeyeceğini belirlemek içinTrueveyaFalsedöndüren bir koşul ifadesi oluşturur.İleti denetimi: Koşullar, Pydantic modelleriyle ayrıştırılan aracı yanıtlarından alınan yapılandırılmış veriler de dahil olmak üzere iletinin herhangi bir yönünü inceleyebilir.
Savunma Programlama: Koşul işlevi, yapılandırılmış verileri ayrıştırırken yönlendirme hatalarını önlemek için hata işlemeyi içerir.
Dinamik Yönlendirme: İstenmeyen posta algılama sonucuna bağlı olarak, e-postalar otomatik olarak e-posta yardımcısına (meşru e-postalar için) veya istenmeyen posta işleyicisine (şüpheli e-postalar için) yönlendirilir.
Önemli Kavramlar
- Kenar Koşulları: Kenardan geçiş yapılıp yapılmayacağını belirleyen Boole koşulları
-
Yapılandırılmış Çıkışlar: Güvenilir veri ayrıştırma ile Pydantic modellerinin
response_formatkullanılması - Savunma Yönlendirmesi: Koşul işlevleri iş akışı çıkmazlarını önlemek için istisnai durumları işler
- İleti Dönüştürme: Yürütücüler, iş akışı adımları arasında ileti türlerini dönüştürebilir
Uygulamayı Tamamla
Tam çalışma uygulaması için Agent Framework deposundaki edge_condition.py örneğine bakın.
Switch-Case Kenarlar
Koşullu Kenarlar Üzerine İnşa Etme
Önceki koşullu kenarlar örneğinde iki yönlü yönlendirme (istenmeyen posta ve yasal e-postalar) gösterilmiştir. Ancak, birçok gerçek dünya senaryosu daha karmaşık karar ağaçları gerektirir. Anahtar servis talebi kenarları, farklı koşullara göre birden çok hedefe yönlendirmeniz gerektiğinde daha temiz ve daha bakımlı bir çözüm sağlar.
Switch-Case ile Neler Oluşturacaksınız?
E-posta işleme iş akışını üç karar yolunu işleyecek şekilde genişleteceksiniz:
- NotSpam → E-posta Yardımcısı → E-posta Gönder
- İstenmeyen posta → İstenmeyen Posta Yürütücüsü İşleme
- Belirsiz → Belirsiz Yürütücüyü İşle (varsayılan durum)
Önemli geliştirme, birden çok koşullu kenar yerine deseni SwitchBuilder kullanmaktır ve karar karmaşıklığı arttıkça iş akışının anlaşılmasını ve korunmasını kolaylaştırır.
Ele Alınan Kavramlar
Switch-Case için Veri Modelleri
Veri modellerinizi üç yönlü sınıflandırmayı destekleyecek şekilde güncelleştirin:
/// <summary>
/// Represents the possible decisions for spam detection.
/// </summary>
public enum SpamDecision
{
NotSpam,
Spam,
Uncertain
}
/// <summary>
/// Represents the result of spam detection with enhanced decision support.
/// </summary>
public sealed class DetectionResult
{
[JsonPropertyName("spam_decision")]
[JsonConverter(typeof(JsonStringEnumConverter))]
public SpamDecision spamDecision { get; set; }
[JsonPropertyName("reason")]
public string Reason { get; set; } = string.Empty;
// Email ID is generated by the executor, not the agent
[JsonIgnore]
public string EmailId { get; set; } = string.Empty;
}
/// <summary>
/// Represents an email stored in shared state.
/// </summary>
internal sealed class Email
{
[JsonPropertyName("email_id")]
public string EmailId { get; set; } = string.Empty;
[JsonPropertyName("email_content")]
public string EmailContent { get; set; } = string.Empty;
}
/// <summary>
/// Represents the response from the email assistant.
/// </summary>
public sealed class EmailResponse
{
[JsonPropertyName("response")]
public string Response { get; set; } = string.Empty;
}
/// <summary>
/// Constants for shared state scopes.
/// </summary>
internal static class EmailStateConstants
{
public const string EmailStateScope = "EmailState";
}
Switch-Case için Condition Factory
Her istenmeyen posta kararı için koşullar oluşturan yeniden kullanılabilir bir koşul fabrikası oluşturun:
/// <summary>
/// Creates a condition for routing messages based on the expected spam detection result.
/// </summary>
/// <param name="expectedDecision">The expected spam detection decision</param>
/// <returns>A function that evaluates whether a message meets the expected result</returns>
private static Func<object?, bool> GetCondition(SpamDecision expectedDecision) =>
detectionResult => detectionResult is DetectionResult result && result.spamDecision == expectedDecision;
Bu fabrika yaklaşımı:
- Kod Yinelemesini Azaltır: Bir işlev tüm koşul koşullarını oluşturur
- Tutarlılık sağlar: Tüm koşullar aynı desene uyar
- Bakımı Basitleştirir: Koşul mantığındaki değişiklikler tek bir yerde gerçekleşir
Gelişmiş Yapay Zeka Aracısı
İstenmeyen posta algılama aracısını daha az güvenecek ve üç yönlü sınıflandırmalar döndürecek şekilde güncelleştirin:
/// <summary>
/// Creates a spam detection agent with enhanced uncertainty handling.
/// </summary>
/// <returns>A ChatClientAgent configured for three-way spam detection</returns>
private static ChatClientAgent GetSpamDetectionAgent(IChatClient chatClient) =>
new(chatClient, new ChatClientAgentOptions(instructions: "You are a spam detection assistant that identifies spam emails. Be less confident in your assessments.")
{
ChatOptions = new()
{
ResponseFormat = ChatResponseFormat.ForJsonSchema<DetectionResult>()
}
});
/// <summary>
/// Creates an email assistant agent (unchanged from conditional edges example).
/// </summary>
/// <returns>A ChatClientAgent configured for email assistance</returns>
private static ChatClientAgent GetEmailAssistantAgent(IChatClient chatClient) =>
new(chatClient, new ChatClientAgentOptions(instructions: "You are an email assistant that helps users draft responses to emails with professionalism.")
{
ChatOptions = new()
{
ResponseFormat = ChatResponseFormat.ForJsonSchema<EmailResponse>()
}
});
Gelişmiş Yönlendirme ile İş Akışı Yürütücüleri
Paylaşılan durum yönetimiyle üç yönlü yönlendirmeyi işleyen yürütücüler uygulayın:
/// <summary>
/// Executor that detects spam using an AI agent with three-way classification.
/// </summary>
internal sealed class SpamDetectionExecutor : Executor<ChatMessage, DetectionResult>
{
private readonly AIAgent _spamDetectionAgent;
public SpamDetectionExecutor(AIAgent spamDetectionAgent) : base("SpamDetectionExecutor")
{
this._spamDetectionAgent = spamDetectionAgent;
}
public override async ValueTask<DetectionResult> HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// Generate a random email ID and store the email content in shared state
var newEmail = new Email
{
EmailId = Guid.NewGuid().ToString("N"),
EmailContent = message.Text
};
await context.QueueStateUpdateAsync(newEmail.EmailId, newEmail, scopeName: EmailStateConstants.EmailStateScope);
// Invoke the agent for enhanced spam detection
var response = await this._spamDetectionAgent.RunAsync(message);
var detectionResult = JsonSerializer.Deserialize<DetectionResult>(response.Text);
detectionResult!.EmailId = newEmail.EmailId;
return detectionResult;
}
}
/// <summary>
/// Executor that assists with email responses using an AI agent.
/// </summary>
internal sealed class EmailAssistantExecutor : Executor<DetectionResult, EmailResponse>
{
private readonly AIAgent _emailAssistantAgent;
public EmailAssistantExecutor(AIAgent emailAssistantAgent) : base("EmailAssistantExecutor")
{
this._emailAssistantAgent = emailAssistantAgent;
}
public override async ValueTask<EmailResponse> HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
if (message.spamDecision == SpamDecision.Spam)
{
throw new ArgumentException("This executor should only handle non-spam messages.");
}
// Retrieve the email content from shared state
var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);
// Invoke the agent to draft a response
var response = await this._emailAssistantAgent.RunAsync(email!.EmailContent);
var emailResponse = JsonSerializer.Deserialize<EmailResponse>(response.Text);
return emailResponse!;
}
}
/// <summary>
/// Executor that sends emails.
/// </summary>
internal sealed class SendEmailExecutor : Executor<EmailResponse>
{
public SendEmailExecutor() : base("SendEmailExecutor") { }
public override async ValueTask HandleAsync(EmailResponse message, IWorkflowContext context, CancellationToken cancellationToken = default) =>
await context.YieldOutputAsync($"Email sent: {message.Response}").ConfigureAwait(false);
}
/// <summary>
/// Executor that handles spam messages.
/// </summary>
internal sealed class HandleSpamExecutor : Executor<DetectionResult>
{
public HandleSpamExecutor() : base("HandleSpamExecutor") { }
public override async ValueTask HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
if (message.spamDecision == SpamDecision.Spam)
{
await context.YieldOutputAsync($"Email marked as spam: {message.Reason}").ConfigureAwait(false);
}
else
{
throw new ArgumentException("This executor should only handle spam messages.");
}
}
}
/// <summary>
/// Executor that handles uncertain emails requiring manual review.
/// </summary>
internal sealed class HandleUncertainExecutor : Executor<DetectionResult>
{
public HandleUncertainExecutor() : base("HandleUncertainExecutor") { }
public override async ValueTask HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
if (message.spamDecision == SpamDecision.Uncertain)
{
var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);
await context.YieldOutputAsync($"Email marked as uncertain: {message.Reason}. Email content: {email?.EmailContent}");
}
else
{
throw new ArgumentException("This executor should only handle uncertain spam decisions.");
}
}
}
Switch-Case Deseni ile İş Akışı Oluşturma
Birden çok koşullu kenarı daha temiz switch-case düzeniyle değiştirin.
public static class Program
{
private static async Task Main()
{
// Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential()).GetChatClient(deploymentName).AsIChatClient();
// Create agents
AIAgent spamDetectionAgent = GetSpamDetectionAgent(chatClient);
AIAgent emailAssistantAgent = GetEmailAssistantAgent(chatClient);
// Create executors
var spamDetectionExecutor = new SpamDetectionExecutor(spamDetectionAgent);
var emailAssistantExecutor = new EmailAssistantExecutor(emailAssistantAgent);
var sendEmailExecutor = new SendEmailExecutor();
var handleSpamExecutor = new HandleSpamExecutor();
var handleUncertainExecutor = new HandleUncertainExecutor();
// Build the workflow using switch-case for cleaner three-way routing
WorkflowBuilder builder = new(spamDetectionExecutor);
builder.AddSwitch(spamDetectionExecutor, switchBuilder =>
switchBuilder
.AddCase(
GetCondition(expectedDecision: SpamDecision.NotSpam),
emailAssistantExecutor
)
.AddCase(
GetCondition(expectedDecision: SpamDecision.Spam),
handleSpamExecutor
)
.WithDefault(
handleUncertainExecutor
)
)
// After the email assistant writes a response, it will be sent to the send email executor
.AddEdge(emailAssistantExecutor, sendEmailExecutor)
.WithOutputFrom(handleSpamExecutor, sendEmailExecutor, handleUncertainExecutor);
var workflow = builder.Build();
// Read an email from a text file (use ambiguous content for demonstration)
string email = Resources.Read("ambiguous_email.txt");
// Execute the workflow
StreamingRun run = await InProcessExecution.StreamAsync(workflow, new ChatMessage(ChatRole.User, email));
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is WorkflowOutputEvent outputEvent)
{
Console.WriteLine($"{outputEvent}");
}
}
}
}
Switch-Case Avantajları
-
Daha Temiz Söz Dizimi: ,
SwitchBuilderbirden çok koşullu kenar için daha okunabilir bir alternatif sağlar - Sıralı Değerlendirme: Vakalar sıralı olarak değerlendirilir ve ilk eşleşmede durduruluyor
-
Garantili Yönlendirme: Yöntemi, iletilerin
WithDefault()hiçbir zaman takılmamasını sağlar - Daha İyi Bakım: Yeni servis talepleri eklemek için iş akışı yapısında en az değişiklik yapılması gerekir
- Tür Güvenliği: Her yürütücü, yönlendirme hatalarını erken yakalamak için girdisini doğrular
Desen Karşılaştırması
Önceki (Koşullu Kenarlar):
var workflow = new WorkflowBuilder(spamDetectionExecutor)
.AddEdge(spamDetectionExecutor, emailAssistantExecutor, condition: GetCondition(expectedResult: false))
.AddEdge(spamDetectionExecutor, handleSpamExecutor, condition: GetCondition(expectedResult: true))
// No clean way to handle a third case
.WithOutputFrom(handleSpamExecutor, sendEmailExecutor)
.Build();
Switch-Case sonrası:
WorkflowBuilder builder = new(spamDetectionExecutor);
builder.AddSwitch(spamDetectionExecutor, switchBuilder =>
switchBuilder
.AddCase(GetCondition(SpamDecision.NotSpam), emailAssistantExecutor)
.AddCase(GetCondition(SpamDecision.Spam), handleSpamExecutor)
.WithDefault(handleUncertainExecutor) // Clean default case
)
// Continue building the rest of the workflow
Switch-case deseni, yönlendirme kararlarının sayısı arttıkça çok daha iyi ölçeklenir ve varsayılan durum, beklenmeyen değerler için bir güvenlik ağı sağlar.
Örneği Çalıştırma
Bu iş akışını belirsiz e-posta içeriğiyle çalıştırdığınızda:
Email marked as uncertain: This email contains promotional language but might be from a legitimate business contact, requiring human review for proper classification.
Farklı yönlendirme yollarının çalıştığını görmek için e-posta içeriğini açıkça istenmeyen posta veya açıkça meşru bir şey olarak değiştirmeyi deneyin.
Uygulamayı Tamamla
Tam çalışma uygulaması için Agent Framework deposundaki bu örneğe bakın.
Koşullu Kenarlar Üzerine İnşa Etme
Önceki koşullu kenarlar örneğinde iki yönlü yönlendirme (istenmeyen posta ve yasal e-postalar) gösterilmiştir. Ancak, birçok gerçek dünya senaryosu daha karmaşık karar ağaçları gerektirir. Anahtar servis talebi kenarları, farklı koşullara göre birden çok hedefe yönlendirmeniz gerektiğinde daha temiz ve daha bakımlı bir çözüm sağlar.
Sıradaki Projeniz
E-posta işleme iş akışını üç karar yolunu işleyecek şekilde genişleteceksiniz:
- NotSpam → E-posta Yardımcısı → E-posta Gönder
- İstenmeyen posta → İstenmeyen Posta Olarak İşaretle
- Belirsiz → Manuel İnceleme İçin İşaretle (varsayılan durum)
Önemli geliştirme, birden çok koşullu kenar yerine tek bir switch-case kenar grubu kullanmak olup, karar karmaşıklığı arttıkça iş akışının anlaşılmasını ve bakımını daha kolay hale getirir.
Ele Alınan Kavramlar
Gelişmiş Veri Modelleri
Veri modellerinizi üç yönlü sınıflandırmayı destekleyecek şekilde güncelleştirin:
from typing import Literal
class DetectionResultAgent(BaseModel):
"""Structured output returned by the spam detection agent."""
# The agent classifies the email into one of three categories
spam_decision: Literal["NotSpam", "Spam", "Uncertain"]
reason: str
class EmailResponse(BaseModel):
"""Structured output returned by the email assistant agent."""
response: str
@dataclass
class DetectionResult:
"""Internal typed payload used for routing and downstream handling."""
spam_decision: str
reason: str
email_id: str
@dataclass
class Email:
"""In memory record of the email content stored in shared state."""
email_id: str
email_content: str
Anahtar-Kasa Durum Fabrikası
Her istenmeyen posta kararı için koşullar oluşturan yeniden kullanılabilir bir koşul fabrikası oluşturun:
def get_case(expected_decision: str):
"""Factory that returns a predicate matching a specific spam_decision value."""
def condition(message: Any) -> bool:
# Only match when the upstream payload is a DetectionResult with the expected decision
return isinstance(message, DetectionResult) and message.spam_decision == expected_decision
return condition
Bu fabrika yaklaşımı:
- Kod Yinelemesini Azaltır: Bir işlev tüm koşul koşullarını oluşturur
- Tutarlılık sağlar: Tüm koşullar aynı desene uyar
- Bakımı Basitleştirir: Koşul mantığındaki değişiklikler tek bir yerde gerçekleşir
Paylaşılan Duruma Sahip İş Akışı Yürütücüleri
Her iş akışı adımında büyük e-posta içeriğinin geçirilmesini önlemek için paylaşılan durumu kullanan yürütücüler uygulayın:
EMAIL_STATE_PREFIX = "email:"
CURRENT_EMAIL_ID_KEY = "current_email_id"
@executor(id="store_email")
async def store_email(email_text: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
"""Store email content once and pass around a lightweight ID reference."""
# Persist the raw email content in shared state
new_email = Email(email_id=str(uuid4()), email_content=email_text)
await ctx.set_shared_state(f"{EMAIL_STATE_PREFIX}{new_email.email_id}", new_email)
await ctx.set_shared_state(CURRENT_EMAIL_ID_KEY, new_email.email_id)
# Forward email to spam detection agent
await ctx.send_message(
AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=new_email.email_content)], should_respond=True)
)
@executor(id="to_detection_result")
async def to_detection_result(response: AgentExecutorResponse, ctx: WorkflowContext[DetectionResult]) -> None:
"""Transform agent response into a typed DetectionResult with email ID."""
# Parse the agent's structured JSON output
parsed = DetectionResultAgent.model_validate_json(response.agent_run_response.text)
email_id: str = await ctx.get_shared_state(CURRENT_EMAIL_ID_KEY)
# Create typed message for switch-case routing
await ctx.send_message(DetectionResult(
spam_decision=parsed.spam_decision,
reason=parsed.reason,
email_id=email_id
))
@executor(id="submit_to_email_assistant")
async def submit_to_email_assistant(detection: DetectionResult, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
"""Handle NotSpam emails by forwarding to the email assistant."""
# Guard against misrouting
if detection.spam_decision != "NotSpam":
raise RuntimeError("This executor should only handle NotSpam messages.")
# Retrieve original email content from shared state
email: Email = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{detection.email_id}")
await ctx.send_message(
AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=email.email_content)], should_respond=True)
)
@executor(id="finalize_and_send")
async def finalize_and_send(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
"""Parse email assistant response and yield final output."""
parsed = EmailResponse.model_validate_json(response.agent_run_response.text)
await ctx.yield_output(f"Email sent: {parsed.response}")
@executor(id="handle_spam")
async def handle_spam(detection: DetectionResult, ctx: WorkflowContext[Never, str]) -> None:
"""Handle confirmed spam emails."""
if detection.spam_decision == "Spam":
await ctx.yield_output(f"Email marked as spam: {detection.reason}")
else:
raise RuntimeError("This executor should only handle Spam messages.")
@executor(id="handle_uncertain")
async def handle_uncertain(detection: DetectionResult, ctx: WorkflowContext[Never, str]) -> None:
"""Handle uncertain classifications that need manual review."""
if detection.spam_decision == "Uncertain":
# Include original content for human review
email: Email | None = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{detection.email_id}")
await ctx.yield_output(
f"Email marked as uncertain: {detection.reason}. Email content: {getattr(email, 'email_content', '')}"
)
else:
raise RuntimeError("This executor should only handle Uncertain messages.")
Gelişmiş Yapay Zeka Aracısı Oluşturma
İstenmeyen posta algılama aracısını daha az güvenecek ve üç yönlü sınıflandırmalar döndürecek şekilde güncelleştirin:
async def main():
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
# Enhanced spam detection agent with three-way classification
spam_detection_agent = AgentExecutor(
chat_client.create_agent(
instructions=(
"You are a spam detection assistant that identifies spam emails. "
"Be less confident in your assessments. "
"Always return JSON with fields 'spam_decision' (one of NotSpam, Spam, Uncertain) "
"and 'reason' (string)."
),
response_format=DetectionResultAgent,
),
id="spam_detection_agent",
)
# Email assistant remains the same
email_assistant_agent = AgentExecutor(
chat_client.create_agent(
instructions=(
"You are an email assistant that helps users draft responses to emails with professionalism."
),
response_format=EmailResponse,
),
id="email_assistant_agent",
)
Switch-Case Edge Grubu ile İş Akışı Oluşturma
Birden çok koşullu kenarı tek bir switch-case yapısıyla değiştirin.
# Build workflow using switch-case for cleaner three-way routing
workflow = (
WorkflowBuilder()
.set_start_executor(store_email)
.add_edge(store_email, spam_detection_agent)
.add_edge(spam_detection_agent, to_detection_result)
.add_switch_case_edge_group(
to_detection_result,
[
# Explicit cases for specific decisions
Case(condition=get_case("NotSpam"), target=submit_to_email_assistant),
Case(condition=get_case("Spam"), target=handle_spam),
# Default case catches anything that doesn't match above
Default(target=handle_uncertain),
],
)
.add_edge(submit_to_email_assistant, email_assistant_agent)
.add_edge(email_assistant_agent, finalize_and_send)
.build()
)
Yürütme ve Test
üç yönlü yönlendirmeyi gösteren belirsiz e-posta içeriğiyle iş akışını çalıştırın:
# Use ambiguous email content that might trigger uncertain classification
email = (
"Hey there, I noticed you might be interested in our latest offer—no pressure, but it expires soon. "
"Let me know if you'd like more details."
)
# Execute and display results
events = await workflow.run(email)
outputs = events.get_outputs()
if outputs:
for output in outputs:
print(f"Workflow output: {output}")
Switch-Case Kenarlarının Başlıca Avantajları
- Daha Temiz Söz Dizimi: Birden çok koşullu kenar yerine bir kenar grubu
- Sıralı Değerlendirme: Vakalar sıralı olarak değerlendirilir ve ilk eşleşmede durduruluyor
- Garantili Yönlendirme: Varsayılan durum, iletilerin hiçbir zaman takılmamasını sağlar
- Daha İyi Bakım: Yeni durum eklemek için asgari değişiklik gerekir
- Tür Güvenliği: Her yürütücü yönlendirme hatalarını yakalamak için girdisini doğrular
Karşılaştırma: Koşullu ve Switch-Case
Önceki (Koşullu Kenarlar):
.add_edge(detector, handler_a, condition=lambda x: x.result == "A")
.add_edge(detector, handler_b, condition=lambda x: x.result == "B")
.add_edge(detector, handler_c, condition=lambda x: x.result == "C")
Switch-Case sonrası:
.add_switch_case_edge_group(
detector,
[
Case(condition=lambda x: x.result == "A", target=handler_a),
Case(condition=lambda x: x.result == "B", target=handler_b),
Default(target=handler_c), # Catches everything else
],
)
Switch-case deseni, yönlendirme kararlarının sayısı arttıkça çok daha iyi ölçeklenir ve varsayılan durum, beklenmeyen değerler için bir güvenlik ağı sağlar.
Switch-Case Örnek Kodu
Tam çalışma uygulaması için Agent Framework deposundaki switch_case_edge_group.py örneğine bakın.
Çoklu Seçim Kenarları
Switch-Case Dışında: Çoklu Seçim Yönlendirme
Switch-case yapıları mesajları belirli bir hedefe yönlendirirken, gerçek dünya iş akışlarının genellikle veri özelliklerine göre birden çok paralel işlemi tetiklemeye ihtiyacı vardır. Bölümlenmiş kenarlar (bölümlendiriciler kullanılarak fan-out kenarları olarak gerçekleştirilmiştir) tek bir iletinin aynı anda birden çok aşağı akış yürütücüyü etkinleştirebildiği gelişmiş dağıtım desenleri sağlar.
Gelişmiş E-posta İşleme İş Akışı
Gelişmiş yönlendirme mantığını gösteren anahtar-kasa örneğini kullanarak gelişmiş bir e-posta işleme sistemi oluşturacaksınız.
- İstenmeyen postalar → Tek istenmeyen posta işleyicisi (anahtar servis talebi gibi)
- Yasal e-postalar → E-posta yardımcısını her zaman tetikleme + Uzun e-postalar için koşullu olarak özetleyici tetikleme
- Belirsiz e-postalar → Tek belirsiz işleyici (switch-case benzeri)
- Veritabanı kalıcılığı → Hem kısa hem de özetlenmiş uzun e-postalar için tetiklenmiş
Bu düzen, içerik özelliklerine uyum sağlayan paralel işleme işlem hatlarını etkinleştirir.
Ele Alınan Kavramlar
Çoklu Seçim için Veri Modelleri
E-posta uzunluğu analizini ve özetlemeyi desteklemek için veri modellerini genişletin:
/// <summary>
/// Represents the result of enhanced email analysis with additional metadata.
/// </summary>
public sealed class AnalysisResult
{
[JsonPropertyName("spam_decision")]
[JsonConverter(typeof(JsonStringEnumConverter))]
public SpamDecision spamDecision { get; set; }
[JsonPropertyName("reason")]
public string Reason { get; set; } = string.Empty;
// Additional properties for sophisticated routing
[JsonIgnore]
public int EmailLength { get; set; }
[JsonIgnore]
public string EmailSummary { get; set; } = string.Empty;
[JsonIgnore]
public string EmailId { get; set; } = string.Empty;
}
/// <summary>
/// Represents the response from the email assistant.
/// </summary>
public sealed class EmailResponse
{
[JsonPropertyName("response")]
public string Response { get; set; } = string.Empty;
}
/// <summary>
/// Represents the response from the email summary agent.
/// </summary>
public sealed class EmailSummary
{
[JsonPropertyName("summary")]
public string Summary { get; set; } = string.Empty;
}
/// <summary>
/// A custom workflow event for database operations.
/// </summary>
internal sealed class DatabaseEvent(string message) : WorkflowEvent(message) { }
/// <summary>
/// Constants for email processing thresholds.
/// </summary>
public static class EmailProcessingConstants
{
public const int LongEmailThreshold = 100;
}
Hedef Atama İşlevi: Çoklu Seçimin Kalbi
Hedef atayan işlevi, hangi yürütücülerin her iletiyi alması gerektiğini belirler:
/// <summary>
/// Creates a target assigner for routing messages based on the analysis result.
/// </summary>
/// <returns>A function that takes an analysis result and returns the target partitions.</returns>
private static Func<AnalysisResult?, int, IEnumerable<int>> GetTargetAssigner()
{
return (analysisResult, targetCount) =>
{
if (analysisResult is not null)
{
if (analysisResult.spamDecision == SpamDecision.Spam)
{
return [0]; // Route only to spam handler (index 0)
}
else if (analysisResult.spamDecision == SpamDecision.NotSpam)
{
// Always route to email assistant (index 1)
List<int> targets = [1];
// Conditionally add summarizer for long emails (index 2)
if (analysisResult.EmailLength > EmailProcessingConstants.LongEmailThreshold)
{
targets.Add(2);
}
return targets;
}
else // Uncertain
{
return [3]; // Route only to uncertain handler (index 3)
}
}
throw new ArgumentException("Invalid analysis result.");
};
}
Hedef Atayıcı İşlevinin Temel Özellikleri
- Dinamik Hedef Seçimi: Etkinleştirecek yürütücü dizinlerinin listesini döndürür
- İçeriğe Duyarlı Yönlendirme: E-posta uzunluğu gibi ileti özelliklerine göre kararlar alır
- Paralel İşleme: Birden çok hedef aynı anda yürütülebilir
- Koşullu Mantık: Birden çok ölçüte dayalı karmaşık dallanma
Gelişmiş İş Akışı Yürütücüleri
Gelişmiş analiz ve yönlendirmeyi işleyen yürütücüler uygulayın:
/// <summary>
/// Executor that analyzes emails using an AI agent with enhanced analysis.
/// </summary>
internal sealed class EmailAnalysisExecutor : Executor<ChatMessage, AnalysisResult>
{
private readonly AIAgent _emailAnalysisAgent;
public EmailAnalysisExecutor(AIAgent emailAnalysisAgent) : base("EmailAnalysisExecutor")
{
this._emailAnalysisAgent = emailAnalysisAgent;
}
public override async ValueTask<AnalysisResult> HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// Generate a random email ID and store the email content
var newEmail = new Email
{
EmailId = Guid.NewGuid().ToString("N"),
EmailContent = message.Text
};
await context.QueueStateUpdateAsync(newEmail.EmailId, newEmail, scopeName: EmailStateConstants.EmailStateScope);
// Invoke the agent for enhanced analysis
var response = await this._emailAnalysisAgent.RunAsync(message);
var analysisResult = JsonSerializer.Deserialize<AnalysisResult>(response.Text);
// Enrich with metadata for routing decisions
analysisResult!.EmailId = newEmail.EmailId;
analysisResult.EmailLength = newEmail.EmailContent.Length;
return analysisResult;
}
}
/// <summary>
/// Executor that assists with email responses using an AI agent.
/// </summary>
internal sealed class EmailAssistantExecutor : Executor<AnalysisResult, EmailResponse>
{
private readonly AIAgent _emailAssistantAgent;
public EmailAssistantExecutor(AIAgent emailAssistantAgent) : base("EmailAssistantExecutor")
{
this._emailAssistantAgent = emailAssistantAgent;
}
public override async ValueTask<EmailResponse> HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
if (message.spamDecision == SpamDecision.Spam)
{
throw new ArgumentException("This executor should only handle non-spam messages.");
}
// Retrieve the email content from shared state
var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);
// Invoke the agent to draft a response
var response = await this._emailAssistantAgent.RunAsync(email!.EmailContent);
var emailResponse = JsonSerializer.Deserialize<EmailResponse>(response.Text);
return emailResponse!;
}
}
/// <summary>
/// Executor that summarizes emails using an AI agent for long emails.
/// </summary>
internal sealed class EmailSummaryExecutor : Executor<AnalysisResult, AnalysisResult>
{
private readonly AIAgent _emailSummaryAgent;
public EmailSummaryExecutor(AIAgent emailSummaryAgent) : base("EmailSummaryExecutor")
{
this._emailSummaryAgent = emailSummaryAgent;
}
public override async ValueTask<AnalysisResult> HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// Read the email content from shared state
var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);
// Generate summary for long emails
var response = await this._emailSummaryAgent.RunAsync(email!.EmailContent);
var emailSummary = JsonSerializer.Deserialize<EmailSummary>(response.Text);
// Enrich the analysis result with the summary
message.EmailSummary = emailSummary!.Summary;
return message;
}
}
/// <summary>
/// Executor that sends emails.
/// </summary>
internal sealed class SendEmailExecutor : Executor<EmailResponse>
{
public SendEmailExecutor() : base("SendEmailExecutor") { }
public override async ValueTask HandleAsync(EmailResponse message, IWorkflowContext context, CancellationToken cancellationToken = default) =>
await context.YieldOutputAsync($"Email sent: {message.Response}");
}
/// <summary>
/// Executor that handles spam messages.
/// </summary>
internal sealed class HandleSpamExecutor : Executor<AnalysisResult>
{
public HandleSpamExecutor() : base("HandleSpamExecutor") { }
public override async ValueTask HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
if (message.spamDecision == SpamDecision.Spam)
{
await context.YieldOutputAsync($"Email marked as spam: {message.Reason}");
}
else
{
throw new ArgumentException("This executor should only handle spam messages.");
}
}
}
/// <summary>
/// Executor that handles uncertain messages requiring manual review.
/// </summary>
internal sealed class HandleUncertainExecutor : Executor<AnalysisResult>
{
public HandleUncertainExecutor() : base("HandleUncertainExecutor") { }
public override async ValueTask HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
if (message.spamDecision == SpamDecision.Uncertain)
{
var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);
await context.YieldOutputAsync($"Email marked as uncertain: {message.Reason}. Email content: {email?.EmailContent}");
}
else
{
throw new ArgumentException("This executor should only handle uncertain spam decisions.");
}
}
}
/// <summary>
/// Executor that handles database access with custom events.
/// </summary>
internal sealed class DatabaseAccessExecutor : Executor<AnalysisResult>
{
public DatabaseAccessExecutor() : base("DatabaseAccessExecutor") { }
public override async ValueTask HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// Simulate database operations
await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);
await Task.Delay(100); // Simulate database access delay
// Emit custom database event for monitoring
await context.AddEventAsync(new DatabaseEvent($"Email {message.EmailId} saved to database."));
}
}
Gelişmiş Yapay Zeka Aracıları
Analiz, yardım ve özetleme için aracılar oluşturma:
/// <summary>
/// Create an enhanced email analysis agent.
/// </summary>
/// <returns>A ChatClientAgent configured for comprehensive email analysis</returns>
private static ChatClientAgent GetEmailAnalysisAgent(IChatClient chatClient) =>
new(chatClient, new ChatClientAgentOptions(instructions: "You are a spam detection assistant that identifies spam emails.")
{
ChatOptions = new()
{
ResponseFormat = ChatResponseFormat.ForJsonSchema<AnalysisResult>()
}
});
/// <summary>
/// Creates an email assistant agent.
/// </summary>
/// <returns>A ChatClientAgent configured for email assistance</returns>
private static ChatClientAgent GetEmailAssistantAgent(IChatClient chatClient) =>
new(chatClient, new ChatClientAgentOptions(instructions: "You are an email assistant that helps users draft responses to emails with professionalism.")
{
ChatOptions = new()
{
ResponseFormat = ChatResponseFormat.ForJsonSchema<EmailResponse>()
}
});
/// <summary>
/// Creates an agent that summarizes emails.
/// </summary>
/// <returns>A ChatClientAgent configured for email summarization</returns>
private static ChatClientAgent GetEmailSummaryAgent(IChatClient chatClient) =>
new(chatClient, new ChatClientAgentOptions(instructions: "You are an assistant that helps users summarize emails.")
{
ChatOptions = new()
{
ResponseFormat = ChatResponseFormat.ForJsonSchema<EmailSummary>()
}
});
Çok Seçimli İş Akışı Oluşturma
Gelişmiş yönlendirme ve paralel işleme ile iş akışını oluşturma:
public static class Program
{
private static async Task Main()
{
// Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential()).GetChatClient(deploymentName).AsIChatClient();
// Create agents
AIAgent emailAnalysisAgent = GetEmailAnalysisAgent(chatClient);
AIAgent emailAssistantAgent = GetEmailAssistantAgent(chatClient);
AIAgent emailSummaryAgent = GetEmailSummaryAgent(chatClient);
// Create executors
var emailAnalysisExecutor = new EmailAnalysisExecutor(emailAnalysisAgent);
var emailAssistantExecutor = new EmailAssistantExecutor(emailAssistantAgent);
var emailSummaryExecutor = new EmailSummaryExecutor(emailSummaryAgent);
var sendEmailExecutor = new SendEmailExecutor();
var handleSpamExecutor = new HandleSpamExecutor();
var handleUncertainExecutor = new HandleUncertainExecutor();
var databaseAccessExecutor = new DatabaseAccessExecutor();
// Build the workflow with multi-selection fan-out
WorkflowBuilder builder = new(emailAnalysisExecutor);
builder.AddFanOutEdge(
emailAnalysisExecutor,
targets: [
handleSpamExecutor, // Index 0: Spam handler
emailAssistantExecutor, // Index 1: Email assistant (always for NotSpam)
emailSummaryExecutor, // Index 2: Summarizer (conditionally for long NotSpam)
handleUncertainExecutor, // Index 3: Uncertain handler
],
targetSelector: GetTargetAssigner()
)
// Email assistant branch
.AddEdge(emailAssistantExecutor, sendEmailExecutor)
// Database persistence: conditional routing
.AddEdge<AnalysisResult>(
emailAnalysisExecutor,
databaseAccessExecutor,
condition: analysisResult => analysisResult?.EmailLength <= EmailProcessingConstants.LongEmailThreshold) // Short emails
.AddEdge(emailSummaryExecutor, databaseAccessExecutor) // Long emails with summary
.WithOutputFrom(handleUncertainExecutor, handleSpamExecutor, sendEmailExecutor);
var workflow = builder.Build();
// Read a moderately long email to trigger both assistant and summarizer
string email = Resources.Read("email.txt");
// Execute the workflow with custom event handling
StreamingRun run = await InProcessExecution.StreamAsync(workflow, new ChatMessage(ChatRole.User, email));
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is WorkflowOutputEvent outputEvent)
{
Console.WriteLine($"Output: {outputEvent}");
}
if (evt is DatabaseEvent databaseEvent)
{
Console.WriteLine($"Database: {databaseEvent}");
}
}
}
}
Desen Karşılaştırması: Çoklu Seçim ile Switch-Case Karşılaştırması
Switch-Case Deseni (Önceki):
// One input → exactly one output
builder.AddSwitch(spamDetectionExecutor, switchBuilder =>
switchBuilder
.AddCase(GetCondition(SpamDecision.NotSpam), emailAssistantExecutor)
.AddCase(GetCondition(SpamDecision.Spam), handleSpamExecutor)
.WithDefault(handleUncertainExecutor)
)
Çoklu Seçim Kalıbı:
// One input → one or more outputs (dynamic fan-out)
builder.AddFanOutEdge(
emailAnalysisExecutor,
targets: [handleSpamExecutor, emailAssistantExecutor, emailSummaryExecutor, handleUncertainExecutor],
targetSelector: GetTargetAssigner() // Returns list of target indices
)
Çoklu Seçim Kenarlarının Başlıca Avantajları
- Paralel İşleme: Birden çok dal aynı anda yürütülebilir
- Koşullu Yayma: Hedef sayısı içeriğe göre değişir
- İçerik Kullanan Yönlendirme: Yalnızca türe değil, ileti özelliklerine dayalı kararlar
- Verimli Kaynak Kullanımı: Yalnızca gerekli dallar etkinleştirilir
- Karmaşık İş Mantığı: Gelişmiş yönlendirme senaryolarını destekler
Çoklu Seçim Örneğini Çalıştırma
Bu iş akışını uzun bir e-postayla çalıştırdığınızda:
Output: Email sent: [Professional response generated by AI]
Database: Email abc123 saved to database.
Kısa bir e-postayla çalıştırdığınızda özetleyici atlanır:
Output: Email sent: [Professional response generated by AI]
Database: Email def456 saved to database.
Real-World Kullanım Örnekleri
- E-posta Sistemleri: Yanıt yardımcısı + arşiv + analize yönlendirme (koşullu olarak)
- İçerik İşleme: Transkripsiyon + çeviri + çözümlemeyi tetikleme (içerik türüne göre)
- Sipariş İşleme: Gönderime yönlendirme + faturalama + bildirimler (sipariş özelliklerine göre)
- Veri İşlem Hatları: Veri özelliklerine göre farklı analiz akışlarını tetikleme
Çoklu Seçim Tam Uygulaması
Tam çalışma uygulaması için Agent Framework deposundaki bu örneğe bakın.
Switch-Case Dışında: Çoklu Seçim Yönlendirme
Switch-case yapıları mesajları belirli bir hedefe yönlendirirken, gerçek dünya iş akışlarının genellikle veri özelliklerine göre birden çok paralel işlemi tetiklemeye ihtiyacı vardır. Bölümlenmiş kenarlar (çok seçimli kenar grupları olarak uygulanır) tek bir iletinin aynı anda birden fazla aşağı akış yöneticisini etkinleştirebildiği gelişmiş dağıtım kalıplarını mümkün kılar.
Gelişmiş E-posta İşleme İş Akışı
Gelişmiş yönlendirme mantığını gösteren anahtar-kasa örneğini kullanarak gelişmiş bir e-posta işleme sistemi oluşturacaksınız.
- İstenmeyen postalar → Tek istenmeyen posta işleyicisi (anahtar servis talebi gibi)
- Yasal e-postalar → E-posta yardımcısını her zaman tetikleme + Uzun e-postalar için koşullu olarak özetleyici tetikleme
- Belirsiz e-postalar → Tek belirsiz işleyici (switch-case benzeri)
- Veritabanı kalıcılığı → Hem kısa hem de özetlenmiş uzun e-postalar için tetiklenmiş
Bu düzen, içerik özelliklerine uyum sağlayan paralel işleme işlem hatlarını etkinleştirir.
Ele Alınan Kavramlar
Çoklu Seçim için Gelişmiş Veri Modelleri
E-posta uzunluğu analizini ve özetlemeyi desteklemek için veri modellerini genişletin:
class AnalysisResultAgent(BaseModel):
"""Enhanced structured output from email analysis agent."""
spam_decision: Literal["NotSpam", "Spam", "Uncertain"]
reason: str
class EmailResponse(BaseModel):
"""Response from email assistant."""
response: str
class EmailSummaryModel(BaseModel):
"""Summary generated by email summary agent."""
summary: str
@dataclass
class AnalysisResult:
"""Internal analysis result with email metadata for routing decisions."""
spam_decision: str
reason: str
email_length: int # Used for conditional routing
email_summary: str # Populated by summary agent
email_id: str
@dataclass
class Email:
"""Email content stored in shared state."""
email_id: str
email_content: str
# Custom event for database operations
class DatabaseEvent(WorkflowEvent):
"""Custom event for tracking database operations."""
pass
Seçim İşlevi: Çoklu Seçimin Kalbi
Seçim işlevi, hangi yürütücülerin her iletiyi alması gerektiğini belirler:
LONG_EMAIL_THRESHOLD = 100
def select_targets(analysis: AnalysisResult, target_ids: list[str]) -> list[str]:
"""Intelligent routing based on spam decision and email characteristics."""
# Target order: [handle_spam, submit_to_email_assistant, summarize_email, handle_uncertain]
handle_spam_id, submit_to_email_assistant_id, summarize_email_id, handle_uncertain_id = target_ids
if analysis.spam_decision == "Spam":
# Route only to spam handler
return [handle_spam_id]
elif analysis.spam_decision == "NotSpam":
# Always route to email assistant
targets = [submit_to_email_assistant_id]
# Conditionally add summarizer for long emails
if analysis.email_length > LONG_EMAIL_THRESHOLD:
targets.append(summarize_email_id)
return targets
else: # Uncertain
# Route only to uncertain handler
return [handle_uncertain_id]
Seçim İşlevlerinin Temel Özellikleri
- Dinamik Hedef Seçimi: Etkinleştirecek yürütücü kimliklerinin listesini döndürür
- İçeriğe Duyarlı Yönlendirme: İleti özelliklerine göre kararlar alır
- Paralel İşleme: Birden çok hedef aynı anda yürütülebilir
- Koşullu Mantık: Birden çok ölçüte dayalı karmaşık dallanma
Çoklu Seçim İş Akışı Yürütücüleri
Gelişmiş çözümleme ve yönlendirmeyi işleyen yürütücüler uygulayın:
EMAIL_STATE_PREFIX = "email:"
CURRENT_EMAIL_ID_KEY = "current_email_id"
@executor(id="store_email")
async def store_email(email_text: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
"""Store email and initiate analysis."""
new_email = Email(email_id=str(uuid4()), email_content=email_text)
await ctx.set_shared_state(f"{EMAIL_STATE_PREFIX}{new_email.email_id}", new_email)
await ctx.set_shared_state(CURRENT_EMAIL_ID_KEY, new_email.email_id)
await ctx.send_message(
AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=new_email.email_content)], should_respond=True)
)
@executor(id="to_analysis_result")
async def to_analysis_result(response: AgentExecutorResponse, ctx: WorkflowContext[AnalysisResult]) -> None:
"""Transform agent response into enriched analysis result."""
parsed = AnalysisResultAgent.model_validate_json(response.agent_run_response.text)
email_id: str = await ctx.get_shared_state(CURRENT_EMAIL_ID_KEY)
email: Email = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{email_id}")
# Create enriched analysis result with email length for routing decisions
await ctx.send_message(
AnalysisResult(
spam_decision=parsed.spam_decision,
reason=parsed.reason,
email_length=len(email.email_content), # Key for conditional routing
email_summary="",
email_id=email_id,
)
)
@executor(id="submit_to_email_assistant")
async def submit_to_email_assistant(analysis: AnalysisResult, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
"""Handle legitimate emails by forwarding to email assistant."""
if analysis.spam_decision != "NotSpam":
raise RuntimeError("This executor should only handle NotSpam messages.")
email: Email = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{analysis.email_id}")
await ctx.send_message(
AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=email.email_content)], should_respond=True)
)
@executor(id="finalize_and_send")
async def finalize_and_send(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
"""Final step for email assistant branch."""
parsed = EmailResponse.model_validate_json(response.agent_run_response.text)
await ctx.yield_output(f"Email sent: {parsed.response}")
@executor(id="summarize_email")
async def summarize_email(analysis: AnalysisResult, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
"""Generate summary for long emails (parallel branch)."""
# Only called for long NotSpam emails by selection function
email: Email = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{analysis.email_id}")
await ctx.send_message(
AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=email.email_content)], should_respond=True)
)
@executor(id="merge_summary")
async def merge_summary(response: AgentExecutorResponse, ctx: WorkflowContext[AnalysisResult]) -> None:
"""Merge summary back into analysis result for database persistence."""
summary = EmailSummaryModel.model_validate_json(response.agent_run_response.text)
email_id: str = await ctx.get_shared_state(CURRENT_EMAIL_ID_KEY)
email: Email = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{email_id}")
# Create analysis result with summary for database storage
await ctx.send_message(
AnalysisResult(
spam_decision="NotSpam",
reason="",
email_length=len(email.email_content),
email_summary=summary.summary, # Now includes summary
email_id=email_id,
)
)
@executor(id="handle_spam")
async def handle_spam(analysis: AnalysisResult, ctx: WorkflowContext[Never, str]) -> None:
"""Handle spam emails (single target like switch-case)."""
if analysis.spam_decision == "Spam":
await ctx.yield_output(f"Email marked as spam: {analysis.reason}")
else:
raise RuntimeError("This executor should only handle Spam messages.")
@executor(id="handle_uncertain")
async def handle_uncertain(analysis: AnalysisResult, ctx: WorkflowContext[Never, str]) -> None:
"""Handle uncertain emails (single target like switch-case)."""
if analysis.spam_decision == "Uncertain":
email: Email | None = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{analysis.email_id}")
await ctx.yield_output(
f"Email marked as uncertain: {analysis.reason}. Email content: {getattr(email, 'email_content', '')}"
)
else:
raise RuntimeError("This executor should only handle Uncertain messages.")
@executor(id="database_access")
async def database_access(analysis: AnalysisResult, ctx: WorkflowContext[Never, str]) -> None:
"""Simulate database persistence with custom events."""
await asyncio.sleep(0.05) # Simulate DB operation
await ctx.add_event(DatabaseEvent(f"Email {analysis.email_id} saved to database."))
Gelişmiş Yapay Zeka Aracıları
Analiz, yardım ve özetleme için aracılar oluşturma:
async def main() -> None:
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
# Enhanced analysis agent
email_analysis_agent = AgentExecutor(
chat_client.create_agent(
instructions=(
"You are a spam detection assistant that identifies spam emails. "
"Always return JSON with fields 'spam_decision' (one of NotSpam, Spam, Uncertain) "
"and 'reason' (string)."
),
response_format=AnalysisResultAgent,
),
id="email_analysis_agent",
)
# Email assistant (same as before)
email_assistant_agent = AgentExecutor(
chat_client.create_agent(
instructions=(
"You are an email assistant that helps users draft responses to emails with professionalism."
),
response_format=EmailResponse,
),
id="email_assistant_agent",
)
# New: Email summary agent for long emails
email_summary_agent = AgentExecutor(
chat_client.create_agent(
instructions="You are an assistant that helps users summarize emails.",
response_format=EmailSummaryModel,
),
id="email_summary_agent",
)
Çoklu Seçim İş Akışı Oluşturma
Gelişmiş yönlendirme ve paralel işleme ile iş akışını oluşturma:
workflow = (
WorkflowBuilder()
.set_start_executor(store_email)
.add_edge(store_email, email_analysis_agent)
.add_edge(email_analysis_agent, to_analysis_result)
# Multi-selection edge group: intelligent fan-out based on content
.add_multi_selection_edge_group(
to_analysis_result,
[handle_spam, submit_to_email_assistant, summarize_email, handle_uncertain],
selection_func=select_targets,
)
# Email assistant branch (always for NotSpam)
.add_edge(submit_to_email_assistant, email_assistant_agent)
.add_edge(email_assistant_agent, finalize_and_send)
# Summary branch (only for long NotSpam emails)
.add_edge(summarize_email, email_summary_agent)
.add_edge(email_summary_agent, merge_summary)
# Database persistence: conditional routing
.add_edge(to_analysis_result, database_access,
condition=lambda r: r.email_length <= LONG_EMAIL_THRESHOLD) # Short emails
.add_edge(merge_summary, database_access) # Long emails with summary
.build()
)
Olay Akışı ile Yürütme
İş akışını çalıştırın ve özel olaylar aracılığıyla paralel yürütmeyi gözlemleyin:
# Use a moderately long email to trigger both assistant and summarizer
email = """
Hello team, here are the updates for this week:
1. Project Alpha is on track and we should have the first milestone completed by Friday.
2. The client presentation has been scheduled for next Tuesday at 2 PM.
3. Please review the Q4 budget allocation and provide feedback by Wednesday.
Let me know if you have any questions or concerns.
Best regards,
Alex
"""
# Stream events to see parallel execution
async for event in workflow.run_stream(email):
if isinstance(event, DatabaseEvent):
print(f"Database: {event}")
elif isinstance(event, WorkflowOutputEvent):
print(f"Output: {event.data}")
Çoklu Seçim Yapısı ve Switch-Case Yapısı Karşılaştırması
Switch-Case Deseni (Önceki):
# One input → exactly one output
.add_switch_case_edge_group(
source,
[
Case(condition=lambda x: x.result == "A", target=handler_a),
Case(condition=lambda x: x.result == "B", target=handler_b),
Default(target=handler_c),
],
)
Çoklu Seçim Kalıbı:
# One input → one or more outputs (dynamic fan-out)
.add_multi_selection_edge_group(
source,
[handler_a, handler_b, handler_c, handler_d],
selection_func=intelligent_router, # Returns list of target IDs
)
C# Çoklu Seçim Avantajları
- Paralel İşleme: Birden çok dal aynı anda yürütülebilir
- Koşullu Yayma: Hedef sayısı içeriğe göre değişir
- İçerik Kullanan Yönlendirme: Yalnızca türe değil, ileti özelliklerine dayalı kararlar
- Verimli Kaynak Kullanımı: Yalnızca gerekli dallar etkinleştirilir
- Karmaşık İş Mantığı: Gelişmiş yönlendirme senaryolarını destekler
C# Real-World Uygulamaları
- E-posta Sistemleri: Yanıt yardımcısı + arşiv + analize yönlendirme (koşullu olarak)
- İçerik İşleme: Transkripsiyon + çeviri + çözümlemeyi tetikleme (içerik türüne göre)
- Sipariş İşleme: Gönderime yönlendirme + faturalama + bildirimler (sipariş özelliklerine göre)
- Veri İşlem Hatları: Veri özelliklerine göre farklı analiz akışlarını tetikleme
Çok Seçimli Örnek Kod
Tam çalışma uygulaması için Agent Framework deposundaki multi_selection_edge_group.py örneğine bakın.