次の方法で共有


AutoGen から Microsoft Agent Framework への移行ガイド

AutoGen から Microsoft Agent Framework Python SDK に移行するための包括的なガイド。

目次

バックグラウンド

AutoGen は、大規模な言語モデル (LLM) を使用して AI エージェントとマルチエージェント システムを構築するためのフレームワークです。 Microsoft Research の研究プロジェクトとして始まり、GroupChat やイベント ドリブン エージェント ランタイムなど、マルチエージェント オーケストレーションのいくつかの概念を開拓しました。 このプロジェクトはオープンソースコミュニティの実りあるコラボレーションであり、多くの重要な機能は外部の共同作成者から来ました。

Microsoft Agent Framework は、LLM を使用して AI エージェントとワークフローを構築するための新しい多言語 SDK です。 AutoGen で開拓されたアイデアの大幅な進化を表し、実際の使用状況から学んだ教訓が組み込まれています。 これは、Microsoft のコア AutoGen チームとセマンティック カーネル チームによって開発され、今後 AI アプリケーションを構築するための新しい基盤として設計されています。

このガイドでは、実際の移行パスについて説明します。最初に、同じままの内容と変更点をひとめで説明します。 次に、モデル クライアントのセットアップ、単一エージェント機能、最後に、具体的なコードを並べて使用したマルチエージェント オーケストレーションについて説明します。 その過程で、Agent Framework リポジトリの実行可能なサンプルへのリンクは、各手順を検証するのに役立ちます。

主な類似点と相違点

同じままの内容

基礎は使い慣れている。 モデル クライアントを中心にエージェントを作成し、指示を提供し、ツールをアタッチします。 どちらのライブラリも、関数スタイルのツール、トークン ストリーミング、マルチモーダル コンテンツ、非同期 I/O をサポートしています。

# Both frameworks follow similar patterns
# AutoGen
agent = AssistantAgent(name="assistant", model_client=client, tools=[my_tool])
result = await agent.run(task="Help me with this task")

# Agent Framework
agent = ChatAgent(name="assistant", chat_client=client, tools=[my_tool])
result = await agent.run("Help me with this task")

主な違い

  1. オーケストレーション スタイル: AutoGen では、イベント ドリブン コアと高レベルの Teamがペアになります。 Agent Framework は、入力の準備ができたらエッジに沿ってデータをルーティングし、Executor をアクティブにする、型指定されたグラフベースの Workflow を中心としています。

  2. ツール: AutoGen は関数を FunctionToolでラップします。 Agent Framework では、 @ai_functionを使用し、スキーマを自動的に推論し、コード インタープリターや Web 検索などのホストされたツールを追加します。

  3. エージェントの動作: AssistantAgentを増やさない限り、max_tool_iterationsは 1 ターンです。 ChatAgent は既定では複数ターンであり、最終的な回答が返されるまでツールを呼び出し続けます。

  4. ランタイム: AutoGen には、埋め込みおよび試験的な分散ランタイムが用意されています。 Agent Framework は現在、単一プロセスの構成に重点を置いています。分散実行が計画されています。

モデル クライアントの作成と構成

どちらのフレームワークも、主要な AI プロバイダー向けのモデル クライアントを提供しますが、API は同じですが、同一ではありません。

特徴 AutoGen エージェント フレームワーク
OpenAI クライアント OpenAIChatCompletionClient OpenAIChatClient
OpenAI Responses クライアント ❌ 利用不可 OpenAIResponsesClient
Azure OpenAI AzureOpenAIChatCompletionClient AzureOpenAIChatClient
Azure OpenAI 応答 ❌ 利用不可 AzureOpenAIResponsesClient
Azure AI AzureAIChatCompletionClient AzureAIAgentClient
Anthropic AnthropicChatCompletionClient 🚧 計画
Ollama OllamaChatCompletionClient 🚧 計画
キャッシュ処理 ChatCompletionCache 上包み 🚧 計画

AutoGen モデル クライアント

from autogen_ext.models.openai import OpenAIChatCompletionClient, AzureOpenAIChatCompletionClient

# OpenAI
client = OpenAIChatCompletionClient(
    model="gpt-5",
    api_key="your-key"
)

# Azure OpenAI
client = AzureOpenAIChatCompletionClient(
    azure_endpoint="https://your-endpoint.openai.azure.com/",
    azure_deployment="gpt-5",
    api_version="2024-12-01",
    api_key="your-key"
)

Agent Framework ChatClients

from agent_framework.openai import OpenAIChatClient
from agent_framework.azure import AzureOpenAIChatClient

# OpenAI (reads API key from environment)
client = OpenAIChatClient(model_id="gpt-5")

# Azure OpenAI (uses environment or default credentials; see samples for auth options)
client = AzureOpenAIChatClient(model_id="gpt-5")

詳細な例については、次を参照してください。

Responses API のサポート (Agent Framework 排他)

Agent Framework の AzureOpenAIResponsesClientOpenAIResponsesClient では、AutoGen では使用できない推論モデルと構造化応答の特殊なサポートが提供されます。

from agent_framework.azure import AzureOpenAIResponsesClient
from agent_framework.openai import OpenAIResponsesClient

# Azure OpenAI with Responses API
azure_responses_client = AzureOpenAIResponsesClient(model_id="gpt-5")

# OpenAI with Responses API
openai_responses_client = OpenAIResponsesClient(model_id="gpt-5")

Responses API の例については、次を参照してください。

Single-Agent 機能マッピング

このセクションでは、AutoGen と Agent Framework の間で単一エージェント機能をマップします。 クライアントを配置した状態で、エージェントを作成し、ツールをアタッチし、非ストリーミング実行とストリーミング実行のどちらかを選択します。

基本的なエージェントの作成と実行

モデル クライアントを構成したら、次の手順ではエージェントを作成します。 どちらのフレームワークも同様のエージェント抽象化を提供しますが、既定の動作と構成オプションが異なります。

AutoGen AssistantAgent

from autogen_agentchat.agents import AssistantAgent

agent = AssistantAgent(
    name="assistant",
    model_client=client,
    system_message="You are a helpful assistant.",
    tools=[my_tool],
    max_tool_iterations=1  # Single-turn by default
)

# Execution
result = await agent.run(task="What's the weather?")

Agent Framework ChatAgent

from agent_framework import ChatAgent, ai_function
from agent_framework.openai import OpenAIChatClient

# Create simple tools for the example
@ai_function
def get_weather(location: str) -> str:
    """Get weather for a location."""
    return f"Weather in {location}: sunny"

@ai_function
def get_time() -> str:
    """Get current time."""
    return "Current time: 2:30 PM"

# Create client
client = OpenAIChatClient(model_id="gpt-5")

