Udostępnij przez


Tworzenie przepływu pracy za pomocą logiki rozgałęziania

Z tego samouczka dowiesz się, jak utworzyć przepływ pracy z logiką rozgałęziania przy użyciu platformy Agent Framework. Logika rozgałęziania umożliwia przepływowi pracy podejmowanie decyzji na podstawie określonych warunków, co umożliwia bardziej złożone i dynamiczne zachowanie.

Krawędzie warunkowe

Krawędzie warunkowe umożliwiają przepływowi pracy podejmowanie decyzji dotyczących routingu na podstawie zawartości lub właściwości komunikatów przepływających przez przepływ pracy. Umożliwia to dynamiczne rozgałęzianie, gdzie różne ścieżki wykonania są wybierane na podstawie warunków uruchomieniowych.

Co będziesz budować

Utworzysz przepływ pracy przetwarzania wiadomości e-mail, który demonstruje routing warunkowy:

  • Agent wykrywania spamu, który analizuje przychodzące wiadomości e-mail i zwraca ustrukturyzowany kod JSON.
  • Krawędzie warunkowe, które kierują wiadomości e-mail do różnych programów obsługi na podstawie klasyfikacji.
  • Legalny program obsługi poczty e-mail, który opracowuje profesjonalne odpowiedzi.
  • Mechanizm obsługi spamu, który oznacza podejrzane wiadomości e-mail.
  • Zarządzanie wspólnym stanem w celu zachowania danych e-mail między etapami przepływu pracy.

Omówione pojęcia

Wymagania wstępne

Instalowanie pakietów NuGet

Najpierw zainstaluj wymagane pakiety dla projektu .NET:

dotnet add package Azure.AI.OpenAI --prerelease
dotnet add package Azure.Identity
dotnet add package Microsoft.Agents.AI.Workflows --prerelease
dotnet add package Microsoft.Extensions.AI.OpenAI --prerelease

Definiowanie modeli danych

Zacznij od zdefiniowania struktur danych, które będą przepływać przez przepływ pracy:

using System.Text.Json.Serialization;

/// <summary>
/// Represents the result of spam detection.
/// </summary>
public sealed class DetectionResult
{
    [JsonPropertyName("is_spam")]
    public bool IsSpam { get; set; }

    [JsonPropertyName("reason")]
    public string Reason { get; set; } = string.Empty;

    // Email ID is generated by the executor, not the agent
    [JsonIgnore]
    public string EmailId { get; set; } = string.Empty;
}

/// <summary>
/// Represents an email.
/// </summary>
internal sealed class Email
{
    [JsonPropertyName("email_id")]
    public string EmailId { get; set; } = string.Empty;

    [JsonPropertyName("email_content")]
    public string EmailContent { get; set; } = string.Empty;
}

/// <summary>
/// Represents the response from the email assistant.
/// </summary>
public sealed class EmailResponse
{
    [JsonPropertyName("response")]
    public string Response { get; set; } = string.Empty;
}

/// <summary>
/// Constants for shared state scopes.
/// </summary>
internal static class EmailStateConstants
{
    public const string EmailStateScope = "EmailState";
}

Tworzenie funkcji warunkowych

Funkcja warunku ocenia wynik wykrywania spamu, aby określić, którą ścieżką przepływ pracy powinien podążać.

/// <summary>
/// Creates a condition for routing messages based on the expected spam detection result.
/// </summary>
/// <param name="expectedResult">The expected spam detection result</param>
/// <returns>A function that evaluates whether a message meets the expected result</returns>
private static Func<object?, bool> GetCondition(bool expectedResult) =>
    detectionResult => detectionResult is DetectionResult result && result.IsSpam == expectedResult;

Ta funkcja warunku:

  • bool expectedResult Przyjmuje parametr (prawda dla spamu, false dla nie-spamu)
  • Zwraca funkcję, którą można używać jako warunek brzegowy
  • Bezpiecznie sprawdza, czy komunikat jest typu DetectionResult i porównuje właściwość IsSpam.

Tworzenie agentów sztucznej inteligencji

Skonfiguruj agentów sztucznej inteligencji, którzy będą obsługiwać wykrywanie spamu i pomoc w wiadomościach e-mail:

using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Agents.AI;
using Microsoft.Extensions.AI;

/// <summary>
/// Creates a spam detection agent.
/// </summary>
/// <returns>A ChatClientAgent configured for spam detection</returns>
private static ChatClientAgent GetSpamDetectionAgent(IChatClient chatClient) =>
    new(chatClient, new ChatClientAgentOptions(instructions: "You are a spam detection assistant that identifies spam emails.")
    {
        ChatOptions = new()
        {
            ResponseFormat = ChatResponseFormat.ForJsonSchema(AIJsonUtilities.CreateJsonSchema(typeof(DetectionResult)))
        }
    });

/// <summary>
/// Creates an email assistant agent.
/// </summary>
/// <returns>A ChatClientAgent configured for email assistance</returns>
private static ChatClientAgent GetEmailAssistantAgent(IChatClient chatClient) =>
    new(chatClient, new ChatClientAgentOptions(instructions: "You are an email assistant that helps users draft professional responses to emails.")
    {
        ChatOptions = new()
        {
            ResponseFormat = ChatResponseFormat.ForJsonSchema(AIJsonUtilities.CreateJsonSchema(typeof(EmailResponse)))
        }
    });

Implementowanie funkcji wykonawczych

Utwórz funkcje wykonawcze przepływu pracy obsługujące różne etapy przetwarzania wiadomości e-mail:

using Microsoft.Agents.AI.Workflows;
using System.Text.Json;

/// <summary>
/// Executor that detects spam using an AI agent.
/// </summary>
internal sealed class SpamDetectionExecutor : Executor<ChatMessage, DetectionResult>
{
    private readonly AIAgent _spamDetectionAgent;

    public SpamDetectionExecutor(AIAgent spamDetectionAgent) : base("SpamDetectionExecutor")
    {
        this._spamDetectionAgent = spamDetectionAgent;
    }

    public override async ValueTask<DetectionResult> HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // Generate a random email ID and store the email content to shared state
        var newEmail = new Email
        {
            EmailId = Guid.NewGuid().ToString("N"),
            EmailContent = message.Text
        };
        await context.QueueStateUpdateAsync(newEmail.EmailId, newEmail, scopeName: EmailStateConstants.EmailStateScope);

        // Invoke the agent for spam detection
        var response = await this._spamDetectionAgent.RunAsync(message);
        var detectionResult = JsonSerializer.Deserialize<DetectionResult>(response.Text);

        detectionResult!.EmailId = newEmail.EmailId;
        return detectionResult;
    }
}

/// <summary>
/// Executor that assists with email responses using an AI agent.
/// </summary>
internal sealed class EmailAssistantExecutor : Executor<DetectionResult, EmailResponse>
{
    private readonly AIAgent _emailAssistantAgent;

    public EmailAssistantExecutor(AIAgent emailAssistantAgent) : base("EmailAssistantExecutor")
    {
        this._emailAssistantAgent = emailAssistantAgent;
    }

    public override async ValueTask<EmailResponse> HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        if (message.IsSpam)
        {
            throw new ArgumentException("This executor should only handle non-spam messages.");
        }

        // Retrieve the email content from shared state
        var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope)
            ?? throw new InvalidOperationException("Email not found.");

        // Invoke the agent to draft a response
        var response = await this._emailAssistantAgent.RunAsync(email.EmailContent);
        var emailResponse = JsonSerializer.Deserialize<EmailResponse>(response.Text);

        return emailResponse!;
    }
}

/// <summary>
/// Executor that sends emails.
/// </summary>
internal sealed class SendEmailExecutor : Executor<EmailResponse>
{
    public SendEmailExecutor() : base("SendEmailExecutor") { }

    public override async ValueTask HandleAsync(EmailResponse message, IWorkflowContext context, CancellationToken cancellationToken = default) =>
        await context.YieldOutputAsync($"Email sent: {message.Response}");
}

/// <summary>
/// Executor that handles spam messages.
/// </summary>
internal sealed class HandleSpamExecutor : Executor<DetectionResult>
{
    public HandleSpamExecutor() : base("HandleSpamExecutor") { }

    public override async ValueTask HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        if (message.IsSpam)
        {
            await context.YieldOutputAsync($"Email marked as spam: {message.Reason}");
        }
        else
        {
            throw new ArgumentException("This executor should only handle spam messages.");
        }
    }
}

Budowa workflowu za pomocą krawędzi warunkowych

Teraz utwórz główny program, który kompiluje i wykonuje przepływ pracy:

using Microsoft.Extensions.AI;

public static class Program
{
    private static async Task Main()
    {
        // Set up the Azure OpenAI client
        var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT")
            ?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
        var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
        var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential())
            .GetChatClient(deploymentName).AsIChatClient();

        // Create agents
        AIAgent spamDetectionAgent = GetSpamDetectionAgent(chatClient);
        AIAgent emailAssistantAgent = GetEmailAssistantAgent(chatClient);

        // Create executors
        var spamDetectionExecutor = new SpamDetectionExecutor(spamDetectionAgent);
        var emailAssistantExecutor = new EmailAssistantExecutor(emailAssistantAgent);
        var sendEmailExecutor = new SendEmailExecutor();
        var handleSpamExecutor = new HandleSpamExecutor();

