Megosztás:


Munkafolyamat létrehozása elágaztatási logikával

Ebben az oktatóanyagban megtudhatja, hogyan hozhat létre elágaztatási logikával rendelkező munkafolyamatot az Agent Framework használatával. Az elágaztatási logika lehetővé teszi, hogy a munkafolyamat bizonyos feltételek alapján hozzon döntéseket, így összetettebb és dinamikusabb viselkedést tesz lehetővé.

Feltételes élek

A feltételes élek lehetővé teszik, hogy a munkafolyamat útválasztási döntéseket hozzon a munkafolyamaton áthaladó üzenetek tartalma vagy tulajdonságai alapján. Ez lehetővé teszi a dinamikus elágazást, ahol a futtatókörnyezeti feltételek alapján különböző végrehajtási útvonalakat kell végrehajtani.

Mit fog felépíteni?

Létrehoz egy e-mail-feldolgozási munkafolyamatot, amely bemutatja a feltételes útválasztást:

  • Egy levélszemét-észlelési ügynök, amely elemzi a bejövő e-maileket, és strukturált JSON-t ad vissza.
  • Feltételes élek, amelyek az e-maileket a besorolás alapján különböző kezelőkhöz irányítják.
  • Egy megbízható e-mail-kezelő, amely profi válaszokat készít.
  • Egy levélszemétkezelő, amely gyanús e-maileket jelöl.
  • Megosztott állapotkezelés az e-mail-adatok munkafolyamat-lépések közötti megőrzéséhez.

A tárgyalt fogalmak

Előfeltételek

NuGet-csomagok telepítése

Először telepítse a szükséges csomagokat a .NET-projekthez:

dotnet add package Azure.AI.OpenAI --prerelease
dotnet add package Azure.Identity
dotnet add package Microsoft.Agents.AI.Workflows --prerelease
dotnet add package Microsoft.Extensions.AI.OpenAI --prerelease

Adatmodellek definiálása

Először határozza meg a munkafolyamaton áthaladó adatstruktúrát:

using System.Text.Json.Serialization;

/// <summary>
/// Represents the result of spam detection.
/// </summary>
public sealed class DetectionResult
{
    [JsonPropertyName("is_spam")]
    public bool IsSpam { get; set; }

    [JsonPropertyName("reason")]
    public string Reason { get; set; } = string.Empty;

    // Email ID is generated by the executor, not the agent
    [JsonIgnore]
    public string EmailId { get; set; } = string.Empty;
}

/// <summary>
/// Represents an email.
/// </summary>
internal sealed class Email
{
    [JsonPropertyName("email_id")]
    public string EmailId { get; set; } = string.Empty;

    [JsonPropertyName("email_content")]
    public string EmailContent { get; set; } = string.Empty;
}

/// <summary>
/// Represents the response from the email assistant.
/// </summary>
public sealed class EmailResponse
{
    [JsonPropertyName("response")]
    public string Response { get; set; } = string.Empty;
}

/// <summary>
/// Constants for shared state scopes.
/// </summary>
internal static class EmailStateConstants
{
    public const string EmailStateScope = "EmailState";
}

Feltételfüggvények létrehozása

A feltételfüggvény kiértékeli a levélszemét-észlelési eredményt, hogy meghatározza a munkafolyamat elérési útját:

/// <summary>
/// Creates a condition for routing messages based on the expected spam detection result.
/// </summary>
/// <param name="expectedResult">The expected spam detection result</param>
/// <returns>A function that evaluates whether a message meets the expected result</returns>
private static Func<object?, bool> GetCondition(bool expectedResult) =>
    detectionResult => detectionResult is DetectionResult result && result.IsSpam == expectedResult;

Ez a feltételértékelő függvény:

  • Egy paramétert bool expectedResult használ (a levélszemét esetében igaz, nem levélszemét esetén hamis)
  • Olyan függvényt ad vissza, amely peremfeltételként használható
  • Biztonságosan ellenőrzi, hogy az üzenet DetectionResult-e, és összehasonlítja a IsSpam tulajdonságot.

AI-ügynökök létrehozása

Állítsa be azokat az AI-ügynököket, amelyek kezelik a levélszemét észlelését és az e-mail-segítséget:

using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Agents.AI;
using Microsoft.Extensions.AI;

/// <summary>
/// Creates a spam detection agent.
/// </summary>
/// <returns>A ChatClientAgent configured for spam detection</returns>
private static ChatClientAgent GetSpamDetectionAgent(IChatClient chatClient) =>
    new(chatClient, new ChatClientAgentOptions(instructions: "You are a spam detection assistant that identifies spam emails.")
    {
        ChatOptions = new()
        {
            ResponseFormat = ChatResponseFormat.ForJsonSchema(AIJsonUtilities.CreateJsonSchema(typeof(DetectionResult)))
        }
    });

/// <summary>
/// Creates an email assistant agent.
/// </summary>
/// <returns>A ChatClientAgent configured for email assistance</returns>
private static ChatClientAgent GetEmailAssistantAgent(IChatClient chatClient) =>
    new(chatClient, new ChatClientAgentOptions(instructions: "You are an email assistant that helps users draft professional responses to emails.")
    {
        ChatOptions = new()
        {
            ResponseFormat = ChatResponseFormat.ForJsonSchema(AIJsonUtilities.CreateJsonSchema(typeof(EmailResponse)))
        }
    });

Végrehajtók implementálása

Hozza létre az e-mail-feldolgozás különböző szakaszait kezelő munkafolyamat-végrehajtókat:

using Microsoft.Agents.AI.Workflows;
using System.Text.Json;

/// <summary>
/// Executor that detects spam using an AI agent.
/// </summary>
internal sealed class SpamDetectionExecutor : Executor<ChatMessage, DetectionResult>
{
    private readonly AIAgent _spamDetectionAgent;

    public SpamDetectionExecutor(AIAgent spamDetectionAgent) : base("SpamDetectionExecutor")
    {
        this._spamDetectionAgent = spamDetectionAgent;
    }

    public override async ValueTask<DetectionResult> HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // Generate a random email ID and store the email content to shared state
        var newEmail = new Email
        {
            EmailId = Guid.NewGuid().ToString("N"),
            EmailContent = message.Text
        };
        await context.QueueStateUpdateAsync(newEmail.EmailId, newEmail, scopeName: EmailStateConstants.EmailStateScope);

        // Invoke the agent for spam detection
        var response = await this._spamDetectionAgent.RunAsync(message);
        var detectionResult = JsonSerializer.Deserialize<DetectionResult>(response.Text);

        detectionResult!.EmailId = newEmail.EmailId;
        return detectionResult;
    }
}

/// <summary>
/// Executor that assists with email responses using an AI agent.
/// </summary>
internal sealed class EmailAssistantExecutor : Executor<DetectionResult, EmailResponse>
{
    private readonly AIAgent _emailAssistantAgent;

    public EmailAssistantExecutor(AIAgent emailAssistantAgent) : base("EmailAssistantExecutor")
    {
        this._emailAssistantAgent = emailAssistantAgent;
    }

    public override async ValueTask<EmailResponse> HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        if (message.IsSpam)
        {
            throw new ArgumentException("This executor should only handle non-spam messages.");
        }

        // Retrieve the email content from shared state
        var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope)
            ?? throw new InvalidOperationException("Email not found.");

        // Invoke the agent to draft a response
        var response = await this._emailAssistantAgent.RunAsync(email.EmailContent);
        var emailResponse = JsonSerializer.Deserialize<EmailResponse>(response.Text);

        return emailResponse!;
    }
}

/// <summary>
/// Executor that sends emails.
/// </summary>
internal sealed class SendEmailExecutor : Executor<EmailResponse>
{
    public SendEmailExecutor() : base("SendEmailExecutor") { }

    public override async ValueTask HandleAsync(EmailResponse message, IWorkflowContext context, CancellationToken cancellationToken = default) =>
        await context.YieldOutputAsync($"Email sent: {message.Response}");
}

/// <summary>
/// Executor that handles spam messages.
/// </summary>
internal sealed class HandleSpamExecutor : Executor<DetectionResult>
{
    public HandleSpamExecutor() : base("HandleSpamExecutor") { }

    public override async ValueTask HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        if (message.IsSpam)
        {
            await context.YieldOutputAsync($"Email marked as spam: {message.Reason}");
        }
        else
        {
            throw new ArgumentException("This executor should only handle spam messages.");
        }
    }
}

Munkafolyamat létrehozása feltételes élekkel

Most hozza létre a munkafolyamatot létrehozó és végrehajtó fő programot:

using Microsoft.Extensions.AI;

public static class Program
{
    private static async Task Main()
    {
        // Set up the Azure OpenAI client
        var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT")
            ?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
        var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
        var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential())
            .GetChatClient(deploymentName).AsIChatClient();

        // Create agents
        AIAgent spamDetectionAgent = GetSpamDetectionAgent(chatClient);
        AIAgent emailAssistantAgent = GetEmailAssistantAgent(chatClient);

        // Create executors
        var spamDetectionExecutor = new SpamDetectionExecutor(spamDetectionAgent);
        var emailAssistantExecutor = new EmailAssistantExecutor(emailAssistantAgent);
        var sendEmailExecutor = new SendEmailExecutor();
        var handleSpamExecutor = new HandleSpamExecutor();

