Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Na orquestração sequencial, os agentes são dispostos em formato de pipeline. Cada agente processa a tarefa por sua vez, passando sua saída para o próximo agente na sequência. Isso é ideal para fluxos de trabalho em que cada etapa se baseia na anterior, como revisão de documentos, pipelines de processamento de dados ou raciocínio de vários estágios.
Importante
Por padrão, cada agente na sequência consome a conversa completa do agente anterior , tanto as mensagens de entrada fornecidas ao agente anterior quanto suas mensagens de resposta. Você pode configurar agentes para consumir somente as mensagens de resposta do agente anterior. Consulte Controlando o contexto entre agentes para obter detalhes.
O que você vai aprender
- Como criar um pipeline sequencial de agentes
- Como encadear agentes onde cada um se baseia na saída anterior
- Como adicionar aprovação humana no loop para chamadas de ferramentas confidenciais
- Como misturar agentes com executores personalizados para tarefas especializadas
- Como acompanhar o fluxo de conversa por meio do pipeline
Definir seus agentes
Na orquestração sequencial, os agentes são organizados em um pipeline em que cada agente processa a tarefa por sua vez, passando a saída para o próximo agente na sequência.
Configurar o cliente do Azure OpenAI
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.Projects;
using Azure.Identity;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
using Microsoft.Agents.AI;
// 1) Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ??
throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var client = new AIProjectClient(new Uri(endpoint), new DefaultAzureCredential())
.GetProjectOpenAIClient()
.GetProjectResponsesClient()
.AsIChatClient(deploymentName);
Aviso
DefaultAzureCredential é conveniente para o desenvolvimento, mas requer uma consideração cuidadosa na produção. Em produção, considere o uso de uma credencial específica (por exemplo, ManagedIdentityCredential) para evitar problemas de latência, investigação de credenciais não intencionais e possíveis riscos de segurança de mecanismos de fallback.
Crie agentes especializados que funcionarão em sequência:
// 2) Helper method to create translation agents
static ChatClientAgent GetTranslationAgent(string targetLanguage, IChatClient chatClient) =>
new(chatClient,
$"You are a translation assistant who only responds in {targetLanguage}. Respond to any " +
$"input by outputting the name of the input language and then translating the input to {targetLanguage}.");
// Create translation agents for sequential processing
var translationAgents = (from lang in (string[])["French", "Spanish", "English"]
select GetTranslationAgent(lang, client));
Configurar a orquestração sequencial
Crie o fluxo de trabalho usando AgentWorkflowBuilder:
// 3) Build sequential workflow
var workflow = AgentWorkflowBuilder.BuildSequential(translationAgents);
Executar o fluxo de trabalho sequencial
Execute o fluxo de trabalho e processe os eventos:
// 4) Run the workflow
var messages = new List<ChatMessage> { new(ChatRole.User, "Hello, world!") };
await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, messages);
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
string? lastExecutorId = null;
List<ChatMessage> result = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is AgentResponseUpdateEvent e)
{
if (e.ExecutorId != lastExecutorId)
{
lastExecutorId = e.ExecutorId;
Console.WriteLine();
Console.Write($"{e.ExecutorId}: ");
}
Console.Write(e.Update.Text);
}
else if (evt is WorkflowOutputEvent outputEvt)
{
result = outputEvt.As<List<ChatMessage>>()!;
break;
}
}
// Display final result
Console.WriteLine();
foreach (var message in result)
{
Console.WriteLine($"{message.Role}: {message.Text}");
}
Saída de exemplo
French_Translation: User: Hello, world!
French_Translation: Assistant: English detected. Bonjour, le monde !
Spanish_Translation: Assistant: French detected. ¡Hola, mundo!
English_Translation: Assistant: Spanish detected. Hello, world!
Orquestração sequencial com Human-in-the-Loop
Orquestrações sequenciais dão suporte a interações humanas no loop por meio da aprovação da ferramenta. Quando os agentes usam ferramentas encapsuladas com ApprovalRequiredAIFunction, o fluxo de trabalho pausa e emite um RequestInfoEvent contendo um ToolApprovalRequestContent. Sistemas externos (como um operador humano) podem inspecionar a chamada da ferramenta, aprová-la ou rejeitá-la e o fluxo de trabalho é retomado adequadamente.
Dica
Para obter mais detalhes sobre o modelo de solicitação e resposta, consulte Human-in-the-Loop.
Definir agentes com ferramentas que exigem aprovação
Crie agentes em que as ferramentas confidenciais são encapsuladas com ApprovalRequiredAIFunction:
ChatClientAgent deployAgent = new(
client,
"You are a DevOps engineer. Check staging status first, then deploy to production.",
"DeployAgent",
"Handles deployments",
[
AIFunctionFactory.Create(CheckStagingStatus),
new ApprovalRequiredAIFunction(AIFunctionFactory.Create(DeployToProduction))
]);
ChatClientAgent verifyAgent = new(
client,
"You are a QA engineer. Verify that the deployment was successful and summarize the results.",
"VerifyAgent",
"Verifies deployments");
Compilar e executar com gerenciamento de aprovações
Crie o fluxo de trabalho sequencial normalmente. O fluxo de aprovação é tratado por meio do fluxo de eventos:
var workflow = AgentWorkflowBuilder.BuildSequential([deployAgent, verifyAgent]);
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is RequestInfoEvent e &&
e.Request.TryGetDataAs(out ToolApprovalRequestContent? approvalRequest))
{
await run.SendResponseAsync(
e.Request.CreateResponse(approvalRequest.CreateResponse(approved: true)));
}
}
Observação
AgentWorkflowBuilder.BuildSequential() oferece suporte à aprovação de ferramentas prontas para uso — não é necessária nenhuma configuração adicional. Quando um agente chama uma ferramenta encapsulada com ApprovalRequiredAIFunction, o fluxo de trabalho pausa automaticamente e emite um RequestInfoEvent.
Dica
Para obter um exemplo executável completo desse fluxo de aprovação, consulte o GroupChatToolApproval exemplo. O mesmo RequestInfoEvent padrão de tratamento se aplica a outras orquestrações.
Conceitos Principais
- Processamento Sequencial: cada agente processa a saída do agente anterior em ordem
- AgentWorkflowBuilder.BuildSequential(): cria um fluxo de trabalho de pipeline de uma coleção de agentes
- ChatClientAgent: representa um agente apoiado por um cliente de chat com instruções específicas
-
InProcessExecution.RunStreamingAsync(): executa o fluxo de trabalho e retorna um
StreamingRunpara streaming de eventos em tempo real -
Tratamento de eventos: monitorar o progresso do agente através
AgentResponseUpdateEvente a conclusão atravésWorkflowOutputEvent -
Aprovação de ferramenta: Envolver ferramentas confidenciais com
ApprovalRequiredAIFunctionpara exigir aprovação humana antes da execução. -
RequestInfoEvent: emitido quando uma ferramenta requer aprovação; contém
ToolApprovalRequestContentcom os detalhes da chamada da ferramenta
Na orquestração sequencial, cada agente processa a tarefa sequencialmente, com a saída fluindo de um agente para o próximo. Comece definindo agentes para um processo de dois estágios:
import os
from agent_framework.foundry import FoundryChatClient
from azure.identity import AzureCliCredential
# 1) Create agents using FoundryChatClient
chat_client = FoundryChatClient(
project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"],
model=os.environ["FOUNDRY_MODEL"],
credential=AzureCliCredential(),
)
writer = chat_client.as_agent(
instructions=(
"You are a concise copywriter. Provide a single, punchy marketing sentence based on the prompt."
),
name="writer",
)
reviewer = chat_client.as_agent(
instructions=(
"You are a thoughtful reviewer. Give brief feedback on the previous assistant message."
),
name="reviewer",
)
Configurar a orquestração sequencial
A SequentialBuilder classe cria um pipeline em que os agentes processam tarefas em ordem. Cada agente vê o histórico completo da conversa e adiciona sua resposta:
from agent_framework.orchestrations import SequentialBuilder
# 2) Build sequential workflow: writer -> reviewer
workflow = SequentialBuilder(participants=[writer, reviewer]).build()
Executar o fluxo de trabalho sequencial
Execute o fluxo de trabalho e colete a saída final. A saída do terminal é uma AgentResponse que contém as mensagens de resposta do último agente:
from agent_framework import AgentResponse
# 3) Run and print the last agent's response
events = await workflow.run("Write a tagline for a budget-friendly eBike.")
outputs = events.get_outputs()
if outputs:
print("===== Final Response =====")
final: AgentResponse = outputs[0]
for msg in final.messages:
name = msg.author_name or "assistant"
print(f"[{name}]\n{msg.text}")
Saída de exemplo
===== Final Response =====
[reviewer]
This tagline clearly communicates affordability and the benefit of extended travel, making it
appealing to budget-conscious consumers. It has a friendly and motivating tone, though it could
be slightly shorter for more punch. Overall, a strong and effective suggestion!
Avançado: combinando agentes com executores personalizados
A orquestração sequencial dá suporte à combinação de agentes com executores personalizados para processamento especializado. Isso é útil quando você precisa de uma lógica personalizada que não exija uma LLM:
Definir um Executor Personalizado
Observação
Quando um executor personalizado segue um agente na sequência, seu manipulador recebe um AgentExecutorResponse (porque os agentes são encapsulados internamente por AgentExecutor). Use agent_response.full_conversation para acessar o histórico completo da conversa. Um executor personalizado usado como o último participante (terminador) deve chamar ctx.yield_output(AgentResponse(...)) para que sua saída se torne a saída do terminal do fluxo de trabalho.
from agent_framework import AgentExecutorResponse, AgentResponse, Executor, WorkflowContext, handler
from agent_framework import Message
from typing_extensions import Never
class Summarizer(Executor):
"""Terminator custom executor: consumes full conversation and yields a summary as the workflow's final answer."""
@handler
async def summarize(
self,
agent_response: AgentExecutorResponse,
ctx: WorkflowContext[Never, AgentResponse]
) -> None:
if not agent_response.full_conversation:
await ctx.yield_output(AgentResponse(messages=[Message("assistant", ["No conversation to summarize."])]))
return
users = sum(1 for m in agent_response.full_conversation if m.role == "user")
assistants = sum(1 for m in agent_response.full_conversation if m.role == "assistant")
summary = Message("assistant", [f"Summary -> users:{users} assistants:{assistants}"])
await ctx.yield_output(AgentResponse(messages=[summary]))
Criar um fluxo de trabalho sequencial misto
# Create a content agent
content = chat_client.as_agent(
instructions="Produce a concise paragraph answering the user's request.",
name="content",
)
# Build sequential workflow: content -> summarizer
summarizer = Summarizer(id="summarizer")
workflow = SequentialBuilder(participants=[content, summarizer]).build()
Saída de exemplo com Executador Customizado
===== Final Summary =====
Summary -> users:1 assistants:1
Controlando o contexto entre agentes
Por padrão, cada agente em um SequentialBuilder fluxo de trabalho consome a conversa completa do agente anterior (entrada + mensagens de resposta). A configuração chain_only_agent_responses=True configura todos os agentes na sequência para consumir somente as mensagens de resposta do agente anterior em vez disso:
workflow = SequentialBuilder(
participants=[writer, translator, reviewer],
chain_only_agent_responses=True,
).build()
Isso é útil para pipelines de tradução, refinamento progressivo e outros cenários em que cada agente deve se concentrar apenas na transformação da saída do agente anterior sem ser influenciado por turnos de conversa anteriores.
Para obter um exemplo completo, consulte sequential_chain_only_agent_responses.py no repositório do Agent Framework.
Dica
Para obter um controle mais refinado sobre o fluxo de contexto, incluindo funções de filtro personalizadas, consulte Modos de Contexto na referência do Executor do Agente.
Saídas Intermediárias
Por padrão, apenas a saída do último participante aparece como um evento do fluxo de output trabalho. Defina intermediate_outputs=True para exibir a saída de cada participante, além da saída final:
workflow = SequentialBuilder(
participants=[writer, reviewer, editor],
intermediate_outputs=True,
).build()
Você pode lidar com esses eventos em tempo real no modo de streaming:
from agent_framework import AgentResponseUpdate
# Track the last author to format streaming output.
last_author: str | None = None
async for event in workflow.run("Write a tagline for a budget-friendly eBike.", stream=True):
if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
update = event.data
author = update.author_name
if author != last_author:
if last_author is not None:
print() # Newline between different authors
print(f"{author}: {update.text}", end="", flush=True)
last_author = author
else:
print(update.text, end="", flush=True)
Orquestração sequencial com Human-in-the-Loop
Orquestrações sequenciais dão suporte a interações humanas no loop de duas maneiras: aprovação de ferramenta para controlar chamadas de ferramentas confidenciais e solicitação de informações para pausar após cada resposta do agente para coletar comentários.
Dica
Para obter mais detalhes sobre o modelo de solicitação e resposta, consulte Human-in-the-Loop.
Aprovação da ferramenta em fluxos de trabalho sequenciais
Use @tool(approval_mode="always_require") para marcar ferramentas que precisam de aprovação humana antes da execução. O fluxo de trabalho pausa e emite um request_info evento quando o agente tenta chamar a ferramenta.
@tool(approval_mode="always_require")
def execute_database_query(query: str) -> str:
return f"Query executed successfully: {query}"
database_agent = Agent(
client=chat_client,
name="DatabaseAgent",
instructions="You are a database assistant.",
tools=[execute_database_query],
)
workflow = SequentialBuilder(participants=[database_agent]).build()
Processe o fluxo de eventos e manipule as solicitações de aprovação:
async def process_event_stream(stream):
responses = {}
async for event in stream:
if event.type == "request_info" and event.data.type == "function_approval_request":
responses[event.request_id] = event.data.to_function_approval_response(approved=True)
return responses if responses else None
stream = workflow.run("Check the schema and update all pending orders", stream=True)
pending_responses = await process_event_stream(stream)
while pending_responses is not None:
stream = workflow.run(stream=True, responses=pending_responses)
pending_responses = await process_event_stream(stream)
Dica
Para obter um exemplo executável completo, consulte sequential_builder_tool_approval.py. A aprovação da ferramenta funciona com SequentialBuilder sem nenhuma configuração extra de builder.
Solicitar informações para comentários do agente
Use .with_request_info() para pausar depois que agentes específicos responderem, permitindo entrada externa (como revisão humana) antes do próximo agente começar:
drafter = Agent(
client=chat_client,
name="drafter",
instructions="You are a document drafter. Create a brief draft on the given topic.",
)
editor = Agent(
client=chat_client,
name="editor",
instructions="You are an editor. Review and improve the draft. Incorporate any human feedback.",
)
finalizer = Agent(
client=chat_client,
name="finalizer",
instructions="You are a finalizer. Create a polished final version.",
)
# Enable request info for the editor agent only
workflow = (
SequentialBuilder(participants=[drafter, editor, finalizer])
.with_request_info(agents=["editor"])
.build()
)
async def process_event_stream(stream):
responses = {}
async for event in stream:
if event.type == "request_info":
responses[event.request_id] = AgentRequestInfoResponse.approve()
return responses if responses else None
stream = workflow.run("Write a brief introduction to artificial intelligence.", stream=True)
pending_responses = await process_event_stream(stream)
while pending_responses is not None:
stream = workflow.run(stream=True, responses=pending_responses)
pending_responses = await process_event_stream(stream)
Dica
Confira os exemplos completos: aprovação de ferramenta sequencial e informações de solicitação sequencial.
Conceitos Principais
- Contexto Compartilhado: por padrão, cada agente consome a conversa completa do agente anterior, incluindo mensagens de entrada e resposta
-
Controle de Contexto: use
chain_only_agent_responses=Truepara configurar agentes para consumir apenas as mensagens de resposta do agente anterior -
Saída AgentResponse: o resultado final do fluxo de trabalho é um
AgentResponseque representa a resposta do último agente (não a conversa completa) -
Order Matters: Agentes são executados estritamente na ordem especificada na
participantslista - Participantes flexíveis: você pode misturar agentes e executores personalizados em qualquer ordem
-
Contrato do Terminador Personalizado: um executor personalizado usado como último participante deve chamar
ctx.yield_output(AgentResponse(...))para produzir a saída do terminal -
Saídas Intermediárias: definido
intermediate_outputs=Truepara exibir a saída de cada participante como um evento de fluxooutputde trabalho, não apenas do último participante -
Aprovação da Ferramenta: Usar
@tool(approval_mode="always_require")para operações confidenciais que precisam de revisão humana -
Informações de solicitação: use
.with_request_info(agents=[...])para pausar e aguardar comentários externos de agentes específicos