async def example():
    # Direct creation
    agent = ChatAgent(
        name="assistant",
        chat_client=client,
        instructions="You are a helpful assistant.",
        tools=[get_weather]  # Multi-turn by default
    )

    # Factory method (more convenient)
    agent = client.create_agent(
        name="assistant",
        instructions="You are a helpful assistant.",
        tools=[get_weather]
    )

    # Execution with runtime tool configuration
    result = await agent.run(
        "What's the weather?",
        tools=[get_time],  # Can add tools at runtime
        tool_choice="auto"
    )

主な違い:

  • 既定の動作: ChatAgent はツール呼び出しを自動的に反復処理しますが、 AssistantAgent には明示的な max_tool_iterations 設定が必要です
  • ランタイム構成: ChatAgent.run() は、呼び出しごとのカスタマイズの tools パラメーターと tool_choice パラメーターを受け入れます
  • ファクトリ メソッド: Agent Framework は、チャット クライアントから直接便利なファクトリ メソッドを提供します
  • 状態管理: ChatAgent はステートレスであり、会話履歴を状態の一部として保持する AssistantAgent とは異なり、呼び出し間の会話履歴は保持されません

AgentThread を使用した会話状態の管理

ChatAgentで会話を続けるには、AgentThreadを使用して会話履歴を管理します。

# Assume we have an agent from previous examples
async def conversation_example():
    # Create a new thread that will be reused
    thread = agent.get_new_thread()

    # First interaction - thread is empty
    result1 = await agent.run("What's 2+2?", thread=thread)
    print(result1.text)  # "4"

    # Continue conversation - thread contains previous messages
    result2 = await agent.run("What about that number times 10?", thread=thread)
    print(result2.text)  # "40" (understands "that number" refers to 4)

    # AgentThread can use external storage, similar to ChatCompletionContext in AutoGen

既定ではステートレス: クイック デモ

# Without a thread (two independent invocations)
r1 = await agent.run("What's 2+2?")
print(r1.text)  # for example, "4"

r2 = await agent.run("What about that number times 10?")
print(r2.text)  # Likely ambiguous without prior context; cannot be "40"

# With a thread (shared context across calls)
thread = agent.get_new_thread()
print((await agent.run("What's 2+2?", thread=thread)).text)  # "4"
print((await agent.run("What about that number times 10?", thread=thread)).text)  # "40"

スレッド管理の例については、以下を参照してください。

OpenAI アシスタント エージェントの等価性

どちらのフレームワークでも、OpenAI Assistant API の統合が提供されます。

# AutoGen OpenAIAssistantAgent
from autogen_ext.agents.openai import OpenAIAssistantAgent
# Agent Framework has OpenAI Assistants support via OpenAIAssistantsClient
from agent_framework.openai import OpenAIAssistantsClient

OpenAI アシスタントの例については、次を参照してください。

ストリーミング のサポート

どちらのフレームワークも、UI の応答性を維持するために、クライアントとエージェントからリアルタイムでトークンをストリーミングします。

AutoGen ストリーミング

# Model client streaming
async for chunk in client.create_stream(messages):
    if isinstance(chunk, str):
        print(chunk, end="")

# Agent streaming
async for event in agent.run_stream(task="Hello"):
    if isinstance(event, ModelClientStreamingChunkEvent):
        print(event.content, end="")
    elif isinstance(event, TaskResult):
        print("Final result received")

Agent Framework Streaming

# Assume we have client, agent, and tools from previous examples
async def streaming_example():
    # Chat client streaming
    async for chunk in client.get_streaming_response("Hello", tools=tools):
        if chunk.text:
            print(chunk.text, end="")

    # Agent streaming
    async for chunk in agent.run_stream("Hello"):
        if chunk.text:
            print(chunk.text, end="", flush=True)

ヒント: Agent Framework では、クライアントとエージェントの両方が同じ更新形式になります。どちらの場合も chunk.text を読み取ることができます。

メッセージの種類と作成

メッセージのしくみを理解することは、効果的なエージェント通信に不可欠です。 どちらのフレームワークも、メッセージの作成と処理に異なるアプローチを提供します。AutoGen では個別のメッセージ クラスを使用し、Agent Framework は統合メッセージ システムを使用します。

AutoGen メッセージの種類

from autogen_agentchat.messages import TextMessage, MultiModalMessage
from autogen_core.models import UserMessage

# Text message
text_msg = TextMessage(content="Hello", source="user")

# Multi-modal message
multi_modal_msg = MultiModalMessage(
    content=["Describe this image", image_data],
    source="user"
)

# Convert to model format for use with model clients
user_message = text_msg.to_model_message()

Agent Framework のメッセージの種類

from agent_framework import ChatMessage, TextContent, DataContent, UriContent, Role
import base64

# Text message
text_msg = ChatMessage(role=Role.USER, text="Hello")

# Supply real image bytes, or use a data: URI/URL via UriContent
image_bytes = b"<your_image_bytes>"
image_b64 = base64.b64encode(image_bytes).decode()
image_uri = f"data:image/jpeg;base64,{image_b64}"

# Multi-modal message with mixed content
multi_modal_msg = ChatMessage(
    role=Role.USER,
    contents=[
        TextContent(text="Describe this image"),
        DataContent(uri=image_uri, media_type="image/jpeg")
    ]
)

主な違い:

  • AutoGen では、TextMessage フィールドを持つ個別のメッセージ クラス (MultiModalMessagesource) が使用されます
  • Agent Framework では、型指定されたコンテンツ オブジェクトとChatMessage フィールドを含む統合roleが使用されます
  • Agent Framework メッセージでは、文字列ソースの代わりに Role 列挙型 (USER、ASSISTANT、SYSTEM、TOOL) が使用されます

ツールの作成と統合

ツールは、テキスト生成を超えてエージェント機能を拡張します。 このフレームワークでは、ツールの作成にさまざまなアプローチが取られます。Agent Framework では、より自動化されたスキーマ生成が提供されます。

AutoGen FunctionTool

from autogen_core.tools import FunctionTool

async def get_weather(location: str) -> str:
    """Get weather for a location."""
    return f"Weather in {location}: sunny"

# Manual tool creation
tool = FunctionTool(
    func=get_weather,
    description="Get weather information"
)

# Use with agent
agent = AssistantAgent(name="assistant", model_client=client, tools=[tool])

Agent Framework @ai_function

from agent_framework import ai_function
from typing import Annotated
from pydantic import Field

@ai_function
def get_weather(
    location: Annotated[str, Field(description="The location to get weather for")]
) -> str:
    """Get weather for a location."""
    return f"Weather in {location}: sunny"

# Direct use with agent (automatic conversion)
agent = ChatAgent(name="assistant", chat_client=client, tools=[get_weather])

詳細な例については、次を参照してください。

ホステッド ツール (Agent Framework 排他)

Agent Framework には、AutoGen では使用できないホスト型ツールが用意されています。

from agent_framework import ChatAgent, HostedCodeInterpreterTool, HostedWebSearchTool
from agent_framework.azure import AzureOpenAIChatClient

