Megosztás:


Szélek

A peremhálózatok határozzák meg, hogyan haladnak az üzenetek a munkafolyamat végrehajtói között. A munkafolyamat-gráf kapcsolatait jelölik, és meghatározzák az adatfolyam-útvonalakat. Az élek tartalmazhatnak olyan feltételeket, amelyek vezérelhetik az útválasztást az üzenettartalmak alapján.

Éltípusok

A keretrendszer számos élmintát támogat:

Típus Description Felhasználási eset
Közvetlen Egyszerű egy-az-egyhez kapcsolatok Lineáris pipelinek
Feltételes Élek azokkal a feltételekkel, amelyek meghatározzák, mikor áramlanak az üzenetek Bináris útválasztás (ha/más)
Switch-Case szerkezet Irányítás különböző végrehajtók felé feltételek alapján Többágú útválasztás
Többszörös kijelölés (szétterítés) Egy végrehajtó üzeneteket küld több célnak Párhuzamos feldolgozás
Beillő Több végrehajtó, amely egyetlen célhoz küld adatokat Aggregation

Közvetlen kapcsolatok

A legegyszerűbb forma – két végrehajtó csatlakoztatása feltételek nélkül:

WorkflowBuilder builder = new(sourceExecutor);
builder.AddEdge(sourceExecutor, targetExecutor);
builder = WorkflowBuilder(start_executor=source_executor)
builder.add_edge(source_executor, target_executor)
workflow = builder.build()

Ventilátoros élek

Üzenetek gyűjtése több forrásból egyetlen célba:

builder.AddFanInEdge(aggregatorExecutor, sources: [worker1, worker2, worker3]);
builder.add_fan_in_edge([worker1, worker2, worker3], aggregator_executor)

Az alábbi szakaszok részletes oktatóanyagokat nyújtanak a feltételes, a kapcsolós és a többszörös kijelölésű élekhez.

Feltételes élek

A feltételes élek lehetővé teszik, hogy a munkafolyamat útválasztási döntéseket hozzon a munkafolyamaton áthaladó üzenetek tartalma vagy tulajdonságai alapján. Ez lehetővé teszi a dinamikus elágazást, ahol a futtatókörnyezeti feltételek alapján különböző végrehajtási útvonalakat kell végrehajtani.

Mit fog felépíteni?

Létrehoz egy e-mail-feldolgozási munkafolyamatot, amely bemutatja a feltételes útválasztást:

  • Egy levélszemét-észlelési ügynök, amely elemzi a bejövő e-maileket, és strukturált JSON-t ad vissza.
  • Feltételes élek, amelyek az e-maileket a besorolás alapján különböző kezelőkhöz irányítják.
  • Egy megbízható e-mail-kezelő, amely profi válaszokat készít.
  • Egy levélszemétkezelő, amely gyanús e-maileket jelöl.
  • Megosztott állapotkezelés az e-mail-adatok munkafolyamat-lépések közötti megőrzéséhez.

A tárgyalt fogalmak

Előfeltételek

NuGet-csomagok telepítése

Először telepítse a szükséges csomagokat a .NET-projekthez:

dotnet add package Azure.AI.OpenAI --prerelease
dotnet add package Azure.Identity
dotnet add package Microsoft.Agents.AI.Workflows --prerelease
dotnet add package Microsoft.Extensions.AI.OpenAI --prerelease

Adatmodellek definiálása

Először határozza meg a munkafolyamaton áthaladó adatstruktúrát:

using System.Text.Json.Serialization;

/// <summary>
/// Represents the result of spam detection.
/// </summary>
public sealed class DetectionResult
{
    [JsonPropertyName("is_spam")]
    public bool IsSpam { get; set; }

    [JsonPropertyName("reason")]
    public string Reason { get; set; } = string.Empty;

    // Email ID is generated by the executor, not the agent
    [JsonIgnore]
    public string EmailId { get; set; } = string.Empty;
}

/// <summary>
/// Represents an email.
/// </summary>
internal sealed class Email
{
    [JsonPropertyName("email_id")]
    public string EmailId { get; set; } = string.Empty;

    [JsonPropertyName("email_content")]
    public string EmailContent { get; set; } = string.Empty;
}

/// <summary>
/// Represents the response from the email assistant.
/// </summary>
public sealed class EmailResponse
{
    [JsonPropertyName("response")]
    public string Response { get; set; } = string.Empty;
}

/// <summary>
/// Constants for shared state scopes.
/// </summary>
internal static class EmailStateConstants
{
    public const string EmailStateScope = "EmailState";
}

Feltételfüggvények létrehozása

A feltételfüggvény kiértékeli a levélszemét-észlelési eredményt, hogy meghatározza a munkafolyamat elérési útját:

/// <summary>
/// Creates a condition for routing messages based on the expected spam detection result.
/// </summary>
/// <param name="expectedResult">The expected spam detection result</param>
/// <returns>A function that evaluates whether a message meets the expected result</returns>
private static Func<object?, bool> GetCondition(bool expectedResult) =>
    detectionResult => detectionResult is DetectionResult result && result.IsSpam == expectedResult;

Ez a feltételértékelő függvény:

  • Egy paramétert bool expectedResult használ (a levélszemét esetében igaz, nem levélszemét esetén hamis)
  • Olyan függvényt ad vissza, amely peremfeltételként használható
  • Biztonságosan ellenőrzi, hogy az üzenet DetectionResult-e, és összehasonlítja a IsSpam tulajdonságot.

AI-ügynökök létrehozása

Állítsa be azokat az AI-ügynököket, amelyek kezelik a levélszemét észlelését és az e-mail-segítséget:

using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Agents.AI;
using Microsoft.Extensions.AI;

/// <summary>
/// Creates a spam detection agent.
/// </summary>
/// <returns>A ChatClientAgent configured for spam detection</returns>
private static ChatClientAgent GetSpamDetectionAgent(IChatClient chatClient) =>
    new(chatClient, new ChatClientAgentOptions(instructions: "You are a spam detection assistant that identifies spam emails.")
    {
        ChatOptions = new()
        {
            ResponseFormat = ChatResponseFormat.ForJsonSchema(AIJsonUtilities.CreateJsonSchema(typeof(DetectionResult)))
        }
    });

/// <summary>
/// Creates an email assistant agent.
/// </summary>
/// <returns>A ChatClientAgent configured for email assistance</returns>
private static ChatClientAgent GetEmailAssistantAgent(IChatClient chatClient) =>
    new(chatClient, new ChatClientAgentOptions(instructions: "You are an email assistant that helps users draft professional responses to emails.")
    {
        ChatOptions = new()
        {
            ResponseFormat = ChatResponseFormat.ForJsonSchema(AIJsonUtilities.CreateJsonSchema(typeof(EmailResponse)))
        }
    });

Végrehajtók implementálása

Hozza létre az e-mail-feldolgozás különböző szakaszait kezelő munkafolyamat-végrehajtókat:

using Microsoft.Agents.AI.Workflows;
using System.Text.Json;

/// <summary>
/// Executor that detects spam using an AI agent.
/// </summary>
internal sealed partial class SpamDetectionExecutor : Executor
{
    private readonly AIAgent _spamDetectionAgent;

    public SpamDetectionExecutor(AIAgent spamDetectionAgent) : base("SpamDetectionExecutor")
    {
        this._spamDetectionAgent = spamDetectionAgent;
    }

    [MessageHandler]
    private async ValueTask<DetectionResult> HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // Generate a random email ID and store the email content to shared state
        var newEmail = new Email
        {
            EmailId = Guid.NewGuid().ToString("N"),
            EmailContent = message.Text
        };
        await context.QueueStateUpdateAsync(newEmail.EmailId, newEmail, scopeName: EmailStateConstants.EmailStateScope);

        // Invoke the agent for spam detection
        var response = await this._spamDetectionAgent.RunAsync(message);
        var detectionResult = JsonSerializer.Deserialize<DetectionResult>(response.Text);

        detectionResult!.EmailId = newEmail.EmailId;
        return detectionResult;
    }
}

/// <summary>
/// Executor that assists with email responses using an AI agent.
/// </summary>
internal sealed partial class EmailAssistantExecutor : Executor
{
    private readonly AIAgent _emailAssistantAgent;

    public EmailAssistantExecutor(AIAgent emailAssistantAgent) : base("EmailAssistantExecutor")
    {
        this._emailAssistantAgent = emailAssistantAgent;
    }

    [MessageHandler]
    private async ValueTask<EmailResponse> HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        if (message.IsSpam)
        {
            throw new ArgumentException("This executor should only handle non-spam messages.");
        }

        // Retrieve the email content from shared state
        var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope)
            ?? throw new InvalidOperationException("Email not found.");

        // Invoke the agent to draft a response
        var response = await this._emailAssistantAgent.RunAsync(email.EmailContent);
        var emailResponse = JsonSerializer.Deserialize<EmailResponse>(response.Text);

        return emailResponse!;
    }
}

/// <summary>
/// Executor that sends emails.
/// </summary>
internal sealed partial class SendEmailExecutor : Executor
{
    public SendEmailExecutor() : base("SendEmailExecutor") { }

    [MessageHandler]
    private async ValueTask HandleAsync(EmailResponse message, IWorkflowContext context, CancellationToken cancellationToken = default) =>
        await context.YieldOutputAsync($"Email sent: {message.Response}");
}

/// <summary>
/// Executor that handles spam messages.
/// </summary>
internal sealed partial class HandleSpamExecutor : Executor
{
    public HandleSpamExecutor() : base("HandleSpamExecutor") { }

