Partager via


Créer un flux de travail avec une logique de branchement

Dans ce tutoriel, vous allez apprendre à créer un flux de travail avec une logique de branchement à l’aide d’Agent Framework. La logique de branchement permet à votre flux de travail de prendre des décisions en fonction de certaines conditions, ce qui permet un comportement plus complexe et dynamique.

Arêtes conditionnelles

Les arêtes conditionnelles permettent à votre flux de travail de prendre des décisions de routage en fonction du contenu ou des propriétés des messages qui transitent par le flux de travail. Cela permet la branche dynamique où différents chemins d’exécution sont pris en fonction des conditions d’exécution.

Ce que vous allez construire

Vous allez créer un flux de travail de traitement de courrier qui illustre le routage conditionnel :

  • Agent de détection de courrier indésirable qui analyse les e-mails entrants et retourne un JSON structuré.
  • Arêtes conditionnelles qui acheminent les e-mails vers différents gestionnaires en fonction de la classification.
  • Gestionnaire de messagerie légitime qui rédige des réponses professionnelles.
  • Gestionnaire de courrier indésirable qui marque les e-mails suspects.
  • Gestion de l’état partagé pour conserver les données de messagerie entre les étapes du flux de travail.

Concepts abordés

Prerequisites

Installer les packages NuGet

Tout d’abord, installez les packages requis pour votre projet .NET :

dotnet add package Azure.AI.OpenAI --prerelease
dotnet add package Azure.Identity
dotnet add package Microsoft.Agents.AI.Workflows --prerelease
dotnet add package Microsoft.Extensions.AI.OpenAI --prerelease

Définir des modèles de données

Commencez par définir les structures de données qui transitent par votre flux de travail :

using System.Text.Json.Serialization;

/// <summary>
/// Represents the result of spam detection.
/// </summary>
public sealed class DetectionResult
{
    [JsonPropertyName("is_spam")]
    public bool IsSpam { get; set; }

    [JsonPropertyName("reason")]
    public string Reason { get; set; } = string.Empty;

    // Email ID is generated by the executor, not the agent
    [JsonIgnore]
    public string EmailId { get; set; } = string.Empty;
}

/// <summary>
/// Represents an email.
/// </summary>
internal sealed class Email
{
    [JsonPropertyName("email_id")]
    public string EmailId { get; set; } = string.Empty;

    [JsonPropertyName("email_content")]
    public string EmailContent { get; set; } = string.Empty;
}

/// <summary>
/// Represents the response from the email assistant.
/// </summary>
public sealed class EmailResponse
{
    [JsonPropertyName("response")]
    public string Response { get; set; } = string.Empty;
}

/// <summary>
/// Constants for shared state scopes.
/// </summary>
internal static class EmailStateConstants
{
    public const string EmailStateScope = "EmailState";
}

Créer des fonctions de condition

La fonction condition évalue le résultat de détection du courrier indésirable pour déterminer le chemin d’accès que le flux de travail doit prendre :

/// <summary>
/// Creates a condition for routing messages based on the expected spam detection result.
/// </summary>
/// <param name="expectedResult">The expected spam detection result</param>
/// <returns>A function that evaluates whether a message meets the expected result</returns>
private static Func<object?, bool> GetCondition(bool expectedResult) =>
    detectionResult => detectionResult is DetectionResult result && result.IsSpam == expectedResult;

Cette fonction conditionnelle :

  • Prend un bool expectedResult paramètre (vrai pour spam, faux pour non-spam)
  • Retourne une fonction qui peut être utilisée comme condition de périphérie
  • Vérifie en toute sécurité si le message est un DetectionResult et compare la IsSpam propriété

Créer des agents IA

Configurez les agents IA qui gèreront la détection de courrier indésirable et l’assistance par e-mail :

using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Agents.AI;
using Microsoft.Extensions.AI;

/// <summary>
/// Creates a spam detection agent.
/// </summary>
/// <returns>A ChatClientAgent configured for spam detection</returns>
private static ChatClientAgent GetSpamDetectionAgent(IChatClient chatClient) =>
    new(chatClient, new ChatClientAgentOptions(instructions: "You are a spam detection assistant that identifies spam emails.")
    {
        ChatOptions = new()
        {
            ResponseFormat = ChatResponseFormat.ForJsonSchema(AIJsonUtilities.CreateJsonSchema(typeof(DetectionResult)))
        }
    });

/// <summary>
/// Creates an email assistant agent.
/// </summary>
/// <returns>A ChatClientAgent configured for email assistance</returns>
private static ChatClientAgent GetEmailAssistantAgent(IChatClient chatClient) =>
    new(chatClient, new ChatClientAgentOptions(instructions: "You are an email assistant that helps users draft professional responses to emails.")
    {
        ChatOptions = new()
        {
            ResponseFormat = ChatResponseFormat.ForJsonSchema(AIJsonUtilities.CreateJsonSchema(typeof(EmailResponse)))
        }
    });

Implémenter des exécuteurs

Créez les exécuteurs de flux de travail qui gèrent différentes étapes du traitement des e-mails :

using Microsoft.Agents.AI.Workflows;
using System.Text.Json;

/// <summary>
/// Executor that detects spam using an AI agent.
/// </summary>
internal sealed class SpamDetectionExecutor : Executor<ChatMessage, DetectionResult>
{
    private readonly AIAgent _spamDetectionAgent;

    public SpamDetectionExecutor(AIAgent spamDetectionAgent) : base("SpamDetectionExecutor")
    {
        this._spamDetectionAgent = spamDetectionAgent;
    }

    public override async ValueTask<DetectionResult> HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // Generate a random email ID and store the email content to shared state
        var newEmail = new Email
        {
            EmailId = Guid.NewGuid().ToString("N"),
            EmailContent = message.Text
        };
        await context.QueueStateUpdateAsync(newEmail.EmailId, newEmail, scopeName: EmailStateConstants.EmailStateScope);

        // Invoke the agent for spam detection
        var response = await this._spamDetectionAgent.RunAsync(message);
        var detectionResult = JsonSerializer.Deserialize<DetectionResult>(response.Text);

        detectionResult!.EmailId = newEmail.EmailId;
        return detectionResult;
    }
}

/// <summary>
/// Executor that assists with email responses using an AI agent.
/// </summary>
internal sealed class EmailAssistantExecutor : Executor<DetectionResult, EmailResponse>
{
    private readonly AIAgent _emailAssistantAgent;

    public EmailAssistantExecutor(AIAgent emailAssistantAgent) : base("EmailAssistantExecutor")
    {
        this._emailAssistantAgent = emailAssistantAgent;
    }

    public override async ValueTask<EmailResponse> HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        if (message.IsSpam)
        {
            throw new ArgumentException("This executor should only handle non-spam messages.");
        }

        // Retrieve the email content from shared state
        var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope)
            ?? throw new InvalidOperationException("Email not found.");

        // Invoke the agent to draft a response
        var response = await this._emailAssistantAgent.RunAsync(email.EmailContent);
        var emailResponse = JsonSerializer.Deserialize<EmailResponse>(response.Text);

        return emailResponse!;
    }
}

/// <summary>
/// Executor that sends emails.
/// </summary>
internal sealed class SendEmailExecutor : Executor<EmailResponse>
{
    public SendEmailExecutor() : base("SendEmailExecutor") { }

    public override async ValueTask HandleAsync(EmailResponse message, IWorkflowContext context, CancellationToken cancellationToken = default) =>
        await context.YieldOutputAsync($"Email sent: {message.Response}");
}

/// <summary>
/// Executor that handles spam messages.
/// </summary>
internal sealed class HandleSpamExecutor : Executor<DetectionResult>
{
    public HandleSpamExecutor() : base("HandleSpamExecutor") { }

    public override async ValueTask HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        if (message.IsSpam)
        {
            await context.YieldOutputAsync($"Email marked as spam: {message.Reason}");
        }
        else
        {
            throw new ArgumentException("This executor should only handle spam messages.");
        }
    }
}

Générer le flux de travail avec des arêtes conditionnelles

Créez maintenant le programme principal qui génère et exécute le flux de travail :

using Microsoft.Extensions.AI;

public static class Program
{
    private static async Task Main()
    {
        // Set up the Azure OpenAI client
        var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT")
            ?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
        var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
        var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential())
            .GetChatClient(deploymentName).AsIChatClient();

        // Create agents
        AIAgent spamDetectionAgent = GetSpamDetectionAgent(chatClient);
        AIAgent emailAssistantAgent = GetEmailAssistantAgent(chatClient);

        // Create executors
        var spamDetectionExecutor = new SpamDetectionExecutor(spamDetectionAgent);
        var emailAssistantExecutor = new EmailAssistantExecutor(emailAssistantAgent);
        var sendEmailExecutor = new SendEmailExecutor();
        var handleSpamExecutor = new HandleSpamExecutor();