# Azure OpenAI client with a model that supports hosted tools
client = AzureOpenAIChatClient(model_id="gpt-5")

# Code execution tool
code_tool = HostedCodeInterpreterTool()

# Web search tool
search_tool = HostedWebSearchTool()

agent = ChatAgent(
    name="researcher",
    chat_client=client,
    tools=[code_tool, search_tool]
)

詳細な例については、次を参照してください。

要件と注意事項:

  • ホストされているツールは、それらをサポートするモデル/アカウントでのみ使用できます。 これらのツールを有効にする前に、プロバイダーの権利とモデルのサポートを確認します。
  • 構成はプロバイダーによって異なります。セットアップとアクセス許可については、各サンプルの前提条件に従ってください。
  • すべてのモデルが、ホストされているすべてのツール (Web 検索とコード インタープリターなど) をサポートしているわけではありません。 環境内で互換性のあるモデルを選択します。

AutoGen はローカル コード実行ツールをサポートしていますが、この機能は今後の Agent Framework バージョンで計画されています。

主な違い: エージェント フレームワークは、エージェント レベルでツールのイテレーションを自動的に処理します。 AutoGen の max_tool_iterations パラメーターとは異なり、Agent Framework エージェントは、無限ループを防ぐための組み込みの安全メカニズムを使用して、既定で完了するまでツールの実行を続行します。

MCP サーバーのサポート

高度なツール統合のために、どちらのフレームワークもモデル コンテキスト プロトコル (MCP) をサポートし、エージェントが外部サービスとデータ ソースと対話できるようにします。 Agent Framework では、より包括的な組み込みサポートが提供されます。

AutoGen MCP のサポート

AutoGen には、拡張機能による基本的な MCP サポートがあります (特定の実装の詳細はバージョンによって異なります)。

Agent Framework MCP のサポート

from agent_framework import ChatAgent, MCPStdioTool, MCPStreamableHTTPTool, MCPWebsocketTool
from agent_framework.openai import OpenAIChatClient

# Create client for the example
client = OpenAIChatClient(model_id="gpt-5")

# Stdio MCP server
mcp_tool = MCPStdioTool(
    name="filesystem",
    command="uvx mcp-server-filesystem",
    args=["/allowed/directory"]
)

# HTTP streaming MCP
http_mcp = MCPStreamableHTTPTool(
    name="http_mcp",
    url="http://localhost:8000/sse"
)

# WebSocket MCP
ws_mcp = MCPWebsocketTool(
    name="websocket_mcp",
    url="ws://localhost:8000/ws"
)

agent = ChatAgent(name="assistant", chat_client=client, tools=[mcp_tool])

MCP の例については、次を参照してください。

ツールとしてのエージェント パターン

強力なパターンの 1 つは、エージェント自体をツールとして使用し、階層型エージェント アーキテクチャを有効にすることです。 どちらのフレームワークも、実装が異なるこのパターンをサポートしています。

AutoGen AgentTool

from autogen_agentchat.tools import AgentTool

# Create specialized agent
writer = AssistantAgent(
    name="writer",
    model_client=client,
    system_message="You are a creative writer."
)

# Wrap as tool
writer_tool = AgentTool(agent=writer)

# Use in coordinator (requires disabling parallel tool calls)
coordinator_client = OpenAIChatCompletionClient(
    model="gpt-5",
    parallel_tool_calls=False
)
coordinator = AssistantAgent(
    name="coordinator",
    model_client=coordinator_client,
    tools=[writer_tool]
)

Agent Framework as_tool()

from agent_framework import ChatAgent

# Assume we have client from previous examples
# Create specialized agent
writer = ChatAgent(
    name="writer",
    chat_client=client,
    instructions="You are a creative writer."
)

# Convert to tool
writer_tool = writer.as_tool(
    name="creative_writer",
    description="Generate creative content",
    arg_name="request",
    arg_description="What to write"
)

# Use in coordinator
coordinator = ChatAgent(
    name="coordinator",
    chat_client=client,
    tools=[writer_tool]
)

明示的な移行に関する注意: AutoGen では、同じエージェント インスタンスを呼び出すときのコンカレンシーの問題を回避するために、エージェントをツールとしてラップするときに、コーディネーターのモデル クライアントに parallel_tool_calls=False を設定します。 エージェント フレームワークでは、エージェントが既定でステートレスになっているので、 as_tool() は並列ツール呼び出しを無効にする必要はありません。

ミドルウェア (エージェント フレームワーク機能)

エージェント フレームワークには、AutoGen に不足しているミドルウェア機能が導入されています。 ミドルウェアを使用すると、ログ記録、セキュリティ、パフォーマンスの監視などの強力な横断的な問題を実現できます。

from agent_framework import ChatAgent, AgentRunContext, FunctionInvocationContext
from typing import Callable, Awaitable

# Assume we have client from previous examples
async def logging_middleware(
    context: AgentRunContext,
    next: Callable[[AgentRunContext], Awaitable[None]]
) -> None:
    print(f"Agent {context.agent.name} starting")
    await next(context)
    print(f"Agent {context.agent.name} completed")

async def security_middleware(
    context: FunctionInvocationContext,
    next: Callable[[FunctionInvocationContext], Awaitable[None]]
) -> None:
    if "password" in str(context.arguments):
        print("Blocking function call with sensitive data")
        return  # Don't call next()
    await next(context)

agent = ChatAgent(
    name="secure_agent",
    chat_client=client,
    middleware=[logging_middleware, security_middleware]
)

メリット:

  • セキュリティ: 入力の検証とコンテンツのフィルター処理
  • 可観測性: ログ記録、メトリック、トレース
  • パフォーマンス: キャッシュとレート制限
  • エラー処理: グレースフルな低下と再試行ロジック

ミドルウェアの詳細な例については、次を参照してください。

カスタム エージェント

モデルに基づくエージェントがまったく必要ない場合があります。カスタム ロジックを使用して決定論的または API ベースのエージェントが必要な場合があります。 どちらのフレームワークもカスタム エージェントの構築をサポートしていますが、パターンは異なります。

AutoGen: サブクラス BaseChatAgent

from typing import Sequence
from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import BaseChatMessage, TextMessage, StopMessage
from autogen_core import CancellationToken

class StaticAgent(BaseChatAgent):
    def __init__(self, name: str = "static", description: str = "Static responder") -> None:
        super().__init__(name, description)

    @property
    def produced_message_types(self) -> Sequence[type[BaseChatMessage]]:  # Which message types this agent produces
        return (TextMessage,)

    async def on_messages(self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken) -> Response:
        # Always return a static response
        return Response(chat_message=TextMessage(content="Hello from AutoGen custom agent", source=self.name))

Notes:

  • on_messages(...)を実装し、チャット メッセージを含むResponseを返します。
  • 必要に応じて、 on_reset(...) を実装して、実行間の内部状態をクリアします。

