Freigeben über


Erstellen eines Workflows mit Verzweigungslogik

In diesem Lernprogramm erfahren Sie, wie Sie einen Workflow mit Verzweigungslogik mithilfe von Agent Framework erstellen. Die Verzweigungslogik ermöglicht Es Ihrem Workflow, Entscheidungen auf der Grundlage bestimmter Bedingungen zu treffen, wodurch komplexeres und dynamischeres Verhalten ermöglicht wird.

Bedingte Kanten

Bedingte Kanten ermöglichen es Ihrem Workflow, Routingentscheidungen basierend auf dem Inhalt oder den Eigenschaften von Nachrichten zu treffen, die über den Workflow fließen. Dies ermöglicht die dynamische Verzweigung, bei der verschiedene Ausführungspfade basierend auf Laufzeitbedingungen verwendet werden.

Was Sie erstellen werden

Sie erstellen einen E-Mail-Verarbeitungsworkflow, der das bedingte Routing veranschaulicht:

  • Ein Spamerkennungs-Agent, der eingehende E-Mails analysiert und strukturierte JSON zurückgibt.
  • Bedingte Kanten, die E-Mails basierend auf der Klassifizierung an verschiedene Handler weiterleiten.
  • Ein legitimer E-Mail-Handler, der professionelle Antworten erstellt.
  • Ein Spamhandler, der verdächtige E-Mails kennzeichnet.
  • Gemeinsame Zustandsverwaltung zum Speichern von E-Mail-Daten zwischen Workflowschritten.

Behandelte Konzepte

Voraussetzungen

Installieren von NuGet-Paketen

Installieren Sie zunächst die erforderlichen Pakete für Ihr .NET-Projekt:

dotnet add package Azure.AI.OpenAI --prerelease
dotnet add package Azure.Identity
dotnet add package Microsoft.Agents.AI.Workflows --prerelease
dotnet add package Microsoft.Extensions.AI.OpenAI --prerelease

Definieren von Datenmodellen

Definieren Sie zunächst die Datenstrukturen, die durch Ihren Workflow fließen:

using System.Text.Json.Serialization;

/// <summary>
/// Represents the result of spam detection.
/// </summary>
public sealed class DetectionResult
{
    [JsonPropertyName("is_spam")]
    public bool IsSpam { get; set; }

    [JsonPropertyName("reason")]
    public string Reason { get; set; } = string.Empty;

    // Email ID is generated by the executor, not the agent
    [JsonIgnore]
    public string EmailId { get; set; } = string.Empty;
}

/// <summary>
/// Represents an email.
/// </summary>
internal sealed class Email
{
    [JsonPropertyName("email_id")]
    public string EmailId { get; set; } = string.Empty;

    [JsonPropertyName("email_content")]
    public string EmailContent { get; set; } = string.Empty;
}

/// <summary>
/// Represents the response from the email assistant.
/// </summary>
public sealed class EmailResponse
{
    [JsonPropertyName("response")]
    public string Response { get; set; } = string.Empty;
}

/// <summary>
/// Constants for shared state scopes.
/// </summary>
internal static class EmailStateConstants
{
    public const string EmailStateScope = "EmailState";
}

Erstellen von Bedingungsfunktionen

Die Bedingungsfunktion wertet das Spamerkennungsergebnis aus, um zu bestimmen, welchen Pfad der Workflow übernehmen soll:

/// <summary>
/// Creates a condition for routing messages based on the expected spam detection result.
/// </summary>
/// <param name="expectedResult">The expected spam detection result</param>
/// <returns>A function that evaluates whether a message meets the expected result</returns>
private static Func<object?, bool> GetCondition(bool expectedResult) =>
    detectionResult => detectionResult is DetectionResult result && result.IsSpam == expectedResult;

Diese Bedingungsfunktion:

  • Verwendet einen bool expectedResult Parameter (true für Spam, false für Nichtspam)
  • Gibt eine Funktion zurück, die als Randbedingung verwendet werden kann
  • Überprüft sicher, ob die Nachricht ein DetectionResult ist und die IsSpam-Eigenschaft vergleicht.

KI-Agenten erstellen

Richten Sie die KI-Agents ein, die die Spamerkennung und E-Mail-Unterstützung behandeln:

using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Agents.AI;
using Microsoft.Extensions.AI;

/// <summary>
/// Creates a spam detection agent.
/// </summary>
/// <returns>A ChatClientAgent configured for spam detection</returns>
private static ChatClientAgent GetSpamDetectionAgent(IChatClient chatClient) =>
    new(chatClient, new ChatClientAgentOptions(instructions: "You are a spam detection assistant that identifies spam emails.")
    {
        ChatOptions = new()
        {
            ResponseFormat = ChatResponseFormat.ForJsonSchema(AIJsonUtilities.CreateJsonSchema(typeof(DetectionResult)))
        }
    });

/// <summary>
/// Creates an email assistant agent.
/// </summary>
/// <returns>A ChatClientAgent configured for email assistance</returns>
private static ChatClientAgent GetEmailAssistantAgent(IChatClient chatClient) =>
    new(chatClient, new ChatClientAgentOptions(instructions: "You are an email assistant that helps users draft professional responses to emails.")
    {
        ChatOptions = new()
        {
            ResponseFormat = ChatResponseFormat.ForJsonSchema(AIJsonUtilities.CreateJsonSchema(typeof(EmailResponse)))
        }
    });

Implementieren von Executors

Erstellen Sie die Workflowausführer, die unterschiedliche Phasen der E-Mail-Verarbeitung behandeln:

using Microsoft.Agents.AI.Workflows;
using System.Text.Json;

/// <summary>
/// Executor that detects spam using an AI agent.
/// </summary>
internal sealed class SpamDetectionExecutor : Executor<ChatMessage, DetectionResult>
{
    private readonly AIAgent _spamDetectionAgent;

    public SpamDetectionExecutor(AIAgent spamDetectionAgent) : base("SpamDetectionExecutor")
    {
        this._spamDetectionAgent = spamDetectionAgent;
    }

    public override async ValueTask<DetectionResult> HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // Generate a random email ID and store the email content to shared state
        var newEmail = new Email
        {
            EmailId = Guid.NewGuid().ToString("N"),
            EmailContent = message.Text
        };
        await context.QueueStateUpdateAsync(newEmail.EmailId, newEmail, scopeName: EmailStateConstants.EmailStateScope);

        // Invoke the agent for spam detection
        var response = await this._spamDetectionAgent.RunAsync(message);
        var detectionResult = JsonSerializer.Deserialize<DetectionResult>(response.Text);

        detectionResult!.EmailId = newEmail.EmailId;
        return detectionResult;
    }
}

/// <summary>
/// Executor that assists with email responses using an AI agent.
/// </summary>
internal sealed class EmailAssistantExecutor : Executor<DetectionResult, EmailResponse>
{
    private readonly AIAgent _emailAssistantAgent;

    public EmailAssistantExecutor(AIAgent emailAssistantAgent) : base("EmailAssistantExecutor")
    {
        this._emailAssistantAgent = emailAssistantAgent;
    }

    public override async ValueTask<EmailResponse> HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        if (message.IsSpam)
        {
            throw new ArgumentException("This executor should only handle non-spam messages.");
        }

        // Retrieve the email content from shared state
        var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope)
            ?? throw new InvalidOperationException("Email not found.");

        // Invoke the agent to draft a response
        var response = await this._emailAssistantAgent.RunAsync(email.EmailContent);
        var emailResponse = JsonSerializer.Deserialize<EmailResponse>(response.Text);

        return emailResponse!;
    }
}

/// <summary>
/// Executor that sends emails.
/// </summary>
internal sealed class SendEmailExecutor : Executor<EmailResponse>
{
    public SendEmailExecutor() : base("SendEmailExecutor") { }

    public override async ValueTask HandleAsync(EmailResponse message, IWorkflowContext context, CancellationToken cancellationToken = default) =>
        await context.YieldOutputAsync($"Email sent: {message.Response}");
}

/// <summary>
/// Executor that handles spam messages.
/// </summary>
internal sealed class HandleSpamExecutor : Executor<DetectionResult>
{
    public HandleSpamExecutor() : base("HandleSpamExecutor") { }

    public override async ValueTask HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        if (message.IsSpam)
        {
            await context.YieldOutputAsync($"Email marked as spam: {message.Reason}");
        }
        else
        {
            throw new ArgumentException("This executor should only handle spam messages.");
        }
    }
}

Den Workflow mit bedingten Kanten erstellen

Erstellen Sie nun das Hauptprogramm, das den Workflow erstellt und ausführt:

using Microsoft.Extensions.AI;

public static class Program
{
    private static async Task Main()
    {
        // Set up the Azure OpenAI client
        var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT")
            ?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
        var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
        var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential())
            .GetChatClient(deploymentName).AsIChatClient();

        // Create agents
        AIAgent spamDetectionAgent = GetSpamDetectionAgent(chatClient);
        AIAgent emailAssistantAgent = GetEmailAssistantAgent(chatClient);

        // Create executors
        var spamDetectionExecutor = new SpamDetectionExecutor(spamDetectionAgent);
        var emailAssistantExecutor = new EmailAssistantExecutor(emailAssistantAgent);
        var sendEmailExecutor = new SendEmailExecutor();
        var handleSpamExecutor = new HandleSpamExecutor();