        // Build the workflow with conditional edges
        var workflow = new WorkflowBuilder(spamDetectionExecutor)
            // Non-spam path: route to email assistant when IsSpam = false
            .AddEdge(spamDetectionExecutor, emailAssistantExecutor, condition: GetCondition(expectedResult: false))
            .AddEdge(emailAssistantExecutor, sendEmailExecutor)
            // Spam path: route to spam handler when IsSpam = true
            .AddEdge(spamDetectionExecutor, handleSpamExecutor, condition: GetCondition(expectedResult: true))
            .WithOutputFrom(handleSpamExecutor, sendEmailExecutor)
            .Build();

        // Execute the workflow with sample spam email
        string emailContent = "Congratulations! You've won $1,000,000! Click here to claim your prize now!";
        StreamingRun run = await InProcessExecution.StreamAsync(workflow, new ChatMessage(ChatRole.User, emailContent));
        await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

        await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
        {
            if (evt is WorkflowOutputEvent outputEvent)
            {
                Console.WriteLine($"{outputEvent}");
            }
        }
    }
}

Jak to działa

  1. Wprowadzenie do przepływu pracy: przepływ pracy rozpoczyna się od otrzymania spamDetectionExecutorChatMessage.

  2. Analiza spamu: agent wykrywania spamu analizuje wiadomość e-mail i zwraca strukturę DetectionResult z właściwościami IsSpam i Reason .

  3. Routing warunkowy: na podstawie wartości IsSpam.

    • Jeśli spam (IsSpam = true): przekierowuje do HandleSpamExecutor za pomocą GetCondition(true)
    • Jeśli jest to zgodne z prawem (IsSpam = false): trasy do EmailAssistantExecutor za pomocą GetCondition(false)
  4. Generowanie odpowiedzi: w przypadku uzasadnionych wiadomości e-mail asystent poczty e-mail opracowuje profesjonalną odpowiedź.

  5. Wynik końcowy: przepływ pracy zwraca powiadomienie o spamie lub wysyła przygotowaną odpowiedź na wiadomość e-mail.

Kluczowe funkcje krawędzi warunkowych

  1. Warunki Bezpieczeństwa Typu: Metoda GetCondition tworzy funkcje warunków do wielokrotnego użytku, które bezpiecznie oceniają zawartość komunikatu.

  2. Wiele ścieżek: pojedynczy wykonawca może mieć wiele krawędzi wychodzących z różnymi warunkami, umożliwiając złożoną logikę rozgałęziania.

  3. Stan udostępniony: dane poczty e-mail są utrwalane w funkcjach wykonawczych przy użyciu zarządzania stanem o określonym zakresie, co umożliwia podrzędnym wykonawcom uzyskiwanie dostępu do oryginalnej zawartości.

  4. Obsługa błędów: funkcje wykonawcze weryfikują swoje dane wejściowe i zgłaszają znaczące wyjątki podczas odbierania nieoczekiwanych typów komunikatów.

  5. Czysta architektura: każda funkcja wykonawcza ma jedną odpowiedzialność, dzięki czemu przepływ pracy można konserwować i testować.

Uruchamianie przykładu

Po uruchomieniu tego przepływu pracy z przykładem wiadomości e-mail ze spamem:

Email marked as spam: This email contains common spam indicators including monetary prizes, urgency tactics, and suspicious links that are typical of phishing attempts.

Spróbuj zmienić zawartość wiadomości e-mail na coś uzasadnionego:

string emailContent = "Hi, I wanted to follow up on our meeting yesterday and get your thoughts on the project proposal.";

Przepływ pracy zostanie przekierowany do asystenta e-mail, który wygeneruje profesjonalną odpowiedź.

Ten wzorzec routingu warunkowego stanowi podstawę do tworzenia zaawansowanych przepływów pracy, które mogą obsługiwać złożone drzewa decyzyjne i logikę biznesową.

Kompletna implementacja

Aby uzyskać pełną działającą implementację, zobacz ten przykład w repozytorium Platformy agentów.

Co będziesz budować

Utworzysz przepływ pracy przetwarzania wiadomości e-mail, który demonstruje routing warunkowy:

  • Agent wykrywania spamu, który analizuje przychodzące wiadomości e-mail
  • Krawędzie warunkowe, które kierują wiadomości e-mail do różnych modułów obsługi na podstawie klasyfikacji
  • Legalny program obsługi poczty e-mail, który projektuje profesjonalne odpowiedzi
  • Narzędzie do obsługi spamu, które oznacza podejrzane wiadomości e-mail

Omówione pojęcia

Wymagania wstępne

  • Środowisko Python w wersji 3.10 lub nowszej
  • Zainstalowano program Agent Framework: pip install agent-framework-core --pre
  • Usługa Azure OpenAI skonfigurowana z odpowiednimi zmiennymi środowiskowymi
  • Uwierzytelnianie Azure CLI: az login

Krok 1. Importowanie wymaganych zależności

Zacznij od zaimportowania niezbędnych składników dla warunkowych przepływów pracy:

import asyncio
import os
from dataclasses import dataclass
from typing import Any, Literal
from uuid import uuid4

from typing_extensions import Never

from agent_framework import (
    AgentExecutor,
    AgentExecutorRequest,
    AgentExecutorResponse,
    ChatMessage,
    Role,
    WorkflowBuilder,
    WorkflowContext,
    executor,
    Case,
    Default,
)
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential
from pydantic import BaseModel

Krok 2. Definiowanie modeli danych

Tworzenie modeli Pydantic na potrzeby wymiany danych strukturalnych między składnikami przepływu pracy:

class DetectionResult(BaseModel):
    """Represents the result of spam detection."""
    # is_spam drives the routing decision taken by edge conditions
    is_spam: bool
    # Human readable rationale from the detector
    reason: str
    # The agent must include the original email so downstream agents can operate without reloading content
    email_content: str


class EmailResponse(BaseModel):
    """Represents the response from the email assistant."""
    # The drafted reply that a user could copy or send
    response: str

Krok 3: Utwórz funkcje warunkowe

Zdefiniuj funkcje warunku, które będą określać decyzje dotyczące routingu:

def get_condition(expected_result: bool):
    """Create a condition callable that routes based on DetectionResult.is_spam."""

    # The returned function will be used as an edge predicate.
    # It receives whatever the upstream executor produced.
    def condition(message: Any) -> bool:
        # Defensive guard. If a non AgentExecutorResponse appears, let the edge pass to avoid dead ends.
        if not isinstance(message, AgentExecutorResponse):
            return True

        try:
            # Prefer parsing a structured DetectionResult from the agent JSON text.
            # Using model_validate_json ensures type safety and raises if the shape is wrong.
            detection = DetectionResult.model_validate_json(message.agent_run_response.text)
            # Route only when the spam flag matches the expected path.
            return detection.is_spam == expected_result
        except Exception:
            # Fail closed on parse errors so we do not accidentally route to the wrong path.
            # Returning False prevents this edge from activating.
            return False

    return condition

Krok 4. Tworzenie funkcji wykonawczych programu obsługi

Zdefiniuj funkcje wykonawcze do obsługi różnych wyników routingu:

@executor(id="send_email")
async def handle_email_response(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
    """Handle legitimate emails by drafting a professional response."""
    # Downstream of the email assistant. Parse a validated EmailResponse and yield the workflow output.
    email_response = EmailResponse.model_validate_json(response.agent_run_response.text)
    await ctx.yield_output(f"Email sent:\n{email_response.response}")


@executor(id="handle_spam")
async def handle_spam_classifier_response(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
    """Handle spam emails by marking them appropriately."""
    # Spam path. Confirm the DetectionResult and yield the workflow output. Guard against accidental non spam input.
    detection = DetectionResult.model_validate_json(response.agent_run_response.text)
    if detection.is_spam:
        await ctx.yield_output(f"Email marked as spam: {detection.reason}")
    else:
        # This indicates the routing predicate and executor contract are out of sync.
        raise RuntimeError("This executor should only handle spam messages.")


@executor(id="to_email_assistant_request")
async def to_email_assistant_request(
    response: AgentExecutorResponse, ctx: WorkflowContext[AgentExecutorRequest]
) -> None:
    """Transform spam detection response into a request for the email assistant."""
    # Parse the detection result and extract the email content for the assistant
    detection = DetectionResult.model_validate_json(response.agent_run_response.text)

    # Create a new request for the email assistant with the original email content
    request = AgentExecutorRequest(
        messages=[ChatMessage(Role.USER, text=detection.email_content)],
        should_respond=True
    )
    await ctx.send_message(request)

Krok 5. Tworzenie agentów sztucznej inteligencji

Skonfiguruj agentów usługi Azure OpenAI z formatowaniem danych wyjściowych ze strukturą:

async def main() -> None:
    # Create agents
    # AzureCliCredential uses your current az login. This avoids embedding secrets in code.
    chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())

    # Agent 1. Classifies spam and returns a DetectionResult object.
    # response_format enforces that the LLM returns parsable JSON for the Pydantic model.
    spam_detection_agent = AgentExecutor(
        chat_client.create_agent(
            instructions=(
                "You are a spam detection assistant that identifies spam emails. "
                "Always return JSON with fields is_spam (bool), reason (string), and email_content (string). "
                "Include the original email content in email_content."
            ),
            response_format=DetectionResult,
        ),
        id="spam_detection_agent",
    )

    # Agent 2. Drafts a professional reply. Also uses structured JSON output for reliability.
    email_assistant_agent = AgentExecutor(
        chat_client.create_agent(
            instructions=(
                "You are an email assistant that helps users draft professional responses to emails. "
                "Your input might be a JSON object that includes 'email_content'; base your reply on that content. "
                "Return JSON with a single field 'response' containing the drafted reply."
            ),
            response_format=EmailResponse,
        ),
        id="email_assistant_agent",
    )

Krok 6. Kompilowanie przepływu pracy warunkowego

Utwórz przepływ pracy z krawędziami warunkowymi, które są kierowane na podstawie wyników wykrywania spamu:

    # Build the workflow graph.
    # Start at the spam detector.
    # If not spam, hop to a transformer that creates a new AgentExecutorRequest,
    # then call the email assistant, then finalize.
    # If spam, go directly to the spam handler and finalize.
    workflow = (
        WorkflowBuilder()
        .set_start_executor(spam_detection_agent)
        # Not spam path: transform response -> request for assistant -> assistant -> send email
        .add_edge(spam_detection_agent, to_email_assistant_request, condition=get_condition(False))
        .add_edge(to_email_assistant_request, email_assistant_agent)
        .add_edge(email_assistant_agent, handle_email_response)
        # Spam path: send to spam handler
        .add_edge(spam_detection_agent, handle_spam_classifier_response, condition=get_condition(True))
        .build()
    )

Krok 7. Wykonywanie przepływu pracy

Uruchom przepływ pracy z przykładową zawartością wiadomości e-mail:

    # Read Email content from the sample resource file.
    # This keeps the sample deterministic since the model sees the same email every run.
    email_path = os.path.join(os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "resources", "email.txt")

    with open(email_path) as email_file:  # noqa: ASYNC230
        email = email_file.read()

    # Execute the workflow. Since the start is an AgentExecutor, pass an AgentExecutorRequest.
    # The workflow completes when it becomes idle (no more work to do).
    request = AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=email)], should_respond=True)
    events = await workflow.run(request)
    outputs = events.get_outputs()
    if outputs:
        print(f"Workflow output: {outputs[0]}")


