Compartilhar via


Executador de Agente

Quando você adiciona um agente de IA a um fluxo de trabalho, ele precisa ser encapsulado em um executor para que o mecanismo de fluxo de trabalho possa rotear mensagens para ele, gerenciar seu estado de sessão e manipular sua saída. O Executor do Agente é o executor interno que lida com essa adaptação.

Visão geral

O Executor do Agente faz a ponte entre a abstração do agente e o modelo de execução do fluxo de trabalho. Isso:

  • Recebe mensagens digitadas do grafo de fluxo de trabalho e as encaminha para o agente subjacente.
  • Gerencia o estado de sessão e de conversa do agente entre diferentes execuções.
  • Adapta seu comportamento com base no modo de execução do fluxo de trabalho (streaming ou não streaming).
  • Produz eventos de saída (AgentResponse ou AgentResponseUpdate) para o chamador de fluxo de trabalho para observação.
  • Envia mensagens para executores downstream conectados para processamento contínuo no grafo.
  • Dá suporte ao ponto de verificação para fluxos de trabalho de execução longa.

Como funciona

Em C#, o mecanismo de fluxo de trabalho cria internamente um AIAgentHostExecutor para cada AIAgent adicionado a um fluxo de trabalho. Este executor especializado estende ChatProtocolExecutor e usa um token de turno como padrão:

  1. Cache de mensagens – conforme as mensagens chegam de outros executores, o agente executor as coleta. Se ForwardIncomingMessages estiver habilitado (o padrão), as mensagens de entrada também serão encaminhadas para executores downstream.
  2. Ativar o gatilho de token – o agente processa suas mensagens armazenadas em cache somente depois de receber um TurnToken.
  3. Invocação do agente – o executor chama RunAsync (não streaming) ou RunStreamingAsync (streaming) no agente subjacente.
  4. Geração de saída – se os eventos de streaming estiverem habilitados, cada incremental AgentResponseUpdate será gerado como uma saída de fluxo de trabalho. Se EmitAgentResponseEvents estiver habilitado, o agregado AgentResponse também será gerado como uma saída de fluxo de trabalho.
  5. Mensagens downstream – as mensagens de resposta do agente são enviadas para executores downstream conectados.
  6. Ativar passagem de token — depois de concluir seu turno, o executor envia um novo TurnToken para o fluxo abaixo para que o próximo agente na cadeia possa iniciar o processamento.

Dica

Alguns cenários podem exigir um executor de agente mais especializado; por exemplo, orquestrações de transferência usam um HandoffAgentExecutor dedicado com lógica de roteamento personalizada.

Criação implícita versus explícita

Quando você passa um AIAgent para WorkflowBuilder, a estrutura a encapsula automaticamente em um AIAgentBinding, que cria o subjacente AIAgentHostExecutor. Você não precisa criar uma instância do executor do agente diretamente.

AIAgent writerAgent = /* create your agent */;
AIAgent reviewerAgent = /* create your agent */;

// Agents are automatically wrapped — no manual executor creation required
var workflow = new WorkflowBuilder(writerAgent)
    .AddEdge(writerAgent, reviewerAgent)
    .Build();

Você também pode usar os métodos auxiliares no AgentWorkflowBuilder para padrões comuns:

// Build a sequential pipeline of agents
var workflow = AgentWorkflowBuilder.BuildSequential(writerAgent, reviewerAgent);

Configuração Personalizada

Para personalizar como o executor do agente se comporta, use BindAsExecutor com AIAgentHostOptions:

var options = new AIAgentHostOptions
{
    EmitAgentUpdateEvents = true,
    EmitAgentResponseEvents = true,
    ReassignOtherAgentsAsUsers = true,
    ForwardIncomingMessages = true,
};

ExecutorBinding writerBinding = writerAgent.BindAsExecutor(options);
var workflow = new WorkflowBuilder(writerBinding)
    .AddEdge(writerBinding, reviewerAgent)
    .Build();

Tipos de entrada