        // Build the workflow with conditional edges
        var workflow = new WorkflowBuilder(spamDetectionExecutor)
            // Non-spam path: route to email assistant when IsSpam = false
            .AddEdge(spamDetectionExecutor, emailAssistantExecutor, condition: GetCondition(expectedResult: false))
            .AddEdge(emailAssistantExecutor, sendEmailExecutor)
            // Spam path: route to spam handler when IsSpam = true
            .AddEdge(spamDetectionExecutor, handleSpamExecutor, condition: GetCondition(expectedResult: true))
            .WithOutputFrom(handleSpamExecutor, sendEmailExecutor)
            .Build();

        // Execute the workflow with sample spam email
        string emailContent = "Congratulations! You've won $1,000,000! Click here to claim your prize now!";
        StreamingRun run = await InProcessExecution.StreamAsync(workflow, new ChatMessage(ChatRole.User, emailContent));
        await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

        await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
        {
            if (evt is WorkflowOutputEvent outputEvent)
            {
                Console.WriteLine($"{outputEvent}");
            }
        }
    }
}

Funktionsweise

  1. Workfloweintrag: Der Workflow beginnt mit spamDetectionExecutor dem Empfangen eines ChatMessage.

  2. Spamanalyse: Der Spamerkennungs-Agent analysiert die E-Mail und gibt eine strukturierte DetectionResult mit IsSpam und Reason Eigenschaften zurück.

  3. Bedingtes Routing: Basierend auf dem IsSpam Wert:

    • Bei Spam (IsSpam = true): Leitet an HandleSpamExecutor weiter mit GetCondition(true)
    • Falls legitim (IsSpam = false): Routen zu EmailAssistantExecutor mit GetCondition(false)
  4. Antwortgenerierung: Für legitime E-Mails erstellt der E-Mail-Assistent eine professionelle Antwort.

  5. Endgültige Ausgabe: Der Workflow liefert entweder eine Spambenachrichtigung oder sendet die entwurfsierte E-Mail-Antwort.

Wichtige Merkmale bedingter Kanten

  1. Type-Safe Bedingungen: Die GetCondition Methode erstellt wiederverwendbare Bedingungsfunktionen, die Inhalte von Nachrichten sicher auswerten.

  2. Mehrere Pfade: Ein einzelner Executor kann mehrere ausgehende Kanten mit unterschiedlichen Bedingungen aufweisen, wodurch komplexe Verzweigungslogik ermöglicht wird.

  3. Freigegebener Status: E-Mail-Daten werden über die Bereichsstatusverwaltung hinweg beibehalten, sodass nachgeschaltete Executoren auf ursprüngliche Inhalte zugreifen können.

  4. Fehlerbehandlung: Executors überprüfen ihre Eingaben und lösen aussagekräftige Ausnahmen aus, wenn unerwartete Nachrichtentypen empfangen werden.

  5. Saubere Architektur: Jeder Executor hat eine einzige Verantwortung, wodurch der Workflow verwaltet und getestet werden kann.

Ausführen des Beispiels

Wenn Sie diesen Workflow mit der Beispielspam-E-Mail ausführen:

Email marked as spam: This email contains common spam indicators including monetary prizes, urgency tactics, and suspicious links that are typical of phishing attempts.

Versuchen Sie, den E-Mail-Inhalt in etwas Legitimes zu ändern:

string emailContent = "Hi, I wanted to follow up on our meeting yesterday and get your thoughts on the project proposal.";

Der Workflow leitet stattdessen an den E-Mail-Assistenten weiter und generiert stattdessen eine professionelle Antwort.

Dieses bedingte Routingmuster bildet die Grundlage für die Erstellung komplexer Workflows, die komplexe Entscheidungsstrukturen und Geschäftslogik verarbeiten können.

Vollständige Implementierung

Die vollständige funktionierende Implementierung finden Sie in diesem Beispiel im Agent Framework-Repository.

Was Sie erstellen werden

Sie erstellen einen E-Mail-Verarbeitungsworkflow, der das bedingte Routing veranschaulicht:

  • Ein Spamerkennungs-Agent, der eingehende E-Mails analysiert
  • Bedingte Kanten, die E-Mails basierend auf der Klassifizierung an verschiedene Handler weiterleiten
  • Ein legitimer E-Mail-Handler, der professionelle Antworten erstellt
  • Ein Spamhandler, der verdächtige E-Mails markiert

Behandelte Konzepte

Voraussetzungen

  • Python 3.10 oder höher
  • Agent Framework installiert: pip install agent-framework-core --pre
  • Azure OpenAI-Dienst mit ordnungsgemäßen Umgebungsvariablen konfiguriert
  • Azure CLI-Authentifizierung: az login

Schritt 1: Importieren erforderlicher Abhängigkeiten

Importieren Sie zunächst die erforderlichen Komponenten für bedingte Workflows:

import asyncio
import os
from dataclasses import dataclass
from typing import Any, Literal
from uuid import uuid4

from typing_extensions import Never

from agent_framework import (
    AgentExecutor,
    AgentExecutorRequest,
    AgentExecutorResponse,
    ChatMessage,
    Role,
    WorkflowBuilder,
    WorkflowContext,
    executor,
    Case,
    Default,
)
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential
from pydantic import BaseModel

Schritt 2: Definieren von Datenmodellen

Erstellen Sie Pydantische Modelle für den strukturierten Datenaustausch zwischen Workflowkomponenten:

class DetectionResult(BaseModel):
    """Represents the result of spam detection."""
    # is_spam drives the routing decision taken by edge conditions
    is_spam: bool
    # Human readable rationale from the detector
    reason: str
    # The agent must include the original email so downstream agents can operate without reloading content
    email_content: str


class EmailResponse(BaseModel):
    """Represents the response from the email assistant."""
    # The drafted reply that a user could copy or send
    response: str

Schritt 3: Erstellen von Bedingungsfunktionen

Definieren Sie Bedingungsfunktionen, die Routingentscheidungen bestimmen:

def get_condition(expected_result: bool):
    """Create a condition callable that routes based on DetectionResult.is_spam."""

    # The returned function will be used as an edge predicate.
    # It receives whatever the upstream executor produced.
    def condition(message: Any) -> bool:
        # Defensive guard. If a non AgentExecutorResponse appears, let the edge pass to avoid dead ends.
        if not isinstance(message, AgentExecutorResponse):
            return True

        try:
            # Prefer parsing a structured DetectionResult from the agent JSON text.
            # Using model_validate_json ensures type safety and raises if the shape is wrong.
            detection = DetectionResult.model_validate_json(message.agent_run_response.text)
            # Route only when the spam flag matches the expected path.
            return detection.is_spam == expected_result
        except Exception:
            # Fail closed on parse errors so we do not accidentally route to the wrong path.
            # Returning False prevents this edge from activating.
            return False

    return condition

Schritt 4: Erstellen von Handler-Exekutoren

Definieren von Executors zur Behandlung unterschiedlicher Routing-Ergebnisse

@executor(id="send_email")
async def handle_email_response(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
    """Handle legitimate emails by drafting a professional response."""
    # Downstream of the email assistant. Parse a validated EmailResponse and yield the workflow output.
    email_response = EmailResponse.model_validate_json(response.agent_run_response.text)
    await ctx.yield_output(f"Email sent:\n{email_response.response}")


@executor(id="handle_spam")
async def handle_spam_classifier_response(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
    """Handle spam emails by marking them appropriately."""
    # Spam path. Confirm the DetectionResult and yield the workflow output. Guard against accidental non spam input.
    detection = DetectionResult.model_validate_json(response.agent_run_response.text)
    if detection.is_spam:
        await ctx.yield_output(f"Email marked as spam: {detection.reason}")
    else:
        # This indicates the routing predicate and executor contract are out of sync.
        raise RuntimeError("This executor should only handle spam messages.")


@executor(id="to_email_assistant_request")
async def to_email_assistant_request(
    response: AgentExecutorResponse, ctx: WorkflowContext[AgentExecutorRequest]
) -> None:
    """Transform spam detection response into a request for the email assistant."""
    # Parse the detection result and extract the email content for the assistant
    detection = DetectionResult.model_validate_json(response.agent_run_response.text)

    # Create a new request for the email assistant with the original email content
    request = AgentExecutorRequest(
        messages=[ChatMessage(Role.USER, text=detection.email_content)],
        should_respond=True
    )
    await ctx.send_message(request)

Schritt 5: Erstellen von KI-Agents

Richten Sie die Azure OpenAI-Agents mit strukturierter Ausgabeformatierung ein:

async def main() -> None:
    # Create agents
    # AzureCliCredential uses your current az login. This avoids embedding secrets in code.
    chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())

    # Agent 1. Classifies spam and returns a DetectionResult object.
    # response_format enforces that the LLM returns parsable JSON for the Pydantic model.
    spam_detection_agent = AgentExecutor(
        chat_client.create_agent(
            instructions=(
                "You are a spam detection assistant that identifies spam emails. "
                "Always return JSON with fields is_spam (bool), reason (string), and email_content (string). "
                "Include the original email content in email_content."
            ),
            response_format=DetectionResult,
        ),
        id="spam_detection_agent",
    )

    # Agent 2. Drafts a professional reply. Also uses structured JSON output for reliability.
    email_assistant_agent = AgentExecutor(
        chat_client.create_agent(
            instructions=(
                "You are an email assistant that helps users draft professional responses to emails. "
                "Your input might be a JSON object that includes 'email_content'; base your reply on that content. "
                "Return JSON with a single field 'response' containing the drafted reply."
            ),
            response_format=EmailResponse,
        ),
        id="email_assistant_agent",
    )

Schritt 6: Erstellen des bedingten Workflows

Erstellen Sie einen Workflow mit bedingten Kanten, die basierend auf den Ergebnissen der Spamerkennung weitergeleitet werden:

    # Build the workflow graph.
    # Start at the spam detector.
    # If not spam, hop to a transformer that creates a new AgentExecutorRequest,
    # then call the email assistant, then finalize.
    # If spam, go directly to the spam handler and finalize.
    workflow = (
        WorkflowBuilder()
        .set_start_executor(spam_detection_agent)
        # Not spam path: transform response -> request for assistant -> assistant -> send email
        .add_edge(spam_detection_agent, to_email_assistant_request, condition=get_condition(False))
        .add_edge(to_email_assistant_request, email_assistant_agent)
        .add_edge(email_assistant_agent, handle_email_response)
        # Spam path: send to spam handler
        .add_edge(spam_detection_agent, handle_spam_classifier_response, condition=get_condition(True))
        .build()
    )

Schritt 7: Ausführen des Workflows

Führen Sie den Workflow mit Beispiel-E-Mail-Inhalten aus:

    # Read Email content from the sample resource file.
    # This keeps the sample deterministic since the model sees the same email every run.
    email_path = os.path.join(os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "resources", "email.txt")

    with open(email_path) as email_file:  # noqa: ASYNC230
        email = email_file.read()

    # Execute the workflow. Since the start is an AgentExecutor, pass an AgentExecutorRequest.
    # The workflow completes when it becomes idle (no more work to do).
    request = AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=email)], should_respond=True)
    events = await workflow.run(request)
    outputs = events.get_outputs()
    if outputs:
        print(f"Workflow output: {outputs[0]}")


