Sdílet prostřednictvím


agent_framework Balíček

Balíčky

a2a
ag_ui
anthropic
azure
chatkit
declarative
devui
lab
mem0
microsoft
ollama
openai
redis

Moduly

exceptions
observability

Třídy

AIFunction

Nástroj, který zabalí funkci Pythonu, aby ji bylo možné volat pomocí modelů AI.

Tato třída zabalí funkci Pythonu, která umožňuje volat modely AI pomocí automatického ověřování parametrů a generování schématu JSON.

Inicializace funkce AIFunction.

AgentExecutor

integrovaný exekutor, který zabalí agenta pro zpracování zpráv.

AgentExecutor přizpůsobuje své chování na základě režimu provádění pracovního postupu:

  • run_stream(): Generuje přírůstkové události AgentRunUpdateEvent, protože agent vytváří tokeny.
  • run(): Generuje jeden agentRunEvent obsahující úplnou odpověď.

Exekutor automaticky rozpozná režim prostřednictvím WorkflowContext.is_streaming().

Inicializace exekutoru s jedinečným identifikátorem

AgentExecutorRequest

Požadavek na exekutor agenta.

AgentExecutorResponse

Odpověď z exekutoru agenta.

AgentInputRequest

Žádost o vstup člověka před spuštěním agenta v pracovních postupech tvůrce vysoké úrovně

Když se pracovní postup pozastaví před spuštěním agenta, vygeneruje se prostřednictvím RequestInfoEvent. Odpověď se vloží do konverzace jako uživatelská zpráva, která ospraví chování agenta.

Toto je standardní typ požadavku používaný .with_request_info() pro Sekvenčníbuilder, ConcurrentBuilder, GroupChatBuilder a HandoffBuilder.

AgentMiddleware

Abstraktní základní třída pro middleware agenta, který může zachytit vyvolání agenta.

Middleware agenta umožňuje zachytit a upravit vyvolání agenta před a po spuštění. Můžete zkontrolovat zprávy, upravit kontext, přepsat výsledky nebo ukončit provádění včas.

Poznámka:

AgentMiddleware je abstraktní základní třída. Musíte ji podtřídět a implementovat.

metoda process() pro vytvoření vlastního middlewaru agenta.

AgentProtocol

Protokol pro agenta, který lze vyvolat.

Tento protokol definuje rozhraní, které musí implementovat všichni agenti, včetně vlastností pro identifikaci a metody spuštění.

Poznámka:

Protokoly používají strukturální podtypy (typování kachny). Třídy nepotřebují

explicitně dědí z tohoto protokolu, aby byl považován za kompatibilní.

Díky tomu můžete vytvářet zcela vlastní agenty bez použití.

všechny základní třídy Agent Framework.

AgentRunContext

Kontextový objekt pro vyvolání middlewaru agenta

Tento kontext se předává prostřednictvím kanálu middlewaru agenta a obsahuje všechny informace o vyvolání agenta.

Inicializuje AgentRunContext.

AgentRunEvent

Událost se aktivuje po dokončení spuštění agenta.

Inicializuje událost spuštění agenta.

AgentRunResponse

Představuje odpověď na žádost o spuštění agenta.

Poskytuje jednu nebo více zpráv odpovědí a metadata o odpovědi. Typická odpověď bude obsahovat jednu zprávu, ale může obsahovat více zpráv ve scénářích zahrnujících volání funkcí, načítání RAG nebo složitou logiku.

Inicializace AgentRunResponse

AgentRunResponseUpdate

Představuje jeden blok odpovědí streamování z agenta.

Inicializace AgentRunResponseUpdate.

AgentRunUpdateEvent

Událost se aktivuje, když agent streamuje zprávy.

Inicializuje událost streamování agenta.

AgentThread

Třída vlákna agenta, může představovat jak místně spravované vlákno, nebo vlákno spravované službou.

Udržuje AgentThread stav konverzace a historii zpráv pro interakci agenta. Může buď použít vlákno spravované službou (prostřednictvím service_thread_id) nebo místní úložiště zpráv (prostřednictvím message_store), ale ne obojí.

Inicializace AgentThread, nepoužívejte tuto metodu ručně, vždy použijte: agent.get_new_thread().

Poznámka:

Je možné nastavit service_thread_id nebo message_store, ale ne obojí.

AggregateContextProvider

ContextProvider, který obsahuje více zprostředkovatelů kontextu.

Před vrácením deleguje události na několik zprostředkovatelů kontextu a agreguje odpovědi z těchto událostí. To vám umožní zkombinovat několik zprostředkovatelů kontextu do jednoho zprostředkovatele.

Poznámka:

AggregateContextProvider se vytvoří automaticky, když předáte jeden kontext.

zprostředkovatele nebo posloupnost zprostředkovatelů kontextu konstruktoru agenta.

Inicializace AggregateContextProvider s kontextovými zprostředkovateli.

BaseAgent

Základní třída pro všechny agenty Agent Framework.

Tato třída poskytuje základní funkce pro implementace agentů, včetně poskytovatelů kontextu, podpory middlewaru a správy vláken.

Poznámka:

BaseAgent nelze vytvořit instanci přímo, protože neimplementuje

run(), run_stream() a další metody vyžadované agentemProtocol.

Použijte konkrétní implementaci, jako je ChatAgent nebo vytvořte podtřídu.

Inicializace instance BaseAgent

BaseAnnotation

Základní třída pro všechny typy poznámek AI

Inicializace BaseAnnotation

BaseChatClient

Základní třída pro chatovací klienty

Tato abstraktní základní třída poskytuje základní funkce pro implementace chatovacího klienta, včetně podpory middlewaru, přípravy zpráv a normalizace nástrojů.

Poznámka:

BaseChatClient nelze vytvořit instanci přímo, protože se jedná o abstraktní základní třídu.

Podtřídy musí implementovat _inner_get_response() a _inner_get_streaming_response().

Inicializuje instanci BaseChatClient.

BaseContent

Představuje obsah používaný službami AI.

Inicializovat BaseContent.

Case

Obálka modulu runtime kombinující predikát s případným přepínačem s cílem.

Každý případ se páruje logický predikát s exekutorem, který by měl zpracovat zprávu, když se predikát vyhodnotí jako Pravda. Modul runtime udržuje tento jednoduchý kontejner odděleně od serialisable SwitchCaseEdgeGroupCase , aby provádění fungovalo s živými volatelnými bez znečisťování trvalého stavu.

ChatAgent

Chatovací klientský agent.

Toto je primární implementace agenta, která používá chatovacího klienta k interakci s jazykovými modely. Podporuje nástroje, zprostředkovatele kontextu, middleware a odpovědi na streamování i odpovědi bez streamování.

Inicializace instance ChatAgent

Poznámka:

Sada parametrů z frequency_penalty na request_kwargs se používá

zavolejte chatovacího klienta. Lze je také předat oběma metodám spuštění.

Pokud jsou obě nastavené, mají přednost ty, které se předají metodám spuštění.

ChatClientProtocol

Protokol pro chatovacího klienta, který může generovat odpovědi.

Tento protokol definuje rozhraní, které musí implementovat všichni chatovací klienti, včetně metod pro generování streamovaných i nes streamingových odpovědí.

Poznámka:

Protokoly používají strukturální podtypy (typování kachny). Třídy nepotřebují

explicitně dědí z tohoto protokolu, aby byl považován za kompatibilní.

ChatContext

Kontextový objekt pro vyvolání middlewaru chatu

Tento kontext se předává prostřednictvím kanálu middlewaru chatu a obsahuje všechny informace o žádosti o chat.

Inicializuje ChatContext.

ChatMessage

Představuje zprávu chatu.

Inicializace ChatMessage

ChatMessageStore

Implementace ChatMessageStoreProtocol v paměti, která ukládá zprávy v seznamu.

Tato implementace poskytuje jednoduché seznamové úložiště pro chatové zprávy s podporou serializace a deserializace. Implementuje všechny požadované metody ChatMessageStoreProtocol protokolu.

Úložiště udržuje zprávy v paměti a poskytuje metody serializace a deserializace stavu pro účely trvalosti.

Vytvořte ChatMessageStore pro použití ve vlákně.

ChatMessageStoreProtocol

Definuje metody pro ukládání a načítání chatových zpráv přidružených ke konkrétnímu vláknu.

Implementace tohoto protokolu zodpovídají za správu ukládání chatových zpráv, včetně zpracování velkých objemů dat zkrácením nebo shrnutím zpráv podle potřeby.

ChatMiddleware

Abstraktní základní třída pro middleware chatu, která může zachycovat žádosti klientů chatu.

Middleware chatu umožňuje zachytit a upravit žádosti klientů chatu před a po spuštění. Můžete upravovat zprávy, přidávat systémové výzvy, žádosti protokolu nebo přepisovat odpovědi chatu.

Poznámka:

ChatMiddleware je abstraktní základní třída. Musíte ji podtřídět a implementovat.

metoda process() pro vytvoření vlastního middlewaru chatu.

ChatOptions

Běžná nastavení požadavků pro služby AI

Inicializace chatOptions

ChatResponse

Představuje odpověď na žádost chatu.

Inicializuje ChatResponse se zadanými parametry.

ChatResponseUpdate

Představuje jeden blok odpovědí streamování z ChatClient.

Inicializuje ChatResponseUpdate s zadanými parametry.

CheckpointStorage

Protokol pro back-endy úložiště kontrolních bodů

CitationAnnotation

Představuje citační poznámku.

Inicializace CitationAnnotation

ConcurrentBuilder

Tvůrce vysoké úrovně pro souběžné pracovní postupy agenta

  • participants([...]) přijímá seznam agentaProtocol (doporučeno) nebo exekutoru.

  • register_participants([...]) přijímá seznam továren pro AgentProtocol (doporučeno)

    objekty pro vytváření exekutorů

  • build() dráty: dispečer -> fan-out -> účastníci -> fan-in -> agregátor.

  • with_aggregator(...) přepíše výchozí agregátor exekutorem nebo zpětným voláním.

  • register_aggregator(...) přijímá objekt pro exekutor jako vlastní agregátor.

Použití:


   from agent_framework import ConcurrentBuilder

   # Minimal: use default aggregator (returns list[ChatMessage])
   workflow = ConcurrentBuilder().participants([agent1, agent2, agent3]).build()

   # With agent factories
   workflow = ConcurrentBuilder().register_participants([create_agent1, create_agent2, create_agent3]).build()


   # Custom aggregator via callback (sync or async). The callback receives
   # list[AgentExecutorResponse] and its return value becomes the workflow's output.
   def summarize(results: list[AgentExecutorResponse]) -> str:
       return " | ".join(r.agent_run_response.messages[-1].text for r in results)


   workflow = ConcurrentBuilder().participants([agent1, agent2, agent3]).with_aggregator(summarize).build()


   # Custom aggregator via a factory
   class MyAggregator(Executor):
       @handler
       async def aggregate(self, results: list[AgentExecutorResponse], ctx: WorkflowContext[Never, str]) -> None:
           await ctx.yield_output(" | ".join(r.agent_run_response.messages[-1].text for r in results))


   workflow = (
       ConcurrentBuilder()
       .register_participants([create_agent1, create_agent2, create_agent3])
       .register_aggregator(lambda: MyAggregator(id="my_aggregator"))
       .build()
   )


   # Enable checkpoint persistence so runs can resume
   workflow = ConcurrentBuilder().participants([agent1, agent2, agent3]).with_checkpointing(storage).build()

   # Enable request info before aggregation
   workflow = ConcurrentBuilder().participants([agent1, agent2]).with_request_info().build()