        // Build the workflow with conditional edges
        var workflow = new WorkflowBuilder(spamDetectionExecutor)
            // Non-spam path: route to email assistant when IsSpam = false
            .AddEdge(spamDetectionExecutor, emailAssistantExecutor, condition: GetCondition(expectedResult: false))
            .AddEdge(emailAssistantExecutor, sendEmailExecutor)
            // Spam path: route to spam handler when IsSpam = true
            .AddEdge(spamDetectionExecutor, handleSpamExecutor, condition: GetCondition(expectedResult: true))
            .WithOutputFrom(handleSpamExecutor, sendEmailExecutor)
            .Build();

        // Execute the workflow with sample spam email
        string emailContent = "Congratulations! You've won $1,000,000! Click here to claim your prize now!";
        StreamingRun run = await InProcessExecution.StreamAsync(workflow, new ChatMessage(ChatRole.User, emailContent));
        await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

        await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
        {
            if (evt is WorkflowOutputEvent outputEvent)
            {
                Console.WriteLine($"{outputEvent}");
            }
        }
    }
}

Hogyan működik?

  1. Munkafolyamat-bejegyzés: A munkafolyamat azzal kezdődik, hogy spamDetectionExecutorChatMessage érkezik.

  2. Levélszemételemzés: A levélszemétészlelő ügynök elemzi az e-maileket, és strukturált és DetectionResult tulajdonságokkal rendelkező IsSpam adatokat ad Reason vissza.

  3. Feltételes útválasztás: Az IsSpam érték alapján:

    • Ha levélszemét (): Útvonalak IsSpam = true használatával HandleSpamExecutor
    • Ha jogos (): Útvonalak IsSpam = false használatához EmailAssistantExecutor
  4. Válaszgenerálás: A megbízható e-mailek esetében az e-mail-asszisztens profi választ készít.

  5. Végső kimenet: A munkafolyamat levélszemét-értesítést küld, vagy elküldi a betervezett e-mail-választ.

A feltételes élek főbb jellemzői

  1. Type-Safe feltételek: A GetCondition metódus újrahasználható állapotfüggvényeket hoz létre, amelyek biztonságosan értékelik ki az üzenet tartalmát.

  2. Több elérési út: Egyetlen végrehajtó több különböző feltételekkel rendelkező kimenő éllel rendelkezhet, így összetett elágaztatási logikát tesz lehetővé.

  3. Megosztott állapot: Az e-mail-adatok hatóköralapú állapotkezeléssel maradnak a végrehajtók között, így az alsóbb rétegbeli végrehajtók hozzáférhetnek az eredeti tartalomhoz.

  4. Hibakezelés: A végrehajtók ellenőrzik a bemeneteiket, és jelentős kivételeket jeleznek váratlan üzenettípusok fogadásakor.

  5. Tiszta architektúra: Minden végrehajtónak egyetlen felelőssége van, így a munkafolyamat karbantartható és tesztelhető.

A példa futtatása

Amikor ezt a munkafolyamatot a spam mintalevéllel futtatja:

Email marked as spam: This email contains common spam indicators including monetary prizes, urgency tactics, and suspicious links that are typical of phishing attempts.

Próbálja meg az e-mail-tartalmat egy megbízhatóra módosítani:

string emailContent = "Hi, I wanted to follow up on our meeting yesterday and get your thoughts on the project proposal.";

A munkafolyamat az e-mail-asszisztenshez irányítódik, és professzionális választ generál helyette.

Ez a feltételes útválasztási minta képezi az összetett döntési fákat és üzleti logikát kezelni képes kifinomult munkafolyamatok létrehozásának alapjait.

Implementáció befejezése

A teljes munka végrehajtásához tekintse meg ezt a mintát az Agent Framework-adattárban.

Mit fog felépíteni?

Létrehoz egy e-mail-feldolgozási munkafolyamatot, amely bemutatja a feltételes útválasztást:

  • Levélszemét-észlelési ügynök, amely elemzi a bejövő e-maileket
  • Feltételes élek, amelyek az e-maileket a besorolás alapján különböző kezelőkhöz irányítják
  • Egy megbízható e-mail-kezelő, amely profi válaszokat készít
  • Egy levélszemétkezelő, amely gyanús e-maileket jelöl

A tárgyalt fogalmak

Előfeltételek

  • Python 3.10 vagy újabb
  • Telepített Ügynök-keretrendszer: pip install agent-framework-core --pre
  • Megfelelő környezeti változókkal konfigurált Azure OpenAI-szolgáltatás
  • Azure CLI-hitelesítés: az login

1. lépés: Kötelező függőségek importálása

Először importálja a feltételes munkafolyamatokhoz szükséges összetevőket:

import asyncio
import os
from dataclasses import dataclass
from typing import Any, Literal
from uuid import uuid4

from typing_extensions import Never

from agent_framework import (
    AgentExecutor,
    AgentExecutorRequest,
    AgentExecutorResponse,
    ChatMessage,
    Role,
    WorkflowBuilder,
    WorkflowContext,
    executor,
    Case,
    Default,
)
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential
from pydantic import BaseModel

2. lépés: Adatmodellek definiálása

Pydantic-modellek létrehozása a munkafolyamat-összetevők közötti strukturált adatcseréhez:

class DetectionResult(BaseModel):
    """Represents the result of spam detection."""
    # is_spam drives the routing decision taken by edge conditions
    is_spam: bool
    # Human readable rationale from the detector
    reason: str
    # The agent must include the original email so downstream agents can operate without reloading content
    email_content: str


class EmailResponse(BaseModel):
    """Represents the response from the email assistant."""
    # The drafted reply that a user could copy or send
    response: str

3. lépés: Feltételfüggvények létrehozása

Adja meg az útválasztási döntéseket meghatározó feltételfüggvényeket:

def get_condition(expected_result: bool):
    """Create a condition callable that routes based on DetectionResult.is_spam."""

    # The returned function will be used as an edge predicate.
    # It receives whatever the upstream executor produced.
    def condition(message: Any) -> bool:
        # Defensive guard. If a non AgentExecutorResponse appears, let the edge pass to avoid dead ends.
        if not isinstance(message, AgentExecutorResponse):
            return True

        try:
            # Prefer parsing a structured DetectionResult from the agent JSON text.
            # Using model_validate_json ensures type safety and raises if the shape is wrong.
            detection = DetectionResult.model_validate_json(message.agent_run_response.text)
            # Route only when the spam flag matches the expected path.
            return detection.is_spam == expected_result
        except Exception:
            # Fail closed on parse errors so we do not accidentally route to the wrong path.
            # Returning False prevents this edge from activating.
            return False

    return condition

4. lépés: Kezelői végrehajtók létrehozása

A különböző útválasztási eredmények kezeléséhez definiáljon végrehajtókat:

@executor(id="send_email")
async def handle_email_response(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
    """Handle legitimate emails by drafting a professional response."""
    # Downstream of the email assistant. Parse a validated EmailResponse and yield the workflow output.
    email_response = EmailResponse.model_validate_json(response.agent_run_response.text)
    await ctx.yield_output(f"Email sent:\n{email_response.response}")


@executor(id="handle_spam")
async def handle_spam_classifier_response(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
    """Handle spam emails by marking them appropriately."""
    # Spam path. Confirm the DetectionResult and yield the workflow output. Guard against accidental non spam input.
    detection = DetectionResult.model_validate_json(response.agent_run_response.text)
    if detection.is_spam:
        await ctx.yield_output(f"Email marked as spam: {detection.reason}")
    else:
        # This indicates the routing predicate and executor contract are out of sync.
        raise RuntimeError("This executor should only handle spam messages.")


@executor(id="to_email_assistant_request")
async def to_email_assistant_request(
    response: AgentExecutorResponse, ctx: WorkflowContext[AgentExecutorRequest]
) -> None:
    """Transform spam detection response into a request for the email assistant."""
    # Parse the detection result and extract the email content for the assistant
    detection = DetectionResult.model_validate_json(response.agent_run_response.text)

    # Create a new request for the email assistant with the original email content
    request = AgentExecutorRequest(
        messages=[ChatMessage(Role.USER, text=detection.email_content)],
        should_respond=True
    )
    await ctx.send_message(request)

5. lépés: AI-ügynökök létrehozása

Az Azure OpenAI-ügynökök beállítása strukturált kimeneti formázással:

async def main() -> None:
    # Create agents
    # AzureCliCredential uses your current az login. This avoids embedding secrets in code.
    chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())

    # Agent 1. Classifies spam and returns a DetectionResult object.
    # response_format enforces that the LLM returns parsable JSON for the Pydantic model.
    spam_detection_agent = AgentExecutor(
        chat_client.create_agent(
            instructions=(
                "You are a spam detection assistant that identifies spam emails. "
                "Always return JSON with fields is_spam (bool), reason (string), and email_content (string). "
                "Include the original email content in email_content."
            ),
            response_format=DetectionResult,
        ),
        id="spam_detection_agent",
    )

    # Agent 2. Drafts a professional reply. Also uses structured JSON output for reliability.
    email_assistant_agent = AgentExecutor(
        chat_client.create_agent(
            instructions=(
                "You are an email assistant that helps users draft professional responses to emails. "
                "Your input might be a JSON object that includes 'email_content'; base your reply on that content. "
                "Return JSON with a single field 'response' containing the drafted reply."
            ),
            response_format=EmailResponse,
        ),
        id="email_assistant_agent",
    )

6. lépés: A feltételes munkafolyamat létrehozása

Hozzon létre egy munkafolyamatot olyan feltételes élekkel, amelyek a levélszemétészlelési eredmények alapján haladnak:

    # Build the workflow graph.
    # Start at the spam detector.
    # If not spam, hop to a transformer that creates a new AgentExecutorRequest,
    # then call the email assistant, then finalize.
    # If spam, go directly to the spam handler and finalize.
    workflow = (
        WorkflowBuilder()
        .set_start_executor(spam_detection_agent)
        # Not spam path: transform response -> request for assistant -> assistant -> send email
        .add_edge(spam_detection_agent, to_email_assistant_request, condition=get_condition(False))
        .add_edge(to_email_assistant_request, email_assistant_agent)
        .add_edge(email_assistant_agent, handle_email_response)
        # Spam path: send to spam handler
        .add_edge(spam_detection_agent, handle_spam_classifier_response, condition=get_condition(True))
        .build()
    )

7. lépés: A munkafolyamat végrehajtása

Futtassa a munkafolyamatot minta e-mail-tartalommal:

    # Read Email content from the sample resource file.
    # This keeps the sample deterministic since the model sees the same email every run.
    email_path = os.path.join(os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "resources", "email.txt")

    with open(email_path) as email_file:  # noqa: ASYNC230
        email = email_file.read()

    # Execute the workflow. Since the start is an AgentExecutor, pass an AgentExecutorRequest.
    # The workflow completes when it becomes idle (no more work to do).
    request = AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=email)], should_respond=True)
    events = await workflow.run(request)
    outputs = events.get_outputs()
    if outputs:
        print(f"Workflow output: {outputs[0]}")