        // Build the workflow with conditional edges
        var workflow = new WorkflowBuilder(spamDetectionExecutor)
            // Non-spam path: route to email assistant when IsSpam = false
            .AddEdge(spamDetectionExecutor, emailAssistantExecutor, condition: GetCondition(expectedResult: false))
            .AddEdge(emailAssistantExecutor, sendEmailExecutor)
            // Spam path: route to spam handler when IsSpam = true
            .AddEdge(spamDetectionExecutor, handleSpamExecutor, condition: GetCondition(expectedResult: true))
            .WithOutputFrom(handleSpamExecutor, sendEmailExecutor)
            .Build();

        // Execute the workflow with sample spam email
        string emailContent = "Congratulations! You've won $1,000,000! Click here to claim your prize now!";
        StreamingRun run = await InProcessExecution.StreamAsync(workflow, new ChatMessage(ChatRole.User, emailContent));
        await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

        await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
        {
            if (evt is WorkflowOutputEvent outputEvent)
            {
                Console.WriteLine($"{outputEvent}");
            }
        }
    }
}

Fonctionnement

  1. Entrée de flux de travail : le flux de travail commence par spamDetectionExecutor recevoir un ChatMessage.

  2. Analyse du courrier indésirable : l'agent de détection de courrier indésirable analyse l’e-mail et retourne une structure DetectionResult avec des propriétés IsSpam et Reason.

  3. Routage conditionnel : en fonction de la IsSpam valeur :

    • Si spam (IsSpam = true) : itinéraires vers HandleSpamExecutor en utilisant GetCondition(true)
    • S’il est légitime (IsSpam = false) : Itinéraires vers EmailAssistantExecutor utilisant GetCondition(false)
  4. Génération de réponse : pour les e-mails légitimes, l’Assistant e-mail rédige une réponse professionnelle.

  5. Sortie finale : le flux de travail génère une notification de courrier indésirable ou envoie la réponse par e-mail brouillon.

Fonctionnalités clés des arêtes conditionnelles

  1. Type-Safe Conditions : la GetCondition méthode crée des fonctions de condition réutilisables qui évaluent en toute sécurité le contenu du message.

  2. Chemins multiples : un seul exécuteur peut avoir plusieurs arêtes sortantes avec différentes conditions, ce qui permet une logique de branchement complexe.

  3. État partagé : les données de messagerie sont conservées entre les exécuteurs à l’aide de la gestion d’état délimitée, ce qui permet aux exécuteurs en aval d’accéder au contenu d’origine.

  4. Gestion des erreurs : les exécuteurs valident leurs entrées et lèvent des exceptions significatives lors de la réception de types de messages inattendus.

  5. Architecture propre : chaque exécuteur a une responsabilité unique, ce qui rend le workflow maintenable et testable.

Exécution de l’exemple

Lorsque vous exécutez ce flux de travail avec l’exemple d’e-mail de courrier indésirable :

Email marked as spam: This email contains common spam indicators including monetary prizes, urgency tactics, and suspicious links that are typical of phishing attempts.

Essayez de modifier le contenu de l’e-mail en quelque chose de légitime :

string emailContent = "Hi, I wanted to follow up on our meeting yesterday and get your thoughts on the project proposal.";

Le flux de travail sera acheminé vers l’Assistant e-mail et générera une réponse professionnelle.

Ce modèle de routage conditionnel constitue la base de la création de flux de travail sophistiqués qui peuvent gérer des arbres de décision complexes et une logique métier.

Implémentation complète

Pour obtenir l’implémentation de travail complète, consultez cet exemple dans le référentiel Agent Framework.

Ce que vous allez construire

Vous allez créer un flux de travail de traitement de courrier qui illustre le routage conditionnel :

  • Agent de détection de courrier indésirable qui analyse les e-mails entrants
  • Arêtes conditionnelles qui acheminent les e-mails vers différents gestionnaires en fonction de la classification
  • Gestionnaire de messagerie légitime qui rédige des réponses professionnelles
  • Gestionnaire de courrier indésirable qui marque les e-mails suspects

Concepts abordés

Prerequisites

  • Python 3.10 ou version ultérieure
  • Agent Framework installé : pip install agent-framework-core --pre
  • Service Azure OpenAI configuré avec des variables d’environnement appropriées
  • Authentification Azure CLI : az login

Étape 1 : Importer les dépendances requises

Commencez par importer les composants nécessaires pour les flux de travail conditionnels :

import asyncio
import os
from dataclasses import dataclass
from typing import Any, Literal
from uuid import uuid4

from typing_extensions import Never

from agent_framework import (
    AgentExecutor,
    AgentExecutorRequest,
    AgentExecutorResponse,
    ChatMessage,
    Role,
    WorkflowBuilder,
    WorkflowContext,
    executor,
    Case,
    Default,
)
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential
from pydantic import BaseModel

Étape 2 : Définir des modèles de données

Créez des modèles Pydantic pour l’échange de données structurées entre les composants de flux de travail :

class DetectionResult(BaseModel):
    """Represents the result of spam detection."""
    # is_spam drives the routing decision taken by edge conditions
    is_spam: bool
    # Human readable rationale from the detector
    reason: str
    # The agent must include the original email so downstream agents can operate without reloading content
    email_content: str


class EmailResponse(BaseModel):
    """Represents the response from the email assistant."""
    # The drafted reply that a user could copy or send
    response: str

Étape 3 : Créer des fonctions de condition

Définissez les fonctions de condition qui déterminent les décisions de routage :

def get_condition(expected_result: bool):
    """Create a condition callable that routes based on DetectionResult.is_spam."""

    # The returned function will be used as an edge predicate.
    # It receives whatever the upstream executor produced.
    def condition(message: Any) -> bool:
        # Defensive guard. If a non AgentExecutorResponse appears, let the edge pass to avoid dead ends.
        if not isinstance(message, AgentExecutorResponse):
            return True

        try:
            # Prefer parsing a structured DetectionResult from the agent JSON text.
            # Using model_validate_json ensures type safety and raises if the shape is wrong.
            detection = DetectionResult.model_validate_json(message.agent_run_response.text)
            # Route only when the spam flag matches the expected path.
            return detection.is_spam == expected_result
        except Exception:
            # Fail closed on parse errors so we do not accidentally route to the wrong path.
            # Returning False prevents this edge from activating.
            return False

    return condition

Étape 4 : Créer des gestionnaires de tâches

Définissez des exécuteurs pour gérer différents résultats de routage :

@executor(id="send_email")
async def handle_email_response(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
    """Handle legitimate emails by drafting a professional response."""
    # Downstream of the email assistant. Parse a validated EmailResponse and yield the workflow output.
    email_response = EmailResponse.model_validate_json(response.agent_run_response.text)
    await ctx.yield_output(f"Email sent:\n{email_response.response}")


@executor(id="handle_spam")
async def handle_spam_classifier_response(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
    """Handle spam emails by marking them appropriately."""
    # Spam path. Confirm the DetectionResult and yield the workflow output. Guard against accidental non spam input.
    detection = DetectionResult.model_validate_json(response.agent_run_response.text)
    if detection.is_spam:
        await ctx.yield_output(f"Email marked as spam: {detection.reason}")
    else:
        # This indicates the routing predicate and executor contract are out of sync.
        raise RuntimeError("This executor should only handle spam messages.")


@executor(id="to_email_assistant_request")
async def to_email_assistant_request(
    response: AgentExecutorResponse, ctx: WorkflowContext[AgentExecutorRequest]
) -> None:
    """Transform spam detection response into a request for the email assistant."""
    # Parse the detection result and extract the email content for the assistant
    detection = DetectionResult.model_validate_json(response.agent_run_response.text)

    # Create a new request for the email assistant with the original email content
    request = AgentExecutorRequest(
        messages=[ChatMessage(Role.USER, text=detection.email_content)],
        should_respond=True
    )
    await ctx.send_message(request)

Étape 5 : Créer des agents IA

Configurez les agents Azure OpenAI avec une mise en forme de sortie structurée :

async def main() -> None:
    # Create agents
    # AzureCliCredential uses your current az login. This avoids embedding secrets in code.
    chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())

    # Agent 1. Classifies spam and returns a DetectionResult object.
    # response_format enforces that the LLM returns parsable JSON for the Pydantic model.
    spam_detection_agent = AgentExecutor(
        chat_client.create_agent(
            instructions=(
                "You are a spam detection assistant that identifies spam emails. "
                "Always return JSON with fields is_spam (bool), reason (string), and email_content (string). "
                "Include the original email content in email_content."
            ),
            response_format=DetectionResult,
        ),
        id="spam_detection_agent",
    )

    # Agent 2. Drafts a professional reply. Also uses structured JSON output for reliability.
    email_assistant_agent = AgentExecutor(
        chat_client.create_agent(
            instructions=(
                "You are an email assistant that helps users draft professional responses to emails. "
                "Your input might be a JSON object that includes 'email_content'; base your reply on that content. "
                "Return JSON with a single field 'response' containing the drafted reply."
            ),
            response_format=EmailResponse,
        ),
        id="email_assistant_agent",
    )

Étape 6 : Générer le flux de travail conditionnel

Créez un flux de travail avec des arêtes conditionnelles qui routent en fonction des résultats de détection de courrier indésirable :

    # Build the workflow graph.
    # Start at the spam detector.
    # If not spam, hop to a transformer that creates a new AgentExecutorRequest,
    # then call the email assistant, then finalize.
    # If spam, go directly to the spam handler and finalize.
    workflow = (
        WorkflowBuilder()
        .set_start_executor(spam_detection_agent)
        # Not spam path: transform response -> request for assistant -> assistant -> send email
        .add_edge(spam_detection_agent, to_email_assistant_request, condition=get_condition(False))
        .add_edge(to_email_assistant_request, email_assistant_agent)
        .add_edge(email_assistant_agent, handle_email_response)
        # Spam path: send to spam handler
        .add_edge(spam_detection_agent, handle_spam_classifier_response, condition=get_condition(True))
        .build()
    )

Étape 7 : Exécuter le flux de travail

Exécutez le flux de travail avec des exemples de contenu de messagerie :

    # Read Email content from the sample resource file.
    # This keeps the sample deterministic since the model sees the same email every run.
    email_path = os.path.join(os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "resources", "email.txt")

    with open(email_path) as email_file:  # noqa: ASYNC230
        email = email_file.read()

    # Execute the workflow. Since the start is an AgentExecutor, pass an AgentExecutorRequest.
    # The workflow completes when it becomes idle (no more work to do).
    request = AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=email)], should_respond=True)
    events = await workflow.run(request)
    outputs = events.get_outputs()
    if outputs:
        print(f"Workflow output: {outputs[0]}")


