Freigeben über


agent_framework Paket

Pakete

a2a
ag_ui
anthropic
azure
chatkit
declarative
devui
lab
mem0
microsoft
ollama
openai
redis

Module

exceptions
observability

Klassen

AIFunction

Ein Tool, das eine Python-Funktion umschließt, damit sie von KI-Modellen aufgerufen werden kann.

Diese Klasse umschließt eine Python-Funktion, damit sie von KI-Modellen mit automatischer Parameterüberprüfung und JSON-Schemagenerierung aufgerufen werden kann.

Initialisieren Sie die AIFunction.

AgentExecutor

Integrierter Executor, der einen Agent zum Behandeln von Nachrichten umschließt.

AgentExecutor passt sein Verhalten basierend auf dem Workflowausführungsmodus an:

  • run_stream(): Emits inkrementelle AgentRunUpdateEvent-Ereignisse, wenn der Agent Token erzeugt
  • run(): Emits a single AgentRunEvent containing the complete response

Der Executor erkennt den Modus automatisch über WorkflowContext.is_streaming().

Initialisieren Sie den Executor mit einem eindeutigen Bezeichner.

AgentExecutorRequest

Eine Anforderung an einen Agent-Executor.

AgentExecutorResponse

Eine Antwort von einem Agent-Executor.

AgentInputRequest

Anforderung für menschliche Eingaben, bevor ein Agent in übergeordneten Generatorworkflows ausgeführt wird.

Wird über RequestInfoEvent ausgegeben, wenn ein Workflow angehalten wird, bevor ein Agent ausgeführt wird. Die Antwort wird als Benutzernachricht in die Unterhaltung eingefügt, um das Verhalten des Agenten zu steuern.

Dies ist der Standardanforderungstyp, der von .with_request_info() für SequentialBuilder, ConcurrentBuilder, GroupChatBuilder und HandoffBuilder verwendet wird.

AgentMiddleware

Abstrakte Basisklasse für Agent-Middleware, die Agentaufrufe abfangen kann.

Mit Agent-Middleware können Sie Agent-Aufrufe vor und nach der Ausführung abfangen und ändern. Sie können Nachrichten überprüfen, Kontext ändern, Ergebnisse außer Kraft setzen oder die Ausführung frühzeitig beenden.

Hinweis

AgentMiddleware ist eine abstrakte Basisklasse. Sie müssen sie unterklassen und implementieren

die Process()-Methode zum Erstellen von benutzerdefinierter Agent-Middleware.

AgentProtocol

Ein Protokoll für einen Agent, der aufgerufen werden kann.

Dieses Protokoll definiert die Schnittstelle, die alle Agents implementieren müssen, einschließlich Eigenschaften für die Identifizierung und Methoden für die Ausführung.

Hinweis

Protokolle verwenden strukturelle Untertypisierung (Ententyping). Klassen benötigen nicht

um explizit von diesem Protokoll zu erben, um als kompatibel betrachtet zu werden.

Auf diese Weise können Sie vollständig benutzerdefinierte Agents erstellen, ohne diese zu verwenden.

alle Agent Framework-Basisklassen.

AgentRunContext

Kontextobjekt für Agent-Middleware-Aufrufe.

Dieser Kontext wird über die Agent-Middleware-Pipeline übergeben und enthält alle Informationen über den Agentaufruf.

Initialisieren Sie den AgentRunContext.

AgentRunEvent

Das Ereignis wird ausgelöst, wenn eine Agentausführung abgeschlossen ist.

Initialisieren Sie das Agent-Ausführungsereignis.

AgentRunResponse

Stellt die Antwort auf eine Agent-Ausführungsanforderung dar.

Stellt eine oder mehrere Antwortnachrichten und Metadaten zur Antwort bereit. Eine typische Antwort enthält eine einzelne Nachricht, kann jedoch mehrere Nachrichten in Szenarien mit Funktionsaufrufen, RAG-Abrufen oder komplexer Logik enthalten.

Initialisieren Sie einen AgentRunResponse.

AgentRunResponseUpdate

Stellt einen einzelnen Streamingantwortabschnitt eines Agents dar.

Initialisieren sie ein AgentRunResponseUpdate.

AgentRunUpdateEvent

Das Ereignis wird ausgelöst, wenn ein Agent Nachrichten streamt.

Initialisieren Sie das Agent-Streamingereignis.

AgentThread

Die Agentthreadklasse kann sowohl einen lokal verwalteten Thread als auch einen vom Dienst verwalteten Thread darstellen.

Ein AgentThread Verwaltet den Unterhaltungsstatus und den Nachrichtenverlauf für eine Agentinteraktion. Sie kann entweder einen dienstverwalteten Thread (über service_thread_id) oder einen lokalen Nachrichtenspeicher (über message_store), aber nicht beides verwenden.

Initialisieren Sie einen AgentThread, verwenden Sie diese Methode nicht manuell, immer verwenden: agent.get_new_thread().

Hinweis

Entweder service_thread_id oder message_store können festgelegt werden, aber nicht beide.

AggregateContextProvider

Ein ContextProvider, der mehrere Kontextanbieter enthält.

Es delegiert Ereignisse an mehrere Kontextanbieter und aggregiert Antworten von diesen Ereignissen, bevor sie zurückgegeben werden. Auf diese Weise können Sie mehrere Kontextanbieter in einem einzigen Anbieter kombinieren.

Hinweis

Ein AggregateContextProvider wird automatisch erstellt, wenn Sie einen einzelnen Kontext übergeben.

Anbieter oder eine Abfolge von Kontextanbietern für den Agentkonstruktor.

Initialisieren Sie den AggregateContextProvider mit Kontextanbietern.

BaseAgent

Basisklasse für alle Agent Framework-Agents.

Diese Klasse bietet Kernfunktionen für Agentimplementierungen, einschließlich Kontextanbietern, Middleware-Unterstützung und Threadverwaltung.

Hinweis

BaseAgent kann nicht direkt instanziiert werden, da er die Implementierung der

run(), run_stream() und andere Methoden, die von AgentProtocol erforderlich sind.

Verwenden Sie eine konkrete Implementierung wie ChatAgent oder erstellen Sie eine Unterklasse.

Initialisieren sie eine BaseAgent-Instanz.

BaseAnnotation

Basisklasse für alle KI-Anmerkungstypen.

Initialisieren sie BaseAnnotation.

BaseChatClient

Basisklasse für Chatclients.

Diese abstrakte Basisklasse bietet Kernfunktionen für Chatclientimplementierungen, einschließlich Middleware-Unterstützung, Nachrichtenvorbereitung und Toolnormalisierung.

Hinweis

BaseChatClient kann nicht direkt instanziiert werden, da es sich um eine abstrakte Basisklasse handelt.

Unterklassen müssen _inner_get_response() und _inner_get_streaming_response() implementieren.

Initialisieren einer BaseChatClient-Instanz.

BaseContent

Stellt Inhalte dar, die von KI-Diensten verwendet werden.

Initialisieren sie BaseContent.

Case

Laufzeitwrapper, der ein Switch-Case-Prädikat mit seinem Ziel kombiniert.

Jeder Fall gekoppelt ein boolesches Prädikat mit dem Executor, das die Nachricht behandeln soll, wenn das Prädikat als True ausgewertet wird. Die Laufzeit hält diesen einfachen Container getrennt vom serialisierbaren SwitchCaseEdgeGroupCase , sodass die Ausführung mit Live-Aufrufen ausgeführt werden kann, ohne den dauerhaften Zustand zu verunreinigen.

ChatAgent

Ein Chatclient-Agent.

Dies ist die primäre Agentimplementierung, die einen Chatclient für die Interaktion mit Sprachmodellen verwendet. Es unterstützt Tools, Kontextanbieter, Middleware und sowohl Streaming- als auch Nicht-Streaming-Antworten.

Initialisieren sie eine ChatAgent-Instanz.

Hinweis

Der Satz von Parametern von frequency_penalty auf request_kwargs wird verwendet, um

rufen Sie den Chatclient an. Sie können auch an beide Ausführungsmethoden übergeben werden.

Wenn beide festgelegt sind, haben die an die Ausführungsmethoden übergebenen Vorrang.

ChatClientProtocol

Ein Protokoll für einen Chatclient, der Antworten generieren kann.

Dieses Protokoll definiert die Schnittstelle, die alle Chatclients implementieren müssen, einschließlich Methoden zum Generieren von Streaming- und Nicht-Streaming-Antworten.

Hinweis

Protokolle verwenden strukturelle Untertypisierung (Ententyping). Klassen benötigen nicht

um explizit von diesem Protokoll zu erben, um als kompatibel betrachtet zu werden.

ChatContext

Context-Objekt für Chat-Middleware-Aufrufe.

Dieser Kontext wird über die Chat-Middleware-Pipeline übergeben und enthält alle Informationen zur Chatanfrage.

Initialisieren Sie den ChatContext.

ChatMessage

Stellt eine Chatnachricht dar.

Initialisieren sie ChatMessage.

ChatMessageStore

Eine In-Memory-Implementierung von ChatMessageStoreProtocol, die Nachrichten in einer Liste speichert.

Diese Implementierung bietet einen einfachen listenbasierten Speicher für Chatnachrichten mit Unterstützung für Serialisierung und Deserialisierung. Es implementiert alle erforderlichen Methoden des ChatMessageStoreProtocol Protokolls.

Der Speicher verwaltet Nachrichten im Arbeitsspeicher und stellt Methoden zum Serialisieren und Deserialisieren des Zustands für Persistenzzwecke bereit.

Erstellen Sie einen ChatMessageStore für die Verwendung in einem Thread.

ChatMessageStoreProtocol

Definiert Methoden zum Speichern und Abrufen von Chatnachrichten, die einem bestimmten Thread zugeordnet sind.

Implementierungen dieses Protokolls sind für die Verwaltung des Speichers von Chatnachrichten verantwortlich, einschließlich der Verarbeitung großer Datenmengen durch Abschneiden oder Zusammenfassen von Nachrichten nach Bedarf.

ChatMiddleware

Abstrakte Basisklasse für Chat-Middleware, die Chatclientanforderungen abfangen kann.

Mit Chat-Middleware können Sie Chatclientanforderungen vor und nach der Ausführung abfangen und ändern. Sie können Nachrichten ändern, Systemaufforderungen, Protokollanforderungen hinzufügen oder Chatantworten außer Kraft setzen.

Hinweis

ChatMiddleware ist eine abstrakte Basisklasse. Sie müssen sie unterklassen und implementieren

die Process()-Methode zum Erstellen benutzerdefinierter Chat-Middleware.

ChatOptions

Allgemeine Anforderungseinstellungen für KI-Dienste.

Initialisieren sie ChatOptions.

ChatResponse

Stellt die Antwort auf eine Chatanfrage dar.

Initialisiert einen ChatResponse mit den bereitgestellten Parametern.

ChatResponseUpdate

Stellt einen einzelnen Streamingantwortabschnitt aus einem ChatClient dar.

Initialisiert ein ChatResponseUpdate mit den bereitgestellten Parametern.

CheckpointStorage

Protokoll für Prüfpunktspeicher-Back-Ends.

CitationAnnotation

Stellt eine Zitatanmerkung dar.

Initialisieren Sie CitationAnnotation.

ConcurrentBuilder

Übergeordneter Generator für gleichzeitige Agent-Workflows.

  • teilnehmer([...]) akzeptiert eine Liste von AgentProtocol (empfohlen) oder Executor.

  • register_participants([...]) akzeptiert eine Liste der Fabriken für AgentProtocol (empfohlen)

    oder Executor-Fabriken

  • build() kabel: dispatcher -> fan-out -> teilnehmer -> fan-in -> aggregator.

  • with_aggregator(...) setzt den Standardaggregator mit einem Executor oder Callback außer Kraft.

  • register_aggregator(...) akzeptiert eine Factory für einen Executor als benutzerdefinierter Aggregator.

