Partager via


Guide de migration d’AutoGen to Microsoft Agent Framework

Guide complet pour la migration d’AutoGen vers le Kit de développement logiciel (SDK) Microsoft Agent Framework Python.

Sommaire

Contexte

AutoGen est une infrastructure permettant de créer des agents IA et des systèmes multi-agents à l’aide de modèles de langage volumineux (LLMs). Il a commencé en tant que projet de recherche chez Microsoft Research et a lancé plusieurs concepts dans l’orchestration multi-agent, comme GroupChat et le runtime d’agent piloté par les événements. Le projet a été une collaboration fructueuse de la communauté open source et de nombreuses fonctionnalités importantes proviennent de contributeurs externes.

Microsoft Agent Framework est un nouveau SDK multi-langage permettant de créer des agents et des flux de travail IA à l’aide de LLMs. Il représente une évolution significative des idées pionniers dans AutoGen et intègre les leçons tirées de l’utilisation réelle. Il est développé par les principales équipes AutoGen et Semantic Kernel chez Microsoft, et est conçu pour être une nouvelle base pour la création d’applications IA à l’avenir.

Ce guide décrit un parcours de migration pratique : il commence par couvrir ce qui reste le même et ce qui change en un clin d’œil. Ensuite, il couvre la configuration du client de modèle, les fonctionnalités à agent unique et enfin l’orchestration multi-agent avec du code concret côte à côte. En cours de route, les liens vers des exemples exécutables dans le référentiel Agent Framework vous aident à valider chaque étape.

Similitudes et différences clés

Ce qui reste le même

Les fondations sont familières. Vous créez toujours des agents autour d’un client de modèle, fournissez des instructions et joignez des outils. Les deux bibliothèques prennent en charge les outils de style de fonction, la diffusion en continu de jetons, le contenu modal et les E/S asynchrones.

# Both frameworks follow similar patterns
# AutoGen
agent = AssistantAgent(name="assistant", model_client=client, tools=[my_tool])
result = await agent.run(task="Help me with this task")

# Agent Framework
agent = ChatAgent(name="assistant", chat_client=client, tools=[my_tool])
result = await agent.run("Help me with this task")

Principales différences

  1. Style d’orchestration : AutoGen associe un cœur piloté par les événements à un haut niveau Team. Agent Framework se concentre sur une base de graphique Workflow typée qui achemine les données le long des arêtes et active les exécuteurs lorsque les entrées sont prêtes.

  2. Outils : AutoGen encapsule les fonctions avec FunctionTool. Agent Framework utilise @ai_function, déduit automatiquement les schémas et ajoute des outils hébergés tels qu’un interpréteur de code et une recherche web.

  3. Comportement de l’agent : AssistantAgent est à un seul tour, sauf si vous augmentez max_tool_iterations. ChatAgent est multitour par défaut et continue d’appeler des outils jusqu’à ce qu’il puisse retourner une réponse finale.

  4. Runtime : AutoGen offre des runtimes distribués incorporés et expérimentaux. Agent Framework se concentre sur la composition à processus unique aujourd’hui ; l’exécution distribuée est planifiée.

Création et configuration du client de modèle

Les deux frameworks fournissent des clients de modèle pour les principaux fournisseurs d’IA, avec des API similaires, mais pas identiques.

Caractéristique AutoGen Cadre d’agent
OpenAI Client OpenAIChatCompletionClient OpenAIChatClient
Client réponses OpenAI ❌ Non disponible OpenAIResponsesClient
Azure OpenAI AzureOpenAIChatCompletionClient AzureOpenAIChatClient
Réponses Azure OpenAI ❌ Non disponible AzureOpenAIResponsesClient
Azure AI AzureAIChatCompletionClient AzureAIAgentClient
Anthropic AnthropicChatCompletionClient 🚧 Planifié
Ollama OllamaChatCompletionClient 🚧 Planifié
Mise en cache ChatCompletionCache enveloppe 🚧 Planifié

Clients de modèle AutoGen

from autogen_ext.models.openai import OpenAIChatCompletionClient, AzureOpenAIChatCompletionClient

# OpenAI
client = OpenAIChatCompletionClient(
    model="gpt-5",
    api_key="your-key"
)

# Azure OpenAI
client = AzureOpenAIChatCompletionClient(
    azure_endpoint="https://your-endpoint.openai.azure.com/",
    azure_deployment="gpt-5",
    api_version="2024-12-01",
    api_key="your-key"
)

Agent Framework ChatClients

from agent_framework.openai import OpenAIChatClient
from agent_framework.azure import AzureOpenAIChatClient

# OpenAI (reads API key from environment)
client = OpenAIChatClient(model_id="gpt-5")

# Azure OpenAI (uses environment or default credentials; see samples for auth options)
client = AzureOpenAIChatClient(model_id="gpt-5")

Pour obtenir des exemples détaillés, consultez :

Prise en charge de l’API Réponses (agent Framework exclusif)

Agent Framework AzureOpenAIResponsesClient et OpenAIResponsesClient fournir une prise en charge spécialisée pour les modèles de raisonnement et les réponses structurées non disponibles dans AutoGen :

from agent_framework.azure import AzureOpenAIResponsesClient
from agent_framework.openai import OpenAIResponsesClient

# Azure OpenAI with Responses API
azure_responses_client = AzureOpenAIResponsesClient(model_id="gpt-5")

# OpenAI with Responses API
openai_responses_client = OpenAIResponsesClient(model_id="gpt-5")

Pour obtenir des exemples d’API Réponses, consultez :

Mappage des fonctionnalités Single-Agent

Cette section mappe les fonctionnalités d’un seul agent entre AutoGen et Agent Framework. Avec un client en place, créez un agent, attachez des outils et choisissez entre l’exécution en continu et la diffusion en continu.

Création et exécution de l’agent de base

Une fois que vous avez configuré un client de modèle, l’étape suivante consiste à créer des agents. Les deux frameworks fournissent des abstractions d’agent similaires, mais avec des comportements et des options de configuration différents.

Assistant AutoGenAgent

from autogen_agentchat.agents import AssistantAgent

agent = AssistantAgent(
    name="assistant",
    model_client=client,
    system_message="You are a helpful assistant.",
    tools=[my_tool],
    max_tool_iterations=1  # Single-turn by default
)

# Execution
result = await agent.run(task="What's the weather?")

Agent Framework ChatAgent

from agent_framework import ChatAgent, ai_function
from agent_framework.openai import OpenAIChatClient

# Create simple tools for the example
@ai_function
def get_weather(location: str) -> str:
    """Get weather for a location."""
    return f"Weather in {location}: sunny"

@ai_function
def get_time() -> str:
    """Get current time."""
    return "Current time: 2:30 PM"

# Create client
client = OpenAIChatClient(model_id="gpt-5")

