次の方法で共有


agent_framework パッケージ

パッケージ

a2a
ag_ui
anthropic
azure
chatkit
declarative
devui
lab
mem0
microsoft
ollama
openai
redis

モジュール

exceptions
observability

クラス

AIFunction

PYTHON 関数をラップして AI モデルで呼び出し可能にするツール。

このクラスは、Python 関数をラップして、自動パラメーター検証と JSON スキーマ生成を使用して AI モデルで呼び出し可能にします。

AIFunction を初期化します。

AgentExecutor

メッセージを処理するためのエージェントをラップする組み込み Executor。

AgentExecutor は、ワークフロー実行モードに基づいて動作を調整します。

  • run_stream(): エージェントがトークンを生成すると、AgentRunUpdateEvent の増分イベントを生成します
  • run(): 完全な応答を含む単一の AgentRunEvent を出力します

executor は、WorkflowContext.is_streaming() を介してモードを自動的に検出します。

一意識別子を使用して Executor を初期化します。

AgentExecutorRequest

エージェント実行プログラムに対する要求。

AgentExecutorResponse

エージェント実行プログラムからの応答。

AgentInputRequest

エージェントが高度なビルダー ワークフローで実行される前に、人間による入力を要求します。

エージェントの実行前にワークフローが一時停止したときに RequestInfoEvent を介して出力されます。 応答は、エージェントの動作を誘導するためにユーザー メッセージとして会話に挿入されます。

これは、SequentialBuilder、ConcurrentBuilder、GroupChatBuilder、HandoffBuilder で .with_request_info() によって使用される標準的な要求の種類です。

AgentMiddleware

エージェント呼び出しをインターセプトできるエージェント ミドルウェアの抽象基本クラス。

エージェント ミドルウェアを使用すると、実行の前後にエージェント呼び出しをインターセプトして変更できます。 メッセージの検査、コンテキストの変更、結果のオーバーライド、または実行の早期終了を行うことができます。

AgentMiddleware は抽象基底クラスです。 サブクラス化して実装する必要があります

カスタム エージェント ミドルウェアを作成する process() メソッド。

AgentProtocol

呼び出すことができるエージェントのプロトコル。

このプロトコルは、ID のプロパティや実行メソッドなど、すべてのエージェントが実装する必要があるインターフェイスを定義します。

プロトコルでは、構造サブタイピング (アヒル型指定) が使用されます。 クラスは必要ありません

このプロトコルから明示的に継承する場合は互換性と見なされます。

これにより、使用せずに完全にカスタム エージェントを作成できます。

任意の Agent Framework 基底クラス。

AgentRunContext

エージェント ミドルウェア呼び出しのコンテキスト オブジェクト。

このコンテキストはエージェント ミドルウェア パイプラインを介して渡され、エージェント呼び出しに関するすべての情報が含まれます。

AgentRunContext を初期化します。

AgentRunEvent

エージェントの実行が完了したときにトリガーされるイベント。

エージェントの実行イベントを初期化します。

AgentRunResponse

エージェントの実行要求に対する応答を表します。

1 つ以上の応答メッセージと、応答に関するメタデータを提供します。 一般的な応答には 1 つのメッセージが含まれますが、関数呼び出し、RAG 取得、または複雑なロジックを含むシナリオでは複数のメッセージが含まれる場合があります。

AgentRunResponse を初期化します。

AgentRunResponseUpdate

エージェントからの単一のストリーミング応答チャンクを表します。

AgentRunResponseUpdate を初期化します。

AgentRunUpdateEvent

エージェントがメッセージをストリーミングしているときにトリガーされるイベント。

エージェント ストリーミング イベントを初期化します。

AgentThread

エージェント スレッド クラス。これは、ローカルで管理されるスレッドまたはサービスによって管理されるスレッドの両方を表すことができます。

AgentThreadは、エージェント操作の会話状態とメッセージ履歴を保持します。 サービス管理スレッド ( service_thread_id 経由) またはローカル メッセージ ストア ( message_store 経由) を使用できますが、両方を使用することはできません。

AgentThread を初期化します。このメソッドは手動で使用しないでください。常に次を使用します: agent.get_new_thread()

service_thread_idまたはmessage_storeを設定できますが、両方を設定することはできません。

AggregateContextProvider

複数のコンテキスト プロバイダーを含む ContextProvider。

イベントは複数のコンテキスト プロバイダーに委任され、返される前にそれらのイベントからの応答が集計されます。 これにより、複数のコンテキスト プロバイダーを 1 つのプロバイダーに結合できます。

1 つのコンテキストを渡すと、AggregateContextProvider が自動的に作成されます

プロバイダーまたはエージェント コンストラクターへのコンテキスト プロバイダーのシーケンス。

コンテキスト プロバイダーを使用して AggregateContextProvider を初期化します。

BaseAgent

すべての Agent Framework エージェントの基本クラス。

このクラスは、コンテキスト プロバイダー、ミドルウェアのサポート、スレッド管理など、エージェント実装のコア機能を提供します。

BaseAgent は、〘〘を実装していないため、直接インスタンス化することはできません。

run()、run_stream()、および AgentProtocol に必要なその他のメソッド。

ChatAgent などの具象実装を使用するか、サブクラスを作成します。

BaseAgent インスタンスを初期化します。

BaseAnnotation

すべての AI 注釈型の基本クラス。

BaseAnnotation を初期化します。

BaseChatClient

チャット クライアントの基本クラス。

この抽象基本クラスは、ミドルウェアのサポート、メッセージの準備、ツールの正規化など、チャット クライアント実装のコア機能を提供します。

BaseChatClient は抽象基底クラスであるため、直接インスタンス化することはできません。

サブクラスは、_inner_get_response() と _inner_get_streaming_response() を実装する必要があります。

BaseChatClient インスタンスを初期化します。

BaseContent

AI サービスで使用されるコンテンツを表します。

BaseContent を初期化します。

Case

スイッチ ケース述語とそのターゲットを組み合わせたランタイム ラッパー。

ケース は、述語が True に評価されたときにメッセージを処理する必要がある Executor とブール述語を結合します。 ランタイムは、この軽量コンテナーをシリアル化可能な SwitchCaseEdgeGroupCase とは別に保持するため、永続化された状態を汚染することなく、ライブ呼び出し可能な呼び出し可能な状態で実行を実行できます。

ChatAgent

チャット クライアント エージェント。

これは、チャット クライアントを使用して言語モデルと対話する主要なエージェント実装です。 ツール、コンテキスト プロバイダー、ミドルウェア、およびストリーミング応答と非ストリーミング応答の両方がサポートされています。

ChatAgent インスタンスを初期化します。

frequency_penaltyからrequest_kwargsまでのパラメーターのセットは、

チャット クライアントを呼び出します。 両方の実行メソッドに渡すこともできます。

両方を設定すると、実行メソッドに渡されたものが優先されます。

ChatClientProtocol

応答を生成できるチャット クライアントのプロトコル。

このプロトコルは、ストリーミング応答と非ストリーミング応答の両方を生成するためのメソッドを含め、すべてのチャット クライアントが実装する必要があるインターフェイスを定義します。

プロトコルでは、構造サブタイピング (アヒル型指定) が使用されます。 クラスは必要ありません

このプロトコルから明示的に継承する場合は互換性と見なされます。

ChatContext

チャット ミドルウェア呼び出しのコンテキスト オブジェクト。

このコンテキストは、チャット ミドルウェア パイプラインを通じて渡され、チャット要求に関するすべての情報が含まれます。

ChatContext を初期化します。

ChatMessage

チャット メッセージを表します。

ChatMessage を初期化します。

ChatMessageStore

メッセージをリストに格納する ChatMessageStoreProtocol のメモリ内実装。

この実装では、シリアル化と逆シリアル化をサポートするチャット メッセージ用の単純なリスト ベースのストレージが提供されます。 ChatMessageStoreProtocol プロトコルのすべての必要なメソッドを実装します。

ストアはメッセージをメモリ内に保持し、永続化のために状態をシリアル化および逆シリアル化するメソッドを提供します。

スレッドで使用する ChatMessageStore を作成します。

ChatMessageStoreProtocol

特定のスレッドに関連付けられたチャット メッセージを格納および取得するためのメソッドを定義します。

このプロトコルの実装は、必要に応じてメッセージを切り捨てたり要約したりして大量のデータを処理するなど、チャット メッセージのストレージを管理する役割を担います。

ChatMiddleware

チャット クライアント要求をインターセプトできるチャット ミドルウェアの抽象基本クラス。

チャット ミドルウェアを使用すると、実行の前後にチャット クライアント要求をインターセプトして変更できます。 メッセージの変更、システム プロンプトの追加、要求のログ記録、チャット応答のオーバーライドを行うことができます。

ChatMiddleware は抽象基本クラスです。 サブクラス化して実装する必要があります

カスタム チャット ミドルウェアを作成する process() メソッド。

ChatOptions

AI サービスの一般的な要求設定。

ChatOptions を初期化します。

ChatResponse

チャット要求への応答を表します。

指定されたパラメーターを使用して ChatResponse を初期化します。

ChatResponseUpdate

ChatClient からの単一のストリーミング応答チャンクを表します。

指定されたパラメーターを使用して ChatResponseUpdate を初期化します。

CheckpointStorage

チェックポイント ストレージ バックエンドのプロトコル。

CitationAnnotation

引用文献の注釈を表します。

CitationAnnotation を初期化します。

ConcurrentBuilder

同時実行エージェント ワークフローの概要ビルダー。

  • participants([...]) は、AgentProtocol (推奨) または Executor の一覧を受け取ります。

  • register_participants([...]) は AgentProtocol のファクトリの一覧を受け入れます (推奨)

    または Executor ファクトリ

  • build() wires: dispatcher -> fan-out -> participants -> fan-in -> アグリゲーター。

  • with_aggregator(...) は、既定のアグリゲーターを Executor またはコールバックでオーバーライドします。

  • register_aggregator(...) は、Executor のファクトリをカスタム アグリゲーターとして受け入れます。