    [MessageHandler]
    private async ValueTask HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        if (message.IsSpam)
        {
            await context.YieldOutputAsync($"Email marked as spam: {message.Reason}");
        }
        else
        {
            throw new ArgumentException("This executor should only handle spam messages.");
        }
    }
}

Munkafolyamat létrehozása feltételes élekkel

Most hozza létre a munkafolyamatot létrehozó és végrehajtó fő programot:

using Microsoft.Extensions.AI;

public static class Program
{
    private static async Task Main()
    {
        // Set up the Azure OpenAI client
        var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT")
            ?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
        var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
        var chatClient = new AzureOpenAIClient(new Uri(endpoint), new DefaultAzureCredential())
            .GetChatClient(deploymentName).AsIChatClient();

        // Create agents
        AIAgent spamDetectionAgent = GetSpamDetectionAgent(chatClient);
        AIAgent emailAssistantAgent = GetEmailAssistantAgent(chatClient);

        // Create executors
        var spamDetectionExecutor = new SpamDetectionExecutor(spamDetectionAgent);
        var emailAssistantExecutor = new EmailAssistantExecutor(emailAssistantAgent);
        var sendEmailExecutor = new SendEmailExecutor();
        var handleSpamExecutor = new HandleSpamExecutor();

        // Build the workflow with conditional edges
        var workflow = new WorkflowBuilder(spamDetectionExecutor)
            // Non-spam path: route to email assistant when IsSpam = false
            .AddEdge(spamDetectionExecutor, emailAssistantExecutor, condition: GetCondition(expectedResult: false))
            .AddEdge(emailAssistantExecutor, sendEmailExecutor)
            // Spam path: route to spam handler when IsSpam = true
            .AddEdge(spamDetectionExecutor, handleSpamExecutor, condition: GetCondition(expectedResult: true))
            .WithOutputFrom(handleSpamExecutor, sendEmailExecutor)
            .Build();

        // Execute the workflow with sample spam email
        string emailContent = "Congratulations! You've won $1,000,000! Click here to claim your prize now!";
        StreamingRun run = await InProcessExecution.StreamAsync(workflow, new ChatMessage(ChatRole.User, emailContent));
        await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

        await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
        {
            if (evt is WorkflowOutputEvent outputEvent)
            {
                Console.WriteLine($"{outputEvent}");
            }
        }
    }
}

Figyelmeztetés

DefaultAzureCredential a fejlesztéshez kényelmes, de a termelési környezetben gondos megfontolást igényel. Éles környezetben fontolja meg egy adott hitelesítő adat (pl. ManagedIdentityCredential) használatát a késési problémák elkerülése, a hitelesítő adatok nem szándékos próbálgatásának és a tartalék mechanizmusokból eredő esetleges biztonsági kockázatok elkerülése érdekében.

Hogyan működik?

  1. Munkafolyamat-bejegyzés: A munkafolyamat azzal kezdődik, hogy spamDetectionExecutorChatMessage érkezik.

  2. Levélszemételemzés: A levélszemétészlelő ügynök elemzi az e-maileket, és strukturált és DetectionResult tulajdonságokkal rendelkező IsSpam adatokat ad Reason vissza.

  3. Feltételes útválasztás: Az IsSpam érték alapján:

    • Ha levélszemét (): Útvonalak IsSpam = true használatával HandleSpamExecutor
    • Ha jogos (): Útvonalak IsSpam = false használatához EmailAssistantExecutor
  4. Válaszgenerálás: A megbízható e-mailek esetében az e-mail-asszisztens profi választ készít.

  5. Végső kimenet: A munkafolyamat levélszemét-értesítést küld, vagy elküldi a betervezett e-mail-választ.

A feltételes élek főbb jellemzői

  1. Type-Safe feltételek: A GetCondition metódus újrahasználható állapotfüggvényeket hoz létre, amelyek biztonságosan értékelik ki az üzenet tartalmát.

  2. Több elérési út: Egyetlen végrehajtó több különböző feltételekkel rendelkező kimenő éllel rendelkezhet, így összetett elágaztatási logikát tesz lehetővé.

  3. Megosztott állapot: Az e-mail-adatok hatóköralapú állapotkezeléssel maradnak a végrehajtók között, így az alsóbb rétegbeli végrehajtók hozzáférhetnek az eredeti tartalomhoz.

  4. Hibakezelés: A végrehajtók ellenőrzik a bemeneteiket, és jelentős kivételeket jeleznek váratlan üzenettípusok fogadásakor.

  5. Tiszta architektúra: Minden végrehajtónak egyetlen felelőssége van, így a munkafolyamat karbantartható és tesztelhető.

A példa futtatása

Amikor ezt a munkafolyamatot a spam mintalevéllel futtatja:

Email marked as spam: This email contains common spam indicators including monetary prizes, urgency tactics, and suspicious links that are typical of phishing attempts.

Próbálja meg az e-mail-tartalmat egy megbízhatóra módosítani:

string emailContent = "Hi, I wanted to follow up on our meeting yesterday and get your thoughts on the project proposal.";

A munkafolyamat az e-mail-asszisztenshez irányítódik, és professzionális választ generál helyette.

Ez a feltételes útválasztási minta képezi az összetett döntési fákat és üzleti logikát kezelni képes kifinomult munkafolyamatok létrehozásának alapjait.

Implementáció befejezése

A teljes munka végrehajtásához tekintse meg ezt a mintát az Agent Framework-adattárban.

Mit fog felépíteni?

Létrehoz egy e-mail-feldolgozási munkafolyamatot, amely bemutatja a feltételes útválasztást:

  • Levélszemét-észlelési ügynök, amely elemzi a bejövő e-maileket
  • Feltételes élek, amelyek az e-maileket a besorolás alapján különböző kezelőkhöz irányítják
  • Egy megbízható e-mail-kezelő, amely profi válaszokat készít
  • Egy levélszemétkezelő, amely gyanús e-maileket jelöl

A tárgyalt fogalmak

Előfeltételek

  • Python 3.10 vagy újabb
  • Telepített Ügynök-keretrendszer: pip install agent-framework-core --pre
  • Megfelelő környezeti változókkal konfigurált Azure OpenAI-szolgáltatás
  • Azure CLI-hitelesítés: az login

1. lépés: Kötelező függőségek importálása

Először importálja a feltételes munkafolyamatokhoz szükséges összetevőket:

import asyncio
import os
from dataclasses import dataclass
from typing import Any, Literal
from uuid import uuid4

from typing_extensions import Never

from agent_framework import (
    AgentExecutor,
    AgentExecutorRequest,
    AgentExecutorResponse,
    Message,
    WorkflowBuilder,
    WorkflowContext,
    executor,
    Case,
    Default,
)
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential
from pydantic import BaseModel

2. lépés: Adatmodellek definiálása

Pydantic-modellek létrehozása a munkafolyamat-összetevők közötti strukturált adatcseréhez:

class DetectionResult(BaseModel):
    """Represents the result of spam detection."""
    # is_spam drives the routing decision taken by edge conditions
    is_spam: bool
    # Human readable rationale from the detector
    reason: str
    # The agent must include the original email so downstream agents can operate without reloading content
    email_content: str


class EmailResponse(BaseModel):
    """Represents the response from the email assistant."""
    # The drafted reply that a user could copy or send
    response: str

3. lépés: Feltételfüggvények létrehozása

Adja meg az útválasztási döntéseket meghatározó feltételfüggvényeket:

def get_condition(expected_result: bool):
    """Create a condition callable that routes based on DetectionResult.is_spam."""

    # The returned function will be used as an edge predicate.
    # It receives whatever the upstream executor produced.
    def condition(message: Any) -> bool:
        # Defensive guard. If a non AgentExecutorResponse appears, let the edge pass to avoid dead ends.
        if not isinstance(message, AgentExecutorResponse):
            return True

        try:
            # Prefer parsing a structured DetectionResult from the agent JSON text.
            # Using model_validate_json ensures type safety and raises if the shape is wrong.
            detection = DetectionResult.model_validate_json(message.agent_run_response.text)
            # Route only when the spam flag matches the expected path.
            return detection.is_spam == expected_result
        except Exception:
            # Fail closed on parse errors so we do not accidentally route to the wrong path.
            # Returning False prevents this edge from activating.
            return False

    return condition

4. lépés: Kezelői végrehajtók létrehozása

A különböző útválasztási eredmények kezeléséhez definiáljon végrehajtókat:

@executor(id="send_email")
async def handle_email_response(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
    """Handle legitimate emails by drafting a professional response."""
    # Downstream of the email assistant. Parse a validated EmailResponse and yield the workflow output.
    email_response = EmailResponse.model_validate_json(response.agent_run_response.text)
    await ctx.yield_output(f"Email sent:\n{email_response.response}")


@executor(id="handle_spam")
async def handle_spam_classifier_response(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
    """Handle spam emails by marking them appropriately."""
    # Spam path. Confirm the DetectionResult and yield the workflow output. Guard against accidental non spam input.
    detection = DetectionResult.model_validate_json(response.agent_run_response.text)
    if detection.is_spam:
        await ctx.yield_output(f"Email marked as spam: {detection.reason}")
    else:
        # This indicates the routing predicate and executor contract are out of sync.
        raise RuntimeError("This executor should only handle spam messages.")


@executor(id="to_email_assistant_request")
async def to_email_assistant_request(
    response: AgentExecutorResponse, ctx: WorkflowContext[AgentExecutorRequest]
) -> None:
    """Transform spam detection response into a request for the email assistant."""
    # Parse the detection result and extract the email content for the assistant
    detection = DetectionResult.model_validate_json(response.agent_run_response.text)

    # Create a new request for the email assistant with the original email content
    request = AgentExecutorRequest(
        messages=[Message(role="user", contents=[detection.email_content])],
        should_respond=True
    )
    await ctx.send_message(request)

5. lépés: AI-ügynökök létrehozása

Az Azure OpenAI-ügynökök beállítása strukturált kimeneti formázással:

async def main() -> None:
    # Create agents
    # AzureCliCredential uses your current az login. This avoids embedding secrets in code.
    chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())

    # Agent 1. Classifies spam and returns a DetectionResult object.
    # response_format enforces that the LLM returns parsable JSON for the Pydantic model.
    spam_detection_agent = AgentExecutor(
        chat_client.as_agent(
            instructions=(
                "You are a spam detection assistant that identifies spam emails. "
                "Always return JSON with fields is_spam (bool), reason (string), and email_content (string). "
                "Include the original email content in email_content."
            ),
            response_format=DetectionResult,
        ),
        id="spam_detection_agent",
    )

    # Agent 2. Drafts a professional reply. Also uses structured JSON output for reliability.
    email_assistant_agent = AgentExecutor(
        chat_client.as_agent(
            instructions=(
                "You are an email assistant that helps users draft professional responses to emails. "
                "Your input might be a JSON object that includes 'email_content'; base your reply on that content. "
                "Return JSON with a single field 'response' containing the drafted reply."
            ),
            response_format=EmailResponse,
        ),
        id="email_assistant_agent",
    )

6. lépés: A feltételes munkafolyamat létrehozása

Hozzon létre egy munkafolyamatot olyan feltételes élekkel, amelyek a levélszemétészlelési eredmények alapján haladnak:

    # Build the workflow graph.
    # Start at the spam detector.
    # If not spam, hop to a transformer that creates a new AgentExecutorRequest,
    # then call the email assistant, then finalize.
    # If spam, go directly to the spam handler and finalize.
    workflow = (
        WorkflowBuilder(start_executor=spam_detection_agent)
        # Not spam path: transform response -> request for assistant -> assistant -> send email
        .add_edge(spam_detection_agent, to_email_assistant_request, condition=get_condition(False))
        .add_edge(to_email_assistant_request, email_assistant_agent)
        .add_edge(email_assistant_agent, handle_email_response)
        # Spam path: send to spam handler
        .add_edge(spam_detection_agent, handle_spam_classifier_response, condition=get_condition(True))
        .build()
    )

7. lépés: A munkafolyamat végrehajtása

Futtassa a munkafolyamatot minta e-mail-tartalommal:

    # Read Email content from the sample resource file.
    # This keeps the sample deterministic since the model sees the same email every run.
    email_path = os.path.join(os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "resources", "email.txt")

    with open(email_path) as email_file:  # noqa: ASYNC230
        email = email_file.read()

    # Execute the workflow. Since the start is an AgentExecutor, pass an AgentExecutorRequest.
    # The workflow completes when it becomes idle (no more work to do).
    request = AgentExecutorRequest(messages=[Message(role="user", contents=[email])], should_respond=True)
    events = await workflow.run(request)
    outputs = events.get_outputs()
    if outputs:
        print(f"Workflow output: {outputs[0]}")


if __name__ == "__main__":
    asyncio.run(main())

A feltételes élek működése

  1. Feltételfüggvények: A get_condition() függvény létrehoz egy predikátumot, amely megvizsgálja az üzenet tartalmát, és visszaadja True vagy False annak meghatározására, hogy át kell-e haladni az élen.

  2. Üzenetvizsgálat: A feltételek az üzenet bármely aspektusát megvizsgálhatják, beleértve a Pydantic-modellekkel elemezett ügynökválaszokból származó strukturált adatokat is.

  3. Védelmi programozás: A feltételfüggvény hibakezelést tartalmaz, amely megakadályozza a strukturált adatok elemzésekor fellépő útválasztási hibákat.

  4. Dinamikus útválasztás: A levélszemét észlelésének eredménye alapján a rendszer automatikusan átirányítja az e-maileket az e-mail-asszisztenshez (jogos e-mailek esetén) vagy a levélszemétkezelőhöz (gyanús e-mailek esetén).

Alapfogalmak

  • Élfeltételek: Logikai predikátumok, amelyek meghatározzák, hogy át kell-e haladni egy élen
  • Strukturált kimenetek: Pydantic-modellek használata megbízható adatelemzéssel response_format
  • Védekező útválasztás: A feltételfüggvények kezelik a szélsőséges eseteket a munkafolyamat holtpontjainak megakadályozása érdekében
  • Üzenetátalakítás: A végrehajtók átalakíthatják az üzenettípusokat a munkafolyamat lépései között

Implementáció befejezése

A teljes munka implementációjához tekintse meg az ügynök-keretrendszer adattárában található edge_condition.py mintát.

Switch-Case élek

Feltételes élekre építve

Az előző feltételes élek példája kétirányú útválasztást mutatott (levélszemét és jogos e-mailek). Számos valós forgatókönyv azonban kifinomultabb döntési fákat igényel. A kapcsolótok élei tisztább, karbantarthatóbb megoldást nyújtanak, ha különböző feltételek alapján több célhelyre kell irányítania.

Amit meg fogsz építeni a Switch-Case segítségével

Az e-mail-feldolgozási munkafolyamatot három döntési útvonal kezelésére fogja kiterjeszteni:

  • NotSpam → e-mail-asszisztens → e-mail küldése
  • Levélszemét → Levélszemét-végrehajtó kezelése
  • Bizonytalan → Bizonytalan végrehajtó kezelése (alapértelmezett eset)

A legfontosabb fejlesztés a minta használata több SwitchBuilder különálló feltételes él helyett, így a munkafolyamat könnyebben érthető és karbantartható a döntési összetettség növekedésével.

A tárgyalt fogalmak

Adatmodellek Switch-Case

Frissítse az adatmodelleket a háromirányú besorolás támogatásához:

/// <summary>
/// Represents the possible decisions for spam detection.
/// </summary>
public enum SpamDecision
{
    NotSpam,
    Spam,
    Uncertain
}

/// <summary>
/// Represents the result of spam detection with enhanced decision support.
/// </summary>
public sealed class DetectionResult
{
    [JsonPropertyName("spam_decision")]
    [JsonConverter(typeof(JsonStringEnumConverter))]
    public SpamDecision spamDecision { get; set; }

    [JsonPropertyName("reason")]
    public string Reason { get; set; } = string.Empty;

    // Email ID is generated by the executor, not the agent
    [JsonIgnore]
    public string EmailId { get; set; } = string.Empty;
}

/// <summary>
/// Represents an email stored in shared state.
/// </summary>
internal sealed class Email
{
    [JsonPropertyName("email_id")]
    public string EmailId { get; set; } = string.Empty;

    [JsonPropertyName("email_content")]
    public string EmailContent { get; set; } = string.Empty;
}

/// <summary>
/// Represents the response from the email assistant.
/// </summary>
public sealed class EmailResponse
{
    [JsonPropertyName("response")]
    public string Response { get; set; } = string.Empty;
}

/// <summary>
/// Constants for shared state scopes.
/// </summary>
internal static class EmailStateConstants
{
    public const string EmailStateScope = "EmailState";
}

Feltételgyár Switch-Case

Hozzon létre egy újrafelhasználható feltétel-előállítót, amely predikátumokat hoz létre az egyes levélszemét-döntésekhez:

/// <summary>
/// Creates a condition for routing messages based on the expected spam detection result.
/// </summary>
/// <param name="expectedDecision">The expected spam detection decision</param>
/// <returns>A function that evaluates whether a message meets the expected result</returns>
private static Func<object?, bool> GetCondition(SpamDecision expectedDecision) =>
    detectionResult => detectionResult is DetectionResult result && result.spamDecision == expectedDecision;

Ez a gyári megközelítés:

  • Csökkenti a kód duplikációját: Egy függvény létrehozza az összes feltétel-predikátumot
  • Konzisztencia biztosítása: Minden feltétel ugyanazt a mintát követi
  • Egyszerűbb karbantartás: A feltétellogika módosítása egy helyen történik

Továbbfejlesztett AI-ügynök

Frissítse a levélszemét-észlelési ügynököt, hogy kevésbé legyen magabiztos, és háromirányú besorolást ad vissza:

/// <summary>
/// Creates a spam detection agent with enhanced uncertainty handling.
/// </summary>
/// <returns>A ChatClientAgent configured for three-way spam detection</returns>
private static ChatClientAgent GetSpamDetectionAgent(IChatClient chatClient) =>
    new(chatClient, new ChatClientAgentOptions(instructions: "You are a spam detection assistant that identifies spam emails. Be less confident in your assessments.")
    {
        ChatOptions = new()
        {
            ResponseFormat = ChatResponseFormat.ForJsonSchema<DetectionResult>()
        }
    });

/// <summary>
/// Creates an email assistant agent (unchanged from conditional edges example).
/// </summary>
/// <returns>A ChatClientAgent configured for email assistance</returns>
private static ChatClientAgent GetEmailAssistantAgent(IChatClient chatClient) =>
    new(chatClient, new ChatClientAgentOptions(instructions: "You are an email assistant that helps users draft responses to emails with professionalism.")
    {
        ChatOptions = new()
        {
            ResponseFormat = ChatResponseFormat.ForJsonSchema<EmailResponse>()
        }
    });

Munkafolyamat-végrehajtók továbbfejlesztett útválasztással

A háromirányú útválasztást megosztott állapotkezeléssel kezelő végrehajtók implementálása:

/// <summary>
/// Executor that detects spam using an AI agent with three-way classification.
/// </summary>
internal sealed partial class SpamDetectionExecutor : Executor
{
    private readonly AIAgent _spamDetectionAgent;

    public SpamDetectionExecutor(AIAgent spamDetectionAgent) : base("SpamDetectionExecutor")
    {
        this._spamDetectionAgent = spamDetectionAgent;
    }

    [MessageHandler]
    private async ValueTask<DetectionResult> HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // Generate a random email ID and store the email content in shared state
        var newEmail = new Email
        {
            EmailId = Guid.NewGuid().ToString("N"),
            EmailContent = message.Text
        };
        await context.QueueStateUpdateAsync(newEmail.EmailId, newEmail, scopeName: EmailStateConstants.EmailStateScope);

        // Invoke the agent for enhanced spam detection
        var response = await this._spamDetectionAgent.RunAsync(message);
        var detectionResult = JsonSerializer.Deserialize<DetectionResult>(response.Text);

        detectionResult!.EmailId = newEmail.EmailId;
        return detectionResult;
    }
}