async def example():
    # Direct creation
    agent = ChatAgent(
        name="assistant",
        chat_client=client,
        instructions="You are a helpful assistant.",
        tools=[get_weather]  # Multi-turn by default
    )

    # Factory method (more convenient)
    agent = client.create_agent(
        name="assistant",
        instructions="You are a helpful assistant.",
        tools=[get_weather]
    )

    # Execution with runtime tool configuration
    result = await agent.run(
        "What's the weather?",
        tools=[get_time],  # Can add tools at runtime
        tool_choice="auto"
    )

Principales différences :

  • Comportement par défaut : ChatAgent effectue automatiquement une itération par le biais d’appels d’outils, tandis que AssistantAgent nécessite un paramètre explicite max_tool_iterations
  • Configuration du runtime : ChatAgent.run() accepte tools et tool_choice paramètres pour la personnalisation par appel
  • Méthodes de fabrique : Agent Framework fournit des méthodes de fabrique pratiques directement à partir de clients de conversation
  • Gestion de l’état : ChatAgent sans état et ne conserve pas l’historique des conversations entre les appels, contrairement à AssistantAgent ce qui maintient l’historique des conversations dans le cadre de son état

Gestion de l’état de conversation avec AgentThread

Pour poursuivre les conversations avec ChatAgent, utilisez cette opération AgentThread pour gérer l’historique des conversations :

# Assume we have an agent from previous examples
async def conversation_example():
    # Create a new thread that will be reused
    thread = agent.get_new_thread()

    # First interaction - thread is empty
    result1 = await agent.run("What's 2+2?", thread=thread)
    print(result1.text)  # "4"

    # Continue conversation - thread contains previous messages
    result2 = await agent.run("What about that number times 10?", thread=thread)
    print(result2.text)  # "40" (understands "that number" refers to 4)

    # AgentThread can use external storage, similar to ChatCompletionContext in AutoGen

Sans état par défaut : démonstration rapide

# Without a thread (two independent invocations)
r1 = await agent.run("What's 2+2?")
print(r1.text)  # for example, "4"

r2 = await agent.run("What about that number times 10?")
print(r2.text)  # Likely ambiguous without prior context; cannot be "40"

# With a thread (shared context across calls)
thread = agent.get_new_thread()
print((await agent.run("What's 2+2?", thread=thread)).text)  # "4"
print((await agent.run("What about that number times 10?", thread=thread)).text)  # "40"

Pour obtenir des exemples de gestion des threads, consultez :

Équivalence de l’agent De l’Assistant OpenAI

Les deux frameworks fournissent l’intégration de l’API Assistant OpenAI :

# AutoGen OpenAIAssistantAgent
from autogen_ext.agents.openai import OpenAIAssistantAgent
# Agent Framework has OpenAI Assistants support via OpenAIAssistantsClient
from agent_framework.openai import OpenAIAssistantsClient

Pour obtenir des exemples d’Assistant OpenAI, consultez :

Prise en charge de la diffusion en continu

Les deux frameworks diffusent des jetons en temps réel, des clients et des agents, pour maintenir la réactivité des interfaces utilisateur.

Diffusion automatique en continu

# Model client streaming
async for chunk in client.create_stream(messages):
    if isinstance(chunk, str):
        print(chunk, end="")

# Agent streaming
async for event in agent.run_stream(task="Hello"):
    if isinstance(event, ModelClientStreamingChunkEvent):
        print(event.content, end="")
    elif isinstance(event, TaskResult):
        print("Final result received")

Streaming d’Agent Framework

# Assume we have client, agent, and tools from previous examples
async def streaming_example():
    # Chat client streaming
    async for chunk in client.get_streaming_response("Hello", tools=tools):
        if chunk.text:
            print(chunk.text, end="")

    # Agent streaming
    async for chunk in agent.run_stream("Hello"):
        if chunk.text:
            print(chunk.text, end="", flush=True)

Conseil : Dans Agent Framework, les clients et les agents produisent la même forme de mise à jour ; vous pouvez lire chunk.text dans les deux cas.

Types de messages et création

Comprendre le fonctionnement des messages est essentiel pour la communication efficace de l’agent. Les deux frameworks fournissent différentes approches pour la création et la gestion des messages, avec AutoGen à l’aide de classes de message distinctes et de l’infrastructure agent à l’aide d’un système de messages unifié.

Types de messages AutoGen

from autogen_agentchat.messages import TextMessage, MultiModalMessage
from autogen_core.models import UserMessage

# Text message
text_msg = TextMessage(content="Hello", source="user")

# Multi-modal message
multi_modal_msg = MultiModalMessage(
    content=["Describe this image", image_data],
    source="user"
)

# Convert to model format for use with model clients
user_message = text_msg.to_model_message()

Types de messages Agent Framework

from agent_framework import ChatMessage, TextContent, DataContent, UriContent, Role
import base64

# Text message
text_msg = ChatMessage(role=Role.USER, text="Hello")

# Supply real image bytes, or use a data: URI/URL via UriContent
image_bytes = b"<your_image_bytes>"
image_b64 = base64.b64encode(image_bytes).decode()
image_uri = f"data:image/jpeg;base64,{image_b64}"

# Multi-modal message with mixed content
multi_modal_msg = ChatMessage(
    role=Role.USER,
    contents=[
        TextContent(text="Describe this image"),
        DataContent(uri=image_uri, media_type="image/jpeg")
    ]
)

Principales différences :

  • AutoGen utilise des classes de message distinctes (TextMessage, MultiModalMessage) avec un source champ
  • Agent Framework utilise un objet de contenu unifié ChatMessage avec des objets de contenu typés et un role champ
  • Les messages Agent Framework utilisent Role l’énumération (USER, ASSISTANT, SYSTEM, TOOL) au lieu de sources de chaînes

Création et intégration d’outils

Les outils étendent les fonctionnalités de l’agent au-delà de la génération de texte. Les frameworks prennent différentes approches de la création d’outils, avec Agent Framework fournissant une génération de schéma plus automatisée.

AutoGen FunctionTool

from autogen_core.tools import FunctionTool

async def get_weather(location: str) -> str:
    """Get weather for a location."""
    return f"Weather in {location}: sunny"

# Manual tool creation
tool = FunctionTool(
    func=get_weather,
    description="Get weather information"
)

# Use with agent
agent = AssistantAgent(name="assistant", model_client=client, tools=[tool])

Infrastructure de l’agent @ai_function

from agent_framework import ai_function
from typing import Annotated
from pydantic import Field

@ai_function
def get_weather(
    location: Annotated[str, Field(description="The location to get weather for")]
) -> str:
    """Get weather for a location."""
    return f"Weather in {location}: sunny"

# Direct use with agent (automatic conversion)
agent = ChatAgent(name="assistant", chat_client=client, tools=[get_weather])

Pour obtenir des exemples détaillés, consultez :

Outils hébergés (Agent Framework exclusif)

Agent Framework fournit des outils hébergés qui ne sont pas disponibles dans AutoGen :

from agent_framework import ChatAgent, HostedCodeInterpreterTool, HostedWebSearchTool
from agent_framework.azure import AzureOpenAIChatClient

