Оркестрации рабочих процессов Microsoft Agent Framework — последовательные

В последовательной оркестрации агенты организованы в цепочке. Каждый агент обрабатывает задачу в свою очередь, передавая выходные данные следующему агенту в последовательности. Это идеально подходит для рабочих процессов, в которых каждый шаг строится на основе предыдущего, например проверки документов, потоков обработки данных или многоэтапного рассуждения.

Последовательная оркестрация

Это важно

По умолчанию каждый агент в последовательности использует полный диалог предыдущего агента — как входные сообщения, предоставленные предыдущему агенту, так и его ответные сообщения. Агентов можно настроить так, чтобы они отвечали только на сообщения предыдущего агента. Дополнительные сведения см. в разделе "Управление контекстом между агентами ".

Цели обучения

  • Создание последовательного конвейера агентов
  • Как связать агентов, где каждый из них строится на основе предыдущих результатов
  • Добавление одобрения с участием человека для вызовов конфиденциальных инструментов
  • Как комбинировать агентов с пользовательскими исполнителями для специализированных задач
  • Отслеживание потока беседы через конвейер

Определение агентов

В последовательной оркестрации агенты организованы в конвейере, где каждый агент обрабатывает задачу в свою очередь, передав выходные данные следующему агенту в последовательности.

Настройка клиента Azure OpenAI

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.Projects;
using Azure.Identity;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
using Microsoft.Agents.AI;

// 1) Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ??
    throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var client = new AIProjectClient(new Uri(endpoint), new DefaultAzureCredential())
    .GetProjectOpenAIClient()
    .GetProjectResponsesClient()
    .AsIChatClient(deploymentName);

Предупреждение

DefaultAzureCredential удобно для разработки, но требует тщательного рассмотрения в рабочей среде. В рабочей среде рекомендуется использовать определенные учетные данные (например, ManagedIdentityCredential), чтобы избежать проблем с задержкой, непреднамеренной проверки данных аутентификации и потенциальных рисков безопасности из-за резервных механизмов.

Создайте специализированные агенты, которые будут работать в последовательности:

// 2) Helper method to create translation agents
static ChatClientAgent GetTranslationAgent(string targetLanguage, IChatClient chatClient) =>
    new(chatClient,
        $"You are a translation assistant who only responds in {targetLanguage}. Respond to any " +
        $"input by outputting the name of the input language and then translating the input to {targetLanguage}.");

// Create translation agents for sequential processing
var translationAgents = (from lang in (string[])["French", "Spanish", "English"]
                         select GetTranslationAgent(lang, client));

Настройка последовательной оркестрации

Создание рабочего процесса с помощью AgentWorkflowBuilder:

// 3) Build sequential workflow
var workflow = AgentWorkflowBuilder.BuildSequential(translationAgents);

Запуск последовательного рабочего процесса

Выполните рабочий процесс и обработайте события:

// 4) Run the workflow
var messages = new List<ChatMessage> { new(ChatRole.User, "Hello, world!") };

await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, messages);
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

string? lastExecutorId = null;
List<ChatMessage> result = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is AgentResponseUpdateEvent e)
    {
        if (e.ExecutorId != lastExecutorId)
        {
            lastExecutorId = e.ExecutorId;
            Console.WriteLine();
            Console.Write($"{e.ExecutorId}: ");
        }

        Console.Write(e.Update.Text);
    }
    else if (evt is WorkflowOutputEvent outputEvt)
    {
        result = outputEvt.As<List<ChatMessage>>()!;
        break;
    }
}

// Display final result
Console.WriteLine();
foreach (var message in result)
{
    Console.WriteLine($"{message.Role}: {message.Text}");
}

Пример выходных данных

French_Translation: User: Hello, world!
French_Translation: Assistant: English detected. Bonjour, le monde !
Spanish_Translation: Assistant: French detected. ¡Hola, mundo!
English_Translation: Assistant: Spanish detected. Hello, world!

Последовательная оркестрация с человеком в процессе

Последовательные оркестрации поддерживают взаимодействие с человеком в цикле с помощью утверждения средства. Когда агенты используют инструменты, завернутые в ApprovalRequiredAIFunction оболочку, рабочий процесс приостанавливается и излучает объект RequestInfoEvent, содержащий ToolApprovalRequestContent. Внешние системы (например, человеческий оператор) могут проверять вызов инструмента, одобрить или отклонить его, и рабочий процесс возобновляется в соответствии с решением.

Последовательная оркестрация с человеком в контуре