エージェント フレームワーク: BaseAgent の拡張 (スレッド対応)

from collections.abc import AsyncIterable
from typing import Any
from agent_framework import (
    AgentRunResponse,
    AgentRunResponseUpdate,
    AgentThread,
    BaseAgent,
    ChatMessage,
    Role,
    TextContent,
)

class StaticAgent(BaseAgent):
    async def run(
        self,
        messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
        *,
        thread: AgentThread | None = None,
        **kwargs: Any,
    ) -> AgentRunResponse:
        # Build a static reply
        reply = ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text="Hello from AF custom agent")])

        # Persist conversation to the provided AgentThread (if any)
        if thread is not None:
            normalized = self._normalize_messages(messages)
            await self._notify_thread_of_new_messages(thread, normalized, reply)

        return AgentRunResponse(messages=[reply])

    async def run_stream(
        self,
        messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
        *,
        thread: AgentThread | None = None,
        **kwargs: Any,
    ) -> AsyncIterable[AgentRunResponseUpdate]:
        # Stream the same static response in a single chunk for simplicity
        yield AgentRunResponseUpdate(contents=[TextContent(text="Hello from AF custom agent")], role=Role.ASSISTANT)

        # Notify thread of input and the complete response once streaming ends
        if thread is not None:
            reply = ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text="Hello from AF custom agent")])
            normalized = self._normalize_messages(messages)
            await self._notify_thread_of_new_messages(thread, normalized, reply)

Notes:

  • AgentThread 外部で会話状態を維持します。 agent.get_new_thread() を使用して run/run_streamに渡します。
  • スレッドが交換の両側を持つよう、 self._notify_thread_of_new_messages(thread, input_messages, response_messages) を呼び出します。
  • 完全なサンプルを参照してください: カスタム エージェント

次に、フレームワークが最も異なる領域であるマルチエージェント オーケストレーションを見てみましょう。

マルチエージェント機能マッピング

プログラミング モデルの概要

マルチエージェント プログラミング モデルは、2 つのフレームワーク間の最も大きな違いを表します。

AutoGen のデュアル モデル アプローチ

AutoGen には、次の 2 つのプログラミング モデルが用意されています。

  1. autogen-core: RoutedAgent およびメッセージ サブスクリプションを使用した低レベルのイベント ドリブン プログラミング
  2. Team 抽象化: 上に構築された高レベルの実行中心モデル autogen-core
# Low-level autogen-core (complex)
class MyAgent(RoutedAgent):
    @message_handler
    async def handle_message(self, message: TextMessage, ctx: MessageContext) -> None:
        # Handle specific message types
        pass

# High-level Team (easier but limited)
team = RoundRobinGroupChat(
    participants=[agent1, agent2],
    termination_condition=StopAfterNMessages(5)
)
result = await team.run(task="Collaborate on this task")

課題:

  • 低レベルモデルは、ほとんどのユーザーにとって複雑すぎる
  • 高度なモデルは、複雑な動作の制限になる可能性があります
  • 2 つのモデル間のブリッジングにより、実装の複雑さが増す

エージェント フレームワークの統合ワークフロー モデル

Agent Framework には、両方の方法のベストを組み合わせた単一の Workflow 抽象化が用意されています。

from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never

# Assume we have agent1 and agent2 from previous examples
@executor(id="agent1")
async def agent1_executor(input_msg: str, ctx: WorkflowContext[str]) -> None:
    response = await agent1.run(input_msg)
    await ctx.send_message(response.text)

@executor(id="agent2")
async def agent2_executor(input_msg: str, ctx: WorkflowContext[Never, str]) -> None:
    response = await agent2.run(input_msg)
    await ctx.yield_output(response.text)  # Final output

# Build typed data flow graph
workflow = (WorkflowBuilder()
           .add_edge(agent1_executor, agent2_executor)
           .set_start_executor(agent1_executor)
           .build())

# Example usage (would be in async context)
# result = await workflow.run("Initial input")

ワークフローの詳細な例については、次を参照してください。

メリット:

  • 統合モデル: すべての複雑さのレベルに対する単一の抽象化
  • タイプ セーフ: 厳密に型指定された入力と出力
  • グラフの視覚化: データ フロー表現をクリアする
  • 柔軟な構成: エージェント、関数、サブワークフローを混在させます

ワークフローと GraphFlow

Agent Framework の Workflow 抽象化は、AutoGen の試験的な GraphFlow 機能に着想を得たものですが、設計理念における重要な進化を表しています。

  • GraphFlow: エッジが遷移であり、メッセージがすべてのエージェントにブロードキャストされる制御フロー ベース。切り替えは、ブロードキャストされたメッセージの内容に合った状態になります
  • ワークフロー: メッセージが特定のエッジを介してルーティングされ、Executor がエッジによってアクティブ化され、同時実行がサポートされるデータ フロー ベース。

ビジュアルの概要

次の図は、AutoGen の制御フロー GraphFlow (左) と Agent Framework のデータ フロー ワークフロー (右) を比較したものです。 GraphFlow は、条件付き遷移とブロードキャストを使用してエージェントをノードとしてモデル化します。 ワークフロー モデル Executor (エージェント、関数、またはサブワークフロー) は、型指定されたエッジによって接続されます。また、要求/応答の一時停止とチェックポイント処理もサポートしています。

flowchart LR

  subgraph AutoGenGraphFlow
    direction TB
    U[User / Task] --> A[Agent A]
    A -->|success| B[Agent B]
    A -->|retry| C[Agent C]
    A -. broadcast .- B
    A -. broadcast .- C
  end

  subgraph AgentFrameworkWorkflow
    direction TB
    I[Input] --> E1[Executor 1]
    E1 -->|"str"| E2[Executor 2]
    E1 -->|"image"| E3[Executor 3]
    E3 -->|"str"| E2
    E2 --> OUT[(Final Output)]
  end

  R[Request / Response Gate]
  E2 -. request .-> R
  R -. resume .-> E2

  CP[Checkpoint]
  E1 -. save .-> CP
  CP -. load .-> E1

実際に:

  • GraphFlow はエージェントをノードとして使用し、メッセージをブロードキャストします。エッジは条件付き遷移を表します。
  • ワークフローは、型指定されたメッセージをエッジに沿ってルーティングします。 ノード (Executor) には、エージェント、純粋関数、またはサブワークフローを指定できます。
  • 要求/応答を使用すると、ワークフローを外部入力に対して一時停止できます。チェックポイント処理は進行状況を保持し、再開を有効にします。

コードの比較

1) シーケンシャル + 条件付き
# AutoGen GraphFlow (fluent builder) — writer → reviewer → editor (conditional)
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow

writer = AssistantAgent(name="writer", description="Writes a draft", model_client=client)
reviewer = AssistantAgent(name="reviewer", description="Reviews the draft", model_client=client)
editor = AssistantAgent(name="editor", description="Finalizes the draft", model_client=client)