使用:


   from agent_framework import ConcurrentBuilder

   # Minimal: use default aggregator (returns list[ChatMessage])
   workflow = ConcurrentBuilder().participants([agent1, agent2, agent3]).build()

   # With agent factories
   workflow = ConcurrentBuilder().register_participants([create_agent1, create_agent2, create_agent3]).build()


   # Custom aggregator via callback (sync or async). The callback receives
   # list[AgentExecutorResponse] and its return value becomes the workflow's output.
   def summarize(results: list[AgentExecutorResponse]) -> str:
       return " | ".join(r.agent_run_response.messages[-1].text for r in results)


   workflow = ConcurrentBuilder().participants([agent1, agent2, agent3]).with_aggregator(summarize).build()


   # Custom aggregator via a factory
   class MyAggregator(Executor):
       @handler
       async def aggregate(self, results: list[AgentExecutorResponse], ctx: WorkflowContext[Never, str]) -> None:
           await ctx.yield_output(" | ".join(r.agent_run_response.messages[-1].text for r in results))


   workflow = (
       ConcurrentBuilder()
       .register_participants([create_agent1, create_agent2, create_agent3])
       .register_aggregator(lambda: MyAggregator(id="my_aggregator"))
       .build()
   )


   # Enable checkpoint persistence so runs can resume
   workflow = ConcurrentBuilder().participants([agent1, agent2, agent3]).with_checkpointing(storage).build()

   # Enable request info before aggregation
   workflow = ConcurrentBuilder().participants([agent1, agent2]).with_request_info().build()
Context

ContextProvider によって提供される AI モデルに提供する必要があるコンテキストを含むクラス。

各 ContextProvider には、呼び出しごとに独自のコンテキストを提供する機能があります。 Context クラスには、ContextProvider によって提供される追加のコンテキストが含まれています。 このコンテキストは、AI モデルに渡される前に、他のプロバイダーによって提供されるコンテキストと組み合わされます。 このコンテキストは呼び出しごとであり、チャット履歴の一部として格納されません。

新しい Context オブジェクトを作成します。

ContextProvider

すべてのコンテキスト プロバイダーの基本クラス。

コンテキスト プロバイダーは、AI のコンテキスト管理を強化するために使用できるコンポーネントです。 会話の変更をリッスンし、呼び出しの直前に AI モデルに追加のコンテキストを提供できます。

ContextProvider は抽象基底クラスです。 サブクラス化して実装する必要があります

カスタム コンテキスト プロバイダーを作成する呼び出し元 () メソッド。 理想的には、次の手順を行う必要があります。

また、会話を追跡するために invoked() メソッドと thread_created() メソッドを実装します

状態ですが、これらは省略可能です。

DataContent

関連付けられたメディアの種類 (MIME の種類とも呼ばれます) を持つバイナリ データ コンテンツを表します。

Important

これは、オンライン リソースではなく、データ URI として表されるバイナリ データ用です。

オンライン リソースには UriContent を使用します。

DataContent インスタンスを初期化します。

Important

これは、オンライン リソースではなく、データ URI として表されるバイナリ データ用です。

オンライン リソースには UriContent を使用します。

Default

スイッチ ケース グループ内の既定のブランチのランタイム表現。

既定の分岐は、他のケース述語が一致しない場合にのみ呼び出されます。 実際には、ルーティングによって空のターゲットが生成されないように、存在することが保証されます。

Edge

2 つの Executor の間で、必要に応じて条件付きのダイレクト ハンドオフをモデル化します。

エッジ は、ワークフロー グラフ内の 1 つの Executor から別の Executor にメッセージを移動するために必要な最小限のメタデータをキャプチャします。 必要に応じて、実行時にエッジを取得するかどうかを決定するブール述語を埋め込みます。 エッジをプリミティブまでシリアル化することで、元の Python プロセスに関係なくワークフローのトポロジを再構築できます。

2 つのワークフロー 実行プログラムの間で、完全に指定されたエッジを初期化します。

EdgeDuplicationError

ワークフローで重複するエッジが検出されると、例外が発生します。

ErrorContent

エラーを表します。

備考: 通常、操作の一部として問題が発生したが、操作を続行できた致命的でないエラーに使用されます。

ErrorContent インスタンスを初期化します。

Executor

メッセージを処理し、計算を実行するすべてのワークフロー 実行プログラムの基本クラス。

概要

Executor はワークフローの基本的な構成要素であり、メッセージの受信、操作の実行、出力の生成を行う個々の処理ユニットを表します。 各 Executor は一意に識別され、装飾されたハンドラー メソッドを使用して特定のメッセージの種類を処理できます。

型システム

Executor には、その機能を定義する豊富な型システムがあります。

入力の種類

Executor が処理できるメッセージの種類。ハンドラー メソッドシグネチャから検出されます。


   class MyExecutor(Executor):
       @handler
       async def handle_string(self, message: str, ctx: WorkflowContext) -> None:
           # This executor can handle 'str' input types

input_types プロパティを使用したアクセス。

出力の種類

executor が ctx.send_message()を介して他の Executor に送信できるメッセージの種類:


   class MyExecutor(Executor):
       @handler
       async def handle_data(self, message: str, ctx: WorkflowContext[int | bool]) -> None:
           # This executor can send 'int' or 'bool' messages

output_types プロパティを使用したアクセス。

ワークフロー出力の種類

executor が ctx.yield_output()を介してワークフロー レベルの出力として出力できるデータの種類。


   class MyExecutor(Executor):
       @handler
       async def process(self, message: str, ctx: WorkflowContext[int, str]) -> None:
           # Can send 'int' messages AND yield 'str' workflow outputs

workflow_output_types プロパティを使用したアクセス。

ハンドラーの検出

Executor は、装飾されたメソッドを使用して機能を検出します。

@handler デコレータ

受信メッセージを処理するメソッドをマークします。


   class MyExecutor(Executor):
       @handler
       async def handle_text(self, message: str, ctx: WorkflowContext[str]) -> None:
           await ctx.send_message(message.upper())

サブワークフロー要求のインターセプト

@handlerメソッドを使用して、サブワークフロー要求をインターセプトします。


   class ParentExecutor(Executor):
       @handler
       async def handle_subworkflow_request(
           self,
           request: SubWorkflowRequestMessage,
           ctx: WorkflowContext[SubWorkflowResponseMessage],
       ) -> None:
           if self.is_allowed(request.domain):
               response = request.create_response(data=True)
               await ctx.send_message(response, target_id=request.executor_id)
           else:
               await ctx.request_info(request.source_event, response_type=request.source_event.response_type)

コンテキスト型

ハンドラー メソッドは、型の注釈に基づいて異なる WorkflowContext バリアントを受け取ります。

WorkflowContext (型パラメーターなし)

メッセージを送信したり出力を生成したりせずに副作用のみを実行するハンドラーの場合:


   class LoggingExecutor(Executor):
       @handler
       async def log_message(self, msg: str, ctx: WorkflowContext) -> None:
           print(f"Received: {msg}")  # Only logging, no outputs

WorkflowContext[T_Out]

ctx.send_message()を介してT_Out型のメッセージを送信できるようにします。


   class ProcessorExecutor(Executor):
       @handler
       async def handler(self, msg: str, ctx: WorkflowContext[int]) -> None:
           await ctx.send_message(42)  # Can send int messages

WorkflowContext[T_Out, T_W_Out]

メッセージの送信 (T_Out) とワークフロー出力の生成 (T_W_Out) の両方を有効にします。


   class DualOutputExecutor(Executor):
       @handler
       async def handler(self, msg: str, ctx: WorkflowContext[int, str]) -> None:
           await ctx.send_message(42)  # Send int message
           await ctx.yield_output("done")  # Yield str workflow output

Function Executors

単純関数は、 @executor デコレーターを使用して Executor に変換できます。


   @executor
   async def process_text(text: str, ctx: WorkflowContext[str]) -> None:
       await ctx.send_message(text.upper())


   # Or with custom ID:
   @executor(id="text_processor")
   def sync_process(text: str, ctx: WorkflowContext[str]) -> None:
       ctx.send_message(text.lower())  # Sync functions run in thread pool

サブワークフローコンポジション

Executor には、WorkflowExecutor を使用してサブワークフローを含めることができます。 サブワークフローは、親ワークフローがインターセプトできる要求を行うことができます。 ワークフロー構成パターンと要求/応答処理の詳細については、WorkflowExecutor のドキュメントを参照してください。

状態管理

Executor には、ワークフローの実行とチェックポイント間で保持される状態を含めることができます。 on_checkpoint_saveメソッドとon_checkpoint_restore メソッドをオーバーライドして、カスタム状態のシリアル化と復元ロジックを実装します。

実装に関する注意事項

  • execute() を直接呼び出さないでください。ワークフロー エンジンによって呼び出されます
  • execute() をオーバーライドしない - 代わりにデコレーターを使用してハンドラーを定義する
  • 各 Executor には、少なくとも 1 つの @handler メソッドが必要です
  • 初期化時にハンドラー メソッドシグネチャが検証される

一意識別子を使用して Executor を初期化します。

ExecutorCompletedEvent

Executor ハンドラーが完了したときにトリガーされるイベント。

Executor ID と省略可能なデータを使用して、Executor イベントを初期化します。

ExecutorEvent

Executor イベントの基本クラス。

Executor ID と省略可能なデータを使用して、Executor イベントを初期化します。

ExecutorFailedEvent

Executor ハンドラーでエラーが発生したときにトリガーされるイベント。

ExecutorInvokedEvent

Executor ハンドラーが呼び出されたときにトリガーされるイベント。

Executor ID と省略可能なデータを使用して、Executor イベントを初期化します。

FanInEdgeGroup

単一のダウンストリーム Executor を供給するエッジの収束セットを表します。

通常、ファンイン グループは、複数のアップストリーム ステージが独立して同じダウンストリーム プロセッサに到着するメッセージを生成する場合に使用されます。

複数のソースを 1 つのターゲットにマージするファンイン マッピングを構築します。

FanOutEdgeGroup

オプションの選択ロジックを使用して、ブロードキャスト スタイルのエッジ グループを表します。

ファンアウトにより、1 つのソース Executor によって生成されたメッセージが、1 つ以上のダウンストリーム Executor に転送されます。 実行時に、ペイロードを検査し、メッセージを受信する ID のサブセットを返す selection_func を実行することで、ターゲットをさらに絞り込む場合があります。

1 つのソースから多数のターゲットへのファンアウト マッピングを作成します。

FileCheckpointStorage

永続化のためのファイル ベースのチェックポイント ストレージ。

ファイル ストレージを初期化します。

FinishReason

チャット応答が完了した理由を表します。

値を使用して FinishReason を初期化します。

FunctionApprovalRequestContent

