Delen via


agent_framework Pakket

Pakketten

a2a
ag_ui
anthropic
azure
chatkit
declarative
devui
lab
mem0
microsoft
ollama
openai
redis

Modules

exceptions
observability

Klassen

AIFunction

Een hulpprogramma waarmee een Python-functie wordt verpakt om deze aan te roepen door AI-modellen.

Deze klasse verpakt een Python-functie om deze aan te roepen door AI-modellen met automatische parametervalidatie en het genereren van JSON-schema's.

Initialiseer de AIFunction.

AgentExecutor

ingebouwde uitvoerder waarmee een agent wordt verpakt voor het verwerken van berichten.

AgentExecutor past het gedrag aan op basis van de uitvoeringsmodus van de werkstroom:

  • run_stream(): verzendt incrementele AgentRunUpdateEvent-gebeurtenissen naarmate de agent tokens produceert
  • run(): verzendt één AgentRunEvent met het volledige antwoord

De uitvoerder detecteert automatisch de modus via WorkflowContext.is_streaming().

Initialiseer de uitvoerder met een unieke id.

AgentExecutorRequest

Een aanvraag voor een agentexecutor.

AgentExecutorResponse

Een reactie van een agentexecutor.

AgentInputRequest

Vraag menselijke invoer aan voordat een agent wordt uitgevoerd in opbouwwerkstromen op hoog niveau.

Verzonden via RequestInfoEvent wanneer een werkstroom wordt onderbroken voordat een agent wordt uitgevoerd. Het antwoord wordt in het gesprek geïnjecteerd als een gebruikersbericht om het gedrag van de agent te sturen.

Dit is het standaardaanvraagtype dat wordt gebruikt door .with_request_info() op SequentialBuilder, ConcurrentBuilder, GroupChatBuilder en HandoffBuilder.

AgentMiddleware

Abstracte basisklasse voor agent-middleware waarmee agentinvocations kunnen worden onderschept.

Met agent-middleware kunt u agentaanroepen voor en na de uitvoering onderscheppen en wijzigen. U kunt berichten inspecteren, context wijzigen, resultaten overschrijven of de uitvoering vroegtijdig beëindigen.

Opmerking

AgentMiddleware is een abstracte basisklasse. U moet deze subklassen en implementeren

de methode process() voor het maken van aangepaste agent-middleware.

AgentProtocol

Een protocol voor een agent die kan worden aangeroepen.

Dit protocol definieert de interface die alle agents moeten implementeren, inclusief eigenschappen voor identificatie en methoden voor uitvoering.

Opmerking

Protocollen maken gebruik van structurele subtyping (eenden typen). Klassen hebben niet nodig

om expliciet over te nemen van dit protocol om als compatibel te worden beschouwd.

Hierdoor kunt u volledig aangepaste agents maken zonder gebruik te maken

alle basisklassen van Agent Framework.

AgentRunContext

Contextobject voor agent-middleware-aanroepen.

Deze context wordt doorgegeven via de middleware-pijplijn van de agent en bevat alle informatie over de aanroep van de agent.

Initialiseer de AgentRunContext.

AgentRunEvent

Gebeurtenis geactiveerd wanneer een agentuitvoering is voltooid.

Initialiseer de uitvoeringsbeurtenis van de agent.

AgentRunResponse

Vertegenwoordigt het antwoord op een agentuitvoeringsaanvraag.

Bevat een of meer antwoordberichten en metagegevens over het antwoord. Een typisch antwoord bevat één bericht, maar kan meerdere berichten bevatten in scenario's met betrekking tot functieaanroepen, RAG-ophaalfuncties of complexe logica.

Initialiseer een AgentRunResponse.

AgentRunResponseUpdate

Vertegenwoordigt één streaming-antwoordsegment van een agent.

Initialiseer een AgentRunResponseUpdate.

AgentRunUpdateEvent

Gebeurtenis geactiveerd wanneer een agent streamingberichten heeft.

Initialiseer de streaming-gebeurtenis van de agent.

AgentThread

De agentthreadklasse, dit kan zowel een lokaal beheerde thread als een thread vertegenwoordigen die wordt beheerd door de service.

Een AgentThread onderhoudt de gespreksstatus en berichtgeschiedenis voor een agentinteractie. Het kan een door de service beheerde thread (via service_thread_id) of een lokaal berichtenarchief (via message_store), maar niet beide gebruiken.

Initialiseer een AgentThread, gebruik deze methode niet handmatig, altijd: agent.get_new_thread().

Opmerking

Service_thread_id of message_store kunnen worden ingesteld, maar niet beide.

AggregateContextProvider

Een ContextProvider die meerdere contextproviders bevat.

Het delegeert gebeurtenissen naar meerdere contextproviders en voegt antwoorden van deze gebeurtenissen samen voordat deze worden geretourneerd. Hierdoor kunt u meerdere contextproviders combineren tot één provider.

Opmerking

Er wordt automatisch een AggregateContextProvider gemaakt wanneer u één context doorgeeft

provider of een reeks contextproviders voor de agentconstructor.

Initialiseer de AggregateContextProvider met contextproviders.

BaseAgent

Basisklasse voor alle Agent Framework-agents.

Deze klasse biedt kernfunctionaliteit voor agent-implementaties, waaronder contextproviders, middleware-ondersteuning en threadbeheer.

Opmerking

BaseAgent kan niet rechtstreeks worden geïnstantieerd omdat het de

run(), run_stream() en andere methoden die zijn vereist voor AgentProtocol.

Gebruik een concrete implementatie zoals ChatAgent of maak een subklasse.

Initialiseer een BaseAgent-exemplaar.

BaseAnnotation

Basisklasse voor alle TYPEN AI-aantekeningen.

Initialiseer BaseAnnotation.

BaseChatClient

Basisklasse voor chatclients.

Deze abstracte basisklasse biedt kernfunctionaliteit voor implementaties van chatclients, waaronder middleware-ondersteuning, berichtvoorbereiding en normalisatie van hulpprogramma's.

Opmerking

BaseChatClient kan niet rechtstreeks worden geïnstantieerd omdat het een abstracte basisklasse is.

Subklassen moeten _inner_get_response() en _inner_get_streaming_response() implementeren.

Initialiseer een BaseChatClient-exemplaar.

BaseContent

Vertegenwoordigt inhoud die wordt gebruikt door AI-services.

Initialiseer BaseContent.

Case

Runtime-wrapper combineert een switch-casepredicaat met het doel.

Elke case koppelt een booleaanse predicaat met de uitvoerder die het bericht moet verwerken wanneer het predicaat waar is. De runtime houdt deze lichtgewicht container gescheiden van de serialisable SwitchCaseEdgeGroupCase , zodat de uitvoering kan worden uitgevoerd met live aanroepbare bestanden zonder de permanente status te vervuilen.

ChatAgent

Een chatclientagent.

Dit is de primaire agent-implementatie die gebruikmaakt van een chatclient om te communiceren met taalmodellen. Het ondersteunt hulpprogramma's, contextproviders, middleware en zowel streaming- als niet-streaming-antwoorden.

Initialiseer een ChatAgent-exemplaar.

Opmerking

De set parameters van frequency_penalty tot request_kwargs worden gebruikt om

bel de chatclient. Ze kunnen ook worden doorgegeven aan beide uitvoeringsmethoden.

Wanneer beide zijn ingesteld, hebben de methoden die worden doorgegeven aan de uitvoeringsmethoden voorrang.

ChatClientProtocol

Een protocol voor een chatclient die antwoorden kan genereren.

Dit protocol definieert de interface die alle chatclients moeten implementeren, inclusief methoden voor het genereren van zowel streaming- als niet-streaming-antwoorden.

Opmerking

Protocollen maken gebruik van structurele subtyping (eenden typen). Klassen hebben niet nodig

om expliciet over te nemen van dit protocol om als compatibel te worden beschouwd.

ChatContext

Contextobject voor chat-middleware-aanroepen.

Deze context wordt doorgegeven via de pijplijn voor chat-middleware en bevat alle informatie over de chataanvraag.

Initialiseer de ChatContext.

ChatMessage

Vertegenwoordigt een chatbericht.

Initialiseer ChatMessage.

ChatMessageStore

Een in-memory implementatie van ChatMessageStoreProtocol waarin berichten in een lijst worden opgeslagen.

Deze implementatie biedt een eenvoudige, lijstgebaseerde opslag voor chatberichten met ondersteuning voor serialisatie en deserialisatie. Hiermee worden alle vereiste methoden van het ChatMessageStoreProtocol protocol geïmplementeerd.

Het archief onderhoudt berichten in het geheugen en biedt methoden voor het serialiseren en deserialiseren van de status voor persistentiedoeleinden.

Maak een ChatMessageStore voor gebruik in een thread.

ChatMessageStoreProtocol

Definieert methoden voor het opslaan en ophalen van chatberichten die zijn gekoppeld aan een specifieke thread.

Implementaties van dit protocol zijn verantwoordelijk voor het beheren van de opslag van chatberichten, waaronder het verwerken van grote hoeveelheden gegevens door berichten af te kapen of samen te vatten indien nodig.

ChatMiddleware

Abstracte basisklasse voor chat-middleware die chatclientaanvragen kan onderscheppen.

Met chat-middleware kunt u chatclientaanvragen voor en na uitvoering onderscheppen en wijzigen. U kunt berichten wijzigen, systeemprompts, logboekaanvragen toevoegen of chatantwoorden overschrijven.

Opmerking

ChatMiddleware is een abstracte basisklasse. U moet deze subklassen en implementeren

de methode process() voor het maken van aangepaste chat-middleware.

ChatOptions

Algemene aanvraaginstellingen voor AI-services.

Initialiseer ChatOptions.

ChatResponse

Vertegenwoordigt het antwoord op een chataanvraag.

Initialiseert een ChatResponse met de opgegeven parameters.

ChatResponseUpdate

Vertegenwoordigt één streaming-antwoordsegment van een ChatClient.

Initialiseert een ChatResponseUpdate met de opgegeven parameters.

CheckpointStorage

Protocol voor back-ends voor controlepuntopslag.

CitationAnnotation

Vertegenwoordigt een aantekening van een bronvermelding.

Initialiseer CitationAnnotation.

ConcurrentBuilder

Opbouwfunctie op hoog niveau voor gelijktijdige agentwerkstromen.

  • deelnemers([...]) accepteert een lijst met AgentProtocol (aanbevolen) of Executor.

  • register_participants([...]) accepteert een lijst met factory's voor AgentProtocol (aanbevolen)

    of Uitvoerders factory's

  • build() draden: dispatcher -> fan-out -> deelnemers -> fan-in -> aggregator.

  • with_aggregator(...) overschrijft de standaardaggregator met een executor of callback.

  • register_aggregator(...) accepteert een factory voor een executor als aangepaste aggregator.

Gebruik:


   from agent_framework import ConcurrentBuilder

   # Minimal: use default aggregator (returns list[ChatMessage])
   workflow = ConcurrentBuilder().participants([agent1, agent2, agent3]).build()

   # With agent factories
   workflow = ConcurrentBuilder().register_participants([create_agent1, create_agent2, create_agent3]).build()


   # Custom aggregator via callback (sync or async). The callback receives
   # list[AgentExecutorResponse] and its return value becomes the workflow's output.
   def summarize(results: list[AgentExecutorResponse]) -> str:
       return " | ".join(r.agent_run_response.messages[-1].text for r in results)


   workflow = ConcurrentBuilder().participants([agent1, agent2, agent3]).with_aggregator(summarize).build()


   # Custom aggregator via a factory
   class MyAggregator(Executor):
       @handler
       async def aggregate(self, results: list[AgentExecutorResponse], ctx: WorkflowContext[Never, str]) -> None:
           await ctx.yield_output(" | ".join(r.agent_run_response.messages[-1].text for r in results))


   workflow = (
       ConcurrentBuilder()
       .register_participants([create_agent1, create_agent2, create_agent3])
       .register_aggregator(lambda: MyAggregator(id="my_aggregator"))
       .build()
   )


   # Enable checkpoint persistence so runs can resume
   workflow = ConcurrentBuilder().participants([agent1, agent2, agent3]).with_checkpointing(storage).build()

   # Enable request info before aggregation
   workflow = ConcurrentBuilder().participants([agent1, agent2]).with_request_info().build()