graph = (
    DiGraphBuilder()
    .add_node(writer).add_node(reviewer).add_node(editor)
    .add_edge(writer, reviewer)  # always
    .add_edge(reviewer, editor, condition=lambda msg: "approve" in msg.to_model_text())
    .set_entry_point(writer)
).build()

team = GraphFlow(participants=[writer, reviewer, editor], graph=graph)
result = await team.run(task="Draft a short paragraph about solar power")
# Agent Framework Workflow — sequential executors with conditional logic
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never

@executor(id="writer")
async def writer_exec(task: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(f"Draft: {task}")

@executor(id="reviewer")
async def reviewer_exec(draft: str, ctx: WorkflowContext[str]) -> None:
    decision = "approve" if "solar" in draft.lower() else "revise"
    await ctx.send_message(f"{decision}:{draft}")

@executor(id="editor")
async def editor_exec(msg: str, ctx: WorkflowContext[Never, str]) -> None:
    if msg.startswith("approve:"):
        await ctx.yield_output(msg.split(":", 1)[1])
    else:
        await ctx.yield_output("Needs revision")

workflow_seq = (
    WorkflowBuilder()
    .add_edge(writer_exec, reviewer_exec)
    .add_edge(reviewer_exec, editor_exec)
    .set_start_executor(writer_exec)
    .build()
)
2) ファンアウト + 結合 (ALL 対 ANY)
# AutoGen GraphFlow — A → (B, C) → D with ALL/ANY join
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
A, B, C, D = agent_a, agent_b, agent_c, agent_d

# ALL (default): D runs after both B and C
g_all = (
    DiGraphBuilder()
    .add_node(A).add_node(B).add_node(C).add_node(D)
    .add_edge(A, B).add_edge(A, C)
    .add_edge(B, D).add_edge(C, D)
    .set_entry_point(A)
).build()

# ANY: D runs when either B or C completes
g_any = (
    DiGraphBuilder()
    .add_node(A).add_node(B).add_node(C).add_node(D)
    .add_edge(A, B).add_edge(A, C)
    .add_edge(B, D, activation_group="join_d", activation_condition="any")
    .add_edge(C, D, activation_group="join_d", activation_condition="any")
    .set_entry_point(A)
).build()
# Agent Framework Workflow — A → (B, C) → aggregator (ALL vs ANY)
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never

