Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Guide complet pour la migration d’AutoGen vers le Kit de développement logiciel (SDK) Microsoft Agent Framework Python.
Sommaire
- Arrière-plan
- Similitudes et différences clés
- Création et configuration du client de modèle
-
Mappage des fonctionnalitésSingle-Agent
- Création et exécution de l’agent de base
- Gestion de l’état de conversation avec AgentThread
- Équivalence de l’agent De l’Assistant OpenAI
- Prise en charge de la diffusion en continu
- Types de messages et création
- Création et intégration d’outils
- Outils hébergés (Agent Framework exclusif)
- Prise en charge du serveur MCP
- Modèle d’agent en tant qu’outil
- Intergiciel (fonctionnalité Agent Framework)
- Agents personnalisés
- Mappage de fonctionnalités multi-agent
- d’observabilité
- Conclusion
Contexte
AutoGen est une infrastructure permettant de créer des agents IA et des systèmes multi-agents à l’aide de modèles de langage volumineux (LLMs). Il a commencé en tant que projet de recherche chez Microsoft Research et a lancé plusieurs concepts dans l’orchestration multi-agent, comme GroupChat et le runtime d’agent piloté par les événements. Le projet a été une collaboration fructueuse de la communauté open source et de nombreuses fonctionnalités importantes proviennent de contributeurs externes.
Microsoft Agent Framework est un nouveau SDK multi-langage permettant de créer des agents et des flux de travail IA à l’aide de LLMs. Il représente une évolution significative des idées pionniers dans AutoGen et intègre les leçons tirées de l’utilisation réelle. Il est développé par les principales équipes AutoGen et Semantic Kernel chez Microsoft, et est conçu pour être une nouvelle base pour la création d’applications IA à l’avenir.
Ce guide décrit un parcours de migration pratique : il commence par couvrir ce qui reste le même et ce qui change en un clin d’œil. Ensuite, il couvre la configuration du client de modèle, les fonctionnalités à agent unique et enfin l’orchestration multi-agent avec du code concret côte à côte. En cours de route, les liens vers des exemples exécutables dans le référentiel Agent Framework vous aident à valider chaque étape.
Similitudes et différences clés
Ce qui reste le même
Les fondations sont familières. Vous créez toujours des agents autour d’un client de modèle, fournissez des instructions et joignez des outils. Les deux bibliothèques prennent en charge les outils de style de fonction, la diffusion en continu de jetons, le contenu modal et les E/S asynchrones.
# Both frameworks follow similar patterns
# AutoGen
agent = AssistantAgent(name="assistant", model_client=client, tools=[my_tool])
result = await agent.run(task="Help me with this task")
# Agent Framework
agent = ChatAgent(name="assistant", chat_client=client, tools=[my_tool])
result = await agent.run("Help me with this task")
Principales différences
Style d’orchestration : AutoGen associe un cœur piloté par les événements à un haut niveau
Team. Agent Framework se concentre sur une base de graphiqueWorkflowtypée qui achemine les données le long des arêtes et active les exécuteurs lorsque les entrées sont prêtes.Outils : AutoGen encapsule les fonctions avec
FunctionTool. Agent Framework utilise@ai_function, déduit automatiquement les schémas et ajoute des outils hébergés tels qu’un interpréteur de code et une recherche web.Comportement de l’agent :
AssistantAgentest à un seul tour, sauf si vous augmentezmax_tool_iterations.ChatAgentest multitour par défaut et continue d’appeler des outils jusqu’à ce qu’il puisse retourner une réponse finale.Runtime : AutoGen offre des runtimes distribués incorporés et expérimentaux. Agent Framework se concentre sur la composition à processus unique aujourd’hui ; l’exécution distribuée est planifiée.
Création et configuration du client de modèle
Les deux frameworks fournissent des clients de modèle pour les principaux fournisseurs d’IA, avec des API similaires, mais pas identiques.
| Caractéristique | AutoGen | Cadre d’agent |
|---|---|---|
| OpenAI Client | OpenAIChatCompletionClient |
OpenAIChatClient |
| Client réponses OpenAI | ❌ Non disponible | OpenAIResponsesClient |
| Azure OpenAI | AzureOpenAIChatCompletionClient |
AzureOpenAIChatClient |
| Réponses Azure OpenAI | ❌ Non disponible | AzureOpenAIResponsesClient |
| Azure AI | AzureAIChatCompletionClient |
AzureAIAgentClient |
| Anthropic | AnthropicChatCompletionClient |
🚧 Planifié |
| Ollama | OllamaChatCompletionClient |
🚧 Planifié |
| Mise en cache |
ChatCompletionCache enveloppe |
🚧 Planifié |
Clients de modèle AutoGen
from autogen_ext.models.openai import OpenAIChatCompletionClient, AzureOpenAIChatCompletionClient
# OpenAI
client = OpenAIChatCompletionClient(
model="gpt-5",
api_key="your-key"
)
# Azure OpenAI
client = AzureOpenAIChatCompletionClient(
azure_endpoint="https://your-endpoint.openai.azure.com/",
azure_deployment="gpt-5",
api_version="2024-12-01",
api_key="your-key"
)
Agent Framework ChatClients
from agent_framework.openai import OpenAIChatClient
from agent_framework.azure import AzureOpenAIChatClient
# OpenAI (reads API key from environment)
client = OpenAIChatClient(model_id="gpt-5")
# Azure OpenAI (uses environment or default credentials; see samples for auth options)
client = AzureOpenAIChatClient(model_id="gpt-5")
Pour obtenir des exemples détaillés, consultez :
- Client OpenAI Chat - Configuration du client OpenAI de base
- Client De conversation Azure OpenAI - Azure OpenAI avec authentification
- Client Azure AI - Intégration de l’agent Azure AI
Prise en charge de l’API Réponses (agent Framework exclusif)
Agent Framework AzureOpenAIResponsesClient et OpenAIResponsesClient fournir une prise en charge spécialisée pour les modèles de raisonnement et les réponses structurées non disponibles dans AutoGen :
from agent_framework.azure import AzureOpenAIResponsesClient
from agent_framework.openai import OpenAIResponsesClient
# Azure OpenAI with Responses API
azure_responses_client = AzureOpenAIResponsesClient(model_id="gpt-5")
# OpenAI with Responses API
openai_responses_client = OpenAIResponsesClient(model_id="gpt-5")
Pour obtenir des exemples d’API Réponses, consultez :
- Azure Responses Client Basic - Azure OpenAI avec des réponses
- OpenAI Responses Client Basic - Intégration des réponses OpenAI
Mappage des fonctionnalités Single-Agent
Cette section mappe les fonctionnalités d’un seul agent entre AutoGen et Agent Framework. Avec un client en place, créez un agent, attachez des outils et choisissez entre l’exécution en continu et la diffusion en continu.
Création et exécution de l’agent de base
Une fois que vous avez configuré un client de modèle, l’étape suivante consiste à créer des agents. Les deux frameworks fournissent des abstractions d’agent similaires, mais avec des comportements et des options de configuration différents.
Assistant AutoGenAgent
from autogen_agentchat.agents import AssistantAgent
agent = AssistantAgent(
name="assistant",
model_client=client,
system_message="You are a helpful assistant.",
tools=[my_tool],
max_tool_iterations=1 # Single-turn by default
)
# Execution
result = await agent.run(task="What's the weather?")
Agent Framework ChatAgent
from agent_framework import ChatAgent, ai_function
from agent_framework.openai import OpenAIChatClient
# Create simple tools for the example
@ai_function
def get_weather(location: str) -> str:
"""Get weather for a location."""
return f"Weather in {location}: sunny"
@ai_function
def get_time() -> str:
"""Get current time."""
return "Current time: 2:30 PM"
# Create client
client = OpenAIChatClient(model_id="gpt-5")
async def example():
# Direct creation
agent = ChatAgent(
name="assistant",
chat_client=client,
instructions="You are a helpful assistant.",
tools=[get_weather] # Multi-turn by default
)
# Factory method (more convenient)
agent = client.create_agent(
name="assistant",
instructions="You are a helpful assistant.",
tools=[get_weather]
)
# Execution with runtime tool configuration
result = await agent.run(
"What's the weather?",
tools=[get_time], # Can add tools at runtime
tool_choice="auto"
)
Principales différences :
-
Comportement par défaut :
ChatAgenteffectue automatiquement une itération par le biais d’appels d’outils, tandis queAssistantAgentnécessite un paramètre explicitemax_tool_iterations -
Configuration du runtime :
ChatAgent.run()acceptetoolsettool_choiceparamètres pour la personnalisation par appel - Méthodes de fabrique : Agent Framework fournit des méthodes de fabrique pratiques directement à partir de clients de conversation
-
Gestion de l’état :
ChatAgentsans état et ne conserve pas l’historique des conversations entre les appels, contrairement àAssistantAgentce qui maintient l’historique des conversations dans le cadre de son état
Gestion de l’état de conversation avec AgentThread
Pour poursuivre les conversations avec ChatAgent, utilisez cette opération AgentThread pour gérer l’historique des conversations :
# Assume we have an agent from previous examples
async def conversation_example():
# Create a new thread that will be reused
thread = agent.get_new_thread()
# First interaction - thread is empty
result1 = await agent.run("What's 2+2?", thread=thread)
print(result1.text) # "4"
# Continue conversation - thread contains previous messages
result2 = await agent.run("What about that number times 10?", thread=thread)
print(result2.text) # "40" (understands "that number" refers to 4)
# AgentThread can use external storage, similar to ChatCompletionContext in AutoGen
Sans état par défaut : démonstration rapide
# Without a thread (two independent invocations)
r1 = await agent.run("What's 2+2?")
print(r1.text) # for example, "4"
r2 = await agent.run("What about that number times 10?")
print(r2.text) # Likely ambiguous without prior context; cannot be "40"
# With a thread (shared context across calls)
thread = agent.get_new_thread()
print((await agent.run("What's 2+2?", thread=thread)).text) # "4"
print((await agent.run("What about that number times 10?", thread=thread)).text) # "40"
Pour obtenir des exemples de gestion des threads, consultez :
- Azure AI avec thread - Gestion de l’état de conversation
- Client de conversation OpenAI avec thread - Modèles d’utilisation des threads
- Threads soutenus par Redis - Persistance de l’état de conversation en externe
Équivalence de l’agent De l’Assistant OpenAI
Les deux frameworks fournissent l’intégration de l’API Assistant OpenAI :
# AutoGen OpenAIAssistantAgent
from autogen_ext.agents.openai import OpenAIAssistantAgent
# Agent Framework has OpenAI Assistants support via OpenAIAssistantsClient
from agent_framework.openai import OpenAIAssistantsClient
Pour obtenir des exemples d’Assistant OpenAI, consultez :
- OpenAI Assistants Basic - Configuration de l’Assistant de base
- Assistants OpenAI avec Function Tools - Intégration d’outils personnalisés
- Assistant Azure OpenAI De base - Configuration de l’Assistant Azure
- Assistants OpenAI avec thread - Gestion des threads
Prise en charge de la diffusion en continu
Les deux frameworks diffusent des jetons en temps réel, des clients et des agents, pour maintenir la réactivité des interfaces utilisateur.
Diffusion automatique en continu
# Model client streaming
async for chunk in client.create_stream(messages):
if isinstance(chunk, str):
print(chunk, end="")
# Agent streaming
async for event in agent.run_stream(task="Hello"):
if isinstance(event, ModelClientStreamingChunkEvent):
print(event.content, end="")
elif isinstance(event, TaskResult):
print("Final result received")
Streaming d’Agent Framework
# Assume we have client, agent, and tools from previous examples
async def streaming_example():
# Chat client streaming
async for chunk in client.get_streaming_response("Hello", tools=tools):
if chunk.text:
print(chunk.text, end="")
# Agent streaming
async for chunk in agent.run_stream("Hello"):
if chunk.text:
print(chunk.text, end="", flush=True)
Conseil : Dans Agent Framework, les clients et les agents produisent la même forme de mise à jour ; vous pouvez lire chunk.text dans les deux cas.
Types de messages et création
Comprendre le fonctionnement des messages est essentiel pour la communication efficace de l’agent. Les deux frameworks fournissent différentes approches pour la création et la gestion des messages, avec AutoGen à l’aide de classes de message distinctes et de l’infrastructure agent à l’aide d’un système de messages unifié.
Types de messages AutoGen
from autogen_agentchat.messages import TextMessage, MultiModalMessage
from autogen_core.models import UserMessage
# Text message
text_msg = TextMessage(content="Hello", source="user")
# Multi-modal message
multi_modal_msg = MultiModalMessage(
content=["Describe this image", image_data],
source="user"
)
# Convert to model format for use with model clients
user_message = text_msg.to_model_message()
Types de messages Agent Framework
from agent_framework import ChatMessage, TextContent, DataContent, UriContent, Role
import base64
# Text message
text_msg = ChatMessage(role=Role.USER, text="Hello")
# Supply real image bytes, or use a data: URI/URL via UriContent
image_bytes = b"<your_image_bytes>"
image_b64 = base64.b64encode(image_bytes).decode()
image_uri = f"data:image/jpeg;base64,{image_b64}"
# Multi-modal message with mixed content
multi_modal_msg = ChatMessage(
role=Role.USER,
contents=[
TextContent(text="Describe this image"),
DataContent(uri=image_uri, media_type="image/jpeg")
]
)
Principales différences :
- AutoGen utilise des classes de message distinctes (
TextMessage,MultiModalMessage) avec unsourcechamp - Agent Framework utilise un objet de contenu unifié
ChatMessageavec des objets de contenu typés et unrolechamp - Les messages Agent Framework utilisent
Rolel’énumération (USER, ASSISTANT, SYSTEM, TOOL) au lieu de sources de chaînes
Création et intégration d’outils
Les outils étendent les fonctionnalités de l’agent au-delà de la génération de texte. Les frameworks prennent différentes approches de la création d’outils, avec Agent Framework fournissant une génération de schéma plus automatisée.
AutoGen FunctionTool
from autogen_core.tools import FunctionTool
async def get_weather(location: str) -> str:
"""Get weather for a location."""
return f"Weather in {location}: sunny"
# Manual tool creation
tool = FunctionTool(
func=get_weather,
description="Get weather information"
)
# Use with agent
agent = AssistantAgent(name="assistant", model_client=client, tools=[tool])
Infrastructure de l’agent @ai_function
from agent_framework import ai_function
from typing import Annotated
from pydantic import Field
@ai_function
def get_weather(
location: Annotated[str, Field(description="The location to get weather for")]
) -> str:
"""Get weather for a location."""
return f"Weather in {location}: sunny"
# Direct use with agent (automatic conversion)
agent = ChatAgent(name="assistant", chat_client=client, tools=[get_weather])
Pour obtenir des exemples détaillés, consultez :
- OpenAI Chat Agent Basic - Agent de conversation OpenAI simple
- OpenAI avec Function Tools - Agent avec des outils personnalisés
- Azure OpenAI Basic - Configuration de l’agent Azure OpenAI
Outils hébergés (Agent Framework exclusif)
Agent Framework fournit des outils hébergés qui ne sont pas disponibles dans AutoGen :
from agent_framework import ChatAgent, HostedCodeInterpreterTool, HostedWebSearchTool
from agent_framework.azure import AzureOpenAIChatClient
# Azure OpenAI client with a model that supports hosted tools
client = AzureOpenAIChatClient(model_id="gpt-5")
# Code execution tool
code_tool = HostedCodeInterpreterTool()
# Web search tool
search_tool = HostedWebSearchTool()
agent = ChatAgent(
name="researcher",
chat_client=client,
tools=[code_tool, search_tool]
)
Pour obtenir des exemples détaillés, consultez :
- Azure AI avec Interpréteur de code - Outil d’exécution de code
- Azure AI avec plusieurs outils - Plusieurs outils hébergés
- OpenAI avec recherche web - Intégration de la recherche web
Exigences et mises en garde :
- Les outils hébergés sont disponibles uniquement sur les modèles/comptes qui les prennent en charge. Vérifiez les droits d’utilisation et la prise en charge du modèle pour votre fournisseur avant d’activer ces outils.
- La configuration diffère selon le fournisseur ; suivez les conditions préalables de chaque exemple pour l’installation et les autorisations.
- Chaque modèle ne prend pas en charge chaque outil hébergé (par exemple, recherche web et interpréteur de code). Choisissez un modèle compatible dans votre environnement.
Note
AutoGen prend en charge les outils d’exécution de code local, mais cette fonctionnalité est prévue pour les futures versions d’Agent Framework.
Principale différence : Agent Framework gère automatiquement l’itération de l’outil au niveau de l’agent. Contrairement au paramètre d’AutoGen, les agents Agent Framework continuent l’exécution de max_tool_iterations l’outil jusqu’à l’achèvement par défaut, avec des mécanismes de sécurité intégrés pour empêcher les boucles infinies.
Prise en charge du serveur MCP
Pour l’intégration avancée des outils, les deux frameworks prennent en charge le protocole MCP (Model Context Protocol), ce qui permet aux agents d’interagir avec des services externes et des sources de données. Agent Framework fournit une prise en charge intégrée plus complète.
Prise en charge d’AutoGen MCP
AutoGen prend en charge mcP de base par le biais d’extensions (les détails d’implémentation spécifiques varient selon la version).
Prise en charge de MCP de l’Infrastructure de l’agent
from agent_framework import ChatAgent, MCPStdioTool, MCPStreamableHTTPTool, MCPWebsocketTool
from agent_framework.openai import OpenAIChatClient
# Create client for the example
client = OpenAIChatClient(model_id="gpt-5")
# Stdio MCP server
mcp_tool = MCPStdioTool(
name="filesystem",
command="uvx mcp-server-filesystem",
args=["/allowed/directory"]
)
# HTTP streaming MCP
http_mcp = MCPStreamableHTTPTool(
name="http_mcp",
url="http://localhost:8000/sse"
)
# WebSocket MCP
ws_mcp = MCPWebsocketTool(
name="websocket_mcp",
url="ws://localhost:8000/ws"
)
agent = ChatAgent(name="assistant", chat_client=client, tools=[mcp_tool])
Pour obtenir des exemples MCP, consultez :
- OpenAI avec MCP local - Utilisation de MCPStreamableHTTPTool avec OpenAI
- OpenAI avec McP hébergé - Utilisation des services MCP hébergés
- Azure AI avec MCP local - Utilisation de MCP avec Azure AI
- Azure AI avec MCP hébergé - Utilisation de MCP hébergé avec Azure AI
Modèle d’agent en tant qu’outil
Un modèle puissant consiste à utiliser des agents eux-mêmes en tant qu’outils, ce qui permet des architectures d’agent hiérarchiques. Les deux frameworks prennent en charge ce modèle avec différentes implémentations.
AutoGen AgentTool
from autogen_agentchat.tools import AgentTool
# Create specialized agent
writer = AssistantAgent(
name="writer",
model_client=client,
system_message="You are a creative writer."
)
# Wrap as tool
writer_tool = AgentTool(agent=writer)
# Use in coordinator (requires disabling parallel tool calls)
coordinator_client = OpenAIChatCompletionClient(
model="gpt-5",
parallel_tool_calls=False
)
coordinator = AssistantAgent(
name="coordinator",
model_client=coordinator_client,
tools=[writer_tool]
)
Agent Framework as_tool()
from agent_framework import ChatAgent
# Assume we have client from previous examples
# Create specialized agent
writer = ChatAgent(
name="writer",
chat_client=client,
instructions="You are a creative writer."
)
# Convert to tool
writer_tool = writer.as_tool(
name="creative_writer",
description="Generate creative content",
arg_name="request",
arg_description="What to write"
)
# Use in coordinator
coordinator = ChatAgent(
name="coordinator",
chat_client=client,
tools=[writer_tool]
)
Remarque de migration explicite : Dans AutoGen, définissez parallel_tool_calls=False sur le client de modèle du coordinateur lors de l’habillage des agents en tant qu’outils pour éviter les problèmes de concurrence lors de l’appel de la même instance d’agent.
Dans Agent Framework, as_tool() ne nécessite pas de désactivation des appels d’outils parallèles, car les agents sont sans état par défaut.
Intergiciel (fonctionnalité Agent Framework)
Agent Framework introduit des fonctionnalités d’intergiciel qui manquent à AutoGen. L’intergiciel permet de puissantes préoccupations croisées telles que la journalisation, la sécurité et la surveillance des performances.
from agent_framework import ChatAgent, AgentRunContext, FunctionInvocationContext
from typing import Callable, Awaitable
# Assume we have client from previous examples
async def logging_middleware(
context: AgentRunContext,
next: Callable[[AgentRunContext], Awaitable[None]]
) -> None:
print(f"Agent {context.agent.name} starting")
await next(context)
print(f"Agent {context.agent.name} completed")
async def security_middleware(
context: FunctionInvocationContext,
next: Callable[[FunctionInvocationContext], Awaitable[None]]
) -> None:
if "password" in str(context.arguments):
print("Blocking function call with sensitive data")
return # Don't call next()
await next(context)
agent = ChatAgent(
name="secure_agent",
chat_client=client,
middleware=[logging_middleware, security_middleware]
)
Avantages :
- Sécurité : validation d’entrée et filtrage de contenu
- Observabilité : journalisation, métriques et suivi
- Performances : mise en cache et limitation du débit
- Gestion des erreurs : dégradation normale et logique de nouvelle tentative
Pour obtenir des exemples détaillés d’intergiciels, consultez :
- Intergiciel basé sur une fonction - Intergiciel de fonction simple
- Middleware basé sur la classe - Middleware orienté objet
- Middleware de gestion des exceptions - Modèles de gestion des erreurs
- Middleware d’état partagé - Gestion de l’état entre les agents
Agents personnalisés
Parfois, vous ne souhaitez pas d’agent sauvegardé par modèle , vous souhaitez un agent déterministe ou soutenu par l’API avec une logique personnalisée. Les deux frameworks prennent en charge la création d’agents personnalisés, mais les modèles diffèrent.
AutoGen : Sous-classe BaseChatAgent
from typing import Sequence
from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import BaseChatMessage, TextMessage, StopMessage
from autogen_core import CancellationToken
class StaticAgent(BaseChatAgent):
def __init__(self, name: str = "static", description: str = "Static responder") -> None:
super().__init__(name, description)
@property
def produced_message_types(self) -> Sequence[type[BaseChatMessage]]: # Which message types this agent produces
return (TextMessage,)
async def on_messages(self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken) -> Response:
# Always return a static response
return Response(chat_message=TextMessage(content="Hello from AutoGen custom agent", source=self.name))
Notes:
- Implémentez et retournez
on_messages(...)unResponsemessage de conversation. - Implémentez
on_reset(...)éventuellement pour effacer l’état interne entre les exécutions.
Infrastructure de l’agent : Étendre BaseAgent (prenant en charge les threads)
from collections.abc import AsyncIterable
from typing import Any
from agent_framework import (
AgentRunResponse,
AgentRunResponseUpdate,
AgentThread,
BaseAgent,
ChatMessage,
Role,
TextContent,
)
class StaticAgent(BaseAgent):
async def run(
self,
messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
*,
thread: AgentThread | None = None,
**kwargs: Any,
) -> AgentRunResponse:
# Build a static reply
reply = ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text="Hello from AF custom agent")])
# Persist conversation to the provided AgentThread (if any)
if thread is not None:
normalized = self._normalize_messages(messages)
await self._notify_thread_of_new_messages(thread, normalized, reply)
return AgentRunResponse(messages=[reply])
async def run_stream(
self,
messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
*,
thread: AgentThread | None = None,
**kwargs: Any,
) -> AsyncIterable[AgentRunResponseUpdate]:
# Stream the same static response in a single chunk for simplicity
yield AgentRunResponseUpdate(contents=[TextContent(text="Hello from AF custom agent")], role=Role.ASSISTANT)
# Notify thread of input and the complete response once streaming ends
if thread is not None:
reply = ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text="Hello from AF custom agent")])
normalized = self._normalize_messages(messages)
await self._notify_thread_of_new_messages(thread, normalized, reply)
Notes:
-
AgentThreadgère l’état de conversation en externe ; utilisezagent.get_new_thread()et passez-le àrun/run_stream. - Appelez
self._notify_thread_of_new_messages(thread, input_messages, response_messages)donc le thread a les deux côtés de l’échange. - Consultez l’exemple complet : Agent personnalisé
Examinons ensuite l’orchestration multi-agent , la zone où les frameworks diffèrent le plus.
Mappage de fonctionnalités multi-agent
Vue d’ensemble du modèle de programmation
Les modèles de programmation multi-agent représentent la différence la plus significative entre les deux frameworks.
Approche du double modèle d’AutoGen
AutoGen fournit deux modèles de programmation :
-
autogen-core: ProgrammationRoutedAgentpilotée par les événements avec et abonnements aux messages de bas niveau -
Teamabstraction : Modèle centré sur l’exécution de haut niveau basé surautogen-core
# Low-level autogen-core (complex)
class MyAgent(RoutedAgent):
@message_handler
async def handle_message(self, message: TextMessage, ctx: MessageContext) -> None:
# Handle specific message types
pass
# High-level Team (easier but limited)
team = RoundRobinGroupChat(
participants=[agent1, agent2],
termination_condition=StopAfterNMessages(5)
)
result = await team.run(task="Collaborate on this task")
Défis:
- Le modèle de bas niveau est trop complexe pour la plupart des utilisateurs
- Le modèle de haut niveau peut devenir limité pour les comportements complexes
- Le pontage entre les deux modèles ajoute la complexité de l’implémentation
Modèle de flux de travail unifié d’Agent Framework
Agent Framework fournit une abstraction unique Workflow qui combine le meilleur des deux approches :
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never
# Assume we have agent1 and agent2 from previous examples
@executor(id="agent1")
async def agent1_executor(input_msg: str, ctx: WorkflowContext[str]) -> None:
response = await agent1.run(input_msg)
await ctx.send_message(response.text)
@executor(id="agent2")
async def agent2_executor(input_msg: str, ctx: WorkflowContext[Never, str]) -> None:
response = await agent2.run(input_msg)
await ctx.yield_output(response.text) # Final output
# Build typed data flow graph
workflow = (WorkflowBuilder()
.add_edge(agent1_executor, agent2_executor)
.set_start_executor(agent1_executor)
.build())
# Example usage (would be in async context)
# result = await workflow.run("Initial input")
Pour obtenir des exemples de flux de travail détaillés, consultez :
- Principes de base du flux de travail - Présentation des exécuteurs et des arêtes
- Agents dans le flux de travail - Intégration d’agents dans des flux de travail
- Flux de travail streaming - Exécution de flux de travail en temps réel
Avantages :
- Modèle unifié : abstraction unique pour tous les niveaux de complexité
- Sécurité du type : entrées et sorties fortement typées
- Visualisation du graphique : Effacer la représentation du flux de données
- Composition flexible : Combiner des agents, des fonctions et des sous-workflows
Flux de travail et GraphFlow
L’abstraction de Workflow Agent Framework est inspirée par la fonctionnalité expérimentale GraphFlow d’AutoGen, mais représente une évolution significative de la philosophie de conception :
- GraphFlow : flux de contrôle basé sur lequel les arêtes sont des transitions et les messages sont diffusés vers tous les agents ; les transitions sont conditionnées sur le contenu du message diffusé
- Flux de travail : flux de données basé sur lequel les messages sont routés via des arêtes et des exécuteurs spécifiques sont activés par des arêtes, avec prise en charge de l’exécution simultanée.
Vue d’ensemble du visuel
Le diagramme ci-dessous contraste le flux de contrôle GraphFlow (gauche) de AutoGen avec le flux de données de l’Infrastructure agent (à droite). GraphFlow modélise des agents en tant que nœuds avec des transitions conditionnelles et des diffusions. Exécuteurs de modèles de flux de travail (agents, fonctions ou sous-flux de travail) connectés par des arêtes typées ; il prend également en charge les pauses de demande/réponse et le point de contrôle.
flowchart LR
subgraph AutoGenGraphFlow
direction TB
U[User / Task] --> A[Agent A]
A -->|success| B[Agent B]
A -->|retry| C[Agent C]
A -. broadcast .- B
A -. broadcast .- C
end
subgraph AgentFrameworkWorkflow
direction TB
I[Input] --> E1[Executor 1]
E1 -->|"str"| E2[Executor 2]
E1 -->|"image"| E3[Executor 3]
E3 -->|"str"| E2
E2 --> OUT[(Final Output)]
end
R[Request / Response Gate]
E2 -. request .-> R
R -. resume .-> E2
CP[Checkpoint]
E1 -. save .-> CP
CP -. load .-> E1
Pratiquement:
- GraphFlow utilise des agents en tant que nœuds et diffuse des messages ; les arêtes représentent des transitions conditionnelles.
- Les itinéraires de flux de travail ont tapé des messages le long des bords. Les nœuds (exécuteurs) peuvent être des agents, des fonctions pures ou des sous-workflows.
- La demande/réponse permet à un flux de travail de suspendre l’entrée externe ; le point de contrôle conserve la progression et active la reprise.
Comparaison de code
1) Séquentiel + Conditionnel
# AutoGen GraphFlow (fluent builder) — writer → reviewer → editor (conditional)
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
writer = AssistantAgent(name="writer", description="Writes a draft", model_client=client)
reviewer = AssistantAgent(name="reviewer", description="Reviews the draft", model_client=client)
editor = AssistantAgent(name="editor", description="Finalizes the draft", model_client=client)
graph = (
DiGraphBuilder()
.add_node(writer).add_node(reviewer).add_node(editor)
.add_edge(writer, reviewer) # always
.add_edge(reviewer, editor, condition=lambda msg: "approve" in msg.to_model_text())
.set_entry_point(writer)
).build()
team = GraphFlow(participants=[writer, reviewer, editor], graph=graph)
result = await team.run(task="Draft a short paragraph about solar power")
# Agent Framework Workflow — sequential executors with conditional logic
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never
@executor(id="writer")
async def writer_exec(task: str, ctx: WorkflowContext[str]) -> None:
await ctx.send_message(f"Draft: {task}")
@executor(id="reviewer")
async def reviewer_exec(draft: str, ctx: WorkflowContext[str]) -> None:
decision = "approve" if "solar" in draft.lower() else "revise"
await ctx.send_message(f"{decision}:{draft}")
@executor(id="editor")
async def editor_exec(msg: str, ctx: WorkflowContext[Never, str]) -> None:
if msg.startswith("approve:"):
await ctx.yield_output(msg.split(":", 1)[1])
else:
await ctx.yield_output("Needs revision")
workflow_seq = (
WorkflowBuilder()
.add_edge(writer_exec, reviewer_exec)
.add_edge(reviewer_exec, editor_exec)
.set_start_executor(writer_exec)
.build()
)
2) Fan-out + Join (ALL vs ANY)
# AutoGen GraphFlow — A → (B, C) → D with ALL/ANY join
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
A, B, C, D = agent_a, agent_b, agent_c, agent_d
# ALL (default): D runs after both B and C
g_all = (
DiGraphBuilder()
.add_node(A).add_node(B).add_node(C).add_node(D)
.add_edge(A, B).add_edge(A, C)
.add_edge(B, D).add_edge(C, D)
.set_entry_point(A)
).build()
# ANY: D runs when either B or C completes
g_any = (
DiGraphBuilder()
.add_node(A).add_node(B).add_node(C).add_node(D)
.add_edge(A, B).add_edge(A, C)
.add_edge(B, D, activation_group="join_d", activation_condition="any")
.add_edge(C, D, activation_group="join_d", activation_condition="any")
.set_entry_point(A)
).build()
# Agent Framework Workflow — A → (B, C) → aggregator (ALL vs ANY)
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never
@executor(id="A")
async def start(task: str, ctx: WorkflowContext[str]) -> None:
await ctx.send_message(f"B:{task}", target_id="B")
await ctx.send_message(f"C:{task}", target_id="C")
@executor(id="B")
async def branch_b(text: str, ctx: WorkflowContext[str]) -> None:
await ctx.send_message(f"B_done:{text}")
@executor(id="C")
async def branch_c(text: str, ctx: WorkflowContext[str]) -> None:
await ctx.send_message(f"C_done:{text}")
@executor(id="join_any")
async def join_any(msg: str, ctx: WorkflowContext[Never, str]) -> None:
await ctx.yield_output(f"First: {msg}") # ANY join (first arrival)
@executor(id="join_all")
async def join_all(msg: str, ctx: WorkflowContext[str, str]) -> None:
state = await ctx.get_executor_state() or {"items": []}
state["items"].append(msg)
await ctx.set_executor_state(state)
if len(state["items"]) >= 2:
await ctx.yield_output(" | ".join(state["items"])) # ALL join
wf_any = (
WorkflowBuilder()
.add_edge(start, branch_b).add_edge(start, branch_c)
.add_edge(branch_b, join_any).add_edge(branch_c, join_any)
.set_start_executor(start)
.build()
)
wf_all = (
WorkflowBuilder()
.add_edge(start, branch_b).add_edge(start, branch_c)
.add_edge(branch_b, join_all).add_edge(branch_c, join_all)
.set_start_executor(start)
.build()
)
3) Routage ciblé (aucune diffusion)
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never
@executor(id="ingest")
async def ingest(task: str, ctx: WorkflowContext[str]) -> None:
# Route selectively using target_id
if task.startswith("image:"):
await ctx.send_message(task.removeprefix("image:"), target_id="vision")
else:
await ctx.send_message(task, target_id="writer")
@executor(id="writer")
async def write(text: str, ctx: WorkflowContext[Never, str]) -> None:
await ctx.yield_output(f"Draft: {text}")
@executor(id="vision")
async def caption(image_ref: str, ctx: WorkflowContext[Never, str]) -> None:
await ctx.yield_output(f"Caption: {image_ref}")
workflow = (
WorkflowBuilder()
.add_edge(ingest, write)
.add_edge(ingest, caption)
.set_start_executor(ingest)
.build()
)
# Example usage (async):
# await workflow.run("Summarize the benefits of solar power")
# await workflow.run("image:https://example.com/panel.jpg")
Remarques :
- GraphFlow diffuse des messages et utilise des transitions conditionnelles. Le comportement de jointure est configuré via le côté
activationcible et par périphérieactivation_group/activation_condition(par exemple, regrouper les deux bords avec ).join_dactivation_condition="any" - Flux de travail achemine explicitement les données ; permet
target_idde sélectionner des exécuteurs en aval. Le comportement de jointure réside dans l’exécuteur de réception (par exemple, rendement sur la première entrée et attendre tout), ou via les générateurs/agrégateurs d’orchestration. - Les exécuteurs du flux de travail sont sous forme libre : encapsuler une
ChatAgentfonction ou un sous-flux de travail et les mélanger dans le même graphique.
Principales différences
Le tableau ci-dessous résume les différences fondamentales entre le flux de travail GraphFlow d’AutoGen et le flux de travail de l’infrastructure d’agent :
| Aspect | AutoGen GraphFlow | Flux de travail Agent Framework |
|---|---|---|
| Type de flux | Flux de contrôle (les bords sont des transitions) | Flux de données (edges route messages) |
| Types de nœuds | Agents uniquement | Agents, fonctions, sous-flux de travail |
| Activation | Diffusion de messages | Activation basée sur la périphérie |
| Sécurité des types | Limité | Frappe forte tout au long de |
| Composabilité | Limité | Hautement composable |
Modèles d’imbrication
Imbrication d’équipe AutoGen
# Inner team
inner_team = RoundRobinGroupChat(
participants=[specialist1, specialist2],
termination_condition=StopAfterNMessages(3)
)
# Outer team with nested team as participant
outer_team = RoundRobinGroupChat(
participants=[coordinator, inner_team, reviewer], # Team as participant
termination_condition=StopAfterNMessages(10)
)
# Messages are broadcasted to all participants including nested team
result = await outer_team.run("Complex task requiring collaboration")
Caractéristiques d’imbrication automatique :
- L’équipe imbriquée reçoit tous les messages de l’équipe externe
- Les messages d’équipe imbriqués sont diffusés à tous les participants de l’équipe externe
- Contexte de message partagé entre tous les niveaux
Imbrication du flux de travail Agent Framework
from agent_framework import WorkflowExecutor, WorkflowBuilder
# Assume we have executors from previous examples
# specialist1_executor, specialist2_executor, coordinator_executor, reviewer_executor
# Create sub-workflow
sub_workflow = (WorkflowBuilder()
.add_edge(specialist1_executor, specialist2_executor)
.set_start_executor(specialist1_executor)
.build())
# Wrap as executor
sub_workflow_executor = WorkflowExecutor(
workflow=sub_workflow,
id="sub_process"
)
# Use in parent workflow
parent_workflow = (WorkflowBuilder()
.add_edge(coordinator_executor, sub_workflow_executor)
.add_edge(sub_workflow_executor, reviewer_executor)
.set_start_executor(coordinator_executor)
.build())
Caractéristiques d’imbrication d’Agent Framework :
- Entrée/sortie isolée via
WorkflowExecutor - Aucune diffusion de messages - flux de données via des connexions spécifiques
- Gestion d’état indépendante pour chaque niveau de flux de travail
Modèles de conversation de groupe
Les modèles de conversation de groupe permettent à plusieurs agents de collaborer sur des tâches complexes. Voici comment les modèles courants se traduisent entre les frameworks.
RoundRobinGroupChat Pattern
Implémentation AutoGen :
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import StopAfterNMessages
team = RoundRobinGroupChat(
participants=[agent1, agent2, agent3],
termination_condition=StopAfterNMessages(10)
)
result = await team.run("Discuss this topic")
Implémentation de l’infrastructure de l’agent :
from agent_framework import SequentialBuilder, WorkflowOutputEvent
# Assume we have agent1, agent2, agent3 from previous examples
# Sequential workflow through participants
workflow = SequentialBuilder().participants([agent1, agent2, agent3]).build()
# Example usage (would be in async context)
async def sequential_example():
# Each agent appends to shared conversation
async for event in workflow.run_stream("Discuss this topic"):
if isinstance(event, WorkflowOutputEvent):
conversation_history = event.data # list[ChatMessage]
Pour obtenir des exemples d’orchestration détaillés, consultez :
- Agents séquentiels - Exécution de l’agent de style tourniquet
- Exécuteurs personnalisés séquentiels - Modèles d’exécuteur personnalisés
Pour les modèles d’exécution simultanés, Agent Framework fournit également les éléments suivants :
from agent_framework import ConcurrentBuilder, WorkflowOutputEvent
# Assume we have agent1, agent2, agent3 from previous examples
# Concurrent workflow for parallel processing
workflow = (ConcurrentBuilder()
.participants([agent1, agent2, agent3])
.build())
# Example usage (would be in async context)
async def concurrent_example():
# All agents process the input concurrently
async for event in workflow.run_stream("Process this in parallel"):
if isinstance(event, WorkflowOutputEvent):
results = event.data # Combined results from all agents
Pour obtenir des exemples d’exécution simultanées, consultez :
- Agents simultanés - Exécution de l’agent parallèle
- Exécuteurs personnalisés simultanés - Modèles parallèles personnalisés
- Simultané avec l’agrégation personnalisée - Modèles d’agrégation de résultats
Modèle MagenticOneGroupChat
Implémentation AutoGen :
from autogen_agentchat.teams import MagenticOneGroupChat
team = MagenticOneGroupChat(
participants=[researcher, coder, executor],
model_client=coordinator_client,
termination_condition=StopAfterNMessages(20)
)
result = await team.run("Complex research and analysis task")
Implémentation de l’infrastructure de l’agent :
from typing import cast
from agent_framework import (
MAGENTIC_EVENT_TYPE_AGENT_DELTA,
MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
AgentRunUpdateEvent,
ChatAgent,
ChatMessage,
MagenticBuilder,
WorkflowOutputEvent,
)
from agent_framework.openai import OpenAIChatClient
# Create a manager agent for orchestration
manager_agent = ChatAgent(
name="MagenticManager",
description="Orchestrator that coordinates the workflow",
instructions="You coordinate a team to complete complex tasks efficiently.",
chat_client=OpenAIChatClient(),
)
workflow = (
MagenticBuilder()
.participants(researcher=researcher, coder=coder)
.with_standard_manager(
agent=manager_agent,
max_round_count=20,
max_stall_count=3,
max_reset_count=2,
)
.build()
)
# Example usage (would be in async context)
async def magentic_example():
output: str | None = None
async for event in workflow.run_stream("Complex research task"):
if isinstance(event, AgentRunUpdateEvent):
props = event.data.additional_properties if event.data else None
event_type = props.get("magentic_event_type") if props else None
if event_type == MAGENTIC_EVENT_TYPE_ORCHESTRATOR:
text = event.data.text if event.data else ""
print(f"[ORCHESTRATOR]: {text}")
elif event_type == MAGENTIC_EVENT_TYPE_AGENT_DELTA:
agent_id = props.get("agent_id", event.executor_id) if props else event.executor_id
if event.data and event.data.text:
print(f"[{agent_id}]: {event.data.text}", end="")
elif isinstance(event, WorkflowOutputEvent):
output_messages = cast(list[ChatMessage], event.data)
if output_messages:
output = output_messages[-1].text
Options de personnalisation de l’infrastructure agent :
Le flux de travail Magentic fournit des options de personnalisation étendues :
- Configuration du gestionnaire : utiliser un ChatAgent avec des instructions personnalisées et des paramètres de modèle
-
Limites arrondies :
max_round_count,max_stall_count,max_reset_count -
Diffusion en continu d’événements : utiliser
AgentRunUpdateEventavec desmagentic_event_typemétadonnées - Spécialisation de l’agent : instructions et outils personnalisés par agent
- Human-in-the-loop : Plan review, tool approval, and stall intervention
# Advanced customization example with human-in-the-loop
from typing import cast
from agent_framework import (
MAGENTIC_EVENT_TYPE_AGENT_DELTA,
MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
AgentRunUpdateEvent,
ChatAgent,
MagenticBuilder,
MagenticHumanInterventionDecision,
MagenticHumanInterventionKind,
MagenticHumanInterventionReply,
MagenticHumanInterventionRequest,
RequestInfoEvent,
WorkflowOutputEvent,
)
from agent_framework.openai import OpenAIChatClient
# Create manager agent with custom configuration
manager_agent = ChatAgent(
name="MagenticManager",
description="Orchestrator for complex tasks",
instructions="Custom orchestration instructions...",
chat_client=OpenAIChatClient(model_id="gpt-4o"),
)
workflow = (
MagenticBuilder()
.participants(
researcher=researcher_agent,
coder=coder_agent,
analyst=analyst_agent,
)
.with_standard_manager(
agent=manager_agent,
max_round_count=15, # Limit total rounds
max_stall_count=2, # Trigger stall handling
max_reset_count=1, # Allow one reset on failure
)
.with_plan_review() # Enable human plan review
.with_human_input_on_stall() # Enable human intervention on stalls
.build()
)
# Handle human intervention requests during execution
async for event in workflow.run_stream("Complex task"):
if isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest:
req = cast(MagenticHumanInterventionRequest, event.data)
if req.kind == MagenticHumanInterventionKind.PLAN_REVIEW:
# Review and approve the plan
reply = MagenticHumanInterventionReply(
decision=MagenticHumanInterventionDecision.APPROVE
)
async for ev in workflow.send_responses_streaming({event.request_id: reply}):
pass # Handle continuation
Pour obtenir des exemples Magentic détaillés, consultez :
- Flux de travail Magentic de base - Flux de travail multi-agent orchestré standard
- Magentic avec point de contrôle - Workflows orchestrés persistants
- Mise à jour du plan humain Magentic - Révision du plan humain dans la boucle
- Clarification de l’agent Magentic - Approbation de l’outil pour la clarification de l’agent
- Plan de replan humain Magentic - Intervention humaine sur les décrochages
Modèles futurs
La feuille de route agent Framework inclut plusieurs modèles AutoGen actuellement en cours de développement :
- Modèle Swarm : coordination de l’agent basé sur les handoffs
- SelectorGroupChat : sélection de l’orateur piloté par LLM
Human-in-the-Loop with Request Response
Une nouvelle fonctionnalité clé dans Workflow Agent Framework est le concept de demande et de réponse, qui permet aux flux de travail de suspendre l’exécution et d’attendre l’entrée externe avant de continuer. Cette fonctionnalité n’est pas présente dans l’abstraction d’AutoGen Team et permet des modèles de boucles humains sophistiqués.
Limitations de génération automatique
L’abstraction d’AutoGen Team s’exécute en continu une fois démarrée et ne fournit pas de mécanismes intégrés pour suspendre l’exécution pour l’entrée humaine. Toute fonctionnalité human-in-the-loop nécessite des implémentations personnalisées en dehors de l’infrastructure.
Agent Framework Request-Response API
Agent Framework fournit des fonctionnalités intégrées de demande-réponse où n’importe quel exécuteur peut envoyer des requêtes à l’aide ctx.request_info() et gérer des réponses avec le @response_handler décorateur.
from agent_framework import (
RequestInfoEvent, WorkflowBuilder, WorkflowContext,
Executor, handler, response_handler
)
from dataclasses import dataclass
# Assume we have agent_executor defined elsewhere
# Define typed request payload
@dataclass
class ApprovalRequest:
"""Request human approval for agent output."""
content: str = ""
agent_name: str = ""
# Workflow executor that requests human approval
class ReviewerExecutor(Executor):
@handler
async def review_content(
self,
agent_response: str,
ctx: WorkflowContext
) -> None:
# Request human input with structured data
approval_request = ApprovalRequest(
content=agent_response,
agent_name="writer_agent"
)
await ctx.request_info(request_data=approval_request, response_type=str)
@response_handler
async def handle_approval_response(
self,
original_request: ApprovalRequest,
decision: str,
ctx: WorkflowContext
) -> None:
decision_lower = decision.strip().lower()
original_content = original_request.content
if decision_lower == "approved":
await ctx.yield_output(f"APPROVED: {original_content}")
else:
await ctx.yield_output(f"REVISION NEEDED: {decision}")
# Build workflow with human-in-the-loop
reviewer = ReviewerExecutor(id="reviewer")
workflow = (WorkflowBuilder()
.add_edge(agent_executor, reviewer)
.set_start_executor(agent_executor)
.build())
Exécution de flux de travail human-in-the-loop
Agent Framework fournit des API de diffusion en continu pour gérer le cycle de pause-reprise :
from agent_framework import RequestInfoEvent, WorkflowOutputEvent
# Assume we have workflow defined from previous examples
async def run_with_human_input():
pending_responses = None
completed = False
while not completed:
# First iteration uses run_stream, subsequent use send_responses_streaming
stream = (
workflow.send_responses_streaming(pending_responses)
if pending_responses
else workflow.run_stream("initial input")
)
events = [event async for event in stream]
pending_responses = None
# Collect human requests and outputs
for event in events:
if isinstance(event, RequestInfoEvent):
# Display request to human and collect response
request_data = event.data # ApprovalRequest instance
print(f"Review needed: {request_data.content}")
human_response = input("Enter 'approved' or revision notes: ")
pending_responses = {event.request_id: human_response}
elif isinstance(event, WorkflowOutputEvent):
print(f"Final result: {event.data}")
completed = True
Pour obtenir des exemples de flux de travail human-in-the-loop, consultez :
- Jeu devinage avec entrée humaine - Flux de travail interactif avec commentaires utilisateur
- Flux de travail en tant qu’agent avec entrée humaine - Flux de travail imbriqués avec interaction humaine
Points de contrôle et reprise des flux de travail
Un autre avantage clé de l’abstraction d’Agent WorkflowTeam Framework sur AutoGen est la prise en charge intégrée du point de contrôle et de la reprise de l’exécution. Cela permet aux flux de travail d’être suspendus, persistants et repris ultérieurement à partir de n’importe quel point de contrôle, en fournissant une tolérance de panne et en activant des flux de travail longs ou asynchrones.
Limitations de génération automatique
L’abstraction de Team AutoGen ne fournit pas de fonctionnalités de point de contrôle intégrées. Toutes les mécanismes de persistance ou de récupération doivent être implémentés en externe, nécessitant souvent une logique complexe de gestion de l’état et de sérialisation.
Point de contrôle de l’infrastructure de l’agent
Agent Framework fournit un point de contrôle complet et FileCheckpointStorage la with_checkpointing() méthode sur WorkflowBuilder. Capture des points de contrôle :
-
État de l’exécuteur : état local pour chaque exécuteur à l’aide de
ctx.set_executor_state() -
État partagé : état de l’exécuteur croisé à l’aide de
ctx.set_shared_state() - Files d’attente de messages : messages en attente entre exécuteurs
- Position du flux de travail : progression de l’exécution actuelle et étapes suivantes
from agent_framework import (
FileCheckpointStorage, WorkflowBuilder, WorkflowContext,
Executor, handler
)
from typing_extensions import Never
class ProcessingExecutor(Executor):
@handler
async def process(self, data: str, ctx: WorkflowContext[str]) -> None:
# Process the data
result = f"Processed: {data.upper()}"
print(f"Processing: '{data}' -> '{result}'")
# Persist executor-local state
prev_state = await ctx.get_executor_state() or {}
count = prev_state.get("count", 0) + 1
await ctx.set_executor_state({
"count": count,
"last_input": data,
"last_output": result
})
# Persist shared state for other executors
await ctx.set_shared_state("original_input", data)
await ctx.set_shared_state("processed_output", result)
await ctx.send_message(result)
class FinalizeExecutor(Executor):
@handler
async def finalize(self, data: str, ctx: WorkflowContext[Never, str]) -> None:
result = f"Final: {data}"
await ctx.yield_output(result)
# Configure checkpoint storage
checkpoint_storage = FileCheckpointStorage(storage_path="./checkpoints")
processing_executor = ProcessingExecutor(id="processing")
finalize_executor = FinalizeExecutor(id="finalize")
# Build workflow with checkpointing enabled
workflow = (WorkflowBuilder()
.add_edge(processing_executor, finalize_executor)
.set_start_executor(processing_executor)
.with_checkpointing(checkpoint_storage=checkpoint_storage) # Enable checkpointing
.build())
# Example usage (would be in async context)
async def checkpoint_example():
# Run workflow - checkpoints are created automatically
async for event in workflow.run_stream("input data"):
print(f"Event: {event}")
Reprise à partir de points de contrôle
Agent Framework fournit des API pour répertorier, inspecter et reprendre à partir de points de contrôle spécifiques :
from typing_extensions import Never
from agent_framework import (
Executor,
FileCheckpointStorage,
WorkflowContext,
WorkflowBuilder,
get_checkpoint_summary,
handler,
)
class UpperCaseExecutor(Executor):
@handler
async def process(self, text: str, ctx: WorkflowContext[str]) -> None:
result = text.upper()
await ctx.send_message(result)
class ReverseExecutor(Executor):
@handler
async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None:
result = text[::-1]
await ctx.yield_output(result)
def create_workflow(checkpoint_storage: FileCheckpointStorage):
"""Create a workflow with two executors and checkpointing."""
upper_executor = UpperCaseExecutor(id="upper")
reverse_executor = ReverseExecutor(id="reverse")
return (WorkflowBuilder()
.add_edge(upper_executor, reverse_executor)
.set_start_executor(upper_executor)
.with_checkpointing(checkpoint_storage=checkpoint_storage)
.build())
# Assume we have checkpoint_storage from previous examples
checkpoint_storage = FileCheckpointStorage(storage_path="./checkpoints")
async def checkpoint_resume_example():
# List available checkpoints
checkpoints = await checkpoint_storage.list_checkpoints()
# Display checkpoint information
for checkpoint in checkpoints:
summary = get_checkpoint_summary(checkpoint)
print(f"Checkpoint {summary.checkpoint_id}: iteration={summary.iteration_count}")
# Resume from a specific checkpoint
if checkpoints:
chosen_checkpoint_id = checkpoints[0].checkpoint_id
# Create new workflow instance and resume
new_workflow = create_workflow(checkpoint_storage)
async for event in new_workflow.run_stream(
checkpoint_id=chosen_checkpoint_id,
checkpoint_storage=checkpoint_storage
):
print(f"Resumed event: {event}")
Fonctionnalités avancées de point de contrôle
Point de contrôle avec intégration de l’élément humain dans la boucle :
Le point de contrôle fonctionne en toute transparence avec les flux de travail en boucle, ce qui permet aux flux de travail d’être suspendus pour l’entrée humaine et repris ultérieurement. Lors de la reprise à partir d’un point de contrôle qui contient des demandes en attente, ces demandes sont réémises en tant qu’événements :
# Assume we have workflow, checkpoint_id, and checkpoint_storage from previous examples
async def resume_with_pending_requests_example():
# Resume from checkpoint - pending requests will be re-emitted
request_info_events = []
async for event in workflow.run_stream(
checkpoint_id=checkpoint_id,
checkpoint_storage=checkpoint_storage
):
if isinstance(event, RequestInfoEvent):
request_info_events.append(event)
# Handle re-emitted pending request
responses = {}
for event in request_info_events:
response = handle_request(event.data)
responses[event.request_id] = response
# Send response back to workflow
async for event in workflow.send_responses_streaming(responses):
print(f"Event: {event}")
Principaux avantages
Par rapport à AutoGen, le point de contrôle d’Agent Framework fournit les éléments suivants :
- Persistance automatique : aucune gestion manuelle de l’état requise
- Récupération granulaire : Reprendre à partir de n’importe quelle limite de superstep
- Isolation de l’état : état local et partagé distinct de l’exécuteur
- Intégration de l’élément humain dans la boucle : reprise en pause transparente avec entrée humaine
- Tolérance de panne : récupération robuste contre les défaillances ou interruptions
Exemples pratiques
Pour obtenir des exemples de points de contrôle complets, consultez :
- Point de contrôle avec reprise - Contrôle de base et reprise interactive
- Point de contrôle avec Human-in-the-Loop - Flux de travail persistants avec des portes d’approbation humaines
- Point de contrôle de sous-flux de travail - Points de contrôle des flux de travail imbriqués
- Point de contrôle Magentic - Flux de travail multi-agent orchestrés
Observability
AutoGen et Agent Framework offrent des fonctionnalités d’observabilité, mais avec différentes approches et fonctionnalités.
Observabilité automatique
AutoGen a une prise en charge native d’OpenTelemetry avec instrumentation pour :
-
Suivi du runtime :
SingleThreadedAgentRuntimeetGrpcWorkerAgentRuntime -
Exécution de l’outil :
BaseToolavecexecute_tooldes étendues de conventions sémantiques GenAI suivantes -
Opérations de l’agent :
BaseChatAgentaveccreate_agentetinvoke_agentétendues
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from autogen_core import SingleThreadedAgentRuntime
# Configure OpenTelemetry
tracer_provider = TracerProvider()
trace.set_tracer_provider(tracer_provider)
# Pass to runtime
runtime = SingleThreadedAgentRuntime(tracer_provider=tracer_provider)
Observabilité de l’infrastructure de l’agent
Agent Framework fournit une observabilité complète par le biais de plusieurs approches :
- Configuration de code zéro : instrumentation automatique via des variables d’environnement
- Configuration manuelle : configuration programmatique avec des paramètres personnalisés
- Données de télémétrie enrichies : Agents, flux de travail et suivi d’exécution d’outils
- Sortie de la console : journalisation et visualisation intégrées de la console
from agent_framework import ChatAgent
from agent_framework.observability import setup_observability
from agent_framework.openai import OpenAIChatClient
# Zero-code setup via environment variables
# Set ENABLE_OTEL=true
# Set OTLP_ENDPOINT=http://localhost:4317
# Or manual setup
setup_observability(
otlp_endpoint="http://localhost:4317"
)
# Create client for the example
client = OpenAIChatClient(model_id="gpt-5")
async def observability_example():
# Observability is automatically applied to all agents and workflows
agent = ChatAgent(name="assistant", chat_client=client)
result = await agent.run("Hello") # Automatically traced
Principales différences :
- Complexité de l’installation : Agent Framework offre des options d’installation sans code plus simples
- Étendue : Agent Framework offre une couverture plus large, notamment l’observabilité au niveau du flux de travail
- Visualisation : Agent Framework inclut une sortie de console intégrée et une interface utilisateur de développement
- Configuration : Agent Framework offre des options de configuration plus flexibles
Pour obtenir des exemples d’observabilité détaillés, consultez :
- Configuration de code zéro - Configuration des variables d’environnement
- Configuration manuelle - Configuration programmatique
- Observabilité de l’agent - Télémétrie d’agent unique
- Observabilité du flux de travail - Suivi de flux de travail multi-agent
Conclusion
Ce guide de migration fournit un mappage complet entre AutoGen et Microsoft Agent Framework, couvrant tout, de la création de l’agent de base aux flux de travail multi-agents complexes. Points clés à prendre pour la migration :
- La migration à agent unique est simple, avec des API similaires et des fonctionnalités améliorées dans Agent Framework
- Les modèles multi-agents nécessitent de repenser votre approche des architectures basées sur des flux de données, mais si vous connaissez déjà GraphFlow, la transition sera plus facile.
- Agent Framework offre des fonctionnalités supplémentaires telles que l’intergiciel, les outils hébergés et les flux de travail typés
Pour obtenir des exemples supplémentaires et des instructions détaillées sur l’implémentation, reportez-vous au répertoire d’exemples Agent Framework .
Exemples de catégories supplémentaires
L’infrastructure de l’agent fournit des exemples dans plusieurs autres domaines importants :
- Threads : Exemples de threads - Gestion de l’état et du contexte de conversation
- Entrée modale : exemples modals - Utilisation d’images et d’autres types de supports
- Fournisseurs de contexte : exemples de fournisseurs de contexte - Modèles d’intégration de contexte externe