Verwendung:


   from agent_framework import ConcurrentBuilder

   # Minimal: use default aggregator (returns list[ChatMessage])
   workflow = ConcurrentBuilder().participants([agent1, agent2, agent3]).build()

   # With agent factories
   workflow = ConcurrentBuilder().register_participants([create_agent1, create_agent2, create_agent3]).build()


   # Custom aggregator via callback (sync or async). The callback receives
   # list[AgentExecutorResponse] and its return value becomes the workflow's output.
   def summarize(results: list[AgentExecutorResponse]) -> str:
       return " | ".join(r.agent_run_response.messages[-1].text for r in results)


   workflow = ConcurrentBuilder().participants([agent1, agent2, agent3]).with_aggregator(summarize).build()


   # Custom aggregator via a factory
   class MyAggregator(Executor):
       @handler
       async def aggregate(self, results: list[AgentExecutorResponse], ctx: WorkflowContext[Never, str]) -> None:
           await ctx.yield_output(" | ".join(r.agent_run_response.messages[-1].text for r in results))


   workflow = (
       ConcurrentBuilder()
       .register_participants([create_agent1, create_agent2, create_agent3])
       .register_aggregator(lambda: MyAggregator(id="my_aggregator"))
       .build()
   )


   # Enable checkpoint persistence so runs can resume
   workflow = ConcurrentBuilder().participants([agent1, agent2, agent3]).with_checkpointing(storage).build()

   # Enable request info before aggregation
   workflow = ConcurrentBuilder().participants([agent1, agent2]).with_request_info().build()
Context

Eine Klasse mit jedem Kontext, der dem KI-Modell bereitgestellt werden soll, wie es von einem ContextProvider bereitgestellt wird.

Jeder ContextProvider verfügt über die Möglichkeit, für jeden Aufruf einen eigenen Kontext bereitzustellen. Die Context-Klasse enthält den zusätzlichen Kontext, der vom ContextProvider bereitgestellt wird. Dieser Kontext wird mit dem von anderen Anbietern bereitgestellten Kontext kombiniert, bevor er an das KI-Modell übergeben wird. Dieser Kontext ist aufrufbar und wird nicht als Teil des Chatverlaufs gespeichert.

Erstellen Sie ein neues Context-Objekt.

ContextProvider

Basisklasse für alle Kontextanbieter.

Ein Kontextanbieter ist eine Komponente, die verwendet werden kann, um die Kontextverwaltung der KI zu verbessern. Es kann änderungen in der Unterhaltung lauschen und zusätzlichen Kontext für das KI-Modell direkt vor dem Aufruf bereitstellen.

Hinweis

ContextProvider ist eine abstrakte Basisklasse. Sie müssen sie unterklassen und implementieren

die "invoking()"-Methode zum Erstellen eines benutzerdefinierten Kontextanbieters. Idealerweise sollten Sie

implementieren Sie außerdem die Methoden invoked() und thread_created() zum Nachverfolgen der Unterhaltung

status, aber diese sind optional.

DataContent

Stellt binären Dateninhalt mit einem zugeordneten Medientyp dar (auch als MIME-Typ bezeichnet).

Von Bedeutung

Dies gilt für Binärdaten, die als Daten-URI dargestellt werden, nicht für Onlineressourcen.

Verwenden Sie UriContent für Onlineressourcen.

Initialisiert eine DataContent-Instanz.

Von Bedeutung

Dies gilt für Binärdaten, die als Daten-URI dargestellt werden, nicht für Onlineressourcen.

Verwenden Sie UriContent für Onlineressourcen.

Default

Laufzeitdarstellung der Standardverzweigung in einer Switch-Case-Gruppe.

Der Standardverzweigung wird nur aufgerufen, wenn keine anderen Groß-/Kleinschreibungs-Prädikate übereinstimmen. In der Praxis ist es garantiert, dass Routing niemals ein leeres Ziel erzeugt.

Edge

Modellieren Sie eine gerichtete, optional bedingte Übergabe zwischen zwei Executoren.

Jeder Edge erfasst die minimalen Metadaten, die erforderlich sind, um eine Nachricht von einem Executor in ein anderes innerhalb des Workflowdiagramms zu verschieben. Es bettet optional ein boolesches Prädikat ein, das entscheidet, ob der Edge zur Laufzeit verwendet werden soll. Durch die Serialisierung des Rands bis hin zu Grundtypen können wir die Topologie eines Workflows unabhängig vom ursprünglichen Python-Prozess rekonstruieren.

Initialisieren Sie einen vollständig angegebenen Rand zwischen zwei Workflowausführern.

EdgeDuplicationError

Ausnahme ausgelöst, wenn doppelte Kanten im Workflow erkannt werden.

ErrorContent

Stellt einen Fehler dar.

Hinweise: Wird in der Regel für nicht schwerwiegende Fehler verwendet, bei denen im Rahmen des Vorgangs ein Fehler aufgetreten ist, der Vorgang jedoch weiterhin fortgesetzt werden konnte.

Initialisiert eine ErrorContent-Instanz.

Executor

Basisklasse für alle Workflowausführer, die Nachrichten verarbeiten und Berechnungen durchführen.

Überblick

Executors sind die grundlegenden Bausteine von Workflows, die einzelne Verarbeitungseinheiten darstellen, die Nachrichten empfangen, Vorgänge ausführen und Ausgaben erzeugen. Jeder Executor wird eindeutig identifiziert und kann bestimmte Nachrichtentypen über dekorierte Handlermethoden verarbeiten.

Typensystem

Executors verfügen über ein umfassendes System, das ihre Funktionen definiert:

Eingabetypen

Die Typen von Nachrichten, die ein Executor verarbeiten kann, ermittelt aus Handlermethodensignaturen:


   class MyExecutor(Executor):
       @handler
       async def handle_string(self, message: str, ctx: WorkflowContext) -> None:
           # This executor can handle 'str' input types

Zugriff über die eigenschaft input_types .

Ausgabetypen

Die Arten von Nachrichten, die ein Executor über ctx.send_message()an andere Executoren senden kann:


   class MyExecutor(Executor):
       @handler
       async def handle_data(self, message: str, ctx: WorkflowContext[int | bool]) -> None:
           # This executor can send 'int' or 'bool' messages

Zugriff über die eigenschaft output_types .

Workflowausgabetypen

Die Arten von Daten, die ein Executor als Ausgabe auf Workflowebene über ctx.yield_output():


   class MyExecutor(Executor):
       @handler
       async def process(self, message: str, ctx: WorkflowContext[int, str]) -> None:
           # Can send 'int' messages AND yield 'str' workflow outputs

Zugriff über die eigenschaft workflow_output_types .

Handlerermittlung

Executors entdecken ihre Funktionen durch verzierte Methoden:

@handler Dekorateur

Markiert Methoden zum Verarbeiten eingehender Nachrichten:


   class MyExecutor(Executor):
       @handler
       async def handle_text(self, message: str, ctx: WorkflowContext[str]) -> None:
           await ctx.send_message(message.upper())

Abfangen von Subworkflow-Anforderungen

Verwenden Sie Methoden zum Abfangen von @handler Unterworkflowanforderungen:


   class ParentExecutor(Executor):
       @handler
       async def handle_subworkflow_request(
           self,
           request: SubWorkflowRequestMessage,
           ctx: WorkflowContext[SubWorkflowResponseMessage],
       ) -> None:
           if self.is_allowed(request.domain):
               response = request.create_response(data=True)
               await ctx.send_message(response, target_id=request.executor_id)
           else:
               await ctx.request_info(request.source_event, response_type=request.source_event.response_type)

Kontexttypen

Handlermethoden empfangen verschiedene WorkflowContext-Varianten basierend auf ihren Typanmerkungen:

WorkflowContext (keine Typparameter)

Für Handler, die nur Nebenwirkungen ausführen, ohne Nachrichten zu senden oder Ausgaben zu liefern:


   class LoggingExecutor(Executor):
       @handler
       async def log_message(self, msg: str, ctx: WorkflowContext) -> None:
           print(f"Received: {msg}")  # Only logging, no outputs

WorkflowContext[T_Out]

Ermöglicht das Senden von Nachrichten vom Typ T_Out über ctx.send_message():


   class ProcessorExecutor(Executor):
       @handler
       async def handler(self, msg: str, ctx: WorkflowContext[int]) -> None:
           await ctx.send_message(42)  # Can send int messages

WorkflowContext[T_Out, T_W_Out]

Ermöglicht sowohl das Senden von Nachrichten (T_Out) als auch das Zurückgeben von Workflowausgaben (T_W_Out):


   class DualOutputExecutor(Executor):
       @handler
       async def handler(self, msg: str, ctx: WorkflowContext[int, str]) -> None:
           await ctx.send_message(42)  # Send int message
           await ctx.yield_output("done")  # Yield str workflow output

Funktionsausführer

Einfache Funktionen können mithilfe des @executor Dekorators in Executoren konvertiert werden:


   @executor
   async def process_text(text: str, ctx: WorkflowContext[str]) -> None:
       await ctx.send_message(text.upper())


   # Or with custom ID:
   @executor(id="text_processor")
   def sync_process(text: str, ctx: WorkflowContext[str]) -> None:
       ctx.send_message(text.lower())  # Sync functions run in thread pool

Subworkflow-Komposition

Executors können Unterworkflows mit WorkflowExecutor enthalten. Unterworkflows können Anforderungen stellen, die übergeordnete Workflows abfangen können. Ausführliche Informationen zu Workflowkompositionsmustern und Anforderungs-/Antwortbehandlung finden Sie in der WorkflowExecutor-Dokumentation.

Zustandsverwaltung

Executors können Zustände enthalten, die über Workflowausführungen und Prüfpunkte hinweg beibehalten werden. Überschreiben Sie die methoden on_checkpoint_save und on_checkpoint_restore , um benutzerdefinierte Zustands serialisierung und Wiederherstellungslogik zu implementieren.

Implementierungshinweise

  • Rufen Sie execute() nicht direkt auf – sie wird vom Workflowmodul aufgerufen.
  • Do not override execute() - define handlers using decorators instead
  • Jeder Executor muss über mindestens eine @handler Methode verfügen.
  • Handlermethodensignaturen werden zur Initialisierungszeit überprüft.

Initialisieren Sie den Executor mit einem eindeutigen Bezeichner.

ExecutorCompletedEvent

Das Ereignis wird ausgelöst, wenn ein Executorhandler abgeschlossen ist.

Initialisieren Sie das Executorereignis mit einer Executor-ID und optionalen Daten.

ExecutorEvent

Basisklasse für Executorereignisse.

Initialisieren Sie das Executorereignis mit einer Executor-ID und optionalen Daten.

ExecutorFailedEvent

Das Ereignis wird ausgelöst, wenn ein Executorhandler einen Fehler auslöst.

ExecutorInvokedEvent

Das Ereignis wird ausgelöst, wenn ein Executorhandler aufgerufen wird.

Initialisieren Sie das Executorereignis mit einer Executor-ID und optionalen Daten.

FanInEdgeGroup

Stellt einen konvergenden Satz von Rändern dar, der einen einzelnen nachgeschalteten Executor einspeist.

Fan-In-Gruppen werden in der Regel verwendet, wenn mehrere Upstreamstufen unabhängig voneinander Nachrichten erzeugen, die alle auf demselben Downstreamprozessor ankommen sollten.

Erstellen Sie eine Fan-In-Zuordnung, die mehrere Quellen in einem Ziel zusammenführt.

FanOutEdgeGroup

Stellen Sie eine Randgruppe im Übertragungsstil mit optionaler Auswahllogik dar.

Ein Fanout leitet eine Nachricht, die von einem einzelnen Quellausführer erzeugt wird, an einen oder mehrere nachgeschaltete Executoren weiter. Zur Laufzeit können wir die Ziele weiter einschränken, indem wir eine selection_func ausführen, die die Nutzlast überprüft, und die Teilmenge der IDs zurückgibt, die die Nachricht erhalten sollen.

Erstellen Sie eine Fanoutzuordnung aus einer einzigen Quelle zu vielen Zielen.

FileCheckpointStorage

Dateibasierter Prüfpunktspeicher für Persistenz.

Initialisieren Sie den Dateispeicher.

FinishReason

Stellt den Grund dar, warum eine Chatantwort abgeschlossen wurde.

Initialisieren Sie FinishReason mit einem Wert.

FunctionApprovalRequestContent

Stellt eine Anforderung für die Benutzergenehmigung eines Funktionsaufrufs dar.

Initialisiert eine FunctionApprovalRequestContent-Instanz.

FunctionApprovalResponseContent

Stellt eine Antwort für die Benutzergenehmigung eines Funktionsaufrufs dar.

