Bagikan melalui


Panduan Migrasi AutoGen ke Microsoft Agent Framework

Panduan komprehensif untuk bermigrasi dari AutoGen ke Microsoft Agent Framework Python SDK.

Daftar Isi

Latar Belakang

AutoGen adalah kerangka kerja untuk membangun agen AI dan sistem multi-agen menggunakan model bahasa besar (LLM). Ini dimulai sebagai proyek penelitian di Microsoft Research dan merintis beberapa konsep dalam orkestrasi multi-agen, seperti GroupChat dan runtime agen berbasis peristiwa. Proyek ini telah menjadi kolaborasi yang bermanfaat dari komunitas sumber terbuka dan banyak fitur penting berasal dari kontributor eksternal.

Microsoft Agent Framework adalah SDK multi-bahasa baru untuk membangun agen dan alur kerja AI menggunakan LLM. Ini mewakili evolusi yang signifikan dari ide-ide yang dirintis dalam AutoGen dan menggabungkan pelajaran yang dipelajari dari penggunaan dunia nyata. Ini dikembangkan oleh tim Inti AutoGen dan Semantic Kernel di Microsoft, dan dirancang untuk menjadi fondasi baru untuk membangun aplikasi AI ke depannya.

Panduan ini menjelaskan jalur migrasi praktis: dimulai dengan mencakup apa yang tetap sama dan apa yang berubah sekilas. Kemudian, ini mencakup pengaturan klien model, fitur agen tunggal, dan akhirnya orkestrasi multi-agen dengan kode konkret berdampingan. Sepanjang jalan, tautan ke sampel yang dapat dijalankan di repositori Kerangka Kerja Agen membantu Anda memvalidasi setiap langkah.

Kesamaan dan Perbedaan Utama

Apa yang Tetap Sama

Yayasan sudah tidak asing lagi. Anda masih membuat agen di sekitar klien model, memberikan instruksi, dan melampirkan alat. Kedua pustaka mendukung alat gaya fungsi, streaming token, konten multimodal, dan I/O asinkron.

# Both frameworks follow similar patterns
# AutoGen
agent = AssistantAgent(name="assistant", model_client=client, tools=[my_tool])
result = await agent.run(task="Help me with this task")

# Agent Framework
agent = ChatAgent(name="assistant", chat_client=client, tools=[my_tool])
result = await agent.run("Help me with this task")

Perbedaan Utama

  1. Gaya orkestrasi: AutoGen memasangkan inti berbasis peristiwa dengan tingkat Teamtinggi . Agent Framework berpusat pada berbasis Workflow grafik yang diketik yang merutekan data di sepanjang tepi dan mengaktifkan pelaksana saat input siap.

  2. Alat: AutoGen membungkus fungsi dengan FunctionTool. Agent Framework menggunakan @tool, menyimpulkan skema secara otomatis, dan menambahkan alat yang dihosting seperti penerjemah kode dan pencarian web.

  3. Perilaku agen: AssistantAgent adalah giliran tunggal kecuali Anda meningkatkan max_tool_iterations. ChatAgent adalah multi-giliran secara default dan terus memanggil alat sampai dapat mengembalikan jawaban akhir.

  4. Runtime: AutoGen menawarkan runtime terdistribusi yang disematkan dan eksperimental. Agent Framework berfokus pada komposisi proses tunggal hari ini; eksekusi terdistribusi direncanakan.

Pembuatan dan Konfigurasi Klien Model

Kedua kerangka kerja menyediakan klien model untuk penyedia AI utama, dengan API serupa tetapi tidak identik.

Fitur AutoGen Kerangka Kerja Agen
Klien OpenAI OpenAIChatCompletionClient OpenAIChatClient
Klien Respons OpenAI ❌ Tidak tersedia OpenAIResponsesClient
Azure OpenAI AzureOpenAIChatCompletionClient AzureOpenAIChatClient
Respons Azure OpenAI ❌ Tidak tersedia AzureOpenAIResponsesClient
Azure AI AzureAIChatCompletionClient AzureAIAgentClient
Antropik AnthropicChatCompletionClient 🚧 Direncanakan
Ollama OllamaChatCompletionClient 🚧 Direncanakan
Penggunaan Cache ChatCompletionCache Pembungkus 🚧 Direncanakan

Klien Model AutoGen

from autogen_ext.models.openai import OpenAIChatCompletionClient, AzureOpenAIChatCompletionClient

# OpenAI
client = OpenAIChatCompletionClient(
    model="gpt-5",
    api_key="your-key"
)

# Azure OpenAI
client = AzureOpenAIChatCompletionClient(
    azure_endpoint="https://your-endpoint.openai.azure.com/",
    azure_deployment="gpt-5",
    api_version="2024-12-01",
    api_key="your-key"
)

Agent Framework ChatClients

from agent_framework.openai import OpenAIChatClient
from agent_framework.azure import AzureOpenAIChatClient

# OpenAI (reads API key from environment)
client = OpenAIChatClient(model_id="gpt-5")

# Azure OpenAI (uses environment or default credentials; see samples for auth options)
client = AzureOpenAIChatClient(model_id="gpt-5")

Untuk contoh terperinci, lihat:

Dukungan API Respons (Agen Framework Exclusive)

Kerangka Kerja AzureOpenAIResponsesClient Agen dan OpenAIResponsesClient memberikan dukungan khusus untuk model penalaran dan respons terstruktur yang tidak tersedia di AutoGen:

from agent_framework.azure import AzureOpenAIResponsesClient
from agent_framework.openai import OpenAIResponsesClient

# Azure OpenAI with Responses API
azure_responses_client = AzureOpenAIResponsesClient(model_id="gpt-5")

# OpenAI with Responses API
openai_responses_client = OpenAIResponsesClient(model_id="gpt-5")

Untuk contoh API Respons, lihat:

Pemetaan Fitur Single-Agent

Bagian ini memetakan fitur agen tunggal antara AutoGen dan Agent Framework. Dengan klien di tempat, buat agen, lampirkan alat, dan pilih antara eksekusi non-streaming dan streaming.

Pembuatan dan Eksekusi Agen Dasar

Setelah Anda mengonfigurasi klien model, langkah selanjutnya adalah membuat agen. Kedua kerangka kerja menyediakan abstraksi agen serupa, tetapi dengan perilaku default dan opsi konfigurasi yang berbeda.

AutoGen AssistantAgent

from autogen_agentchat.agents import AssistantAgent

agent = AssistantAgent(
    name="assistant",
    model_client=client,
    system_message="You are a helpful assistant.",
    tools=[my_tool],
    max_tool_iterations=1  # Single-turn by default
)

# Execution
result = await agent.run(task="What's the weather?")

Agen Framework ChatAgent

from agent_framework import ChatAgent, tool
from agent_framework.openai import OpenAIChatClient

# Create simple tools for the example
@tool
def get_weather(location: str) -> str:
    """Get weather for a location."""
    return f"Weather in {location}: sunny"

@tool
def get_time() -> str:
    """Get current time."""
    return "Current time: 2:30 PM"

# Create client
client = OpenAIChatClient(model_id="gpt-5")