@executor(id="A")
async def start(task: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(f"B:{task}", target_id="B")
    await ctx.send_message(f"C:{task}", target_id="C")

@executor(id="B")
async def branch_b(text: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(f"B_done:{text}")

@executor(id="C")
async def branch_c(text: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(f"C_done:{text}")

@executor(id="join_any")
async def join_any(msg: str, ctx: WorkflowContext[Never, str]) -> None:
    await ctx.yield_output(f"First: {msg}")  # ANY join (first arrival)

@executor(id="join_all")
async def join_all(msg: str, ctx: WorkflowContext[str, str]) -> None:
    state = await ctx.get_executor_state() or {"items": []}
    state["items"].append(msg)
    await ctx.set_executor_state(state)
    if len(state["items"]) >= 2:
        await ctx.yield_output(" | ".join(state["items"]))  # ALL join

wf_any = (
    WorkflowBuilder()
    .add_edge(start, branch_b).add_edge(start, branch_c)
    .add_edge(branch_b, join_any).add_edge(branch_c, join_any)
    .set_start_executor(start)
    .build()
)

wf_all = (
    WorkflowBuilder()
    .add_edge(start, branch_b).add_edge(start, branch_c)
    .add_edge(branch_b, join_all).add_edge(branch_c, join_all)
    .set_start_executor(start)
    .build()
)
3) 対象ルーティング (ブロードキャストなし)
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never

@executor(id="ingest")
async def ingest(task: str, ctx: WorkflowContext[str]) -> None:
    # Route selectively using target_id
    if task.startswith("image:"):
        await ctx.send_message(task.removeprefix("image:"), target_id="vision")
    else:
        await ctx.send_message(task, target_id="writer")

@executor(id="writer")
async def write(text: str, ctx: WorkflowContext[Never, str]) -> None:
    await ctx.yield_output(f"Draft: {text}")

@executor(id="vision")
async def caption(image_ref: str, ctx: WorkflowContext[Never, str]) -> None:
    await ctx.yield_output(f"Caption: {image_ref}")

workflow = (
    WorkflowBuilder()
    .add_edge(ingest, write)
    .add_edge(ingest, caption)
    .set_start_executor(ingest)
    .build()
)

# Example usage (async):
# await workflow.run("Summarize the benefits of solar power")
# await workflow.run("image:https://example.com/panel.jpg")

注意する内容:

  • GraphFlow はメッセージをブロードキャストし、条件付き遷移を使用します。 結合動作は、ターゲット側のactivationとエッジごとのactivation_group/activation_conditionを使用して構成されます (たとえば、join_dで両方のエッジをactivation_condition="any"にグループ化します)。
  • ワークフローはデータを明示的にルーティングします。 target_id を使用してダウンストリーム Executor を選択します。 結合の動作は、受信側の Executor に存在します (たとえば、最初の入力とすべての待機時のイールド)、またはオーケストレーション ビルダー/アグリゲーターを使用します。
  • Workflow の Executor は自由形式です。 ChatAgent、関数、またはサブワークフローをラップし、同じグラフ内でそれらを混在させます。

主な違い

次の表は、AutoGen の GraphFlow とエージェント フレームワークのワークフローの基本的な違いをまとめたものです。

特徴 AutoGen GraphFlow Agent Framework ワークフロー
フローの種類 制御フロー (エッジは遷移) データ フロー (エッジ ルート メッセージ)
ノードの種類 エージェントのみ エージェント、関数、サブワークフロー
アクティベーション メッセージブロードキャスト エッジ ベースのアクティブ化
タイプ セーフ Limited 全体を通して強力な入力
コンポーザビリティ Limited 高いコンポーザブル

パターンの入れ子

AutoGen チームの入れ子

# Inner team
inner_team = RoundRobinGroupChat(
    participants=[specialist1, specialist2],
    termination_condition=StopAfterNMessages(3)
)

# Outer team with nested team as participant
outer_team = RoundRobinGroupChat(
    participants=[coordinator, inner_team, reviewer],  # Team as participant
    termination_condition=StopAfterNMessages(10)
)

# Messages are broadcasted to all participants including nested team
result = await outer_team.run("Complex task requiring collaboration")

AutoGen の入れ子の特性:

  • 入れ子になったチームが外部チームからすべてのメッセージを受信する
  • 入れ子になったチーム メッセージは、すべての外部チーム参加者にブロードキャストされます
  • すべてのレベルの共有メッセージ コンテキスト

Agent Framework ワークフローの入れ子

from agent_framework import WorkflowExecutor, WorkflowBuilder

# Assume we have executors from previous examples
# specialist1_executor, specialist2_executor, coordinator_executor, reviewer_executor

# Create sub-workflow
sub_workflow = (WorkflowBuilder()
               .add_edge(specialist1_executor, specialist2_executor)
               .set_start_executor(specialist1_executor)
               .build())

# Wrap as executor
sub_workflow_executor = WorkflowExecutor(
    workflow=sub_workflow,
    id="sub_process"
)

# Use in parent workflow
parent_workflow = (WorkflowBuilder()
                  .add_edge(coordinator_executor, sub_workflow_executor)
                  .add_edge(sub_workflow_executor, reviewer_executor)
                  .set_start_executor(coordinator_executor)
                  .build())

Agent Framework の入れ子の特性:

  • 分離された入力/出力 WorkflowExecutor
  • メッセージブロードキャストなし - 特定の接続を経由するデータ フロー
  • 各ワークフロー レベルの独立した状態管理

グループ チャット パターン

グループ チャット パターンを使用すると、複数のエージェントが複雑なタスクで共同作業を行うことができます。 フレームワーク間で一般的なパターンがどのように変換されるかを次に示します。

RoundRobinGroupChat パターン

AutoGen の実装:

from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import StopAfterNMessages

team = RoundRobinGroupChat(
    participants=[agent1, agent2, agent3],
    termination_condition=StopAfterNMessages(10)
)
result = await team.run("Discuss this topic")

エージェント フレームワークの実装:

from agent_framework import SequentialBuilder, WorkflowOutputEvent

# Assume we have agent1, agent2, agent3 from previous examples
# Sequential workflow through participants
workflow = SequentialBuilder().participants([agent1, agent2, agent3]).build()

# Example usage (would be in async context)
async def sequential_example():
    # Each agent appends to shared conversation
    async for event in workflow.run_stream("Discuss this topic"):
        if isinstance(event, WorkflowOutputEvent):
            conversation_history = event.data  # list[ChatMessage]

オーケストレーションの詳細な例については、次を参照してください。

同時実行パターンの場合、Agent Framework には次の機能も用意されています。

from agent_framework import ConcurrentBuilder, WorkflowOutputEvent

# Assume we have agent1, agent2, agent3 from previous examples
# Concurrent workflow for parallel processing
workflow = (ConcurrentBuilder()
           .participants([agent1, agent2, agent3])
           .build())

# Example usage (would be in async context)
async def concurrent_example():
    # All agents process the input concurrently
    async for event in workflow.run_stream("Process this in parallel"):
        if isinstance(event, WorkflowOutputEvent):
            results = event.data  # Combined results from all agents

同時実行の例については、次を参照してください。

MagenticOneGroupChat パターン

AutoGen の実装:

from autogen_agentchat.teams import MagenticOneGroupChat

team = MagenticOneGroupChat(
    participants=[researcher, coder, executor],
    model_client=coordinator_client,
    termination_condition=StopAfterNMessages(20)
)
result = await team.run("Complex research and analysis task")

エージェント フレームワークの実装:

from typing import cast
from agent_framework import (
    MAGENTIC_EVENT_TYPE_AGENT_DELTA,
    MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
    AgentRunUpdateEvent,
    ChatAgent,
    ChatMessage,
    MagenticBuilder,
    WorkflowOutputEvent,
)
from agent_framework.openai import OpenAIChatClient

# Create a manager agent for orchestration
manager_agent = ChatAgent(
    name="MagenticManager",
    description="Orchestrator that coordinates the workflow",
    instructions="You coordinate a team to complete complex tasks efficiently.",
    chat_client=OpenAIChatClient(),
)

workflow = (
    MagenticBuilder()
    .participants(researcher=researcher, coder=coder)
    .with_standard_manager(
        agent=manager_agent,
        max_round_count=20,
        max_stall_count=3,
        max_reset_count=2,
    )
    .build()
)

# Example usage (would be in async context)
async def magentic_example():
    output: str | None = None
    async for event in workflow.run_stream("Complex research task"):
        if isinstance(event, AgentRunUpdateEvent):
            props = event.data.additional_properties if event.data else None
            event_type = props.get("magentic_event_type") if props else None

            if event_type == MAGENTIC_EVENT_TYPE_ORCHESTRATOR:
                text = event.data.text if event.data else ""
                print(f"[ORCHESTRATOR]: {text}")
            elif event_type == MAGENTIC_EVENT_TYPE_AGENT_DELTA:
                agent_id = props.get("agent_id", event.executor_id) if props else event.executor_id
                if event.data and event.data.text:
                    print(f"[{agent_id}]: {event.data.text}", end="")

        elif isinstance(event, WorkflowOutputEvent):
            output_messages = cast(list[ChatMessage], event.data)
            if output_messages:
                output = output_messages[-1].text

Agent Framework のカスタマイズ オプション:

マゼンティック ワークフローには、広範なカスタマイズ オプションが用意されています。

  • マネージャーの構成: カスタム命令とモデル設定で ChatAgent を使用する
  • ラウンド制限: max_round_countmax_stall_countmax_reset_count
  • イベント ストリーミング: magentic_event_typeメタデータでAgentRunUpdateEventを使用する
  • エージェントの特殊化: エージェントごとのカスタム命令とツール
  • Human-in-the-loop: 計画のレビュー、ツールの承認、ストールの介入
# Advanced customization example with human-in-the-loop
from typing import cast
from agent_framework import (
    MAGENTIC_EVENT_TYPE_AGENT_DELTA,
    MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
    AgentRunUpdateEvent,
    ChatAgent,
    MagenticBuilder,
    MagenticHumanInterventionDecision,
    MagenticHumanInterventionKind,
    MagenticHumanInterventionReply,
    MagenticHumanInterventionRequest,
    RequestInfoEvent,
    WorkflowOutputEvent,
)
from agent_framework.openai import OpenAIChatClient

# Create manager agent with custom configuration
manager_agent = ChatAgent(
    name="MagenticManager",
    description="Orchestrator for complex tasks",
    instructions="Custom orchestration instructions...",
    chat_client=OpenAIChatClient(model_id="gpt-4o"),
)

workflow = (
    MagenticBuilder()
    .participants(
        researcher=researcher_agent,
        coder=coder_agent,
        analyst=analyst_agent,
    )
    .with_standard_manager(
        agent=manager_agent,
        max_round_count=15,      # Limit total rounds
        max_stall_count=2,       # Trigger stall handling
        max_reset_count=1,       # Allow one reset on failure
    )
    .with_plan_review()           # Enable human plan review
    .with_human_input_on_stall()  # Enable human intervention on stalls
    .build()
)

# Handle human intervention requests during execution
async for event in workflow.run_stream("Complex task"):
    if isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest:
        req = cast(MagenticHumanInterventionRequest, event.data)
        if req.kind == MagenticHumanInterventionKind.PLAN_REVIEW:
            # Review and approve the plan
            reply = MagenticHumanInterventionReply(
                decision=MagenticHumanInterventionDecision.APPROVE
            )
            async for ev in workflow.send_responses_streaming({event.request_id: reply}):
                pass  # Handle continuation

マゼンティックの詳細な例については、次を参照してください。

将来のパターン

Agent Framework のロードマップには、現在開発中のいくつかの AutoGen パターンが含まれています。

  • Swarm パターン: ハンドオフ ベースのエージェントの調整
  • SelectorGroupChat: LLM 駆動スピーカーの選択

Human-in-the-Loop with Request Response

Agent Framework の Workflow の主な新機能は 、要求と応答の概念です。これにより、ワークフローは実行を一時停止し、外部入力を待ってから続行できます。 この機能は、AutoGen の Team 抽象化には存在せず、高度な人間のループ内パターンを可能にします。

AutoGen の制限事項

AutoGen の Team 抽象化は開始されると継続的に実行され、人間の入力の実行を一時停止するための組み込みのメカニズムは提供されません。 人間のループ内機能には、フレームワークの外部でカスタム実装が必要です。

Agent Framework Request-Response API

Agent Framework には、任意の Executor が ctx.request_info() を使用して要求を送信し、 @response_handler デコレーターで応答を処理できる組み込みの要求応答機能が用意されています。

from agent_framework import (
    RequestInfoEvent, WorkflowBuilder, WorkflowContext, 
    Executor, handler, response_handler
)
from dataclasses import dataclass

# Assume we have agent_executor defined elsewhere

# Define typed request payload
@dataclass
class ApprovalRequest:
    """Request human approval for agent output."""
    content: str = ""
    agent_name: str = ""

# Workflow executor that requests human approval
class ReviewerExecutor(Executor):
    
    @handler
    async def review_content(
        self,
        agent_response: str,
        ctx: WorkflowContext
    ) -> None:
        # Request human input with structured data
        approval_request = ApprovalRequest(
            content=agent_response,
            agent_name="writer_agent"
        )
        await ctx.request_info(request_data=approval_request, response_type=str)
    
    @response_handler
    async def handle_approval_response(
        self,
        original_request: ApprovalRequest,
        decision: str,
        ctx: WorkflowContext
    ) -> None:
        decision_lower = decision.strip().lower()
        original_content = original_request.content

        if decision_lower == "approved":
            await ctx.yield_output(f"APPROVED: {original_content}")
        else:
            await ctx.yield_output(f"REVISION NEEDED: {decision}")

# Build workflow with human-in-the-loop
reviewer = ReviewerExecutor(id="reviewer")

workflow = (WorkflowBuilder()
           .add_edge(agent_executor, reviewer)
           .set_start_executor(agent_executor)
           .build())

Human-in-the-Loop ワークフローの実行

Agent Framework には、一時停止再開サイクルを処理するためのストリーミング API が用意されています。

from agent_framework import RequestInfoEvent, WorkflowOutputEvent

# Assume we have workflow defined from previous examples
async def run_with_human_input():
    pending_responses = None
    completed = False

    while not completed:
        # First iteration uses run_stream, subsequent use send_responses_streaming
        stream = (
            workflow.send_responses_streaming(pending_responses)
            if pending_responses
            else workflow.run_stream("initial input")
        )

        events = [event async for event in stream]
        pending_responses = None

        # Collect human requests and outputs
        for event in events:
            if isinstance(event, RequestInfoEvent):
                # Display request to human and collect response
                request_data = event.data  # ApprovalRequest instance
                print(f"Review needed: {request_data.content}")

                human_response = input("Enter 'approved' or revision notes: ")
                pending_responses = {event.request_id: human_response}

            elif isinstance(event, WorkflowOutputEvent):
                print(f"Final result: {event.data}")
                completed = True

human-in-the-loop ワークフローの例については、次を参照してください。

ワークフローのチェックポイント処理と再開

AutoGen のWorkflow抽象化に対する Agent Framework のTeamのもう 1 つの主な利点は、チェックポイント処理と実行の再開の組み込みサポートです。 これにより、ワークフローをチェックポイントから後で一時停止、永続化、再開できるため、フォールト トレランスが提供され、実行時間の長いワークフローまたは非同期ワークフローが有効になります。

AutoGen の制限事項

AutoGen の Team 抽象化では、組み込みのチェックポイント機能は提供されません。 永続化または回復メカニズムは外部で実装する必要があり、多くの場合、複雑な状態管理とシリアル化ロジックが必要です。

エージェント フレームワークのチェックポイント処理

Agent Framework では、FileCheckpointStoragewith_checkpointing()WorkflowBuilderメソッドを使用した包括的なチェックポイント処理が提供されます。 チェックポイント キャプチャ:

  • Executor 状態: 使用する各 Executor のローカル状態 ctx.set_executor_state()
  • 共有状態: 次を使用して実行プログラム間の状態 ctx.set_shared_state()
  • メッセージ キュー: Executor 間の保留中のメッセージ
  • ワークフローの位置: 現在の実行の進行状況と次のステップ
from agent_framework import (
    FileCheckpointStorage, WorkflowBuilder, WorkflowContext,
    Executor, handler
)
from typing_extensions import Never

class ProcessingExecutor(Executor):
    @handler
    async def process(self, data: str, ctx: WorkflowContext[str]) -> None:
        # Process the data
        result = f"Processed: {data.upper()}"
        print(f"Processing: '{data}' -> '{result}'")

        # Persist executor-local state
        prev_state = await ctx.get_executor_state() or {}
        count = prev_state.get("count", 0) + 1
        await ctx.set_executor_state({
            "count": count,
            "last_input": data,
            "last_output": result
        })

        # Persist shared state for other executors
        await ctx.set_shared_state("original_input", data)
        await ctx.set_shared_state("processed_output", result)

        await ctx.send_message(result)

class FinalizeExecutor(Executor):
    @handler
    async def finalize(self, data: str, ctx: WorkflowContext[Never, str]) -> None:
        result = f"Final: {data}"
        await ctx.yield_output(result)

# Configure checkpoint storage
checkpoint_storage = FileCheckpointStorage(storage_path="./checkpoints")
processing_executor = ProcessingExecutor(id="processing")
finalize_executor = FinalizeExecutor(id="finalize")

# Build workflow with checkpointing enabled
workflow = (WorkflowBuilder()
           .add_edge(processing_executor, finalize_executor)
           .set_start_executor(processing_executor)
           .with_checkpointing(checkpoint_storage=checkpoint_storage)  # Enable checkpointing
           .build())

# Example usage (would be in async context)
async def checkpoint_example():
    # Run workflow - checkpoints are created automatically
    async for event in workflow.run_stream("input data"):
        print(f"Event: {event}")

チェックポイントからの再開

Agent Framework には、特定のチェックポイントの一覧表示、検査、再開を行う API が用意されています。

from typing_extensions import Never

from agent_framework import (
    Executor,
    FileCheckpointStorage,
    WorkflowContext,
    WorkflowBuilder,
    get_checkpoint_summary,
    handler,
)

class UpperCaseExecutor(Executor):
    @handler
    async def process(self, text: str, ctx: WorkflowContext[str]) -> None:
        result = text.upper()
        await ctx.send_message(result)

class ReverseExecutor(Executor):
    @handler
    async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None:
        result = text[::-1]
        await ctx.yield_output(result)

def create_workflow(checkpoint_storage: FileCheckpointStorage):
    """Create a workflow with two executors and checkpointing."""
    upper_executor = UpperCaseExecutor(id="upper")
    reverse_executor = ReverseExecutor(id="reverse")

    return (WorkflowBuilder()
           .add_edge(upper_executor, reverse_executor)
           .set_start_executor(upper_executor)
           .with_checkpointing(checkpoint_storage=checkpoint_storage)
           .build())

# Assume we have checkpoint_storage from previous examples
checkpoint_storage = FileCheckpointStorage(storage_path="./checkpoints")

async def checkpoint_resume_example():
    # List available checkpoints
    checkpoints = await checkpoint_storage.list_checkpoints()

    # Display checkpoint information
    for checkpoint in checkpoints:
        summary = get_checkpoint_summary(checkpoint)
        print(f"Checkpoint {summary.checkpoint_id}: iteration={summary.iteration_count}")

    # Resume from a specific checkpoint
    if checkpoints:
        chosen_checkpoint_id = checkpoints[0].checkpoint_id

        # Create new workflow instance and resume
        new_workflow = create_workflow(checkpoint_storage)
        async for event in new_workflow.run_stream(
            checkpoint_id=chosen_checkpoint_id,
            checkpoint_storage=checkpoint_storage
        ):
            print(f"Resumed event: {event}")

高度なチェックポイント機能

Human-in-the-Loop 統合を使用したチェックポイント:

チェックポイント処理は、人間が入力するためにワークフローを一時停止し、後で再開できるように、ループ内の人間のワークフローとシームレスに連携します。 保留中の要求を含むチェックポイントから再開すると、これらの要求はイベントとして再出力されます。

# Assume we have workflow, checkpoint_id, and checkpoint_storage from previous examples
async def resume_with_pending_requests_example():
    # Resume from checkpoint - pending requests will be re-emitted
    request_info_events = []
    async for event in workflow.run_stream(
        checkpoint_id=checkpoint_id,
        checkpoint_storage=checkpoint_storage
    ):
        if isinstance(event, RequestInfoEvent):
            request_info_events.append(event)

    # Handle re-emitted pending request
    responses = {}
    for event in request_info_events:
        response = handle_request(event.data)
        responses[event.request_id] = response

    # Send response back to workflow
    async for event in workflow.send_responses_streaming(responses):
        print(f"Event: {event}")

主な利点

AutoGen と比較して、Agent Framework のチェックポイント処理では次の機能が提供されます。

  • 自動永続化: 手動の状態管理は必要ありません
  • 細かい復旧: スーパーステップ境界からの再開
  • 状態の分離: Executor ローカル状態と共有状態を分離する
  • 人間とループ内の統合: ユーザー入力によるシームレスな一時停止再開
  • フォールト トレランス: 障害または中断からの堅牢な復旧

実際の例

包括的なチェックポイント処理の例については、次を参照してください。


Observability

AutoGen と Agent Framework はどちらも監視機能を提供しますが、さまざまなアプローチと機能を備えています。

AutoGen の可観測性

AutoGen には、次のインストルメンテーションを使用した OpenTelemetry のネイティブ サポートがあります。

  • ランタイム トレース: SingleThreadedAgentRuntimeGrpcWorkerAgentRuntime
  • ツールの実行: GenAI セマンティック規則に従ってBaseToolスパンを持つexecute_tool
  • エージェント操作: BaseChatAgentcreate_agentスパンを使用したinvoke_agent
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from autogen_core import SingleThreadedAgentRuntime

# Configure OpenTelemetry
tracer_provider = TracerProvider()
trace.set_tracer_provider(tracer_provider)

# Pass to runtime
runtime = SingleThreadedAgentRuntime(tracer_provider=tracer_provider)

エージェント フレームワークの可観測性

Agent Framework は、複数のアプローチを通じて包括的な可観測性を提供します。

  • ゼロ コードのセットアップ: 環境変数を使用した自動インストルメンテーション
  • 手動構成: カスタム パラメーターを使用したプログラムによるセットアップ
  • 豊富なテレメトリ: エージェント、ワークフロー、ツール実行の追跡
  • コンソール出力: コンソールの組み込みのログ記録と視覚化
from agent_framework import ChatAgent
from agent_framework.observability import setup_observability
from agent_framework.openai import OpenAIChatClient

# Zero-code setup via environment variables
# Set ENABLE_OTEL=true
# Set OTLP_ENDPOINT=http://localhost:4317

# Or manual setup
setup_observability(
    otlp_endpoint="http://localhost:4317"
)

# Create client for the example
client = OpenAIChatClient(model_id="gpt-5")

async def observability_example():
    # Observability is automatically applied to all agents and workflows
    agent = ChatAgent(name="assistant", chat_client=client)
    result = await agent.run("Hello")  # Automatically traced

主な違い:

  • セットアップの複雑さ: Agent Framework では、よりシンプルなゼロコード セットアップ オプションが提供されます
  • スコープ: エージェント フレームワークは、ワークフロー レベルの可観測性を含む広範な範囲を提供します
  • 視覚化: Agent Framework には、コンソール出力と開発 UI が組み込まれています
  • 構成: Agent Framework では、より柔軟な構成オプションが提供されます

監視の詳細な例については、次を参照してください。


Conclusion

この移行ガイドでは、AutoGen と Microsoft Agent Framework の間の包括的なマッピングを提供します。基本的なエージェントの作成から複雑なマルチエージェント ワークフローまで、すべてをカバーします。 移行に関する重要なポイント:

  • 単一エージェントの移行 は簡単で、同様の API と Agent Framework の機能が強化されています
  • マルチエージェント パターンでは 、イベント ドリブンからデータ フロー ベースのアーキテクチャへのアプローチを再考する必要がありますが、GraphFlow に既に慣れている場合は、移行が容易になります
  • Agent Framework には 、ミドルウェア、ホステッド ツール、型指定されたワークフローなどの追加機能が用意されています

その他の例と詳細な実装ガイダンスについては、 Agent Framework のサンプル ディレクトリを参照してください。

その他のサンプル カテゴリ

Agent Framework には、他のいくつかの重要な領域にわたるサンプルが用意されています。

次のステップ