if __name__ == "__main__":
    asyncio.run(main())

Fonctionnement des arêtes conditionnelles

  1. Fonctions de condition : la get_condition() fonction crée un prédicat qui examine le contenu du message et retourne True ou False pour déterminer si le bord doit être parcouru.

  2. Inspection des messages : les conditions peuvent évaluer n’importe quel aspect du message, y compris les données structurées issues des réponses de l’agent, traitées avec des modèles Pydantic.

  3. Programmation défensive : la fonction condition inclut la gestion des erreurs pour empêcher les échecs de routage lors de l’analyse des données structurées.

  4. Routage dynamique : en fonction du résultat de la détection du courrier indésirable, les e-mails sont automatiquement acheminés vers l’Assistant e-mail (pour les e-mails légitimes) ou le gestionnaire de courrier indésirable (pour les e-mails suspects).

Concepts clés

  • Conditions de périphérie : prédicats booléens qui déterminent si un bord doit être parcouru
  • Sorties structurées : L’utilisation de modèles Pydantic garantit response_format l’analyse fiable des données
  • Routage défensif : les fonctions de condition gèrent les cas de périphérie pour empêcher les blocages du flux de travail
  • Transformation de message : les exécuteurs peuvent transformer les types de messages entre les étapes du flux de travail

Implémentation complète

Pour obtenir l’implémentation de travail complète, consultez l’exemple edge_condition.py dans le référentiel Agent Framework.

Arêtes de Switch-Case

Développement sur des bases conditionnelles

L’exemple précédent d’arêtes conditionnelles a montré un routage bidirectionnel (courrier indésirable et e-mails légitimes). Toutefois, de nombreux scénarios réels nécessitent des arbres de décision plus sophistiqués. Les arêtes switch-case offrent une solution plus propre et plus facile à gérer lorsque vous devez acheminer vers plusieurs destinations en fonction de différentes conditions.

Ce que vous allez créer avec Switch-Case

Vous allez étendre le flux de travail de traitement des e-mails pour gérer trois chemins de décision :

  • NotSpam → L’Assistant Courrier → Envoyer un e-mail
  • Courrier indésirable → gérer l’exécuteur de courrier indésirable
  • Incertain → Gérer l’exécuteur incertain (cas par défaut)

L’amélioration clé consiste à utiliser le SwitchBuilder modèle au lieu de plusieurs bords conditionnels individuels, ce qui facilite la compréhension et la maintenance du flux de travail à mesure que la complexité de la décision augmente.

Concepts abordés

Modèles de données pour Switch-Case

Mettez à jour vos modèles de données pour soutenir la classification tridirectionnelle.

/// <summary>
/// Represents the possible decisions for spam detection.
/// </summary>
public enum SpamDecision
{
    NotSpam,
    Spam,
    Uncertain
}

/// <summary>
/// Represents the result of spam detection with enhanced decision support.
/// </summary>
public sealed class DetectionResult
{
    [JsonPropertyName("spam_decision")]
    [JsonConverter(typeof(JsonStringEnumConverter))]
    public SpamDecision spamDecision { get; set; }

    [JsonPropertyName("reason")]
    public string Reason { get; set; } = string.Empty;

    // Email ID is generated by the executor, not the agent
    [JsonIgnore]
    public string EmailId { get; set; } = string.Empty;
}

/// <summary>
/// Represents an email stored in shared state.
/// </summary>
internal sealed class Email
{
    [JsonPropertyName("email_id")]
    public string EmailId { get; set; } = string.Empty;

    [JsonPropertyName("email_content")]
    public string EmailContent { get; set; } = string.Empty;
}

/// <summary>
/// Represents the response from the email assistant.
/// </summary>
public sealed class EmailResponse
{
    [JsonPropertyName("response")]
    public string Response { get; set; } = string.Empty;
}

/// <summary>
/// Constants for shared state scopes.
/// </summary>
internal static class EmailStateConstants
{
    public const string EmailStateScope = "EmailState";
}

Fabrique de conditions pour Switch-Case

Créez une fabrique de conditions réutilisables qui génère des prédicats pour chaque décision de courrier indésirable :

/// <summary>
/// Creates a condition for routing messages based on the expected spam detection result.
/// </summary>
/// <param name="expectedDecision">The expected spam detection decision</param>
/// <returns>A function that evaluates whether a message meets the expected result</returns>
private static Func<object?, bool> GetCondition(SpamDecision expectedDecision) =>
    detectionResult => detectionResult is DetectionResult result && result.spamDecision == expectedDecision;

Cette approche usinée :

  • Réduit la duplication de code : une fonction génère tous les prédicats de condition
  • Garantit la cohérence : toutes les conditions suivent le même modèle
  • Simplifie la maintenance : les modifications apportées à la logique de condition se produisent à un seul endroit

Agent IA amélioré

Mettez à jour l'agent de détection du spam pour qu'il soit moins sûr et afin de retourner des classifications tridirectionnelles.

/// <summary>
/// Creates a spam detection agent with enhanced uncertainty handling.
/// </summary>
/// <returns>A ChatClientAgent configured for three-way spam detection</returns>
private static ChatClientAgent GetSpamDetectionAgent(IChatClient chatClient) =>
    new(chatClient, new ChatClientAgentOptions(instructions: "You are a spam detection assistant that identifies spam emails. Be less confident in your assessments.")
    {
        ChatOptions = new()
        {
            ResponseFormat = ChatResponseFormat.ForJsonSchema<DetectionResult>()
        }
    });

/// <summary>
/// Creates an email assistant agent (unchanged from conditional edges example).
/// </summary>
/// <returns>A ChatClientAgent configured for email assistance</returns>
private static ChatClientAgent GetEmailAssistantAgent(IChatClient chatClient) =>
    new(chatClient, new ChatClientAgentOptions(instructions: "You are an email assistant that helps users draft responses to emails with professionalism.")
    {
        ChatOptions = new()
        {
            ResponseFormat = ChatResponseFormat.ForJsonSchema<EmailResponse>()
        }
    });

Exécuteurs de flux de travail avec routage amélioré

Implémentez des exécuteurs qui gèrent le routage tridirectionnel avec la gestion de l’état partagé :

/// <summary>
/// Executor that detects spam using an AI agent with three-way classification.
/// </summary>
internal sealed class SpamDetectionExecutor : Executor<ChatMessage, DetectionResult>
{
    private readonly AIAgent _spamDetectionAgent;

    public SpamDetectionExecutor(AIAgent spamDetectionAgent) : base("SpamDetectionExecutor")
    {
        this._spamDetectionAgent = spamDetectionAgent;
    }

    public override async ValueTask<DetectionResult> HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // Generate a random email ID and store the email content in shared state
        var newEmail = new Email
        {
            EmailId = Guid.NewGuid().ToString("N"),
            EmailContent = message.Text
        };
        await context.QueueStateUpdateAsync(newEmail.EmailId, newEmail, scopeName: EmailStateConstants.EmailStateScope);

        // Invoke the agent for enhanced spam detection
        var response = await this._spamDetectionAgent.RunAsync(message);
        var detectionResult = JsonSerializer.Deserialize<DetectionResult>(response.Text);

        detectionResult!.EmailId = newEmail.EmailId;
        return detectionResult;
    }
}

/// <summary>
/// Executor that assists with email responses using an AI agent.
/// </summary>
internal sealed class EmailAssistantExecutor : Executor<DetectionResult, EmailResponse>
{
    private readonly AIAgent _emailAssistantAgent;