async def example():
    # Direct creation with default options
    agent = ChatAgent(
        name="assistant",
        chat_client=client,
        instructions="You are a helpful assistant.",
        tools=[get_weather],  # Multi-turn by default
        default_options={
            "temperature": 0.7,
            "max_tokens": 1000,
        }
    )

    # Factory method (more convenient)
    agent = client.as_agent(
        name="assistant",
        instructions="You are a helpful assistant.",
        tools=[get_weather],
        default_options={"temperature": 0.7}
    )

    # Execution with runtime tool and options configuration
    result = await agent.run(
        "What's the weather?",
        tools=[get_time],  # Can add tools at runtime (keyword arg)
        options={"tool_choice": "auto"}  # Other options go in options dict
    )

Perbedaan Utama:

  • Perilaku default: ChatAgent secara otomatis melakukan iterasi melalui panggilan alat, sementara AssistantAgent memerlukan pengaturan eksplisit max_tool_iterations
  • Konfigurasi runtime: ChatAgent.run() menerima tools sebagai argumen kata kunci dan opsi lain melalui options parameter dict untuk kustomisasi per pemanggilan
  • Sistem opsi: Agent Framework menggunakan opsi berbasis TypedDict (misalnya, OpenAIChatOptions) untuk keamanan jenis dan pelengkapan otomatis IDE. Opsi diteruskan melalui default_options saat konstruksi dan options saat runtime
  • Metode pabrik: Agent Framework menyediakan metode pabrik yang nyaman langsung dari klien obrolan
  • Manajemen status: ChatAgent tanpa status dan tidak mempertahankan riwayat percakapan antara pemanggilan, tidak seperti AssistantAgent yang mempertahankan riwayat percakapan sebagai bagian dari statusnya

Mengelola Status Percakapan dengan AgentThread

Untuk melanjutkan percakapan dengan ChatAgent, gunakan AgentThread untuk mengelola riwayat percakapan:

# Assume we have an agent from previous examples
async def conversation_example():
    # Create a new thread that will be reused
    thread = agent.get_new_thread()

    # First interaction - thread is empty
    result1 = await agent.run("What's 2+2?", thread=thread)
    print(result1.text)  # "4"

    # Continue conversation - thread contains previous messages
    result2 = await agent.run("What about that number times 10?", thread=thread)
    print(result2.text)  # "40" (understands "that number" refers to 4)

    # AgentThread can use external storage, similar to ChatCompletionContext in AutoGen

Stateless secara default: demo cepat

# Without a thread (two independent invocations)
r1 = await agent.run("What's 2+2?")
print(r1.text)  # for example, "4"

r2 = await agent.run("What about that number times 10?")
print(r2.text)  # Likely ambiguous without prior context; cannot be "40"

# With a thread (shared context across calls)
thread = agent.get_new_thread()
print((await agent.run("What's 2+2?", thread=thread)).text)  # "4"
print((await agent.run("What about that number times 10?", thread=thread)).text)  # "40"

Untuk contoh manajemen utas, lihat:

Kesetaraan Agen Asisten OpenAI

Kedua kerangka kerja menyediakan integrasi OpenAI Assistant API:

# AutoGen OpenAIAssistantAgent
from autogen_ext.agents.openai import OpenAIAssistantAgent
# Agent Framework has OpenAI Assistants support via OpenAIAssistantsClient
from agent_framework.openai import OpenAIAssistantsClient

Untuk contoh Asisten OpenAI, lihat:

Dukungan Streaming

Kedua kerangka kerja mengalirkan token secara real time—dari klien dan dari agen—untuk menjaga UI tetap responsif.

AutoGen Streaming

# Model client streaming
async for chunk in client.create_stream(messages):
    if isinstance(chunk, str):
        print(chunk, end="")

# Agent streaming
async for event in agent.run_stream(task="Hello"):
    if isinstance(event, ModelClientStreamingChunkEvent):
        print(event.content, end="")
    elif isinstance(event, TaskResult):
        print("Final result received")

Streaming Kerangka Kerja Agen

# Assume we have client, agent, and tools from previous examples
async def streaming_example():
    # Chat client streaming - tools go in options dict
    async for chunk in client.get_streaming_response(
        "Hello",
        options={"tools": tools}
    ):
        if chunk.text:
            print(chunk.text, end="")

    # Agent streaming - tools can be keyword arg on agents
    async for chunk in agent.run_stream("Hello", tools=tools):
        if chunk.text:
            print(chunk.text, end="", flush=True)

Tips: Dalam Agent Framework, klien dan agen menghasilkan bentuk pembaruan yang sama; Anda dapat membaca chunk.text dalam kedua kasus. Perhatikan bahwa untuk klien obrolan, tools masuk ke options dict, sementara untuk agen, tools tetap menjadi argumen kata kunci langsung.

Jenis dan Pembuatan Pesan

Memahami cara kerja pesan sangat penting untuk komunikasi agen yang efektif. Kedua kerangka kerja menyediakan pendekatan yang berbeda untuk pembuatan dan penanganan pesan, dengan AutoGen menggunakan kelas pesan terpisah dan Agent Framework menggunakan sistem pesan terpadu.

Jenis Pesan AutoGen

from autogen_agentchat.messages import TextMessage, MultiModalMessage
from autogen_core.models import UserMessage

# Text message
text_msg = TextMessage(content="Hello", source="user")

# Multi-modal message
multi_modal_msg = MultiModalMessage(
    content=["Describe this image", image_data],
    source="user"
)

# Convert to model format for use with model clients
user_message = text_msg.to_model_message()

Jenis Pesan Kerangka Kerja Agen

from agent_framework import ChatMessage, TextContent, DataContent, UriContent, Role
import base64

# Text message
text_msg = ChatMessage(role=Role.USER, text="Hello")

# Supply real image bytes, or use a data: URI/URL via UriContent
image_bytes = b"<your_image_bytes>"
image_b64 = base64.b64encode(image_bytes).decode()
image_uri = f"data:image/jpeg;base64,{image_b64}"

# Multi-modal message with mixed content
multi_modal_msg = ChatMessage(
    role=Role.USER,
    contents=[
        TextContent(text="Describe this image"),
        DataContent(uri=image_uri, media_type="image/jpeg")
    ]
)

Perbedaan Utama:

  • AutoGen menggunakan kelas pesan terpisah (TextMessage, MultiModalMessage) dengan source bidang
  • Kerangka Kerja Agen menggunakan terpadu ChatMessage dengan objek konten yang di ketik dan role bidang
  • Pesan Kerangka Kerja Agen menggunakan Role enum (USER, ASSISTANT, SYSTEM, TOOL) alih-alih sumber string

Pembuatan dan Integrasi Alat

Alat memperluas kemampuan agen di luar pembuatan teks. Kerangka kerja mengambil pendekatan yang berbeda untuk pembuatan alat, dengan Agent Framework menyediakan pembuatan skema yang lebih otomatis.

AutoGen FunctionTool

from autogen_core.tools import FunctionTool

async def get_weather(location: str) -> str:
    """Get weather for a location."""
    return f"Weather in {location}: sunny"

