Condividi tramite


Agente Esecutore

Quando si aggiunge un agente di intelligenza artificiale a un flusso di lavoro, è necessario eseguire il wrapping in un executor in modo che il motore del flusso di lavoro possa instradare i messaggi, gestire lo stato della sessione e gestire l'output. Il Agent Executor è l'esecutore predefinito che gestisce questo adattamento.

Informazioni generali

Agent Executor consente di colmare il divario tra l'astrazione dell'agente e il modello di esecuzione del flusso di lavoro. IT:

  • Riceve i messaggi digitati dal grafico del flusso di lavoro e li inoltra all'agente sottostante.
  • Gestisce lo stato della sessione e della conversazione dell'agente tra le esecuzioni.
  • Adatta il comportamento in base alla modalità di esecuzione del flusso di lavoro (streaming o non streaming).
  • Restituisce gli eventi di output (AgentResponse o AgentResponseUpdate) al chiamante del flusso di lavoro affinché possano essere osservati.
  • Invia messaggi agli executor downstream connessi per l'elaborazione continua all'interno del grafico.
  • Supporta il checkpoint per i flussi di lavoro di lunga durata.

Funzionamento

In C# il motore del flusso di lavoro crea internamente un oggetto AIAgentHostExecutor per ognuno AIAgent aggiunto a un flusso di lavoro. Questo executor specializzato estende ChatProtocolExecutor e usa un modello di token di turno :

  1. Caching dei messaggi: quando arrivano messaggi da altri executor, l'agente executor li raccoglie. Se ForwardIncomingMessages è abilitato (impostazione predefinita), i messaggi in arrivo vengono inoltrati anche agli executor downstream.
  2. Attivazione del token: l'agente elabora i suoi messaggi memorizzati nella cache solo dopo aver ricevuto un TurnToken.
  3. Chiamata dell'agente : l'esecutore chiama RunAsync (non-streaming) o RunStreamingAsync (streaming) sull'agente sottostante.
  4. Output yielding : se gli eventi di streaming sono abilitati, ogni incrementale AgentResponseUpdate viene restituito come output del flusso di lavoro. Se EmitAgentResponseEvents è abilitato, anche l'aggregato AgentResponse viene restituito come output del flusso di lavoro.
  5. Messaggistica downstream : i messaggi di risposta dell'agente vengono inviati agli executor downstream connessi.
  6. Turn token pass-through — dopo aver completato il proprio ciclo, l'executor invia un nuovo TurnToken downstream in modo che l'agente successivo nella catena possa iniziare l'elaborazione.

Suggerimento

Alcuni scenari possono richiedere un esecutore agente più specializzato; ad esempio, le orchestrazioni handoff utilizzano un HandoffAgentExecutor dedicato con logica di routing personalizzata.

Creazione implicita e esplicita

Quando si passa un AIAgent a WorkflowBuilder, il framework lo chiude automaticamente in un AIAgentBinding, creando l'elemento sottostante AIAgentHostExecutor. Non è necessario istanziare direttamente l'executor dell'agente.

AIAgent writerAgent = /* create your agent */;
AIAgent reviewerAgent = /* create your agent */;

// Agents are automatically wrapped — no manual executor creation required
var workflow = new WorkflowBuilder(writerAgent)
    .AddEdge(writerAgent, reviewerAgent)
    .Build();

È anche possibile usare i metodi helper in AgentWorkflowBuilder per modelli comuni:

// Build a sequential pipeline of agents
var workflow = AgentWorkflowBuilder.BuildSequential(writerAgent, reviewerAgent);

Configurazione personalizzata

Per personalizzare il comportamento dell'esecutore dell'agente, utilizzare BindAsExecutor con AIAgentHostOptions:

var options = new AIAgentHostOptions
{
    EmitAgentUpdateEvents = true,
    EmitAgentResponseEvents = true,
    ReassignOtherAgentsAsUsers = true,
    ForwardIncomingMessages = true,
};

ExecutorBinding writerBinding = writerAgent.BindAsExecutor(options);
var workflow = new WorkflowBuilder(writerBinding)
    .AddEdge(writerBinding, reviewerAgent)
    .Build();

Tipi di input