O executor do agente em C# aceita vários tipos de entrada: string, ChatMessagee IEnumerable<ChatMessage>. As entradas de string são automaticamente convertidas em instâncias de ChatMessage com a função de User. Todas as mensagens de entrada são acumuladas até que um TurnToken seja recebido, momento em que o executor processa o lote. Quando ReassignOtherAgentsAsUsers estiver habilitado (o padrão), as mensagens de outros agentes serão reatribuídas à função User para que o modelo subjacente as trate como entradas do usuário, enquanto as mensagens do agente atual mantêm a função Assistant.

Saída e encadeamento

Depois que o agente concluir sua vez, o executor:

  1. Envia as mensagens de resposta do agente para todos os executores downstream conectados.
  2. Encaminha um novo TurnToken para que o próximo agente na cadeia possa começar o processamento.

Isso torna os agentes de encadeamento simples : basta conectá-los com bordas:

var workflow = new WorkflowBuilder(frenchTranslator)
    .AddEdge(frenchTranslator, spanishTranslator)
    .AddEdge(spanishTranslator, englishTranslator)
    .Build();

Comportamento de streaming

O comportamento de streaming é controlado pela opção EmitAgentUpdateEvents em AIAgentHostOptions, ou dinamicamente por meio do TurnToken:

  • Quando habilitado, o executor chama RunStreamingAsync no agente e gera cada AgentResponseUpdate como evento de saída do fluxo de trabalho. Isso fornece atualizações de token por token em tempo real.
  • Quando desabilitado , o executor chama RunAsync e produz uma única resposta completa.
// Enable streaming events at the configuration level
var options = new AIAgentHostOptions
{
    EmitAgentUpdateEvents = true,
};

// Or enable streaming dynamically via TurnToken
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

Sessões compartilhadas

Cada executor de agente mantém sua própria sessão por padrão. Para compartilhar uma sessão entre agentes, configure os agentes com um provedor de sessão comum antes de adicioná-los ao fluxo de trabalho.

Opções de configuração

AIAgentHostOptions controla o comportamento do executor do agente:

Opção Default Descrição
EmitAgentUpdateEvents null Emita eventos de atualização de streaming durante a execução. TurnToken terá precedência se definido. Se ambos estiverem null, o streaming estará desabilitado.
EmitAgentResponseEvents false Emita a resposta do agente agregado como um evento de saída de fluxo de trabalho.
InterceptUserInputRequests false Intercepte e roteie-o UserInputRequestContent como uma mensagem de fluxo de trabalho para manipulação.
InterceptUnterminatedFunctionCalls false Intercepte FunctionCallContent sem um resultado correspondente e encaminhe-o como uma mensagem de fluxo de trabalho.
ReassignOtherAgentsAsUsers true Reatribua mensagens de outros agentes para a User função para que o modelo as trate como entradas do usuário.
ForwardIncomingMessages true Encaminhe mensagens de entrada para executores downstream antes das mensagens geradas pelo agente.

Definindo o ponto de verificação

O executor do agente dá suporte ao ponto de verificação para fluxos de trabalho de execução longa. Quando um ponto de verificação é realizado, o executor serializa:

  • O estado da sessão do agente (via SerializeSessionAsync)
  • A configuração de emissão de eventos do turno atual (presente apenas enquanto as requisições estão pendentes e o executor ainda não entregou sua entrada TurnToken).
  • Quaisquer solicitações de entrada de usuário pendentes e solicitações de chamada de função.

Na restauração, o executor desserializa a sessão e o estado de solicitação pendente, permitindo que o processo seja retomado do ponto onde foi interrompido.

Como funciona

