Поделиться через


Оркестрации рабочих процессов Microsoft Agent Framework — последовательные

В последовательной оркестрации агенты организованы в цепочке. Каждый агент обрабатывает задачу в свою очередь, передавая выходные данные следующему агенту в последовательности. Это идеально подходит для рабочих процессов, в которых каждый шаг строится на основе предыдущего, например проверки документов, потоков обработки данных или многоэтапного рассуждения.

Последовательная оркестрация

Это важно

Полная история переписки от предыдущих агентов передается следующему агенту по цепочке. Каждый агент может видеть все предыдущие сообщения, что позволяет производить обработку с учетом контекста.

Цели обучения

  • Создание последовательного конвейера агентов
  • Как связать агентов, где каждый из них строится на основе предыдущих результатов
  • Как комбинировать агентов с пользовательскими исполнителями для специализированных задач
  • Отслеживание потока беседы через конвейер

Определение агентов

В последовательной оркестрации агенты организованы в конвейере, где каждый агент обрабатывает задачу в свою очередь, передав выходные данные следующему агенту в последовательности.

Настройка клиента Azure OpenAI

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
using Microsoft.Agents.AI;

// 1) Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ??
    throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var client = new AzureOpenAIClient(new Uri(endpoint), new DefaultAzureCredential())
    .GetChatClient(deploymentName)
    .AsIChatClient();

Предупреждение

DefaultAzureCredential удобно для разработки, но требует тщательного рассмотрения в рабочей среде. В рабочей среде рекомендуется использовать определенные учетные данные (например, ManagedIdentityCredential), чтобы избежать проблем с задержкой, непреднамеренной проверки данных аутентификации и потенциальных рисков безопасности из-за резервных механизмов.

Создайте специализированные агенты, которые будут работать в последовательности:

// 2) Helper method to create translation agents
static ChatClientAgent GetTranslationAgent(string targetLanguage, IChatClient chatClient) =>
    new(chatClient,
        $"You are a translation assistant who only responds in {targetLanguage}. Respond to any " +
        $"input by outputting the name of the input language and then translating the input to {targetLanguage}.");

// Create translation agents for sequential processing
var translationAgents = (from lang in (string[])["French", "Spanish", "English"]
                         select GetTranslationAgent(lang, client));

Настройка последовательной оркестрации

Создание рабочего процесса с помощью AgentWorkflowBuilder:

// 3) Build sequential workflow
var workflow = AgentWorkflowBuilder.BuildSequential(translationAgents);

Запуск последовательного рабочего процесса

Выполните рабочий процесс и обработайте события:

// 4) Run the workflow
var messages = new List<ChatMessage> { new(ChatRole.User, "Hello, world!") };

StreamingRun run = await InProcessExecution.StreamAsync(workflow, messages);
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

List<ChatMessage> result = new();
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is AgentResponseUpdateEvent e)
    {
        Console.WriteLine($"{e.ExecutorId}: {e.Data}");
    }
    else if (evt is WorkflowOutputEvent outputEvt)
    {
        result = (List<ChatMessage>)outputEvt.Data!;
        break;
    }
}

// Display final result
foreach (var message in result)
{
    Console.WriteLine($"{message.Role}: {message.Content}");
}

Пример выходных данных

French_Translation: User: Hello, world!
French_Translation: Assistant: English detected. Bonjour, le monde !
Spanish_Translation: Assistant: French detected. ¡Hola, mundo!
English_Translation: Assistant: Spanish detected. Hello, world!

Основные понятия

  • Последовательная обработка: каждый агент обрабатывает выходные данные предыдущего агента в порядке
  • AgentWorkflowBuilder.BuildSequential(): создает рабочий процесс конвейера из коллекции агентов
  • ChatClientAgent: представляет собой агента, привязанного к клиенту чата и работающего по конкретным инструкциям
  • StreamingRun: обеспечивает выполнение в режиме реального времени с возможностями потоковой передачи событий
  • Обработка событий: мониторинг хода выполнения агента с помощью AgentResponseUpdateEvent и завершения с помощью WorkflowOutputEvent

В последовательной оркестрации каждый агент обрабатывает задачу в свою очередь, при этом выходные данные будут поступать от одного до следующего. Начните с определения агентов для двухэтапного процесса:

from agent_framework.azure import AzureChatClient
from azure.identity import AzureCliCredential

# 1) Create agents using AzureChatClient
chat_client = AzureChatClient(credential=AzureCliCredential())