if __name__ == "__main__":
    asyncio.run(main())

A feltételes élek működése

  1. Feltételfüggvények: A get_condition() függvény létrehoz egy predikátumot, amely megvizsgálja az üzenet tartalmát, és visszaadja True vagy False annak meghatározására, hogy át kell-e haladni az élen.

  2. Üzenetvizsgálat: A feltételek az üzenet bármely aspektusát megvizsgálhatják, beleértve a Pydantic-modellekkel elemezett ügynökválaszokból származó strukturált adatokat is.

  3. Védelmi programozás: A feltételfüggvény hibakezelést tartalmaz, amely megakadályozza a strukturált adatok elemzésekor fellépő útválasztási hibákat.

  4. Dinamikus útválasztás: A levélszemét észlelésének eredménye alapján a rendszer automatikusan átirányítja az e-maileket az e-mail-asszisztenshez (jogos e-mailek esetén) vagy a levélszemétkezelőhöz (gyanús e-mailek esetén).

Alapfogalmak

  • Élfeltételek: Logikai predikátumok, amelyek meghatározzák, hogy át kell-e haladni egy élen
  • Strukturált kimenetek: Pydantic-modellek használata megbízható adatelemzéssel response_format
  • Védekező útválasztás: A feltételfüggvények kezelik a szélsőséges eseteket a munkafolyamat holtpontjainak megakadályozása érdekében
  • Üzenetátalakítás: A végrehajtók átalakíthatják az üzenettípusokat a munkafolyamat lépései között

Implementáció befejezése

A teljes munka implementációjához tekintse meg az ügynök-keretrendszer adattárában található edge_condition.py mintát.

Switch-Case élek

Feltételes élekre építve

Az előző feltételes élek példája kétirányú útválasztást mutatott (levélszemét és jogos e-mailek). Számos valós forgatókönyv azonban kifinomultabb döntési fákat igényel. A kapcsolótok élei tisztább, karbantarthatóbb megoldást nyújtanak, ha különböző feltételek alapján több célhelyre kell irányítania.

Amit meg fogsz építeni a Switch-Case segítségével

Az e-mail-feldolgozási munkafolyamatot három döntési útvonal kezelésére fogja kiterjeszteni:

  • NotSpam → e-mail-asszisztens → e-mail küldése
  • Levélszemét → Levélszemét-végrehajtó kezelése
  • Bizonytalan → Bizonytalan végrehajtó kezelése (alapértelmezett eset)

A legfontosabb fejlesztés a minta használata több SwitchBuilder különálló feltételes él helyett, így a munkafolyamat könnyebben érthető és karbantartható a döntési összetettség növekedésével.

A tárgyalt fogalmak

Adatmodellek Switch-Case

Frissítse az adatmodelleket a háromirányú besorolás támogatásához:

/// <summary>
/// Represents the possible decisions for spam detection.
/// </summary>
public enum SpamDecision
{
    NotSpam,
    Spam,
    Uncertain
}

/// <summary>
/// Represents the result of spam detection with enhanced decision support.
/// </summary>
public sealed class DetectionResult
{
    [JsonPropertyName("spam_decision")]
    [JsonConverter(typeof(JsonStringEnumConverter))]
    public SpamDecision spamDecision { get; set; }

    [JsonPropertyName("reason")]
    public string Reason { get; set; } = string.Empty;

    // Email ID is generated by the executor, not the agent
    [JsonIgnore]
    public string EmailId { get; set; } = string.Empty;
}

/// <summary>
/// Represents an email stored in shared state.
/// </summary>
internal sealed class Email
{
    [JsonPropertyName("email_id")]
    public string EmailId { get; set; } = string.Empty;

    [JsonPropertyName("email_content")]
    public string EmailContent { get; set; } = string.Empty;
}

/// <summary>
/// Represents the response from the email assistant.
/// </summary>
public sealed class EmailResponse
{
    [JsonPropertyName("response")]
    public string Response { get; set; } = string.Empty;
}

/// <summary>
/// Constants for shared state scopes.
/// </summary>
internal static class EmailStateConstants
{
    public const string EmailStateScope = "EmailState";
}

Feltételgyár Switch-Case

Hozzon létre egy újrafelhasználható feltétel-előállítót, amely predikátumokat hoz létre az egyes levélszemét-döntésekhez:

/// <summary>
/// Creates a condition for routing messages based on the expected spam detection result.
/// </summary>
/// <param name="expectedDecision">The expected spam detection decision</param>
/// <returns>A function that evaluates whether a message meets the expected result</returns>
private static Func<object?, bool> GetCondition(SpamDecision expectedDecision) =>
    detectionResult => detectionResult is DetectionResult result && result.spamDecision == expectedDecision;

Ez a gyári megközelítés:

  • Csökkenti a kód duplikációját: Egy függvény létrehozza az összes feltétel-predikátumot
  • Konzisztencia biztosítása: Minden feltétel ugyanazt a mintát követi
  • Egyszerűbb karbantartás: A feltétellogika módosítása egy helyen történik

Továbbfejlesztett AI-ügynök

Frissítse a levélszemét-észlelési ügynököt, hogy kevésbé legyen magabiztos, és háromirányú besorolást ad vissza:

/// <summary>
/// Creates a spam detection agent with enhanced uncertainty handling.
/// </summary>
/// <returns>A ChatClientAgent configured for three-way spam detection</returns>
private static ChatClientAgent GetSpamDetectionAgent(IChatClient chatClient) =>
    new(chatClient, new ChatClientAgentOptions(instructions: "You are a spam detection assistant that identifies spam emails. Be less confident in your assessments.")
    {
        ChatOptions = new()
        {
            ResponseFormat = ChatResponseFormat.ForJsonSchema<DetectionResult>()
        }
    });

/// <summary>
/// Creates an email assistant agent (unchanged from conditional edges example).
/// </summary>
/// <returns>A ChatClientAgent configured for email assistance</returns>
private static ChatClientAgent GetEmailAssistantAgent(IChatClient chatClient) =>
    new(chatClient, new ChatClientAgentOptions(instructions: "You are an email assistant that helps users draft responses to emails with professionalism.")
    {
        ChatOptions = new()
        {
            ResponseFormat = ChatResponseFormat.ForJsonSchema<EmailResponse>()
        }
    });

Munkafolyamat-végrehajtók továbbfejlesztett útválasztással

A háromirányú útválasztást megosztott állapotkezeléssel kezelő végrehajtók implementálása:

/// <summary>
/// Executor that detects spam using an AI agent with three-way classification.
/// </summary>
internal sealed class SpamDetectionExecutor : Executor<ChatMessage, DetectionResult>
{
    private readonly AIAgent _spamDetectionAgent;

    public SpamDetectionExecutor(AIAgent spamDetectionAgent) : base("SpamDetectionExecutor")
    {
        this._spamDetectionAgent = spamDetectionAgent;
    }

    public override async ValueTask<DetectionResult> HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // Generate a random email ID and store the email content in shared state
        var newEmail = new Email
        {
            EmailId = Guid.NewGuid().ToString("N"),
            EmailContent = message.Text
        };
        await context.QueueStateUpdateAsync(newEmail.EmailId, newEmail, scopeName: EmailStateConstants.EmailStateScope);

        // Invoke the agent for enhanced spam detection
        var response = await this._spamDetectionAgent.RunAsync(message);
        var detectionResult = JsonSerializer.Deserialize<DetectionResult>(response.Text);