Context

Třída obsahující libovolný kontext, který by měl být poskytnut modelu AI, jak je dodává ContextProvider.

Každý ContextProvider má možnost poskytnout svůj vlastní kontext pro každé vyvolání. Třída Context obsahuje další kontext zadaný ContextProvider. Tento kontext se před předáním modelu AI zkombinuje s kontextem poskytnutým jinými poskytovateli. Tento kontext je pro každé vyvolání a nebude uložen jako součást historie chatu.

Vytvořte nový kontextový objekt.

ContextProvider

Základní třída pro všechny zprostředkovatele kontextu

Zprostředkovatel kontextu je komponenta, která se dá použít k vylepšení správy kontextu umělé inteligence. Může naslouchat změnám v konverzaci a poskytnout další kontext modelu AI těsně před vyvoláním.

Poznámka:

ContextProvider je abstraktní základní třída. Musíte ji podtřídět a implementovat.

vyvolání() metody pro vytvoření vlastního zprostředkovatele kontextu. V ideálním případě byste měli

implementujte také vyvolané() a thread_created() metody ke sledování konverzací.

ale ty jsou volitelné.

DataContent

Představuje binární datový obsah s přidruženým typem média (označovaným také jako typ MIME).

Důležité

Jedná se o binární data, která jsou reprezentována jako identifikátor URI dat, nikoli pro online prostředky.

Pro online prostředky použijte identifikátor UriContent.

Inicializuje instanci DataContent.

Důležité

Jedná se o binární data, která jsou reprezentována jako identifikátor URI dat, nikoli pro online prostředky.

Pro online prostředky použijte identifikátor UriContent.

Default

Reprezentace výchozí větve ve skupině s případem přepínače

Výchozí větev se vyvolá pouze v případě, že se neshoduje žádný jiný predikát případu. V praxi je zaručeno, že existuje, takže směrování nikdy nevygeneruje prázdný cíl.

Edge

Modeluje směrovanou, volitelně podmíněnou předání mezi dvěma exekutory.

Každý Edge zachycuje minimální metadata potřebná k přesunutí zprávy z jednoho exekutoru do jiného uvnitř grafu pracovního postupu. Volitelně vloží logický predikát, který rozhoduje, jestli má být hrana za běhu přijata. Serializací okrajů na primitivy můžeme rekonstruovat topologii pracovního postupu bez ohledu na původní proces Pythonu.

Inicializuje plně zadanou hranu mezi dvěma exekutory pracovního postupu.

EdgeDuplicationError

Výjimka vyvolaná při zjištění duplicitních okrajů v pracovním postupu

ErrorContent

Představuje chybu.

Poznámky: Obvykle se používá pro jiné než závažné chyby, kdy se v rámci operace něco pokazilo, ale operace byla stále schopna pokračovat.

Inicializuje instanci ErrorContent.

Executor

Základní třída pro všechny exekutory pracovního postupu, které zpracovávají zprávy a provádějí výpočty.

Přehled

Exekutory jsou základní stavební bloky pracovních postupů, které představují jednotlivé jednotky zpracování, které přijímají zprávy, provádějí operace a vytvářejí výstupy. Každý exekutor je jedinečně identifikován a může zpracovávat konkrétní typy zpráv prostřednictvím zdobených metod obslužné rutiny.

Systém typů

Exekutory mají bohatý systém typů, který definuje jejich schopnosti:

Vstupní typy

Typy zpráv, které exekutor může zpracovat, zjištěny z podpisů metody obslužné rutiny:


   class MyExecutor(Executor):
       @handler
       async def handle_string(self, message: str, ctx: WorkflowContext) -> None:
           # This executor can handle 'str' input types

Přístup prostřednictvím vlastnosti input_types .

Výstupní typy

Typy zpráv, které exekutor může odeslat jiným exekutorům prostřednictvím ctx.send_message():


   class MyExecutor(Executor):
       @handler
       async def handle_data(self, message: str, ctx: WorkflowContext[int | bool]) -> None:
           # This executor can send 'int' or 'bool' messages

Přístup prostřednictvím vlastnosti output_types

Typy výstupu pracovního postupu

Typy dat, která exekutor může generovat jako výstupy na úrovni pracovního postupu prostřednictvím ctx.yield_output():>


   class MyExecutor(Executor):
       @handler
       async def process(self, message: str, ctx: WorkflowContext[int, str]) -> None:
           # Can send 'int' messages AND yield 'str' workflow outputs

Přístup prostřednictvím vlastnosti workflow_output_types

Zjišťování obslužné rutiny

Exekutory zjišťují své schopnosti prostřednictvím zdobených metod:

@handler Dekoratér

Označí metody, které zpracovávají příchozí zprávy:


   class MyExecutor(Executor):
       @handler
       async def handle_text(self, message: str, ctx: WorkflowContext[str]) -> None:
           await ctx.send_message(message.upper())

Průsečík požadavků dílčího pracovního postupu

K zachycení požadavků dílčího pracovního postupu použijte @handler metody:


   class ParentExecutor(Executor):
       @handler
       async def handle_subworkflow_request(
           self,
           request: SubWorkflowRequestMessage,
           ctx: WorkflowContext[SubWorkflowResponseMessage],
       ) -> None:
           if self.is_allowed(request.domain):
               response = request.create_response(data=True)
               await ctx.send_message(response, target_id=request.executor_id)
           else:
               await ctx.request_info(request.source_event, response_type=request.source_event.response_type)

Typy kontextu

Metody obslužné rutiny přijímají různé varianty WorkflowContext založené na jejich typech poznámek:

WorkflowContext (bez parametrů typu)

U obslužných rutin, které provádějí pouze vedlejší účinky bez odesílání zpráv nebo výstupů:


   class LoggingExecutor(Executor):
       @handler
       async def log_message(self, msg: str, ctx: WorkflowContext) -> None:
           print(f"Received: {msg}")  # Only logging, no outputs

WorkflowContext[T_Out]

Umožňuje odesílání zpráv typu T_Out prostřednictvím ctx.send_message():>


   class ProcessorExecutor(Executor):
       @handler
       async def handler(self, msg: str, ctx: WorkflowContext[int]) -> None:
           await ctx.send_message(42)  # Can send int messages

WorkflowContext[T_Out, T_W_Out]

Umožňuje odesílání zpráv (T_Out) i získávání výstupů pracovního postupu (T_W_Out):


   class DualOutputExecutor(Executor):
       @handler
       async def handler(self, msg: str, ctx: WorkflowContext[int, str]) -> None:
           await ctx.send_message(42)  # Send int message
           await ctx.yield_output("done")  # Yield str workflow output

Exekutory funkcí

Jednoduché funkce lze převést na exekutory pomocí dekorátoru @executor :


   @executor
   async def process_text(text: str, ctx: WorkflowContext[str]) -> None:
       await ctx.send_message(text.upper())


   # Or with custom ID:
   @executor(id="text_processor")
   def sync_process(text: str, ctx: WorkflowContext[str]) -> None:
       ctx.send_message(text.lower())  # Sync functions run in thread pool

Složení dílčího pracovního postupu

Exekutory můžou obsahovat dílčí pracovní postupy pomocí WorkflowExecutoru. Dílčí pracovní postupy můžou vyhovět požadavkům, které mohou nadřazené pracovní postupy zachytit. Podrobnosti o vzorech složení pracovních postupů a zpracování požadavků a odpovědí najdete v dokumentaci k WorkflowExecutoru.

Správa stavu

Exekutory můžou obsahovat stavy, které se uchovávají napříč spuštěními pracovního postupu a kontrolními body. Přepište on_checkpoint_save a on_checkpoint_restore metody pro implementaci vlastní serializace stavu a logiky obnovení.

Poznámky k implementaci

  • Nevolejte přímo execute() – vyvolá se modulem pracovního postupu.
  • Nepřepsat execute() – definujte obslužné rutiny pomocí dekorátorů.
  • Každý exekutor musí mít alespoň jednu metodu @handler .
  • Podpisy metody obslužné rutiny se ověřují při inicializaci.

Inicializace exekutoru s jedinečným identifikátorem

ExecutorCompletedEvent

Událost se aktivuje při dokončení obslužné rutiny exekutoru.

Inicializace události exekutoru s ID exekutoru a volitelnými daty

ExecutorEvent

Základní třída pro události exekutoru

Inicializace události exekutoru s ID exekutoru a volitelnými daty

ExecutorFailedEvent

Událost se aktivuje, když obslužná rutina exekutoru vyvolá chybu.

ExecutorInvokedEvent

Událost se aktivuje při vyvolání obslužné rutiny exekutoru.

Inicializace události exekutoru s ID exekutoru a volitelnými daty

FanInEdgeGroup

Představuje sblíženou sadu hran, které se živí jedním podřízeným exekutorem.

Skupiny ventilátorů se obvykle používají v případě, že více nadřazených fází nezávisle vytváří zprávy, které by měly přijít na stejný podřízený procesor.

Vytvořte mapování ventilátorů, které slučuje několik zdrojů do jednoho cíle.

FanOutEdgeGroup

Představuje skupinu okrajů ve stylu všesměrového vysílání s volitelnou logikou výběru.

Ventilátor předá zprávu vytvořenou jedním zdrojovým exekutorem jednomu nebo několika podřízeným exekutorům. Za běhu můžeme cíle dále zúžit spuštěním selection_func , který zkontroluje datovou část a vrátí podmnožinu ID, která by měla zprávu obdržet.

Vytvořte mapování ventilátorů z jednoho zdroje na mnoho cílů.

FileCheckpointStorage

Úložiště kontrolních bodů založených na souborech pro trvalost

Inicializace úložiště souborů

FinishReason

Představuje důvod dokončení odpovědi chatu.

Inicializace FinishReason s hodnotou.

FunctionApprovalRequestContent

Představuje žádost o schválení volání funkce uživatelem.

Inicializuje FunctionApprovalRequestContent instance.

FunctionApprovalResponseContent

Představuje odpověď pro schválení volání funkce uživatelem.

Inicializuje FunctionApprovalResponseContent instance.

FunctionCallContent

Představuje požadavek na volání funkce.

Inicializuje FunctionCallContent instance.

FunctionExecutor

Exekutor, který zabalí uživatelem definovanou funkci.

