Freigeben über


Agentenausführer

Wenn Sie einem Workflow einen KI-Agent hinzufügen, muss er in einen Executor eingeschlossen werden, damit das Workflowmodul Nachrichten an ihn weiterleiten, den Sitzungsstatus verwalten und seine Ausgabe verarbeiten kann. Der Agent-Executor ist der integrierte Executor, der diese Anpassung verarbeitet.

Übersicht

Der Agent-Executor überbrückt die Lücke zwischen der Agentabstraktion und dem Workflowausführungsmodell. Es hat folgende Aufgaben:

  • Empfängt typierte Nachrichten aus dem Workflowdiagramm und leitet sie an den zugrunde liegenden Agent weiter.
  • Verwaltet den Sitzungs- und Konversationsstatus des Agents zwischen den Ausführungen.
  • Passt sein Verhalten basierend auf dem Workflowausführungsmodus (Streaming oder Nicht-Streaming) an.
  • Liefert Ausgabeereignisse (AgentResponse oder AgentResponseUpdate) für den Workflowaufrufer zur Beobachtung.
  • Sendet Nachrichten an verbundene nachgeschaltete Executoren für die fortgesetzte Verarbeitung innerhalb des Diagramms.
  • Unterstützt das Checkpointing für lang andauernde Workflows.

Funktionsweise

In C# erstellt das Workflowmodul intern einen AIAgentHostExecutor für jeden AIAgent , der einem Workflow hinzugefügt wurde. Dieser spezialisierte Executor erweitert ChatProtocolExecutor und verwendet ein Turntokenmuster :

  1. Nachrichtenzwischenspeicherung – Wenn Nachrichten von anderen Exekutoren empfangen werden, sammelt der Agent-Executor sie. Wenn ForwardIncomingMessages aktiviert ist (Standard), werden die eingehenden Nachrichten auch an nachgeschaltete Ausführungsinstanzen weitergeleitet.
  2. Tokentrigger aktivieren – der Agent verarbeitet seine zwischengespeicherten Nachrichten erst nach dem Empfang eines TurnToken.
  3. Agentaufruf – der Executor ruft RunAsync (kein Streaming) oder RunStreamingAsync (Streaming) für den zugrunde liegenden Agent auf.
  4. Ausgaberendite – wenn Streamingereignisse aktiviert sind, wird jede inkrementelle AgentResponseUpdate Ausgabe als Workflowausgabe zurückgegeben. Wenn EmitAgentResponseEvents aktiviert ist, wird die aggregierte AgentResponse auch als Workflowausgabe bereitgestellt.
  5. Nachgeschaltetes Messaging – die Antwortnachrichten des Agents werden an verbundene nachgeschaltete Executoren gesendet.
  6. Token-Pass-Through aktivieren – nach Abschluss seines Durchgangs sendet der Executor ein neues TurnToken Token weiter, damit der nächste Agent in der Kette mit der Verarbeitung beginnen kann.

Tipp

Einige Szenarien erfordern möglicherweise einen spezialisierteren Agenten-Executor. Zum Beispiel verwenden Handoff-Orchestrierungen eine eigene HandoffAgentExecutor mit spezifischer Routinglogik.

Implizite und explizite Erstellung

Wenn Sie ein AIAgent an WorkflowBuilder übergeben, wird es vom Framework automatisch in einen AIAgentBinding eingeschlossen, was die zugrunde liegende AIAgentHostExecutor erstellt. Sie müssen den Agent Executor nicht direkt instanziieren.

AIAgent writerAgent = /* create your agent */;
AIAgent reviewerAgent = /* create your agent */;

// Agents are automatically wrapped — no manual executor creation required
var workflow = new WorkflowBuilder(writerAgent)
    .AddEdge(writerAgent, reviewerAgent)
    .Build();

Sie können auch die Hilfsmethoden auf AgentWorkflowBuilder für allgemeine Muster verwenden.