        detectionResult!.EmailId = newEmail.EmailId;
        return detectionResult;
    }
}

/// <summary>
/// Executor that assists with email responses using an AI agent.
/// </summary>
internal sealed class EmailAssistantExecutor : Executor<DetectionResult, EmailResponse>
{
    private readonly AIAgent _emailAssistantAgent;

    public EmailAssistantExecutor(AIAgent emailAssistantAgent) : base("EmailAssistantExecutor")
    {
        this._emailAssistantAgent = emailAssistantAgent;
    }

    public override async ValueTask<EmailResponse> HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        if (message.spamDecision == SpamDecision.Spam)
        {
            throw new ArgumentException("This executor should only handle non-spam messages.");
        }

        // Retrieve the email content from shared state
        var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);

        // Invoke the agent to draft a response
        var response = await this._emailAssistantAgent.RunAsync(email!.EmailContent);
        var emailResponse = JsonSerializer.Deserialize<EmailResponse>(response.Text);

        return emailResponse!;
    }
}

/// <summary>
/// Executor that sends emails.
/// </summary>
internal sealed class SendEmailExecutor : Executor<EmailResponse>
{
    public SendEmailExecutor() : base("SendEmailExecutor") { }

    public override async ValueTask HandleAsync(EmailResponse message, IWorkflowContext context, CancellationToken cancellationToken = default) =>
        await context.YieldOutputAsync($"Email sent: {message.Response}").ConfigureAwait(false);
}

/// <summary>
/// Executor that handles spam messages.
/// </summary>
internal sealed class HandleSpamExecutor : Executor<DetectionResult>
{
    public HandleSpamExecutor() : base("HandleSpamExecutor") { }

    public override async ValueTask HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        if (message.spamDecision == SpamDecision.Spam)
        {
            await context.YieldOutputAsync($"Email marked as spam: {message.Reason}").ConfigureAwait(false);
        }
        else
        {
            throw new ArgumentException("This executor should only handle spam messages.");
        }
    }
}

/// <summary>
/// Executor that handles uncertain emails requiring manual review.
/// </summary>
internal sealed class HandleUncertainExecutor : Executor<DetectionResult>
{
    public HandleUncertainExecutor() : base("HandleUncertainExecutor") { }

    public override async ValueTask HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        if (message.spamDecision == SpamDecision.Uncertain)
        {
            var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);
            await context.YieldOutputAsync($"Email marked as uncertain: {message.Reason}. Email content: {email?.EmailContent}");
        }
        else
        {
            throw new ArgumentException("This executor should only handle uncertain spam decisions.");
        }
    }
}

Munkafolyamat építése Switch-Case sémával

Cserélje le több feltételes élét a tisztább kapcsolós esetmintára:

public static class Program
{
    private static async Task Main()
    {
        // Set up the Azure OpenAI client
        var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
        var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
        var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential()).GetChatClient(deploymentName).AsIChatClient();

        // Create agents
        AIAgent spamDetectionAgent = GetSpamDetectionAgent(chatClient);
        AIAgent emailAssistantAgent = GetEmailAssistantAgent(chatClient);

        // Create executors
        var spamDetectionExecutor = new SpamDetectionExecutor(spamDetectionAgent);
        var emailAssistantExecutor = new EmailAssistantExecutor(emailAssistantAgent);
        var sendEmailExecutor = new SendEmailExecutor();
        var handleSpamExecutor = new HandleSpamExecutor();
        var handleUncertainExecutor = new HandleUncertainExecutor();

        // Build the workflow using switch-case for cleaner three-way routing
        WorkflowBuilder builder = new(spamDetectionExecutor);
        builder.AddSwitch(spamDetectionExecutor, switchBuilder =>
            switchBuilder
            .AddCase(
                GetCondition(expectedDecision: SpamDecision.NotSpam),
                emailAssistantExecutor
            )
            .AddCase(
                GetCondition(expectedDecision: SpamDecision.Spam),
                handleSpamExecutor
            )
            .WithDefault(
                handleUncertainExecutor
            )
        )
        // After the email assistant writes a response, it will be sent to the send email executor
        .AddEdge(emailAssistantExecutor, sendEmailExecutor)
        .WithOutputFrom(handleSpamExecutor, sendEmailExecutor, handleUncertainExecutor);

        var workflow = builder.Build();

        // Read an email from a text file (use ambiguous content for demonstration)
        string email = Resources.Read("ambiguous_email.txt");

        // Execute the workflow
        StreamingRun run = await InProcessExecution.StreamAsync(workflow, new ChatMessage(ChatRole.User, email));
        await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
        await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
        {
            if (evt is WorkflowOutputEvent outputEvent)
            {
                Console.WriteLine($"{outputEvent}");
            }
        }
    }
}

Switch-Case előnyök

  1. Tisztább szintaxis: A SwitchBuilder több feltételes él helyett olvashatóbb alternatívát kínál
  2. Megrendelt értékelés: Az eseteket egymás után értékelik ki, és az első meccsen leállnak
  3. Garantált útválasztás: A WithDefault() módszer biztosítja, hogy az üzenetek soha ne ragadnak meg
  4. Jobb karbantarthatóság: Az új esetek hozzáadása minimális módosítást igényel a munkafolyamat-struktúrában
  5. Típusbiztonság: Minden végrehajtó ellenőrzi a bemenetét az útválasztási hibák korai észleléséhez

Minta összehasonlítása

Előtte (feltételes élcsomópontok):

var workflow = new WorkflowBuilder(spamDetectionExecutor)
    .AddEdge(spamDetectionExecutor, emailAssistantExecutor, condition: GetCondition(expectedResult: false))
    .AddEdge(spamDetectionExecutor, handleSpamExecutor, condition: GetCondition(expectedResult: true))
    // No clean way to handle a third case
    .WithOutputFrom(handleSpamExecutor, sendEmailExecutor)
    .Build();

Utána (Switch-Case):

WorkflowBuilder builder = new(spamDetectionExecutor);
builder.AddSwitch(spamDetectionExecutor, switchBuilder =>
    switchBuilder
    .AddCase(GetCondition(SpamDecision.NotSpam), emailAssistantExecutor)
    .AddCase(GetCondition(SpamDecision.Spam), handleSpamExecutor)
    .WithDefault(handleUncertainExecutor)  // Clean default case
)
// Continue building the rest of the workflow

A switch-case minta sokkal jobban skálázódik ahogy az útválasztási döntések száma növekszik, és az alapértelmezett eset biztonsági hálót nyújt a váratlan értékek kezeléséhez.

A példa futtatása

Ha nem egyértelmű e-mail-tartalommal futtatja ezt a munkafolyamatot:

Email marked as uncertain: This email contains promotional language but might be from a legitimate business contact, requiring human review for proper classification.

Próbálja meg módosítani az e-mail-tartalmat egy egyértelműen spamre vagy egyértelműen legitimre, hogy lássa a különböző útválasztási útvonalak működését.

Implementáció befejezése

A teljes munka végrehajtásához tekintse meg ezt a mintát az Agent Framework-adattárban.

Feltételes élekre építve

Az előző feltételes élek példája kétirányú útválasztást mutatott (levélszemét és jogos e-mailek). Számos valós forgatókönyv azonban kifinomultabb döntési fákat igényel. A kapcsolótok élei tisztább, karbantarthatóbb megoldást nyújtanak, ha különböző feltételek alapján több célhelyre kell irányítania.

Mit fogsz legközelebb készíteni?

Az e-mail-feldolgozási munkafolyamatot három döntési útvonal kezelésére fogja kiterjeszteni:

  • NotSpam → e-mail-asszisztens → e-mail küldése
  • Levélszemét → Megjelölés levélszemétként
  • Bizonytalan → jelölő manuális felülvizsgálathoz (alapértelmezett eset)

A legfontosabb fejlesztés az, hogy több különálló feltételes él helyett egyetlen 'switch-case' élcsoportot használunk. Ez megkönnyíti a munkafolyamat megértését és karbantartását, ahogy a döntési komplexitás növekszik.

A tárgyalt fogalmak

Továbbfejlesztett adatmodellek

Frissítse az adatmodelleket a háromirányú besorolás támogatásához:

from typing import Literal

class DetectionResultAgent(BaseModel):
    """Structured output returned by the spam detection agent."""

    # The agent classifies the email into one of three categories
    spam_decision: Literal["NotSpam", "Spam", "Uncertain"]
    reason: str

class EmailResponse(BaseModel):
    """Structured output returned by the email assistant agent."""

    response: str

@dataclass
class DetectionResult:
    """Internal typed payload used for routing and downstream handling."""

    spam_decision: str
    reason: str
    email_id: str

@dataclass
class Email:
    """In memory record of the email content stored in shared state."""

    email_id: str
    email_content: str

Kapcsoló-Eset Feltételgyár

Hozzon létre egy újrafelhasználható feltétel-előállítót, amely predikátumokat hoz létre az egyes levélszemét-döntésekhez:

def get_case(expected_decision: str):
    """Factory that returns a predicate matching a specific spam_decision value."""

    def condition(message: Any) -> bool:
        # Only match when the upstream payload is a DetectionResult with the expected decision
        return isinstance(message, DetectionResult) and message.spam_decision == expected_decision

    return condition