Tento exekutor umožňuje uživatelům definovat jednoduché funkce (synchronizace i asynchronní) a používat je jako exekutory pracovních postupů, aniž by museli vytvářet úplné třídy exekutoru.

Synchronní funkce se spouští ve fondu vláken pomocí asyncio.to_thread(), aby se zabránilo blokování smyčky událostí.

Inicializace FunctionExecutor s uživatelem definovanou funkcí.

FunctionInvocationConfiguration

Konfigurace pro vyvolání funkce v chatovacích klientech

Tato třída se vytvoří automaticky na každém chatovacím klientovi, který podporuje vyvolání funkce. To znamená, že ve většině případů můžete pouze změnit atributy v instanci a pak vytvořit nový.

Inicializace FunctionInvocationConfiguration

FunctionInvocationContext

Kontextový objekt pro vyvolání middlewaru funkce

Tento kontext se předává prostřednictvím kanálu middlewaru funkce a obsahuje všechny informace o vyvolání funkce.

Inicializace FunctionInvocationContext.

FunctionMiddleware

Abstraktní základní třída pro middleware funkcí, která může zachytit vyvolání funkce.

Middleware funkcí umožňuje zachytit a upravit vyvolání funkcí nebo nástrojů před a po spuštění. Můžete ověřit argumenty, výsledky mezipaměti, vyvolání protokolů nebo přepsat provádění funkce.

Poznámka:

FunctionMiddleware je abstraktní základní třída. Musíte ji podtřídět a implementovat.

metoda process() pro vytvoření vlastního middlewaru funkcí.

FunctionResultContent

Představuje výsledek volání funkce.

Inicializuje FunctionResultContent instance.

GraphConnectivityError

Při zjištění problémů s připojením k grafu došlo k výjimce.

GroupChatBuilder

Tvůrce vysoké úrovně pro pracovní postupy skupinového chatu řízené manažerem s dynamickou orchestrací

GroupChat koordinuje konverzace s více agenty pomocí manažera, který vybere, který účastník mluví dále. Správcem může být jednoduchá funkce Pythonu (set_select_speakers_func) nebo selektor založený na agentech prostřednictvím set_manager. Tyto dva přístupy se vzájemně vylučují.

Základní pracovní postup:

  1. Definování účastníků: seznam agentů (používá jejich .name) nebo diktování názvů mapování na agenty

  2. Konfigurace výběru mluvčího: set_select_speakers_func NEBO

    set_manager (ne oba)

  3. Volitelné: nastavení limitů zaokrouhlení, kontrolních bodů, podmínek ukončení

  4. Sestavení a spuštění pracovního postupu

Vzory výběru mluvčího:

Vzor 1: Jednoduchý výběr založený na funkcích (doporučeno)


   from agent_framework import GroupChatBuilder, GroupChatStateSnapshot


   def select_next_speaker(state: GroupChatStateSnapshot) -> str | None:
       # state contains: task, participants, conversation, history, round_index
       if state["round_index"] >= 5:
           return None  # Finish
       last_speaker = state["history"][-1].speaker if state["history"] else None
       if last_speaker == "researcher":
           return "writer"
       return "researcher"


   workflow = (
       GroupChatBuilder()
       .set_select_speakers_func(select_next_speaker)
       .participants([researcher_agent, writer_agent])  # Uses agent.name
       .build()
   )

Model 2: Výběr založený na LLM


   from agent_framework import ChatAgent
   from agent_framework.azure import AzureOpenAIChatClient

   manager_agent = AzureOpenAIChatClient().create_agent(
       instructions="Coordinate the conversation and pick the next speaker.",
       name="Coordinator",
       temperature=0.3,
       seed=42,
       max_tokens=500,
   )

   workflow = (
       GroupChatBuilder()
       .set_manager(manager_agent, display_name="Coordinator")
       .participants([researcher, writer])  # Or use dict: researcher=r, writer=w
       .with_max_rounds(10)
       .build()
   )

Vzor 3: Žádost o informace o zpětné vazbě v polovině konverzace


   from agent_framework import GroupChatBuilder

   # Pause before all participants
   workflow = (
       GroupChatBuilder()
       .set_select_speakers_func(select_next_speaker)
       .participants([researcher, writer])
       .with_request_info()
       .build()
   )

   # Pause only before specific participants
   workflow = (
       GroupChatBuilder()
       .set_select_speakers_func(select_next_speaker)
       .participants([researcher, writer, editor])
       .with_request_info(agents=[editor])  # Only pause before editor responds
       .build()
   )

Specifikace účastníka:

Dva způsoby, jak určit účastníky:

  • Formulář seznamu: [agent1, agent2] – používá atribut agent.name pro jména účastníků.
  • Formulář diktování: {name1: agent1, name2: agent2} – explicitní ovládací prvek názvů
  • Formulář klíčového slova: participants(name1=agent1; name2=agent2) – explicitní řízení názvů

Stavová struktura snímků:

GroupChatStateSnapshot předaný set_select_speakers_func obsahuje:

  • task: ChatMessage - Původní uživatelský úkol
  • účastníci: dict[str, str] - Mapování jmen účastníků na popisy
  • konverzace: řazená kolekce členů[ChatMessage, ...] - Celá historie konverzací
  • historie: řazená kolekce členů[GroupChatTurn, ...] - Záznam turn-by-turn-by-turn s přisouzením mluvčího
  • round_index: int – počet dosud zadaných zaokrouhlených výběrů nadřízených
  • pending_agent: str | Žádné – název agenta, který aktuálně zpracovává (pokud existuje)

Důležitá omezení:

  • Nelze kombinovat set_select_speakers_func a set_manager
  • Jména účastníků musí být jedinečná.
  • Při použití formuláře seznamu musí mít agenti neprázdný atribut názvu .

Inicializace GroupChatBuilder.

GroupChatDirective

Instrukce vygenerované implementací správce skupinového chatu

HandoffBuilder

Fluent builder for conversational handoff workflows with coordinator and specialist agents.

Model předání umožňuje koordinačnímu agentovi směrovat požadavky na specializované agenty. Režim interakce určuje, jestli pracovní postup požaduje uživatelský vstup po každé odpovědi agenta nebo se dokončí nezávisle po dokončení reakce agentů. Podmínka ukončení určuje, kdy má pracovní postup přestat vyžadovat vstup a dokončit.

Vzory směrování:

Single-Tier (výchozí): Pouze koordinátor může předat specialistům. Ve výchozím nastavení se ovládací prvek po reakci odborníka vrátí uživateli pro další vstup. Tím se vytvoří cyklický tok: uživatel -> koordinátor -> [volitelný specialista] -> uživatel -> koordinátor -> ... Pomocí with_interaction_mode("autonomní") můžete přeskočit žádost o další uživatelský vstup a získat konečnou konverzaci, když agent odpoví bez delegování.

Vícevrstvé (pokročilé): Specialisté můžou předat jiným specialistům pomocí .add_handoff(). To poskytuje větší flexibilitu pro složité pracovní postupy, ale je méně ovladatelná než model s jednou vrstvou. Uživatelé ztratí přehled o průběžných krocích během předání specialistů-specialistů (ačkoli úplná historie konverzací včetně všech předání je zachována a lze ji následně zkontrolovat).

Klíčové funkce:

  • Automatická detekce předání: Koordinátor vyvolá nástroj pro předání, jehož

    argumenty (například {"handoff_to": "shipping_agent"}) identifikují odborníka, který má získat kontrolu.

  • Automaticky generované nástroje: Ve výchozím nastavení tvůrce syntetizuje handoff_to_<agent> nástroje pro koordinátora, takže nedefinujete zástupné funkce ručně.

  • Úplná historie konverzací: Celá konverzace (včetně všech ChatMessage.additional_properties) se zachová a předá každému agentu.

  • Řízení ukončení: Ve výchozím nastavení se ukončí po 10 uživatelských zprávách. Přepsání pomocí .with_termination_condition(lambda conv: ...) pro vlastní logiku (např. zjistit "sbohem").

  • Režimy interakce: Zvolte human_in_loop (výchozí) a vyzve uživatele mezi otáčením agenta nebo autonomním , aby pokračovali ve směrování zpět na agenty bez výzvy k zadání uživatelského vstupu, dokud nedojde k předání nebo dosažení limitu ukončení/turnu (výchozí limit autonomního turnu: 50).

  • Vytváření kontrolních bodů: Volitelná trvalost pro obnovitelné pracovní postupy.

Využití (Single-Tier):


   from agent_framework import HandoffBuilder
   from agent_framework.openai import OpenAIChatClient

   chat_client = OpenAIChatClient()

   # Create coordinator and specialist agents
   coordinator = chat_client.create_agent(
       instructions=(
           "You are a frontline support agent. Assess the user's issue and decide "
           "whether to hand off to 'refund_agent' or 'shipping_agent'. When delegation is "
           "required, call the matching handoff tool (for example `handoff_to_refund_agent`)."
       ),
       name="coordinator_agent",
   )

   refund = chat_client.create_agent(
       instructions="You handle refund requests. Ask for order details and process refunds.",
       name="refund_agent",
   )

   shipping = chat_client.create_agent(
       instructions="You resolve shipping issues. Track packages and update delivery status.",
       name="shipping_agent",
   )

   # Build the handoff workflow - default single-tier routing
   workflow = (
       HandoffBuilder(
           name="customer_support",
           participants=[coordinator, refund, shipping],
       )
       .set_coordinator(coordinator)
       .build()
   )

   # Run the workflow
   events = await workflow.run_stream("My package hasn't arrived yet")
   async for event in events:
       if isinstance(event, RequestInfoEvent):
           # Request user input
           user_response = input("You: ")
           await workflow.send_response(event.data.request_id, user_response)

Vícevrstvé směrování s .add_handoff():


   # Enable specialist-to-specialist handoffs with fluent API
   workflow = (
       HandoffBuilder(participants=[coordinator, replacement, delivery, billing])
       .set_coordinator(coordinator)
       .add_handoff(coordinator, [replacement, delivery, billing])  # Coordinator routes to all
       .add_handoff(replacement, [delivery, billing])  # Replacement delegates to delivery/billing
       .add_handoff(delivery, billing)  # Delivery escalates to billing
       .build()
   )

   # Flow: User → Coordinator → Replacement → Delivery → Back to User
   # (Replacement hands off to Delivery without returning to user)

Použití továren účastníků pro izolaci stavu:

Vlastní podmínka ukončení:


   # Terminate when user says goodbye or after 5 exchanges
   workflow = (
       HandoffBuilder(participants=[coordinator, refund, shipping])
       .set_coordinator(coordinator)
       .with_termination_condition(
           lambda conv: (
               sum(1 for msg in conv if msg.role.value == "user") >= 5
               or any("goodbye" in msg.text.lower() for msg in conv[-2:])
           )
       )
       .build()
   )