# Manual tool creation
tool = FunctionTool(
    func=get_weather,
    description="Get weather information"
)

# Use with agent
agent = AssistantAgent(name="assistant", model_client=client, tools=[tool])

Kerangka Kerja Agen @tool

from agent_framework import tool
from typing import Annotated
from pydantic import Field

@tool
def get_weather(
    location: Annotated[str, Field(description="The location to get weather for")]
) -> str:
    """Get weather for a location."""
    return f"Weather in {location}: sunny"

# Direct use with agent (automatic conversion)
agent = ChatAgent(name="assistant", chat_client=client, tools=[get_weather])

Untuk contoh terperinci, lihat:

Alat yang Dihosting (Agent Framework Exclusive)

Agent Framework menyediakan alat yang dihosting yang tidak tersedia di AutoGen:

from agent_framework import ChatAgent, HostedCodeInterpreterTool, HostedWebSearchTool
from agent_framework.azure import AzureOpenAIChatClient

# Azure OpenAI client with a model that supports hosted tools
client = AzureOpenAIChatClient(model_id="gpt-5")

# Code execution tool
code_tool = HostedCodeInterpreterTool()

# Web search tool
search_tool = HostedWebSearchTool()

agent = ChatAgent(
    name="researcher",
    chat_client=client,
    tools=[code_tool, search_tool]
)

Untuk contoh terperinci, lihat:

Persyaratan dan peringatan:

  • Alat yang dihosting hanya tersedia di model/akun yang mendukungnya. Verifikasi pemberian izin dan dukungan model untuk penyedia Anda sebelum mengaktifkan alat-alat ini.
  • Konfigurasi berbeda menurut penyedia; ikuti prasyarat di setiap sampel untuk penyiapan dan izin.
  • Tidak setiap model mendukung setiap alat yang dihosting (misalnya, pencarian web vs penerjemah kode). Pilih model yang kompatibel di lingkungan Anda.

Nota

AutoGen mendukung alat eksekusi kode lokal, tetapi fitur ini direncanakan untuk versi Agent Framework di masa mendatang.

Perbedaan Utama: Agent Framework menangani iterasi alat secara otomatis di tingkat agen. Tidak seperti parameter AutoGen max_tool_iterations , agen Agent Framework melanjutkan eksekusi alat hingga selesai secara default, dengan mekanisme keamanan bawaan untuk mencegah perulangan tak terbatas.

Dukungan Server MCP

Untuk integrasi alat tingkat lanjut, kedua kerangka kerja mendukung Model Context Protocol (MCP), memungkinkan agen berinteraksi dengan layanan eksternal dan sumber data. Agent Framework menyediakan dukungan bawaan yang lebih komprehensif.

Dukungan MCP AutoGen

AutoGen memiliki dukungan MCP dasar melalui ekstensi (detail implementasi tertentu bervariasi menurut versi).

Dukungan MCP Kerangka Kerja Agen

from agent_framework import ChatAgent, MCPStdioTool, MCPStreamableHTTPTool, MCPWebsocketTool
from agent_framework.openai import OpenAIChatClient

# Create client for the example
client = OpenAIChatClient(model_id="gpt-5")

# Stdio MCP server
mcp_tool = MCPStdioTool(
    name="filesystem",
    command="uvx mcp-server-filesystem",
    args=["/allowed/directory"]
)

# HTTP streaming MCP
http_mcp = MCPStreamableHTTPTool(
    name="http_mcp",
    url="http://localhost:8000/sse"
)

# WebSocket MCP
ws_mcp = MCPWebsocketTool(
    name="websocket_mcp",
    url="ws://localhost:8000/ws"
)

agent = ChatAgent(name="assistant", chat_client=client, tools=[mcp_tool])

Untuk contoh MCP, lihat:

Pola Agen sebagai Alat

Salah satu pola yang kuat adalah menggunakan agen itu sendiri sebagai alat, memungkinkan arsitektur agen hierarkis. Kedua kerangka kerja mendukung pola ini dengan implementasi yang berbeda.

AutoGen AgentTool

from autogen_agentchat.tools import AgentTool

# Create specialized agent
writer = AssistantAgent(
    name="writer",
    model_client=client,
    system_message="You are a creative writer."
)

# Wrap as tool
writer_tool = AgentTool(agent=writer)

# Use in coordinator (requires disabling parallel tool calls)
coordinator_client = OpenAIChatCompletionClient(
    model="gpt-5",
    parallel_tool_calls=False
)
coordinator = AssistantAgent(
    name="coordinator",
    model_client=coordinator_client,
    tools=[writer_tool]
)

As_tool Kerangka Kerja Agen()

from agent_framework import ChatAgent

# Assume we have client from previous examples
# Create specialized agent
writer = ChatAgent(
    name="writer",
    chat_client=client,
    instructions="You are a creative writer."
)

# Convert to tool
writer_tool = writer.as_tool(
    name="creative_writer",
    description="Generate creative content",
    arg_name="request",
    arg_description="What to write"
)

# Use in coordinator
coordinator = ChatAgent(
    name="coordinator",
    chat_client=client,
    tools=[writer_tool]
)

Catatan migrasi eksplisit: Di AutoGen, atur parallel_tool_calls=False pada klien model koordinator saat membungkus agen sebagai alat untuk menghindari masalah konkurensi saat memanggil instans agen yang sama. Dalam Agent Framework, as_tool() tidak memerlukan menonaktifkan panggilan alat paralel karena agen tidak memiliki status secara default.

Middleware (Fitur Kerangka Kerja Agen)

Agent Framework memperkenalkan kemampuan middleware yang kurang oleh AutoGen. Middleware memungkinkan masalah lintas-pemotongan yang kuat seperti pengelogan, keamanan, dan pemantauan performa.

from agent_framework import ChatAgent, AgentRunContext, FunctionInvocationContext
from typing import Callable, Awaitable

# Assume we have client from previous examples
async def logging_middleware(
    context: AgentRunContext,
    next: Callable[[AgentRunContext], Awaitable[None]]
) -> None:
    print(f"Agent {context.agent.name} starting")
    await next(context)
    print(f"Agent {context.agent.name} completed")

async def security_middleware(
    context: FunctionInvocationContext,
    next: Callable[[FunctionInvocationContext], Awaitable[None]]
) -> None:
    if "password" in str(context.arguments):
        print("Blocking function call with sensitive data")
        return  # Don't call next()
    await next(context)

agent = ChatAgent(
    name="secure_agent",
    chat_client=client,
    middleware=[logging_middleware, security_middleware]
)

Keuntungan:

  • Keamanan: Validasi input dan pemfilteran konten
  • Pengamatan: Pengelogan, metrik, dan pelacakan
  • Performa: Penembolokan dan pembatasan laju
  • Penanganan kesalahan: Degradasi yang anggun dan logika coba lagi

Untuk contoh middleware terperinci, lihat:

Agen Khusus

Terkadang Anda tidak ingin agen yang didukung model sama sekali—Anda menginginkan agen deterministik atau didukung API dengan logika kustom. Kedua kerangka kerja mendukung pembangunan agen kustom, tetapi polanya berbeda.

AutoGen: Subkelas BaseChatAgent