Подсказка

Дополнительные сведения о модели запроса и ответа см. в разделе "Человек в цикле".

Определение агентов с помощью инструментов, требующих одобрения

Создайте агентов, где конфиденциальные инструменты обернуты в ApprovalRequiredAIFunction:

ChatClientAgent deployAgent = new(
    client,
    "You are a DevOps engineer. Check staging status first, then deploy to production.",
    "DeployAgent",
    "Handles deployments",
    [
        AIFunctionFactory.Create(CheckStagingStatus),
        new ApprovalRequiredAIFunction(AIFunctionFactory.Create(DeployToProduction))
    ]);

ChatClientAgent verifyAgent = new(
    client,
    "You are a QA engineer. Verify that the deployment was successful and summarize the results.",
    "VerifyAgent",
    "Verifies deployments");

Сборка и запуск с обработкой утверждений/одобрений

Обычно создайте последовательный рабочий процесс. Процесс согласования осуществляется через поток событий.

var workflow = AgentWorkflowBuilder.BuildSequential([deployAgent, verifyAgent]);

await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is RequestInfoEvent e &&
        e.Request.TryGetDataAs(out ToolApprovalRequestContent? approvalRequest))
    {
        await run.SendResponseAsync(
            e.Request.CreateResponse(approvalRequest.CreateResponse(approved: true)));
    }
}

Замечание

AgentWorkflowBuilder.BuildSequential() поддерживает утверждение инструмента из коробки — дополнительная конфигурация не требуется. Когда агент вызывает инструмент, упакованный в оболочку ApprovalRequiredAIFunction, рабочий процесс автоматически приостанавливается и эмитирует RequestInfoEvent.

Подсказка

Полный исполняемый пример этого процесса утверждения см. в GroupChatToolApproval примере. Тот же RequestInfoEvent шаблон обработки применяется к другим оркестрациям.

Основные понятия

  • Последовательная обработка: каждый агент обрабатывает выходные данные предыдущего агента в порядке
  • AgentWorkflowBuilder.BuildSequential(): создает рабочий процесс конвейера из коллекции агентов
  • ChatClientAgent: представляет собой агента, привязанного к клиенту чата и работающего по конкретным инструкциям
  • InProcessExecution.RunStreamingAsync(): выполняет рабочий процесс и возвращает потоковую передачу StreamingRun событий в режиме реального времени.
  • Обработка событий: мониторинг хода выполнения агента с помощью AgentResponseUpdateEvent и завершения с помощью WorkflowOutputEvent
  • Утверждение инструмента: оберните конфиденциальные инструменты с ApprovalRequiredAIFunction, чтобы требовать утверждение человека перед выполнением
  • RequestInfoEvent: создается, когда инструменту требуется утверждение; содержит ToolApprovalRequestContent сведения о вызове средства

В последовательной оркестрации каждый агент обрабатывает задачу в свою очередь, при этом выходные данные будут поступать от одного до следующего. Начните с определения агентов для двухэтапного процесса:

import os
from agent_framework.foundry import FoundryChatClient
from azure.identity import AzureCliCredential

# 1) Create agents using FoundryChatClient
chat_client = FoundryChatClient(
    project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"],
    model=os.environ["FOUNDRY_MODEL"],
    credential=AzureCliCredential(),
)

writer = chat_client.as_agent(
    instructions=(
        "You are a concise copywriter. Provide a single, punchy marketing sentence based on the prompt."
    ),
    name="writer",
)

reviewer = chat_client.as_agent(
    instructions=(
        "You are a thoughtful reviewer. Give brief feedback on the previous assistant message."
    ),
    name="reviewer",
)

Настройка последовательной оркестрации

Класс SequentialBuilder создает конвейер, в котором агенты обрабатывают задачи по порядку. Каждый агент видит полный журнал бесед и добавляет свой ответ:

from agent_framework.orchestrations import SequentialBuilder

# 2) Build sequential workflow: writer -> reviewer
workflow = SequentialBuilder(participants=[writer, reviewer]).build()

Запуск последовательного рабочего процесса

Выполните рабочий процесс и соберите окончательные выходные данные. Выходные данные терминала — это AgentResponse содержащие ответные сообщения последнего агента:

from agent_framework import AgentResponse

# 3) Run and print the last agent's response
events = await workflow.run("Write a tagline for a budget-friendly eBike.")
outputs = events.get_outputs()

if outputs:
    print("===== Final Response =====")
    final: AgentResponse = outputs[0]
    for msg in final.messages:
        name = msg.author_name or "assistant"
        print(f"[{name}]\n{msg.text}")

