Sdílet prostřednictvím


Průvodce migrací autogenu do architektury Microsoft Agent

Komplexní průvodce migrací z autogenu na sadu Python SDK microsoft Agent Framework

Seznam obsahu

Background

AutoGen je architektura pro vytváření agentů AI a systémů s více agenty pomocí rozsáhlých jazykových modelů (LLM). Začal jako výzkumný projekt v Microsoft Research a povýšil několik konceptů orchestrace multi-agentů, jako je GroupChat a modul runtime agentů řízený událostmi. Projekt byl plodnou spolupráci opensourcové komunity a mnoho důležitých funkcí pochází od externích přispěvatelů.

Microsoft Agent Framework je nová sada SDK s více jazyky pro vytváření agentů a pracovních postupů AI pomocí LLM. Představuje významný vývoj myšlenek průkopnických v AutoGenu a zahrnuje poznatky získané z reálného využití. Vyvíjí ho základní týmy AutoGen a Sémantic Kernel v Microsoftu a jsou navržené tak, aby byly novým základem pro vytváření aplikací umělé inteligence.

Tato příručka popisuje praktickou cestu migrace: začíná pokrytím toho, co zůstane stejné a co se změní na první pohled. Pak se zabývá nastavením klienta modelu, funkcemi s jedním agentem a nakonec orchestrací s více agenty s konkrétním kódem vedle sebe. Odkazy na spustitelné ukázky v úložišti Agent Framework vám pomůžou ověřit jednotlivé kroky.

Klíčové podobnosti a rozdíly

Co zůstane stejné

Základy jsou známé. Stále vytváříte agenty kolem klienta modelu, poskytnete pokyny a připojíte nástroje. Obě knihovny podporují nástroje ve stylu funkcí, streamování tokenů, multimodální obsah a asynchronní vstupně-výstupní operace.

# Both frameworks follow similar patterns
# AutoGen
agent = AssistantAgent(name="assistant", model_client=client, tools=[my_tool])
result = await agent.run(task="Help me with this task")

# Agent Framework
agent = ChatAgent(name="assistant", chat_client=client, tools=[my_tool])
result = await agent.run("Help me with this task")

Klíčové rozdíly

  1. Styl orchestrace: AutoGen spáruje jádro řízené událostmi s vysokou úrovní Team. Agent Framework se zaměřuje na typový graf, Workflow který směruje data podél hran a aktivuje exekutory, když jsou vstupy připravené.

  2. Nástroje: AutoGen zabalí funkce pomocí FunctionToolfunkce . Agent Framework používá @ai_function, odvozuje schémata automaticky a přidává hostované nástroje, jako je interpret kódu a webové vyhledávání.

  3. Chování agenta: AssistantAgent je jedno otočení, pokud nezvyšovat max_tool_iterations. ChatAgent je ve výchozím nastavení vícenásobný a pořád vyvolává nástroje, dokud nevrátí konečnou odpověď.

  4. Modul runtime: AutoGen nabízí vložené a experimentální distribuované moduly runtime. Agent Framework se dnes zaměřuje na složení s jedním procesem; Distribuované spuštění je naplánováno.

Vytvoření a konfigurace klienta modelu

Obě architektury poskytují klientům modelu pro hlavní poskytovatele AI, s podobnými, ale ne identickými rozhraními API.

Vlastnost AutoGen Agentní rámec
Klient OpenAI OpenAIChatCompletionClient OpenAIChatClient
Klient odpovědí OpenAI ❌ Není k dispozici OpenAIResponsesClient
Azure OpenAI AzureOpenAIChatCompletionClient AzureOpenAIChatClient
Odpovědi Azure OpenAI ❌ Není k dispozici AzureOpenAIResponsesClient
Azure AI AzureAIChatCompletionClient AzureAIAgentClient
Antropický AnthropicChatCompletionClient 🚧 Plánovaný
Ollama OllamaChatCompletionClient 🚧 Plánovaný
Cacheování ChatCompletionCache obal 🚧 Plánovaný

Klienti modelu AutoGen

from autogen_ext.models.openai import OpenAIChatCompletionClient, AzureOpenAIChatCompletionClient

# OpenAI
client = OpenAIChatCompletionClient(
    model="gpt-5",
    api_key="your-key"
)

# Azure OpenAI
client = AzureOpenAIChatCompletionClient(
    azure_endpoint="https://your-endpoint.openai.azure.com/",
    azure_deployment="gpt-5",
    api_version="2024-12-01",
    api_key="your-key"
)

Agent Framework ChatClients

from agent_framework.openai import OpenAIChatClient
from agent_framework.azure import AzureOpenAIChatClient

# OpenAI (reads API key from environment)
client = OpenAIChatClient(model_id="gpt-5")

# Azure OpenAI (uses environment or default credentials; see samples for auth options)
client = AzureOpenAIChatClient(model_id="gpt-5")

Podrobné příklady najdete tady:

Podpora rozhraní API pro odpovědi (exkluzivní rozhraní Agent Framework)

Agent Framework AzureOpenAIResponsesClient poskytuje OpenAIResponsesClient specializovanou podporu pro zdůvodňování modelů a strukturovaných odpovědí, které nejsou v autogenu k dispozici:

from agent_framework.azure import AzureOpenAIResponsesClient
from agent_framework.openai import OpenAIResponsesClient

# Azure OpenAI with Responses API
azure_responses_client = AzureOpenAIResponsesClient(model_id="gpt-5")

# OpenAI with Responses API
openai_responses_client = OpenAIResponsesClient(model_id="gpt-5")

Příklady rozhraní API pro odpovědi najdete tady:

Mapování funkcí Single-Agent

Tato část mapuje funkce s jedním agentem mezi autogen a architekturou agenta. Pomocí klienta vytvořte agenta, připojte nástroje a vyberte si mezi prováděním streamování a streamováním.

Vytvoření a spuštění základního agenta

Jakmile máte nakonfigurovaného klienta modelu, dalším krokem je vytváření agentů. Obě architektury poskytují podobné abstrakce agenta, ale s různými výchozími chováními a možnostmi konfigurace.

AutoGen AssistantAgent

from autogen_agentchat.agents import AssistantAgent

agent = AssistantAgent(
    name="assistant",
    model_client=client,
    system_message="You are a helpful assistant.",
    tools=[my_tool],
    max_tool_iterations=1  # Single-turn by default
)

# Execution
result = await agent.run(task="What's the weather?")

Agent Framework ChatAgent

from agent_framework import ChatAgent, ai_function
from agent_framework.openai import OpenAIChatClient

# Create simple tools for the example
@ai_function
def get_weather(location: str) -> str:
    """Get weather for a location."""
    return f"Weather in {location}: sunny"

@ai_function
def get_time() -> str:
    """Get current time."""
    return "Current time: 2:30 PM"