if __name__ == "__main__":
    asyncio.run(main())

Jak działają krawędzie warunkowe

  1. Funkcje warunku: Funkcja get_condition() tworzy predykat, który analizuje zawartość komunikatu i zwraca True lub False, aby określić, czy krawędź powinna być przechodzona.

  2. Inspekcja komunikatów: warunki mogą sprawdzać dowolny aspekt komunikatu, w tym dane ustrukturyzowane z odpowiedzi agenta analizowane za pomocą modeli Pydantic.

  3. Programowanie defensywne: Funkcja warunku obejmuje obsługę błędów, aby zapobiec niepowodzeniu routingu podczas analizowania danych strukturalnych.

  4. Routing dynamiczny: na podstawie wyniku wykrywania spamu wiadomości e-mail są automatycznie kierowane do asystenta poczty e-mail (w przypadku uzasadnionych wiadomości e-mail) lub programu obsługi spamu (w przypadku podejrzanych wiadomości e-mail).

Kluczowe pojęcia

  • Warunki krawędzi: predykaty logiczne, które określają, czy krawędź powinna być przechodzina
  • Strukturalne dane wyjściowe: korzystanie z modeli Pydantic z response_format zapewnia niezawodne analizowanie danych
  • Routing defensywny: funkcje warunku obsługują przypadki krańcowe, aby zapobiec zastoju w przepływie pracy
  • Przekształcanie komunikatów: Funkcje wykonawcze mogą przekształcać typy komunikatów między krokami przepływu pracy

Kompletna implementacja

Aby uzyskać pełną działającą implementację, zobacz przykład edge_condition.py w repozytorium Platformy agentów.

Krawędzie Instrukcji Switch-Case

Tworzenie na krawędziach warunkowych

W poprzednim przykładzie krawędzi warunkowych pokazano trasowanie dwukierunkowe (spam vs. uzasadnione wiadomości e-mail). Jednak wiele rzeczywistych scenariuszy wymaga bardziej zaawansowanych drzew decyzyjnych. Krawędzie przełączników zapewniają bardziej czytelne, bardziej konserwowalne rozwiązanie, gdy trzeba kierować do wielu miejsc docelowych na podstawie różnych warunków.

Co utworzysz za pomocą Switch-Case

Rozszerzysz przepływ pracy przetwarzania wiadomości e-mail, aby obsłużyć trzy ścieżki decyzyjne:

  • NotSpam → Asystent poczty e-mail → wysyłanie wiadomości e-mail
  • Spam → do obsługi funkcji wykonawczej spamu
  • Niępewny → obsługa niepewnego wykonawcy (przypadek domyślny)

Kluczowe ulepszenie polega na zastosowaniu wzorca SwitchBuilder zamiast wielu pojedynczych krawędzi warunkowych, dzięki czemu przepływ pracy jest łatwiejszy do zrozumienia i utrzymania w miarę jak złożoność decyzji rośnie.

Omówione pojęcia

Modele danych dla Switch-Case

Zaktualizuj modele danych, aby obsługiwały klasyfikację trzykierunkową:

/// <summary>
/// Represents the possible decisions for spam detection.
/// </summary>
public enum SpamDecision
{
    NotSpam,
    Spam,
    Uncertain
}

/// <summary>
/// Represents the result of spam detection with enhanced decision support.
/// </summary>
public sealed class DetectionResult
{
    [JsonPropertyName("spam_decision")]
    [JsonConverter(typeof(JsonStringEnumConverter))]
    public SpamDecision spamDecision { get; set; }

    [JsonPropertyName("reason")]
    public string Reason { get; set; } = string.Empty;

    // Email ID is generated by the executor, not the agent
    [JsonIgnore]
    public string EmailId { get; set; } = string.Empty;
}

/// <summary>
/// Represents an email stored in shared state.
/// </summary>
internal sealed class Email
{
    [JsonPropertyName("email_id")]
    public string EmailId { get; set; } = string.Empty;

    [JsonPropertyName("email_content")]
    public string EmailContent { get; set; } = string.Empty;
}

/// <summary>
/// Represents the response from the email assistant.
/// </summary>
public sealed class EmailResponse
{
    [JsonPropertyName("response")]
    public string Response { get; set; } = string.Empty;
}

/// <summary>
/// Constants for shared state scopes.
/// </summary>
internal static class EmailStateConstants
{
    public const string EmailStateScope = "EmailState";
}

Fabryka warunków dla Switch-Case

Utwórz fabrykę warunków wielokrotnego użytku, która generuje predykaty dla każdej decyzji o spamie:

/// <summary>
/// Creates a condition for routing messages based on the expected spam detection result.
/// </summary>
/// <param name="expectedDecision">The expected spam detection decision</param>
/// <returns>A function that evaluates whether a message meets the expected result</returns>
private static Func<object?, bool> GetCondition(SpamDecision expectedDecision) =>
    detectionResult => detectionResult is DetectionResult result && result.spamDecision == expectedDecision;

To podejście fabryczne:

  • Zmniejsza duplikację kodu: jedna funkcja generuje wszystkie predykaty warunku
  • Gwarantuje spójność: wszystkie warunki są zgodne z tym samym wzorcem
  • Upraszcza konserwację: zmiany logiki warunku są wykonywane w jednym miejscu

Ulepszony agent sztucznej inteligencji

Zaktualizuj agenta wykrywania spamu, aby był mniej zdecydowany i zwracał trójkierunkowe klasyfikacje.

/// <summary>
/// Creates a spam detection agent with enhanced uncertainty handling.
/// </summary>
/// <returns>A ChatClientAgent configured for three-way spam detection</returns>
private static ChatClientAgent GetSpamDetectionAgent(IChatClient chatClient) =>
    new(chatClient, new ChatClientAgentOptions(instructions: "You are a spam detection assistant that identifies spam emails. Be less confident in your assessments.")
    {
        ChatOptions = new()
        {
            ResponseFormat = ChatResponseFormat.ForJsonSchema<DetectionResult>()
        }
    });

/// <summary>
/// Creates an email assistant agent (unchanged from conditional edges example).
/// </summary>
/// <returns>A ChatClientAgent configured for email assistance</returns>
private static ChatClientAgent GetEmailAssistantAgent(IChatClient chatClient) =>
    new(chatClient, new ChatClientAgentOptions(instructions: "You are an email assistant that helps users draft responses to emails with professionalism.")
    {
        ChatOptions = new()
        {
            ResponseFormat = ChatResponseFormat.ForJsonSchema<EmailResponse>()
        }
    });

Funkcje wykonawcze przepływu pracy z rozszerzonym routingiem

Zaimplementuj funkcje wykonawcze obsługujące routing trzykierunkowy z funkcją zarządzania stanem udostępnionym:

/// <summary>
/// Executor that detects spam using an AI agent with three-way classification.
/// </summary>
internal sealed class SpamDetectionExecutor : Executor<ChatMessage, DetectionResult>
{
    private readonly AIAgent _spamDetectionAgent;

    public SpamDetectionExecutor(AIAgent spamDetectionAgent) : base("SpamDetectionExecutor")
    {
        this._spamDetectionAgent = spamDetectionAgent;
    }

    public override async ValueTask<DetectionResult> HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // Generate a random email ID and store the email content in shared state
        var newEmail = new Email
        {
            EmailId = Guid.NewGuid().ToString("N"),
            EmailContent = message.Text
        };
        await context.QueueStateUpdateAsync(newEmail.EmailId, newEmail, scopeName: EmailStateConstants.EmailStateScope);

        // Invoke the agent for enhanced spam detection
        var response = await this._spamDetectionAgent.RunAsync(message);
        var detectionResult = JsonSerializer.Deserialize<DetectionResult>(response.Text);