関数呼び出しのユーザー承認要求を表します。

FunctionApprovalRequestContent インスタンスを初期化します。

FunctionApprovalResponseContent

関数呼び出しのユーザー承認の応答を表します。

FunctionApprovalResponseContent インスタンスを初期化します。

FunctionCallContent

関数呼び出し要求を表します。

FunctionCallContent インスタンスを初期化します。

FunctionExecutor

ユーザー定義関数をラップする Executor。

この Executor を使用すると、ユーザーは単純な関数 (同期と非同期の両方) を定義し、完全な Executor クラスを作成しなくてもワークフロー 実行プログラムとして使用できます。

同期関数は、イベント ループをブロックしないように、asyncio.to_thread() を使用してスレッド プールで実行されます。

ユーザー定義関数を使用して FunctionExecutor を初期化します。

FunctionInvocationConfiguration

チャット クライアントでの関数呼び出しの構成。

このクラスは、関数呼び出しをサポートするすべてのチャット クライアントで自動的に作成されます。 つまり、ほとんどの場合、インスタンスの属性を変更するだけで、新しい属性を作成できます。

FunctionInvocationConfiguration を初期化します。

FunctionInvocationContext

関数ミドルウェア呼び出しのコンテキスト オブジェクト。

このコンテキストは関数ミドルウェア パイプラインを介して渡され、関数呼び出しに関するすべての情報が含まれます。

FunctionInvocationContext を初期化します。

FunctionMiddleware

関数呼び出しをインターセプトできる関数ミドルウェアの抽象基底クラス。

関数ミドルウェアを使用すると、実行の前後に関数/ツールの呼び出しをインターセプトして変更できます。 引数の検証、結果のキャッシュ、ログの呼び出し、関数の実行のオーバーライドを行うことができます。

FunctionMiddleware は抽象基底クラスです。 サブクラス化して実装する必要があります

カスタム関数ミドルウェアを作成する process() メソッド。

FunctionResultContent

関数呼び出しの結果を表します。

FunctionResultContent インスタンスを初期化します。

GraphConnectivityError

グラフ接続の問題が検出されたときに発生する例外。

GroupChatBuilder

動的オーケストレーションを使用したマネージャー向けグループ チャット ワークフローの概要ビルダー。

GroupChat は、次に話す参加者を選択するマネージャーを使用して、マルチエージェントの会話を調整します。 マネージャーは、単純な Python 関数 (set_select_speakers_func) でも、 set_managerを介したエージェント ベースのセレクターでもかまいません。 これら 2 つのアプローチは相互に排他的です。

コア ワークフロー:

  1. 参加者の定義: エージェントの一覧 (.name を使用) またはエージェントへの名前のディクテーション マッピング

  2. スピーカーの選択を構成する: set_select_speakers_func OR

    set_manager (両方ではない)

  3. 省略可能: ラウンド制限、チェックポイント処理、終了条件を設定する

  4. ワークフローをビルドして実行する

話者の選択パターン:

パターン 1: 単純な関数ベースの選択 (推奨)


   from agent_framework import GroupChatBuilder, GroupChatStateSnapshot


   def select_next_speaker(state: GroupChatStateSnapshot) -> str | None:
       # state contains: task, participants, conversation, history, round_index
       if state["round_index"] >= 5:
           return None  # Finish
       last_speaker = state["history"][-1].speaker if state["history"] else None
       if last_speaker == "researcher":
           return "writer"
       return "researcher"


   workflow = (
       GroupChatBuilder()
       .set_select_speakers_func(select_next_speaker)
       .participants([researcher_agent, writer_agent])  # Uses agent.name
       .build()
   )

パターン 2: LLM ベースの選択


   from agent_framework import ChatAgent
   from agent_framework.azure import AzureOpenAIChatClient

   manager_agent = AzureOpenAIChatClient().create_agent(
       instructions="Coordinate the conversation and pick the next speaker.",
       name="Coordinator",
       temperature=0.3,
       seed=42,
       max_tokens=500,
   )

   workflow = (
       GroupChatBuilder()
       .set_manager(manager_agent, display_name="Coordinator")
       .participants([researcher, writer])  # Or use dict: researcher=r, writer=w
       .with_max_rounds(10)
       .build()
   )

パターン 3: 会話中のフィードバックの情報を要求する


   from agent_framework import GroupChatBuilder

   # Pause before all participants
   workflow = (
       GroupChatBuilder()
       .set_select_speakers_func(select_next_speaker)
       .participants([researcher, writer])
       .with_request_info()
       .build()
   )

   # Pause only before specific participants
   workflow = (
       GroupChatBuilder()
       .set_select_speakers_func(select_next_speaker)
       .participants([researcher, writer, editor])
       .with_request_info(agents=[editor])  # Only pause before editor responds
       .build()
   )

参加者の仕様:

参加者を指定する 2 つの方法:

  • リスト フォーム: [agent1, agent2] - 参加者名 に agent.name 属性を使用します
  • Dict 形式: {name1: agent1, name2: agent2} - 明示的な名前制御
  • キーワード 形式: participants(name1=agent1, name2=agent2) - 明示的な名前制御

状態スナップショット構造:

set_select_speakers_funcに渡される GroupChatStateSnapshot には、次のものが含まれます。

  • タスク: ChatMessage - 元のユーザー タスク
  • participants: dict[str, str] - 参加者名と説明のマッピング
  • conversation: tuple[ChatMessage, ...] - 完全な会話履歴
  • history: tuple[GroupChatTurn, ...] - 話者属性を持つターンバイターン レコード
  • round_index: int - これまでのマネージャー選択ラウンドの数
  • pending_agent: str |なし - 現在処理中のエージェントの名前 (存在する場合)

重要な制約:

  • set_select_speakers_funcと結合できないset_manager
  • 参加者名は一意である必要があります
  • リスト フォームを使用する場合、エージェントには空でない 名前 属性が必要です

GroupChatBuilder を初期化します。

GroupChatDirective

グループ チャット マネージャーの実装によって出力される命令。

HandoffBuilder

コーディネーターおよびスペシャリストエージェントとの会話ハンドオフワークフローのための Fluent ビルダー。

ハンドオフ パターンを使用すると、コーディネーター エージェントは、スペシャリスト エージェントに要求をルーティングできます。 対話モードでは、各エージェントの応答後にワークフローがユーザー入力を要求するか、エージェントの応答が完了したら自律的に完了するかを制御します。 終了条件は、ワークフローが入力の要求を停止して完了するタイミングを決定します。

ルーティング パターン:

Single-Tier (既定値): コーディネーターだけが専門家に引き渡すことができます。 既定では、専門家が応答した後、より多くの入力のために制御がユーザーに返されます。 これにより、ユーザー -> コーディネーター -> [省略可能なスペシャリスト] -> ユーザー -> コーディネーター -> ... という循環フローが作成されます。 with_interaction_mode("autonomous") を使用して、追加のユーザー入力の要求をスキップし、エージェントが委任せずに応答したときに最終的な会話を生成します。

多層 (詳細): スペシャリストは、 .add_handoff()を使用して他の専門家に引き渡すことができます。 これにより、複雑なワークフローの柔軟性が向上しますが、単一層パターンよりも制御が少なくなります。 ユーザーは、スペシャリストからスペシャリストへのハンドオフ中に中間ステップをリアルタイムで可視化できなくなります (ただし、すべてのハンドオフを含む完全な会話履歴は保持され、後で検査できます)。

主な機能

  • ハンドオフの自動検出: コーディネーターはハンドオフ ツールを呼び出します。

    引数 ( 例: {"handoff_to": "shipping_agent"}) は、制御を受ける専門家を識別します。

  • 自動生成ツール: 既定では、ビルダーはコーディネーター 用の handoff_to_<agent> ツールを合成するため、プレースホルダー関数を手動で定義しません。

  • 完全な会話履歴: 会話全体 ( 任意のChatMessage.additional_propertiesを含む) が保持され、各エージェントに渡されます。

  • 終了制御: 既定では、10 個のユーザー メッセージの後に終了します。 カスタム ロジックの .with_termination_condition(ラムダ conv: ...) でオーバーライドします (例: "goodbye" を検出します)。

  • 対話モード: human_in_loop (既定) を選択すると、エージェントのターン間でユーザーにプロンプトが表示されるか、ハンドオフが発生するか、終了/ターン制限に達するまでユーザー入力を求めずにエージェントへのルーティングを続行するように 自律的 に行います (既定の自律ターン制限: 50)。

  • チェックポイント処理: 再開可能なワークフローのオプションの永続化。

使用法 (Single-Tier):


   from agent_framework import HandoffBuilder
   from agent_framework.openai import OpenAIChatClient

   chat_client = OpenAIChatClient()

   # Create coordinator and specialist agents
   coordinator = chat_client.create_agent(
       instructions=(
           "You are a frontline support agent. Assess the user's issue and decide "
           "whether to hand off to 'refund_agent' or 'shipping_agent'. When delegation is "
           "required, call the matching handoff tool (for example `handoff_to_refund_agent`)."
       ),
       name="coordinator_agent",
   )

   refund = chat_client.create_agent(
       instructions="You handle refund requests. Ask for order details and process refunds.",
       name="refund_agent",
   )

   shipping = chat_client.create_agent(
       instructions="You resolve shipping issues. Track packages and update delivery status.",
       name="shipping_agent",
   )

   # Build the handoff workflow - default single-tier routing
   workflow = (
       HandoffBuilder(
           name="customer_support",
           participants=[coordinator, refund, shipping],
       )
       .set_coordinator(coordinator)
       .build()
   )

   # Run the workflow
   events = await workflow.run_stream("My package hasn't arrived yet")
   async for event in events:
       if isinstance(event, RequestInfoEvent):
           # Request user input
           user_response = input("You: ")
           await workflow.send_response(event.data.request_id, user_response)

.add_handoff() を使用した多層ルーティング:


   # Enable specialist-to-specialist handoffs with fluent API
   workflow = (
       HandoffBuilder(participants=[coordinator, replacement, delivery, billing])
       .set_coordinator(coordinator)
       .add_handoff(coordinator, [replacement, delivery, billing])  # Coordinator routes to all
       .add_handoff(replacement, [delivery, billing])  # Replacement delegates to delivery/billing
       .add_handoff(delivery, billing)  # Delivery escalates to billing
       .build()
   )

   # Flow: User → Coordinator → Replacement → Delivery → Back to User
   # (Replacement hands off to Delivery without returning to user)