from typing import Sequence
from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import BaseChatMessage, TextMessage, StopMessage
from autogen_core import CancellationToken

class StaticAgent(BaseChatAgent):
    def __init__(self, name: str = "static", description: str = "Static responder") -> None:
        super().__init__(name, description)

    @property
    def produced_message_types(self) -> Sequence[type[BaseChatMessage]]:  # Which message types this agent produces
        return (TextMessage,)

    async def on_messages(self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken) -> Response:
        # Always return a static response
        return Response(chat_message=TextMessage(content="Hello from AutoGen custom agent", source=self.name))

Notes:

  • Terapkan on_messages(...) dan kembalikan Response dengan pesan obrolan.
  • Secara opsional terapkan on_reset(...) untuk menghapus status internal antara eksekusi.

Kerangka Kerja Agen: Perluas BaseAgent (sadar utas)

from collections.abc import AsyncIterable
from typing import Any
from agent_framework import (
    AgentResponse,
    AgentResponseUpdate,
    AgentThread,
    BaseAgent,
    ChatMessage,
    Role,
    TextContent,
)

class StaticAgent(BaseAgent):
    async def run(
        self,
        messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
        *,
        thread: AgentThread | None = None,
        **kwargs: Any,
    ) -> AgentResponse:
        # Build a static reply
        reply = ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text="Hello from AF custom agent")])

        # Persist conversation to the provided AgentThread (if any)
        if thread is not None:
            normalized = self._normalize_messages(messages)
            await self._notify_thread_of_new_messages(thread, normalized, reply)

        return AgentResponse(messages=[reply])

    async def run_stream(
        self,
        messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
        *,
        thread: AgentThread | None = None,
        **kwargs: Any,
    ) -> AsyncIterable[AgentResponseUpdate]:
        # Stream the same static response in a single chunk for simplicity
        yield AgentResponseUpdate(contents=[TextContent(text="Hello from AF custom agent")], role=Role.ASSISTANT)

        # Notify thread of input and the complete response once streaming ends
        if thread is not None:
            reply = ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text="Hello from AF custom agent")])
            normalized = self._normalize_messages(messages)
            await self._notify_thread_of_new_messages(thread, normalized, reply)

Notes:

  • AgentThread mempertahankan status percakapan secara eksternal; gunakan agent.get_new_thread() dan teruskan ke run/run_stream.
  • Panggil self._notify_thread_of_new_messages(thread, input_messages, response_messages) agar utas memiliki kedua sisi pertukaran.
  • Lihat sampel lengkap: Agen Kustom

Selanjutnya, mari kita lihat orkestrasi multi-agen—area di mana kerangka kerja paling berbeda.

Pemetaan Fitur Multi-Agen

Gambaran Umum Model Pemrograman

Model pemrograman multi-agen mewakili perbedaan paling signifikan antara dua kerangka kerja.

Pendekatan Model Ganda AutoGen

AutoGen menyediakan dua model pemrograman:

  1. autogen-core: Pemrograman tingkat rendah dan berbasis peristiwa dengan RoutedAgent langganan pesan
  2. Team abstraksi: Model tingkat tinggi yang berpusat pada eksekusi yang dibangun di atas autogen-core
# Low-level autogen-core (complex)
class MyAgent(RoutedAgent):
    @message_handler
    async def handle_message(self, message: TextMessage, ctx: MessageContext) -> None:
        # Handle specific message types
        pass

# High-level Team (easier but limited)
team = RoundRobinGroupChat(
    participants=[agent1, agent2],
    termination_condition=StopAfterNMessages(5)
)
result = await team.run(task="Collaborate on this task")

Tantangan:

  • Model tingkat rendah terlalu kompleks untuk sebagian besar pengguna
  • Model tingkat tinggi dapat menjadi pembatasan untuk perilaku kompleks
  • Menjembatani antara kedua model menambahkan kompleksitas implementasi

Model Alur Kerja Terpadu Kerangka Kerja Agen

Agent Framework menyediakan abstraksi tunggal Workflow yang menggabungkan yang terbaik dari kedua pendekatan:

from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never

# Assume we have agent1 and agent2 from previous examples
@executor(id="agent1")
async def agent1_executor(input_msg: str, ctx: WorkflowContext[str]) -> None:
    response = await agent1.run(input_msg)
    await ctx.send_message(response.text)

@executor(id="agent2")
async def agent2_executor(input_msg: str, ctx: WorkflowContext[Never, str]) -> None:
    response = await agent2.run(input_msg)
    await ctx.yield_output(response.text)  # Final output

# Build typed data flow graph
workflow = (WorkflowBuilder()
           .add_edge(agent1_executor, agent2_executor)
           .set_start_executor(agent1_executor)
           .build())

# Example usage (would be in async context)
# result = await workflow.run("Initial input")

Untuk contoh alur kerja terperinci, lihat:

Keuntungan:

  • Model terpadu: Abstraksi tunggal untuk semua tingkat kompleksitas
  • Keamanan jenis: Input dan output yang sangat ditik
  • Visualisasi grafik: Menghapus representasi aliran data
  • Komposisi fleksibel: Mencampur agen, fungsi, dan sub-alur kerja

Alur Kerja vs GraphFlow

Abstraksi Agent Framework Workflow terinspirasi oleh fitur eksperimental GraphFlow AutoGen, tetapi mewakili evolusi yang signifikan dalam filsafat desain:

  • GraphFlow: Aliran kontrol berbasis di mana tepi adalah transisi dan pesan disiarkan ke semua agen; transisi dikondisikan pada konten pesan yang disiarkan
  • Alur kerja: Berbasis aliran data di mana pesan dirutekan melalui tepi dan pelaksana tertentu diaktifkan oleh tepi, dengan dukungan untuk eksekusi bersamaan.

Gambaran Umum Visual

Diagram di bawah ini membedakan alur kontrol AutoGen GraphFlow (kiri) dengan Alur Kerja aliran data Agent Framework (kanan). GraphFlow memodelkan agen sebagai simpul dengan transisi dan siaran bersyar. Pelaksana model alur kerja (agen, fungsi, atau sub-alur kerja) yang disambungkan oleh tepi yang diketik; ini juga mendukung jeda permintaan/respons dan titik pemeriksaan.

flowchart LR

  subgraph AutoGenGraphFlow
    direction TB
    U[User / Task] --> A[Agent A]
    A -->|success| B[Agent B]
    A -->|retry| C[Agent C]
    A -. broadcast .- B
    A -. broadcast .- C
  end

  subgraph AgentFrameworkWorkflow
    direction TB
    I[Input] --> E1[Executor 1]
    E1 -->|"str"| E2[Executor 2]
    E1 -->|"image"| E3[Executor 3]
    E3 -->|"str"| E2
    E2 --> OUT[(Final Output)]
  end

  R[Request / Response Gate]
  E2 -. request .-> R
  R -. resume .-> E2

  CP[Checkpoint]
  E1 -. save .-> CP
  CP -. load .-> E1

