Бөлісу құралы:


Оркестрации рабочих процессов Microsoft Agent Framework — последовательные

В последовательной оркестрации агенты организованы в цепочке. Каждый агент обрабатывает задачу в свою очередь, передавая выходные данные следующему агенту в последовательности. Это идеально подходит для рабочих процессов, в которых каждый шаг строится на основе предыдущего, например проверки документов, потоков обработки данных или многоэтапного рассуждения.

Последовательная оркестрация

Это важно

Полная история переписки от предыдущих агентов передается следующему агенту по цепочке. Каждый агент может видеть все предыдущие сообщения, что позволяет производить обработку с учетом контекста.

Цели обучения

  • Создание последовательного конвейера агентов
  • Как связать агентов, где каждый из них строится на основе предыдущих результатов
  • Добавление одобрения с участием человека для вызовов конфиденциальных инструментов
  • Как комбинировать агентов с пользовательскими исполнителями для специализированных задач
  • Отслеживание потока беседы через конвейер

Определение агентов

В последовательной оркестрации агенты организованы в конвейере, где каждый агент обрабатывает задачу в свою очередь, передав выходные данные следующему агенту в последовательности.

Настройка клиента Azure OpenAI

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.Projects;
using Azure.Identity;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
using Microsoft.Agents.AI;

// 1) Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ??
    throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var client = new AIProjectClient(new Uri(endpoint), new DefaultAzureCredential())
    .GetProjectOpenAIClient()
    .GetProjectResponsesClient()
    .AsIChatClient(deploymentName);

Предупреждение

DefaultAzureCredential удобно для разработки, но требует тщательного рассмотрения в рабочей среде. В рабочей среде рекомендуется использовать определенные учетные данные (например, ManagedIdentityCredential), чтобы избежать проблем с задержкой, непреднамеренной проверки данных аутентификации и потенциальных рисков безопасности из-за резервных механизмов.

Создайте специализированные агенты, которые будут работать в последовательности:

// 2) Helper method to create translation agents
static ChatClientAgent GetTranslationAgent(string targetLanguage, IChatClient chatClient) =>
    new(chatClient,
        $"You are a translation assistant who only responds in {targetLanguage}. Respond to any " +
        $"input by outputting the name of the input language and then translating the input to {targetLanguage}.");

// Create translation agents for sequential processing
var translationAgents = (from lang in (string[])["French", "Spanish", "English"]
                         select GetTranslationAgent(lang, client));

Настройка последовательной оркестрации

Создание рабочего процесса с помощью AgentWorkflowBuilder:

// 3) Build sequential workflow
var workflow = AgentWorkflowBuilder.BuildSequential(translationAgents);

Запуск последовательного рабочего процесса

Выполните рабочий процесс и обработайте события:

// 4) Run the workflow
var messages = new List<ChatMessage> { new(ChatRole.User, "Hello, world!") };

await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, messages);
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

string? lastExecutorId = null;
List<ChatMessage> result = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is AgentResponseUpdateEvent e)
    {
        if (e.ExecutorId != lastExecutorId)
        {
            lastExecutorId = e.ExecutorId;
            Console.WriteLine();
            Console.Write($"{e.ExecutorId}: ");
        }

        Console.Write(e.Update.Text);
    }
    else if (evt is WorkflowOutputEvent outputEvt)
    {
        result = outputEvt.As<List<ChatMessage>>()!;
        break;
    }
}

// Display final result
Console.WriteLine();
foreach (var message in result)
{
    Console.WriteLine($"{message.Role}: {message.Text}");
}

Пример выходных данных

French_Translation: User: Hello, world!
French_Translation: Assistant: English detected. Bonjour, le monde !
Spanish_Translation: Assistant: French detected. ¡Hola, mundo!
English_Translation: Assistant: Spanish detected. Hello, world!

Последовательная оркестрация с человеком в процессе

