Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
В последовательной оркестрации агенты организованы в цепочке. Каждый агент обрабатывает задачу в свою очередь, передавая выходные данные следующему агенту в последовательности. Это идеально подходит для рабочих процессов, в которых каждый шаг строится на основе предыдущего, например проверки документов, потоков обработки данных или многоэтапного рассуждения.
Это важно
По умолчанию каждый агент в последовательности использует полный диалог предыдущего агента — как входные сообщения, предоставленные предыдущему агенту, так и его ответные сообщения. Агентов можно настроить так, чтобы они отвечали только на сообщения предыдущего агента. Дополнительные сведения см. в разделе "Управление контекстом между агентами ".
Цели обучения
- Создание последовательного конвейера агентов
- Как связать агентов, где каждый из них строится на основе предыдущих результатов
- Добавление одобрения с участием человека для вызовов конфиденциальных инструментов
- Как комбинировать агентов с пользовательскими исполнителями для специализированных задач
- Отслеживание потока беседы через конвейер
Определение агентов
В последовательной оркестрации агенты организованы в конвейере, где каждый агент обрабатывает задачу в свою очередь, передав выходные данные следующему агенту в последовательности.
Настройка клиента Azure OpenAI
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.Projects;
using Azure.Identity;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
using Microsoft.Agents.AI;
// 1) Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ??
throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var client = new AIProjectClient(new Uri(endpoint), new DefaultAzureCredential())
.GetProjectOpenAIClient()
.GetProjectResponsesClient()
.AsIChatClient(deploymentName);
Предупреждение
DefaultAzureCredential удобно для разработки, но требует тщательного рассмотрения в рабочей среде. В рабочей среде рекомендуется использовать определенные учетные данные (например, ManagedIdentityCredential), чтобы избежать проблем с задержкой, непреднамеренной проверки данных аутентификации и потенциальных рисков безопасности из-за резервных механизмов.
Создайте специализированные агенты, которые будут работать в последовательности:
// 2) Helper method to create translation agents
static ChatClientAgent GetTranslationAgent(string targetLanguage, IChatClient chatClient) =>
new(chatClient,
$"You are a translation assistant who only responds in {targetLanguage}. Respond to any " +
$"input by outputting the name of the input language and then translating the input to {targetLanguage}.");
// Create translation agents for sequential processing
var translationAgents = (from lang in (string[])["French", "Spanish", "English"]
select GetTranslationAgent(lang, client));
Настройка последовательной оркестрации
Создание рабочего процесса с помощью AgentWorkflowBuilder:
// 3) Build sequential workflow
var workflow = AgentWorkflowBuilder.BuildSequential(translationAgents);
Запуск последовательного рабочего процесса
Выполните рабочий процесс и обработайте события:
// 4) Run the workflow
var messages = new List<ChatMessage> { new(ChatRole.User, "Hello, world!") };
await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, messages);
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
string? lastExecutorId = null;
List<ChatMessage> result = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is AgentResponseUpdateEvent e)
{
if (e.ExecutorId != lastExecutorId)
{
lastExecutorId = e.ExecutorId;
Console.WriteLine();
Console.Write($"{e.ExecutorId}: ");
}
Console.Write(e.Update.Text);
}
else if (evt is WorkflowOutputEvent outputEvt)
{
result = outputEvt.As<List<ChatMessage>>()!;
break;
}
}
// Display final result
Console.WriteLine();
foreach (var message in result)
{
Console.WriteLine($"{message.Role}: {message.Text}");
}
Пример выходных данных
French_Translation: User: Hello, world!
French_Translation: Assistant: English detected. Bonjour, le monde !
Spanish_Translation: Assistant: French detected. ¡Hola, mundo!
English_Translation: Assistant: Spanish detected. Hello, world!
Последовательная оркестрация с человеком в процессе
Последовательные оркестрации поддерживают взаимодействие с человеком в цикле с помощью утверждения средства. Когда агенты используют инструменты, завернутые в ApprovalRequiredAIFunction оболочку, рабочий процесс приостанавливается и излучает объект RequestInfoEvent, содержащий ToolApprovalRequestContent. Внешние системы (например, человеческий оператор) могут проверять вызов инструмента, одобрить или отклонить его, и рабочий процесс возобновляется в соответствии с решением.
Подсказка
Дополнительные сведения о модели запроса и ответа см. в разделе "Человек в цикле".
Определение агентов с помощью инструментов, требующих одобрения
Создайте агентов, где конфиденциальные инструменты обернуты в ApprovalRequiredAIFunction:
ChatClientAgent deployAgent = new(
client,
"You are a DevOps engineer. Check staging status first, then deploy to production.",
"DeployAgent",
"Handles deployments",
[
AIFunctionFactory.Create(CheckStagingStatus),
new ApprovalRequiredAIFunction(AIFunctionFactory.Create(DeployToProduction))
]);
ChatClientAgent verifyAgent = new(
client,
"You are a QA engineer. Verify that the deployment was successful and summarize the results.",
"VerifyAgent",
"Verifies deployments");
Сборка и запуск с обработкой утверждений/одобрений
Обычно создайте последовательный рабочий процесс. Процесс согласования осуществляется через поток событий.
var workflow = AgentWorkflowBuilder.BuildSequential([deployAgent, verifyAgent]);
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is RequestInfoEvent e &&
e.Request.TryGetDataAs(out ToolApprovalRequestContent? approvalRequest))
{
await run.SendResponseAsync(
e.Request.CreateResponse(approvalRequest.CreateResponse(approved: true)));
}
}
Замечание
AgentWorkflowBuilder.BuildSequential() поддерживает утверждение инструмента из коробки — дополнительная конфигурация не требуется. Когда агент вызывает инструмент, упакованный в оболочку ApprovalRequiredAIFunction, рабочий процесс автоматически приостанавливается и эмитирует RequestInfoEvent.
Подсказка
Полный исполняемый пример этого процесса утверждения см. в GroupChatToolApproval примере. Тот же RequestInfoEvent шаблон обработки применяется к другим оркестрациям.
Основные понятия
- Последовательная обработка: каждый агент обрабатывает выходные данные предыдущего агента в порядке
- AgentWorkflowBuilder.BuildSequential(): создает рабочий процесс конвейера из коллекции агентов
- ChatClientAgent: представляет собой агента, привязанного к клиенту чата и работающего по конкретным инструкциям
-
InProcessExecution.RunStreamingAsync(): выполняет рабочий процесс и возвращает потоковую передачу
StreamingRunсобытий в режиме реального времени. -
Обработка событий: мониторинг хода выполнения агента с помощью
AgentResponseUpdateEventи завершения с помощьюWorkflowOutputEvent -
Утверждение инструмента: оберните конфиденциальные инструменты с
ApprovalRequiredAIFunction, чтобы требовать утверждение человека перед выполнением -
RequestInfoEvent: создается, когда инструменту требуется утверждение; содержит
ToolApprovalRequestContentсведения о вызове средства
В последовательной оркестрации каждый агент обрабатывает задачу в свою очередь, при этом выходные данные будут поступать от одного до следующего. Начните с определения агентов для двухэтапного процесса:
import os
from agent_framework.foundry import FoundryChatClient
from azure.identity import AzureCliCredential
# 1) Create agents using FoundryChatClient
chat_client = FoundryChatClient(
project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"],
model=os.environ["FOUNDRY_MODEL"],
credential=AzureCliCredential(),
)
writer = chat_client.as_agent(
instructions=(
"You are a concise copywriter. Provide a single, punchy marketing sentence based on the prompt."
),
name="writer",
)
reviewer = chat_client.as_agent(
instructions=(
"You are a thoughtful reviewer. Give brief feedback on the previous assistant message."
),
name="reviewer",
)
Настройка последовательной оркестрации
Класс SequentialBuilder создает конвейер, в котором агенты обрабатывают задачи по порядку. Каждый агент видит полный журнал бесед и добавляет свой ответ:
from agent_framework.orchestrations import SequentialBuilder
# 2) Build sequential workflow: writer -> reviewer
workflow = SequentialBuilder(participants=[writer, reviewer]).build()
Запуск последовательного рабочего процесса
Выполните рабочий процесс и соберите окончательные выходные данные. Выходные данные терминала — это AgentResponse содержащие ответные сообщения последнего агента:
from agent_framework import AgentResponse
# 3) Run and print the last agent's response
events = await workflow.run("Write a tagline for a budget-friendly eBike.")
outputs = events.get_outputs()
if outputs:
print("===== Final Response =====")
final: AgentResponse = outputs[0]
for msg in final.messages:
name = msg.author_name or "assistant"
print(f"[{name}]\n{msg.text}")
Пример выходных данных
===== Final Response =====
[reviewer]
This tagline clearly communicates affordability and the benefit of extended travel, making it
appealing to budget-conscious consumers. It has a friendly and motivating tone, though it could
be slightly shorter for more punch. Overall, a strong and effective suggestion!
Продвинутый: Сочетание агентов с пользовательскими исполнителями
Последовательная оркестрация позволяет использовать агентов в сочетании с пользовательскими исполнителями для специализированной обработки. Это полезно, если требуется пользовательская логика, которая не требует LLM:
Определение пользовательского исполнителя
Замечание
Когда пользовательский исполнитель следует за агентом в последовательности, его обработчик получает AgentExecutorResponse (так как агенты внутренне обёрнуты AgentExecutor). Используйте agent_response.full_conversation для доступа к полному журналу бесед. Пользовательский исполнитель, используемый в качестве последнего участника (терминатора), должен вызывать ctx.yield_output(AgentResponse(...)) , чтобы его выходные данные становятся выходными данными терминала рабочего процесса.
from agent_framework import AgentExecutorResponse, AgentResponse, Executor, WorkflowContext, handler
from agent_framework import Message
from typing_extensions import Never
class Summarizer(Executor):
"""Terminator custom executor: consumes full conversation and yields a summary as the workflow's final answer."""
@handler
async def summarize(
self,
agent_response: AgentExecutorResponse,
ctx: WorkflowContext[Never, AgentResponse]
) -> None:
if not agent_response.full_conversation:
await ctx.yield_output(AgentResponse(messages=[Message("assistant", ["No conversation to summarize."])]))
return
users = sum(1 for m in agent_response.full_conversation if m.role == "user")
assistants = sum(1 for m in agent_response.full_conversation if m.role == "assistant")
summary = Message("assistant", [f"Summary -> users:{users} assistants:{assistants}"])
await ctx.yield_output(AgentResponse(messages=[summary]))
Создание смешанного последовательного рабочего процесса
# Create a content agent
content = chat_client.as_agent(
instructions="Produce a concise paragraph answering the user's request.",
name="content",
)
# Build sequential workflow: content -> summarizer
summarizer = Summarizer(id="summarizer")
workflow = SequentialBuilder(participants=[content, summarizer]).build()
Пример выходных данных с пользовательским экзекутором
===== Final Summary =====
Summary -> users:1 assistants:1
Управление контекстом между агентами
По умолчанию каждый агент в рабочем процессе использует полный SequentialBuilder диалог предыдущего агента (входные и ответные сообщения). Параметр chain_only_agent_responses=True настраивает всех агентов в цепочке на обработку только ответных сообщений от предыдущего агента.
workflow = SequentialBuilder(
participants=[writer, translator, reviewer],
chain_only_agent_responses=True,
).build()
Это полезно для конвейеров перевода, прогрессивного уточнения и других сценариев, в которых каждый агент должен сосредоточиться исключительно на преобразовании выходных данных предыдущего агента, не влияя на предыдущие повороты диалога.
Полный пример можно найти в репозитории Agent Framework в sequential_chain_only_agent_responses.py.
Подсказка
Более точное управление потоком контекста, включая пользовательские функции фильтра, см. в разделе "Режимы контекста " в справочнике исполнителя агента.
Промежуточные выходные данные
По умолчанию SequentialBuilder определяет последнего участника в качестве источника выходных данных терминала (final_output_from). Только результат этого участника появляется как событие "output".
Чтобы также отображались результаты более ранних участников, передайте intermediate_output_from вместе с участниками, которых нужно обозначить как промежуточные источники. Это неявно исключает этих участников из финального набора по умолчанию — они генерируют события "intermediate", а не события "output":
workflow = SequentialBuilder(
participants=[writer, reviewer, editor],
intermediate_output_from=[writer, reviewer],
).build()
Вы можете обрабатывать как события "intermediate", так и "output" в реальном времени в потоковом режиме:
from agent_framework import AgentResponseUpdate
# Track the last author to format streaming output.
last_author: str | None = None
async for event in workflow.run("Write a tagline for a budget-friendly eBike.", stream=True):
if event.type in ("output", "intermediate") and isinstance(event.data, AgentResponseUpdate):
update = event.data
author = update.author_name
if author != last_author:
if last_author is not None:
print() # Newline between different authors
label = "FINAL" if event.type == "output" else "intermediate"
print(f"[{label}] {author}: {update.text}", end="", flush=True)
last_author = author
else:
print(update.text, end="", flush=True)
Последовательная оркестрация с человеком в процессе
Последовательные оркестрации поддерживают взаимодействие с участием человека на каждом этапе двумя способами: утверждение для управления вызовами инструментов с повышенной чувствительностью и запрос сведений на паузу после каждого ответа агента для получения отзывов.
Подсказка
Дополнительные сведения о модели запроса и ответа см. в разделе "Человек в цикле".
Утверждение инструмента в последовательных рабочих процессах
Используйте @tool(approval_mode="always_require") для пометки инструментов, которым требуется утверждение человеком перед выполнением. Рабочий процесс приостанавливается и выдает событие, когда агент пытается вызвать инструмент.
@tool(approval_mode="always_require")
def execute_database_query(query: str) -> str:
return f"Query executed successfully: {query}"
database_agent = Agent(
client=chat_client,
name="DatabaseAgent",
instructions="You are a database assistant.",
tools=[execute_database_query],
)
workflow = SequentialBuilder(participants=[database_agent]).build()
Обработайте поток событий и обработайте запросы на утверждение:
async def process_event_stream(stream):
responses = {}
async for event in stream:
if event.type == "request_info" and event.data.type == "function_approval_request":
responses[event.request_id] = event.data.to_function_approval_response(approved=True)
return responses if responses else None
stream = workflow.run("Check the schema and update all pending orders", stream=True)
pending_responses = await process_event_stream(stream)
while pending_responses is not None:
stream = workflow.run(stream=True, responses=pending_responses)
pending_responses = await process_event_stream(stream)
Подсказка
Полный пример исполняемого кода см. в разделе sequential_builder_tool_approval.py. Утверждение инструмента работает с SequentialBuilder без какой-либо дополнительной конфигурации билдера.
Запрос сведений об отзыве агента
Используйте .with_request_info() для приостановки после ответа определенных агентов, позволяя внешнему вводу (например, человеческой проверке) до начала следующего агента.
drafter = Agent(
client=chat_client,
name="drafter",
instructions="You are a document drafter. Create a brief draft on the given topic.",
)
editor = Agent(
client=chat_client,
name="editor",
instructions="You are an editor. Review and improve the draft. Incorporate any human feedback.",
)
finalizer = Agent(
client=chat_client,
name="finalizer",
instructions="You are a finalizer. Create a polished final version.",
)
# Enable request info for the editor agent only
workflow = (
SequentialBuilder(participants=[drafter, editor, finalizer])
.with_request_info(agents=["editor"])
.build()
)
async def process_event_stream(stream):
responses = {}
async for event in stream:
if event.type == "request_info":
responses[event.request_id] = AgentRequestInfoResponse.approve()
return responses if responses else None
stream = workflow.run("Write a brief introduction to artificial intelligence.", stream=True)
pending_responses = await process_event_stream(stream)
while pending_responses is not None:
stream = workflow.run(stream=True, responses=pending_responses)
pending_responses = await process_event_stream(stream)
Подсказка
См. полные примеры: последовательное утверждение инструмента и последовательная информация о запросе.
Основные понятия
- Общий контекст. По умолчанию каждый агент использует полный диалог предыдущего агента, включая входные и ответные сообщения.
-
Элемент управления контекстом: используется
chain_only_agent_responses=Trueдля настройки агентов для использования только сообщений ответа предыдущего агента -
Выходные данные AgentResponse: выходные данные терминала рабочего процесса — это
AgentResponseответ последнего агента (не полный диалог). -
Порядок исполнения имеет значение: агенты выполняются строго в порядке, указанном в списке
participants - Гибкие участники: вы можете смешивать агенты и пользовательских исполнителей в любом порядке
-
Пользовательский контракт терминатора: настраиваемый исполнитель, используемый в качестве последнего участника, должен вызывать
ctx.yield_output(AgentResponse(...))для создания выходных данных на терминал. -
Промежуточные результаты: установите
intermediate_outputs=True, чтобы делать выходные данные каждого участника видимыми как событие рабочего процессаoutput, а не только последнего участника -
Утверждение инструмента: использование
@tool(approval_mode="always_require")для конфиденциальных операций, требующих проверки человека -
Запрос информации: используйте
.with_request_info(agents=[...])для приостановки после конкретных агентов для внешней обратной связи