if __name__ == "__main__":
    asyncio.run(main())

Funktionsweise von bedingten Kanten

  1. Bedingungsfunktionen: Die get_condition() Funktion erstellt ein Prädikat, das den Nachrichteninhalt untersucht und zurückgibt True oder False ob der Rand durchlaufen werden soll.

  2. Nachrichtenüberprüfung: Bedingungen können jeden Aspekt der Nachricht prüfen, einschließlich strukturierter Daten aus Agentantworten, die mit Pydantischen Modellen analysiert wurden.

  3. Defensive Programmierung: Die Bedingungsfunktion enthält fehlerbehandlung, um Routingfehler beim Analysieren strukturierter Daten zu verhindern.

  4. Dynamisches Routing: Basierend auf dem Spamerkennungsergebnis werden E-Mails automatisch an den E-Mail-Assistenten (für legitime E-Mails) oder den Spamhandler (für verdächtige E-Mails) weitergeleitet.

Wichtige Konzepte

  • Edgebedingungen: Boolesche Prädikate, die bestimmen, ob ein Rand durchlaufen werden soll
  • Strukturierte Ausgaben: Die Verwendung von Pydantic-Modellen mit response_format sorgt für zuverlässiges Parsing von Daten.
  • Defensive Routing: Bedingungsfunktionen behandeln Randfälle, um Workflow-Sackgassen zu verhindern
  • Nachrichtentransformation: Executors können Nachrichtentypen zwischen Workflowschritten transformieren.

Vollständige Implementierung

Die vollständige Arbeitsimplementierung finden Sie im edge_condition.py Beispiel im Agent Framework-Repository.

Switch-Case Verzweigungen

Erstellen von bedingten Kanten

Im vorherigen Beispiel für bedingte Kanten wurde das bidirektionale Routing (Spam und legitime E-Mails) veranschaulicht. Viele reale Szenarien erfordern jedoch anspruchsvollere Entscheidungsstrukturen. Switch-Case-Kanten bieten eine übersichtlichere, wartungsfreundlichere Lösung, wenn Sie basierend auf unterschiedlichen Bedingungen zu mehreren Zielen geleitet werden müssen.

Was Sie mit Switch-Case entwickeln werden

Sie erweitern den E-Mail-Verarbeitungsworkflow, um drei Entscheidungspfade zu verarbeiten:

  • NotSpam → E-Mail-Assistent → Senden von E-Mails
  • Spam -→ Behandeln von Spamausführern
  • Unsicher → Unsicheren Executor behandeln (Standardfall)

Die wichtigste Verbesserung ist die Verwendung des SwitchBuilder Musters anstelle mehrerer einzelner bedingter Kanten, wodurch der Workflow leichter zu verstehen und zu verwalten ist, wenn die Entscheidungskomplexität wächst.

Behandelte Konzepte

Datenmodelle für Switch-Case

Aktualisieren Sie Ihre Datenmodelle, um die Drei-Wege-Klassifizierung zu unterstützen:

/// <summary>
/// Represents the possible decisions for spam detection.
/// </summary>
public enum SpamDecision
{
    NotSpam,
    Spam,
    Uncertain
}

/// <summary>
/// Represents the result of spam detection with enhanced decision support.
/// </summary>
public sealed class DetectionResult
{
    [JsonPropertyName("spam_decision")]
    [JsonConverter(typeof(JsonStringEnumConverter))]
    public SpamDecision spamDecision { get; set; }

    [JsonPropertyName("reason")]
    public string Reason { get; set; } = string.Empty;

    // Email ID is generated by the executor, not the agent
    [JsonIgnore]
    public string EmailId { get; set; } = string.Empty;
}

/// <summary>
/// Represents an email stored in shared state.
/// </summary>
internal sealed class Email
{
    [JsonPropertyName("email_id")]
    public string EmailId { get; set; } = string.Empty;

    [JsonPropertyName("email_content")]
    public string EmailContent { get; set; } = string.Empty;
}

/// <summary>
/// Represents the response from the email assistant.
/// </summary>
public sealed class EmailResponse
{
    [JsonPropertyName("response")]
    public string Response { get; set; } = string.Empty;
}

/// <summary>
/// Constants for shared state scopes.
/// </summary>
internal static class EmailStateConstants
{
    public const string EmailStateScope = "EmailState";
}

Bedingungsfabrik für switch-case

Erstellen Sie eine wiederverwendbare Bedingungsfactory, die Prädikate für jede Spamentscheidung generiert:

/// <summary>
/// Creates a condition for routing messages based on the expected spam detection result.
/// </summary>
/// <param name="expectedDecision">The expected spam detection decision</param>
/// <returns>A function that evaluates whether a message meets the expected result</returns>
private static Func<object?, bool> GetCondition(SpamDecision expectedDecision) =>
    detectionResult => detectionResult is DetectionResult result && result.spamDecision == expectedDecision;

Dieser Fabrikansatz:

  • Reduziert die Codeduplizierung: Eine Funktion generiert alle Bedingungs-Prädikate.
  • Stellt Konsistenz sicher: Alle Bedingungen folgen demselben Muster.
  • Vereinfacht die Wartung: Änderungen an der Bedingungslogik erfolgen an einer zentralen Stelle.

Erweiterter KI-Agent

Aktualisieren Sie den Spamerkennungs-Agent so, dass er weniger sicher ist und Drei-Wege-Klassifizierungen zurückgibt:

/// <summary>
/// Creates a spam detection agent with enhanced uncertainty handling.
/// </summary>
/// <returns>A ChatClientAgent configured for three-way spam detection</returns>
private static ChatClientAgent GetSpamDetectionAgent(IChatClient chatClient) =>
    new(chatClient, new ChatClientAgentOptions(instructions: "You are a spam detection assistant that identifies spam emails. Be less confident in your assessments.")
    {
        ChatOptions = new()
        {
            ResponseFormat = ChatResponseFormat.ForJsonSchema<DetectionResult>()
        }
    });

/// <summary>
/// Creates an email assistant agent (unchanged from conditional edges example).
/// </summary>
/// <returns>A ChatClientAgent configured for email assistance</returns>
private static ChatClientAgent GetEmailAssistantAgent(IChatClient chatClient) =>
    new(chatClient, new ChatClientAgentOptions(instructions: "You are an email assistant that helps users draft responses to emails with professionalism.")
    {
        ChatOptions = new()
        {
            ResponseFormat = ChatResponseFormat.ForJsonSchema<EmailResponse>()
        }
    });

Workflowausführer mit erweitertem Routing

Implementieren Sie Ausführungsinstanzen, die das Drei-Wege-Routing mit der Verwaltung eines gemeinsamen Zustands bearbeiten.

/// <summary>
/// Executor that detects spam using an AI agent with three-way classification.
/// </summary>
internal sealed class SpamDetectionExecutor : Executor<ChatMessage, DetectionResult>
{
    private readonly AIAgent _spamDetectionAgent;

    public SpamDetectionExecutor(AIAgent spamDetectionAgent) : base("SpamDetectionExecutor")
    {
        this._spamDetectionAgent = spamDetectionAgent;
    }

    public override async ValueTask<DetectionResult> HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // Generate a random email ID and store the email content in shared state
        var newEmail = new Email
        {
            EmailId = Guid.NewGuid().ToString("N"),
            EmailContent = message.Text
        };
        await context.QueueStateUpdateAsync(newEmail.EmailId, newEmail, scopeName: EmailStateConstants.EmailStateScope);

        // Invoke the agent for enhanced spam detection
        var response = await this._spamDetectionAgent.RunAsync(message);
        var detectionResult = JsonSerializer.Deserialize<DetectionResult>(response.Text);