// Build a sequential pipeline of agents
var workflow = AgentWorkflowBuilder.BuildSequential(writerAgent, reviewerAgent);

Benutzerdefinierte Konfiguration

Um das Verhalten des Agent-Executors anzupassen, verwenden Sie BindAsExecutor mit AIAgentHostOptions.

var options = new AIAgentHostOptions
{
    EmitAgentUpdateEvents = true,
    EmitAgentResponseEvents = true,
    ReassignOtherAgentsAsUsers = true,
    ForwardIncomingMessages = true,
};

ExecutorBinding writerBinding = writerAgent.BindAsExecutor(options);
var workflow = new WorkflowBuilder(writerBinding)
    .AddEdge(writerBinding, reviewerAgent)
    .Build();

Eingabetypen

Der Agent-Executor in C# akzeptiert mehrere Eingabetypen: string, , ChatMessageund IEnumerable<ChatMessage>. String-Eingaben werden automatisch in ChatMessage-Instanzen mit der User-Rolle umgewandelt. Alle eingehenden Nachrichten werden gesammelt, bis ein TurnToken empfangen wird. Zu diesem Zeitpunkt verarbeitet der Executor den Satz. Wenn ReassignOtherAgentsAsUsers diese Option aktiviert ist (standard), werden Nachrichten von anderen Agents der User Rolle neu zugewiesen, sodass das zugrunde liegende Modell sie als Benutzereingaben behandelt, während Nachrichten vom aktuellen Agent die Assistant Rolle beibehalten.

Ausgabe und Verkettung

Nachdem der Agent seine Ausführung abgeschlossen hat, führt der Executor Folgendes aus:

  1. Sendet die Antwortmeldungen des Agents an alle verbundenen nachgeschalteten Executoren.
  2. Leitet ein neues TurnToken weiter, damit der nächste Agent in der Kette mit der Verarbeitung beginnen kann.

Dadurch wird die Verkettung von Agenten einfach: Verbinden Sie sie einfach mit Kanten.

var workflow = new WorkflowBuilder(frenchTranslator)
    .AddEdge(frenchTranslator, spanishTranslator)
    .AddEdge(spanishTranslator, englishTranslator)
    .Build();

Streamingverhalten

Das Streamingverhalten wird durch die Option EmitAgentUpdateEvents auf AIAgentHostOptions gesteuert oder dynamisch über TurnToken.

  • Wenn diese Option aktiviert ist – ruft der Executor beim Agenten RunStreamingAsync und liefert jedes AgentResponseUpdate als Workflowausgabeereignis. Dadurch werden Token-nach-Token-Updates in Echtzeit bereitgestellt.
  • Wenn deaktiviert – ruft der Executor auf RunAsync und erzeugt eine einzelne vollständige Antwort.
// Enable streaming events at the configuration level
var options = new AIAgentHostOptions
{
    EmitAgentUpdateEvents = true,
};

// Or enable streaming dynamically via TurnToken
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

Geteilte Sitzungen

Jeder Agent-Executor verwaltet standardmäßig eine eigene Sitzung. Um eine Sitzung zwischen Agents zu teilen, konfigurieren Sie die Agents mit einem gemeinsamen Sitzungsanbieter, bevor Sie sie dem Workflow hinzufügen.

Konfigurationsoptionen

AIAgentHostOptions steuert das Verhalten der Agent-Ausführungsinstanz:

Auswahl Vorgabe Beschreibung
EmitAgentUpdateEvents null Stream-Update-Ereignisse während der Ausführung ausgeben. TurnToken hat Vorrang, wenn festgelegt. Wenn beide null sind, ist Streaming deaktiviert.
EmitAgentResponseEvents false Gibt die aggregierte Agentantwort als Workflowausgabeereignis aus.
InterceptUserInputRequests false Abfangen UserInputRequestContent und Weiterleiten als Workflownachricht für die Verarbeitung.
InterceptUnterminatedFunctionCalls false Fangen Sie FunctionCallContent ohne ein entsprechendes Ergebnis ab, und leiten Sie es als Workflow-Nachricht weiter.
ReassignOtherAgentsAsUsers true Weisen Sie Nachrichten von anderen Agents der User Rolle neu zu, damit das Modell sie als Benutzereingaben behandelt.
ForwardIncomingMessages true Leiten Sie eingehende Nachrichten an nachgeschaltete Ausführungsinstanzen weiter, bevor die vom Agenten generierten Nachrichten gesendet werden.

Erstellen von Prüfpunkten

Der Agent-Executor unterstützt Prüfpunkte für lange ausgeführte Workflows. Wenn ein Prüfpunkt ausgeführt wird, serialisiert der Executor Folgendes:

  • Der Sitzungsstatus des Agents (über SerializeSessionAsync).
  • Die Ereignisemissionskonfiguration des aktuellen Turns (nur vorhanden, während Anforderungen ausstehen und der Executor noch nicht seinen eingehenden TurnTokenWert zurückgegeben hat).
  • Alle ausstehenden Benutzereingabeanforderungen und Funktionsaufrufanforderungen.

Bei der Wiederherstellung deserialisiert der Executor den Sitzungs- und ausstehenden Anforderungsstatus, sodass der Workflow von dort fortgesetzt werden kann, wo er unterbrochen wurde.

Funktionsweise

Die AgentExecutor Klasse umschließt einen Agent, der das SupportsAgentRun Protokoll implementiert. Wenn der Executor eine Nachricht empfängt:

  1. Nachrichtennormalisierung – die Eingabe wird in eine Liste von Message Objekten normalisiert und dem internen Cache des Executors hinzugefügt. Der Executor akzeptiert mehrere Eingabetypen — str, Message, list[str | Message], AgentExecutorRequest und AgentExecutorResponse — wobei jeder einem dedizierten Handler zugewiesen wird, der die Eingabe vor dem Zwischenspeichern normalisiert.
  2. Agentaufruf – der Ausführende ruft agent.run() mit den zwischengespeicherten Nachrichten auf und wählt automatisch den Streaming-Modus oder Nicht-Streaming-Modus basierend auf dem Workflowausführungsmodus.
  3. Ausgabeemissionen – im Streamingmodus wird jedes AgentResponseUpdate als Workflowausgabeereignis zurückgegeben. Im Nicht-Streaming-Modus wird ein einzelnes AgentResponse zurückgegeben.
  4. Downstream-Dispatch – Nachdem der Agent abgeschlossen ist, sendet der Executor einen AgentExecutorResponse an alle verbundenen nachgeschaltete Executoren. Diese Antwort enthält den vollständigen Unterhaltungsverlauf und ermöglicht eine nahtlose Verkettung.
  5. Cache-Zurücksetzung – Der interne Nachrichtencache des Executor wird gelöscht, nachdem der Agent aufgerufen wurde, um sicherzustellen, dass jeder Agentaufruf nur neue Nachrichten verarbeitet, die seit dem letzten Aufruf empfangen wurden.

Tipp

Einige Szenarien erfordern möglicherweise einen spezielleren Agent-Executor; Beispielsweise verwenden Handoff-Orchestrierungen einen dedizierten Executor mit benutzerdefinierter Routinglogik.

Implizite und explizite Erstellung

Die WorkflowBuilder wickelt Agenten automatisch in AgentExecutor-Instanzen ein, wenn Sie einen Agenten direkt übergeben. Für die meisten Workflows reicht die implizite Erstellung aus:

from agent_framework import WorkflowBuilder

writer_agent = client.as_agent(name="Writer", instructions="...")
reviewer_agent = client.as_agent(name="Reviewer", instructions="...")