Ez a gyári megközelítés:

  • Csökkenti a kód duplikációját: Egy függvény létrehozza az összes feltétel-predikátumot
  • Konzisztencia biztosítása: Minden feltétel ugyanazt a mintát követi
  • Egyszerűbb karbantartás: A feltétellogika módosítása egy helyen történik

Megosztott állapotú munkafolyamat-végrehajtók

Olyan végrehajtók implementálása, amelyek megosztott állapotot használnak, hogy ne adjanak át nagy mennyiségű e-mail-tartalmat minden munkafolyamat-lépésben:

EMAIL_STATE_PREFIX = "email:"
CURRENT_EMAIL_ID_KEY = "current_email_id"

@executor(id="store_email")
async def store_email(email_text: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
    """Store email content once and pass around a lightweight ID reference."""

    # Persist the raw email content in shared state
    new_email = Email(email_id=str(uuid4()), email_content=email_text)
    await ctx.set_shared_state(f"{EMAIL_STATE_PREFIX}{new_email.email_id}", new_email)
    await ctx.set_shared_state(CURRENT_EMAIL_ID_KEY, new_email.email_id)

    # Forward email to spam detection agent
    await ctx.send_message(
        AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=new_email.email_content)], should_respond=True)
    )

@executor(id="to_detection_result")
async def to_detection_result(response: AgentExecutorResponse, ctx: WorkflowContext[DetectionResult]) -> None:
    """Transform agent response into a typed DetectionResult with email ID."""

    # Parse the agent's structured JSON output
    parsed = DetectionResultAgent.model_validate_json(response.agent_run_response.text)
    email_id: str = await ctx.get_shared_state(CURRENT_EMAIL_ID_KEY)

    # Create typed message for switch-case routing
    await ctx.send_message(DetectionResult(
        spam_decision=parsed.spam_decision,
        reason=parsed.reason,
        email_id=email_id
    ))

@executor(id="submit_to_email_assistant")
async def submit_to_email_assistant(detection: DetectionResult, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
    """Handle NotSpam emails by forwarding to the email assistant."""

    # Guard against misrouting
    if detection.spam_decision != "NotSpam":
        raise RuntimeError("This executor should only handle NotSpam messages.")

    # Retrieve original email content from shared state
    email: Email = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{detection.email_id}")
    await ctx.send_message(
        AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=email.email_content)], should_respond=True)
    )

@executor(id="finalize_and_send")
async def finalize_and_send(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
    """Parse email assistant response and yield final output."""

    parsed = EmailResponse.model_validate_json(response.agent_run_response.text)
    await ctx.yield_output(f"Email sent: {parsed.response}")

@executor(id="handle_spam")
async def handle_spam(detection: DetectionResult, ctx: WorkflowContext[Never, str]) -> None:
    """Handle confirmed spam emails."""

    if detection.spam_decision == "Spam":
        await ctx.yield_output(f"Email marked as spam: {detection.reason}")
    else:
        raise RuntimeError("This executor should only handle Spam messages.")

@executor(id="handle_uncertain")
async def handle_uncertain(detection: DetectionResult, ctx: WorkflowContext[Never, str]) -> None:
    """Handle uncertain classifications that need manual review."""

    if detection.spam_decision == "Uncertain":
        # Include original content for human review
        email: Email | None = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{detection.email_id}")
        await ctx.yield_output(
            f"Email marked as uncertain: {detection.reason}. Email content: {getattr(email, 'email_content', '')}"
        )
    else:
        raise RuntimeError("This executor should only handle Uncertain messages.")

Továbbfejlesztett AI-ügynök létrehozása

Frissítse a levélszemét-észlelési ügynököt, hogy kevésbé legyen magabiztos, és háromirányú besorolást ad vissza:

async def main():
    chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())

    # Enhanced spam detection agent with three-way classification
    spam_detection_agent = AgentExecutor(
        chat_client.create_agent(
            instructions=(
                "You are a spam detection assistant that identifies spam emails. "
                "Be less confident in your assessments. "
                "Always return JSON with fields 'spam_decision' (one of NotSpam, Spam, Uncertain) "
                "and 'reason' (string)."
            ),
            response_format=DetectionResultAgent,
        ),
        id="spam_detection_agent",
    )

    # Email assistant remains the same
    email_assistant_agent = AgentExecutor(
        chat_client.create_agent(
            instructions=(
                "You are an email assistant that helps users draft responses to emails with professionalism."
            ),
            response_format=EmailResponse,
        ),
        id="email_assistant_agent",
    )

Munkafolyamat létrehozása Switch-Case Edge-csoporttal

Több feltételes él cseréje egyetlen kapcsolós esetcsoportra:

    # Build workflow using switch-case for cleaner three-way routing
    workflow = (
        WorkflowBuilder()
        .set_start_executor(store_email)
        .add_edge(store_email, spam_detection_agent)
        .add_edge(spam_detection_agent, to_detection_result)
        .add_switch_case_edge_group(
            to_detection_result,
            [
                # Explicit cases for specific decisions
                Case(condition=get_case("NotSpam"), target=submit_to_email_assistant),
                Case(condition=get_case("Spam"), target=handle_spam),
                # Default case catches anything that doesn't match above
                Default(target=handle_uncertain),
            ],
        )
        .add_edge(submit_to_email_assistant, email_assistant_agent)
        .add_edge(email_assistant_agent, finalize_and_send)
        .build()
    )

Végrehajtás és tesztelés

Futtassa a munkafolyamatot nem egyértelmű e-mail-tartalommal, amely bemutatja a háromirányú útválasztást:

    # Use ambiguous email content that might trigger uncertain classification
    email = (
        "Hey there, I noticed you might be interested in our latest offer—no pressure, but it expires soon. "
        "Let me know if you'd like more details."
    )

    # Execute and display results
    events = await workflow.run(email)
    outputs = events.get_outputs()
    if outputs:
        for output in outputs:
            print(f"Workflow output: {output}")

A Switch-Case élek fontos előnyei

  1. Tisztább szintaxis: Több feltételes él helyett egy élcsoport
  2. Megrendelt értékelés: Az eseteket egymás után értékelik ki, és az első meccsen leállnak
  3. Garantált útválasztás: Az alapértelmezett eset biztosítja, hogy az üzenetek soha ne ragadnak meg
  4. Jobb karbantarthatóság: Az új esetek hozzáadása minimális módosítást igényel
  5. Típusbiztonság: Minden végrehajtó ellenőrzi a bemenetét az útválasztási hibák észlelése érdekében

Összehasonlítás: Feltételes szerkezet vs. Switch-Case szerkezet

Előtte (feltételes élcsomópontok):

.add_edge(detector, handler_a, condition=lambda x: x.result == "A")
.add_edge(detector, handler_b, condition=lambda x: x.result == "B")
.add_edge(detector, handler_c, condition=lambda x: x.result == "C")

Utána (Switch-Case):

.add_switch_case_edge_group(
    detector,
    [
        Case(condition=lambda x: x.result == "A", target=handler_a),
        Case(condition=lambda x: x.result == "B", target=handler_b),
        Default(target=handler_c),  # Catches everything else
    ],
)

A switch-case minta sokkal jobban skálázódik ahogy az útválasztási döntések száma növekszik, és az alapértelmezett eset biztonsági hálót nyújt a váratlan értékek kezeléséhez.

Switch Case példakód

A teljes működő megvalósításért tekintse meg az Agent Framework adattárban található switch_case_edge_group.py mintát.

Többkijelölésű élek

A Switch-Case-en túl: Többválasztásos útválasztás

Bár a switch-case szerkezetek pontosan egy célhelyre irányítják az üzeneteket, a valós munkafolyamatoknak gyakran több párhuzamos műveletet kell aktiválniuk az adattulajdonságok alapján. A particionált élek (particionálókkal rendelkező kirakodó élekként implementálva) kifinomult kirakodómintákat tesznek lehetővé, ahol egyetlen üzenet egyszerre több alárendelt végrehajtót is aktiválhat.

Speciális e-mail-feldolgozási munkafolyamat

A kapcsolós eset példájára építve egy továbbfejlesztett e-mail-feldolgozó rendszert fog létrehozni, amely kifinomult útválasztási logikát mutat be:

  • Spam e-mailek → egységes levélszemét-kezelő (például switch-case szerkezet)
  • Megbízható e-mailekMindig aktiválja az e-mail-asszisztenst + Feltételesen aktiválja az összegzőt hosszú e-mailek esetén
  • Bizonytalan e-mailek → Egy bizonytalan kezelő (például switch-case szerkezet)
  • Adatbázis-adatmegőrzési → Rövid és összesített hosszú e-mailek esetén is aktiválódik

Ez a minta lehetővé teszi a tartalom jellemzőihez igazodó párhuzamos feldolgozási folyamatokat.

A tárgyalt fogalmak

Adatmodellek több kijelöléshez

Az adatmodellek kiterjesztése az e-mailek hosszának elemzéséhez és összegzéséhez:

/// <summary>
/// Represents the result of enhanced email analysis with additional metadata.
/// </summary>
public sealed class AnalysisResult
{
    [JsonPropertyName("spam_decision")]
    [JsonConverter(typeof(JsonStringEnumConverter))]
    public SpamDecision spamDecision { get; set; }