        detectionResult!.EmailId = newEmail.EmailId;
        return detectionResult;
    }
}

/// <summary>
/// Executor that assists with email responses using an AI agent.
/// </summary>
internal sealed class EmailAssistantExecutor : Executor<DetectionResult, EmailResponse>
{
    private readonly AIAgent _emailAssistantAgent;

    public EmailAssistantExecutor(AIAgent emailAssistantAgent) : base("EmailAssistantExecutor")
    {
        this._emailAssistantAgent = emailAssistantAgent;
    }

    public override async ValueTask<EmailResponse> HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        if (message.spamDecision == SpamDecision.Spam)
        {
            throw new ArgumentException("This executor should only handle non-spam messages.");
        }

        // Retrieve the email content from shared state
        var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);

        // Invoke the agent to draft a response
        var response = await this._emailAssistantAgent.RunAsync(email!.EmailContent);
        var emailResponse = JsonSerializer.Deserialize<EmailResponse>(response.Text);

        return emailResponse!;
    }
}

/// <summary>
/// Executor that sends emails.
/// </summary>
internal sealed class SendEmailExecutor : Executor<EmailResponse>
{
    public SendEmailExecutor() : base("SendEmailExecutor") { }

    public override async ValueTask HandleAsync(EmailResponse message, IWorkflowContext context, CancellationToken cancellationToken = default) =>
        await context.YieldOutputAsync($"Email sent: {message.Response}").ConfigureAwait(false);
}

/// <summary>
/// Executor that handles spam messages.
/// </summary>
internal sealed class HandleSpamExecutor : Executor<DetectionResult>
{
    public HandleSpamExecutor() : base("HandleSpamExecutor") { }

    public override async ValueTask HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        if (message.spamDecision == SpamDecision.Spam)
        {
            await context.YieldOutputAsync($"Email marked as spam: {message.Reason}").ConfigureAwait(false);
        }
        else
        {
            throw new ArgumentException("This executor should only handle spam messages.");
        }
    }
}

/// <summary>
/// Executor that handles uncertain emails requiring manual review.
/// </summary>
internal sealed class HandleUncertainExecutor : Executor<DetectionResult>
{
    public HandleUncertainExecutor() : base("HandleUncertainExecutor") { }

    public override async ValueTask HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        if (message.spamDecision == SpamDecision.Uncertain)
        {
            var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);
            await context.YieldOutputAsync($"Email marked as uncertain: {message.Reason}. Email content: {email?.EmailContent}");
        }
        else
        {
            throw new ArgumentException("This executor should only handle uncertain spam decisions.");
        }
    }
}

Workflow mit Switch-Case-Struktur erstellen

Ersetzen Sie mehrere bedingte Kanten durch das übersichtlichere Switch-Case-Muster:

public static class Program
{
    private static async Task Main()
    {
        // Set up the Azure OpenAI client
        var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
        var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
        var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential()).GetChatClient(deploymentName).AsIChatClient();

        // Create agents
        AIAgent spamDetectionAgent = GetSpamDetectionAgent(chatClient);
        AIAgent emailAssistantAgent = GetEmailAssistantAgent(chatClient);

        // Create executors
        var spamDetectionExecutor = new SpamDetectionExecutor(spamDetectionAgent);
        var emailAssistantExecutor = new EmailAssistantExecutor(emailAssistantAgent);
        var sendEmailExecutor = new SendEmailExecutor();
        var handleSpamExecutor = new HandleSpamExecutor();
        var handleUncertainExecutor = new HandleUncertainExecutor();

        // Build the workflow using switch-case for cleaner three-way routing
        WorkflowBuilder builder = new(spamDetectionExecutor);
        builder.AddSwitch(spamDetectionExecutor, switchBuilder =>
            switchBuilder
            .AddCase(
                GetCondition(expectedDecision: SpamDecision.NotSpam),
                emailAssistantExecutor
            )
            .AddCase(
                GetCondition(expectedDecision: SpamDecision.Spam),
                handleSpamExecutor
            )
            .WithDefault(
                handleUncertainExecutor
            )
        )
        // After the email assistant writes a response, it will be sent to the send email executor
        .AddEdge(emailAssistantExecutor, sendEmailExecutor)
        .WithOutputFrom(handleSpamExecutor, sendEmailExecutor, handleUncertainExecutor);

        var workflow = builder.Build();

        // Read an email from a text file (use ambiguous content for demonstration)
        string email = Resources.Read("ambiguous_email.txt");

        // Execute the workflow
        StreamingRun run = await InProcessExecution.StreamAsync(workflow, new ChatMessage(ChatRole.User, email));
        await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
        await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
        {
            if (evt is WorkflowOutputEvent outputEvent)
            {
                Console.WriteLine($"{outputEvent}");
            }
        }
    }
}

Switch-Case Vorteile

  1. Übersichtlichere Syntax: Dies SwitchBuilder bietet eine besser lesbare Alternative zu mehreren bedingten Kanten
  2. Sortierte Auswertung: Fälle werden sequenziell ausgewertet und bei der ersten Übereinstimmung beendet.
  3. Garantiertes Routing: Die WithDefault() Methode stellt sicher, dass Nachrichten nie hängen bleiben
  4. Bessere Wartung: Das Hinzufügen neuer Fälle erfordert minimale Änderungen an der Workflowstruktur.
  5. Typsicherheit: Jeder Executor überprüft seine Eingabe, um Routingfehler frühzeitig abzufangen.

Mustervergleich

Vorher (Bedingte Kanten):

var workflow = new WorkflowBuilder(spamDetectionExecutor)
    .AddEdge(spamDetectionExecutor, emailAssistantExecutor, condition: GetCondition(expectedResult: false))
    .AddEdge(spamDetectionExecutor, handleSpamExecutor, condition: GetCondition(expectedResult: true))
    // No clean way to handle a third case
    .WithOutputFrom(handleSpamExecutor, sendEmailExecutor)
    .Build();

Nach (Switch-Case):

WorkflowBuilder builder = new(spamDetectionExecutor);
builder.AddSwitch(spamDetectionExecutor, switchBuilder =>
    switchBuilder
    .AddCase(GetCondition(SpamDecision.NotSpam), emailAssistantExecutor)
    .AddCase(GetCondition(SpamDecision.Spam), handleSpamExecutor)
    .WithDefault(handleUncertainExecutor)  // Clean default case
)
// Continue building the rest of the workflow

Das Switch-Case-Muster wird wesentlich besser skaliert, da die Anzahl der Routingentscheidungen wächst, und der Standardfall bietet ein Sicherheitsnetz für unerwartete Werte.

Ausführen des Beispiels

Wenn Sie diesen Workflow mit mehrdeutigen E-Mail-Inhalten ausführen:

Email marked as uncertain: This email contains promotional language but might be from a legitimate business contact, requiring human review for proper classification.

Versuchen Sie, den E-Mail-Inhalt in etwas eindeutig Spam oder eindeutig legitim zu ändern, um die verschiedenen Routingpfade in Aktion zu erleben.

Vollständige Implementierung

Die vollständige funktionierende Implementierung finden Sie in diesem Beispiel im Agent Framework-Repository.

Erstellen von bedingten Kanten

Im vorherigen Beispiel für bedingte Kanten wurde das bidirektionale Routing (Spam und legitime E-Mails) veranschaulicht. Viele reale Szenarien erfordern jedoch anspruchsvollere Entscheidungsstrukturen. Switch-Case-Kanten bieten eine übersichtlichere, wartungsfreundlichere Lösung, wenn Sie basierend auf unterschiedlichen Bedingungen zu mehreren Zielen geleitet werden müssen.

Was Sie als Nächstes erstellen

Sie erweitern den E-Mail-Verarbeitungsworkflow, um drei Entscheidungspfade zu verarbeiten:

  • NotSpam → E-Mail-Assistent → Senden von E-Mails
  • Spam → Als Spam markieren
  • Unsicher → Flag für manuelle Überprüfung (Standardfall)

Die Hauptverbesserung besteht darin, anstelle mehrerer einzelner bedingter Kanten eine einzige Switch-Case-Edgegruppe zu verwenden, wodurch der Workflow mit zunehmender Entscheidungskomplexität einfacher zu verstehen und zu verwalten ist.

Behandelte Konzepte

Erweiterte Datenmodelle

Aktualisieren Sie Ihre Datenmodelle, um die Drei-Wege-Klassifizierung zu unterstützen:

from typing import Literal

class DetectionResultAgent(BaseModel):
    """Structured output returned by the spam detection agent."""

    # The agent classifies the email into one of three categories
    spam_decision: Literal["NotSpam", "Spam", "Uncertain"]
    reason: str

class EmailResponse(BaseModel):
    """Structured output returned by the email assistant agent."""

    response: str

@dataclass
class DetectionResult:
    """Internal typed payload used for routing and downstream handling."""

    spam_decision: str
    reason: str
    email_id: str

@dataclass
class Email:
    """In memory record of the email content stored in shared state."""

    email_id: str
    email_content: str

Switch-Case Bedingungsfactory

Erstellen Sie eine wiederverwendbare Bedingungsfactory, die Prädikate für jede Spamentscheidung generiert:

def get_case(expected_decision: str):
    """Factory that returns a predicate matching a specific spam_decision value."""

    def condition(message: Any) -> bool:
        # Only match when the upstream payload is a DetectionResult with the expected decision
        return isinstance(message, DetectionResult) and message.spam_decision == expected_decision

    return condition