状態の分離に参加要素ファクトリを使用する:

カスタム終了条件:


   # Terminate when user says goodbye or after 5 exchanges
   workflow = (
       HandoffBuilder(participants=[coordinator, refund, shipping])
       .set_coordinator(coordinator)
       .with_termination_condition(
           lambda conv: (
               sum(1 for msg in conv if msg.role.value == "user") >= 5
               or any("goodbye" in msg.text.lower() for msg in conv[-2:])
           )
       )
       .build()
   )

チェックポイント:


   from agent_framework import InMemoryCheckpointStorage

   storage = InMemoryCheckpointStorage()
   workflow = (
       HandoffBuilder(participants=[coordinator, refund, shipping])
       .set_coordinator(coordinator)
       .with_checkpointing(storage)
       .build()
   )

会話ハンドオフ ワークフローを作成するために HandoffBuilder を初期化します。

ビルダーは未構成の状態で開始され、次を呼び出す必要があります。

  1. .participants([...]) - エージェントを登録する
  2. または .participant_factories({...}) - エージェント/Executor ファクトリを登録する
  3. .set_coordinator(...) - 初期ユーザー入力を受け取るエージェントを指定する
  4. .build() - 最終的なワークフローを構築する

オプションの構成方法を使用すると、コンテキスト管理、終了ロジック、永続化をカスタマイズできます。

ワークフローでマップされるため、参加者は安定した名前/ID を持っている必要があります。

これらの識別子に対するハンドオフ ツール引数。 エージェント名が一致する必要がある

コーディネーターのハンドオフ ツールによって出力される文字列 (例:

出力 {"handoff_to": "billing"} には、billing という名前のエージェントが必要です)。

HandoffUserInputRequest

ワークフローに新しいユーザー入力が必要な場合に出力される要求メッセージ。

注: 会話フィールドは、重複を防ぐために、チェックポイントのシリアル化から意図的に除外されます。 会話はコーディネーターの状態で保持され、復元時に再構築されます。 問題 #2667 を参照してください。

HostedCodeInterpreterTool

生成されたコードを実行できるように AI サービスに指定できるホスト型ツールを表します。

このツールでは、コード解釈自体は実装されません。 これは、サービスが実行できる場合に生成されたコードの実行が許可されていることをサービスに通知するためのマーカーとして機能します。

HostedCodeInterpreterTool を初期化します。

HostedFileContent

ホストされているファイルの内容を表します。

HostedFileContent インスタンスを初期化します。

HostedFileSearchTool

AI サービスに指定してファイル検索を実行できるようにするファイル検索ツールを表します。

FileSearchTool を初期化します。

HostedMCPSpecificApproval

ホストされているツールの特定のモードを表します。

このモードを使用する場合、ユーザーは常に承認を必要としないツールを指定する必要があります。 これは、次の 2 つの省略可能なキーを持つディクショナリとして表されます。

HostedMCPTool

サービスによって管理および実行される MCP ツールを表します。

ホストされた MCP ツールを作成します。

HostedVectorStoreContent

ホストされたベクター ストアのコンテンツを表します。

HostedVectorStoreContent インスタンスを初期化します。

HostedWebSearchTool

AI サービスに指定して Web 検索を実行できるようにする Web 検索ツールを表します。

HostedWebSearchTool を初期化します。

InMemoryCheckpointStorage

テストと開発のためのメモリ内チェックポイント ストレージ。

メモリ ストレージを初期化します。

InProcRunnerContext

ローカル実行とオプションのチェックポイント処理のためのインプロセス実行コンテキスト。

インプロセス実行コンテキストを初期化します。

MCPStdioTool

stdio ベースの MCP サーバーに接続するための MCP ツール。

このクラスは、通常ローカル プロセスに使用される標準の入力/出力を介して通信する MCP サーバーに接続します。

MCP stdio ツールを初期化します。

引数は、StdioServerParameters オブジェクトを作成するために使用されます。

これは、stdio クライアントの作成に使用されます。 mcp.client.stdio.stdio_clientを参照してください

詳細については、mcp.client.stdio.stdio_server_parametersを参照してください。

MCPStreamableHTTPTool

HTTP ベースの MCP サーバーに接続するための MCP ツール。

このクラスは、ストリーミング可能な HTTP/SSE 経由で通信する MCP サーバーに接続します。

MCP ストリーミング可能 HTTP ツールを初期化します。

引数は、ストリーミング可能な HTTP クライアントを作成するために使用されます。

詳細については、mcp.client.streamable_http.streamablehttp_client を参照してください。

コンストラクターに渡される追加の引数は、〘〗〘

ストリーム可能な HTTP クライアント コンストラクター。

MCPWebsocketTool

WebSocket ベースの MCP サーバーに接続するための MCP ツール。

このクラスは、WebSocket 経由で通信する MCP サーバーに接続します。

MCP WebSocket ツールを初期化します。

引数は、WebSocket クライアントを作成するために使用されます。

詳細については、mcp.client.websocket.websocket_clientを参照してください。

コンストラクターに渡される追加の引数は、〘〗〘

WebSocket クライアント コンストラクター。

MagenticBuilder

Magentic One マルチエージェント オーケストレーション ワークフローを作成するための Fluent ビルダー。

Magentic One ワークフローでは、LLM を利用したマネージャーを使用して、動的なタスク計画、進行状況の追跡、アダプティブ再計画を通じて複数のエージェントを調整します。 マネージャーは計画を作成し、エージェントを選択し、進行状況を監視し、計画を再計画または完了するタイミングを決定します。

ビルダーには、参加者、マネージャー、オプションのプラン レビュー、チェックポイント処理、およびイベント コールバックを構成するための fluent API が用意されています。

Human-in-the-loop サポート: Magentic は、次を介して特殊な HITL メカニズムを提供します。

  • .with_plan_review() - 実行前に計画の確認と承認/修正を行う

  • .with_human_input_on_stall() - ワークフローがストールしたときに介入する

  • FunctionApprovalRequestContent を使用したツールの承認 - 個々のツール呼び出しを承認する

これらは、Magentic の計画ベースのオーケストレーションに適した構造化された決定オプション (APPROVE、REVISE、CONTINUE、REPLAN、GUIDANCE) を提供する MagenticHumanInterventionRequest イベントを生成します。

使用:


   from agent_framework import MagenticBuilder, StandardMagenticManager
   from azure.ai.projects.aio import AIProjectClient

   # Create manager with LLM client
   project_client = AIProjectClient.from_connection_string(...)
   chat_client = project_client.inference.get_chat_completions_client()

   # Build Magentic workflow with agents
   workflow = (
       MagenticBuilder()
       .participants(researcher=research_agent, writer=writing_agent, coder=coding_agent)
       .with_standard_manager(chat_client=chat_client, max_round_count=20, max_stall_count=3)
       .with_plan_review(enable=True)
       .with_checkpointing(checkpoint_storage)
       .build()
   )

   # Execute workflow
   async for message in workflow.run("Research and write article about AI agents"):
       print(message.text)

カスタム マネージャーの場合:


   # Create custom manager subclass
   class MyCustomManager(MagenticManagerBase):
       async def plan(self, context: MagenticContext) -> ChatMessage:
           # Custom planning logic
           ...


   manager = MyCustomManager()
   workflow = MagenticBuilder().participants(agent1=agent1, agent2=agent2).with_standard_manager(manager).build()
MagenticContext

マゼンティック マネージャーのコンテキスト。

MagenticManagerBase

マゼンティック One マネージャーの基本クラス。

ManagerDirectiveModel

構造化マネージャー ディレクティブ出力の Pydantic モデル。

キーワード引数からの入力データを解析して検証することで、新しいモデルを作成します。

[ValidationError][pydantic_core を発生させます。ValidationError] (入力データを検証して有効なモデルを形成できない場合)。

self は、フィールド名として self を許可するために明示的に位置指定専用です。

ManagerSelectionRequest

次に話者を選択するためにマネージャー エージェントに送信された要求。

このデータクラスは、マネージャー エージェントが話者の選択を分析して決定するための完全な会話状態とタスク コンテキストをパッケージ化します。

ManagerSelectionResponse

講演者の選択に関するマネージャー エージェントからの応答。

マネージャー エージェントは、オーケストレーターに決定を伝えるために、この構造 (または互換性のある dict/JSON) を生成する必要があります。

キーワード引数からの入力データを解析して検証することで、新しいモデルを作成します。

[ValidationError][pydantic_core を発生させます。ValidationError] (入力データを検証して有効なモデルを形成できない場合)。

self は、フィールド名として self を許可するために明示的に位置指定専用です。

Message

ワークフロー内のメッセージを表すクラス。

OrchestrationState

オーケストレーターのチェックポイント処理用の統合状態コンテナー。

このデータクラスは、メタデータを介してパターン固有の拡張機能を許可しながら、3 つのグループ チャット パターンすべてにチェックポイントのシリアル化を標準化します。

一般的な属性は、共有オーケストレーションの問題 (タスク、会話、ラウンド追跡) を対象としています。 パターン固有の状態がメタデータ ディクテーションに入ります。

RequestInfoEvent

ワークフロー 実行プログラムが外部情報を要求したときにトリガーされるイベント。

要求情報イベントを初期化します。

RequestInfoInterceptor

エージェントの実行前に人間の入力のワークフローを一時停止する内部実行プログラム。

この Executor は、 .with_request_info() が呼び出されたときにビルダーによってワークフロー グラフに挿入されます。 エージェントが実行される前に AgentExecutorRequest メッセージをインターセプトし、AgentInputRequest を 使用して ctx.request_info() を介してワークフローを一時停止します。

応答が受信されると、応答ハンドラーは入力をユーザー メッセージとして会話に挿入し、要求をエージェントに転送します。

省略可能 な agent_filter パラメーターを使用すると、一時停止をトリガーするエージェントを制限できます。 ターゲット エージェントの ID がフィルター セットにない場合、要求は一時停止せずに転送されます。

要求情報インターセプター Executor を初期化します。

Role

チャット操作内のメッセージの目的について説明します。

プロパティ: SYSTEM: AI システムの動作を指示または設定するロール。 USER: チャット操作にユーザー入力を提供するロール。 ASSISTANT: システム指示のユーザープロンプト入力への応答を提供するロール。 TOOL: ツールの使用要求に応答して追加情報と参照を提供するロール。

