Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Wenn Sie einem Workflow einen KI-Agent hinzufügen, muss er in einen Executor eingeschlossen werden, damit das Workflowmodul Nachrichten an ihn weiterleiten, den Sitzungsstatus verwalten und seine Ausgabe verarbeiten kann. Der Agent-Executor ist der integrierte Executor, der diese Anpassung verarbeitet.
Übersicht
Der Agent-Executor überbrückt die Lücke zwischen der Agentabstraktion und dem Workflowausführungsmodell. Es hat folgende Aufgaben:
- Empfängt typierte Nachrichten aus dem Workflowdiagramm und leitet sie an den zugrunde liegenden Agent weiter.
- Verwaltet den Sitzungs- und Konversationsstatus des Agents zwischen den Ausführungen.
- Passt sein Verhalten basierend auf dem Workflowausführungsmodus (Streaming oder Nicht-Streaming) an.
- Liefert Ausgabeereignisse (
AgentResponseoderAgentResponseUpdate) für den Workflowaufrufer zur Beobachtung. - Sendet Nachrichten an verbundene nachgeschaltete Executoren für die fortgesetzte Verarbeitung innerhalb des Diagramms.
- Unterstützt das Checkpointing für lang andauernde Workflows.
Funktionsweise
In C# erstellt das Workflowmodul intern einen AIAgentHostExecutor für jeden AIAgent , der einem Workflow hinzugefügt wurde. Dieser spezialisierte Executor erweitert ChatProtocolExecutor und verwendet ein Turntokenmuster :
-
Nachrichtenzwischenspeicherung – Wenn Nachrichten von anderen Exekutoren empfangen werden, sammelt der Agent-Executor sie. Wenn
ForwardIncomingMessagesaktiviert ist (Standard), werden die eingehenden Nachrichten auch an nachgeschaltete Ausführungsinstanzen weitergeleitet. -
Tokentrigger aktivieren – der Agent verarbeitet seine zwischengespeicherten Nachrichten erst nach dem Empfang eines
TurnToken. -
Agentaufruf – der Executor ruft
RunAsync(kein Streaming) oderRunStreamingAsync(Streaming) für den zugrunde liegenden Agent auf. -
Ausgaberendite – wenn Streamingereignisse aktiviert sind, wird jede inkrementelle
AgentResponseUpdateAusgabe als Workflowausgabe zurückgegeben. WennEmitAgentResponseEventsaktiviert ist, wird die aggregierteAgentResponseauch als Workflowausgabe bereitgestellt. - Nachgeschaltetes Messaging – die Antwortnachrichten des Agents werden an verbundene nachgeschaltete Executoren gesendet.
-
Token-Pass-Through aktivieren – nach Abschluss seines Durchgangs sendet der Executor ein neues
TurnTokenToken weiter, damit der nächste Agent in der Kette mit der Verarbeitung beginnen kann.
Tipp
Einige Szenarien erfordern möglicherweise einen spezialisierteren Agenten-Executor. Zum Beispiel verwenden Handoff-Orchestrierungen eine eigene HandoffAgentExecutor mit spezifischer Routinglogik.
Implizite und explizite Erstellung
Wenn Sie ein AIAgent an WorkflowBuilder übergeben, wird es vom Framework automatisch in einen AIAgentBinding eingeschlossen, was die zugrunde liegende AIAgentHostExecutor erstellt. Sie müssen den Agent Executor nicht direkt instanziieren.
AIAgent writerAgent = /* create your agent */;
AIAgent reviewerAgent = /* create your agent */;
// Agents are automatically wrapped — no manual executor creation required
var workflow = new WorkflowBuilder(writerAgent)
.AddEdge(writerAgent, reviewerAgent)
.Build();
Sie können auch die Hilfsmethoden auf AgentWorkflowBuilder für allgemeine Muster verwenden.
// Build a sequential pipeline of agents
var workflow = AgentWorkflowBuilder.BuildSequential(writerAgent, reviewerAgent);
Benutzerdefinierte Konfiguration
Um das Verhalten des Agent-Executors anzupassen, verwenden Sie BindAsExecutor mit AIAgentHostOptions.
var options = new AIAgentHostOptions
{
EmitAgentUpdateEvents = true,
EmitAgentResponseEvents = true,
ReassignOtherAgentsAsUsers = true,
ForwardIncomingMessages = true,
};
ExecutorBinding writerBinding = writerAgent.BindAsExecutor(options);
var workflow = new WorkflowBuilder(writerBinding)
.AddEdge(writerBinding, reviewerAgent)
.Build();
Eingabetypen
Der Agent-Executor in C# akzeptiert mehrere Eingabetypen: string, , ChatMessageund IEnumerable<ChatMessage>. String-Eingaben werden automatisch in ChatMessage-Instanzen mit der User-Rolle umgewandelt. Alle eingehenden Nachrichten werden gesammelt, bis ein TurnToken empfangen wird. Zu diesem Zeitpunkt verarbeitet der Executor den Satz. Wenn ReassignOtherAgentsAsUsers diese Option aktiviert ist (standard), werden Nachrichten von anderen Agents der User Rolle neu zugewiesen, sodass das zugrunde liegende Modell sie als Benutzereingaben behandelt, während Nachrichten vom aktuellen Agent die Assistant Rolle beibehalten.
Ausgabe und Verkettung
Nachdem der Agent seine Ausführung abgeschlossen hat, führt der Executor Folgendes aus:
- Sendet die Antwortmeldungen des Agents an alle verbundenen nachgeschalteten Executoren.
- Leitet ein neues
TurnTokenweiter, damit der nächste Agent in der Kette mit der Verarbeitung beginnen kann.
Dadurch wird die Verkettung von Agenten einfach: Verbinden Sie sie einfach mit Kanten.
var workflow = new WorkflowBuilder(frenchTranslator)
.AddEdge(frenchTranslator, spanishTranslator)
.AddEdge(spanishTranslator, englishTranslator)
.Build();
Streamingverhalten
Das Streamingverhalten wird durch die Option EmitAgentUpdateEvents auf AIAgentHostOptions gesteuert oder dynamisch über TurnToken.
-
Wenn diese Option aktiviert ist – ruft der Executor beim Agenten
RunStreamingAsyncund liefert jedesAgentResponseUpdateals Workflowausgabeereignis. Dadurch werden Token-nach-Token-Updates in Echtzeit bereitgestellt. -
Wenn deaktiviert – ruft der Executor auf
RunAsyncund erzeugt eine einzelne vollständige Antwort.
// Enable streaming events at the configuration level
var options = new AIAgentHostOptions
{
EmitAgentUpdateEvents = true,
};
// Or enable streaming dynamically via TurnToken
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
Geteilte Sitzungen
Jeder Agent-Executor verwaltet standardmäßig eine eigene Sitzung. Um eine Sitzung zwischen Agents zu teilen, konfigurieren Sie die Agents mit einem gemeinsamen Sitzungsanbieter, bevor Sie sie dem Workflow hinzufügen.
Konfigurationsoptionen
AIAgentHostOptions steuert das Verhalten der Agent-Ausführungsinstanz:
| Auswahl | Vorgabe | Beschreibung |
|---|---|---|
EmitAgentUpdateEvents |
null |
Stream-Update-Ereignisse während der Ausführung ausgeben.
TurnToken hat Vorrang, wenn festgelegt. Wenn beide null sind, ist Streaming deaktiviert. |
EmitAgentResponseEvents |
false |
Gibt die aggregierte Agentantwort als Workflowausgabeereignis aus. |
InterceptUserInputRequests |
false |
Abfangen UserInputRequestContent und Weiterleiten als Workflownachricht für die Verarbeitung. |
InterceptUnterminatedFunctionCalls |
false |
Fangen Sie FunctionCallContent ohne ein entsprechendes Ergebnis ab, und leiten Sie es als Workflow-Nachricht weiter. |
ReassignOtherAgentsAsUsers |
true |
Weisen Sie Nachrichten von anderen Agents der User Rolle neu zu, damit das Modell sie als Benutzereingaben behandelt. |
ForwardIncomingMessages |
true |
Leiten Sie eingehende Nachrichten an nachgeschaltete Ausführungsinstanzen weiter, bevor die vom Agenten generierten Nachrichten gesendet werden. |
Erstellen von Prüfpunkten
Der Agent-Executor unterstützt Prüfpunkte für lange ausgeführte Workflows. Wenn ein Prüfpunkt ausgeführt wird, serialisiert der Executor Folgendes:
- Der Sitzungsstatus des Agents (über
SerializeSessionAsync). - Die Ereignisemissionskonfiguration des aktuellen Turns (nur vorhanden, während Anforderungen ausstehen und der Executor noch nicht seinen eingehenden
TurnTokenWert zurückgegeben hat). - Alle ausstehenden Benutzereingabeanforderungen und Funktionsaufrufanforderungen.
Bei der Wiederherstellung deserialisiert der Executor den Sitzungs- und ausstehenden Anforderungsstatus, sodass der Workflow von dort fortgesetzt werden kann, wo er unterbrochen wurde.
Funktionsweise
Die AgentExecutor Klasse umschließt einen Agent, der das SupportsAgentRun Protokoll implementiert. Wenn der Executor eine Nachricht empfängt:
-
Nachrichtennormalisierung – die Eingabe wird in eine Liste von
MessageObjekten normalisiert und dem internen Cache des Executors hinzugefügt. Der Executor akzeptiert mehrere Eingabetypen —str,Message,list[str | Message],AgentExecutorRequestundAgentExecutorResponse— wobei jeder einem dedizierten Handler zugewiesen wird, der die Eingabe vor dem Zwischenspeichern normalisiert. -
Agentaufruf – der Ausführende ruft
agent.run()mit den zwischengespeicherten Nachrichten auf und wählt automatisch den Streaming-Modus oder Nicht-Streaming-Modus basierend auf dem Workflowausführungsmodus. -
Ausgabeemissionen – im Streamingmodus wird jedes
AgentResponseUpdateals Workflowausgabeereignis zurückgegeben. Im Nicht-Streaming-Modus wird ein einzelnesAgentResponsezurückgegeben. -
Downstream-Dispatch – Nachdem der Agent abgeschlossen ist, sendet der Executor einen
AgentExecutorResponsean alle verbundenen nachgeschaltete Executoren. Diese Antwort enthält den vollständigen Unterhaltungsverlauf und ermöglicht eine nahtlose Verkettung. - Cache-Zurücksetzung – Der interne Nachrichtencache des Executor wird gelöscht, nachdem der Agent aufgerufen wurde, um sicherzustellen, dass jeder Agentaufruf nur neue Nachrichten verarbeitet, die seit dem letzten Aufruf empfangen wurden.
Tipp
Einige Szenarien erfordern möglicherweise einen spezielleren Agent-Executor; Beispielsweise verwenden Handoff-Orchestrierungen einen dedizierten Executor mit benutzerdefinierter Routinglogik.
Implizite und explizite Erstellung
Die WorkflowBuilder wickelt Agenten automatisch in AgentExecutor-Instanzen ein, wenn Sie einen Agenten direkt übergeben. Für die meisten Workflows reicht die implizite Erstellung aus:
from agent_framework import WorkflowBuilder
writer_agent = client.as_agent(name="Writer", instructions="...")
reviewer_agent = client.as_agent(name="Reviewer", instructions="...")
# Agents are automatically wrapped — no manual AgentExecutor creation required
workflow = (
WorkflowBuilder(start_executor=writer_agent)
.add_edge(writer_agent, reviewer_agent)
.build()
)
Explizite Erstellung
Erstellen Sie eine AgentExecutor explizit, wenn Sie dies benötigen:
- Eine Sitzung zwischen mehreren Agenten freigeben.
- Stellen Sie eine benutzerdefinierte Executor-ID für Routing und gezielte Laufzeit-Kwargs bereit.
- Verweisen Sie auf dieselbe Executorinstanz in mehreren Rändern.
from agent_framework import AgentExecutor
writer_executor = AgentExecutor(writer_agent, id="my-writer")
reviewer_executor = AgentExecutor(reviewer_agent, id="my-reviewer")
workflow = (
WorkflowBuilder(start_executor=writer_executor)
.add_edge(writer_executor, reviewer_executor)
.build()
)
Konstruktorparameter:
| Parameter | Typ | Beschreibung |
|---|---|---|
agent |
SupportsAgentRun |
Der zu umschließende Agent. |
session |
AgentSession \| None |
Sitzung, die für Agentausführungen verwendet werden soll. Wenn None eine neue Sitzung durch den Agenten erstellt wird. |
id |
str \| None |
Eindeutige Executor-ID. Wenn verfügbar, wird standardmäßig der Name des Agents verwendet. |
Tipp
Die Executor-ID ist auch der Schlüssel, der verwendet wird, wenn Sie workflow.run(function_invocation_kwargs=...) oder client_kwargs= auf einzelne Agenten richten. Wenn Sie weglassen id, verwendet der Workflow den Namen des umschlossenen Agents.
Eingabetypen
Die AgentExecutor definiert mehrere Handlermethoden, die jeweils einen anderen Eingabetyp akzeptieren. Das Workflowmodul verteilt automatisch den richtigen Handler basierend auf dem Nachrichtentyp. Alle Eingabetypen lösen die Ausführung des Agents sofort aus, außer bei AgentExecutorRequest, wo das should_respond-Flag steuert, ob der Agent ausgeführt wird oder die Nachrichten zwischengespeichert werden.
| Eingabetyp | Bearbeiter | Auslöser-Agent | Beschreibung |
|---|---|---|---|
AgentExecutorRequest |
run |
Bedingt | Der kanonische Eingabetyp. Enthält eine Liste von Nachrichten und ein should_respond Flag, das steuert, ob der Agent ausgeführt werden soll. |
str |
from_str |
Immer | Akzeptiert eine unformatierte Zeichenfolgenaufforderung. |
Message |
from_message |
Immer | Akzeptiert ein einzelnes Message Objekt. |
list[str \| Message] |
from_messages |
Immer | Akzeptiert eine Liste von Zeichenfolgen oder Message Objekten als Unterhaltungskontext. |
AgentExecutorResponse |
from_response |
Immer | Akzeptiert die Antwort eines früheren Agentausführers und ermöglicht die direkte Verkettung. |
Verwenden von AgentExecutorRequest
AgentExecutorRequest ist der kanonische Eingabetyp und bietet das meiste Steuerelement:
from agent_framework import AgentExecutorRequest, Message
# Create a request with messages
request = AgentExecutorRequest(
messages=[Message(role="user", contents=["Hello, world!"])],
should_respond=True,
)
# Run the workflow
result = await workflow.run(request)
Das should_respond Flag steuert, ob der Agent die Nachrichten sofort verarbeitet oder einfach für später zwischenspeichert:
-
True(Standard) – der Agent läuft und erzeugt eine Antwort. -
False— die Nachrichten werden dem Cache hinzugefügt, der Agent wird jedoch nicht ausgeführt. Dies ist nützlich für das Vorabladen des Unterhaltungskontexts, bevor eine Antwort ausgelöst wird.
Ausgabe und Verkettung
Nach Abschluss des Agents sendet der Executor ein AgentExecutorResponse stromabwärts. Diese Datenklasse enthält:
| Feld | Typ | Beschreibung |
|---|---|---|
executor_id |
str |
Die ID des Ausführenden, der die Antwort erzeugt hat. |
agent_response |
AgentResponse |
Die zugrunde liegende Antwort des Agenten (nicht geändert vom Client). |
full_conversation |
list[Message] \| None |
Der vollständige Unterhaltungskontext (vorherige Eingaben + Agentausgabe) für die Verkettung. |
Beim Verketten von Agentausführern empfängt der nachgeschaltete Executor den AgentExecutorResponse über den from_response Handler. Es verwendet das full_conversation Feld, um den vollständigen Unterhaltungsverlauf beizubehalten und zu verhindern, dass nachfolgende Agenten den vorherigen Kontext verlieren.
spam_detector = AgentExecutor(create_spam_detector_agent())
email_assistant = AgentExecutor(create_email_assistant_agent())
# The email_assistant receives the spam_detector's full conversation context
workflow = (
WorkflowBuilder(start_executor=spam_detector)
.add_edge(spam_detector, email_assistant)
.build()
)
Streamingverhalten
Der AgentExecutor passt sich automatisch an den Workflow-Ausführungsmodus an.
-
stream=True— Aufrufeagent.run(stream=True)und Ergibt jedesAgentResponseUpdateals Workflowausgabeereignis. Nach Abschluss des Streamings werden die Updates für die nachgelagerte Weiterleitung in eine vollständigeAgentResponsezusammengefasst. -
stream=False(Standard) – ruftagent.run(stream=False)auf und gibt ein einzelnesAgentResponseals Workflow-Ausgabeereignis.
# Streaming mode — receive incremental updates
events = workflow.run("Write a story about a cat.", stream=True)
async for event in events:
if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
print(event.data.text, end="", flush=True)
# Non-streaming mode — receive complete response
result = await workflow.run("Write a story about a cat.")
# Retrieve AgentResponse objects from the result
outputs = result.get_outputs()
for output in outputs:
if isinstance(output, AgentResponse):
print(output.text)
Geteilte Sitzungen
Standardmäßig erstellt jeder AgentExecutor eine eigene Sitzung. Um eine Sitzung zwischen mehreren Agents zu teilen (z. B. zum Verwalten eines gemeinsamen Unterhaltungsthreads), erstellen Sie eine Sitzung explizit, und übergeben Sie sie an jeden Executor:
from agent_framework import AgentExecutor
# Create a shared session from one agent
shared_session = writer_agent.create_session()
# Both executors share the same session
writer_executor = AgentExecutor(writer_agent, session=shared_session)
reviewer_executor = AgentExecutor(reviewer_agent, session=shared_session)
Hinweis
Nicht alle Agenten unterstützen freigegebene Sitzungen. In der Regel können nur Agents desselben Anbietertyps eine Sitzung gemeinsam nutzen.
Erstellen von Prüfpunkten
Dieses AgentExecutor unterstützt Prüfpunkte zum Speichern und Wiederherstellen des Zustands in lang andauernden Workflows. Wenn ein Prüfpunkt ausgeführt wird, serialisiert der Executor Folgendes:
- Der interne Nachrichtencache.
- Der vollständige Gesprächsverlauf.
- Der Sitzungsstatus des Agenten.
- Alle ausstehenden Benutzereingabeanforderungen und -antworten.
Bei der Wiederherstellung wird dieser Zustand vom Executor deserialisiert, sodass der Workflow von der Stelle fortgesetzt werden kann, an der er unterbrochen wurde.
Warnung
Die Prüfpunkte mit Agents, die serverseitige Sitzungen (z. B. FoundryAgent) verwenden, haben Einschränkungen. Der serverseitige Sitzungszustand wird in Prüfpunkten nicht erfasst und kann von nachfolgenden Ausführungen geändert werden. Erwägen Sie die Implementierung eines benutzerdefinierten Executors, wenn Sie zuverlässige Prüfpunkte mit serverseitigen Sitzungen benötigen.