Dieser Fabrikansatz:

  • Reduziert die Codeduplizierung: Eine Funktion generiert alle Bedingungs-Prädikate.
  • Stellt Konsistenz sicher: Alle Bedingungen folgen demselben Muster.
  • Vereinfacht die Wartung: Änderungen an der Bedingungslogik erfolgen an einer zentralen Stelle.

Workflowausführer mit freigegebenem Status

Implementieren Sie Executoren, die den freigegebenen Zustand verwenden, um zu vermeiden, dass große E-Mail-Inhalte über jeden Workflowschritt übergeben werden:

EMAIL_STATE_PREFIX = "email:"
CURRENT_EMAIL_ID_KEY = "current_email_id"

@executor(id="store_email")
async def store_email(email_text: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
    """Store email content once and pass around a lightweight ID reference."""

    # Persist the raw email content in shared state
    new_email = Email(email_id=str(uuid4()), email_content=email_text)
    await ctx.set_shared_state(f"{EMAIL_STATE_PREFIX}{new_email.email_id}", new_email)
    await ctx.set_shared_state(CURRENT_EMAIL_ID_KEY, new_email.email_id)

    # Forward email to spam detection agent
    await ctx.send_message(
        AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=new_email.email_content)], should_respond=True)
    )

@executor(id="to_detection_result")
async def to_detection_result(response: AgentExecutorResponse, ctx: WorkflowContext[DetectionResult]) -> None:
    """Transform agent response into a typed DetectionResult with email ID."""

    # Parse the agent's structured JSON output
    parsed = DetectionResultAgent.model_validate_json(response.agent_run_response.text)
    email_id: str = await ctx.get_shared_state(CURRENT_EMAIL_ID_KEY)

    # Create typed message for switch-case routing
    await ctx.send_message(DetectionResult(
        spam_decision=parsed.spam_decision,
        reason=parsed.reason,
        email_id=email_id
    ))

@executor(id="submit_to_email_assistant")
async def submit_to_email_assistant(detection: DetectionResult, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
    """Handle NotSpam emails by forwarding to the email assistant."""

    # Guard against misrouting
    if detection.spam_decision != "NotSpam":
        raise RuntimeError("This executor should only handle NotSpam messages.")

    # Retrieve original email content from shared state
    email: Email = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{detection.email_id}")
    await ctx.send_message(
        AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=email.email_content)], should_respond=True)
    )

@executor(id="finalize_and_send")
async def finalize_and_send(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
    """Parse email assistant response and yield final output."""

    parsed = EmailResponse.model_validate_json(response.agent_run_response.text)
    await ctx.yield_output(f"Email sent: {parsed.response}")

@executor(id="handle_spam")
async def handle_spam(detection: DetectionResult, ctx: WorkflowContext[Never, str]) -> None:
    """Handle confirmed spam emails."""

    if detection.spam_decision == "Spam":
        await ctx.yield_output(f"Email marked as spam: {detection.reason}")
    else:
        raise RuntimeError("This executor should only handle Spam messages.")

@executor(id="handle_uncertain")
async def handle_uncertain(detection: DetectionResult, ctx: WorkflowContext[Never, str]) -> None:
    """Handle uncertain classifications that need manual review."""

    if detection.spam_decision == "Uncertain":
        # Include original content for human review
        email: Email | None = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{detection.email_id}")
        await ctx.yield_output(
            f"Email marked as uncertain: {detection.reason}. Email content: {getattr(email, 'email_content', '')}"
        )
    else:
        raise RuntimeError("This executor should only handle Uncertain messages.")

Erstellen eines erweiterten KI-Agents

Aktualisieren Sie den Spamerkennungs-Agent so, dass er weniger sicher ist und Drei-Wege-Klassifizierungen zurückgibt:

async def main():
    chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())

    # Enhanced spam detection agent with three-way classification
    spam_detection_agent = AgentExecutor(
        chat_client.create_agent(
            instructions=(
                "You are a spam detection assistant that identifies spam emails. "
                "Be less confident in your assessments. "
                "Always return JSON with fields 'spam_decision' (one of NotSpam, Spam, Uncertain) "
                "and 'reason' (string)."
            ),
            response_format=DetectionResultAgent,
        ),
        id="spam_detection_agent",
    )

    # Email assistant remains the same
    email_assistant_agent = AgentExecutor(
        chat_client.create_agent(
            instructions=(
                "You are an email assistant that helps users draft responses to emails with professionalism."
            ),
            response_format=EmailResponse,
        ),
        id="email_assistant_agent",
    )

Erstellen eines Workflows mit der Switch-Case-Edge-Gruppe

Ersetzen Sie mehrere bedingte Kanten durch eine einzelne Switch-Case-Gruppe:

    # Build workflow using switch-case for cleaner three-way routing
    workflow = (
        WorkflowBuilder()
        .set_start_executor(store_email)
        .add_edge(store_email, spam_detection_agent)
        .add_edge(spam_detection_agent, to_detection_result)
        .add_switch_case_edge_group(
            to_detection_result,
            [
                # Explicit cases for specific decisions
                Case(condition=get_case("NotSpam"), target=submit_to_email_assistant),
                Case(condition=get_case("Spam"), target=handle_spam),
                # Default case catches anything that doesn't match above
                Default(target=handle_uncertain),
            ],
        )
        .add_edge(submit_to_email_assistant, email_assistant_agent)
        .add_edge(email_assistant_agent, finalize_and_send)
        .build()
    )

Ausführen und Testen

Führen Sie den Workflow mit mehrdeutigen E-Mail-Inhalten aus, die das dreiseitige Routing veranschaulicht:

    # Use ambiguous email content that might trigger uncertain classification
    email = (
        "Hey there, I noticed you might be interested in our latest offer—no pressure, but it expires soon. "
        "Let me know if you'd like more details."
    )

    # Execute and display results
    events = await workflow.run(email)
    outputs = events.get_outputs()
    if outputs:
        for output in outputs:
            print(f"Workflow output: {output}")

Wichtige Vorteile von Switch-Case Edges

  1. Übersichtlichere Syntax: Eine Randgruppe anstelle mehrerer bedingter Kanten
  2. Sortierte Auswertung: Fälle werden sequenziell ausgewertet und bei der ersten Übereinstimmung beendet.
  3. Garantiertes Routing: Der Standardfall stellt sicher, dass Nachrichten nie hängen bleiben
  4. Bessere Wartung: Das Hinzufügen neuer Fälle erfordert minimale Änderungen.
  5. Typsicherheit: Jeder Executor überprüft seine Eingabe, um Routingfehler abzufangen.

Vergleich: Bedingt im Vergleich zu Switch-Case

Vorher (Bedingte Kanten):

.add_edge(detector, handler_a, condition=lambda x: x.result == "A")
.add_edge(detector, handler_b, condition=lambda x: x.result == "B")
.add_edge(detector, handler_c, condition=lambda x: x.result == "C")

Nach (Switch-Case):

.add_switch_case_edge_group(
    detector,
    [
        Case(condition=lambda x: x.result == "A", target=handler_a),
        Case(condition=lambda x: x.result == "B", target=handler_b),
        Default(target=handler_c),  # Catches everything else
    ],
)

Das Switch-Case-Muster wird wesentlich besser skaliert, da die Anzahl der Routingentscheidungen wächst, und der Standardfall bietet ein Sicherheitsnetz für unerwartete Werte.

Switch-Case Beispielcode

Die vollständige Arbeitsimplementierung finden Sie im switch_case_edge_group.py Beispiel im Agent Framework-Repository.

Ränder für mehrfache Auswahl

Über Switch-Case hinaus: Multi-Auswahl-Routing

Während Switch-Case-Edges Nachrichten an genau ein Ziel weiterleiten, müssen Workflows in der Praxis häufig mehrere parallele Vorgänge basierend auf Datenmerkmalen auslösen. Partitionierte Kanten (implementiert als Fanout-Kanten mit Partitionierern) ermöglichen anspruchsvolle Fanoutmuster, bei denen eine einzelne Nachricht mehrere nachgeschaltete Executoren gleichzeitig aktivieren kann.

Erweiterter E-Mail-Verarbeitungsworkflow

Basierend auf dem Switch-Case-Beispiel erstellen Sie ein erweitertes E-Mail-Verarbeitungssystem, das komplexe Routinglogik veranschaulicht:

  • Spam-E-Mails → einzelner Spam-Handler (z. B. Switch-Case)
  • Legitime E-Mails → E-Mail-Assistent immer auslösen + Zusammenfassung für lange E-Mails bedingt auslösen
  • Unklare E-Mails → Ein einzelner unklarer Verwalter (z. B. switch-case)
  • Datenbankpersistenz → für kurze E-Mails und zusammengefasste lange E-Mails ausgelöst

Dieses Muster ermöglicht parallele Verarbeitungspipelinen, die sich an Inhaltseigenschaften anpassen.

Behandelte Konzepte

Datenmodelle für Mehrfachauswahl

Erweitern Sie die Datenmodelle, um die Analyse und Zusammenfassung der E-Mail-Länge zu unterstützen:

/// <summary>
/// Represents the result of enhanced email analysis with additional metadata.
/// </summary>
public sealed class AnalysisResult
{
    [JsonPropertyName("spam_decision")]
    [JsonConverter(typeof(JsonStringEnumConverter))]
    public SpamDecision spamDecision { get; set; }