# Create client
client = OpenAIChatClient(model_id="gpt-5")

async def example():
    # Direct creation
    agent = ChatAgent(
        name="assistant",
        chat_client=client,
        instructions="You are a helpful assistant.",
        tools=[get_weather]  # Multi-turn by default
    )

    # Factory method (more convenient)
    agent = client.create_agent(
        name="assistant",
        instructions="You are a helpful assistant.",
        tools=[get_weather]
    )

    # Execution with runtime tool configuration
    result = await agent.run(
        "What's the weather?",
        tools=[get_time],  # Can add tools at runtime
        tool_choice="auto"
    )

Klíčové rozdíly:

  • Výchozí chování: ChatAgent automaticky iteruje prostřednictvím volání nástrojů, zatímco AssistantAgent vyžaduje explicitní max_tool_iterations nastavení
  • Konfigurace modulu runtime: ChatAgent.run() přijímá tools a tool_choice parametry pro přizpůsobení volání
  • Metody továrny: Agent Framework poskytuje pohodlné metody továrny přímo z chatovacích klientů
  • Správa stavu: ChatAgent je bezstavová a neudržuje historii konverzací mezi vyvoláním, na rozdíl od AssistantAgent toho, že udržuje historii konverzací jako součást svého stavu.

Správa stavu konverzace pomocí agentaThread

Pokud chcete pokračovat v ChatAgentkonverzacích, použijte AgentThread ke správě historie konverzací:

# Assume we have an agent from previous examples
async def conversation_example():
    # Create a new thread that will be reused
    thread = agent.get_new_thread()

    # First interaction - thread is empty
    result1 = await agent.run("What's 2+2?", thread=thread)
    print(result1.text)  # "4"

    # Continue conversation - thread contains previous messages
    result2 = await agent.run("What about that number times 10?", thread=thread)
    print(result2.text)  # "40" (understands "that number" refers to 4)

    # AgentThread can use external storage, similar to ChatCompletionContext in AutoGen

Bezstavová ve výchozím nastavení: rychlá ukázka

# Without a thread (two independent invocations)
r1 = await agent.run("What's 2+2?")
print(r1.text)  # for example, "4"

r2 = await agent.run("What about that number times 10?")
print(r2.text)  # Likely ambiguous without prior context; cannot be "40"

# With a thread (shared context across calls)
thread = agent.get_new_thread()
print((await agent.run("What's 2+2?", thread=thread)).text)  # "4"
print((await agent.run("What about that number times 10?", thread=thread)).text)  # "40"

Příklady správy vláken najdete tady:

Ekvivalence agenta OpenAI Assistant

Obě architektury poskytují integraci rozhraní API pro OpenAI Assistant:

# AutoGen OpenAIAssistantAgent
from autogen_ext.agents.openai import OpenAIAssistantAgent
# Agent Framework has OpenAI Assistants support via OpenAIAssistantsClient
from agent_framework.openai import OpenAIAssistantsClient

Příklady pomocníka OpenAI najdete tady:

Podpora streamování

Obě architektury streamují tokeny v reálném čase od klientů i z agentů, aby uživatelská rozhraní reagovala.

Automatické streamování

# Model client streaming
async for chunk in client.create_stream(messages):
    if isinstance(chunk, str):
        print(chunk, end="")

# Agent streaming
async for event in agent.run_stream(task="Hello"):
    if isinstance(event, ModelClientStreamingChunkEvent):
        print(event.content, end="")
    elif isinstance(event, TaskResult):
        print("Final result received")

Streamování rozhraní Agent Framework

# Assume we have client, agent, and tools from previous examples
async def streaming_example():
    # Chat client streaming
    async for chunk in client.get_streaming_response("Hello", tools=tools):
        if chunk.text:
            print(chunk.text, end="")

    # Agent streaming
    async for chunk in agent.run_stream("Hello"):
        if chunk.text:
            print(chunk.text, end="", flush=True)

Tip: V rozhraní Agent Framework mají klienti i agenti stejný tvar aktualizace; v obou případech si můžete přečíst chunk.text .

Typy a vytváření zpráv

Pochopení fungování zpráv je zásadní pro efektivní komunikaci agenta. Obě architektury poskytují různé přístupy k vytváření a zpracování zpráv pomocí autoGen pomocí samostatných tříd zpráv a agenta Framework pomocí sjednoceného systému zpráv.

Typy zpráv AutoGen

from autogen_agentchat.messages import TextMessage, MultiModalMessage
from autogen_core.models import UserMessage

# Text message
text_msg = TextMessage(content="Hello", source="user")

# Multi-modal message
multi_modal_msg = MultiModalMessage(
    content=["Describe this image", image_data],
    source="user"
)

# Convert to model format for use with model clients
user_message = text_msg.to_model_message()

Typy zpráv rozhraní Agent Framework

from agent_framework import ChatMessage, TextContent, DataContent, UriContent, Role
import base64

# Text message
text_msg = ChatMessage(role=Role.USER, text="Hello")

# Supply real image bytes, or use a data: URI/URL via UriContent
image_bytes = b"<your_image_bytes>"
image_b64 = base64.b64encode(image_bytes).decode()
image_uri = f"data:image/jpeg;base64,{image_b64}"

# Multi-modal message with mixed content
multi_modal_msg = ChatMessage(
    role=Role.USER,
    contents=[
        TextContent(text="Describe this image"),
        DataContent(uri=image_uri, media_type="image/jpeg")
    ]
)

Klíčové rozdíly:

  • AutoGen používá samostatné třídy zpráv (TextMessage, MultiModalMessage) s polem source .
  • Agent Framework používá sjednocenou ChatMessage s objekty typu obsahu a polem role .
  • Zprávy agenta Framework používají Role výčet (USER, ASSISTANT, SYSTEM, TOOL) místo zdrojů řetězců.

Vytváření a integrace nástrojů

Nástroje rozšiřují možnosti agenta nad rámec generování textu. Architektury mají různé přístupy k vytváření nástrojů a rozhraní Agent Framework poskytuje automatizovanější generování schématu.

AutoGen FunctionTool

from autogen_core.tools import FunctionTool

async def get_weather(location: str) -> str:
    """Get weather for a location."""
    return f"Weather in {location}: sunny"

# Manual tool creation
tool = FunctionTool(
    func=get_weather,
    description="Get weather information"
)

# Use with agent
agent = AssistantAgent(name="assistant", model_client=client, tools=[tool])

Agent Framework @ai_function

from agent_framework import ai_function
from typing import Annotated
from pydantic import Field

@ai_function
def get_weather(
    location: Annotated[str, Field(description="The location to get weather for")]
) -> str:
    """Get weather for a location."""
    return f"Weather in {location}: sunny"

# Direct use with agent (automatic conversion)
agent = ChatAgent(name="assistant", chat_client=client, tools=[get_weather])

Podrobné příklady najdete tady:

Hostované nástroje (exkluzivní rozhraní agenta)

Agent Framework poskytuje hostované nástroje, které nejsou dostupné v autogenu:

from agent_framework import ChatAgent, HostedCodeInterpreterTool, HostedWebSearchTool
from agent_framework.azure import AzureOpenAIChatClient

# Azure OpenAI client with a model that supports hosted tools
client = AzureOpenAIChatClient(model_id="gpt-5")

# Code execution tool
code_tool = HostedCodeInterpreterTool()

# Web search tool
search_tool = HostedWebSearchTool()

agent = ChatAgent(
    name="researcher",
    chat_client=client,
    tools=[code_tool, search_tool]
)

Podrobné příklady najdete tady:

Požadavky a upozornění:

  • Hostované nástroje jsou dostupné jenom u modelů nebo účtů, které je podporují. Před povolením těchto nástrojů ověřte nároky a podporu modelů pro vašeho poskytovatele.
  • Konfigurace se liší podle poskytovatele; pro nastavení a oprávnění postupujte podle požadavků v každé ukázce.
  • Ne každý model podporuje každý hostovaný nástroj (například webový vyhledávací vs. interpret kódu). Zvolte kompatibilní model ve vašem prostředí.

Poznámka:

AutoGen podporuje místní nástroje pro spouštění kódu, ale tato funkce se plánuje pro budoucí verze agenta Framework.

Klíčový rozdíl: Agent Framework zpracovává iteraci nástrojů automaticky na úrovni agenta. Na rozdíl od parametru max_tool_iterations AutoGen agenti Agent Framework pokračují ve spouštění nástrojů až do dokončení ve výchozím nastavení s integrovanými bezpečnostními mechanismy, které brání nekonečným smyčkám.

Podpora serveru MCP

Pro pokročilou integraci nástrojů podporují obě architektury protokol MCP (Model Context Protocol), které umožňují agentům pracovat s externími službami a zdroji dat. Agent Framework poskytuje komplexnější integrovanou podporu.

Podpora AutoGen MCP

AutoGen má základní podporu MCP prostřednictvím rozšíření (konkrétní podrobnosti implementace se liší podle verze).

Podpora AGENT FRAMEWORK MCP

from agent_framework import ChatAgent, MCPStdioTool, MCPStreamableHTTPTool, MCPWebsocketTool
from agent_framework.openai import OpenAIChatClient

# Create client for the example
client = OpenAIChatClient(model_id="gpt-5")

# Stdio MCP server
mcp_tool = MCPStdioTool(
    name="filesystem",
    command="uvx mcp-server-filesystem",
    args=["/allowed/directory"]
)

# HTTP streaming MCP
http_mcp = MCPStreamableHTTPTool(
    name="http_mcp",
    url="http://localhost:8000/sse"
)

# WebSocket MCP
ws_mcp = MCPWebsocketTool(
    name="websocket_mcp",
    url="ws://localhost:8000/ws"
)

agent = ChatAgent(name="assistant", chat_client=client, tools=[mcp_tool])

Příklady MCP najdete tady:

Vzor agenta jako nástroje

Jedním z výkonných vzorů je použití samotných agentů jako nástrojů, které umožňují hierarchické architektury agentů. Obě architektury podporují tento model s různými implementacemi.

AutoGen AgentTool

from autogen_agentchat.tools import AgentTool

# Create specialized agent
writer = AssistantAgent(
    name="writer",
    model_client=client,
    system_message="You are a creative writer."
)

# Wrap as tool
writer_tool = AgentTool(agent=writer)

# Use in coordinator (requires disabling parallel tool calls)
coordinator_client = OpenAIChatCompletionClient(
    model="gpt-5",
    parallel_tool_calls=False
)
coordinator = AssistantAgent(
    name="coordinator",
    model_client=coordinator_client,
    tools=[writer_tool]
)

Agent Framework as_tool()

from agent_framework import ChatAgent

# Assume we have client from previous examples
# Create specialized agent
writer = ChatAgent(
    name="writer",
    chat_client=client,
    instructions="You are a creative writer."
)

# Convert to tool
writer_tool = writer.as_tool(
    name="creative_writer",
    description="Generate creative content",
    arg_name="request",
    arg_description="What to write"
)

# Use in coordinator
coordinator = ChatAgent(
    name="coordinator",
    chat_client=client,
    tools=[writer_tool]
)

Poznámka k explicitní migraci: V autogenu nastavte parallel_tool_calls=False klienta modelu koordinátora při zabalení agentů jako nástroje, aby nedocházelo k problémům se souběžností při vyvolání stejné instance agenta. V rozhraní Agent Framework nevyžaduje zakázání paralelních volání nástrojů, as_tool() protože agenti jsou ve výchozím nastavení bezstavové.

Middleware (funkce rozhraní agenta)

Agent Framework zavádí funkce middlewaru, které AutoGen nemá. Middleware umožňuje výkonné průřezové aspekty, jako je protokolování, zabezpečení a monitorování výkonu.

from agent_framework import ChatAgent, AgentRunContext, FunctionInvocationContext
from typing import Callable, Awaitable

# Assume we have client from previous examples
async def logging_middleware(
    context: AgentRunContext,
    next: Callable[[AgentRunContext], Awaitable[None]]
) -> None:
    print(f"Agent {context.agent.name} starting")
    await next(context)
    print(f"Agent {context.agent.name} completed")

async def security_middleware(
    context: FunctionInvocationContext,
    next: Callable[[FunctionInvocationContext], Awaitable[None]]
) -> None:
    if "password" in str(context.arguments):
        print("Blocking function call with sensitive data")
        return  # Don't call next()
    await next(context)

agent = ChatAgent(
    name="secure_agent",
    chat_client=client,
    middleware=[logging_middleware, security_middleware]
)

Výhody:

  • Zabezpečení: Ověřování vstupu a filtrování obsahu
  • Pozorovatelnost: Protokolování, metriky a trasování
  • Výkon: Ukládání do mezipaměti a omezování rychlosti
  • Zpracování chyb: Řádné snížení výkonu a logika opakování

Podrobné příklady middlewaru najdete tady:

Upravitelní agenti

Někdy nechcete vůbec agenta založeného na modelu – chcete deterministický agent nebo agent založený na rozhraní API s vlastní logikou. Obě architektury podporují vytváření vlastních agentů, ale vzory se liší.

AutoGen: Podtřída BaseChatAgent

from typing import Sequence
from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import BaseChatMessage, TextMessage, StopMessage
from autogen_core import CancellationToken

class StaticAgent(BaseChatAgent):
    def __init__(self, name: str = "static", description: str = "Static responder") -> None:
        super().__init__(name, description)

    @property
    def produced_message_types(self) -> Sequence[type[BaseChatMessage]]:  # Which message types this agent produces
        return (TextMessage,)

    async def on_messages(self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken) -> Response:
        # Always return a static response
        return Response(chat_message=TextMessage(content="Hello from AutoGen custom agent", source=self.name))