        detectionResult!.EmailId = newEmail.EmailId;
        return detectionResult;
    }
}

/// <summary>
/// Executor that assists with email responses using an AI agent.
/// </summary>
internal sealed class EmailAssistantExecutor : Executor<DetectionResult, EmailResponse>
{
    private readonly AIAgent _emailAssistantAgent;

    public EmailAssistantExecutor(AIAgent emailAssistantAgent) : base("EmailAssistantExecutor")
    {
        this._emailAssistantAgent = emailAssistantAgent;
    }

    public override async ValueTask<EmailResponse> HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        if (message.spamDecision == SpamDecision.Spam)
        {
            throw new ArgumentException("This executor should only handle non-spam messages.");
        }

        // Retrieve the email content from shared state
        var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);

        // Invoke the agent to draft a response
        var response = await this._emailAssistantAgent.RunAsync(email!.EmailContent);
        var emailResponse = JsonSerializer.Deserialize<EmailResponse>(response.Text);

        return emailResponse!;
    }
}

/// <summary>
/// Executor that sends emails.
/// </summary>
internal sealed class SendEmailExecutor : Executor<EmailResponse>
{
    public SendEmailExecutor() : base("SendEmailExecutor") { }

    public override async ValueTask HandleAsync(EmailResponse message, IWorkflowContext context, CancellationToken cancellationToken = default) =>
        await context.YieldOutputAsync($"Email sent: {message.Response}").ConfigureAwait(false);
}

/// <summary>
/// Executor that handles spam messages.
/// </summary>
internal sealed class HandleSpamExecutor : Executor<DetectionResult>
{
    public HandleSpamExecutor() : base("HandleSpamExecutor") { }

    public override async ValueTask HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        if (message.spamDecision == SpamDecision.Spam)
        {
            await context.YieldOutputAsync($"Email marked as spam: {message.Reason}").ConfigureAwait(false);
        }
        else
        {
            throw new ArgumentException("This executor should only handle spam messages.");
        }
    }
}

/// <summary>
/// Executor that handles uncertain emails requiring manual review.
/// </summary>
internal sealed class HandleUncertainExecutor : Executor<DetectionResult>
{
    public HandleUncertainExecutor() : base("HandleUncertainExecutor") { }

    public override async ValueTask HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        if (message.spamDecision == SpamDecision.Uncertain)
        {
            var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);
            await context.YieldOutputAsync($"Email marked as uncertain: {message.Reason}. Email content: {email?.EmailContent}");
        }
        else
        {
            throw new ArgumentException("This executor should only handle uncertain spam decisions.");
        }
    }
}

Tworzenie przepływu pracy za pomocą wzorca Switch-Case

Zastąp wiele krawędzi warunkowych wzorcem czystszego przypadku przełącznika:

public static class Program
{
    private static async Task Main()
    {
        // Set up the Azure OpenAI client
        var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
        var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
        var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential()).GetChatClient(deploymentName).AsIChatClient();

        // Create agents
        AIAgent spamDetectionAgent = GetSpamDetectionAgent(chatClient);
        AIAgent emailAssistantAgent = GetEmailAssistantAgent(chatClient);

        // Create executors
        var spamDetectionExecutor = new SpamDetectionExecutor(spamDetectionAgent);
        var emailAssistantExecutor = new EmailAssistantExecutor(emailAssistantAgent);
        var sendEmailExecutor = new SendEmailExecutor();
        var handleSpamExecutor = new HandleSpamExecutor();
        var handleUncertainExecutor = new HandleUncertainExecutor();

        // Build the workflow using switch-case for cleaner three-way routing
        WorkflowBuilder builder = new(spamDetectionExecutor);
        builder.AddSwitch(spamDetectionExecutor, switchBuilder =>
            switchBuilder
            .AddCase(
                GetCondition(expectedDecision: SpamDecision.NotSpam),
                emailAssistantExecutor
            )
            .AddCase(
                GetCondition(expectedDecision: SpamDecision.Spam),
                handleSpamExecutor
            )
            .WithDefault(
                handleUncertainExecutor
            )
        )
        // After the email assistant writes a response, it will be sent to the send email executor
        .AddEdge(emailAssistantExecutor, sendEmailExecutor)
        .WithOutputFrom(handleSpamExecutor, sendEmailExecutor, handleUncertainExecutor);

        var workflow = builder.Build();

        // Read an email from a text file (use ambiguous content for demonstration)
        string email = Resources.Read("ambiguous_email.txt");

        // Execute the workflow
        StreamingRun run = await InProcessExecution.StreamAsync(workflow, new ChatMessage(ChatRole.User, email));
        await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
        await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
        {
            if (evt is WorkflowOutputEvent outputEvent)
            {
                Console.WriteLine($"{outputEvent}");
            }
        }
    }
}

korzyści Switch-Case

  1. Czystsza składnia: Zapewnia SwitchBuilder bardziej czytelną alternatywę dla wielu krawędzi warunkowych
  2. Uporządkowana ocena: Przypadki są oceniane sekwencyjnie, przy czym proces zatrzymuje się przy pierwszym dopasowaniu.
  3. GwarantowanyWithDefault() Routing: metoda zapewnia, że komunikaty nigdy nie utkwią
  4. Lepsza obsługa: dodawanie nowych przypadków wymaga minimalnych zmian w strukturze przepływu pracy
  5. Bezpieczeństwo typu: każdy wykonawca weryfikuje swoje dane wejściowe, aby wcześnie przechwytywać błędy routingu

Porównanie wzorców

Przed (warunki dotyczące krawędzi):

var workflow = new WorkflowBuilder(spamDetectionExecutor)
    .AddEdge(spamDetectionExecutor, emailAssistantExecutor, condition: GetCondition(expectedResult: false))
    .AddEdge(spamDetectionExecutor, handleSpamExecutor, condition: GetCondition(expectedResult: true))
    // No clean way to handle a third case
    .WithOutputFrom(handleSpamExecutor, sendEmailExecutor)
    .Build();

Po (Switch-Case):

WorkflowBuilder builder = new(spamDetectionExecutor);
builder.AddSwitch(spamDetectionExecutor, switchBuilder =>
    switchBuilder
    .AddCase(GetCondition(SpamDecision.NotSpam), emailAssistantExecutor)
    .AddCase(GetCondition(SpamDecision.Spam), handleSpamExecutor)
    .WithDefault(handleUncertainExecutor)  // Clean default case
)
// Continue building the rest of the workflow

Wzorzec switch-case lepiej skalowuje się wraz ze wzrostem liczby decyzji dotyczących routingu, a przypadek domyślny zapewnia zabezpieczenie dla nieoczekiwanych wartości.

Uruchamianie przykładu

Po uruchomieniu tego przepływu pracy z niejednoznaczną zawartością wiadomości e-mail:

Email marked as uncertain: This email contains promotional language but might be from a legitimate business contact, requiring human review for proper classification.

Spróbuj zmienić zawartość wiadomości e-mail na coś wyraźnie spam lub wyraźnie legalne, aby zobaczyć działanie różnych ścieżek routingu.

Kompletna implementacja

Aby uzyskać pełną działającą implementację, zobacz ten przykład w repozytorium Platformy agentów.

Tworzenie na krawędziach warunkowych

W poprzednim przykładzie krawędzi warunkowych pokazano trasowanie dwukierunkowe (spam vs. uzasadnione wiadomości e-mail). Jednak wiele rzeczywistych scenariuszy wymaga bardziej zaawansowanych drzew decyzyjnych. Krawędzie przełączników zapewniają bardziej czytelne, bardziej konserwowalne rozwiązanie, gdy trzeba kierować do wielu miejsc docelowych na podstawie różnych warunków.

Co stworzysz następnie

Rozszerzysz przepływ pracy przetwarzania wiadomości e-mail, aby obsłużyć trzy ścieżki decyzyjne:

  • NotSpam → Asystent poczty e-mail → wysyłanie wiadomości e-mail
  • Spam → oznacz jako spam
  • Niepewna → flaga ręcznego przeglądu (przypadek domyślny)

Kluczowa poprawa polega na użyciu pojedynczej grupy krawędzi switch-case w miejsce wielu pojedynczych krawędzi warunkowych, co upraszcza zrozumienie i utrzymanie przepływu pracy wraz ze wzrostem złożoności decyzji.

Omówione pojęcia

Ulepszone modele danych

Zaktualizuj modele danych, aby obsługiwały klasyfikację trzykierunkową:

from typing import Literal

class DetectionResultAgent(BaseModel):
    """Structured output returned by the spam detection agent."""

    # The agent classifies the email into one of three categories
    spam_decision: Literal["NotSpam", "Spam", "Uncertain"]
    reason: str

class EmailResponse(BaseModel):
    """Structured output returned by the email assistant agent."""

    response: str

@dataclass
class DetectionResult:
    """Internal typed payload used for routing and downstream handling."""

    spam_decision: str
    reason: str
    email_id: str

@dataclass
class Email:
    """In memory record of the email content stored in shared state."""

    email_id: str
    email_content: str

Fabryka warunków typu Switch-Case

Utwórz fabrykę warunków wielokrotnego użytku, która generuje predykaty dla każdej decyzji o spamie:

def get_case(expected_decision: str):
    """Factory that returns a predicate matching a specific spam_decision value."""

    def condition(message: Any) -> bool:
        # Only match when the upstream payload is a DetectionResult with the expected decision
        return isinstance(message, DetectionResult) and message.spam_decision == expected_decision

    return condition