    [JsonPropertyName("reason")]
    public string Reason { get; set; } = string.Empty;

    // Additional properties for sophisticated routing
    [JsonIgnore]
    public int EmailLength { get; set; }

    [JsonIgnore]
    public string EmailSummary { get; set; } = string.Empty;

    [JsonIgnore]
    public string EmailId { get; set; } = string.Empty;
}

/// <summary>
/// Represents the response from the email assistant.
/// </summary>
public sealed class EmailResponse
{
    [JsonPropertyName("response")]
    public string Response { get; set; } = string.Empty;
}

/// <summary>
/// Represents the response from the email summary agent.
/// </summary>
public sealed class EmailSummary
{
    [JsonPropertyName("summary")]
    public string Summary { get; set; } = string.Empty;
}

/// <summary>
/// A custom workflow event for database operations.
/// </summary>
internal sealed class DatabaseEvent(string message) : WorkflowEvent(message) { }

/// <summary>
/// Constants for email processing thresholds.
/// </summary>
public static class EmailProcessingConstants
{
    public const int LongEmailThreshold = 100;
}

Zielzuweisungsfunktion: Das Herzstück der Mehrfachauswahl

Die Zielzuweisungsfunktion bestimmt, welche Executoren jede Nachricht empfangen sollen:

/// <summary>
/// Creates a target assigner for routing messages based on the analysis result.
/// </summary>
/// <returns>A function that takes an analysis result and returns the target partitions.</returns>
private static Func<AnalysisResult?, int, IEnumerable<int>> GetTargetAssigner()
{
    return (analysisResult, targetCount) =>
    {
        if (analysisResult is not null)
        {
            if (analysisResult.spamDecision == SpamDecision.Spam)
            {
                return [0]; // Route only to spam handler (index 0)
            }
            else if (analysisResult.spamDecision == SpamDecision.NotSpam)
            {
                // Always route to email assistant (index 1)
                List<int> targets = [1];

                // Conditionally add summarizer for long emails (index 2)
                if (analysisResult.EmailLength > EmailProcessingConstants.LongEmailThreshold)
                {
                    targets.Add(2);
                }

                return targets;
            }
            else // Uncertain
            {
                return [3]; // Route only to uncertain handler (index 3)
            }
        }
        throw new ArgumentException("Invalid analysis result.");
    };
}

Wichtige Features der Zielzuweisungsfunktion

  1. Dynamische Zielauswahl: Gibt eine Liste der zu aktivierenden Executorindizes zurück.
  2. Inhaltsfähiges Routing: Trifft Entscheidungen basierend auf Nachrichteneigenschaften wie E-Mail-Länge
  3. Parallele Verarbeitung: Mehrere Ziele können gleichzeitig ausgeführt werden
  4. Bedingte Logik: Komplexe Verzweigung basierend auf mehreren Kriterien

Erweiterte Workflow-Executoren

Implementieren Sie Executoren, die die erweiterte Analyse und das Routing behandeln:

/// <summary>
/// Executor that analyzes emails using an AI agent with enhanced analysis.
/// </summary>
internal sealed class EmailAnalysisExecutor : Executor<ChatMessage, AnalysisResult>
{
    private readonly AIAgent _emailAnalysisAgent;

    public EmailAnalysisExecutor(AIAgent emailAnalysisAgent) : base("EmailAnalysisExecutor")
    {
        this._emailAnalysisAgent = emailAnalysisAgent;
    }

    public override async ValueTask<AnalysisResult> HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // Generate a random email ID and store the email content
        var newEmail = new Email
        {
            EmailId = Guid.NewGuid().ToString("N"),
            EmailContent = message.Text
        };
        await context.QueueStateUpdateAsync(newEmail.EmailId, newEmail, scopeName: EmailStateConstants.EmailStateScope);

        // Invoke the agent for enhanced analysis
        var response = await this._emailAnalysisAgent.RunAsync(message);
        var analysisResult = JsonSerializer.Deserialize<AnalysisResult>(response.Text);

        // Enrich with metadata for routing decisions
        analysisResult!.EmailId = newEmail.EmailId;
        analysisResult.EmailLength = newEmail.EmailContent.Length;

        return analysisResult;
    }
}

/// <summary>
/// Executor that assists with email responses using an AI agent.
/// </summary>
internal sealed class EmailAssistantExecutor : Executor<AnalysisResult, EmailResponse>
{
    private readonly AIAgent _emailAssistantAgent;

    public EmailAssistantExecutor(AIAgent emailAssistantAgent) : base("EmailAssistantExecutor")
    {
        this._emailAssistantAgent = emailAssistantAgent;
    }

    public override async ValueTask<EmailResponse> HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        if (message.spamDecision == SpamDecision.Spam)
        {
            throw new ArgumentException("This executor should only handle non-spam messages.");
        }

        // Retrieve the email content from shared state
        var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);

        // Invoke the agent to draft a response
        var response = await this._emailAssistantAgent.RunAsync(email!.EmailContent);
        var emailResponse = JsonSerializer.Deserialize<EmailResponse>(response.Text);

        return emailResponse!;
    }
}

/// <summary>
/// Executor that summarizes emails using an AI agent for long emails.
/// </summary>
internal sealed class EmailSummaryExecutor : Executor<AnalysisResult, AnalysisResult>
{
    private readonly AIAgent _emailSummaryAgent;

    public EmailSummaryExecutor(AIAgent emailSummaryAgent) : base("EmailSummaryExecutor")
    {
        this._emailSummaryAgent = emailSummaryAgent;
    }

    public override async ValueTask<AnalysisResult> HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // Read the email content from shared state
        var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);

        // Generate summary for long emails
        var response = await this._emailSummaryAgent.RunAsync(email!.EmailContent);
        var emailSummary = JsonSerializer.Deserialize<EmailSummary>(response.Text);

        // Enrich the analysis result with the summary
        message.EmailSummary = emailSummary!.Summary;

        return message;
    }
}

/// <summary>
/// Executor that sends emails.
/// </summary>
internal sealed class SendEmailExecutor : Executor<EmailResponse>
{
    public SendEmailExecutor() : base("SendEmailExecutor") { }

    public override async ValueTask HandleAsync(EmailResponse message, IWorkflowContext context, CancellationToken cancellationToken = default) =>
        await context.YieldOutputAsync($"Email sent: {message.Response}");
}

/// <summary>
/// Executor that handles spam messages.
/// </summary>
internal sealed class HandleSpamExecutor : Executor<AnalysisResult>
{
    public HandleSpamExecutor() : base("HandleSpamExecutor") { }

    public override async ValueTask HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        if (message.spamDecision == SpamDecision.Spam)
        {
            await context.YieldOutputAsync($"Email marked as spam: {message.Reason}");
        }
        else
        {
            throw new ArgumentException("This executor should only handle spam messages.");
        }
    }
}

/// <summary>
/// Executor that handles uncertain messages requiring manual review.
/// </summary>
internal sealed class HandleUncertainExecutor : Executor<AnalysisResult>
{
    public HandleUncertainExecutor() : base("HandleUncertainExecutor") { }

    public override async ValueTask HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        if (message.spamDecision == SpamDecision.Uncertain)
        {
            var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);
            await context.YieldOutputAsync($"Email marked as uncertain: {message.Reason}. Email content: {email?.EmailContent}");
        }
        else
        {
            throw new ArgumentException("This executor should only handle uncertain spam decisions.");
        }
    }
}

/// <summary>
/// Executor that handles database access with custom events.
/// </summary>
internal sealed class DatabaseAccessExecutor : Executor<AnalysisResult>
{
    public DatabaseAccessExecutor() : base("DatabaseAccessExecutor") { }

    public override async ValueTask HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // Simulate database operations
        await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);
        await Task.Delay(100); // Simulate database access delay

        // Emit custom database event for monitoring
        await context.AddEventAsync(new DatabaseEvent($"Email {message.EmailId} saved to database."));
    }
}

Erweiterte KI-Agenten

Erstellen Sie Agents für Analyse, Unterstützung und Zusammenfassung:

/// <summary>
/// Create an enhanced email analysis agent.
/// </summary>
/// <returns>A ChatClientAgent configured for comprehensive email analysis</returns>
private static ChatClientAgent GetEmailAnalysisAgent(IChatClient chatClient) =>
    new(chatClient, new ChatClientAgentOptions(instructions: "You are a spam detection assistant that identifies spam emails.")
    {
        ChatOptions = new()
        {
            ResponseFormat = ChatResponseFormat.ForJsonSchema<AnalysisResult>()
        }
    });

/// <summary>
/// Creates an email assistant agent.
/// </summary>
/// <returns>A ChatClientAgent configured for email assistance</returns>
private static ChatClientAgent GetEmailAssistantAgent(IChatClient chatClient) =>
    new(chatClient, new ChatClientAgentOptions(instructions: "You are an email assistant that helps users draft responses to emails with professionalism.")
    {
        ChatOptions = new()
        {
            ResponseFormat = ChatResponseFormat.ForJsonSchema<EmailResponse>()
        }
    });

/// <summary>
/// Creates an agent that summarizes emails.
/// </summary>
/// <returns>A ChatClientAgent configured for email summarization</returns>
private static ChatClientAgent GetEmailSummaryAgent(IChatClient chatClient) =>
    new(chatClient, new ChatClientAgentOptions(instructions: "You are an assistant that helps users summarize emails.")
    {
        ChatOptions = new()
        {
            ResponseFormat = ChatResponseFormat.ForJsonSchema<EmailSummary>()
        }
    });