値を使用してロールを初期化します。

Runner

Pregel スーパーステップでワークフローを実行するクラス。

エッジ、共有状態、コンテキストを使用してランナーを初期化します。

RunnerContext

ランナーによって使用される実行コンテキストのプロトコル。

メッセージング、イベント、およびオプションのチェックポイント処理をサポートする 1 つのコンテキスト。 チェックポイント ストレージが構成されていない場合、チェックポイント メソッドが発生する可能性があります。

SequentialBuilder

共有コンテキストを持つシーケンシャル エージェント/Executor ワークフローの概要ビルダー。

  • participants([...]) は、AgentProtocol (推奨) または Executor インスタンスの一覧を受け入れます

  • register_participants([...]) は AgentProtocol のファクトリの一覧を受け入れます (推奨)

    または Executor ファクトリ

  • Executor は list[ChatMessage] を使用し、list[ChatMessage] を送信するハンドラーを定義する必要があります

  • ワークフローは参加者を順番に結び付け、リスト[ChatMessage]をチェーンの下に渡します

  • エージェントが会話にアシスタント メッセージを追加する

  • カスタム Executor は、リストを変換/集計して返すことができます[ChatMessage]

  • 最後の出力は、最後の参加者によって生成された会話です

使用:


   from agent_framework import SequentialBuilder

   # With agent instances
   workflow = SequentialBuilder().participants([agent1, agent2, summarizer_exec]).build()

   # With agent factories
   workflow = (
       SequentialBuilder().register_participants([create_agent1, create_agent2, create_summarizer_exec]).build()
   )

   # Enable checkpoint persistence
   workflow = SequentialBuilder().participants([agent1, agent2]).with_checkpointing(storage).build()

   # Enable request info for mid-workflow feedback (pauses before each agent)
   workflow = SequentialBuilder().participants([agent1, agent2]).with_request_info().build()

   # Enable request info only for specific agents
   workflow = (
       SequentialBuilder()
       .participants([agent1, agent2, agent3])
       .with_request_info(agents=[agent2])  # Only pause before agent2
       .build()
   )
SharedState

ワークフロー内の共有状態を管理するクラス。

SharedState は、ワークフローの実行中に Executor 間で共有する必要があるワークフロー状態データにスレッド セーフなアクセスを提供します。

予約キー: 次のキーは内部フレームワーク用に予約されており、ユーザー コードで変更しないでください。

  • _executor_state: チェックポイント処理の Executor 状態を格納します (Runner によって管理されます)。

Warnung

アンダースコア (_) で始まるキーは予約されている可能性があるため、使用しないでください。

内部フレームワーク操作。

共有状態を初期化します。

SingleEdgeGroup

グループ API を均一に保つ、単独エッジの便利なラッパー。

2 つの Executor の間に 1 対 1 のエッジ グループを作成します。

StandardMagenticManager

ChatAgent を介して実際の LLM 呼び出しを実行する標準の Magentic マネージャー。

マネージャーは、元の Magentic One オーケストレーションをミラー化するプロンプトを作成します。

  • 事実の収集
  • プランの作成
  • JSON の進行状況台帳
  • ファクトの更新とリセット時の更新計画
  • 最終的な回答の合成

Standard Magentic Manager を初期化します。

SubWorkflowRequestMessage

サブワークフローから親ワークフローの Executor に送信され、情報を要求するメッセージ。

このメッセージは、サブワークフローの Executor によって出力された RequestInfoEvent をラップします。

SubWorkflowResponseMessage

要求された情報を提供するために、WorkflowExecutor を介して親ワークフローからサブワークフローに送信されるメッセージ。

このメッセージは、サブワークフロー 実行プログラムによって出力された元の RequestInfoEvent と共に応答データをラップします。

SuperStepCompletedEvent

スーパーステップが終了したときにトリガーされるイベント。

スーパーステップ イベントを初期化します。

SuperStepStartedEvent

スーパーステップの開始時にトリガーされるイベント。

スーパーステップ イベントを初期化します。

SwitchCaseEdgeGroup

従来のスイッチ/ケース制御フローを模倣するファンアウトバリアント。

各ケースは、メッセージ ペイロードを検査し、メッセージを処理する必要があるかどうかを決定します。 1 つのケースまたは既定のブランチが実行時にターゲットを返し、単一ディスパッチ セマンティクスを保持します。

1 つのソース Executor のスイッチ/ケース ルーティング構造を構成します。

SwitchCaseEdgeGroupCase

スイッチ ケースでの 1 つの条件付き分岐の永続化可能な説明。

ランタイム Case オブジェクトとは異なり、このシリアル化可能なバリアントは、述語のターゲット識別子とわかりやすい名前のみを格納します。 脱ダイヤル中に基になる呼び出し可能な呼び出しが使用できない場合は、大声で失敗するプロキシ プレースホルダーに置き換え、不足している依存関係がすぐに表示されるようにします。

条件付きケース ブランチのルーティング メタデータを記録します。

SwitchCaseEdgeGroupDefault

スイッチ ケース グループのフォールバック ブランチの永続化可能な記述子。

既定の分岐は存在することが保証され、他のすべてのケース述語がペイロードと一致しない場合に呼び出されます。

既定の分岐を指定された Executor 識別子に向けて指定します。

TextContent

チャット内のテキスト コンテンツを表します。

TextContent インスタンスを初期化します。

TextReasoningContent

チャット内のテキスト推論コンテンツを表します。

備考: このクラスと TextContent は表面的には似ていますが、異なります。

TextReasoningContent インスタンスを初期化します。

TextSpanRegion

注釈が付けられたテキストの領域を表します。

TextSpanRegion を初期化します。

ToolMode

チャット要求でツールを使用するかどうかを定義します。

ToolMode を初期化します。

ToolProtocol

汎用ツールを表します。

このプロトコルは、すべてのツールがエージェント フレームワークと互換性を持つよう実装する必要があるインターフェイスを定義します。 HostedMCPTool、HostedWebSearchTool、AIFunction などのさまざまなツール クラスによって実装されます。 AIFunction は通常、 ai_function デコレーターによって作成されます。

各コネクタはツールを異なる方法で解析する必要があるため、抽象化が使用できない場合、ユーザーは dict を渡してサービス固有のツールを指定できます。

TypeCompatibilityError

接続された Executor 間で型の非互換性が検出されると、例外が発生します。

UriContent

URI コンテンツを表します。

Important

これは、画像やファイルなどの URI によって識別されるコンテンツに使用されます。

(バイナリ) データ URI の場合は、代わりに DataContent を使用します。

UriContent インスタンスを初期化します。

備考: これは、画像やファイルなどの URI で識別されるコンテンツに使用されます。 (バイナリ) データ URI の場合は、代わりに DataContent を使用します。

UsageContent

チャットの要求と応答に関連付けられている使用状況情報を表します。

UsageContent インスタンスを初期化します。

UsageDetails

要求/応答に関する使用状況の詳細を提供します。

UsageDetails インスタンスを初期化します。

Workflow

接続された Executor を調整するグラフ ベースの実行エンジン。

概要

ワークフローは、Pregel に似たモデルを使用してエッジ グループ経由で接続された Executor の有向グラフを実行し、グラフがアイドルになるまでスーパーステップで実行されます。 ワークフローは WorkflowBuilder クラスを使用して作成されます。このクラスを直接インスタンス化しないでください。

実行モデル

Executor は、各 Executor が同期されたスーパーステップで実行されます。

  • 接続されたエッジ グループからメッセージを受信したときに呼び出されます
  • ctx.send_message() を介してダウンストリーム Executor にメッセージを送信できます
  • ctx.yield_output() を介してワークフロー レベルの出力を生成できます
  • ctx.add_event() を介してカスタム イベントを出力できます

Executor 間のメッセージは各スーパーステップの最後に配信され、イベント ストリームには表示されません。 呼び出し元が監視できるのは、ワークフロー レベルのイベント (出力、カスタム イベント) と状態イベントだけです。

入力/出力の種類

ワークフローの種類は、実行時に次の検査によって検出されます。

  • 入力の種類: 開始 Executor の入力型から
  • 出力の種類: すべての Executor のワークフロー出力の種類の和集合 input_typesプロパティとoutput_types プロパティを使用してアクセスします。

実行メソッド

ワークフローには 2 つの主要な実行 API が用意されています。それぞれが複数のシナリオをサポートしています。

  • run(): Execute to completion, returns WorkflowRunResult with all events

  • run_stream(): 発生したイベントを生成する非同期ジェネレーターを返します

どちらの方法でも、次の機能がサポートされます。

  • 初期ワークフローの実行: メッセージ パラメーターを指定する
  • チェックポイントの復元: checkpoint_id (および必要に応じて checkpoint_storage) を提供します
  • HIL 継続: RequestInfoExecutor 要求の後に続行する 応答 を提供する
  • ランタイム のチェックポイント処理: この実行のチェックポイント処理を有効またはオーバーライドするための checkpoint_storage を提供する

状態管理

ワークフロー インスタンスには状態が含まれており、実行とrun_streamの呼び出し間で状態が保持されます。 複数の独立した実行を実行するには、WorkflowBuilder を使用して個別のワークフロー インスタンスを作成します。

外部入力要求

ワークフロー内の Executor は、 ctx.request_info()を使用して外部入力を要求できます。

  1. Executor は ctx.request_info() を呼び出して入力を要求します
  2. Executor は、response_handler () を実装して応答を処理します
  3. 要求は、イベント ストリームで RequestInfoEvent インスタンスとして出力されます
  4. ワークフローがIDLE_WITH_PENDING_REQUESTS状態に入る
  5. 呼び出し元は要求を処理し、 send_responses または send_responses_streaming メソッドを介して応答を提供します
  6. 応答は要求側の Executor にルーティングされ、応答ハンドラーが呼び出されます

チェックポイント機能

チェックポイント処理は、ビルド時または実行時に構成できます。

ビルド時 (WorkflowBuilder 経由): workflow = WorkflowBuilder().with_checkpointing(storage).build()

ランタイム (run/run_stream パラメーターを使用): result = await workflow.run(message, checkpoint_storage=runtime_storage)

有効にすると、各スーパーステップの最後にチェックポイントが作成され、次の情報がキャプチャされます。

  • Executor の状態
  • 転送中のメッセージ
  • 共有状態ワークフローは、チェックポイント ストレージを使用してプロセスの再起動間で一時停止および再開できます。

