Ескертпе
Бұл бетке кіру үшін қатынас шегін айқындау қажет. Жүйеге кіруді немесе каталогтарды өзгертуді байқап көруге болады.
Бұл бетке кіру үшін қатынас шегін айқындау қажет. Каталогтарды өзгертуді байқап көруге болады.
Ребра определяют, как сообщения пересекают поток между исполнителями в рабочем процессе. Они представляют подключения в графе рабочего процесса и определяют пути потока данных. Границы могут включать условия для контроля маршрутизации на основе содержимого сообщения.
Типы вычислений на периферии
Платформа поддерживает несколько пограничных шаблонов:
| Тип | Description | Сценарий использования |
|---|---|---|
| Прямой | Простые подключения типа "один к одному" | Линейные конвейеры |
| Условный | Границы с условиями, определяющие, когда сообщения текут | Двоичная маршрутизация (if/else) |
| Switch-Case | Маршрутизация к разным исполнителям в зависимости от условий | Маршрутизация с несколькими ветвями |
| Multi-Selection (Fan-out) | Один исполнитель отправляет сообщения нескольким целевым объектам | Параллельная обработка |
| Фанин | Несколько исполнителей, отправляющих в одну целевую систему | Aggregation |
Прямые края
Простейшая форма — подключение двух исполнителей без условий:
WorkflowBuilder builder = new(sourceExecutor);
builder.AddEdge(sourceExecutor, targetExecutor);
builder = WorkflowBuilder(start_executor=source_executor)
builder.add_edge(source_executor, target_executor)
workflow = builder.build()
Фан-in Edges
Сбор сообщений из нескольких источников в один целевой объект:
builder.AddFanInBarrierEdge(sources: [ worker1, worker2, worker3 ], target: aggregatorExecutor);
builder.add_fan_in_edge([worker1, worker2, worker3], aggregator_executor)
В следующих разделах приведены подробные руководства по условным, коммутаторам и ребрам с несколькими выборами.
Условные края
Условные края позволяют рабочему процессу принимать решения о маршрутизации на основе содержимого или свойств сообщений, передаваемых через рабочий процесс. Это позволяет динамическое ветвление, где используются разные пути выполнения в зависимости от условий времени выполнения.
Что вы будете создавать
Вы создадите рабочий процесс обработки электронной почты, демонстрирующий условную маршрутизацию:
- Агент обнаружения нежелательной почты, который анализирует входящие сообщения электронной почты и возвращает структурированный JSON.
- Условные границы, которые маршрутизируют электронные письма к различным обработчикам в зависимости от классификации.
- Авторитетный менеджер электронной почты, который создаёт профессиональные ответы.
- Обработчик нежелательной почты, который помечает подозрительные сообщения электронной почты.
- Управление общим состоянием для сохранения данных электронной почты между этапами рабочего процесса.
Основные понятия, описанные в статье
Предпосылки
- Пакет SDK для .NET 8.0 или более поздней версии.
- Конечная точка и развертывание службы Azure OpenAI настроены.
- Azure CLI установлен и прошел проверку подлинности (для проверки подлинности учетных данных Azure).
- Базовое понимание программирования C# и асинхронного программирования.
- Новое консольное приложение.
Установка пакетов Nuget
Сначала установите необходимые пакеты для проекта .NET:
dotnet add package Azure.AI.Projects --prerelease
dotnet add package Azure.Identity
dotnet add package Microsoft.Agents.AI.Workflows --prerelease
dotnet add package Microsoft.Agents.AI.Foundry --prerelease
Определение моделей данных
Начните с определения структур данных, которые будут проходить через рабочий процесс:
using System.Text.Json.Serialization;
/// <summary>
/// Represents the result of spam detection.
/// </summary>
public sealed class DetectionResult
{
[JsonPropertyName("is_spam")]
public bool IsSpam { get; set; }
[JsonPropertyName("reason")]
public string Reason { get; set; } = string.Empty;
// Email ID is generated by the executor, not the agent
[JsonIgnore]
public string EmailId { get; set; } = string.Empty;
}
/// <summary>
/// Represents an email.
/// </summary>
internal sealed class Email
{
[JsonPropertyName("email_id")]
public string EmailId { get; set; } = string.Empty;
[JsonPropertyName("email_content")]
public string EmailContent { get; set; } = string.Empty;
}
/// <summary>
/// Represents the response from the email assistant.
/// </summary>
public sealed class EmailResponse
{
[JsonPropertyName("response")]
public string Response { get; set; } = string.Empty;
}
/// <summary>
/// Constants for shared state scopes.
/// </summary>
internal static class EmailStateConstants
{
public const string EmailStateScope = "EmailState";
}
Создание условных функций
Функция условия оценивает результат обнаружения нежелательной почты, чтобы определить, какой путь должен пройти рабочий процесс:
/// <summary>
/// Creates a condition for routing messages based on the expected spam detection result.
/// </summary>
/// <param name="expectedResult">The expected spam detection result</param>
/// <returns>A function that evaluates whether a message meets the expected result</returns>
private static Func<object?, bool> GetCondition(bool expectedResult) =>
detectionResult => detectionResult is DetectionResult result && result.IsSpam == expectedResult;
Эта функция условия:
-
bool expectedResultПринимает параметр (true для спама, false для не-спама) - Возвращает функцию, которая может использоваться в качестве пограничного условия
- Безопасно проверяет, является ли сообщение типом
DetectionResult, и сравнивает свойствоIsSpam.
Создание агентов ИИ
Настройте агенты ИИ, которые будут обрабатывать обнаружение нежелательной почты и помощь по электронной почте:
using Azure.AI.Projects;
using Azure.Identity;
using Microsoft.Agents.AI;
using Microsoft.Extensions.AI;
/// <summary>
/// Creates a spam detection agent.
/// </summary>
/// <returns>A ChatClientAgent configured for spam detection</returns>
private static ChatClientAgent GetSpamDetectionAgent(IChatClient chatClient) =>
new(chatClient, new ChatClientAgentOptions
{
ChatOptions = new()
{
Instructions = "You are a spam detection assistant that identifies spam emails.",
ResponseFormat = ChatResponseFormat.ForJsonSchema(AIJsonUtilities.CreateJsonSchema(typeof(DetectionResult)))
}
});
/// <summary>
/// Creates an email assistant agent.
/// </summary>
/// <returns>A ChatClientAgent configured for email assistance</returns>
private static ChatClientAgent GetEmailAssistantAgent(IChatClient chatClient) =>
new(chatClient, new ChatClientAgentOptions
{
ChatOptions = new()
{
Instructions = "You are an email assistant that helps users draft professional responses to emails.",
ResponseFormat = ChatResponseFormat.ForJsonSchema(AIJsonUtilities.CreateJsonSchema(typeof(EmailResponse)))
}
});
Реализация пула исполнителей
Создайте исполнителей рабочего процесса, обрабатывающих различные этапы обработки электронной почты:
using Microsoft.Agents.AI.Workflows;
using System.Text.Json;
/// <summary>
/// Executor that detects spam using an AI agent.
/// </summary>
internal sealed partial class SpamDetectionExecutor : Executor
{
private readonly AIAgent _spamDetectionAgent;
public SpamDetectionExecutor(AIAgent spamDetectionAgent) : base("SpamDetectionExecutor")
{
this._spamDetectionAgent = spamDetectionAgent;
}
[MessageHandler]
private async ValueTask<DetectionResult> HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// Generate a random email ID and store the email content to shared state
var newEmail = new Email
{
EmailId = Guid.NewGuid().ToString("N"),
EmailContent = message.Text
};
await context.QueueStateUpdateAsync(newEmail.EmailId, newEmail, scopeName: EmailStateConstants.EmailStateScope);
// Invoke the agent for spam detection
var response = await this._spamDetectionAgent.RunAsync(message);
var detectionResult = JsonSerializer.Deserialize<DetectionResult>(response.Text);
detectionResult!.EmailId = newEmail.EmailId;
return detectionResult;
}
}
/// <summary>
/// Executor that assists with email responses using an AI agent.
/// </summary>
internal sealed partial class EmailAssistantExecutor : Executor
{
private readonly AIAgent _emailAssistantAgent;
public EmailAssistantExecutor(AIAgent emailAssistantAgent) : base("EmailAssistantExecutor")
{
this._emailAssistantAgent = emailAssistantAgent;
}
[MessageHandler]
private async ValueTask<EmailResponse> HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
if (message.IsSpam)
{
throw new ArgumentException("This executor should only handle non-spam messages.");
}
// Retrieve the email content from shared state
var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope)
?? throw new InvalidOperationException("Email not found.");
// Invoke the agent to draft a response
var response = await this._emailAssistantAgent.RunAsync(email.EmailContent);
var emailResponse = JsonSerializer.Deserialize<EmailResponse>(response.Text);
return emailResponse!;
}
}
/// <summary>
/// Executor that sends emails.
/// </summary>
internal sealed partial class SendEmailExecutor : Executor
{
public SendEmailExecutor() : base("SendEmailExecutor") { }
[MessageHandler]
private async ValueTask HandleAsync(EmailResponse message, IWorkflowContext context, CancellationToken cancellationToken = default) =>
await context.YieldOutputAsync($"Email sent: {message.Response}");
}
/// <summary>
/// Executor that handles spam messages.
/// </summary>
internal sealed partial class HandleSpamExecutor : Executor
{
public HandleSpamExecutor() : base("HandleSpamExecutor") { }
[MessageHandler]
private async ValueTask HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
if (message.IsSpam)
{
await context.YieldOutputAsync($"Email marked as spam: {message.Reason}");
}
else
{
throw new ArgumentException("This executor should only handle spam messages.");
}
}
}
Создание рабочего процесса с условными ребрами
Теперь создайте основную программу, которая создает и выполняет рабочий процесс:
using Microsoft.Extensions.AI;
public static class Program
{
private static async Task Main()
{
// Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT")
?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var chatClient = new AIProjectClient(new Uri(endpoint), new DefaultAzureCredential())
.GetProjectOpenAIClient().GetProjectResponsesClient().AsIChatClient(deploymentName);
// Create agents
AIAgent spamDetectionAgent = GetSpamDetectionAgent(chatClient);
AIAgent emailAssistantAgent = GetEmailAssistantAgent(chatClient);
// Create executors
var spamDetectionExecutor = new SpamDetectionExecutor(spamDetectionAgent);
var emailAssistantExecutor = new EmailAssistantExecutor(emailAssistantAgent);
var sendEmailExecutor = new SendEmailExecutor();
var handleSpamExecutor = new HandleSpamExecutor();
// Build the workflow with conditional edges
var workflow = new WorkflowBuilder(spamDetectionExecutor)
// Non-spam path: route to email assistant when IsSpam = false
.AddEdge(spamDetectionExecutor, emailAssistantExecutor, condition: GetCondition(expectedResult: false))
.AddEdge(emailAssistantExecutor, sendEmailExecutor)
// Spam path: route to spam handler when IsSpam = true
.AddEdge(spamDetectionExecutor, handleSpamExecutor, condition: GetCondition(expectedResult: true))
.WithOutputFrom(handleSpamExecutor, sendEmailExecutor)
.Build();
// Execute the workflow with sample spam email
string emailContent = "Congratulations! You've won $1,000,000! Click here to claim your prize now!";
StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, new ChatMessage(ChatRole.User, emailContent));
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is WorkflowOutputEvent outputEvent)
{
Console.WriteLine($"{outputEvent}");
}
}
}
}
Предупреждение
DefaultAzureCredential удобно для разработки, но требует тщательного рассмотрения в рабочей среде. В рабочей среде рекомендуется использовать определенные учетные данные (например, ManagedIdentityCredential), чтобы избежать проблем с задержкой, непреднамеренной проверки данных аутентификации и потенциальных рисков безопасности из-за резервных механизмов.
Принцип работы
Запись рабочего процесса: рабочий процесс начинается с получения
spamDetectionExecutor.Анализ спама: агент обнаружения спама анализирует электронную почту и возвращает структурированное
DetectionResult, содержащее свойстваIsSpamиReason.Условная маршрутизация: на основе
IsSpamзначения:-
Если спам (
IsSpam = true): маршрутизация вHandleSpamExecutorс использованиемGetCondition(true) -
Если законный (
IsSpam = false): маршруты кEmailAssistantExecutorс использованиемGetCondition(false)
-
Если спам (
Создание ответов: для авторизованных писем помощник по электронной почте создаёт профессиональный ответ.
Окончательные выходные данные: рабочий процесс выдает уведомление о нежелательной почте или отправляет черновик ответа электронной почты.
Ключевые функции условных ребер
Type-Safe условия: метод
GetConditionсоздает повторно используемые функции условий, которые безопасно оценивают содержимое сообщения.Несколько путей: Один исполнитель может иметь несколько исходящих связей с различными условиями, что позволяет выполнять сложную логику ветвления.
Общее состояние: данные электронной почты сохраняются в разных исполнителях с помощью управления состоянием с ограниченной областью, что позволяет подчиненным исполнителям получать доступ к исходному содержимому.
Обработка ошибок: исполнители проверяют входные данные и создают значимые исключения при получении непредвиденных типов сообщений.
Чистая архитектура: каждый исполнитель несет отдельную ответственность, что позволяет поддерживать и тестировать рабочий процесс.
Запуск примера
При запуске этого рабочего процесса с образцом спам-письма:
Email marked as spam: This email contains common spam indicators including monetary prizes, urgency tactics, and suspicious links that are typical of phishing attempts.
Попробуйте изменить содержимое электронной почты на что-то законное:
string emailContent = "Hi, I wanted to follow up on our meeting yesterday and get your thoughts on the project proposal.";
Рабочий процесс перенаправит помощника по электронной почте и создаст профессиональный ответ.
Этот шаблон условной маршрутизации формирует основу для создания сложных рабочих процессов, которые могут обрабатывать сложные деревья принятия решений и бизнес-логику.
Полная реализация
Полный рабочий процесс см. в этом примере в репозитории Agent Framework.
Что вы будете создавать
Вы создадите рабочий процесс обработки электронной почты, демонстрирующий условную маршрутизацию:
- Агент обнаружения нежелательной почты, который анализирует входящие сообщения электронной почты
- Условные края, которые направляют сообщения электронной почты различным обработчикам на основе классификации
- Надежный обработчик писем, который составляет профессиональные ответы
- Обработчик нежелательной почты, который помечает подозрительные сообщения электронной почты
Основные понятия, описанные в статье
Предпосылки
- Python 3.10 или более поздней версии
- Установленная платформа агента:
pip install agent-framework-core - Служба Azure OpenAI, настроенная с соответствующими переменными среды
- Проверка подлинности Azure CLI:
az login
Шаг 1. Импорт требуемых зависимостей
Начните с импорта необходимых компонентов для условных рабочих процессов:
import asyncio
import os
from dataclasses import dataclass
from typing import Any, Literal
from uuid import uuid4
from typing_extensions import Never
from agent_framework import (
AgentExecutor,
AgentExecutorRequest,
AgentExecutorResponse,
Message,
WorkflowBuilder,
WorkflowContext,
executor,
Case,
Default,
)
import os
from agent_framework.openai import OpenAIChatCompletionClient
from azure.identity import AzureCliCredential
from pydantic import BaseModel
Шаг 2. Определение моделей данных
Создание моделей Pydantic для структурированного обмена данными между компонентами рабочего процесса:
class DetectionResult(BaseModel):
"""Represents the result of spam detection."""
# is_spam drives the routing decision taken by edge conditions
is_spam: bool
# Human readable rationale from the detector
reason: str
# The agent must include the original email so downstream agents can operate without reloading content
email_content: str
class EmailResponse(BaseModel):
"""Represents the response from the email assistant."""
# The drafted reply that a user could copy or send
response: str
Шаг 3. Создание функций условий
Определите функции условий, которые определяют решения о маршрутизации:
def get_condition(expected_result: bool):
"""Create a condition callable that routes based on DetectionResult.is_spam."""
# The returned function will be used as an edge predicate.
# It receives whatever the upstream executor produced.
def condition(message: Any) -> bool:
# Defensive guard. If a non AgentExecutorResponse appears, let the edge pass to avoid dead ends.
if not isinstance(message, AgentExecutorResponse):
return True
try:
# Prefer parsing a structured DetectionResult from the agent JSON text.
# Using model_validate_json ensures type safety and raises if the shape is wrong.
detection = DetectionResult.model_validate_json(message.agent_run_response.text)
# Route only when the spam flag matches the expected path.
return detection.is_spam == expected_result
except Exception:
# Fail closed on parse errors so we do not accidentally route to the wrong path.
# Returning False prevents this edge from activating.
return False
return condition
Шаг 4. Создание обработчиков-исполнителей
Определите исполнителей для обработки различных результатов маршрутизации:
@executor(id="send_email")
async def handle_email_response(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
"""Handle legitimate emails by drafting a professional response."""
# Downstream of the email assistant. Parse a validated EmailResponse and yield the workflow output.
email_response = EmailResponse.model_validate_json(response.agent_run_response.text)
await ctx.yield_output(f"Email sent:\n{email_response.response}")
@executor(id="handle_spam")
async def handle_spam_classifier_response(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
"""Handle spam emails by marking them appropriately."""
# Spam path. Confirm the DetectionResult and yield the workflow output. Guard against accidental non spam input.
detection = DetectionResult.model_validate_json(response.agent_run_response.text)
if detection.is_spam:
await ctx.yield_output(f"Email marked as spam: {detection.reason}")
else:
# This indicates the routing predicate and executor contract are out of sync.
raise RuntimeError("This executor should only handle spam messages.")
@executor(id="to_email_assistant_request")
async def to_email_assistant_request(
response: AgentExecutorResponse, ctx: WorkflowContext[AgentExecutorRequest]
) -> None:
"""Transform spam detection response into a request for the email assistant."""
# Parse the detection result and extract the email content for the assistant
detection = DetectionResult.model_validate_json(response.agent_run_response.text)
# Create a new request for the email assistant with the original email content
request = AgentExecutorRequest(
messages=[Message(role="user", contents=[detection.email_content])],
should_respond=True
)
await ctx.send_message(request)
Шаг 5. Создание агентов ИИ
Настройте агенты Azure OpenAI со структурированным форматированием выходных данных:
async def main() -> None:
# Create agents
# AzureCliCredential uses your current az login. This avoids embedding secrets in code.
chat_client = OpenAIChatCompletionClient(
model=os.environ["AZURE_OPENAI_CHAT_COMPLETION_MODEL"],
azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
api_version=os.getenv("AZURE_OPENAI_API_VERSION"),
credential=AzureCliCredential(),
)
# Agent 1. Classifies spam and returns a DetectionResult object.
# response_format enforces that the LLM returns parsable JSON for the Pydantic model.
spam_detection_agent = AgentExecutor(
chat_client.as_agent(
instructions=(
"You are a spam detection assistant that identifies spam emails. "
"Always return JSON with fields is_spam (bool), reason (string), and email_content (string). "
"Include the original email content in email_content."
),
response_format=DetectionResult,
),
id="spam_detection_agent",
)
# Agent 2. Drafts a professional reply. Also uses structured JSON output for reliability.
email_assistant_agent = AgentExecutor(
chat_client.as_agent(
instructions=(
"You are an email assistant that helps users draft professional responses to emails. "
"Your input might be a JSON object that includes 'email_content'; base your reply on that content. "
"Return JSON with a single field 'response' containing the drafted reply."
),
response_format=EmailResponse,
),
id="email_assistant_agent",
)
Шаг 6. Создание условного рабочего процесса
Создайте рабочий процесс с условными краями, которые маршрутируют на основе результатов обнаружения нежелательной почты:
# Build the workflow graph.
# Start at the spam detector.
# If not spam, hop to a transformer that creates a new AgentExecutorRequest,
# then call the email assistant, then finalize.
# If spam, go directly to the spam handler and finalize.
workflow = (
WorkflowBuilder(start_executor=spam_detection_agent)
# Not spam path: transform response -> request for assistant -> assistant -> send email
.add_edge(spam_detection_agent, to_email_assistant_request, condition=get_condition(False))
.add_edge(to_email_assistant_request, email_assistant_agent)
.add_edge(email_assistant_agent, handle_email_response)
# Spam path: send to spam handler
.add_edge(spam_detection_agent, handle_spam_classifier_response, condition=get_condition(True))
.build()
)
Шаг 7. Выполнение рабочего процесса
Запустите рабочий процесс с примером содержимого электронной почты:
# Read Email content from the sample resource file.
# This keeps the sample deterministic since the model sees the same email every run.
email_path = os.path.join(os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "resources", "email.txt")
with open(email_path) as email_file: # noqa: ASYNC230
email = email_file.read()
# Execute the workflow. Since the start is an AgentExecutor, pass an AgentExecutorRequest.
# The workflow completes when it becomes idle (no more work to do).
request = AgentExecutorRequest(messages=[Message(role="user", contents=[email])], should_respond=True)
events = await workflow.run(request)
outputs = events.get_outputs()
if outputs:
print(f"Workflow output: {outputs[0]}")
if __name__ == "__main__":
asyncio.run(main())
Как работают условные ребра
Функции условий: Функция
get_condition()создает предикат, который проверяет содержимое сообщения и возвращаетTrueилиFalse, чтобы определить, следует ли перейти через грань.Проверка сообщений. Условия могут проверять любой аспект сообщения, включая структурированные данные из ответов агента, проанализированных с помощью моделей Pydantic.
Оборонительное программирование. Функция условия включает обработку ошибок, чтобы предотвратить сбои маршрутизации при анализе структурированных данных.
Динамическая маршрутизация. В зависимости от результата обнаружения спама сообщения электронной почты автоматически направляются в помощник по электронной почте (для допустимых сообщений электронной почты) или обработчик нежелательной почты (для подозрительных сообщений электронной почты).
Основные понятия
- Условия на ребрах: логические предикаты, определяющие, следует ли проводить ребро.
-
Структурированные выходные данные: использование моделей Pydantic с
response_formatгарантирует надежный анализ данных - Защитная маршрутизация: функции условий предназначены для обработки пограничных случаев, чтобы предотвратить тупики рабочего процесса
- Преобразование сообщений: исполнителям можно преобразовать типы сообщений между шагами рабочего процесса
Полная реализация
Полную рабочую реализацию см. в примере edge_condition.py из репозитория Agent Framework.
Границы Switch-Case
Создание условных границ
В предыдущем примере условных ребер показана двусторонняя маршрутизация (спам и законные сообщения электронной почты). Однако для многих реальных сценариев требуется более сложные деревья принятия решений. Ребра коммутатора обеспечивают более чистое, более поддерживаемое решение, если необходимо перенаправить в несколько направлений на основе различных условий.
Что вы создадите с помощью Switch-Case
Вы расширите рабочий процесс обработки электронной почты для обработки трех путей принятия решений:
- Помощник по электронной почте NotSpam → → отправить сообщение электронной почты
- Обработчик спама → Исполнитель по обработке спама
- Неопределенный → обработка неопределенного исполнителя (по умолчанию)
Ключевое улучшение заключается в использовании SwitchBuilder шаблона вместо нескольких отдельных условных ребер, что упрощает понимание и обслуживание рабочего процесса по мере роста сложности принятия решений.
Основные понятия, описанные в статье
Модели данных для Switch-Case
Обновите модели данных для поддержки трехфакторной классификации.
/// <summary>
/// Represents the possible decisions for spam detection.
/// </summary>
public enum SpamDecision
{
NotSpam,
Spam,
Uncertain
}
/// <summary>
/// Represents the result of spam detection with enhanced decision support.
/// </summary>
public sealed class DetectionResult
{
[JsonPropertyName("spam_decision")]
[JsonConverter(typeof(JsonStringEnumConverter))]
public SpamDecision spamDecision { get; set; }
[JsonPropertyName("reason")]
public string Reason { get; set; } = string.Empty;
// Email ID is generated by the executor, not the agent
[JsonIgnore]
public string EmailId { get; set; } = string.Empty;
}
/// <summary>
/// Represents an email stored in shared state.
/// </summary>
internal sealed class Email
{
[JsonPropertyName("email_id")]
public string EmailId { get; set; } = string.Empty;
[JsonPropertyName("email_content")]
public string EmailContent { get; set; } = string.Empty;
}
/// <summary>
/// Represents the response from the email assistant.
/// </summary>
public sealed class EmailResponse
{
[JsonPropertyName("response")]
public string Response { get; set; } = string.Empty;
}
/// <summary>
/// Constants for shared state scopes.
/// </summary>
internal static class EmailStateConstants
{
public const string EmailStateScope = "EmailState";
}
Фабрика условий для Switch-Case
Создайте многоразовую фабрику условий, которая создает предикаты для каждого решения о спаме:
/// <summary>
/// Creates a condition for routing messages based on the expected spam detection result.
/// </summary>
/// <param name="expectedDecision">The expected spam detection decision</param>
/// <returns>A function that evaluates whether a message meets the expected result</returns>
private static Func<object?, bool> GetCondition(SpamDecision expectedDecision) =>
detectionResult => detectionResult is DetectionResult result && result.spamDecision == expectedDecision;
Этот метод заводского подхода:
- Уменьшает дублирование кода: одна функция создает все предикаты условий
- Обеспечивает согласованность: все условия соответствуют одному и тому же шаблону.
- Упрощение обслуживания: изменения логики условий происходят в одном месте
Расширенный агент ИИ
Обновите агент обнаружения спама, чтобы он проявлял меньшую уверенность и возвращал классификации по трем типам.
/// <summary>
/// Creates a spam detection agent with enhanced uncertainty handling.
/// </summary>
/// <returns>A ChatClientAgent configured for three-way spam detection</returns>
private static ChatClientAgent GetSpamDetectionAgent(IChatClient chatClient) =>
new(chatClient, new ChatClientAgentOptions
{
ChatOptions = new()
{
Instructions = "You are a spam detection assistant that identifies spam emails. Be less confident in your assessments.",
ResponseFormat = ChatResponseFormat.ForJsonSchema<DetectionResult>()
}
});
/// <summary>
/// Creates an email assistant agent (unchanged from conditional edges example).
/// </summary>
/// <returns>A ChatClientAgent configured for email assistance</returns>
private static ChatClientAgent GetEmailAssistantAgent(IChatClient chatClient) =>
new(chatClient, new ChatClientAgentOptions
{
ChatOptions = new()
{
Instructions = "You are an email assistant that helps users draft responses to emails with professionalism.",
ResponseFormat = ChatResponseFormat.ForJsonSchema<EmailResponse>()
}
});
Исполнители рабочих процессов с расширенной маршрутизацией
Реализуйте исполнители, обрабатывающие трёхстороннюю маршрутизацию с управлением общим состоянием.
/// <summary>
/// Executor that detects spam using an AI agent with three-way classification.
/// </summary>
internal sealed partial class SpamDetectionExecutor : Executor
{
private readonly AIAgent _spamDetectionAgent;
public SpamDetectionExecutor(AIAgent spamDetectionAgent) : base("SpamDetectionExecutor")
{
this._spamDetectionAgent = spamDetectionAgent;
}
[MessageHandler]
private async ValueTask<DetectionResult> HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// Generate a random email ID and store the email content in shared state
var newEmail = new Email
{
EmailId = Guid.NewGuid().ToString("N"),
EmailContent = message.Text
};
await context.QueueStateUpdateAsync(newEmail.EmailId, newEmail, scopeName: EmailStateConstants.EmailStateScope);
// Invoke the agent for enhanced spam detection
var response = await this._spamDetectionAgent.RunAsync(message);
var detectionResult = JsonSerializer.Deserialize<DetectionResult>(response.Text);
detectionResult!.EmailId = newEmail.EmailId;
return detectionResult;
}
}
/// <summary>
/// Executor that assists with email responses using an AI agent.
/// </summary>
internal sealed partial class EmailAssistantExecutor : Executor
{
private readonly AIAgent _emailAssistantAgent;
public EmailAssistantExecutor(AIAgent emailAssistantAgent) : base("EmailAssistantExecutor")
{
this._emailAssistantAgent = emailAssistantAgent;
}
[MessageHandler]
private async ValueTask<EmailResponse> HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
if (message.spamDecision == SpamDecision.Spam)
{
throw new ArgumentException("This executor should only handle non-spam messages.");
}
// Retrieve the email content from shared state
var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);
// Invoke the agent to draft a response
var response = await this._emailAssistantAgent.RunAsync(email!.EmailContent);
var emailResponse = JsonSerializer.Deserialize<EmailResponse>(response.Text);
return emailResponse!;
}
}
/// <summary>
/// Executor that sends emails.
/// </summary>
internal sealed partial class SendEmailExecutor : Executor
{
public SendEmailExecutor() : base("SendEmailExecutor") { }
[MessageHandler]
private async ValueTask HandleAsync(EmailResponse message, IWorkflowContext context, CancellationToken cancellationToken = default) =>
await context.YieldOutputAsync($"Email sent: {message.Response}").ConfigureAwait(false);
}
/// <summary>
/// Executor that handles spam messages.
/// </summary>
internal sealed partial class HandleSpamExecutor : Executor
{
public HandleSpamExecutor() : base("HandleSpamExecutor") { }
[MessageHandler]
private async ValueTask HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
if (message.spamDecision == SpamDecision.Spam)
{
await context.YieldOutputAsync($"Email marked as spam: {message.Reason}").ConfigureAwait(false);
}
else
{
throw new ArgumentException("This executor should only handle spam messages.");
}
}
}
/// <summary>
/// Executor that handles uncertain emails requiring manual review.
/// </summary>
internal sealed partial class HandleUncertainExecutor : Executor
{
public HandleUncertainExecutor() : base("HandleUncertainExecutor") { }
[MessageHandler]
private async ValueTask HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
if (message.spamDecision == SpamDecision.Uncertain)
{
var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);
await context.YieldOutputAsync($"Email marked as uncertain: {message.Reason}. Email content: {email?.EmailContent}");
}
else
{
throw new ArgumentException("This executor should only handle uncertain spam decisions.");
}
}
}
Создание рабочего процесса с помощью шаблона Switch-Case
Замените несколько условных ребер более чистым шаблоном switch-case:
public static class Program
{
private static async Task Main()
{
// Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var chatClient = new AIProjectClient(new Uri(endpoint), new DefaultAzureCredential())
.GetProjectOpenAIClient()
.GetProjectResponsesClient()
.AsIChatClient(deploymentName);
// Create agents
AIAgent spamDetectionAgent = GetSpamDetectionAgent(chatClient);
AIAgent emailAssistantAgent = GetEmailAssistantAgent(chatClient);
// Create executors
var spamDetectionExecutor = new SpamDetectionExecutor(spamDetectionAgent);
var emailAssistantExecutor = new EmailAssistantExecutor(emailAssistantAgent);
var sendEmailExecutor = new SendEmailExecutor();
var handleSpamExecutor = new HandleSpamExecutor();
var handleUncertainExecutor = new HandleUncertainExecutor();
// Build the workflow using switch-case for cleaner three-way routing
WorkflowBuilder builder = new(spamDetectionExecutor);
builder.AddSwitch(spamDetectionExecutor, switchBuilder =>
switchBuilder
.AddCase(
GetCondition(expectedDecision: SpamDecision.NotSpam),
emailAssistantExecutor
)
.AddCase(
GetCondition(expectedDecision: SpamDecision.Spam),
handleSpamExecutor
)
.WithDefault(
handleUncertainExecutor
)
)
// After the email assistant writes a response, it will be sent to the send email executor
.AddEdge(emailAssistantExecutor, sendEmailExecutor)
.WithOutputFrom(handleSpamExecutor, sendEmailExecutor, handleUncertainExecutor);
var workflow = builder.Build();
// Read an email from a text file (use ambiguous content for demonstration)
string email = Resources.Read("ambiguous_email.txt");
// Execute the workflow
StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, new ChatMessage(ChatRole.User, email));
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is WorkflowOutputEvent outputEvent)
{
Console.WriteLine($"{outputEvent}");
}
}
}
}
преимущества Switch-Case
-
Чистый синтаксис:
SwitchBuilderпредоставляет более читаемую альтернативу множественным условным переходам. - Упорядоченная оценка: кейсы оцениваются последовательно, остановившись на первом совпадении
-
Гарантированная маршрутизация:
WithDefault()метод гарантирует, что сообщения никогда не застряли - Улучшенная поддержка. Добавление новых вариантов требует минимальных изменений в структуре рабочего процесса
- Безопасность типов: каждый исполнитель проверяет входные данные, чтобы заранее обнаружить ошибки маршрутизации
Сравнение шаблонов
До: (Условные ребра)
var workflow = new WorkflowBuilder(spamDetectionExecutor)
.AddEdge(spamDetectionExecutor, emailAssistantExecutor, condition: GetCondition(expectedResult: false))
.AddEdge(spamDetectionExecutor, handleSpamExecutor, condition: GetCondition(expectedResult: true))
// No clean way to handle a third case
.WithOutputFrom(handleSpamExecutor, sendEmailExecutor)
.Build();
После (Switch-Case):
WorkflowBuilder builder = new(spamDetectionExecutor);
builder.AddSwitch(spamDetectionExecutor, switchBuilder =>
switchBuilder
.AddCase(GetCondition(SpamDecision.NotSpam), emailAssistantExecutor)
.AddCase(GetCondition(SpamDecision.Spam), handleSpamExecutor)
.WithDefault(handleUncertainExecutor) // Clean default case
)
// Continue building the rest of the workflow
Шаблон switch-case масштабируется гораздо лучше по мере роста числа решений маршрутизации, а дефолтный случай предоставляет страховку для непредвиденных значений.
Запуск примера
При запуске этого рабочего процесса с неоднозначным содержимым электронной почты:
Email marked as uncertain: This email contains promotional language but might be from a legitimate business contact, requiring human review for proper classification.
Попробуйте изменить содержимое электронной почты на явно спамовое или очевидно допустимое, чтобы увидеть различные пути маршрутизации в действии.
Полная реализация
Полный рабочий процесс см. в этом примере в репозитории Agent Framework.
Создание условных границ
В предыдущем примере условных ребер показана двусторонняя маршрутизация (спам и законные сообщения электронной почты). Однако для многих реальных сценариев требуется более сложные деревья принятия решений. Ребра коммутатора обеспечивают более чистое, более поддерживаемое решение, если необходимо перенаправить в несколько направлений на основе различных условий.
Что вы создадите дальше
Вы расширите рабочий процесс обработки электронной почты для обработки трех путей принятия решений:
- Помощник по электронной почте NotSpam → → отправить сообщение электронной почты
- Спам → пометить как спам
- Неопределенный флаг → для проверки вручную (по умолчанию)
Ключевое улучшение заключается в использовании одной группы ребер коммутатора вместо нескольких отдельных условных ребер, что упрощает и поддерживает рабочий процесс по мере увеличения сложности принятия решений.
Основные понятия, описанные в статье
Расширенные модели данных
Обновите модели данных для поддержки трехфакторной классификации.
from typing import Literal
class DetectionResultAgent(BaseModel):
"""Structured output returned by the spam detection agent."""
# The agent classifies the email into one of three categories
spam_decision: Literal["NotSpam", "Spam", "Uncertain"]
reason: str
class EmailResponse(BaseModel):
"""Structured output returned by the email assistant agent."""
response: str
@dataclass
class DetectionResult:
"""Internal typed payload used for routing and downstream handling."""
spam_decision: str
reason: str
email_id: str
@dataclass
class Email:
"""In memory record of the email content stored in shared state."""
email_id: str
email_content: str
Фабрика условий Switch-Case
Создайте многоразовую фабрику условий, которая создает предикаты для каждого решения о спаме:
def get_case(expected_decision: str):
"""Factory that returns a predicate matching a specific spam_decision value."""
def condition(message: Any) -> bool:
# Only match when the upstream payload is a DetectionResult with the expected decision
return isinstance(message, DetectionResult) and message.spam_decision == expected_decision
return condition
Этот метод заводского подхода:
- Уменьшает дублирование кода: одна функция создает все предикаты условий
- Обеспечивает согласованность: все условия соответствуют одному и тому же шаблону.
- Упрощение обслуживания: изменения логики условий происходят в одном месте
Исполнители рабочего процесса с общим состоянием
Реализуйте исполнителей, которые используют общее состояние, чтобы избежать передачи большого содержимого электронной почты на каждом шаге рабочего процесса:
EMAIL_STATE_PREFIX = "email:"
CURRENT_EMAIL_ID_KEY = "current_email_id"
@executor(id="store_email")
async def store_email(email_text: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
"""Store email content once and pass around a lightweight ID reference."""
# Persist the raw email content in shared state
new_email = Email(email_id=str(uuid4()), email_content=email_text)
ctx.set_state(f"{EMAIL_STATE_PREFIX}{new_email.email_id}", new_email)
ctx.set_state(CURRENT_EMAIL_ID_KEY, new_email.email_id)
# Forward email to spam detection agent
await ctx.send_message(
AgentExecutorRequest(messages=[Message(role="user", contents=[new_email.email_content])], should_respond=True)
)
@executor(id="to_detection_result")
async def to_detection_result(response: AgentExecutorResponse, ctx: WorkflowContext[DetectionResult]) -> None:
"""Transform agent response into a typed DetectionResult with email ID."""
# Parse the agent's structured JSON output
parsed = DetectionResultAgent.model_validate_json(response.agent_run_response.text)
email_id: str = ctx.get_state(CURRENT_EMAIL_ID_KEY)
# Create typed message for switch-case routing
await ctx.send_message(DetectionResult(
spam_decision=parsed.spam_decision,
reason=parsed.reason,
email_id=email_id
))
@executor(id="submit_to_email_assistant")
async def submit_to_email_assistant(detection: DetectionResult, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
"""Handle NotSpam emails by forwarding to the email assistant."""
# Guard against misrouting
if detection.spam_decision != "NotSpam":
raise RuntimeError("This executor should only handle NotSpam messages.")
# Retrieve original email content from shared state
email: Email = ctx.get_state(f"{EMAIL_STATE_PREFIX}{detection.email_id}")
await ctx.send_message(
AgentExecutorRequest(messages=[Message(role="user", contents=[email.email_content])], should_respond=True)
)
@executor(id="finalize_and_send")
async def finalize_and_send(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
"""Parse email assistant response and yield final output."""
parsed = EmailResponse.model_validate_json(response.agent_run_response.text)
await ctx.yield_output(f"Email sent: {parsed.response}")
@executor(id="handle_spam")
async def handle_spam(detection: DetectionResult, ctx: WorkflowContext[Never, str]) -> None:
"""Handle confirmed spam emails."""
if detection.spam_decision == "Spam":
await ctx.yield_output(f"Email marked as spam: {detection.reason}")
else:
raise RuntimeError("This executor should only handle Spam messages.")
@executor(id="handle_uncertain")
async def handle_uncertain(detection: DetectionResult, ctx: WorkflowContext[Never, str]) -> None:
"""Handle uncertain classifications that need manual review."""
if detection.spam_decision == "Uncertain":
# Include original content for human review
email: Email | None = ctx.get_state(f"{EMAIL_STATE_PREFIX}{detection.email_id}")
await ctx.yield_output(
f"Email marked as uncertain: {detection.reason}. Email content: {getattr(email, 'email_content', '')}"
)
else:
raise RuntimeError("This executor should only handle Uncertain messages.")
Создание расширенного агента ИИ
Обновите агент обнаружения спама, чтобы он проявлял меньшую уверенность и возвращал классификации по трем типам.
async def main():
chat_client = OpenAIChatCompletionClient(
model=os.environ["AZURE_OPENAI_CHAT_COMPLETION_MODEL"],
azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
api_version=os.getenv("AZURE_OPENAI_API_VERSION"),
credential=AzureCliCredential(),
)
# Enhanced spam detection agent with three-way classification
spam_detection_agent = AgentExecutor(
chat_client.as_agent(
instructions=(
"You are a spam detection assistant that identifies spam emails. "
"Be less confident in your assessments. "
"Always return JSON with fields 'spam_decision' (one of NotSpam, Spam, Uncertain) "
"and 'reason' (string)."
),
response_format=DetectionResultAgent,
),
id="spam_detection_agent",
)
# Email assistant remains the same
email_assistant_agent = AgentExecutor(
chat_client.as_agent(
instructions=(
"You are an email assistant that helps users draft responses to emails with professionalism."
),
response_format=EmailResponse,
),
id="email_assistant_agent",
)
Создание рабочего процесса с помощью группы Switch-Case Edge
Замените несколько условных ребер одной группой вариантов:
# Build workflow using switch-case for cleaner three-way routing
workflow = (
WorkflowBuilder(start_executor=store_email)
.add_edge(store_email, spam_detection_agent)
.add_edge(spam_detection_agent, to_detection_result)
.add_switch_case_edge_group(
to_detection_result,
[
# Explicit cases for specific decisions
Case(condition=get_case("NotSpam"), target=submit_to_email_assistant),
Case(condition=get_case("Spam"), target=handle_spam),
# Default case catches anything that doesn't match above
Default(target=handle_uncertain),
],
)
.add_edge(submit_to_email_assistant, email_assistant_agent)
.add_edge(email_assistant_agent, finalize_and_send)
.build()
)
Выполнение и тестирование
Запустите рабочий процесс с неоднозначным содержимым электронной почты, демонстрирующим трехстороннюю маршрутизацию.
# Use ambiguous email content that might trigger uncertain classification
email = (
"Hey there, I noticed you might be interested in our latest offer—no pressure, but it expires soon. "
"Let me know if you'd like more details."
)
# Execute and display results
events = await workflow.run(email)
outputs = events.get_outputs()
if outputs:
for output in outputs:
print(f"Workflow output: {output}")
Основные преимущества конструкции Switch-Case
- Более чистый синтаксис: одна граничная группа вместо нескольких условных ребер
- Упорядоченная оценка: кейсы оцениваются последовательно, остановившись на первом совпадении
- Гарантированная маршрутизация: в случае по умолчанию сообщения никогда не застряли
- Улучшенная поддержка. Добавление новых вариантов требует минимальных изменений
- Безопасность типов: каждый исполнитель проверяет входные данные для перехвата ошибок маршрутизации.
Сравнение: условное и Switch-Case
До: (Условные ребра)
.add_edge(detector, handler_a, condition=lambda x: x.result == "A")
.add_edge(detector, handler_b, condition=lambda x: x.result == "B")
.add_edge(detector, handler_c, condition=lambda x: x.result == "C")
После (Switch-Case):
.add_switch_case_edge_group(
detector,
[
Case(condition=lambda x: x.result == "A", target=handler_a),
Case(condition=lambda x: x.result == "B", target=handler_b),
Default(target=handler_c), # Catches everything else
],
)
Шаблон switch-case масштабируется гораздо лучше по мере роста числа решений маршрутизации, а дефолтный случай предоставляет страховку для непредвиденных значений.
пример кода Switch-Case
Полный рабочий процесс см. в примере switch_case_edge_group.py в репозитории Agent Framework.
Ребра с возможностью множественного выбора
За пределами switch-case: многоопционная маршрутизация
Хотя ребра коммутатора направляют сообщения в ровно одно место назначения, рабочие процессы реального мира часто должны активировать несколько параллельных операций на основе характеристик данных. Секционированные ребра (реализованы в виде ребер распределения выходных потоков с разделителями) позволяют создавать сложные шаблоны распределения, в которых одно сообщение может одновременно активировать несколько исполнителей нижнего уровня.
Расширенный рабочий процесс обработки электронной почты
На основе примера с переключателем, вы создадите расширенную систему обработки электронной почты, показывающую сложную логику маршрутизации.
- Спам электронной почты → единый обработчик нежелательной почты (например, switch-case)
Законные сообщения электронной почты →Помощник по электронной почте всегда срабатывает + Условный триггер сводных данных для длинных сообщений электронной почты- Неопределенные сообщения электронной почты → единый неопределенный обработчик (например, switch-case)
- Сохраняемость базы данных → активирована как для коротких сообщений электронной почты, так и для суммированных длинных сообщений электронной почты
Этот шаблон позволяет параллельным конвейерам обработки адаптироваться к характеристикам содержимого.
Основные понятия, описанные в статье
Модели данных для множественного выбора
Расширьте модели данных для поддержки анализа длины и суммирования электронной почты:
/// <summary>
/// Represents the result of enhanced email analysis with additional metadata.
/// </summary>
public sealed class AnalysisResult
{
[JsonPropertyName("spam_decision")]
[JsonConverter(typeof(JsonStringEnumConverter))]
public SpamDecision spamDecision { get; set; }
[JsonPropertyName("reason")]
public string Reason { get; set; } = string.Empty;
// Additional properties for sophisticated routing
[JsonIgnore]
public int EmailLength { get; set; }
[JsonIgnore]
public string EmailSummary { get; set; } = string.Empty;
[JsonIgnore]
public string EmailId { get; set; } = string.Empty;
}
/// <summary>
/// Represents the response from the email assistant.
/// </summary>
public sealed class EmailResponse
{
[JsonPropertyName("response")]
public string Response { get; set; } = string.Empty;
}
/// <summary>
/// Represents the response from the email summary agent.
/// </summary>
public sealed class EmailSummary
{
[JsonPropertyName("summary")]
public string Summary { get; set; } = string.Empty;
}
/// <summary>
/// A custom workflow event for database operations.
/// </summary>
internal sealed class DatabaseEvent(string message) : WorkflowEvent(message) { }
/// <summary>
/// Constants for email processing thresholds.
/// </summary>
public static class EmailProcessingConstants
{
public const int LongEmailThreshold = 100;
}
Функция назначения цели: сердце многократного выбора
Функция целевого назначателя определяет, какие исполнители должны получать каждое сообщение:
/// <summary>
/// Creates a target assigner for routing messages based on the analysis result.
/// </summary>
/// <returns>A function that takes an analysis result and returns the target partitions.</returns>
private static Func<AnalysisResult?, int, IEnumerable<int>> GetTargetAssigner()
{
return (analysisResult, targetCount) =>
{
if (analysisResult is not null)
{
if (analysisResult.spamDecision == SpamDecision.Spam)
{
return [0]; // Route only to spam handler (index 0)
}
else if (analysisResult.spamDecision == SpamDecision.NotSpam)
{
// Always route to email assistant (index 1)
List<int> targets = [1];
// Conditionally add summarizer for long emails (index 2)
if (analysisResult.EmailLength > EmailProcessingConstants.LongEmailThreshold)
{
targets.Add(2);
}
return targets;
}
else // Uncertain
{
return [3]; // Route only to uncertain handler (index 3)
}
}
throw new ArgumentException("Invalid analysis result.");
};
}
Ключевые функции целевого назначателя
- Выбор динамического целевого объекта: возвращает список индексов исполнителя для активации
- Маршрутизация с учетом содержимого: принимает решения на основе свойств сообщения, таких как длина электронной почты
- Параллельная обработка: несколько целевых объектов могут выполняться одновременно
- Условная логика: сложное ветвление на основе нескольких критериев
Расширенные исполнители рабочих процессов
Реализуйте исполнителей, обрабатывающих расширенный анализ и маршрутизацию:
/// <summary>
/// Executor that analyzes emails using an AI agent with enhanced analysis.
/// </summary>
internal sealed partial class EmailAnalysisExecutor : Executor
{
private readonly AIAgent _emailAnalysisAgent;
public EmailAnalysisExecutor(AIAgent emailAnalysisAgent) : base("EmailAnalysisExecutor")
{
this._emailAnalysisAgent = emailAnalysisAgent;
}
[MessageHandler]
private async ValueTask<AnalysisResult> HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// Generate a random email ID and store the email content
var newEmail = new Email
{
EmailId = Guid.NewGuid().ToString("N"),
EmailContent = message.Text
};
await context.QueueStateUpdateAsync(newEmail.EmailId, newEmail, scopeName: EmailStateConstants.EmailStateScope);
// Invoke the agent for enhanced analysis
var response = await this._emailAnalysisAgent.RunAsync(message);
var analysisResult = JsonSerializer.Deserialize<AnalysisResult>(response.Text);
// Enrich with metadata for routing decisions
analysisResult!.EmailId = newEmail.EmailId;
analysisResult.EmailLength = newEmail.EmailContent.Length;
return analysisResult;
}
}
/// <summary>
/// Executor that assists with email responses using an AI agent.
/// </summary>
internal sealed partial class EmailAssistantExecutor : Executor
{
private readonly AIAgent _emailAssistantAgent;
public EmailAssistantExecutor(AIAgent emailAssistantAgent) : base("EmailAssistantExecutor")
{
this._emailAssistantAgent = emailAssistantAgent;
}
[MessageHandler]
private async ValueTask<EmailResponse> HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
if (message.spamDecision == SpamDecision.Spam)
{
throw new ArgumentException("This executor should only handle non-spam messages.");
}
// Retrieve the email content from shared state
var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);
// Invoke the agent to draft a response
var response = await this._emailAssistantAgent.RunAsync(email!.EmailContent);
var emailResponse = JsonSerializer.Deserialize<EmailResponse>(response.Text);
return emailResponse!;
}
}
/// <summary>
/// Executor that summarizes emails using an AI agent for long emails.
/// </summary>
internal sealed partial class EmailSummaryExecutor : Executor
{
private readonly AIAgent _emailSummaryAgent;
public EmailSummaryExecutor(AIAgent emailSummaryAgent) : base("EmailSummaryExecutor")
{
this._emailSummaryAgent = emailSummaryAgent;
}
[MessageHandler]
private async ValueTask<AnalysisResult> HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// Read the email content from shared state
var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);
// Generate summary for long emails
var response = await this._emailSummaryAgent.RunAsync(email!.EmailContent);
var emailSummary = JsonSerializer.Deserialize<EmailSummary>(response.Text);
// Enrich the analysis result with the summary
message.EmailSummary = emailSummary!.Summary;
return message;
}
}
/// <summary>
/// Executor that sends emails.
/// </summary>
internal sealed partial class SendEmailExecutor : Executor
{
public SendEmailExecutor() : base("SendEmailExecutor") { }
[MessageHandler]
private async ValueTask HandleAsync(EmailResponse message, IWorkflowContext context, CancellationToken cancellationToken = default) =>
await context.YieldOutputAsync($"Email sent: {message.Response}");
}
/// <summary>
/// Executor that handles spam messages.
/// </summary>
internal sealed partial class HandleSpamExecutor : Executor
{
public HandleSpamExecutor() : base("HandleSpamExecutor") { }
[MessageHandler]
private async ValueTask HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
if (message.spamDecision == SpamDecision.Spam)
{
await context.YieldOutputAsync($"Email marked as spam: {message.Reason}");
}
else
{
throw new ArgumentException("This executor should only handle spam messages.");
}
}
}
/// <summary>
/// Executor that handles uncertain messages requiring manual review.
/// </summary>
internal sealed partial class HandleUncertainExecutor : Executor
{
public HandleUncertainExecutor() : base("HandleUncertainExecutor") { }
[MessageHandler]
private async ValueTask HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
if (message.spamDecision == SpamDecision.Uncertain)
{
var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);
await context.YieldOutputAsync($"Email marked as uncertain: {message.Reason}. Email content: {email?.EmailContent}");
}
else
{
throw new ArgumentException("This executor should only handle uncertain spam decisions.");
}
}
}
/// <summary>
/// Executor that handles database access with custom events.
/// </summary>
internal sealed partial class DatabaseAccessExecutor : Executor
{
public DatabaseAccessExecutor() : base("DatabaseAccessExecutor") { }
[MessageHandler]
private async ValueTask HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// Simulate database operations
await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);
await Task.Delay(100); // Simulate database access delay
// Emit custom database event for monitoring
await context.AddEventAsync(new DatabaseEvent($"Email {message.EmailId} saved to database."));
}
}
Усиленные агенты ИИ
Создание агентов для анализа, помощи и суммирования:
/// <summary>
/// Create an enhanced email analysis agent.
/// </summary>
/// <returns>A ChatClientAgent configured for comprehensive email analysis</returns>
private static ChatClientAgent GetEmailAnalysisAgent(IChatClient chatClient) =>
new(chatClient, new ChatClientAgentOptions
{
ChatOptions = new()
{
Instructions = "You are a spam detection assistant that identifies spam emails.",
ResponseFormat = ChatResponseFormat.ForJsonSchema<AnalysisResult>()
}
});
/// <summary>
/// Creates an email assistant agent.
/// </summary>
/// <returns>A ChatClientAgent configured for email assistance</returns>
private static ChatClientAgent GetEmailAssistantAgent(IChatClient chatClient) =>
new(chatClient, new ChatClientAgentOptions
{
ChatOptions = new()
{
Instructions = "You are an email assistant that helps users draft responses to emails with professionalism.",
ResponseFormat = ChatResponseFormat.ForJsonSchema<EmailResponse>()
}
});
/// <summary>
/// Creates an agent that summarizes emails.
/// </summary>
/// <returns>A ChatClientAgent configured for email summarization</returns>
private static ChatClientAgent GetEmailSummaryAgent(IChatClient chatClient) =>
new(chatClient, new ChatClientAgentOptions
{
ChatOptions = new()
{
Instructions = "You are an assistant that helps users summarize emails.",
ResponseFormat = ChatResponseFormat.ForJsonSchema<EmailSummary>()
}
});
Создание рабочего процесса с несколькими выборами
Создайте рабочий процесс с сложной маршрутизацией и параллельной обработкой:
public static class Program
{
private static async Task Main()
{
// Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var chatClient = new AIProjectClient(new Uri(endpoint), new DefaultAzureCredential())
.GetProjectOpenAIClient()
.GetProjectResponsesClient()
.AsIChatClient(deploymentName);
// Create agents
AIAgent emailAnalysisAgent = GetEmailAnalysisAgent(chatClient);
AIAgent emailAssistantAgent = GetEmailAssistantAgent(chatClient);
AIAgent emailSummaryAgent = GetEmailSummaryAgent(chatClient);
// Create executors
var emailAnalysisExecutor = new EmailAnalysisExecutor(emailAnalysisAgent);
var emailAssistantExecutor = new EmailAssistantExecutor(emailAssistantAgent);
var emailSummaryExecutor = new EmailSummaryExecutor(emailSummaryAgent);
var sendEmailExecutor = new SendEmailExecutor();
var handleSpamExecutor = new HandleSpamExecutor();
var handleUncertainExecutor = new HandleUncertainExecutor();
var databaseAccessExecutor = new DatabaseAccessExecutor();
// Build the workflow with multi-selection fan-out
WorkflowBuilder builder = new(emailAnalysisExecutor);
builder.AddFanOutEdge(
emailAnalysisExecutor,
targets: [
handleSpamExecutor, // Index 0: Spam handler
emailAssistantExecutor, // Index 1: Email assistant (always for NotSpam)
emailSummaryExecutor, // Index 2: Summarizer (conditionally for long NotSpam)
handleUncertainExecutor, // Index 3: Uncertain handler
],
targetSelector: GetTargetAssigner()
)
// Email assistant branch
.AddEdge(emailAssistantExecutor, sendEmailExecutor)
// Database persistence: conditional routing
.AddEdge<AnalysisResult>(
emailAnalysisExecutor,
databaseAccessExecutor,
condition: analysisResult => analysisResult?.EmailLength <= EmailProcessingConstants.LongEmailThreshold) // Short emails
.AddEdge(emailSummaryExecutor, databaseAccessExecutor) // Long emails with summary
.WithOutputFrom(handleUncertainExecutor, handleSpamExecutor, sendEmailExecutor);
var workflow = builder.Build();
// Read a moderately long email to trigger both assistant and summarizer
string email = Resources.Read("email.txt");
// Execute the workflow with custom event handling
StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, new ChatMessage(ChatRole.User, email));
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is WorkflowOutputEvent outputEvent)
{
Console.WriteLine($"Output: {outputEvent}");
}
if (evt is DatabaseEvent databaseEvent)
{
Console.WriteLine($"Database: {databaseEvent}");
}
}
}
}
Сравнение шаблонов: Множественный выбор и switch-case
Шаблон Switch-Case (предыдущий):
// One input → exactly one output
builder.AddSwitch(spamDetectionExecutor, switchBuilder =>
switchBuilder
.AddCase(GetCondition(SpamDecision.NotSpam), emailAssistantExecutor)
.AddCase(GetCondition(SpamDecision.Spam), handleSpamExecutor)
.WithDefault(handleUncertainExecutor)
)
Шаблон выбора с несколькими параметрами:
// One input → one or more outputs (dynamic fan-out)
builder.AddFanOutEdge(
emailAnalysisExecutor,
targets: [handleSpamExecutor, emailAssistantExecutor, emailSummaryExecutor, handleUncertainExecutor],
targetSelector: GetTargetAssigner() // Returns list of target indices
)
Основные преимущества ребер с множественным выбором
- Параллельная обработка: несколько ветвей могут выполняться одновременно
- Условный фан-аут: количество целей может варьироваться в зависимости от содержимого
- Маршрутизация с учетом содержимого: решения на основе свойств сообщения, а не только типа
- Эффективное использование ресурсов: активируются только необходимые ветви
- Сложная бизнес-логика: поддерживает сложные сценарии маршрутизации
Запуск примера с несколькими выборами
При запуске этого рабочего процесса с длинной электронной почтой:
Output: Email sent: [Professional response generated by AI]
Database: Email abc123 saved to database.
При запуске с коротким электронным письмом сумматор пропускается:
Output: Email sent: [Professional response generated by AI]
Database: Email def456 saved to database.
Варианты использования Real-World
- Системы электронной почты: направление к помощнику по ответам + архив + аналитика (при условии)
- Обработка содержимого: триггер транскрибирования и перевода и анализа (на основе типа контента)
- Обработка заказов: маршрут к выполнению и выставлению счетов и уведомлениям (на основе свойств заказа)
- Конвейеры данных: активация различных потоков аналитики на основе характеристик данных
Полная реализация с множественным выбором
Полный рабочий процесс см. в этом примере в репозитории Agent Framework.
За пределами switch-case: многоопционная маршрутизация
Хотя ребра коммутатора направляют сообщения в ровно одно место назначения, рабочие процессы реального мира часто должны активировать несколько параллельных операций на основе характеристик данных. Секционированные ребра (реализованы как группы многоразового выбора ребер) обеспечивают сложные шаблоны разветвления, в которых одно сообщение может активировать несколько исполнителей на следующих уровнях одновременно.
Расширенный рабочий процесс обработки электронной почты
На основе примера с переключателем, вы создадите расширенную систему обработки электронной почты, показывающую сложную логику маршрутизации.
- Спам электронной почты → единый обработчик нежелательной почты (например, switch-case)
Законные сообщения электронной почты →Помощник по электронной почте всегда срабатывает + Условный триггер сводных данных для длинных сообщений электронной почты- Неопределенные сообщения электронной почты → единый неопределенный обработчик (например, switch-case)
- Сохраняемость базы данных → активирована как для коротких сообщений электронной почты, так и для суммированных длинных сообщений электронной почты
Этот шаблон позволяет параллельным конвейерам обработки адаптироваться к характеристикам содержимого.
Основные понятия, описанные в статье
Расширенные модели данных для выбора нескольких вариантов
Расширьте модели данных для поддержки анализа длины и суммирования электронной почты:
class AnalysisResultAgent(BaseModel):
"""Enhanced structured output from email analysis agent."""
spam_decision: Literal["NotSpam", "Spam", "Uncertain"]
reason: str
class EmailResponse(BaseModel):
"""Response from email assistant."""
response: str
class EmailSummaryModel(BaseModel):
"""Summary generated by email summary agent."""
summary: str
@dataclass
class AnalysisResult:
"""Internal analysis result with email metadata for routing decisions."""
spam_decision: str
reason: str
email_length: int # Used for conditional routing
email_summary: str # Populated by summary agent
email_id: str
@dataclass
class Email:
"""Email content stored in shared state."""
email_id: str
email_content: str
# Custom event data for database operations
class DatabaseEvent:
"""Custom event data for tracking database operations."""
def __init__(self, message: str):
self.message = message
def __repr__(self) -> str:
return f"DatabaseEvent({self.message})"
Функция выбора: сердце множественного выбора
Функция выбора определяет, какие исполнителям следует получать каждое сообщение:
LONG_EMAIL_THRESHOLD = 100
def select_targets(analysis: AnalysisResult, target_ids: list[str]) -> list[str]:
"""Intelligent routing based on spam decision and email characteristics."""
# Target order: [handle_spam, submit_to_email_assistant, summarize_email, handle_uncertain]
handle_spam_id, submit_to_email_assistant_id, summarize_email_id, handle_uncertain_id = target_ids
if analysis.spam_decision == "Spam":
# Route only to spam handler
return [handle_spam_id]
elif analysis.spam_decision == "NotSpam":
# Always route to email assistant
targets = [submit_to_email_assistant_id]
# Conditionally add summarizer for long emails
if analysis.email_length > LONG_EMAIL_THRESHOLD:
targets.append(summarize_email_id)
return targets
else: # Uncertain
# Route only to uncertain handler
return [handle_uncertain_id]
Ключевые функции выбора
- Динамический выбор целевого объекта: возвращает список идентификаторов исполнителя для активации
- Маршрутизация с учетом содержимого: принимает решения на основе свойств сообщения
- Параллельная обработка: несколько целевых объектов могут выполняться одновременно
- Условная логика: сложное ветвление на основе нескольких критериев
Исполнители рабочих процессов с несколькими выборами
Реализуйте исполнителей, обрабатывающих расширенный анализ и маршрутизацию:
EMAIL_STATE_PREFIX = "email:"
CURRENT_EMAIL_ID_KEY = "current_email_id"
@executor(id="store_email")
async def store_email(email_text: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
"""Store email and initiate analysis."""
new_email = Email(email_id=str(uuid4()), email_content=email_text)
ctx.set_state(f"{EMAIL_STATE_PREFIX}{new_email.email_id}", new_email)
ctx.set_state(CURRENT_EMAIL_ID_KEY, new_email.email_id)
await ctx.send_message(
AgentExecutorRequest(messages=[Message(role="user", contents=[new_email.email_content])], should_respond=True)
)
@executor(id="to_analysis_result")
async def to_analysis_result(response: AgentExecutorResponse, ctx: WorkflowContext[AnalysisResult]) -> None:
"""Transform agent response into enriched analysis result."""
parsed = AnalysisResultAgent.model_validate_json(response.agent_run_response.text)
email_id: str = ctx.get_state(CURRENT_EMAIL_ID_KEY)
email: Email = ctx.get_state(f"{EMAIL_STATE_PREFIX}{email_id}")
# Create enriched analysis result with email length for routing decisions
await ctx.send_message(
AnalysisResult(
spam_decision=parsed.spam_decision,
reason=parsed.reason,
email_length=len(email.email_content), # Key for conditional routing
email_summary="",
email_id=email_id,
)
)
@executor(id="submit_to_email_assistant")
async def submit_to_email_assistant(analysis: AnalysisResult, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
"""Handle legitimate emails by forwarding to email assistant."""
if analysis.spam_decision != "NotSpam":
raise RuntimeError("This executor should only handle NotSpam messages.")
email: Email = ctx.get_state(f"{EMAIL_STATE_PREFIX}{analysis.email_id}")
await ctx.send_message(
AgentExecutorRequest(messages=[Message(role="user", contents=[email.email_content])], should_respond=True)
)
@executor(id="finalize_and_send")
async def finalize_and_send(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
"""Final step for email assistant branch."""
parsed = EmailResponse.model_validate_json(response.agent_run_response.text)
await ctx.yield_output(f"Email sent: {parsed.response}")
@executor(id="summarize_email")
async def summarize_email(analysis: AnalysisResult, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
"""Generate summary for long emails (parallel branch)."""
# Only called for long NotSpam emails by selection function
email: Email = ctx.get_state(f"{EMAIL_STATE_PREFIX}{analysis.email_id}")
await ctx.send_message(
AgentExecutorRequest(messages=[Message(role="user", contents=[email.email_content])], should_respond=True)
)
@executor(id="merge_summary")
async def merge_summary(response: AgentExecutorResponse, ctx: WorkflowContext[AnalysisResult]) -> None:
"""Merge summary back into analysis result for database persistence."""
summary = EmailSummaryModel.model_validate_json(response.agent_run_response.text)
email_id: str = ctx.get_state(CURRENT_EMAIL_ID_KEY)
email: Email = ctx.get_state(f"{EMAIL_STATE_PREFIX}{email_id}")
# Create analysis result with summary for database storage
await ctx.send_message(
AnalysisResult(
spam_decision="NotSpam",
reason="",
email_length=len(email.email_content),
email_summary=summary.summary, # Now includes summary
email_id=email_id,
)
)
@executor(id="handle_spam")
async def handle_spam(analysis: AnalysisResult, ctx: WorkflowContext[Never, str]) -> None:
"""Handle spam emails (single target like switch-case)."""
if analysis.spam_decision == "Spam":
await ctx.yield_output(f"Email marked as spam: {analysis.reason}")
else:
raise RuntimeError("This executor should only handle Spam messages.")
@executor(id="handle_uncertain")
async def handle_uncertain(analysis: AnalysisResult, ctx: WorkflowContext[Never, str]) -> None:
"""Handle uncertain emails (single target like switch-case)."""
if analysis.spam_decision == "Uncertain":
email: Email | None = ctx.get_state(f"{EMAIL_STATE_PREFIX}{analysis.email_id}")
await ctx.yield_output(
f"Email marked as uncertain: {analysis.reason}. Email content: {getattr(email, 'email_content', '')}"
)
else:
raise RuntimeError("This executor should only handle Uncertain messages.")
@executor(id="database_access")
async def database_access(analysis: AnalysisResult, ctx: WorkflowContext[Never, str]) -> None:
"""Simulate database persistence with custom events."""
await asyncio.sleep(0.05) # Simulate DB operation
await ctx.add_event(WorkflowEvent("data", data=DatabaseEvent(f"Email {analysis.email_id} saved to database.")))
Усиленные агенты ИИ
Создание агентов для анализа, помощи и суммирования:
async def main() -> None:
chat_client = OpenAIChatCompletionClient(
model=os.environ["AZURE_OPENAI_CHAT_COMPLETION_MODEL"],
azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
api_version=os.getenv("AZURE_OPENAI_API_VERSION"),
credential=AzureCliCredential(),
)
# Enhanced analysis agent
email_analysis_agent = AgentExecutor(
chat_client.as_agent(
instructions=(
"You are a spam detection assistant that identifies spam emails. "
"Always return JSON with fields 'spam_decision' (one of NotSpam, Spam, Uncertain) "
"and 'reason' (string)."
),
response_format=AnalysisResultAgent,
),
id="email_analysis_agent",
)
# Email assistant (same as before)
email_assistant_agent = AgentExecutor(
chat_client.as_agent(
instructions=(
"You are an email assistant that helps users draft responses to emails with professionalism."
),
response_format=EmailResponse,
),
id="email_assistant_agent",
)
# New: Email summary agent for long emails
email_summary_agent = AgentExecutor(
chat_client.as_agent(
instructions="You are an assistant that helps users summarize emails.",
response_format=EmailSummaryModel,
),
id="email_summary_agent",
)
Создание рабочего процесса выбора с несколькими параметрами
Создайте рабочий процесс с сложной маршрутизацией и параллельной обработкой:
workflow = (
WorkflowBuilder(start_executor=store_email)
.add_edge(store_email, email_analysis_agent)
.add_edge(email_analysis_agent, to_analysis_result)
# Multi-selection edge group: intelligent fan-out based on content
.add_multi_selection_edge_group(
to_analysis_result,
[handle_spam, submit_to_email_assistant, summarize_email, handle_uncertain],
selection_func=select_targets,
)
# Email assistant branch (always for NotSpam)
.add_edge(submit_to_email_assistant, email_assistant_agent)
.add_edge(email_assistant_agent, finalize_and_send)
# Summary branch (only for long NotSpam emails)
.add_edge(summarize_email, email_summary_agent)
.add_edge(email_summary_agent, merge_summary)
# Database persistence: conditional routing
.add_edge(to_analysis_result, database_access,
condition=lambda r: r.email_length <= LONG_EMAIL_THRESHOLD) # Short emails
.add_edge(merge_summary, database_access) # Long emails with summary
.build()
)
Выполнение с потоковой передачей событий
Запустите рабочий процесс и просмотрите параллельное выполнение с помощью пользовательских событий:
# Use a moderately long email to trigger both assistant and summarizer
email = """
Hello team, here are the updates for this week:
1. Project Alpha is on track and we should have the first milestone completed by Friday.
2. The client presentation has been scheduled for next Tuesday at 2 PM.
3. Please review the Q4 budget allocation and provide feedback by Wednesday.
Let me know if you have any questions or concerns.
Best regards,
Alex
"""
# Stream events to see parallel execution
async for event in workflow.run_stream(email):
if isinstance(event.data, DatabaseEvent):
print(f"Database: {event}")
elif event.type == "output":
print(f"Output: {event.data}")
Сравнение Множественного выбора и конструкции Switch-Case
Шаблон Switch-Case (предыдущий):
# One input → exactly one output
.add_switch_case_edge_group(
source,
[
Case(condition=lambda x: x.result == "A", target=handler_a),
Case(condition=lambda x: x.result == "B", target=handler_b),
Default(target=handler_c),
],
)
Шаблон выбора с несколькими параметрами:
# One input → one or more outputs (dynamic fan-out)
.add_multi_selection_edge_group(
source,
[handler_a, handler_b, handler_c, handler_d],
selection_func=intelligent_router, # Returns list of target IDs
)
Преимущества выбора нескольких вариантов
- Параллельная обработка: несколько ветвей могут выполняться одновременно
- Условный фан-аут: количество целей может варьироваться в зависимости от содержимого
- Маршрутизация с учетом содержимого: решения на основе свойств сообщения, а не только типа
- Эффективное использование ресурсов: активируются только необходимые ветви
- Сложная бизнес-логика: поддерживает сложные сценарии маршрутизации
приложения для реального мира
- Системы электронной почты: направление к помощнику по ответам + архив + аналитика (при условии)
- Обработка содержимого: триггер транскрибирования и перевода и анализа (на основе типа контента)
- Обработка заказов: маршрут к выполнению и выставлению счетов и уведомлениям (на основе свойств заказа)
- Конвейеры данных: активация различных потоков аналитики на основе характеристик данных
Пример кода с несколькими выборами
Полный рабочий процесс см. в примере multi_selection_edge_group.py в репозитории Agent Framework.