Konstruktion von Workflows mit Mehrfachauswahl

Erstellen Sie den Workflow mit anspruchsvollem Routing und paralleler Verarbeitung:

public static class Program
{
    private static async Task Main()
    {
        // Set up the Azure OpenAI client
        var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
        var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
        var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential()).GetChatClient(deploymentName).AsIChatClient();

        // Create agents
        AIAgent emailAnalysisAgent = GetEmailAnalysisAgent(chatClient);
        AIAgent emailAssistantAgent = GetEmailAssistantAgent(chatClient);
        AIAgent emailSummaryAgent = GetEmailSummaryAgent(chatClient);

        // Create executors
        var emailAnalysisExecutor = new EmailAnalysisExecutor(emailAnalysisAgent);
        var emailAssistantExecutor = new EmailAssistantExecutor(emailAssistantAgent);
        var emailSummaryExecutor = new EmailSummaryExecutor(emailSummaryAgent);
        var sendEmailExecutor = new SendEmailExecutor();
        var handleSpamExecutor = new HandleSpamExecutor();
        var handleUncertainExecutor = new HandleUncertainExecutor();
        var databaseAccessExecutor = new DatabaseAccessExecutor();

        // Build the workflow with multi-selection fan-out
        WorkflowBuilder builder = new(emailAnalysisExecutor);
        builder.AddFanOutEdge(
            emailAnalysisExecutor,
            targets: [
                handleSpamExecutor,        // Index 0: Spam handler
                emailAssistantExecutor,    // Index 1: Email assistant (always for NotSpam)
                emailSummaryExecutor,      // Index 2: Summarizer (conditionally for long NotSpam)
                handleUncertainExecutor,   // Index 3: Uncertain handler
            ],
            targetSelector: GetTargetAssigner()
        )
        // Email assistant branch
        .AddEdge(emailAssistantExecutor, sendEmailExecutor)

        // Database persistence: conditional routing
        .AddEdge<AnalysisResult>(
            emailAnalysisExecutor,
            databaseAccessExecutor,
            condition: analysisResult => analysisResult?.EmailLength <= EmailProcessingConstants.LongEmailThreshold) // Short emails
        .AddEdge(emailSummaryExecutor, databaseAccessExecutor) // Long emails with summary

        .WithOutputFrom(handleUncertainExecutor, handleSpamExecutor, sendEmailExecutor);

        var workflow = builder.Build();

        // Read a moderately long email to trigger both assistant and summarizer
        string email = Resources.Read("email.txt");

        // Execute the workflow with custom event handling
        StreamingRun run = await InProcessExecution.StreamAsync(workflow, new ChatMessage(ChatRole.User, email));
        await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
        await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
        {
            if (evt is WorkflowOutputEvent outputEvent)
            {
                Console.WriteLine($"Output: {outputEvent}");
            }

            if (evt is DatabaseEvent databaseEvent)
            {
                Console.WriteLine($"Database: {databaseEvent}");
            }
        }
    }
}

Mustervergleich: Mehrfachauswahl im Vergleich zu Switch-Case

Switch-Case-Struktur (vorherig):

// One input → exactly one output
builder.AddSwitch(spamDetectionExecutor, switchBuilder =>
    switchBuilder
    .AddCase(GetCondition(SpamDecision.NotSpam), emailAssistantExecutor)
    .AddCase(GetCondition(SpamDecision.Spam), handleSpamExecutor)
    .WithDefault(handleUncertainExecutor)
)

Mehrfachauswahlmuster:

// One input → one or more outputs (dynamic fan-out)
builder.AddFanOutEdge(
    emailAnalysisExecutor,
    targets: [handleSpamExecutor, emailAssistantExecutor, emailSummaryExecutor, handleUncertainExecutor],
    targetSelector: GetTargetAssigner() // Returns list of target indices
)

Wichtige Vorteile von Mehrfachauswahl-Rändern

  1. Parallele Verarbeitung: Mehrere Zweige können gleichzeitig ausgeführt werden
  2. Bedingter Fanout: Die Anzahl der Ziele variiert je nach Inhalt.
  3. Inhaltsfähiges Routing: Entscheidungen basierend auf Nachrichteneigenschaften, nicht nur Typ
  4. Effiziente Ressourcennutzung: Nur notwendige Verzweigungen werden aktiviert.
  5. Komplexe Geschäftslogik: Unterstützt anspruchsvolle Routingszenarien

Ausführen des Beispiels für mehrfache Auswahl

Wenn Sie diesen Workflow mit einer langen E-Mail ausführen:

Output: Email sent: [Professional response generated by AI]
Database: Email abc123 saved to database.

Beim Ausführen mit einer kurzen E-Mail wird die Zusammenfassung übersprungen.

Output: Email sent: [Professional response generated by AI]
Database: Email def456 saved to database.

Real-World-Anwendungsfälle

  • E-Mail-Systeme: Weiterleitung zum Antwortassistent + Archiv + Analytics (bedingt)
  • Inhaltsverarbeitung: Triggertranskription + Übersetzung + Analyse (basierend auf Dem Inhaltstyp)
  • Auftragsabwicklung: Weiterleitung zur Erfüllung + Abrechnung + Benachrichtigungen (basierend auf Auftragsmerkmalen)
  • Datenpipelines: Auslösen unterschiedlicher Analyseflüsse basierend auf Datenmerkmalen

Vollständige Implementierung der Mehrfachauswahl

Die vollständige funktionierende Implementierung finden Sie in diesem Beispiel im Agent Framework-Repository.

Über Switch-Case hinaus: Multi-Auswahl-Routing

Während Switch-Case-Edges Nachrichten an genau ein Ziel weiterleiten, müssen Workflows in der Praxis häufig mehrere parallele Vorgänge basierend auf Datenmerkmalen auslösen. Partitionierte Kanten (implementiert als Multiauswahl-Edgegruppen) ermöglichen anspruchsvolle Fanoutmuster, bei denen eine einzelne Nachricht mehrere nachgeschaltete Executoren gleichzeitig aktivieren kann.

Erweiterter E-Mail-Verarbeitungsworkflow

Basierend auf dem Switch-Case-Beispiel erstellen Sie ein erweitertes E-Mail-Verarbeitungssystem, das komplexe Routinglogik veranschaulicht:

  • Spam-E-Mails → einzelner Spam-Handler (z. B. Switch-Case)
  • Legitime E-Mails → E-Mail-Assistent immer auslösen + Zusammenfassung für lange E-Mails bedingt auslösen
  • Unklare E-Mails → Ein einzelner unklarer Verwalter (z. B. switch-case)
  • Datenbankpersistenz → für kurze E-Mails und zusammengefasste lange E-Mails ausgelöst

Dieses Muster ermöglicht parallele Verarbeitungspipelinen, die sich an Inhaltseigenschaften anpassen.

Behandelte Konzepte

Erweiterte Datenmodelle für mehrfache Auswahl

Erweitern Sie die Datenmodelle, um die Analyse und Zusammenfassung der E-Mail-Länge zu unterstützen:

class AnalysisResultAgent(BaseModel):
    """Enhanced structured output from email analysis agent."""

    spam_decision: Literal["NotSpam", "Spam", "Uncertain"]
    reason: str

class EmailResponse(BaseModel):
    """Response from email assistant."""

    response: str

class EmailSummaryModel(BaseModel):
    """Summary generated by email summary agent."""

    summary: str

@dataclass
class AnalysisResult:
    """Internal analysis result with email metadata for routing decisions."""

    spam_decision: str
    reason: str
    email_length: int  # Used for conditional routing
    email_summary: str  # Populated by summary agent
    email_id: str

@dataclass
class Email:
    """Email content stored in shared state."""

    email_id: str
    email_content: str

# Custom event for database operations
class DatabaseEvent(WorkflowEvent):
    """Custom event for tracking database operations."""
    pass

Auswahlfunktion: Das Herzstück der Mehrfachauswahl

Die Auswahlfunktion bestimmt, welche Executoren jede Nachricht empfangen sollen:

LONG_EMAIL_THRESHOLD = 100

def select_targets(analysis: AnalysisResult, target_ids: list[str]) -> list[str]:
    """Intelligent routing based on spam decision and email characteristics."""

    # Target order: [handle_spam, submit_to_email_assistant, summarize_email, handle_uncertain]
    handle_spam_id, submit_to_email_assistant_id, summarize_email_id, handle_uncertain_id = target_ids

    if analysis.spam_decision == "Spam":
        # Route only to spam handler
        return [handle_spam_id]

    elif analysis.spam_decision == "NotSpam":
        # Always route to email assistant
        targets = [submit_to_email_assistant_id]

        # Conditionally add summarizer for long emails
        if analysis.email_length > LONG_EMAIL_THRESHOLD:
            targets.append(summarize_email_id)

        return targets

    else:  # Uncertain
        # Route only to uncertain handler
        return [handle_uncertain_id]

Wichtige Features von Auswahlfunktionen

  1. Dynamische Zielauswahl: Gibt eine Liste der zu aktivierenden Executor-IDs zurück.
  2. Inhaltsfähiges Routing: Trifft Entscheidungen basierend auf Nachrichteneigenschaften
  3. Parallele Verarbeitung: Mehrere Ziele können gleichzeitig ausgeführt werden
  4. Bedingte Logik: Komplexe Verzweigung basierend auf mehreren Kriterien