    [JsonPropertyName("reason")]
    public string Reason { get; set; } = string.Empty;

    // Additional properties for sophisticated routing
    [JsonIgnore]
    public int EmailLength { get; set; }

    [JsonIgnore]
    public string EmailSummary { get; set; } = string.Empty;

    [JsonIgnore]
    public string EmailId { get; set; } = string.Empty;
}

/// <summary>
/// Represents the response from the email assistant.
/// </summary>
public sealed class EmailResponse
{
    [JsonPropertyName("response")]
    public string Response { get; set; } = string.Empty;
}

/// <summary>
/// Represents the response from the email summary agent.
/// </summary>
public sealed class EmailSummary
{
    [JsonPropertyName("summary")]
    public string Summary { get; set; } = string.Empty;
}

/// <summary>
/// A custom workflow event for database operations.
/// </summary>
internal sealed class DatabaseEvent(string message) : WorkflowEvent(message) { }

/// <summary>
/// Constants for email processing thresholds.
/// </summary>
public static class EmailProcessingConstants
{
    public const int LongEmailThreshold = 100;
}

Célkijelölő függvény: A többszörös kijelölés szíve

A cél-hozzárendelő függvény határozza meg, hogy mely végrehajtók kapják meg az egyes üzeneteket:

/// <summary>
/// Creates a target assigner for routing messages based on the analysis result.
/// </summary>
/// <returns>A function that takes an analysis result and returns the target partitions.</returns>
private static Func<AnalysisResult?, int, IEnumerable<int>> GetTargetAssigner()
{
    return (analysisResult, targetCount) =>
    {
        if (analysisResult is not null)
        {
            if (analysisResult.spamDecision == SpamDecision.Spam)
            {
                return [0]; // Route only to spam handler (index 0)
            }
            else if (analysisResult.spamDecision == SpamDecision.NotSpam)
            {
                // Always route to email assistant (index 1)
                List<int> targets = [1];

                // Conditionally add summarizer for long emails (index 2)
                if (analysisResult.EmailLength > EmailProcessingConstants.LongEmailThreshold)
                {
                    targets.Add(2);
                }

                return targets;
            }
            else // Uncertain
            {
                return [3]; // Route only to uncertain handler (index 3)
            }
        }
        throw new ArgumentException("Invalid analysis result.");
    };
}

A Target Assigner függvény főbb jellemzői

  1. Dinamikus cél kiválasztása: Az aktiválandó végrehajtói indexek listáját adja vissza
  2. Tartalomérzékeny útválasztás: Döntéseket hoz az üzenet tulajdonságai, például az e-mail hossza alapján
  3. Párhuzamos feldolgozás: Egyszerre több cél is végrehajtható
  4. Feltételes logika: Összetett elágaztatás több feltétel alapján

Továbbfejlesztett munkafolyamat-végrehajtók

A speciális elemzést és útválasztást kezelő végrehajtók implementálása:

/// <summary>
/// Executor that analyzes emails using an AI agent with enhanced analysis.
/// </summary>
internal sealed class EmailAnalysisExecutor : Executor<ChatMessage, AnalysisResult>
{
    private readonly AIAgent _emailAnalysisAgent;

    public EmailAnalysisExecutor(AIAgent emailAnalysisAgent) : base("EmailAnalysisExecutor")
    {
        this._emailAnalysisAgent = emailAnalysisAgent;
    }

    public override async ValueTask<AnalysisResult> HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // Generate a random email ID and store the email content
        var newEmail = new Email
        {
            EmailId = Guid.NewGuid().ToString("N"),
            EmailContent = message.Text
        };
        await context.QueueStateUpdateAsync(newEmail.EmailId, newEmail, scopeName: EmailStateConstants.EmailStateScope);

        // Invoke the agent for enhanced analysis
        var response = await this._emailAnalysisAgent.RunAsync(message);
        var analysisResult = JsonSerializer.Deserialize<AnalysisResult>(response.Text);

        // Enrich with metadata for routing decisions
        analysisResult!.EmailId = newEmail.EmailId;
        analysisResult.EmailLength = newEmail.EmailContent.Length;

        return analysisResult;
    }
}

/// <summary>
/// Executor that assists with email responses using an AI agent.
/// </summary>
internal sealed class EmailAssistantExecutor : Executor<AnalysisResult, EmailResponse>
{
    private readonly AIAgent _emailAssistantAgent;

    public EmailAssistantExecutor(AIAgent emailAssistantAgent) : base("EmailAssistantExecutor")
    {
        this._emailAssistantAgent = emailAssistantAgent;
    }

    public override async ValueTask<EmailResponse> HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        if (message.spamDecision == SpamDecision.Spam)
        {
            throw new ArgumentException("This executor should only handle non-spam messages.");
        }

        // Retrieve the email content from shared state
        var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);

        // Invoke the agent to draft a response
        var response = await this._emailAssistantAgent.RunAsync(email!.EmailContent);
        var emailResponse = JsonSerializer.Deserialize<EmailResponse>(response.Text);

        return emailResponse!;
    }
}

/// <summary>
/// Executor that summarizes emails using an AI agent for long emails.
/// </summary>
internal sealed class EmailSummaryExecutor : Executor<AnalysisResult, AnalysisResult>
{
    private readonly AIAgent _emailSummaryAgent;

    public EmailSummaryExecutor(AIAgent emailSummaryAgent) : base("EmailSummaryExecutor")
    {
        this._emailSummaryAgent = emailSummaryAgent;
    }

    public override async ValueTask<AnalysisResult> HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // Read the email content from shared state
        var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);

        // Generate summary for long emails
        var response = await this._emailSummaryAgent.RunAsync(email!.EmailContent);
        var emailSummary = JsonSerializer.Deserialize<EmailSummary>(response.Text);

        // Enrich the analysis result with the summary
        message.EmailSummary = emailSummary!.Summary;

        return message;
    }
}

/// <summary>
/// Executor that sends emails.
/// </summary>
internal sealed class SendEmailExecutor : Executor<EmailResponse>
{
    public SendEmailExecutor() : base("SendEmailExecutor") { }

    public override async ValueTask HandleAsync(EmailResponse message, IWorkflowContext context, CancellationToken cancellationToken = default) =>
        await context.YieldOutputAsync($"Email sent: {message.Response}");
}

/// <summary>
/// Executor that handles spam messages.
/// </summary>
internal sealed class HandleSpamExecutor : Executor<AnalysisResult>
{
    public HandleSpamExecutor() : base("HandleSpamExecutor") { }

    public override async ValueTask HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        if (message.spamDecision == SpamDecision.Spam)
        {
            await context.YieldOutputAsync($"Email marked as spam: {message.Reason}");
        }
        else
        {
            throw new ArgumentException("This executor should only handle spam messages.");
        }
    }
}

/// <summary>
/// Executor that handles uncertain messages requiring manual review.
/// </summary>
internal sealed class HandleUncertainExecutor : Executor<AnalysisResult>
{
    public HandleUncertainExecutor() : base("HandleUncertainExecutor") { }

    public override async ValueTask HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        if (message.spamDecision == SpamDecision.Uncertain)
        {
            var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);
            await context.YieldOutputAsync($"Email marked as uncertain: {message.Reason}. Email content: {email?.EmailContent}");
        }
        else
        {
            throw new ArgumentException("This executor should only handle uncertain spam decisions.");
        }
    }
}

/// <summary>
/// Executor that handles database access with custom events.
/// </summary>
internal sealed class DatabaseAccessExecutor : Executor<AnalysisResult>
{
    public DatabaseAccessExecutor() : base("DatabaseAccessExecutor") { }

    public override async ValueTask HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // Simulate database operations
        await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);
        await Task.Delay(100); // Simulate database access delay

        // Emit custom database event for monitoring
        await context.AddEventAsync(new DatabaseEvent($"Email {message.EmailId} saved to database."));
    }
}

Továbbfejlesztett AI-ügynökök

Ügynökök létrehozása elemzéshez, segítségnyújtáshoz és összegzéshez:

/// <summary>
/// Create an enhanced email analysis agent.
/// </summary>
/// <returns>A ChatClientAgent configured for comprehensive email analysis</returns>
private static ChatClientAgent GetEmailAnalysisAgent(IChatClient chatClient) =>
    new(chatClient, new ChatClientAgentOptions(instructions: "You are a spam detection assistant that identifies spam emails.")
    {
        ChatOptions = new()
        {
            ResponseFormat = ChatResponseFormat.ForJsonSchema<AnalysisResult>()
        }
    });

/// <summary>
/// Creates an email assistant agent.
/// </summary>
/// <returns>A ChatClientAgent configured for email assistance</returns>
private static ChatClientAgent GetEmailAssistantAgent(IChatClient chatClient) =>
    new(chatClient, new ChatClientAgentOptions(instructions: "You are an email assistant that helps users draft responses to emails with professionalism.")
    {
        ChatOptions = new()
        {
            ResponseFormat = ChatResponseFormat.ForJsonSchema<EmailResponse>()
        }
    });

/// <summary>
/// Creates an agent that summarizes emails.
/// </summary>
/// <returns>A ChatClientAgent configured for email summarization</returns>
private static ChatClientAgent GetEmailSummaryAgent(IChatClient chatClient) =>
    new(chatClient, new ChatClientAgentOptions(instructions: "You are an assistant that helps users summarize emails.")
    {
        ChatOptions = new()
        {
            ResponseFormat = ChatResponseFormat.ForJsonSchema<EmailSummary>()
        }
    });