To podejście fabryczne:

  • Zmniejsza duplikację kodu: jedna funkcja generuje wszystkie predykaty warunku
  • Gwarantuje spójność: wszystkie warunki są zgodne z tym samym wzorcem
  • Upraszcza konserwację: zmiany logiki warunku są wykonywane w jednym miejscu

Wykonawcy przepływu pracy z udostępnionym stanem

Zaimplementuj funkcje wykonawcze używające stanu udostępnionego, aby uniknąć przekazywania dużej zawartości wiadomości e-mail za pośrednictwem każdego kroku przepływu pracy:

EMAIL_STATE_PREFIX = "email:"
CURRENT_EMAIL_ID_KEY = "current_email_id"

@executor(id="store_email")
async def store_email(email_text: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
    """Store email content once and pass around a lightweight ID reference."""

    # Persist the raw email content in shared state
    new_email = Email(email_id=str(uuid4()), email_content=email_text)
    await ctx.set_shared_state(f"{EMAIL_STATE_PREFIX}{new_email.email_id}", new_email)
    await ctx.set_shared_state(CURRENT_EMAIL_ID_KEY, new_email.email_id)

    # Forward email to spam detection agent
    await ctx.send_message(
        AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=new_email.email_content)], should_respond=True)
    )

@executor(id="to_detection_result")
async def to_detection_result(response: AgentExecutorResponse, ctx: WorkflowContext[DetectionResult]) -> None:
    """Transform agent response into a typed DetectionResult with email ID."""

    # Parse the agent's structured JSON output
    parsed = DetectionResultAgent.model_validate_json(response.agent_run_response.text)
    email_id: str = await ctx.get_shared_state(CURRENT_EMAIL_ID_KEY)

    # Create typed message for switch-case routing
    await ctx.send_message(DetectionResult(
        spam_decision=parsed.spam_decision,
        reason=parsed.reason,
        email_id=email_id
    ))

@executor(id="submit_to_email_assistant")
async def submit_to_email_assistant(detection: DetectionResult, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
    """Handle NotSpam emails by forwarding to the email assistant."""

    # Guard against misrouting
    if detection.spam_decision != "NotSpam":
        raise RuntimeError("This executor should only handle NotSpam messages.")

    # Retrieve original email content from shared state
    email: Email = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{detection.email_id}")
    await ctx.send_message(
        AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=email.email_content)], should_respond=True)
    )

@executor(id="finalize_and_send")
async def finalize_and_send(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
    """Parse email assistant response and yield final output."""

    parsed = EmailResponse.model_validate_json(response.agent_run_response.text)
    await ctx.yield_output(f"Email sent: {parsed.response}")

@executor(id="handle_spam")
async def handle_spam(detection: DetectionResult, ctx: WorkflowContext[Never, str]) -> None:
    """Handle confirmed spam emails."""

    if detection.spam_decision == "Spam":
        await ctx.yield_output(f"Email marked as spam: {detection.reason}")
    else:
        raise RuntimeError("This executor should only handle Spam messages.")

@executor(id="handle_uncertain")
async def handle_uncertain(detection: DetectionResult, ctx: WorkflowContext[Never, str]) -> None:
    """Handle uncertain classifications that need manual review."""

    if detection.spam_decision == "Uncertain":
        # Include original content for human review
        email: Email | None = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{detection.email_id}")
        await ctx.yield_output(
            f"Email marked as uncertain: {detection.reason}. Email content: {getattr(email, 'email_content', '')}"
        )
    else:
        raise RuntimeError("This executor should only handle Uncertain messages.")

Tworzenie rozszerzonego agenta sztucznej inteligencji

Zaktualizuj agenta wykrywania spamu, aby był mniej zdecydowany i zwracał trójkierunkowe klasyfikacje.

async def main():
    chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())

    # Enhanced spam detection agent with three-way classification
    spam_detection_agent = AgentExecutor(
        chat_client.create_agent(
            instructions=(
                "You are a spam detection assistant that identifies spam emails. "
                "Be less confident in your assessments. "
                "Always return JSON with fields 'spam_decision' (one of NotSpam, Spam, Uncertain) "
                "and 'reason' (string)."
            ),
            response_format=DetectionResultAgent,
        ),
        id="spam_detection_agent",
    )

    # Email assistant remains the same
    email_assistant_agent = AgentExecutor(
        chat_client.create_agent(
            instructions=(
                "You are an email assistant that helps users draft responses to emails with professionalism."
            ),
            response_format=EmailResponse,
        ),
        id="email_assistant_agent",
    )

Tworzenie przepływu pracy za pomocą grupy Switch-Case Edge

Zastąp wiele krawędzi warunkowych pojedynczą instrukcją switch-case:

    # Build workflow using switch-case for cleaner three-way routing
    workflow = (
        WorkflowBuilder()
        .set_start_executor(store_email)
        .add_edge(store_email, spam_detection_agent)
        .add_edge(spam_detection_agent, to_detection_result)
        .add_switch_case_edge_group(
            to_detection_result,
            [
                # Explicit cases for specific decisions
                Case(condition=get_case("NotSpam"), target=submit_to_email_assistant),
                Case(condition=get_case("Spam"), target=handle_spam),
                # Default case catches anything that doesn't match above
                Default(target=handle_uncertain),
            ],
        )
        .add_edge(submit_to_email_assistant, email_assistant_agent)
        .add_edge(email_assistant_agent, finalize_and_send)
        .build()
    )

Wykonywanie i testowanie

Uruchom przepływ pracy z niejasną treścią wiadomości e-mail, która demonstruje trasowanie w trzech kierunkach.

    # Use ambiguous email content that might trigger uncertain classification
    email = (
        "Hey there, I noticed you might be interested in our latest offer—no pressure, but it expires soon. "
        "Let me know if you'd like more details."
    )

    # Execute and display results
    events = await workflow.run(email)
    outputs = events.get_outputs()
    if outputs:
        for output in outputs:
            print(f"Workflow output: {output}")

Kluczowe zalety Switch-Case Edge

  1. Składnia czystsza: jedna grupa krawędzi zamiast wielu krawędzi warunkowych
  2. Uporządkowana ocena: Przypadki są oceniane sekwencyjnie, przy czym proces zatrzymuje się przy pierwszym dopasowaniu.
  3. Gwarantowany routing: przypadek domyślny gwarantuje, że komunikaty nigdy nie utknęły
  4. Lepsza obsługa: dodawanie nowych przypadków wymaga minimalnych zmian
  5. Bezpieczeństwo typu: Każdy wykonawca weryfikuje swoje dane wejściowe w celu przechwycenia błędów routingu

Porównanie: warunkowe a Switch-Case

Przed (warunki dotyczące krawędzi):

.add_edge(detector, handler_a, condition=lambda x: x.result == "A")
.add_edge(detector, handler_b, condition=lambda x: x.result == "B")
.add_edge(detector, handler_c, condition=lambda x: x.result == "C")

Po (Switch-Case):

.add_switch_case_edge_group(
    detector,
    [
        Case(condition=lambda x: x.result == "A", target=handler_a),
        Case(condition=lambda x: x.result == "B", target=handler_b),
        Default(target=handler_c),  # Catches everything else
    ],
)

Wzorzec switch-case lepiej skalowuje się wraz ze wzrostem liczby decyzji dotyczących routingu, a przypadek domyślny zapewnia zabezpieczenie dla nieoczekiwanych wartości.

przykładowy kod Switch-Case

Aby uzyskać pełną działającą implementację, zobacz przykład switch_case_edge_group.py w repozytorium Platformy agentów.

Krawędzie do wielokrotnego wyboru

Poza instrukcją switch-case: Wielokrotna selekcja routingu

Podczas gdy krawędzie przełączników kierują komunikaty do dokładnie jednego miejsca docelowego, rzeczywiste przepływy pracy często muszą wyzwalać wiele równoległych operacji na podstawie cech danych. Krawędzie z partycjami (zaimplementowane jako krawędzie rozgałęzienia z partycjonerami) umożliwiają zaawansowane wzorce rozgałęzienia, w których pojedyncza wiadomość może aktywować wiele podrzędnych wykonawców jednocześnie.

Zaawansowany przepływ pracy przetwarzania wiadomości e-mail

Korzystając z przykładu switch-case, utworzysz ulepszony system przetwarzania wiadomości e-mail, który demonstruje zaawansowaną logikę routingu:

  • Spamowe wiadomości e-mail → pojedynczy mechanizm obsługi spamu (jak switch-case)
  • Prawidłowe wiadomości e-mailZawsze uruchamiaj asystenta poczty e-mail + Warunkowo uruchamiaj podsumowywanie dla długich wiadomości e-mail
  • Niepewne wiadomości e-mail → pojedynczy niepewny handler (jak switch-case)
  • Trwałość bazy danych → wyzwalana zarówno dla krótkich wiadomości e-mail, jak i podsumowanych długich wiadomości e-mail

Ten wzorzec umożliwia równoległe potoki przetwarzania, które dostosowują się do cech zawartości.

Omówione pojęcia

Modele danych dla wielokrotnego wyboru

Rozszerz modele danych, aby obsługiwać analizę długości wiadomości e-mail i podsumowywanie:

/// <summary>
/// Represents the result of enhanced email analysis with additional metadata.
/// </summary>
public sealed class AnalysisResult
{
    [JsonPropertyName("spam_decision")]
    [JsonConverter(typeof(JsonStringEnumConverter))]
    public SpamDecision spamDecision { get; set; }