    public EmailAssistantExecutor(AIAgent emailAssistantAgent) : base("EmailAssistantExecutor")
    {
        this._emailAssistantAgent = emailAssistantAgent;
    }

    public override async ValueTask<EmailResponse> HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        if (message.spamDecision == SpamDecision.Spam)
        {
            throw new ArgumentException("This executor should only handle non-spam messages.");
        }

        // Retrieve the email content from shared state
        var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);

        // Invoke the agent to draft a response
        var response = await this._emailAssistantAgent.RunAsync(email!.EmailContent);
        var emailResponse = JsonSerializer.Deserialize<EmailResponse>(response.Text);

        return emailResponse!;
    }
}

/// <summary>
/// Executor that sends emails.
/// </summary>
internal sealed class SendEmailExecutor : Executor<EmailResponse>
{
    public SendEmailExecutor() : base("SendEmailExecutor") { }

    public override async ValueTask HandleAsync(EmailResponse message, IWorkflowContext context, CancellationToken cancellationToken = default) =>
        await context.YieldOutputAsync($"Email sent: {message.Response}").ConfigureAwait(false);
}

/// <summary>
/// Executor that handles spam messages.
/// </summary>
internal sealed class HandleSpamExecutor : Executor<DetectionResult>
{
    public HandleSpamExecutor() : base("HandleSpamExecutor") { }

    public override async ValueTask HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        if (message.spamDecision == SpamDecision.Spam)
        {
            await context.YieldOutputAsync($"Email marked as spam: {message.Reason}").ConfigureAwait(false);
        }
        else
        {
            throw new ArgumentException("This executor should only handle spam messages.");
        }
    }
}

/// <summary>
/// Executor that handles uncertain emails requiring manual review.
/// </summary>
internal sealed class HandleUncertainExecutor : Executor<DetectionResult>
{
    public HandleUncertainExecutor() : base("HandleUncertainExecutor") { }

    public override async ValueTask HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        if (message.spamDecision == SpamDecision.Uncertain)
        {
            var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);
            await context.YieldOutputAsync($"Email marked as uncertain: {message.Reason}. Email content: {email?.EmailContent}");
        }
        else
        {
            throw new ArgumentException("This executor should only handle uncertain spam decisions.");
        }
    }
}

Construire un flux de processus avec le modèle Switch-Case

Remplacez plusieurs bords conditionnels par le modèle de casse de commutateur plus propre :

public static class Program
{
    private static async Task Main()
    {
        // Set up the Azure OpenAI client
        var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
        var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
        var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential()).GetChatClient(deploymentName).AsIChatClient();

        // Create agents
        AIAgent spamDetectionAgent = GetSpamDetectionAgent(chatClient);
        AIAgent emailAssistantAgent = GetEmailAssistantAgent(chatClient);

        // Create executors
        var spamDetectionExecutor = new SpamDetectionExecutor(spamDetectionAgent);
        var emailAssistantExecutor = new EmailAssistantExecutor(emailAssistantAgent);
        var sendEmailExecutor = new SendEmailExecutor();
        var handleSpamExecutor = new HandleSpamExecutor();
        var handleUncertainExecutor = new HandleUncertainExecutor();

        // Build the workflow using switch-case for cleaner three-way routing
        WorkflowBuilder builder = new(spamDetectionExecutor);
        builder.AddSwitch(spamDetectionExecutor, switchBuilder =>
            switchBuilder
            .AddCase(
                GetCondition(expectedDecision: SpamDecision.NotSpam),
                emailAssistantExecutor
            )
            .AddCase(
                GetCondition(expectedDecision: SpamDecision.Spam),
                handleSpamExecutor
            )
            .WithDefault(
                handleUncertainExecutor
            )
        )
        // After the email assistant writes a response, it will be sent to the send email executor
        .AddEdge(emailAssistantExecutor, sendEmailExecutor)
        .WithOutputFrom(handleSpamExecutor, sendEmailExecutor, handleUncertainExecutor);

        var workflow = builder.Build();

        // Read an email from a text file (use ambiguous content for demonstration)
        string email = Resources.Read("ambiguous_email.txt");

        // Execute the workflow
        StreamingRun run = await InProcessExecution.StreamAsync(workflow, new ChatMessage(ChatRole.User, email));
        await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
        await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
        {
            if (evt is WorkflowOutputEvent outputEvent)
            {
                Console.WriteLine($"{outputEvent}");
            }
        }
    }
}

avantages Switch-Case

  1. Syntaxe de nettoyage : La SwitchBuilder solution offre une alternative plus lisible à plusieurs bords conditionnels
  2. Évaluation ordonnée : les cas sont évalués séquentiellement, s'arrêtant au premier élément correspondant
  3. Routage garanti : la WithDefault() méthode garantit que les messages ne sont jamais bloqués
  4. Meilleure facilité de maintenance : l’ajout de nouveaux cas nécessite des modifications minimales dans la structure de flux de travail
  5. Sécurité du type : chaque exécuteur valide son entrée pour intercepter les erreurs de routage tôt

Comparaison des modèles

Avant (arêtes conditionnelles) :

var workflow = new WorkflowBuilder(spamDetectionExecutor)
    .AddEdge(spamDetectionExecutor, emailAssistantExecutor, condition: GetCondition(expectedResult: false))
    .AddEdge(spamDetectionExecutor, handleSpamExecutor, condition: GetCondition(expectedResult: true))
    // No clean way to handle a third case
    .WithOutputFrom(handleSpamExecutor, sendEmailExecutor)
    .Build();

Après (Switch-Case) :

WorkflowBuilder builder = new(spamDetectionExecutor);
builder.AddSwitch(spamDetectionExecutor, switchBuilder =>
    switchBuilder
    .AddCase(GetCondition(SpamDecision.NotSpam), emailAssistantExecutor)
    .AddCase(GetCondition(SpamDecision.Spam), handleSpamExecutor)
    .WithDefault(handleUncertainExecutor)  // Clean default case
)
// Continue building the rest of the workflow

Le modèle switch-case s’adapte beaucoup mieux à mesure que le nombre de décisions de routage augmente, et le cas par défaut fournit un filet de sécurité pour les valeurs inattendues.

Exécution de l’exemple

Lorsque vous exécutez ce flux de travail avec du contenu de courrier ambigu :

Email marked as uncertain: This email contains promotional language but might be from a legitimate business contact, requiring human review for proper classification.

Essayez de modifier le contenu de l’e-mail en quelque chose clairement spam ou clairement légitime pour voir les différents chemins de routage en action.

Implémentation complète

Pour obtenir l’implémentation de travail complète, consultez cet exemple dans le référentiel Agent Framework.

Développement sur des bases conditionnelles

L’exemple précédent d’arêtes conditionnelles a montré un routage bidirectionnel (courrier indésirable et e-mails légitimes). Toutefois, de nombreux scénarios réels nécessitent des arbres de décision plus sophistiqués. Les arêtes switch-case offrent une solution plus propre et plus facile à gérer lorsque vous devez acheminer vers plusieurs destinations en fonction de différentes conditions.

Ce que vous allez construire ensuite

Vous allez étendre le flux de travail de traitement des e-mails pour gérer trois chemins de décision :

  • NotSpam → L’Assistant Courrier → Envoyer un e-mail
  • Courrier indésirable → marquer comme courrier indésirable
  • Indicateur de → incertain pour la révision manuelle (cas par défaut)

L’amélioration clé consiste à utiliser un seul groupe de périphérie switch-case au lieu de plusieurs bords conditionnels individuels, ce qui facilite la compréhension et la maintenance du flux de travail à mesure que la complexité de la décision augmente.

Concepts abordés

Modèles de données améliorés

Mettez à jour vos modèles de données pour soutenir la classification tridirectionnelle.

from typing import Literal

class DetectionResultAgent(BaseModel):
    """Structured output returned by the spam detection agent."""

    # The agent classifies the email into one of three categories
    spam_decision: Literal["NotSpam", "Spam", "Uncertain"]
    reason: str

class EmailResponse(BaseModel):
    """Structured output returned by the email assistant agent."""

    response: str

@dataclass
class DetectionResult:
    """Internal typed payload used for routing and downstream handling."""

    spam_decision: str
    reason: str
    email_id: str

@dataclass
class Email:
    """In memory record of the email content stored in shared state."""

    email_id: str
    email_content: str

Usine de conditions Switch-Case

Créez une fabrique de conditions réutilisables qui génère des prédicats pour chaque décision de courrier indésirable :

def get_case(expected_decision: str):
    """Factory that returns a predicate matching a specific spam_decision value."""

    def condition(message: Any) -> bool:
        # Only match when the upstream payload is a DetectionResult with the expected decision
        return isinstance(message, DetectionResult) and message.spam_decision == expected_decision

    return condition