Пример выходных данных

===== Final Response =====
[reviewer]
This tagline clearly communicates affordability and the benefit of extended travel, making it
appealing to budget-conscious consumers. It has a friendly and motivating tone, though it could
be slightly shorter for more punch. Overall, a strong and effective suggestion!

Продвинутый: Сочетание агентов с пользовательскими исполнителями

Последовательная оркестрация позволяет использовать агентов в сочетании с пользовательскими исполнителями для специализированной обработки. Это полезно, если требуется пользовательская логика, которая не требует LLM:

Определение пользовательского исполнителя

Замечание

Когда пользовательский исполнитель следует за агентом в последовательности, его обработчик получает AgentExecutorResponse (так как агенты внутренне обёрнуты AgentExecutor). Используйте agent_response.full_conversation для доступа к полному журналу бесед. Пользовательский исполнитель, используемый в качестве последнего участника (терминатора), должен вызывать ctx.yield_output(AgentResponse(...)) , чтобы его выходные данные становятся выходными данными терминала рабочего процесса.

from agent_framework import AgentExecutorResponse, AgentResponse, Executor, WorkflowContext, handler
from agent_framework import Message
from typing_extensions import Never

class Summarizer(Executor):
    """Terminator custom executor: consumes full conversation and yields a summary as the workflow's final answer."""

    @handler
    async def summarize(
        self,
        agent_response: AgentExecutorResponse,
        ctx: WorkflowContext[Never, AgentResponse]
    ) -> None:
        if not agent_response.full_conversation:
            await ctx.yield_output(AgentResponse(messages=[Message("assistant", ["No conversation to summarize."])]))
            return

        users = sum(1 for m in agent_response.full_conversation if m.role == "user")
        assistants = sum(1 for m in agent_response.full_conversation if m.role == "assistant")
        summary = Message("assistant", [f"Summary -> users:{users} assistants:{assistants}"])
        await ctx.yield_output(AgentResponse(messages=[summary]))

Создание смешанного последовательного рабочего процесса

# Create a content agent
content = chat_client.as_agent(
    instructions="Produce a concise paragraph answering the user's request.",
    name="content",
)

# Build sequential workflow: content -> summarizer
summarizer = Summarizer(id="summarizer")
workflow = SequentialBuilder(participants=[content, summarizer]).build()

Пример выходных данных с пользовательским экзекутором

===== Final Summary =====
Summary -> users:1 assistants:1

Управление контекстом между агентами

По умолчанию каждый агент в рабочем процессе использует полный SequentialBuilder диалог предыдущего агента (входные и ответные сообщения). Параметр chain_only_agent_responses=True настраивает всех агентов в цепочке на обработку только ответных сообщений от предыдущего агента.

workflow = SequentialBuilder(
    participants=[writer, translator, reviewer],
    chain_only_agent_responses=True,
).build()

Это полезно для конвейеров перевода, прогрессивного уточнения и других сценариев, в которых каждый агент должен сосредоточиться исключительно на преобразовании выходных данных предыдущего агента, не влияя на предыдущие повороты диалога.

Полный пример можно найти в репозитории Agent Framework в sequential_chain_only_agent_responses.py.

Подсказка

Более точное управление потоком контекста, включая пользовательские функции фильтра, см. в разделе "Режимы контекста " в справочнике исполнителя агента.

Промежуточные выходные данные

По умолчанию SequentialBuilder определяет последнего участника в качестве источника выходных данных терминала (final_output_from). Только результат этого участника появляется как событие "output".

Чтобы также отображались результаты более ранних участников, передайте intermediate_output_from вместе с участниками, которых нужно обозначить как промежуточные источники. Это неявно исключает этих участников из финального набора по умолчанию — они генерируют события "intermediate", а не события "output":

workflow = SequentialBuilder(
    participants=[writer, reviewer, editor],
    intermediate_output_from=[writer, reviewer],
).build()

Вы можете обрабатывать как события "intermediate", так и "output" в реальном времени в потоковом режиме:

from agent_framework import AgentResponseUpdate

# Track the last author to format streaming output.
last_author: str | None = None

async for event in workflow.run("Write a tagline for a budget-friendly eBike.", stream=True):
    if event.type in ("output", "intermediate") and isinstance(event.data, AgentResponseUpdate):
        update = event.data
        author = update.author_name
        if author != last_author:
            if last_author is not None:
                print()  # Newline between different authors
            label = "FINAL" if event.type == "output" else "intermediate"
            print(f"[{label}] {author}: {update.text}", end="", flush=True)
            last_author = author
        else:
            print(update.text, end="", flush=True)