Többkijelöléses munkafolyamat felépítése

Hozza létre a munkafolyamatot kifinomult útválasztással és párhuzamos feldolgozással:

public static class Program
{
    private static async Task Main()
    {
        // Set up the Azure OpenAI client
        var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
        var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
        var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential()).GetChatClient(deploymentName).AsIChatClient();

        // Create agents
        AIAgent emailAnalysisAgent = GetEmailAnalysisAgent(chatClient);
        AIAgent emailAssistantAgent = GetEmailAssistantAgent(chatClient);
        AIAgent emailSummaryAgent = GetEmailSummaryAgent(chatClient);

        // Create executors
        var emailAnalysisExecutor = new EmailAnalysisExecutor(emailAnalysisAgent);
        var emailAssistantExecutor = new EmailAssistantExecutor(emailAssistantAgent);
        var emailSummaryExecutor = new EmailSummaryExecutor(emailSummaryAgent);
        var sendEmailExecutor = new SendEmailExecutor();
        var handleSpamExecutor = new HandleSpamExecutor();
        var handleUncertainExecutor = new HandleUncertainExecutor();
        var databaseAccessExecutor = new DatabaseAccessExecutor();

        // Build the workflow with multi-selection fan-out
        WorkflowBuilder builder = new(emailAnalysisExecutor);
        builder.AddFanOutEdge(
            emailAnalysisExecutor,
            targets: [
                handleSpamExecutor,        // Index 0: Spam handler
                emailAssistantExecutor,    // Index 1: Email assistant (always for NotSpam)
                emailSummaryExecutor,      // Index 2: Summarizer (conditionally for long NotSpam)
                handleUncertainExecutor,   // Index 3: Uncertain handler
            ],
            targetSelector: GetTargetAssigner()
        )
        // Email assistant branch
        .AddEdge(emailAssistantExecutor, sendEmailExecutor)

        // Database persistence: conditional routing
        .AddEdge<AnalysisResult>(
            emailAnalysisExecutor,
            databaseAccessExecutor,
            condition: analysisResult => analysisResult?.EmailLength <= EmailProcessingConstants.LongEmailThreshold) // Short emails
        .AddEdge(emailSummaryExecutor, databaseAccessExecutor) // Long emails with summary

        .WithOutputFrom(handleUncertainExecutor, handleSpamExecutor, sendEmailExecutor);

        var workflow = builder.Build();

        // Read a moderately long email to trigger both assistant and summarizer
        string email = Resources.Read("email.txt");

        // Execute the workflow with custom event handling
        StreamingRun run = await InProcessExecution.StreamAsync(workflow, new ChatMessage(ChatRole.User, email));
        await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
        await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
        {
            if (evt is WorkflowOutputEvent outputEvent)
            {
                Console.WriteLine($"Output: {outputEvent}");
            }

            if (evt is DatabaseEvent databaseEvent)
            {
                Console.WriteLine($"Database: {databaseEvent}");
            }
        }
    }
}

Minta összehasonlítása: Több kijelölés és Switch-Case

Switch-Case mintázat (előző):

// One input → exactly one output
builder.AddSwitch(spamDetectionExecutor, switchBuilder =>
    switchBuilder
    .AddCase(GetCondition(SpamDecision.NotSpam), emailAssistantExecutor)
    .AddCase(GetCondition(SpamDecision.Spam), handleSpamExecutor)
    .WithDefault(handleUncertainExecutor)
)

Több kijelölési minta:

// One input → one or more outputs (dynamic fan-out)
builder.AddFanOutEdge(
    emailAnalysisExecutor,
    targets: [handleSpamExecutor, emailAssistantExecutor, emailSummaryExecutor, handleUncertainExecutor],
    targetSelector: GetTargetAssigner() // Returns list of target indices
)

A többkijelöléses élek fő előnyei

  1. Párhuzamos feldolgozás: Egyszerre több ág is végrehajtható
  2. Feltételes szétosztás: A célok száma a tartalom függvényében változik
  3. Tartalomérzékeny útválasztás: Az üzenet tulajdonságain alapuló döntések, nem csak beírás
  4. Hatékony erőforrás-használat: Csak a szükséges ágak aktiválódnak
  5. Összetett üzleti logika: Támogatja a kifinomult útválasztási forgatókönyveket

A többszörös kijelölési példa futtatása

Ha hosszú e-mailt futtatja ezt a munkafolyamatot:

Output: Email sent: [Professional response generated by AI]
Database: Email abc123 saved to database.

Ha rövid e-maillel fut, a rendszer kihagyja az összegzőt:

Output: Email sent: [Professional response generated by AI]
Database: Email def456 saved to database.

Real-World használati esetek

  • Levelezőrendszerek: Útvonal a válasz-asszisztenshez + archív + elemzés (feltételesen)
  • Tartalomfeldolgozás: Trigger átírása + fordítás + elemzés (tartalomtípus alapján)
  • Rendelésfeldolgozás: Teljesítési útvonal + számlázás + értesítések (a rendelés tulajdonságai alapján)
  • Adatfolyamok: Különböző elemzési folyamatok aktiválása az adatjellemzők alapján

Több kijelöléses teljes implementáció

A teljes munka végrehajtásához tekintse meg ezt a mintát az Agent Framework-adattárban.

A Switch-Case-en túl: Többválasztásos útválasztás

Bár a switch-case szerkezetek pontosan egy célhelyre irányítják az üzeneteket, a valós munkafolyamatoknak gyakran több párhuzamos műveletet kell aktiválniuk az adattulajdonságok alapján. A particionált élek (többkijelölési élcsoportként implementálva) kifinomult ventilátorkiválasztási mintákat tesznek lehetővé, ahol egyetlen üzenet egyszerre több alárendelt végrehajtót is aktiválhat.

Speciális e-mail-feldolgozási munkafolyamat

A kapcsolós eset példájára építve egy továbbfejlesztett e-mail-feldolgozó rendszert fog létrehozni, amely kifinomult útválasztási logikát mutat be:

  • Spam e-mailek → egységes levélszemét-kezelő (például switch-case szerkezet)
  • Megbízható e-mailekMindig aktiválja az e-mail-asszisztenst + Feltételesen aktiválja az összegzőt hosszú e-mailek esetén
  • Bizonytalan e-mailek → Egy bizonytalan kezelő (például switch-case szerkezet)
  • Adatbázis-adatmegőrzési → Rövid és összesített hosszú e-mailek esetén is aktiválódik

Ez a minta lehetővé teszi a tartalom jellemzőihez igazodó párhuzamos feldolgozási folyamatokat.

A tárgyalt fogalmak

Továbbfejlesztett adatmodellek több kijelöléshez

Az adatmodellek kiterjesztése az e-mailek hosszának elemzéséhez és összegzéséhez:

class AnalysisResultAgent(BaseModel):
    """Enhanced structured output from email analysis agent."""

    spam_decision: Literal["NotSpam", "Spam", "Uncertain"]
    reason: str

class EmailResponse(BaseModel):
    """Response from email assistant."""

    response: str

class EmailSummaryModel(BaseModel):
    """Summary generated by email summary agent."""

    summary: str

@dataclass
class AnalysisResult:
    """Internal analysis result with email metadata for routing decisions."""

    spam_decision: str
    reason: str
    email_length: int  # Used for conditional routing
    email_summary: str  # Populated by summary agent
    email_id: str

@dataclass
class Email:
    """Email content stored in shared state."""

    email_id: str
    email_content: str

# Custom event for database operations
class DatabaseEvent(WorkflowEvent):
    """Custom event for tracking database operations."""
    pass

Kijelölési függvény: A többszörös kijelölés szíve

A kijelölési függvény határozza meg, hogy mely végrehajtók kapják meg az egyes üzeneteket:

LONG_EMAIL_THRESHOLD = 100

def select_targets(analysis: AnalysisResult, target_ids: list[str]) -> list[str]:
    """Intelligent routing based on spam decision and email characteristics."""

    # Target order: [handle_spam, submit_to_email_assistant, summarize_email, handle_uncertain]
    handle_spam_id, submit_to_email_assistant_id, summarize_email_id, handle_uncertain_id = target_ids

    if analysis.spam_decision == "Spam":
        # Route only to spam handler
        return [handle_spam_id]

    elif analysis.spam_decision == "NotSpam":
        # Always route to email assistant
        targets = [submit_to_email_assistant_id]

        # Conditionally add summarizer for long emails
        if analysis.email_length > LONG_EMAIL_THRESHOLD:
            targets.append(summarize_email_id)

        return targets

    else:  # Uncertain
        # Route only to uncertain handler
        return [handle_uncertain_id]

A kijelölési függvények főbb jellemzői

  1. Dinamikus cél kiválasztása: Az aktiválandó végrehajtóazonosítók listáját adja vissza
  2. Tartalomérzékeny útválasztás: Döntéseket hoz az üzenet tulajdonságai alapján
  3. Párhuzamos feldolgozás: Egyszerre több cél is végrehajtható
  4. Feltételes logika: Összetett elágaztatás több feltétel alapján