    [JsonPropertyName("reason")]
    public string Reason { get; set; } = string.Empty;

    // Additional properties for sophisticated routing
    [JsonIgnore]
    public int EmailLength { get; set; }

    [JsonIgnore]
    public string EmailSummary { get; set; } = string.Empty;

    [JsonIgnore]
    public string EmailId { get; set; } = string.Empty;
}

/// <summary>
/// Represents the response from the email assistant.
/// </summary>
public sealed class EmailResponse
{
    [JsonPropertyName("response")]
    public string Response { get; set; } = string.Empty;
}

/// <summary>
/// Represents the response from the email summary agent.
/// </summary>
public sealed class EmailSummary
{
    [JsonPropertyName("summary")]
    public string Summary { get; set; } = string.Empty;
}

/// <summary>
/// A custom workflow event for database operations.
/// </summary>
internal sealed class DatabaseEvent(string message) : WorkflowEvent(message) { }

/// <summary>
/// Constants for email processing thresholds.
/// </summary>
public static class EmailProcessingConstants
{
    public const int LongEmailThreshold = 100;
}

Funkcja przypisywania celów: Serce wielokrotnego wyboru

Funkcja przypisania docelowego określa, które funkcje wykonawcze powinny odbierać poszczególne komunikaty:

/// <summary>
/// Creates a target assigner for routing messages based on the analysis result.
/// </summary>
/// <returns>A function that takes an analysis result and returns the target partitions.</returns>
private static Func<AnalysisResult?, int, IEnumerable<int>> GetTargetAssigner()
{
    return (analysisResult, targetCount) =>
    {
        if (analysisResult is not null)
        {
            if (analysisResult.spamDecision == SpamDecision.Spam)
            {
                return [0]; // Route only to spam handler (index 0)
            }
            else if (analysisResult.spamDecision == SpamDecision.NotSpam)
            {
                // Always route to email assistant (index 1)
                List<int> targets = [1];

                // Conditionally add summarizer for long emails (index 2)
                if (analysisResult.EmailLength > EmailProcessingConstants.LongEmailThreshold)
                {
                    targets.Add(2);
                }

                return targets;
            }
            else // Uncertain
            {
                return [3]; // Route only to uncertain handler (index 3)
            }
        }
        throw new ArgumentException("Invalid analysis result.");
    };
}

Kluczowe cechy funkcji przypisywania celów

  1. Dynamiczny wybór celu: zwraca listę indeksów wykonawczych do aktywowania
  2. Routing z uwzględnieniem zawartości: podejmuje decyzje na podstawie właściwości wiadomości, takich jak długość e-maila
  3. Przetwarzanie równoległe: wiele obiektów docelowych może być wykonywanych jednocześnie
  4. Logika warunkowa: złożone rozgałęzianie na podstawie wielu kryteriów

Ulepszone egzekutory przepływu pracy

Zaimplementuj funkcje wykonawcze obsługujące zaawansowaną analizę i routing:

/// <summary>
/// Executor that analyzes emails using an AI agent with enhanced analysis.
/// </summary>
internal sealed class EmailAnalysisExecutor : Executor<ChatMessage, AnalysisResult>
{
    private readonly AIAgent _emailAnalysisAgent;

    public EmailAnalysisExecutor(AIAgent emailAnalysisAgent) : base("EmailAnalysisExecutor")
    {
        this._emailAnalysisAgent = emailAnalysisAgent;
    }

    public override async ValueTask<AnalysisResult> HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // Generate a random email ID and store the email content
        var newEmail = new Email
        {
            EmailId = Guid.NewGuid().ToString("N"),
            EmailContent = message.Text
        };
        await context.QueueStateUpdateAsync(newEmail.EmailId, newEmail, scopeName: EmailStateConstants.EmailStateScope);

        // Invoke the agent for enhanced analysis
        var response = await this._emailAnalysisAgent.RunAsync(message);
        var analysisResult = JsonSerializer.Deserialize<AnalysisResult>(response.Text);

        // Enrich with metadata for routing decisions
        analysisResult!.EmailId = newEmail.EmailId;
        analysisResult.EmailLength = newEmail.EmailContent.Length;

        return analysisResult;
    }
}

/// <summary>
/// Executor that assists with email responses using an AI agent.
/// </summary>
internal sealed class EmailAssistantExecutor : Executor<AnalysisResult, EmailResponse>
{
    private readonly AIAgent _emailAssistantAgent;

    public EmailAssistantExecutor(AIAgent emailAssistantAgent) : base("EmailAssistantExecutor")
    {
        this._emailAssistantAgent = emailAssistantAgent;
    }

    public override async ValueTask<EmailResponse> HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        if (message.spamDecision == SpamDecision.Spam)
        {
            throw new ArgumentException("This executor should only handle non-spam messages.");
        }

        // Retrieve the email content from shared state
        var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);

        // Invoke the agent to draft a response
        var response = await this._emailAssistantAgent.RunAsync(email!.EmailContent);
        var emailResponse = JsonSerializer.Deserialize<EmailResponse>(response.Text);

        return emailResponse!;
    }
}

/// <summary>
/// Executor that summarizes emails using an AI agent for long emails.
/// </summary>
internal sealed class EmailSummaryExecutor : Executor<AnalysisResult, AnalysisResult>
{
    private readonly AIAgent _emailSummaryAgent;

    public EmailSummaryExecutor(AIAgent emailSummaryAgent) : base("EmailSummaryExecutor")
    {
        this._emailSummaryAgent = emailSummaryAgent;
    }

    public override async ValueTask<AnalysisResult> HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // Read the email content from shared state
        var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);

        // Generate summary for long emails
        var response = await this._emailSummaryAgent.RunAsync(email!.EmailContent);
        var emailSummary = JsonSerializer.Deserialize<EmailSummary>(response.Text);

        // Enrich the analysis result with the summary
        message.EmailSummary = emailSummary!.Summary;

        return message;
    }
}

/// <summary>
/// Executor that sends emails.
/// </summary>
internal sealed class SendEmailExecutor : Executor<EmailResponse>
{
    public SendEmailExecutor() : base("SendEmailExecutor") { }

    public override async ValueTask HandleAsync(EmailResponse message, IWorkflowContext context, CancellationToken cancellationToken = default) =>
        await context.YieldOutputAsync($"Email sent: {message.Response}");
}

/// <summary>
/// Executor that handles spam messages.
/// </summary>
internal sealed class HandleSpamExecutor : Executor<AnalysisResult>
{
    public HandleSpamExecutor() : base("HandleSpamExecutor") { }

    public override async ValueTask HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        if (message.spamDecision == SpamDecision.Spam)
        {
            await context.YieldOutputAsync($"Email marked as spam: {message.Reason}");
        }
        else
        {
            throw new ArgumentException("This executor should only handle spam messages.");
        }
    }
}

/// <summary>
/// Executor that handles uncertain messages requiring manual review.
/// </summary>
internal sealed class HandleUncertainExecutor : Executor<AnalysisResult>
{
    public HandleUncertainExecutor() : base("HandleUncertainExecutor") { }

    public override async ValueTask HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        if (message.spamDecision == SpamDecision.Uncertain)
        {
            var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);
            await context.YieldOutputAsync($"Email marked as uncertain: {message.Reason}. Email content: {email?.EmailContent}");
        }
        else
        {
            throw new ArgumentException("This executor should only handle uncertain spam decisions.");
        }
    }
}

/// <summary>
/// Executor that handles database access with custom events.
/// </summary>
internal sealed class DatabaseAccessExecutor : Executor<AnalysisResult>
{
    public DatabaseAccessExecutor() : base("DatabaseAccessExecutor") { }

    public override async ValueTask HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // Simulate database operations
        await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);
        await Task.Delay(100); // Simulate database access delay

        // Emit custom database event for monitoring
        await context.AddEventAsync(new DatabaseEvent($"Email {message.EmailId} saved to database."));
    }
}

Ulepszoni agenci sztucznej inteligencji

Utwórz agentów na potrzeby analizy, pomocy i podsumowania:

/// <summary>
/// Create an enhanced email analysis agent.
/// </summary>
/// <returns>A ChatClientAgent configured for comprehensive email analysis</returns>
private static ChatClientAgent GetEmailAnalysisAgent(IChatClient chatClient) =>
    new(chatClient, new ChatClientAgentOptions(instructions: "You are a spam detection assistant that identifies spam emails.")
    {
        ChatOptions = new()
        {
            ResponseFormat = ChatResponseFormat.ForJsonSchema<AnalysisResult>()
        }
    });

/// <summary>
/// Creates an email assistant agent.
/// </summary>
/// <returns>A ChatClientAgent configured for email assistance</returns>
private static ChatClientAgent GetEmailAssistantAgent(IChatClient chatClient) =>
    new(chatClient, new ChatClientAgentOptions(instructions: "You are an email assistant that helps users draft responses to emails with professionalism.")
    {
        ChatOptions = new()
        {
            ResponseFormat = ChatResponseFormat.ForJsonSchema<EmailResponse>()
        }
    });

/// <summary>
/// Creates an agent that summarizes emails.
/// </summary>
/// <returns>A ChatClientAgent configured for email summarization</returns>
private static ChatClientAgent GetEmailSummaryAgent(IChatClient chatClient) =>
    new(chatClient, new ChatClientAgentOptions(instructions: "You are an assistant that helps users summarize emails.")
    {
        ChatOptions = new()
        {
            ResponseFormat = ChatResponseFormat.ForJsonSchema<EmailSummary>()
        }
    });