/// <summary>
/// Executor that assists with email responses using an AI agent.
/// </summary>
internal sealed partial class EmailAssistantExecutor : Executor
{
    private readonly AIAgent _emailAssistantAgent;

    public EmailAssistantExecutor(AIAgent emailAssistantAgent) : base("EmailAssistantExecutor")
    {
        this._emailAssistantAgent = emailAssistantAgent;
    }

    [MessageHandler]
    private async ValueTask<EmailResponse> HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        if (message.spamDecision == SpamDecision.Spam)
        {
            throw new ArgumentException("This executor should only handle non-spam messages.");
        }

        // Retrieve the email content from shared state
        var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);

        // Invoke the agent to draft a response
        var response = await this._emailAssistantAgent.RunAsync(email!.EmailContent);
        var emailResponse = JsonSerializer.Deserialize<EmailResponse>(response.Text);

        return emailResponse!;
    }
}

/// <summary>
/// Executor that sends emails.
/// </summary>
internal sealed partial class SendEmailExecutor : Executor
{
    public SendEmailExecutor() : base("SendEmailExecutor") { }

    [MessageHandler]
    private async ValueTask HandleAsync(EmailResponse message, IWorkflowContext context, CancellationToken cancellationToken = default) =>
        await context.YieldOutputAsync($"Email sent: {message.Response}").ConfigureAwait(false);
}

/// <summary>
/// Executor that handles spam messages.
/// </summary>
internal sealed partial class HandleSpamExecutor : Executor
{
    public HandleSpamExecutor() : base("HandleSpamExecutor") { }

    [MessageHandler]
    private async ValueTask HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        if (message.spamDecision == SpamDecision.Spam)
        {
            await context.YieldOutputAsync($"Email marked as spam: {message.Reason}").ConfigureAwait(false);
        }
        else
        {
            throw new ArgumentException("This executor should only handle spam messages.");
        }
    }
}

/// <summary>
/// Executor that handles uncertain emails requiring manual review.
/// </summary>
internal sealed partial class HandleUncertainExecutor : Executor
{
    public HandleUncertainExecutor() : base("HandleUncertainExecutor") { }

    [MessageHandler]
    private async ValueTask HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        if (message.spamDecision == SpamDecision.Uncertain)
        {
            var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);
            await context.YieldOutputAsync($"Email marked as uncertain: {message.Reason}. Email content: {email?.EmailContent}");
        }
        else
        {
            throw new ArgumentException("This executor should only handle uncertain spam decisions.");
        }
    }
}

Munkafolyamat építése Switch-Case sémával

Cserélje le több feltételes élét a tisztább kapcsolós esetmintára:

public static class Program
{
    private static async Task Main()
    {
        // Set up the Azure OpenAI client
        var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
        var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
        var chatClient = new AzureOpenAIClient(new Uri(endpoint), new DefaultAzureCredential()).GetChatClient(deploymentName).AsIChatClient();

        // Create agents
        AIAgent spamDetectionAgent = GetSpamDetectionAgent(chatClient);
        AIAgent emailAssistantAgent = GetEmailAssistantAgent(chatClient);

        // Create executors
        var spamDetectionExecutor = new SpamDetectionExecutor(spamDetectionAgent);
        var emailAssistantExecutor = new EmailAssistantExecutor(emailAssistantAgent);
        var sendEmailExecutor = new SendEmailExecutor();
        var handleSpamExecutor = new HandleSpamExecutor();
        var handleUncertainExecutor = new HandleUncertainExecutor();

        // Build the workflow using switch-case for cleaner three-way routing
        WorkflowBuilder builder = new(spamDetectionExecutor);
        builder.AddSwitch(spamDetectionExecutor, switchBuilder =>
            switchBuilder
            .AddCase(
                GetCondition(expectedDecision: SpamDecision.NotSpam),
                emailAssistantExecutor
            )
            .AddCase(
                GetCondition(expectedDecision: SpamDecision.Spam),
                handleSpamExecutor
            )
            .WithDefault(
                handleUncertainExecutor
            )
        )
        // After the email assistant writes a response, it will be sent to the send email executor
        .AddEdge(emailAssistantExecutor, sendEmailExecutor)
        .WithOutputFrom(handleSpamExecutor, sendEmailExecutor, handleUncertainExecutor);

        var workflow = builder.Build();

        // Read an email from a text file (use ambiguous content for demonstration)
        string email = Resources.Read("ambiguous_email.txt");

        // Execute the workflow
        StreamingRun run = await InProcessExecution.StreamAsync(workflow, new ChatMessage(ChatRole.User, email));
        await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
        await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
        {
            if (evt is WorkflowOutputEvent outputEvent)
            {
                Console.WriteLine($"{outputEvent}");
            }
        }
    }
}

Switch-Case előnyök

  1. Tisztább szintaxis: A SwitchBuilder több feltételes él helyett olvashatóbb alternatívát kínál
  2. Megrendelt értékelés: Az eseteket egymás után értékelik ki, és az első meccsen leállnak
  3. Garantált útválasztás: A WithDefault() módszer biztosítja, hogy az üzenetek soha ne ragadnak meg
  4. Jobb karbantarthatóság: Az új esetek hozzáadása minimális módosítást igényel a munkafolyamat-struktúrában
  5. Típusbiztonság: Minden végrehajtó ellenőrzi a bemenetét az útválasztási hibák korai észleléséhez

Minta összehasonlítása

Előtte (feltételes élcsomópontok):

var workflow = new WorkflowBuilder(spamDetectionExecutor)
    .AddEdge(spamDetectionExecutor, emailAssistantExecutor, condition: GetCondition(expectedResult: false))
    .AddEdge(spamDetectionExecutor, handleSpamExecutor, condition: GetCondition(expectedResult: true))
    // No clean way to handle a third case
    .WithOutputFrom(handleSpamExecutor, sendEmailExecutor)
    .Build();

Utána (Switch-Case):

WorkflowBuilder builder = new(spamDetectionExecutor);
builder.AddSwitch(spamDetectionExecutor, switchBuilder =>
    switchBuilder
    .AddCase(GetCondition(SpamDecision.NotSpam), emailAssistantExecutor)
    .AddCase(GetCondition(SpamDecision.Spam), handleSpamExecutor)
    .WithDefault(handleUncertainExecutor)  // Clean default case
)
// Continue building the rest of the workflow

A switch-case minta sokkal jobban skálázódik ahogy az útválasztási döntések száma növekszik, és az alapértelmezett eset biztonsági hálót nyújt a váratlan értékek kezeléséhez.

A példa futtatása

Ha nem egyértelmű e-mail-tartalommal futtatja ezt a munkafolyamatot:

Email marked as uncertain: This email contains promotional language but might be from a legitimate business contact, requiring human review for proper classification.

Próbálja meg módosítani az e-mail-tartalmat egy egyértelműen spamre vagy egyértelműen legitimre, hogy lássa a különböző útválasztási útvonalak működését.

Implementáció befejezése

A teljes munka végrehajtásához tekintse meg ezt a mintát az Agent Framework-adattárban.

Feltételes élekre építve

Az előző feltételes élek példája kétirányú útválasztást mutatott (levélszemét és jogos e-mailek). Számos valós forgatókönyv azonban kifinomultabb döntési fákat igényel. A kapcsolótok élei tisztább, karbantarthatóbb megoldást nyújtanak, ha különböző feltételek alapján több célhelyre kell irányítania.

Mit fogsz legközelebb készíteni?

Az e-mail-feldolgozási munkafolyamatot három döntési útvonal kezelésére fogja kiterjeszteni:

  • NotSpam → e-mail-asszisztens → e-mail küldése
  • Levélszemét → Megjelölés levélszemétként
  • Bizonytalan → jelölő manuális felülvizsgálathoz (alapértelmezett eset)

A legfontosabb fejlesztés az, hogy több különálló feltételes él helyett egyetlen 'switch-case' élcsoportot használunk. Ez megkönnyíti a munkafolyamat megértését és karbantartását, ahogy a döntési komplexitás növekszik.

A tárgyalt fogalmak

Továbbfejlesztett adatmodellek

Frissítse az adatmodelleket a háromirányú besorolás támogatásához:

from typing import Literal

class DetectionResultAgent(BaseModel):
    """Structured output returned by the spam detection agent."""

    # The agent classifies the email into one of three categories
    spam_decision: Literal["NotSpam", "Spam", "Uncertain"]
    reason: str

class EmailResponse(BaseModel):
    """Structured output returned by the email assistant agent."""

    response: str

@dataclass
class DetectionResult:
    """Internal typed payload used for routing and downstream handling."""

    spam_decision: str
    reason: str
    email_id: str