Cette approche usinée :

  • Réduit la duplication de code : une fonction génère tous les prédicats de condition
  • Garantit la cohérence : toutes les conditions suivent le même modèle
  • Simplifie la maintenance : les modifications apportées à la logique de condition se produisent à un seul endroit

Exécuteurs de flux de travail avec état partagé

Implémentez des exécuteurs qui utilisent l’état partagé pour éviter de transmettre du contenu de courrier volumineux à chaque étape du flux de travail :

EMAIL_STATE_PREFIX = "email:"
CURRENT_EMAIL_ID_KEY = "current_email_id"

@executor(id="store_email")
async def store_email(email_text: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
    """Store email content once and pass around a lightweight ID reference."""

    # Persist the raw email content in shared state
    new_email = Email(email_id=str(uuid4()), email_content=email_text)
    await ctx.set_shared_state(f"{EMAIL_STATE_PREFIX}{new_email.email_id}", new_email)
    await ctx.set_shared_state(CURRENT_EMAIL_ID_KEY, new_email.email_id)

    # Forward email to spam detection agent
    await ctx.send_message(
        AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=new_email.email_content)], should_respond=True)
    )

@executor(id="to_detection_result")
async def to_detection_result(response: AgentExecutorResponse, ctx: WorkflowContext[DetectionResult]) -> None:
    """Transform agent response into a typed DetectionResult with email ID."""

    # Parse the agent's structured JSON output
    parsed = DetectionResultAgent.model_validate_json(response.agent_run_response.text)
    email_id: str = await ctx.get_shared_state(CURRENT_EMAIL_ID_KEY)

    # Create typed message for switch-case routing
    await ctx.send_message(DetectionResult(
        spam_decision=parsed.spam_decision,
        reason=parsed.reason,
        email_id=email_id
    ))

@executor(id="submit_to_email_assistant")
async def submit_to_email_assistant(detection: DetectionResult, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
    """Handle NotSpam emails by forwarding to the email assistant."""

    # Guard against misrouting
    if detection.spam_decision != "NotSpam":
        raise RuntimeError("This executor should only handle NotSpam messages.")

    # Retrieve original email content from shared state
    email: Email = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{detection.email_id}")
    await ctx.send_message(
        AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=email.email_content)], should_respond=True)
    )

@executor(id="finalize_and_send")
async def finalize_and_send(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
    """Parse email assistant response and yield final output."""

    parsed = EmailResponse.model_validate_json(response.agent_run_response.text)
    await ctx.yield_output(f"Email sent: {parsed.response}")

@executor(id="handle_spam")
async def handle_spam(detection: DetectionResult, ctx: WorkflowContext[Never, str]) -> None:
    """Handle confirmed spam emails."""

    if detection.spam_decision == "Spam":
        await ctx.yield_output(f"Email marked as spam: {detection.reason}")
    else:
        raise RuntimeError("This executor should only handle Spam messages.")

@executor(id="handle_uncertain")
async def handle_uncertain(detection: DetectionResult, ctx: WorkflowContext[Never, str]) -> None:
    """Handle uncertain classifications that need manual review."""

    if detection.spam_decision == "Uncertain":
        # Include original content for human review
        email: Email | None = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{detection.email_id}")
        await ctx.yield_output(
            f"Email marked as uncertain: {detection.reason}. Email content: {getattr(email, 'email_content', '')}"
        )
    else:
        raise RuntimeError("This executor should only handle Uncertain messages.")

Créer un agent IA amélioré

Mettez à jour l'agent de détection du spam pour qu'il soit moins sûr et afin de retourner des classifications tridirectionnelles.

async def main():
    chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())

    # Enhanced spam detection agent with three-way classification
    spam_detection_agent = AgentExecutor(
        chat_client.create_agent(
            instructions=(
                "You are a spam detection assistant that identifies spam emails. "
                "Be less confident in your assessments. "
                "Always return JSON with fields 'spam_decision' (one of NotSpam, Spam, Uncertain) "
                "and 'reason' (string)."
            ),
            response_format=DetectionResultAgent,
        ),
        id="spam_detection_agent",
    )

    # Email assistant remains the same
    email_assistant_agent = AgentExecutor(
        chat_client.create_agent(
            instructions=(
                "You are an email assistant that helps users draft responses to emails with professionalism."
            ),
            response_format=EmailResponse,
        ),
        id="email_assistant_agent",
    )

Générer un flux de travail avec Switch-Case groupe Edge

remplacez plusieurs arêtes conditionnelles par un seul groupe switch-case :

    # Build workflow using switch-case for cleaner three-way routing
    workflow = (
        WorkflowBuilder()
        .set_start_executor(store_email)
        .add_edge(store_email, spam_detection_agent)
        .add_edge(spam_detection_agent, to_detection_result)
        .add_switch_case_edge_group(
            to_detection_result,
            [
                # Explicit cases for specific decisions
                Case(condition=get_case("NotSpam"), target=submit_to_email_assistant),
                Case(condition=get_case("Spam"), target=handle_spam),
                # Default case catches anything that doesn't match above
                Default(target=handle_uncertain),
            ],
        )
        .add_edge(submit_to_email_assistant, email_assistant_agent)
        .add_edge(email_assistant_agent, finalize_and_send)
        .build()
    )

Exécuter et tester

Exécutez le flux de travail avec du contenu de messagerie ambiguë qui illustre le routage tridirectionnel :

    # Use ambiguous email content that might trigger uncertain classification
    email = (
        "Hey there, I noticed you might be interested in our latest offer—no pressure, but it expires soon. "
        "Let me know if you'd like more details."
    )

    # Execute and display results
    events = await workflow.run(email)
    outputs = events.get_outputs()
    if outputs:
        for output in outputs:
            print(f"Workflow output: {output}")

Avantages clés des arêtes Switch-Case

  1. Syntaxe de nettoyage : un groupe de bords au lieu de plusieurs bords conditionnels
  2. Évaluation ordonnée : les cas sont évalués séquentiellement, s'arrêtant au premier élément correspondant
  3. Routage garanti : le cas par défaut garantit que les messages ne sont jamais bloqués
  4. Meilleure maintenance : l’ajout de nouveaux cas nécessite des modifications minimales
  5. Sécurité du type : chaque exécuteur valide son entrée pour intercepter les erreurs de routage

Comparaison : conditionnel et Switch-Case

Avant (arêtes conditionnelles) :

.add_edge(detector, handler_a, condition=lambda x: x.result == "A")
.add_edge(detector, handler_b, condition=lambda x: x.result == "B")
.add_edge(detector, handler_c, condition=lambda x: x.result == "C")

Après (Switch-Case) :

.add_switch_case_edge_group(
    detector,
    [
        Case(condition=lambda x: x.result == "A", target=handler_a),
        Case(condition=lambda x: x.result == "B", target=handler_b),
        Default(target=handler_c),  # Catches everything else
    ],
)

Le modèle switch-case s’adapte beaucoup mieux à mesure que le nombre de décisions de routage augmente, et le cas par défaut fournit un filet de sécurité pour les valeurs inattendues.

Switch-Case exemple de code

Pour obtenir l’implémentation de travail complète, consultez l’exemple de switch_case_edge_group.py dans le référentiel Agent Framework.

Arêtes de sélection multiple

Au-delà du commutateur : routage à sélection multiple

Les branches switch-case acheminent les messages vers exactement une seule destination, mais les flux de travail réels doivent souvent déclencher plusieurs opérations parallèles en fonction des propriétés des données. Les bords partitionnés (implémentés en tant que bords de ventilateur avec des partitionneurs) permettent des modèles sophistiqués de fan-out où un seul message peut activer simultanément plusieurs exécuteurs en aval.

Flux de travail avancé de traitement des e-mails

En se basant sur l’exemple de cas de commutateur, vous allez créer un système de traitement de courrier amélioré qui illustre une logique de routage sophistiquée :

  • Courrier indésirable → gestionnaire de courrier indésirable unique (par exemple, switch-case)
  • E-mails légitimes → Déclencher toujours l'assistant e-mail + Déclencher conditionnellement le résumeur pour les e-mails longs
  • E-mails incertains → gestionnaire incertain unique (par exemple, switch-case)
  • Persistance de base de données → déclenchée pour les e-mails courts et les e-mails longs résumés

Ce modèle permet des pipelines de traitement parallèles qui s’adaptent aux caractéristiques de contenu.

Concepts abordés

Modèles de données pour la sélection multiple

Étendez les modèles de données pour prendre en charge l’analyse de longueur des e-mails et la synthèse :

/// <summary>
/// Represents the result of enhanced email analysis with additional metadata.
/// </summary>
public sealed class AnalysisResult
{
    [JsonPropertyName("spam_decision")]
    [JsonConverter(typeof(JsonStringEnumConverter))]
    public SpamDecision spamDecision { get; set; }

    [JsonPropertyName("reason")]
    public string Reason { get; set; } = string.Empty;

    // Additional properties for sophisticated routing
    [JsonIgnore]
    public int EmailLength { get; set; }

    [JsonIgnore]
    public string EmailSummary { get; set; } = string.Empty;