組成

ワークフローは、子ワークフローを Executor としてラップする WorkflowExecutor を使用して入れ子にすることができます。 入れ子になったワークフローの入力/出力型は、WorkflowExecutor の型の一部になります。 WorkflowExecutor が呼び出されると、入れ子になったワークフローが実行されて完了し、その出力が処理されます。

エッジの一覧を使用してワークフローを初期化します。

WorkflowAgent

ワークフローをラップし、エージェントとして公開する Agent サブクラス。

WorkflowAgent を初期化します。

WorkflowBuilder

ワークフローを構築するためのビルダー クラス。

このクラスは、Executor とエッジを接続し、実行パラメーターを構成することで、ワークフロー グラフを定義するための fluent API を提供します。 buildを呼び出して、変更できないWorkflow インスタンスを作成します。

エッジの空のリストを使用して WorkflowBuilder を初期化し、開始 Executor を使用しません。

WorkflowCheckpoint

ワークフロー状態の完全なチェックポイントを表します。

チェックポイントは、特定の時点でのワークフローの完全な実行状態をキャプチャし、ワークフローを一時停止および再開できるようにします。

shared_state dict には、フレームワークによって管理される予約キーが含まれている場合があります。

予約キーの詳細については、SharedState クラスのドキュメントを参照してください。

WorkflowCheckpointSummary

ワークフロー チェックポイントの人間が判読できる概要。

WorkflowContext

Executor がワークフローやその他の Executor と対話できるようにする実行コンテキスト。

概要

WorkflowContext には、Executor がメッセージを送信し、出力を生成し、状態を管理し、より広範なワークフロー エコシステムと対話するための制御されたインターフェイスが用意されています。 内部ランタイム コンポーネントへの直接アクセスを防ぎながら、ジェネリック パラメーターを使用して型セーフを適用します。

型パラメーター

コンテキストは、さまざまな操作に対して型セーフを適用するためにパラメーター化されます。

WorkflowContext (パラメーターなし)

メッセージを送信したり出力を生成したりせずに副作用のみを実行する Executor の場合:


   async def log_handler(message: str, ctx: WorkflowContext) -> None:
       print(f"Received: {message}")  # Only side effects

WorkflowContext[T_Out]

T_Out型のメッセージを他の Executor に送信できるようにします。


   async def processor(message: str, ctx: WorkflowContext[int]) -> None:
       result = len(message)
       await ctx.send_message(result)  # Send int to downstream executors

WorkflowContext[T_Out, T_W_Out]

メッセージの送信 (T_Out) とワークフロー出力の生成 (T_W_Out) の両方を有効にします。


   async def dual_output(message: str, ctx: WorkflowContext[int, str]) -> None:
       await ctx.send_message(42)  # Send int message
       await ctx.yield_output("complete")  # Yield str workflow output

共用体の型

共用体表記を使用して、複数の型を指定できます。


   async def flexible(message: str, ctx: WorkflowContext[int | str, bool | dict]) -> None:
       await ctx.send_message("text")  # or send 42
       await ctx.yield_output(True)  # or yield {"status": "done"}

指定されたワークフロー コンテキストを使用して Executor コンテキストを初期化します。

WorkflowErrorDetails

エラー イベント/結果に表示される構造化されたエラー情報。

WorkflowEvent

ワークフロー イベントの基本クラス。

オプションのデータを使用してワークフロー イベントを初期化します。

WorkflowExecutor

ワークフローをラップして階層型ワークフローコンポジションを有効にする Executor。

概要

WorkflowExecutor は、ワークフローを親ワークフロー内で単一の Executor として動作させ、入れ子になったワークフロー アーキテクチャを有効にします。 イベント処理、出力転送、親ワークフローと子ワークフロー間の要求/応答調整など、サブワークフロー実行の完全なライフサイクルを処理します。

実行モデル

呼び出されると、WorkflowExecutor:

  1. 入力メッセージでラップされたワークフローを開始します
  2. サブワークフローを実行して完了するか、外部入力が必要になるまで実行します。
  3. 実行後にサブワークフローの完全なイベント ストリームを処理します
  4. メッセージとして親ワークフローに出力を転送します
  5. 外部要求を親ワークフローにルーティングして処理する
  6. 応答を蓄積し、サブワークフローの実行を再開します

イベント ストリーム処理

WorkflowExecutor は、サブワークフローの完了後にイベントを処理します。

出力転送

サブワークフローからのすべての出力は、自動的に親に転送されます。

allow_direct_outputが False の場合 (既定値):


   # An executor in the sub-workflow yields outputs
   await ctx.yield_output("sub-workflow result")

   # WorkflowExecutor forwards to parent via ctx.send_message()
   # Parent receives the output as a regular message

allow_direct_outputが True の場合:

要求/応答の調整

サブワークフローに外部情報が必要な場合:


   # An executor in the sub-workflow makes request
   request = MyDataRequest(query="user info")

   # WorkflowExecutor captures RequestInfoEvent and wraps it in a SubWorkflowRequestMessage
   # then send it to the receiving executor in parent workflow. The executor in parent workflow
   # can handle the request locally or forward it to an external source.
   # The WorkflowExecutor tracks the pending request, and implements a response handler.
   # When the response is received, it executes the response handler to accumulate responses
   # and resume the sub-workflow when all expected responses are received.
   # The response handler expects a SubWorkflowResponseMessage wrapping the response data.

状態管理

WorkflowExecutor は、要求/応答サイクル間で実行状態を維持します。

  • 保留中の要求をrequest_idで追跡します
  • すべての予想される応答が受信されるまで応答を累積します。
  • 完全な応答バッチでサブワークフローの実行を再開します
  • 同時実行と複数の保留中の要求を処理します

型システム統合

WorkflowExecutor は、ラップされたワークフローから型シグネチャを継承します。

入力の種類

ラップされたワークフローの開始 Executor 入力の種類と一致します。


   # If sub-workflow accepts str, WorkflowExecutor accepts str
   workflow_executor = WorkflowExecutor(my_workflow, id="wrapper")
   assert workflow_executor.input_types == my_workflow.input_types

出力の種類

サブワークフロー出力と要求調整の種類を組み合わせます。


   # Includes all sub-workflow output types
   # Plus SubWorkflowRequestMessage if sub-workflow can make requests
   output_types = workflow.output_types + [SubWorkflowRequestMessage]  # if applicable

エラー処理

WorkflowExecutor は、サブワークフローエラーを伝達します。

  • サブワークフローから WorkflowFailedEvent をキャプチャします
  • 親コンテキストで WorkflowErrorEvent に変換します
  • サブワークフロー ID を含む詳細なエラー情報を提供します

同時実行のサポート

WorkflowExecutor は、複数の同時サブワークフロー実行を完全にサポートします。

Per-Execution 状態の分離

サブワークフロー呼び出しごとに、分離された ExecutionContext が作成されます。


   # Multiple concurrent invocations are supported
   workflow_executor = WorkflowExecutor(my_workflow, id="concurrent_executor")

   # Each invocation gets its own execution context
   # Execution 1: processes input_1 independently
   # Execution 2: processes input_2 independently
   # No state interference between executions

要求/応答の調整

応答は、元の実行に正しくルーティングされます。

  • 各実行は、独自の保留中の要求と予想される応答を追跡します
  • 要求から実行へのマッピングにより、応答が正しいサブワークフローに確実に到達する
  • 応答の蓄積は実行ごとに分離されます
  • 実行が完了したときの自動クリーンアップ

メモリ管理

  • 無制限の同時実行がサポートされます
  • 各実行には、一意の UUID ベースの ID があります
  • 完了した実行コンテキストのクリーンアップ
  • 同時実行アクセスのスレッド セーフな状態管理

重要な考慮事項

共有ワークフロー インスタンス: すべての同時実行では、同じ基になるワークフロー インスタンスが使用されます。 適切に分離するために、ラップされたワークフローとその実行プログラムがステートレスであることを確認します。


   # Avoid: Stateful executor with instance variables
   class StatefulExecutor(Executor):
       def __init__(self):
           super().__init__(id="stateful")
           self.data = []  # This will be shared across concurrent executions!

親ワークフローとの統合

親ワークフローは、サブワークフロー要求をインターセプトできます。

実装に関する注意事項

  • サブワークフローは、結果を処理する前に完了するまで実行されます
  • イベント処理がアトミックである - すべての出力は要求の前に転送されます
  • 応答の蓄積により、サブワークフローが完全な応答バッチを受け取ります
  • 外部要求後に適切な再開のために実行状態が維持される
  • 同時実行は完全に分離され、相互に干渉しません

WorkflowExecutor を初期化します。

WorkflowFailedEvent

ワークフロー実行がエラーで終了したときに生成される組み込みのライフサイクル イベント。

WorkflowOutputEvent

ワークフロー 実行プログラムが出力を生成するときにトリガーされるイベント。

ワークフロー出力イベントを初期化します。

WorkflowRunResult

非ストリーミング ワークフローの実行中に生成されるイベントのコンテナー。

概要

ワークフロー実行の完全な実行結果を表します。開始からアイドル状態まで生成されたすべてのイベントが含まれます。 ワークフローは、実行中に ctx.yield_output() 呼び出しを通じて出力を段階的に生成します。

イベント構造

データ プレーン イベントとコントロール プレーン イベントの分離を維持します。

  • データ プレーン イベント: Executor の呼び出し、完了、出力、および要求 (メイン リスト内)
  • コントロール プレーン イベント: status_timeline() メソッドを使用してアクセスできる状態タイムライン

キー メソッド

  • get_outputs(): 実行からすべてのワークフロー出力を抽出します
  • get_request_info_events(): 実行中に行われた外部入力要求を取得します
  • get_final_state(): 最終的なワークフロー状態 (IDLE、IDLE_WITH_PENDING_REQUESTS など) を取得します。
  • status_timeline(): 完全な状態イベント履歴にアクセスする
WorkflowStartedEvent

ワークフロー実行の開始時に生成される組み込みのライフサイクル イベント。

オプションのデータを使用してワークフロー イベントを初期化します。

WorkflowStatusEvent

ワークフローの実行状態遷移に対して生成される組み込みのライフサイクル イベント。

新しい状態とオプションのデータを使用して、ワークフローの状態イベントを初期化します。

WorkflowValidationError

ワークフロー検証エラーの基本例外。