Vytváření kontrolních bodů:


   from agent_framework import InMemoryCheckpointStorage

   storage = InMemoryCheckpointStorage()
   workflow = (
       HandoffBuilder(participants=[coordinator, refund, shipping])
       .set_coordinator(coordinator)
       .with_checkpointing(storage)
       .build()
   )

Inicializace handoffBuilderu pro vytváření konverzačních pracovních postupů předání

Tvůrce začíná v nekonfigurovaném stavu a vyžaduje volání:

  1. .participants([...]) - Registrace agentů
  2. nebo .participant_factories({...}) – Registrace továren agenta nebo exekutoru
  3. .set_coordinator(...) – Určení, který agent přijímá počáteční uživatelský vstup
  4. .build() – vytvoření konečného pracovního postupu

Volitelné metody konfigurace umožňují přizpůsobit správu kontextu, logiku ukončení a trvalost.

Poznámka:

Účastníci musí mít stabilní názvy a ID, protože pracovní postup mapuje

argumenty nástroje předání těmto identifikátorům. Názvy agentů by se měly shodovat

řetězce vygenerované nástrojem pro předání koordinátora (např. nástroj, který

výstupy {"handoff_to": "billing"} vyžaduje agenta s názvem fakturace).

HandoffUserInputRequest

Žádost o zprávu, která se vygeneruje, když pracovní postup potřebuje nový uživatelský vstup.

Poznámka: Pole konverzace je záměrně vyloučeno z serializace kontrolního bodu, aby se zabránilo duplikaci. Konverzace je zachována ve stavu koordinátora a bude rekonstruována při obnovení. Podívejte se na problém č. 2667.

HostedCodeInterpreterTool

Představuje hostovaný nástroj, který lze zadat službě AI, aby mohl spouštět vygenerovaný kód.

Tento nástroj neimplementuje samotnou interpretaci kódu. Slouží jako značka, která informuje službu, že je povoleno spouštět vygenerovaný kód, pokud je služba schopná to udělat.

Inicializace HostedCodeInterpreterTool.

HostedFileContent

Představuje obsah hostovaného souboru.

Inicializuje instanci HostedFileContent.

HostedFileSearchTool

Představuje nástroj pro vyhledávání souborů, který lze zadat službě AI, aby mohl provádět vyhledávání souborů.

Inicializace FileSearchTool.

HostedMCPSpecificApproval

Představuje konkrétní režim hostovaného nástroje.

Při použití tohoto režimu musí uživatel určit, které nástroje vždy nebo nikdy nevyžadují schválení. Tento slovník je reprezentován jako slovník se dvěma volitelnými klíči:

HostedMCPTool

Představuje nástroj MCP, který je spravovaný a spuštěný službou.

Vytvořte hostovaný nástroj MCP.

HostedVectorStoreContent

Představuje hostovaný obsah úložiště vektorů.

Inicializuje HostedVectorStoreContent instance.

HostedWebSearchTool

Představuje nástroj pro vyhledávání na webu, který lze zadat službě AI, aby mohl provádět vyhledávání na webu.

Inicializace HostedWebSearchTool.

InMemoryCheckpointStorage

Úložiště kontrolních bodů v paměti pro účely testování a vývoje

Inicializuje úložiště paměti.

InProcRunnerContext

Kontext provádění v procesu pro místní spuštění a volitelné kontrolní body

Inicializuje kontext provádění v procesu.

MCPStdioTool

Nástroj MCP pro připojení k serverům MCP založeným na stdio.

Tato třída se připojuje k serverům MCP, které komunikují přes standardní vstup/výstup, obvykle používané pro místní procesy.

Inicializuje nástroj STDIO MCP.

Poznámka:

Argumenty se používají k vytvoření objektu StdioServerParameters,

který se pak použije k vytvoření klienta stdio. Viz mcp.client.stdio.stdio_client

a mcp.client.stdio.stdio_server_parameters další podrobnosti.

MCPStreamableHTTPTool

Nástroj MCP pro připojení k serverům MCP založeným na protokolu HTTP

Tato třída se připojuje k serverům MCP, které komunikují prostřednictvím streamovatelného protokolu HTTP/SSE.

Inicializace streamovatelného nástroje HTTP MCP

Poznámka:

Argumenty se používají k vytvoření streamovatelného klienta HTTP.

Další podrobnosti najdete v tématu mcp.client.streamable_http.streamablehttp_client.

Všechny další argumenty předané konstruktoru se předají do

streamovatelný konstruktor klienta HTTP.

MCPWebsocketTool

Nástroj MCP pro připojení k serverům MCP založeným na WebSocketu

Tato třída se připojuje k serverům MCP, které komunikují přes WebSocket.

Inicializace nástroje WEBSocket MCP.

Poznámka:

Argumenty se používají k vytvoření klienta WebSocket.

Další podrobnosti najdete v mcp.client.websocket.websocket_client.

Všechny další argumenty předané konstruktoru se předají do

Konstruktor klienta WebSocket.

MagenticBuilder

Fluent builder for creating Magentic One multi-agent orchestraation workflows.

Magentické pracovní postupy one používají správce využívající LLM ke koordinaci více agentů prostřednictvím dynamického plánování úkolů, sledování průběhu a adaptivního přeplánování. Manažer vytvoří plány, vybere agenty, sleduje průběh a určí, kdy se má plánovat nebo dokončit.

Tvůrce poskytuje plynulé rozhraní API pro konfiguraci účastníků, manažera, volitelné kontroly plánu, kontrolního bodu a zpětného volání událostí.

Podpora human-in-the-loop: Magentic poskytuje specializované mechanismy HITL prostřednictvím:

  • .with_plan_review() – Kontrola a schválení/revize plánů před provedením

  • .with_human_input_on_stall() – Zasahuje při zastavení pracovního postupu

  • Schválení nástrojů prostřednictvím functionApprovalRequestContent – Schválení jednotlivých volání nástrojů

Tyto generují magentic HumanInterventionRequest události, které poskytují strukturované možnosti rozhodování (APPROVE, REVISE, CONTINUE, REPLAN, GUIDANCE) vhodné pro orchestraci na základě plánování magentic.

Použití:


   from agent_framework import MagenticBuilder, StandardMagenticManager
   from azure.ai.projects.aio import AIProjectClient

   # Create manager with LLM client
   project_client = AIProjectClient.from_connection_string(...)
   chat_client = project_client.inference.get_chat_completions_client()

   # Build Magentic workflow with agents
   workflow = (
       MagenticBuilder()
       .participants(researcher=research_agent, writer=writing_agent, coder=coding_agent)
       .with_standard_manager(chat_client=chat_client, max_round_count=20, max_stall_count=3)
       .with_plan_review(enable=True)
       .with_checkpointing(checkpoint_storage)
       .build()
   )

   # Execute workflow
   async for message in workflow.run("Research and write article about AI agents"):
       print(message.text)

S vlastním manažerem:


   # Create custom manager subclass
   class MyCustomManager(MagenticManagerBase):
       async def plan(self, context: MagenticContext) -> ChatMessage:
           # Custom planning logic
           ...


   manager = MyCustomManager()
   workflow = MagenticBuilder().participants(agent1=agent1, agent2=agent2).with_standard_manager(manager).build()
MagenticContext

Kontext pro magentického manažera.

MagenticManagerBase

Základní třída pro magentic One manager.

ManagerDirectiveModel

Pydantický model pro výstup direktiv strukturovaného manažera.

Vytvořte nový model parsováním a ověřením vstupních dat z argumentů klíčových slov.

Vyvolá [ValidationError][pydantic_core. ValidationError] Pokud vstupní data nelze ověřit pro vytvoření platného modelu.

Self je explicitně poziční pouze pro povolení sebe jako názvu pole.

ManagerSelectionRequest

Žádost odeslaná agentovi manažera pro další výběr mluvčího

Tato třída dataclass zabalí celý stav konverzace a kontext úlohy pro agenta manažera k analýze a výběru mluvčího.

ManagerSelectionResponse

Odpověď od manažera agenta s rozhodováním o výběru mluvčího

Agent manažera musí vytvořit tuto strukturu (nebo kompatibilní diktování nebo JSON) a sdělit tak rozhodnutí orchestrátoru.

Vytvořte nový model parsováním a ověřením vstupních dat z argumentů klíčových slov.

Vyvolá [ValidationError][pydantic_core. ValidationError] Pokud vstupní data nelze ověřit pro vytvoření platného modelu.

Self je explicitně poziční pouze pro povolení sebe jako názvu pole.

Message

Třída představující zprávu v pracovním postupu.

OrchestrationState

Kontejner sjednoceného stavu pro vytváření kontrolních bodů orchestrátoru

Tato třída dat standardizuje serializaci kontrolních bodů napříč všemi třemi vzory skupinového chatu a umožňuje rozšíření specifická pro vzory prostřednictvím metadat.

Běžné atributy zahrnují sdílené aspekty orchestrace (úkoly, konverzace, sledování zaokrouhlení). Stav specifický pro určitý vzor se nachází ve diktování metadat.

RequestInfoEvent

Událost se aktivuje, když exekutor pracovního postupu požaduje externí informace.

Inicializuje událost informací o požadavku.

RequestInfoInterceptor

Interní exekutor, který pozastaví pracovní postup pro lidské vstupy před spuštěním agenta.

Tento exekutor se vloží do grafu pracovního postupu tvůrci při zavolání .with_request_info(). Zachytí agentExecutorRequest zprávy PŘED spuštěním agenta a pozastaví pracovní postup prostřednictvím ctx.request_info() pomocí AgentInputRequest.

Při přijetí odpovědi obslužná rutina odpovědi vloží vstup jako zprávu uživatele do konverzace a předá požadavek agentovi.

Volitelný parametr agent_filter umožňuje omezit, kteří agenti aktivují pozastavení. Pokud ID cílového agenta není v sadě filtrů, požadavek se předá bez pozastavení.

Inicializuje exekutor informací o požadavku.

Role

Popisuje zamýšlený účel zprávy v rámci interakce chatu.

Vlastnosti: SYSTÉM: Role, která dává pokyn nebo nastavuje chování systému AI. UŽIVATEL: Role, která poskytuje vstup uživatele pro interakce chatu. ASISTENT: Role, která poskytuje odpovědi na vstup s pokynem systému a uživatelem. NÁSTROJ: Role, která poskytuje další informace a odkazy v reakci na požadavky nástroje.

Inicializace role s hodnotou

Runner

Třída, která spustí pracovní postup v superkrocích Pregelu.

Inicializuje spouštěč hrany, sdílený stav a kontext.

RunnerContext

Protokol pro kontext spuštění používaný spouštěčem.

Jeden kontext, který podporuje zasílání zpráv, událostí a volitelné vytváření kontrolních bodů. Pokud není nakonfigurované úložiště kontrolních bodů, můžou se vyvolat metody kontrolních bodů.

SequentialBuilder

