Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Komplexní průvodce migrací z autogenu na sadu Python SDK microsoft Agent Framework
Seznam obsahu
- Pozadí
- Klíčové podobnosti a rozdíly
- Vytvoření a konfigurace klienta modelu
-
Mapování funkcíSingle-Agent
- Vytvoření a spuštění základního agenta
- Správa stavu konverzace pomocí agentaThread
- Ekvivalence agenta OpenAI Assistant
- Podpora streamování
- Typy a vytváření zpráv
- Vytváření a integrace nástrojů
- Hostované nástroje (exkluzivní rozhraní agenta)
- Podpora serveru MCP
- Vzor agenta jako nástroje
- Middleware (funkce rozhraní agenta)
- Vlastní agenti
- Mapování funkcí s více agenty
- pozorovatelnosti
- závěr
Background
AutoGen je architektura pro vytváření agentů AI a systémů s více agenty pomocí rozsáhlých jazykových modelů (LLM). Začal jako výzkumný projekt v Microsoft Research a povýšil několik konceptů orchestrace multi-agentů, jako je GroupChat a modul runtime agentů řízený událostmi. Projekt byl plodnou spolupráci opensourcové komunity a mnoho důležitých funkcí pochází od externích přispěvatelů.
Microsoft Agent Framework je nová sada SDK s více jazyky pro vytváření agentů a pracovních postupů AI pomocí LLM. Představuje významný vývoj myšlenek průkopnických v AutoGenu a zahrnuje poznatky získané z reálného využití. Vyvíjí ho základní týmy AutoGen a Sémantic Kernel v Microsoftu a jsou navržené tak, aby byly novým základem pro vytváření aplikací umělé inteligence.
Tato příručka popisuje praktickou cestu migrace: začíná pokrytím toho, co zůstane stejné a co se změní na první pohled. Pak se zabývá nastavením klienta modelu, funkcemi s jedním agentem a nakonec orchestrací s více agenty s konkrétním kódem vedle sebe. Odkazy na spustitelné ukázky v úložišti Agent Framework vám pomůžou ověřit jednotlivé kroky.
Klíčové podobnosti a rozdíly
Co zůstane stejné
Základy jsou známé. Stále vytváříte agenty kolem klienta modelu, poskytnete pokyny a připojíte nástroje. Obě knihovny podporují nástroje ve stylu funkcí, streamování tokenů, multimodální obsah a asynchronní vstupně-výstupní operace.
# Both frameworks follow similar patterns
# AutoGen
agent = AssistantAgent(name="assistant", model_client=client, tools=[my_tool])
result = await agent.run(task="Help me with this task")
# Agent Framework
agent = ChatAgent(name="assistant", chat_client=client, tools=[my_tool])
result = await agent.run("Help me with this task")
Klíčové rozdíly
Styl orchestrace: AutoGen spáruje jádro řízené událostmi s vysokou úrovní
Team. Agent Framework se zaměřuje na typový graf,Workflowkterý směruje data podél hran a aktivuje exekutory, když jsou vstupy připravené.Nástroje: AutoGen zabalí funkce pomocí
FunctionToolfunkce . Agent Framework používá@ai_function, odvozuje schémata automaticky a přidává hostované nástroje, jako je interpret kódu a webové vyhledávání.Chování agenta:
AssistantAgentje jedno otočení, pokud nezvyšovatmax_tool_iterations.ChatAgentje ve výchozím nastavení vícenásobný a pořád vyvolává nástroje, dokud nevrátí konečnou odpověď.Modul runtime: AutoGen nabízí vložené a experimentální distribuované moduly runtime. Agent Framework se dnes zaměřuje na složení s jedním procesem; Distribuované spuštění je naplánováno.
Vytvoření a konfigurace klienta modelu
Obě architektury poskytují klientům modelu pro hlavní poskytovatele AI, s podobnými, ale ne identickými rozhraními API.
| Vlastnost | AutoGen | Agentní rámec |
|---|---|---|
| Klient OpenAI | OpenAIChatCompletionClient |
OpenAIChatClient |
| Klient odpovědí OpenAI | ❌ Není k dispozici | OpenAIResponsesClient |
| Azure OpenAI | AzureOpenAIChatCompletionClient |
AzureOpenAIChatClient |
| Odpovědi Azure OpenAI | ❌ Není k dispozici | AzureOpenAIResponsesClient |
| Azure AI | AzureAIChatCompletionClient |
AzureAIAgentClient |
| Antropický | AnthropicChatCompletionClient |
🚧 Plánovaný |
| Ollama | OllamaChatCompletionClient |
🚧 Plánovaný |
| Cacheování |
ChatCompletionCache obal |
🚧 Plánovaný |
Klienti modelu AutoGen
from autogen_ext.models.openai import OpenAIChatCompletionClient, AzureOpenAIChatCompletionClient
# OpenAI
client = OpenAIChatCompletionClient(
model="gpt-5",
api_key="your-key"
)
# Azure OpenAI
client = AzureOpenAIChatCompletionClient(
azure_endpoint="https://your-endpoint.openai.azure.com/",
azure_deployment="gpt-5",
api_version="2024-12-01",
api_key="your-key"
)
Agent Framework ChatClients
from agent_framework.openai import OpenAIChatClient
from agent_framework.azure import AzureOpenAIChatClient
# OpenAI (reads API key from environment)
client = OpenAIChatClient(model_id="gpt-5")
# Azure OpenAI (uses environment or default credentials; see samples for auth options)
client = AzureOpenAIChatClient(model_id="gpt-5")
Podrobné příklady najdete tady:
- Chatovací klient OpenAI – Základní nastavení klienta OpenAI
- Klient chatu Azure OpenAI – Azure OpenAI s ověřováním
- Klient Azure AI – Integrace agenta Azure AI
Podpora rozhraní API pro odpovědi (exkluzivní rozhraní Agent Framework)
Agent Framework AzureOpenAIResponsesClient poskytuje OpenAIResponsesClient specializovanou podporu pro zdůvodňování modelů a strukturovaných odpovědí, které nejsou v autogenu k dispozici:
from agent_framework.azure import AzureOpenAIResponsesClient
from agent_framework.openai import OpenAIResponsesClient
# Azure OpenAI with Responses API
azure_responses_client = AzureOpenAIResponsesClient(model_id="gpt-5")
# OpenAI with Responses API
openai_responses_client = OpenAIResponsesClient(model_id="gpt-5")
Příklady rozhraní API pro odpovědi najdete tady:
- Azure Responses Client Basic – Azure OpenAI s odpověďmi
- Základní klient odpovědi OpenAI – Integrace odpovědí OpenAI
Mapování funkcí Single-Agent
Tato část mapuje funkce s jedním agentem mezi autogen a architekturou agenta. Pomocí klienta vytvořte agenta, připojte nástroje a vyberte si mezi prováděním streamování a streamováním.
Vytvoření a spuštění základního agenta
Jakmile máte nakonfigurovaného klienta modelu, dalším krokem je vytváření agentů. Obě architektury poskytují podobné abstrakce agenta, ale s různými výchozími chováními a možnostmi konfigurace.
AutoGen AssistantAgent
from autogen_agentchat.agents import AssistantAgent
agent = AssistantAgent(
name="assistant",
model_client=client,
system_message="You are a helpful assistant.",
tools=[my_tool],
max_tool_iterations=1 # Single-turn by default
)
# Execution
result = await agent.run(task="What's the weather?")
Agent Framework ChatAgent
from agent_framework import ChatAgent, ai_function
from agent_framework.openai import OpenAIChatClient
# Create simple tools for the example
@ai_function
def get_weather(location: str) -> str:
"""Get weather for a location."""
return f"Weather in {location}: sunny"
@ai_function
def get_time() -> str:
"""Get current time."""
return "Current time: 2:30 PM"
# Create client
client = OpenAIChatClient(model_id="gpt-5")
async def example():
# Direct creation
agent = ChatAgent(
name="assistant",
chat_client=client,
instructions="You are a helpful assistant.",
tools=[get_weather] # Multi-turn by default
)
# Factory method (more convenient)
agent = client.create_agent(
name="assistant",
instructions="You are a helpful assistant.",
tools=[get_weather]
)
# Execution with runtime tool configuration
result = await agent.run(
"What's the weather?",
tools=[get_time], # Can add tools at runtime
tool_choice="auto"
)
Klíčové rozdíly:
-
Výchozí chování:
ChatAgentautomaticky iteruje prostřednictvím volání nástrojů, zatímcoAssistantAgentvyžaduje explicitnímax_tool_iterationsnastavení -
Konfigurace modulu runtime:
ChatAgent.run()přijímátoolsatool_choiceparametry pro přizpůsobení volání - Metody továrny: Agent Framework poskytuje pohodlné metody továrny přímo z chatovacích klientů
-
Správa stavu:
ChatAgentje bezstavová a neudržuje historii konverzací mezi vyvoláním, na rozdíl odAssistantAgenttoho, že udržuje historii konverzací jako součást svého stavu.
Správa stavu konverzace pomocí agentaThread
Pokud chcete pokračovat v ChatAgentkonverzacích, použijte AgentThread ke správě historie konverzací:
# Assume we have an agent from previous examples
async def conversation_example():
# Create a new thread that will be reused
thread = agent.get_new_thread()
# First interaction - thread is empty
result1 = await agent.run("What's 2+2?", thread=thread)
print(result1.text) # "4"
# Continue conversation - thread contains previous messages
result2 = await agent.run("What about that number times 10?", thread=thread)
print(result2.text) # "40" (understands "that number" refers to 4)
# AgentThread can use external storage, similar to ChatCompletionContext in AutoGen
Bezstavová ve výchozím nastavení: rychlá ukázka
# Without a thread (two independent invocations)
r1 = await agent.run("What's 2+2?")
print(r1.text) # for example, "4"
r2 = await agent.run("What about that number times 10?")
print(r2.text) # Likely ambiguous without prior context; cannot be "40"
# With a thread (shared context across calls)
thread = agent.get_new_thread()
print((await agent.run("What's 2+2?", thread=thread)).text) # "4"
print((await agent.run("What about that number times 10?", thread=thread)).text) # "40"
Příklady správy vláken najdete tady:
- Azure AI s vláknem – Správa stavu konverzací
- Chatovací klient OpenAI s vlákny – vzory použití vláken
- Vlákna s podporou Redis – zachování stavu konverzace externě
Ekvivalence agenta OpenAI Assistant
Obě architektury poskytují integraci rozhraní API pro OpenAI Assistant:
# AutoGen OpenAIAssistantAgent
from autogen_ext.agents.openai import OpenAIAssistantAgent
# Agent Framework has OpenAI Assistants support via OpenAIAssistantsClient
from agent_framework.openai import OpenAIAssistantsClient
Příklady pomocníka OpenAI najdete tady:
- Základní nastavení pomocníka OpenAI – Základní pomocník
- OpenAI Assistants with Function Tools – Integrace vlastních nástrojů
- Azure OpenAI Assistants Basic – Nastavení pomocníka Azure
- Asistenti OpenAI s vlákny – správa vláken
Podpora streamování
Obě architektury streamují tokeny v reálném čase od klientů i z agentů, aby uživatelská rozhraní reagovala.
Automatické streamování
# Model client streaming
async for chunk in client.create_stream(messages):
if isinstance(chunk, str):
print(chunk, end="")
# Agent streaming
async for event in agent.run_stream(task="Hello"):
if isinstance(event, ModelClientStreamingChunkEvent):
print(event.content, end="")
elif isinstance(event, TaskResult):
print("Final result received")
Streamování rozhraní Agent Framework
# Assume we have client, agent, and tools from previous examples
async def streaming_example():
# Chat client streaming
async for chunk in client.get_streaming_response("Hello", tools=tools):
if chunk.text:
print(chunk.text, end="")
# Agent streaming
async for chunk in agent.run_stream("Hello"):
if chunk.text:
print(chunk.text, end="", flush=True)
Tip: V rozhraní Agent Framework mají klienti i agenti stejný tvar aktualizace; v obou případech si můžete přečíst chunk.text .
Typy a vytváření zpráv
Pochopení fungování zpráv je zásadní pro efektivní komunikaci agenta. Obě architektury poskytují různé přístupy k vytváření a zpracování zpráv pomocí autoGen pomocí samostatných tříd zpráv a agenta Framework pomocí sjednoceného systému zpráv.
Typy zpráv AutoGen
from autogen_agentchat.messages import TextMessage, MultiModalMessage
from autogen_core.models import UserMessage
# Text message
text_msg = TextMessage(content="Hello", source="user")
# Multi-modal message
multi_modal_msg = MultiModalMessage(
content=["Describe this image", image_data],
source="user"
)
# Convert to model format for use with model clients
user_message = text_msg.to_model_message()
Typy zpráv rozhraní Agent Framework
from agent_framework import ChatMessage, TextContent, DataContent, UriContent, Role
import base64
# Text message
text_msg = ChatMessage(role=Role.USER, text="Hello")
# Supply real image bytes, or use a data: URI/URL via UriContent
image_bytes = b"<your_image_bytes>"
image_b64 = base64.b64encode(image_bytes).decode()
image_uri = f"data:image/jpeg;base64,{image_b64}"
# Multi-modal message with mixed content
multi_modal_msg = ChatMessage(
role=Role.USER,
contents=[
TextContent(text="Describe this image"),
DataContent(uri=image_uri, media_type="image/jpeg")
]
)
Klíčové rozdíly:
- AutoGen používá samostatné třídy zpráv (
TextMessage,MultiModalMessage) s polemsource. - Agent Framework používá sjednocenou
ChatMessages objekty typu obsahu a polemrole. - Zprávy agenta Framework používají
Rolevýčet (USER, ASSISTANT, SYSTEM, TOOL) místo zdrojů řetězců.
Vytváření a integrace nástrojů
Nástroje rozšiřují možnosti agenta nad rámec generování textu. Architektury mají různé přístupy k vytváření nástrojů a rozhraní Agent Framework poskytuje automatizovanější generování schématu.
AutoGen FunctionTool
from autogen_core.tools import FunctionTool
async def get_weather(location: str) -> str:
"""Get weather for a location."""
return f"Weather in {location}: sunny"
# Manual tool creation
tool = FunctionTool(
func=get_weather,
description="Get weather information"
)
# Use with agent
agent = AssistantAgent(name="assistant", model_client=client, tools=[tool])
Agent Framework @ai_function
from agent_framework import ai_function
from typing import Annotated
from pydantic import Field
@ai_function
def get_weather(
location: Annotated[str, Field(description="The location to get weather for")]
) -> str:
"""Get weather for a location."""
return f"Weather in {location}: sunny"
# Direct use with agent (automatic conversion)
agent = ChatAgent(name="assistant", chat_client=client, tools=[get_weather])
Podrobné příklady najdete tady:
- OpenAI Chat Agent Basic – Jednoduchý chatovací agent OpenAI
- OpenAI s nástroji funkcí – Agent s vlastními nástroji
- Azure OpenAI Basic – Nastavení agenta Azure OpenAI
Hostované nástroje (exkluzivní rozhraní agenta)
Agent Framework poskytuje hostované nástroje, které nejsou dostupné v autogenu:
from agent_framework import ChatAgent, HostedCodeInterpreterTool, HostedWebSearchTool
from agent_framework.azure import AzureOpenAIChatClient
# Azure OpenAI client with a model that supports hosted tools
client = AzureOpenAIChatClient(model_id="gpt-5")
# Code execution tool
code_tool = HostedCodeInterpreterTool()
# Web search tool
search_tool = HostedWebSearchTool()
agent = ChatAgent(
name="researcher",
chat_client=client,
tools=[code_tool, search_tool]
)
Podrobné příklady najdete tady:
- Azure AI s interpretem kódu – nástroj pro spouštění kódu
- Azure AI s více nástroji – Více hostovaných nástrojů
- OpenAI s webovým vyhledáváním – integrace vyhledávání na webu
Požadavky a upozornění:
- Hostované nástroje jsou dostupné jenom u modelů nebo účtů, které je podporují. Před povolením těchto nástrojů ověřte nároky a podporu modelů pro vašeho poskytovatele.
- Konfigurace se liší podle poskytovatele; pro nastavení a oprávnění postupujte podle požadavků v každé ukázce.
- Ne každý model podporuje každý hostovaný nástroj (například webový vyhledávací vs. interpret kódu). Zvolte kompatibilní model ve vašem prostředí.
Poznámka:
AutoGen podporuje místní nástroje pro spouštění kódu, ale tato funkce se plánuje pro budoucí verze agenta Framework.
Klíčový rozdíl: Agent Framework zpracovává iteraci nástrojů automaticky na úrovni agenta. Na rozdíl od parametru max_tool_iterations AutoGen agenti Agent Framework pokračují ve spouštění nástrojů až do dokončení ve výchozím nastavení s integrovanými bezpečnostními mechanismy, které brání nekonečným smyčkám.
Podpora serveru MCP
Pro pokročilou integraci nástrojů podporují obě architektury protokol MCP (Model Context Protocol), které umožňují agentům pracovat s externími službami a zdroji dat. Agent Framework poskytuje komplexnější integrovanou podporu.
Podpora AutoGen MCP
AutoGen má základní podporu MCP prostřednictvím rozšíření (konkrétní podrobnosti implementace se liší podle verze).
Podpora AGENT FRAMEWORK MCP
from agent_framework import ChatAgent, MCPStdioTool, MCPStreamableHTTPTool, MCPWebsocketTool
from agent_framework.openai import OpenAIChatClient
# Create client for the example
client = OpenAIChatClient(model_id="gpt-5")
# Stdio MCP server
mcp_tool = MCPStdioTool(
name="filesystem",
command="uvx mcp-server-filesystem",
args=["/allowed/directory"]
)
# HTTP streaming MCP
http_mcp = MCPStreamableHTTPTool(
name="http_mcp",
url="http://localhost:8000/sse"
)
# WebSocket MCP
ws_mcp = MCPWebsocketTool(
name="websocket_mcp",
url="ws://localhost:8000/ws"
)
agent = ChatAgent(name="assistant", chat_client=client, tools=[mcp_tool])
Příklady MCP najdete tady:
- OpenAI s místním MCP – Použití MCPStreamableHTTPTool s OpenAI
- OpenAI s hostovaným MCP – Použití hostovaných služeb MCP
- Azure AI s místním MCP – Použití MCP s Azure AI
- Azure AI s hostovaným MCP – Použití hostovaného MCP s Azure AI
Vzor agenta jako nástroje
Jedním z výkonných vzorů je použití samotných agentů jako nástrojů, které umožňují hierarchické architektury agentů. Obě architektury podporují tento model s různými implementacemi.
AutoGen AgentTool
from autogen_agentchat.tools import AgentTool
# Create specialized agent
writer = AssistantAgent(
name="writer",
model_client=client,
system_message="You are a creative writer."
)
# Wrap as tool
writer_tool = AgentTool(agent=writer)
# Use in coordinator (requires disabling parallel tool calls)
coordinator_client = OpenAIChatCompletionClient(
model="gpt-5",
parallel_tool_calls=False
)
coordinator = AssistantAgent(
name="coordinator",
model_client=coordinator_client,
tools=[writer_tool]
)
Agent Framework as_tool()
from agent_framework import ChatAgent
# Assume we have client from previous examples
# Create specialized agent
writer = ChatAgent(
name="writer",
chat_client=client,
instructions="You are a creative writer."
)
# Convert to tool
writer_tool = writer.as_tool(
name="creative_writer",
description="Generate creative content",
arg_name="request",
arg_description="What to write"
)
# Use in coordinator
coordinator = ChatAgent(
name="coordinator",
chat_client=client,
tools=[writer_tool]
)
Poznámka k explicitní migraci: V autogenu nastavte parallel_tool_calls=False klienta modelu koordinátora při zabalení agentů jako nástroje, aby nedocházelo k problémům se souběžností při vyvolání stejné instance agenta.
V rozhraní Agent Framework nevyžaduje zakázání paralelních volání nástrojů, as_tool() protože agenti jsou ve výchozím nastavení bezstavové.
Middleware (funkce rozhraní agenta)
Agent Framework zavádí funkce middlewaru, které AutoGen nemá. Middleware umožňuje výkonné průřezové aspekty, jako je protokolování, zabezpečení a monitorování výkonu.
from agent_framework import ChatAgent, AgentRunContext, FunctionInvocationContext
from typing import Callable, Awaitable
# Assume we have client from previous examples
async def logging_middleware(
context: AgentRunContext,
next: Callable[[AgentRunContext], Awaitable[None]]
) -> None:
print(f"Agent {context.agent.name} starting")
await next(context)
print(f"Agent {context.agent.name} completed")
async def security_middleware(
context: FunctionInvocationContext,
next: Callable[[FunctionInvocationContext], Awaitable[None]]
) -> None:
if "password" in str(context.arguments):
print("Blocking function call with sensitive data")
return # Don't call next()
await next(context)
agent = ChatAgent(
name="secure_agent",
chat_client=client,
middleware=[logging_middleware, security_middleware]
)
Výhody:
- Zabezpečení: Ověřování vstupu a filtrování obsahu
- Pozorovatelnost: Protokolování, metriky a trasování
- Výkon: Ukládání do mezipaměti a omezování rychlosti
- Zpracování chyb: Řádné snížení výkonu a logika opakování
Podrobné příklady middlewaru najdete tady:
- Middleware založený na funkcích – middleware jednoduchých funkcí
- Middleware založený na třídách – objektově orientovaný middleware
- Middleware zpracování výjimek – Vzory zpracování chyb
- Middleware pro sdílený stav – Správa stavu napříč agenty
Upravitelní agenti
Někdy nechcete vůbec agenta založeného na modelu – chcete deterministický agent nebo agent založený na rozhraní API s vlastní logikou. Obě architektury podporují vytváření vlastních agentů, ale vzory se liší.
AutoGen: Podtřída BaseChatAgent
from typing import Sequence
from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import BaseChatMessage, TextMessage, StopMessage
from autogen_core import CancellationToken
class StaticAgent(BaseChatAgent):
def __init__(self, name: str = "static", description: str = "Static responder") -> None:
super().__init__(name, description)
@property
def produced_message_types(self) -> Sequence[type[BaseChatMessage]]: # Which message types this agent produces
return (TextMessage,)
async def on_messages(self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken) -> Response:
# Always return a static response
return Response(chat_message=TextMessage(content="Hello from AutoGen custom agent", source=self.name))
Notes:
- Implementujte
on_messages(...)a vraťte zprávu chatuResponse. - Volitelně můžete implementovat
on_reset(...)pro vymazání interního stavu mezi spuštěními.
Architektura agenta: Rozšíření baseagentu (s podporou vláken)
from collections.abc import AsyncIterable
from typing import Any
from agent_framework import (
AgentRunResponse,
AgentRunResponseUpdate,
AgentThread,
BaseAgent,
ChatMessage,
Role,
TextContent,
)
class StaticAgent(BaseAgent):
async def run(
self,
messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
*,
thread: AgentThread | None = None,
**kwargs: Any,
) -> AgentRunResponse:
# Build a static reply
reply = ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text="Hello from AF custom agent")])
# Persist conversation to the provided AgentThread (if any)
if thread is not None:
normalized = self._normalize_messages(messages)
await self._notify_thread_of_new_messages(thread, normalized, reply)
return AgentRunResponse(messages=[reply])
async def run_stream(
self,
messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
*,
thread: AgentThread | None = None,
**kwargs: Any,
) -> AsyncIterable[AgentRunResponseUpdate]:
# Stream the same static response in a single chunk for simplicity
yield AgentRunResponseUpdate(contents=[TextContent(text="Hello from AF custom agent")], role=Role.ASSISTANT)
# Notify thread of input and the complete response once streaming ends
if thread is not None:
reply = ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text="Hello from AF custom agent")])
normalized = self._normalize_messages(messages)
await self._notify_thread_of_new_messages(thread, normalized, reply)
Notes:
-
AgentThreadudržuje stav konverzace externě; aagent.get_new_thread()předejte horun/run_stream. - Volání
self._notify_thread_of_new_messages(thread, input_messages, response_messages)tak, aby vlákno má obě strany výměny. - Podívejte se na úplnou ukázku: Vlastní agent
V dalším kroku se podíváme na orchestraci s více agenty – oblast, ve které se architektury liší nejvíce.
Mapování funkcí s více agenty
Přehled programovacího modelu
Programovací modely s více agenty představují nejvýznamnější rozdíl mezi dvěma architekturami.
Přístup k duálnímu modelu AutoGenu
AutoGen poskytuje dva programovací modely:
-
autogen-core: Nízkoúrovňové programování řízené událostmi aRoutedAgentodběry zpráv -
Teamabstrakce: základní model orientovaný na běh založený naautogen-core
# Low-level autogen-core (complex)
class MyAgent(RoutedAgent):
@message_handler
async def handle_message(self, message: TextMessage, ctx: MessageContext) -> None:
# Handle specific message types
pass
# High-level Team (easier but limited)
team = RoundRobinGroupChat(
participants=[agent1, agent2],
termination_condition=StopAfterNMessages(5)
)
result = await team.run(task="Collaborate on this task")
Výzvy:
- Model nízké úrovně je pro většinu uživatelů příliš složitý.
- Model vysoké úrovně může být omezen na komplexní chování.
- Přemostění mezi těmito dvěma modely zvyšuje složitost implementace.
Model jednotného pracovního postupu v rámci agenta
Agent Framework poskytuje jednu Workflow abstrakci, která kombinuje nejlepší z obou přístupů:
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never
# Assume we have agent1 and agent2 from previous examples
@executor(id="agent1")
async def agent1_executor(input_msg: str, ctx: WorkflowContext[str]) -> None:
response = await agent1.run(input_msg)
await ctx.send_message(response.text)
@executor(id="agent2")
async def agent2_executor(input_msg: str, ctx: WorkflowContext[Never, str]) -> None:
response = await agent2.run(input_msg)
await ctx.yield_output(response.text) # Final output
# Build typed data flow graph
workflow = (WorkflowBuilder()
.add_edge(agent1_executor, agent2_executor)
.set_start_executor(agent1_executor)
.build())
# Example usage (would be in async context)
# result = await workflow.run("Initial input")
Podrobné příklady pracovních postupů najdete tady:
- Základy pracovního postupu – úvod do exekutorů a hran
- Agenti v pracovním postupu – Integrace agentů v pracovních postupech
- Streamování pracovního postupu – provádění pracovního postupu v reálném čase
Výhody:
- Jednotný model: Jedna abstrakce pro všechny úrovně složitosti
- Bezpečnost typů: Vstupy a výstupy silného typu
- Vizualizace grafu: Vymazání reprezentace toku dat
- Flexibilní složení: Mix agentů, funkcí a dílčích pracovních postupů
Pracovní postup vs. GraphFlow
Abstrakce rozhraní Agent Framework Workflow je inspirovaná experimentální GraphFlow funkcí AutoGenu, ale představuje významný vývoj ve filozofie návrhu:
- GraphFlow: Tok řízení založený na tom, kde hrany jsou přechody a zprávy jsou vysílány všem agentům; přechody jsou podmíněny obsahem všesměrové zprávy.
- Pracovní postup: Tok dat založený na tom, kde jsou zprávy směrovány přes konkrétní hrany a exekutory jsou aktivovány hrany s podporou souběžného spuštění.
Přehled vizuálu
Následující diagram kontrastuje s tokem řízení a tokem AutoGen GraphFlow (vlevo) s pracovním postupem toku dat v rozhraní Agent Framework (vpravo). GraphFlow modeluje agenty jako uzly s podmíněnými přechody a všesměrovými vysíláními. Exekutory modelů pracovních postupů (agenti, funkce nebo dílčí pracovní postupy) propojené podle zadaných hran; podporuje také pozastavení požadavků a odpovědí a vytváření kontrolních bodů.
flowchart LR
subgraph AutoGenGraphFlow
direction TB
U[User / Task] --> A[Agent A]
A -->|success| B[Agent B]
A -->|retry| C[Agent C]
A -. broadcast .- B
A -. broadcast .- C
end
subgraph AgentFrameworkWorkflow
direction TB
I[Input] --> E1[Executor 1]
E1 -->|"str"| E2[Executor 2]
E1 -->|"image"| E3[Executor 3]
E3 -->|"str"| E2
E2 --> OUT[(Final Output)]
end
R[Request / Response Gate]
E2 -. request .-> R
R -. resume .-> E2
CP[Checkpoint]
E1 -. save .-> CP
CP -. load .-> E1
V praxi:
- GraphFlow používá agenty jako uzly a všesměrové zprávy; hrany představují podmíněné přechody.
- Trasy pracovního postupu zadané zprávy podél okrajů Uzly (exekutory) můžou být agenti, čisté funkce nebo dílčí pracovní postupy.
- Požadavek/odpověď umožňuje pozastavení pracovního postupu pro externí vstup; vytváření kontrolních bodů přetrvává a umožňuje obnovení.
Porovnání kódu
1) Sekvenční + podmíněný
# AutoGen GraphFlow (fluent builder) — writer → reviewer → editor (conditional)
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
writer = AssistantAgent(name="writer", description="Writes a draft", model_client=client)
reviewer = AssistantAgent(name="reviewer", description="Reviews the draft", model_client=client)
editor = AssistantAgent(name="editor", description="Finalizes the draft", model_client=client)
graph = (
DiGraphBuilder()
.add_node(writer).add_node(reviewer).add_node(editor)
.add_edge(writer, reviewer) # always
.add_edge(reviewer, editor, condition=lambda msg: "approve" in msg.to_model_text())
.set_entry_point(writer)
).build()
team = GraphFlow(participants=[writer, reviewer, editor], graph=graph)
result = await team.run(task="Draft a short paragraph about solar power")
# Agent Framework Workflow — sequential executors with conditional logic
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never
@executor(id="writer")
async def writer_exec(task: str, ctx: WorkflowContext[str]) -> None:
await ctx.send_message(f"Draft: {task}")
@executor(id="reviewer")
async def reviewer_exec(draft: str, ctx: WorkflowContext[str]) -> None:
decision = "approve" if "solar" in draft.lower() else "revise"
await ctx.send_message(f"{decision}:{draft}")
@executor(id="editor")
async def editor_exec(msg: str, ctx: WorkflowContext[Never, str]) -> None:
if msg.startswith("approve:"):
await ctx.yield_output(msg.split(":", 1)[1])
else:
await ctx.yield_output("Needs revision")
workflow_seq = (
WorkflowBuilder()
.add_edge(writer_exec, reviewer_exec)
.add_edge(reviewer_exec, editor_exec)
.set_start_executor(writer_exec)
.build()
)
2) Fan-out + Join (ALL vs. ANY)
# AutoGen GraphFlow — A → (B, C) → D with ALL/ANY join
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
A, B, C, D = agent_a, agent_b, agent_c, agent_d
# ALL (default): D runs after both B and C
g_all = (
DiGraphBuilder()
.add_node(A).add_node(B).add_node(C).add_node(D)
.add_edge(A, B).add_edge(A, C)
.add_edge(B, D).add_edge(C, D)
.set_entry_point(A)
).build()
# ANY: D runs when either B or C completes
g_any = (
DiGraphBuilder()
.add_node(A).add_node(B).add_node(C).add_node(D)
.add_edge(A, B).add_edge(A, C)
.add_edge(B, D, activation_group="join_d", activation_condition="any")
.add_edge(C, D, activation_group="join_d", activation_condition="any")
.set_entry_point(A)
).build()
# Agent Framework Workflow — A → (B, C) → aggregator (ALL vs ANY)
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never
@executor(id="A")
async def start(task: str, ctx: WorkflowContext[str]) -> None:
await ctx.send_message(f"B:{task}", target_id="B")
await ctx.send_message(f"C:{task}", target_id="C")
@executor(id="B")
async def branch_b(text: str, ctx: WorkflowContext[str]) -> None:
await ctx.send_message(f"B_done:{text}")
@executor(id="C")
async def branch_c(text: str, ctx: WorkflowContext[str]) -> None:
await ctx.send_message(f"C_done:{text}")
@executor(id="join_any")
async def join_any(msg: str, ctx: WorkflowContext[Never, str]) -> None:
await ctx.yield_output(f"First: {msg}") # ANY join (first arrival)
@executor(id="join_all")
async def join_all(msg: str, ctx: WorkflowContext[str, str]) -> None:
state = await ctx.get_executor_state() or {"items": []}
state["items"].append(msg)
await ctx.set_executor_state(state)
if len(state["items"]) >= 2:
await ctx.yield_output(" | ".join(state["items"])) # ALL join
wf_any = (
WorkflowBuilder()
.add_edge(start, branch_b).add_edge(start, branch_c)
.add_edge(branch_b, join_any).add_edge(branch_c, join_any)
.set_start_executor(start)
.build()
)
wf_all = (
WorkflowBuilder()
.add_edge(start, branch_b).add_edge(start, branch_c)
.add_edge(branch_b, join_all).add_edge(branch_c, join_all)
.set_start_executor(start)
.build()
)
3) Cílené směrování (bez vysílání)
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never
@executor(id="ingest")
async def ingest(task: str, ctx: WorkflowContext[str]) -> None:
# Route selectively using target_id
if task.startswith("image:"):
await ctx.send_message(task.removeprefix("image:"), target_id="vision")
else:
await ctx.send_message(task, target_id="writer")
@executor(id="writer")
async def write(text: str, ctx: WorkflowContext[Never, str]) -> None:
await ctx.yield_output(f"Draft: {text}")
@executor(id="vision")
async def caption(image_ref: str, ctx: WorkflowContext[Never, str]) -> None:
await ctx.yield_output(f"Caption: {image_ref}")
workflow = (
WorkflowBuilder()
.add_edge(ingest, write)
.add_edge(ingest, caption)
.set_start_executor(ingest)
.build()
)
# Example usage (async):
# await workflow.run("Summarize the benefits of solar power")
# await workflow.run("image:https://example.com/panel.jpg")
Co si všimnout:
- GraphFlow vysílá zprávy a používá podmíněné přechody. Chování spojení se konfiguruje přes cílovou stranu
activationa každou hranuactivation_group/activation_condition(například seskupte obě hrany s ).join_dactivation_condition="any" - Pracovní postup směruje data explicitně; slouží
target_idk výběru podřízených exekutorů. Chování spojení se nachází v přijímajícím exekutoru (například výnos při prvním vstupu vs. čekání na všechny) nebo prostřednictvím tvůrce/agregátorů orchestrace. - Exekutory v pracovním postupu jsou volné: zalomení
ChatAgent, funkce nebo dílčího pracovního postupu a jejich kombinace v rámci stejného grafu.
Klíčové rozdíly
Následující tabulka shrnuje základní rozdíly mezi graphflow a pracovním postupem agenta AutoGen:
| Aspekt | AutoGen GraphFlow | Pracovní postup agenta Framework |
|---|---|---|
| Typ toku | Tok řízení (hrany jsou přechody) | Tok dat (okraje směrují zprávy) |
| Typy uzlů | Pouze agenti | Agenti, funkce, dílčí pracovní postupy |
| Aktivace | Vysílání zpráv | Aktivace na základě hraničních zařízení |
| Bezpečnost typů | Omezený | Silné psaní v celém textu |
| Kompozičnost | Omezený | Vysoce kompozibilní |
Vnoření vzorů
Automatické vnoření týmu
# Inner team
inner_team = RoundRobinGroupChat(
participants=[specialist1, specialist2],
termination_condition=StopAfterNMessages(3)
)
# Outer team with nested team as participant
outer_team = RoundRobinGroupChat(
participants=[coordinator, inner_team, reviewer], # Team as participant
termination_condition=StopAfterNMessages(10)
)
# Messages are broadcasted to all participants including nested team
result = await outer_team.run("Complex task requiring collaboration")
Vlastnosti vnoření automatického genu:
- Vnořený tým přijímá všechny zprávy od vnějšího týmu.
- Vnořené týmové zprávy se odsílají všem účastníkům vnějšího týmu.
- Kontext sdílené zprávy napříč všemi úrovněmi
Vnoření pracovních postupů v rozhraní Agent Framework
from agent_framework import WorkflowExecutor, WorkflowBuilder
# Assume we have executors from previous examples
# specialist1_executor, specialist2_executor, coordinator_executor, reviewer_executor
# Create sub-workflow
sub_workflow = (WorkflowBuilder()
.add_edge(specialist1_executor, specialist2_executor)
.set_start_executor(specialist1_executor)
.build())
# Wrap as executor
sub_workflow_executor = WorkflowExecutor(
workflow=sub_workflow,
id="sub_process"
)
# Use in parent workflow
parent_workflow = (WorkflowBuilder()
.add_edge(coordinator_executor, sub_workflow_executor)
.add_edge(sub_workflow_executor, reviewer_executor)
.set_start_executor(coordinator_executor)
.build())
Vlastnosti vnořování rozhraní Agent Framework:
- Izolovaný vstup/výstup prostřednictvím
WorkflowExecutor - Žádné vysílání zpráv – data procházejí konkrétními připojeními
- Nezávislá správa stavu pro každou úroveň pracovního postupu
Vzory skupinového chatu
Vzory skupinového chatu umožňují více agentům spolupracovat na složitých úlohách. Tady je postup, jak se mezi architekturami překládají běžné vzory.
Model RoundRobinGroupChat
Implementace autogenu:
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import StopAfterNMessages
team = RoundRobinGroupChat(
participants=[agent1, agent2, agent3],
termination_condition=StopAfterNMessages(10)
)
result = await team.run("Discuss this topic")
Implementace agenta Framework:
from agent_framework import SequentialBuilder, WorkflowOutputEvent
# Assume we have agent1, agent2, agent3 from previous examples
# Sequential workflow through participants
workflow = SequentialBuilder().participants([agent1, agent2, agent3]).build()
# Example usage (would be in async context)
async def sequential_example():
# Each agent appends to shared conversation
async for event in workflow.run_stream("Discuss this topic"):
if isinstance(event, WorkflowOutputEvent):
conversation_history = event.data # list[ChatMessage]
Podrobné příklady orchestrace najdete tady:
- Sekvenční agenti – provádění agenta ve stylu kruhového dotazování
- Sekvenční vlastní exekutory – vzory vlastních exekutorů
Pro vzory souběžného spouštění poskytuje rozhraní Agent Framework také:
from agent_framework import ConcurrentBuilder, WorkflowOutputEvent
# Assume we have agent1, agent2, agent3 from previous examples
# Concurrent workflow for parallel processing
workflow = (ConcurrentBuilder()
.participants([agent1, agent2, agent3])
.build())
# Example usage (would be in async context)
async def concurrent_example():
# All agents process the input concurrently
async for event in workflow.run_stream("Process this in parallel"):
if isinstance(event, WorkflowOutputEvent):
results = event.data # Combined results from all agents
Příklady souběžného spouštění najdete tady:
- Souběžní agenti – paralelní spouštění agentů
- Souběžné vlastní exekutory – vlastní paralelní vzory
- Souběžné s vlastním agregátorem – vzorce agregace výsledků
MagenticOneGroupChat Pattern
Implementace autogenu:
from autogen_agentchat.teams import MagenticOneGroupChat
team = MagenticOneGroupChat(
participants=[researcher, coder, executor],
model_client=coordinator_client,
termination_condition=StopAfterNMessages(20)
)
result = await team.run("Complex research and analysis task")
Implementace agenta Framework:
from agent_framework import (
MagenticBuilder, MagenticCallbackMode, WorkflowOutputEvent,
MagenticCallbackEvent, MagenticOrchestratorMessageEvent, MagenticAgentDeltaEvent
)
# Assume we have researcher, coder, and coordinator_client from previous examples
async def on_event(event: MagenticCallbackEvent) -> None:
if isinstance(event, MagenticOrchestratorMessageEvent):
print(f"[ORCHESTRATOR]: {event.message.text}")
elif isinstance(event, MagenticAgentDeltaEvent):
print(f"[{event.agent_id}]: {event.text}", end="")
workflow = (MagenticBuilder()
.participants(researcher=researcher, coder=coder)
.on_event(on_event, mode=MagenticCallbackMode.STREAMING)
.with_standard_manager(
chat_client=coordinator_client,
max_round_count=20,
max_stall_count=3,
max_reset_count=2
)
.build())
# Example usage (would be in async context)
async def magentic_example():
async for event in workflow.run_stream("Complex research task"):
if isinstance(event, WorkflowOutputEvent):
final_result = event.data
Možnosti přizpůsobení rozhraní Agent Framework:
Magentický pracovní postup poskytuje rozsáhlé možnosti přizpůsobení:
- Konfigurace správce: Vlastní modely orchestrátoru a výzvy
-
Limity zaokrouhlení:
max_round_count,max_stall_countmax_reset_count - Zpětná volání událostí: Streamování v reálném čase s podrobným filtrováním událostí
- Specializace agenta: Vlastní instrukce a nástroje na agenta
-
Režimy zpětného volání:
STREAMINGaktualizace v reálném čase neboBATCHkonečné výsledky - Plánování smyček pro člověka: Vlastní funkce plánovače pro interaktivní pracovní postupy
# Advanced customization example with human-in-the-loop
from agent_framework.openai import OpenAIChatClient
from agent_framework import MagenticBuilder, MagenticCallbackMode, MagenticPlannerContext
# Assume we have researcher_agent, coder_agent, analyst_agent, detailed_event_handler
# and get_human_input function defined elsewhere
async def custom_planner(context: MagenticPlannerContext) -> str:
"""Custom planner with human input for critical decisions."""
if context.round_count > 5:
# Request human input for complex decisions
return await get_human_input(f"Next action for: {context.current_state}")
return "Continue with automated planning"
workflow = (MagenticBuilder()
.participants(
researcher=researcher_agent,
coder=coder_agent,
analyst=analyst_agent
)
.with_standard_manager(
chat_client=OpenAIChatClient(model_id="gpt-5"),
max_round_count=15, # Limit total rounds
max_stall_count=2, # Prevent infinite loops
max_reset_count=1, # Allow one reset on failure
orchestrator_prompt="Custom orchestration instructions..."
)
.with_planner(custom_planner) # Human-in-the-loop planning
.on_event(detailed_event_handler, mode=MagenticCallbackMode.STREAMING)
.build())
Podrobné příklady magentického zobrazení najdete tady:
- Základní magentický pracovní postup – standardní orchestrovaný pracovní postup s více agenty
- Magentic with Checkpointing – trvalé orchestrované pracovní postupy
- Aktualizace magentického lidského plánu – plánování human-in-the-loop
Budoucí vzory
Plán nasazení agenta zahrnuje několik vzorů AutoGen, které jsou aktuálně ve vývoji:
- Vzor Swarm: Koordinace agentů založená na předávání
- SelectorGroupChat: Výběr mluvčího řízeného LLM
Human-in-the-Loop with Request Response
Klíčovou novou funkcí v architektuře Workflow Agent Framework je koncept požadavku a odpovědi, která umožňuje pracovním postupům pozastavit provádění a čekat na externí vstup, než budete pokračovat. Tato funkce není k dispozici v abstrakci AutoGenu Team a umožňuje sofistikované vzory human-in-the-loop.
Omezení autogenu
Abstrakce AutoGen Team se spouští nepřetržitě po spuštění a neposkytuje integrované mechanismy pro pozastavení provádění pro lidské vstupy. Všechny funkce typu human-in-the-loop vyžadují vlastní implementace mimo architekturu.
Rozhraní API Request-Response agenta
Agent Framework poskytuje integrované možnosti odezvy požadavků, kde jakýkoli exekutor může odesílat požadavky pomocí ctx.request_info() dekorátoru @response_handler a zpracovávat odpovědi.
from agent_framework import (
RequestInfoEvent, WorkflowBuilder, WorkflowContext,
Executor, handler, response_handler
)
from dataclasses import dataclass
# Assume we have agent_executor defined elsewhere
# Define typed request payload
@dataclass
class ApprovalRequest:
"""Request human approval for agent output."""
content: str = ""
agent_name: str = ""
# Workflow executor that requests human approval
class ReviewerExecutor(Executor):
@handler
async def review_content(
self,
agent_response: str,
ctx: WorkflowContext
) -> None:
# Request human input with structured data
approval_request = ApprovalRequest(
content=agent_response,
agent_name="writer_agent"
)
await ctx.request_info(request_data=approval_request, response_type=str)
@response_handler
async def handle_approval_response(
self,
original_request: ApprovalRequest,
decision: str,
ctx: WorkflowContext
) -> None:
decision_lower = decision.strip().lower()
original_content = original_request.content
if decision_lower == "approved":
await ctx.yield_output(f"APPROVED: {original_content}")
else:
await ctx.yield_output(f"REVISION NEEDED: {decision}")
# Build workflow with human-in-the-loop
reviewer = ReviewerExecutor(id="reviewer")
workflow = (WorkflowBuilder()
.add_edge(agent_executor, reviewer)
.set_start_executor(agent_executor)
.build())
Spouštění pracovních postupů human-in-the-loop
Agent Framework poskytuje rozhraní API pro streamování pro zpracování cyklu pozastavení a obnovení:
from agent_framework import RequestInfoEvent, WorkflowOutputEvent
# Assume we have workflow defined from previous examples
async def run_with_human_input():
pending_responses = None
completed = False
while not completed:
# First iteration uses run_stream, subsequent use send_responses_streaming
stream = (
workflow.send_responses_streaming(pending_responses)
if pending_responses
else workflow.run_stream("initial input")
)
events = [event async for event in stream]
pending_responses = None
# Collect human requests and outputs
for event in events:
if isinstance(event, RequestInfoEvent):
# Display request to human and collect response
request_data = event.data # ApprovalRequest instance
print(f"Review needed: {request_data.content}")
human_response = input("Enter 'approved' or revision notes: ")
pending_responses = {event.request_id: human_response}
elif isinstance(event, WorkflowOutputEvent):
print(f"Final result: {event.data}")
completed = True
Příklady pracovních postupů human-in-the-loop najdete tady:
- Hádání hry s lidským vstupem – interaktivní pracovní postup s zpětnou vazbou uživatelů
- Pracovní postup jako agent s lidským vstupem – vnořené pracovní postupy s lidskou interakcí
Vytváření kontrolních bodů a obnovení pracovních postupů
Další klíčovou výhodou architektury Agent Framework Workflow oproti abstrakci AutoGen Team je integrovaná podpora pro vytváření kontrolních bodů a obnovení provádění. To umožňuje pozastavit, zachovat a obnovit pracovní postupy později z jakéhokoli kontrolního bodu a zajistit odolnost proti chybám a povolit dlouhotrvající nebo asynchronní pracovní postupy.
Omezení autogenu
Abstrakce AutoGen Team neposkytuje integrované možnosti vytváření kontrolních bodů. Všechny mechanismy trvalosti nebo obnovení musí být implementovány externě, často vyžadují složitou správu stavu a logiku serializace.
Vytváření kontrolních bodů v rozhraní Agent Framework
Agent Framework poskytuje komplexní kontrolní body prostřednictvím FileCheckpointStorage a metodu with_checkpointing() na WorkflowBuilder. Zachytávání kontrolních bodů:
-
Stav exekutoru: Místní stav pro každý exekutor pomocí
ctx.set_executor_state() -
Sdílený stav: Stav křížového exekutoru pomocí
ctx.set_shared_state() - Fronty zpráv: Čekající zprávy mezi exekutory
- Pozice pracovního postupu: Aktuální průběh provádění a další kroky
from agent_framework import (
FileCheckpointStorage, WorkflowBuilder, WorkflowContext,
Executor, handler
)
from typing_extensions import Never
class ProcessingExecutor(Executor):
@handler
async def process(self, data: str, ctx: WorkflowContext[str]) -> None:
# Process the data
result = f"Processed: {data.upper()}"
print(f"Processing: '{data}' -> '{result}'")
# Persist executor-local state
prev_state = await ctx.get_executor_state() or {}
count = prev_state.get("count", 0) + 1
await ctx.set_executor_state({
"count": count,
"last_input": data,
"last_output": result
})
# Persist shared state for other executors
await ctx.set_shared_state("original_input", data)
await ctx.set_shared_state("processed_output", result)
await ctx.send_message(result)
class FinalizeExecutor(Executor):
@handler
async def finalize(self, data: str, ctx: WorkflowContext[Never, str]) -> None:
result = f"Final: {data}"
await ctx.yield_output(result)
# Configure checkpoint storage
checkpoint_storage = FileCheckpointStorage(storage_path="./checkpoints")
processing_executor = ProcessingExecutor(id="processing")
finalize_executor = FinalizeExecutor(id="finalize")
# Build workflow with checkpointing enabled
workflow = (WorkflowBuilder()
.add_edge(processing_executor, finalize_executor)
.set_start_executor(processing_executor)
.with_checkpointing(checkpoint_storage=checkpoint_storage) # Enable checkpointing
.build())
# Example usage (would be in async context)
async def checkpoint_example():
# Run workflow - checkpoints are created automatically
async for event in workflow.run_stream("input data"):
print(f"Event: {event}")
Obnovení z kontrolních bodů
Agent Framework poskytuje rozhraní API pro výpis, kontrolu a obnovení z konkrétních kontrolních bodů:
from typing_extensions import Never
from agent_framework import (
Executor,
FileCheckpointStorage,
WorkflowContext,
WorkflowBuilder,
get_checkpoint_summary,
handler,
)
class UpperCaseExecutor(Executor):
@handler
async def process(self, text: str, ctx: WorkflowContext[str]) -> None:
result = text.upper()
await ctx.send_message(result)
class ReverseExecutor(Executor):
@handler
async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None:
result = text[::-1]
await ctx.yield_output(result)
def create_workflow(checkpoint_storage: FileCheckpointStorage):
"""Create a workflow with two executors and checkpointing."""
upper_executor = UpperCaseExecutor(id="upper")
reverse_executor = ReverseExecutor(id="reverse")
return (WorkflowBuilder()
.add_edge(upper_executor, reverse_executor)
.set_start_executor(upper_executor)
.with_checkpointing(checkpoint_storage=checkpoint_storage)
.build())
# Assume we have checkpoint_storage from previous examples
checkpoint_storage = FileCheckpointStorage(storage_path="./checkpoints")
async def checkpoint_resume_example():
# List available checkpoints
checkpoints = await checkpoint_storage.list_checkpoints()
# Display checkpoint information
for checkpoint in checkpoints:
summary = get_checkpoint_summary(checkpoint)
print(f"Checkpoint {summary.checkpoint_id}: iteration={summary.iteration_count}")
# Resume from a specific checkpoint
if checkpoints:
chosen_checkpoint_id = checkpoints[0].checkpoint_id
# Create new workflow instance and resume
new_workflow = create_workflow(checkpoint_storage)
async for event in new_workflow.run_stream(
checkpoint_id=chosen_checkpoint_id,
checkpoint_storage=checkpoint_storage
):
print(f"Resumed event: {event}")
Pokročilé funkce kontrolních bodů
Kontrolní bod s integrací human-in-the-loop:
Vytváření kontrolních bodů bez problémů funguje s pracovními postupy lidské in-the-smyčky, což umožňuje pozastavit pracovní postupy pro vstup člověka a obnovit je později. Při obnovení z kontrolního bodu, který obsahuje čekající požadavky, se tyto žádosti znovu vygenerují jako události:
# Assume we have workflow, checkpoint_id, and checkpoint_storage from previous examples
async def resume_with_pending_requests_example():
# Resume from checkpoint - pending requests will be re-emitted
request_info_events = []
async for event in workflow.run_stream(
checkpoint_id=checkpoint_id,
checkpoint_storage=checkpoint_storage
):
if isinstance(event, RequestInfoEvent):
request_info_events.append(event)
# Handle re-emitted pending request
responses = {}
for event in request_info_events:
response = handle_request(event.data)
responses[event.request_id] = response
# Send response back to workflow
async for event in workflow.send_responses_streaming(responses):
print(f"Event: {event}")
Klíčové výhody
V porovnání s funkcí AutoGen poskytuje kontrolní body rozhraní Agent Framework:
- Automatická trvalost: Nevyžaduje se žádná ruční správa stavu
- Podrobné obnovení: Obnovení z libovolné hranice superkroku
- Izolace stavu: Samostatný exekutor – místní a sdílený stav
- Integrace mezi lidmi ve smyčce: Bezproblémové pozastavení a obnovení s lidským vstupem
- Odolnost proti chybám: Robustní obnovení ze selhání nebo přerušení
Praktické příklady
Komplexní příklady kontrolních bodů najdete tady:
- Kontrolní bod s životopisem – základní vytváření kontrolních bodů a interaktivní životopis
- Kontrolní bod se smyčkou Human-in-the-Loop – trvalé pracovní postupy s branami pro schvalování lidí
- Kontrolní bod dílčího pracovního postupu – Vytváření vnořených pracovních postupů
- Magentický kontrolní bod – vytváření kontrolních bodů orchestrovaných pracovních postupů s více agenty
Observability
Funkce AutoGen i Agent Framework poskytují možnosti pozorovatelnosti, ale s různými přístupy a funkcemi.
Pozorovatelnost automatickéhogenu
AutoGen má nativní podporu OpenTelemetry s instrumentací pro:
-
Trasování za běhu:
SingleThreadedAgentRuntimeaGrpcWorkerAgentRuntime -
Provádění nástrojů:
BaseToolsexecute_toolvyužitím sémantických konvencí GenAI -
Operace agenta:
BaseChatAgentscreate_agentrozsahy ainvoke_agentrozsahy
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from autogen_core import SingleThreadedAgentRuntime
# Configure OpenTelemetry
tracer_provider = TracerProvider()
trace.set_tracer_provider(tracer_provider)
# Pass to runtime
runtime = SingleThreadedAgentRuntime(tracer_provider=tracer_provider)
Pozorovatelnost v rozhraní Agent Framework
Agent Framework poskytuje komplexní pozorovatelnost prostřednictvím více přístupů:
- Nastavení nulového kódu: Automatická instrumentace prostřednictvím proměnných prostředí
- Ruční konfigurace: Programové nastavení s vlastními parametry
- Bohatá telemetrie: Agenti, pracovní postupy a sledování spouštění nástrojů
- Výstup konzoly: Integrované protokolování konzoly a vizualizace
from agent_framework import ChatAgent
from agent_framework.observability import setup_observability
from agent_framework.openai import OpenAIChatClient
# Zero-code setup via environment variables
# Set ENABLE_OTEL=true
# Set OTLP_ENDPOINT=http://localhost:4317
# Or manual setup
setup_observability(
otlp_endpoint="http://localhost:4317"
)
# Create client for the example
client = OpenAIChatClient(model_id="gpt-5")
async def observability_example():
# Observability is automatically applied to all agents and workflows
agent = ChatAgent(name="assistant", chat_client=client)
result = await agent.run("Hello") # Automatically traced
Klíčové rozdíly:
- Složitost instalace: Architektura agenta nabízí jednodušší možnosti nastavení bez kódu
- Rozsah: Architektura agenta poskytuje širší pokrytí, včetně pozorovatelnosti na úrovni pracovního postupu
- Vizualizace: Architektura agenta zahrnuje integrovaný výstup konzoly a vývojové uživatelské rozhraní
- Konfigurace: Architektura agenta nabízí flexibilnější možnosti konfigurace
Podrobné příklady pozorovatelnosti najdete tady:
- Nastavení nulového kódu – konfigurace proměnné prostředí
- Ruční instalace – programová konfigurace
- Pozorovatelnost agenta – telemetrie jednoho agenta
- Pozorovatelnost pracovního postupu – trasování pracovních postupů s více agenty
Conclusion
Tento průvodce migrací poskytuje komplexní mapování mezi rozhraním AutoGen a Microsoft Agent Framework, které pokrývá vše od základního vytvoření agenta až po složité pracovní postupy s více agenty. Klíčové poznatky pro migraci:
- Migrace s jedním agentem je jednoduchá, s podobnými rozhraními API a rozšířenými možnostmi v rozhraní Agent Framework
- Vzory s více agenty vyžadují přemýšlení přístupu z architektur založených na událostech až po architektury založené na toku dat, ale pokud už graphFlow znáte, přechod bude jednodušší.
- Agent Framework nabízí další funkce, jako je middleware, hostované nástroje a typové pracovní postupy.
Další příklady a podrobné pokyny k implementaci najdete v adresáři ukázek rozhraní Agent Framework .
Další ukázkové kategorie
Agent Framework poskytuje ukázky v několika dalších důležitých oblastech:
- Vlákna: Ukázky vláken – Správa stavu a kontextu konverzace
- Multimodální vstup: Multimodální ukázky – Práce s obrázky a dalšími typy médií
- Zprostředkovatelé kontextu: Ukázky zprostředkovatele kontextu – vzory integrace externího kontextu