Context

Een klasse met context die moet worden geleverd aan het AI-model, zoals geleverd door een ContextProvider.

Elke ContextProvider heeft de mogelijkheid om voor elke aanroep een eigen context te bieden. De contextklasse bevat de aanvullende context die wordt geleverd door de ContextProvider. Deze context wordt gecombineerd met context die door andere providers wordt geleverd voordat deze wordt doorgegeven aan het AI-model. Deze context wordt aanroepen en wordt niet opgeslagen als onderdeel van de chatgeschiedenis.

Maak een nieuw contextobject.

ContextProvider

Basisklasse voor alle contextproviders.

Een contextprovider is een onderdeel dat kan worden gebruikt om het contextbeheer van ai te verbeteren. Het kan luisteren naar wijzigingen in het gesprek en extra context bieden aan het AI-model vlak voordat u het aanroept.

Opmerking

ContextProvider is een abstracte basisklasse. U moet deze subklassen en implementeren

de methode aanroepen() om een aangepaste contextprovider te maken. In het ideale geval moet u

implementeer ook de methoden invoked() en thread_created() om het gesprek bij te houden

staat, maar deze zijn optioneel.

DataContent

Vertegenwoordigt binaire gegevensinhoud met een gekoppeld mediatype (ook wel een MIME-type genoemd).

Belangrijk

Dit is voor binaire gegevens die worden weergegeven als een gegevens-URI, niet voor onlinebronnen.

Gebruik UriContent voor onlinebronnen.

Initialiseert een DataContent-exemplaar.

Belangrijk

Dit is voor binaire gegevens die worden weergegeven als een gegevens-URI, niet voor onlinebronnen.

Gebruik UriContent voor onlinebronnen.

Default

Runtimeweergave van de standaardvertakking in een switch-casegroep.

De standaardvertakking wordt alleen aangeroepen wanneer er geen andere casepredicaten overeenkomen. In de praktijk bestaat het gegarandeerd, zodat routering nooit een leeg doel produceert.

Edge

Modelleer een gerichte, optioneel voorwaardelijke hand-off tussen twee uitvoerders.

Elke Edge legt de minimale metagegevens vast die nodig zijn om een bericht van de ene uitvoerder naar de andere te verplaatsen in de werkstroomgrafiek. Optioneel wordt een booleaanse predicaat ingesloten waarmee wordt bepaald of de rand tijdens runtime moet worden genomen. Door de rand omlaag te serialiseren naar primitieven, kunnen we de topologie van een werkstroom reconstrueren, ongeacht het oorspronkelijke Python-proces.

Initialiseer een volledig opgegeven rand tussen twee werkstroomexecutors.

EdgeDuplicationError

Er is een uitzondering opgetreden wanneer dubbele randen worden gedetecteerd in de werkstroom.

ErrorContent

Vertegenwoordigt een fout.

Opmerkingen: Meestal gebruikt voor niet-fatale fouten, waarbij er iets misging als onderdeel van de bewerking, maar de bewerking kon nog steeds worden voortgezet.

Initialiseert een ErrorContent-exemplaar.

Executor

Basisklasse voor alle werkstroomexecutors die berichten verwerken en berekeningen uitvoeren.

Overzicht

Uitvoerders zijn de fundamentele bouwstenen van werkstromen, die afzonderlijke verwerkingseenheden vertegenwoordigen die berichten ontvangen, bewerkingen uitvoeren en uitvoer produceren. Elke uitvoerder wordt uniek geïdentificeerd en kan specifieke berichttypen verwerken via versierde handlermethoden.

Type systeem

Uitvoerders hebben een uitgebreid typesysteem waarmee hun mogelijkheden worden gedefinieerd:

Invoertypen

De typen berichten die een executor kan verwerken, gedetecteerd op basis van handlermethodehandtekeningen:


   class MyExecutor(Executor):
       @handler
       async def handle_string(self, message: str, ctx: WorkflowContext) -> None:
           # This executor can handle 'str' input types

Toegang via de eigenschap input_types .

Uitvoertypen

De typen berichten die een uitvoerder naar andere uitvoerders kan verzenden via ctx.send_message():


   class MyExecutor(Executor):
       @handler
       async def handle_data(self, message: str, ctx: WorkflowContext[int | bool]) -> None:
           # This executor can send 'int' or 'bool' messages

Toegang via de eigenschap output_types .

Typen werkstroomuitvoer

De typen gegevens die een uitvoerder kan verzenden als uitvoer op werkstroomniveau via ctx.yield_output():


   class MyExecutor(Executor):
       @handler
       async def process(self, message: str, ctx: WorkflowContext[int, str]) -> None:
           # Can send 'int' messages AND yield 'str' workflow outputs

Toegang via de eigenschap workflow_output_types .

HandlerDetectie

Uitvoerders ontdekken hun mogelijkheden via versierde methoden:

@handler Decorateur

Hiermee markeert u methoden waarmee binnenkomende berichten worden verwerkt:


   class MyExecutor(Executor):
       @handler
       async def handle_text(self, message: str, ctx: WorkflowContext[str]) -> None:
           await ctx.send_message(message.upper())

Onderschepping van subwerkstroomaanvragen

Methoden gebruiken @handler om subwerkstroomaanvragen te onderscheppen:


   class ParentExecutor(Executor):
       @handler
       async def handle_subworkflow_request(
           self,
           request: SubWorkflowRequestMessage,
           ctx: WorkflowContext[SubWorkflowResponseMessage],
       ) -> None:
           if self.is_allowed(request.domain):
               response = request.create_response(data=True)
               await ctx.send_message(response, target_id=request.executor_id)
           else:
               await ctx.request_info(request.source_event, response_type=request.source_event.response_type)

Contexttypen

Handlermethoden ontvangen verschillende WorkflowContext-varianten op basis van hun typeaantekeningen:

WorkflowContext (geen typeparameters)

Voor handlers die alleen bijwerkingen uitvoeren zonder berichten te verzenden of uitvoer te genereren:


   class LoggingExecutor(Executor):
       @handler
       async def log_message(self, msg: str, ctx: WorkflowContext) -> None:
           print(f"Received: {msg}")  # Only logging, no outputs

WorkflowContext[T_Out]

Hiermee kunt u berichten van het type T_Out verzenden via ctx.send_message():


   class ProcessorExecutor(Executor):
       @handler
       async def handler(self, msg: str, ctx: WorkflowContext[int]) -> None:
           await ctx.send_message(42)  # Can send int messages

WorkflowContext[T_Out, T_W_Out]

Hiermee kunt u zowel het verzenden van berichten (T_Out) als het opleveren van werkstroomuitvoer (T_W_Out):


   class DualOutputExecutor(Executor):
       @handler
       async def handler(self, msg: str, ctx: WorkflowContext[int, str]) -> None:
           await ctx.send_message(42)  # Send int message
           await ctx.yield_output("done")  # Yield str workflow output

Functie-uitvoerders

Eenvoudige functies kunnen worden geconverteerd naar uitvoerders met behulp van de @executor decorator:


   @executor
   async def process_text(text: str, ctx: WorkflowContext[str]) -> None:
       await ctx.send_message(text.upper())


   # Or with custom ID:
   @executor(id="text_processor")
   def sync_process(text: str, ctx: WorkflowContext[str]) -> None:
       ctx.send_message(text.lower())  # Sync functions run in thread pool

Samenstelling van subwerkstroom

Uitvoerders kunnen subwerkstromen bevatten met behulp van WorkflowExecutor. Subwerkstromen kunnen aanvragen indienen die bovenliggende werkstromen kunnen onderscheppen. Zie de documentatie van WorkflowExecutor voor meer informatie over werkstroomsamenstellingspatronen en de verwerking van aanvragen/antwoorden.

Toestandsbeheer

Uitvoerders kunnen statussen bevatten die behouden blijven voor werkstroomuitvoeringen en controlepunten. Overschrijf de on_checkpoint_save - en on_checkpoint_restore methoden voor het implementeren van aangepaste statusserialisatie- en herstellogica.

Opmerkingen bij de implementatie

  • Voer execute() niet rechtstreeks aan. Deze wordt aangeroepen door de werkstroomengine
  • Execute () niet overschrijven - handlers definiëren met decorators in plaats daarvan
  • Elke uitvoerder moet ten minste één @handler-methode hebben
  • Handler-methodehandtekeningen worden gevalideerd tijdens initialisatie

Initialiseer de uitvoerder met een unieke id.

ExecutorCompletedEvent

Gebeurtenis geactiveerd wanneer een uitvoerdershandler is voltooid.

Initialiseer de executor-gebeurtenis met een uitvoerders-id en optionele gegevens.

ExecutorEvent

Basisklasse voor uitvoerdersgebeurtenissen.

Initialiseer de executor-gebeurtenis met een uitvoerders-id en optionele gegevens.

ExecutorFailedEvent

Gebeurtenis geactiveerd wanneer een uitvoerdershandler een fout genereert.

ExecutorInvokedEvent

Gebeurtenis geactiveerd wanneer een uitvoerdershandler wordt aangeroepen.

Initialiseer de executor-gebeurtenis met een uitvoerders-id en optionele gegevens.

FanInEdgeGroup

Vertegenwoordig een samenvoegingsset randen die één downstream-uitvoerprogramma voeren.

Ventilatorgroepen worden meestal gebruikt wanneer meerdere upstreamfasen onafhankelijk berichten produceren die allemaal bij dezelfde downstreamprocessor moeten aankomen.

Bouw een fan-in-toewijzing die meerdere bronnen samenvoegt in één doel.

FanOutEdgeGroup

Vertegenwoordig een edge-groep in broadcaststijl met optionele selectielogica.

Een fan-out stuurt een bericht dat door één bronexecutor wordt geproduceerd door naar een of meer downstreamexecutors. Tijdens runtime kunnen we de doelen verder beperken door een selection_func uit te voeren die de nettolading inspecteert en de subset van id's retourneert die het bericht moeten ontvangen.

Maak een uitwaaiertoewijzing van één bron naar veel doelen.

FileCheckpointStorage

Op bestanden gebaseerde controlepuntopslag voor persistentie.

Initialiseer de bestandsopslag.

FinishReason

Vertegenwoordigt de reden waarom een chatantwoord is voltooid.

Initialiseer FinishReason met een waarde.

FunctionApprovalRequestContent

Vertegenwoordigt een aanvraag voor gebruikersgoedkeuring van een functieaanroep.

Initialiseert een FunctionApprovalRequestContent-exemplaar.

FunctionApprovalResponseContent

Vertegenwoordigt een antwoord voor gebruikersgoedkeuring van een functieaanroep.

Initialiseert een FunctionApprovalResponseContent-exemplaar.

FunctionCallContent

Vertegenwoordigt een aanvraag voor een functieoproep.

Initialiseert een FunctionCallContent-exemplaar.

FunctionExecutor

Executor die een door de gebruiker gedefinieerde functie verpakt.

Met deze uitvoerder kunnen gebruikers eenvoudige functies definiëren (zowel synchroniseren als asynchroon) en ze gebruiken als werkstroomexecutors zonder dat ze volledige uitvoeringsklassen hoeven te maken.