Konstrukcja przepływu pracy wielokrotnego wyboru

Skonstruuj przepływ pracy z zaawansowanym routingiem i przetwarzaniem równoległym:

public static class Program
{
    private static async Task Main()
    {
        // Set up the Azure OpenAI client
        var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
        var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
        var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential()).GetChatClient(deploymentName).AsIChatClient();

        // Create agents
        AIAgent emailAnalysisAgent = GetEmailAnalysisAgent(chatClient);
        AIAgent emailAssistantAgent = GetEmailAssistantAgent(chatClient);
        AIAgent emailSummaryAgent = GetEmailSummaryAgent(chatClient);

        // Create executors
        var emailAnalysisExecutor = new EmailAnalysisExecutor(emailAnalysisAgent);
        var emailAssistantExecutor = new EmailAssistantExecutor(emailAssistantAgent);
        var emailSummaryExecutor = new EmailSummaryExecutor(emailSummaryAgent);
        var sendEmailExecutor = new SendEmailExecutor();
        var handleSpamExecutor = new HandleSpamExecutor();
        var handleUncertainExecutor = new HandleUncertainExecutor();
        var databaseAccessExecutor = new DatabaseAccessExecutor();

        // Build the workflow with multi-selection fan-out
        WorkflowBuilder builder = new(emailAnalysisExecutor);
        builder.AddFanOutEdge(
            emailAnalysisExecutor,
            targets: [
                handleSpamExecutor,        // Index 0: Spam handler
                emailAssistantExecutor,    // Index 1: Email assistant (always for NotSpam)
                emailSummaryExecutor,      // Index 2: Summarizer (conditionally for long NotSpam)
                handleUncertainExecutor,   // Index 3: Uncertain handler
            ],
            targetSelector: GetTargetAssigner()
        )
        // Email assistant branch
        .AddEdge(emailAssistantExecutor, sendEmailExecutor)

        // Database persistence: conditional routing
        .AddEdge<AnalysisResult>(
            emailAnalysisExecutor,
            databaseAccessExecutor,
            condition: analysisResult => analysisResult?.EmailLength <= EmailProcessingConstants.LongEmailThreshold) // Short emails
        .AddEdge(emailSummaryExecutor, databaseAccessExecutor) // Long emails with summary

        .WithOutputFrom(handleUncertainExecutor, handleSpamExecutor, sendEmailExecutor);

        var workflow = builder.Build();

        // Read a moderately long email to trigger both assistant and summarizer
        string email = Resources.Read("email.txt");

        // Execute the workflow with custom event handling
        StreamingRun run = await InProcessExecution.StreamAsync(workflow, new ChatMessage(ChatRole.User, email));
        await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
        await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
        {
            if (evt is WorkflowOutputEvent outputEvent)
            {
                Console.WriteLine($"Output: {outputEvent}");
            }

            if (evt is DatabaseEvent databaseEvent)
            {
                Console.WriteLine($"Database: {databaseEvent}");
            }
        }
    }
}

Porównanie wzorców: wybór wielokrotny a Switch-Case

Wzorzec Switch-Case (poprzedni):

// One input → exactly one output
builder.AddSwitch(spamDetectionExecutor, switchBuilder =>
    switchBuilder
    .AddCase(GetCondition(SpamDecision.NotSpam), emailAssistantExecutor)
    .AddCase(GetCondition(SpamDecision.Spam), handleSpamExecutor)
    .WithDefault(handleUncertainExecutor)
)

Wzorzec wyboru wielokrotnego:

// One input → one or more outputs (dynamic fan-out)
builder.AddFanOutEdge(
    emailAnalysisExecutor,
    targets: [handleSpamExecutor, emailAssistantExecutor, emailSummaryExecutor, handleUncertainExecutor],
    targetSelector: GetTargetAssigner() // Returns list of target indices
)

Kluczowe zalety opcji wielokrotnego wyboru krawędzi

  1. Przetwarzanie równoległe: wiele gałęzi może być wykonywanych jednocześnie
  2. Warunkowe rozgałęzienie: liczba celów różni się w zależności od zawartości
  3. Routing obsługujący zawartość: decyzje oparte na właściwościach komunikatów, a nie tylko typ
  4. Efektywne użycie zasobów: aktywowane są tylko niezbędne gałęzie
  5. Złożona logika biznesowa: obsługuje zaawansowane scenariusze routingu

Uruchamianie przykładu wyboru wielokrotnego

Podczas uruchamiania tej procedury z długą wiadomością e-mail:

Output: Email sent: [Professional response generated by AI]
Database: Email abc123 saved to database.

Gdy uruchomisz krótką wiadomość e-mail, funkcja podsumowania zostanie pominięta.

Output: Email sent: [Professional response generated by AI]
Database: Email def456 saved to database.

Real-World Przypadki użycia

  • Systemy poczty e-mail: Ścieżka do asystenta odpowiedzi + archiwizacja + analityka (warunkowo)
  • Przetwarzanie zawartości: wyzwalanie transkrypcji + tłumaczenie i analiza (na podstawie typu zawartości)
  • Przetwarzanie zamówień: przekierowanie do realizacji, rozliczenia, powiadomień (na podstawie cech zamówienia)
  • Potoki danych: wyzwalanie różnych przepływów analitycznych na podstawie cech danych

Kompletna implementacja wielokrotnego wyboru

Aby uzyskać pełną działającą implementację, zobacz ten przykład w repozytorium Platformy agentów.

Poza instrukcją switch-case: Wielokrotna selekcja routingu

Podczas gdy krawędzie przełączników kierują komunikaty do dokładnie jednego miejsca docelowego, rzeczywiste przepływy pracy często muszą wyzwalać wiele równoległych operacji na podstawie cech danych. Partycjonowane krawędzie (zaimplementowane jako grupy krawędzi wielokrotnego wyboru) umożliwiają zaawansowane wzorce fan-out, w których pojedynczy komunikat może aktywować wiele podrzędnych funkcji wykonawczych jednocześnie.

Zaawansowany przepływ pracy przetwarzania wiadomości e-mail

Korzystając z przykładu switch-case, utworzysz ulepszony system przetwarzania wiadomości e-mail, który demonstruje zaawansowaną logikę routingu:

  • Spamowe wiadomości e-mail → pojedynczy mechanizm obsługi spamu (jak switch-case)
  • Prawidłowe wiadomości e-mailZawsze uruchamiaj asystenta poczty e-mail + Warunkowo uruchamiaj podsumowywanie dla długich wiadomości e-mail
  • Niepewne wiadomości e-mail → pojedynczy niepewny handler (jak switch-case)
  • Trwałość bazy danych → wyzwalana zarówno dla krótkich wiadomości e-mail, jak i podsumowanych długich wiadomości e-mail

Ten wzorzec umożliwia równoległe potoki przetwarzania, które dostosowują się do cech zawartości.

Omówione pojęcia

Ulepszony model danych dla wyboru wielu opcji

Rozszerz modele danych, aby obsługiwać analizę długości wiadomości e-mail i podsumowywanie:

class AnalysisResultAgent(BaseModel):
    """Enhanced structured output from email analysis agent."""

    spam_decision: Literal["NotSpam", "Spam", "Uncertain"]
    reason: str

class EmailResponse(BaseModel):
    """Response from email assistant."""

    response: str

class EmailSummaryModel(BaseModel):
    """Summary generated by email summary agent."""

    summary: str

@dataclass
class AnalysisResult:
    """Internal analysis result with email metadata for routing decisions."""

    spam_decision: str
    reason: str
    email_length: int  # Used for conditional routing
    email_summary: str  # Populated by summary agent
    email_id: str

@dataclass
class Email:
    """Email content stored in shared state."""

    email_id: str
    email_content: str

# Custom event for database operations
class DatabaseEvent(WorkflowEvent):
    """Custom event for tracking database operations."""
    pass

Funkcja wyboru: Serce wielokrotnego zaznaczania

Funkcja wyboru określa, które funkcje wykonawcze powinny odbierać poszczególne komunikaty:

LONG_EMAIL_THRESHOLD = 100

def select_targets(analysis: AnalysisResult, target_ids: list[str]) -> list[str]:
    """Intelligent routing based on spam decision and email characteristics."""

    # Target order: [handle_spam, submit_to_email_assistant, summarize_email, handle_uncertain]
    handle_spam_id, submit_to_email_assistant_id, summarize_email_id, handle_uncertain_id = target_ids

    if analysis.spam_decision == "Spam":
        # Route only to spam handler
        return [handle_spam_id]

    elif analysis.spam_decision == "NotSpam":
        # Always route to email assistant
        targets = [submit_to_email_assistant_id]

        # Conditionally add summarizer for long emails
        if analysis.email_length > LONG_EMAIL_THRESHOLD:
            targets.append(summarize_email_id)

        return targets

    else:  # Uncertain
        # Route only to uncertain handler
        return [handle_uncertain_id]

Najważniejsze funkcje funkcji wyboru

  1. Dynamiczny wybór celu: zwraca listę identyfikatorów wykonawcy do aktywowania
  2. Routing obsługujący zawartość: podejmuje decyzje na podstawie właściwości komunikatów
  3. Przetwarzanie równoległe: wiele obiektów docelowych może być wykonywanych jednocześnie
  4. Logika warunkowa: złożone rozgałęzianie na podstawie wielu kryteriów