# Azure OpenAI client with a model that supports hosted tools
client = AzureOpenAIChatClient(model_id="gpt-5")

# Code execution tool
code_tool = HostedCodeInterpreterTool()

# Web search tool
search_tool = HostedWebSearchTool()

agent = ChatAgent(
    name="researcher",
    chat_client=client,
    tools=[code_tool, search_tool]
)

Pour obtenir des exemples détaillés, consultez :

Exigences et mises en garde :

  • Les outils hébergés sont disponibles uniquement sur les modèles/comptes qui les prennent en charge. Vérifiez les droits d’utilisation et la prise en charge du modèle pour votre fournisseur avant d’activer ces outils.
  • La configuration diffère selon le fournisseur ; suivez les conditions préalables de chaque exemple pour l’installation et les autorisations.
  • Chaque modèle ne prend pas en charge chaque outil hébergé (par exemple, recherche web et interpréteur de code). Choisissez un modèle compatible dans votre environnement.

Note

AutoGen prend en charge les outils d’exécution de code local, mais cette fonctionnalité est prévue pour les futures versions d’Agent Framework.

Principale différence : Agent Framework gère automatiquement l’itération de l’outil au niveau de l’agent. Contrairement au paramètre d’AutoGen, les agents Agent Framework continuent l’exécution de max_tool_iterations l’outil jusqu’à l’achèvement par défaut, avec des mécanismes de sécurité intégrés pour empêcher les boucles infinies.

Prise en charge du serveur MCP

Pour l’intégration avancée des outils, les deux frameworks prennent en charge le protocole MCP (Model Context Protocol), ce qui permet aux agents d’interagir avec des services externes et des sources de données. Agent Framework fournit une prise en charge intégrée plus complète.

Prise en charge d’AutoGen MCP

AutoGen prend en charge mcP de base par le biais d’extensions (les détails d’implémentation spécifiques varient selon la version).

Prise en charge de MCP de l’Infrastructure de l’agent

from agent_framework import ChatAgent, MCPStdioTool, MCPStreamableHTTPTool, MCPWebsocketTool
from agent_framework.openai import OpenAIChatClient

# Create client for the example
client = OpenAIChatClient(model_id="gpt-5")

# Stdio MCP server
mcp_tool = MCPStdioTool(
    name="filesystem",
    command="uvx mcp-server-filesystem",
    args=["/allowed/directory"]
)

# HTTP streaming MCP
http_mcp = MCPStreamableHTTPTool(
    name="http_mcp",
    url="http://localhost:8000/sse"
)

# WebSocket MCP
ws_mcp = MCPWebsocketTool(
    name="websocket_mcp",
    url="ws://localhost:8000/ws"
)

agent = ChatAgent(name="assistant", chat_client=client, tools=[mcp_tool])

Pour obtenir des exemples MCP, consultez :

Modèle d’agent en tant qu’outil

Un modèle puissant consiste à utiliser des agents eux-mêmes en tant qu’outils, ce qui permet des architectures d’agent hiérarchiques. Les deux frameworks prennent en charge ce modèle avec différentes implémentations.

AutoGen AgentTool

from autogen_agentchat.tools import AgentTool

# Create specialized agent
writer = AssistantAgent(
    name="writer",
    model_client=client,
    system_message="You are a creative writer."
)

# Wrap as tool
writer_tool = AgentTool(agent=writer)

# Use in coordinator (requires disabling parallel tool calls)
coordinator_client = OpenAIChatCompletionClient(
    model="gpt-5",
    parallel_tool_calls=False
)
coordinator = AssistantAgent(
    name="coordinator",
    model_client=coordinator_client,
    tools=[writer_tool]
)

Agent Framework as_tool()

from agent_framework import ChatAgent

# Assume we have client from previous examples
# Create specialized agent
writer = ChatAgent(
    name="writer",
    chat_client=client,
    instructions="You are a creative writer."
)

# Convert to tool
writer_tool = writer.as_tool(
    name="creative_writer",
    description="Generate creative content",
    arg_name="request",
    arg_description="What to write"
)

# Use in coordinator
coordinator = ChatAgent(
    name="coordinator",
    chat_client=client,
    tools=[writer_tool]
)

Remarque de migration explicite : Dans AutoGen, définissez parallel_tool_calls=False sur le client de modèle du coordinateur lors de l’habillage des agents en tant qu’outils pour éviter les problèmes de concurrence lors de l’appel de la même instance d’agent. Dans Agent Framework, as_tool() ne nécessite pas de désactivation des appels d’outils parallèles, car les agents sont sans état par défaut.

Intergiciel (fonctionnalité Agent Framework)

Agent Framework introduit des fonctionnalités d’intergiciel qui manquent à AutoGen. L’intergiciel permet de puissantes préoccupations croisées telles que la journalisation, la sécurité et la surveillance des performances.

from agent_framework import ChatAgent, AgentRunContext, FunctionInvocationContext
from typing import Callable, Awaitable

# Assume we have client from previous examples
async def logging_middleware(
    context: AgentRunContext,
    next: Callable[[AgentRunContext], Awaitable[None]]
) -> None:
    print(f"Agent {context.agent.name} starting")
    await next(context)
    print(f"Agent {context.agent.name} completed")

async def security_middleware(
    context: FunctionInvocationContext,
    next: Callable[[FunctionInvocationContext], Awaitable[None]]
) -> None:
    if "password" in str(context.arguments):
        print("Blocking function call with sensitive data")
        return  # Don't call next()
    await next(context)

agent = ChatAgent(
    name="secure_agent",
    chat_client=client,
    middleware=[logging_middleware, security_middleware]
)

Avantages :

  • Sécurité : validation d’entrée et filtrage de contenu
  • Observabilité : journalisation, métriques et suivi
  • Performances : mise en cache et limitation du débit
  • Gestion des erreurs : dégradation normale et logique de nouvelle tentative

Pour obtenir des exemples détaillés d’intergiciels, consultez :

Agents personnalisés

Parfois, vous ne souhaitez pas d’agent sauvegardé par modèle , vous souhaitez un agent déterministe ou soutenu par l’API avec une logique personnalisée. Les deux frameworks prennent en charge la création d’agents personnalisés, mais les modèles diffèrent.

AutoGen : Sous-classe BaseChatAgent

from typing import Sequence
from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import BaseChatMessage, TextMessage, StopMessage
from autogen_core import CancellationToken

class StaticAgent(BaseChatAgent):
    def __init__(self, name: str = "static", description: str = "Static responder") -> None:
        super().__init__(name, description)

    @property
    def produced_message_types(self) -> Sequence[type[BaseChatMessage]]:  # Which message types this agent produces
        return (TextMessage,)

    async def on_messages(self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken) -> Response:
        # Always return a static response
        return Response(chat_message=TextMessage(content="Hello from AutoGen custom agent", source=self.name))

Notes:

  • Implémentez et retournez on_messages(...) un Response message de conversation.
  • Implémentez on_reset(...) éventuellement pour effacer l’état interne entre les exécutions.

