AutoGen から Microsoft Agent Framework Python SDK に移行するための包括的なガイド。
目次
- 背景
- 主な類似点と相違点
- モデル クライアントの作成と構成
- Single-Agent 機能マッピング
- マルチエージェント機能マッピング
- 可観測性
- 結論
バックグラウンド
AutoGen は、大規模な言語モデル (LLM) を使用して AI エージェントとマルチエージェント システムを構築するためのフレームワークです。 Microsoft Research の研究プロジェクトとして始まり、GroupChat やイベント ドリブン エージェント ランタイムなど、マルチエージェント オーケストレーションのいくつかの概念を開拓しました。 このプロジェクトはオープンソースコミュニティの実りあるコラボレーションであり、多くの重要な機能は外部の共同作成者から来ました。
Microsoft Agent Framework は、LLM を使用して AI エージェントとワークフローを構築するための新しい多言語 SDK です。 AutoGen で開拓されたアイデアの大幅な進化を表し、実際の使用状況から学んだ教訓が組み込まれています。 これは、Microsoft のコア AutoGen チームとセマンティック カーネル チームによって開発され、今後 AI アプリケーションを構築するための新しい基盤として設計されています。
このガイドでは、実際の移行パスについて説明します。最初に、同じままの内容と変更点をひとめで説明します。 次に、モデル クライアントのセットアップ、単一エージェント機能、最後に、具体的なコードを並べて使用したマルチエージェント オーケストレーションについて説明します。 その過程で、Agent Framework リポジトリの実行可能なサンプルへのリンクは、各手順を検証するのに役立ちます。
主な類似点と相違点
同じままの内容
基礎は使い慣れている。 モデル クライアントを中心にエージェントを作成し、指示を提供し、ツールをアタッチします。 どちらのライブラリも、関数スタイルのツール、トークン ストリーミング、マルチモーダル コンテンツ、非同期 I/O をサポートしています。
# Both frameworks follow similar patterns
# AutoGen
agent = AssistantAgent(name="assistant", model_client=client, tools=[my_tool])
result = await agent.run(task="Help me with this task")
# Agent Framework
agent = ChatAgent(name="assistant", chat_client=client, tools=[my_tool])
result = await agent.run("Help me with this task")
主な違い
オーケストレーション スタイル: AutoGen では、イベント ドリブン コアと高レベルの
Teamがペアになります。 Agent Framework は、入力の準備ができたらエッジに沿ってデータをルーティングし、Executor をアクティブにする、型指定されたグラフベースのWorkflowを中心としています。ツール: AutoGen は関数を
FunctionToolでラップします。 Agent Framework では、@ai_functionを使用し、スキーマを自動的に推論し、コード インタープリターや Web 検索などのホストされたツールを追加します。エージェントの動作:
AssistantAgentを増やさない限り、max_tool_iterationsは 1 ターンです。ChatAgentは既定では複数ターンであり、最終的な回答が返されるまでツールを呼び出し続けます。ランタイム: AutoGen には、埋め込みおよび試験的な分散ランタイムが用意されています。 Agent Framework は現在、単一プロセスの構成に重点を置いています。分散実行が計画されています。
モデル クライアントの作成と構成
どちらのフレームワークも、主要な AI プロバイダー向けのモデル クライアントを提供しますが、API は同じですが、同一ではありません。
| 特徴 | AutoGen | エージェント フレームワーク |
|---|---|---|
| OpenAI クライアント | OpenAIChatCompletionClient |
OpenAIChatClient |
| OpenAI Responses クライアント | ❌ 利用不可 | OpenAIResponsesClient |
| Azure OpenAI | AzureOpenAIChatCompletionClient |
AzureOpenAIChatClient |
| Azure OpenAI 応答 | ❌ 利用不可 | AzureOpenAIResponsesClient |
| Azure AI | AzureAIChatCompletionClient |
AzureAIAgentClient |
| Anthropic | AnthropicChatCompletionClient |
🚧 計画 |
| Ollama | OllamaChatCompletionClient |
🚧 計画 |
| キャッシュ処理 |
ChatCompletionCache 上包み |
🚧 計画 |
AutoGen モデル クライアント
from autogen_ext.models.openai import OpenAIChatCompletionClient, AzureOpenAIChatCompletionClient
# OpenAI
client = OpenAIChatCompletionClient(
model="gpt-5",
api_key="your-key"
)
# Azure OpenAI
client = AzureOpenAIChatCompletionClient(
azure_endpoint="https://your-endpoint.openai.azure.com/",
azure_deployment="gpt-5",
api_version="2024-12-01",
api_key="your-key"
)
Agent Framework ChatClients
from agent_framework.openai import OpenAIChatClient
from agent_framework.azure import AzureOpenAIChatClient
# OpenAI (reads API key from environment)
client = OpenAIChatClient(model_id="gpt-5")
# Azure OpenAI (uses environment or default credentials; see samples for auth options)
client = AzureOpenAIChatClient(model_id="gpt-5")
詳細な例については、次を参照してください。
- OpenAI チャット クライアント - 基本的な OpenAI クライアントのセットアップ
- Azure OpenAI チャット クライアント - 認証を使用した Azure OpenAI
- Azure AI クライアント - Azure AI エージェントの統合
Responses API のサポート (Agent Framework 排他)
Agent Framework の AzureOpenAIResponsesClient と OpenAIResponsesClient では、AutoGen では使用できない推論モデルと構造化応答の特殊なサポートが提供されます。
from agent_framework.azure import AzureOpenAIResponsesClient
from agent_framework.openai import OpenAIResponsesClient
# Azure OpenAI with Responses API
azure_responses_client = AzureOpenAIResponsesClient(model_id="gpt-5")
# OpenAI with Responses API
openai_responses_client = OpenAIResponsesClient(model_id="gpt-5")
Responses API の例については、次を参照してください。
- Azure Responses Client Basic - Azure OpenAI with responses
- OpenAI Responses Client Basic - OpenAI 応答の統合
Single-Agent 機能マッピング
このセクションでは、AutoGen と Agent Framework の間で単一エージェント機能をマップします。 クライアントを配置した状態で、エージェントを作成し、ツールをアタッチし、非ストリーミング実行とストリーミング実行のどちらかを選択します。
基本的なエージェントの作成と実行
モデル クライアントを構成したら、次の手順ではエージェントを作成します。 どちらのフレームワークも同様のエージェント抽象化を提供しますが、既定の動作と構成オプションが異なります。
AutoGen AssistantAgent
from autogen_agentchat.agents import AssistantAgent
agent = AssistantAgent(
name="assistant",
model_client=client,
system_message="You are a helpful assistant.",
tools=[my_tool],
max_tool_iterations=1 # Single-turn by default
)
# Execution
result = await agent.run(task="What's the weather?")
Agent Framework ChatAgent
from agent_framework import ChatAgent, ai_function
from agent_framework.openai import OpenAIChatClient
# Create simple tools for the example
@ai_function
def get_weather(location: str) -> str:
"""Get weather for a location."""
return f"Weather in {location}: sunny"
@ai_function
def get_time() -> str:
"""Get current time."""
return "Current time: 2:30 PM"
# Create client
client = OpenAIChatClient(model_id="gpt-5")
async def example():
# Direct creation
agent = ChatAgent(
name="assistant",
chat_client=client,
instructions="You are a helpful assistant.",
tools=[get_weather] # Multi-turn by default
)
# Factory method (more convenient)
agent = client.create_agent(
name="assistant",
instructions="You are a helpful assistant.",
tools=[get_weather]
)
# Execution with runtime tool configuration
result = await agent.run(
"What's the weather?",
tools=[get_time], # Can add tools at runtime
tool_choice="auto"
)
主な違い:
-
既定の動作:
ChatAgentはツール呼び出しを自動的に反復処理しますが、AssistantAgentには明示的なmax_tool_iterations設定が必要です -
ランタイム構成:
ChatAgent.run()は、呼び出しごとのカスタマイズのtoolsパラメーターとtool_choiceパラメーターを受け入れます - ファクトリ メソッド: Agent Framework は、チャット クライアントから直接便利なファクトリ メソッドを提供します
-
状態管理:
ChatAgentはステートレスであり、会話履歴を状態の一部として保持するAssistantAgentとは異なり、呼び出し間の会話履歴は保持されません
AgentThread を使用した会話状態の管理
ChatAgentで会話を続けるには、AgentThreadを使用して会話履歴を管理します。
# Assume we have an agent from previous examples
async def conversation_example():
# Create a new thread that will be reused
thread = agent.get_new_thread()
# First interaction - thread is empty
result1 = await agent.run("What's 2+2?", thread=thread)
print(result1.text) # "4"
# Continue conversation - thread contains previous messages
result2 = await agent.run("What about that number times 10?", thread=thread)
print(result2.text) # "40" (understands "that number" refers to 4)
# AgentThread can use external storage, similar to ChatCompletionContext in AutoGen
既定ではステートレス: クイック デモ
# Without a thread (two independent invocations)
r1 = await agent.run("What's 2+2?")
print(r1.text) # for example, "4"
r2 = await agent.run("What about that number times 10?")
print(r2.text) # Likely ambiguous without prior context; cannot be "40"
# With a thread (shared context across calls)
thread = agent.get_new_thread()
print((await agent.run("What's 2+2?", thread=thread)).text) # "4"
print((await agent.run("What about that number times 10?", thread=thread)).text) # "40"
スレッド管理の例については、以下を参照してください。
- スレッドを使用した Azure AI - 会話状態管理
- スレッドを使用した OpenAI チャット クライアント - スレッド の使用パターン
- Redis でサポートされるスレッド - 外部での会話状態の永続化
OpenAI アシスタント エージェントの等価性
どちらのフレームワークでも、OpenAI Assistant API の統合が提供されます。
# AutoGen OpenAIAssistantAgent
from autogen_ext.agents.openai import OpenAIAssistantAgent
# Agent Framework has OpenAI Assistants support via OpenAIAssistantsClient
from agent_framework.openai import OpenAIAssistantsClient
OpenAI アシスタントの例については、次を参照してください。
- OpenAI Assistants Basic - 基本的なアシスタントのセットアップ
- 関数ツールを使用した OpenAI アシスタント - カスタム ツールの統合
- Azure OpenAI Assistants Basic - Azure アシスタントのセットアップ
- スレッドを含む OpenAI アシスタント - スレッド 管理
ストリーミング のサポート
どちらのフレームワークも、UI の応答性を維持するために、クライアントとエージェントからリアルタイムでトークンをストリーミングします。
AutoGen ストリーミング
# Model client streaming
async for chunk in client.create_stream(messages):
if isinstance(chunk, str):
print(chunk, end="")
# Agent streaming
async for event in agent.run_stream(task="Hello"):
if isinstance(event, ModelClientStreamingChunkEvent):
print(event.content, end="")
elif isinstance(event, TaskResult):
print("Final result received")
Agent Framework Streaming
# Assume we have client, agent, and tools from previous examples
async def streaming_example():
# Chat client streaming
async for chunk in client.get_streaming_response("Hello", tools=tools):
if chunk.text:
print(chunk.text, end="")
# Agent streaming
async for chunk in agent.run_stream("Hello"):
if chunk.text:
print(chunk.text, end="", flush=True)
ヒント: Agent Framework では、クライアントとエージェントの両方が同じ更新形式になります。どちらの場合も chunk.text を読み取ることができます。
メッセージの種類と作成
メッセージのしくみを理解することは、効果的なエージェント通信に不可欠です。 どちらのフレームワークも、メッセージの作成と処理に異なるアプローチを提供します。AutoGen では個別のメッセージ クラスを使用し、Agent Framework は統合メッセージ システムを使用します。
AutoGen メッセージの種類
from autogen_agentchat.messages import TextMessage, MultiModalMessage
from autogen_core.models import UserMessage
# Text message
text_msg = TextMessage(content="Hello", source="user")
# Multi-modal message
multi_modal_msg = MultiModalMessage(
content=["Describe this image", image_data],
source="user"
)
# Convert to model format for use with model clients
user_message = text_msg.to_model_message()
Agent Framework のメッセージの種類
from agent_framework import ChatMessage, TextContent, DataContent, UriContent, Role
import base64
# Text message
text_msg = ChatMessage(role=Role.USER, text="Hello")
# Supply real image bytes, or use a data: URI/URL via UriContent
image_bytes = b"<your_image_bytes>"
image_b64 = base64.b64encode(image_bytes).decode()
image_uri = f"data:image/jpeg;base64,{image_b64}"
# Multi-modal message with mixed content
multi_modal_msg = ChatMessage(
role=Role.USER,
contents=[
TextContent(text="Describe this image"),
DataContent(uri=image_uri, media_type="image/jpeg")
]
)
主な違い:
- AutoGen では、
TextMessageフィールドを持つ個別のメッセージ クラス (MultiModalMessage、source) が使用されます - Agent Framework では、型指定されたコンテンツ オブジェクトと
ChatMessageフィールドを含む統合roleが使用されます - Agent Framework メッセージでは、文字列ソースの代わりに
Role列挙型 (USER、ASSISTANT、SYSTEM、TOOL) が使用されます
ツールの作成と統合
ツールは、テキスト生成を超えてエージェント機能を拡張します。 このフレームワークでは、ツールの作成にさまざまなアプローチが取られます。Agent Framework では、より自動化されたスキーマ生成が提供されます。
AutoGen FunctionTool
from autogen_core.tools import FunctionTool
async def get_weather(location: str) -> str:
"""Get weather for a location."""
return f"Weather in {location}: sunny"
# Manual tool creation
tool = FunctionTool(
func=get_weather,
description="Get weather information"
)
# Use with agent
agent = AssistantAgent(name="assistant", model_client=client, tools=[tool])
Agent Framework @ai_function
from agent_framework import ai_function
from typing import Annotated
from pydantic import Field
@ai_function
def get_weather(
location: Annotated[str, Field(description="The location to get weather for")]
) -> str:
"""Get weather for a location."""
return f"Weather in {location}: sunny"
# Direct use with agent (automatic conversion)
agent = ChatAgent(name="assistant", chat_client=client, tools=[get_weather])
詳細な例については、次を参照してください。
- OpenAI チャット エージェント Basic - Simple OpenAI チャット エージェント
- 関数ツールを使用した OpenAI - カスタム ツールを使用したエージェント
- Azure OpenAI Basic - Azure OpenAI エージェントのセットアップ
ホステッド ツール (Agent Framework 排他)
Agent Framework には、AutoGen では使用できないホスト型ツールが用意されています。
from agent_framework import ChatAgent, HostedCodeInterpreterTool, HostedWebSearchTool
from agent_framework.azure import AzureOpenAIChatClient
# Azure OpenAI client with a model that supports hosted tools
client = AzureOpenAIChatClient(model_id="gpt-5")
# Code execution tool
code_tool = HostedCodeInterpreterTool()
# Web search tool
search_tool = HostedWebSearchTool()
agent = ChatAgent(
name="researcher",
chat_client=client,
tools=[code_tool, search_tool]
)
詳細な例については、次を参照してください。
- コード インタープリターを使用した Azure AI - コード実行ツール
- 複数のツールを使用した Azure AI - 複数のホストされたツール
- OpenAI と Web Search - Web 検索の統合
要件と注意事項:
- ホストされているツールは、それらをサポートするモデル/アカウントでのみ使用できます。 これらのツールを有効にする前に、プロバイダーの権利とモデルのサポートを確認します。
- 構成はプロバイダーによって異なります。セットアップとアクセス許可については、各サンプルの前提条件に従ってください。
- すべてのモデルが、ホストされているすべてのツール (Web 検索とコード インタープリターなど) をサポートしているわけではありません。 環境内で互換性のあるモデルを選択します。
注
AutoGen はローカル コード実行ツールをサポートしていますが、この機能は今後の Agent Framework バージョンで計画されています。
主な違い: エージェント フレームワークは、エージェント レベルでツールのイテレーションを自動的に処理します。 AutoGen の max_tool_iterations パラメーターとは異なり、Agent Framework エージェントは、無限ループを防ぐための組み込みの安全メカニズムを使用して、既定で完了するまでツールの実行を続行します。
MCP サーバーのサポート
高度なツール統合のために、どちらのフレームワークもモデル コンテキスト プロトコル (MCP) をサポートし、エージェントが外部サービスとデータ ソースと対話できるようにします。 Agent Framework では、より包括的な組み込みサポートが提供されます。
AutoGen MCP のサポート
AutoGen には、拡張機能による基本的な MCP サポートがあります (特定の実装の詳細はバージョンによって異なります)。
Agent Framework MCP のサポート
from agent_framework import ChatAgent, MCPStdioTool, MCPStreamableHTTPTool, MCPWebsocketTool
from agent_framework.openai import OpenAIChatClient
# Create client for the example
client = OpenAIChatClient(model_id="gpt-5")
# Stdio MCP server
mcp_tool = MCPStdioTool(
name="filesystem",
command="uvx mcp-server-filesystem",
args=["/allowed/directory"]
)
# HTTP streaming MCP
http_mcp = MCPStreamableHTTPTool(
name="http_mcp",
url="http://localhost:8000/sse"
)
# WebSocket MCP
ws_mcp = MCPWebsocketTool(
name="websocket_mcp",
url="ws://localhost:8000/ws"
)
agent = ChatAgent(name="assistant", chat_client=client, tools=[mcp_tool])
MCP の例については、次を参照してください。
- ローカル MCP での OpenAI - OpenAI での MCPStreamableHTTPTool の使用
- ホストされた MCP での OpenAI - ホストされた MCP サービスの使用
- ローカル MCP での Azure AI - Azure AI での MCP の使用
- ホストされた MCP を使用した Azure AI - Azure AI でのホストされた MCP の使用
ツールとしてのエージェント パターン
強力なパターンの 1 つは、エージェント自体をツールとして使用し、階層型エージェント アーキテクチャを有効にすることです。 どちらのフレームワークも、実装が異なるこのパターンをサポートしています。
AutoGen AgentTool
from autogen_agentchat.tools import AgentTool
# Create specialized agent
writer = AssistantAgent(
name="writer",
model_client=client,
system_message="You are a creative writer."
)
# Wrap as tool
writer_tool = AgentTool(agent=writer)
# Use in coordinator (requires disabling parallel tool calls)
coordinator_client = OpenAIChatCompletionClient(
model="gpt-5",
parallel_tool_calls=False
)
coordinator = AssistantAgent(
name="coordinator",
model_client=coordinator_client,
tools=[writer_tool]
)
Agent Framework as_tool()
from agent_framework import ChatAgent
# Assume we have client from previous examples
# Create specialized agent
writer = ChatAgent(
name="writer",
chat_client=client,
instructions="You are a creative writer."
)
# Convert to tool
writer_tool = writer.as_tool(
name="creative_writer",
description="Generate creative content",
arg_name="request",
arg_description="What to write"
)
# Use in coordinator
coordinator = ChatAgent(
name="coordinator",
chat_client=client,
tools=[writer_tool]
)
明示的な移行に関する注意: AutoGen では、同じエージェント インスタンスを呼び出すときのコンカレンシーの問題を回避するために、エージェントをツールとしてラップするときに、コーディネーターのモデル クライアントに parallel_tool_calls=False を設定します。
エージェント フレームワークでは、エージェントが既定でステートレスになっているので、 as_tool() は並列ツール呼び出しを無効にする必要はありません。
ミドルウェア (エージェント フレームワーク機能)
エージェント フレームワークには、AutoGen に不足しているミドルウェア機能が導入されています。 ミドルウェアを使用すると、ログ記録、セキュリティ、パフォーマンスの監視などの強力な横断的な問題を実現できます。
from agent_framework import ChatAgent, AgentRunContext, FunctionInvocationContext
from typing import Callable, Awaitable
# Assume we have client from previous examples
async def logging_middleware(
context: AgentRunContext,
next: Callable[[AgentRunContext], Awaitable[None]]
) -> None:
print(f"Agent {context.agent.name} starting")
await next(context)
print(f"Agent {context.agent.name} completed")
async def security_middleware(
context: FunctionInvocationContext,
next: Callable[[FunctionInvocationContext], Awaitable[None]]
) -> None:
if "password" in str(context.arguments):
print("Blocking function call with sensitive data")
return # Don't call next()
await next(context)
agent = ChatAgent(
name="secure_agent",
chat_client=client,
middleware=[logging_middleware, security_middleware]
)
メリット:
- セキュリティ: 入力の検証とコンテンツのフィルター処理
- 可観測性: ログ記録、メトリック、トレース
- パフォーマンス: キャッシュとレート制限
- エラー処理: グレースフルな低下と再試行ロジック
ミドルウェアの詳細な例については、次を参照してください。
- 関数ベースのミドルウェア - 単純な関数ミドルウェア
- クラス ベースのミドルウェア - オブジェクト指向ミドルウェア
- 例外処理ミドルウェア - エラー処理パターン
- 共有状態ミドルウェア - エージェント間の状態管理
カスタム エージェント
モデルに基づくエージェントがまったく必要ない場合があります。カスタム ロジックを使用して決定論的または API ベースのエージェントが必要な場合があります。 どちらのフレームワークもカスタム エージェントの構築をサポートしていますが、パターンは異なります。
AutoGen: サブクラス BaseChatAgent
from typing import Sequence
from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import BaseChatMessage, TextMessage, StopMessage
from autogen_core import CancellationToken
class StaticAgent(BaseChatAgent):
def __init__(self, name: str = "static", description: str = "Static responder") -> None:
super().__init__(name, description)
@property
def produced_message_types(self) -> Sequence[type[BaseChatMessage]]: # Which message types this agent produces
return (TextMessage,)
async def on_messages(self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken) -> Response:
# Always return a static response
return Response(chat_message=TextMessage(content="Hello from AutoGen custom agent", source=self.name))
Notes:
-
on_messages(...)を実装し、チャット メッセージを含むResponseを返します。 - 必要に応じて、
on_reset(...)を実装して、実行間の内部状態をクリアします。
エージェント フレームワーク: BaseAgent の拡張 (スレッド対応)
from collections.abc import AsyncIterable
from typing import Any
from agent_framework import (
AgentRunResponse,
AgentRunResponseUpdate,
AgentThread,
BaseAgent,
ChatMessage,
Role,
TextContent,
)
class StaticAgent(BaseAgent):
async def run(
self,
messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
*,
thread: AgentThread | None = None,
**kwargs: Any,
) -> AgentRunResponse:
# Build a static reply
reply = ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text="Hello from AF custom agent")])
# Persist conversation to the provided AgentThread (if any)
if thread is not None:
normalized = self._normalize_messages(messages)
await self._notify_thread_of_new_messages(thread, normalized, reply)
return AgentRunResponse(messages=[reply])
async def run_stream(
self,
messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
*,
thread: AgentThread | None = None,
**kwargs: Any,
) -> AsyncIterable[AgentRunResponseUpdate]:
# Stream the same static response in a single chunk for simplicity
yield AgentRunResponseUpdate(contents=[TextContent(text="Hello from AF custom agent")], role=Role.ASSISTANT)
# Notify thread of input and the complete response once streaming ends
if thread is not None:
reply = ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text="Hello from AF custom agent")])
normalized = self._normalize_messages(messages)
await self._notify_thread_of_new_messages(thread, normalized, reply)
Notes:
-
AgentThread外部で会話状態を維持します。agent.get_new_thread()を使用してrun/run_streamに渡します。 - スレッドが交換の両側を持つよう、
self._notify_thread_of_new_messages(thread, input_messages, response_messages)を呼び出します。 - 完全なサンプルを参照してください: カスタム エージェント
次に、フレームワークが最も異なる領域であるマルチエージェント オーケストレーションを見てみましょう。
マルチエージェント機能マッピング
プログラミング モデルの概要
マルチエージェント プログラミング モデルは、2 つのフレームワーク間の最も大きな違いを表します。
AutoGen のデュアル モデル アプローチ
AutoGen には、次の 2 つのプログラミング モデルが用意されています。
-
autogen-core:RoutedAgentおよびメッセージ サブスクリプションを使用した低レベルのイベント ドリブン プログラミング -
Team抽象化: 上に構築された高レベルの実行中心モデルautogen-core
# Low-level autogen-core (complex)
class MyAgent(RoutedAgent):
@message_handler
async def handle_message(self, message: TextMessage, ctx: MessageContext) -> None:
# Handle specific message types
pass
# High-level Team (easier but limited)
team = RoundRobinGroupChat(
participants=[agent1, agent2],
termination_condition=StopAfterNMessages(5)
)
result = await team.run(task="Collaborate on this task")
課題:
- 低レベルモデルは、ほとんどのユーザーにとって複雑すぎる
- 高度なモデルは、複雑な動作の制限になる可能性があります
- 2 つのモデル間のブリッジングにより、実装の複雑さが増す
エージェント フレームワークの統合ワークフロー モデル
Agent Framework には、両方の方法のベストを組み合わせた単一の Workflow 抽象化が用意されています。
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never
# Assume we have agent1 and agent2 from previous examples
@executor(id="agent1")
async def agent1_executor(input_msg: str, ctx: WorkflowContext[str]) -> None:
response = await agent1.run(input_msg)
await ctx.send_message(response.text)
@executor(id="agent2")
async def agent2_executor(input_msg: str, ctx: WorkflowContext[Never, str]) -> None:
response = await agent2.run(input_msg)
await ctx.yield_output(response.text) # Final output
# Build typed data flow graph
workflow = (WorkflowBuilder()
.add_edge(agent1_executor, agent2_executor)
.set_start_executor(agent1_executor)
.build())
# Example usage (would be in async context)
# result = await workflow.run("Initial input")
ワークフローの詳細な例については、次を参照してください。
- ワークフローの基本 - Executor とエッジの概要
- ワークフロー内のエージェント - ワークフロー 内のエージェントの統合
- ワークフロー ストリーミング - リアルタイムワークフロー実行
メリット:
- 統合モデル: すべての複雑さのレベルに対する単一の抽象化
- タイプ セーフ: 厳密に型指定された入力と出力
- グラフの視覚化: データ フロー表現をクリアする
- 柔軟な構成: エージェント、関数、サブワークフローを混在させます
ワークフローと GraphFlow
Agent Framework の Workflow 抽象化は、AutoGen の試験的な GraphFlow 機能に着想を得たものですが、設計理念における重要な進化を表しています。
- GraphFlow: エッジが遷移であり、メッセージがすべてのエージェントにブロードキャストされる制御フロー ベース。切り替えは、ブロードキャストされたメッセージの内容に合った状態になります
- ワークフロー: メッセージが特定のエッジを介してルーティングされ、Executor がエッジによってアクティブ化され、同時実行がサポートされるデータ フロー ベース。
ビジュアルの概要
次の図は、AutoGen の制御フロー GraphFlow (左) と Agent Framework のデータ フロー ワークフロー (右) を比較したものです。 GraphFlow は、条件付き遷移とブロードキャストを使用してエージェントをノードとしてモデル化します。 ワークフロー モデル Executor (エージェント、関数、またはサブワークフロー) は、型指定されたエッジによって接続されます。また、要求/応答の一時停止とチェックポイント処理もサポートしています。
flowchart LR
subgraph AutoGenGraphFlow
direction TB
U[User / Task] --> A[Agent A]
A -->|success| B[Agent B]
A -->|retry| C[Agent C]
A -. broadcast .- B
A -. broadcast .- C
end
subgraph AgentFrameworkWorkflow
direction TB
I[Input] --> E1[Executor 1]
E1 -->|"str"| E2[Executor 2]
E1 -->|"image"| E3[Executor 3]
E3 -->|"str"| E2
E2 --> OUT[(Final Output)]
end
R[Request / Response Gate]
E2 -. request .-> R
R -. resume .-> E2
CP[Checkpoint]
E1 -. save .-> CP
CP -. load .-> E1
実際に:
- GraphFlow はエージェントをノードとして使用し、メッセージをブロードキャストします。エッジは条件付き遷移を表します。
- ワークフローは、型指定されたメッセージをエッジに沿ってルーティングします。 ノード (Executor) には、エージェント、純粋関数、またはサブワークフローを指定できます。
- 要求/応答を使用すると、ワークフローを外部入力に対して一時停止できます。チェックポイント処理は進行状況を保持し、再開を有効にします。
コードの比較
1) シーケンシャル + 条件付き
# AutoGen GraphFlow (fluent builder) — writer → reviewer → editor (conditional)
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
writer = AssistantAgent(name="writer", description="Writes a draft", model_client=client)
reviewer = AssistantAgent(name="reviewer", description="Reviews the draft", model_client=client)
editor = AssistantAgent(name="editor", description="Finalizes the draft", model_client=client)
graph = (
DiGraphBuilder()
.add_node(writer).add_node(reviewer).add_node(editor)
.add_edge(writer, reviewer) # always
.add_edge(reviewer, editor, condition=lambda msg: "approve" in msg.to_model_text())
.set_entry_point(writer)
).build()
team = GraphFlow(participants=[writer, reviewer, editor], graph=graph)
result = await team.run(task="Draft a short paragraph about solar power")
# Agent Framework Workflow — sequential executors with conditional logic
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never
@executor(id="writer")
async def writer_exec(task: str, ctx: WorkflowContext[str]) -> None:
await ctx.send_message(f"Draft: {task}")
@executor(id="reviewer")
async def reviewer_exec(draft: str, ctx: WorkflowContext[str]) -> None:
decision = "approve" if "solar" in draft.lower() else "revise"
await ctx.send_message(f"{decision}:{draft}")
@executor(id="editor")
async def editor_exec(msg: str, ctx: WorkflowContext[Never, str]) -> None:
if msg.startswith("approve:"):
await ctx.yield_output(msg.split(":", 1)[1])
else:
await ctx.yield_output("Needs revision")
workflow_seq = (
WorkflowBuilder()
.add_edge(writer_exec, reviewer_exec)
.add_edge(reviewer_exec, editor_exec)
.set_start_executor(writer_exec)
.build()
)
2) ファンアウト + 結合 (ALL 対 ANY)
# AutoGen GraphFlow — A → (B, C) → D with ALL/ANY join
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
A, B, C, D = agent_a, agent_b, agent_c, agent_d
# ALL (default): D runs after both B and C
g_all = (
DiGraphBuilder()
.add_node(A).add_node(B).add_node(C).add_node(D)
.add_edge(A, B).add_edge(A, C)
.add_edge(B, D).add_edge(C, D)
.set_entry_point(A)
).build()
# ANY: D runs when either B or C completes
g_any = (
DiGraphBuilder()
.add_node(A).add_node(B).add_node(C).add_node(D)
.add_edge(A, B).add_edge(A, C)
.add_edge(B, D, activation_group="join_d", activation_condition="any")
.add_edge(C, D, activation_group="join_d", activation_condition="any")
.set_entry_point(A)
).build()
# Agent Framework Workflow — A → (B, C) → aggregator (ALL vs ANY)
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never
@executor(id="A")
async def start(task: str, ctx: WorkflowContext[str]) -> None:
await ctx.send_message(f"B:{task}", target_id="B")
await ctx.send_message(f"C:{task}", target_id="C")
@executor(id="B")
async def branch_b(text: str, ctx: WorkflowContext[str]) -> None:
await ctx.send_message(f"B_done:{text}")
@executor(id="C")
async def branch_c(text: str, ctx: WorkflowContext[str]) -> None:
await ctx.send_message(f"C_done:{text}")
@executor(id="join_any")
async def join_any(msg: str, ctx: WorkflowContext[Never, str]) -> None:
await ctx.yield_output(f"First: {msg}") # ANY join (first arrival)
@executor(id="join_all")
async def join_all(msg: str, ctx: WorkflowContext[str, str]) -> None:
state = await ctx.get_executor_state() or {"items": []}
state["items"].append(msg)
await ctx.set_executor_state(state)
if len(state["items"]) >= 2:
await ctx.yield_output(" | ".join(state["items"])) # ALL join
wf_any = (
WorkflowBuilder()
.add_edge(start, branch_b).add_edge(start, branch_c)
.add_edge(branch_b, join_any).add_edge(branch_c, join_any)
.set_start_executor(start)
.build()
)
wf_all = (
WorkflowBuilder()
.add_edge(start, branch_b).add_edge(start, branch_c)
.add_edge(branch_b, join_all).add_edge(branch_c, join_all)
.set_start_executor(start)
.build()
)
3) 対象ルーティング (ブロードキャストなし)
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never
@executor(id="ingest")
async def ingest(task: str, ctx: WorkflowContext[str]) -> None:
# Route selectively using target_id
if task.startswith("image:"):
await ctx.send_message(task.removeprefix("image:"), target_id="vision")
else:
await ctx.send_message(task, target_id="writer")
@executor(id="writer")
async def write(text: str, ctx: WorkflowContext[Never, str]) -> None:
await ctx.yield_output(f"Draft: {text}")
@executor(id="vision")
async def caption(image_ref: str, ctx: WorkflowContext[Never, str]) -> None:
await ctx.yield_output(f"Caption: {image_ref}")
workflow = (
WorkflowBuilder()
.add_edge(ingest, write)
.add_edge(ingest, caption)
.set_start_executor(ingest)
.build()
)
# Example usage (async):
# await workflow.run("Summarize the benefits of solar power")
# await workflow.run("image:https://example.com/panel.jpg")
注意する内容:
- GraphFlow はメッセージをブロードキャストし、条件付き遷移を使用します。 結合動作は、ターゲット側の
activationとエッジごとのactivation_group/activation_conditionを使用して構成されます (たとえば、join_dで両方のエッジをactivation_condition="any"にグループ化します)。 - ワークフローはデータを明示的にルーティングします。
target_idを使用してダウンストリーム Executor を選択します。 結合の動作は、受信側の Executor に存在します (たとえば、最初の入力とすべての待機時のイールド)、またはオーケストレーション ビルダー/アグリゲーターを使用します。 - Workflow の Executor は自由形式です。
ChatAgent、関数、またはサブワークフローをラップし、同じグラフ内でそれらを混在させます。
主な違い
次の表は、AutoGen の GraphFlow とエージェント フレームワークのワークフローの基本的な違いをまとめたものです。
| 特徴 | AutoGen GraphFlow | Agent Framework ワークフロー |
|---|---|---|
| フローの種類 | 制御フロー (エッジは遷移) | データ フロー (エッジ ルート メッセージ) |
| ノードの種類 | エージェントのみ | エージェント、関数、サブワークフロー |
| アクティベーション | メッセージブロードキャスト | エッジ ベースのアクティブ化 |
| タイプ セーフ | Limited | 全体を通して強力な入力 |
| コンポーザビリティ | Limited | 高いコンポーザブル |
パターンの入れ子
AutoGen チームの入れ子
# Inner team
inner_team = RoundRobinGroupChat(
participants=[specialist1, specialist2],
termination_condition=StopAfterNMessages(3)
)
# Outer team with nested team as participant
outer_team = RoundRobinGroupChat(
participants=[coordinator, inner_team, reviewer], # Team as participant
termination_condition=StopAfterNMessages(10)
)
# Messages are broadcasted to all participants including nested team
result = await outer_team.run("Complex task requiring collaboration")
AutoGen の入れ子の特性:
- 入れ子になったチームが外部チームからすべてのメッセージを受信する
- 入れ子になったチーム メッセージは、すべての外部チーム参加者にブロードキャストされます
- すべてのレベルの共有メッセージ コンテキスト
Agent Framework ワークフローの入れ子
from agent_framework import WorkflowExecutor, WorkflowBuilder
# Assume we have executors from previous examples
# specialist1_executor, specialist2_executor, coordinator_executor, reviewer_executor
# Create sub-workflow
sub_workflow = (WorkflowBuilder()
.add_edge(specialist1_executor, specialist2_executor)
.set_start_executor(specialist1_executor)
.build())
# Wrap as executor
sub_workflow_executor = WorkflowExecutor(
workflow=sub_workflow,
id="sub_process"
)
# Use in parent workflow
parent_workflow = (WorkflowBuilder()
.add_edge(coordinator_executor, sub_workflow_executor)
.add_edge(sub_workflow_executor, reviewer_executor)
.set_start_executor(coordinator_executor)
.build())
Agent Framework の入れ子の特性:
- 分離された入力/出力
WorkflowExecutor - メッセージブロードキャストなし - 特定の接続を経由するデータ フロー
- 各ワークフロー レベルの独立した状態管理
グループ チャット パターン
グループ チャット パターンを使用すると、複数のエージェントが複雑なタスクで共同作業を行うことができます。 フレームワーク間で一般的なパターンがどのように変換されるかを次に示します。
RoundRobinGroupChat パターン
AutoGen の実装:
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import StopAfterNMessages
team = RoundRobinGroupChat(
participants=[agent1, agent2, agent3],
termination_condition=StopAfterNMessages(10)
)
result = await team.run("Discuss this topic")
エージェント フレームワークの実装:
from agent_framework import SequentialBuilder, WorkflowOutputEvent
# Assume we have agent1, agent2, agent3 from previous examples
# Sequential workflow through participants
workflow = SequentialBuilder().participants([agent1, agent2, agent3]).build()
# Example usage (would be in async context)
async def sequential_example():
# Each agent appends to shared conversation
async for event in workflow.run_stream("Discuss this topic"):
if isinstance(event, WorkflowOutputEvent):
conversation_history = event.data # list[ChatMessage]
オーケストレーションの詳細な例については、次を参照してください。
- シーケンシャル エージェント - ラウンド ロビン スタイルのエージェントの実行
- シーケンシャル カスタム Executor - カスタム Executor パターン
同時実行パターンの場合、Agent Framework には次の機能も用意されています。
from agent_framework import ConcurrentBuilder, WorkflowOutputEvent
# Assume we have agent1, agent2, agent3 from previous examples
# Concurrent workflow for parallel processing
workflow = (ConcurrentBuilder()
.participants([agent1, agent2, agent3])
.build())
# Example usage (would be in async context)
async def concurrent_example():
# All agents process the input concurrently
async for event in workflow.run_stream("Process this in parallel"):
if isinstance(event, WorkflowOutputEvent):
results = event.data # Combined results from all agents
同時実行の例については、次を参照してください。
- 同時実行エージェント - 並列エージェントの実行
- 同時実行カスタム Executor - カスタム並列パターン
- カスタム アグリゲーターとの同時実行 - 結果集計パターン
MagenticOneGroupChat パターン
AutoGen の実装:
from autogen_agentchat.teams import MagenticOneGroupChat
team = MagenticOneGroupChat(
participants=[researcher, coder, executor],
model_client=coordinator_client,
termination_condition=StopAfterNMessages(20)
)
result = await team.run("Complex research and analysis task")
エージェント フレームワークの実装:
from typing import cast
from agent_framework import (
MAGENTIC_EVENT_TYPE_AGENT_DELTA,
MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
AgentRunUpdateEvent,
ChatAgent,
ChatMessage,
MagenticBuilder,
WorkflowOutputEvent,
)
from agent_framework.openai import OpenAIChatClient
# Create a manager agent for orchestration
manager_agent = ChatAgent(
name="MagenticManager",
description="Orchestrator that coordinates the workflow",
instructions="You coordinate a team to complete complex tasks efficiently.",
chat_client=OpenAIChatClient(),
)
workflow = (
MagenticBuilder()
.participants(researcher=researcher, coder=coder)
.with_standard_manager(
agent=manager_agent,
max_round_count=20,
max_stall_count=3,
max_reset_count=2,
)
.build()
)
# Example usage (would be in async context)
async def magentic_example():
output: str | None = None
async for event in workflow.run_stream("Complex research task"):
if isinstance(event, AgentRunUpdateEvent):
props = event.data.additional_properties if event.data else None
event_type = props.get("magentic_event_type") if props else None
if event_type == MAGENTIC_EVENT_TYPE_ORCHESTRATOR:
text = event.data.text if event.data else ""
print(f"[ORCHESTRATOR]: {text}")
elif event_type == MAGENTIC_EVENT_TYPE_AGENT_DELTA:
agent_id = props.get("agent_id", event.executor_id) if props else event.executor_id
if event.data and event.data.text:
print(f"[{agent_id}]: {event.data.text}", end="")
elif isinstance(event, WorkflowOutputEvent):
output_messages = cast(list[ChatMessage], event.data)
if output_messages:
output = output_messages[-1].text
Agent Framework のカスタマイズ オプション:
マゼンティック ワークフローには、広範なカスタマイズ オプションが用意されています。
- マネージャーの構成: カスタム命令とモデル設定で ChatAgent を使用する
-
ラウンド制限:
max_round_count、max_stall_count、max_reset_count -
イベント ストリーミング:
magentic_event_typeメタデータでAgentRunUpdateEventを使用する - エージェントの特殊化: エージェントごとのカスタム命令とツール
- Human-in-the-loop: 計画のレビュー、ツールの承認、ストールの介入
# Advanced customization example with human-in-the-loop
from typing import cast
from agent_framework import (
MAGENTIC_EVENT_TYPE_AGENT_DELTA,
MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
AgentRunUpdateEvent,
ChatAgent,
MagenticBuilder,
MagenticHumanInterventionDecision,
MagenticHumanInterventionKind,
MagenticHumanInterventionReply,
MagenticHumanInterventionRequest,
RequestInfoEvent,
WorkflowOutputEvent,
)
from agent_framework.openai import OpenAIChatClient
# Create manager agent with custom configuration
manager_agent = ChatAgent(
name="MagenticManager",
description="Orchestrator for complex tasks",
instructions="Custom orchestration instructions...",
chat_client=OpenAIChatClient(model_id="gpt-4o"),
)
workflow = (
MagenticBuilder()
.participants(
researcher=researcher_agent,
coder=coder_agent,
analyst=analyst_agent,
)
.with_standard_manager(
agent=manager_agent,
max_round_count=15, # Limit total rounds
max_stall_count=2, # Trigger stall handling
max_reset_count=1, # Allow one reset on failure
)
.with_plan_review() # Enable human plan review
.with_human_input_on_stall() # Enable human intervention on stalls
.build()
)
# Handle human intervention requests during execution
async for event in workflow.run_stream("Complex task"):
if isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest:
req = cast(MagenticHumanInterventionRequest, event.data)
if req.kind == MagenticHumanInterventionKind.PLAN_REVIEW:
# Review and approve the plan
reply = MagenticHumanInterventionReply(
decision=MagenticHumanInterventionDecision.APPROVE
)
async for ev in workflow.send_responses_streaming({event.request_id: reply}):
pass # Handle continuation
マゼンティックの詳細な例については、次を参照してください。
- 基本的なマゼンティック ワークフロー - 標準で調整されたマルチエージェント ワークフロー
- チェックポイント処理を使用したマゼンティック - 永続的に調整されたワークフロー
- マゼンティック ヒューマン プランの更新 - Human-in-the-loop プラン レビュー
- マゼンティック エージェントの明確化 - エージェントの明確化のためのツールの承認
- マゼンティック・ヒューマン・リプラン - ストールに対する人間の介入
将来のパターン
Agent Framework のロードマップには、現在開発中のいくつかの AutoGen パターンが含まれています。
- Swarm パターン: ハンドオフ ベースのエージェントの調整
- SelectorGroupChat: LLM 駆動スピーカーの選択
Human-in-the-Loop with Request Response
Agent Framework の Workflow の主な新機能は 、要求と応答の概念です。これにより、ワークフローは実行を一時停止し、外部入力を待ってから続行できます。 この機能は、AutoGen の Team 抽象化には存在せず、高度な人間のループ内パターンを可能にします。
AutoGen の制限事項
AutoGen の Team 抽象化は開始されると継続的に実行され、人間の入力の実行を一時停止するための組み込みのメカニズムは提供されません。 人間のループ内機能には、フレームワークの外部でカスタム実装が必要です。
Agent Framework Request-Response API
Agent Framework には、任意の Executor が ctx.request_info() を使用して要求を送信し、 @response_handler デコレーターで応答を処理できる組み込みの要求応答機能が用意されています。
from agent_framework import (
RequestInfoEvent, WorkflowBuilder, WorkflowContext,
Executor, handler, response_handler
)
from dataclasses import dataclass
# Assume we have agent_executor defined elsewhere
# Define typed request payload
@dataclass
class ApprovalRequest:
"""Request human approval for agent output."""
content: str = ""
agent_name: str = ""
# Workflow executor that requests human approval
class ReviewerExecutor(Executor):
@handler
async def review_content(
self,
agent_response: str,
ctx: WorkflowContext
) -> None:
# Request human input with structured data
approval_request = ApprovalRequest(
content=agent_response,
agent_name="writer_agent"
)
await ctx.request_info(request_data=approval_request, response_type=str)
@response_handler
async def handle_approval_response(
self,
original_request: ApprovalRequest,
decision: str,
ctx: WorkflowContext
) -> None:
decision_lower = decision.strip().lower()
original_content = original_request.content
if decision_lower == "approved":
await ctx.yield_output(f"APPROVED: {original_content}")
else:
await ctx.yield_output(f"REVISION NEEDED: {decision}")
# Build workflow with human-in-the-loop
reviewer = ReviewerExecutor(id="reviewer")
workflow = (WorkflowBuilder()
.add_edge(agent_executor, reviewer)
.set_start_executor(agent_executor)
.build())
Human-in-the-Loop ワークフローの実行
Agent Framework には、一時停止再開サイクルを処理するためのストリーミング API が用意されています。
from agent_framework import RequestInfoEvent, WorkflowOutputEvent
# Assume we have workflow defined from previous examples
async def run_with_human_input():
pending_responses = None
completed = False
while not completed:
# First iteration uses run_stream, subsequent use send_responses_streaming
stream = (
workflow.send_responses_streaming(pending_responses)
if pending_responses
else workflow.run_stream("initial input")
)
events = [event async for event in stream]
pending_responses = None
# Collect human requests and outputs
for event in events:
if isinstance(event, RequestInfoEvent):
# Display request to human and collect response
request_data = event.data # ApprovalRequest instance
print(f"Review needed: {request_data.content}")
human_response = input("Enter 'approved' or revision notes: ")
pending_responses = {event.request_id: human_response}
elif isinstance(event, WorkflowOutputEvent):
print(f"Final result: {event.data}")
completed = True
human-in-the-loop ワークフローの例については、次を参照してください。
- 人間入力を使用したゲームの推測 - ユーザー フィードバックを含む対話型ワークフロー
- ユーザー入力を使用したエージェントとしてのワークフロー - 入 れ子になったワークフローと人間の対話
ワークフローのチェックポイント処理と再開
AutoGen のWorkflow抽象化に対する Agent Framework のTeamのもう 1 つの主な利点は、チェックポイント処理と実行の再開の組み込みサポートです。 これにより、ワークフローをチェックポイントから後で一時停止、永続化、再開できるため、フォールト トレランスが提供され、実行時間の長いワークフローまたは非同期ワークフローが有効になります。
AutoGen の制限事項
AutoGen の Team 抽象化では、組み込みのチェックポイント機能は提供されません。 永続化または回復メカニズムは外部で実装する必要があり、多くの場合、複雑な状態管理とシリアル化ロジックが必要です。
エージェント フレームワークのチェックポイント処理
Agent Framework では、FileCheckpointStorageとwith_checkpointing()のWorkflowBuilderメソッドを使用した包括的なチェックポイント処理が提供されます。 チェックポイント キャプチャ:
-
Executor 状態: 使用する各 Executor のローカル状態
ctx.set_executor_state() -
共有状態: 次を使用して実行プログラム間の状態
ctx.set_shared_state() - メッセージ キュー: Executor 間の保留中のメッセージ
- ワークフローの位置: 現在の実行の進行状況と次のステップ
from agent_framework import (
FileCheckpointStorage, WorkflowBuilder, WorkflowContext,
Executor, handler
)
from typing_extensions import Never
class ProcessingExecutor(Executor):
@handler
async def process(self, data: str, ctx: WorkflowContext[str]) -> None:
# Process the data
result = f"Processed: {data.upper()}"
print(f"Processing: '{data}' -> '{result}'")
# Persist executor-local state
prev_state = await ctx.get_executor_state() or {}
count = prev_state.get("count", 0) + 1
await ctx.set_executor_state({
"count": count,
"last_input": data,
"last_output": result
})
# Persist shared state for other executors
await ctx.set_shared_state("original_input", data)
await ctx.set_shared_state("processed_output", result)
await ctx.send_message(result)
class FinalizeExecutor(Executor):
@handler
async def finalize(self, data: str, ctx: WorkflowContext[Never, str]) -> None:
result = f"Final: {data}"
await ctx.yield_output(result)
# Configure checkpoint storage
checkpoint_storage = FileCheckpointStorage(storage_path="./checkpoints")
processing_executor = ProcessingExecutor(id="processing")
finalize_executor = FinalizeExecutor(id="finalize")
# Build workflow with checkpointing enabled
workflow = (WorkflowBuilder()
.add_edge(processing_executor, finalize_executor)
.set_start_executor(processing_executor)
.with_checkpointing(checkpoint_storage=checkpoint_storage) # Enable checkpointing
.build())
# Example usage (would be in async context)
async def checkpoint_example():
# Run workflow - checkpoints are created automatically
async for event in workflow.run_stream("input data"):
print(f"Event: {event}")
チェックポイントからの再開
Agent Framework には、特定のチェックポイントの一覧表示、検査、再開を行う API が用意されています。
from typing_extensions import Never
from agent_framework import (
Executor,
FileCheckpointStorage,
WorkflowContext,
WorkflowBuilder,
get_checkpoint_summary,
handler,
)
class UpperCaseExecutor(Executor):
@handler
async def process(self, text: str, ctx: WorkflowContext[str]) -> None:
result = text.upper()
await ctx.send_message(result)
class ReverseExecutor(Executor):
@handler
async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None:
result = text[::-1]
await ctx.yield_output(result)
def create_workflow(checkpoint_storage: FileCheckpointStorage):
"""Create a workflow with two executors and checkpointing."""
upper_executor = UpperCaseExecutor(id="upper")
reverse_executor = ReverseExecutor(id="reverse")
return (WorkflowBuilder()
.add_edge(upper_executor, reverse_executor)
.set_start_executor(upper_executor)
.with_checkpointing(checkpoint_storage=checkpoint_storage)
.build())
# Assume we have checkpoint_storage from previous examples
checkpoint_storage = FileCheckpointStorage(storage_path="./checkpoints")
async def checkpoint_resume_example():
# List available checkpoints
checkpoints = await checkpoint_storage.list_checkpoints()
# Display checkpoint information
for checkpoint in checkpoints:
summary = get_checkpoint_summary(checkpoint)
print(f"Checkpoint {summary.checkpoint_id}: iteration={summary.iteration_count}")
# Resume from a specific checkpoint
if checkpoints:
chosen_checkpoint_id = checkpoints[0].checkpoint_id
# Create new workflow instance and resume
new_workflow = create_workflow(checkpoint_storage)
async for event in new_workflow.run_stream(
checkpoint_id=chosen_checkpoint_id,
checkpoint_storage=checkpoint_storage
):
print(f"Resumed event: {event}")
高度なチェックポイント機能
Human-in-the-Loop 統合を使用したチェックポイント:
チェックポイント処理は、人間が入力するためにワークフローを一時停止し、後で再開できるように、ループ内の人間のワークフローとシームレスに連携します。 保留中の要求を含むチェックポイントから再開すると、これらの要求はイベントとして再出力されます。
# Assume we have workflow, checkpoint_id, and checkpoint_storage from previous examples
async def resume_with_pending_requests_example():
# Resume from checkpoint - pending requests will be re-emitted
request_info_events = []
async for event in workflow.run_stream(
checkpoint_id=checkpoint_id,
checkpoint_storage=checkpoint_storage
):
if isinstance(event, RequestInfoEvent):
request_info_events.append(event)
# Handle re-emitted pending request
responses = {}
for event in request_info_events:
response = handle_request(event.data)
responses[event.request_id] = response
# Send response back to workflow
async for event in workflow.send_responses_streaming(responses):
print(f"Event: {event}")
主な利点
AutoGen と比較して、Agent Framework のチェックポイント処理では次の機能が提供されます。
- 自動永続化: 手動の状態管理は必要ありません
- 細かい復旧: スーパーステップ境界からの再開
- 状態の分離: Executor ローカル状態と共有状態を分離する
- 人間とループ内の統合: ユーザー入力によるシームレスな一時停止再開
- フォールト トレランス: 障害または中断からの堅牢な復旧
実際の例
包括的なチェックポイント処理の例については、次を参照してください。
- 再開を使用したチェックポイント - 基本的なチェックポイント処理と対話型の再開
- Human-in-the-Loop を使用したチェックポイント - 人間の承認ゲートを持つ永続的なワークフロー
- サブワークフロー チェックポイント - 入れ子になったワークフローのチェックポイント処理
- マゼンティック チェックポイント - 調整されたマルチエージェント ワークフローのチェックポイント処理
Observability
AutoGen と Agent Framework はどちらも監視機能を提供しますが、さまざまなアプローチと機能を備えています。
AutoGen の可観測性
AutoGen には、次のインストルメンテーションを使用した OpenTelemetry のネイティブ サポートがあります。
-
ランタイム トレース:
SingleThreadedAgentRuntimeとGrpcWorkerAgentRuntime -
ツールの実行: GenAI セマンティック規則に従って
BaseToolスパンを持つexecute_tool -
エージェント操作:
BaseChatAgentとcreate_agentスパンを使用したinvoke_agent
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from autogen_core import SingleThreadedAgentRuntime
# Configure OpenTelemetry
tracer_provider = TracerProvider()
trace.set_tracer_provider(tracer_provider)
# Pass to runtime
runtime = SingleThreadedAgentRuntime(tracer_provider=tracer_provider)
エージェント フレームワークの可観測性
Agent Framework は、複数のアプローチを通じて包括的な可観測性を提供します。
- ゼロ コードのセットアップ: 環境変数を使用した自動インストルメンテーション
- 手動構成: カスタム パラメーターを使用したプログラムによるセットアップ
- 豊富なテレメトリ: エージェント、ワークフロー、ツール実行の追跡
- コンソール出力: コンソールの組み込みのログ記録と視覚化
from agent_framework import ChatAgent
from agent_framework.observability import setup_observability
from agent_framework.openai import OpenAIChatClient
# Zero-code setup via environment variables
# Set ENABLE_OTEL=true
# Set OTLP_ENDPOINT=http://localhost:4317
# Or manual setup
setup_observability(
otlp_endpoint="http://localhost:4317"
)
# Create client for the example
client = OpenAIChatClient(model_id="gpt-5")
async def observability_example():
# Observability is automatically applied to all agents and workflows
agent = ChatAgent(name="assistant", chat_client=client)
result = await agent.run("Hello") # Automatically traced
主な違い:
- セットアップの複雑さ: Agent Framework では、よりシンプルなゼロコード セットアップ オプションが提供されます
- スコープ: エージェント フレームワークは、ワークフロー レベルの可観測性を含む広範な範囲を提供します
- 視覚化: Agent Framework には、コンソール出力と開発 UI が組み込まれています
- 構成: Agent Framework では、より柔軟な構成オプションが提供されます
監視の詳細な例については、次を参照してください。
- ゼロ コードのセットアップ - 環境変数の構成
- 手動セットアップ - プログラムによる構成
- エージェントの監視 - 単一エージェント テレメトリ
- ワークフロー監視 - マルチエージェント ワークフロー トレース
Conclusion
この移行ガイドでは、AutoGen と Microsoft Agent Framework の間の包括的なマッピングを提供します。基本的なエージェントの作成から複雑なマルチエージェント ワークフローまで、すべてをカバーします。 移行に関する重要なポイント:
- 単一エージェントの移行 は簡単で、同様の API と Agent Framework の機能が強化されています
- マルチエージェント パターンでは 、イベント ドリブンからデータ フロー ベースのアーキテクチャへのアプローチを再考する必要がありますが、GraphFlow に既に慣れている場合は、移行が容易になります
- Agent Framework には 、ミドルウェア、ホステッド ツール、型指定されたワークフローなどの追加機能が用意されています
その他の例と詳細な実装ガイダンスについては、 Agent Framework のサンプル ディレクトリを参照してください。
その他のサンプル カテゴリ
Agent Framework には、他のいくつかの重要な領域にわたるサンプルが用意されています。
- スレッド: スレッド のサンプル - 会話の状態とコンテキストの管理
- マルチモーダル入力: マルチモーダル サンプル - 画像やその他のメディアの種類の操作
- コンテキスト プロバイダー: コンテキスト プロバイダーのサンプル - 外部コンテキスト統合パターン