Initialisiert eine FunctionApprovalResponseContent-Instanz.

FunctionCallContent

Stellt eine Funktionsaufrufanforderung dar.

Initialisiert eine FunctionCallContent-Instanz.

FunctionExecutor

Executor, der eine benutzerdefinierte Funktion umschließt.

Mit diesem Executor können Benutzer einfache Funktionen (sowohl Synchronisierung als auch asynchron) definieren und als Workflowausführer verwenden, ohne vollständige Executorklassen erstellen zu müssen.

Synchrone Funktionen werden in einem Threadpool mithilfe von asyncio.to_thread() ausgeführt, um zu verhindern, dass die Ereignisschleife blockiert wird.

Initialisieren Sie den FunctionExecutor mit einer benutzerdefinierten Funktion.

FunctionInvocationConfiguration

Konfiguration für Funktionsaufrufe in Chatclients.

Diese Klasse wird automatisch auf jedem Chatclient erstellt, der Funktionsaufrufe unterstützt. Dies bedeutet, dass Sie in den meisten Fällen nur die Attribute für die Instanz ändern können, anstatt dann eine neue zu erstellen.

Initialize FunctionInvocationConfiguration.

FunctionInvocationContext

Context-Objekt für Funktions-Middleware-Aufrufe.

Dieser Kontext wird über die Middlewarepipeline der Funktion übergeben und enthält alle Informationen über den Funktionsaufruf.

Initialisieren Sie den FunctionInvocationContext.

FunctionMiddleware

Abstrakte Basisklasse für Funktions-Middleware, die Funktionsaufrufe abfangen kann.

Mit Funktions-Middleware können Sie Funktions-/Toolaufrufe vor und nach der Ausführung abfangen und ändern. Sie können Argumente, Cacheergebnisse, Protokollaufrufe oder Die Funktionsausführung außer Kraft setzen.

Hinweis

FunctionMiddleware ist eine abstrakte Basisklasse. Sie müssen sie unterklassen und implementieren

die Process()-Methode zum Erstellen benutzerdefinierter Funktions-Middleware.

FunctionResultContent

Stellt das Ergebnis eines Funktionsaufrufs dar.

Initialisiert eine FunctionResultContent-Instanz.

GraphConnectivityError

Ausnahme ausgelöst, wenn Diagrammkonnektivitätsprobleme erkannt werden.

GroupChatBuilder

High-Level Builder für managergesteuerte Gruppenchatworkflows mit dynamischer Orchestrierung.

GroupChat koordiniert Multi-Agent-Unterhaltungen mit einem Vorgesetzten, der auswählt, welcher Teilnehmer als Nächstes spricht. Der Manager kann eine einfache Python-Funktion (set_select_speakers_func) oder eine agentbasierte Selektor über set_manager. Diese beiden Ansätze schließen sich gegenseitig aus.

Kernworkflow:

  1. Definieren von Teilnehmern: Liste der Agents (verwendet ihren .name) oder diktieren Namen zu Agents

  2. Konfigurieren der Lautsprecherauswahl: set_select_speakers_func ODER

    set_manager (nicht beide)

  3. Optional: Festlegen von Rundengrenzwerten, Prüfpunkting, Beendigungsbedingungen

  4. Erstellen und Ausführen des Workflows

Auswahlmuster für Lautsprecher:

Muster 1: Einfache funktionsbasierte Auswahl (empfohlen)


   from agent_framework import GroupChatBuilder, GroupChatStateSnapshot


   def select_next_speaker(state: GroupChatStateSnapshot) -> str | None:
       # state contains: task, participants, conversation, history, round_index
       if state["round_index"] >= 5:
           return None  # Finish
       last_speaker = state["history"][-1].speaker if state["history"] else None
       if last_speaker == "researcher":
           return "writer"
       return "researcher"


   workflow = (
       GroupChatBuilder()
       .set_select_speakers_func(select_next_speaker)
       .participants([researcher_agent, writer_agent])  # Uses agent.name
       .build()
   )

Muster 2: LLM-basierte Auswahl


   from agent_framework import ChatAgent
   from agent_framework.azure import AzureOpenAIChatClient

   manager_agent = AzureOpenAIChatClient().create_agent(
       instructions="Coordinate the conversation and pick the next speaker.",
       name="Coordinator",
       temperature=0.3,
       seed=42,
       max_tokens=500,
   )

   workflow = (
       GroupChatBuilder()
       .set_manager(manager_agent, display_name="Coordinator")
       .participants([researcher, writer])  # Or use dict: researcher=r, writer=w
       .with_max_rounds(10)
       .build()
   )

Muster 3: Anfordern von Informationen für Feedback zur Mitte der Unterhaltung


   from agent_framework import GroupChatBuilder

   # Pause before all participants
   workflow = (
       GroupChatBuilder()
       .set_select_speakers_func(select_next_speaker)
       .participants([researcher, writer])
       .with_request_info()
       .build()
   )

   # Pause only before specific participants
   workflow = (
       GroupChatBuilder()
       .set_select_speakers_func(select_next_speaker)
       .participants([researcher, writer, editor])
       .with_request_info(agents=[editor])  # Only pause before editor responds
       .build()
   )

Teilnehmerspezifikation:

Zwei Möglichkeiten zum Angeben von Teilnehmern:

  • Listenformular: [Agent1, Agent2] – verwendet agent.name Attribut für Teilnehmernamen.
  • Diktierformular : {name1: agent1, name2: agent2} - explizites Namenssteuerelement
  • Schlüsselwortformular: participants(name1=agent1, name2=agent2) - explizites Namenssteuerelement

Zustandsmomentaufnahmestruktur:

Der an set_select_speakers_func übergebene GroupChatStateSnapshot enthält Folgendes:

  • task: ChatMessage - Ursprüngliche Benutzeraufgabe
  • teilnehmer: dict[str, str] - Zuordnung der Teilnehmernamen zu Beschreibungen
  • unterhaltung: tuple[ChatMessage, ...] - Vollständiger Unterhaltungsverlauf
  • Verlauf: Tupel[GroupChatTurn, ...] - Turn-by-turn-Record mit Sprecherzuordnung
  • round_index: int - Anzahl der bisherigen Vorgesetztenauswahlrunden
  • pending_agent: str | None – Name des aktuell verarbeiteten Agents (falls vorhanden)

Wichtige Einschränkungen:

  • Kann nicht kombinieren set_select_speakers_func und set_manager
  • Die Namen der Teilnehmer müssen eindeutig sein.
  • Bei Verwendung eines Listenformulars müssen Agents über ein nicht leeres Namensattribute verfügen.

Initialisieren Sie groupChatBuilder.

GroupChatDirective

Anweisung, die von einer Gruppenchat-Manager-Implementierung ausgegeben wird.

HandoffBuilder

Fluent Builder für Unterhaltungshandworkflows mit Koordinator und Spezialisten.

Das Übergabemuster ermöglicht es einem Koordinator, Anfragen an Spezialisten weiterzuleiten. Der Interaktionsmodus steuert, ob der Workflow die Benutzereingabe nach jeder Agentantwort anfordert oder autonom abgeschlossen wird, sobald die Agents fertig sind. Eine Beendigungsbedingung bestimmt, wann der Workflow das Anfordern von Eingaben beenden und abgeschlossen werden soll.

Routingmuster:

Single-Tier (Standard): Nur der Koordinator kann spezialisten übergeben. Standardmäßig kehrt das Steuerelement nach antwortenden Spezialisten für weitere Eingaben an den Benutzer zurück. Dadurch entsteht ein zyklischer Fluss: Benutzer -> Koordinator -> [optionaler Spezialist] -> Benutzer -> Koordinator -> ... Verwenden Sie with_interaction_mode("autonom"), um das Anfordern zusätzlicher Benutzereingaben zu überspringen und die letzte Unterhaltung zu erzielen, wenn ein Agent antwortet, ohne zu delegieren.

Mehrstufige (erweitert): Spezialisten können sich mit .add_handoff()an andere Spezialisten übergeben. Dies bietet mehr Flexibilität für komplexe Workflows, ist jedoch weniger steuerbar als das Einzelebenenmuster. Benutzer verlieren die Echtzeitsichtigkeit in Zwischenschritte während fachspezifischer Handoffs (obwohl der vollständige Unterhaltungsverlauf einschließlich aller Übergaben erhalten bleibt und danach geprüft werden kann).

Wichtige Features:

  • Automatische Übergabeerkennung: Der Koordinator ruft ein Handofftool auf, dessen Handoff-Tool

    Argumente (z. B. {"handoff_to": "shipping_agent"}) identifizieren den Spezialisten, der die Steuerung erhält.

  • Automatisch generierte Tools: Standardmäßig synthetisiert der Generator handoff_to_<Agent-Tools> für den Koordinator, sodass Sie Platzhalterfunktionen nicht manuell definieren.

  • Vollständiger Unterhaltungsverlauf: Die gesamte Unterhaltung (einschließlich aller ChatMessage.additional_properties) wird beibehalten und an jeden Agent übergeben.

  • Beendigungssteuerung: Wird standardmäßig nach 10 Benutzernachrichten beendet. Außerkraftsetzen mit .with_termination_condition(Lambda-Konv: ...) für benutzerdefinierte Logik (z. B. "verabschieden").

  • Interaktionsmodi: Wählen Sie human_in_loop (Standard) aus, um Benutzer zwischen Agentwechseln oder autonom aufzufordern, um die Weiterleitung an Agents fortzusetzen, ohne benutzereingaben aufzufordern, bis eine Übergabe erfolgt oder ein Beendigungs-/Turnlimit erreicht ist (standardautonomer Turn-Grenzwert: 50).

  • Prüfpunkt: Optionale Persistenz für reaktivierbare Workflows.

Nutzung (Single-Tier):


   from agent_framework import HandoffBuilder
   from agent_framework.openai import OpenAIChatClient

   chat_client = OpenAIChatClient()

   # Create coordinator and specialist agents
   coordinator = chat_client.create_agent(
       instructions=(
           "You are a frontline support agent. Assess the user's issue and decide "
           "whether to hand off to 'refund_agent' or 'shipping_agent'. When delegation is "
           "required, call the matching handoff tool (for example `handoff_to_refund_agent`)."
       ),
       name="coordinator_agent",
   )

   refund = chat_client.create_agent(
       instructions="You handle refund requests. Ask for order details and process refunds.",
       name="refund_agent",
   )

   shipping = chat_client.create_agent(
       instructions="You resolve shipping issues. Track packages and update delivery status.",
       name="shipping_agent",
   )

   # Build the handoff workflow - default single-tier routing
   workflow = (
       HandoffBuilder(
           name="customer_support",
           participants=[coordinator, refund, shipping],
       )
       .set_coordinator(coordinator)
       .build()
   )

   # Run the workflow
   events = await workflow.run_stream("My package hasn't arrived yet")
   async for event in events:
       if isinstance(event, RequestInfoEvent):
           # Request user input
           user_response = input("You: ")
           await workflow.send_response(event.data.request_id, user_response)

Mehrstufiges Routing mit .add_handoff():


   # Enable specialist-to-specialist handoffs with fluent API
   workflow = (
       HandoffBuilder(participants=[coordinator, replacement, delivery, billing])
       .set_coordinator(coordinator)
       .add_handoff(coordinator, [replacement, delivery, billing])  # Coordinator routes to all
       .add_handoff(replacement, [delivery, billing])  # Replacement delegates to delivery/billing
       .add_handoff(delivery, billing)  # Delivery escalates to billing
       .build()
   )

   # Flow: User → Coordinator → Replacement → Delivery → Back to User
   # (Replacement hands off to Delivery without returning to user)

Verwenden Sie Die Teilnehmerfabriken für die Zustandsisolation:

Benutzerdefinierte Beendigungsbedingung:


   # Terminate when user says goodbye or after 5 exchanges
   workflow = (
       HandoffBuilder(participants=[coordinator, refund, shipping])
       .set_coordinator(coordinator)
       .with_termination_condition(
           lambda conv: (
               sum(1 for msg in conv if msg.role.value == "user") >= 5
               or any("goodbye" in msg.text.lower() for msg in conv[-2:])
           )
       )
       .build()
   )