Tvůrce vysoké úrovně pro sekvenční pracovní postupy agenta/exekutoru se sdíleným kontextem

  • participants([...]) přijímá seznam instancí AgentProtocol (doporučeno) nebo Exekutor.

  • register_participants([...]) přijímá seznam továren pro AgentProtocol (doporučeno)

    objekty pro vytváření exekutorů

  • Exekutory musí definovat obslužnou rutinu, která využívá seznam[ChatMessage] a odesílá seznam[ChatMessage]

  • Pracovní postup provádí účastníky v pořadí a předá seznam [ChatMessage] dolů řetězu.

  • Agenti připojují ke konverzaci zprávy asistenta.

  • Vlastní exekutory můžou transformovat/sumarizovat a vracet seznam[ChatMessage]

  • Konečným výstupem je konverzace vytvořená posledním účastníkem.

Použití:


   from agent_framework import SequentialBuilder

   # With agent instances
   workflow = SequentialBuilder().participants([agent1, agent2, summarizer_exec]).build()

   # With agent factories
   workflow = (
       SequentialBuilder().register_participants([create_agent1, create_agent2, create_summarizer_exec]).build()
   )

   # Enable checkpoint persistence
   workflow = SequentialBuilder().participants([agent1, agent2]).with_checkpointing(storage).build()

   # Enable request info for mid-workflow feedback (pauses before each agent)
   workflow = SequentialBuilder().participants([agent1, agent2]).with_request_info().build()

   # Enable request info only for specific agents
   workflow = (
       SequentialBuilder()
       .participants([agent1, agent2, agent3])
       .with_request_info(agents=[agent2])  # Only pause before agent2
       .build()
   )
SharedState

Třída pro správu sdíleného stavu v pracovním postupu.

SharedState poskytuje přístup k datům stavu pracovního postupu bezpečný pro přístup z více vláken, která je potřeba sdílet mezi exekutory během provádění pracovního postupu.

Rezervované klíče: Následující klíče jsou vyhrazeny pro interní použití architektury a neměly by být upraveny uživatelským kódem:

  • _executor_state: Ukládá stav exekutoru pro vytváření kontrolních bodů (spravuje Runner).

Výstraha

Nepoužívejte klíče začínající podtržítkem (_), protože mohou být vyhrazeny pro

interní operace architektury.

Inicializuje sdílený stav.

SingleEdgeGroup

Obálka pro samotářnou hranu, která udržuje jednotné rozhraní API skupiny.

Vytvořte skupinu 1:1 mezi dvěma exekutory.

StandardMagenticManager

Standardní magentický manažer, který provádí skutečné volání LLM přes ChatAgent.

Manažer vytvoří výzvu, která zrcadlí původní orchestraci Magentic One:

  • Shromažďování faktů
  • Vytvoření plánu
  • Indikátor průběhu ve formátu JSON
  • Aktualizace faktů a aktualizace plánu při resetování
  • Syntéza konečných odpovědí

Inicializace standardního magentického manažera.

SubWorkflowRequestMessage

Zpráva odeslaná z dílčího pracovního postupu exekutoru v nadřazené pracovní postupu za účelem vyžádání informací

Tato zpráva zabalí RequestInfoEvent vygenerovaný exekutorem v dílčím pracovním postupu.

SubWorkflowResponseMessage

Zpráva odeslaná z nadřazeného pracovního postupu do dílčího pracovního postupu prostřednictvím WorkflowExecutoru za účelem poskytnutí požadovaných informací.

Tato zpráva zabalí data odpovědi spolu s původním RequestInfoEvent vygenerovaným exekutorem dílčího pracovního postupu.

SuperStepCompletedEvent

Událost se aktivuje, když skončí superkrok.

Inicializace události superkroku

SuperStepStartedEvent

Událost se aktivuje při spuštění superkroku.

Inicializace události superkroku

SwitchCaseEdgeGroup

Varianta ventilátoru, která napodobuje tradiční tok řízení přepínače nebo případu.

Každý případ zkontroluje datovou část zprávy a rozhodne, jestli má zprávu zpracovat. Přesně jeden případ nebo výchozí větev vrací cíl za běhu a zachovává sémantiku jednoúčelového odeslání.

Nakonfigurujte strukturu směrování přepínače nebo případu pro jeden zdrojový exekutor.

SwitchCaseEdgeGroupCase

Trvalý popis jedné podmíněné větve v případě přepínače

Na rozdíl od objektu runtime Case tato serializovatelná varianta ukládá pouze cílový identifikátor a popisný název predikátu. Pokud základní volatelné není během deserializace k dispozici, nahradíme zástupný symbol proxy serveru, který se nahlas nezdaří a zajistí, aby chybějící závislost byla okamžitě viditelná.

Zaznamenejte metadata směrování pro větev podmíněného případu.

SwitchCaseEdgeGroupDefault

Trvalý popisovač pro záložní větev skupiny pro případ přepínače.

Výchozí větev je zaručena, že existuje a je vyvolána, když se nepodaří spárovat datovou část s každým jiným predikátem případu.

Nasměrujte výchozí větev směrem k danému identifikátoru exekutoru.

TextContent

Představuje textový obsah v chatu.

Inicializuje instanci TextContent.

TextReasoningContent

Představuje text zdůvodnění obsahu v chatu.

Poznámky: Tato třída a TextContent jsou povrchně podobné, ale odlišné.

Inicializuje TextReasoningContent instance.

TextSpanRegion

Představuje oblast textu, která byla opatřena poznámkami.

Inicializuje textSpanRegion.

ToolMode

Definuje, jestli a jak se nástroje používají v žádosti o chat.

Inicializace prvku ToolMode

ToolProtocol

Představuje obecný nástroj.

Tento protokol definuje rozhraní, které musí implementovat všechny nástroje, aby byly kompatibilní s architekturou agenta. Implementuje se různými třídami nástrojů, jako jsou HostedMCPTool, HostedWebSearchTool a AIFunction. AIFunction je obvykle vytvořen ai_function dekorátor.

Vzhledem k tomu, že každý konektor potřebuje analyzovat nástroje odlišně, můžou uživatelé předat dikt, který specifikuje nástroj specifický pro službu, pokud není k dispozici žádná abstrakce.

TypeCompatibilityError

Výjimka vyvolaná při zjištění nekompatibility typu mezi připojenými exekutory

UriContent

Představuje obsah identifikátoru URI.

Důležité

Používá se pro obsah, který je identifikovaný identifikátorem URI, například obrázkem nebo souborem.

Pro (binární) identifikátory URI dat použijte místo toho hodnotu DataContent.

Inicializuje instanci UriContent.

Poznámky: Používá se pro obsah, který je identifikován identifikátorem URI, například obrázkem nebo souborem. Pro (binární) identifikátory URI dat použijte místo toho hodnotu DataContent .

UsageContent

Představuje informace o využití přidružené k žádosti o chat a odpověď.

Inicializuje instanci UsageContent.

UsageDetails

Poskytuje podrobnosti o využití žádosti nebo odpovědi.

Inicializuje instanci UsageDetails.

Workflow

Spouštěcí modul založený na grafech, který orchestruje připojené exekutory.

Přehled

Pracovní postup spouští směrovaný graf exekutorů připojených přes hraniční skupiny pomocí modelu podobného Pregelu, který běží v superkrocích, dokud se graf nestane nečinnou. Pracovní postupy jsou vytvořeny pomocí Třídy WorkflowBuilder - nevytvářejte instanci této třídy přímo.

Model spuštění

Exekutory běží v synchronizovaných superkrocích, kde každý exekutor:

  • Vyvolá se při příjmu zpráv z připojených hraničních skupin.
  • Může odesílat zprávy do podřízených exekutorů prostřednictvím ctx.send_message()
  • Může přinést výstupy na úrovni pracovního postupu prostřednictvím ctx.yield_output()
  • Může generovat vlastní události prostřednictvím ctx.add_event()

Zprávy mezi exekutory se doručují na konci každého superkroku a nejsou viditelné ve streamu událostí. Volajícím se dají sledovat pouze události na úrovni pracovního postupu (výstupy, vlastní události) a stavové události.

Vstupní a výstupní typy

Typy pracovních postupů se zjistí za běhu kontrolou:

  • Vstupní typy: Ze vstupních typů spouštěcího exekutoru
  • Typy výstupu: Sjednocení výstupních typů pracovních postupů všech exekutorů Access je prostřednictvím input_types a output_types vlastností.

Metody provádění

Pracovní postup poskytuje dvě primární rozhraní API pro spouštění, z nichž každá podporuje více scénářů:

  • run(): Provedení do dokončení, vrátí WorkflowRunResult se všemi událostmi.

  • run_stream(): Vrátí asynchronní generátor událostí, které při jejich výskytu generují události.

Obě metody podporují:

  • Počáteční spuštění pracovního postupu: Zadání parametru zprávy
  • Obnovení kontrolního bodu: Zadejte checkpoint_id (a volitelně checkpoint_storage)
  • Pokračování hil: Poskytnutí odpovědí , které budou pokračovat po žádostech RequestInfoExecutor
  • Vytváření kontrolních bodů za běhu: Zadejte checkpoint_storage pro povolení nebo přepsání kontrolních bodů pro toto spuštění.

Správa stavu

Instance pracovního postupu obsahují stavy a stavy, které se zachovají napříč voláními ke spuštění a run_stream. Pokud chcete spustit více nezávislých spuštění, vytvořte samostatné instance pracovního postupu přes WorkflowBuilder.

Externí vstupní požadavky

Exekutory v rámci pracovního postupu můžou vyžadovat externí vstup pomocí ctx.request_info():

  1. Volání exekutoru ctx.request_info() pro vyžádání vstupu
  2. Exekutor implementuje response_handler() pro zpracování odpovědi.
  3. Požadavky se vygenerují jako instance RequestInfoEvent ve streamu událostí.
  4. Pracovní postup zadá IDLE_WITH_PENDING_REQUESTS stav.
  5. Volající zpracovává požadavky a poskytuje odpovědi prostřednictvím metod send_responses nebo send_responses_streaming
  6. Odpovědi se směrují na požadovaná exekutory a obslužné rutiny odpovědí se volají.

Vytváření kontrolních bodů

Kontrolní body je možné nakonfigurovat v době sestavení nebo za běhu:

Čas sestavení (přes WorkflowBuilder): workflow = WorkflowBuilder().with_checkpointing(storage).build()

Runtime (prostřednictvím parametrů run/run_stream): result = await workflow.run(zpráva, checkpoint_storage=runtime_storage)

Pokud je tato možnost povolená, vytvoří se kontrolní body na konci každého superkroku a zachytí se:

  • Stavy exekutoru
  • Zprávy při přenosu
  • Pracovní postupy sdíleného stavu je možné pozastavit a obnovit napříč restartováními procesu pomocí úložiště kontrolních bodů.

Složení

Pracovní postupy lze vnořit pomocí WorkflowExecutoru, který zabalí podřízený pracovní postup jako exekutor. Vstupní a výstupní typy vnořeného pracovního postupu se stanou součástí typů WorkflowExecutor. Při vyvolání spustí WorkflowExecutor vnořený pracovní postup k dokončení a zpracuje jeho výstupy.