Dalam praktiknya:

  • GraphFlow menggunakan agen sebagai simpul dan menyiarkan pesan; tepi mewakili transisi bersyarah.
  • Alur kerja merutekan pesan yang ditik di sepanjang tepi. Simpul (pelaksana) dapat berupa agen, fungsi murni, atau sub-alur kerja.
  • Permintaan/respons memungkinkan alur kerja menjeda input eksternal; titik pemeriksaan mempertahankan kemajuan dan mengaktifkan lanjutkan.

Perbandingan Kode

1) Berurutan + Bersyarkat
# AutoGen GraphFlow (fluent builder) — writer → reviewer → editor (conditional)
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow

writer = AssistantAgent(name="writer", description="Writes a draft", model_client=client)
reviewer = AssistantAgent(name="reviewer", description="Reviews the draft", model_client=client)
editor = AssistantAgent(name="editor", description="Finalizes the draft", model_client=client)

graph = (
    DiGraphBuilder()
    .add_node(writer).add_node(reviewer).add_node(editor)
    .add_edge(writer, reviewer)  # always
    .add_edge(reviewer, editor, condition=lambda msg: "approve" in msg.to_model_text())
    .set_entry_point(writer)
).build()

team = GraphFlow(participants=[writer, reviewer, editor], graph=graph)
result = await team.run(task="Draft a short paragraph about solar power")
# Agent Framework Workflow — sequential executors with conditional logic
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never

@executor(id="writer")
async def writer_exec(task: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(f"Draft: {task}")

@executor(id="reviewer")
async def reviewer_exec(draft: str, ctx: WorkflowContext[str]) -> None:
    decision = "approve" if "solar" in draft.lower() else "revise"
    await ctx.send_message(f"{decision}:{draft}")

@executor(id="editor")
async def editor_exec(msg: str, ctx: WorkflowContext[Never, str]) -> None:
    if msg.startswith("approve:"):
        await ctx.yield_output(msg.split(":", 1)[1])
    else:
        await ctx.yield_output("Needs revision")

workflow_seq = (
    WorkflowBuilder()
    .add_edge(writer_exec, reviewer_exec)
    .add_edge(reviewer_exec, editor_exec)
    .set_start_executor(writer_exec)
    .build()
)
2) Fan-out + Join (ALL vs ANY)
# AutoGen GraphFlow — A → (B, C) → D with ALL/ANY join
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
A, B, C, D = agent_a, agent_b, agent_c, agent_d

# ALL (default): D runs after both B and C
g_all = (
    DiGraphBuilder()
    .add_node(A).add_node(B).add_node(C).add_node(D)
    .add_edge(A, B).add_edge(A, C)
    .add_edge(B, D).add_edge(C, D)
    .set_entry_point(A)
).build()

# ANY: D runs when either B or C completes
g_any = (
    DiGraphBuilder()
    .add_node(A).add_node(B).add_node(C).add_node(D)
    .add_edge(A, B).add_edge(A, C)
    .add_edge(B, D, activation_group="join_d", activation_condition="any")
    .add_edge(C, D, activation_group="join_d", activation_condition="any")
    .set_entry_point(A)
).build()
# Agent Framework Workflow — A → (B, C) → aggregator (ALL vs ANY)
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never