Synchrone functies worden uitgevoerd in een threadpool met behulp van asyncio.to_thread() om te voorkomen dat de gebeurtenislus wordt geblokkeerd.

Initialiseer de FunctionExecutor met een door de gebruiker gedefinieerde functie.

FunctionInvocationConfiguration

Configuratie voor functie-aanroep in chatclients.

Deze klasse wordt automatisch gemaakt op elke chatclient die functie-aanroep ondersteunt. Dit betekent dat u in de meeste gevallen alleen de kenmerken van het exemplaar kunt wijzigen, in plaats van een nieuwe te maken.

Initialiseer FunctionInvocationConfiguration.

FunctionInvocationContext

Contextobject voor functie-middleware-aanroepen.

Deze context wordt doorgegeven via de pijplijn van de functie-middleware en bevat alle informatie over de aanroep van de functie.

Initialiseer de FunctionInvocationContext.

FunctionMiddleware

Abstracte basisklasse voor functie-middleware waarmee functie-aanroepen kunnen worden onderschept.

Met functie-middleware kunt u functie-/hulpprogrammaaanroepen voor en na de uitvoering onderscheppen en wijzigen. U kunt argumenten valideren, resultaten in de cache opslaan, aanroepen van logboeken of de uitvoering van de functie overschrijven.

Opmerking

FunctionMiddleware is een abstracte basisklasse. U moet deze subklassen en implementeren

de methode process() voor het maken van aangepaste functie-middleware.

FunctionResultContent

Vertegenwoordigt het resultaat van een functie-aanroep.

Initialiseert een FunctionResultContent-exemplaar.

GraphConnectivityError

Er is een uitzondering opgetreden wanneer er verbindingsproblemen met grafieken worden gedetecteerd.

GroupChatBuilder

Opbouwfunctie op hoog niveau voor door manager gerichte groepschatwerkstromen met dynamische indeling.

GroupChat coördineert gesprekken met meerdere agents met behulp van een manager die selecteert welke deelnemer hierna spreekt. De manager kan een eenvoudige Python-functie (set_select_speakers_func) of een op agents gebaseerde selector zijn via set_manager. Deze twee benaderingen sluiten elkaar wederzijds uit.

Kernwerkstroom:

  1. Deelnemers definiëren: lijst met agents (gebruikt hun .name) of dicttoewijzingsnamen aan agents

  2. Sprekerselectie configureren: set_select_speakers_func OF

    set_manager (niet beide)

  3. Optioneel: ronde limieten instellen, controlepunten, beëindigingsvoorwaarden

  4. De werkstroom bouwen en uitvoeren

Patronen voor sprekerselectie:

Patroon 1: Eenvoudige selectie op basis van functies (aanbevolen)


   from agent_framework import GroupChatBuilder, GroupChatStateSnapshot


   def select_next_speaker(state: GroupChatStateSnapshot) -> str | None:
       # state contains: task, participants, conversation, history, round_index
       if state["round_index"] >= 5:
           return None  # Finish
       last_speaker = state["history"][-1].speaker if state["history"] else None
       if last_speaker == "researcher":
           return "writer"
       return "researcher"


   workflow = (
       GroupChatBuilder()
       .set_select_speakers_func(select_next_speaker)
       .participants([researcher_agent, writer_agent])  # Uses agent.name
       .build()
   )

Patroon 2: selectie op basis van LLM


   from agent_framework import ChatAgent
   from agent_framework.azure import AzureOpenAIChatClient

   manager_agent = AzureOpenAIChatClient().create_agent(
       instructions="Coordinate the conversation and pick the next speaker.",
       name="Coordinator",
       temperature=0.3,
       seed=42,
       max_tokens=500,
   )

   workflow = (
       GroupChatBuilder()
       .set_manager(manager_agent, display_name="Coordinator")
       .participants([researcher, writer])  # Or use dict: researcher=r, writer=w
       .with_max_rounds(10)
       .build()
   )

Patroon 3: Informatie aanvragen voor feedback tijdens gesprekken


   from agent_framework import GroupChatBuilder

   # Pause before all participants
   workflow = (
       GroupChatBuilder()
       .set_select_speakers_func(select_next_speaker)
       .participants([researcher, writer])
       .with_request_info()
       .build()
   )

   # Pause only before specific participants
   workflow = (
       GroupChatBuilder()
       .set_select_speakers_func(select_next_speaker)
       .participants([researcher, writer, editor])
       .with_request_info(agents=[editor])  # Only pause before editor responds
       .build()
   )

Specificatie van deelnemers:

Twee manieren om deelnemers op te geven:

  • Lijstformulier: [agent1, agent2] - gebruikt agent.name kenmerk voor namen van deelnemers
  • Dict-formulier: {name1: agent1, name2: agent2} - expliciet naambesturingselement
  • Trefwoordformulier: deelnemers(name1=agent1, name2=agent2) - expliciet naambesturingselement

Statusmomentopnamestructuur:

De GroupChatStateSnapshot die is doorgegeven aan set_select_speakers_func bevat:

  • taak: ChatMessage - Oorspronkelijke gebruikerstaak
  • deelnemers: dict[str, str] - Toewijzing van namen van deelnemers aan beschrijvingen
  • gesprek: tuple[ChatMessage, ...] - Volledige gespreksgeschiedenis
  • geschiedenis: tuple[GroupChatTurn, ...] - Turn-by-turn record met sprekertoeschrijving
  • round_index: int - Aantal selectierondes van manager tot nu toe
  • pending_agent: str | Geen - Naam van agent die momenteel wordt verwerkt (indien aanwezig)

Belangrijke beperkingen:

  • Kan niet combineren set_select_speakers_func en set_manager
  • Namen van deelnemers moeten uniek zijn
  • Wanneer u een lijstformulier gebruikt, moeten agents een niet-leeg naamkenmerk hebben

Initialiseer de GroupChatBuilder.

GroupChatDirective

Instructies die worden verzonden door een implementatie van een groepschatmanager.

HandoffBuilder

Fluent Builder voor conversationele handoff-werkstromen met coördinator- en gespecialiseerde agenten.

Met het handoff-patroon kan een coördinatoragent aanvragen doorsturen naar gespecialiseerde agenten. De interactiemodus bepaalt of de werkstroom gebruikersinvoer aanvraagt nadat elke agentreactie is voltooid of autonoom wordt voltooid zodra agents klaar zijn met reageren. Een beëindigingsvoorwaarde bepaalt wanneer de werkstroom moet stoppen met het aanvragen van invoer en het voltooien.

Routeringspatronen:

Single-Tier (standaard): Alleen de coördinator kan aan specialisten afleveren. Nadat een specialist reageert, keert het besturingselement standaard terug naar de gebruiker voor meer invoer. Hiermee wordt een cyclische stroom gemaakt: gebruiker - coördinator -> [optionele specialist] -> gebruiker -> coördinator -> ...> Gebruik with_interaction_mode("autonome") om het aanvragen van extra gebruikersinvoer over te slaan en het uiteindelijke gesprek te genereren wanneer een agent reageert zonder delegeren.

Meerdere lagen (geavanceerd): Specialisten kunnen met behulp van .add_handoff()aan andere specialisten afleveren. Dit biedt meer flexibiliteit voor complexe werkstromen, maar is minder beheerbaar dan het patroon met één laag. Gebruikers verliezen realtime inzicht in tussenliggende stappen tijdens gespecialiseerde tot gespecialiseerde handoffs (hoewel de volledige gespreksgeschiedenis inclusief alle handoffs behouden blijft en achteraf kan worden geïnspecteerd).

Belangrijke functies:

  • Automatische handoff-detectie: De coördinator roept een handoff-tool aan waarvan

    argumenten (bijvoorbeeld {"handoff_to": "shipping_agent"}) identificeren welke specialist de controle moet ontvangen.

  • Automatisch gegenereerde hulpprogramma's: de opbouwfunctie synthetiseert standaard handoff_to_<agenthulpprogramma's> voor de coördinator, zodat u geen tijdelijke aanduidingen voor functies handmatig definieert.

  • Volledige gespreksgeschiedenis: het hele gesprek (inclusief eventuele ChatMessage.additional_properties) blijft behouden en doorgegeven aan elke agent.

  • Beëindigingsbeheer: wordt standaard beëindigd na 10 gebruikersberichten. Overschrijven met .with_termination_condition(lambda conv: ...) voor aangepaste logica (bijvoorbeeld 'afscheid').

  • Interactiemodi: kies human_in_loop (standaard) om gebruikers te vragen tussen agentwisselingen of autonoom om door te gaan met doorsturen naar agents zonder te vragen om gebruikersinvoer totdat een overdracht plaatsvindt of een beëindigings-/turnlimiet is bereikt (standaard autonome turnlimiet: 50).

  • Controlepunten: optionele persistentie voor hervatbare werkstromen.

Gebruik (Single-Tier):


   from agent_framework import HandoffBuilder
   from agent_framework.openai import OpenAIChatClient

   chat_client = OpenAIChatClient()

   # Create coordinator and specialist agents
   coordinator = chat_client.create_agent(
       instructions=(
           "You are a frontline support agent. Assess the user's issue and decide "
           "whether to hand off to 'refund_agent' or 'shipping_agent'. When delegation is "
           "required, call the matching handoff tool (for example `handoff_to_refund_agent`)."
       ),
       name="coordinator_agent",
   )

   refund = chat_client.create_agent(
       instructions="You handle refund requests. Ask for order details and process refunds.",
       name="refund_agent",
   )

   shipping = chat_client.create_agent(
       instructions="You resolve shipping issues. Track packages and update delivery status.",
       name="shipping_agent",
   )

   # Build the handoff workflow - default single-tier routing
   workflow = (
       HandoffBuilder(
           name="customer_support",
           participants=[coordinator, refund, shipping],
       )
       .set_coordinator(coordinator)
       .build()
   )

   # Run the workflow
   events = await workflow.run_stream("My package hasn't arrived yet")
   async for event in events:
       if isinstance(event, RequestInfoEvent):
           # Request user input
           user_response = input("You: ")
           await workflow.send_response(event.data.request_id, user_response)

Routering met meerdere lagen met .add_handoff():


   # Enable specialist-to-specialist handoffs with fluent API
   workflow = (
       HandoffBuilder(participants=[coordinator, replacement, delivery, billing])
       .set_coordinator(coordinator)
       .add_handoff(coordinator, [replacement, delivery, billing])  # Coordinator routes to all
       .add_handoff(replacement, [delivery, billing])  # Replacement delegates to delivery/billing
       .add_handoff(delivery, billing)  # Delivery escalates to billing
       .build()
   )

   # Flow: User → Coordinator → Replacement → Delivery → Back to User
   # (Replacement hands off to Delivery without returning to user)

Deelnemersfabrieken gebruiken voor statusisolatie:

Aangepaste beëindigingsvoorwaarde:


   # Terminate when user says goodbye or after 5 exchanges
   workflow = (
       HandoffBuilder(participants=[coordinator, refund, shipping])
       .set_coordinator(coordinator)
       .with_termination_condition(
           lambda conv: (
               sum(1 for msg in conv if msg.role.value == "user") >= 5
               or any("goodbye" in msg.text.lower() for msg in conv[-2:])
           )
       )
       .build()
   )

Controlepunten:


   from agent_framework import InMemoryCheckpointStorage

   storage = InMemoryCheckpointStorage()
   workflow = (
       HandoffBuilder(participants=[coordinator, refund, shipping])
       .set_coordinator(coordinator)
       .with_checkpointing(storage)
       .build()
   )

Initialiseer een HandoffBuilder voor het maken van conversationele handoff-werkstromen.