Последовательная оркестрация с человеком в процессе

Последовательные оркестрации поддерживают взаимодействие с участием человека на каждом этапе двумя способами: утверждение для управления вызовами инструментов с повышенной чувствительностью и запрос сведений на паузу после каждого ответа агента для получения отзывов.

Последовательная оркестрация с человеком в контуре

Подсказка

Дополнительные сведения о модели запроса и ответа см. в разделе "Человек в цикле".

Утверждение инструмента в последовательных рабочих процессах

Используйте @tool(approval_mode="always_require") для пометки инструментов, которым требуется утверждение человеком перед выполнением. Рабочий процесс приостанавливается и выдает событие, когда агент пытается вызвать инструмент.

@tool(approval_mode="always_require")
def execute_database_query(query: str) -> str:
    return f"Query executed successfully: {query}"


database_agent = Agent(
    client=chat_client,
    name="DatabaseAgent",
    instructions="You are a database assistant.",
    tools=[execute_database_query],
)

workflow = SequentialBuilder(participants=[database_agent]).build()

Обработайте поток событий и обработайте запросы на утверждение:

async def process_event_stream(stream):
    responses = {}
    async for event in stream:
        if event.type == "request_info" and event.data.type == "function_approval_request":
            responses[event.request_id] = event.data.to_function_approval_response(approved=True)
    return responses if responses else None

stream = workflow.run("Check the schema and update all pending orders", stream=True)

pending_responses = await process_event_stream(stream)
while pending_responses is not None:
    stream = workflow.run(stream=True, responses=pending_responses)
    pending_responses = await process_event_stream(stream)

Подсказка

Полный пример исполняемого кода см. в разделе sequential_builder_tool_approval.py. Утверждение инструмента работает с SequentialBuilder без какой-либо дополнительной конфигурации билдера.

Запрос сведений об отзыве агента

Используйте .with_request_info() для приостановки после ответа определенных агентов, позволяя внешнему вводу (например, человеческой проверке) до начала следующего агента.

drafter = Agent(
    client=chat_client,
    name="drafter",
    instructions="You are a document drafter. Create a brief draft on the given topic.",
)

editor = Agent(
    client=chat_client,
    name="editor",
    instructions="You are an editor. Review and improve the draft. Incorporate any human feedback.",
)

finalizer = Agent(
    client=chat_client,
    name="finalizer",
    instructions="You are a finalizer. Create a polished final version.",
)

# Enable request info for the editor agent only
workflow = (
    SequentialBuilder(participants=[drafter, editor, finalizer])
    .with_request_info(agents=["editor"])
    .build()
)

async def process_event_stream(stream):
    responses = {}
    async for event in stream:
        if event.type == "request_info":
            responses[event.request_id] = AgentRequestInfoResponse.approve()
    return responses if responses else None

stream = workflow.run("Write a brief introduction to artificial intelligence.", stream=True)

pending_responses = await process_event_stream(stream)
while pending_responses is not None:
    stream = workflow.run(stream=True, responses=pending_responses)
    pending_responses = await process_event_stream(stream)

Основные понятия

  • Общий контекст. По умолчанию каждый агент использует полный диалог предыдущего агента, включая входные и ответные сообщения.
  • Элемент управления контекстом: используется chain_only_agent_responses=True для настройки агентов для использования только сообщений ответа предыдущего агента
  • Выходные данные AgentResponse: выходные данные терминала рабочего процесса — это AgentResponse ответ последнего агента (не полный диалог).
  • Порядок исполнения имеет значение: агенты выполняются строго в порядке, указанном в списке participants
  • Гибкие участники: вы можете смешивать агенты и пользовательских исполнителей в любом порядке
  • Пользовательский контракт терминатора: настраиваемый исполнитель, используемый в качестве последнего участника, должен вызывать ctx.yield_output(AgentResponse(...)) для создания выходных данных на терминал.
  • Промежуточные результаты: установите intermediate_outputs=True, чтобы делать выходные данные каждого участника видимыми как событие рабочего процесса output, а не только последнего участника
  • Утверждение инструмента: использование @tool(approval_mode="always_require") для конфиденциальных операций, требующих проверки человека
  • Запрос информации: используйте .with_request_info(agents=[...]) для приостановки после конкретных агентов для внешней обратной связи

Дальнейшие шаги