    [JsonIgnore]
    public string EmailId { get; set; } = string.Empty;
}

/// <summary>
/// Represents the response from the email assistant.
/// </summary>
public sealed class EmailResponse
{
    [JsonPropertyName("response")]
    public string Response { get; set; } = string.Empty;
}

/// <summary>
/// Represents the response from the email summary agent.
/// </summary>
public sealed class EmailSummary
{
    [JsonPropertyName("summary")]
    public string Summary { get; set; } = string.Empty;
}

/// <summary>
/// A custom workflow event for database operations.
/// </summary>
internal sealed class DatabaseEvent(string message) : WorkflowEvent(message) { }

/// <summary>
/// Constants for email processing thresholds.
/// </summary>
public static class EmailProcessingConstants
{
    public const int LongEmailThreshold = 100;
}

Fonction d’assigneur cible : cœur de la sélection multiple

La fonction d’assigneur cible détermine les exécuteurs qui doivent recevoir chaque message :

/// <summary>
/// Creates a target assigner for routing messages based on the analysis result.
/// </summary>
/// <returns>A function that takes an analysis result and returns the target partitions.</returns>
private static Func<AnalysisResult?, int, IEnumerable<int>> GetTargetAssigner()
{
    return (analysisResult, targetCount) =>
    {
        if (analysisResult is not null)
        {
            if (analysisResult.spamDecision == SpamDecision.Spam)
            {
                return [0]; // Route only to spam handler (index 0)
            }
            else if (analysisResult.spamDecision == SpamDecision.NotSpam)
            {
                // Always route to email assistant (index 1)
                List<int> targets = [1];

                // Conditionally add summarizer for long emails (index 2)
                if (analysisResult.EmailLength > EmailProcessingConstants.LongEmailThreshold)
                {
                    targets.Add(2);
                }

                return targets;
            }
            else // Uncertain
            {
                return [3]; // Route only to uncertain handler (index 3)
            }
        }
        throw new ArgumentException("Invalid analysis result.");
    };
}

Fonctionnalités clés de la fonction d’assigneur cible

  1. Sélection cible dynamique : retourne une liste d’index d’exécuteur à activer
  2. Routage prenant en charge le contenu : prend des décisions en fonction des propriétés de message telles que la longueur du courrier électronique
  3. Traitement parallèle : plusieurs cibles peuvent s’exécuter simultanément
  4. Logique conditionnelle : branchement complexe basé sur plusieurs critères

Exécuteurs de flux de travail améliorés

Implémentez des exécuteurs qui gèrent l’analyse et le routage avancés :

/// <summary>
/// Executor that analyzes emails using an AI agent with enhanced analysis.
/// </summary>
internal sealed class EmailAnalysisExecutor : Executor<ChatMessage, AnalysisResult>
{
    private readonly AIAgent _emailAnalysisAgent;

    public EmailAnalysisExecutor(AIAgent emailAnalysisAgent) : base("EmailAnalysisExecutor")
    {
        this._emailAnalysisAgent = emailAnalysisAgent;
    }

    public override async ValueTask<AnalysisResult> HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // Generate a random email ID and store the email content
        var newEmail = new Email
        {
            EmailId = Guid.NewGuid().ToString("N"),
            EmailContent = message.Text
        };
        await context.QueueStateUpdateAsync(newEmail.EmailId, newEmail, scopeName: EmailStateConstants.EmailStateScope);

        // Invoke the agent for enhanced analysis
        var response = await this._emailAnalysisAgent.RunAsync(message);
        var analysisResult = JsonSerializer.Deserialize<AnalysisResult>(response.Text);

        // Enrich with metadata for routing decisions
        analysisResult!.EmailId = newEmail.EmailId;
        analysisResult.EmailLength = newEmail.EmailContent.Length;

        return analysisResult;
    }
}

/// <summary>
/// Executor that assists with email responses using an AI agent.
/// </summary>
internal sealed class EmailAssistantExecutor : Executor<AnalysisResult, EmailResponse>
{
    private readonly AIAgent _emailAssistantAgent;

    public EmailAssistantExecutor(AIAgent emailAssistantAgent) : base("EmailAssistantExecutor")
    {
        this._emailAssistantAgent = emailAssistantAgent;
    }

    public override async ValueTask<EmailResponse> HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        if (message.spamDecision == SpamDecision.Spam)
        {
            throw new ArgumentException("This executor should only handle non-spam messages.");
        }

        // Retrieve the email content from shared state
        var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);

        // Invoke the agent to draft a response
        var response = await this._emailAssistantAgent.RunAsync(email!.EmailContent);
        var emailResponse = JsonSerializer.Deserialize<EmailResponse>(response.Text);

        return emailResponse!;
    }
}

/// <summary>
/// Executor that summarizes emails using an AI agent for long emails.
/// </summary>
internal sealed class EmailSummaryExecutor : Executor<AnalysisResult, AnalysisResult>
{
    private readonly AIAgent _emailSummaryAgent;

    public EmailSummaryExecutor(AIAgent emailSummaryAgent) : base("EmailSummaryExecutor")
    {
        this._emailSummaryAgent = emailSummaryAgent;
    }

    public override async ValueTask<AnalysisResult> HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // Read the email content from shared state
        var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);

        // Generate summary for long emails
        var response = await this._emailSummaryAgent.RunAsync(email!.EmailContent);
        var emailSummary = JsonSerializer.Deserialize<EmailSummary>(response.Text);

        // Enrich the analysis result with the summary
        message.EmailSummary = emailSummary!.Summary;

        return message;
    }
}

/// <summary>
/// Executor that sends emails.
/// </summary>
internal sealed class SendEmailExecutor : Executor<EmailResponse>
{
    public SendEmailExecutor() : base("SendEmailExecutor") { }

    public override async ValueTask HandleAsync(EmailResponse message, IWorkflowContext context, CancellationToken cancellationToken = default) =>
        await context.YieldOutputAsync($"Email sent: {message.Response}");
}

/// <summary>
/// Executor that handles spam messages.
/// </summary>
internal sealed class HandleSpamExecutor : Executor<AnalysisResult>
{
    public HandleSpamExecutor() : base("HandleSpamExecutor") { }

    public override async ValueTask HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        if (message.spamDecision == SpamDecision.Spam)
        {
            await context.YieldOutputAsync($"Email marked as spam: {message.Reason}");
        }
        else
        {
            throw new ArgumentException("This executor should only handle spam messages.");
        }
    }
}

/// <summary>
/// Executor that handles uncertain messages requiring manual review.
/// </summary>
internal sealed class HandleUncertainExecutor : Executor<AnalysisResult>
{
    public HandleUncertainExecutor() : base("HandleUncertainExecutor") { }

    public override async ValueTask HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        if (message.spamDecision == SpamDecision.Uncertain)
        {
            var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);
            await context.YieldOutputAsync($"Email marked as uncertain: {message.Reason}. Email content: {email?.EmailContent}");
        }
        else
        {
            throw new ArgumentException("This executor should only handle uncertain spam decisions.");
        }
    }
}

/// <summary>
/// Executor that handles database access with custom events.
/// </summary>
internal sealed class DatabaseAccessExecutor : Executor<AnalysisResult>
{
    public DatabaseAccessExecutor() : base("DatabaseAccessExecutor") { }

    public override async ValueTask HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // Simulate database operations
        await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);
        await Task.Delay(100); // Simulate database access delay

        // Emit custom database event for monitoring
        await context.AddEventAsync(new DatabaseEvent($"Email {message.EmailId} saved to database."));
    }
}

Agents IA améliorés

Créez des agents pour l’analyse, l’assistance et la synthèse :

/// <summary>
/// Create an enhanced email analysis agent.
/// </summary>
/// <returns>A ChatClientAgent configured for comprehensive email analysis</returns>
private static ChatClientAgent GetEmailAnalysisAgent(IChatClient chatClient) =>
    new(chatClient, new ChatClientAgentOptions(instructions: "You are a spam detection assistant that identifies spam emails.")
    {
        ChatOptions = new()
        {
            ResponseFormat = ChatResponseFormat.ForJsonSchema<AnalysisResult>()
        }
    });

/// <summary>
/// Creates an email assistant agent.
/// </summary>
/// <returns>A ChatClientAgent configured for email assistance</returns>
private static ChatClientAgent GetEmailAssistantAgent(IChatClient chatClient) =>
    new(chatClient, new ChatClientAgentOptions(instructions: "You are an email assistant that helps users draft responses to emails with professionalism.")
    {
        ChatOptions = new()
        {
            ResponseFormat = ChatResponseFormat.ForJsonSchema<EmailResponse>()
        }
    });

/// <summary>
/// Creates an agent that summarizes emails.
/// </summary>
/// <returns>A ChatClientAgent configured for email summarization</returns>
private static ChatClientAgent GetEmailSummaryAgent(IChatClient chatClient) =>
    new(chatClient, new ChatClientAgentOptions(instructions: "You are an assistant that helps users summarize emails.")
    {
        ChatOptions = new()
        {
            ResponseFormat = ChatResponseFormat.ForJsonSchema<EmailSummary>()
        }
    });