Prüfpunkt:


   from agent_framework import InMemoryCheckpointStorage

   storage = InMemoryCheckpointStorage()
   workflow = (
       HandoffBuilder(participants=[coordinator, refund, shipping])
       .set_coordinator(coordinator)
       .with_checkpointing(storage)
       .build()
   )

Initialisieren Sie einen HandoffBuilder zum Erstellen von Unterhaltungshandoffworkflows.

Der Generator beginnt in einem nicht konfigurierten Zustand und erfordert, dass Sie Folgendes aufrufen:

  1. .participants([...]) - Registrieren von Agents
  2. oder .participant_factories({...}) – Agent/Executor-Fabriken registrieren
  3. .set_coordinator(...) – Festlegen, welcher Agent erste Benutzereingaben empfängt
  4. .build() – Erstellen des endgültigen Workflows

Mit optionalen Konfigurationsmethoden können Sie kontextverwaltung, Beendigungslogik und Persistenz anpassen.

Hinweis

Teilnehmer müssen über stabile Namen/IDs verfügen, da der Workflow den

Übergabetoolargumente an diese Bezeichner. Agentnamen sollten übereinstimmen

die vom Handofftool des Koordinators ausgegebenen Zeichenfolgen (z. B. ein Tool, das

gibt {"handoff_to" aus: "billing"} erfordert einen Agenten namens Abrechnung).

HandoffUserInputRequest

Anforderungsmeldung, die ausgegeben wird, wenn der Workflow neue Benutzereingaben benötigt.

Hinweis: Das Unterhaltungsfeld wird absichtlich von der Serialisierung des Prüfpunkts ausgeschlossen, um Duplizierung zu verhindern. Die Unterhaltung wird im Zustand des Koordinators erhalten und wird auf Wiederherstellung wieder aufgebaut. Siehe Problem Nr. 2667.

HostedCodeInterpreterTool

Stellt ein gehostetes Tool dar, das einem KI-Dienst angegeben werden kann, um die Ausführung von generiertem Code zu ermöglichen.

Dieses Tool implementiert keine Codeinterpretation selbst. Es dient als Markierung, um einen Dienst darüber zu informieren, dass er generierten Code ausführen darf, wenn der Dienst dies tun kann.

Initialisieren Sie das HostedCodeInterpreterTool.

HostedFileContent

Stellt einen gehosteten Dateiinhalt dar.

Initialisiert eine HostedFileContent-Instanz.

HostedFileSearchTool

Stellt ein Dateisuchtool dar, das einem KI-Dienst angegeben werden kann, um die Ausführung von Dateisuchen zu ermöglichen.

Initialisieren Sie ein FileSearchTool.

HostedMCPSpecificApproval

Stellt den spezifischen Modus für ein gehostetes Tool dar.

Bei Verwendung dieses Modus muss der Benutzer angeben, welche Tools immer oder nie genehmigt werden müssen. Dies wird als Wörterbuch mit zwei optionalen Schlüsseln dargestellt:

HostedMCPTool

Stellt ein MCP-Tool dar, das vom Dienst verwaltet und ausgeführt wird.

Erstellen Sie ein gehostetes MCP-Tool.

HostedVectorStoreContent

Stellt einen gehosteten Vektorspeicherinhalt dar.

Initialisiert eine HostedVectorStoreContent-Instanz.

HostedWebSearchTool

Stellt ein Websuchtool dar, das einem KI-Dienst angegeben werden kann, um es zum Ausführen von Websuchen zu ermöglichen.

Initialisieren sie ein HostedWebSearchTool.

InMemoryCheckpointStorage

Speicher im Arbeitsspeicher für Tests und Entwicklung.

Initialisieren Sie den Speicher.

InProcRunnerContext

In-Process-Ausführungskontext für lokale Ausführung und optionale Prüfpunkte.

Initialisieren Sie den In-Process-Ausführungskontext.

MCPStdioTool

MCP-Tool zum Herstellen einer Verbindung mit stdiobasierten MCP-Servern.

Diese Klasse stellt eine Verbindung mit MCP-Servern her, die über Standardeingabe/Ausgabe kommunizieren, die in der Regel für lokale Prozesse verwendet werden.

Initialisieren Sie das MCP-Stdiotool.

Hinweis

Die Argumente werden verwendet, um ein StdioServerParameters-Objekt zu erstellen,

die dann zum Erstellen eines Stdioclients verwendet wird. Siehe mcp.client.stdio.stdio_client

und mcp.client.stdio.stdio_server_parameters für weitere Details.

MCPStreamableHTTPTool

MCP-Tool zum Herstellen einer Verbindung mit HTTP-basierten MCP-Servern.

Diese Klasse verbindet sich mit MCP-Servern, die über streambare HTTP/SSE kommunizieren.

Initialisieren Sie das MCP-streambare HTTP-Tool.

Hinweis

Die Argumente werden verwendet, um einen streambaren HTTP-Client zu erstellen.

Weitere Informationen finden Sie unter mcp.client.streamable_http.streamablehttp_client.

Alle zusätzlichen Argumente, die an den Konstruktor übergeben werden, werden an die

streambarer HTTP-Clientkonstruktor.

MCPWebsocketTool

MCP-Tool zum Herstellen einer Verbindung mit webSocket-basierten MCP-Servern.

Diese Klasse verbindet sich mit MCP-Servern, die über WebSocket kommunizieren.

Initialisieren Sie das MCP WebSocket-Tool.

Hinweis

Die Argumente werden verwendet, um einen WebSocket-Client zu erstellen.

Weitere Informationen finden Sie unter mcp.client.websocket.websocket_client.

Alle zusätzlichen Argumente, die an den Konstruktor übergeben werden, werden an die

WebSocket-Clientkonstruktor.

MagenticBuilder

Fluent-Generator zum Erstellen von Multi-Agent-Orchestrierungsworkflows für Magentic One.

Magentic One-Workflows verwenden einen LLM-basierten Manager, um mehrere Agents durch dynamische Aufgabenplanung, Fortschrittsverfolgung und adaptive Neuplanung zu koordinieren. Der Manager erstellt Pläne, wählt Agents aus, überwacht den Fortschritt und bestimmt, wann der Plan neu geplant oder abgeschlossen werden soll.

Der Generator bietet eine Fluent-API zum Konfigurieren von Teilnehmern, dem Vorgesetzten, optionalen Planüberprüfungen, Prüfpunkten und Ereignisrückrufen.

Human-in-the-Loop-Unterstützung: Magentic bietet spezielle HITL-Mechanismen über:

  • .with_plan_review() – Überprüfen und Genehmigen/Überarbeiten von Plänen vor der Ausführung

  • .with_human_input_on_stall() – Eingreifen, wenn der Workflow angehalten wird

  • Toolgenehmigung über FunctionApprovalRequestContent – Genehmigen einzelner Toolaufrufe

Diese emittieren MagenticHumanInterventionRequest-Ereignisse , die strukturierte Entscheidungsoptionen (APPROVE, REVISE, CONTINUE, REPLAN, GUIDANCE) bereitstellen, die für die planungsbasierte Orchestrierung von Magentic geeignet sind.

Verwendung:


   from agent_framework import MagenticBuilder, StandardMagenticManager
   from azure.ai.projects.aio import AIProjectClient

   # Create manager with LLM client
   project_client = AIProjectClient.from_connection_string(...)
   chat_client = project_client.inference.get_chat_completions_client()

   # Build Magentic workflow with agents
   workflow = (
       MagenticBuilder()
       .participants(researcher=research_agent, writer=writing_agent, coder=coding_agent)
       .with_standard_manager(chat_client=chat_client, max_round_count=20, max_stall_count=3)
       .with_plan_review(enable=True)
       .with_checkpointing(checkpoint_storage)
       .build()
   )

   # Execute workflow
   async for message in workflow.run("Research and write article about AI agents"):
       print(message.text)

Mit benutzerdefiniertem Manager:


   # Create custom manager subclass
   class MyCustomManager(MagenticManagerBase):
       async def plan(self, context: MagenticContext) -> ChatMessage:
           # Custom planning logic
           ...


   manager = MyCustomManager()
   workflow = MagenticBuilder().participants(agent1=agent1, agent2=agent2).with_standard_manager(manager).build()
MagenticContext

Kontext für den Magentischen Manager.

MagenticManagerBase

Basisklasse für den Magentic One-Manager.

ManagerDirectiveModel

Pydantisches Modell für die Ausgabe der strukturierten Managerdirektive.

Erstellen Sie ein neues Modell, indem Sie Eingabedaten aus Schlüsselwortargumenten analysieren und validieren.

Löst [ValidationError][pydantic_core. ValidationError] wenn die Eingabedaten nicht überprüft werden können, um ein gültiges Modell zu bilden.

self ist explizit positional, um selbst als Feldnamen zuzulassen.

ManagerSelectionRequest

Anforderung, die an den Vorgesetzten für die Auswahl des nächsten Sprechers gesendet wurde.

Diese Dataclass enthält den vollständigen Unterhaltungszustand und den Aufgabenkontext für den Manager-Agent, um eine Sprecherauswahlentscheidung zu analysieren und zu treffen.

ManagerSelectionResponse

Antwort des Vorgesetzten mit Entscheidung zur Sprecherauswahl.

Der Manager-Agent muss diese Struktur (oder kompatibles Diktat/JSON) erstellen, um seine Entscheidung zurück an den Orchestrator zu übermitteln.

Erstellen Sie ein neues Modell, indem Sie Eingabedaten aus Schlüsselwortargumenten analysieren und validieren.

Löst [ValidationError][pydantic_core. ValidationError] wenn die Eingabedaten nicht überprüft werden können, um ein gültiges Modell zu bilden.

self ist explizit positional, um selbst als Feldnamen zuzulassen.

Message

Eine Klasse, die eine Nachricht im Workflow darstellt.

OrchestrationState

Einheitlicher Zustandscontainer für Orchestratorprüfpunkte.

Diese Dataclass standardisiert die Serialisierung des Prüfpunkts über alle drei Gruppenchatmuster hinweg und ermöglicht musterspezifische Erweiterungen über Metadaten.

Allgemeine Attribute decken gemeinsame Orchestrierungsbedenken (Aufgabe, Unterhaltung, Round tracking) ab. Musterspezifischer Zustand wird im Metadaten-Diktieren angezeigt.

RequestInfoEvent

Das Ereignis wird ausgelöst, wenn ein Workflowausführer externe Informationen anfordert.

Initialisieren Sie das Anforderungsinformationsereignis.

RequestInfoInterceptor

Interner Executor, der den Workflow für die menschliche Eingabe angehalten, bevor der Agent ausgeführt wird.

Dieser Executor wird nach Generatoren in das Workflowdiagramm eingefügt, wenn .with_request_info() aufgerufen wird. Er fängt AgentExecutorRequest-Nachrichten ab, BEVOR der Agent ausgeführt wird, und hält den Workflow über ctx.request_info() mit einem AgentInputRequest an.

Wenn eine Antwort empfangen wird, fügt der Antworthandler die Eingabe als Benutzernachricht in die Unterhaltung ein und leitet die Anforderung an den Agent weiter.

Der optionale agent_filter-Parameter ermöglicht das Einschränken, welche Agents die Pause auslösen. Wenn sich die ID des Ziel-Agents nicht im Filtersatz befindet, wird die Anforderung ohne Unterbrechung weitergeleitet.

Initialisieren Sie den Abfrageinformations-Interceptor-Executor.

Role

Beschreibt den beabsichtigten Zweck einer Nachricht innerhalb einer Chatinteraktion.

Eigenschaften: SYSTEM: Die Rolle, die das Verhalten des KI-Systems anweist oder festlegt. BENUTZER: Die Rolle, die Benutzereingaben für Chatinteraktionen bereitstellt. ASSISTENT: Die Rolle, die Antworten auf vom System angewiesene, vom Benutzer aufgeforderte Eingaben bereitstellt. TOOL: Die Rolle, die zusätzliche Informationen und Verweise als Reaktion auf Toolverwendungsanforderungen bereitstellt.

Initialisieren der Rolle mit einem Wert.

Runner

Eine Klasse zum Ausführen eines Workflows in Pregel-Supersteps.

Initialisieren Sie den Läufer mit Kanten, freigegebenem Zustand und Kontext.

RunnerContext

Protokoll für den ausführungskontext, der vom Runner verwendet wird.