Notes:

  • Implementujte on_messages(...) a vraťte zprávu chatu Response .
  • Volitelně můžete implementovat on_reset(...) pro vymazání interního stavu mezi spuštěními.

Architektura agenta: Rozšíření baseagentu (s podporou vláken)

from collections.abc import AsyncIterable
from typing import Any
from agent_framework import (
    AgentRunResponse,
    AgentRunResponseUpdate,
    AgentThread,
    BaseAgent,
    ChatMessage,
    Role,
    TextContent,
)

class StaticAgent(BaseAgent):
    async def run(
        self,
        messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
        *,
        thread: AgentThread | None = None,
        **kwargs: Any,
    ) -> AgentRunResponse:
        # Build a static reply
        reply = ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text="Hello from AF custom agent")])

        # Persist conversation to the provided AgentThread (if any)
        if thread is not None:
            normalized = self._normalize_messages(messages)
            await self._notify_thread_of_new_messages(thread, normalized, reply)

        return AgentRunResponse(messages=[reply])

    async def run_stream(
        self,
        messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
        *,
        thread: AgentThread | None = None,
        **kwargs: Any,
    ) -> AsyncIterable[AgentRunResponseUpdate]:
        # Stream the same static response in a single chunk for simplicity
        yield AgentRunResponseUpdate(contents=[TextContent(text="Hello from AF custom agent")], role=Role.ASSISTANT)

        # Notify thread of input and the complete response once streaming ends
        if thread is not None:
            reply = ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text="Hello from AF custom agent")])
            normalized = self._normalize_messages(messages)
            await self._notify_thread_of_new_messages(thread, normalized, reply)

Notes:

  • AgentThreadudržuje stav konverzace externě; a agent.get_new_thread() předejte horun/run_stream .
  • Volání self._notify_thread_of_new_messages(thread, input_messages, response_messages) tak, aby vlákno má obě strany výměny.
  • Podívejte se na úplnou ukázku: Vlastní agent

V dalším kroku se podíváme na orchestraci s více agenty – oblast, ve které se architektury liší nejvíce.

Mapování funkcí s více agenty

Přehled programovacího modelu

Programovací modely s více agenty představují nejvýznamnější rozdíl mezi dvěma architekturami.

Přístup k duálnímu modelu AutoGenu

AutoGen poskytuje dva programovací modely:

  1. autogen-core: Nízkoúrovňové programování řízené událostmi a RoutedAgent odběry zpráv
  2. Team abstrakce: základní model orientovaný na běh založený na autogen-core
# Low-level autogen-core (complex)
class MyAgent(RoutedAgent):
    @message_handler
    async def handle_message(self, message: TextMessage, ctx: MessageContext) -> None:
        # Handle specific message types
        pass

# High-level Team (easier but limited)
team = RoundRobinGroupChat(
    participants=[agent1, agent2],
    termination_condition=StopAfterNMessages(5)
)
result = await team.run(task="Collaborate on this task")

Výzvy:

  • Model nízké úrovně je pro většinu uživatelů příliš složitý.
  • Model vysoké úrovně může být omezen na komplexní chování.
  • Přemostění mezi těmito dvěma modely zvyšuje složitost implementace.

Model jednotného pracovního postupu v rámci agenta

Agent Framework poskytuje jednu Workflow abstrakci, která kombinuje nejlepší z obou přístupů:

from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never

# Assume we have agent1 and agent2 from previous examples
@executor(id="agent1")
async def agent1_executor(input_msg: str, ctx: WorkflowContext[str]) -> None:
    response = await agent1.run(input_msg)
    await ctx.send_message(response.text)

@executor(id="agent2")
async def agent2_executor(input_msg: str, ctx: WorkflowContext[Never, str]) -> None:
    response = await agent2.run(input_msg)
    await ctx.yield_output(response.text)  # Final output

# Build typed data flow graph
workflow = (WorkflowBuilder()
           .add_edge(agent1_executor, agent2_executor)
           .set_start_executor(agent1_executor)
           .build())

# Example usage (would be in async context)
# result = await workflow.run("Initial input")

Podrobné příklady pracovních postupů najdete tady:

Výhody:

  • Jednotný model: Jedna abstrakce pro všechny úrovně složitosti
  • Bezpečnost typů: Vstupy a výstupy silného typu
  • Vizualizace grafu: Vymazání reprezentace toku dat
  • Flexibilní složení: Mix agentů, funkcí a dílčích pracovních postupů

Pracovní postup vs. GraphFlow

Abstrakce rozhraní Agent Framework Workflow je inspirovaná experimentální GraphFlow funkcí AutoGenu, ale představuje významný vývoj ve filozofie návrhu:

  • GraphFlow: Tok řízení založený na tom, kde hrany jsou přechody a zprávy jsou vysílány všem agentům; přechody jsou podmíněny obsahem všesměrové zprávy.
  • Pracovní postup: Tok dat založený na tom, kde jsou zprávy směrovány přes konkrétní hrany a exekutory jsou aktivovány hrany s podporou souběžného spuštění.

Přehled vizuálu

Následující diagram kontrastuje s tokem řízení a tokem AutoGen GraphFlow (vlevo) s pracovním postupem toku dat v rozhraní Agent Framework (vpravo). GraphFlow modeluje agenty jako uzly s podmíněnými přechody a všesměrovými vysíláními. Exekutory modelů pracovních postupů (agenti, funkce nebo dílčí pracovní postupy) propojené podle zadaných hran; podporuje také pozastavení požadavků a odpovědí a vytváření kontrolních bodů.

flowchart LR

  subgraph AutoGenGraphFlow
    direction TB
    U[User / Task] --> A[Agent A]
    A -->|success| B[Agent B]
    A -->|retry| C[Agent C]
    A -. broadcast .- B
    A -. broadcast .- C
  end

  subgraph AgentFrameworkWorkflow
    direction TB
    I[Input] --> E1[Executor 1]
    E1 -->|"str"| E2[Executor 2]
    E1 -->|"image"| E3[Executor 3]
    E3 -->|"str"| E2
    E2 --> OUT[(Final Output)]
  end

  R[Request / Response Gate]
  E2 -. request .-> R
  R -. resume .-> E2

  CP[Checkpoint]
  E1 -. save .-> CP
  CP -. load .-> E1

V praxi:

  • GraphFlow používá agenty jako uzly a všesměrové zprávy; hrany představují podmíněné přechody.
  • Trasy pracovního postupu zadané zprávy podél okrajů Uzly (exekutory) můžou být agenti, čisté funkce nebo dílčí pracovní postupy.
  • Požadavek/odpověď umožňuje pozastavení pracovního postupu pro externí vstup; vytváření kontrolních bodů přetrvává a umožňuje obnovení.