Inicializace pracovního postupu se seznamem hran

WorkflowAgent

Podtřída agenta , která zabalí pracovní postup a zpřístupní ho jako agenta.

Inicializuje agenta pracovního postupu.

WorkflowBuilder

Třída tvůrce pro vytváření pracovních postupů.

Tato třída poskytuje rozhraní API fluent pro definování grafů pracovních postupů propojením exekutorů s hranami a konfigurací parametrů provádění. Volání build pro vytvoření neměnné Workflow instance

Inicializace WorkflowBuilder s prázdným seznamem hran a žádný spouštěcí exekutor.

WorkflowCheckpoint

Představuje kompletní kontrolní bod stavu pracovního postupu.

Kontrolní body zaznamenávají stav úplného spuštění pracovního postupu v určitém bodě a umožňují pozastavení a obnovení pracovních postupů.

Poznámka:

Diktování shared_state může obsahovat rezervované klíče spravované architekturou.

Podrobnosti o rezervovaných klíčích najdete v dokumentaci ke třídě SharedState.

WorkflowCheckpointSummary

Souhrn kontrolního bodu pracovního postupu čitelný pro člověka

WorkflowContext

Kontext spouštění, který umožňuje exekutorům pracovat s pracovními postupy a dalšími exekutory.

Přehled

WorkflowContext poskytuje řízené rozhraní pro exekutory k odesílání zpráv, získávání výstupů, správě stavu a interakci s širším ekosystémem pracovních postupů. Vynucuje bezpečnost typů prostřednictvím obecných parametrů a zároveň brání přímému přístupu k interním komponentám modulu runtime.

Parametry typu

Kontext je parametrizován tak, aby vynucuje bezpečnost typů pro různé operace:

WorkflowContext (bez parametrů)

U exekutorů, které provádějí pouze vedlejší účinky bez odesílání zpráv nebo výstupů:


   async def log_handler(message: str, ctx: WorkflowContext) -> None:
       print(f"Received: {message}")  # Only side effects

WorkflowContext[T_Out]

Umožňuje odesílání zpráv typu T_Out jiným exekutorům:


   async def processor(message: str, ctx: WorkflowContext[int]) -> None:
       result = len(message)
       await ctx.send_message(result)  # Send int to downstream executors

WorkflowContext[T_Out, T_W_Out]

Umožňuje odesílání zpráv (T_Out) i získávání výstupů pracovního postupu (T_W_Out):


   async def dual_output(message: str, ctx: WorkflowContext[int, str]) -> None:
       await ctx.send_message(42)  # Send int message
       await ctx.yield_output("complete")  # Yield str workflow output

Typy sjednocení

Pomocí sjednocovacího zápisu je možné zadat více typů:


   async def flexible(message: str, ctx: WorkflowContext[int | str, bool | dict]) -> None:
       await ctx.send_message("text")  # or send 42
       await ctx.yield_output(True)  # or yield {"status": "done"}

Inicializuje kontext exekutoru s daným kontextem pracovního postupu.

WorkflowErrorDetails

Strukturované informaceoch

WorkflowEvent

Základní třída pro události pracovního postupu

Inicializace události pracovního postupu s volitelnými daty

WorkflowExecutor

Exekutor, který zabalí pracovní postup, aby bylo možné hierarchické složení pracovního postupu.

Přehled

WorkflowExecutor se chová jako jeden exekutor v rámci nadřazeného pracovního postupu a umožňuje vnořené architektury pracovních postupů. Zpracovává kompletní životní cyklus provádění dílčího pracovního postupu, včetně zpracování událostí, předávání výstupů a koordinace požadavků a odpovědí mezi nadřazeným a podřízeným pracovním postupem.

Model spuštění

Při vyvolání WorkflowExecutor:

  1. Spustí zabalený pracovní postup se vstupní zprávou.
  2. Spustí dílčí pracovní postup k dokončení nebo dokud nebude potřebovat externí vstup.
  3. Zpracuje úplný datový proud událostí dílčího pracovního postupu po spuštění.
  4. Přepošluje výstupy nadřazeného pracovního postupu jako zprávy.
  5. Zpracovává externí požadavky jejich směrováním do nadřazeného pracovního postupu.
  6. Shromažďuje odpovědi a obnoví provádění dílčího pracovního postupu.

Zpracování streamu událostí

WorkflowExecutor zpracovává události po dokončení dílčího pracovního postupu:

Přesměrování výstupu

Všechny výstupy z dílčího pracovního postupu se automaticky předávají nadřazené:

Pokud je allow_direct_output false (výchozí):


   # An executor in the sub-workflow yields outputs
   await ctx.yield_output("sub-workflow result")

   # WorkflowExecutor forwards to parent via ctx.send_message()
   # Parent receives the output as a regular message

Pokud allow_direct_output má hodnotu True:

Koordinace požadavků a odpovědí

Když dílčí pracovní postupy potřebují externí informace:


   # An executor in the sub-workflow makes request
   request = MyDataRequest(query="user info")

   # WorkflowExecutor captures RequestInfoEvent and wraps it in a SubWorkflowRequestMessage
   # then send it to the receiving executor in parent workflow. The executor in parent workflow
   # can handle the request locally or forward it to an external source.
   # The WorkflowExecutor tracks the pending request, and implements a response handler.
   # When the response is received, it executes the response handler to accumulate responses
   # and resume the sub-workflow when all expected responses are received.
   # The response handler expects a SubWorkflowResponseMessage wrapping the response data.

Správa stavu

WorkflowExecutor udržuje stav provádění napříč cykly požadavků a odpovědí:

  • Sleduje čekající žádosti podle request_id
  • Shromažďuje odpovědi, dokud nebudou přijaty všechny očekávané odpovědi.
  • Obnoví provádění dílčího pracovního postupu s úplnou dávkou odpovědí.
  • Zpracovává souběžné spouštění a více čekajících požadavků.

Integrace systému typů

WorkflowExecutor dědí svůj podpis typu z zabaleného pracovního postupu:

Vstupní typy

Odpovídá vstupním typům spouštěcího exekutoru zabaleného pracovního postupu:


   # If sub-workflow accepts str, WorkflowExecutor accepts str
   workflow_executor = WorkflowExecutor(my_workflow, id="wrapper")
   assert workflow_executor.input_types == my_workflow.input_types

Výstupní typy

Kombinuje výstupy dílčího pracovního postupu s typy koordinace požadavků:


   # Includes all sub-workflow output types
   # Plus SubWorkflowRequestMessage if sub-workflow can make requests
   output_types = workflow.output_types + [SubWorkflowRequestMessage]  # if applicable

Zpracování chyb

WorkflowExecutor šíří selhání dílčího pracovního postupu:

  • Zaznamenává workflowFailedEvent z dílčího pracovního postupu.
  • Převede na WorkflowErrorEvent v nadřazeného kontextu.
  • Poskytuje podrobné informace o chybě včetně ID dílčího pracovního postupu.

Podpora souběžného spouštění

WorkflowExecutor plně podporuje více souběžných spuštění dílčích pracovních postupů:

izolace stavu Per-Execution

Každé vyvolání dílčího pracovního postupu vytvoří izolovaný objekt ExecutionContext:


   # Multiple concurrent invocations are supported
   workflow_executor = WorkflowExecutor(my_workflow, id="concurrent_executor")

   # Each invocation gets its own execution context
   # Execution 1: processes input_1 independently
   # Execution 2: processes input_2 independently
   # No state interference between executions

Koordinace požadavků a odpovědí

Odpovědi jsou správně směrovány na původní spuštění:

  • Každé spuštění sleduje vlastní čekající požadavky a očekávané odpovědi.
  • Mapování požadavků na spuštění zajišťuje, že odpovědi dosáhnou správného dílčího pracovního postupu.
  • Akumulace odpovědí je izolovaná pro každé spuštění.
  • Automatické vyčištění po dokončení provádění

Správa paměti

  • Podporované neomezené souběžné spouštění
  • Každé spuštění má jedinečnou identifikaci založenou na UUID.
  • Vyčištění dokončených kontextů spuštění
  • Správa stavu bezpečného vlákna pro souběžný přístup

Důležité aspekty

Sdílená instance pracovního postupu: Všechna souběžná spuštění používají stejnou základní instanci pracovního postupu. Pro správnou izolaci zajistěte, aby zabalený pracovní postup a jeho exekutory byly bezstavové.


   # Avoid: Stateful executor with instance variables
   class StatefulExecutor(Executor):
       def __init__(self):
           super().__init__(id="stateful")
           self.data = []  # This will be shared across concurrent executions!

Integrace s nadřazenými pracovními postupy

Nadřazené pracovní postupy můžou zachycovat požadavky dílčího pracovního postupu:

Poznámky k implementaci

  • Dílčí pracovní postupy se před zpracováním výsledků spustí do dokončení.
  • Zpracování událostí je atomické – před požadavky se předávají všechny výstupy.
  • Akumulace odpovědí zajišťuje, že dílčí pracovní postupy přijímají úplné dávky odpovědí.
  • Stav spuštění se udržuje pro správné obnovení po externích požadavcích.
  • Souběžné spouštění jsou plně izolované a vzájemně nekomusují

Inicializace WorkflowExecutor.

WorkflowFailedEvent

Integrovaná událost životního cyklu vygenerovaná při ukončení pracovního postupu s chybou.

WorkflowOutputEvent

Událost aktivovaná, když exekutor pracovního postupu vrátí výstup.

Inicializuje výstupní událost pracovního postupu.

WorkflowRunResult

Kontejner pro události vygenerované během provádění pracovního postupu bez streamování

Přehled

Představuje kompletní výsledky spuštění pracovního postupu obsahující všechny události vygenerované od začátku do nečinného stavu. Pracovní postupy vytvářejí výstupy přírůstkově prostřednictvím volání ctx.yield_output().

Struktura událostí

Udržuje oddělení mezi událostmi roviny dat a řídicí roviny:

  • Události roviny dat: Vyvolání exekutoru, dokončení, výstupy a požadavky (v hlavním seznamu)
  • Události roviny řízení: Časová osa stavu přístupná prostřednictvím metody status_timeline()

Klíčové metody

  • get_outputs(): Extrahujte všechny výstupy pracovního postupu z provádění.
  • get_request_info_events(): Načtení externích vstupních požadavků provedených během provádění
  • get_final_state(): Získejte konečný stav pracovního postupu (NEČINNOST, IDLE_WITH_PENDING_REQUESTS atd.)
  • status_timeline(): Přístup k celé historii událostí stavu
WorkflowStartedEvent

Předdefinovaná událost životního cyklu, která se vygeneruje při zahájení spuštění pracovního postupu.

Inicializace události pracovního postupu s volitelnými daty

WorkflowStatusEvent

Integrovaná událost životního cyklu vygenerovaná pro přechody stavu spuštění pracovního postupu

Inicializace události stavu pracovního postupu s novým stavem a volitelnými daty