WorkflowViz

graphviz と人魚を使用してワークフローを視覚化するためのクラス。

ワークフローを使用して WorkflowViz を初期化します。

列挙型

MagenticHumanInterventionDecision

人間の介入応答の決定オプション。

MagenticHumanInterventionKind

要求される人間の介入の種類。

ValidationTypeEnum

ワークフロー検証の種類の列挙。

WorkflowEventSource

ワークフロー イベントがフレームワークまたは Executor から発生したかどうかを識別します。

フレームワークは、組み込みのオーケストレーション パスによって生成されるイベント (それらを発生させるコードがランナー関連のモジュールに存在する場合でも) に使用し 、開発者が提供する Executor 実装によって表示されるイベントに EXECUTOR を使用します。

WorkflowRunState

ワークフロー実行の実行レベルの状態。

意味論:

  • STARTED: 実行が開始され、ワークフロー コンテキストが作成されました。 これは、意味のある作業が実行される前の初期状態です。 このコードベースでは、テレメトリ用に専用 の WorkflowStartedEvent を 出力し、通常は状態を 直接IN_PROGRESSに進みます。 コンシューマーは、明示的な作業前フェーズを必要とするステート マシンに対して STARTED を引き続き利用できます。

  • IN_PROGRESS: ワークフローがアクティブに実行されています (たとえば、最初のメッセージが開始 Executor に配信されたか、スーパーステップが実行されています)。 この状態は、実行の開始時に出力され、実行の進行に応じて他の状態が続く可能性があります。

  • IN_PROGRESS_PENDING_REQUESTS: 1 つ以上の情報要求操作が未処理である間のアクティブな実行。 要求が処理中でも、新しい作業がスケジュールされる場合があります。

  • IDLE: ワークフローは、未処理の要求がなく、それ以上の作業を行わない休止状態です。 これは、実行が完了し、途中で出力が生成される可能性があるワークフローの通常の終了状態です。

  • IDLE_WITH_PENDING_REQUESTS: ワークフローは、外部入力 ( RequestInfoEvent の出力など) を待って一時停止されています。 これは非終端状態です。ワークフローは、応答が提供されたときに再開できます。

  • FAILED: エラーが発生したことを示すターミナル状態。 構造化されたエラーの詳細を含む WorkflowFailedEvent を伴います。

  • CANCELLED: 呼び出し元またはオーケストレーターによって実行が取り消されたことを示すターミナル状態。 現在、既定のランナー パスでは出力されませんが、取り消しをサポートするインテグレーター/オーケストレーターには含まれています。

関数

agent_middleware

関数をエージェント ミドルウェアとしてマークするデコレーター。

このデコレーターは、AgentRunContext オブジェクトを処理するエージェント ミドルウェアとして関数を明示的に識別します。

agent_middleware(func: Callable[[AgentRunContext, Callable[[AgentRunContext], Awaitable[None]]], Awaitable[None]]) -> Callable[[AgentRunContext, Callable[[AgentRunContext], Awaitable[None]]], Awaitable[None]]

パラメーター

名前 説明
func
必須

エージェント ミドルウェアとしてマークするミドルウェア関数。

戻り値

説明

エージェント ミドルウェア マーカーと同じ関数。


   from agent_framework import agent_middleware, AgentRunContext, ChatAgent


   @agent_middleware
   async def logging_middleware(context: AgentRunContext, next):
       print(f"Before: {context.agent.name}")
       await next(context)
       print(f"After: {context.result}")


   # Use with an agent
   agent = ChatAgent(chat_client=client, name="assistant", middleware=logging_middleware)

ai_function

関数を装飾して、モデルに渡して自動的に実行できる AIFunction に変換します。

このデコレーターは、関数のシグネチャから Pydantic モデルを作成します。これは、関数に渡される引数を検証し、関数のパラメーターの JSON スキーマを生成するために使用されます。

パラメーターに説明を追加するには、Annotatedtyping型を使用し、2 番目の引数として文字列の説明を指定します。 Pydantic の Field クラスを使用して、より高度な構成を行うこともできます。

approval_modeが "always_require" に設定されている場合、関数は実行されません

明示的な承認が与えられるまで、これは自動呼び出しフローにのみ適用されます。

また、モデルが複数の関数呼び出しを返す場合は、承認が必要な一部の関数が返されることに注意することも重要です。

そうでない他の人は、それらのすべてに対して承認を求めます。

ai_function(func: Callable[[...], ReturnT | Awaitable[ReturnT]] | None = None, *, name: str | None = None, description: str | None = None, approval_mode: Literal['always_require', 'never_require'] | None = None, max_invocations: int | None = None, max_invocation_exceptions: int | None = None, additional_properties: dict[str, Any] | None = None) -> AIFunction[Any, ReturnT] | Callable[[Callable[[...], ReturnT | Awaitable[ReturnT]]], AIFunction[Any, ReturnT]]

パラメーター

名前 説明
func
Callable[[...], <xref:agent_framework._tools.ReturnT> | Awaitable[<xref:agent_framework._tools.ReturnT>]] | None

飾る機能。

規定値: None
name
必須
str | None
description
必須
str | None
approval_mode
必須
Literal['always_require', 'never_require'] | None
max_invocations
必須
int | None
max_invocation_exceptions
必須
int | None
additional_properties
必須

キーワードのみのパラメーター

名前 説明
name

関数の名前です。 指定しない場合は、関数の __name__ 属性が使用されます。

規定値: None
description

関数の説明。 指定しない場合は、関数の docstring が使用されます。

規定値: None
approval_mode

このツールを実行するために承認が必要かどうか。 既定では、承認は必要ありません。

規定値: None
max_invocations

この関数を呼び出すことができる最大回数。 None の場合、制限はありません。少なくとも 1 である必要があります。

規定値: None
max_invocation_exceptions

呼び出し中に許可される例外の最大数。 None の場合、制限はありません。少なくとも 1 である必要があります。

規定値: None
additional_properties

関数に設定する追加のプロパティ。

規定値: None

戻り値

説明
AIFunction[Any, <xref:agent_framework._tools.ReturnT>] | Callable[[Callable[[…], <xref:agent_framework._tools.ReturnT> | Awaitable[<xref:agent_framework._tools.ReturnT>]]], AIFunction[Any, <xref:agent_framework._tools.ReturnT>]]


   from agent_framework import ai_function
   from typing import Annotated


   @ai_function
   def ai_function_example(
       arg1: Annotated[str, "The first argument"],
       arg2: Annotated[int, "The second argument"],
   ) -> str:
       # An example function that takes two arguments and returns a string.
       return f"arg1: {arg1}, arg2: {arg2}"


   # the same function but with approval required to run
   @ai_function(approval_mode="always_require")
   def ai_function_example(
       arg1: Annotated[str, "The first argument"],
       arg2: Annotated[int, "The second argument"],
   ) -> str:
       # An example function that takes two arguments and returns a string.
       return f"arg1: {arg1}, arg2: {arg2}"


   # With custom name and description
   @ai_function(name="custom_weather", description="Custom weather function")
   def another_weather_func(location: str) -> str:
       return f"Weather in {location}"


   # Async functions are also supported
   @ai_function
   async def async_get_weather(location: str) -> str:
       '''Get weather asynchronously.'''
       # Simulate async operation
       return f"Weather in {location}"

chat_middleware

関数をチャット ミドルウェアとしてマークするデコレーター。

このデコレーターは、ChatContext オブジェクトを処理するチャット ミドルウェアとして関数を明示的に識別します。

chat_middleware(func: Callable[[ChatContext, Callable[[ChatContext], Awaitable[None]]], Awaitable[None]]) -> Callable[[ChatContext, Callable[[ChatContext], Awaitable[None]]], Awaitable[None]]

パラメーター

名前 説明
func
必須

チャット ミドルウェアとしてマークするミドルウェア関数。

戻り値

説明

チャット ミドルウェア マーカーと同じ関数。


   from agent_framework import chat_middleware, ChatContext, ChatAgent


   @chat_middleware
   async def logging_middleware(context: ChatContext, next):
       print(f"Messages: {len(context.messages)}")
       await next(context)
       print(f"Response: {context.result}")


   # Use with an agent
   agent = ChatAgent(chat_client=client, name="assistant", middleware=logging_middleware)

create_edge_runner

エッジ グループに適したエッジ ランナーを作成するファクトリ関数。

create_edge_runner(edge_group: EdgeGroup, executors: dict[str, Executor]) -> EdgeRunner

パラメーター

名前 説明
edge_group
必須
<xref:agent_framework._workflows._edge.EdgeGroup>

ランナーを作成するエッジ グループ。

executors
必須

Executor ID を Executor インスタンスにマップします。

戻り値

説明
<xref:agent_framework._workflows._edge_runner.EdgeRunner>

適切な EdgeRunner インスタンス。

executor

スタンドアロン関数を FunctionExecutor インスタンスに変換するデコレーター。

@executorデコレーターは、スタンドアロンのモジュール レベルの関数のみを対象に設計されています。 クラス ベースの Executor の場合は、インスタンス メソッドの @handler で Executor 基本クラスを使用します。

同期関数と非同期関数の両方をサポートします。 同期関数は、イベント ループをブロックしないようにスレッド プールで実行されます。

Important

スタンドアロン関数 (モジュール レベルまたはローカル関数) に @executor を使用する

@executorでstaticmethodを使用しないでください。classmethod

クラス ベースの Executor の場合は、Executor をサブクラス化し、インスタンス メソッドで @handler を使用します

使用:


   # Standalone async function (RECOMMENDED):
   @executor(id="upper_case")
   async def to_upper(text: str, ctx: WorkflowContext[str]):
       await ctx.send_message(text.upper())


   # Standalone sync function (runs in thread pool):
   @executor
   def process_data(data: str):
       return data.upper()


   # For class-based executors, use @handler instead:
   class MyExecutor(Executor):
       def __init__(self):
           super().__init__(id="my_executor")

       @handler
       async def process(self, data: str, ctx: WorkflowContext[str]):
           await ctx.send_message(data.upper())
executor(func: Callable[[...], Any] | None = None, *, id: str | None = None) -> Callable[[Callable[[...], Any]], FunctionExecutor] | FunctionExecutor

パラメーター

名前 説明
func
Callable[[...], Any] | None

装飾する関数 (かっこなしで使用する場合)

規定値: None
id
必須
str | None