Construction de flux de travail à sélection multiple

Construisez le flux de travail avec un routage sophistiqué et un traitement parallèle :

public static class Program
{
    private static async Task Main()
    {
        // Set up the Azure OpenAI client
        var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
        var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
        var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential()).GetChatClient(deploymentName).AsIChatClient();

        // Create agents
        AIAgent emailAnalysisAgent = GetEmailAnalysisAgent(chatClient);
        AIAgent emailAssistantAgent = GetEmailAssistantAgent(chatClient);
        AIAgent emailSummaryAgent = GetEmailSummaryAgent(chatClient);

        // Create executors
        var emailAnalysisExecutor = new EmailAnalysisExecutor(emailAnalysisAgent);
        var emailAssistantExecutor = new EmailAssistantExecutor(emailAssistantAgent);
        var emailSummaryExecutor = new EmailSummaryExecutor(emailSummaryAgent);
        var sendEmailExecutor = new SendEmailExecutor();
        var handleSpamExecutor = new HandleSpamExecutor();
        var handleUncertainExecutor = new HandleUncertainExecutor();
        var databaseAccessExecutor = new DatabaseAccessExecutor();

        // Build the workflow with multi-selection fan-out
        WorkflowBuilder builder = new(emailAnalysisExecutor);
        builder.AddFanOutEdge(
            emailAnalysisExecutor,
            targets: [
                handleSpamExecutor,        // Index 0: Spam handler
                emailAssistantExecutor,    // Index 1: Email assistant (always for NotSpam)
                emailSummaryExecutor,      // Index 2: Summarizer (conditionally for long NotSpam)
                handleUncertainExecutor,   // Index 3: Uncertain handler
            ],
            targetSelector: GetTargetAssigner()
        )
        // Email assistant branch
        .AddEdge(emailAssistantExecutor, sendEmailExecutor)

        // Database persistence: conditional routing
        .AddEdge<AnalysisResult>(
            emailAnalysisExecutor,
            databaseAccessExecutor,
            condition: analysisResult => analysisResult?.EmailLength <= EmailProcessingConstants.LongEmailThreshold) // Short emails
        .AddEdge(emailSummaryExecutor, databaseAccessExecutor) // Long emails with summary

        .WithOutputFrom(handleUncertainExecutor, handleSpamExecutor, sendEmailExecutor);

        var workflow = builder.Build();

        // Read a moderately long email to trigger both assistant and summarizer
        string email = Resources.Read("email.txt");

        // Execute the workflow with custom event handling
        StreamingRun run = await InProcessExecution.StreamAsync(workflow, new ChatMessage(ChatRole.User, email));
        await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
        await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
        {
            if (evt is WorkflowOutputEvent outputEvent)
            {
                Console.WriteLine($"Output: {outputEvent}");
            }

            if (evt is DatabaseEvent databaseEvent)
            {
                Console.WriteLine($"Database: {databaseEvent}");
            }
        }
    }
}

Comparaison des modèles : Sélection Multiple et Switch-Case

Modèle Switch-Case (précédent) :

// One input → exactly one output
builder.AddSwitch(spamDetectionExecutor, switchBuilder =>
    switchBuilder
    .AddCase(GetCondition(SpamDecision.NotSpam), emailAssistantExecutor)
    .AddCase(GetCondition(SpamDecision.Spam), handleSpamExecutor)
    .WithDefault(handleUncertainExecutor)
)

Modèle de sélection multiple :

// One input → one or more outputs (dynamic fan-out)
builder.AddFanOutEdge(
    emailAnalysisExecutor,
    targets: [handleSpamExecutor, emailAssistantExecutor, emailSummaryExecutor, handleUncertainExecutor],
    targetSelector: GetTargetAssigner() // Returns list of target indices
)

Avantages clés des arêtes de sélection multiple

  1. Traitement parallèle : plusieurs branches peuvent s’exécuter simultanément
  2. Ventilateur conditionnel : le nombre de cibles varie en fonction du contenu
  3. Routage prenant en charge le contenu : décisions basées sur les propriétés du message, pas seulement le type
  4. Utilisation efficace des ressources : seules les branches nécessaires sont activées
  5. Logique métier complexe : prend en charge des scénarios de routage sophistiqués

Exécution de l’exemple de sélection multiple

Lorsque vous exécutez ce flux de travail avec un e-mail long :

Output: Email sent: [Professional response generated by AI]
Database: Email abc123 saved to database.

Lorsque vous exécutez un e-mail court, le récapitulateur est ignoré :

Output: Email sent: [Professional response generated by AI]
Database: Email def456 saved to database.

Real-World Cas d’usage

  • Systèmes de messagerie : acheminer vers l'assistant de réponse ; archivage ; analytique (sous condition)
  • Traitement du contenu : déclencher la transcription + traduction + analyse (en fonction du type de contenu)
  • Traitement des commandes : itinéraire vers le traitement + facturation + notifications (en fonction des propriétés de commande)
  • Pipelines de données : déclencher différents flux d’analyse en fonction des caractéristiques des données

Implémentation complète de la sélection multiple

Pour obtenir l’implémentation de travail complète, consultez cet exemple dans le référentiel Agent Framework.

Au-delà du commutateur : routage à sélection multiple

Les branches switch-case acheminent les messages vers exactement une seule destination, mais les flux de travail réels doivent souvent déclencher plusieurs opérations parallèles en fonction des propriétés des données. Les arêtes partitionnées (implémentées en tant que groupes d'arêtes à sélection multiple) permettent des modèles de fan-out sophistiqués où un seul message peut activer simultanément plusieurs exécuteurs en aval.

Flux de travail avancé de traitement des e-mails

En se basant sur l’exemple de cas de commutateur, vous allez créer un système de traitement de courrier amélioré qui illustre une logique de routage sophistiquée :

  • Courrier indésirable → gestionnaire de courrier indésirable unique (par exemple, switch-case)
  • E-mails légitimes → Déclencher toujours l'assistant e-mail + Déclencher conditionnellement le résumeur pour les e-mails longs
  • E-mails incertains → gestionnaire incertain unique (par exemple, switch-case)
  • Persistance de base de données → déclenchée pour les e-mails courts et les e-mails longs résumés

Ce modèle permet des pipelines de traitement parallèles qui s’adaptent aux caractéristiques de contenu.

Concepts abordés

Modèles de données améliorés pour la sélection multiple

Étendez les modèles de données pour prendre en charge l’analyse de longueur des e-mails et la synthèse :

class AnalysisResultAgent(BaseModel):
    """Enhanced structured output from email analysis agent."""

    spam_decision: Literal["NotSpam", "Spam", "Uncertain"]
    reason: str

class EmailResponse(BaseModel):
    """Response from email assistant."""

    response: str

class EmailSummaryModel(BaseModel):
    """Summary generated by email summary agent."""

    summary: str

@dataclass
class AnalysisResult:
    """Internal analysis result with email metadata for routing decisions."""

    spam_decision: str
    reason: str
    email_length: int  # Used for conditional routing
    email_summary: str  # Populated by summary agent
    email_id: str

@dataclass
class Email:
    """Email content stored in shared state."""

    email_id: str
    email_content: str

# Custom event for database operations
class DatabaseEvent(WorkflowEvent):
    """Custom event for tracking database operations."""
    pass

Fonction de sélection : Cœur de la sélection multiple

La fonction de sélection détermine quels exécuteurs doivent recevoir chaque message :

LONG_EMAIL_THRESHOLD = 100

def select_targets(analysis: AnalysisResult, target_ids: list[str]) -> list[str]:
    """Intelligent routing based on spam decision and email characteristics."""

    # Target order: [handle_spam, submit_to_email_assistant, summarize_email, handle_uncertain]
    handle_spam_id, submit_to_email_assistant_id, summarize_email_id, handle_uncertain_id = target_ids

    if analysis.spam_decision == "Spam":
        # Route only to spam handler
        return [handle_spam_id]

    elif analysis.spam_decision == "NotSpam":
        # Always route to email assistant
        targets = [submit_to_email_assistant_id]

        # Conditionally add summarizer for long emails
        if analysis.email_length > LONG_EMAIL_THRESHOLD:
            targets.append(summarize_email_id)

        return targets

    else:  # Uncertain
        # Route only to uncertain handler
        return [handle_uncertain_id]

Fonctionnalités clés des fonctions de sélection

  1. Sélection cible dynamique : retourne une liste d’ID d’exécuteur à activer
  2. Routage prenant en charge le contenu : prend des décisions basées sur les propriétés du message
  3. Traitement parallèle : plusieurs cibles peuvent s’exécuter simultanément
  4. Logique conditionnelle : branchement complexe basé sur plusieurs critères

Exécuteurs de flux de travail à sélection multiple

Implémentez des exécuteurs qui gèrent l’analyse et le routage améliorés :

EMAIL_STATE_PREFIX = "email:"
CURRENT_EMAIL_ID_KEY = "current_email_id"

@executor(id="store_email")
async def store_email(email_text: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
    """Store email and initiate analysis."""

    new_email = Email(email_id=str(uuid4()), email_content=email_text)
    await ctx.set_shared_state(f"{EMAIL_STATE_PREFIX}{new_email.email_id}", new_email)
    await ctx.set_shared_state(CURRENT_EMAIL_ID_KEY, new_email.email_id)

    await ctx.send_message(
        AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=new_email.email_content)], should_respond=True)
    )