@executor(id="A")
async def start(task: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(f"B:{task}", target_id="B")
    await ctx.send_message(f"C:{task}", target_id="C")

@executor(id="B")
async def branch_b(text: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(f"B_done:{text}")

@executor(id="C")
async def branch_c(text: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(f"C_done:{text}")

@executor(id="join_any")
async def join_any(msg: str, ctx: WorkflowContext[Never, str]) -> None:
    await ctx.yield_output(f"First: {msg}")  # ANY join (first arrival)

@executor(id="join_all")
async def join_all(msg: str, ctx: WorkflowContext[str, str]) -> None:
    state = await ctx.get_executor_state() or {"items": []}
    state["items"].append(msg)
    await ctx.set_executor_state(state)
    if len(state["items"]) >= 2:
        await ctx.yield_output(" | ".join(state["items"]))  # ALL join

wf_any = (
    WorkflowBuilder()
    .add_edge(start, branch_b).add_edge(start, branch_c)
    .add_edge(branch_b, join_any).add_edge(branch_c, join_any)
    .set_start_executor(start)
    .build()
)

wf_all = (
    WorkflowBuilder()
    .add_edge(start, branch_b).add_edge(start, branch_c)
    .add_edge(branch_b, join_all).add_edge(branch_c, join_all)
    .set_start_executor(start)
    .build()
)
3) Perutean Bertarget (tanpa siaran)
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never

@executor(id="ingest")
async def ingest(task: str, ctx: WorkflowContext[str]) -> None:
    # Route selectively using target_id
    if task.startswith("image:"):
        await ctx.send_message(task.removeprefix("image:"), target_id="vision")
    else:
        await ctx.send_message(task, target_id="writer")

@executor(id="writer")
async def write(text: str, ctx: WorkflowContext[Never, str]) -> None:
    await ctx.yield_output(f"Draft: {text}")

@executor(id="vision")
async def caption(image_ref: str, ctx: WorkflowContext[Never, str]) -> None:
    await ctx.yield_output(f"Caption: {image_ref}")

workflow = (
    WorkflowBuilder()
    .add_edge(ingest, write)
    .add_edge(ingest, caption)
    .set_start_executor(ingest)
    .build()
)

# Example usage (async):
# await workflow.run("Summarize the benefits of solar power")
# await workflow.run("image:https://example.com/panel.jpg")

Apa yang harus diperhatikan:

  • GraphFlow menyiarkan pesan dan menggunakan transisi bersyar. Perilaku gabungan dikonfigurasi melalui sisi activation target dan per tepi activation_group/activation_condition (misalnya, kelompokkan kedua tepi ke dalam join_d dengan activation_condition="any").
  • Alur kerja merutekan data secara eksplisit; gunakan target_id untuk memilih pelaksana hilir. Perilaku bergabung hidup di pelaksana penerima (misalnya, menghasilkan input pertama vs menunggu semua), atau melalui penyusun/agregator orkestrasi.
  • Pelaksana dalam Alur Kerja adalah bentuk bebas: membungkus ChatAgent, fungsi, atau sub-alur kerja dan mencampurnya dalam grafik yang sama.

Perbedaan Utama

Tabel di bawah ini meringkas perbedaan mendasar antara Alur Kerja GraphFlow AutoGen dan Alur Kerja Agent Framework:

Aspek AutoGen GraphFlow Alur Kerja Kerangka Kerja Agen
Jenis Alur Alur kontrol (tepi adalah transisi) Aliran data (pesan rute tepi)
Jenis Node Hanya agen Agen, fungsi, sub-alur kerja
Aktivasi Siaran pesan Aktivasi berbasis Edge
Keamanan Jenis Terbatas Pengetikan yang kuat di seluruh
Komposabilitas Terbatas Sangat mudah disusam

Pola Bersarang

Tim AutoGen Bersarang

# Inner team
inner_team = RoundRobinGroupChat(
    participants=[specialist1, specialist2],
    termination_condition=StopAfterNMessages(3)
)

# Outer team with nested team as participant
outer_team = RoundRobinGroupChat(
    participants=[coordinator, inner_team, reviewer],  # Team as participant
    termination_condition=StopAfterNMessages(10)
)

# Messages are broadcasted to all participants including nested team
result = await outer_team.run("Complex task requiring collaboration")

Karakteristik berlapis AutoGen:

  • Tim berlapis menerima semua pesan dari tim luar
  • Pesan tim berlapis disiarkan ke semua peserta tim luar
  • Konteks pesan bersama di semua tingkatan

Alur Kerja Kerangka Kerja Agen Bersarang

from agent_framework import WorkflowExecutor, WorkflowBuilder

# Assume we have executors from previous examples
# specialist1_executor, specialist2_executor, coordinator_executor, reviewer_executor

# Create sub-workflow
sub_workflow = (WorkflowBuilder()
               .add_edge(specialist1_executor, specialist2_executor)
               .set_start_executor(specialist1_executor)
               .build())

# Wrap as executor
sub_workflow_executor = WorkflowExecutor(
    workflow=sub_workflow,
    id="sub_process"
)

# Use in parent workflow
parent_workflow = (WorkflowBuilder()
                  .add_edge(coordinator_executor, sub_workflow_executor)
                  .add_edge(sub_workflow_executor, reviewer_executor)
                  .set_start_executor(coordinator_executor)
                  .build())

Karakteristik berlapis Kerangka Kerja Agen:

  • Input/output terisolasi melalui WorkflowExecutor
  • Tidak ada penyiaran pesan - aliran data melalui koneksi tertentu
  • Manajemen status independen untuk setiap tingkat alur kerja

Pola Obrolan Grup

Pola obrolan grup memungkinkan beberapa agen untuk berkolaborasi pada tugas yang kompleks. Berikut adalah bagaimana pola umum diterjemahkan antar kerangka kerja.

Pola RoundRobinGroupChat

Implementasi AutoGen:

from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import StopAfterNMessages

team = RoundRobinGroupChat(
    participants=[agent1, agent2, agent3],
    termination_condition=StopAfterNMessages(10)
)
result = await team.run("Discuss this topic")

Implementasi Kerangka Kerja Agen:

from agent_framework import SequentialBuilder, WorkflowOutputEvent

# Assume we have agent1, agent2, agent3 from previous examples
# Sequential workflow through participants
workflow = SequentialBuilder().participants([agent1, agent2, agent3]).build()

# Example usage (would be in async context)
async def sequential_example():
    # Each agent appends to shared conversation
    async for event in workflow.run_stream("Discuss this topic"):
        if isinstance(event, WorkflowOutputEvent):
            conversation_history = event.data  # list[ChatMessage]

Untuk contoh orkestrasi terperinci, lihat:

Untuk pola eksekusi bersamaan, Agent Framework juga menyediakan:

from agent_framework import ConcurrentBuilder, WorkflowOutputEvent

# Assume we have agent1, agent2, agent3 from previous examples
# Concurrent workflow for parallel processing
workflow = (ConcurrentBuilder()
           .participants([agent1, agent2, agent3])
           .build())

# Example usage (would be in async context)
async def concurrent_example():
    # All agents process the input concurrently
    async for event in workflow.run_stream("Process this in parallel"):
        if isinstance(event, WorkflowOutputEvent):
            results = event.data  # Combined results from all agents

Untuk contoh eksekusi bersamaan, lihat:

Pola MagenticOneGroupChat

Implementasi AutoGen:

from autogen_agentchat.teams import MagenticOneGroupChat

team = MagenticOneGroupChat(
    participants=[researcher, coder, executor],
    model_client=coordinator_client,
    termination_condition=StopAfterNMessages(20)
)
result = await team.run("Complex research and analysis task")

Implementasi Kerangka Kerja Agen:

from typing import cast
from agent_framework import (
    MAGENTIC_EVENT_TYPE_AGENT_DELTA,
    MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
    AgentResponseUpdateEvent,
    ChatAgent,
    ChatMessage,
    MagenticBuilder,
    WorkflowOutputEvent,
)
from agent_framework.openai import OpenAIChatClient

# Create a manager agent for orchestration
manager_agent = ChatAgent(
    name="MagenticManager",
    description="Orchestrator that coordinates the workflow",
    instructions="You coordinate a team to complete complex tasks efficiently.",
    chat_client=OpenAIChatClient(),
)

workflow = (
    MagenticBuilder()
    .participants([researcher, coder])
    .with_manager(
        agent=manager_agent,
        max_round_count=20,
        max_stall_count=3,
        max_reset_count=2,
    )
    .build()
)

# Example usage (would be in async context)
async def magentic_example():
    output: str | None = None
    async for event in workflow.run_stream("Complex research task"):
        if isinstance(event, WorkflowOutputEvent):
            output_messages = cast(list[ChatMessage], event.data)
            if output_messages:
                output = output_messages[-1].text

Opsi Kustomisasi Kerangka Kerja Agen:

Alur kerja Magentic menyediakan opsi kustomisasi yang luas:

  • Konfigurasi manajer: Menggunakan ChatAgent dengan instruksi kustom dan pengaturan model
  • Batas putaran: max_round_count, max_stall_count, max_reset_count
  • Streaming acara: Gunakan AgentResponseUpdateEvent dengan magentic_event_type metadata
  • Spesialisasi agen: Instruksi dan alat kustom per agen
  • Human-in-the-loop: Tinjauan rencana, persetujuan alat, dan intervensi stall
# Advanced customization example with human-in-the-loop
from typing import cast
from agent_framework import (
    MAGENTIC_EVENT_TYPE_AGENT_DELTA,
    MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
    AgentResponseUpdateEvent,
    ChatAgent,
    MagenticBuilder,
    MagenticHumanInterventionDecision,
    MagenticHumanInterventionKind,
    MagenticHumanInterventionReply,
    MagenticHumanInterventionRequest,
    RequestInfoEvent,
    WorkflowOutputEvent,
)
from agent_framework.openai import OpenAIChatClient

# Create manager agent with custom configuration
manager_agent = ChatAgent(
    name="MagenticManager",
    description="Orchestrator for complex tasks",
    instructions="Custom orchestration instructions...",
    chat_client=OpenAIChatClient(model_id="gpt-4o"),
)

workflow = (
    MagenticBuilder()
    .participants([researcher_agent, coder_agent, analyst_agent])
    .with_manager(
        agent=manager_agent,
        max_round_count=15,      # Limit total rounds
        max_stall_count=2,       # Trigger stall handling
        max_reset_count=1,       # Allow one reset on failure
    )
    .with_plan_review()           # Enable human plan review
    .with_human_input_on_stall()  # Enable human intervention on stalls
    .build()
)

# Handle human intervention requests during execution
async for event in workflow.run_stream("Complex task"):
    if isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest:
        req = cast(MagenticHumanInterventionRequest, event.data)
        if req.kind == MagenticHumanInterventionKind.PLAN_REVIEW:
            # Review and approve the plan
            reply = MagenticHumanInterventionReply(
                decision=MagenticHumanInterventionDecision.APPROVE
            )
            async for ev in workflow.send_responses_streaming({event.request_id: reply}):
                pass  # Handle continuation

Untuk contoh Magentic terperinci, lihat:

Pola Masa Depan

Peta strategi Agent Framework mencakup beberapa pola AutoGen yang saat ini sedang dalam pengembangan:

  • Pola kawanan: Koordinasi agen berbasis Handoff
  • SelectorGroupChat: Pilihan pembicara berbasis LLM

Human-in-the-Loop dengan Respons Permintaan

Fitur baru utama dalam Agent Framework Workflow adalah konsep permintaan dan respons, yang memungkinkan alur kerja menjeda eksekusi dan menunggu input eksternal sebelum melanjutkan. Kemampuan ini tidak ada dalam abstraksi AutoGen Team dan memungkinkan pola human-in-the-loop yang canggih.

Batasan AutoGen

Abstraksi AutoGen Team berjalan terus menerus setelah dimulai dan tidak menyediakan mekanisme bawaan untuk menjeda eksekusi untuk input manusia. Setiap fungsionalitas human-in-the-loop memerlukan implementasi kustom di luar kerangka kerja.

Agent Framework Request-Response API

Agent Framework menyediakan kemampuan respons permintaan bawaan di mana setiap pelaksana dapat mengirim permintaan menggunakan ctx.request_info() dan menangani respons dengan @response_handler dekorator.

from agent_framework import (
    RequestInfoEvent, WorkflowBuilder, WorkflowContext,
    Executor, handler, response_handler
)
from dataclasses import dataclass

# Assume we have agent_executor defined elsewhere

# Define typed request payload
@dataclass
class ApprovalRequest:
    """Request human approval for agent output."""
    content: str = ""
    agent_name: str = ""

# Workflow executor that requests human approval
class ReviewerExecutor(Executor):

    @handler
    async def review_content(
        self,
        agent_response: str,
        ctx: WorkflowContext
    ) -> None:
        # Request human input with structured data
        approval_request = ApprovalRequest(
            content=agent_response,
            agent_name="writer_agent"
        )
        await ctx.request_info(request_data=approval_request, response_type=str)

    @response_handler
    async def handle_approval_response(
        self,
        original_request: ApprovalRequest,
        decision: str,
        ctx: WorkflowContext
    ) -> None:
        decision_lower = decision.strip().lower()
        original_content = original_request.content

        if decision_lower == "approved":
            await ctx.yield_output(f"APPROVED: {original_content}")
        else:
            await ctx.yield_output(f"REVISION NEEDED: {decision}")

# Build workflow with human-in-the-loop
reviewer = ReviewerExecutor(id="reviewer")

workflow = (WorkflowBuilder()
           .add_edge(agent_executor, reviewer)
           .set_start_executor(agent_executor)
           .build())

Menjalankan Alur Kerja Human-in-the-Loop

Agent Framework menyediakan API streaming untuk menangani siklus jeda-resume:

from agent_framework import RequestInfoEvent, WorkflowOutputEvent

# Assume we have workflow defined from previous examples
async def run_with_human_input():
    pending_responses = None
    completed = False

    while not completed:
        # First iteration uses run_stream, subsequent use send_responses_streaming
        stream = (
            workflow.send_responses_streaming(pending_responses)
            if pending_responses
            else workflow.run_stream("initial input")
        )

        events = [event async for event in stream]
        pending_responses = None

        # Collect human requests and outputs
        for event in events:
            if isinstance(event, RequestInfoEvent):
                # Display request to human and collect response
                request_data = event.data  # ApprovalRequest instance
                print(f"Review needed: {request_data.content}")

                human_response = input("Enter 'approved' or revision notes: ")
                pending_responses = {event.request_id: human_response}

            elif isinstance(event, WorkflowOutputEvent):
                print(f"Final result: {event.data}")
                completed = True

Untuk contoh alur kerja human-in-the-loop, lihat:

Titik Pemeriksaan dan Lanjutkan Alur Kerja

Keuntungan utama lain dari Agent Framework Workflow daripada abstraksi AutoGen Team adalah dukungan bawaan untuk titik pemeriksaan dan lanjutkan eksekusi. Ini memungkinkan alur kerja dijeda, dipertahankan, dan dilanjutkan nanti dari titik pemeriksaan apa pun, memberikan toleransi kesalahan dan mengaktifkan alur kerja yang berjalan lama atau asinkron.

Batasan AutoGen

Abstraksi AutoGen Team tidak menyediakan kemampuan titik pemeriksaan bawaan. Setiap mekanisme persistensi atau pemulihan harus diimplementasikan secara eksternal, sering membutuhkan logika manajemen dan serialisasi status yang kompleks.

Titik Pemeriksaan Kerangka Kerja Agen

Agent Framework menyediakan titik pemeriksaan komprehensif melalui FileCheckpointStorage dan with_checkpointing() metode pada WorkflowBuilder. Pengambilan titik pemeriksaan:

  • Status pelaksana: Status lokal untuk setiap pelaksana menggunakan ctx.set_executor_state()
  • Status bersama: Status lintas eksekutor menggunakan ctx.set_shared_state()
  • Antrean pesan: Pesan tertunda antara pelaksana
  • Posisi alur kerja: Kemajuan eksekusi saat ini dan langkah berikutnya
from agent_framework import (
    FileCheckpointStorage, WorkflowBuilder, WorkflowContext,
    Executor, handler
)
from typing_extensions import Never

class ProcessingExecutor(Executor):
    @handler
    async def process(self, data: str, ctx: WorkflowContext[str]) -> None:
        # Process the data
        result = f"Processed: {data.upper()}"
        print(f"Processing: '{data}' -> '{result}'")

        # Persist executor-local state
        prev_state = await ctx.get_executor_state() or {}
        count = prev_state.get("count", 0) + 1
        await ctx.set_executor_state({
            "count": count,
            "last_input": data,
            "last_output": result
        })

        # Persist shared state for other executors
        await ctx.set_shared_state("original_input", data)
        await ctx.set_shared_state("processed_output", result)

        await ctx.send_message(result)

class FinalizeExecutor(Executor):
    @handler
    async def finalize(self, data: str, ctx: WorkflowContext[Never, str]) -> None:
        result = f"Final: {data}"
        await ctx.yield_output(result)

# Configure checkpoint storage
checkpoint_storage = FileCheckpointStorage(storage_path="./checkpoints")
processing_executor = ProcessingExecutor(id="processing")
finalize_executor = FinalizeExecutor(id="finalize")

# Build workflow with checkpointing enabled
workflow = (WorkflowBuilder()
           .add_edge(processing_executor, finalize_executor)
           .set_start_executor(processing_executor)
           .with_checkpointing(checkpoint_storage=checkpoint_storage)  # Enable checkpointing
           .build())

# Example usage (would be in async context)
async def checkpoint_example():
    # Run workflow - checkpoints are created automatically
    async for event in workflow.run_stream("input data"):
        print(f"Event: {event}")

Lanjutkan dari Titik Pemeriksaan

Agent Framework menyediakan API untuk mencantumkan, memeriksa, dan melanjutkan dari titik pemeriksaan tertentu:

from typing_extensions import Never

from agent_framework import (
    Executor,
    FileCheckpointStorage,
    WorkflowContext,
    WorkflowBuilder,
    get_checkpoint_summary,
    handler,
)

class UpperCaseExecutor(Executor):
    @handler
    async def process(self, text: str, ctx: WorkflowContext[str]) -> None:
        result = text.upper()
        await ctx.send_message(result)

class ReverseExecutor(Executor):
    @handler
    async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None:
        result = text[::-1]
        await ctx.yield_output(result)

def create_workflow(checkpoint_storage: FileCheckpointStorage):
    """Create a workflow with two executors and checkpointing."""
    upper_executor = UpperCaseExecutor(id="upper")
    reverse_executor = ReverseExecutor(id="reverse")

    return (WorkflowBuilder()
           .add_edge(upper_executor, reverse_executor)
           .set_start_executor(upper_executor)
           .with_checkpointing(checkpoint_storage=checkpoint_storage)
           .build())

# Assume we have checkpoint_storage from previous examples
checkpoint_storage = FileCheckpointStorage(storage_path="./checkpoints")

async def checkpoint_resume_example():
    # List available checkpoints
    checkpoints = await checkpoint_storage.list_checkpoints()

    # Display checkpoint information
    for checkpoint in checkpoints:
        summary = get_checkpoint_summary(checkpoint)
        print(f"Checkpoint {summary.checkpoint_id}: iteration={summary.iteration_count}")

    # Resume from a specific checkpoint
    if checkpoints:
        chosen_checkpoint_id = checkpoints[0].checkpoint_id

        # Create new workflow instance and resume
        new_workflow = create_workflow(checkpoint_storage)
        async for event in new_workflow.run_stream(
            checkpoint_id=chosen_checkpoint_id,
            checkpoint_storage=checkpoint_storage
        ):
            print(f"Resumed event: {event}")

Fitur Titik Pemeriksaan Tingkat Lanjut

Titik pemeriksaan dengan Integrasi Human-in-the-Loop:

Titik pemeriksaan berfungsi dengan mulus dengan alur kerja human-in-the-loop, memungkinkan alur kerja dijeda untuk input manusia dan dilanjutkan nanti. Saat melanjutkan dari titik pemeriksaan yang berisi permintaan yang tertunda, permintaan tersebut akan dipancarkan kembali sebagai peristiwa:

# Assume we have workflow, checkpoint_id, and checkpoint_storage from previous examples
async def resume_with_pending_requests_example():
    # Resume from checkpoint - pending requests will be re-emitted
    request_info_events = []
    async for event in workflow.run_stream(
        checkpoint_id=checkpoint_id,
        checkpoint_storage=checkpoint_storage
    ):
        if isinstance(event, RequestInfoEvent):
            request_info_events.append(event)

    # Handle re-emitted pending request
    responses = {}
    for event in request_info_events:
        response = handle_request(event.data)
        responses[event.request_id] = response

    # Send response back to workflow
    async for event in workflow.send_responses_streaming(responses):
        print(f"Event: {event}")

Manfaat Utama

Dibandingkan dengan AutoGen, titik pemeriksaan Agent Framework menyediakan:

  • Persistensi otomatis: Tidak diperlukan manajemen status manual
  • Pemulihan terperinci: Lanjutkan dari batas superstep apa pun
  • Isolasi status: Memisahkan status eksekutor-lokal dan bersama
  • Integrasi human-in-the-loop: Jeda-resume tanpa hambatan dengan input manusia
  • Toleransi kesalahan: Pemulihan yang kuat dari kegagalan atau gangguan

Contoh Praktis

Untuk contoh titik pemeriksaan komprehensif, lihat:


Observability

AutoGen dan Agent Framework menyediakan kemampuan pengamatan, tetapi dengan pendekatan dan fitur yang berbeda.

Pengamatan AutoGen

AutoGen memiliki dukungan asli untuk OpenTelemetry dengan instrumentasi untuk:

  • Pelacakan runtime: SingleThreadedAgentRuntime dan GrpcWorkerAgentRuntime
  • Eksekusi alat: BaseTool dengan execute_tool rentang berikut konvensi semantik GenAI
  • Operasi agen: BaseChatAgent dengan create_agent dan invoke_agent rentang
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from autogen_core import SingleThreadedAgentRuntime

# Configure OpenTelemetry
tracer_provider = TracerProvider()
trace.set_tracer_provider(tracer_provider)

# Pass to runtime
runtime = SingleThreadedAgentRuntime(tracer_provider=tracer_provider)

Pengamatan Kerangka Kerja Agen

Agent Framework memberikan pengamatan komprehensif melalui beberapa pendekatan:

  • Penyiapan kode nol: Instrumentasi otomatis melalui variabel lingkungan
  • Konfigurasi manual: Penyiapan terprogram dengan parameter kustom
  • Telemetri yang kaya: Agen, alur kerja, dan pelacakan eksekusi alat
  • Output konsol: Pengelogan dan visualisasi konsol bawaan
from agent_framework import ChatAgent
from agent_framework.observability import setup_observability
from agent_framework.openai import OpenAIChatClient

# Zero-code setup via environment variables
# Set ENABLE_OTEL=true
# Set OTLP_ENDPOINT=http://localhost:4317

# Or manual setup
setup_observability(
    otlp_endpoint="http://localhost:4317"
)

# Create client for the example
client = OpenAIChatClient(model_id="gpt-5")

async def observability_example():
    # Observability is automatically applied to all agents and workflows
    agent = ChatAgent(name="assistant", chat_client=client)
    result = await agent.run("Hello")  # Automatically traced

Perbedaan Utama:

  • Kompleksitas penyiapan: Agent Framework menawarkan opsi penyiapan nol kode yang lebih sederhana
  • Cakupan: Kerangka Kerja Agen menyediakan cakupan yang lebih luas termasuk pengamatan tingkat alur kerja
  • Visualisasi: Kerangka Kerja Agen mencakup output konsol bawaan dan UI pengembangan
  • Konfigurasi: Agent Framework menawarkan opsi konfigurasi yang lebih fleksibel

Untuk contoh pengamatan terperinci, lihat:


Conclusion

Panduan migrasi ini menyediakan pemetaan komprehensif antara AutoGen dan Microsoft Agent Framework, yang mencakup semuanya mulai dari pembuatan agen dasar hingga alur kerja multi-agen yang kompleks. Pengambilan kunci untuk migrasi:

  • Migrasi agen tunggal sangat mudah, dengan API serupa dan kemampuan yang ditingkatkan dalam Agent Framework
  • Pola multi-agen perlu memikirkan kembali pendekatan Anda dari arsitektur berbasis aliran data ke aliran data, tetapi jika Anda sudah terbiasa dengan GraphFlow, transisi akan lebih mudah
  • Agent Framework menawarkan fitur tambahan seperti middleware, alat yang dihosting, dan alur kerja yang di ketik

Untuk contoh tambahan dan panduan implementasi terperinci, lihat direktori sampel Kerangka Kerja Agen .

Kategori Sampel Tambahan

Kerangka Kerja Agen menyediakan sampel di beberapa area penting lainnya:

Langkah selanjutnya