Infrastructure de l’agent : Étendre BaseAgent (prenant en charge les threads)

from collections.abc import AsyncIterable
from typing import Any
from agent_framework import (
    AgentRunResponse,
    AgentRunResponseUpdate,
    AgentThread,
    BaseAgent,
    ChatMessage,
    Role,
    TextContent,
)

class StaticAgent(BaseAgent):
    async def run(
        self,
        messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
        *,
        thread: AgentThread | None = None,
        **kwargs: Any,
    ) -> AgentRunResponse:
        # Build a static reply
        reply = ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text="Hello from AF custom agent")])

        # Persist conversation to the provided AgentThread (if any)
        if thread is not None:
            normalized = self._normalize_messages(messages)
            await self._notify_thread_of_new_messages(thread, normalized, reply)

        return AgentRunResponse(messages=[reply])

    async def run_stream(
        self,
        messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
        *,
        thread: AgentThread | None = None,
        **kwargs: Any,
    ) -> AsyncIterable[AgentRunResponseUpdate]:
        # Stream the same static response in a single chunk for simplicity
        yield AgentRunResponseUpdate(contents=[TextContent(text="Hello from AF custom agent")], role=Role.ASSISTANT)

        # Notify thread of input and the complete response once streaming ends
        if thread is not None:
            reply = ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text="Hello from AF custom agent")])
            normalized = self._normalize_messages(messages)
            await self._notify_thread_of_new_messages(thread, normalized, reply)

Notes:

  • AgentThreadgère l’état de conversation en externe ; utilisez agent.get_new_thread() et passez-le àrun/run_stream .
  • Appelez self._notify_thread_of_new_messages(thread, input_messages, response_messages) donc le thread a les deux côtés de l’échange.
  • Consultez l’exemple complet : Agent personnalisé

Examinons ensuite l’orchestration multi-agent , la zone où les frameworks diffèrent le plus.

Mappage de fonctionnalités multi-agent

Vue d’ensemble du modèle de programmation

Les modèles de programmation multi-agent représentent la différence la plus significative entre les deux frameworks.

Approche du double modèle d’AutoGen

AutoGen fournit deux modèles de programmation :

  1. autogen-core: Programmation RoutedAgent pilotée par les événements avec et abonnements aux messages de bas niveau
  2. Team abstraction : Modèle centré sur l’exécution de haut niveau basé sur autogen-core
# Low-level autogen-core (complex)
class MyAgent(RoutedAgent):
    @message_handler
    async def handle_message(self, message: TextMessage, ctx: MessageContext) -> None:
        # Handle specific message types
        pass

# High-level Team (easier but limited)
team = RoundRobinGroupChat(
    participants=[agent1, agent2],
    termination_condition=StopAfterNMessages(5)
)
result = await team.run(task="Collaborate on this task")

Défis:

  • Le modèle de bas niveau est trop complexe pour la plupart des utilisateurs
  • Le modèle de haut niveau peut devenir limité pour les comportements complexes
  • Le pontage entre les deux modèles ajoute la complexité de l’implémentation

Modèle de flux de travail unifié d’Agent Framework

Agent Framework fournit une abstraction unique Workflow qui combine le meilleur des deux approches :

from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never

# Assume we have agent1 and agent2 from previous examples
@executor(id="agent1")
async def agent1_executor(input_msg: str, ctx: WorkflowContext[str]) -> None:
    response = await agent1.run(input_msg)
    await ctx.send_message(response.text)

@executor(id="agent2")
async def agent2_executor(input_msg: str, ctx: WorkflowContext[Never, str]) -> None:
    response = await agent2.run(input_msg)
    await ctx.yield_output(response.text)  # Final output

# Build typed data flow graph
workflow = (WorkflowBuilder()
           .add_edge(agent1_executor, agent2_executor)
           .set_start_executor(agent1_executor)
           .build())

# Example usage (would be in async context)
# result = await workflow.run("Initial input")

Pour obtenir des exemples de flux de travail détaillés, consultez :

Avantages :

  • Modèle unifié : abstraction unique pour tous les niveaux de complexité
  • Sécurité du type : entrées et sorties fortement typées
  • Visualisation du graphique : Effacer la représentation du flux de données
  • Composition flexible : Combiner des agents, des fonctions et des sous-workflows

Flux de travail et GraphFlow

L’abstraction de Workflow Agent Framework est inspirée par la fonctionnalité expérimentale GraphFlow d’AutoGen, mais représente une évolution significative de la philosophie de conception :

  • GraphFlow : flux de contrôle basé sur lequel les arêtes sont des transitions et les messages sont diffusés vers tous les agents ; les transitions sont conditionnées sur le contenu du message diffusé
  • Flux de travail : flux de données basé sur lequel les messages sont routés via des arêtes et des exécuteurs spécifiques sont activés par des arêtes, avec prise en charge de l’exécution simultanée.

Vue d’ensemble du visuel

Le diagramme ci-dessous contraste le flux de contrôle GraphFlow (gauche) de AutoGen avec le flux de données de l’Infrastructure agent (à droite). GraphFlow modélise des agents en tant que nœuds avec des transitions conditionnelles et des diffusions. Exécuteurs de modèles de flux de travail (agents, fonctions ou sous-flux de travail) connectés par des arêtes typées ; il prend également en charge les pauses de demande/réponse et le point de contrôle.

flowchart LR

  subgraph AutoGenGraphFlow
    direction TB
    U[User / Task] --> A[Agent A]
    A -->|success| B[Agent B]
    A -->|retry| C[Agent C]
    A -. broadcast .- B
    A -. broadcast .- C
  end

  subgraph AgentFrameworkWorkflow
    direction TB
    I[Input] --> E1[Executor 1]
    E1 -->|"str"| E2[Executor 2]
    E1 -->|"image"| E3[Executor 3]
    E3 -->|"str"| E2
    E2 --> OUT[(Final Output)]
  end

  R[Request / Response Gate]
  E2 -. request .-> R
  R -. resume .-> E2

  CP[Checkpoint]
  E1 -. save .-> CP
  CP -. load .-> E1

Pratiquement:

  • GraphFlow utilise des agents en tant que nœuds et diffuse des messages ; les arêtes représentent des transitions conditionnelles.
  • Les itinéraires de flux de travail ont tapé des messages le long des bords. Les nœuds (exécuteurs) peuvent être des agents, des fonctions pures ou des sous-workflows.
  • La demande/réponse permet à un flux de travail de suspendre l’entrée externe ; le point de contrôle conserve la progression et active la reprise.

Comparaison de code

1) Séquentiel + Conditionnel
# AutoGen GraphFlow (fluent builder) — writer → reviewer → editor (conditional)
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow

writer = AssistantAgent(name="writer", description="Writes a draft", model_client=client)
reviewer = AssistantAgent(name="reviewer", description="Reviews the draft", model_client=client)
editor = AssistantAgent(name="editor", description="Finalizes the draft", model_client=client)