Последовательные оркестрации поддерживают взаимодействие с человеком в цикле с помощью утверждения средства. Когда агенты используют инструменты, завернутые в ApprovalRequiredAIFunction оболочку, рабочий процесс приостанавливается и излучает объект RequestInfoEvent, содержащий ToolApprovalRequestContent. Внешние системы (например, человеческий оператор) могут проверять вызов инструмента, одобрить или отклонить его, и рабочий процесс возобновляется в соответствии с решением.

Последовательная оркестрация с человеком в контуре

Подсказка

Дополнительные сведения о модели запроса и ответа см. в разделе "Человек в цикле".

Определение агентов с помощью инструментов, требующих одобрения

Создайте агентов, где конфиденциальные инструменты обернуты в ApprovalRequiredAIFunction:

ChatClientAgent deployAgent = new(
    client,
    "You are a DevOps engineer. Check staging status first, then deploy to production.",
    "DeployAgent",
    "Handles deployments",
    [
        AIFunctionFactory.Create(CheckStagingStatus),
        new ApprovalRequiredAIFunction(AIFunctionFactory.Create(DeployToProduction))
    ]);

ChatClientAgent verifyAgent = new(
    client,
    "You are a QA engineer. Verify that the deployment was successful and summarize the results.",
    "VerifyAgent",
    "Verifies deployments");

Сборка и запуск с обработкой утверждений/одобрений

Обычно создайте последовательный рабочий процесс. Процесс согласования осуществляется через поток событий.

var workflow = AgentWorkflowBuilder.BuildSequential([deployAgent, verifyAgent]);

await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is RequestInfoEvent e &&
        e.Request.TryGetDataAs(out ToolApprovalRequestContent? approvalRequest))
    {
        await run.SendResponseAsync(
            e.Request.CreateResponse(approvalRequest.CreateResponse(approved: true)));
    }
}

Замечание

AgentWorkflowBuilder.BuildSequential() поддерживает утверждение инструмента из коробки — дополнительная конфигурация не требуется. Когда агент вызывает инструмент, упакованный в оболочку ApprovalRequiredAIFunction, рабочий процесс автоматически приостанавливается и эмитирует RequestInfoEvent.

Подсказка

Полный исполняемый пример этого процесса утверждения см. в GroupChatToolApproval примере. Тот же RequestInfoEvent шаблон обработки применяется к другим оркестрациям.

Основные понятия

  • Последовательная обработка: каждый агент обрабатывает выходные данные предыдущего агента в порядке
  • AgentWorkflowBuilder.BuildSequential(): создает рабочий процесс конвейера из коллекции агентов
  • ChatClientAgent: представляет собой агента, привязанного к клиенту чата и работающего по конкретным инструкциям
  • InProcessExecution.RunStreamingAsync(): выполняет рабочий процесс и возвращает потоковую передачу StreamingRun событий в режиме реального времени.
  • Обработка событий: мониторинг хода выполнения агента с помощью AgentResponseUpdateEvent и завершения с помощью WorkflowOutputEvent
  • Утверждение инструмента: оберните конфиденциальные инструменты с ApprovalRequiredAIFunction, чтобы требовать утверждение человека перед выполнением
  • RequestInfoEvent: создается, когда инструменту требуется утверждение; содержит ToolApprovalRequestContent сведения о вызове средства

В последовательной оркестрации каждый агент обрабатывает задачу в свою очередь, при этом выходные данные будут поступать от одного до следующего. Начните с определения агентов для двухэтапного процесса:

import os
from agent_framework.foundry import FoundryChatClient
from azure.identity import AzureCliCredential

# 1) Create agents using FoundryChatClient
chat_client = FoundryChatClient(
    project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"],
    model=os.environ["FOUNDRY_MODEL"],
    credential=AzureCliCredential(),
)

writer = chat_client.as_agent(
    instructions=(
        "You are a concise copywriter. Provide a single, punchy marketing sentence based on the prompt."
    ),
    name="writer",
)

reviewer = chat_client.as_agent(
    instructions=(
        "You are a thoughtful reviewer. Give brief feedback on the previous assistant message."
    ),
    name="reviewer",
)