Ein einzelner Kontext, der Messaging, Ereignisse und optionale Prüfpunkte unterstützt. Wenn der Prüfpunktspeicher nicht konfiguriert ist, können Prüfpunktmethoden auslösen.

SequentialBuilder

Übergeordneter Generator für sequenzielle Agent-/Executor-Workflows mit freigegebenem Kontext.

  • teilnehmer([...]) akzeptiert eine Liste von AgentProtocol -Instanzen (empfohlen) oder Executor-Instanzen

  • register_participants([...]) akzeptiert eine Liste der Fabriken für AgentProtocol (empfohlen)

    oder Executor-Fabriken

  • Executors müssen einen Handler definieren, der die Liste verwendet[ChatMessage] und sendet eine Liste [ChatMessage]

  • Der Workflow verkabelt Teilnehmer in der Reihenfolge und übergibt eine Liste [ChatMessage] entlang der Kette.

  • Agents fügen ihre Assistentennachrichten an die Unterhaltung an

  • Benutzerdefinierte Executoren können eine Liste transformieren/zusammenfassen und zurückgeben[ChatMessage]

  • Die endgültige Ausgabe ist die Unterhaltung, die vom letzten Teilnehmer erzeugt wird.

Verwendung:


   from agent_framework import SequentialBuilder

   # With agent instances
   workflow = SequentialBuilder().participants([agent1, agent2, summarizer_exec]).build()

   # With agent factories
   workflow = (
       SequentialBuilder().register_participants([create_agent1, create_agent2, create_summarizer_exec]).build()
   )

   # Enable checkpoint persistence
   workflow = SequentialBuilder().participants([agent1, agent2]).with_checkpointing(storage).build()

   # Enable request info for mid-workflow feedback (pauses before each agent)
   workflow = SequentialBuilder().participants([agent1, agent2]).with_request_info().build()

   # Enable request info only for specific agents
   workflow = (
       SequentialBuilder()
       .participants([agent1, agent2, agent3])
       .with_request_info(agents=[agent2])  # Only pause before agent2
       .build()
   )
SharedState

Eine Klasse zum Verwalten des freigegebenen Zustands in einem Workflow.

SharedState bietet threadsicheren Zugriff auf Workflowstatusdaten, die während der Workflowausführung für alle Ausführenden freigegeben werden müssen.

Reservierte Schlüssel: Die folgenden Schlüssel sind für die interne Framework-Verwendung reserviert und sollten nicht von Benutzercode geändert werden:

  • _executor_state: Speichert den Ausführungsstatus für die Prüfpunkterstellung (verwaltet von Runner)

Warnung

Verwenden Sie keine Tasten, die mit Unterstrich (_) beginnen, da sie für

Interne Framework-Vorgänge.

Initialisieren Sie den freigegebenen Zustand.

SingleEdgeGroup

Komfortwrapper für einen solitären Rand, wobei die Gruppen-API einheitlich bleibt.

Erstellen Sie eine 1:1-Edgegruppe zwischen zwei Executoren.

StandardMagenticManager

Standard Magentic Manager, der echte LLM-Anrufe über einen ChatAgent durchführt.

Der Manager erstellt Eingabeaufforderungen, die die ursprüngliche Magentic One-Orchestrierung spiegeln:

  • Faktensammlung
  • Planen der Erstellung
  • Fortschrittsbuch in JSON
  • Faktenaktualisierung und Planupdate beim Zurücksetzen
  • Endgültige Antwortsynthese

Initialisieren Sie den Standard Magentic Manager.

SubWorkflowRequestMessage

Nachricht, die von einem Unterworkflow an einen Executor im übergeordneten Workflow gesendet wird, um Informationen anzufordern.

Diese Nachricht umschließt ein RequestInfoEvent, das vom Executor im Unterworkflow ausgegeben wird.

SubWorkflowResponseMessage

Nachricht, die von einem übergeordneten Workflow an einen Unterworkflow über WorkflowExecutor gesendet wird, um angeforderte Informationen bereitzustellen.

Diese Nachricht umschließt die Antwortdaten zusammen mit dem ursprünglichen RequestInfoEvent, das vom Unterworkflow-Executor ausgegeben wird.

SuperStepCompletedEvent

Das Ereignis wird ausgelöst, wenn ein Superstep endet.

Initialisieren Sie das Superstep-Ereignis.

SuperStepStartedEvent

Das Ereignis wird ausgelöst, wenn ein Superstep gestartet wird.

Initialisieren Sie das Superstep-Ereignis.

SwitchCaseEdgeGroup

Lüfterausgangsvariante, die einen herkömmlichen Schalt-/Fallsteuerungsfluss nachahmt.

Jeder Fall prüft die Nachrichtennutzlast und entscheidet, ob die Nachricht behandelt werden soll. Genau ein Fall oder der Standardverzweigung gibt zur Laufzeit ein Ziel zurück, wobei die Semantik für einmalige Verteiler beibehalten wird.

Konfigurieren Sie eine Switch-/Case-Routingstruktur für einen einzelnen Quellausführer.

SwitchCaseEdgeGroupCase

Persistable description of a single conditional branch in a switch-case.

Im Gegensatz zum Runtime Case-Objekt speichert diese serialisierbare Variante nur den Zielbezeichner und einen beschreibenden Namen für das Prädikat. Wenn die zugrunde liegende Aufruffunktion während der Deserialisierung nicht verfügbar ist, ersetzen wir einen Proxyplatzhalter, der laut fehlschlägt, um sicherzustellen, dass die fehlende Abhängigkeit sofort sichtbar ist.

Zeichnen Sie die Routingmetadaten für eine bedingte Fallverzweigung auf.

SwitchCaseEdgeGroupDefault

Persistable descriptor for the fallback branch of a switch-case group.

Der Standardverzweigung ist garantiert vorhanden und wird aufgerufen, wenn jedes andere Fall-Prädikat nicht mit der Nutzlast übereinstimmt.

Zeigen Sie den Standardverzweigung auf den angegebenen Executorbezeichner.

TextContent

Stellt Textinhalte in einem Chat dar.

Initialisiert eine TextContent-Instanz.

TextReasoningContent

Stellt Textgrundsätze in einem Chat dar.

Hinweise: Diese Klasse und TextContent sind oberflächlich ähnlich, aber unterschiedlich.

Initialisiert eine TextReasoningContent-Instanz.

TextSpanRegion

Stellt einen Textbereich dar, der kommentiert wurde.

Initialisieren sie TextSpanRegion.

ToolMode

Definiert, ob und wie Tools in einer Chatanfrage verwendet werden.

Initialisieren Sie ToolMode.

ToolProtocol

Stellt ein generisches Tool dar.

Dieses Protokoll definiert die Schnittstelle, die alle Tools implementieren müssen, um mit dem Agent-Framework kompatibel zu sein. Sie wird von verschiedenen Toolklassen wie HostedMCPTool, HostedWebSearchTool und AIFunction implementiert. Eine AIFunction wird in der Regel vom ai_function Dekorateur erstellt.

Da jeder Connector Tools anders analysieren muss, können Benutzer ein Diktat übergeben, um ein dienstspezifisches Tool anzugeben, wenn keine Abstraktion verfügbar ist.

TypeCompatibilityError

Ausnahme ausgelöst, wenn die Typinkompatibilität zwischen verbundenen Executoren erkannt wird.

UriContent

Stellt einen URI-Inhalt dar.

Von Bedeutung

Dies wird für Inhalte verwendet, die durch einen URI identifiziert werden, z. B. ein Bild oder eine Datei.

Verwenden Sie für (binäre) Daten-URIs stattdessen DataContent.

Initialisiert eine UriContent-Instanz.

Hinweise: Dies wird für Inhalte verwendet, die durch einen URI identifiziert werden, z. B. ein Bild oder eine Datei. Verwenden Sie für (binäre) Daten-URIs stattdessen DataContent .

UsageContent

Stellt Nutzungsinformationen dar, die einer Chatanfrage und -antwort zugeordnet sind.

Initialisiert eine UsageContent-Instanz.

UsageDetails

Stellt Nutzungsdetails zu einer Anforderung/Antwort bereit.

Initialisiert die UsageDetails-Instanz.

Workflow

Ein graphbasiertes Ausführungsmodul, das verbundene Executoren orchestriert.

Überblick

Ein Workflow führt ein gerichtetes Diagramm von Ausführenden aus, die über Edgegruppen mit einem Pregel-ähnlichen Modell verbunden sind und in Supersteps ausgeführt werden, bis das Diagramm im Leerlauf ist. Workflows werden mit der WorkflowBuilder-Klasse erstellt – instanziieren Sie diese Klasse nicht direkt.

Ausführungsmodell

Executors werden in synchronisierten Supersteps ausgeführt, wobei jeder Executor:

  • Wird aufgerufen, wenn sie Nachrichten von verbundenen Edgegruppen empfängt.
  • Nachrichten können über ctx.send_message() an nachgeschaltete Executoren gesendet werden.
  • Kann Ausgaben auf Workflowebene über ctx.yield_output()
  • Kann benutzerdefinierte Ereignisse über ctx.add_event() ausgeben

Nachrichten zwischen Executoren werden am Ende jedes Supersteps übermittelt und sind im Ereignisdatenstrom nicht sichtbar. Nur Ereignisse auf Workflowebene (Ausgaben, benutzerdefinierte Ereignisse) und Statusereignisse sind für Aufrufer feststellbar.

Eingabe-/Ausgabetypen

Workflowtypen werden zur Laufzeit ermittelt, indem Sie Folgendes überprüfen:

  • Eingabetypen: Eingabetypen des Startausführers
  • Ausgabetypen: Union aller Workflowausgabetypen aller Executoren Greifen Sie über die eigenschaften input_types und output_types zu.

Ausführungsmethoden

Der Workflow stellt zwei primäre Ausführungs-APIs bereit, die jeweils mehrere Szenarien unterstützen:

  • run(): Ausführen zum Abschluss, Gibt WorkflowRunResult mit allen Ereignissen zurück.

  • run_stream(): Gibt asynchrone Generators zurück, der Ereignisse liefert, sobald sie auftreten

Beide Methoden unterstützen:

  • Anfänglicher Workflow wird ausgeführt: Bereitstellen des Nachrichtenparameters
  • Prüfpunktwiederherstellung: Bereitstellen von checkpoint_id (und optional checkpoint_storage)
  • HIL-Fortsetzung: Bereitstellen von Antworten , die nach RequestInfoExecutor-Anforderungen fortgesetzt werden sollen
  • Laufzeitprüfpunkt: Bereitstellen checkpoint_storage zum Aktivieren/Überschreiben des Prüfpunkts für diese Ausführung

Zustandsverwaltung

Workflowinstanzen enthalten Zustände und Zustände werden über Aufrufe hinweg beibehalten, die ausgeführt werden sollen, und run_stream. Um mehrere unabhängige Ausführungen auszuführen, erstellen Sie separate Workflowinstanzen über WorkflowBuilder.

Externe Eingabeanforderungen

Executors innerhalb eines Workflows können externe Eingaben mithilfe von ctx.request_info()anfordern:

  1. Executor ruft ctx.request_info() auf, um Eingaben anzufordern
  2. Der Executor implementiert response_handler() zum Verarbeiten der Antwort
  3. Anforderungen werden als RequestInfoEvent-Instanzen im Ereignisstream ausgegeben.
  4. Workflow wechselt in IDLE_WITH_PENDING_REQUESTS Zustand
  5. Der Aufrufer verarbeitet Anforderungen und stellt Antworten über die Methoden send_responses oder send_responses_streaming bereit .
  6. Antworten werden an die anfordernden Executoren weitergeleitet, und Antworthandler werden aufgerufen.

Setzen von Prüfpunkten

Prüfpunkte können zur Buildzeit oder Laufzeit konfiguriert werden:

Buildzeit (über WorkflowBuilder): workflow = WorkflowBuilder().with_checkpointing(storage).build()

Runtime (via run/run_stream parameters): result = await workflow.run(message, checkpoint_storage=runtime_storage)

Wenn diese Option aktiviert ist, werden Prüfpunkte am Ende jedes Supersteps erstellt und erfasst:

  • Vollstreckungsstatus
  • Nachrichten während der Übertragung
  • Freigegebene Zustandsworkflows können über Prozessneustarts mit Prüfpunktspeicher angehalten und fortgesetzt werden.

Zusammensetzung