De opbouwfunctie begint in een niet-geconfigureerde status en vereist dat u het volgende aanroept:

  1. .participants([...]) - Agents registreren
  2. of .participant_factories({...}) - Agent/executor factory's registreren
  3. .set_coordinator(...) - Aanwijzen welke agent initiële gebruikersinvoer ontvangt
  4. .build() - De uiteindelijke werkstroom samenstellen

Met optionele configuratiemethoden kunt u contextbeheer, beëindigingslogica en persistentie aanpassen.

Opmerking

Deelnemers moeten stabiele namen/id's hebben omdat de werkstroom de

handoff tool argumenten voor deze id's. Namen van agents moeten overeenkomen

de tekenreeksen die door het handoff-hulpprogramma van de coördinator worden verzonden (bijvoorbeeld een hulpmiddel dat

outputs {"handoff_to": "billing"} vereist een agent met de naam facturering).

HandoffUserInputRequest

Bericht aanvragen dat wordt verzonden wanneer de werkstroom nieuwe gebruikersinvoer nodig heeft.

Opmerking: Het gespreksveld wordt opzettelijk uitgesloten van controlepuntserialisatie om duplicatie te voorkomen. Het gesprek blijft behouden in de status van de coördinator en wordt gereconstrueerd bij herstel. Zie probleem 2667.

HostedCodeInterpreterTool

Vertegenwoordigt een gehost hulpprogramma dat kan worden opgegeven aan een AI-service om deze in staat te stellen gegenereerde code uit te voeren.

Met dit hulpprogramma wordt geen code-interpretatie zelf geïmplementeerd. Het fungeert als een markering om een service te informeren dat het is toegestaan gegenereerde code uit te voeren als de service dit kan doen.

Initialiseer de HostedCodeInterpreterTool.

HostedFileContent

Vertegenwoordigt een gehoste bestandsinhoud.

Initialiseert een HostedFileContent-exemplaar.

HostedFileSearchTool

Vertegenwoordigt een hulpprogramma voor het zoeken van bestanden dat kan worden opgegeven aan een AI-service om het mogelijk te maken bestandszoekopdrachten uit te voeren.

Initialiseer een FileSearchTool.

HostedMCPSpecificApproval

Vertegenwoordigt de specifieke modus voor een gehost hulpprogramma.

Wanneer deze modus wordt gebruikt, moet de gebruiker opgeven welke hulpprogramma's altijd of nooit goedkeuring vereisen. Dit wordt weergegeven als een woordenlijst met twee optionele sleutels:

HostedMCPTool

Vertegenwoordigt een MCP-hulpprogramma dat wordt beheerd en uitgevoerd door de service.

Maak een gehost MCP-hulpprogramma.

HostedVectorStoreContent

Vertegenwoordigt een gehoste vectoropslaginhoud.

Initialiseert een HostedVectorStoreContent-exemplaar.

HostedWebSearchTool

Vertegenwoordigt een hulpprogramma voor webzoekopdrachten dat kan worden opgegeven aan een AI-service om webzoekopdrachten uit te voeren.

Initialiseer een HostedWebSearchTool.

InMemoryCheckpointStorage

In-memory controlepuntopslag voor testen en ontwikkelen.

Initialiseer de geheugenopslag.

InProcRunnerContext

In-process uitvoeringscontext voor lokale uitvoering en optionele controlepunten.

Initialiseer de uitvoeringscontext in het proces.

MCPStdioTool

MCP-hulpprogramma voor het maken van verbinding met op stdio gebaseerde MCP-servers.

Deze klasse maakt verbinding met MCP-servers die communiceren via standaardinvoer/uitvoer, die doorgaans worden gebruikt voor lokale processen.

Initialiseer het hulpprogramma MCP stdio.

Opmerking

De argumenten worden gebruikt om een StdioServerParameters-object te maken,

die vervolgens wordt gebruikt om een stdio-client te maken. Zie mcp.client.stdio.stdio_client

en mcp.client.stdio.stdio_server_parameters voor meer informatie.

MCPStreamableHTTPTool

MCP-hulpprogramma voor het maken van verbinding met OP HTTP gebaseerde MCP-servers.

Deze klasse maakt verbinding met MCP-servers die communiceren via streambare HTTP/SSE.

Initialiseer het HTTP-hulpprogramma dat kan worden gestreamd met MCP.

Opmerking

De argumenten worden gebruikt om een streambare HTTP-client te maken.

Zie mcp.client.streamable_http.streamablehttp_client voor meer informatie.

Eventuele extra argumenten die aan de constructor worden doorgegeven, worden doorgegeven aan de

streamable HTTP-clientconstructor.

MCPWebsocketTool

MCP-hulpprogramma voor het maken van verbinding met op WebSocket gebaseerde MCP-servers.

Deze klasse maakt verbinding met MCP-servers die communiceren via WebSocket.

Initialiseer het HULPPROGRAMMA MCP WebSocket.

Opmerking

De argumenten worden gebruikt om een WebSocket-client te maken.

Zie mcp.client.websocket.websocket_client voor meer informatie.

Eventuele extra argumenten die aan de constructor worden doorgegeven, worden doorgegeven aan de

WebSocket-clientconstructor.

MagenticBuilder

Fluent Builder voor het maken van Magentic One-indelingswerkstromen met meerdere agents.

Magentic One-werkstromen maken gebruik van een DOOR LLM aangedreven manager om meerdere agents te coördineren via dynamische taakplanning, voortgangstracking en adaptieve herplanning. De manager maakt plannen, selecteert agents, bewaakt de voortgang en bepaalt wanneer het opnieuw moet worden gepland of voltooid.

De opbouwfunctie biedt een fluent API voor het configureren van deelnemers, de manager, optionele beoordeling van plannen, controlepunten en callbacks voor gebeurtenissen.

Human-in-the-loop Ondersteuning: Magentic biedt gespecialiseerde HITL-mechanismen via:

  • .with_plan_review() - Plannen beoordelen en goedkeuren/herzien vóór uitvoering

  • .with_human_input_on_stall() - Tussenbeide wanneer werkstroom wordt vastgelopen

  • Goedkeuring van hulpprogramma's via FunctionApprovalRequestContent - Afzonderlijke hulpprogrammaaanroepen goedkeuren

Deze verzenden Magentic HumanInterventionRequest-gebeurtenissen die gestructureerde beslissingsopties bieden (GOEDKEUREN, HERZIEN, DOORGAAN, REPLAN, RICHTLIJNEN) die geschikt zijn voor de planningsindeling van Magentic.

Gebruik:


   from agent_framework import MagenticBuilder, StandardMagenticManager
   from azure.ai.projects.aio import AIProjectClient

   # Create manager with LLM client
   project_client = AIProjectClient.from_connection_string(...)
   chat_client = project_client.inference.get_chat_completions_client()

   # Build Magentic workflow with agents
   workflow = (
       MagenticBuilder()
       .participants(researcher=research_agent, writer=writing_agent, coder=coding_agent)
       .with_standard_manager(chat_client=chat_client, max_round_count=20, max_stall_count=3)
       .with_plan_review(enable=True)
       .with_checkpointing(checkpoint_storage)
       .build()
   )

   # Execute workflow
   async for message in workflow.run("Research and write article about AI agents"):
       print(message.text)

Met aangepaste manager:


   # Create custom manager subclass
   class MyCustomManager(MagenticManagerBase):
       async def plan(self, context: MagenticContext) -> ChatMessage:
           # Custom planning logic
           ...


   manager = MyCustomManager()
   workflow = MagenticBuilder().participants(agent1=agent1, agent2=agent2).with_standard_manager(manager).build()
MagenticContext

Context voor de Magentic Manager.

MagenticManagerBase

Basisklasse voor de Magentic One-manager.

ManagerDirectiveModel

Pydantic-model voor uitvoer van structured manager-instructie.

Maak een nieuw model door invoergegevens van trefwoordargumenten te parseren en valideren.

Genereert [ValidationError][pydantic_core. ValidationError] als de invoergegevens niet kunnen worden gevalideerd om een geldig model te vormen.

zelf is expliciet positioneel om zichzelf als veldnaam toe te staan.

ManagerSelectionRequest

Aanvraag verzonden naar manageragent voor volgende sprekerselectie.

Deze dataclass verpakt de volledige gespreksstatus en taakcontext voor de manageragent om een beslissing te nemen en een sprekerselectie te nemen.

ManagerSelectionResponse

Antwoord van manageragent met beslissing over sprekerselectie.

De manageragent moet deze structuur (of compatibele dict/JSON) produceren om de beslissing terug te geven aan de orchestrator.

Maak een nieuw model door invoergegevens van trefwoordargumenten te parseren en valideren.

Genereert [ValidationError][pydantic_core. ValidationError] als de invoergegevens niet kunnen worden gevalideerd om een geldig model te vormen.

zelf is expliciet positioneel om zichzelf als veldnaam toe te staan.

Message

Een klasse die een bericht in de werkstroom vertegenwoordigt.

OrchestrationState

Geïntegreerde statuscontainer voor orchestrator-controlepunten.

Deze gegevensklasse standaardiseert controlepuntserialisatie in alle drie de groepschatpatronen, terwijl patroonspecifieke extensies via metagegevens worden toegestaan.

Algemene kenmerken hebben betrekking op gedeelde indelingsproblemen (taken, gesprekken, round tracking). Patroonspecifieke status wordt gebruikt in de metagegevensdict.

RequestInfoEvent

Gebeurtenis geactiveerd wanneer een werkstroomexecutor externe gegevens aanvraagt.

Initialiseer de gebeurtenis aanvraaggegevens.

RequestInfoInterceptor

Interne uitvoerfunctie die de werkstroom voor menselijke invoer onderbreekt voordat de agent wordt uitgevoerd.

Deze uitvoerder wordt door opbouwfuncties ingevoegd in de werkstroomgrafiek wanneer .with_request_info() wordt aangeroepen. AgentExecutorRequest-berichten worden onderschept voordat de agent wordt uitgevoerd en onderbreekt de werkstroom via ctx.request_info() met een AgentInputRequest.

Wanneer een antwoord wordt ontvangen, injecteert de antwoordhandler de invoer als een gebruikersbericht in het gesprek en stuurt de aanvraag door naar de agent.

Met de optionele agent_filter parameter kunt u beperken welke agents de onderbreking activeren. Als de id van de doelagent zich niet in de filterset bevindt, wordt de aanvraag doorgestuurd zonder te onderbreken.

Initialiseer de onderscheppingsexecutor voor aanvraaggegevens.

Role

Beschrijft het beoogde doel van een bericht binnen een chatinteractie.

Eigenschappen: SYSTEEM: De rol waarmee het gedrag van het AI-systeem wordt geïnstrueerd of ingesteld. GEBRUIKER: De rol die gebruikersinvoer biedt voor chatinteracties. ASSISTENT: De rol die antwoorden biedt op door het systeem geïnstrueerde, door de gebruiker gevraagd invoer. TOOL: De rol die aanvullende informatie en verwijzingen levert als reactie op aanvragen voor het gebruik van hulpprogramma's.

Initialiseer de rol met een waarde.

Runner

Een klasse voor het uitvoeren van een werkstroom in Pregel-supersteps.

Initialiseer de runner met randen, gedeelde status en context.

RunnerContext

Protocol voor de uitvoeringscontext die door de runner wordt gebruikt.

Eén context die ondersteuning biedt voor berichten, gebeurtenissen en optionele controlepunten. Als controlepuntopslag niet is geconfigureerd, kunnen controlepuntmethoden worden geactiveerd.

SequentialBuilder