Настройка последовательной оркестрации

Класс SequentialBuilder создает конвейер, в котором агенты обрабатывают задачи по порядку. Каждый агент видит полный журнал бесед и добавляет свой ответ:

from agent_framework.orchestrations import SequentialBuilder

# 2) Build sequential workflow: writer -> reviewer
workflow = SequentialBuilder(participants=[writer, reviewer]).build()

Запуск последовательного рабочего процесса

Выполните рабочий процесс и соберите окончательный разговор, показывающий вклад каждого агента:

from typing import Any, cast
from agent_framework import Message, WorkflowEvent

# 3) Run and print final conversation
outputs: list[list[Message]] = []
async for event in workflow.run("Write a tagline for a budget-friendly eBike.", stream=True):
    if event.type == "output":
        outputs.append(cast(list[Message], event.data))

if outputs:
    print("===== Final Conversation =====")
    messages: list[Message] = outputs[-1]
    for i, msg in enumerate(messages, start=1):
        name = msg.author_name or ("assistant" if msg.role == "assistant" else "user")
        print(f"{'-' * 60}\n{i:02d} [{name}]\n{msg.text}")

Пример выходных данных

===== Final Conversation =====
------------------------------------------------------------
01 [user]
Write a tagline for a budget-friendly eBike.
------------------------------------------------------------
02 [writer]
Ride farther, spend less—your affordable eBike adventure starts here.
------------------------------------------------------------
03 [reviewer]
This tagline clearly communicates affordability and the benefit of extended travel, making it
appealing to budget-conscious consumers. It has a friendly and motivating tone, though it could
be slightly shorter for more punch. Overall, a strong and effective suggestion!

Продвинутый: Сочетание агентов с пользовательскими исполнителями

Последовательная оркестрация позволяет использовать агентов в сочетании с пользовательскими исполнителями для специализированной обработки. Это полезно, если требуется пользовательская логика, которая не требует LLM:

Определение пользовательского исполнителя

Замечание

Когда пользовательский исполнитель следует за агентом в последовательности, его обработчик получает AgentExecutorResponse (так как агенты внутренне обёрнуты AgentExecutor). Используйте agent_response.full_conversation для доступа к полному журналу бесед.

from agent_framework import AgentExecutorResponse, Executor, WorkflowContext, handler
from agent_framework import Message

class Summarizer(Executor):
    """Simple summarizer: consumes full conversation and appends an assistant summary."""

    @handler
    async def summarize(
        self,
        agent_response: AgentExecutorResponse,
        ctx: WorkflowContext[list[Message]]
    ) -> None:
        if not agent_response.full_conversation:
            await ctx.send_message([Message("assistant", ["No conversation to summarize."])])
            return

        users = sum(1 for m in agent_response.full_conversation if m.role == "user")
        assistants = sum(1 for m in agent_response.full_conversation if m.role == "assistant")
        summary = Message("assistant", [f"Summary -> users:{users} assistants:{assistants}"])
        await ctx.send_message(list(agent_response.full_conversation) + [summary])

Создание смешанного последовательного рабочего процесса

# Create a content agent
content = chat_client.as_agent(
    instructions="Produce a concise paragraph answering the user's request.",
    name="content",
)

# Build sequential workflow: content -> summarizer
summarizer = Summarizer(id="summarizer")
workflow = SequentialBuilder(participants=[content, summarizer]).build()

Пример выходных данных с пользовательским экзекутором

------------------------------------------------------------
01 [user]
Explain the benefits of budget eBikes for commuters.
------------------------------------------------------------
02 [content]
Budget eBikes offer commuters an affordable, eco-friendly alternative to cars and public transport.
Their electric assistance reduces physical strain and allows riders to cover longer distances quickly,
minimizing travel time and fatigue. Budget models are low-cost to maintain and operate, making them accessible
for a wider range of people. Additionally, eBikes help reduce traffic congestion and carbon emissions,
supporting greener urban environments. Overall, budget eBikes provide cost-effective, efficient, and
sustainable transportation for daily commuting needs.
------------------------------------------------------------
03 [assistant]
Summary -> users:1 assistants:1