Workflows können mithilfe von WorkflowExecutor geschachtelt werden, wodurch ein untergeordneter Workflow als Executor umbrochen wird. Die Eingabe-/Ausgabetypen des geschachtelten Workflows werden Teil der WorkflowExecutor-Typen. Wenn der WorkflowExecutor aufgerufen wird, wird der geschachtelte Workflow ausgeführt, um seine Ausgaben fertig zu machen und zu verarbeiten.

Initialisieren Sie den Workflow mit einer Liste von Rändern.

WorkflowAgent

Eine Agent-Unterklasse , die einen Workflow umschließt und als Agent verfügbar macht.

Initialisieren Sie den WorkflowAgent.

WorkflowBuilder

Eine Generatorklasse zum Erstellen von Workflows.

Diese Klasse bietet eine Fluent-API zum Definieren von Workflowdiagrammen durch Verbinden von Executoren mit Kanten und Konfigurieren von Ausführungsparametern. Aufrufen build zum Erstellen einer unveränderlichen Workflow Instanz.

Initialisieren Sie den WorkflowBuilder mit einer leeren Liste von Rändern und ohne Startausführer.

WorkflowCheckpoint

Stellt einen vollständigen Prüfpunkt des Workflowzustands dar.

Prüfpunkte erfassen den vollständigen Ausführungszustand eines Workflows an einem bestimmten Punkt, sodass Workflows angehalten und fortgesetzt werden können.

Hinweis

Das shared_state Dict kann reservierte Schlüssel enthalten, die vom Framework verwaltet werden.

Ausführliche Informationen zu reservierten Schlüsseln finden Sie in der SharedState-Klassendokumentation.

WorkflowCheckpointSummary

Lesbare Zusammenfassung eines Workflowprüfpunkts.

WorkflowContext

Ausführungskontext, mit dem Executoren mit Workflows und anderen Executoren interagieren können.

Überblick

WorkflowContext bietet eine kontrollierte Schnittstelle für Executoren zum Senden von Nachrichten, zum Erstellen von Ausgaben, zum Verwalten des Zustands und zur Interaktion mit dem umfassenderen Workflowökosystem. Er erzwingt die Typsicherheit durch generische Parameter, während der direkte Zugriff auf interne Laufzeitkomponenten verhindert wird.

Typparameter

Der Kontext wird parametrisiert, um die Typsicherheit für verschiedene Vorgänge zu erzwingen:

WorkflowContext (keine Parameter)

Für Executoren, die nur Nebenwirkungen ausführen, ohne Nachrichten zu senden oder Ausgaben zu liefern:


   async def log_handler(message: str, ctx: WorkflowContext) -> None:
       print(f"Received: {message}")  # Only side effects

WorkflowContext[T_Out]

Ermöglicht das Senden von Nachrichten vom Typ T_Out an andere Executoren:


   async def processor(message: str, ctx: WorkflowContext[int]) -> None:
       result = len(message)
       await ctx.send_message(result)  # Send int to downstream executors

WorkflowContext[T_Out, T_W_Out]

Ermöglicht sowohl das Senden von Nachrichten (T_Out) als auch das Zurückgeben von Workflowausgaben (T_W_Out):


   async def dual_output(message: str, ctx: WorkflowContext[int, str]) -> None:
       await ctx.send_message(42)  # Send int message
       await ctx.yield_output("complete")  # Yield str workflow output

Union-Typen

Mehrere Typen können mithilfe der Union-Schreibweise angegeben werden:


   async def flexible(message: str, ctx: WorkflowContext[int | str, bool | dict]) -> None:
       await ctx.send_message("text")  # or send 42
       await ctx.yield_output(True)  # or yield {"status": "done"}

Initialisieren Sie den Executorkontext mit dem angegebenen Workflowkontext.

WorkflowErrorDetails

Strukturierte Fehlerinformationen, die in Fehlerereignissen/Ergebnissen angezeigt werden sollen.

WorkflowEvent

Basisklasse für Workflowereignisse.

Initialisieren Sie das Workflowereignis mit optionalen Daten.

WorkflowExecutor

Ein Executor, der einen Workflow umschließt, um die hierarchische Workflowkomposition zu ermöglichen.

Überblick

WorkflowExecutor bewirkt, dass sich ein Workflow als einzelner Executor innerhalb eines übergeordneten Workflows verhält und geschachtelte Workflowarchitekturen ermöglicht. Er behandelt den vollständigen Lebenszyklus der Ausführung von Unterworkflows, einschließlich Ereignisverarbeitung, Ausgabeweiterleitung und Anforderungs-/Antwortkoordination zwischen übergeordneten und untergeordneten Workflows.

Ausführungsmodell

Beim Aufrufen von WorkflowExecutor:

  1. Startet den umschlossenen Workflow mit der Eingabemeldung.
  2. Führt den Teilworkflow bis zum Abschluss oder bis zum Ausführen externer Eingaben aus.
  3. Verarbeitet den vollständigen Ereignisdatenstrom des Unterworkflows nach der Ausführung.
  4. Leitet Ausgaben als Nachrichten an den übergeordneten Workflow weiter.
  5. Behandelt externe Anforderungen, indem sie an den übergeordneten Workflow weitergeleitet werden
  6. Sammelt Antworten und setzt die Ausführung des Unterworkflows fort.

Ereignisstreamverarbeitung

WorkflowExecutor verarbeitet Ereignisse nach Abschluss des Unterworkflows:

Ausgabeweiterleitung

Alle Ausgaben des Unterworkflows werden automatisch an das übergeordnete Element weitergeleitet:

Wenn allow_direct_output "False" ist (Standard):


   # An executor in the sub-workflow yields outputs
   await ctx.yield_output("sub-workflow result")

   # WorkflowExecutor forwards to parent via ctx.send_message()
   # Parent receives the output as a regular message

Wenn allow_direct_output "True" ist:

Anforderungs-/Antwortkoordination

Wenn Unterworkflows externe Informationen benötigen:


   # An executor in the sub-workflow makes request
   request = MyDataRequest(query="user info")

   # WorkflowExecutor captures RequestInfoEvent and wraps it in a SubWorkflowRequestMessage
   # then send it to the receiving executor in parent workflow. The executor in parent workflow
   # can handle the request locally or forward it to an external source.
   # The WorkflowExecutor tracks the pending request, and implements a response handler.
   # When the response is received, it executes the response handler to accumulate responses
   # and resume the sub-workflow when all expected responses are received.
   # The response handler expects a SubWorkflowResponseMessage wrapping the response data.

Zustandsverwaltung

WorkflowExecutor verwaltet den Ausführungszustand über Anforderungs-/Antwortzyklen hinweg:

  • Verfolgt ausstehende Anforderungen nach request_id
  • Sammelt Antworten, bis alle erwarteten Antworten empfangen werden
  • Setzt die Ausführung des Unterworkflows mit vollständigem Antwortbatch fort.
  • Behandelt gleichzeitige Ausführungen und mehrere ausstehende Anforderungen

Typsystemintegration

WorkflowExecutor erbt seine Typsignatur vom umschlossenen Workflow:

Eingabetypen

Entspricht den Startausführereingabetypen des eingeschlossenen Workflows:


   # If sub-workflow accepts str, WorkflowExecutor accepts str
   workflow_executor = WorkflowExecutor(my_workflow, id="wrapper")
   assert workflow_executor.input_types == my_workflow.input_types

Ausgabetypen

Kombiniert Unterworkflow-Ausgaben mit Anforderungskoordinationstypen:


   # Includes all sub-workflow output types
   # Plus SubWorkflowRequestMessage if sub-workflow can make requests
   output_types = workflow.output_types + [SubWorkflowRequestMessage]  # if applicable

Fehlerbehandlung

WorkflowExecutor verteilt Unterworkflowfehler:

  • Erfasst WorkflowFailedEvent vom Unterworkflow
  • Konvertiert in WorkflowErrorEvent im übergeordneten Kontext
  • Enthält detaillierte Fehlerinformationen, einschließlich der Unterworkflow-ID.

Unterstützung für gleichzeitige Ausführung

WorkflowExecutor unterstützt vollständig mehrere gleichzeitige Unterworkflowausführungen:

Per-Execution Zustandsisolation

Jeder Unterworkflowaufruf erstellt einen isolierten ExecutionContext:


   # Multiple concurrent invocations are supported
   workflow_executor = WorkflowExecutor(my_workflow, id="concurrent_executor")

   # Each invocation gets its own execution context
   # Execution 1: processes input_1 independently
   # Execution 2: processes input_2 independently
   # No state interference between executions

Anforderungs-/Antwortkoordination

Antworten werden ordnungsgemäß an die ursprüngliche Ausführung weitergeleitet:

  • Jede Ausführung verfolgt ihre eigenen ausstehenden Anforderungen und erwarteten Antworten.
  • Anforderungs-zu-Ausführung-Zuordnung stellt sicher, dass Antworten den richtigen Unterworkflow erreichen
  • Antwortakkumulation ist pro Ausführung isoliert
  • Automatische Bereinigung, wenn die Ausführung abgeschlossen ist

Speicherverwaltung

  • Unbegrenzte gleichzeitige Ausführung unterstützt
  • Jede Ausführung verfügt über eine eindeutige UUID-basierte Identifikation
  • Bereinigen abgeschlossener Ausführungskontexte
  • Threadsichere Zustandsverwaltung für gleichzeitigen Zugriff

Wichtige Überlegungen

Freigegebene Workflowinstanz: Alle gleichzeitigen Ausführungen verwenden dieselbe zugrunde liegende Workflowinstanz. Stellen Sie sicher, dass der umschlossene Workflow und die zugehörigen Executoren zustandslos sind, um eine ordnungsgemäße Isolierung zu gewährleisten.


   # Avoid: Stateful executor with instance variables
   class StatefulExecutor(Executor):
       def __init__(self):
           super().__init__(id="stateful")
           self.data = []  # This will be shared across concurrent executions!

Integration in übergeordnete Workflows

Übergeordnete Workflows können Unterworkflowanforderungen abfangen:

Implementierungshinweise

  • Teilworkflows werden bis zum Abschluss ausgeführt, bevor die Ergebnisse verarbeitet werden
  • Ereignisverarbeitung ist atomar – alle Ausgaben werden vor Anforderungen weitergeleitet.
  • Antwortakkumulation stellt sicher, dass Unterworkflows vollständige Antwortbatches empfangen
  • Der Ausführungszustand wird für die ordnungsgemäße Wiederaufnahme nach externen Anforderungen beibehalten.
  • Gleichzeitige Ausführungen sind vollständig isoliert und stören sich nicht gegenseitig

Initialisieren Sie den WorkflowExecutor.

WorkflowFailedEvent

Integriertes Lebenszyklusereignis, das ausgegeben wird, wenn eine Workflowausführung mit einem Fehler beendet wird.

WorkflowOutputEvent

Das Ereignis wird ausgelöst, wenn ein Workflowausführer die Ausgabe liefert.

Initialisieren Sie das Workflowausgabeereignis.

WorkflowRunResult

Container für Ereignisse, die während der Ausführung von Nicht-Streaming-Workflows generiert wurden.

Überblick

Stellt die vollständigen Ausführungsergebnisse einer Workflowausführung dar, die alle Ereignisse enthält, die von Anfang bis Leerlauf generiert werden. Workflows erzeugen Ausgaben inkrementell über ctx.yield_output()-Aufrufe während der Ausführung.

Ereignisstruktur

Behält die Trennung zwischen Datenebenen- und Steuerebenenereignissen bei:

  • Ereignisse auf Datenebene: Executor-Aufrufe, Fertigstellungen, Ausgaben und Anforderungen (in der Hauptliste)
  • Steuerungsebenenereignisse: Statuszeitachse über status_timeline()-Methode zugänglich

Schlüsselmethoden

  • get_outputs(): Extrahieren aller Workflowausgaben aus der Ausführung
  • get_request_info_events(): Abrufen externer Eingabeanforderungen, die während der Ausführung vorgenommen wurden
  • get_final_state(): Abrufen des endgültigen Workflowzustands (IDLE, IDLE_WITH_PENDING_REQUESTS usw.)
  • status_timeline(): Zugreifen auf den vollständigen Statusereignisverlauf
WorkflowStartedEvent

Integriertes Lebenszyklusereignis, das ausgegeben wird, wenn eine Workflowausführung beginnt.