Opbouwfunctie op hoog niveau voor sequentiële agent-/uitvoerwerkstromen met gedeelde context.

  • deelnemers([...]) accepteert een lijst met AgentProtocol -exemplaren (aanbevolen) of Executor-exemplaren

  • register_participants([...]) accepteert een lijst met factory's voor AgentProtocol (aanbevolen)

    of Uitvoerders factory's

  • Uitvoerders moeten een handler definiëren die lijst[ChatMessage] verbruikt en een lijst verzendt[ChatMessage]

  • De deelnemers aan de werkstroom worden op volgorde doorgegeven, waarbij een lijst[ChatMessage] in de keten wordt doorgegeven

  • Agenten voegen hun assistentberichten toe aan het gesprek

  • Aangepaste uitvoerders kunnen een lijst transformeren/samenvatten en retourneren[ChatMessage]

  • De uiteindelijke uitvoer is het gesprek dat wordt geproduceerd door de laatste deelnemer

Gebruik:


   from agent_framework import SequentialBuilder

   # With agent instances
   workflow = SequentialBuilder().participants([agent1, agent2, summarizer_exec]).build()

   # With agent factories
   workflow = (
       SequentialBuilder().register_participants([create_agent1, create_agent2, create_summarizer_exec]).build()
   )

   # Enable checkpoint persistence
   workflow = SequentialBuilder().participants([agent1, agent2]).with_checkpointing(storage).build()

   # Enable request info for mid-workflow feedback (pauses before each agent)
   workflow = SequentialBuilder().participants([agent1, agent2]).with_request_info().build()

   # Enable request info only for specific agents
   workflow = (
       SequentialBuilder()
       .participants([agent1, agent2, agent3])
       .with_request_info(agents=[agent2])  # Only pause before agent2
       .build()
   )
SharedState

Een klasse voor het beheren van de gedeelde status in een werkstroom.

SharedState biedt thread-veilige toegang tot werkstroomstatusgegevens die tijdens de uitvoering van de werkstroom moeten worden gedeeld tussen uitvoerders.

Gereserveerde sleutels: de volgende sleutels zijn gereserveerd voor intern frameworkgebruik en mogen niet worden gewijzigd door gebruikerscode:

  • _executor_state: de uitvoerdersstatus opslaan voor controlepunten (beheerd door Runner)

Waarschuwing

Gebruik geen sleutels die beginnen met onderstrepingsteken (_), omdat ze mogelijk zijn gereserveerd voor

interne frameworkbewerkingen.

Initialiseer de gedeelde status.

SingleEdgeGroup

Gemakswikkelaar voor een eenzame rand, waardoor de groeps-API uniform blijft.

Maak een een-op-een-edge-groep tussen twee uitvoerders.

StandardMagenticManager

Standard Magentic Manager die echte LLM-oproepen uitvoert via een ChatAgent.

De manager bouwt prompts die de oorspronkelijke Magentic One-indeling spiegelen:

  • Feiten verzamelen
  • Plan maken
  • Voortgangsboek in JSON
  • Feiten bijwerken en bijwerken plannen bij opnieuw instellen
  • Uiteindelijke antwoordsynthese

Initialiseer de Standard Magentic Manager.

SubWorkflowRequestMessage

Bericht dat vanuit een subwerkstroom naar een uitvoerder in de bovenliggende werkstroom wordt verzonden om informatie aan te vragen.

Dit bericht verpakt een RequestInfoEvent die wordt verzonden door de uitvoerder in de subwerkstroom.

SubWorkflowResponseMessage

Bericht dat is verzonden van een bovenliggende werkstroom naar een subwerkstroom via WorkflowExecutor om gevraagde informatie te verstrekken.

Dit bericht verpakt de antwoordgegevens samen met de oorspronkelijke RequestInfoEvent die is verzonden door de uitvoerfunctie van de subwerkstroom.

SuperStepCompletedEvent

Gebeurtenis geactiveerd wanneer een superstep eindigt.

Initialiseer de superstep-gebeurtenis.

SuperStepStartedEvent

Gebeurtenis geactiveerd wanneer een superstep wordt gestart.

Initialiseer de superstep-gebeurtenis.

SwitchCaseEdgeGroup

Fan-out variant die een traditionele switch/case controlestroom nabootst.

In elk geval wordt de nettolading van het bericht gecontroleerd en wordt bepaald of het bericht moet worden verwerkt. Precies één geval of de standaardvertakking retourneert een doel tijdens runtime, waarbij semantiek voor één verzending behouden blijft.

Configureer een switch-/caserouteringsstructuur voor één bronexecutor.

SwitchCaseEdgeGroupCase

Persistente beschrijving van één voorwaardelijke vertakking in een switch-case.

In tegenstelling tot het runtime-caseobject slaat deze serialisable variant alleen de doel-id en een beschrijvende naam voor het predicaat op. Wanneer de onderliggende aanroepbare functie niet beschikbaar is tijdens de deserialisatie, vervangen we een tijdelijke aanduiding voor een proxy die luid mislukt, zodat de ontbrekende afhankelijkheid onmiddellijk zichtbaar is.

Noteer de routeringsmetagegevens voor een voorwaardelijke casebranch.

SwitchCaseEdgeGroupDefault

Persistente descriptor voor de terugvalvertakking van een switch-casegroep.

De standaardvertakking is gegarandeerd aanwezig en wordt aangeroepen wanneer elk ander predicaat niet overeenkomt met de nettolading.

Wijs de standaardbranch naar de opgegeven uitvoerders-id.

TextContent

Vertegenwoordigt tekstinhoud in een chat.

Initialiseert een TextContent-exemplaar.

TextReasoningContent

Vertegenwoordigt tekstredeneringsinhoud in een chat.

Opmerkingen: Deze klasse en TextContent zijn oppervlakkig vergelijkbaar, maar uniek.

Initialiseert een TextReasoningContent-exemplaar.

TextSpanRegion

Vertegenwoordigt een gebied met tekst dat is geannoteerd.

Initialiseer TextSpanRegion.

ToolMode

Definieert of en hoe hulpprogramma's worden gebruikt in een chataanvraag.

Initialiseer ToolMode.

ToolProtocol

Vertegenwoordigt een algemeen hulpprogramma.

Dit protocol definieert de interface die alle hulpprogramma's moeten implementeren om compatibel te zijn met het agentframework. Het wordt geïmplementeerd door verschillende hulpprogrammaklassen, zoals HostedMCPTool, HostedWebSearchTool en AIFunction's. Een AIFunction wordt meestal gemaakt door de ai_function decorator.

Omdat elke connector verschillende hulpprogramma's moet parseren, kunnen gebruikers een dict doorgeven om een servicespecifiek hulpprogramma op te geven wanneer er geen abstractie beschikbaar is.

TypeCompatibilityError

Er is een uitzondering opgetreden bij het detecteren van incompatibiliteit tussen verbonden uitvoerders.

UriContent

Vertegenwoordigt een URI-inhoud.

Belangrijk

Dit wordt gebruikt voor inhoud die wordt geïdentificeerd door een URI, zoals een afbeelding of een bestand.

Voor (binaire) gegevens-URI's gebruikt u In plaats daarvan DataContent.

Initialiseert een UriContent-exemplaar.

Opmerkingen: Dit wordt gebruikt voor inhoud die wordt geïdentificeerd door een URI, zoals een afbeelding of een bestand. Voor (binaire) gegevens-URI's gebruikt u In plaats daarvan DataContent .

UsageContent

Vertegenwoordigt gebruiksgegevens die zijn gekoppeld aan een chataanvraag en -antwoord.

Initialiseert een UsageContent-exemplaar.

UsageDetails

Biedt gebruiksgegevens over een aanvraag/antwoord.

Initialiseert het UsageDetails-exemplaar.

Workflow

Een engine voor uitvoering op basis van grafieken waarmee verbonden uitvoerders worden ingedeeld.

Overzicht

Een werkstroom voert een gerichte grafiek uit van uitvoerders die zijn verbonden via edge-groepen met behulp van een Pregel-achtig model, uitgevoerd in supersteps totdat de grafiek inactief wordt. Werkstromen worden gemaakt met behulp van de klasse WorkflowBuilder. Instantieer deze klasse niet rechtstreeks.

Uitvoeringsmodel

Uitvoerders worden uitgevoerd in gesynchroniseerde supersteps waarbij elke uitvoerder:

  • Wordt aangeroepen wanneer berichten van verbonden edge-groepen worden ontvangen
  • Kan berichten verzenden naar downstreamexecutors via ctx.send_message()
  • Kan uitvoer op werkstroomniveau opleveren via ctx.yield_output()
  • Kan aangepaste gebeurtenissen verzenden via ctx.add_event()

Berichten tussen uitvoerders worden aan het einde van elke superstap bezorgd en zijn niet zichtbaar in de gebeurtenisstroom. Alleen gebeurtenissen op werkstroomniveau (uitvoer, aangepaste gebeurtenissen) en statusgebeurtenissen zijn zichtbaar voor bellers.

Invoer-/uitvoertypen

Werkstroomtypen worden tijdens runtime gedetecteerd door het volgende te controleren:

  • Invoertypen: Vanaf de invoertypen van het startexecutor
  • Uitvoertypen: Samenvoeging van de werkstroomuitvoertypen van alle uitvoerders: Ga naar deze typen via de eigenschappen input_types en output_types.

Uitvoeringsmethoden

De werkstroom biedt twee primaire uitvoerings-API's, die elk meerdere scenario's ondersteunen:

  • run(): Uitvoeren tot voltooiing, retourneert WorkflowRunResult met alle gebeurtenissen

  • run_stream(): retourneert asynchrone generator die gebeurtenissen oplevert zodra deze zich voordoen

Beide methoden ondersteunen:

  • Initiële werkstroomuitvoeringen: berichtparameter opgeven
  • Controlepuntherstel: geef checkpoint_id op (en eventueel checkpoint_storage)
  • HIL-vervolg: Geef antwoorden op om door te gaan na RequestInfoExecutor-aanvragen
  • Runtime-controlepunten: geef checkpoint_storage op om controlepunten voor deze uitvoering in te schakelen/overschrijven

Toestandsbeheer

Werkstroomexemplaren bevatten statussen en statussen blijven behouden voor aanroepen die moeten worden uitgevoerd en run_stream. Als u meerdere onafhankelijke uitvoeringen wilt uitvoeren, maakt u afzonderlijke werkstroomexemplaren via WorkflowBuilder.

Aanvragen voor externe invoer

Uitvoerders binnen een werkstroom kunnen externe invoer aanvragen met behulp van ctx.request_info():

  1. Executor roept ctx.request_info() aan om invoer aan te vragen
  2. Executor implementeert response_handler() om het antwoord te verwerken
  3. Aanvragen worden verzonden als RequestInfoEvent-exemplaren in de gebeurtenisstroom
  4. Werkstroom voert IDLE_WITH_PENDING_REQUESTS status in
  5. De aanroeper verwerkt aanvragen en levert antwoorden via de send_responses - of send_responses_streaming methoden
  6. Antwoorden worden doorgestuurd naar de aanvragende uitvoerders en antwoordhandlers worden aangeroepen

Controlepunten maken

Controlepunten kunnen tijdens de build of runtime worden geconfigureerd:

Build-time (via WorkflowBuilder): workflow = WorkflowBuilder().with_checkpointing(storage).build()

Runtime (via run/run_stream parameters): result = await workflow.run(message, checkpoint_storage=runtime_storage)

Wanneer deze optie is ingeschakeld, worden controlepunten gemaakt aan het einde van elke superstep, waarbij het volgende wordt vastgelegd:

  • Uitvoerdersstatussen
  • Berichten die onderweg zijn
  • Gedeelde statuswerkstromen kunnen worden onderbroken en hervat tijdens het opnieuw opstarten van processen met behulp van controlepuntopslag.