WorkflowValidationError

Základní výjimka pro chyby ověření pracovního postupu

WorkflowViz

Třída pro vizualizaci pracovních postupů pomocí graphviz a Mermaid.

Inicializace WorkflowViz s pracovním postupem

Výčty

MagenticHumanInterventionDecision

Možnosti rozhodování pro reakce na lidské zásahy.

MagenticHumanInterventionKind

Druh lidského zásahu, o který se žádá.

ValidationTypeEnum

Výčet typů ověřování pracovního postupu

WorkflowEventSource

Určuje, jestli událost pracovního postupu pochází z architektury nebo exekutoru.

Rozhraní FRAMEWORK použijte pro události generované integrovanými cestami orchestrace , i když kód, který je vyvolává, žije v modulech souvisejících se spouštěčem – a EXECUTOR pro události zobrazené implementacemi exekutoru poskytované vývojářem.

WorkflowRunState

Stav spuštění pracovního postupu na úrovni spuštění

Sémantika:

  • SPUŠTĚNO: Spuštění bylo zahájeno a kontext pracovního postupu byl vytvořen. Jedná se o počáteční stav před provedením jakékoli smysluplné práce. V tomto základu kódu vysíláme vyhrazený workflowStartedEvent pro telemetrii a obvykle postupuje stav přímo na IN_PROGRESS. Spotřebitelé se stále můžou spoléhat na SPUŠTĚNO pro stavové počítače, které potřebují explicitní předpracovací fázi.

  • IN_PROGRESS: Pracovní postup se aktivně spouští (např. počáteční zpráva byla doručena do spouštěcího exekutoru nebo je spuštěn superkrok). Tento stav se vygeneruje na začátku spuštění a může následovat další stavy při průběhu spuštění.

  • IN_PROGRESS_PENDING_REQUESTS: Aktivní spuštění, zatímco jedna nebo více operací s informacemi o požadavci je nevyrovnaná. Nová práce může být pořád naplánovaná, i když jsou požadavky v testovací verzi.

  • NEČINNÉ: Pracovní postup je nečinný bez nevyřízených požadavků a další práce. Toto je normální stav terminálu pro pracovní postupy, které dokončily provádění, a potenciálně mají výstupy po cestě.

  • IDLE_WITH_PENDING_REQUESTS: Pracovní postup se pozastaví a čeká na externí vstup (např. vygeneroval požadavek RequestInfoEvent). Jedná se o nekon terminálový stav; pracovní postup může po zadání odpovědí pokračovat.

  • FAILED: Stav terminálu označující, že došlo k chybě. Doprovázeno workflowFailedEvent se strukturovanými podrobnostmi o chybách.

  • CANCELLED: Stav terminálu označující, že spuštění zrušil volající nebo orchestrátor. Aktuálně nevygenerované ve výchozím nastavení cesty spouštěče, ale zahrnuté pro integrátory nebo orchestrátory, které podporují zrušení.

Funkce

agent_middleware

Dekorátor označují funkci jako middleware agenta.

Tento dekorátor explicitně identifikuje funkci jako middleware agenta, který zpracovává AgentRunContext objekty.

agent_middleware(func: Callable[[AgentRunContext, Callable[[AgentRunContext], Awaitable[None]]], Awaitable[None]]) -> Callable[[AgentRunContext, Callable[[AgentRunContext], Awaitable[None]]], Awaitable[None]]

Parametry

Name Description
func
Vyžadováno

Funkce middlewaru, která se má označit jako middleware agenta.

Návraty

Typ Description

Stejná funkce se značkou middlewaru agenta.

Příklady


   from agent_framework import agent_middleware, AgentRunContext, ChatAgent


   @agent_middleware
   async def logging_middleware(context: AgentRunContext, next):
       print(f"Before: {context.agent.name}")
       await next(context)
       print(f"After: {context.result}")


   # Use with an agent
   agent = ChatAgent(chat_client=client, name="assistant", middleware=logging_middleware)

ai_function

Vyzdobit funkci, aby se přeměnila na AIFunction, která se dá předat modelům a spustit automaticky.

Tento dekorátor vytvoří Pydantický model z podpisu funkce, který se použije k ověření argumentů předaných funkci a k vygenerování schématu JSON pro parametry funkce.

Chcete-li přidat popisy k parametrům, použijte Annotated jako druhý argument typ s typing popisem řetězce. K pokročilejší konfiguraci můžete použít také třídu Pydantic Field .

Poznámka:

Pokud je approval_mode nastavená na "always_require", funkce se nespustí.

až do explicitního schválení se to týká pouze toku automatického vyvolání.

Je také důležité si uvědomit, že pokud model vrací více volání funkcí, některé vyžadují schválení.

a ostatní, kteří ne, požádá o schválení pro všechny.

ai_function(func: Callable[[...], ReturnT | Awaitable[ReturnT]] | None = None, *, name: str | None = None, description: str | None = None, approval_mode: Literal['always_require', 'never_require'] | None = None, max_invocations: int | None = None, max_invocation_exceptions: int | None = None, additional_properties: dict[str, Any] | None = None) -> AIFunction[Any, ReturnT] | Callable[[Callable[[...], ReturnT | Awaitable[ReturnT]]], AIFunction[Any, ReturnT]]

Parametry

Name Description
func
Callable[[...], <xref:agent_framework._tools.ReturnT> | Awaitable[<xref:agent_framework._tools.ReturnT>]] | None

Funkce, která se má ozdobit.

Default value: None
name
Vyžadováno
str | None
description
Vyžadováno
str | None
approval_mode
Vyžadováno
Literal['always_require', 'never_require'] | None
max_invocations
Vyžadováno
int | None
max_invocation_exceptions
Vyžadováno
int | None
additional_properties
Vyžadováno

Výhradně parametry klíčových slov

Name Description
name

Název funkce. Pokud není zadaný, použije se atribut funkce __name__ .

Default value: None
description

Popis funkce. Pokud není zadaný, použije se řetězec docstring funkce.

Default value: None
approval_mode

Zda se ke spuštění tohoto nástroje vyžaduje nebo neschválení. Výchozí hodnota je, že schválení není potřeba.

Default value: None
max_invocations

Maximální počet vyvolání této funkce. Pokud žádný, neexistuje žádný limit, měl by být alespoň 1.

Default value: None
max_invocation_exceptions

Maximální počet výjimek povolených během vyvolání. Pokud žádný, neexistuje žádný limit, měl by být alespoň 1.

Default value: None
additional_properties

Další vlastnosti, které chcete nastavit pro funkci.

Default value: None

Návraty

Typ Description
AIFunction[Any, <xref:agent_framework._tools.ReturnT>] | Callable[[Callable[[…], <xref:agent_framework._tools.ReturnT> | Awaitable[<xref:agent_framework._tools.ReturnT>]]], AIFunction[Any, <xref:agent_framework._tools.ReturnT>]]

Příklady


   from agent_framework import ai_function
   from typing import Annotated


   @ai_function
   def ai_function_example(
       arg1: Annotated[str, "The first argument"],
       arg2: Annotated[int, "The second argument"],
   ) -> str:
       # An example function that takes two arguments and returns a string.
       return f"arg1: {arg1}, arg2: {arg2}"


   # the same function but with approval required to run
   @ai_function(approval_mode="always_require")
   def ai_function_example(
       arg1: Annotated[str, "The first argument"],
       arg2: Annotated[int, "The second argument"],
   ) -> str:
       # An example function that takes two arguments and returns a string.
       return f"arg1: {arg1}, arg2: {arg2}"


   # With custom name and description
   @ai_function(name="custom_weather", description="Custom weather function")
   def another_weather_func(location: str) -> str:
       return f"Weather in {location}"


   # Async functions are also supported
   @ai_function
   async def async_get_weather(location: str) -> str:
       '''Get weather asynchronously.'''
       # Simulate async operation
       return f"Weather in {location}"

chat_middleware

Dekorátor, který označí funkci jako middleware chatu.

Tento dekorátor explicitně identifikuje funkci jako middleware chatu, který zpracovává objekty ChatContext.

chat_middleware(func: Callable[[ChatContext, Callable[[ChatContext], Awaitable[None]]], Awaitable[None]]) -> Callable[[ChatContext, Callable[[ChatContext], Awaitable[None]]], Awaitable[None]]

Parametry

Name Description
func
Vyžadováno

Funkce middlewaru, která se označí jako middleware chatu.

Návraty

Typ Description

Stejná funkce se značkou middlewaru chatu.

Příklady


   from agent_framework import chat_middleware, ChatContext, ChatAgent


   @chat_middleware
   async def logging_middleware(context: ChatContext, next):
       print(f"Messages: {len(context.messages)}")
       await next(context)
       print(f"Response: {context.result}")


   # Use with an agent
   agent = ChatAgent(chat_client=client, name="assistant", middleware=logging_middleware)

create_edge_runner

Funkce továrny pro vytvoření vhodného spouštěče okrajů pro skupinu okrajů

create_edge_runner(edge_group: EdgeGroup, executors: dict[str, Executor]) -> EdgeRunner

Parametry

Name Description
edge_group
Vyžadováno
<xref:agent_framework._workflows._edge.EdgeGroup>

Skupina okrajů pro vytvoření spouštěče.

executors
Vyžadováno

Mapování ID exekutoru na instance exekutoru

Návraty

Typ Description
<xref:agent_framework._workflows._edge_runner.EdgeRunner>

Příslušná instance EdgeRunneru.

executor

Dekorátor, který převádí samostatnou funkci na instanci FunctionExecutor.

Dekorátor @executor je určen pouze pro samostatné funkce na úrovni modulu. Pro exekutory založené na třídách použijte základní třídu Executor s metodami @handler instance.

Podporuje synchronní i asynchronní funkce. Synchronní funkce se spouští ve fondu vláken, aby se zabránilo blokování smyčky událostí.

Důležité

Používá se @executor pro samostatné funkce (na úrovni modulu nebo místní funkce).

Nepoužívejte @executor s staticmethodclassmethod

Pro exekutory založené na třídách, podtřídu Executor a použití @handler v metodách instance

Použití:


   # Standalone async function (RECOMMENDED):
   @executor(id="upper_case")
   async def to_upper(text: str, ctx: WorkflowContext[str]):
       await ctx.send_message(text.upper())


   # Standalone sync function (runs in thread pool):
   @executor
   def process_data(data: str):
       return data.upper()


   # For class-based executors, use @handler instead:
   class MyExecutor(Executor):
       def __init__(self):
           super().__init__(id="my_executor")

       @handler
       async def process(self, data: str, ctx: WorkflowContext[str]):
           await ctx.send_message(data.upper())
executor(func: Callable[[...], Any] | None = None, *, id: str | None = None) -> Callable[[Callable[[...], Any]], FunctionExecutor] | FunctionExecutor

Parametry

Name Description
func
Callable[[...], Any] | None

Funkce k dekoraci (při použití bez závorek)