@dataclass
class Email:
    """In memory record of the email content stored in shared state."""

    email_id: str
    email_content: str

Kapcsoló-Eset Feltételgyár

Hozzon létre egy újrafelhasználható feltétel-előállítót, amely predikátumokat hoz létre az egyes levélszemét-döntésekhez:

def get_case(expected_decision: str):
    """Factory that returns a predicate matching a specific spam_decision value."""

    def condition(message: Any) -> bool:
        # Only match when the upstream payload is a DetectionResult with the expected decision
        return isinstance(message, DetectionResult) and message.spam_decision == expected_decision

    return condition

Ez a gyári megközelítés:

  • Csökkenti a kód duplikációját: Egy függvény létrehozza az összes feltétel-predikátumot
  • Konzisztencia biztosítása: Minden feltétel ugyanazt a mintát követi
  • Egyszerűbb karbantartás: A feltétellogika módosítása egy helyen történik

Megosztott állapotú munkafolyamat-végrehajtók

Olyan végrehajtók implementálása, amelyek megosztott állapotot használnak, hogy ne adjanak át nagy mennyiségű e-mail-tartalmat minden munkafolyamat-lépésben:

EMAIL_STATE_PREFIX = "email:"
CURRENT_EMAIL_ID_KEY = "current_email_id"

@executor(id="store_email")
async def store_email(email_text: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
    """Store email content once and pass around a lightweight ID reference."""

    # Persist the raw email content in shared state
    new_email = Email(email_id=str(uuid4()), email_content=email_text)
    ctx.set_state(f"{EMAIL_STATE_PREFIX}{new_email.email_id}", new_email)
    ctx.set_state(CURRENT_EMAIL_ID_KEY, new_email.email_id)

    # Forward email to spam detection agent
    await ctx.send_message(
        AgentExecutorRequest(messages=[Message(role="user", contents=[new_email.email_content])], should_respond=True)
    )

@executor(id="to_detection_result")
async def to_detection_result(response: AgentExecutorResponse, ctx: WorkflowContext[DetectionResult]) -> None:
    """Transform agent response into a typed DetectionResult with email ID."""

    # Parse the agent's structured JSON output
    parsed = DetectionResultAgent.model_validate_json(response.agent_run_response.text)
    email_id: str = ctx.get_state(CURRENT_EMAIL_ID_KEY)

    # Create typed message for switch-case routing
    await ctx.send_message(DetectionResult(
        spam_decision=parsed.spam_decision,
        reason=parsed.reason,
        email_id=email_id
    ))

@executor(id="submit_to_email_assistant")
async def submit_to_email_assistant(detection: DetectionResult, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
    """Handle NotSpam emails by forwarding to the email assistant."""

    # Guard against misrouting
    if detection.spam_decision != "NotSpam":
        raise RuntimeError("This executor should only handle NotSpam messages.")

    # Retrieve original email content from shared state
    email: Email = ctx.get_state(f"{EMAIL_STATE_PREFIX}{detection.email_id}")
    await ctx.send_message(
        AgentExecutorRequest(messages=[Message(role="user", contents=[email.email_content])], should_respond=True)
    )

@executor(id="finalize_and_send")
async def finalize_and_send(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
    """Parse email assistant response and yield final output."""

    parsed = EmailResponse.model_validate_json(response.agent_run_response.text)
    await ctx.yield_output(f"Email sent: {parsed.response}")

@executor(id="handle_spam")
async def handle_spam(detection: DetectionResult, ctx: WorkflowContext[Never, str]) -> None:
    """Handle confirmed spam emails."""

    if detection.spam_decision == "Spam":
        await ctx.yield_output(f"Email marked as spam: {detection.reason}")
    else:
        raise RuntimeError("This executor should only handle Spam messages.")

@executor(id="handle_uncertain")
async def handle_uncertain(detection: DetectionResult, ctx: WorkflowContext[Never, str]) -> None:
    """Handle uncertain classifications that need manual review."""

    if detection.spam_decision == "Uncertain":
        # Include original content for human review
        email: Email | None = ctx.get_state(f"{EMAIL_STATE_PREFIX}{detection.email_id}")
        await ctx.yield_output(
            f"Email marked as uncertain: {detection.reason}. Email content: {getattr(email, 'email_content', '')}"
        )
    else:
        raise RuntimeError("This executor should only handle Uncertain messages.")

Továbbfejlesztett AI-ügynök létrehozása

Frissítse a levélszemét-észlelési ügynököt, hogy kevésbé legyen magabiztos, és háromirányú besorolást ad vissza:

async def main():
    chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())

    # Enhanced spam detection agent with three-way classification
    spam_detection_agent = AgentExecutor(
        chat_client.as_agent(
            instructions=(
                "You are a spam detection assistant that identifies spam emails. "
                "Be less confident in your assessments. "
                "Always return JSON with fields 'spam_decision' (one of NotSpam, Spam, Uncertain) "
                "and 'reason' (string)."
            ),
            response_format=DetectionResultAgent,
        ),
        id="spam_detection_agent",
    )

    # Email assistant remains the same
    email_assistant_agent = AgentExecutor(
        chat_client.as_agent(
            instructions=(
                "You are an email assistant that helps users draft responses to emails with professionalism."
            ),
            response_format=EmailResponse,
        ),
        id="email_assistant_agent",
    )

Munkafolyamat létrehozása Switch-Case Edge-csoporttal

Több feltételes él cseréje egyetlen kapcsolós esetcsoportra:

    # Build workflow using switch-case for cleaner three-way routing
    workflow = (
        WorkflowBuilder(start_executor=store_email)
        .add_edge(store_email, spam_detection_agent)
        .add_edge(spam_detection_agent, to_detection_result)
        .add_switch_case_edge_group(
            to_detection_result,
            [
                # Explicit cases for specific decisions
                Case(condition=get_case("NotSpam"), target=submit_to_email_assistant),
                Case(condition=get_case("Spam"), target=handle_spam),
                # Default case catches anything that doesn't match above
                Default(target=handle_uncertain),
            ],
        )
        .add_edge(submit_to_email_assistant, email_assistant_agent)
        .add_edge(email_assistant_agent, finalize_and_send)
        .build()
    )

Végrehajtás és tesztelés

Futtassa a munkafolyamatot nem egyértelmű e-mail-tartalommal, amely bemutatja a háromirányú útválasztást:

    # Use ambiguous email content that might trigger uncertain classification
    email = (
        "Hey there, I noticed you might be interested in our latest offer—no pressure, but it expires soon. "
        "Let me know if you'd like more details."
    )

    # Execute and display results
    events = await workflow.run(email)
    outputs = events.get_outputs()
    if outputs:
        for output in outputs:
            print(f"Workflow output: {output}")

A Switch-Case élek fontos előnyei

  1. Tisztább szintaxis: Több feltételes él helyett egy élcsoport
  2. Megrendelt értékelés: Az eseteket egymás után értékelik ki, és az első meccsen leállnak
  3. Garantált útválasztás: Az alapértelmezett eset biztosítja, hogy az üzenetek soha ne ragadnak meg
  4. Jobb karbantarthatóság: Az új esetek hozzáadása minimális módosítást igényel
  5. Típusbiztonság: Minden végrehajtó ellenőrzi a bemenetét az útválasztási hibák észlelése érdekében

Összehasonlítás: Feltételes szerkezet vs. Switch-Case szerkezet

Előtte (feltételes élcsomópontok):

.add_edge(detector, handler_a, condition=lambda x: x.result == "A")
.add_edge(detector, handler_b, condition=lambda x: x.result == "B")
.add_edge(detector, handler_c, condition=lambda x: x.result == "C")

Utána (Switch-Case):

.add_switch_case_edge_group(
    detector,
    [
        Case(condition=lambda x: x.result == "A", target=handler_a),
        Case(condition=lambda x: x.result == "B", target=handler_b),
        Default(target=handler_c),  # Catches everything else
    ],
)

A switch-case minta sokkal jobban skálázódik ahogy az útválasztási döntések száma növekszik, és az alapértelmezett eset biztonsági hálót nyújt a váratlan értékek kezeléséhez.

Switch Case példakód

A teljes működő megvalósításért tekintse meg az Agent Framework adattárban található switch_case_edge_group.py mintát.

Többkijelölésű élek

A Switch-Case-en túl: Többválasztásos útválasztás

Bár a switch-case szerkezetek pontosan egy célhelyre irányítják az üzeneteket, a valós munkafolyamatoknak gyakran több párhuzamos műveletet kell aktiválniuk az adattulajdonságok alapján. A particionált élek (particionálókkal rendelkező kirakodó élekként implementálva) kifinomult kirakodómintákat tesznek lehetővé, ahol egyetlen üzenet egyszerre több alárendelt végrehajtót is aktiválhat.

Speciális e-mail-feldolgozási munkafolyamat

A kapcsolós eset példájára építve egy továbbfejlesztett e-mail-feldolgozó rendszert fog létrehozni, amely kifinomult útválasztási logikát mutat be:

  • Spam e-mailek → egységes levélszemét-kezelő (például switch-case szerkezet)
  • Megbízható e-mailekMindig aktiválja az e-mail-asszisztenst + Feltételesen aktiválja az összegzőt hosszú e-mailek esetén
  • Bizonytalan e-mailek → Egy bizonytalan kezelő (például switch-case szerkezet)
  • Adatbázis-adatmegőrzési → Rövid és összesített hosszú e-mailek esetén is aktiválódik

Ez a minta lehetővé teszi a tartalom jellemzőihez igazodó párhuzamos feldolgozási folyamatokat.

A tárgyalt fogalmak

Adatmodellek több kijelöléshez

Az adatmodellek kiterjesztése az e-mailek hosszának elemzéséhez és összegzéséhez:

/// <summary>
/// Represents the result of enhanced email analysis with additional metadata.
/// </summary>
public sealed class AnalysisResult
{
    [JsonPropertyName("spam_decision")]
    [JsonConverter(typeof(JsonStringEnumConverter))]
    public SpamDecision spamDecision { get; set; }

    [JsonPropertyName("reason")]
    public string Reason { get; set; } = string.Empty;

    // Additional properties for sophisticated routing
    [JsonIgnore]
    public int EmailLength { get; set; }

    [JsonIgnore]
    public string EmailSummary { get; set; } = string.Empty;

    [JsonIgnore]
    public string EmailId { get; set; } = string.Empty;
}

/// <summary>
/// Represents the response from the email assistant.
/// </summary>
public sealed class EmailResponse
{
    [JsonPropertyName("response")]
    public string Response { get; set; } = string.Empty;
}

/// <summary>
/// Represents the response from the email summary agent.
/// </summary>
public sealed class EmailSummary
{
    [JsonPropertyName("summary")]
    public string Summary { get; set; } = string.Empty;
}

/// <summary>
/// A custom workflow event for database operations.
/// </summary>
internal sealed class DatabaseEvent(string message) : WorkflowEvent(message) { }

/// <summary>
/// Constants for email processing thresholds.
/// </summary>
public static class EmailProcessingConstants
{
    public const int LongEmailThreshold = 100;
}

Célkijelölő függvény: A többszörös kijelölés szíve

A cél-hozzárendelő függvény határozza meg, hogy mely végrehajtók kapják meg az egyes üzeneteket:

/// <summary>
/// Creates a target assigner for routing messages based on the analysis result.
/// </summary>
/// <returns>A function that takes an analysis result and returns the target partitions.</returns>
private static Func<AnalysisResult?, int, IEnumerable<int>> GetTargetAssigner()
{
    return (analysisResult, targetCount) =>
    {
        if (analysisResult is not null)
        {
            if (analysisResult.spamDecision == SpamDecision.Spam)
            {
                return [0]; // Route only to spam handler (index 0)
            }
            else if (analysisResult.spamDecision == SpamDecision.NotSpam)
            {
                // Always route to email assistant (index 1)
                List<int> targets = [1];

                // Conditionally add summarizer for long emails (index 2)
                if (analysisResult.EmailLength > EmailProcessingConstants.LongEmailThreshold)
                {
                    targets.Add(2);
                }

                return targets;
            }
            else // Uncertain
            {
                return [3]; // Route only to uncertain handler (index 3)
            }
        }
        throw new ArgumentException("Invalid analysis result.");
    };
}

A Target Assigner függvény főbb jellemzői

  1. Dinamikus cél kiválasztása: Az aktiválandó végrehajtói indexek listáját adja vissza
  2. Tartalomérzékeny útválasztás: Döntéseket hoz az üzenet tulajdonságai, például az e-mail hossza alapján
  3. Párhuzamos feldolgozás: Egyszerre több cél is végrehajtható
  4. Feltételes logika: Összetett elágaztatás több feltétel alapján

Továbbfejlesztett munkafolyamat-végrehajtók

A speciális elemzést és útválasztást kezelő végrehajtók implementálása:

/// <summary>
/// Executor that analyzes emails using an AI agent with enhanced analysis.
/// </summary>
internal sealed partial class EmailAnalysisExecutor : Executor
{
    private readonly AIAgent _emailAnalysisAgent;

    public EmailAnalysisExecutor(AIAgent emailAnalysisAgent) : base("EmailAnalysisExecutor")
    {
        this._emailAnalysisAgent = emailAnalysisAgent;
    }

    [MessageHandler]
    private async ValueTask<AnalysisResult> HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // Generate a random email ID and store the email content
        var newEmail = new Email
        {
            EmailId = Guid.NewGuid().ToString("N"),
            EmailContent = message.Text
        };
        await context.QueueStateUpdateAsync(newEmail.EmailId, newEmail, scopeName: EmailStateConstants.EmailStateScope);

        // Invoke the agent for enhanced analysis
        var response = await this._emailAnalysisAgent.RunAsync(message);
        var analysisResult = JsonSerializer.Deserialize<AnalysisResult>(response.Text);

        // Enrich with metadata for routing decisions
        analysisResult!.EmailId = newEmail.EmailId;
        analysisResult.EmailLength = newEmail.EmailContent.Length;

        return analysisResult;
    }
}

/// <summary>
/// Executor that assists with email responses using an AI agent.
/// </summary>
internal sealed partial class EmailAssistantExecutor : Executor
{
    private readonly AIAgent _emailAssistantAgent;

    public EmailAssistantExecutor(AIAgent emailAssistantAgent) : base("EmailAssistantExecutor")
    {
        this._emailAssistantAgent = emailAssistantAgent;
    }

    [MessageHandler]
    private async ValueTask<EmailResponse> HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        if (message.spamDecision == SpamDecision.Spam)
        {
            throw new ArgumentException("This executor should only handle non-spam messages.");
        }

        // Retrieve the email content from shared state
        var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);

        // Invoke the agent to draft a response
        var response = await this._emailAssistantAgent.RunAsync(email!.EmailContent);
        var emailResponse = JsonSerializer.Deserialize<EmailResponse>(response.Text);

        return emailResponse!;
    }
}