writer = chat_client.as_agent(
    instructions=(
        "You are a concise copywriter. Provide a single, punchy marketing sentence based on the prompt."
    ),
    name="writer",
)

reviewer = chat_client.as_agent(
    instructions=(
        "You are a thoughtful reviewer. Give brief feedback on the previous assistant message."
    ),
    name="reviewer",
)

Настройка последовательной оркестрации

Класс SequentialBuilder создает конвейер, в котором агенты обрабатывают задачи по порядку. Каждый агент видит полный журнал бесед и добавляет свой ответ:

from agent_framework.orchestrations import SequentialBuilder

# 2) Build sequential workflow: writer -> reviewer
workflow = SequentialBuilder(participants=[writer, reviewer]).build()

Запуск последовательного рабочего процесса

Выполните рабочий процесс и соберите окончательный разговор, показывающий вклад каждого агента:

from agent_framework import Message, WorkflowEvent

# 3) Run and print final conversation
output_evt: WorkflowEvent | None = None
async for event in workflow.run_stream("Write a tagline for a budget-friendly eBike."):
    if event.type == "output":
        output_evt = event

if output_evt:
    print("===== Final Conversation =====")
    messages: list[Message] | Any = output_evt.data
    for i, msg in enumerate(messages, start=1):
        name = msg.author_name or ("assistant" if msg.role == "assistant" else "user")
        print(f"{'-' * 60}\n{i:02d} [{name}]\n{msg.text}")

Пример выходных данных

===== Final Conversation =====
------------------------------------------------------------
01 [user]
Write a tagline for a budget-friendly eBike.
------------------------------------------------------------
02 [writer]
Ride farther, spend less—your affordable eBike adventure starts here.
------------------------------------------------------------
03 [reviewer]
This tagline clearly communicates affordability and the benefit of extended travel, making it
appealing to budget-conscious consumers. It has a friendly and motivating tone, though it could
be slightly shorter for more punch. Overall, a strong and effective suggestion!

Продвинутый: Сочетание агентов с пользовательскими исполнителями

Последовательная оркестрация позволяет использовать агентов в сочетании с пользовательскими исполнителями для специализированной обработки. Это полезно, если требуется пользовательская логика, которая не требует LLM:

Определение пользовательского исполнителя

from agent_framework import Executor, WorkflowContext, handler
from agent_framework import Message

class Summarizer(Executor):
    """Simple summarizer: consumes full conversation and appends an assistant summary."""

    @handler
    async def summarize(
        self,
        conversation: list[Message],
        ctx: WorkflowContext[list[Message]]
    ) -> None:
        users = sum(1 for m in conversation if m.role == "user")
        assistants = sum(1 for m in conversation if m.role == "assistant")
        summary = Message(
            role="assistant",
            contents=[f"Summary -> users:{users} assistants:{assistants}"]
        )
        await ctx.send_message(list(conversation) + [summary])

Создание смешанного последовательного рабочего процесса

# Create a content agent
content = chat_client.as_agent(
    instructions="Produce a concise paragraph answering the user's request.",
    name="content",
)

# Build sequential workflow: content -> summarizer
summarizer = Summarizer(id="summarizer")
workflow = SequentialBuilder(participants=[content, summarizer]).build()

Пример выходных данных с пользовательским экзекутором

------------------------------------------------------------
01 [user]
Explain the benefits of budget eBikes for commuters.
------------------------------------------------------------
02 [content]
Budget eBikes offer commuters an affordable, eco-friendly alternative to cars and public transport.
Their electric assistance reduces physical strain and allows riders to cover longer distances quickly,
minimizing travel time and fatigue. Budget models are low-cost to maintain and operate, making them accessible
for a wider range of people. Additionally, eBikes help reduce traffic congestion and carbon emissions,
supporting greener urban environments. Overall, budget eBikes provide cost-effective, efficient, and
sustainable transportation for daily commuting needs.
------------------------------------------------------------
03 [assistant]
Summary -> users:1 assistants:1

Основные понятия

  • Общий контекст: каждый участник получает полный журнал бесед, включая все предыдущие сообщения.
  • Порядок исполнения имеет значение: агенты выполняются строго в порядке, указанном в списке participants()
  • Гибкие участники: вы можете смешивать агенты и пользовательских исполнителей в любом порядке
  • Поток беседы: каждый агент или исполнитель дополняет беседу, создавая полный диалог

Дальнейшие шаги