Compositie

Werkstromen kunnen worden genest met behulp van WorkflowExecutor, waarmee een onderliggende werkstroom wordt verpakt als een uitvoerder. De invoer-/uitvoertypen van de geneste werkstroom worden onderdeel van de typen workflowExecutor. Wanneer de workflowexecutor wordt aangeroepen, wordt de geneste werkstroom uitgevoerd om de uitvoer te voltooien en te verwerkt.

Initialiseer de werkstroom met een lijst met randen.

WorkflowAgent

Een agentsubklasse die een werkstroom verpakt en beschikbaar maakt als agent.

Initialiseer de WorkflowAgent.

WorkflowBuilder

Een opbouwfunctieklasse voor het maken van werkstromen.

Deze klasse biedt een fluent API voor het definiëren van werkstroomgrafieken door uitvoerders te verbinden met randen en uitvoeringsparameters te configureren. Aanroep build om een onveranderbaar Workflow exemplaar te maken.

Initialiseer de WorkflowBuilder met een lege lijst met randen en geen startexecutor.

WorkflowCheckpoint

Vertegenwoordigt een volledig controlepunt van de werkstroomstatus.

Controlepunten leggen de volledige uitvoeringsstatus van een werkstroom vast op een specifiek punt, waardoor werkstromen kunnen worden onderbroken en hervat.

Opmerking

De shared_state dict kan gereserveerde sleutels bevatten die worden beheerd door het framework.

Zie de sharedState-klassedocumentatie voor meer informatie over gereserveerde sleutels.

WorkflowCheckpointSummary

Leesbare samenvatting van een controlepunt voor een werkstroom.

WorkflowContext

Uitvoeringscontext waarmee uitvoerders kunnen communiceren met werkstromen en andere uitvoerders.

Overzicht

WorkflowContext biedt een gecontroleerde interface voor uitvoerders om berichten te verzenden, uitvoer te leveren, de status te beheren en te communiceren met het bredere werkstroomecosysteem. Het dwingt typeveiligheid af via algemene parameters en voorkomt directe toegang tot interne runtime-onderdelen.

Typeparameters

De context wordt geparameteriseerd om typeveiligheid af te dwingen voor verschillende bewerkingen:

WorkflowContext (geen parameters)

Voor uitvoerders die alleen bijwerkingen uitvoeren zonder berichten te verzenden of uitvoer te genereren:


   async def log_handler(message: str, ctx: WorkflowContext) -> None:
       print(f"Received: {message}")  # Only side effects

WorkflowContext[T_Out]

Hiermee kunt u berichten van het type T_Out verzenden naar andere uitvoerders:


   async def processor(message: str, ctx: WorkflowContext[int]) -> None:
       result = len(message)
       await ctx.send_message(result)  # Send int to downstream executors

WorkflowContext[T_Out, T_W_Out]

Hiermee kunt u zowel het verzenden van berichten (T_Out) als het opleveren van werkstroomuitvoer (T_W_Out):


   async def dual_output(message: str, ctx: WorkflowContext[int, str]) -> None:
       await ctx.send_message(42)  # Send int message
       await ctx.yield_output("complete")  # Yield str workflow output

Samenvoegtypen

Er kunnen meerdere typen worden opgegeven met behulp van samenvoegings notatie:


   async def flexible(message: str, ctx: WorkflowContext[int | str, bool | dict]) -> None:
       await ctx.send_message("text")  # or send 42
       await ctx.yield_output(True)  # or yield {"status": "done"}

Initialiseer de uitvoerderscontext met de opgegeven werkstroomcontext.

WorkflowErrorDetails

Gestructureerde foutinformatie om weer te geven in fout gebeurtenissen/resultaten.

WorkflowEvent

Basisklasse voor werkstroomgebeurtenissen.

Initialiseer de werkstroom gebeurtenis met optionele gegevens.

WorkflowExecutor

Een uitvoerder die een werkstroom verpakt om hiërarchische werkstroomsamenstelling mogelijk te maken.

Overzicht

WorkflowExecutor zorgt ervoor dat een werkstroom zich gedraagt als één uitvoerder binnen een bovenliggende werkstroom, waardoor geneste werkstroomarchitecturen mogelijk zijn. Het verwerkt de volledige levenscyclus van de uitvoering van subwerkstromen, waaronder gebeurtenisverwerking, doorsturen van uitvoer en coördinatie van aanvragen/antwoorden tussen bovenliggende en onderliggende werkstromen.

Uitvoeringsmodel

Wanneer deze wordt aangeroepen, workflowexecutor:

  1. Start de verpakte werkstroom met het invoerbericht
  2. Voert de subwerkstroom uit om te worden voltooid of totdat er externe invoer nodig is
  3. Verwerkt de volledige gebeurtenisstroom van de subwerkstroom na uitvoering
  4. Stuurt uitvoer door naar de bovenliggende werkstroom als berichten
  5. Verwerkt externe aanvragen door ze door te routeren naar de bovenliggende werkstroom
  6. Verzamelt antwoorden en hervat de uitvoering van de subwerkstroom

Gebeurtenisstroomverwerking

WorkflowExecutor verwerkt gebeurtenissen nadat de subwerkstroom is voltooid:

Doorsturen van uitvoer

Alle uitvoer van de subwerkstroom wordt automatisch doorgestuurd naar het bovenliggende item:

Wanneer allow_direct_output onwaar is (standaard):


   # An executor in the sub-workflow yields outputs
   await ctx.yield_output("sub-workflow result")

   # WorkflowExecutor forwards to parent via ctx.send_message()
   # Parent receives the output as a regular message

Wanneer allow_direct_output waar is:

Coördinatie van aanvragen/antwoorden

Wanneer subwerkstromen externe informatie nodig hebben:


   # An executor in the sub-workflow makes request
   request = MyDataRequest(query="user info")

   # WorkflowExecutor captures RequestInfoEvent and wraps it in a SubWorkflowRequestMessage
   # then send it to the receiving executor in parent workflow. The executor in parent workflow
   # can handle the request locally or forward it to an external source.
   # The WorkflowExecutor tracks the pending request, and implements a response handler.
   # When the response is received, it executes the response handler to accumulate responses
   # and resume the sub-workflow when all expected responses are received.
   # The response handler expects a SubWorkflowResponseMessage wrapping the response data.

Toestandsbeheer

WorkflowExecutor onderhoudt de uitvoeringsstatus in aanvraag-/antwoordcycli:

  • Houdt aanvragen in behandeling bij door request_id
  • Verzamelt antwoorden totdat alle verwachte antwoorden worden ontvangen
  • Uitvoering van subwerkstroom hervatten met volledige antwoordbatch
  • Verwerkt gelijktijdige uitvoeringen en meerdere aanvragen die in behandeling zijn

Type systeemintegratie

WorkflowExecutor neemt de typehandtekening over van de verpakte werkstroom:

Invoertypen

Komt overeen met de invoertypen voor het begin van de werkstroom:


   # If sub-workflow accepts str, WorkflowExecutor accepts str
   workflow_executor = WorkflowExecutor(my_workflow, id="wrapper")
   assert workflow_executor.input_types == my_workflow.input_types

Uitvoertypen

Combineert uitvoer van subwerkstromen met aanvraagcoördinatietypen:


   # Includes all sub-workflow output types
   # Plus SubWorkflowRequestMessage if sub-workflow can make requests
   output_types = workflow.output_types + [SubWorkflowRequestMessage]  # if applicable

Foutafhandeling

WorkflowExecutor geeft subwerkstroomfouten door:

  • Captures WorkflowFailedEvent from sub-workflow
  • Converteert naar WorkflowErrorEvent in bovenliggende context
  • Biedt gedetailleerde foutinformatie, inclusief subwerkstroom-id

Ondersteuning voor gelijktijdige uitvoering

WorkflowExecutor biedt volledige ondersteuning voor meerdere gelijktijdige uitvoeringen van subwerkstromen:

Per-Execution statusisolatie

Elke aanroep van een subwerkstroom maakt een geïsoleerde ExecutionContext:


   # Multiple concurrent invocations are supported
   workflow_executor = WorkflowExecutor(my_workflow, id="concurrent_executor")

   # Each invocation gets its own execution context
   # Execution 1: processes input_1 independently
   # Execution 2: processes input_2 independently
   # No state interference between executions

Coördinatie van aanvragen/antwoorden

Antwoorden worden correct doorgestuurd naar de oorspronkelijke uitvoering:

  • Elke uitvoering houdt zijn eigen aanvragen bij die in behandeling zijn en verwachte antwoorden
  • Toewijzing van aanvraag-naar-uitvoering zorgt ervoor dat antwoorden de juiste subwerkstroom bereiken
  • Reactieaccumulatie wordt geïsoleerd per uitvoering
  • Automatisch opschonen wanneer de uitvoering is voltooid

Geheugenbeheer

  • Onbeperkte gelijktijdige uitvoeringen die worden ondersteund
  • Elke uitvoering heeft een unieke UUID-identificatie
  • Opschonen van voltooide uitvoeringscontexten
  • Thread-veilig statusbeheer voor gelijktijdige toegang

Belangrijke overwegingen

Gedeelde werkstroomexemplaren: alle gelijktijdige uitvoeringen gebruiken hetzelfde onderliggende werkstroomexemplaren. Zorg ervoor dat de verpakte werkstroom en de uitvoerders staatloos zijn voor een juiste isolatie.


   # Avoid: Stateful executor with instance variables
   class StatefulExecutor(Executor):
       def __init__(self):
           super().__init__(id="stateful")
           self.data = []  # This will be shared across concurrent executions!

Integratie met bovenliggende werkstromen

Bovenliggende werkstromen kunnen subwerkstroomaanvragen onderscheppen:

Opmerkingen bij de implementatie

  • Subwerkstromen worden uitgevoerd tot voltooiing voordat de resultaten worden verwerkt
  • Gebeurtenisverwerking is atomisch: alle uitvoer wordt doorgestuurd vóór aanvragen
  • Accumulatie van reacties zorgt ervoor dat subwerkstromen volledige antwoordbatches ontvangen
  • De uitvoeringsstatus wordt gehandhaafd voor de juiste hervatting na externe aanvragen
  • Gelijktijdige uitvoeringen zijn volledig geïsoleerd en verstoren elkaar niet

Initialiseer de WorkflowExecutor.

WorkflowFailedEvent

Ingebouwde levenscyclusgebeurtenis die wordt gegenereerd wanneer een werkstroomuitvoering wordt beëindigd met een fout.

WorkflowOutputEvent

Gebeurtenis geactiveerd wanneer een werkstroomexecutor uitvoer oplevert.

Initialiseer de gebeurtenis voor de uitvoer van de werkstroom.

WorkflowRunResult

Container voor gebeurtenissen die zijn gegenereerd tijdens het uitvoeren van werkstromen die niet worden gestreamd.

Overzicht

Vertegenwoordigt de volledige uitvoeringsresultaten van een werkstroomuitvoering, met alle gebeurtenissen die zijn gegenereerd vanaf het begin tot de niet-actieve status. Werkstromen produceren incrementeel uitvoer via ctx.yield_output() aanroepen tijdens de uitvoering.

Gebeurtenisstructuur

Onderhoudt de scheiding tussen gegevensvlak- en besturingsvlakgebeurtenissen:

  • Gegevensvlakgebeurtenissen: Uitvoerders aanroepen, voltooiingen, uitvoer en aanvragen (in de hoofdlijst)
  • Besturingsvlakgebeurtenissen: Tijdlijn status toegankelijk via methode status_timeline()

