Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Dans ce tutoriel, vous allez apprendre à créer un flux de travail avec une logique de branchement à l’aide d’Agent Framework. La logique de branchement permet à votre flux de travail de prendre des décisions en fonction de certaines conditions, ce qui permet un comportement plus complexe et dynamique.
Arêtes conditionnelles
Les arêtes conditionnelles permettent à votre flux de travail de prendre des décisions de routage en fonction du contenu ou des propriétés des messages qui transitent par le flux de travail. Cela permet la branche dynamique où différents chemins d’exécution sont pris en fonction des conditions d’exécution.
Ce que vous allez construire
Vous allez créer un flux de travail de traitement de courrier qui illustre le routage conditionnel :
- Agent de détection de courrier indésirable qui analyse les e-mails entrants et retourne un JSON structuré.
- Arêtes conditionnelles qui acheminent les e-mails vers différents gestionnaires en fonction de la classification.
- Gestionnaire de messagerie légitime qui rédige des réponses professionnelles.
- Gestionnaire de courrier indésirable qui marque les e-mails suspects.
- Gestion de l’état partagé pour conserver les données de messagerie entre les étapes du flux de travail.
Concepts abordés
Prerequisites
- Sdk .NET 8.0 ou version ultérieure.
- Point de terminaison et déploiement du service Azure OpenAI configurés.
- Azure CLI installé et authentifié (pour l’authentification des informations d’identification Azure)
- Compréhension de base de la programmation C# et asynchrone.
- Nouvelle application console.
Installer les packages NuGet
Tout d’abord, installez les packages requis pour votre projet .NET :
dotnet add package Azure.AI.OpenAI --prerelease
dotnet add package Azure.Identity
dotnet add package Microsoft.Agents.AI.Workflows --prerelease
dotnet add package Microsoft.Extensions.AI.OpenAI --prerelease
Définir des modèles de données
Commencez par définir les structures de données qui transitent par votre flux de travail :
using System.Text.Json.Serialization;
/// <summary>
/// Represents the result of spam detection.
/// </summary>
public sealed class DetectionResult
{
[JsonPropertyName("is_spam")]
public bool IsSpam { get; set; }
[JsonPropertyName("reason")]
public string Reason { get; set; } = string.Empty;
// Email ID is generated by the executor, not the agent
[JsonIgnore]
public string EmailId { get; set; } = string.Empty;
}
/// <summary>
/// Represents an email.
/// </summary>
internal sealed class Email
{
[JsonPropertyName("email_id")]
public string EmailId { get; set; } = string.Empty;
[JsonPropertyName("email_content")]
public string EmailContent { get; set; } = string.Empty;
}
/// <summary>
/// Represents the response from the email assistant.
/// </summary>
public sealed class EmailResponse
{
[JsonPropertyName("response")]
public string Response { get; set; } = string.Empty;
}
/// <summary>
/// Constants for shared state scopes.
/// </summary>
internal static class EmailStateConstants
{
public const string EmailStateScope = "EmailState";
}
Créer des fonctions de condition
La fonction condition évalue le résultat de détection du courrier indésirable pour déterminer le chemin d’accès que le flux de travail doit prendre :
/// <summary>
/// Creates a condition for routing messages based on the expected spam detection result.
/// </summary>
/// <param name="expectedResult">The expected spam detection result</param>
/// <returns>A function that evaluates whether a message meets the expected result</returns>
private static Func<object?, bool> GetCondition(bool expectedResult) =>
detectionResult => detectionResult is DetectionResult result && result.IsSpam == expectedResult;
Cette fonction conditionnelle :
- Prend un
bool expectedResultparamètre (vrai pour spam, faux pour non-spam) - Retourne une fonction qui peut être utilisée comme condition de périphérie
- Vérifie en toute sécurité si le message est un
DetectionResultet compare laIsSpampropriété
Créer des agents IA
Configurez les agents IA qui gèreront la détection de courrier indésirable et l’assistance par e-mail :
using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Agents.AI;
using Microsoft.Extensions.AI;
/// <summary>
/// Creates a spam detection agent.
/// </summary>
/// <returns>A ChatClientAgent configured for spam detection</returns>
private static ChatClientAgent GetSpamDetectionAgent(IChatClient chatClient) =>
new(chatClient, new ChatClientAgentOptions(instructions: "You are a spam detection assistant that identifies spam emails.")
{
ChatOptions = new()
{
ResponseFormat = ChatResponseFormat.ForJsonSchema(AIJsonUtilities.CreateJsonSchema(typeof(DetectionResult)))
}
});
/// <summary>
/// Creates an email assistant agent.
/// </summary>
/// <returns>A ChatClientAgent configured for email assistance</returns>
private static ChatClientAgent GetEmailAssistantAgent(IChatClient chatClient) =>
new(chatClient, new ChatClientAgentOptions(instructions: "You are an email assistant that helps users draft professional responses to emails.")
{
ChatOptions = new()
{
ResponseFormat = ChatResponseFormat.ForJsonSchema(AIJsonUtilities.CreateJsonSchema(typeof(EmailResponse)))
}
});
Implémenter des exécuteurs
Créez les exécuteurs de flux de travail qui gèrent différentes étapes du traitement des e-mails :
using Microsoft.Agents.AI.Workflows;
using System.Text.Json;
/// <summary>
/// Executor that detects spam using an AI agent.
/// </summary>
internal sealed class SpamDetectionExecutor : Executor<ChatMessage, DetectionResult>
{
private readonly AIAgent _spamDetectionAgent;
public SpamDetectionExecutor(AIAgent spamDetectionAgent) : base("SpamDetectionExecutor")
{
this._spamDetectionAgent = spamDetectionAgent;
}
public override async ValueTask<DetectionResult> HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// Generate a random email ID and store the email content to shared state
var newEmail = new Email
{
EmailId = Guid.NewGuid().ToString("N"),
EmailContent = message.Text
};
await context.QueueStateUpdateAsync(newEmail.EmailId, newEmail, scopeName: EmailStateConstants.EmailStateScope);
// Invoke the agent for spam detection
var response = await this._spamDetectionAgent.RunAsync(message);
var detectionResult = JsonSerializer.Deserialize<DetectionResult>(response.Text);
detectionResult!.EmailId = newEmail.EmailId;
return detectionResult;
}
}
/// <summary>
/// Executor that assists with email responses using an AI agent.
/// </summary>
internal sealed class EmailAssistantExecutor : Executor<DetectionResult, EmailResponse>
{
private readonly AIAgent _emailAssistantAgent;
public EmailAssistantExecutor(AIAgent emailAssistantAgent) : base("EmailAssistantExecutor")
{
this._emailAssistantAgent = emailAssistantAgent;
}
public override async ValueTask<EmailResponse> HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
if (message.IsSpam)
{
throw new ArgumentException("This executor should only handle non-spam messages.");
}
// Retrieve the email content from shared state
var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope)
?? throw new InvalidOperationException("Email not found.");
// Invoke the agent to draft a response
var response = await this._emailAssistantAgent.RunAsync(email.EmailContent);
var emailResponse = JsonSerializer.Deserialize<EmailResponse>(response.Text);
return emailResponse!;
}
}
/// <summary>
/// Executor that sends emails.
/// </summary>
internal sealed class SendEmailExecutor : Executor<EmailResponse>
{
public SendEmailExecutor() : base("SendEmailExecutor") { }
public override async ValueTask HandleAsync(EmailResponse message, IWorkflowContext context, CancellationToken cancellationToken = default) =>
await context.YieldOutputAsync($"Email sent: {message.Response}");
}
/// <summary>
/// Executor that handles spam messages.
/// </summary>
internal sealed class HandleSpamExecutor : Executor<DetectionResult>
{
public HandleSpamExecutor() : base("HandleSpamExecutor") { }
public override async ValueTask HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
if (message.IsSpam)
{
await context.YieldOutputAsync($"Email marked as spam: {message.Reason}");
}
else
{
throw new ArgumentException("This executor should only handle spam messages.");
}
}
}
Générer le flux de travail avec des arêtes conditionnelles
Créez maintenant le programme principal qui génère et exécute le flux de travail :
using Microsoft.Extensions.AI;
public static class Program
{
private static async Task Main()
{
// Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT")
?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential())
.GetChatClient(deploymentName).AsIChatClient();
// Create agents
AIAgent spamDetectionAgent = GetSpamDetectionAgent(chatClient);
AIAgent emailAssistantAgent = GetEmailAssistantAgent(chatClient);
// Create executors
var spamDetectionExecutor = new SpamDetectionExecutor(spamDetectionAgent);
var emailAssistantExecutor = new EmailAssistantExecutor(emailAssistantAgent);
var sendEmailExecutor = new SendEmailExecutor();
var handleSpamExecutor = new HandleSpamExecutor();
// Build the workflow with conditional edges
var workflow = new WorkflowBuilder(spamDetectionExecutor)
// Non-spam path: route to email assistant when IsSpam = false
.AddEdge(spamDetectionExecutor, emailAssistantExecutor, condition: GetCondition(expectedResult: false))
.AddEdge(emailAssistantExecutor, sendEmailExecutor)
// Spam path: route to spam handler when IsSpam = true
.AddEdge(spamDetectionExecutor, handleSpamExecutor, condition: GetCondition(expectedResult: true))
.WithOutputFrom(handleSpamExecutor, sendEmailExecutor)
.Build();
// Execute the workflow with sample spam email
string emailContent = "Congratulations! You've won $1,000,000! Click here to claim your prize now!";
StreamingRun run = await InProcessExecution.StreamAsync(workflow, new ChatMessage(ChatRole.User, emailContent));
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is WorkflowOutputEvent outputEvent)
{
Console.WriteLine($"{outputEvent}");
}
}
}
}
Fonctionnement
Entrée de flux de travail : le flux de travail commence par
spamDetectionExecutorrecevoir unChatMessage.Analyse du courrier indésirable : l'agent de détection de courrier indésirable analyse l’e-mail et retourne une structure
DetectionResultavec des propriétésIsSpametReason.Routage conditionnel : en fonction de la
IsSpamvaleur :-
Si spam (
IsSpam = true) : itinéraires versHandleSpamExecutoren utilisantGetCondition(true) -
S’il est légitime (
IsSpam = false) : Itinéraires versEmailAssistantExecutorutilisantGetCondition(false)
-
Si spam (
Génération de réponse : pour les e-mails légitimes, l’Assistant e-mail rédige une réponse professionnelle.
Sortie finale : le flux de travail génère une notification de courrier indésirable ou envoie la réponse par e-mail brouillon.
Fonctionnalités clés des arêtes conditionnelles
Type-Safe Conditions : la
GetConditionméthode crée des fonctions de condition réutilisables qui évaluent en toute sécurité le contenu du message.Chemins multiples : un seul exécuteur peut avoir plusieurs arêtes sortantes avec différentes conditions, ce qui permet une logique de branchement complexe.
État partagé : les données de messagerie sont conservées entre les exécuteurs à l’aide de la gestion d’état délimitée, ce qui permet aux exécuteurs en aval d’accéder au contenu d’origine.
Gestion des erreurs : les exécuteurs valident leurs entrées et lèvent des exceptions significatives lors de la réception de types de messages inattendus.
Architecture propre : chaque exécuteur a une responsabilité unique, ce qui rend le workflow maintenable et testable.
Exécution de l’exemple
Lorsque vous exécutez ce flux de travail avec l’exemple d’e-mail de courrier indésirable :
Email marked as spam: This email contains common spam indicators including monetary prizes, urgency tactics, and suspicious links that are typical of phishing attempts.
Essayez de modifier le contenu de l’e-mail en quelque chose de légitime :
string emailContent = "Hi, I wanted to follow up on our meeting yesterday and get your thoughts on the project proposal.";
Le flux de travail sera acheminé vers l’Assistant e-mail et générera une réponse professionnelle.
Ce modèle de routage conditionnel constitue la base de la création de flux de travail sophistiqués qui peuvent gérer des arbres de décision complexes et une logique métier.
Implémentation complète
Pour obtenir l’implémentation de travail complète, consultez cet exemple dans le référentiel Agent Framework.
Ce que vous allez construire
Vous allez créer un flux de travail de traitement de courrier qui illustre le routage conditionnel :
- Agent de détection de courrier indésirable qui analyse les e-mails entrants
- Arêtes conditionnelles qui acheminent les e-mails vers différents gestionnaires en fonction de la classification
- Gestionnaire de messagerie légitime qui rédige des réponses professionnelles
- Gestionnaire de courrier indésirable qui marque les e-mails suspects
Concepts abordés
Prerequisites
- Python 3.10 ou version ultérieure
- Agent Framework installé :
pip install agent-framework-core --pre - Service Azure OpenAI configuré avec des variables d’environnement appropriées
- Authentification Azure CLI :
az login
Étape 1 : Importer les dépendances requises
Commencez par importer les composants nécessaires pour les flux de travail conditionnels :
import asyncio
import os
from dataclasses import dataclass
from typing import Any, Literal
from uuid import uuid4
from typing_extensions import Never
from agent_framework import (
AgentExecutor,
AgentExecutorRequest,
AgentExecutorResponse,
ChatMessage,
Role,
WorkflowBuilder,
WorkflowContext,
executor,
Case,
Default,
)
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential
from pydantic import BaseModel
Étape 2 : Définir des modèles de données
Créez des modèles Pydantic pour l’échange de données structurées entre les composants de flux de travail :
class DetectionResult(BaseModel):
"""Represents the result of spam detection."""
# is_spam drives the routing decision taken by edge conditions
is_spam: bool
# Human readable rationale from the detector
reason: str
# The agent must include the original email so downstream agents can operate without reloading content
email_content: str
class EmailResponse(BaseModel):
"""Represents the response from the email assistant."""
# The drafted reply that a user could copy or send
response: str
Étape 3 : Créer des fonctions de condition
Définissez les fonctions de condition qui déterminent les décisions de routage :
def get_condition(expected_result: bool):
"""Create a condition callable that routes based on DetectionResult.is_spam."""
# The returned function will be used as an edge predicate.
# It receives whatever the upstream executor produced.
def condition(message: Any) -> bool:
# Defensive guard. If a non AgentExecutorResponse appears, let the edge pass to avoid dead ends.
if not isinstance(message, AgentExecutorResponse):
return True
try:
# Prefer parsing a structured DetectionResult from the agent JSON text.
# Using model_validate_json ensures type safety and raises if the shape is wrong.
detection = DetectionResult.model_validate_json(message.agent_run_response.text)
# Route only when the spam flag matches the expected path.
return detection.is_spam == expected_result
except Exception:
# Fail closed on parse errors so we do not accidentally route to the wrong path.
# Returning False prevents this edge from activating.
return False
return condition
Étape 4 : Créer des gestionnaires de tâches
Définissez des exécuteurs pour gérer différents résultats de routage :
@executor(id="send_email")
async def handle_email_response(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
"""Handle legitimate emails by drafting a professional response."""
# Downstream of the email assistant. Parse a validated EmailResponse and yield the workflow output.
email_response = EmailResponse.model_validate_json(response.agent_run_response.text)
await ctx.yield_output(f"Email sent:\n{email_response.response}")
@executor(id="handle_spam")
async def handle_spam_classifier_response(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
"""Handle spam emails by marking them appropriately."""
# Spam path. Confirm the DetectionResult and yield the workflow output. Guard against accidental non spam input.
detection = DetectionResult.model_validate_json(response.agent_run_response.text)
if detection.is_spam:
await ctx.yield_output(f"Email marked as spam: {detection.reason}")
else:
# This indicates the routing predicate and executor contract are out of sync.
raise RuntimeError("This executor should only handle spam messages.")
@executor(id="to_email_assistant_request")
async def to_email_assistant_request(
response: AgentExecutorResponse, ctx: WorkflowContext[AgentExecutorRequest]
) -> None:
"""Transform spam detection response into a request for the email assistant."""
# Parse the detection result and extract the email content for the assistant
detection = DetectionResult.model_validate_json(response.agent_run_response.text)
# Create a new request for the email assistant with the original email content
request = AgentExecutorRequest(
messages=[ChatMessage(Role.USER, text=detection.email_content)],
should_respond=True
)
await ctx.send_message(request)
Étape 5 : Créer des agents IA
Configurez les agents Azure OpenAI avec une mise en forme de sortie structurée :
async def main() -> None:
# Create agents
# AzureCliCredential uses your current az login. This avoids embedding secrets in code.
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
# Agent 1. Classifies spam and returns a DetectionResult object.
# response_format enforces that the LLM returns parsable JSON for the Pydantic model.
spam_detection_agent = AgentExecutor(
chat_client.create_agent(
instructions=(
"You are a spam detection assistant that identifies spam emails. "
"Always return JSON with fields is_spam (bool), reason (string), and email_content (string). "
"Include the original email content in email_content."
),
response_format=DetectionResult,
),
id="spam_detection_agent",
)
# Agent 2. Drafts a professional reply. Also uses structured JSON output for reliability.
email_assistant_agent = AgentExecutor(
chat_client.create_agent(
instructions=(
"You are an email assistant that helps users draft professional responses to emails. "
"Your input might be a JSON object that includes 'email_content'; base your reply on that content. "
"Return JSON with a single field 'response' containing the drafted reply."
),
response_format=EmailResponse,
),
id="email_assistant_agent",
)
Étape 6 : Générer le flux de travail conditionnel
Créez un flux de travail avec des arêtes conditionnelles qui routent en fonction des résultats de détection de courrier indésirable :
# Build the workflow graph.
# Start at the spam detector.
# If not spam, hop to a transformer that creates a new AgentExecutorRequest,
# then call the email assistant, then finalize.
# If spam, go directly to the spam handler and finalize.
workflow = (
WorkflowBuilder()
.set_start_executor(spam_detection_agent)
# Not spam path: transform response -> request for assistant -> assistant -> send email
.add_edge(spam_detection_agent, to_email_assistant_request, condition=get_condition(False))
.add_edge(to_email_assistant_request, email_assistant_agent)
.add_edge(email_assistant_agent, handle_email_response)
# Spam path: send to spam handler
.add_edge(spam_detection_agent, handle_spam_classifier_response, condition=get_condition(True))
.build()
)
Étape 7 : Exécuter le flux de travail
Exécutez le flux de travail avec des exemples de contenu de messagerie :
# Read Email content from the sample resource file.
# This keeps the sample deterministic since the model sees the same email every run.
email_path = os.path.join(os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "resources", "email.txt")
with open(email_path) as email_file: # noqa: ASYNC230
email = email_file.read()
# Execute the workflow. Since the start is an AgentExecutor, pass an AgentExecutorRequest.
# The workflow completes when it becomes idle (no more work to do).
request = AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=email)], should_respond=True)
events = await workflow.run(request)
outputs = events.get_outputs()
if outputs:
print(f"Workflow output: {outputs[0]}")
if __name__ == "__main__":
asyncio.run(main())
Fonctionnement des arêtes conditionnelles
Fonctions de condition : la
get_condition()fonction crée un prédicat qui examine le contenu du message et retourneTrueouFalsepour déterminer si le bord doit être parcouru.Inspection des messages : les conditions peuvent évaluer n’importe quel aspect du message, y compris les données structurées issues des réponses de l’agent, traitées avec des modèles Pydantic.
Programmation défensive : la fonction condition inclut la gestion des erreurs pour empêcher les échecs de routage lors de l’analyse des données structurées.
Routage dynamique : en fonction du résultat de la détection du courrier indésirable, les e-mails sont automatiquement acheminés vers l’Assistant e-mail (pour les e-mails légitimes) ou le gestionnaire de courrier indésirable (pour les e-mails suspects).
Concepts clés
- Conditions de périphérie : prédicats booléens qui déterminent si un bord doit être parcouru
-
Sorties structurées : L’utilisation de modèles Pydantic garantit
response_formatl’analyse fiable des données - Routage défensif : les fonctions de condition gèrent les cas de périphérie pour empêcher les blocages du flux de travail
- Transformation de message : les exécuteurs peuvent transformer les types de messages entre les étapes du flux de travail
Implémentation complète
Pour obtenir l’implémentation de travail complète, consultez l’exemple edge_condition.py dans le référentiel Agent Framework.
Arêtes de Switch-Case
Développement sur des bases conditionnelles
L’exemple précédent d’arêtes conditionnelles a montré un routage bidirectionnel (courrier indésirable et e-mails légitimes). Toutefois, de nombreux scénarios réels nécessitent des arbres de décision plus sophistiqués. Les arêtes switch-case offrent une solution plus propre et plus facile à gérer lorsque vous devez acheminer vers plusieurs destinations en fonction de différentes conditions.
Ce que vous allez créer avec Switch-Case
Vous allez étendre le flux de travail de traitement des e-mails pour gérer trois chemins de décision :
- NotSpam → L’Assistant Courrier → Envoyer un e-mail
- Courrier indésirable → gérer l’exécuteur de courrier indésirable
- Incertain → Gérer l’exécuteur incertain (cas par défaut)
L’amélioration clé consiste à utiliser le SwitchBuilder modèle au lieu de plusieurs bords conditionnels individuels, ce qui facilite la compréhension et la maintenance du flux de travail à mesure que la complexité de la décision augmente.
Concepts abordés
Modèles de données pour Switch-Case
Mettez à jour vos modèles de données pour soutenir la classification tridirectionnelle.
/// <summary>
/// Represents the possible decisions for spam detection.
/// </summary>
public enum SpamDecision
{
NotSpam,
Spam,
Uncertain
}
/// <summary>
/// Represents the result of spam detection with enhanced decision support.
/// </summary>
public sealed class DetectionResult
{
[JsonPropertyName("spam_decision")]
[JsonConverter(typeof(JsonStringEnumConverter))]
public SpamDecision spamDecision { get; set; }
[JsonPropertyName("reason")]
public string Reason { get; set; } = string.Empty;
// Email ID is generated by the executor, not the agent
[JsonIgnore]
public string EmailId { get; set; } = string.Empty;
}
/// <summary>
/// Represents an email stored in shared state.
/// </summary>
internal sealed class Email
{
[JsonPropertyName("email_id")]
public string EmailId { get; set; } = string.Empty;
[JsonPropertyName("email_content")]
public string EmailContent { get; set; } = string.Empty;
}
/// <summary>
/// Represents the response from the email assistant.
/// </summary>
public sealed class EmailResponse
{
[JsonPropertyName("response")]
public string Response { get; set; } = string.Empty;
}
/// <summary>
/// Constants for shared state scopes.
/// </summary>
internal static class EmailStateConstants
{
public const string EmailStateScope = "EmailState";
}
Fabrique de conditions pour Switch-Case
Créez une fabrique de conditions réutilisables qui génère des prédicats pour chaque décision de courrier indésirable :
/// <summary>
/// Creates a condition for routing messages based on the expected spam detection result.
/// </summary>
/// <param name="expectedDecision">The expected spam detection decision</param>
/// <returns>A function that evaluates whether a message meets the expected result</returns>
private static Func<object?, bool> GetCondition(SpamDecision expectedDecision) =>
detectionResult => detectionResult is DetectionResult result && result.spamDecision == expectedDecision;
Cette approche usinée :
- Réduit la duplication de code : une fonction génère tous les prédicats de condition
- Garantit la cohérence : toutes les conditions suivent le même modèle
- Simplifie la maintenance : les modifications apportées à la logique de condition se produisent à un seul endroit
Agent IA amélioré
Mettez à jour l'agent de détection du spam pour qu'il soit moins sûr et afin de retourner des classifications tridirectionnelles.
/// <summary>
/// Creates a spam detection agent with enhanced uncertainty handling.
/// </summary>
/// <returns>A ChatClientAgent configured for three-way spam detection</returns>
private static ChatClientAgent GetSpamDetectionAgent(IChatClient chatClient) =>
new(chatClient, new ChatClientAgentOptions(instructions: "You are a spam detection assistant that identifies spam emails. Be less confident in your assessments.")
{
ChatOptions = new()
{
ResponseFormat = ChatResponseFormat.ForJsonSchema<DetectionResult>()
}
});
/// <summary>
/// Creates an email assistant agent (unchanged from conditional edges example).
/// </summary>
/// <returns>A ChatClientAgent configured for email assistance</returns>
private static ChatClientAgent GetEmailAssistantAgent(IChatClient chatClient) =>
new(chatClient, new ChatClientAgentOptions(instructions: "You are an email assistant that helps users draft responses to emails with professionalism.")
{
ChatOptions = new()
{
ResponseFormat = ChatResponseFormat.ForJsonSchema<EmailResponse>()
}
});
Exécuteurs de flux de travail avec routage amélioré
Implémentez des exécuteurs qui gèrent le routage tridirectionnel avec la gestion de l’état partagé :
/// <summary>
/// Executor that detects spam using an AI agent with three-way classification.
/// </summary>
internal sealed class SpamDetectionExecutor : Executor<ChatMessage, DetectionResult>
{
private readonly AIAgent _spamDetectionAgent;
public SpamDetectionExecutor(AIAgent spamDetectionAgent) : base("SpamDetectionExecutor")
{
this._spamDetectionAgent = spamDetectionAgent;
}
public override async ValueTask<DetectionResult> HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// Generate a random email ID and store the email content in shared state
var newEmail = new Email
{
EmailId = Guid.NewGuid().ToString("N"),
EmailContent = message.Text
};
await context.QueueStateUpdateAsync(newEmail.EmailId, newEmail, scopeName: EmailStateConstants.EmailStateScope);
// Invoke the agent for enhanced spam detection
var response = await this._spamDetectionAgent.RunAsync(message);
var detectionResult = JsonSerializer.Deserialize<DetectionResult>(response.Text);
detectionResult!.EmailId = newEmail.EmailId;
return detectionResult;
}
}
/// <summary>
/// Executor that assists with email responses using an AI agent.
/// </summary>
internal sealed class EmailAssistantExecutor : Executor<DetectionResult, EmailResponse>
{
private readonly AIAgent _emailAssistantAgent;
public EmailAssistantExecutor(AIAgent emailAssistantAgent) : base("EmailAssistantExecutor")
{
this._emailAssistantAgent = emailAssistantAgent;
}
public override async ValueTask<EmailResponse> HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
if (message.spamDecision == SpamDecision.Spam)
{
throw new ArgumentException("This executor should only handle non-spam messages.");
}
// Retrieve the email content from shared state
var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);
// Invoke the agent to draft a response
var response = await this._emailAssistantAgent.RunAsync(email!.EmailContent);
var emailResponse = JsonSerializer.Deserialize<EmailResponse>(response.Text);
return emailResponse!;
}
}
/// <summary>
/// Executor that sends emails.
/// </summary>
internal sealed class SendEmailExecutor : Executor<EmailResponse>
{
public SendEmailExecutor() : base("SendEmailExecutor") { }
public override async ValueTask HandleAsync(EmailResponse message, IWorkflowContext context, CancellationToken cancellationToken = default) =>
await context.YieldOutputAsync($"Email sent: {message.Response}").ConfigureAwait(false);
}
/// <summary>
/// Executor that handles spam messages.
/// </summary>
internal sealed class HandleSpamExecutor : Executor<DetectionResult>
{
public HandleSpamExecutor() : base("HandleSpamExecutor") { }
public override async ValueTask HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
if (message.spamDecision == SpamDecision.Spam)
{
await context.YieldOutputAsync($"Email marked as spam: {message.Reason}").ConfigureAwait(false);
}
else
{
throw new ArgumentException("This executor should only handle spam messages.");
}
}
}
/// <summary>
/// Executor that handles uncertain emails requiring manual review.
/// </summary>
internal sealed class HandleUncertainExecutor : Executor<DetectionResult>
{
public HandleUncertainExecutor() : base("HandleUncertainExecutor") { }
public override async ValueTask HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
if (message.spamDecision == SpamDecision.Uncertain)
{
var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);
await context.YieldOutputAsync($"Email marked as uncertain: {message.Reason}. Email content: {email?.EmailContent}");
}
else
{
throw new ArgumentException("This executor should only handle uncertain spam decisions.");
}
}
}
Construire un flux de processus avec le modèle Switch-Case
Remplacez plusieurs bords conditionnels par le modèle de casse de commutateur plus propre :
public static class Program
{
private static async Task Main()
{
// Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential()).GetChatClient(deploymentName).AsIChatClient();
// Create agents
AIAgent spamDetectionAgent = GetSpamDetectionAgent(chatClient);
AIAgent emailAssistantAgent = GetEmailAssistantAgent(chatClient);
// Create executors
var spamDetectionExecutor = new SpamDetectionExecutor(spamDetectionAgent);
var emailAssistantExecutor = new EmailAssistantExecutor(emailAssistantAgent);
var sendEmailExecutor = new SendEmailExecutor();
var handleSpamExecutor = new HandleSpamExecutor();
var handleUncertainExecutor = new HandleUncertainExecutor();
// Build the workflow using switch-case for cleaner three-way routing
WorkflowBuilder builder = new(spamDetectionExecutor);
builder.AddSwitch(spamDetectionExecutor, switchBuilder =>
switchBuilder
.AddCase(
GetCondition(expectedDecision: SpamDecision.NotSpam),
emailAssistantExecutor
)
.AddCase(
GetCondition(expectedDecision: SpamDecision.Spam),
handleSpamExecutor
)
.WithDefault(
handleUncertainExecutor
)
)
// After the email assistant writes a response, it will be sent to the send email executor
.AddEdge(emailAssistantExecutor, sendEmailExecutor)
.WithOutputFrom(handleSpamExecutor, sendEmailExecutor, handleUncertainExecutor);
var workflow = builder.Build();
// Read an email from a text file (use ambiguous content for demonstration)
string email = Resources.Read("ambiguous_email.txt");
// Execute the workflow
StreamingRun run = await InProcessExecution.StreamAsync(workflow, new ChatMessage(ChatRole.User, email));
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is WorkflowOutputEvent outputEvent)
{
Console.WriteLine($"{outputEvent}");
}
}
}
}
avantages Switch-Case
-
Syntaxe de nettoyage : La
SwitchBuildersolution offre une alternative plus lisible à plusieurs bords conditionnels - Évaluation ordonnée : les cas sont évalués séquentiellement, s'arrêtant au premier élément correspondant
-
Routage garanti : la
WithDefault()méthode garantit que les messages ne sont jamais bloqués - Meilleure facilité de maintenance : l’ajout de nouveaux cas nécessite des modifications minimales dans la structure de flux de travail
- Sécurité du type : chaque exécuteur valide son entrée pour intercepter les erreurs de routage tôt
Comparaison des modèles
Avant (arêtes conditionnelles) :
var workflow = new WorkflowBuilder(spamDetectionExecutor)
.AddEdge(spamDetectionExecutor, emailAssistantExecutor, condition: GetCondition(expectedResult: false))
.AddEdge(spamDetectionExecutor, handleSpamExecutor, condition: GetCondition(expectedResult: true))
// No clean way to handle a third case
.WithOutputFrom(handleSpamExecutor, sendEmailExecutor)
.Build();
Après (Switch-Case) :
WorkflowBuilder builder = new(spamDetectionExecutor);
builder.AddSwitch(spamDetectionExecutor, switchBuilder =>
switchBuilder
.AddCase(GetCondition(SpamDecision.NotSpam), emailAssistantExecutor)
.AddCase(GetCondition(SpamDecision.Spam), handleSpamExecutor)
.WithDefault(handleUncertainExecutor) // Clean default case
)
// Continue building the rest of the workflow
Le modèle switch-case s’adapte beaucoup mieux à mesure que le nombre de décisions de routage augmente, et le cas par défaut fournit un filet de sécurité pour les valeurs inattendues.
Exécution de l’exemple
Lorsque vous exécutez ce flux de travail avec du contenu de courrier ambigu :
Email marked as uncertain: This email contains promotional language but might be from a legitimate business contact, requiring human review for proper classification.
Essayez de modifier le contenu de l’e-mail en quelque chose clairement spam ou clairement légitime pour voir les différents chemins de routage en action.
Implémentation complète
Pour obtenir l’implémentation de travail complète, consultez cet exemple dans le référentiel Agent Framework.
Développement sur des bases conditionnelles
L’exemple précédent d’arêtes conditionnelles a montré un routage bidirectionnel (courrier indésirable et e-mails légitimes). Toutefois, de nombreux scénarios réels nécessitent des arbres de décision plus sophistiqués. Les arêtes switch-case offrent une solution plus propre et plus facile à gérer lorsque vous devez acheminer vers plusieurs destinations en fonction de différentes conditions.
Ce que vous allez construire ensuite
Vous allez étendre le flux de travail de traitement des e-mails pour gérer trois chemins de décision :
- NotSpam → L’Assistant Courrier → Envoyer un e-mail
- Courrier indésirable → marquer comme courrier indésirable
- Indicateur de → incertain pour la révision manuelle (cas par défaut)
L’amélioration clé consiste à utiliser un seul groupe de périphérie switch-case au lieu de plusieurs bords conditionnels individuels, ce qui facilite la compréhension et la maintenance du flux de travail à mesure que la complexité de la décision augmente.
Concepts abordés
Modèles de données améliorés
Mettez à jour vos modèles de données pour soutenir la classification tridirectionnelle.
from typing import Literal
class DetectionResultAgent(BaseModel):
"""Structured output returned by the spam detection agent."""
# The agent classifies the email into one of three categories
spam_decision: Literal["NotSpam", "Spam", "Uncertain"]
reason: str
class EmailResponse(BaseModel):
"""Structured output returned by the email assistant agent."""
response: str
@dataclass
class DetectionResult:
"""Internal typed payload used for routing and downstream handling."""
spam_decision: str
reason: str
email_id: str
@dataclass
class Email:
"""In memory record of the email content stored in shared state."""
email_id: str
email_content: str
Usine de conditions Switch-Case
Créez une fabrique de conditions réutilisables qui génère des prédicats pour chaque décision de courrier indésirable :
def get_case(expected_decision: str):
"""Factory that returns a predicate matching a specific spam_decision value."""
def condition(message: Any) -> bool:
# Only match when the upstream payload is a DetectionResult with the expected decision
return isinstance(message, DetectionResult) and message.spam_decision == expected_decision
return condition
Cette approche usinée :
- Réduit la duplication de code : une fonction génère tous les prédicats de condition
- Garantit la cohérence : toutes les conditions suivent le même modèle
- Simplifie la maintenance : les modifications apportées à la logique de condition se produisent à un seul endroit
Exécuteurs de flux de travail avec état partagé
Implémentez des exécuteurs qui utilisent l’état partagé pour éviter de transmettre du contenu de courrier volumineux à chaque étape du flux de travail :
EMAIL_STATE_PREFIX = "email:"
CURRENT_EMAIL_ID_KEY = "current_email_id"
@executor(id="store_email")
async def store_email(email_text: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
"""Store email content once and pass around a lightweight ID reference."""
# Persist the raw email content in shared state
new_email = Email(email_id=str(uuid4()), email_content=email_text)
await ctx.set_shared_state(f"{EMAIL_STATE_PREFIX}{new_email.email_id}", new_email)
await ctx.set_shared_state(CURRENT_EMAIL_ID_KEY, new_email.email_id)
# Forward email to spam detection agent
await ctx.send_message(
AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=new_email.email_content)], should_respond=True)
)
@executor(id="to_detection_result")
async def to_detection_result(response: AgentExecutorResponse, ctx: WorkflowContext[DetectionResult]) -> None:
"""Transform agent response into a typed DetectionResult with email ID."""
# Parse the agent's structured JSON output
parsed = DetectionResultAgent.model_validate_json(response.agent_run_response.text)
email_id: str = await ctx.get_shared_state(CURRENT_EMAIL_ID_KEY)
# Create typed message for switch-case routing
await ctx.send_message(DetectionResult(
spam_decision=parsed.spam_decision,
reason=parsed.reason,
email_id=email_id
))
@executor(id="submit_to_email_assistant")
async def submit_to_email_assistant(detection: DetectionResult, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
"""Handle NotSpam emails by forwarding to the email assistant."""
# Guard against misrouting
if detection.spam_decision != "NotSpam":
raise RuntimeError("This executor should only handle NotSpam messages.")
# Retrieve original email content from shared state
email: Email = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{detection.email_id}")
await ctx.send_message(
AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=email.email_content)], should_respond=True)
)
@executor(id="finalize_and_send")
async def finalize_and_send(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
"""Parse email assistant response and yield final output."""
parsed = EmailResponse.model_validate_json(response.agent_run_response.text)
await ctx.yield_output(f"Email sent: {parsed.response}")
@executor(id="handle_spam")
async def handle_spam(detection: DetectionResult, ctx: WorkflowContext[Never, str]) -> None:
"""Handle confirmed spam emails."""
if detection.spam_decision == "Spam":
await ctx.yield_output(f"Email marked as spam: {detection.reason}")
else:
raise RuntimeError("This executor should only handle Spam messages.")
@executor(id="handle_uncertain")
async def handle_uncertain(detection: DetectionResult, ctx: WorkflowContext[Never, str]) -> None:
"""Handle uncertain classifications that need manual review."""
if detection.spam_decision == "Uncertain":
# Include original content for human review
email: Email | None = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{detection.email_id}")
await ctx.yield_output(
f"Email marked as uncertain: {detection.reason}. Email content: {getattr(email, 'email_content', '')}"
)
else:
raise RuntimeError("This executor should only handle Uncertain messages.")
Créer un agent IA amélioré
Mettez à jour l'agent de détection du spam pour qu'il soit moins sûr et afin de retourner des classifications tridirectionnelles.
async def main():
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
# Enhanced spam detection agent with three-way classification
spam_detection_agent = AgentExecutor(
chat_client.create_agent(
instructions=(
"You are a spam detection assistant that identifies spam emails. "
"Be less confident in your assessments. "
"Always return JSON with fields 'spam_decision' (one of NotSpam, Spam, Uncertain) "
"and 'reason' (string)."
),
response_format=DetectionResultAgent,
),
id="spam_detection_agent",
)
# Email assistant remains the same
email_assistant_agent = AgentExecutor(
chat_client.create_agent(
instructions=(
"You are an email assistant that helps users draft responses to emails with professionalism."
),
response_format=EmailResponse,
),
id="email_assistant_agent",
)
Générer un flux de travail avec Switch-Case groupe Edge
remplacez plusieurs arêtes conditionnelles par un seul groupe switch-case :
# Build workflow using switch-case for cleaner three-way routing
workflow = (
WorkflowBuilder()
.set_start_executor(store_email)
.add_edge(store_email, spam_detection_agent)
.add_edge(spam_detection_agent, to_detection_result)
.add_switch_case_edge_group(
to_detection_result,
[
# Explicit cases for specific decisions
Case(condition=get_case("NotSpam"), target=submit_to_email_assistant),
Case(condition=get_case("Spam"), target=handle_spam),
# Default case catches anything that doesn't match above
Default(target=handle_uncertain),
],
)
.add_edge(submit_to_email_assistant, email_assistant_agent)
.add_edge(email_assistant_agent, finalize_and_send)
.build()
)
Exécuter et tester
Exécutez le flux de travail avec du contenu de messagerie ambiguë qui illustre le routage tridirectionnel :
# Use ambiguous email content that might trigger uncertain classification
email = (
"Hey there, I noticed you might be interested in our latest offer—no pressure, but it expires soon. "
"Let me know if you'd like more details."
)
# Execute and display results
events = await workflow.run(email)
outputs = events.get_outputs()
if outputs:
for output in outputs:
print(f"Workflow output: {output}")
Avantages clés des arêtes Switch-Case
- Syntaxe de nettoyage : un groupe de bords au lieu de plusieurs bords conditionnels
- Évaluation ordonnée : les cas sont évalués séquentiellement, s'arrêtant au premier élément correspondant
- Routage garanti : le cas par défaut garantit que les messages ne sont jamais bloqués
- Meilleure maintenance : l’ajout de nouveaux cas nécessite des modifications minimales
- Sécurité du type : chaque exécuteur valide son entrée pour intercepter les erreurs de routage
Comparaison : conditionnel et Switch-Case
Avant (arêtes conditionnelles) :
.add_edge(detector, handler_a, condition=lambda x: x.result == "A")
.add_edge(detector, handler_b, condition=lambda x: x.result == "B")
.add_edge(detector, handler_c, condition=lambda x: x.result == "C")
Après (Switch-Case) :
.add_switch_case_edge_group(
detector,
[
Case(condition=lambda x: x.result == "A", target=handler_a),
Case(condition=lambda x: x.result == "B", target=handler_b),
Default(target=handler_c), # Catches everything else
],
)
Le modèle switch-case s’adapte beaucoup mieux à mesure que le nombre de décisions de routage augmente, et le cas par défaut fournit un filet de sécurité pour les valeurs inattendues.
Switch-Case exemple de code
Pour obtenir l’implémentation de travail complète, consultez l’exemple de switch_case_edge_group.py dans le référentiel Agent Framework.
Arêtes de sélection multiple
Au-delà du commutateur : routage à sélection multiple
Les branches switch-case acheminent les messages vers exactement une seule destination, mais les flux de travail réels doivent souvent déclencher plusieurs opérations parallèles en fonction des propriétés des données. Les bords partitionnés (implémentés en tant que bords de ventilateur avec des partitionneurs) permettent des modèles sophistiqués de fan-out où un seul message peut activer simultanément plusieurs exécuteurs en aval.
Flux de travail avancé de traitement des e-mails
En se basant sur l’exemple de cas de commutateur, vous allez créer un système de traitement de courrier amélioré qui illustre une logique de routage sophistiquée :
- Courrier indésirable → gestionnaire de courrier indésirable unique (par exemple, switch-case)
- E-mails légitimes → Déclencher toujours l'assistant e-mail + Déclencher conditionnellement le résumeur pour les e-mails longs
- E-mails incertains → gestionnaire incertain unique (par exemple, switch-case)
- Persistance de base de données → déclenchée pour les e-mails courts et les e-mails longs résumés
Ce modèle permet des pipelines de traitement parallèles qui s’adaptent aux caractéristiques de contenu.
Concepts abordés
Modèles de données pour la sélection multiple
Étendez les modèles de données pour prendre en charge l’analyse de longueur des e-mails et la synthèse :
/// <summary>
/// Represents the result of enhanced email analysis with additional metadata.
/// </summary>
public sealed class AnalysisResult
{
[JsonPropertyName("spam_decision")]
[JsonConverter(typeof(JsonStringEnumConverter))]
public SpamDecision spamDecision { get; set; }
[JsonPropertyName("reason")]
public string Reason { get; set; } = string.Empty;
// Additional properties for sophisticated routing
[JsonIgnore]
public int EmailLength { get; set; }
[JsonIgnore]
public string EmailSummary { get; set; } = string.Empty;
[JsonIgnore]
public string EmailId { get; set; } = string.Empty;
}
/// <summary>
/// Represents the response from the email assistant.
/// </summary>
public sealed class EmailResponse
{
[JsonPropertyName("response")]
public string Response { get; set; } = string.Empty;
}
/// <summary>
/// Represents the response from the email summary agent.
/// </summary>
public sealed class EmailSummary
{
[JsonPropertyName("summary")]
public string Summary { get; set; } = string.Empty;
}
/// <summary>
/// A custom workflow event for database operations.
/// </summary>
internal sealed class DatabaseEvent(string message) : WorkflowEvent(message) { }
/// <summary>
/// Constants for email processing thresholds.
/// </summary>
public static class EmailProcessingConstants
{
public const int LongEmailThreshold = 100;
}
Fonction d’assigneur cible : cœur de la sélection multiple
La fonction d’assigneur cible détermine les exécuteurs qui doivent recevoir chaque message :
/// <summary>
/// Creates a target assigner for routing messages based on the analysis result.
/// </summary>
/// <returns>A function that takes an analysis result and returns the target partitions.</returns>
private static Func<AnalysisResult?, int, IEnumerable<int>> GetTargetAssigner()
{
return (analysisResult, targetCount) =>
{
if (analysisResult is not null)
{
if (analysisResult.spamDecision == SpamDecision.Spam)
{
return [0]; // Route only to spam handler (index 0)
}
else if (analysisResult.spamDecision == SpamDecision.NotSpam)
{
// Always route to email assistant (index 1)
List<int> targets = [1];
// Conditionally add summarizer for long emails (index 2)
if (analysisResult.EmailLength > EmailProcessingConstants.LongEmailThreshold)
{
targets.Add(2);
}
return targets;
}
else // Uncertain
{
return [3]; // Route only to uncertain handler (index 3)
}
}
throw new ArgumentException("Invalid analysis result.");
};
}
Fonctionnalités clés de la fonction d’assigneur cible
- Sélection cible dynamique : retourne une liste d’index d’exécuteur à activer
- Routage prenant en charge le contenu : prend des décisions en fonction des propriétés de message telles que la longueur du courrier électronique
- Traitement parallèle : plusieurs cibles peuvent s’exécuter simultanément
- Logique conditionnelle : branchement complexe basé sur plusieurs critères
Exécuteurs de flux de travail améliorés
Implémentez des exécuteurs qui gèrent l’analyse et le routage avancés :
/// <summary>
/// Executor that analyzes emails using an AI agent with enhanced analysis.
/// </summary>
internal sealed class EmailAnalysisExecutor : Executor<ChatMessage, AnalysisResult>
{
private readonly AIAgent _emailAnalysisAgent;
public EmailAnalysisExecutor(AIAgent emailAnalysisAgent) : base("EmailAnalysisExecutor")
{
this._emailAnalysisAgent = emailAnalysisAgent;
}
public override async ValueTask<AnalysisResult> HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// Generate a random email ID and store the email content
var newEmail = new Email
{
EmailId = Guid.NewGuid().ToString("N"),
EmailContent = message.Text
};
await context.QueueStateUpdateAsync(newEmail.EmailId, newEmail, scopeName: EmailStateConstants.EmailStateScope);
// Invoke the agent for enhanced analysis
var response = await this._emailAnalysisAgent.RunAsync(message);
var analysisResult = JsonSerializer.Deserialize<AnalysisResult>(response.Text);
// Enrich with metadata for routing decisions
analysisResult!.EmailId = newEmail.EmailId;
analysisResult.EmailLength = newEmail.EmailContent.Length;
return analysisResult;
}
}
/// <summary>
/// Executor that assists with email responses using an AI agent.
/// </summary>
internal sealed class EmailAssistantExecutor : Executor<AnalysisResult, EmailResponse>
{
private readonly AIAgent _emailAssistantAgent;
public EmailAssistantExecutor(AIAgent emailAssistantAgent) : base("EmailAssistantExecutor")
{
this._emailAssistantAgent = emailAssistantAgent;
}
public override async ValueTask<EmailResponse> HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
if (message.spamDecision == SpamDecision.Spam)
{
throw new ArgumentException("This executor should only handle non-spam messages.");
}
// Retrieve the email content from shared state
var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);
// Invoke the agent to draft a response
var response = await this._emailAssistantAgent.RunAsync(email!.EmailContent);
var emailResponse = JsonSerializer.Deserialize<EmailResponse>(response.Text);
return emailResponse!;
}
}
/// <summary>
/// Executor that summarizes emails using an AI agent for long emails.
/// </summary>
internal sealed class EmailSummaryExecutor : Executor<AnalysisResult, AnalysisResult>
{
private readonly AIAgent _emailSummaryAgent;
public EmailSummaryExecutor(AIAgent emailSummaryAgent) : base("EmailSummaryExecutor")
{
this._emailSummaryAgent = emailSummaryAgent;
}
public override async ValueTask<AnalysisResult> HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// Read the email content from shared state
var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);
// Generate summary for long emails
var response = await this._emailSummaryAgent.RunAsync(email!.EmailContent);
var emailSummary = JsonSerializer.Deserialize<EmailSummary>(response.Text);
// Enrich the analysis result with the summary
message.EmailSummary = emailSummary!.Summary;
return message;
}
}
/// <summary>
/// Executor that sends emails.
/// </summary>
internal sealed class SendEmailExecutor : Executor<EmailResponse>
{
public SendEmailExecutor() : base("SendEmailExecutor") { }
public override async ValueTask HandleAsync(EmailResponse message, IWorkflowContext context, CancellationToken cancellationToken = default) =>
await context.YieldOutputAsync($"Email sent: {message.Response}");
}
/// <summary>
/// Executor that handles spam messages.
/// </summary>
internal sealed class HandleSpamExecutor : Executor<AnalysisResult>
{
public HandleSpamExecutor() : base("HandleSpamExecutor") { }
public override async ValueTask HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
if (message.spamDecision == SpamDecision.Spam)
{
await context.YieldOutputAsync($"Email marked as spam: {message.Reason}");
}
else
{
throw new ArgumentException("This executor should only handle spam messages.");
}
}
}
/// <summary>
/// Executor that handles uncertain messages requiring manual review.
/// </summary>
internal sealed class HandleUncertainExecutor : Executor<AnalysisResult>
{
public HandleUncertainExecutor() : base("HandleUncertainExecutor") { }
public override async ValueTask HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
if (message.spamDecision == SpamDecision.Uncertain)
{
var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);
await context.YieldOutputAsync($"Email marked as uncertain: {message.Reason}. Email content: {email?.EmailContent}");
}
else
{
throw new ArgumentException("This executor should only handle uncertain spam decisions.");
}
}
}
/// <summary>
/// Executor that handles database access with custom events.
/// </summary>
internal sealed class DatabaseAccessExecutor : Executor<AnalysisResult>
{
public DatabaseAccessExecutor() : base("DatabaseAccessExecutor") { }
public override async ValueTask HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// Simulate database operations
await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);
await Task.Delay(100); // Simulate database access delay
// Emit custom database event for monitoring
await context.AddEventAsync(new DatabaseEvent($"Email {message.EmailId} saved to database."));
}
}
Agents IA améliorés
Créez des agents pour l’analyse, l’assistance et la synthèse :
/// <summary>
/// Create an enhanced email analysis agent.
/// </summary>
/// <returns>A ChatClientAgent configured for comprehensive email analysis</returns>
private static ChatClientAgent GetEmailAnalysisAgent(IChatClient chatClient) =>
new(chatClient, new ChatClientAgentOptions(instructions: "You are a spam detection assistant that identifies spam emails.")
{
ChatOptions = new()
{
ResponseFormat = ChatResponseFormat.ForJsonSchema<AnalysisResult>()
}
});
/// <summary>
/// Creates an email assistant agent.
/// </summary>
/// <returns>A ChatClientAgent configured for email assistance</returns>
private static ChatClientAgent GetEmailAssistantAgent(IChatClient chatClient) =>
new(chatClient, new ChatClientAgentOptions(instructions: "You are an email assistant that helps users draft responses to emails with professionalism.")
{
ChatOptions = new()
{
ResponseFormat = ChatResponseFormat.ForJsonSchema<EmailResponse>()
}
});
/// <summary>
/// Creates an agent that summarizes emails.
/// </summary>
/// <returns>A ChatClientAgent configured for email summarization</returns>
private static ChatClientAgent GetEmailSummaryAgent(IChatClient chatClient) =>
new(chatClient, new ChatClientAgentOptions(instructions: "You are an assistant that helps users summarize emails.")
{
ChatOptions = new()
{
ResponseFormat = ChatResponseFormat.ForJsonSchema<EmailSummary>()
}
});
Construction de flux de travail à sélection multiple
Construisez le flux de travail avec un routage sophistiqué et un traitement parallèle :
public static class Program
{
private static async Task Main()
{
// Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential()).GetChatClient(deploymentName).AsIChatClient();
// Create agents
AIAgent emailAnalysisAgent = GetEmailAnalysisAgent(chatClient);
AIAgent emailAssistantAgent = GetEmailAssistantAgent(chatClient);
AIAgent emailSummaryAgent = GetEmailSummaryAgent(chatClient);
// Create executors
var emailAnalysisExecutor = new EmailAnalysisExecutor(emailAnalysisAgent);
var emailAssistantExecutor = new EmailAssistantExecutor(emailAssistantAgent);
var emailSummaryExecutor = new EmailSummaryExecutor(emailSummaryAgent);
var sendEmailExecutor = new SendEmailExecutor();
var handleSpamExecutor = new HandleSpamExecutor();
var handleUncertainExecutor = new HandleUncertainExecutor();
var databaseAccessExecutor = new DatabaseAccessExecutor();
// Build the workflow with multi-selection fan-out
WorkflowBuilder builder = new(emailAnalysisExecutor);
builder.AddFanOutEdge(
emailAnalysisExecutor,
targets: [
handleSpamExecutor, // Index 0: Spam handler
emailAssistantExecutor, // Index 1: Email assistant (always for NotSpam)
emailSummaryExecutor, // Index 2: Summarizer (conditionally for long NotSpam)
handleUncertainExecutor, // Index 3: Uncertain handler
],
targetSelector: GetTargetAssigner()
)
// Email assistant branch
.AddEdge(emailAssistantExecutor, sendEmailExecutor)
// Database persistence: conditional routing
.AddEdge<AnalysisResult>(
emailAnalysisExecutor,
databaseAccessExecutor,
condition: analysisResult => analysisResult?.EmailLength <= EmailProcessingConstants.LongEmailThreshold) // Short emails
.AddEdge(emailSummaryExecutor, databaseAccessExecutor) // Long emails with summary
.WithOutputFrom(handleUncertainExecutor, handleSpamExecutor, sendEmailExecutor);
var workflow = builder.Build();
// Read a moderately long email to trigger both assistant and summarizer
string email = Resources.Read("email.txt");
// Execute the workflow with custom event handling
StreamingRun run = await InProcessExecution.StreamAsync(workflow, new ChatMessage(ChatRole.User, email));
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is WorkflowOutputEvent outputEvent)
{
Console.WriteLine($"Output: {outputEvent}");
}
if (evt is DatabaseEvent databaseEvent)
{
Console.WriteLine($"Database: {databaseEvent}");
}
}
}
}
Comparaison des modèles : Sélection Multiple et Switch-Case
Modèle Switch-Case (précédent) :
// One input → exactly one output
builder.AddSwitch(spamDetectionExecutor, switchBuilder =>
switchBuilder
.AddCase(GetCondition(SpamDecision.NotSpam), emailAssistantExecutor)
.AddCase(GetCondition(SpamDecision.Spam), handleSpamExecutor)
.WithDefault(handleUncertainExecutor)
)
Modèle de sélection multiple :
// One input → one or more outputs (dynamic fan-out)
builder.AddFanOutEdge(
emailAnalysisExecutor,
targets: [handleSpamExecutor, emailAssistantExecutor, emailSummaryExecutor, handleUncertainExecutor],
targetSelector: GetTargetAssigner() // Returns list of target indices
)
Avantages clés des arêtes de sélection multiple
- Traitement parallèle : plusieurs branches peuvent s’exécuter simultanément
- Ventilateur conditionnel : le nombre de cibles varie en fonction du contenu
- Routage prenant en charge le contenu : décisions basées sur les propriétés du message, pas seulement le type
- Utilisation efficace des ressources : seules les branches nécessaires sont activées
- Logique métier complexe : prend en charge des scénarios de routage sophistiqués
Exécution de l’exemple de sélection multiple
Lorsque vous exécutez ce flux de travail avec un e-mail long :
Output: Email sent: [Professional response generated by AI]
Database: Email abc123 saved to database.
Lorsque vous exécutez un e-mail court, le récapitulateur est ignoré :
Output: Email sent: [Professional response generated by AI]
Database: Email def456 saved to database.
Real-World Cas d’usage
- Systèmes de messagerie : acheminer vers l'assistant de réponse ; archivage ; analytique (sous condition)
- Traitement du contenu : déclencher la transcription + traduction + analyse (en fonction du type de contenu)
- Traitement des commandes : itinéraire vers le traitement + facturation + notifications (en fonction des propriétés de commande)
- Pipelines de données : déclencher différents flux d’analyse en fonction des caractéristiques des données
Implémentation complète de la sélection multiple
Pour obtenir l’implémentation de travail complète, consultez cet exemple dans le référentiel Agent Framework.
Au-delà du commutateur : routage à sélection multiple
Les branches switch-case acheminent les messages vers exactement une seule destination, mais les flux de travail réels doivent souvent déclencher plusieurs opérations parallèles en fonction des propriétés des données. Les arêtes partitionnées (implémentées en tant que groupes d'arêtes à sélection multiple) permettent des modèles de fan-out sophistiqués où un seul message peut activer simultanément plusieurs exécuteurs en aval.
Flux de travail avancé de traitement des e-mails
En se basant sur l’exemple de cas de commutateur, vous allez créer un système de traitement de courrier amélioré qui illustre une logique de routage sophistiquée :
- Courrier indésirable → gestionnaire de courrier indésirable unique (par exemple, switch-case)
- E-mails légitimes → Déclencher toujours l'assistant e-mail + Déclencher conditionnellement le résumeur pour les e-mails longs
- E-mails incertains → gestionnaire incertain unique (par exemple, switch-case)
- Persistance de base de données → déclenchée pour les e-mails courts et les e-mails longs résumés
Ce modèle permet des pipelines de traitement parallèles qui s’adaptent aux caractéristiques de contenu.
Concepts abordés
Modèles de données améliorés pour la sélection multiple
Étendez les modèles de données pour prendre en charge l’analyse de longueur des e-mails et la synthèse :
class AnalysisResultAgent(BaseModel):
"""Enhanced structured output from email analysis agent."""
spam_decision: Literal["NotSpam", "Spam", "Uncertain"]
reason: str
class EmailResponse(BaseModel):
"""Response from email assistant."""
response: str
class EmailSummaryModel(BaseModel):
"""Summary generated by email summary agent."""
summary: str
@dataclass
class AnalysisResult:
"""Internal analysis result with email metadata for routing decisions."""
spam_decision: str
reason: str
email_length: int # Used for conditional routing
email_summary: str # Populated by summary agent
email_id: str
@dataclass
class Email:
"""Email content stored in shared state."""
email_id: str
email_content: str
# Custom event for database operations
class DatabaseEvent(WorkflowEvent):
"""Custom event for tracking database operations."""
pass
Fonction de sélection : Cœur de la sélection multiple
La fonction de sélection détermine quels exécuteurs doivent recevoir chaque message :
LONG_EMAIL_THRESHOLD = 100
def select_targets(analysis: AnalysisResult, target_ids: list[str]) -> list[str]:
"""Intelligent routing based on spam decision and email characteristics."""
# Target order: [handle_spam, submit_to_email_assistant, summarize_email, handle_uncertain]
handle_spam_id, submit_to_email_assistant_id, summarize_email_id, handle_uncertain_id = target_ids
if analysis.spam_decision == "Spam":
# Route only to spam handler
return [handle_spam_id]
elif analysis.spam_decision == "NotSpam":
# Always route to email assistant
targets = [submit_to_email_assistant_id]
# Conditionally add summarizer for long emails
if analysis.email_length > LONG_EMAIL_THRESHOLD:
targets.append(summarize_email_id)
return targets
else: # Uncertain
# Route only to uncertain handler
return [handle_uncertain_id]
Fonctionnalités clés des fonctions de sélection
- Sélection cible dynamique : retourne une liste d’ID d’exécuteur à activer
- Routage prenant en charge le contenu : prend des décisions basées sur les propriétés du message
- Traitement parallèle : plusieurs cibles peuvent s’exécuter simultanément
- Logique conditionnelle : branchement complexe basé sur plusieurs critères
Exécuteurs de flux de travail à sélection multiple
Implémentez des exécuteurs qui gèrent l’analyse et le routage améliorés :
EMAIL_STATE_PREFIX = "email:"
CURRENT_EMAIL_ID_KEY = "current_email_id"
@executor(id="store_email")
async def store_email(email_text: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
"""Store email and initiate analysis."""
new_email = Email(email_id=str(uuid4()), email_content=email_text)
await ctx.set_shared_state(f"{EMAIL_STATE_PREFIX}{new_email.email_id}", new_email)
await ctx.set_shared_state(CURRENT_EMAIL_ID_KEY, new_email.email_id)
await ctx.send_message(
AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=new_email.email_content)], should_respond=True)
)
@executor(id="to_analysis_result")
async def to_analysis_result(response: AgentExecutorResponse, ctx: WorkflowContext[AnalysisResult]) -> None:
"""Transform agent response into enriched analysis result."""
parsed = AnalysisResultAgent.model_validate_json(response.agent_run_response.text)
email_id: str = await ctx.get_shared_state(CURRENT_EMAIL_ID_KEY)
email: Email = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{email_id}")
# Create enriched analysis result with email length for routing decisions
await ctx.send_message(
AnalysisResult(
spam_decision=parsed.spam_decision,
reason=parsed.reason,
email_length=len(email.email_content), # Key for conditional routing
email_summary="",
email_id=email_id,
)
)
@executor(id="submit_to_email_assistant")
async def submit_to_email_assistant(analysis: AnalysisResult, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
"""Handle legitimate emails by forwarding to email assistant."""
if analysis.spam_decision != "NotSpam":
raise RuntimeError("This executor should only handle NotSpam messages.")
email: Email = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{analysis.email_id}")
await ctx.send_message(
AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=email.email_content)], should_respond=True)
)
@executor(id="finalize_and_send")
async def finalize_and_send(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
"""Final step for email assistant branch."""
parsed = EmailResponse.model_validate_json(response.agent_run_response.text)
await ctx.yield_output(f"Email sent: {parsed.response}")
@executor(id="summarize_email")
async def summarize_email(analysis: AnalysisResult, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
"""Generate summary for long emails (parallel branch)."""
# Only called for long NotSpam emails by selection function
email: Email = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{analysis.email_id}")
await ctx.send_message(
AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=email.email_content)], should_respond=True)
)
@executor(id="merge_summary")
async def merge_summary(response: AgentExecutorResponse, ctx: WorkflowContext[AnalysisResult]) -> None:
"""Merge summary back into analysis result for database persistence."""
summary = EmailSummaryModel.model_validate_json(response.agent_run_response.text)
email_id: str = await ctx.get_shared_state(CURRENT_EMAIL_ID_KEY)
email: Email = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{email_id}")
# Create analysis result with summary for database storage
await ctx.send_message(
AnalysisResult(
spam_decision="NotSpam",
reason="",
email_length=len(email.email_content),
email_summary=summary.summary, # Now includes summary
email_id=email_id,
)
)
@executor(id="handle_spam")
async def handle_spam(analysis: AnalysisResult, ctx: WorkflowContext[Never, str]) -> None:
"""Handle spam emails (single target like switch-case)."""
if analysis.spam_decision == "Spam":
await ctx.yield_output(f"Email marked as spam: {analysis.reason}")
else:
raise RuntimeError("This executor should only handle Spam messages.")
@executor(id="handle_uncertain")
async def handle_uncertain(analysis: AnalysisResult, ctx: WorkflowContext[Never, str]) -> None:
"""Handle uncertain emails (single target like switch-case)."""
if analysis.spam_decision == "Uncertain":
email: Email | None = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{analysis.email_id}")
await ctx.yield_output(
f"Email marked as uncertain: {analysis.reason}. Email content: {getattr(email, 'email_content', '')}"
)
else:
raise RuntimeError("This executor should only handle Uncertain messages.")
@executor(id="database_access")
async def database_access(analysis: AnalysisResult, ctx: WorkflowContext[Never, str]) -> None:
"""Simulate database persistence with custom events."""
await asyncio.sleep(0.05) # Simulate DB operation
await ctx.add_event(DatabaseEvent(f"Email {analysis.email_id} saved to database."))
Agents IA améliorés
Créez des agents pour l’analyse, l’assistance et la synthèse :
async def main() -> None:
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
# Enhanced analysis agent
email_analysis_agent = AgentExecutor(
chat_client.create_agent(
instructions=(
"You are a spam detection assistant that identifies spam emails. "
"Always return JSON with fields 'spam_decision' (one of NotSpam, Spam, Uncertain) "
"and 'reason' (string)."
),
response_format=AnalysisResultAgent,
),
id="email_analysis_agent",
)
# Email assistant (same as before)
email_assistant_agent = AgentExecutor(
chat_client.create_agent(
instructions=(
"You are an email assistant that helps users draft responses to emails with professionalism."
),
response_format=EmailResponse,
),
id="email_assistant_agent",
)
# New: Email summary agent for long emails
email_summary_agent = AgentExecutor(
chat_client.create_agent(
instructions="You are an assistant that helps users summarize emails.",
response_format=EmailSummaryModel,
),
id="email_summary_agent",
)
Générer un flux de travail de sélection multiple
Construisez le flux de travail avec un routage sophistiqué et un traitement parallèle :
workflow = (
WorkflowBuilder()
.set_start_executor(store_email)
.add_edge(store_email, email_analysis_agent)
.add_edge(email_analysis_agent, to_analysis_result)
# Multi-selection edge group: intelligent fan-out based on content
.add_multi_selection_edge_group(
to_analysis_result,
[handle_spam, submit_to_email_assistant, summarize_email, handle_uncertain],
selection_func=select_targets,
)
# Email assistant branch (always for NotSpam)
.add_edge(submit_to_email_assistant, email_assistant_agent)
.add_edge(email_assistant_agent, finalize_and_send)
# Summary branch (only for long NotSpam emails)
.add_edge(summarize_email, email_summary_agent)
.add_edge(email_summary_agent, merge_summary)
# Database persistence: conditional routing
.add_edge(to_analysis_result, database_access,
condition=lambda r: r.email_length <= LONG_EMAIL_THRESHOLD) # Short emails
.add_edge(merge_summary, database_access) # Long emails with summary
.build()
)
Exécution avec la diffusion en continu d’événements
Exécutez le flux de travail et observez l’exécution parallèle par le biais d’événements personnalisés :
# Use a moderately long email to trigger both assistant and summarizer
email = """
Hello team, here are the updates for this week:
1. Project Alpha is on track and we should have the first milestone completed by Friday.
2. The client presentation has been scheduled for next Tuesday at 2 PM.
3. Please review the Q4 budget allocation and provide feedback by Wednesday.
Let me know if you have any questions or concerns.
Best regards,
Alex
"""
# Stream events to see parallel execution
async for event in workflow.run_stream(email):
if isinstance(event, DatabaseEvent):
print(f"Database: {event}")
elif isinstance(event, WorkflowOutputEvent):
print(f"Output: {event.data}")
Sélection Multiple versus *Switch-Case* Comparaison
Modèle Switch-Case (précédent) :
# One input → exactly one output
.add_switch_case_edge_group(
source,
[
Case(condition=lambda x: x.result == "A", target=handler_a),
Case(condition=lambda x: x.result == "B", target=handler_b),
Default(target=handler_c),
],
)
Modèle de sélection multiple :
# One input → one or more outputs (dynamic fan-out)
.add_multi_selection_edge_group(
source,
[handler_a, handler_b, handler_c, handler_d],
selection_func=intelligent_router, # Returns list of target IDs
)
Avantages de la sélection multiple en C#
- Traitement parallèle : plusieurs branches peuvent s’exécuter simultanément
- Ventilateur conditionnel : le nombre de cibles varie en fonction du contenu
- Routage prenant en charge le contenu : décisions basées sur les propriétés du message, pas seulement le type
- Utilisation efficace des ressources : seules les branches nécessaires sont activées
- Logique métier complexe : prend en charge des scénarios de routage sophistiqués
C# Applications Pratiques
- Systèmes de messagerie : acheminer vers l'assistant de réponse ; archivage ; analytique (sous condition)
- Traitement du contenu : déclencher la transcription + traduction + analyse (en fonction du type de contenu)
- Traitement des commandes : itinéraire vers le traitement + facturation + notifications (en fonction des propriétés de commande)
- Pipelines de données : déclencher différents flux d’analyse en fonction des caractéristiques des données
Exemple de code de sélection multiple
Pour obtenir l’implémentation de travail complète, consultez l’exemple multi_selection_edge_group.py dans le référentiel Agent Framework.