Default value: None
id
Vyžadováno
str | None

Volitelné vlastní ID exekutoru Pokud žádný, použije název funkce.

Výhradně parametry klíčových slov

Name Description
id
Default value: None

Návraty

Typ Description

Instance FunctionExecutor, která může být připojena k pracovnímu postupu.

Výjimky

Typ Description

Pokud se používá s staticmethod nebo classmethod (nepodporovaný vzor)

function_middleware

Dekorátor, který označí funkci jako middleware funkce.

Tento dekorátor explicitně identifikuje funkci jako middleware funkce, která zpracovává FunctionInvocationContext objekty.

function_middleware(func: Callable[[FunctionInvocationContext, Callable[[FunctionInvocationContext], Awaitable[None]]], Awaitable[None]]) -> Callable[[FunctionInvocationContext, Callable[[FunctionInvocationContext], Awaitable[None]]], Awaitable[None]]

Parametry

Name Description
func
Vyžadováno

Funkce middlewaru, která se má označit jako middleware funkce.

Návraty

Typ Description

Stejná funkce se značkou middlewaru funkcí.

Příklady


   from agent_framework import function_middleware, FunctionInvocationContext, ChatAgent


   @function_middleware
   async def logging_middleware(context: FunctionInvocationContext, next):
       print(f"Calling: {context.function.name}")
       await next(context)
       print(f"Result: {context.result}")


   # Use with an agent
   agent = ChatAgent(chat_client=client, name="assistant", middleware=logging_middleware)

get_checkpoint_summary

get_checkpoint_summary(checkpoint: WorkflowCheckpoint) -> WorkflowCheckpointSummary

Parametry

Name Description
checkpoint
Vyžadováno

Návraty

Typ Description

get_logger

Získejte protokolovací nástroje se zadaným názvem, který je ve výchozím nastavení agent_framework.

get_logger(name: str = 'agent_framework') -> Logger

Parametry

Name Description
name
str

Název protokolovacího nástroje. Výchozí hodnota je agent_framework.

Default value: "agent_framework"

Návraty

Typ Description

Nakonfigurovaná instance protokolovacího nástroje.

handler

Dekorátor pro registraci obslužné rutiny pro exekutor.

handler(func: Callable[[ExecutorT, Any, ContextT], Awaitable[Any]]) -> Callable[[ExecutorT, Any, ContextT], Awaitable[Any]]

Parametry

Name Description
func
Vyžadováno
Callable[[<xref:agent_framework._workflows._executor.ExecutorT>, Any, <xref:agent_framework._workflows._executor.ContextT>], Awaitable[Any]]

Funkce, která se má ozdobit. Při použití bez parametrů může být Žádná.

Návraty

Typ Description
Callable[[<xref:agent_framework._workflows._executor.ExecutorT>, Any, <xref:agent_framework._workflows._executor.ContextT>], Awaitable[Any]]

Zdobená funkce s metadaty obslužné rutiny.

Příklady

@handler asynchronní def handle_string(self; message: str, ctx: WorkflowContext[str]) -> None:

...

@handler asynchronní def handle_data(self; message: dict, ctx: WorkflowContext[str | int]) -> None:

...

prepare_function_call_results

Připravte hodnoty výsledků volání funkce.

prepare_function_call_results(content: TextContent | DataContent | TextReasoningContent | UriContent | FunctionCallContent | FunctionResultContent | ErrorContent | UsageContent | HostedFileContent | HostedVectorStoreContent | FunctionApprovalRequestContent | FunctionApprovalResponseContent | Any | list[TextContent | DataContent | TextReasoningContent | UriContent | FunctionCallContent | FunctionResultContent | ErrorContent | UsageContent | HostedFileContent | HostedVectorStoreContent | FunctionApprovalRequestContent | FunctionApprovalResponseContent | Any]) -> str

Parametry

Name Description
content
Vyžadováno

Návraty

Typ Description
str

prepend_agent_framework_to_user_agent

Předpřipraveno "agent-framework" na User-Agent v hlavicích.

Pokud je telemetrie uživatelského agenta zakázaná prostřednictvím AGENT_FRAMEWORK_USER_AGENT_DISABLED proměnné prostředí, hlavička User-Agent nebude obsahovat informace o rozhraní agenta. Bude odeslán zpět tak, jak je, nebo jako prázdný dikt, když se předá žádný.

prepend_agent_framework_to_user_agent(headers: dict[str, Any] | None = None) -> dict[str, Any]

Parametry

Name Description
headers

Slovník existujících hlaviček.

Default value: None

Návraty

Typ Description

Nové diktování se znakem User-Agent nastaveným na agent-framework-python/{version}, pokud záhlaví nejsou žádná. Upravený slovník hlaviček s předpřipraveným uživatelem agent-framework-python/{version}

Příklady


   from agent_framework import prepend_agent_framework_to_user_agent

   # Add agent-framework to new headers
   headers = prepend_agent_framework_to_user_agent()
   print(headers["User-Agent"])  # "agent-framework-python/0.1.0"

   # Prepend to existing headers
   existing = {"User-Agent": "my-app/1.0"}
   headers = prepend_agent_framework_to_user_agent(existing)
   print(headers["User-Agent"])  # "agent-framework-python/0.1.0 my-app/1.0"

response_handler

Dekorátor pro registraci obslužné rutiny pro zpracování odpovědí na požadavek.

response_handler(func: Callable[[ExecutorT, Any, Any, ContextT], Awaitable[None]]) -> Callable[[ExecutorT, Any, Any, ContextT], Awaitable[None]]

Parametry

Name Description
func
Vyžadováno
Callable[[<xref:agent_framework._workflows._request_info_mixin.ExecutorT>, Any, Any, <xref:agent_framework._workflows._request_info_mixin.ContextT>], Awaitable[None]]

Funkce, která se má ozdobit.

Návraty

Typ Description
Callable[[<xref:agent_framework._workflows._request_info_mixin.ExecutorT>, Any, Any, <xref:agent_framework._workflows._request_info_mixin.ContextT>], Awaitable[None]]

Zdobená funkce s metadaty obslužné rutiny.

Příklady


   @handler
   async def run(self, message: int, context: WorkflowContext[str]) -> None:
       # Example of a handler that sends a request
       ...
       # Send a request with a `CustomRequest` payload and expect a `str` response.
       await context.request_info(CustomRequest(...), str)


   @response_handler
   async def handle_response(
       self,
       original_request: CustomRequest,
       response: str,
       context: WorkflowContext[str],
   ) -> None:
       # Example of a response handler for the above request
       ...


   @response_handler
   async def handle_response(
       self,
       original_request: CustomRequest,
       response: dict,
       context: WorkflowContext[int],
   ) -> None:
       # Example of a response handler for a request expecting a dict response
       ...

setup_logging

Nastavte konfiguraci protokolování pro architekturu agenta.

setup_logging() -> None

Návraty

Typ Description

use_agent_middleware

Dekorátor tříd, který přidává podporu middlewaru do třídy agenta.

Tento dekorátor přidá funkce middlewaru do jakékoli třídy agenta. Zabalí metody run()run_stream() a zabalí provádění middlewaru.

Spuštění middlewaru lze kdykoli ukončit nastavením context.terminate vlastnosti na Hodnotu True. Po nastavení se kanál zastaví spuštění dalšího middlewaru, jakmile se ovládací prvek vrátí do kanálu.

Poznámka:

Tento dekorátor je již použit pro předdefinované třídy agentů. Stačí použít jenom

pokud vytváříte vlastní implementace agenta.

use_agent_middleware(agent_class: type[TAgent]) -> type[TAgent]

Parametry

Name Description
agent_class
Vyžadováno
type[<xref:TAgent>]

Třída agenta pro přidání podpory middlewaru.

Návraty

Typ Description
type[~<xref:TAgent>]

Upravená třída agenta s podporou middlewaru.

Příklady


   from agent_framework import use_agent_middleware


   @use_agent_middleware
   class CustomAgent:
       async def run(self, messages, **kwargs):
           # Agent implementation
           pass

       async def run_stream(self, messages, **kwargs):
           # Streaming implementation
           pass

use_chat_middleware

Dekorátor tříd, který přidává podporu middlewaru do třídy klienta chatu.

Tento dekorátor přidává funkce middlewaru do jakékoli třídy chatovacího klienta. Zabalí metody get_response()get_streaming_response() a zabalí provádění middlewaru.

Poznámka:

Tento dekorátor je již použit pro předdefinované třídy klientů chatu. Stačí použít jenom

pokud vytváříte vlastní implementace chatovacího klienta.

use_chat_middleware(chat_client_class: type[TChatClient]) -> type[TChatClient]

Parametry

Name Description
chat_client_class
Vyžadováno
type[<xref:TChatClient>]

Třída klienta chatu pro přidání podpory middlewaru.

Návraty

Typ Description
type[~<xref:TChatClient>]

Upravená třída klienta chatu s podporou middlewaru.

Příklady


   from agent_framework import use_chat_middleware


   @use_chat_middleware
   class CustomChatClient:
       async def get_response(self, messages, **kwargs):
           # Chat client implementation
           pass

       async def get_streaming_response(self, messages, **kwargs):
           # Streaming implementation
           pass

use_function_invocation

Dekorátor třídy, který umožňuje volání nástroje pro chatovacího klienta.

Tento dekorátor zabalí get_response a get_streaming_response metody pro automatické zpracování volání funkcí z modelu, jejich provedení a vrácení výsledků zpět do modelu pro další zpracování.

use_function_invocation(chat_client: type[TChatClient]) -> type[TChatClient]

Parametry

Name Description
chat_client
Vyžadováno
type[<xref:TChatClient>]

Třída chatovacího klienta, která se má ozdobit.

Návraty

Typ Description
type[~<xref:TChatClient>]

Upravená třída klienta chatu s povoleným vyvoláním funkce.

Výjimky

Typ Description

Pokud chatovací klient nemá požadované metody.

Příklady


   from agent_framework import use_function_invocation, BaseChatClient


   @use_function_invocation
   class MyCustomClient(BaseChatClient):
       async def get_response(self, messages, **kwargs):
           # Implementation here
           pass

       async def get_streaming_response(self, messages, **kwargs):
           # Implementation here
           pass


   # The client now automatically handles function calls
   client = MyCustomClient()

validate_workflow_graph

Funkce Convenience k ověření grafu pracovního postupu

validate_workflow_graph(edge_groups: Sequence[EdgeGroup], executors: dict[str, Executor], start_executor: Executor) -> None

Parametry

Name Description
edge_groups
Vyžadováno
Sequence[<xref:agent_framework._workflows._edge.EdgeGroup>]

seznam hraničních skupin v pracovním postupu

executors
Vyžadováno

Mapování ID exekutoru na instance exekutoru

start_executor
Vyžadováno

Spouštěcí exekutor (může to být instance nebo ID)

Návraty

Typ Description

Výjimky

Typ Description

Pokud se nějaké ověření nezdaří