Belangrijke methoden

  • get_outputs(): Alle werkstroomuitvoer uit de uitvoering extraheren
  • get_request_info_events(): Externe invoeraanvragen ophalen die tijdens de uitvoering zijn gedaan
  • get_final_state(): De uiteindelijke werkstroomstatus (IDLE, IDLE_WITH_PENDING_REQUESTS, enzovoort) ophalen
  • status_timeline(): Toegang tot de volledige status van de gebeurtenisgeschiedenis
WorkflowStartedEvent

Ingebouwde levenscyclusgebeurtenis die wordt gegenereerd wanneer een werkstroomuitvoering begint.

Initialiseer de werkstroom gebeurtenis met optionele gegevens.

WorkflowStatusEvent

Ingebouwde levenscyclusgebeurtenis die wordt verzonden voor overgangen van de status van de werkstroom.

Initialiseer de gebeurtenis van de werkstroomstatus met een nieuwe status en optionele gegevens.

WorkflowValidationError

Basisonderzondering voor werkstroomvalidatiefouten.

WorkflowViz

Een klasse voor het visualiseren van werkstromen met graphviz en Mermaid.

Initialiseer de WorkflowViz met een werkstroom.

Enums

MagenticHumanInterventionDecision

Beslissingsopties voor reacties op menselijke interventie.

MagenticHumanInterventionKind

Het soort menselijke tussenkomst dat wordt aangevraagd.

ValidationTypeEnum

Opsomming van werkstroomvalidatietypen.

WorkflowEventSource

Hiermee wordt aangegeven of een werkstroom gebeurtenis afkomstig is van het framework of een uitvoerder.

Gebruik FRAMEWORK voor gebeurtenissen die worden verzonden door ingebouwde indelingspaden, zelfs wanneer de code die ze genereert, zich in runner-gerelateerde modules bevindt, en EXECUTOR voor gebeurtenissen die worden weergegeven door door ontwikkelaars geleverde executor-implementaties.

WorkflowRunState

Uitvoeringsstatus van een werkstroom.

Semantiek:

  • GESTART: De uitvoering is gestart en de werkstroomcontext is gemaakt. Dit is een initiële status voordat zinvol werk wordt uitgevoerd. In deze codebase verzenden we een toegewezen WorkflowStartedEvent voor telemetrie en sturen we de status doorgaans rechtstreeks naar IN_PROGRESS. Consumenten kunnen nog steeds afhankelijk zijn van STARTED voor statusmachines waarvoor een expliciete prewerkfase nodig is.

  • IN_PROGRESS: de werkstroom wordt actief uitgevoerd (bijvoorbeeld het eerste bericht is bezorgd bij de startuitvoering of er wordt een superstap uitgevoerd). Deze status wordt aan het begin van een uitvoering verzonden en kan worden gevolgd door andere statussen wanneer de uitvoering vordert.

  • IN_PROGRESS_PENDING_REQUESTS: actieve uitvoering terwijl een of meer aanvraag-voor-informatiebewerkingen uitstekend zijn. Er kunnen nog steeds nieuwe werkzaamheden worden gepland terwijl aanvragen in de vlucht zijn.

  • Inactiviteit: De werkstroom is stil, zonder openstaande aanvragen en geen werk meer te doen. Dit is de normale terminalstatus voor werkstromen die klaar zijn met uitvoeren, mogelijk uitvoer hebben geproduceerd.

  • IDLE_WITH_PENDING_REQUESTS: De werkstroom wordt onderbroken op externe invoer (bijvoorbeeld een RequestInfoEvent verzonden). Dit is een niet-terminalstatus; de werkstroom kan worden hervat wanneer antwoorden worden opgegeven.

  • MISLUKT: Terminalstatus die aangeeft dat er een fout is opgetreden. Vergezeld van een WorkflowFailedEvent met gestructureerde foutdetails.

  • GEANNULEERD: Terminalstatus die aangeeft dat de uitvoering is geannuleerd door een beller of orchestrator. Momenteel niet verzonden door standaard runner-paden, maar opgenomen voor integrators/orchestrators die ondersteuning bieden voor annulering.

Functies

agent_middleware

Decorator om een functie als agent-middleware te markeren.

Deze decorator identificeert expliciet een functie als agent-middleware, die AgentRunContext-objecten verwerkt.

agent_middleware(func: Callable[[AgentRunContext, Callable[[AgentRunContext], Awaitable[None]]], Awaitable[None]]) -> Callable[[AgentRunContext, Callable[[AgentRunContext], Awaitable[None]]], Awaitable[None]]

Parameters

Name Description
func
Vereist

De middlewarefunctie die als agent-middleware moet worden gemarkeerd.

Retouren

Type Description

Dezelfde functie met agent middlewaremarkering.

Voorbeelden


   from agent_framework import agent_middleware, AgentRunContext, ChatAgent


   @agent_middleware
   async def logging_middleware(context: AgentRunContext, next):
       print(f"Before: {context.agent.name}")
       await next(context)
       print(f"After: {context.result}")


   # Use with an agent
   agent = ChatAgent(chat_client=client, name="assistant", middleware=logging_middleware)

ai_function

Een functie versieren om deze om te zetten in een AIFunction die kan worden doorgegeven aan modellen en automatisch kan worden uitgevoerd.

Deze decorator maakt een Pydantic-model op basis van de handtekening van de functie, die wordt gebruikt om de argumenten te valideren die aan de functie worden doorgegeven en om het JSON-schema voor de parameters van de functie te genereren.

Als u beschrijvingen wilt toevoegen aan parameters, gebruikt u het Annotated type van typing met een tekenreeksbeschrijving als het tweede argument. U kunt de klasse van Field Pydantic ook gebruiken voor geavanceerdere configuratie.

Opmerking

Wanneer approval_mode is ingesteld op 'always_require', wordt de functie niet uitgevoerd

totdat expliciete goedkeuring wordt gegeven, is dit alleen van toepassing op de stroom voor automatisch aanroepen.

Het is ook belangrijk om te weten dat als het model meerdere functie-aanroepen retourneert, sommige die goedkeuring vereisen

en anderen die dat niet doen, zal het goedkeuring vragen voor allemaal.

ai_function(func: Callable[[...], ReturnT | Awaitable[ReturnT]] | None = None, *, name: str | None = None, description: str | None = None, approval_mode: Literal['always_require', 'never_require'] | None = None, max_invocations: int | None = None, max_invocation_exceptions: int | None = None, additional_properties: dict[str, Any] | None = None) -> AIFunction[Any, ReturnT] | Callable[[Callable[[...], ReturnT | Awaitable[ReturnT]]], AIFunction[Any, ReturnT]]

Parameters

Name Description
func
Callable[[...], <xref:agent_framework._tools.ReturnT> | Awaitable[<xref:agent_framework._tools.ReturnT>]] | None

De functie om te versieren.

Default value: None
name
Vereist
str | None
description
Vereist
str | None
approval_mode
Vereist
Literal['always_require', 'never_require'] | None
max_invocations
Vereist
int | None
max_invocation_exceptions
Vereist
int | None
additional_properties
Vereist

Keyword-Only Parameters

Name Description
name

De naam van de functie. Indien niet opgegeven, wordt het kenmerk van __name__ de functie gebruikt.

Default value: None
description

Een beschrijving van de functie. Indien niet opgegeven, wordt de docstring van de functie gebruikt.

Default value: None
approval_mode

Of goedkeuring is vereist om dit hulpprogramma uit te voeren. Standaard is dat goedkeuring niet nodig is.

Default value: None
max_invocations

Het maximum aantal keren dat deze functie kan worden aangeroepen. Als geen, is er geen limiet, moet ten minste 1 zijn.

Default value: None
max_invocation_exceptions

Het maximum aantal uitzonderingen dat is toegestaan tijdens aanroepen. Als geen, is er geen limiet, moet ten minste 1 zijn.

Default value: None
additional_properties

Aanvullende eigenschappen die moeten worden ingesteld voor de functie.

Default value: None

Retouren

Type Description
AIFunction[Any, <xref:agent_framework._tools.ReturnT>] | Callable[[Callable[[…], <xref:agent_framework._tools.ReturnT> | Awaitable[<xref:agent_framework._tools.ReturnT>]]], AIFunction[Any, <xref:agent_framework._tools.ReturnT>]]

Voorbeelden


   from agent_framework import ai_function
   from typing import Annotated


   @ai_function
   def ai_function_example(
       arg1: Annotated[str, "The first argument"],
       arg2: Annotated[int, "The second argument"],
   ) -> str:
       # An example function that takes two arguments and returns a string.
       return f"arg1: {arg1}, arg2: {arg2}"


   # the same function but with approval required to run
   @ai_function(approval_mode="always_require")
   def ai_function_example(
       arg1: Annotated[str, "The first argument"],
       arg2: Annotated[int, "The second argument"],
   ) -> str:
       # An example function that takes two arguments and returns a string.
       return f"arg1: {arg1}, arg2: {arg2}"


   # With custom name and description
   @ai_function(name="custom_weather", description="Custom weather function")
   def another_weather_func(location: str) -> str:
       return f"Weather in {location}"


   # Async functions are also supported
   @ai_function
   async def async_get_weather(location: str) -> str:
       '''Get weather asynchronously.'''
       # Simulate async operation
       return f"Weather in {location}"

chat_middleware

Decorator om een functie als chat-middleware te markeren.

Deze decorator identificeert expliciet een functie als chat-middleware, die ChatContext-objecten verwerkt.

chat_middleware(func: Callable[[ChatContext, Callable[[ChatContext], Awaitable[None]]], Awaitable[None]]) -> Callable[[ChatContext, Callable[[ChatContext], Awaitable[None]]], Awaitable[None]]

Parameters

Name Description
func
Vereist

De middlewarefunctie die als chat-middleware wordt gemarkeerd.

Retouren

Type Description

Dezelfde functie met de markering chat-middleware.

Voorbeelden


   from agent_framework import chat_middleware, ChatContext, ChatAgent


   @chat_middleware
   async def logging_middleware(context: ChatContext, next):
       print(f"Messages: {len(context.messages)}")
       await next(context)
       print(f"Response: {context.result}")


   # Use with an agent
   agent = ChatAgent(chat_client=client, name="assistant", middleware=logging_middleware)

create_edge_runner

Fabrieksfunctie voor het maken van de juiste edge-runner voor een randgroep.

create_edge_runner(edge_group: EdgeGroup, executors: dict[str, Executor]) -> EdgeRunner

Parameters

Name Description
edge_group
Vereist
<xref:agent_framework._workflows._edge.EdgeGroup>

De randgroep waarvoor u een runner wilt maken.

executors
Vereist

Toewijzing van uitvoerders-id's aan uitvoerdersexemplaren.

Retouren

Type Description
<xref:agent_framework._workflows._edge_runner.EdgeRunner>

Het juiste EdgeRunner-exemplaar.

executor

Decorator die een zelfstandige functie converteert naar een FunctionExecutor-exemplaar.

De @executor decorator is alleen ontworpen voor zelfstandige functies op moduleniveau. Gebruik voor op klassen gebaseerde uitvoerders de basisklasse Executor met @handler op exemplaarmethoden.

Ondersteunt zowel synchrone als asynchrone functies. Synchrone functies worden uitgevoerd in een threadgroep om te voorkomen dat de gebeurtenislus wordt geblokkeerd.

Belangrijk

Gebruiken @executor voor zelfstandige functies (moduleniveau of lokale functies)

Niet gebruiken @executor met staticmethod of classmethod

Voor op klassen gebaseerde uitvoerders, subklasseExecutor en gebruiken @handler op exemplaarmethoden