Wykonawcy przepływu pracy wielokrotnego wyboru

Zaimplementuj funkcje wykonawcze obsługujące rozszerzoną analizę i routing:

EMAIL_STATE_PREFIX = "email:"
CURRENT_EMAIL_ID_KEY = "current_email_id"

@executor(id="store_email")
async def store_email(email_text: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
    """Store email and initiate analysis."""

    new_email = Email(email_id=str(uuid4()), email_content=email_text)
    await ctx.set_shared_state(f"{EMAIL_STATE_PREFIX}{new_email.email_id}", new_email)
    await ctx.set_shared_state(CURRENT_EMAIL_ID_KEY, new_email.email_id)

    await ctx.send_message(
        AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=new_email.email_content)], should_respond=True)
    )

@executor(id="to_analysis_result")
async def to_analysis_result(response: AgentExecutorResponse, ctx: WorkflowContext[AnalysisResult]) -> None:
    """Transform agent response into enriched analysis result."""

    parsed = AnalysisResultAgent.model_validate_json(response.agent_run_response.text)
    email_id: str = await ctx.get_shared_state(CURRENT_EMAIL_ID_KEY)
    email: Email = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{email_id}")

    # Create enriched analysis result with email length for routing decisions
    await ctx.send_message(
        AnalysisResult(
            spam_decision=parsed.spam_decision,
            reason=parsed.reason,
            email_length=len(email.email_content),  # Key for conditional routing
            email_summary="",
            email_id=email_id,
        )
    )

@executor(id="submit_to_email_assistant")
async def submit_to_email_assistant(analysis: AnalysisResult, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
    """Handle legitimate emails by forwarding to email assistant."""

    if analysis.spam_decision != "NotSpam":
        raise RuntimeError("This executor should only handle NotSpam messages.")

    email: Email = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{analysis.email_id}")
    await ctx.send_message(
        AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=email.email_content)], should_respond=True)
    )

@executor(id="finalize_and_send")
async def finalize_and_send(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
    """Final step for email assistant branch."""

    parsed = EmailResponse.model_validate_json(response.agent_run_response.text)
    await ctx.yield_output(f"Email sent: {parsed.response}")

@executor(id="summarize_email")
async def summarize_email(analysis: AnalysisResult, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
    """Generate summary for long emails (parallel branch)."""

    # Only called for long NotSpam emails by selection function
    email: Email = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{analysis.email_id}")
    await ctx.send_message(
        AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=email.email_content)], should_respond=True)
    )

@executor(id="merge_summary")
async def merge_summary(response: AgentExecutorResponse, ctx: WorkflowContext[AnalysisResult]) -> None:
    """Merge summary back into analysis result for database persistence."""

    summary = EmailSummaryModel.model_validate_json(response.agent_run_response.text)
    email_id: str = await ctx.get_shared_state(CURRENT_EMAIL_ID_KEY)
    email: Email = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{email_id}")

    # Create analysis result with summary for database storage
    await ctx.send_message(
        AnalysisResult(
            spam_decision="NotSpam",
            reason="",
            email_length=len(email.email_content),
            email_summary=summary.summary,  # Now includes summary
            email_id=email_id,
        )
    )

@executor(id="handle_spam")
async def handle_spam(analysis: AnalysisResult, ctx: WorkflowContext[Never, str]) -> None:
    """Handle spam emails (single target like switch-case)."""

    if analysis.spam_decision == "Spam":
        await ctx.yield_output(f"Email marked as spam: {analysis.reason}")
    else:
        raise RuntimeError("This executor should only handle Spam messages.")

@executor(id="handle_uncertain")
async def handle_uncertain(analysis: AnalysisResult, ctx: WorkflowContext[Never, str]) -> None:
    """Handle uncertain emails (single target like switch-case)."""

    if analysis.spam_decision == "Uncertain":
        email: Email | None = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{analysis.email_id}")
        await ctx.yield_output(
            f"Email marked as uncertain: {analysis.reason}. Email content: {getattr(email, 'email_content', '')}"
        )
    else:
        raise RuntimeError("This executor should only handle Uncertain messages.")

@executor(id="database_access")
async def database_access(analysis: AnalysisResult, ctx: WorkflowContext[Never, str]) -> None:
    """Simulate database persistence with custom events."""

    await asyncio.sleep(0.05)  # Simulate DB operation
    await ctx.add_event(DatabaseEvent(f"Email {analysis.email_id} saved to database."))

Ulepszoni agenci sztucznej inteligencji

Utwórz agentów na potrzeby analizy, pomocy i podsumowania:

async def main() -> None:
    chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())

    # Enhanced analysis agent
    email_analysis_agent = AgentExecutor(
        chat_client.create_agent(
            instructions=(
                "You are a spam detection assistant that identifies spam emails. "
                "Always return JSON with fields 'spam_decision' (one of NotSpam, Spam, Uncertain) "
                "and 'reason' (string)."
            ),
            response_format=AnalysisResultAgent,
        ),
        id="email_analysis_agent",
    )

    # Email assistant (same as before)
    email_assistant_agent = AgentExecutor(
        chat_client.create_agent(
            instructions=(
                "You are an email assistant that helps users draft responses to emails with professionalism."
            ),
            response_format=EmailResponse,
        ),
        id="email_assistant_agent",
    )

    # New: Email summary agent for long emails
    email_summary_agent = AgentExecutor(
        chat_client.create_agent(
            instructions="You are an assistant that helps users summarize emails.",
            response_format=EmailSummaryModel,
        ),
        id="email_summary_agent",
    )

Tworzenie przepływu pracy z możliwością wyboru wielu opcji

Skonstruuj przepływ pracy z zaawansowanym routingiem i przetwarzaniem równoległym:

    workflow = (
        WorkflowBuilder()
        .set_start_executor(store_email)
        .add_edge(store_email, email_analysis_agent)
        .add_edge(email_analysis_agent, to_analysis_result)

        # Multi-selection edge group: intelligent fan-out based on content
        .add_multi_selection_edge_group(
            to_analysis_result,
            [handle_spam, submit_to_email_assistant, summarize_email, handle_uncertain],
            selection_func=select_targets,
        )

        # Email assistant branch (always for NotSpam)
        .add_edge(submit_to_email_assistant, email_assistant_agent)
        .add_edge(email_assistant_agent, finalize_and_send)

        # Summary branch (only for long NotSpam emails)
        .add_edge(summarize_email, email_summary_agent)
        .add_edge(email_summary_agent, merge_summary)

        # Database persistence: conditional routing
        .add_edge(to_analysis_result, database_access,
                 condition=lambda r: r.email_length <= LONG_EMAIL_THRESHOLD)  # Short emails
        .add_edge(merge_summary, database_access)  # Long emails with summary

        .build()
    )

Wykonywanie przy użyciu strumieniowania zdarzeń

Uruchom proces roboczy i obserwuj równoległe wykonywanie za pomocą zdarzeń niestandardowych.

    # Use a moderately long email to trigger both assistant and summarizer
    email = """
    Hello team, here are the updates for this week:

    1. Project Alpha is on track and we should have the first milestone completed by Friday.
    2. The client presentation has been scheduled for next Tuesday at 2 PM.
    3. Please review the Q4 budget allocation and provide feedback by Wednesday.

    Let me know if you have any questions or concerns.

    Best regards,
    Alex
    """

    # Stream events to see parallel execution
    async for event in workflow.run_stream(email):
        if isinstance(event, DatabaseEvent):
            print(f"Database: {event}")
        elif isinstance(event, WorkflowOutputEvent):
            print(f"Output: {event.data}")

Wybór wielokrotny a porównanie Switch-Case

Wzorzec Switch-Case (poprzedni):

# One input → exactly one output
.add_switch_case_edge_group(
    source,
    [
        Case(condition=lambda x: x.result == "A", target=handler_a),
        Case(condition=lambda x: x.result == "B", target=handler_b),
        Default(target=handler_c),
    ],
)

Wzorzec wyboru wielokrotnego:

# One input → one or more outputs (dynamic fan-out)
.add_multi_selection_edge_group(
    source,
    [handler_a, handler_b, handler_c, handler_d],
    selection_func=intelligent_router,  # Returns list of target IDs
)

Korzyści z wielokrotnego wyboru języka C#

  1. Przetwarzanie równoległe: wiele gałęzi może być wykonywanych jednocześnie
  2. Warunkowe rozgałęzienie: liczba celów różni się w zależności od zawartości
  3. Routing obsługujący zawartość: decyzje oparte na właściwościach komunikatów, a nie tylko typ
  4. Efektywne użycie zasobów: aktywowane są tylko niezbędne gałęzie
  5. Złożona logika biznesowa: obsługuje zaawansowane scenariusze routingu

Praktyczne zastosowania języka C#

  • Systemy poczty e-mail: Ścieżka do asystenta odpowiedzi + archiwizacja + analityka (warunkowo)
  • Przetwarzanie zawartości: wyzwalanie transkrypcji + tłumaczenie i analiza (na podstawie typu zawartości)
  • Przetwarzanie zamówień: przekierowanie do realizacji, rozliczenia, powiadomień (na podstawie cech zamówienia)
  • Potoki danych: wyzwalanie różnych przepływów analitycznych na podstawie cech danych

Przykładowy kod wielokrotnego wyboru

Aby uzyskać pełną działającą implementację, zobacz przykład multi_selection_edge_group.py w repozytorium Platformy agentów.

Dalsze kroki