/// <summary>
/// Executor that summarizes emails using an AI agent for long emails.
/// </summary>
internal sealed partial class EmailSummaryExecutor : Executor
{
    private readonly AIAgent _emailSummaryAgent;

    public EmailSummaryExecutor(AIAgent emailSummaryAgent) : base("EmailSummaryExecutor")
    {
        this._emailSummaryAgent = emailSummaryAgent;
    }

    [MessageHandler]
    private async ValueTask<AnalysisResult> HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // Read the email content from shared state
        var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);

        // Generate summary for long emails
        var response = await this._emailSummaryAgent.RunAsync(email!.EmailContent);
        var emailSummary = JsonSerializer.Deserialize<EmailSummary>(response.Text);

        // Enrich the analysis result with the summary
        message.EmailSummary = emailSummary!.Summary;

        return message;
    }
}

/// <summary>
/// Executor that sends emails.
/// </summary>
internal sealed partial class SendEmailExecutor : Executor
{
    public SendEmailExecutor() : base("SendEmailExecutor") { }

    [MessageHandler]
    private async ValueTask HandleAsync(EmailResponse message, IWorkflowContext context, CancellationToken cancellationToken = default) =>
        await context.YieldOutputAsync($"Email sent: {message.Response}");
}

/// <summary>
/// Executor that handles spam messages.
/// </summary>
internal sealed partial class HandleSpamExecutor : Executor
{
    public HandleSpamExecutor() : base("HandleSpamExecutor") { }

    [MessageHandler]
    private async ValueTask HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        if (message.spamDecision == SpamDecision.Spam)
        {
            await context.YieldOutputAsync($"Email marked as spam: {message.Reason}");
        }
        else
        {
            throw new ArgumentException("This executor should only handle spam messages.");
        }
    }
}

/// <summary>
/// Executor that handles uncertain messages requiring manual review.
/// </summary>
internal sealed partial class HandleUncertainExecutor : Executor
{
    public HandleUncertainExecutor() : base("HandleUncertainExecutor") { }

    [MessageHandler]
    private async ValueTask HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        if (message.spamDecision == SpamDecision.Uncertain)
        {
            var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);
            await context.YieldOutputAsync($"Email marked as uncertain: {message.Reason}. Email content: {email?.EmailContent}");
        }
        else
        {
            throw new ArgumentException("This executor should only handle uncertain spam decisions.");
        }
    }
}

/// <summary>
/// Executor that handles database access with custom events.
/// </summary>
internal sealed partial class DatabaseAccessExecutor : Executor
{
    public DatabaseAccessExecutor() : base("DatabaseAccessExecutor") { }

    [MessageHandler]
    private async ValueTask HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // Simulate database operations
        await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);
        await Task.Delay(100); // Simulate database access delay

        // Emit custom database event for monitoring
        await context.AddEventAsync(new DatabaseEvent($"Email {message.EmailId} saved to database."));
    }
}

Továbbfejlesztett AI-ügynökök

Ügynökök létrehozása elemzéshez, segítségnyújtáshoz és összegzéshez:

/// <summary>
/// Create an enhanced email analysis agent.
/// </summary>
/// <returns>A ChatClientAgent configured for comprehensive email analysis</returns>
private static ChatClientAgent GetEmailAnalysisAgent(IChatClient chatClient) =>
    new(chatClient, new ChatClientAgentOptions(instructions: "You are a spam detection assistant that identifies spam emails.")
    {
        ChatOptions = new()
        {
            ResponseFormat = ChatResponseFormat.ForJsonSchema<AnalysisResult>()
        }
    });

/// <summary>
/// Creates an email assistant agent.
/// </summary>
/// <returns>A ChatClientAgent configured for email assistance</returns>
private static ChatClientAgent GetEmailAssistantAgent(IChatClient chatClient) =>
    new(chatClient, new ChatClientAgentOptions(instructions: "You are an email assistant that helps users draft responses to emails with professionalism.")
    {
        ChatOptions = new()
        {
            ResponseFormat = ChatResponseFormat.ForJsonSchema<EmailResponse>()
        }
    });

