Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Quando você adiciona um agente de IA a um fluxo de trabalho, ele precisa ser encapsulado em um executor para que o mecanismo de fluxo de trabalho possa rotear mensagens para ele, gerenciar seu estado de sessão e manipular sua saída. O Executor do Agente é o executor interno que lida com essa adaptação.
Visão geral
O Executor do Agente faz a ponte entre a abstração do agente e o modelo de execução do fluxo de trabalho. Isso:
- Recebe mensagens digitadas do grafo de fluxo de trabalho e as encaminha para o agente subjacente.
- Gerencia o estado de sessão e de conversa do agente entre diferentes execuções.
- Adapta seu comportamento com base no modo de execução do fluxo de trabalho (streaming ou não streaming).
- Produz eventos de saída (
AgentResponseouAgentResponseUpdate) para o chamador de fluxo de trabalho para observação. - Envia mensagens para executores downstream conectados para processamento contínuo no grafo.
- Dá suporte ao ponto de verificação para fluxos de trabalho de execução longa.
Como funciona
Em C#, o mecanismo de fluxo de trabalho cria internamente um AIAgentHostExecutor para cada AIAgent adicionado a um fluxo de trabalho. Este executor especializado estende ChatProtocolExecutor e usa um token de turno como padrão:
-
Cache de mensagens – conforme as mensagens chegam de outros executores, o agente executor as coleta. Se
ForwardIncomingMessagesestiver habilitado (o padrão), as mensagens de entrada também serão encaminhadas para executores downstream. -
Ativar o gatilho de token – o agente processa suas mensagens armazenadas em cache somente depois de receber um
TurnToken. -
Invocação do agente – o executor chama
RunAsync(não streaming) ouRunStreamingAsync(streaming) no agente subjacente. -
Geração de saída – se os eventos de streaming estiverem habilitados, cada incremental
AgentResponseUpdateserá gerado como uma saída de fluxo de trabalho. SeEmitAgentResponseEventsestiver habilitado, o agregadoAgentResponsetambém será gerado como uma saída de fluxo de trabalho. - Mensagens downstream – as mensagens de resposta do agente são enviadas para executores downstream conectados.
-
Ativar passagem de token — depois de concluir seu turno, o executor envia um novo
TurnTokenpara o fluxo abaixo para que o próximo agente na cadeia possa iniciar o processamento.
Dica
Alguns cenários podem exigir um executor de agente mais especializado; por exemplo, orquestrações de transferência usam um HandoffAgentExecutor dedicado com lógica de roteamento personalizada.
Criação implícita versus explícita
Quando você passa um AIAgent para WorkflowBuilder, a estrutura a encapsula automaticamente em um AIAgentBinding, que cria o subjacente AIAgentHostExecutor. Você não precisa criar uma instância do executor do agente diretamente.
AIAgent writerAgent = /* create your agent */;
AIAgent reviewerAgent = /* create your agent */;
// Agents are automatically wrapped — no manual executor creation required
var workflow = new WorkflowBuilder(writerAgent)
.AddEdge(writerAgent, reviewerAgent)
.Build();
Você também pode usar os métodos auxiliares no AgentWorkflowBuilder para padrões comuns:
// Build a sequential pipeline of agents
var workflow = AgentWorkflowBuilder.BuildSequential(writerAgent, reviewerAgent);
Configuração Personalizada
Para personalizar como o executor do agente se comporta, use BindAsExecutor com AIAgentHostOptions:
var options = new AIAgentHostOptions
{
EmitAgentUpdateEvents = true,
EmitAgentResponseEvents = true,
ReassignOtherAgentsAsUsers = true,
ForwardIncomingMessages = true,
};
ExecutorBinding writerBinding = writerAgent.BindAsExecutor(options);
var workflow = new WorkflowBuilder(writerBinding)
.AddEdge(writerBinding, reviewerAgent)
.Build();
Tipos de entrada
O executor do agente em C# aceita vários tipos de entrada: string, ChatMessagee IEnumerable<ChatMessage>. As entradas de string são automaticamente convertidas em instâncias de ChatMessage com a função de User. Todas as mensagens de entrada são acumuladas até que um TurnToken seja recebido, momento em que o executor processa o lote. Quando ReassignOtherAgentsAsUsers estiver habilitado (o padrão), as mensagens de outros agentes serão reatribuídas à função User para que o modelo subjacente as trate como entradas do usuário, enquanto as mensagens do agente atual mantêm a função Assistant.
Saída e encadeamento
Depois que o agente concluir sua vez, o executor:
- Envia as mensagens de resposta do agente para todos os executores downstream conectados.
- Encaminha um novo
TurnTokenpara que o próximo agente na cadeia possa começar o processamento.
Isso torna os agentes de encadeamento simples : basta conectá-los com bordas:
var workflow = new WorkflowBuilder(frenchTranslator)
.AddEdge(frenchTranslator, spanishTranslator)
.AddEdge(spanishTranslator, englishTranslator)
.Build();
Comportamento de streaming
O comportamento de streaming é controlado pela opção EmitAgentUpdateEvents em AIAgentHostOptions, ou dinamicamente por meio do TurnToken:
-
Quando habilitado, o executor chama
RunStreamingAsyncno agente e gera cadaAgentResponseUpdatecomo evento de saída do fluxo de trabalho. Isso fornece atualizações de token por token em tempo real. -
Quando desabilitado , o executor chama
RunAsynce produz uma única resposta completa.
// Enable streaming events at the configuration level
var options = new AIAgentHostOptions
{
EmitAgentUpdateEvents = true,
};
// Or enable streaming dynamically via TurnToken
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
Sessões compartilhadas
Cada executor de agente mantém sua própria sessão por padrão. Para compartilhar uma sessão entre agentes, configure os agentes com um provedor de sessão comum antes de adicioná-los ao fluxo de trabalho.
Opções de configuração
AIAgentHostOptions controla o comportamento do executor do agente:
| Opção | Default | Descrição |
|---|---|---|
EmitAgentUpdateEvents |
null |
Emita eventos de atualização de streaming durante a execução.
TurnToken terá precedência se definido. Se ambos estiverem null, o streaming estará desabilitado. |
EmitAgentResponseEvents |
false |
Emita a resposta do agente agregado como um evento de saída de fluxo de trabalho. |
InterceptUserInputRequests |
false |
Intercepte e roteie-o UserInputRequestContent como uma mensagem de fluxo de trabalho para manipulação. |
InterceptUnterminatedFunctionCalls |
false |
Intercepte FunctionCallContent sem um resultado correspondente e encaminhe-o como uma mensagem de fluxo de trabalho. |
ReassignOtherAgentsAsUsers |
true |
Reatribua mensagens de outros agentes para a User função para que o modelo as trate como entradas do usuário. |
ForwardIncomingMessages |
true |
Encaminhe mensagens de entrada para executores downstream antes das mensagens geradas pelo agente. |
Definindo o ponto de verificação
O executor do agente dá suporte ao ponto de verificação para fluxos de trabalho de execução longa. Quando um ponto de verificação é realizado, o executor serializa:
- O estado da sessão do agente (via
SerializeSessionAsync) - A configuração de emissão de eventos do turno atual (presente apenas enquanto as requisições estão pendentes e o executor ainda não entregou sua entrada
TurnToken). - Quaisquer solicitações de entrada de usuário pendentes e solicitações de chamada de função.
Na restauração, o executor desserializa a sessão e o estado de solicitação pendente, permitindo que o processo seja retomado do ponto onde foi interrompido.
Como funciona
A AgentExecutor classe encapsula um agente que implementa o SupportsAgentRun protocolo. Quando o executor recebe uma mensagem:
-
Normalização da mensagem – a entrada é normalizada em uma lista de
Messageobjetos e adicionada ao cache interno do executor. O executor aceita vários tipos de entrada –str,Message,list[str | Message]eAgentExecutorRequestAgentExecutorResponse– cada um roteado para um manipulador dedicado que normaliza a entrada antes do cache. -
Invocação do agente – o executor chama
agent.run()com as mensagens armazenadas em cache, selecionando automaticamente o modo de streaming ou não streaming com base no modo de execução do fluxo de trabalho. -
Emissão de saída – no modo de streaming, cada
AgentResponseUpdateé produzido como um evento de saída do fluxo de trabalho. No modo sem transmissão contínua, um únicoAgentResponseé gerado. -
Transmissão downstream – após o agente concluir, o executor envia um
AgentExecutorResponsepara todos os executores downstream conectados. Essa resposta inclui o histórico completo da conversa, habilitando um encadeamento perfeito. - Redefinição de cache – o cache de mensagens internas do executor é limpo depois que o agente é invocado, garantindo que cada invocação de agente processe apenas novas mensagens recebidas desde a última invocação.
Dica
Alguns cenários podem exigir um executor de agente mais especializado; por exemplo, orquestrações de entrega usam um executor dedicado com lógica de roteamento personalizada.
Criação implícita versus explícita
O WorkflowBuilder encapsula automaticamente agentes em AgentExecutor instâncias quando você passa um agente diretamente. Para a maioria dos fluxos de trabalho, a criação implícita é suficiente:
from agent_framework import WorkflowBuilder
writer_agent = client.as_agent(name="Writer", instructions="...")
reviewer_agent = client.as_agent(name="Reviewer", instructions="...")
# Agents are automatically wrapped — no manual AgentExecutor creation required
workflow = (
WorkflowBuilder(start_executor=writer_agent)
.add_edge(writer_agent, reviewer_agent)
.build()
)
Criação explícita
Crie um AgentExecutor explicitamente quando precisar:
- Compartilhe uma sessão entre vários agentes.
- Forneça uma ID de executor personalizada.
- Faça referência à mesma instância do executor em várias bordas.
from agent_framework import AgentExecutor
writer_executor = AgentExecutor(writer_agent, id="my-writer")
reviewer_executor = AgentExecutor(reviewer_agent, id="my-reviewer")
workflow = (
WorkflowBuilder(start_executor=writer_executor)
.add_edge(writer_executor, reviewer_executor)
.build()
)
Parâmetros do construtor:
| Parâmetro | Tipo | Descrição |
|---|---|---|
agent |
SupportsAgentRun |
O agente a ser encapsulado. |
session |
AgentSession \| None |
Sessão a ser usada para execuções de agente. Caso None, uma nova sessão será criada a partir do agente. |
id |
str \| None |
Identificador exclusivo do executor. O padrão será o nome do agente, se disponível. |
Tipos de entrada
O AgentExecutor define vários métodos de manipulador, cada um aceitando um tipo de entrada diferente. O mecanismo de fluxo de trabalho despacha automaticamente o manipulador correto com base no tipo de mensagem. Todos os tipos de entrada disparam o agente para executar imediatamente, exceto por AgentExecutorRequest, onde o sinalizador should_respond controla se o agente executa ou simplesmente armazena as mensagens em cache.
| Tipo de entrada | Manipulador | Agente de Disparadores | Descrição |
|---|---|---|---|
AgentExecutorRequest |
run |
Condicional | O tipo de entrada canônica. Contém uma lista de mensagens e um should_respond flag que controla se o agente é executado. |
str |
from_str |
Sempre | Aceita um prompt de cadeia de caracteres bruto. |
Message |
from_message |
Sempre | Aceita um único Message objeto. |
list[str \| Message] |
from_messages |
Sempre | Aceita uma lista de cadeias de caracteres ou Message objetos como contexto de conversa. |
AgentExecutorResponse |
from_response |
Sempre | Aceita a resposta de um executor de agente anterior, habilitando o encadeamento direto. |
Usando AgentExecutorRequest
AgentExecutorRequest é o tipo de entrada canônica e fornece mais controle:
from agent_framework import AgentExecutorRequest, Message
# Create a request with messages
request = AgentExecutorRequest(
messages=[Message("user", text="Hello, world!")],
should_respond=True,
)
# Run the workflow
result = await workflow.run(request)
O should_respond sinalizador controla se o agente processa as mensagens imediatamente ou simplesmente as armazena em cache para mais tarde:
-
True(padrão) — o agente é executado e produz uma resposta. -
False— as mensagens são adicionadas ao cache, mas o agente não é executado. Isso é útil para pré-carregar o contexto da conversa antes de disparar uma resposta.
Saída e encadeamento
Depois que o agente é concluído, o executor envia um AgentExecutorResponse downstream. Esta classe de dados contém:
| Campo | Tipo | Descrição |
|---|---|---|
executor_id |
str |
O ID do executor que produziu a resposta. |
agent_response |
AgentResponse |
A resposta do agente subjacente (sem alterações feitas pelo cliente). |
full_conversation |
list[Message] \| None |
O contexto completo da conversa (entradas anteriores + saídas do agente) para o encadeamento. |
Ao encadear executores de agente, o executor downstream recebe o AgentExecutorResponse por meio do manipulador from_response. Ele usa o campo full_conversation para preservar o histórico completo da conversa, impedindo que agentes subsequentes percam o contexto prévio.
spam_detector = AgentExecutor(create_spam_detector_agent())
email_assistant = AgentExecutor(create_email_assistant_agent())
# The email_assistant receives the spam_detector's full conversation context
workflow = (
WorkflowBuilder(start_executor=spam_detector)
.add_edge(spam_detector, email_assistant)
.build()
)
Comportamento de streaming
AgentExecutor adapta-se automaticamente ao modo de execução do fluxo de trabalho.
-
stream=True— chamaagent.run(stream=True)e gera cadaAgentResponseUpdatecomo evento de saída de fluxo de trabalho. Após a conclusão do streaming, as atualizações são agregadas em um componente completoAgentResponsepara o envio downstream. -
stream=False(padrão) — chamaagent.run(stream=False)e produz um únicoAgentResponsecomo um evento de saída de fluxo de trabalho.
# Streaming mode — receive incremental updates
events = workflow.run("Write a story about a cat.", stream=True)
async for event in events:
if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
print(event.data.text, end="", flush=True)
# Non-streaming mode — receive complete response
result = await workflow.run("Write a story about a cat.")
# Retrieve AgentResponse objects from the result
outputs = result.get_outputs()
for output in outputs:
if isinstance(output, AgentResponse):
print(output.text)
Sessões compartilhadas
Por padrão, cada um AgentExecutor cria sua própria sessão. Para compartilhar uma sessão entre vários agentes (por exemplo, para manter um thread de conversa comum), crie uma sessão explicitamente e passe-a para cada executor:
from agent_framework import AgentExecutor
# Create a shared session from one agent
shared_session = writer_agent.create_session()
# Both executors share the same session
writer_executor = AgentExecutor(writer_agent, session=shared_session)
reviewer_executor = AgentExecutor(reviewer_agent, session=shared_session)
Observação
Nem todos os agentes dão suporte a sessões compartilhadas. Normalmente, somente agentes do mesmo tipo de provedor podem compartilhar uma sessão.
Definindo o ponto de verificação
O AgentExecutor oferece suporte ao checkpoint para salvar e restaurar o estado em fluxos de trabalho de longa duração. Quando um ponto de verificação é realizado, o executor serializa:
- O cache de mensagens interna.
- O histórico completo da conversa.
- O estado da sessão do agente.
- Quaisquer solicitações e respostas de entrada de usuário pendentes.
Na restauração, o executor desserializa esse estado, permitindo que o fluxo de trabalho seja retomado de onde parou.
Aviso
O ponto de verificação com agentes que usam sessões do lado do servidor (como AzureAIAgentClient) tem limitações. O estado da sessão do lado do servidor não é capturado em pontos de verificação e pode ser modificado por execuções subsequentes. Considere implementar um executor personalizado se você precisar de um ponto de verificação confiável com sessões do lado do servidor.