Porovnání kódu

1) Sekvenční + podmíněný
# AutoGen GraphFlow (fluent builder) — writer → reviewer → editor (conditional)
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow

writer = AssistantAgent(name="writer", description="Writes a draft", model_client=client)
reviewer = AssistantAgent(name="reviewer", description="Reviews the draft", model_client=client)
editor = AssistantAgent(name="editor", description="Finalizes the draft", model_client=client)

graph = (
    DiGraphBuilder()
    .add_node(writer).add_node(reviewer).add_node(editor)
    .add_edge(writer, reviewer)  # always
    .add_edge(reviewer, editor, condition=lambda msg: "approve" in msg.to_model_text())
    .set_entry_point(writer)
).build()

team = GraphFlow(participants=[writer, reviewer, editor], graph=graph)
result = await team.run(task="Draft a short paragraph about solar power")
# Agent Framework Workflow — sequential executors with conditional logic
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never

@executor(id="writer")
async def writer_exec(task: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(f"Draft: {task}")

@executor(id="reviewer")
async def reviewer_exec(draft: str, ctx: WorkflowContext[str]) -> None:
    decision = "approve" if "solar" in draft.lower() else "revise"
    await ctx.send_message(f"{decision}:{draft}")

@executor(id="editor")
async def editor_exec(msg: str, ctx: WorkflowContext[Never, str]) -> None:
    if msg.startswith("approve:"):
        await ctx.yield_output(msg.split(":", 1)[1])
    else:
        await ctx.yield_output("Needs revision")

workflow_seq = (
    WorkflowBuilder()
    .add_edge(writer_exec, reviewer_exec)
    .add_edge(reviewer_exec, editor_exec)
    .set_start_executor(writer_exec)
    .build()
)
2) Fan-out + Join (ALL vs. ANY)
# AutoGen GraphFlow — A → (B, C) → D with ALL/ANY join
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
A, B, C, D = agent_a, agent_b, agent_c, agent_d

# ALL (default): D runs after both B and C
g_all = (
    DiGraphBuilder()
    .add_node(A).add_node(B).add_node(C).add_node(D)
    .add_edge(A, B).add_edge(A, C)
    .add_edge(B, D).add_edge(C, D)
    .set_entry_point(A)
).build()

# ANY: D runs when either B or C completes
g_any = (
    DiGraphBuilder()
    .add_node(A).add_node(B).add_node(C).add_node(D)
    .add_edge(A, B).add_edge(A, C)
    .add_edge(B, D, activation_group="join_d", activation_condition="any")
    .add_edge(C, D, activation_group="join_d", activation_condition="any")
    .set_entry_point(A)
).build()
# Agent Framework Workflow — A → (B, C) → aggregator (ALL vs ANY)
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never