# Agents are automatically wrapped — no manual AgentExecutor creation required
workflow = (
    WorkflowBuilder(start_executor=writer_agent)
    .add_edge(writer_agent, reviewer_agent)
    .build()
)

Explizite Erstellung

Erstellen Sie eine AgentExecutor explizit, wenn Sie dies benötigen:

  • Eine Sitzung zwischen mehreren Agenten freigeben.
  • Stellen Sie eine benutzerdefinierte Executor-ID für Routing und gezielte Laufzeit-Kwargs bereit.
  • Verweisen Sie auf dieselbe Executorinstanz in mehreren Rändern.
from agent_framework import AgentExecutor

writer_executor = AgentExecutor(writer_agent, id="my-writer")
reviewer_executor = AgentExecutor(reviewer_agent, id="my-reviewer")

workflow = (
    WorkflowBuilder(start_executor=writer_executor)
    .add_edge(writer_executor, reviewer_executor)
    .build()
)

Konstruktorparameter:

Parameter Typ Beschreibung
agent SupportsAgentRun Der zu umschließende Agent.
session AgentSession \| None Sitzung, die für Agentausführungen verwendet werden soll. Wenn None eine neue Sitzung durch den Agenten erstellt wird.
id str \| None Eindeutige Executor-ID. Wenn verfügbar, wird standardmäßig der Name des Agents verwendet.

Tipp

Die Executor-ID ist auch der Schlüssel, der verwendet wird, wenn Sie workflow.run(function_invocation_kwargs=...) oder client_kwargs= auf einzelne Agenten richten. Wenn Sie weglassen id, verwendet der Workflow den Namen des umschlossenen Agents.

Eingabetypen

Die AgentExecutor definiert mehrere Handlermethoden, die jeweils einen anderen Eingabetyp akzeptieren. Das Workflowmodul verteilt automatisch den richtigen Handler basierend auf dem Nachrichtentyp. Alle Eingabetypen lösen die Ausführung des Agents sofort aus, außer bei AgentExecutorRequest, wo das should_respond-Flag steuert, ob der Agent ausgeführt wird oder die Nachrichten zwischengespeichert werden.

Eingabetyp Bearbeiter Auslöser-Agent Beschreibung
AgentExecutorRequest run Bedingt Der kanonische Eingabetyp. Enthält eine Liste von Nachrichten und ein should_respond Flag, das steuert, ob der Agent ausgeführt werden soll.
str from_str Immer Akzeptiert eine unformatierte Zeichenfolgenaufforderung.
Message from_message Immer Akzeptiert ein einzelnes Message Objekt.
list[str \| Message] from_messages Immer Akzeptiert eine Liste von Zeichenfolgen oder Message Objekten als Unterhaltungskontext.
AgentExecutorResponse from_response Immer Akzeptiert die Antwort eines früheren Agentausführers und ermöglicht die direkte Verkettung.

Verwenden von AgentExecutorRequest

AgentExecutorRequest ist der kanonische Eingabetyp und bietet das meiste Steuerelement:

from agent_framework import AgentExecutorRequest, Message

# Create a request with messages
request = AgentExecutorRequest(
    messages=[Message(role="user", contents=["Hello, world!"])],
    should_respond=True,
)

# Run the workflow
result = await workflow.run(request)

Das should_respond Flag steuert, ob der Agent die Nachrichten sofort verarbeitet oder einfach für später zwischenspeichert:

  • True (Standard) – der Agent läuft und erzeugt eine Antwort.
  • False — die Nachrichten werden dem Cache hinzugefügt, der Agent wird jedoch nicht ausgeführt. Dies ist nützlich für das Vorabladen des Unterhaltungskontexts, bevor eine Antwort ausgelöst wird.

Ausgabe und Verkettung

Nach Abschluss des Agents sendet der Executor ein AgentExecutorResponse stromabwärts. Diese Datenklasse enthält:

Feld Typ Beschreibung
executor_id str Die ID des Ausführenden, der die Antwort erzeugt hat.
agent_response AgentResponse Die zugrunde liegende Antwort des Agenten (nicht geändert vom Client).
full_conversation list[Message] \| None Der vollständige Unterhaltungskontext (vorherige Eingaben + Agentausgabe) für die Verkettung.

Beim Verketten von Agentausführern empfängt der nachgeschaltete Executor den AgentExecutorResponse über den from_response Handler. Es verwendet das full_conversation Feld, um den vollständigen Unterhaltungsverlauf beizubehalten und zu verhindern, dass nachfolgende Agenten den vorherigen Kontext verlieren.

spam_detector = AgentExecutor(create_spam_detector_agent())
email_assistant = AgentExecutor(create_email_assistant_agent())

# The email_assistant receives the spam_detector's full conversation context
workflow = (
    WorkflowBuilder(start_executor=spam_detector)
    .add_edge(spam_detector, email_assistant)
    .build()
)

Streamingverhalten

Der AgentExecutor passt sich automatisch an den Workflow-Ausführungsmodus an.

  • stream=True — Aufrufe agent.run(stream=True) und Ergibt jedes AgentResponseUpdate als Workflowausgabeereignis. Nach Abschluss des Streamings werden die Updates für die nachgelagerte Weiterleitung in eine vollständige AgentResponse zusammengefasst.
  • stream=False (Standard) – ruft agent.run(stream=False) auf und gibt ein einzelnes AgentResponse als Workflow-Ausgabeereignis.
# Streaming mode — receive incremental updates
events = workflow.run("Write a story about a cat.", stream=True)
async for event in events:
    if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
        print(event.data.text, end="", flush=True)

# Non-streaming mode — receive complete response
result = await workflow.run("Write a story about a cat.")

# Retrieve AgentResponse objects from the result
outputs = result.get_outputs()
for output in outputs:
    if isinstance(output, AgentResponse):
        print(output.text)

Geteilte Sitzungen

Standardmäßig erstellt jeder AgentExecutor eine eigene Sitzung. Um eine Sitzung zwischen mehreren Agents zu teilen (z. B. zum Verwalten eines gemeinsamen Unterhaltungsthreads), erstellen Sie eine Sitzung explizit, und übergeben Sie sie an jeden Executor:

from agent_framework import AgentExecutor

# Create a shared session from one agent
shared_session = writer_agent.create_session()

# Both executors share the same session
writer_executor = AgentExecutor(writer_agent, session=shared_session)
reviewer_executor = AgentExecutor(reviewer_agent, session=shared_session)

Hinweis

Nicht alle Agenten unterstützen freigegebene Sitzungen. In der Regel können nur Agents desselben Anbietertyps eine Sitzung gemeinsam nutzen.

Erstellen von Prüfpunkten

Dieses AgentExecutor unterstützt Prüfpunkte zum Speichern und Wiederherstellen des Zustands in lang andauernden Workflows. Wenn ein Prüfpunkt ausgeführt wird, serialisiert der Executor Folgendes:

  • Der interne Nachrichtencache.
  • Der vollständige Gesprächsverlauf.
  • Der Sitzungsstatus des Agenten.
  • Alle ausstehenden Benutzereingabeanforderungen und -antworten.

Bei der Wiederherstellung wird dieser Zustand vom Executor deserialisiert, sodass der Workflow von der Stelle fortgesetzt werden kann, an der er unterbrochen wurde.

Warnung

Die Prüfpunkte mit Agents, die serverseitige Sitzungen (z. B. FoundryAgent) verwenden, haben Einschränkungen. Der serverseitige Sitzungszustand wird in Prüfpunkten nicht erfasst und kann von nachfolgenden Ausführungen geändert werden. Erwägen Sie die Implementierung eines benutzerdefinierten Executors, wenn Sie zuverlässige Prüfpunkte mit serverseitigen Sitzungen benötigen.

Nächste Schritte