Initialisieren Sie das Workflowereignis mit optionalen Daten.

WorkflowStatusEvent

Integriertes Lebenszyklusereignis, das für Workflowausführungsstatusübergänge ausgegeben wird.

Initialisieren Sie das Workflowstatusereignis mit einem neuen Zustand und optionalen Daten.

WorkflowValidationError

Basis exception for workflow validation errors.

WorkflowViz

Eine Klasse zum Visualisieren von Workflows mithilfe von graphviz und Mermaid.

Initialisieren Sie workflowViz mit einem Workflow.

Enumerationen

MagenticHumanInterventionDecision

Entscheidungsoptionen für menschliche Interventionen.

MagenticHumanInterventionKind

Die Art des menschlichen Eingreifens, das angefordert wird.

ValidationTypeEnum

Aufzählung der Workflowüberprüfungstypen.

WorkflowEventSource

Gibt an, ob ein Workflowereignis aus dem Framework oder einem Executor stammt.

Verwenden Sie FRAMEWORK für Ereignisse, die von integrierten Orchestrierungspfaden ausgegeben werden , auch wenn der Code, der sie auslöst, in runner-bezogenen Modulen lebt, und EXECUTOR für Ereignisse, die von Entwicklern bereitgestellten Executorimplementierungen angezeigt werden.

WorkflowRunState

Status auf Ausführungsebene einer Workflowausführung.

Semantik:

  • GESTARTET: Die Ausführung wurde initiiert, und der Workflowkontext wurde erstellt. Dies ist ein Anfangszustand, bevor eine sinnvolle Arbeit ausgeführt wird. In dieser Codebasis geben wir ein dediziertes WorkflowStartedEvent für Telemetrie aus und setzen den Status in der Regel direkt auf IN_PROGRESS. Verbraucher können sich weiterhin auf "GESTARTET" für Zustandsautomaten verlassen, die eine explizite Vorarbeitsphase benötigen.

  • IN_PROGRESS: Der Workflow wird aktiv ausgeführt (z. B. wurde die anfängliche Nachricht an den Startausführer übermittelt, oder ein Superstep wird ausgeführt). Dieser Status wird am Anfang einer Ausführung ausgegeben und kann von anderen Status beim Fortschritt der Ausführung gefolgt werden.

  • IN_PROGRESS_PENDING_REQUESTS: Aktive Ausführung, während ein oder mehrere Anforderungs-for-Information-Vorgänge ausstehen. Neue Arbeit ist möglicherweise noch geplant, während Sich Anforderungen im Test-Flight befinden.

  • IDLE: Der Workflow bleibt ohne ausstehende Anforderungen still und funktioniert nicht mehr. Dies ist der normale Terminalzustand für Workflows, die die Ausführung abgeschlossen haben und möglicherweise Ausgaben auf dem Weg produziert haben.

  • IDLE_WITH_PENDING_REQUESTS: Der Workflow wird angehalten, um auf externe Eingaben zu warten (z. B. ein RequestInfoEvent ausgegeben). Dies ist ein nicht terminaler Zustand; der Workflow kann fortgesetzt werden, wenn Antworten bereitgestellt werden.

  • FAILED: Terminalstatus, der einen Fehler anzeigt. Begleitet von einem WorkflowFailedEvent mit strukturierten Fehlerdetails.

  • ABGEBROCHEN: Terminalstatus, der angibt, dass die Ausführung von einem Anrufer oder Orchestrator abgebrochen wurde. Derzeit nicht von standardmäßigen Läuferpfaden ausgegeben, sondern für Integratoren/Orchestratoren enthalten, die den Abbruch unterstützen.

Functions

agent_middleware

Dekorateur, um eine Funktion als Agent-Middleware zu markieren.

Dieser Dekorateur identifiziert eine Funktion explizit als Agent-Middleware, die AgentRunContext-Objekte verarbeitet.

agent_middleware(func: Callable[[AgentRunContext, Callable[[AgentRunContext], Awaitable[None]]], Awaitable[None]]) -> Callable[[AgentRunContext, Callable[[AgentRunContext], Awaitable[None]]], Awaitable[None]]

Parameter

Name Beschreibung
func
Erforderlich

Die Middleware-Funktion, die als Agent-Middleware gekennzeichnet werden soll.

Gibt zurück

Typ Beschreibung

Dieselbe Funktion mit Agent-Middleware-Marker.

Beispiele


   from agent_framework import agent_middleware, AgentRunContext, ChatAgent


   @agent_middleware
   async def logging_middleware(context: AgentRunContext, next):
       print(f"Before: {context.agent.name}")
       await next(context)
       print(f"After: {context.result}")


   # Use with an agent
   agent = ChatAgent(chat_client=client, name="assistant", middleware=logging_middleware)

ai_function

Schmücken Sie eine Funktion, um sie in eine AIFunction umzuwandeln, die an Modelle übergeben und automatisch ausgeführt werden kann.

Dieser Dekorateur erstellt ein Pydantisches Modell aus der Signatur der Funktion, das verwendet wird, um die an die Funktion übergebenen Argumente zu überprüfen und das JSON-Schema für die Parameter der Funktion zu generieren.

Wenn Sie Parametern Beschreibungen hinzufügen möchten, verwenden Sie den Annotated Typ aus typing einer Zeichenfolgenbeschreibung als zweites Argument. Sie können die Klasse von Field Pydantic auch für eine erweiterte Konfiguration verwenden.

Hinweis

Wenn approval_mode auf "always_require" festgelegt ist, wird die Funktion nicht ausgeführt.

bis zur expliziten Genehmigung gilt dies nur für den automatischen Aufruffluss.

Beachten Sie außerdem, dass einige, die eine Genehmigung erfordern, wenn das Modell mehrere Funktionsaufrufe zurückgibt.

und andere, die nicht, sie werden die Genehmigung für alle anfordern.

ai_function(func: Callable[[...], ReturnT | Awaitable[ReturnT]] | None = None, *, name: str | None = None, description: str | None = None, approval_mode: Literal['always_require', 'never_require'] | None = None, max_invocations: int | None = None, max_invocation_exceptions: int | None = None, additional_properties: dict[str, Any] | None = None) -> AIFunction[Any, ReturnT] | Callable[[Callable[[...], ReturnT | Awaitable[ReturnT]]], AIFunction[Any, ReturnT]]

Parameter

Name Beschreibung
func
Callable[[...], <xref:agent_framework._tools.ReturnT> | Awaitable[<xref:agent_framework._tools.ReturnT>]] | None

Die Funktion zum Dekorieren.

Standardwert: None
name
Erforderlich
str | None
description
Erforderlich
str | None
approval_mode
Erforderlich
Literal['always_require', 'never_require'] | None
max_invocations
Erforderlich
int | None
max_invocation_exceptions
Erforderlich
int | None
additional_properties
Erforderlich

Nur Schlüsselwortparameter

Name Beschreibung
name

Der Name der Funktion. Wenn nicht angegeben, wird das Attribut der Funktion __name__ verwendet.

Standardwert: None
description

Eine Beschreibung der Funktion. Wenn nicht angegeben, wird die Docstring-Zeichenfolge der Funktion verwendet.

Standardwert: None
approval_mode

Gibt an, ob die Genehmigung erforderlich ist, um dieses Tool auszuführen. Der Standardwert ist, dass keine Genehmigung erforderlich ist.

Standardwert: None
max_invocations

Die maximale Anzahl der Aufrufe dieser Funktion. Wenn keines vorhanden ist, sollte es mindestens 1 sein.

Standardwert: None
max_invocation_exceptions

Die maximale Anzahl von Ausnahmen, die bei Aufrufen zulässig sind. Wenn keines vorhanden ist, sollte es mindestens 1 sein.

Standardwert: None
additional_properties

Zusätzliche Eigenschaften, die für die Funktion festgelegt werden sollen.

Standardwert: None

Gibt zurück

Typ Beschreibung
AIFunction[Any, <xref:agent_framework._tools.ReturnT>] | Callable[[Callable[[…], <xref:agent_framework._tools.ReturnT> | Awaitable[<xref:agent_framework._tools.ReturnT>]]], AIFunction[Any, <xref:agent_framework._tools.ReturnT>]]

Beispiele


   from agent_framework import ai_function
   from typing import Annotated


   @ai_function
   def ai_function_example(
       arg1: Annotated[str, "The first argument"],
       arg2: Annotated[int, "The second argument"],
   ) -> str:
       # An example function that takes two arguments and returns a string.
       return f"arg1: {arg1}, arg2: {arg2}"


   # the same function but with approval required to run
   @ai_function(approval_mode="always_require")
   def ai_function_example(
       arg1: Annotated[str, "The first argument"],
       arg2: Annotated[int, "The second argument"],
   ) -> str:
       # An example function that takes two arguments and returns a string.
       return f"arg1: {arg1}, arg2: {arg2}"


   # With custom name and description
   @ai_function(name="custom_weather", description="Custom weather function")
   def another_weather_func(location: str) -> str:
       return f"Weather in {location}"


   # Async functions are also supported
   @ai_function
   async def async_get_weather(location: str) -> str:
       '''Get weather asynchronously.'''
       # Simulate async operation
       return f"Weather in {location}"

chat_middleware

Dekorateur, um eine Funktion als Chat-Middleware zu markieren.

Dieser Dekorateur identifiziert eine Funktion explizit als Chat-Middleware, die ChatContext-Objekte verarbeitet.

chat_middleware(func: Callable[[ChatContext, Callable[[ChatContext], Awaitable[None]]], Awaitable[None]]) -> Callable[[ChatContext, Callable[[ChatContext], Awaitable[None]]], Awaitable[None]]

Parameter

Name Beschreibung
func
Erforderlich

Die Middleware-Funktion, die als Chat-Middleware gekennzeichnet werden soll.

Gibt zurück

Typ Beschreibung

Dieselbe Funktion mit Chat-Middleware-Markierung.

Beispiele


   from agent_framework import chat_middleware, ChatContext, ChatAgent


   @chat_middleware
   async def logging_middleware(context: ChatContext, next):
       print(f"Messages: {len(context.messages)}")
       await next(context)
       print(f"Response: {context.result}")


   # Use with an agent
   agent = ChatAgent(chat_client=client, name="assistant", middleware=logging_middleware)

create_edge_runner

Factory function to create the appropriate edge runner for an edge group.

create_edge_runner(edge_group: EdgeGroup, executors: dict[str, Executor]) -> EdgeRunner

Parameter

Name Beschreibung
edge_group
Erforderlich
<xref:agent_framework._workflows._edge.EdgeGroup>

Die Randgruppe, für die ein Läufer erstellt werden soll.

executors
Erforderlich

Zuordnung von Executor-IDs zu Executorinstanzen.

Gibt zurück

Typ Beschreibung
<xref:agent_framework._workflows._edge_runner.EdgeRunner>

Die entsprechende EdgeRunner-Instanz.

executor

Dekorateur, der eine eigenständige Funktion in eine FunctionExecutor-Instanz konvertiert.

Der @executor Dekorateur ist nur für eigenständige Funktionen auf Modulebene ausgelegt. Verwenden Sie für klassenbasierte Executoren die Executor-Basisklasse für @handler Instanzmethoden.

Unterstützt synchrone und asynchrone Funktionen. Synchrone Funktionen werden in einem Threadpool ausgeführt, um zu verhindern, dass die Ereignisschleife blockiert wird.

Von Bedeutung

Verwendung @executor für eigenständige Funktionen (Modulebene oder lokale Funktionen)

Nicht mit @executorstaticmethod oder classmethod

Für klassenbasierte Executoren, Unterklassenausführer und Verwendung @handler für Instanzmethoden

Verwendung:


   # Standalone async function (RECOMMENDED):
   @executor(id="upper_case")
   async def to_upper(text: str, ctx: WorkflowContext[str]):
       await ctx.send_message(text.upper())


   # Standalone sync function (runs in thread pool):
   @executor
   def process_data(data: str):
       return data.upper()


   # For class-based executors, use @handler instead:
   class MyExecutor(Executor):
       def __init__(self):
           super().__init__(id="my_executor")

       @handler
       async def process(self, data: str, ctx: WorkflowContext[str]):
           await ctx.send_message(data.upper())