Последовательная оркестрация с участием человека

Последовательные оркестрации поддерживают взаимодействие с участием человека на каждом этапе двумя способами: утверждение для управления вызовами инструментов с повышенной чувствительностью и запрос сведений на паузу после каждого ответа агента для получения отзывов.

Последовательная оркестрация с человеческим участием

Подсказка

Дополнительные сведения о модели запроса и ответа см. в разделе "Человек в цикле".

Утверждение инструмента в последовательных рабочих процессах

Используйте @tool(approval_mode="always_require") для пометки инструментов, которым требуется утверждение человеком перед выполнением. Рабочий процесс приостанавливается и выдает событие, когда агент пытается вызвать инструмент.

@tool(approval_mode="always_require")
def execute_database_query(query: str) -> str:
    return f"Query executed successfully: {query}"


database_agent = Agent(
    client=chat_client,
    name="DatabaseAgent",
    instructions="You are a database assistant.",
    tools=[execute_database_query],
)

workflow = SequentialBuilder(participants=[database_agent]).build()

Обработайте поток событий и обработайте запросы на утверждение:

async def process_event_stream(stream):
    responses = {}
    async for event in stream:
        if event.type == "request_info" and event.data.type == "function_approval_request":
            responses[event.request_id] = event.data.to_function_approval_response(approved=True)
    return responses if responses else None

stream = workflow.run("Check the schema and update all pending orders", stream=True)

pending_responses = await process_event_stream(stream)
while pending_responses is not None:
    stream = workflow.run(stream=True, responses=pending_responses)
    pending_responses = await process_event_stream(stream)

Подсказка

Полный пример исполняемого кода см. в разделе sequential_builder_tool_approval.py. Утверждение инструмента работает с SequentialBuilder без какой-либо дополнительной конфигурации билдера.

Запрос сведений об отзыве агента

Используйте .with_request_info() для приостановки после ответа определенных агентов, позволяя внешнему вводу (например, человеческой проверке) до начала следующего агента.

drafter = Agent(
    client=chat_client,
    name="drafter",
    instructions="You are a document drafter. Create a brief draft on the given topic.",
)

editor = Agent(
    client=chat_client,
    name="editor",
    instructions="You are an editor. Review and improve the draft. Incorporate any human feedback.",
)

finalizer = Agent(
    client=chat_client,
    name="finalizer",
    instructions="You are a finalizer. Create a polished final version.",
)

# Enable request info for the editor agent only
workflow = (
    SequentialBuilder(participants=[drafter, editor, finalizer])
    .with_request_info(agents=["editor"])
    .build()
)

async def process_event_stream(stream):
    responses = {}
    async for event in stream:
        if event.type == "request_info":
            responses[event.request_id] = AgentRequestInfoResponse.approve()
    return responses if responses else None

stream = workflow.run("Write a brief introduction to artificial intelligence.", stream=True)

pending_responses = await process_event_stream(stream)
while pending_responses is not None:
    stream = workflow.run(stream=True, responses=pending_responses)
    pending_responses = await process_event_stream(stream)

Основные понятия

  • Общий контекст: каждый участник получает полный журнал бесед, включая все предыдущие сообщения.
  • Порядок исполнения имеет значение: агенты выполняются строго в порядке, указанном в списке participants
  • Гибкие участники: вы можете смешивать агенты и пользовательских исполнителей в любом порядке
  • Поток беседы: каждый агент или исполнитель дополняет беседу, создавая полный диалог
  • Утверждение инструмента: использование @tool(approval_mode="always_require") для конфиденциальных операций, требующих проверки человека
  • Запрос информации: используйте .with_request_info(agents=[...]) для приостановки после конкретных агентов для внешней обратной связи

Дальнейшие шаги