Gebruik:


   # Standalone async function (RECOMMENDED):
   @executor(id="upper_case")
   async def to_upper(text: str, ctx: WorkflowContext[str]):
       await ctx.send_message(text.upper())


   # Standalone sync function (runs in thread pool):
   @executor
   def process_data(data: str):
       return data.upper()


   # For class-based executors, use @handler instead:
   class MyExecutor(Executor):
       def __init__(self):
           super().__init__(id="my_executor")

       @handler
       async def process(self, data: str, ctx: WorkflowContext[str]):
           await ctx.send_message(data.upper())
executor(func: Callable[[...], Any] | None = None, *, id: str | None = None) -> Callable[[Callable[[...], Any]], FunctionExecutor] | FunctionExecutor

Parameters

Name Description
func
Callable[[...], Any] | None

De functie om te versieren (indien gebruikt zonder haakjes)

Default value: None
id
Vereist
str | None

Optionele aangepaste id voor de uitvoerder. Als geen, gebruikt u de functienaam.

Keyword-Only Parameters

Name Description
id
Default value: None

Retouren

Type Description

Een FunctionExecutor-exemplaar dat kan worden bekabeld in een werkstroom.

Uitzonderingen

Type Description

Indien gebruikt met staticmethod of classmethod (niet-ondersteund patroon)

function_middleware

Decorator om een functie als functie-middleware te markeren.

Deze decorator identificeert expliciet een functie als functie-middleware, die FunctionInvocationContext-objecten verwerkt.

function_middleware(func: Callable[[FunctionInvocationContext, Callable[[FunctionInvocationContext], Awaitable[None]]], Awaitable[None]]) -> Callable[[FunctionInvocationContext, Callable[[FunctionInvocationContext], Awaitable[None]]], Awaitable[None]]

Parameters

Name Description
func
Vereist

De middlewarefunctie die als functie-middleware moet worden gemarkeerd.

Retouren

Type Description

Dezelfde functie met functie-middlewaremarkering.

Voorbeelden


   from agent_framework import function_middleware, FunctionInvocationContext, ChatAgent


   @function_middleware
   async def logging_middleware(context: FunctionInvocationContext, next):
       print(f"Calling: {context.function.name}")
       await next(context)
       print(f"Result: {context.result}")


   # Use with an agent
   agent = ChatAgent(chat_client=client, name="assistant", middleware=logging_middleware)

get_checkpoint_summary

get_checkpoint_summary(checkpoint: WorkflowCheckpoint) -> WorkflowCheckpointSummary

Parameters

Name Description
checkpoint
Vereist

Retouren

Type Description

get_logger

Haal een logger op met de opgegeven naam, die standaard wordt ingesteld op 'agent_framework'.

get_logger(name: str = 'agent_framework') -> Logger

Parameters

Name Description
name
str

De naam van de logger. Standaard ingesteld op 'agent_framework'.

Default value: "agent_framework"

Retouren

Type Description

Het geconfigureerde loggerexemplaren.

handler

Decorator voor het registreren van een handler voor een uitvoerder.

handler(func: Callable[[ExecutorT, Any, ContextT], Awaitable[Any]]) -> Callable[[ExecutorT, Any, ContextT], Awaitable[Any]]

Parameters

Name Description
func
Vereist
Callable[[<xref:agent_framework._workflows._executor.ExecutorT>, Any, <xref:agent_framework._workflows._executor.ContextT>], Awaitable[Any]]

De functie om te versieren. Kan geen zijn wanneer deze zonder parameters wordt gebruikt.

Retouren

Type Description
Callable[[<xref:agent_framework._workflows._executor.ExecutorT>, Any, <xref:agent_framework._workflows._executor.ContextT>], Awaitable[Any]]

De versierde functie met handlermetagegevens.

Voorbeelden

@handler async def handle_string(zelf, bericht: str, ctx: WorkflowContext[str]) -> Geen:

...

@handler async def handle_data(zelf, bericht: dict, ctx: WorkflowContext[str | int]) -> Geen:

...

prepare_function_call_results

Bereid de waarden van de resultaten van de functieoproep voor.

prepare_function_call_results(content: TextContent | DataContent | TextReasoningContent | UriContent | FunctionCallContent | FunctionResultContent | ErrorContent | UsageContent | HostedFileContent | HostedVectorStoreContent | FunctionApprovalRequestContent | FunctionApprovalResponseContent | Any | list[TextContent | DataContent | TextReasoningContent | UriContent | FunctionCallContent | FunctionResultContent | ErrorContent | UsageContent | HostedFileContent | HostedVectorStoreContent | FunctionApprovalRequestContent | FunctionApprovalResponseContent | Any]) -> str

Parameters

Name Description
content
Vereist

Retouren

Type Description
str

prepend_agent_framework_to_user_agent

Prepend "agent-framework" aan de User-Agent in de headers.

Wanneer telemetrie van de gebruikersagent is uitgeschakeld via de AGENT_FRAMEWORK_USER_AGENT_DISABLED omgevingsvariabele, bevat de User-Agent-header geen informatie over het agentframework. Het wordt teruggestuurd als zodanig of als een leeg dict als er geen wordt doorgegeven.

prepend_agent_framework_to_user_agent(headers: dict[str, Any] | None = None) -> dict[str, Any]

Parameters

Name Description
headers

De bestaande koptekstenwoordenlijst.

Default value: None

Retouren

Type Description

Een nieuwe dict met User-Agent ingesteld op agent-framework-python/{version} als headers Geen zijn. De gewijzigde headerswoordenlijst met agent-framework-python/{version}, voorafgegaan door de User-Agent.

Voorbeelden


   from agent_framework import prepend_agent_framework_to_user_agent

   # Add agent-framework to new headers
   headers = prepend_agent_framework_to_user_agent()
   print(headers["User-Agent"])  # "agent-framework-python/0.1.0"

   # Prepend to existing headers
   existing = {"User-Agent": "my-app/1.0"}
   headers = prepend_agent_framework_to_user_agent(existing)
   print(headers["User-Agent"])  # "agent-framework-python/0.1.0 my-app/1.0"

response_handler

Decorator voor het registreren van een handler voor het afhandelen van antwoorden voor een aanvraag.

response_handler(func: Callable[[ExecutorT, Any, Any, ContextT], Awaitable[None]]) -> Callable[[ExecutorT, Any, Any, ContextT], Awaitable[None]]

Parameters

Name Description
func
Vereist
Callable[[<xref:agent_framework._workflows._request_info_mixin.ExecutorT>, Any, Any, <xref:agent_framework._workflows._request_info_mixin.ContextT>], Awaitable[None]]

De functie om te versieren.

Retouren

Type Description
Callable[[<xref:agent_framework._workflows._request_info_mixin.ExecutorT>, Any, Any, <xref:agent_framework._workflows._request_info_mixin.ContextT>], Awaitable[None]]

De versierde functie met handlermetagegevens.

Voorbeelden


   @handler
   async def run(self, message: int, context: WorkflowContext[str]) -> None:
       # Example of a handler that sends a request
       ...
       # Send a request with a `CustomRequest` payload and expect a `str` response.
       await context.request_info(CustomRequest(...), str)


   @response_handler
   async def handle_response(
       self,
       original_request: CustomRequest,
       response: str,
       context: WorkflowContext[str],
   ) -> None:
       # Example of a response handler for the above request
       ...


   @response_handler
   async def handle_response(
       self,
       original_request: CustomRequest,
       response: dict,
       context: WorkflowContext[int],
   ) -> None:
       # Example of a response handler for a request expecting a dict response
       ...

setup_logging

Stel de configuratie voor logboekregistratie in voor het agentframework.

setup_logging() -> None

Retouren

Type Description

use_agent_middleware

Klasse-decorator die middleware-ondersteuning toevoegt aan een agentklasse.

Deze decorator voegt middlewarefunctionaliteit toe aan elke agentklasse. Het verpakt de run() en run_stream() methoden om middleware-uitvoering te bieden.

De middleware-uitvoering kan op elk gewenst moment worden beëindigd door de context.terminate eigenschap in te stellen op True. Zodra de pijplijn is ingesteld, stopt de uitvoering van verdere middleware zodra het besturingselement terugkeert naar de pijplijn.

Opmerking

Deze decorator is al toegepast op ingebouwde agentklassen. U hoeft alleen maar te gebruiken

als u aangepaste agent-implementaties maakt.

use_agent_middleware(agent_class: type[TAgent]) -> type[TAgent]

Parameters

Name Description
agent_class
Vereist
type[<xref:TAgent>]

De agentklasse waaraan middleware-ondersteuning moet worden toegevoegd.

Retouren

Type Description
type[~<xref:TAgent>]

De gewijzigde agentklasse met middleware-ondersteuning.

Voorbeelden


   from agent_framework import use_agent_middleware


   @use_agent_middleware
   class CustomAgent:
       async def run(self, messages, **kwargs):
           # Agent implementation
           pass

       async def run_stream(self, messages, **kwargs):
           # Streaming implementation
           pass

use_chat_middleware

Klasse-decorator die middleware-ondersteuning toevoegt aan een chatclientklasse.

Deze decorator voegt middlewarefunctionaliteit toe aan elke chatclientklasse. Het verpakt de get_response() en get_streaming_response() methoden om middleware-uitvoering te bieden.

Opmerking

Deze decorator is al toegepast op ingebouwde chatclientklassen. U hoeft alleen maar te gebruiken

als u aangepaste chatclient-implementaties maakt.

use_chat_middleware(chat_client_class: type[TChatClient]) -> type[TChatClient]

Parameters

Name Description
chat_client_class
Vereist
type[<xref:TChatClient>]

De chatclientklasse waaraan middleware-ondersteuning moet worden toegevoegd.

Retouren

Type Description
type[~<xref:TChatClient>]

De gewijzigde chatclientklasse met middleware-ondersteuning.

Voorbeelden


   from agent_framework import use_chat_middleware


   @use_chat_middleware
   class CustomChatClient:
       async def get_response(self, messages, **kwargs):
           # Chat client implementation
           pass

       async def get_streaming_response(self, messages, **kwargs):
           # Streaming implementation
           pass

use_function_invocation

Klasse-decorator waarmee hulpprogramma's voor een chatclient kunnen worden aangeroepen.

Deze decorator verpakt de get_response en get_streaming_response methoden om functie-aanroepen van het model automatisch te verwerken, uit te voeren en de resultaten terug te sturen naar het model voor verdere verwerking.

use_function_invocation(chat_client: type[TChatClient]) -> type[TChatClient]

Parameters

Name Description
chat_client
Vereist
type[<xref:TChatClient>]

De chatclientklasse om te versieren.

Retouren

Type Description
type[~<xref:TChatClient>]

De ingerichte chatclientklasse met functie-aanroep ingeschakeld.

Uitzonderingen

Type Description

Als de chatclient niet over de vereiste methoden beschikt.

Voorbeelden


   from agent_framework import use_function_invocation, BaseChatClient


   @use_function_invocation
   class MyCustomClient(BaseChatClient):
       async def get_response(self, messages, **kwargs):
           # Implementation here
           pass

       async def get_streaming_response(self, messages, **kwargs):
           # Implementation here
           pass


   # The client now automatically handles function calls
   client = MyCustomClient()

validate_workflow_graph

Handige functie om een werkstroomgrafiek te valideren.

validate_workflow_graph(edge_groups: Sequence[EdgeGroup], executors: dict[str, Executor], start_executor: Executor) -> None

Parameters

Name Description
edge_groups
Vereist
Sequence[<xref:agent_framework._workflows._edge.EdgeGroup>]

lijst met edge-groepen in de werkstroom

executors
Vereist

Toewijzing van uitvoerders-id's aan uitvoerdersexemplaren

start_executor
Vereist

De startexecutor (kan een exemplaar of id zijn)

Retouren

Type Description

Uitzonderingen

Type Description

Als er een validatie mislukt