executor(func: Callable[[...], Any] | None = None, *, id: str | None = None) -> Callable[[Callable[[...], Any]], FunctionExecutor] | FunctionExecutor

Parameter

Name Beschreibung
func
Callable[[...], Any] | None

Die Funktion zum Dekorieren (bei Verwendung ohne Klammern)

Standardwert: None
id
Erforderlich
str | None

Optionale benutzerdefinierte ID für den Executor. Wenn keine, wird der Funktionsname verwendet.

Nur Schlüsselwortparameter

Name Beschreibung
id
Standardwert: None

Gibt zurück

Typ Beschreibung

Eine FunctionExecutor-Instanz, die in einen Workflow verkabelt werden kann.

Ausnahmen

Typ Beschreibung

Bei Verwendung mit staticmethod oder classmethod (nicht unterstütztes Muster)

function_middleware

Dekorateur, um eine Funktion als Funktions-Middleware zu markieren.

Dieser Dekorateur identifiziert eine Funktion explizit als Funktions-Middleware, die FunctionInvocationContext-Objekte verarbeitet.

function_middleware(func: Callable[[FunctionInvocationContext, Callable[[FunctionInvocationContext], Awaitable[None]]], Awaitable[None]]) -> Callable[[FunctionInvocationContext, Callable[[FunctionInvocationContext], Awaitable[None]]], Awaitable[None]]

Parameter

Name Beschreibung
func
Erforderlich

Die Middleware-Funktion, die als Funktions-Middleware gekennzeichnet werden soll.

Gibt zurück

Typ Beschreibung

Dieselbe Funktion mit Middlewaremarkierung der Funktion.

Beispiele


   from agent_framework import function_middleware, FunctionInvocationContext, ChatAgent


   @function_middleware
   async def logging_middleware(context: FunctionInvocationContext, next):
       print(f"Calling: {context.function.name}")
       await next(context)
       print(f"Result: {context.result}")


   # Use with an agent
   agent = ChatAgent(chat_client=client, name="assistant", middleware=logging_middleware)

get_checkpoint_summary

get_checkpoint_summary(checkpoint: WorkflowCheckpoint) -> WorkflowCheckpointSummary

Parameter

Name Beschreibung
checkpoint
Erforderlich

Gibt zurück

Typ Beschreibung

get_logger

Rufen Sie einen Logger mit dem angegebenen Namen ab, standardmäßig auf "agent_framework".

get_logger(name: str = 'agent_framework') -> Logger

Parameter

Name Beschreibung
name
str

Der Name des Loggers. Der Standardwert ist "agent_framework".

Standardwert: "agent_framework"

Gibt zurück

Typ Beschreibung

Die konfigurierte Loggerinstanz.

handler

Dekorateur zum Registrieren eines Handlers für einen Executor.

handler(func: Callable[[ExecutorT, Any, ContextT], Awaitable[Any]]) -> Callable[[ExecutorT, Any, ContextT], Awaitable[Any]]

Parameter

Name Beschreibung
func
Erforderlich
Callable[[<xref:agent_framework._workflows._executor.ExecutorT>, Any, <xref:agent_framework._workflows._executor.ContextT>], Awaitable[Any]]

Die Funktion zum Dekorieren. Kann keine sein, wenn ohne Parameter verwendet wird.

Gibt zurück

Typ Beschreibung
Callable[[<xref:agent_framework._workflows._executor.ExecutorT>, Any, <xref:agent_framework._workflows._executor.ContextT>], Awaitable[Any]]

Die mit Handlermetadaten versehene Funktion.

Beispiele

@handler async def handle_string(self, message: str, ctx: WorkflowContext[str]) -> None:

...

@handler async def handle_data(self, message: dict, ctx: WorkflowContext[str | int]) -> None:

...

prepare_function_call_results

Bereiten Sie die Werte der Ergebnisse des Funktionsaufrufs vor.

prepare_function_call_results(content: TextContent | DataContent | TextReasoningContent | UriContent | FunctionCallContent | FunctionResultContent | ErrorContent | UsageContent | HostedFileContent | HostedVectorStoreContent | FunctionApprovalRequestContent | FunctionApprovalResponseContent | Any | list[TextContent | DataContent | TextReasoningContent | UriContent | FunctionCallContent | FunctionResultContent | ErrorContent | UsageContent | HostedFileContent | HostedVectorStoreContent | FunctionApprovalRequestContent | FunctionApprovalResponseContent | Any]) -> str

Parameter

Name Beschreibung
content
Erforderlich

Gibt zurück

Typ Beschreibung
str

prepend_agent_framework_to_user_agent

Stellen Sie dem User-Agent in den Headern "agent-framework" voran.

Wenn die Telemetrie des Benutzer-Agents über die AGENT_FRAMEWORK_USER_AGENT_DISABLED Umgebungsvariable deaktiviert ist, enthält der User-Agent-Header keine Agent-Framework-Informationen. Sie wird wie folgt zurückgesendet, oder als leeres Diktat, wenn None übergeben wird.

prepend_agent_framework_to_user_agent(headers: dict[str, Any] | None = None) -> dict[str, Any]

Parameter

Name Beschreibung
headers

Das vorhandene Kopfzeilenwörterbuch.

Standardwert: None

Gibt zurück

Typ Beschreibung

Ein neues Diktat, bei dem "User-Agent" auf "agent-framework-python/{version}" festgelegt ist, wenn Header keine sind. Das geänderte Headerwörterbuch mit "agent-framework-python/{version}" wird dem Benutzer-Agent vorangestellt.

Beispiele


   from agent_framework import prepend_agent_framework_to_user_agent

   # Add agent-framework to new headers
   headers = prepend_agent_framework_to_user_agent()
   print(headers["User-Agent"])  # "agent-framework-python/0.1.0"

   # Prepend to existing headers
   existing = {"User-Agent": "my-app/1.0"}
   headers = prepend_agent_framework_to_user_agent(existing)
   print(headers["User-Agent"])  # "agent-framework-python/0.1.0 my-app/1.0"

response_handler

Dekorateur zum Registrieren eines Handlers zum Behandeln von Antworten für eine Anforderung.

response_handler(func: Callable[[ExecutorT, Any, Any, ContextT], Awaitable[None]]) -> Callable[[ExecutorT, Any, Any, ContextT], Awaitable[None]]

Parameter

Name Beschreibung
func
Erforderlich
Callable[[<xref:agent_framework._workflows._request_info_mixin.ExecutorT>, Any, Any, <xref:agent_framework._workflows._request_info_mixin.ContextT>], Awaitable[None]]

Die Funktion zum Dekorieren.

Gibt zurück

Typ Beschreibung
Callable[[<xref:agent_framework._workflows._request_info_mixin.ExecutorT>, Any, Any, <xref:agent_framework._workflows._request_info_mixin.ContextT>], Awaitable[None]]

Die mit Handlermetadaten versehene Funktion.

Beispiele


   @handler
   async def run(self, message: int, context: WorkflowContext[str]) -> None:
       # Example of a handler that sends a request
       ...
       # Send a request with a `CustomRequest` payload and expect a `str` response.
       await context.request_info(CustomRequest(...), str)


   @response_handler
   async def handle_response(
       self,
       original_request: CustomRequest,
       response: str,
       context: WorkflowContext[str],
   ) -> None:
       # Example of a response handler for the above request
       ...


   @response_handler
   async def handle_response(
       self,
       original_request: CustomRequest,
       response: dict,
       context: WorkflowContext[int],
   ) -> None:
       # Example of a response handler for a request expecting a dict response
       ...

setup_logging

Richten Sie die Protokollierungskonfiguration für das Agent-Framework ein.

setup_logging() -> None

Gibt zurück

Typ Beschreibung

use_agent_middleware

Klassendekoror, der Middleware-Unterstützung zu einer Agentklasse hinzufügt.

Dieser Dekorateur fügt Middleware-Funktionen zu jeder Agentklasse hinzu. Sie umschließt die run() Und run_stream() Methoden, um middleware-Ausführung bereitzustellen.

Die Middleware-Ausführung kann jederzeit beendet werden, indem die context.terminate Eigenschaft auf "True" festgelegt wird. Sobald die Pipeline festgelegt ist, wird die Ausführung weiterer Middleware beendet, sobald die Steuerung zur Pipeline zurückkehrt.

Hinweis

Dieser Dekorateur wird bereits auf integrierte Agentklassen angewendet. Sie müssen nur verwenden

wenn Sie benutzerdefinierte Agentimplementierungen erstellen.

use_agent_middleware(agent_class: type[TAgent]) -> type[TAgent]

Parameter

Name Beschreibung
agent_class
Erforderlich
type[<xref:TAgent>]

Die Agentklasse, der Middleware-Unterstützung hinzugefügt werden soll.

Gibt zurück

Typ Beschreibung
type[~<xref:TAgent>]

Die geänderte Agentklasse mit Middleware-Unterstützung.

Beispiele


   from agent_framework import use_agent_middleware


   @use_agent_middleware
   class CustomAgent:
       async def run(self, messages, **kwargs):
           # Agent implementation
           pass

       async def run_stream(self, messages, **kwargs):
           # Streaming implementation
           pass

use_chat_middleware

Klassendekoror, der Middleware-Unterstützung zu einer Chatclientklasse hinzufügt.

Dieser Dekorateur fügt Middleware-Funktionen zu jeder Chatclientklasse hinzu. Sie umschließt die get_response() Und get_streaming_response() Methoden, um middleware-Ausführung bereitzustellen.

Hinweis

Dieser Dekorateur wird bereits auf integrierte Chatclientklassen angewendet. Sie müssen nur verwenden

wenn Sie benutzerdefinierte Chatclientimplementierungen erstellen.

use_chat_middleware(chat_client_class: type[TChatClient]) -> type[TChatClient]

Parameter

Name Beschreibung
chat_client_class
Erforderlich
type[<xref:TChatClient>]

Die Chatclientklasse, der Middleware-Unterstützung hinzugefügt werden soll.

Gibt zurück

Typ Beschreibung
type[~<xref:TChatClient>]

Die geänderte Chatclientklasse mit Middleware-Unterstützung.

Beispiele


   from agent_framework import use_chat_middleware


   @use_chat_middleware
   class CustomChatClient:
       async def get_response(self, messages, **kwargs):
           # Chat client implementation
           pass

       async def get_streaming_response(self, messages, **kwargs):
           # Streaming implementation
           pass

use_function_invocation

Klassendekoror, der Toolanrufe für einen Chatclient ermöglicht.

Dieser Dekorateur umschließt die get_response Und get_streaming_response Methoden, um Funktionsaufrufe aus dem Modell automatisch zu verarbeiten, auszuführen und die Ergebnisse zurück an das Modell zur weiteren Verarbeitung zurückzugeben.

use_function_invocation(chat_client: type[TChatClient]) -> type[TChatClient]

Parameter

Name Beschreibung
chat_client
Erforderlich
type[<xref:TChatClient>]

Die Chatclientklasse zum Dekorieren.

Gibt zurück

Typ Beschreibung
type[~<xref:TChatClient>]

Die dekorierte Chatclientklasse mit aktivierter Funktionsaufruffunktion.

Ausnahmen

Typ Beschreibung

Wenn der Chatclient nicht über die erforderlichen Methoden verfügt.

Beispiele


   from agent_framework import use_function_invocation, BaseChatClient


   @use_function_invocation
   class MyCustomClient(BaseChatClient):
       async def get_response(self, messages, **kwargs):
           # Implementation here
           pass

       async def get_streaming_response(self, messages, **kwargs):
           # Implementation here
           pass


   # The client now automatically handles function calls
   client = MyCustomClient()

validate_workflow_graph

Komfortfunktion zum Überprüfen eines Workflowdiagramms.

validate_workflow_graph(edge_groups: Sequence[EdgeGroup], executors: dict[str, Executor], start_executor: Executor) -> None

Parameter

Name Beschreibung
edge_groups
Erforderlich
Sequence[<xref:agent_framework._workflows._edge.EdgeGroup>]

Liste der Edgegruppen im Workflow

executors
Erforderlich

Zuordnung von Executor-IDs zu Executorinstanzen

start_executor
Erforderlich

Der Startausführer (kann Instanz oder ID sein)

Gibt zurück

Typ Beschreibung

Ausnahmen

Typ Beschreibung

Wenn eine Überprüfung fehlschlägt