@executor(id="A")
async def start(task: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(f"B:{task}", target_id="B")
    await ctx.send_message(f"C:{task}", target_id="C")

@executor(id="B")
async def branch_b(text: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(f"B_done:{text}")

@executor(id="C")
async def branch_c(text: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(f"C_done:{text}")

@executor(id="join_any")
async def join_any(msg: str, ctx: WorkflowContext[Never, str]) -> None:
    await ctx.yield_output(f"First: {msg}")  # ANY join (first arrival)

@executor(id="join_all")
async def join_all(msg: str, ctx: WorkflowContext[str, str]) -> None:
    state = await ctx.get_executor_state() or {"items": []}
    state["items"].append(msg)
    await ctx.set_executor_state(state)
    if len(state["items"]) >= 2:
        await ctx.yield_output(" | ".join(state["items"]))  # ALL join

wf_any = (
    WorkflowBuilder()
    .add_edge(start, branch_b).add_edge(start, branch_c)
    .add_edge(branch_b, join_any).add_edge(branch_c, join_any)
    .set_start_executor(start)
    .build()
)

wf_all = (
    WorkflowBuilder()
    .add_edge(start, branch_b).add_edge(start, branch_c)
    .add_edge(branch_b, join_all).add_edge(branch_c, join_all)
    .set_start_executor(start)
    .build()
)
3) Cílené směrování (bez vysílání)
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never

@executor(id="ingest")
async def ingest(task: str, ctx: WorkflowContext[str]) -> None:
    # Route selectively using target_id
    if task.startswith("image:"):
        await ctx.send_message(task.removeprefix("image:"), target_id="vision")
    else:
        await ctx.send_message(task, target_id="writer")

@executor(id="writer")
async def write(text: str, ctx: WorkflowContext[Never, str]) -> None:
    await ctx.yield_output(f"Draft: {text}")

@executor(id="vision")
async def caption(image_ref: str, ctx: WorkflowContext[Never, str]) -> None:
    await ctx.yield_output(f"Caption: {image_ref}")

workflow = (
    WorkflowBuilder()
    .add_edge(ingest, write)
    .add_edge(ingest, caption)
    .set_start_executor(ingest)
    .build()
)

# Example usage (async):
# await workflow.run("Summarize the benefits of solar power")
# await workflow.run("image:https://example.com/panel.jpg")

Co si všimnout:

  • GraphFlow vysílá zprávy a používá podmíněné přechody. Chování spojení se konfiguruje přes cílovou stranu activation a každou hranu activation_group/activation_condition(například seskupte obě hrany s ).join_dactivation_condition="any"
  • Pracovní postup směruje data explicitně; slouží target_id k výběru podřízených exekutorů. Chování spojení se nachází v přijímajícím exekutoru (například výnos při prvním vstupu vs. čekání na všechny) nebo prostřednictvím tvůrce/agregátorů orchestrace.
  • Exekutory v pracovním postupu jsou volné: zalomení ChatAgent, funkce nebo dílčího pracovního postupu a jejich kombinace v rámci stejného grafu.

Klíčové rozdíly

Následující tabulka shrnuje základní rozdíly mezi graphflow a pracovním postupem agenta AutoGen:

Aspekt AutoGen GraphFlow Pracovní postup agenta Framework
Typ toku Tok řízení (hrany jsou přechody) Tok dat (okraje směrují zprávy)
Typy uzlů Pouze agenti Agenti, funkce, dílčí pracovní postupy
Aktivace Vysílání zpráv Aktivace na základě hraničních zařízení
Bezpečnost typů Omezený Silné psaní v celém textu
Kompozičnost Omezený Vysoce kompozibilní

Vnoření vzorů

Automatické vnoření týmu

# Inner team
inner_team = RoundRobinGroupChat(
    participants=[specialist1, specialist2],
    termination_condition=StopAfterNMessages(3)
)

# Outer team with nested team as participant
outer_team = RoundRobinGroupChat(
    participants=[coordinator, inner_team, reviewer],  # Team as participant
    termination_condition=StopAfterNMessages(10)
)

# Messages are broadcasted to all participants including nested team
result = await outer_team.run("Complex task requiring collaboration")

Vlastnosti vnoření automatického genu:

  • Vnořený tým přijímá všechny zprávy od vnějšího týmu.
  • Vnořené týmové zprávy se odsílají všem účastníkům vnějšího týmu.
  • Kontext sdílené zprávy napříč všemi úrovněmi

Vnoření pracovních postupů v rozhraní Agent Framework

from agent_framework import WorkflowExecutor, WorkflowBuilder

# Assume we have executors from previous examples
# specialist1_executor, specialist2_executor, coordinator_executor, reviewer_executor

# Create sub-workflow
sub_workflow = (WorkflowBuilder()
               .add_edge(specialist1_executor, specialist2_executor)
               .set_start_executor(specialist1_executor)
               .build())

# Wrap as executor
sub_workflow_executor = WorkflowExecutor(
    workflow=sub_workflow,
    id="sub_process"
)

# Use in parent workflow
parent_workflow = (WorkflowBuilder()
                  .add_edge(coordinator_executor, sub_workflow_executor)
                  .add_edge(sub_workflow_executor, reviewer_executor)
                  .set_start_executor(coordinator_executor)
                  .build())

Vlastnosti vnořování rozhraní Agent Framework:

  • Izolovaný vstup/výstup prostřednictvím WorkflowExecutor
  • Žádné vysílání zpráv – data procházejí konkrétními připojeními
  • Nezávislá správa stavu pro každou úroveň pracovního postupu

Vzory skupinového chatu

Vzory skupinového chatu umožňují více agentům spolupracovat na složitých úlohách. Tady je postup, jak se mezi architekturami překládají běžné vzory.

Model RoundRobinGroupChat

Implementace autogenu:

from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import StopAfterNMessages

team = RoundRobinGroupChat(
    participants=[agent1, agent2, agent3],
    termination_condition=StopAfterNMessages(10)
)
result = await team.run("Discuss this topic")

Implementace agenta Framework:

from agent_framework import SequentialBuilder, WorkflowOutputEvent

# Assume we have agent1, agent2, agent3 from previous examples
# Sequential workflow through participants
workflow = SequentialBuilder().participants([agent1, agent2, agent3]).build()

# Example usage (would be in async context)
async def sequential_example():
    # Each agent appends to shared conversation
    async for event in workflow.run_stream("Discuss this topic"):
        if isinstance(event, WorkflowOutputEvent):
            conversation_history = event.data  # list[ChatMessage]

Podrobné příklady orchestrace najdete tady:

Pro vzory souběžného spouštění poskytuje rozhraní Agent Framework také:

from agent_framework import ConcurrentBuilder, WorkflowOutputEvent

# Assume we have agent1, agent2, agent3 from previous examples
# Concurrent workflow for parallel processing
workflow = (ConcurrentBuilder()
           .participants([agent1, agent2, agent3])
           .build())

# Example usage (would be in async context)
async def concurrent_example():
    # All agents process the input concurrently
    async for event in workflow.run_stream("Process this in parallel"):
        if isinstance(event, WorkflowOutputEvent):
            results = event.data  # Combined results from all agents

Příklady souběžného spouštění najdete tady:

MagenticOneGroupChat Pattern

Implementace autogenu:

from autogen_agentchat.teams import MagenticOneGroupChat

team = MagenticOneGroupChat(
    participants=[researcher, coder, executor],
    model_client=coordinator_client,
    termination_condition=StopAfterNMessages(20)
)
result = await team.run("Complex research and analysis task")

Implementace agenta Framework:

from agent_framework import (
    MagenticBuilder, MagenticCallbackMode, WorkflowOutputEvent,
    MagenticCallbackEvent, MagenticOrchestratorMessageEvent, MagenticAgentDeltaEvent
)

# Assume we have researcher, coder, and coordinator_client from previous examples
async def on_event(event: MagenticCallbackEvent) -> None:
    if isinstance(event, MagenticOrchestratorMessageEvent):
        print(f"[ORCHESTRATOR]: {event.message.text}")
    elif isinstance(event, MagenticAgentDeltaEvent):
        print(f"[{event.agent_id}]: {event.text}", end="")

workflow = (MagenticBuilder()
           .participants(researcher=researcher, coder=coder)
           .on_event(on_event, mode=MagenticCallbackMode.STREAMING)
           .with_standard_manager(
               chat_client=coordinator_client,
               max_round_count=20,
               max_stall_count=3,
               max_reset_count=2
           )
           .build())

# Example usage (would be in async context)
async def magentic_example():
    async for event in workflow.run_stream("Complex research task"):
        if isinstance(event, WorkflowOutputEvent):
            final_result = event.data

Možnosti přizpůsobení rozhraní Agent Framework:

Magentický pracovní postup poskytuje rozsáhlé možnosti přizpůsobení:

  • Konfigurace správce: Vlastní modely orchestrátoru a výzvy
  • Limity zaokrouhlení: max_round_count, max_stall_countmax_reset_count
  • Zpětná volání událostí: Streamování v reálném čase s podrobným filtrováním událostí
  • Specializace agenta: Vlastní instrukce a nástroje na agenta
  • Režimy zpětného volání: STREAMING aktualizace v reálném čase nebo BATCH konečné výsledky
  • Plánování smyček pro člověka: Vlastní funkce plánovače pro interaktivní pracovní postupy
# Advanced customization example with human-in-the-loop
from agent_framework.openai import OpenAIChatClient
from agent_framework import MagenticBuilder, MagenticCallbackMode, MagenticPlannerContext

# Assume we have researcher_agent, coder_agent, analyst_agent, detailed_event_handler
# and get_human_input function defined elsewhere

async def custom_planner(context: MagenticPlannerContext) -> str:
    """Custom planner with human input for critical decisions."""
    if context.round_count > 5:
        # Request human input for complex decisions
        return await get_human_input(f"Next action for: {context.current_state}")
    return "Continue with automated planning"

workflow = (MagenticBuilder()
           .participants(
               researcher=researcher_agent,
               coder=coder_agent,
               analyst=analyst_agent
           )
           .with_standard_manager(
               chat_client=OpenAIChatClient(model_id="gpt-5"),
               max_round_count=15,      # Limit total rounds
               max_stall_count=2,       # Prevent infinite loops
               max_reset_count=1,       # Allow one reset on failure
               orchestrator_prompt="Custom orchestration instructions..."
           )
           .with_planner(custom_planner)  # Human-in-the-loop planning
           .on_event(detailed_event_handler, mode=MagenticCallbackMode.STREAMING)
           .build())

Podrobné příklady magentického zobrazení najdete tady:

Budoucí vzory

Plán nasazení agenta zahrnuje několik vzorů AutoGen, které jsou aktuálně ve vývoji:

  • Vzor Swarm: Koordinace agentů založená na předávání
  • SelectorGroupChat: Výběr mluvčího řízeného LLM

Human-in-the-Loop with Request Response

Klíčovou novou funkcí v architektuře Workflow Agent Framework je koncept požadavku a odpovědi, která umožňuje pracovním postupům pozastavit provádění a čekat na externí vstup, než budete pokračovat. Tato funkce není k dispozici v abstrakci AutoGenu Team a umožňuje sofistikované vzory human-in-the-loop.

Omezení autogenu

Abstrakce AutoGen Team se spouští nepřetržitě po spuštění a neposkytuje integrované mechanismy pro pozastavení provádění pro lidské vstupy. Všechny funkce typu human-in-the-loop vyžadují vlastní implementace mimo architekturu.

Rozhraní API Request-Response agenta

Agent Framework poskytuje integrované možnosti odezvy požadavků, kde jakýkoli exekutor může odesílat požadavky pomocí ctx.request_info() dekorátoru @response_handler a zpracovávat odpovědi.

from agent_framework import (
    RequestInfoEvent, WorkflowBuilder, WorkflowContext, 
    Executor, handler, response_handler
)
from dataclasses import dataclass

# Assume we have agent_executor defined elsewhere

# Define typed request payload
@dataclass
class ApprovalRequest:
    """Request human approval for agent output."""
    content: str = ""
    agent_name: str = ""

# Workflow executor that requests human approval
class ReviewerExecutor(Executor):
    
    @handler
    async def review_content(
        self,
        agent_response: str,
        ctx: WorkflowContext
    ) -> None:
        # Request human input with structured data
        approval_request = ApprovalRequest(
            content=agent_response,
            agent_name="writer_agent"
        )
        await ctx.request_info(request_data=approval_request, response_type=str)
    
    @response_handler
    async def handle_approval_response(
        self,
        original_request: ApprovalRequest,
        decision: str,
        ctx: WorkflowContext
    ) -> None:
        decision_lower = decision.strip().lower()
        original_content = original_request.content

        if decision_lower == "approved":
            await ctx.yield_output(f"APPROVED: {original_content}")
        else:
            await ctx.yield_output(f"REVISION NEEDED: {decision}")

# Build workflow with human-in-the-loop
reviewer = ReviewerExecutor(id="reviewer")

workflow = (WorkflowBuilder()
           .add_edge(agent_executor, reviewer)
           .set_start_executor(agent_executor)
           .build())

Spouštění pracovních postupů human-in-the-loop

Agent Framework poskytuje rozhraní API pro streamování pro zpracování cyklu pozastavení a obnovení:

from agent_framework import RequestInfoEvent, WorkflowOutputEvent

# Assume we have workflow defined from previous examples
async def run_with_human_input():
    pending_responses = None
    completed = False

    while not completed:
        # First iteration uses run_stream, subsequent use send_responses_streaming
        stream = (
            workflow.send_responses_streaming(pending_responses)
            if pending_responses
            else workflow.run_stream("initial input")
        )

        events = [event async for event in stream]
        pending_responses = None

        # Collect human requests and outputs
        for event in events:
            if isinstance(event, RequestInfoEvent):
                # Display request to human and collect response
                request_data = event.data  # ApprovalRequest instance
                print(f"Review needed: {request_data.content}")

                human_response = input("Enter 'approved' or revision notes: ")
                pending_responses = {event.request_id: human_response}

            elif isinstance(event, WorkflowOutputEvent):
                print(f"Final result: {event.data}")
                completed = True

Příklady pracovních postupů human-in-the-loop najdete tady:

Vytváření kontrolních bodů a obnovení pracovních postupů

Další klíčovou výhodou architektury Agent Framework Workflow oproti abstrakci AutoGen Team je integrovaná podpora pro vytváření kontrolních bodů a obnovení provádění. To umožňuje pozastavit, zachovat a obnovit pracovní postupy později z jakéhokoli kontrolního bodu a zajistit odolnost proti chybám a povolit dlouhotrvající nebo asynchronní pracovní postupy.

Omezení autogenu

Abstrakce AutoGen Team neposkytuje integrované možnosti vytváření kontrolních bodů. Všechny mechanismy trvalosti nebo obnovení musí být implementovány externě, často vyžadují složitou správu stavu a logiku serializace.

Vytváření kontrolních bodů v rozhraní Agent Framework

Agent Framework poskytuje komplexní kontrolní body prostřednictvím FileCheckpointStorage a metodu with_checkpointing() na WorkflowBuilder. Zachytávání kontrolních bodů:

  • Stav exekutoru: Místní stav pro každý exekutor pomocí ctx.set_executor_state()
  • Sdílený stav: Stav křížového exekutoru pomocí ctx.set_shared_state()
  • Fronty zpráv: Čekající zprávy mezi exekutory
  • Pozice pracovního postupu: Aktuální průběh provádění a další kroky
from agent_framework import (
    FileCheckpointStorage, WorkflowBuilder, WorkflowContext,
    Executor, handler
)
from typing_extensions import Never

class ProcessingExecutor(Executor):
    @handler
    async def process(self, data: str, ctx: WorkflowContext[str]) -> None:
        # Process the data
        result = f"Processed: {data.upper()}"
        print(f"Processing: '{data}' -> '{result}'")

        # Persist executor-local state
        prev_state = await ctx.get_executor_state() or {}
        count = prev_state.get("count", 0) + 1
        await ctx.set_executor_state({
            "count": count,
            "last_input": data,
            "last_output": result
        })

        # Persist shared state for other executors
        await ctx.set_shared_state("original_input", data)
        await ctx.set_shared_state("processed_output", result)

        await ctx.send_message(result)

class FinalizeExecutor(Executor):
    @handler
    async def finalize(self, data: str, ctx: WorkflowContext[Never, str]) -> None:
        result = f"Final: {data}"
        await ctx.yield_output(result)

# Configure checkpoint storage
checkpoint_storage = FileCheckpointStorage(storage_path="./checkpoints")
processing_executor = ProcessingExecutor(id="processing")
finalize_executor = FinalizeExecutor(id="finalize")

# Build workflow with checkpointing enabled
workflow = (WorkflowBuilder()
           .add_edge(processing_executor, finalize_executor)
           .set_start_executor(processing_executor)
           .with_checkpointing(checkpoint_storage=checkpoint_storage)  # Enable checkpointing
           .build())

# Example usage (would be in async context)
async def checkpoint_example():
    # Run workflow - checkpoints are created automatically
    async for event in workflow.run_stream("input data"):
        print(f"Event: {event}")

Obnovení z kontrolních bodů

Agent Framework poskytuje rozhraní API pro výpis, kontrolu a obnovení z konkrétních kontrolních bodů:

from typing_extensions import Never

from agent_framework import (
    Executor,
    FileCheckpointStorage,
    WorkflowContext,
    WorkflowBuilder,
    get_checkpoint_summary,
    handler,
)

class UpperCaseExecutor(Executor):
    @handler
    async def process(self, text: str, ctx: WorkflowContext[str]) -> None:
        result = text.upper()
        await ctx.send_message(result)

class ReverseExecutor(Executor):
    @handler
    async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None:
        result = text[::-1]
        await ctx.yield_output(result)

def create_workflow(checkpoint_storage: FileCheckpointStorage):
    """Create a workflow with two executors and checkpointing."""
    upper_executor = UpperCaseExecutor(id="upper")
    reverse_executor = ReverseExecutor(id="reverse")

    return (WorkflowBuilder()
           .add_edge(upper_executor, reverse_executor)
           .set_start_executor(upper_executor)
           .with_checkpointing(checkpoint_storage=checkpoint_storage)
           .build())

# Assume we have checkpoint_storage from previous examples
checkpoint_storage = FileCheckpointStorage(storage_path="./checkpoints")

async def checkpoint_resume_example():
    # List available checkpoints
    checkpoints = await checkpoint_storage.list_checkpoints()

    # Display checkpoint information
    for checkpoint in checkpoints:
        summary = get_checkpoint_summary(checkpoint)
        print(f"Checkpoint {summary.checkpoint_id}: iteration={summary.iteration_count}")

    # Resume from a specific checkpoint
    if checkpoints:
        chosen_checkpoint_id = checkpoints[0].checkpoint_id

        # Create new workflow instance and resume
        new_workflow = create_workflow(checkpoint_storage)
        async for event in new_workflow.run_stream(
            checkpoint_id=chosen_checkpoint_id,
            checkpoint_storage=checkpoint_storage
        ):
            print(f"Resumed event: {event}")

Pokročilé funkce kontrolních bodů

Kontrolní bod s integrací human-in-the-loop:

Vytváření kontrolních bodů bez problémů funguje s pracovními postupy lidské in-the-smyčky, což umožňuje pozastavit pracovní postupy pro vstup člověka a obnovit je později. Při obnovení z kontrolního bodu, který obsahuje čekající požadavky, se tyto žádosti znovu vygenerují jako události:

# Assume we have workflow, checkpoint_id, and checkpoint_storage from previous examples
async def resume_with_pending_requests_example():
    # Resume from checkpoint - pending requests will be re-emitted
    request_info_events = []
    async for event in workflow.run_stream(
        checkpoint_id=checkpoint_id,
        checkpoint_storage=checkpoint_storage
    ):
        if isinstance(event, RequestInfoEvent):
            request_info_events.append(event)

    # Handle re-emitted pending request
    responses = {}
    for event in request_info_events:
        response = handle_request(event.data)
        responses[event.request_id] = response

    # Send response back to workflow
    async for event in workflow.send_responses_streaming(responses):
        print(f"Event: {event}")

Klíčové výhody

V porovnání s funkcí AutoGen poskytuje kontrolní body rozhraní Agent Framework:

  • Automatická trvalost: Nevyžaduje se žádná ruční správa stavu
  • Podrobné obnovení: Obnovení z libovolné hranice superkroku
  • Izolace stavu: Samostatný exekutor – místní a sdílený stav
  • Integrace mezi lidmi ve smyčce: Bezproblémové pozastavení a obnovení s lidským vstupem
  • Odolnost proti chybám: Robustní obnovení ze selhání nebo přerušení

Praktické příklady

Komplexní příklady kontrolních bodů najdete tady:


Observability

Funkce AutoGen i Agent Framework poskytují možnosti pozorovatelnosti, ale s různými přístupy a funkcemi.

Pozorovatelnost automatickéhogenu

AutoGen má nativní podporu OpenTelemetry s instrumentací pro:

  • Trasování za běhu: SingleThreadedAgentRuntime a GrpcWorkerAgentRuntime
  • Provádění nástrojů: BaseTool s execute_tool využitím sémantických konvencí GenAI
  • Operace agenta: BaseChatAgent s create_agent rozsahy a invoke_agent rozsahy
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from autogen_core import SingleThreadedAgentRuntime

# Configure OpenTelemetry
tracer_provider = TracerProvider()
trace.set_tracer_provider(tracer_provider)

# Pass to runtime
runtime = SingleThreadedAgentRuntime(tracer_provider=tracer_provider)

Pozorovatelnost v rozhraní Agent Framework

Agent Framework poskytuje komplexní pozorovatelnost prostřednictvím více přístupů:

  • Nastavení nulového kódu: Automatická instrumentace prostřednictvím proměnných prostředí
  • Ruční konfigurace: Programové nastavení s vlastními parametry
  • Bohatá telemetrie: Agenti, pracovní postupy a sledování spouštění nástrojů
  • Výstup konzoly: Integrované protokolování konzoly a vizualizace
from agent_framework import ChatAgent
from agent_framework.observability import setup_observability
from agent_framework.openai import OpenAIChatClient

# Zero-code setup via environment variables
# Set ENABLE_OTEL=true
# Set OTLP_ENDPOINT=http://localhost:4317

# Or manual setup
setup_observability(
    otlp_endpoint="http://localhost:4317"
)

# Create client for the example
client = OpenAIChatClient(model_id="gpt-5")

async def observability_example():
    # Observability is automatically applied to all agents and workflows
    agent = ChatAgent(name="assistant", chat_client=client)
    result = await agent.run("Hello")  # Automatically traced

Klíčové rozdíly:

  • Složitost instalace: Architektura agenta nabízí jednodušší možnosti nastavení bez kódu
  • Rozsah: Architektura agenta poskytuje širší pokrytí, včetně pozorovatelnosti na úrovni pracovního postupu
  • Vizualizace: Architektura agenta zahrnuje integrovaný výstup konzoly a vývojové uživatelské rozhraní
  • Konfigurace: Architektura agenta nabízí flexibilnější možnosti konfigurace

Podrobné příklady pozorovatelnosti najdete tady:


Conclusion

Tento průvodce migrací poskytuje komplexní mapování mezi rozhraním AutoGen a Microsoft Agent Framework, které pokrývá vše od základního vytvoření agenta až po složité pracovní postupy s více agenty. Klíčové poznatky pro migraci:

  • Migrace s jedním agentem je jednoduchá, s podobnými rozhraními API a rozšířenými možnostmi v rozhraní Agent Framework
  • Vzory s více agenty vyžadují přemýšlení přístupu z architektur založených na událostech až po architektury založené na toku dat, ale pokud už graphFlow znáte, přechod bude jednodušší.
  • Agent Framework nabízí další funkce, jako je middleware, hostované nástroje a typové pracovní postupy.

Další příklady a podrobné pokyny k implementaci najdete v adresáři ukázek rozhraní Agent Framework .

Další ukázkové kategorie

Agent Framework poskytuje ukázky v několika dalších důležitých oblastech:

Další kroky