A AgentExecutor classe encapsula um agente que implementa o SupportsAgentRun protocolo. Quando o executor recebe uma mensagem:

  1. Normalização da mensagem – a entrada é normalizada em uma lista de Message objetos e adicionada ao cache interno do executor. O executor aceita vários tipos de entrada – str, Message, list[str | Message]e AgentExecutorRequestAgentExecutorResponse – cada um roteado para um manipulador dedicado que normaliza a entrada antes do cache.
  2. Invocação do agente – o executor chama agent.run() com as mensagens armazenadas em cache, selecionando automaticamente o modo de streaming ou não streaming com base no modo de execução do fluxo de trabalho.
  3. Emissão de saída – no modo de streaming, cada AgentResponseUpdate é produzido como um evento de saída do fluxo de trabalho. No modo sem transmissão contínua, um único AgentResponse é gerado.
  4. Transmissão downstream – após o agente concluir, o executor envia um AgentExecutorResponse para todos os executores downstream conectados. Essa resposta inclui o histórico completo da conversa, habilitando um encadeamento perfeito.
  5. Redefinição de cache – o cache de mensagens internas do executor é limpo depois que o agente é invocado, garantindo que cada invocação de agente processe apenas novas mensagens recebidas desde a última invocação.

Dica

Alguns cenários podem exigir um executor de agente mais especializado; por exemplo, orquestrações de entrega usam um executor dedicado com lógica de roteamento personalizada.

Criação implícita versus explícita

O WorkflowBuilder encapsula automaticamente agentes em AgentExecutor instâncias quando você passa um agente diretamente. Para a maioria dos fluxos de trabalho, a criação implícita é suficiente:

from agent_framework import WorkflowBuilder

writer_agent = client.as_agent(name="Writer", instructions="...")
reviewer_agent = client.as_agent(name="Reviewer", instructions="...")

# Agents are automatically wrapped — no manual AgentExecutor creation required
workflow = (
    WorkflowBuilder(start_executor=writer_agent)
    .add_edge(writer_agent, reviewer_agent)
    .build()
)

Criação explícita

Crie um AgentExecutor explicitamente quando precisar:

  • Compartilhe uma sessão entre vários agentes.
  • Forneça uma ID de executor personalizada.
  • Faça referência à mesma instância do executor em várias bordas.
from agent_framework import AgentExecutor

writer_executor = AgentExecutor(writer_agent, id="my-writer")
reviewer_executor = AgentExecutor(reviewer_agent, id="my-reviewer")

workflow = (
    WorkflowBuilder(start_executor=writer_executor)
    .add_edge(writer_executor, reviewer_executor)
    .build()
)

Parâmetros do construtor:

Parâmetro Tipo Descrição
agent SupportsAgentRun O agente a ser encapsulado.
session AgentSession \| None Sessão a ser usada para execuções de agente. Caso None, uma nova sessão será criada a partir do agente.
id str \| None Identificador exclusivo do executor. O padrão será o nome do agente, se disponível.

Tipos de entrada

O AgentExecutor define vários métodos de manipulador, cada um aceitando um tipo de entrada diferente. O mecanismo de fluxo de trabalho despacha automaticamente o manipulador correto com base no tipo de mensagem. Todos os tipos de entrada disparam o agente para executar imediatamente, exceto por AgentExecutorRequest, onde o sinalizador should_respond controla se o agente executa ou simplesmente armazena as mensagens em cache.

Tipo de entrada Manipulador Agente de Disparadores Descrição
AgentExecutorRequest run Condicional O tipo de entrada canônica. Contém uma lista de mensagens e um should_respond flag que controla se o agente é executado.
str from_str Sempre Aceita um prompt de cadeia de caracteres bruto.
Message from_message Sempre Aceita um único Message objeto.
list[str \| Message] from_messages Sempre Aceita uma lista de cadeias de caracteres ou Message objetos como contexto de conversa.
AgentExecutorResponse from_response Sempre Aceita a resposta de um executor de agente anterior, habilitando o encadeamento direto.

Usando AgentExecutorRequest

AgentExecutorRequest é o tipo de entrada canônica e fornece mais controle:

from agent_framework import AgentExecutorRequest, Message

# Create a request with messages
request = AgentExecutorRequest(
    messages=[Message("user", text="Hello, world!")],
    should_respond=True,
)