graph = (
    DiGraphBuilder()
    .add_node(writer).add_node(reviewer).add_node(editor)
    .add_edge(writer, reviewer)  # always
    .add_edge(reviewer, editor, condition=lambda msg: "approve" in msg.to_model_text())
    .set_entry_point(writer)
).build()

team = GraphFlow(participants=[writer, reviewer, editor], graph=graph)
result = await team.run(task="Draft a short paragraph about solar power")
# Agent Framework Workflow — sequential executors with conditional logic
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never

@executor(id="writer")
async def writer_exec(task: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(f"Draft: {task}")

@executor(id="reviewer")
async def reviewer_exec(draft: str, ctx: WorkflowContext[str]) -> None:
    decision = "approve" if "solar" in draft.lower() else "revise"
    await ctx.send_message(f"{decision}:{draft}")

@executor(id="editor")
async def editor_exec(msg: str, ctx: WorkflowContext[Never, str]) -> None:
    if msg.startswith("approve:"):
        await ctx.yield_output(msg.split(":", 1)[1])
    else:
        await ctx.yield_output("Needs revision")

workflow_seq = (
    WorkflowBuilder()
    .add_edge(writer_exec, reviewer_exec)
    .add_edge(reviewer_exec, editor_exec)
    .set_start_executor(writer_exec)
    .build()
)
2) Fan-out + Join (ALL vs ANY)
# AutoGen GraphFlow — A → (B, C) → D with ALL/ANY join
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
A, B, C, D = agent_a, agent_b, agent_c, agent_d

# ALL (default): D runs after both B and C
g_all = (
    DiGraphBuilder()
    .add_node(A).add_node(B).add_node(C).add_node(D)
    .add_edge(A, B).add_edge(A, C)
    .add_edge(B, D).add_edge(C, D)
    .set_entry_point(A)
).build()

# ANY: D runs when either B or C completes
g_any = (
    DiGraphBuilder()
    .add_node(A).add_node(B).add_node(C).add_node(D)
    .add_edge(A, B).add_edge(A, C)
    .add_edge(B, D, activation_group="join_d", activation_condition="any")
    .add_edge(C, D, activation_group="join_d", activation_condition="any")
    .set_entry_point(A)
).build()
# Agent Framework Workflow — A → (B, C) → aggregator (ALL vs ANY)
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never