/// <summary>
/// Creates an agent that summarizes emails.
/// </summary>
/// <returns>A ChatClientAgent configured for email summarization</returns>
private static ChatClientAgent GetEmailSummaryAgent(IChatClient chatClient) =>
    new(chatClient, new ChatClientAgentOptions(instructions: "You are an assistant that helps users summarize emails.")
    {
        ChatOptions = new()
        {
            ResponseFormat = ChatResponseFormat.ForJsonSchema<EmailSummary>()
        }
    });

Többkijelöléses munkafolyamat felépítése

Hozza létre a munkafolyamatot kifinomult útválasztással és párhuzamos feldolgozással:

public static class Program
{
    private static async Task Main()
    {
        // Set up the Azure OpenAI client
        var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
        var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
        var chatClient = new AzureOpenAIClient(new Uri(endpoint), new DefaultAzureCredential()).GetChatClient(deploymentName).AsIChatClient();

        // Create agents
        AIAgent emailAnalysisAgent = GetEmailAnalysisAgent(chatClient);
        AIAgent emailAssistantAgent = GetEmailAssistantAgent(chatClient);
        AIAgent emailSummaryAgent = GetEmailSummaryAgent(chatClient);

        // Create executors
        var emailAnalysisExecutor = new EmailAnalysisExecutor(emailAnalysisAgent);
        var emailAssistantExecutor = new EmailAssistantExecutor(emailAssistantAgent);
        var emailSummaryExecutor = new EmailSummaryExecutor(emailSummaryAgent);
        var sendEmailExecutor = new SendEmailExecutor();
        var handleSpamExecutor = new HandleSpamExecutor();
        var handleUncertainExecutor = new HandleUncertainExecutor();
        var databaseAccessExecutor = new DatabaseAccessExecutor();

        // Build the workflow with multi-selection fan-out
        WorkflowBuilder builder = new(emailAnalysisExecutor);
        builder.AddFanOutEdge(
            emailAnalysisExecutor,
            targets: [
                handleSpamExecutor,        // Index 0: Spam handler
                emailAssistantExecutor,    // Index 1: Email assistant (always for NotSpam)
                emailSummaryExecutor,      // Index 2: Summarizer (conditionally for long NotSpam)
                handleUncertainExecutor,   // Index 3: Uncertain handler
            ],
            targetSelector: GetTargetAssigner()
        )
        // Email assistant branch
        .AddEdge(emailAssistantExecutor, sendEmailExecutor)

        // Database persistence: conditional routing
        .AddEdge<AnalysisResult>(
            emailAnalysisExecutor,
            databaseAccessExecutor,
            condition: analysisResult => analysisResult?.EmailLength <= EmailProcessingConstants.LongEmailThreshold) // Short emails
        .AddEdge(emailSummaryExecutor, databaseAccessExecutor) // Long emails with summary

        .WithOutputFrom(handleUncertainExecutor, handleSpamExecutor, sendEmailExecutor);

        var workflow = builder.Build();

        // Read a moderately long email to trigger both assistant and summarizer
        string email = Resources.Read("email.txt");

        // Execute the workflow with custom event handling
        StreamingRun run = await InProcessExecution.StreamAsync(workflow, new ChatMessage(ChatRole.User, email));
        await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
        await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
        {
            if (evt is WorkflowOutputEvent outputEvent)
            {
                Console.WriteLine($"Output: {outputEvent}");
            }

            if (evt is DatabaseEvent databaseEvent)
            {
                Console.WriteLine($"Database: {databaseEvent}");
            }
        }
    }
}

Minta összehasonlítása: Több kijelölés és Switch-Case

Switch-Case mintázat (előző):

// One input → exactly one output
builder.AddSwitch(spamDetectionExecutor, switchBuilder =>
    switchBuilder
    .AddCase(GetCondition(SpamDecision.NotSpam), emailAssistantExecutor)
    .AddCase(GetCondition(SpamDecision.Spam), handleSpamExecutor)
    .WithDefault(handleUncertainExecutor)
)

Több kijelölési minta:

// One input → one or more outputs (dynamic fan-out)
builder.AddFanOutEdge(
    emailAnalysisExecutor,
    targets: [handleSpamExecutor, emailAssistantExecutor, emailSummaryExecutor, handleUncertainExecutor],
    targetSelector: GetTargetAssigner() // Returns list of target indices
)

A többkijelöléses élek fő előnyei

  1. Párhuzamos feldolgozás: Egyszerre több ág is végrehajtható
  2. Feltételes szétosztás: A célok száma a tartalom függvényében változik
  3. Tartalomérzékeny útválasztás: Az üzenet tulajdonságain alapuló döntések, nem csak beírás
  4. Hatékony erőforrás-használat: Csak a szükséges ágak aktiválódnak
  5. Összetett üzleti logika: Támogatja a kifinomult útválasztási forgatókönyveket

A többszörös kijelölési példa futtatása

Ha hosszú e-mailt futtatja ezt a munkafolyamatot:

Output: Email sent: [Professional response generated by AI]
Database: Email abc123 saved to database.

Ha rövid e-maillel fut, a rendszer kihagyja az összegzőt:

Output: Email sent: [Professional response generated by AI]
Database: Email def456 saved to database.

Real-World használati esetek

  • Levelezőrendszerek: Útvonal a válasz-asszisztenshez + archív + elemzés (feltételesen)
  • Tartalomfeldolgozás: Trigger átírása + fordítás + elemzés (tartalomtípus alapján)
  • Rendelésfeldolgozás: Teljesítési útvonal + számlázás + értesítések (a rendelés tulajdonságai alapján)
  • Adatfolyamok: Különböző elemzési folyamatok aktiválása az adatjellemzők alapján

Több kijelöléses teljes implementáció

A teljes munka végrehajtásához tekintse meg ezt a mintát az Agent Framework-adattárban.

A Switch-Case-en túl: Többválasztásos útválasztás

Bár a switch-case szerkezetek pontosan egy célhelyre irányítják az üzeneteket, a valós munkafolyamatoknak gyakran több párhuzamos műveletet kell aktiválniuk az adattulajdonságok alapján. A particionált élek (többkijelölési élcsoportként implementálva) kifinomult ventilátorkiválasztási mintákat tesznek lehetővé, ahol egyetlen üzenet egyszerre több alárendelt végrehajtót is aktiválhat.

Speciális e-mail-feldolgozási munkafolyamat

A kapcsolós eset példájára építve egy továbbfejlesztett e-mail-feldolgozó rendszert fog létrehozni, amely kifinomult útválasztási logikát mutat be:

  • Spam e-mailek → egységes levélszemét-kezelő (például switch-case szerkezet)
  • Megbízható e-mailekMindig aktiválja az e-mail-asszisztenst + Feltételesen aktiválja az összegzőt hosszú e-mailek esetén
  • Bizonytalan e-mailek → Egy bizonytalan kezelő (például switch-case szerkezet)
  • Adatbázis-adatmegőrzési → Rövid és összesített hosszú e-mailek esetén is aktiválódik

Ez a minta lehetővé teszi a tartalom jellemzőihez igazodó párhuzamos feldolgozási folyamatokat.

A tárgyalt fogalmak

Továbbfejlesztett adatmodellek több kijelöléshez

Az adatmodellek kiterjesztése az e-mailek hosszának elemzéséhez és összegzéséhez:

class AnalysisResultAgent(BaseModel):
    """Enhanced structured output from email analysis agent."""

    spam_decision: Literal["NotSpam", "Spam", "Uncertain"]
    reason: str

class EmailResponse(BaseModel):
    """Response from email assistant."""

    response: str

class EmailSummaryModel(BaseModel):
    """Summary generated by email summary agent."""

    summary: str

@dataclass
class AnalysisResult:
    """Internal analysis result with email metadata for routing decisions."""

    spam_decision: str
    reason: str
    email_length: int  # Used for conditional routing
    email_summary: str  # Populated by summary agent
    email_id: str

@dataclass
class Email:
    """Email content stored in shared state."""

    email_id: str
    email_content: str

# Custom event data for database operations
class DatabaseEvent:
    """Custom event data for tracking database operations."""
    def __init__(self, message: str):
        self.message = message

    def __repr__(self) -> str:
        return f"DatabaseEvent({self.message})"

Kijelölési függvény: A többszörös kijelölés szíve

A kijelölési függvény határozza meg, hogy mely végrehajtók kapják meg az egyes üzeneteket:

LONG_EMAIL_THRESHOLD = 100

def select_targets(analysis: AnalysisResult, target_ids: list[str]) -> list[str]:
    """Intelligent routing based on spam decision and email characteristics."""

    # Target order: [handle_spam, submit_to_email_assistant, summarize_email, handle_uncertain]
    handle_spam_id, submit_to_email_assistant_id, summarize_email_id, handle_uncertain_id = target_ids

    if analysis.spam_decision == "Spam":
        # Route only to spam handler
        return [handle_spam_id]

    elif analysis.spam_decision == "NotSpam":
        # Always route to email assistant
        targets = [submit_to_email_assistant_id]

        # Conditionally add summarizer for long emails
        if analysis.email_length > LONG_EMAIL_THRESHOLD:
            targets.append(summarize_email_id)

        return targets

    else:  # Uncertain
        # Route only to uncertain handler
        return [handle_uncertain_id]

A kijelölési függvények főbb jellemzői

  1. Dinamikus cél kiválasztása: Az aktiválandó végrehajtóazonosítók listáját adja vissza
  2. Tartalomérzékeny útválasztás: Döntéseket hoz az üzenet tulajdonságai alapján
  3. Párhuzamos feldolgozás: Egyszerre több cél is végrehajtható
  4. Feltételes logika: Összetett elágaztatás több feltétel alapján

Többkijelöléses munkafolyamat-végrehajtók

A továbbfejlesztett elemzést és útválasztást kezelő végrehajtók implementálása:

EMAIL_STATE_PREFIX = "email:"
CURRENT_EMAIL_ID_KEY = "current_email_id"

@executor(id="store_email")
async def store_email(email_text: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
    """Store email and initiate analysis."""

    new_email = Email(email_id=str(uuid4()), email_content=email_text)
    ctx.set_state(f"{EMAIL_STATE_PREFIX}{new_email.email_id}", new_email)
    ctx.set_state(CURRENT_EMAIL_ID_KEY, new_email.email_id)

    await ctx.send_message(
        AgentExecutorRequest(messages=[Message(role="user", contents=[new_email.email_content])], should_respond=True)
    )

@executor(id="to_analysis_result")
async def to_analysis_result(response: AgentExecutorResponse, ctx: WorkflowContext[AnalysisResult]) -> None:
    """Transform agent response into enriched analysis result."""

    parsed = AnalysisResultAgent.model_validate_json(response.agent_run_response.text)
    email_id: str = ctx.get_state(CURRENT_EMAIL_ID_KEY)
    email: Email = ctx.get_state(f"{EMAIL_STATE_PREFIX}{email_id}")

    # Create enriched analysis result with email length for routing decisions
    await ctx.send_message(
        AnalysisResult(
            spam_decision=parsed.spam_decision,
            reason=parsed.reason,
            email_length=len(email.email_content),  # Key for conditional routing
            email_summary="",
            email_id=email_id,
        )
    )

@executor(id="submit_to_email_assistant")
async def submit_to_email_assistant(analysis: AnalysisResult, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
    """Handle legitimate emails by forwarding to email assistant."""

    if analysis.spam_decision != "NotSpam":
        raise RuntimeError("This executor should only handle NotSpam messages.")

    email: Email = ctx.get_state(f"{EMAIL_STATE_PREFIX}{analysis.email_id}")
    await ctx.send_message(
        AgentExecutorRequest(messages=[Message(role="user", contents=[email.email_content])], should_respond=True)
    )

@executor(id="finalize_and_send")
async def finalize_and_send(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
    """Final step for email assistant branch."""

    parsed = EmailResponse.model_validate_json(response.agent_run_response.text)
    await ctx.yield_output(f"Email sent: {parsed.response}")

@executor(id="summarize_email")
async def summarize_email(analysis: AnalysisResult, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
    """Generate summary for long emails (parallel branch)."""

    # Only called for long NotSpam emails by selection function
    email: Email = ctx.get_state(f"{EMAIL_STATE_PREFIX}{analysis.email_id}")
    await ctx.send_message(
        AgentExecutorRequest(messages=[Message(role="user", contents=[email.email_content])], should_respond=True)
    )

@executor(id="merge_summary")
async def merge_summary(response: AgentExecutorResponse, ctx: WorkflowContext[AnalysisResult]) -> None:
    """Merge summary back into analysis result for database persistence."""

    summary = EmailSummaryModel.model_validate_json(response.agent_run_response.text)
    email_id: str = ctx.get_state(CURRENT_EMAIL_ID_KEY)
    email: Email = ctx.get_state(f"{EMAIL_STATE_PREFIX}{email_id}")

    # Create analysis result with summary for database storage
    await ctx.send_message(
        AnalysisResult(
            spam_decision="NotSpam",
            reason="",
            email_length=len(email.email_content),
            email_summary=summary.summary,  # Now includes summary
            email_id=email_id,
        )
    )

@executor(id="handle_spam")
async def handle_spam(analysis: AnalysisResult, ctx: WorkflowContext[Never, str]) -> None:
    """Handle spam emails (single target like switch-case)."""

    if analysis.spam_decision == "Spam":
        await ctx.yield_output(f"Email marked as spam: {analysis.reason}")
    else:
        raise RuntimeError("This executor should only handle Spam messages.")

@executor(id="handle_uncertain")
async def handle_uncertain(analysis: AnalysisResult, ctx: WorkflowContext[Never, str]) -> None:
    """Handle uncertain emails (single target like switch-case)."""

    if analysis.spam_decision == "Uncertain":
        email: Email | None = ctx.get_state(f"{EMAIL_STATE_PREFIX}{analysis.email_id}")
        await ctx.yield_output(
            f"Email marked as uncertain: {analysis.reason}. Email content: {getattr(email, 'email_content', '')}"
        )
    else:
        raise RuntimeError("This executor should only handle Uncertain messages.")

@executor(id="database_access")
async def database_access(analysis: AnalysisResult, ctx: WorkflowContext[Never, str]) -> None:
    """Simulate database persistence with custom events."""

    await asyncio.sleep(0.05)  # Simulate DB operation
    await ctx.add_event(WorkflowEvent("data", data=DatabaseEvent(f"Email {analysis.email_id} saved to database.")))

Továbbfejlesztett AI-ügynökök

Ügynökök létrehozása elemzéshez, segítségnyújtáshoz és összegzéshez:

async def main() -> None:
    chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())

    # Enhanced analysis agent
    email_analysis_agent = AgentExecutor(
        chat_client.as_agent(
            instructions=(
                "You are a spam detection assistant that identifies spam emails. "
                "Always return JSON with fields 'spam_decision' (one of NotSpam, Spam, Uncertain) "
                "and 'reason' (string)."
            ),
            response_format=AnalysisResultAgent,
        ),
        id="email_analysis_agent",
    )

    # Email assistant (same as before)
    email_assistant_agent = AgentExecutor(
        chat_client.as_agent(
            instructions=(
                "You are an email assistant that helps users draft responses to emails with professionalism."
            ),
            response_format=EmailResponse,
        ),
        id="email_assistant_agent",
    )

    # New: Email summary agent for long emails
    email_summary_agent = AgentExecutor(
        chat_client.as_agent(
            instructions="You are an assistant that helps users summarize emails.",
            response_format=EmailSummaryModel,
        ),
        id="email_summary_agent",
    )

Többkijelöléses munkafolyamat létrehozása

Hozza létre a munkafolyamatot kifinomult útválasztással és párhuzamos feldolgozással:

    workflow = (
        WorkflowBuilder(start_executor=store_email)
        .add_edge(store_email, email_analysis_agent)
        .add_edge(email_analysis_agent, to_analysis_result)

        # Multi-selection edge group: intelligent fan-out based on content
        .add_multi_selection_edge_group(
            to_analysis_result,
            [handle_spam, submit_to_email_assistant, summarize_email, handle_uncertain],
            selection_func=select_targets,
        )

        # Email assistant branch (always for NotSpam)
        .add_edge(submit_to_email_assistant, email_assistant_agent)
        .add_edge(email_assistant_agent, finalize_and_send)

        # Summary branch (only for long NotSpam emails)
        .add_edge(summarize_email, email_summary_agent)
        .add_edge(email_summary_agent, merge_summary)

        # Database persistence: conditional routing
        .add_edge(to_analysis_result, database_access,
                 condition=lambda r: r.email_length <= LONG_EMAIL_THRESHOLD)  # Short emails
        .add_edge(merge_summary, database_access)  # Long emails with summary

        .build()
    )

Végrehajtás eseménystreameléssel

Futtassa a munkafolyamatot, és figyelje meg a párhuzamos végrehajtást egyéni eseményeken keresztül:

    # Use a moderately long email to trigger both assistant and summarizer
    email = """
    Hello team, here are the updates for this week:

    1. Project Alpha is on track and we should have the first milestone completed by Friday.
    2. The client presentation has been scheduled for next Tuesday at 2 PM.
    3. Please review the Q4 budget allocation and provide feedback by Wednesday.

    Let me know if you have any questions or concerns.

    Best regards,
    Alex
    """

    # Stream events to see parallel execution
    async for event in workflow.run_stream(email):
        if isinstance(event.data, DatabaseEvent):
            print(f"Database: {event}")
        elif event.type == "output":
            print(f"Output: {event.data}")

Többszörös kiválasztás és Switch-Case összehasonlítása

Switch-Case mintázat (előző):

# One input → exactly one output
.add_switch_case_edge_group(
    source,
    [
        Case(condition=lambda x: x.result == "A", target=handler_a),
        Case(condition=lambda x: x.result == "B", target=handler_b),
        Default(target=handler_c),
    ],
)

Több kijelölési minta:

# One input → one or more outputs (dynamic fan-out)
.add_multi_selection_edge_group(
    source,
    [handler_a, handler_b, handler_c, handler_d],
    selection_func=intelligent_router,  # Returns list of target IDs
)

A C# többszörös kiválasztás előnyei

  1. Párhuzamos feldolgozás: Egyszerre több ág is végrehajtható
  2. Feltételes szétosztás: A célok száma a tartalom függvényében változik
  3. Tartalomérzékeny útválasztás: Az üzenet tulajdonságain alapuló döntések, nem csak beírás
  4. Hatékony erőforrás-használat: Csak a szükséges ágak aktiválódnak
  5. Összetett üzleti logika: Támogatja a kifinomult útválasztási forgatókönyveket

C# valós alkalmazások

  • Levelezőrendszerek: Útvonal a válasz-asszisztenshez + archív + elemzés (feltételesen)
  • Tartalomfeldolgozás: Trigger átírása + fordítás + elemzés (tartalomtípus alapján)
  • Rendelésfeldolgozás: Teljesítési útvonal + számlázás + értesítések (a rendelés tulajdonságai alapján)
  • Adatfolyamok: Különböző elemzési folyamatok aktiválása az adatjellemzők alapján

Többkijelöléses mintakód

A teljes működő megvalósításért tekintse meg az Agent Framework adattárban található multi_selection_edge_group.py mintát.

Következő lépések