Workflowausführer mit mehrfacher Auswahl

Implementieren Sie Executoren, die die erweiterte Analyse und das Routing behandeln:

EMAIL_STATE_PREFIX = "email:"
CURRENT_EMAIL_ID_KEY = "current_email_id"

@executor(id="store_email")
async def store_email(email_text: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
    """Store email and initiate analysis."""

    new_email = Email(email_id=str(uuid4()), email_content=email_text)
    await ctx.set_shared_state(f"{EMAIL_STATE_PREFIX}{new_email.email_id}", new_email)
    await ctx.set_shared_state(CURRENT_EMAIL_ID_KEY, new_email.email_id)

    await ctx.send_message(
        AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=new_email.email_content)], should_respond=True)
    )

@executor(id="to_analysis_result")
async def to_analysis_result(response: AgentExecutorResponse, ctx: WorkflowContext[AnalysisResult]) -> None:
    """Transform agent response into enriched analysis result."""

    parsed = AnalysisResultAgent.model_validate_json(response.agent_run_response.text)
    email_id: str = await ctx.get_shared_state(CURRENT_EMAIL_ID_KEY)
    email: Email = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{email_id}")

    # Create enriched analysis result with email length for routing decisions
    await ctx.send_message(
        AnalysisResult(
            spam_decision=parsed.spam_decision,
            reason=parsed.reason,
            email_length=len(email.email_content),  # Key for conditional routing
            email_summary="",
            email_id=email_id,
        )
    )

@executor(id="submit_to_email_assistant")
async def submit_to_email_assistant(analysis: AnalysisResult, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
    """Handle legitimate emails by forwarding to email assistant."""

    if analysis.spam_decision != "NotSpam":
        raise RuntimeError("This executor should only handle NotSpam messages.")

    email: Email = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{analysis.email_id}")
    await ctx.send_message(
        AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=email.email_content)], should_respond=True)
    )

@executor(id="finalize_and_send")
async def finalize_and_send(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
    """Final step for email assistant branch."""

    parsed = EmailResponse.model_validate_json(response.agent_run_response.text)
    await ctx.yield_output(f"Email sent: {parsed.response}")

@executor(id="summarize_email")
async def summarize_email(analysis: AnalysisResult, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
    """Generate summary for long emails (parallel branch)."""

    # Only called for long NotSpam emails by selection function
    email: Email = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{analysis.email_id}")
    await ctx.send_message(
        AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=email.email_content)], should_respond=True)
    )

@executor(id="merge_summary")
async def merge_summary(response: AgentExecutorResponse, ctx: WorkflowContext[AnalysisResult]) -> None:
    """Merge summary back into analysis result for database persistence."""

    summary = EmailSummaryModel.model_validate_json(response.agent_run_response.text)
    email_id: str = await ctx.get_shared_state(CURRENT_EMAIL_ID_KEY)
    email: Email = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{email_id}")

    # Create analysis result with summary for database storage
    await ctx.send_message(
        AnalysisResult(
            spam_decision="NotSpam",
            reason="",
            email_length=len(email.email_content),
            email_summary=summary.summary,  # Now includes summary
            email_id=email_id,
        )
    )

@executor(id="handle_spam")
async def handle_spam(analysis: AnalysisResult, ctx: WorkflowContext[Never, str]) -> None:
    """Handle spam emails (single target like switch-case)."""

    if analysis.spam_decision == "Spam":
        await ctx.yield_output(f"Email marked as spam: {analysis.reason}")
    else:
        raise RuntimeError("This executor should only handle Spam messages.")

@executor(id="handle_uncertain")
async def handle_uncertain(analysis: AnalysisResult, ctx: WorkflowContext[Never, str]) -> None:
    """Handle uncertain emails (single target like switch-case)."""

    if analysis.spam_decision == "Uncertain":
        email: Email | None = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{analysis.email_id}")
        await ctx.yield_output(
            f"Email marked as uncertain: {analysis.reason}. Email content: {getattr(email, 'email_content', '')}"
        )
    else:
        raise RuntimeError("This executor should only handle Uncertain messages.")

@executor(id="database_access")
async def database_access(analysis: AnalysisResult, ctx: WorkflowContext[Never, str]) -> None:
    """Simulate database persistence with custom events."""

    await asyncio.sleep(0.05)  # Simulate DB operation
    await ctx.add_event(DatabaseEvent(f"Email {analysis.email_id} saved to database."))

Erweiterte KI-Agenten

Erstellen Sie Agents für Analyse, Unterstützung und Zusammenfassung:

async def main() -> None:
    chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())

    # Enhanced analysis agent
    email_analysis_agent = AgentExecutor(
        chat_client.create_agent(
            instructions=(
                "You are a spam detection assistant that identifies spam emails. "
                "Always return JSON with fields 'spam_decision' (one of NotSpam, Spam, Uncertain) "
                "and 'reason' (string)."
            ),
            response_format=AnalysisResultAgent,
        ),
        id="email_analysis_agent",
    )

    # Email assistant (same as before)
    email_assistant_agent = AgentExecutor(
        chat_client.create_agent(
            instructions=(
                "You are an email assistant that helps users draft responses to emails with professionalism."
            ),
            response_format=EmailResponse,
        ),
        id="email_assistant_agent",
    )

    # New: Email summary agent for long emails
    email_summary_agent = AgentExecutor(
        chat_client.create_agent(
            instructions="You are an assistant that helps users summarize emails.",
            response_format=EmailSummaryModel,
        ),
        id="email_summary_agent",
    )

Erstellen eines Multiauswahl-Workflows

Erstellen Sie den Workflow mit anspruchsvollem Routing und paralleler Verarbeitung:

    workflow = (
        WorkflowBuilder()
        .set_start_executor(store_email)
        .add_edge(store_email, email_analysis_agent)
        .add_edge(email_analysis_agent, to_analysis_result)

        # Multi-selection edge group: intelligent fan-out based on content
        .add_multi_selection_edge_group(
            to_analysis_result,
            [handle_spam, submit_to_email_assistant, summarize_email, handle_uncertain],
            selection_func=select_targets,
        )

        # Email assistant branch (always for NotSpam)
        .add_edge(submit_to_email_assistant, email_assistant_agent)
        .add_edge(email_assistant_agent, finalize_and_send)

        # Summary branch (only for long NotSpam emails)
        .add_edge(summarize_email, email_summary_agent)
        .add_edge(email_summary_agent, merge_summary)

        # Database persistence: conditional routing
        .add_edge(to_analysis_result, database_access,
                 condition=lambda r: r.email_length <= LONG_EMAIL_THRESHOLD)  # Short emails
        .add_edge(merge_summary, database_access)  # Long emails with summary

        .build()
    )

Ausführung mit Ereignisstreaming

Führen Sie den Workflow aus, und beobachten Sie die parallele Ausführung über benutzerdefinierte Ereignisse:

    # Use a moderately long email to trigger both assistant and summarizer
    email = """
    Hello team, here are the updates for this week:

    1. Project Alpha is on track and we should have the first milestone completed by Friday.
    2. The client presentation has been scheduled for next Tuesday at 2 PM.
    3. Please review the Q4 budget allocation and provide feedback by Wednesday.

    Let me know if you have any questions or concerns.

    Best regards,
    Alex
    """

    # Stream events to see parallel execution
    async for event in workflow.run_stream(email):
        if isinstance(event, DatabaseEvent):
            print(f"Database: {event}")
        elif isinstance(event, WorkflowOutputEvent):
            print(f"Output: {event.data}")

Mehrfachauswahl vs. Switch-Case-Vergleich

Switch-Case-Struktur (vorherig):

# One input → exactly one output
.add_switch_case_edge_group(
    source,
    [
        Case(condition=lambda x: x.result == "A", target=handler_a),
        Case(condition=lambda x: x.result == "B", target=handler_b),
        Default(target=handler_c),
    ],
)

Mehrfachauswahlmuster:

# One input → one or more outputs (dynamic fan-out)
.add_multi_selection_edge_group(
    source,
    [handler_a, handler_b, handler_c, handler_d],
    selection_func=intelligent_router,  # Returns list of target IDs
)

Vorteile der Mehrfachauswahl in C#

  1. Parallele Verarbeitung: Mehrere Zweige können gleichzeitig ausgeführt werden
  2. Bedingter Fanout: Die Anzahl der Ziele variiert je nach Inhalt.
  3. Inhaltsfähiges Routing: Entscheidungen basierend auf Nachrichteneigenschaften, nicht nur Typ
  4. Effiziente Ressourcennutzung: Nur notwendige Verzweigungen werden aktiviert.
  5. Komplexe Geschäftslogik: Unterstützt anspruchsvolle Routingszenarien

C#-Real-World-Anwendungen

  • E-Mail-Systeme: Weiterleitung zum Antwortassistent + Archiv + Analytics (bedingt)
  • Inhaltsverarbeitung: Triggertranskription + Übersetzung + Analyse (basierend auf Dem Inhaltstyp)
  • Auftragsabwicklung: Weiterleitung zur Erfüllung + Abrechnung + Benachrichtigungen (basierend auf Auftragsmerkmalen)
  • Datenpipelines: Auslösen unterschiedlicher Analyseflüsse basierend auf Datenmerkmalen

Beispielcode für mehrfache Auswahl

Die vollständige Arbeitsimplementierung finden Sie im multi_selection_edge_group.py Beispiel im Agent Framework-Repository.

Nächste Schritte