@executor(id="A")
async def start(task: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(f"B:{task}", target_id="B")
    await ctx.send_message(f"C:{task}", target_id="C")

@executor(id="B")
async def branch_b(text: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(f"B_done:{text}")

@executor(id="C")
async def branch_c(text: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(f"C_done:{text}")

@executor(id="join_any")
async def join_any(msg: str, ctx: WorkflowContext[Never, str]) -> None:
    await ctx.yield_output(f"First: {msg}")  # ANY join (first arrival)

@executor(id="join_all")
async def join_all(msg: str, ctx: WorkflowContext[str, str]) -> None:
    state = await ctx.get_executor_state() or {"items": []}
    state["items"].append(msg)
    await ctx.set_executor_state(state)
    if len(state["items"]) >= 2:
        await ctx.yield_output(" | ".join(state["items"]))  # ALL join

wf_any = (
    WorkflowBuilder()
    .add_edge(start, branch_b).add_edge(start, branch_c)
    .add_edge(branch_b, join_any).add_edge(branch_c, join_any)
    .set_start_executor(start)
    .build()
)

wf_all = (
    WorkflowBuilder()
    .add_edge(start, branch_b).add_edge(start, branch_c)
    .add_edge(branch_b, join_all).add_edge(branch_c, join_all)
    .set_start_executor(start)
    .build()
)
3) Routage ciblé (aucune diffusion)
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never

@executor(id="ingest")
async def ingest(task: str, ctx: WorkflowContext[str]) -> None:
    # Route selectively using target_id
    if task.startswith("image:"):
        await ctx.send_message(task.removeprefix("image:"), target_id="vision")
    else:
        await ctx.send_message(task, target_id="writer")

@executor(id="writer")
async def write(text: str, ctx: WorkflowContext[Never, str]) -> None:
    await ctx.yield_output(f"Draft: {text}")

@executor(id="vision")
async def caption(image_ref: str, ctx: WorkflowContext[Never, str]) -> None:
    await ctx.yield_output(f"Caption: {image_ref}")

workflow = (
    WorkflowBuilder()
    .add_edge(ingest, write)
    .add_edge(ingest, caption)
    .set_start_executor(ingest)
    .build()
)

# Example usage (async):
# await workflow.run("Summarize the benefits of solar power")
# await workflow.run("image:https://example.com/panel.jpg")

Remarques :

  • GraphFlow diffuse des messages et utilise des transitions conditionnelles. Le comportement de jointure est configuré via le côté activation cible et par périphérie activation_group/activation_condition(par exemple, regrouper les deux bords avec ).join_dactivation_condition="any"
  • Flux de travail achemine explicitement les données ; permet target_id de sélectionner des exécuteurs en aval. Le comportement de jointure réside dans l’exécuteur de réception (par exemple, rendement sur la première entrée et attendre tout), ou via les générateurs/agrégateurs d’orchestration.
  • Les exécuteurs du flux de travail sont sous forme libre : encapsuler une ChatAgentfonction ou un sous-flux de travail et les mélanger dans le même graphique.

Principales différences

Le tableau ci-dessous résume les différences fondamentales entre le flux de travail GraphFlow d’AutoGen et le flux de travail de l’infrastructure d’agent :

Aspect AutoGen GraphFlow Flux de travail Agent Framework
Type de flux Flux de contrôle (les bords sont des transitions) Flux de données (edges route messages)
Types de nœuds Agents uniquement Agents, fonctions, sous-flux de travail
Activation Diffusion de messages Activation basée sur la périphérie
Sécurité des types Limité Frappe forte tout au long de
Composabilité Limité Hautement composable

Modèles d’imbrication

Imbrication d’équipe AutoGen

# Inner team
inner_team = RoundRobinGroupChat(
    participants=[specialist1, specialist2],
    termination_condition=StopAfterNMessages(3)
)

# Outer team with nested team as participant
outer_team = RoundRobinGroupChat(
    participants=[coordinator, inner_team, reviewer],  # Team as participant
    termination_condition=StopAfterNMessages(10)
)

# Messages are broadcasted to all participants including nested team
result = await outer_team.run("Complex task requiring collaboration")

Caractéristiques d’imbrication automatique :

  • L’équipe imbriquée reçoit tous les messages de l’équipe externe
  • Les messages d’équipe imbriqués sont diffusés à tous les participants de l’équipe externe
  • Contexte de message partagé entre tous les niveaux

Imbrication du flux de travail Agent Framework

from agent_framework import WorkflowExecutor, WorkflowBuilder

# Assume we have executors from previous examples
# specialist1_executor, specialist2_executor, coordinator_executor, reviewer_executor

# Create sub-workflow
sub_workflow = (WorkflowBuilder()
               .add_edge(specialist1_executor, specialist2_executor)
               .set_start_executor(specialist1_executor)
               .build())

# Wrap as executor
sub_workflow_executor = WorkflowExecutor(
    workflow=sub_workflow,
    id="sub_process"
)

# Use in parent workflow
parent_workflow = (WorkflowBuilder()
                  .add_edge(coordinator_executor, sub_workflow_executor)
                  .add_edge(sub_workflow_executor, reviewer_executor)
                  .set_start_executor(coordinator_executor)
                  .build())

Caractéristiques d’imbrication d’Agent Framework :

  • Entrée/sortie isolée via WorkflowExecutor
  • Aucune diffusion de messages - flux de données via des connexions spécifiques
  • Gestion d’état indépendante pour chaque niveau de flux de travail

Modèles de conversation de groupe

Les modèles de conversation de groupe permettent à plusieurs agents de collaborer sur des tâches complexes. Voici comment les modèles courants se traduisent entre les frameworks.

RoundRobinGroupChat Pattern

Implémentation AutoGen :

from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import StopAfterNMessages

team = RoundRobinGroupChat(
    participants=[agent1, agent2, agent3],
    termination_condition=StopAfterNMessages(10)
)
result = await team.run("Discuss this topic")

Implémentation de l’infrastructure de l’agent :

from agent_framework import SequentialBuilder, WorkflowOutputEvent

# Assume we have agent1, agent2, agent3 from previous examples
# Sequential workflow through participants
workflow = SequentialBuilder().participants([agent1, agent2, agent3]).build()

# Example usage (would be in async context)
async def sequential_example():
    # Each agent appends to shared conversation
    async for event in workflow.run_stream("Discuss this topic"):
        if isinstance(event, WorkflowOutputEvent):
            conversation_history = event.data  # list[ChatMessage]

Pour obtenir des exemples d’orchestration détaillés, consultez :

Pour les modèles d’exécution simultanés, Agent Framework fournit également les éléments suivants :

from agent_framework import ConcurrentBuilder, WorkflowOutputEvent

# Assume we have agent1, agent2, agent3 from previous examples
# Concurrent workflow for parallel processing
workflow = (ConcurrentBuilder()
           .participants([agent1, agent2, agent3])
           .build())

# Example usage (would be in async context)
async def concurrent_example():
    # All agents process the input concurrently
    async for event in workflow.run_stream("Process this in parallel"):
        if isinstance(event, WorkflowOutputEvent):
            results = event.data  # Combined results from all agents

Pour obtenir des exemples d’exécution simultanées, consultez :

Modèle MagenticOneGroupChat

Implémentation AutoGen :

from autogen_agentchat.teams import MagenticOneGroupChat

team = MagenticOneGroupChat(
    participants=[researcher, coder, executor],
    model_client=coordinator_client,
    termination_condition=StopAfterNMessages(20)
)
result = await team.run("Complex research and analysis task")

Implémentation de l’infrastructure de l’agent :

from typing import cast
from agent_framework import (
    MAGENTIC_EVENT_TYPE_AGENT_DELTA,
    MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
    AgentRunUpdateEvent,
    ChatAgent,
    ChatMessage,
    MagenticBuilder,
    WorkflowOutputEvent,
)
from agent_framework.openai import OpenAIChatClient

# Create a manager agent for orchestration
manager_agent = ChatAgent(
    name="MagenticManager",
    description="Orchestrator that coordinates the workflow",
    instructions="You coordinate a team to complete complex tasks efficiently.",
    chat_client=OpenAIChatClient(),
)

workflow = (
    MagenticBuilder()
    .participants(researcher=researcher, coder=coder)
    .with_standard_manager(
        agent=manager_agent,
        max_round_count=20,
        max_stall_count=3,
        max_reset_count=2,
    )
    .build()
)

# Example usage (would be in async context)
async def magentic_example():
    output: str | None = None
    async for event in workflow.run_stream("Complex research task"):
        if isinstance(event, AgentRunUpdateEvent):
            props = event.data.additional_properties if event.data else None
            event_type = props.get("magentic_event_type") if props else None

            if event_type == MAGENTIC_EVENT_TYPE_ORCHESTRATOR:
                text = event.data.text if event.data else ""
                print(f"[ORCHESTRATOR]: {text}")
            elif event_type == MAGENTIC_EVENT_TYPE_AGENT_DELTA:
                agent_id = props.get("agent_id", event.executor_id) if props else event.executor_id
                if event.data and event.data.text:
                    print(f"[{agent_id}]: {event.data.text}", end="")

        elif isinstance(event, WorkflowOutputEvent):
            output_messages = cast(list[ChatMessage], event.data)
            if output_messages:
                output = output_messages[-1].text

Options de personnalisation de l’infrastructure agent :

Le flux de travail Magentic fournit des options de personnalisation étendues :

  • Configuration du gestionnaire : utiliser un ChatAgent avec des instructions personnalisées et des paramètres de modèle
  • Limites arrondies : max_round_count, max_stall_count, max_reset_count
  • Diffusion en continu d’événements : utiliser AgentRunUpdateEvent avec des magentic_event_type métadonnées
  • Spécialisation de l’agent : instructions et outils personnalisés par agent
  • Human-in-the-loop : Plan review, tool approval, and stall intervention
# Advanced customization example with human-in-the-loop
from typing import cast
from agent_framework import (
    MAGENTIC_EVENT_TYPE_AGENT_DELTA,
    MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
    AgentRunUpdateEvent,
    ChatAgent,
    MagenticBuilder,
    MagenticHumanInterventionDecision,
    MagenticHumanInterventionKind,
    MagenticHumanInterventionReply,
    MagenticHumanInterventionRequest,
    RequestInfoEvent,
    WorkflowOutputEvent,
)
from agent_framework.openai import OpenAIChatClient

# Create manager agent with custom configuration
manager_agent = ChatAgent(
    name="MagenticManager",
    description="Orchestrator for complex tasks",
    instructions="Custom orchestration instructions...",
    chat_client=OpenAIChatClient(model_id="gpt-4o"),
)

workflow = (
    MagenticBuilder()
    .participants(
        researcher=researcher_agent,
        coder=coder_agent,
        analyst=analyst_agent,
    )
    .with_standard_manager(
        agent=manager_agent,
        max_round_count=15,      # Limit total rounds
        max_stall_count=2,       # Trigger stall handling
        max_reset_count=1,       # Allow one reset on failure
    )
    .with_plan_review()           # Enable human plan review
    .with_human_input_on_stall()  # Enable human intervention on stalls
    .build()
)

# Handle human intervention requests during execution
async for event in workflow.run_stream("Complex task"):
    if isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest:
        req = cast(MagenticHumanInterventionRequest, event.data)
        if req.kind == MagenticHumanInterventionKind.PLAN_REVIEW:
            # Review and approve the plan
            reply = MagenticHumanInterventionReply(
                decision=MagenticHumanInterventionDecision.APPROVE
            )
            async for ev in workflow.send_responses_streaming({event.request_id: reply}):
                pass  # Handle continuation

Pour obtenir des exemples Magentic détaillés, consultez :

Modèles futurs

La feuille de route agent Framework inclut plusieurs modèles AutoGen actuellement en cours de développement :

  • Modèle Swarm : coordination de l’agent basé sur les handoffs
  • SelectorGroupChat : sélection de l’orateur piloté par LLM

Human-in-the-Loop with Request Response

Une nouvelle fonctionnalité clé dans Workflow Agent Framework est le concept de demande et de réponse, qui permet aux flux de travail de suspendre l’exécution et d’attendre l’entrée externe avant de continuer. Cette fonctionnalité n’est pas présente dans l’abstraction d’AutoGen Team et permet des modèles de boucles humains sophistiqués.

Limitations de génération automatique

L’abstraction d’AutoGen Team s’exécute en continu une fois démarrée et ne fournit pas de mécanismes intégrés pour suspendre l’exécution pour l’entrée humaine. Toute fonctionnalité human-in-the-loop nécessite des implémentations personnalisées en dehors de l’infrastructure.

Agent Framework Request-Response API

Agent Framework fournit des fonctionnalités intégrées de demande-réponse où n’importe quel exécuteur peut envoyer des requêtes à l’aide ctx.request_info() et gérer des réponses avec le @response_handler décorateur.

from agent_framework import (
    RequestInfoEvent, WorkflowBuilder, WorkflowContext, 
    Executor, handler, response_handler
)
from dataclasses import dataclass

# Assume we have agent_executor defined elsewhere

# Define typed request payload
@dataclass
class ApprovalRequest:
    """Request human approval for agent output."""
    content: str = ""
    agent_name: str = ""

# Workflow executor that requests human approval
class ReviewerExecutor(Executor):
    
    @handler
    async def review_content(
        self,
        agent_response: str,
        ctx: WorkflowContext
    ) -> None:
        # Request human input with structured data
        approval_request = ApprovalRequest(
            content=agent_response,
            agent_name="writer_agent"
        )
        await ctx.request_info(request_data=approval_request, response_type=str)
    
    @response_handler
    async def handle_approval_response(
        self,
        original_request: ApprovalRequest,
        decision: str,
        ctx: WorkflowContext
    ) -> None:
        decision_lower = decision.strip().lower()
        original_content = original_request.content

        if decision_lower == "approved":
            await ctx.yield_output(f"APPROVED: {original_content}")
        else:
            await ctx.yield_output(f"REVISION NEEDED: {decision}")

# Build workflow with human-in-the-loop
reviewer = ReviewerExecutor(id="reviewer")

workflow = (WorkflowBuilder()
           .add_edge(agent_executor, reviewer)
           .set_start_executor(agent_executor)
           .build())

Exécution de flux de travail human-in-the-loop

Agent Framework fournit des API de diffusion en continu pour gérer le cycle de pause-reprise :

from agent_framework import RequestInfoEvent, WorkflowOutputEvent

# Assume we have workflow defined from previous examples
async def run_with_human_input():
    pending_responses = None
    completed = False

    while not completed:
        # First iteration uses run_stream, subsequent use send_responses_streaming
        stream = (
            workflow.send_responses_streaming(pending_responses)
            if pending_responses
            else workflow.run_stream("initial input")
        )

        events = [event async for event in stream]
        pending_responses = None

        # Collect human requests and outputs
        for event in events:
            if isinstance(event, RequestInfoEvent):
                # Display request to human and collect response
                request_data = event.data  # ApprovalRequest instance
                print(f"Review needed: {request_data.content}")

                human_response = input("Enter 'approved' or revision notes: ")
                pending_responses = {event.request_id: human_response}

            elif isinstance(event, WorkflowOutputEvent):
                print(f"Final result: {event.data}")
                completed = True

Pour obtenir des exemples de flux de travail human-in-the-loop, consultez :

Points de contrôle et reprise des flux de travail

Un autre avantage clé de l’abstraction d’Agent WorkflowTeam Framework sur AutoGen est la prise en charge intégrée du point de contrôle et de la reprise de l’exécution. Cela permet aux flux de travail d’être suspendus, persistants et repris ultérieurement à partir de n’importe quel point de contrôle, en fournissant une tolérance de panne et en activant des flux de travail longs ou asynchrones.

Limitations de génération automatique

L’abstraction de Team AutoGen ne fournit pas de fonctionnalités de point de contrôle intégrées. Toutes les mécanismes de persistance ou de récupération doivent être implémentés en externe, nécessitant souvent une logique complexe de gestion de l’état et de sérialisation.

Point de contrôle de l’infrastructure de l’agent

Agent Framework fournit un point de contrôle complet et FileCheckpointStorage la with_checkpointing() méthode sur WorkflowBuilder. Capture des points de contrôle :

  • État de l’exécuteur : état local pour chaque exécuteur à l’aide de ctx.set_executor_state()
  • État partagé : état de l’exécuteur croisé à l’aide de ctx.set_shared_state()
  • Files d’attente de messages : messages en attente entre exécuteurs
  • Position du flux de travail : progression de l’exécution actuelle et étapes suivantes
from agent_framework import (
    FileCheckpointStorage, WorkflowBuilder, WorkflowContext,
    Executor, handler
)
from typing_extensions import Never

class ProcessingExecutor(Executor):
    @handler
    async def process(self, data: str, ctx: WorkflowContext[str]) -> None:
        # Process the data
        result = f"Processed: {data.upper()}"
        print(f"Processing: '{data}' -> '{result}'")

        # Persist executor-local state
        prev_state = await ctx.get_executor_state() or {}
        count = prev_state.get("count", 0) + 1
        await ctx.set_executor_state({
            "count": count,
            "last_input": data,
            "last_output": result
        })

        # Persist shared state for other executors
        await ctx.set_shared_state("original_input", data)
        await ctx.set_shared_state("processed_output", result)

        await ctx.send_message(result)

class FinalizeExecutor(Executor):
    @handler
    async def finalize(self, data: str, ctx: WorkflowContext[Never, str]) -> None:
        result = f"Final: {data}"
        await ctx.yield_output(result)

# Configure checkpoint storage
checkpoint_storage = FileCheckpointStorage(storage_path="./checkpoints")
processing_executor = ProcessingExecutor(id="processing")
finalize_executor = FinalizeExecutor(id="finalize")

# Build workflow with checkpointing enabled
workflow = (WorkflowBuilder()
           .add_edge(processing_executor, finalize_executor)
           .set_start_executor(processing_executor)
           .with_checkpointing(checkpoint_storage=checkpoint_storage)  # Enable checkpointing
           .build())

# Example usage (would be in async context)
async def checkpoint_example():
    # Run workflow - checkpoints are created automatically
    async for event in workflow.run_stream("input data"):
        print(f"Event: {event}")

Reprise à partir de points de contrôle

Agent Framework fournit des API pour répertorier, inspecter et reprendre à partir de points de contrôle spécifiques :

from typing_extensions import Never

from agent_framework import (
    Executor,
    FileCheckpointStorage,
    WorkflowContext,
    WorkflowBuilder,
    get_checkpoint_summary,
    handler,
)

class UpperCaseExecutor(Executor):
    @handler
    async def process(self, text: str, ctx: WorkflowContext[str]) -> None:
        result = text.upper()
        await ctx.send_message(result)

class ReverseExecutor(Executor):
    @handler
    async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None:
        result = text[::-1]
        await ctx.yield_output(result)

def create_workflow(checkpoint_storage: FileCheckpointStorage):
    """Create a workflow with two executors and checkpointing."""
    upper_executor = UpperCaseExecutor(id="upper")
    reverse_executor = ReverseExecutor(id="reverse")

    return (WorkflowBuilder()
           .add_edge(upper_executor, reverse_executor)
           .set_start_executor(upper_executor)
           .with_checkpointing(checkpoint_storage=checkpoint_storage)
           .build())

# Assume we have checkpoint_storage from previous examples
checkpoint_storage = FileCheckpointStorage(storage_path="./checkpoints")

async def checkpoint_resume_example():
    # List available checkpoints
    checkpoints = await checkpoint_storage.list_checkpoints()

    # Display checkpoint information
    for checkpoint in checkpoints:
        summary = get_checkpoint_summary(checkpoint)
        print(f"Checkpoint {summary.checkpoint_id}: iteration={summary.iteration_count}")

    # Resume from a specific checkpoint
    if checkpoints:
        chosen_checkpoint_id = checkpoints[0].checkpoint_id

        # Create new workflow instance and resume
        new_workflow = create_workflow(checkpoint_storage)
        async for event in new_workflow.run_stream(
            checkpoint_id=chosen_checkpoint_id,
            checkpoint_storage=checkpoint_storage
        ):
            print(f"Resumed event: {event}")

Fonctionnalités avancées de point de contrôle

Point de contrôle avec intégration de l’élément humain dans la boucle :

Le point de contrôle fonctionne en toute transparence avec les flux de travail en boucle, ce qui permet aux flux de travail d’être suspendus pour l’entrée humaine et repris ultérieurement. Lors de la reprise à partir d’un point de contrôle qui contient des demandes en attente, ces demandes sont réémises en tant qu’événements :

# Assume we have workflow, checkpoint_id, and checkpoint_storage from previous examples
async def resume_with_pending_requests_example():
    # Resume from checkpoint - pending requests will be re-emitted
    request_info_events = []
    async for event in workflow.run_stream(
        checkpoint_id=checkpoint_id,
        checkpoint_storage=checkpoint_storage
    ):
        if isinstance(event, RequestInfoEvent):
            request_info_events.append(event)

    # Handle re-emitted pending request
    responses = {}
    for event in request_info_events:
        response = handle_request(event.data)
        responses[event.request_id] = response

    # Send response back to workflow
    async for event in workflow.send_responses_streaming(responses):
        print(f"Event: {event}")

Principaux avantages

Par rapport à AutoGen, le point de contrôle d’Agent Framework fournit les éléments suivants :

  • Persistance automatique : aucune gestion manuelle de l’état requise
  • Récupération granulaire : Reprendre à partir de n’importe quelle limite de superstep
  • Isolation de l’état : état local et partagé distinct de l’exécuteur
  • Intégration de l’élément humain dans la boucle : reprise en pause transparente avec entrée humaine
  • Tolérance de panne : récupération robuste contre les défaillances ou interruptions

Exemples pratiques

Pour obtenir des exemples de points de contrôle complets, consultez :


Observability

AutoGen et Agent Framework offrent des fonctionnalités d’observabilité, mais avec différentes approches et fonctionnalités.

Observabilité automatique

AutoGen a une prise en charge native d’OpenTelemetry avec instrumentation pour :

  • Suivi du runtime : SingleThreadedAgentRuntime et GrpcWorkerAgentRuntime
  • Exécution de l’outil : BaseTool avec execute_tool des étendues de conventions sémantiques GenAI suivantes
  • Opérations de l’agent : BaseChatAgent avec create_agent et invoke_agent étendues
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from autogen_core import SingleThreadedAgentRuntime

# Configure OpenTelemetry
tracer_provider = TracerProvider()
trace.set_tracer_provider(tracer_provider)

# Pass to runtime
runtime = SingleThreadedAgentRuntime(tracer_provider=tracer_provider)

Observabilité de l’infrastructure de l’agent

Agent Framework fournit une observabilité complète par le biais de plusieurs approches :

  • Configuration de code zéro : instrumentation automatique via des variables d’environnement
  • Configuration manuelle : configuration programmatique avec des paramètres personnalisés
  • Données de télémétrie enrichies : Agents, flux de travail et suivi d’exécution d’outils
  • Sortie de la console : journalisation et visualisation intégrées de la console
from agent_framework import ChatAgent
from agent_framework.observability import setup_observability
from agent_framework.openai import OpenAIChatClient

# Zero-code setup via environment variables
# Set ENABLE_OTEL=true
# Set OTLP_ENDPOINT=http://localhost:4317

# Or manual setup
setup_observability(
    otlp_endpoint="http://localhost:4317"
)

# Create client for the example
client = OpenAIChatClient(model_id="gpt-5")

async def observability_example():
    # Observability is automatically applied to all agents and workflows
    agent = ChatAgent(name="assistant", chat_client=client)
    result = await agent.run("Hello")  # Automatically traced

Principales différences :

  • Complexité de l’installation : Agent Framework offre des options d’installation sans code plus simples
  • Étendue : Agent Framework offre une couverture plus large, notamment l’observabilité au niveau du flux de travail
  • Visualisation : Agent Framework inclut une sortie de console intégrée et une interface utilisateur de développement
  • Configuration : Agent Framework offre des options de configuration plus flexibles

Pour obtenir des exemples d’observabilité détaillés, consultez :


Conclusion

Ce guide de migration fournit un mappage complet entre AutoGen et Microsoft Agent Framework, couvrant tout, de la création de l’agent de base aux flux de travail multi-agents complexes. Points clés à prendre pour la migration :

  • La migration à agent unique est simple, avec des API similaires et des fonctionnalités améliorées dans Agent Framework
  • Les modèles multi-agents nécessitent de repenser votre approche des architectures basées sur des flux de données, mais si vous connaissez déjà GraphFlow, la transition sera plus facile.
  • Agent Framework offre des fonctionnalités supplémentaires telles que l’intergiciel, les outils hébergés et les flux de travail typés

Pour obtenir des exemples supplémentaires et des instructions détaillées sur l’implémentation, reportez-vous au répertoire d’exemples Agent Framework .

Exemples de catégories supplémentaires

L’infrastructure de l’agent fournit des exemples dans plusieurs autres domaines importants :

  • Threads : Exemples de threads - Gestion de l’état et du contexte de conversation
  • Entrée modale : exemples modals - Utilisation d’images et d’autres types de supports
  • Fournisseurs de contexte : exemples de fournisseurs de contexte - Modèles d’intégration de contexte externe

Étapes suivantes