L'executor dell'agente in C# accetta più tipi di input: string, ChatMessagee IEnumerable<ChatMessage>. Gli input di stringa vengono convertiti automaticamente in istanze ChatMessage con ruolo User. Tutti i messaggi in arrivo vengono accumulati fino a quando viene ricevuto un TurnToken, a quel punto l'executor elabora il batch. Quando ReassignOtherAgentsAsUsers è abilitato (impostazione predefinita), i messaggi di altri agenti vengono riassegnati al User ruolo in modo che il modello sottostante li consideri come input utente, mentre i messaggi dell'agente corrente mantengono il Assistant ruolo.

Output e concatenamento

Al termine del turno dell'agente, l'esecutore:

  1. Invia i messaggi di risposta dell'agente a tutti gli executor downstream connessi.
  2. Inoltra un nuovo oggetto TurnToken in modo che l'agente successivo nella catena possa iniziare l'elaborazione.

In questo modo gli agenti di concatenamento sono semplici: è sufficiente connetterli con i bordi:

var workflow = new WorkflowBuilder(frenchTranslator)
    .AddEdge(frenchTranslator, spanishTranslator)
    .AddEdge(spanishTranslator, englishTranslator)
    .Build();

Comportamento di streaming

Il comportamento di streaming è controllato dall'opzione EmitAgentUpdateEvents in AIAgentHostOptionso in modo dinamico tramite TurnToken:

  • Se abilitato, l'executor chiama RunStreamingAsync sull'agente e restituisce ogni AgentResponseUpdate come evento di output del flusso di lavoro. In questo modo vengono forniti aggiornamenti token per token in tempo reale.
  • Se disabilitato : l'executor chiama RunAsync e produce una singola risposta completa.
// Enable streaming events at the configuration level
var options = new AIAgentHostOptions
{
    EmitAgentUpdateEvents = true,
};

// Or enable streaming dynamically via TurnToken
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

Sessioni condivise

Per impostazione predefinita, ogni executor dell'agente gestisce la propria sessione. Per condividere una sessione tra agenti, configurare gli agenti con un provider di sessione comune prima di aggiungerli al flusso di lavoro.

Opzioni di configurazione

AIAgentHostOptions controlla il comportamento dell'esecutore dell'agente:

Opzione Impostazione predefinita Descrizione
EmitAgentUpdateEvents null Emettere eventi di aggiornamento dello streaming durante l'esecuzione. TurnToken ha la precedenza se è impostato. Se entrambi sono null, lo streaming è disabilitato.
EmitAgentResponseEvents false Generare la risposta dell'agente aggregato come evento di output del flusso di lavoro.
InterceptUserInputRequests false Interceptare UserInputRequestContent e instradarlo come messaggio nel flusso di lavoro per la gestione.
InterceptUnterminatedFunctionCalls false Intercettare FunctionCallContent senza un risultato corrispondente e instradarlo come messaggio del flusso di lavoro.
ReassignOtherAgentsAsUsers true Riassegnare i messaggi da altri agenti al User ruolo in modo che il modello li consideri come input dell'utente.
ForwardIncomingMessages true Inoltrare i messaggi in arrivo agli executor downstream prima dei messaggi generati dall'agente.

Checkpoint