@executor(id="to_analysis_result")
async def to_analysis_result(response: AgentExecutorResponse, ctx: WorkflowContext[AnalysisResult]) -> None:
    """Transform agent response into enriched analysis result."""

    parsed = AnalysisResultAgent.model_validate_json(response.agent_run_response.text)
    email_id: str = await ctx.get_shared_state(CURRENT_EMAIL_ID_KEY)
    email: Email = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{email_id}")

    # Create enriched analysis result with email length for routing decisions
    await ctx.send_message(
        AnalysisResult(
            spam_decision=parsed.spam_decision,
            reason=parsed.reason,
            email_length=len(email.email_content),  # Key for conditional routing
            email_summary="",
            email_id=email_id,
        )
    )

@executor(id="submit_to_email_assistant")
async def submit_to_email_assistant(analysis: AnalysisResult, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
    """Handle legitimate emails by forwarding to email assistant."""

    if analysis.spam_decision != "NotSpam":
        raise RuntimeError("This executor should only handle NotSpam messages.")

    email: Email = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{analysis.email_id}")
    await ctx.send_message(
        AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=email.email_content)], should_respond=True)
    )

@executor(id="finalize_and_send")
async def finalize_and_send(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
    """Final step for email assistant branch."""

    parsed = EmailResponse.model_validate_json(response.agent_run_response.text)
    await ctx.yield_output(f"Email sent: {parsed.response}")

@executor(id="summarize_email")
async def summarize_email(analysis: AnalysisResult, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
    """Generate summary for long emails (parallel branch)."""

    # Only called for long NotSpam emails by selection function
    email: Email = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{analysis.email_id}")
    await ctx.send_message(
        AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=email.email_content)], should_respond=True)
    )

@executor(id="merge_summary")
async def merge_summary(response: AgentExecutorResponse, ctx: WorkflowContext[AnalysisResult]) -> None:
    """Merge summary back into analysis result for database persistence."""

    summary = EmailSummaryModel.model_validate_json(response.agent_run_response.text)
    email_id: str = await ctx.get_shared_state(CURRENT_EMAIL_ID_KEY)
    email: Email = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{email_id}")

    # Create analysis result with summary for database storage
    await ctx.send_message(
        AnalysisResult(
            spam_decision="NotSpam",
            reason="",
            email_length=len(email.email_content),
            email_summary=summary.summary,  # Now includes summary
            email_id=email_id,
        )
    )

@executor(id="handle_spam")
async def handle_spam(analysis: AnalysisResult, ctx: WorkflowContext[Never, str]) -> None:
    """Handle spam emails (single target like switch-case)."""

    if analysis.spam_decision == "Spam":
        await ctx.yield_output(f"Email marked as spam: {analysis.reason}")
    else:
        raise RuntimeError("This executor should only handle Spam messages.")

@executor(id="handle_uncertain")
async def handle_uncertain(analysis: AnalysisResult, ctx: WorkflowContext[Never, str]) -> None:
    """Handle uncertain emails (single target like switch-case)."""

    if analysis.spam_decision == "Uncertain":
        email: Email | None = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{analysis.email_id}")
        await ctx.yield_output(
            f"Email marked as uncertain: {analysis.reason}. Email content: {getattr(email, 'email_content', '')}"
        )
    else:
        raise RuntimeError("This executor should only handle Uncertain messages.")

@executor(id="database_access")
async def database_access(analysis: AnalysisResult, ctx: WorkflowContext[Never, str]) -> None:
    """Simulate database persistence with custom events."""

    await asyncio.sleep(0.05)  # Simulate DB operation
    await ctx.add_event(DatabaseEvent(f"Email {analysis.email_id} saved to database."))

Agents IA améliorés

Créez des agents pour l’analyse, l’assistance et la synthèse :

async def main() -> None:
    chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())

    # Enhanced analysis agent
    email_analysis_agent = AgentExecutor(
        chat_client.create_agent(
            instructions=(
                "You are a spam detection assistant that identifies spam emails. "
                "Always return JSON with fields 'spam_decision' (one of NotSpam, Spam, Uncertain) "
                "and 'reason' (string)."
            ),
            response_format=AnalysisResultAgent,
        ),
        id="email_analysis_agent",
    )

    # Email assistant (same as before)
    email_assistant_agent = AgentExecutor(
        chat_client.create_agent(
            instructions=(
                "You are an email assistant that helps users draft responses to emails with professionalism."
            ),
            response_format=EmailResponse,
        ),
        id="email_assistant_agent",
    )

    # New: Email summary agent for long emails
    email_summary_agent = AgentExecutor(
        chat_client.create_agent(
            instructions="You are an assistant that helps users summarize emails.",
            response_format=EmailSummaryModel,
        ),
        id="email_summary_agent",
    )

Générer un flux de travail de sélection multiple

Construisez le flux de travail avec un routage sophistiqué et un traitement parallèle :

    workflow = (
        WorkflowBuilder()
        .set_start_executor(store_email)
        .add_edge(store_email, email_analysis_agent)
        .add_edge(email_analysis_agent, to_analysis_result)

        # Multi-selection edge group: intelligent fan-out based on content
        .add_multi_selection_edge_group(
            to_analysis_result,
            [handle_spam, submit_to_email_assistant, summarize_email, handle_uncertain],
            selection_func=select_targets,
        )

        # Email assistant branch (always for NotSpam)
        .add_edge(submit_to_email_assistant, email_assistant_agent)
        .add_edge(email_assistant_agent, finalize_and_send)

        # Summary branch (only for long NotSpam emails)
        .add_edge(summarize_email, email_summary_agent)
        .add_edge(email_summary_agent, merge_summary)

        # Database persistence: conditional routing
        .add_edge(to_analysis_result, database_access,
                 condition=lambda r: r.email_length <= LONG_EMAIL_THRESHOLD)  # Short emails
        .add_edge(merge_summary, database_access)  # Long emails with summary

        .build()
    )

Exécution avec la diffusion en continu d’événements

Exécutez le flux de travail et observez l’exécution parallèle par le biais d’événements personnalisés :

    # Use a moderately long email to trigger both assistant and summarizer
    email = """
    Hello team, here are the updates for this week:

    1. Project Alpha is on track and we should have the first milestone completed by Friday.
    2. The client presentation has been scheduled for next Tuesday at 2 PM.
    3. Please review the Q4 budget allocation and provide feedback by Wednesday.

    Let me know if you have any questions or concerns.

    Best regards,
    Alex
    """

    # Stream events to see parallel execution
    async for event in workflow.run_stream(email):
        if isinstance(event, DatabaseEvent):
            print(f"Database: {event}")
        elif isinstance(event, WorkflowOutputEvent):
            print(f"Output: {event.data}")

Sélection Multiple versus *Switch-Case* Comparaison

Modèle Switch-Case (précédent) :

# One input → exactly one output
.add_switch_case_edge_group(
    source,
    [
        Case(condition=lambda x: x.result == "A", target=handler_a),
        Case(condition=lambda x: x.result == "B", target=handler_b),
        Default(target=handler_c),
    ],
)

Modèle de sélection multiple :

# One input → one or more outputs (dynamic fan-out)
.add_multi_selection_edge_group(
    source,
    [handler_a, handler_b, handler_c, handler_d],
    selection_func=intelligent_router,  # Returns list of target IDs
)

Avantages de la sélection multiple en C#

  1. Traitement parallèle : plusieurs branches peuvent s’exécuter simultanément
  2. Ventilateur conditionnel : le nombre de cibles varie en fonction du contenu
  3. Routage prenant en charge le contenu : décisions basées sur les propriétés du message, pas seulement le type
  4. Utilisation efficace des ressources : seules les branches nécessaires sont activées
  5. Logique métier complexe : prend en charge des scénarios de routage sophistiqués

C# Applications Pratiques

  • Systèmes de messagerie : acheminer vers l'assistant de réponse ; archivage ; analytique (sous condition)
  • Traitement du contenu : déclencher la transcription + traduction + analyse (en fonction du type de contenu)
  • Traitement des commandes : itinéraire vers le traitement + facturation + notifications (en fonction des propriétés de commande)
  • Pipelines de données : déclencher différents flux d’analyse en fonction des caractéristiques des données

Exemple de code de sélection multiple

Pour obtenir l’implémentation de travail complète, consultez l’exemple multi_selection_edge_group.py dans le référentiel Agent Framework.

Étapes suivantes