# Run the workflow
result = await workflow.run(request)

O should_respond sinalizador controla se o agente processa as mensagens imediatamente ou simplesmente as armazena em cache para mais tarde:

  • True (padrão) — o agente é executado e produz uma resposta.
  • False — as mensagens são adicionadas ao cache, mas o agente não é executado. Isso é útil para pré-carregar o contexto da conversa antes de disparar uma resposta.

Saída e encadeamento

Depois que o agente é concluído, o executor envia um AgentExecutorResponse downstream. Esta classe de dados contém:

Campo Tipo Descrição
executor_id str O ID do executor que produziu a resposta.
agent_response AgentResponse A resposta do agente subjacente (sem alterações feitas pelo cliente).
full_conversation list[Message] \| None O contexto completo da conversa (entradas anteriores + saídas do agente) para o encadeamento.

Ao encadear executores de agente, o executor downstream recebe o AgentExecutorResponse por meio do manipulador from_response. Ele usa o campo full_conversation para preservar o histórico completo da conversa, impedindo que agentes subsequentes percam o contexto prévio.

spam_detector = AgentExecutor(create_spam_detector_agent())
email_assistant = AgentExecutor(create_email_assistant_agent())

# The email_assistant receives the spam_detector's full conversation context
workflow = (
    WorkflowBuilder(start_executor=spam_detector)
    .add_edge(spam_detector, email_assistant)
    .build()
)

Comportamento de streaming

AgentExecutor adapta-se automaticamente ao modo de execução do fluxo de trabalho.

  • stream=True — chama agent.run(stream=True) e gera cada AgentResponseUpdate como evento de saída de fluxo de trabalho. Após a conclusão do streaming, as atualizações são agregadas em um componente completo AgentResponse para o envio downstream.
  • stream=False (padrão) — chama agent.run(stream=False) e produz um único AgentResponse como um evento de saída de fluxo de trabalho.
# Streaming mode — receive incremental updates
events = workflow.run("Write a story about a cat.", stream=True)
async for event in events:
    if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
        print(event.data.text, end="", flush=True)

# Non-streaming mode — receive complete response
result = await workflow.run("Write a story about a cat.")

# Retrieve AgentResponse objects from the result
outputs = result.get_outputs()
for output in outputs:
    if isinstance(output, AgentResponse):
        print(output.text)

Sessões compartilhadas

Por padrão, cada um AgentExecutor cria sua própria sessão. Para compartilhar uma sessão entre vários agentes (por exemplo, para manter um thread de conversa comum), crie uma sessão explicitamente e passe-a para cada executor:

from agent_framework import AgentExecutor

# Create a shared session from one agent
shared_session = writer_agent.create_session()

# Both executors share the same session
writer_executor = AgentExecutor(writer_agent, session=shared_session)
reviewer_executor = AgentExecutor(reviewer_agent, session=shared_session)

Observação

Nem todos os agentes dão suporte a sessões compartilhadas. Normalmente, somente agentes do mesmo tipo de provedor podem compartilhar uma sessão.

Definindo o ponto de verificação

O AgentExecutor oferece suporte ao checkpoint para salvar e restaurar o estado em fluxos de trabalho de longa duração. Quando um ponto de verificação é realizado, o executor serializa:

  • O cache de mensagens interna.
  • O histórico completo da conversa.
  • O estado da sessão do agente.
  • Quaisquer solicitações e respostas de entrada de usuário pendentes.

Na restauração, o executor desserializa esse estado, permitindo que o fluxo de trabalho seja retomado de onde parou.

Aviso

O ponto de verificação com agentes que usam sessões do lado do servidor (como AzureAIAgentClient) tem limitações. O estado da sessão do lado do servidor não é capturado em pontos de verificação e pode ser modificado por execuções subsequentes. Considere implementar um executor personalizado se você precisar de um ponto de verificação confiável com sessões do lado do servidor.

Próximas Etapas