L'esecutore agente supporta il checkpoint per i flussi di lavoro di lunga durata. Quando viene acquisito un checkpoint, l'executor serializza:

  • Lo stato di sessione dell'agente (tramite SerializeSessionAsync).
  • La configurazione delle emissioni di eventi del turno corrente (presente solo mentre le richieste sono in sospeso e l'executor non ha ancora restituito il relativo in ingresso TurnToken).
  • Eventuali richieste di input dell'utente in sospeso e richieste di chiamata di funzione.

In fase di ripristino, l'esecutore deserializza la sessione e lo stato della richiesta in attesa, permettendo al flusso di lavoro di riprendere dal punto in cui è stato interrotto.

Funzionamento

La AgentExecutor classe esegue il wrapping di un agente che implementa il SupportsAgentRun protocollo. Quando l'executor riceve un messaggio:

  1. Normalizzazione dei messaggi: l'input viene normalizzato in un elenco di Message oggetti e aggiunto alla cache interna dell'executor. L'executor accetta più tipi di input , str, Messagelist[str | Message]AgentExecutorRequest, e AgentExecutorResponse , ognuno indirizzato a un gestore dedicato che normalizza l'input prima della memorizzazione nella cache.
  2. Chiamata dell'agente : l'executor chiama agent.run() con i messaggi memorizzati nella cache, selezionando automaticamente la modalità di streaming o non di streaming in base alla modalità di esecuzione del flusso di lavoro.
  3. Emissione di output : in modalità di streaming, ognuna AgentResponseUpdate viene restituita come evento di output del flusso di lavoro. In modalità non streaming, viene restituito un singolo AgentResponse.
  4. Inoltro a valle — dopo che l'agente ha completato, l'executor invia un oggetto AgentExecutorResponse a tutti gli executor a valle connessi. Questa risposta include la cronologia completa della conversazione, consentendo il concatenamento facile.
  5. Reimpostazione della cache : la cache dei messaggi interna dell'executor viene cancellata dopo che l'agente viene richiamato, assicurandosi che ogni chiamata dell'agente elabora solo i nuovi messaggi ricevuti dall'ultima chiamata.

Suggerimento

Alcuni scenari possono richiedere un executor agente più specializzato; Ad esempio, le orchestrazioni handoff usano un executor dedicato con logica di routing personalizzata.

Creazione implicita e esplicita

Incapsula automaticamente gli agenti nelle istanze AgentExecutor quando si passa direttamente un agente. Per la maggior parte dei flussi di lavoro, la creazione implicita è sufficiente:

from agent_framework import WorkflowBuilder

writer_agent = client.as_agent(name="Writer", instructions="...")
reviewer_agent = client.as_agent(name="Reviewer", instructions="...")

# Agents are automatically wrapped — no manual AgentExecutor creation required
workflow = (
    WorkflowBuilder(start_executor=writer_agent)
    .add_edge(writer_agent, reviewer_agent)
    .build()
)

Creazione esplicita

Crea AgentExecutor esplicitamente quando necessario.

  • Condividere una sessione tra più agenti.
  • Specificare un ID executor personalizzato.
  • Fare riferimento alla stessa istanza dell'executor in più connessioni.
from agent_framework import AgentExecutor

writer_executor = AgentExecutor(writer_agent, id="my-writer")
reviewer_executor = AgentExecutor(reviewer_agent, id="my-reviewer")

workflow = (
    WorkflowBuilder(start_executor=writer_executor)
    .add_edge(writer_executor, reviewer_executor)
    .build()
)

Parametri del costruttore:

Parametro Tipo Descrizione
agent SupportsAgentRun Agente di cui eseguire il wrapping.
session AgentSession \| None Sessione da usare per le attività dell'agente. Se None, viene creata una nuova sessione dall'agente.
id str \| None ID univoco dell'esecutore. L'impostazione predefinita è il nome dell'agente, se disponibile.

Tipi di input

Il AgentExecutor definisce più metodi handler, ciascuno accetta un diverso tipo di input. Il motore del flusso di lavoro invia automaticamente il gestore corretto in base al tipo di messaggio. Tutti i tipi di input attivano l'esecuzione immediata dell'agente, ad eccezione della AgentExecutorRequest posizione in cui il should_respond flag controlla se l'agente viene eseguito o memorizza semplicemente nella cache i messaggi:

Tipo di input Gestore Agente di trigger Descrizione
AgentExecutorRequest run Conditional Tipo di input canonico. Contiene un elenco di messaggi e una should_respond flag che controlla l'esecuzione dell'agente.
str from_str Sempre Accetta una richiesta di stringa non elaborata.
Message from_message Sempre Accetta un singolo Message oggetto.
list[str \| Message] from_messages Sempre Accetta un elenco di stringhe o Message oggetti come contesto di conversazione.
AgentExecutorResponse from_response Sempre Accetta la risposta di un esecutore dell'agente che lo precede, abilitando il concatenamento diretto.

Utilizzo di AgentExecutorRequest

AgentExecutorRequest è il tipo di input canonico e fornisce il maggior controllo:

from agent_framework import AgentExecutorRequest, Message

# Create a request with messages
request = AgentExecutorRequest(
    messages=[Message("user", text="Hello, world!")],
    should_respond=True,
)

# Run the workflow
result = await workflow.run(request)

Il should_respond flag controlla se l'agente elabora immediatamente i messaggi o li memorizza semplicemente nella cache per un secondo momento:

  • True (impostazione predefinita): l'agente viene eseguito e produce una risposta.
  • False : i messaggi vengono aggiunti alla cache, ma l'agente non viene eseguito. Ciò è utile per precaricare il contesto della conversazione prima di attivare una risposta.

Output e concatenamento

Al termine dell'operazione dell'agente, l'esecutore invia un AgentExecutorResponse a valle. Questa classe di dati contiene:

Campo Tipo Descrizione
executor_id str ID dell'esecutore che ha generato la risposta.
agent_response AgentResponse Risposta dell'agente sottostante (non modificata dal client).
full_conversation list[Message] \| None Contesto completo della conversazione (input precedenti e output dell'agente) per il concatenamento.

Quando si concatenano gli executor dell'agente, l'executor downstream riceve AgentExecutorResponse tramite il from_response gestore. Usa il full_conversation campo per mantenere la cronologia completa delle conversazioni, impedendo agli agenti downstream di perdere il contesto precedente:

spam_detector = AgentExecutor(create_spam_detector_agent())
email_assistant = AgentExecutor(create_email_assistant_agent())

# The email_assistant receives the spam_detector's full conversation context
workflow = (
    WorkflowBuilder(start_executor=spam_detector)
    .add_edge(spam_detector, email_assistant)
    .build()
)

Comportamento di streaming

L'oggetto AgentExecutor si adatta automaticamente alla modalità di esecuzione del flusso di lavoro:

  • stream=True : chiama agent.run(stream=True) e restituisce ognuno AgentResponseUpdate come evento di output del flusso di lavoro. Al termine dello streaming, gli aggiornamenti vengono aggregati in un flusso completo AgentResponse per l'invio a valle.
  • stream=False (impostazione predefinita): chiama agent.run(stream=False) e restituisce un singolo AgentResponse evento di output del flusso di lavoro.
# Streaming mode — receive incremental updates
events = workflow.run("Write a story about a cat.", stream=True)
async for event in events:
    if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
        print(event.data.text, end="", flush=True)

# Non-streaming mode — receive complete response
result = await workflow.run("Write a story about a cat.")

# Retrieve AgentResponse objects from the result
outputs = result.get_outputs()
for output in outputs:
    if isinstance(output, AgentResponse):
        print(output.text)

Sessioni condivise

Per impostazione predefinita, ognuna AgentExecutor crea la propria sessione. Per condividere una sessione tra più agenti (ad esempio, per mantenere un thread di conversazione comune), creare una sessione in modo esplicito e passarla a ogni executor:

from agent_framework import AgentExecutor

# Create a shared session from one agent
shared_session = writer_agent.create_session()

# Both executors share the same session
writer_executor = AgentExecutor(writer_agent, session=shared_session)
reviewer_executor = AgentExecutor(reviewer_agent, session=shared_session)

Annotazioni

Non tutti gli agenti supportano sessioni condivise. In genere, solo gli agenti dello stesso tipo di provider possono condividere una sessione.

Checkpoint

AgentExecutor supporta la gestione dei punti di controllo per il salvataggio e il ripristino dello stato nei flussi di lavoro di lunga durata. Quando viene acquisito un checkpoint, l'executor serializza:

  • Cache dei messaggi interna.
  • Cronologia completa della conversazione.
  • Stato della sessione dell'agente.
  • Tutte le richieste e risposte di input dell'utente pendenti.

Al ripristino, l'executor deserializza questo stato, consentendo al flusso di lavoro di riprendere da dove è stato interrotto.

Avviso

Il checkpoint con agenti che usano sessioni lato server (ad esempio AzureAIAgentClient) presenta limitazioni. Lo stato della sessione sul lato server non viene acquisito nei checkpoint e può essere modificato dalle esecuzioni successive. Valutare la possibilità di implementare un executor personalizzato se è necessario eseguire un checkpoint affidabile con sessioni lato server.

Passaggi successivi