Többkijelöléses munkafolyamat-végrehajtók

A továbbfejlesztett elemzést és útválasztást kezelő végrehajtók implementálása:

EMAIL_STATE_PREFIX = "email:"
CURRENT_EMAIL_ID_KEY = "current_email_id"

@executor(id="store_email")
async def store_email(email_text: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
    """Store email and initiate analysis."""

    new_email = Email(email_id=str(uuid4()), email_content=email_text)
    await ctx.set_shared_state(f"{EMAIL_STATE_PREFIX}{new_email.email_id}", new_email)
    await ctx.set_shared_state(CURRENT_EMAIL_ID_KEY, new_email.email_id)

    await ctx.send_message(
        AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=new_email.email_content)], should_respond=True)
    )

@executor(id="to_analysis_result")
async def to_analysis_result(response: AgentExecutorResponse, ctx: WorkflowContext[AnalysisResult]) -> None:
    """Transform agent response into enriched analysis result."""

    parsed = AnalysisResultAgent.model_validate_json(response.agent_run_response.text)
    email_id: str = await ctx.get_shared_state(CURRENT_EMAIL_ID_KEY)
    email: Email = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{email_id}")

    # Create enriched analysis result with email length for routing decisions
    await ctx.send_message(
        AnalysisResult(
            spam_decision=parsed.spam_decision,
            reason=parsed.reason,
            email_length=len(email.email_content),  # Key for conditional routing
            email_summary="",
            email_id=email_id,
        )
    )

@executor(id="submit_to_email_assistant")
async def submit_to_email_assistant(analysis: AnalysisResult, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
    """Handle legitimate emails by forwarding to email assistant."""

    if analysis.spam_decision != "NotSpam":
        raise RuntimeError("This executor should only handle NotSpam messages.")

    email: Email = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{analysis.email_id}")
    await ctx.send_message(
        AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=email.email_content)], should_respond=True)
    )

@executor(id="finalize_and_send")
async def finalize_and_send(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
    """Final step for email assistant branch."""

    parsed = EmailResponse.model_validate_json(response.agent_run_response.text)
    await ctx.yield_output(f"Email sent: {parsed.response}")

@executor(id="summarize_email")
async def summarize_email(analysis: AnalysisResult, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
    """Generate summary for long emails (parallel branch)."""

    # Only called for long NotSpam emails by selection function
    email: Email = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{analysis.email_id}")
    await ctx.send_message(
        AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=email.email_content)], should_respond=True)
    )

@executor(id="merge_summary")
async def merge_summary(response: AgentExecutorResponse, ctx: WorkflowContext[AnalysisResult]) -> None:
    """Merge summary back into analysis result for database persistence."""

    summary = EmailSummaryModel.model_validate_json(response.agent_run_response.text)
    email_id: str = await ctx.get_shared_state(CURRENT_EMAIL_ID_KEY)
    email: Email = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{email_id}")

    # Create analysis result with summary for database storage
    await ctx.send_message(
        AnalysisResult(
            spam_decision="NotSpam",
            reason="",
            email_length=len(email.email_content),
            email_summary=summary.summary,  # Now includes summary
            email_id=email_id,
        )
    )

@executor(id="handle_spam")
async def handle_spam(analysis: AnalysisResult, ctx: WorkflowContext[Never, str]) -> None:
    """Handle spam emails (single target like switch-case)."""

    if analysis.spam_decision == "Spam":
        await ctx.yield_output(f"Email marked as spam: {analysis.reason}")
    else:
        raise RuntimeError("This executor should only handle Spam messages.")

@executor(id="handle_uncertain")
async def handle_uncertain(analysis: AnalysisResult, ctx: WorkflowContext[Never, str]) -> None:
    """Handle uncertain emails (single target like switch-case)."""

    if analysis.spam_decision == "Uncertain":
        email: Email | None = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{analysis.email_id}")
        await ctx.yield_output(
            f"Email marked as uncertain: {analysis.reason}. Email content: {getattr(email, 'email_content', '')}"
        )
    else:
        raise RuntimeError("This executor should only handle Uncertain messages.")

@executor(id="database_access")
async def database_access(analysis: AnalysisResult, ctx: WorkflowContext[Never, str]) -> None:
    """Simulate database persistence with custom events."""

    await asyncio.sleep(0.05)  # Simulate DB operation
    await ctx.add_event(DatabaseEvent(f"Email {analysis.email_id} saved to database."))

Továbbfejlesztett AI-ügynökök

Ügynökök létrehozása elemzéshez, segítségnyújtáshoz és összegzéshez:

async def main() -> None:
    chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())

    # Enhanced analysis agent
    email_analysis_agent = AgentExecutor(
        chat_client.create_agent(
            instructions=(
                "You are a spam detection assistant that identifies spam emails. "
                "Always return JSON with fields 'spam_decision' (one of NotSpam, Spam, Uncertain) "
                "and 'reason' (string)."
            ),
            response_format=AnalysisResultAgent,
        ),
        id="email_analysis_agent",
    )

    # Email assistant (same as before)
    email_assistant_agent = AgentExecutor(
        chat_client.create_agent(
            instructions=(
                "You are an email assistant that helps users draft responses to emails with professionalism."
            ),
            response_format=EmailResponse,
        ),
        id="email_assistant_agent",
    )

    # New: Email summary agent for long emails
    email_summary_agent = AgentExecutor(
        chat_client.create_agent(
            instructions="You are an assistant that helps users summarize emails.",
            response_format=EmailSummaryModel,
        ),
        id="email_summary_agent",
    )

Többkijelöléses munkafolyamat létrehozása

Hozza létre a munkafolyamatot kifinomult útválasztással és párhuzamos feldolgozással:

    workflow = (
        WorkflowBuilder()
        .set_start_executor(store_email)
        .add_edge(store_email, email_analysis_agent)
        .add_edge(email_analysis_agent, to_analysis_result)

        # Multi-selection edge group: intelligent fan-out based on content
        .add_multi_selection_edge_group(
            to_analysis_result,
            [handle_spam, submit_to_email_assistant, summarize_email, handle_uncertain],
            selection_func=select_targets,
        )

        # Email assistant branch (always for NotSpam)
        .add_edge(submit_to_email_assistant, email_assistant_agent)
        .add_edge(email_assistant_agent, finalize_and_send)

        # Summary branch (only for long NotSpam emails)
        .add_edge(summarize_email, email_summary_agent)
        .add_edge(email_summary_agent, merge_summary)

        # Database persistence: conditional routing
        .add_edge(to_analysis_result, database_access,
                 condition=lambda r: r.email_length <= LONG_EMAIL_THRESHOLD)  # Short emails
        .add_edge(merge_summary, database_access)  # Long emails with summary

        .build()
    )

Végrehajtás eseménystreameléssel

Futtassa a munkafolyamatot, és figyelje meg a párhuzamos végrehajtást egyéni eseményeken keresztül:

    # Use a moderately long email to trigger both assistant and summarizer
    email = """
    Hello team, here are the updates for this week:

    1. Project Alpha is on track and we should have the first milestone completed by Friday.
    2. The client presentation has been scheduled for next Tuesday at 2 PM.
    3. Please review the Q4 budget allocation and provide feedback by Wednesday.

    Let me know if you have any questions or concerns.

    Best regards,
    Alex
    """

    # Stream events to see parallel execution
    async for event in workflow.run_stream(email):
        if isinstance(event, DatabaseEvent):
            print(f"Database: {event}")
        elif isinstance(event, WorkflowOutputEvent):
            print(f"Output: {event.data}")

Többszörös kiválasztás és Switch-Case összehasonlítása

Switch-Case mintázat (előző):

# One input → exactly one output
.add_switch_case_edge_group(
    source,
    [
        Case(condition=lambda x: x.result == "A", target=handler_a),
        Case(condition=lambda x: x.result == "B", target=handler_b),
        Default(target=handler_c),
    ],
)

Több kijelölési minta:

# One input → one or more outputs (dynamic fan-out)
.add_multi_selection_edge_group(
    source,
    [handler_a, handler_b, handler_c, handler_d],
    selection_func=intelligent_router,  # Returns list of target IDs
)

A C# többszörös kiválasztás előnyei

  1. Párhuzamos feldolgozás: Egyszerre több ág is végrehajtható
  2. Feltételes szétosztás: A célok száma a tartalom függvényében változik
  3. Tartalomérzékeny útválasztás: Az üzenet tulajdonságain alapuló döntések, nem csak beírás
  4. Hatékony erőforrás-használat: Csak a szükséges ágak aktiválódnak
  5. Összetett üzleti logika: Támogatja a kifinomult útválasztási forgatókönyveket

C# valós alkalmazások

  • Levelezőrendszerek: Útvonal a válasz-asszisztenshez + archív + elemzés (feltételesen)
  • Tartalomfeldolgozás: Trigger átírása + fordítás + elemzés (tartalomtípus alapján)
  • Rendelésfeldolgozás: Teljesítési útvonal + számlázás + értesítések (a rendelés tulajdonságai alapján)
  • Adatfolyamok: Különböző elemzési folyamatok aktiválása az adatjellemzők alapján

Többkijelöléses mintakód

A teljes működő megvalósításért tekintse meg az Agent Framework adattárban található multi_selection_edge_group.py mintát.

Következő lépések