Executor のオプションのカスタム ID。 None の場合は、関数名を使用します。

キーワードのみのパラメーター

名前 説明
id
規定値: None

戻り値

説明

ワークフローにワイヤードできる FunctionExecutor インスタンス。

例外

説明

staticmethodまたはclassmethodと共に使用する場合 (サポートされていないパターン)

function_middleware

関数を関数ミドルウェアとしてマークするデコレーター。

このデコレーターは、FunctionInvocationContext オブジェクトを処理する関数ミドルウェアとして関数を明示的に識別します。

function_middleware(func: Callable[[FunctionInvocationContext, Callable[[FunctionInvocationContext], Awaitable[None]]], Awaitable[None]]) -> Callable[[FunctionInvocationContext, Callable[[FunctionInvocationContext], Awaitable[None]]], Awaitable[None]]

パラメーター

名前 説明
func
必須

関数ミドルウェアとしてマークするミドルウェア関数。

戻り値

説明

関数ミドルウェア マーカーと同じ関数。


   from agent_framework import function_middleware, FunctionInvocationContext, ChatAgent


   @function_middleware
   async def logging_middleware(context: FunctionInvocationContext, next):
       print(f"Calling: {context.function.name}")
       await next(context)
       print(f"Result: {context.result}")


   # Use with an agent
   agent = ChatAgent(chat_client=client, name="assistant", middleware=logging_middleware)

get_checkpoint_summary

get_checkpoint_summary(checkpoint: WorkflowCheckpoint) -> WorkflowCheckpointSummary

パラメーター

名前 説明
checkpoint
必須

戻り値

説明

get_logger

指定した名前のロガーを取得します。既定値は 'agent_framework' です。

get_logger(name: str = 'agent_framework') -> Logger

パラメーター

名前 説明
name
str

ロガーの名前。 既定値は 'agent_framework' です。

規定値: "agent_framework"

戻り値

説明

構成されたロガー インスタンス。

handler

Executor のハンドラーを登録するデコレーター。

handler(func: Callable[[ExecutorT, Any, ContextT], Awaitable[Any]]) -> Callable[[ExecutorT, Any, ContextT], Awaitable[Any]]

パラメーター

名前 説明
func
必須
Callable[[<xref:agent_framework._workflows._executor.ExecutorT>, Any, <xref:agent_framework._workflows._executor.ContextT>], Awaitable[Any]]

飾る機能。 パラメーターなしで使用する場合は None を指定できます。

戻り値

説明
Callable[[<xref:agent_framework._workflows._executor.ExecutorT>, Any, <xref:agent_framework._workflows._executor.ContextT>], Awaitable[Any]]

ハンドラー メタデータを含む修飾関数。

@handler async def handle_string(self, message: str, ctx: WorkflowContext[str]) -> None:

...

@handler async def handle_data(self, message: dict, ctx: WorkflowContext[str | int]) -> None:

...

prepare_function_call_results

関数呼び出し結果の値を準備します。

prepare_function_call_results(content: TextContent | DataContent | TextReasoningContent | UriContent | FunctionCallContent | FunctionResultContent | ErrorContent | UsageContent | HostedFileContent | HostedVectorStoreContent | FunctionApprovalRequestContent | FunctionApprovalResponseContent | Any | list[TextContent | DataContent | TextReasoningContent | UriContent | FunctionCallContent | FunctionResultContent | ErrorContent | UsageContent | HostedFileContent | HostedVectorStoreContent | FunctionApprovalRequestContent | FunctionApprovalResponseContent | Any]) -> str

パラメーター

名前 説明
content
必須

戻り値

説明
str

prepend_agent_framework_to_user_agent

ヘッダー内の User-Agent の前に "agent-framework" を追加します。

AGENT_FRAMEWORK_USER_AGENT_DISABLED環境変数を使用してユーザー エージェント テレメトリが無効になっている場合、User-Agent ヘッダーにはエージェント フレームワーク情報は含まれません。 None が渡されると、そのまままたは空のディクテーションとして返されます。

prepend_agent_framework_to_user_agent(headers: dict[str, Any] | None = None) -> dict[str, Any]

パラメーター

名前 説明
headers

既存のヘッダー ディクショナリ。

規定値: None

戻り値

説明

ヘッダーが None の場合、"User-Agent" が "agent-framework-python/{version}" に設定された新しい dict。 ユーザー エージェントの前に "agent-framework-python/{version}" が付加された変更されたヘッダー ディクショナリ。


   from agent_framework import prepend_agent_framework_to_user_agent

   # Add agent-framework to new headers
   headers = prepend_agent_framework_to_user_agent()
   print(headers["User-Agent"])  # "agent-framework-python/0.1.0"

   # Prepend to existing headers
   existing = {"User-Agent": "my-app/1.0"}
   headers = prepend_agent_framework_to_user_agent(existing)
   print(headers["User-Agent"])  # "agent-framework-python/0.1.0 my-app/1.0"

response_handler

要求の応答を処理するハンドラーを登録するデコレーター。

response_handler(func: Callable[[ExecutorT, Any, Any, ContextT], Awaitable[None]]) -> Callable[[ExecutorT, Any, Any, ContextT], Awaitable[None]]

パラメーター

名前 説明
func
必須
Callable[[<xref:agent_framework._workflows._request_info_mixin.ExecutorT>, Any, Any, <xref:agent_framework._workflows._request_info_mixin.ContextT>], Awaitable[None]]

飾る機能。

戻り値

説明
Callable[[<xref:agent_framework._workflows._request_info_mixin.ExecutorT>, Any, Any, <xref:agent_framework._workflows._request_info_mixin.ContextT>], Awaitable[None]]

ハンドラー メタデータを含む修飾関数。


   @handler
   async def run(self, message: int, context: WorkflowContext[str]) -> None:
       # Example of a handler that sends a request
       ...
       # Send a request with a `CustomRequest` payload and expect a `str` response.
       await context.request_info(CustomRequest(...), str)


   @response_handler
   async def handle_response(
       self,
       original_request: CustomRequest,
       response: str,
       context: WorkflowContext[str],
   ) -> None:
       # Example of a response handler for the above request
       ...


   @response_handler
   async def handle_response(
       self,
       original_request: CustomRequest,
       response: dict,
       context: WorkflowContext[int],
   ) -> None:
       # Example of a response handler for a request expecting a dict response
       ...

setup_logging

エージェント フレームワークのログ構成を設定します。

setup_logging() -> None

戻り値

説明

use_agent_middleware

エージェント クラスにミドルウェアのサポートを追加するクラス デコレーター。

このデコレーターは、任意のエージェント クラスにミドルウェア機能を追加します。 ミドルウェアの実行を提供するために、 run() メソッドと run_stream() メソッドをラップします。

ミドルウェアの実行は、 context.terminate プロパティを True に設定することで、任意の時点で終了できます。 設定すると、制御がパイプラインに戻るとすぐに、パイプラインはミドルウェアの実行を停止します。

このデコレーターは、組み込みのエージェント クラスに既に適用されています。 使用する必要があるのは

カスタム エージェントの実装を作成する場合は〘。

use_agent_middleware(agent_class: type[TAgent]) -> type[TAgent]

パラメーター

名前 説明
agent_class
必須
type[<xref:TAgent>]

ミドルウェアのサポートを追加するエージェント クラス。

戻り値

説明
type[~<xref:TAgent>]

ミドルウェアをサポートする変更されたエージェント クラス。


   from agent_framework import use_agent_middleware


   @use_agent_middleware
   class CustomAgent:
       async def run(self, messages, **kwargs):
           # Agent implementation
           pass

       async def run_stream(self, messages, **kwargs):
           # Streaming implementation
           pass

use_chat_middleware

チャット クライアント クラスにミドルウェアのサポートを追加するクラス デコレーター。

このデコレーターは、任意のチャット クライアント クラスにミドルウェア機能を追加します。 ミドルウェアの実行を提供するために、 get_response() メソッドと get_streaming_response() メソッドをラップします。

このデコレーターは、組み込みのチャット クライアント クラスに既に適用されています。 使用する必要があるのは

カスタム チャット クライアントの実装を作成する場合は〘。

use_chat_middleware(chat_client_class: type[TChatClient]) -> type[TChatClient]

パラメーター

名前 説明
chat_client_class
必須
type[<xref:TChatClient>]

ミドルウェアのサポートを追加するチャット クライアント クラス。

戻り値

説明
type[~<xref:TChatClient>]

ミドルウェアをサポートする変更されたチャット クライアント クラス。


   from agent_framework import use_chat_middleware


   @use_chat_middleware
   class CustomChatClient:
       async def get_response(self, messages, **kwargs):
           # Chat client implementation
           pass

       async def get_streaming_response(self, messages, **kwargs):
           # Streaming implementation
           pass

use_function_invocation

チャット クライアントのツール呼び出しを可能にするクラス デコレーター。

このデコレーターは、 get_response メソッドと get_streaming_response メソッドをラップして、モデルからの関数呼び出しを自動的に処理し、それらを実行し、さらに処理するために結果をモデルに返します。

use_function_invocation(chat_client: type[TChatClient]) -> type[TChatClient]

パラメーター

名前 説明
chat_client
必須
type[<xref:TChatClient>]

装飾するチャット クライアント クラス。

戻り値

説明
type[~<xref:TChatClient>]

関数呼び出しが有効になっている装飾済みチャット クライアント クラス。

例外

説明

チャット クライアントに必要なメソッドがない場合。


   from agent_framework import use_function_invocation, BaseChatClient


   @use_function_invocation
   class MyCustomClient(BaseChatClient):
       async def get_response(self, messages, **kwargs):
           # Implementation here
           pass

       async def get_streaming_response(self, messages, **kwargs):
           # Implementation here
           pass


   # The client now automatically handles function calls
   client = MyCustomClient()

validate_workflow_graph

ワークフロー グラフを検証するための便利な関数。

validate_workflow_graph(edge_groups: Sequence[EdgeGroup], executors: dict[str, Executor], start_executor: Executor) -> None

パラメーター

名前 説明
edge_groups
必須
Sequence[<xref:agent_framework._workflows._edge.EdgeGroup>]

ワークフロー内のエッジ グループの一覧

executors
必須

Executor ID を Executor インスタンスにマップする

start_executor
必須

開始 Executor (インスタンスまたは ID を指定できます)

戻り値

説明

例外

説明

検証に失敗した場合