Freigeben über


Migrationshandbuch für AutoGen zu Microsoft Agent Framework

Ein umfassendes Handbuch für die Migration von AutoGen zum Microsoft Agent Framework Python SDK.

Inhaltsverzeichnis

Hintergrund

AutoGen ist ein Framework zum Erstellen von KI-Agents und Multi-Agent-Systemen mit großen Sprachmodellen (LLMs). Es begann als Forschungsprojekt bei Microsoft Research und hat mehrere Konzepte in der Multi-Agent-Orchestrierung, z. B. GroupChat und ereignisgesteuerte Agent-Laufzeit, ins Leben gerufen. Das Projekt war eine fruchtbare Zusammenarbeit der Open-Source-Community, und viele wichtige Features stammen von externen Mitwirkenden.

Microsoft Agent Framework ist ein neues mehrsprachiges SDK zum Erstellen von KI-Agents und Workflows mithilfe von LLMs. Es stellt eine bedeutende Weiterentwicklung der Ideen dar, die in AutoGen pionieriert wurden, und beinhaltet Erkenntnisse aus der praxisnahen Nutzung. Es wird von den wichtigsten AutoGen- und Semantic Kernel-Teams bei Microsoft entwickelt und ist eine neue Grundlage für die zukünftige Entwicklung von KI-Anwendungen.

Dieser Leitfaden beschreibt einen praktischen Migrationspfad: Er beginnt mit dem, was gleich bleibt und was sich auf einen Blick ändert. Anschließend werden modellbasierte Clienteinrichtung, Einzel-Agent-Features und schließlich Multi-Agent-Orchestrierung mit konkretem Code nebeneinander behandelt. Links zu ausführungsfähigen Beispielen im Agent Framework-Repository helfen Ihnen dabei, jeden Schritt zu überprüfen.

Wichtige Ähnlichkeiten und Unterschiede

Was bleibt gleich

Die Grundlagen sind vertraut. Sie erstellen weiterhin Agents um einen Modellclient, stellen Anweisungen bereit und fügen Tools an. Beide Bibliotheken unterstützen Funktionsstiltools, Tokenstreaming, multimodale Inhalte und asynchrone E/A.

# Both frameworks follow similar patterns
# AutoGen
agent = AssistantAgent(name="assistant", model_client=client, tools=[my_tool])
result = await agent.run(task="Help me with this task")

# Agent Framework
agent = ChatAgent(name="assistant", chat_client=client, tools=[my_tool])
result = await agent.run("Help me with this task")

Zentrale Unterschiede

  1. Orchestrierungsstil: AutoGen koppelt einen ereignisgesteuerten Kern mit einem übergeordneten Team. Das Agent-Framework zentriert sich auf einem typierten, graphbasierten Workflow Diagramm, das Daten entlang der Kanten weiter leitet und Executoren aktiviert, wenn Eingaben bereit sind.

  2. Tools: AutoGen umschließt Funktionen mit FunctionTool. Agent Framework verwendet @ai_functionschemas automatisch, leitet Schemas ab und fügt gehostete Tools wie z. B. einen Codedolmetscher und eine Websuche hinzu.

  3. Agent-Verhalten: AssistantAgent ist eine einzelne Drehung, es sei denn, Sie erhöhen max_tool_iterations. ChatAgent ist standardmäßig multidrehend und fordert Tools immer wieder auf, bis sie eine endgültige Antwort zurückgeben kann.

  4. Laufzeit: AutoGen bietet eingebettete und experimentelle verteilte Laufzeiten. Agent Framework konzentriert sich heute auf die Komposition mit einem einzigen Prozess; die verteilte Ausführung ist geplant.

Modellclienterstellung und -konfiguration

Beide Frameworks bieten Modellclients für wichtige KI-Anbieter mit ähnlichen, aber nicht identischen APIs.

Merkmal AutoGen Agenten-Framework
OpenAI-Client OpenAIChatCompletionClient OpenAIChatClient
OpenAI-Antwortclient ❌ Nicht verfügbar OpenAIResponsesClient
Azure OpenAI AzureOpenAIChatCompletionClient AzureOpenAIChatClient
Azure OpenAI-Antworten ❌ Nicht verfügbar AzureOpenAIResponsesClient
Azure AI AzureAIChatCompletionClient AzureAIAgentClient
Anthropic AnthropicChatCompletionClient 🚧 Geplant
Ollama OllamaChatCompletionClient 🚧 Geplant
Zwischenspeicherung ChatCompletionCache Umschlag 🚧 Geplant

AutoGen-Modellclients

from autogen_ext.models.openai import OpenAIChatCompletionClient, AzureOpenAIChatCompletionClient

# OpenAI
client = OpenAIChatCompletionClient(
    model="gpt-5",
    api_key="your-key"
)

# Azure OpenAI
client = AzureOpenAIChatCompletionClient(
    azure_endpoint="https://your-endpoint.openai.azure.com/",
    azure_deployment="gpt-5",
    api_version="2024-12-01",
    api_key="your-key"
)

Agent Framework ChatClients

from agent_framework.openai import OpenAIChatClient
from agent_framework.azure import AzureOpenAIChatClient

# OpenAI (reads API key from environment)
client = OpenAIChatClient(model_id="gpt-5")

# Azure OpenAI (uses environment or default credentials; see samples for auth options)
client = AzureOpenAIChatClient(model_id="gpt-5")

Ausführliche Beispiele finden Sie unter:

Unterstützung der Antwort-API (Agent-Framework exklusiv)

Agent Frameworks AzureOpenAIResponsesClient und OpenAIResponsesClient bieten spezielle Unterstützung für Gründemodelle und strukturierte Antworten, die in AutoGen nicht verfügbar sind:

from agent_framework.azure import AzureOpenAIResponsesClient
from agent_framework.openai import OpenAIResponsesClient

# Azure OpenAI with Responses API
azure_responses_client = AzureOpenAIResponsesClient(model_id="gpt-5")

# OpenAI with Responses API
openai_responses_client = OpenAIResponsesClient(model_id="gpt-5")

Beispiele für die Antwort-API finden Sie unter:

Single-Agent Featurezuordnung

In diesem Abschnitt werden Einzel-Agent-Features zwischen AutoGen und Agent Framework zugeordnet. Erstellen Sie mit einem Client einen Agent, fügen Sie Tools an, und wählen Sie zwischen Nicht-Streaming- und Streamingausführung aus.

Grundlegende Agent-Erstellung und -Ausführung

Nachdem Sie einen Modellclient konfiguriert haben, erstellt der nächste Schritt Agents. Beide Frameworks bieten ähnliche Agentabstraktionen, aber mit unterschiedlichen Standardverhaltens- und Konfigurationsoptionen.

AutoGen AssistantAgent

from autogen_agentchat.agents import AssistantAgent

agent = AssistantAgent(
    name="assistant",
    model_client=client,
    system_message="You are a helpful assistant.",
    tools=[my_tool],
    max_tool_iterations=1  # Single-turn by default
)

# Execution
result = await agent.run(task="What's the weather?")

Agent Framework ChatAgent

from agent_framework import ChatAgent, ai_function
from agent_framework.openai import OpenAIChatClient

# Create simple tools for the example
@ai_function
def get_weather(location: str) -> str:
    """Get weather for a location."""
    return f"Weather in {location}: sunny"

@ai_function
def get_time() -> str:
    """Get current time."""
    return "Current time: 2:30 PM"

# Create client
client = OpenAIChatClient(model_id="gpt-5")

async def example():
    # Direct creation
    agent = ChatAgent(
        name="assistant",
        chat_client=client,
        instructions="You are a helpful assistant.",
        tools=[get_weather]  # Multi-turn by default
    )

    # Factory method (more convenient)
    agent = client.create_agent(
        name="assistant",
        instructions="You are a helpful assistant.",
        tools=[get_weather]
    )

    # Execution with runtime tool configuration
    result = await agent.run(
        "What's the weather?",
        tools=[get_time],  # Can add tools at runtime
        tool_choice="auto"
    )

Wichtige Unterschiede:

  • Standardverhalten: ChatAgent Automatisches Durchlaufen von Toolaufrufen, erfordert jedoch AssistantAgent eine explizite max_tool_iterations Einstellung
  • Laufzeitkonfiguration: ChatAgent.run() Akzeptiert tools und tool_choice Parameter für anpassungsbezogene Aufrufe
  • Factorymethoden: Agent Framework bietet praktische Factorymethoden direkt von Chatclients
  • Zustandsverwaltung: ChatAgent ist zustandslos und verwaltet keine Unterhaltungshistorie zwischen Aufrufen, im Gegensatz dazu AssistantAgent , dass der Unterhaltungsverlauf im Rahmen seines Zustands beibehalten wird.

Verwalten des Unterhaltungsstatus mit AgentThread

Verwenden Sie die Verwendung ChatAgent zum Verwalten des Unterhaltungsverlaufs, um Unterhaltungen AgentThreadfortzusetzen:

# Assume we have an agent from previous examples
async def conversation_example():
    # Create a new thread that will be reused
    thread = agent.get_new_thread()

    # First interaction - thread is empty
    result1 = await agent.run("What's 2+2?", thread=thread)
    print(result1.text)  # "4"

    # Continue conversation - thread contains previous messages
    result2 = await agent.run("What about that number times 10?", thread=thread)
    print(result2.text)  # "40" (understands "that number" refers to 4)

    # AgentThread can use external storage, similar to ChatCompletionContext in AutoGen

Statuslos standardmäßig: schnelle Demo

# Without a thread (two independent invocations)
r1 = await agent.run("What's 2+2?")
print(r1.text)  # for example, "4"

r2 = await agent.run("What about that number times 10?")
print(r2.text)  # Likely ambiguous without prior context; cannot be "40"

# With a thread (shared context across calls)
thread = agent.get_new_thread()
print((await agent.run("What's 2+2?", thread=thread)).text)  # "4"
print((await agent.run("What about that number times 10?", thread=thread)).text)  # "40"

Beispiele für die Threadverwaltung finden Sie unter:

OpenAI Assistant Agent Equivalence

Beide Frameworks bieten die Integration der OpenAI-Assistenten-API:

# AutoGen OpenAIAssistantAgent
from autogen_ext.agents.openai import OpenAIAssistantAgent
# Agent Framework has OpenAI Assistants support via OpenAIAssistantsClient
from agent_framework.openai import OpenAIAssistantsClient

Beispiele für OpenAI-Assistenten finden Sie unter:

Streamingunterstützung

Beide Frameworks streamen Token in Echtzeit – von Clients und von Agents – um UIs reaktionsfähig zu halten.

AutoGen Streaming

# Model client streaming
async for chunk in client.create_stream(messages):
    if isinstance(chunk, str):
        print(chunk, end="")

# Agent streaming
async for event in agent.run_stream(task="Hello"):
    if isinstance(event, ModelClientStreamingChunkEvent):
        print(event.content, end="")
    elif isinstance(event, TaskResult):
        print("Final result received")

Agent Framework Streaming

# Assume we have client, agent, and tools from previous examples
async def streaming_example():
    # Chat client streaming
    async for chunk in client.get_streaming_response("Hello", tools=tools):
        if chunk.text:
            print(chunk.text, end="")

    # Agent streaming
    async for chunk in agent.run_stream("Hello"):
        if chunk.text:
            print(chunk.text, end="", flush=True)

Tipp: Im Agent-Framework liefern Sowohl Clients als auch Agents das gleiche Update-Shape; Sie können in beiden Fällen lesen chunk.text .

Nachrichtentypen und Erstellung

Das Verständnis, wie Nachrichten funktionieren, ist für die effektive Agentkommunikation von entscheidender Bedeutung. Beide Frameworks bieten unterschiedliche Ansätze für die Nachrichtenerstellung und -behandlung, wobei AutoGen separate Nachrichtenklassen und Agent Framework mit einem einheitlichen Nachrichtensystem verwendet.

AutoGen-Nachrichtentypen

from autogen_agentchat.messages import TextMessage, MultiModalMessage
from autogen_core.models import UserMessage

# Text message
text_msg = TextMessage(content="Hello", source="user")

# Multi-modal message
multi_modal_msg = MultiModalMessage(
    content=["Describe this image", image_data],
    source="user"
)

# Convert to model format for use with model clients
user_message = text_msg.to_model_message()

Agent Framework-Nachrichtentypen

from agent_framework import ChatMessage, TextContent, DataContent, UriContent, Role
import base64

# Text message
text_msg = ChatMessage(role=Role.USER, text="Hello")

# Supply real image bytes, or use a data: URI/URL via UriContent
image_bytes = b"<your_image_bytes>"
image_b64 = base64.b64encode(image_bytes).decode()
image_uri = f"data:image/jpeg;base64,{image_b64}"

# Multi-modal message with mixed content
multi_modal_msg = ChatMessage(
    role=Role.USER,
    contents=[
        TextContent(text="Describe this image"),
        DataContent(uri=image_uri, media_type="image/jpeg")
    ]
)

Wichtige Unterschiede:

  • AutoGen verwendet separate Nachrichtenklassen (TextMessage, MultiModalMessage) mit einem source Feld
  • Agent Framework verwendet ein Einheitliches ChatMessage mit typierten Inhaltsobjekten und einem role Feld.
  • Agent Framework-Nachrichten verwenden Role Enum (USER, ASSISTANT, SYSTEM, TOOL) anstelle von Zeichenfolgenquellen

Toolerstellung und -integration

Tools erweitern Agent-Funktionen über die Textgenerierung hinaus. Die Frameworks nutzen unterschiedliche Ansätze für die Toolerstellung, wobei Agent Framework eine automatisiertere Schemagenerierung bereitstellt.

AutoGen FunctionTool

from autogen_core.tools import FunctionTool

async def get_weather(location: str) -> str:
    """Get weather for a location."""
    return f"Weather in {location}: sunny"

# Manual tool creation
tool = FunctionTool(
    func=get_weather,
    description="Get weather information"
)

# Use with agent
agent = AssistantAgent(name="assistant", model_client=client, tools=[tool])

Agent Framework @ai_function

from agent_framework import ai_function
from typing import Annotated
from pydantic import Field

@ai_function
def get_weather(
    location: Annotated[str, Field(description="The location to get weather for")]
) -> str:
    """Get weather for a location."""
    return f"Weather in {location}: sunny"

# Direct use with agent (automatic conversion)
agent = ChatAgent(name="assistant", chat_client=client, tools=[get_weather])

Ausführliche Beispiele finden Sie unter:

Gehostete Tools (Agent Framework Exklusiv)

Agent Framework stellt gehostete Tools bereit, die in AutoGen nicht verfügbar sind:

from agent_framework import ChatAgent, HostedCodeInterpreterTool, HostedWebSearchTool
from agent_framework.azure import AzureOpenAIChatClient

# Azure OpenAI client with a model that supports hosted tools
client = AzureOpenAIChatClient(model_id="gpt-5")

# Code execution tool
code_tool = HostedCodeInterpreterTool()

# Web search tool
search_tool = HostedWebSearchTool()

agent = ChatAgent(
    name="researcher",
    chat_client=client,
    tools=[code_tool, search_tool]
)

Ausführliche Beispiele finden Sie unter:

Anforderungen und Vorbehalte:

  • Gehostete Tools sind nur für Modelle/Konten verfügbar, die sie unterstützen. Überprüfen Sie die Berechtigungen und Modellunterstützung für Ihren Anbieter, bevor Sie diese Tools aktivieren.
  • Die Konfiguration unterscheidet sich von Anbieter; befolgen Sie die Voraussetzungen in den einzelnen Beispielen für Setup und Berechtigungen.
  • Nicht jedes Modell unterstützt jedes gehostete Tool (z. B. Websuche und Codedolmetscher). Wählen Sie ein kompatibles Modell in Ihrer Umgebung aus.

Hinweis

AutoGen unterstützt lokale Codeausführungstools, dieses Feature ist jedoch für zukünftige Agent Framework-Versionen geplant.

Schlüsselunterschied: Das Agentframework verarbeitet die Tooliteration automatisch auf Agentebene. Im Gegensatz zum Parameter von max_tool_iterations AutoGen setzen Agent Framework-Agents die Toolausführung bis zum Abschluss standardmäßig mit integrierten Sicherheitsmechanismen fort, um endlose Schleifen zu verhindern.

MCP-Serverunterstützung

Für die erweiterte Toolintegration unterstützen beide Frameworks Model Context Protocol (MCP), sodass Agents mit externen Diensten und Datenquellen interagieren können. Agent Framework bietet umfassendere integrierte Unterstützung.

AutoGen MCP-Unterstützung

AutoGen verfügt über grundlegende MCP-Unterstützung durch Erweiterungen (spezifische Implementierungsdetails variieren je nach Version).

Agent Framework MCP-Unterstützung

from agent_framework import ChatAgent, MCPStdioTool, MCPStreamableHTTPTool, MCPWebsocketTool
from agent_framework.openai import OpenAIChatClient

# Create client for the example
client = OpenAIChatClient(model_id="gpt-5")

# Stdio MCP server
mcp_tool = MCPStdioTool(
    name="filesystem",
    command="uvx mcp-server-filesystem",
    args=["/allowed/directory"]
)

# HTTP streaming MCP
http_mcp = MCPStreamableHTTPTool(
    name="http_mcp",
    url="http://localhost:8000/sse"
)

# WebSocket MCP
ws_mcp = MCPWebsocketTool(
    name="websocket_mcp",
    url="ws://localhost:8000/ws"
)

agent = ChatAgent(name="assistant", chat_client=client, tools=[mcp_tool])

Beispiele für MCP finden Sie unter:

Agent-as-a-Tool-Muster

Ein leistungsfähiges Muster ist die Verwendung von Agents selbst als Tools, die hierarchische Agentarchitekturen ermöglichen. Beide Frameworks unterstützen dieses Muster mit unterschiedlichen Implementierungen.

AutoGen AgentTool

from autogen_agentchat.tools import AgentTool

# Create specialized agent
writer = AssistantAgent(
    name="writer",
    model_client=client,
    system_message="You are a creative writer."
)

# Wrap as tool
writer_tool = AgentTool(agent=writer)

# Use in coordinator (requires disabling parallel tool calls)
coordinator_client = OpenAIChatCompletionClient(
    model="gpt-5",
    parallel_tool_calls=False
)
coordinator = AssistantAgent(
    name="coordinator",
    model_client=coordinator_client,
    tools=[writer_tool]
)

Agent Framework as_tool()

from agent_framework import ChatAgent

# Assume we have client from previous examples
# Create specialized agent
writer = ChatAgent(
    name="writer",
    chat_client=client,
    instructions="You are a creative writer."
)

# Convert to tool
writer_tool = writer.as_tool(
    name="creative_writer",
    description="Generate creative content",
    arg_name="request",
    arg_description="What to write"
)

# Use in coordinator
coordinator = ChatAgent(
    name="coordinator",
    chat_client=client,
    tools=[writer_tool]
)

Expliziter Migrationshinweis: Legen Sie in AutoGen beim Umschließen von Agents als Tools auf dem Modellclient des Koordinators fest, parallel_tool_calls=False um Parallelitätsprobleme beim Aufrufen derselben Agentinstanz zu vermeiden. Im Agent Framework ist es nicht erforderlich, parallele Toolaufrufe zu deaktivieren, as_tool() da Agents standardmäßig zustandslos sind.

Middleware (Agent Framework-Feature)

Das Agent Framework führt Middleware-Funktionen ein, die AutoGen fehlt. Middleware ermöglicht leistungsstarke cross-cutting-Bedenken wie Protokollierung, Sicherheit und Leistungsüberwachung.

from agent_framework import ChatAgent, AgentRunContext, FunctionInvocationContext
from typing import Callable, Awaitable

# Assume we have client from previous examples
async def logging_middleware(
    context: AgentRunContext,
    next: Callable[[AgentRunContext], Awaitable[None]]
) -> None:
    print(f"Agent {context.agent.name} starting")
    await next(context)
    print(f"Agent {context.agent.name} completed")

async def security_middleware(
    context: FunctionInvocationContext,
    next: Callable[[FunctionInvocationContext], Awaitable[None]]
) -> None:
    if "password" in str(context.arguments):
        print("Blocking function call with sensitive data")
        return  # Don't call next()
    await next(context)

agent = ChatAgent(
    name="secure_agent",
    chat_client=client,
    middleware=[logging_middleware, security_middleware]
)

Vorteile:

  • Sicherheit: Eingabeüberprüfung und Inhaltsfilterung
  • Observability: Protokollierung, Metriken und Ablaufverfolgung
  • Leistung: Zwischenspeichern und Einschränken der Rate
  • Fehlerbehandlung: Ordnungsgemäße Beeinträchtigung und Wiederholungslogik

Ausführliche Middlewarebeispiele finden Sie unter:

Benutzerdefinierte Agenten

Manchmal möchten Sie überhaupt keinen modellgestützten Agent – Sie möchten einen deterministischen oder API-gesicherten Agent mit benutzerdefinierter Logik. Beide Frameworks unterstützen das Erstellen von benutzerdefinierten Agents, aber die Muster unterscheiden sich.

AutoGen: Subclass BaseChatAgent

from typing import Sequence
from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import BaseChatMessage, TextMessage, StopMessage
from autogen_core import CancellationToken

class StaticAgent(BaseChatAgent):
    def __init__(self, name: str = "static", description: str = "Static responder") -> None:
        super().__init__(name, description)

    @property
    def produced_message_types(self) -> Sequence[type[BaseChatMessage]]:  # Which message types this agent produces
        return (TextMessage,)

    async def on_messages(self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken) -> Response:
        # Always return a static response
        return Response(chat_message=TextMessage(content="Hello from AutoGen custom agent", source=self.name))

Notes:

  • Implementieren und zurückgeben Sie on_messages(...) eine Response mit einer Chatnachricht.
  • Optional implementieren on_reset(...) , um den internen Zustand zwischen Denläufen zu löschen.

Agent-Framework: Erweitern von BaseAgent (threadfähig)

from collections.abc import AsyncIterable
from typing import Any
from agent_framework import (
    AgentRunResponse,
    AgentRunResponseUpdate,
    AgentThread,
    BaseAgent,
    ChatMessage,
    Role,
    TextContent,
)

class StaticAgent(BaseAgent):
    async def run(
        self,
        messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
        *,
        thread: AgentThread | None = None,
        **kwargs: Any,
    ) -> AgentRunResponse:
        # Build a static reply
        reply = ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text="Hello from AF custom agent")])

        # Persist conversation to the provided AgentThread (if any)
        if thread is not None:
            normalized = self._normalize_messages(messages)
            await self._notify_thread_of_new_messages(thread, normalized, reply)

        return AgentRunResponse(messages=[reply])

    async def run_stream(
        self,
        messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
        *,
        thread: AgentThread | None = None,
        **kwargs: Any,
    ) -> AsyncIterable[AgentRunResponseUpdate]:
        # Stream the same static response in a single chunk for simplicity
        yield AgentRunResponseUpdate(contents=[TextContent(text="Hello from AF custom agent")], role=Role.ASSISTANT)

        # Notify thread of input and the complete response once streaming ends
        if thread is not None:
            reply = ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text="Hello from AF custom agent")])
            normalized = self._normalize_messages(messages)
            await self._notify_thread_of_new_messages(thread, normalized, reply)

Notes:

  • AgentThreadverwaltet den Unterhaltungszustand extern; verwenden agent.get_new_thread() und übergeben Sie es anrun/run_stream .
  • Rufen Sie self._notify_thread_of_new_messages(thread, input_messages, response_messages) auf, damit der Thread beide Seiten des Austauschs hat.
  • Sehen Sie sich das vollständige Beispiel an: Benutzerdefinierter Agent

Als Nächstes betrachten wir die Multi-Agent-Orchestrierung – der Bereich, in dem sich die Frameworks am meisten unterscheiden.

Zuordnung von Multi-Agent-Features

Übersicht über das Programmiermodell

Die Multi-Agent-Programmiermodelle stellen den wichtigsten Unterschied zwischen den beiden Frameworks dar.

Der Dualmodellansatz von AutoGen

AutoGen bietet zwei Programmiermodelle:

  1. autogen-core: Low-Level, ereignisgesteuerte Programmierung mit RoutedAgent und Nachrichtenabonnements
  2. Team Abstraktion: Allgemeines, lauforientiertes Modell, das auf der Grundlage von autogen-core
# Low-level autogen-core (complex)
class MyAgent(RoutedAgent):
    @message_handler
    async def handle_message(self, message: TextMessage, ctx: MessageContext) -> None:
        # Handle specific message types
        pass

# High-level Team (easier but limited)
team = RoundRobinGroupChat(
    participants=[agent1, agent2],
    termination_condition=StopAfterNMessages(5)
)
result = await team.run(task="Collaborate on this task")

Herausforderungen:

  • Das Modell auf niedriger Ebene ist für die meisten Benutzer zu komplex.
  • Allgemeines Modell kann für komplexe Verhaltensweisen eingeschränkt werden
  • Die Überbrückung zwischen den beiden Modellen erhöht die Implementierungskomplexität

Einheitliches Workflowmodell des Agent-Frameworks

Agent Framework bietet eine einzelne Workflow Abstraktion, die das Beste aus beiden Ansätzen kombiniert:

from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never

# Assume we have agent1 and agent2 from previous examples
@executor(id="agent1")
async def agent1_executor(input_msg: str, ctx: WorkflowContext[str]) -> None:
    response = await agent1.run(input_msg)
    await ctx.send_message(response.text)

@executor(id="agent2")
async def agent2_executor(input_msg: str, ctx: WorkflowContext[Never, str]) -> None:
    response = await agent2.run(input_msg)
    await ctx.yield_output(response.text)  # Final output

# Build typed data flow graph
workflow = (WorkflowBuilder()
           .add_edge(agent1_executor, agent2_executor)
           .set_start_executor(agent1_executor)
           .build())

# Example usage (would be in async context)
# result = await workflow.run("Initial input")

Ausführliche Workflowbeispiele finden Sie unter:

Vorteile:

  • Einheitliches Modell: Einzelne Abstraktion für alle Komplexitätsstufen
  • Typsicherheit: Stark typierte Eingaben und Ausgaben
  • Graph-Visualisierung: Datenflussdarstellung löschen
  • Flexible Komposition: Mix-Agents, Funktionen und Unterworkflows

Workflow vs GraphFlow

Die Abstraktion des Workflow Agent Frameworks ist von AutoGens experimentellem GraphFlow Feature inspiriert, stellt aber eine bedeutende Entwicklung in der Designphilosophie dar:

  • GraphFlow: Steuerungsfluss basierend auf Übergängen und Nachrichten werden an alle Agents übertragen; Übergänge sind auf übertragenen Nachrichteninhalten bedingt
  • Workflow: Datenflussbasiert, bei dem Nachrichten über bestimmte Kanten und Executoren weitergeleitet werden, werden von Rändern aktiviert, mit Unterstützung für die gleichzeitige Ausführung.

Visuelle Übersicht

Das folgende Diagramm kontrastiert autoGens Steuerfluss-GraphFlow (links) mit dem Datenflussworkflow-Workflow von Agent Framework (rechts). GraphFlow modelliert Agents als Knoten mit bedingten Übergängen und Übertragungen. Workflowmodelle-Executoren (Agents, Funktionen oder Unterworkflows), die durch typierte Kanten verbunden sind; sie unterstützt auch Anforderungs-/Antwort-Pausen und Prüfpunkte.

flowchart LR

  subgraph AutoGenGraphFlow
    direction TB
    U[User / Task] --> A[Agent A]
    A -->|success| B[Agent B]
    A -->|retry| C[Agent C]
    A -. broadcast .- B
    A -. broadcast .- C
  end

  subgraph AgentFrameworkWorkflow
    direction TB
    I[Input] --> E1[Executor 1]
    E1 -->|"str"| E2[Executor 2]
    E1 -->|"image"| E3[Executor 3]
    E3 -->|"str"| E2
    E2 --> OUT[(Final Output)]
  end

  R[Request / Response Gate]
  E2 -. request .-> R
  R -. resume .-> E2

  CP[Checkpoint]
  E1 -. save .-> CP
  CP -. load .-> E1

Praktisch:

  • GraphFlow verwendet Agents als Knoten und sendet Nachrichten; Kanten stellen bedingte Übergänge dar.
  • Workflow leitet Nachrichten entlang der Ränder weiter. Knoten (Executoren) können Agents, reine Funktionen oder Unterworkflows sein.
  • Mit Anforderung/Antwort kann ein Workflow für externe Eingaben angehalten werden; Die Prüfpunkterstellung behält den Fortschritt bei und aktiviert den Fortsetzen.

Codevergleich

1) Sequenzielle + bedingte
# AutoGen GraphFlow (fluent builder) — writer → reviewer → editor (conditional)
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow

writer = AssistantAgent(name="writer", description="Writes a draft", model_client=client)
reviewer = AssistantAgent(name="reviewer", description="Reviews the draft", model_client=client)
editor = AssistantAgent(name="editor", description="Finalizes the draft", model_client=client)

graph = (
    DiGraphBuilder()
    .add_node(writer).add_node(reviewer).add_node(editor)
    .add_edge(writer, reviewer)  # always
    .add_edge(reviewer, editor, condition=lambda msg: "approve" in msg.to_model_text())
    .set_entry_point(writer)
).build()

team = GraphFlow(participants=[writer, reviewer, editor], graph=graph)
result = await team.run(task="Draft a short paragraph about solar power")
# Agent Framework Workflow — sequential executors with conditional logic
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never

@executor(id="writer")
async def writer_exec(task: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(f"Draft: {task}")

@executor(id="reviewer")
async def reviewer_exec(draft: str, ctx: WorkflowContext[str]) -> None:
    decision = "approve" if "solar" in draft.lower() else "revise"
    await ctx.send_message(f"{decision}:{draft}")

@executor(id="editor")
async def editor_exec(msg: str, ctx: WorkflowContext[Never, str]) -> None:
    if msg.startswith("approve:"):
        await ctx.yield_output(msg.split(":", 1)[1])
    else:
        await ctx.yield_output("Needs revision")

workflow_seq = (
    WorkflowBuilder()
    .add_edge(writer_exec, reviewer_exec)
    .add_edge(reviewer_exec, editor_exec)
    .set_start_executor(writer_exec)
    .build()
)
2) Fan-out + Join (ALL vs ANY)
# AutoGen GraphFlow — A → (B, C) → D with ALL/ANY join
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
A, B, C, D = agent_a, agent_b, agent_c, agent_d

# ALL (default): D runs after both B and C
g_all = (
    DiGraphBuilder()
    .add_node(A).add_node(B).add_node(C).add_node(D)
    .add_edge(A, B).add_edge(A, C)
    .add_edge(B, D).add_edge(C, D)
    .set_entry_point(A)
).build()

# ANY: D runs when either B or C completes
g_any = (
    DiGraphBuilder()
    .add_node(A).add_node(B).add_node(C).add_node(D)
    .add_edge(A, B).add_edge(A, C)
    .add_edge(B, D, activation_group="join_d", activation_condition="any")
    .add_edge(C, D, activation_group="join_d", activation_condition="any")
    .set_entry_point(A)
).build()
# Agent Framework Workflow — A → (B, C) → aggregator (ALL vs ANY)
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never

@executor(id="A")
async def start(task: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(f"B:{task}", target_id="B")
    await ctx.send_message(f"C:{task}", target_id="C")

@executor(id="B")
async def branch_b(text: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(f"B_done:{text}")

@executor(id="C")
async def branch_c(text: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(f"C_done:{text}")

@executor(id="join_any")
async def join_any(msg: str, ctx: WorkflowContext[Never, str]) -> None:
    await ctx.yield_output(f"First: {msg}")  # ANY join (first arrival)

@executor(id="join_all")
async def join_all(msg: str, ctx: WorkflowContext[str, str]) -> None:
    state = await ctx.get_executor_state() or {"items": []}
    state["items"].append(msg)
    await ctx.set_executor_state(state)
    if len(state["items"]) >= 2:
        await ctx.yield_output(" | ".join(state["items"]))  # ALL join

wf_any = (
    WorkflowBuilder()
    .add_edge(start, branch_b).add_edge(start, branch_c)
    .add_edge(branch_b, join_any).add_edge(branch_c, join_any)
    .set_start_executor(start)
    .build()
)

wf_all = (
    WorkflowBuilder()
    .add_edge(start, branch_b).add_edge(start, branch_c)
    .add_edge(branch_b, join_all).add_edge(branch_c, join_all)
    .set_start_executor(start)
    .build()
)
3) Gezieltes Routing (keine Übertragung)
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never

@executor(id="ingest")
async def ingest(task: str, ctx: WorkflowContext[str]) -> None:
    # Route selectively using target_id
    if task.startswith("image:"):
        await ctx.send_message(task.removeprefix("image:"), target_id="vision")
    else:
        await ctx.send_message(task, target_id="writer")

@executor(id="writer")
async def write(text: str, ctx: WorkflowContext[Never, str]) -> None:
    await ctx.yield_output(f"Draft: {text}")

@executor(id="vision")
async def caption(image_ref: str, ctx: WorkflowContext[Never, str]) -> None:
    await ctx.yield_output(f"Caption: {image_ref}")

workflow = (
    WorkflowBuilder()
    .add_edge(ingest, write)
    .add_edge(ingest, caption)
    .set_start_executor(ingest)
    .build()
)

# Example usage (async):
# await workflow.run("Summarize the benefits of solar power")
# await workflow.run("image:https://example.com/panel.jpg")

Was zu beachten ist:

  • GraphFlow sendet Nachrichten und verwendet bedingte Übergänge. Das Verknüpfungsverhalten wird über zielseitige activation und pro Edge activation_group/activation_conditionkonfiguriert (z. B. gruppieren Sie beide Kanten mit ).join_dactivation_condition="any"
  • Workflow leitet Daten explizit weiter; wird target_id verwendet, um nachgeschaltete Executoren auszuwählen. Das Verknüpfungsverhalten befindet sich im empfangenden Vollstreckungsausführer (z. B. ertragen bei der ersten Eingabe vs für alle) oder über Orchestrierungs-Generatoren/Aggregatoren.
  • Executors in Workflow are free-form: wrap a ChatAgent, a function, or a sub-workflow and mix them within the same graph.

Zentrale Unterschiede

Die folgende Tabelle fasst die grundlegenden Unterschiede zwischen dem GraphFlow von AutoGen und dem Workflow von Agent Framework zusammen:

Aspekt AutoGen GraphFlow Agent Framework-Workflow
Flusstyp Steuerungsfluss (Kanten sind Übergänge) Datenfluss (Ränder weiterleiten Nachrichten)
Knotentypen Nur Agents Agents, Funktionen, Unterworkflows
Aktivierung Nachrichtenübertragung Edgebasierte Aktivierung
Typsicherheit Begrenzt Starke Eingabe während des gesamten
Kompositierbarkeit Begrenzt Hochkomposierbar

Schachtelungsmuster

AutoGen Team Nesting

# Inner team
inner_team = RoundRobinGroupChat(
    participants=[specialist1, specialist2],
    termination_condition=StopAfterNMessages(3)
)

# Outer team with nested team as participant
outer_team = RoundRobinGroupChat(
    participants=[coordinator, inner_team, reviewer],  # Team as participant
    termination_condition=StopAfterNMessages(10)
)

# Messages are broadcasted to all participants including nested team
result = await outer_team.run("Complex task requiring collaboration")

AutoGen-Schachtelungsmerkmale:

  • Geschachteltes Team empfängt alle Nachrichten aus dem äußeren Team
  • Geschachtelte Teamnachrichten werden an alle äußeren Teamteilnehmer übertragen.
  • Freigegebener Nachrichtenkontext auf allen Ebenen

Schachtelung von Agent-Framework-Workflows

from agent_framework import WorkflowExecutor, WorkflowBuilder

# Assume we have executors from previous examples
# specialist1_executor, specialist2_executor, coordinator_executor, reviewer_executor

# Create sub-workflow
sub_workflow = (WorkflowBuilder()
               .add_edge(specialist1_executor, specialist2_executor)
               .set_start_executor(specialist1_executor)
               .build())

# Wrap as executor
sub_workflow_executor = WorkflowExecutor(
    workflow=sub_workflow,
    id="sub_process"
)

# Use in parent workflow
parent_workflow = (WorkflowBuilder()
                  .add_edge(coordinator_executor, sub_workflow_executor)
                  .add_edge(sub_workflow_executor, reviewer_executor)
                  .set_start_executor(coordinator_executor)
                  .build())

Schachtelungsmerkmale des Agent-Frameworks:

  • Isolierte Eingabe/Ausgabe über WorkflowExecutor
  • Keine Nachrichtenübertragung – Daten fließen über bestimmte Verbindungen
  • Unabhängige Zustandsverwaltung für jede Workflowebene

Gruppenchatmuster

Gruppenchatmuster ermöglichen es mehreren Agents, an komplexen Aufgaben zusammenzuarbeiten. Hier erfahren Sie, wie allgemeine Muster zwischen Frameworks übersetzt werden.

RoundRobinGroupChat-Muster

AutoGen-Implementierung:

from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import StopAfterNMessages

team = RoundRobinGroupChat(
    participants=[agent1, agent2, agent3],
    termination_condition=StopAfterNMessages(10)
)
result = await team.run("Discuss this topic")

Agent Framework-Implementierung:

from agent_framework import SequentialBuilder, WorkflowOutputEvent

# Assume we have agent1, agent2, agent3 from previous examples
# Sequential workflow through participants
workflow = SequentialBuilder().participants([agent1, agent2, agent3]).build()

# Example usage (would be in async context)
async def sequential_example():
    # Each agent appends to shared conversation
    async for event in workflow.run_stream("Discuss this topic"):
        if isinstance(event, WorkflowOutputEvent):
            conversation_history = event.data  # list[ChatMessage]

Ausführliche Orchestrierungsbeispiele finden Sie unter:

Für gleichzeitige Ausführungsmuster bietet Das Agent Framework auch Folgendes:

from agent_framework import ConcurrentBuilder, WorkflowOutputEvent

# Assume we have agent1, agent2, agent3 from previous examples
# Concurrent workflow for parallel processing
workflow = (ConcurrentBuilder()
           .participants([agent1, agent2, agent3])
           .build())

# Example usage (would be in async context)
async def concurrent_example():
    # All agents process the input concurrently
    async for event in workflow.run_stream("Process this in parallel"):
        if isinstance(event, WorkflowOutputEvent):
            results = event.data  # Combined results from all agents

Beispiele für gleichzeitige Ausführung finden Sie unter:

MagenticOneGroupChat-Muster

AutoGen-Implementierung:

from autogen_agentchat.teams import MagenticOneGroupChat

team = MagenticOneGroupChat(
    participants=[researcher, coder, executor],
    model_client=coordinator_client,
    termination_condition=StopAfterNMessages(20)
)
result = await team.run("Complex research and analysis task")

Agent Framework-Implementierung:

from typing import cast
from agent_framework import (
    MAGENTIC_EVENT_TYPE_AGENT_DELTA,
    MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
    AgentRunUpdateEvent,
    ChatAgent,
    ChatMessage,
    MagenticBuilder,
    WorkflowOutputEvent,
)
from agent_framework.openai import OpenAIChatClient

# Create a manager agent for orchestration
manager_agent = ChatAgent(
    name="MagenticManager",
    description="Orchestrator that coordinates the workflow",
    instructions="You coordinate a team to complete complex tasks efficiently.",
    chat_client=OpenAIChatClient(),
)

workflow = (
    MagenticBuilder()
    .participants(researcher=researcher, coder=coder)
    .with_standard_manager(
        agent=manager_agent,
        max_round_count=20,
        max_stall_count=3,
        max_reset_count=2,
    )
    .build()
)

# Example usage (would be in async context)
async def magentic_example():
    output: str | None = None
    async for event in workflow.run_stream("Complex research task"):
        if isinstance(event, AgentRunUpdateEvent):
            props = event.data.additional_properties if event.data else None
            event_type = props.get("magentic_event_type") if props else None

            if event_type == MAGENTIC_EVENT_TYPE_ORCHESTRATOR:
                text = event.data.text if event.data else ""
                print(f"[ORCHESTRATOR]: {text}")
            elif event_type == MAGENTIC_EVENT_TYPE_AGENT_DELTA:
                agent_id = props.get("agent_id", event.executor_id) if props else event.executor_id
                if event.data and event.data.text:
                    print(f"[{agent_id}]: {event.data.text}", end="")

        elif isinstance(event, WorkflowOutputEvent):
            output_messages = cast(list[ChatMessage], event.data)
            if output_messages:
                output = output_messages[-1].text

Anpassungsoptionen des Agent-Frameworks:

Der Magentic-Workflow bietet umfangreiche Anpassungsoptionen:

  • Managerkonfiguration: Verwenden eines ChatAgent mit benutzerdefinierten Anweisungen und Modelleinstellungen
  • Round limits: max_round_count, , max_stall_countmax_reset_count
  • Ereignisstreaming: Verwenden AgentRunUpdateEvent mit magentic_event_type Metadaten
  • Agent-Spezialisierung: Benutzerdefinierte Anweisungen und Tools pro Agent
  • Mensch-in-the-Loop: Planen der Überprüfung, Genehmigung von Tools und Einstandsintervention
# Advanced customization example with human-in-the-loop
from typing import cast
from agent_framework import (
    MAGENTIC_EVENT_TYPE_AGENT_DELTA,
    MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
    AgentRunUpdateEvent,
    ChatAgent,
    MagenticBuilder,
    MagenticHumanInterventionDecision,
    MagenticHumanInterventionKind,
    MagenticHumanInterventionReply,
    MagenticHumanInterventionRequest,
    RequestInfoEvent,
    WorkflowOutputEvent,
)
from agent_framework.openai import OpenAIChatClient

# Create manager agent with custom configuration
manager_agent = ChatAgent(
    name="MagenticManager",
    description="Orchestrator for complex tasks",
    instructions="Custom orchestration instructions...",
    chat_client=OpenAIChatClient(model_id="gpt-4o"),
)

workflow = (
    MagenticBuilder()
    .participants(
        researcher=researcher_agent,
        coder=coder_agent,
        analyst=analyst_agent,
    )
    .with_standard_manager(
        agent=manager_agent,
        max_round_count=15,      # Limit total rounds
        max_stall_count=2,       # Trigger stall handling
        max_reset_count=1,       # Allow one reset on failure
    )
    .with_plan_review()           # Enable human plan review
    .with_human_input_on_stall()  # Enable human intervention on stalls
    .build()
)

# Handle human intervention requests during execution
async for event in workflow.run_stream("Complex task"):
    if isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest:
        req = cast(MagenticHumanInterventionRequest, event.data)
        if req.kind == MagenticHumanInterventionKind.PLAN_REVIEW:
            # Review and approve the plan
            reply = MagenticHumanInterventionReply(
                decision=MagenticHumanInterventionDecision.APPROVE
            )
            async for ev in workflow.send_responses_streaming({event.request_id: reply}):
                pass  # Handle continuation

Ausführliche Magentische Beispiele finden Sie unter:

Zukünftige Muster

Die Agent Framework-Roadmap enthält mehrere AutoGen-Muster, die sich derzeit in der Entwicklung befinden:

  • Schwarmmuster: Handoff-basierte Agentenkoordination
  • SelectorGroupChat: LLM-gesteuerte Lautsprecherauswahl

Human-in-the-Loop mit Anforderungsantwort

Ein wichtiges neues Feature im Agent Framework Workflow ist das Konzept der Anforderung und Antwort, mit dem Workflows die Ausführung anhalten und auf externe Eingaben warten können, bevor sie fortfahren. Diese Funktion ist nicht in der Abstraktion von Team AutoGen vorhanden und ermöglicht anspruchsvolle Menschliche-in-the-Loop-Muster.

AutoGen-Einschränkungen

Die Abstraktion von Team AutoGen wird kontinuierlich ausgeführt, sobald sie gestartet wurde, und bietet keine integrierten Mechanismen zum Anhalten der Ausführung für menschliche Eingaben. Für alle Funktionen von Menschen in der Schleife sind benutzerdefinierte Implementierungen außerhalb des Frameworks erforderlich.

Agent Framework-Request-Response-API

Das Agent-Framework bietet integrierte Anforderungsantwortfunktionen, bei denen jeder Verwalter Anforderungen mithilfe ctx.request_info() von Antworten mit dem @response_handler Dekorateur senden und verarbeiten kann.

from agent_framework import (
    RequestInfoEvent, WorkflowBuilder, WorkflowContext, 
    Executor, handler, response_handler
)
from dataclasses import dataclass

# Assume we have agent_executor defined elsewhere

# Define typed request payload
@dataclass
class ApprovalRequest:
    """Request human approval for agent output."""
    content: str = ""
    agent_name: str = ""

# Workflow executor that requests human approval
class ReviewerExecutor(Executor):
    
    @handler
    async def review_content(
        self,
        agent_response: str,
        ctx: WorkflowContext
    ) -> None:
        # Request human input with structured data
        approval_request = ApprovalRequest(
            content=agent_response,
            agent_name="writer_agent"
        )
        await ctx.request_info(request_data=approval_request, response_type=str)
    
    @response_handler
    async def handle_approval_response(
        self,
        original_request: ApprovalRequest,
        decision: str,
        ctx: WorkflowContext
    ) -> None:
        decision_lower = decision.strip().lower()
        original_content = original_request.content

        if decision_lower == "approved":
            await ctx.yield_output(f"APPROVED: {original_content}")
        else:
            await ctx.yield_output(f"REVISION NEEDED: {decision}")

# Build workflow with human-in-the-loop
reviewer = ReviewerExecutor(id="reviewer")

workflow = (WorkflowBuilder()
           .add_edge(agent_executor, reviewer)
           .set_start_executor(agent_executor)
           .build())

Ausführen von Human-in-the-Loop-Workflows

Das Agent Framework stellt Streaming-APIs bereit, um den Pausen-Fortsetzungszyklus zu behandeln:

from agent_framework import RequestInfoEvent, WorkflowOutputEvent

# Assume we have workflow defined from previous examples
async def run_with_human_input():
    pending_responses = None
    completed = False

    while not completed:
        # First iteration uses run_stream, subsequent use send_responses_streaming
        stream = (
            workflow.send_responses_streaming(pending_responses)
            if pending_responses
            else workflow.run_stream("initial input")
        )

        events = [event async for event in stream]
        pending_responses = None

        # Collect human requests and outputs
        for event in events:
            if isinstance(event, RequestInfoEvent):
                # Display request to human and collect response
                request_data = event.data  # ApprovalRequest instance
                print(f"Review needed: {request_data.content}")

                human_response = input("Enter 'approved' or revision notes: ")
                pending_responses = {event.request_id: human_response}

            elif isinstance(event, WorkflowOutputEvent):
                print(f"Final result: {event.data}")
                completed = True

Beispiele für Benutzer-in-the-Loop-Workflows finden Sie unter:

Prüfpunkt und Fortsetzen von Workflows

Ein weiterer wichtiger Vorteil der AutoGen-Abstraktion von Workflow Agent Framework Team ist die integrierte Unterstützung für die Prüfpunkt- und Fortsetzungsausführung. Auf diese Weise können Workflows angehalten, beibehalten und später von jedem Prüfpunkt fortgesetzt werden, wodurch Fehlertoleranz bereitgestellt und lange ausgeführte oder asynchrone Workflows aktiviert werden.

AutoGen-Einschränkungen

Die Abstraktion von Team AutoGen bietet keine integrierten Prüfpunktfunktionen. Alle Persistenz- oder Wiederherstellungsmechanismen müssen extern implementiert werden und erfordern häufig komplexe Zustandsverwaltungs- und Serialisierungslogik.

Agent Framework Prüfpunkting

Agent Framework bietet umfassende Prüfpunkte und FileCheckpointStorage die with_checkpointing() Methode für WorkflowBuilder. Prüfpunkteerfassung:

  • Ausführungszustand: Lokaler Zustand für jeden Executor, der verwendet wird ctx.set_executor_state()
  • Freigegebener Zustand: Ausführungsübergreifender Zustand mithilfe von ctx.set_shared_state()
  • Nachrichtenwarteschlangen: Ausstehende Nachrichten zwischen Executoren
  • Workflowposition: Aktueller Ausführungsfortschritt und nächste Schritte
from agent_framework import (
    FileCheckpointStorage, WorkflowBuilder, WorkflowContext,
    Executor, handler
)
from typing_extensions import Never

class ProcessingExecutor(Executor):
    @handler
    async def process(self, data: str, ctx: WorkflowContext[str]) -> None:
        # Process the data
        result = f"Processed: {data.upper()}"
        print(f"Processing: '{data}' -> '{result}'")

        # Persist executor-local state
        prev_state = await ctx.get_executor_state() or {}
        count = prev_state.get("count", 0) + 1
        await ctx.set_executor_state({
            "count": count,
            "last_input": data,
            "last_output": result
        })

        # Persist shared state for other executors
        await ctx.set_shared_state("original_input", data)
        await ctx.set_shared_state("processed_output", result)

        await ctx.send_message(result)

class FinalizeExecutor(Executor):
    @handler
    async def finalize(self, data: str, ctx: WorkflowContext[Never, str]) -> None:
        result = f"Final: {data}"
        await ctx.yield_output(result)

# Configure checkpoint storage
checkpoint_storage = FileCheckpointStorage(storage_path="./checkpoints")
processing_executor = ProcessingExecutor(id="processing")
finalize_executor = FinalizeExecutor(id="finalize")

# Build workflow with checkpointing enabled
workflow = (WorkflowBuilder()
           .add_edge(processing_executor, finalize_executor)
           .set_start_executor(processing_executor)
           .with_checkpointing(checkpoint_storage=checkpoint_storage)  # Enable checkpointing
           .build())

# Example usage (would be in async context)
async def checkpoint_example():
    # Run workflow - checkpoints are created automatically
    async for event in workflow.run_stream("input data"):
        print(f"Event: {event}")

Fortsetzen ab Prüfpunkten

Agent Framework stellt APIs bereit, um bestimmte Prüfpunkte auflisten, prüfen und fortsetzen zu können:

from typing_extensions import Never

from agent_framework import (
    Executor,
    FileCheckpointStorage,
    WorkflowContext,
    WorkflowBuilder,
    get_checkpoint_summary,
    handler,
)

class UpperCaseExecutor(Executor):
    @handler
    async def process(self, text: str, ctx: WorkflowContext[str]) -> None:
        result = text.upper()
        await ctx.send_message(result)

class ReverseExecutor(Executor):
    @handler
    async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None:
        result = text[::-1]
        await ctx.yield_output(result)

def create_workflow(checkpoint_storage: FileCheckpointStorage):
    """Create a workflow with two executors and checkpointing."""
    upper_executor = UpperCaseExecutor(id="upper")
    reverse_executor = ReverseExecutor(id="reverse")

    return (WorkflowBuilder()
           .add_edge(upper_executor, reverse_executor)
           .set_start_executor(upper_executor)
           .with_checkpointing(checkpoint_storage=checkpoint_storage)
           .build())

# Assume we have checkpoint_storage from previous examples
checkpoint_storage = FileCheckpointStorage(storage_path="./checkpoints")

async def checkpoint_resume_example():
    # List available checkpoints
    checkpoints = await checkpoint_storage.list_checkpoints()

    # Display checkpoint information
    for checkpoint in checkpoints:
        summary = get_checkpoint_summary(checkpoint)
        print(f"Checkpoint {summary.checkpoint_id}: iteration={summary.iteration_count}")

    # Resume from a specific checkpoint
    if checkpoints:
        chosen_checkpoint_id = checkpoints[0].checkpoint_id

        # Create new workflow instance and resume
        new_workflow = create_workflow(checkpoint_storage)
        async for event in new_workflow.run_stream(
            checkpoint_id=chosen_checkpoint_id,
            checkpoint_storage=checkpoint_storage
        ):
            print(f"Resumed event: {event}")

Erweiterte Prüfpunktfunktionen

Prüfpunkt mit Human-in-the-Loop-Integration:

Die Prüfpunkterstellung funktioniert nahtlos mit menschlichen In-the-Loop-Workflows, sodass Workflows für menschliche Eingaben angehalten und später fortgesetzt werden können. Beim Fortsetzen eines Prüfpunkts, der ausstehende Anforderungen enthält, werden diese Anforderungen als Ereignisse erneut ausgegeben:

# Assume we have workflow, checkpoint_id, and checkpoint_storage from previous examples
async def resume_with_pending_requests_example():
    # Resume from checkpoint - pending requests will be re-emitted
    request_info_events = []
    async for event in workflow.run_stream(
        checkpoint_id=checkpoint_id,
        checkpoint_storage=checkpoint_storage
    ):
        if isinstance(event, RequestInfoEvent):
            request_info_events.append(event)

    # Handle re-emitted pending request
    responses = {}
    for event in request_info_events:
        response = handle_request(event.data)
        responses[event.request_id] = response

    # Send response back to workflow
    async for event in workflow.send_responses_streaming(responses):
        print(f"Event: {event}")

Wichtige Vorteile

Im Vergleich zu AutoGen bietet die Prüfpunkterstellung des Agent-Frameworks Folgendes:

  • Automatische Persistenz: Keine manuelle Zustandsverwaltung erforderlich
  • Granulare Wiederherstellung: Fortsetzen von jeder Oberschrittgrenze
  • Statusisolation: Separater ausführungslokaler und gemeinsam genutzter Zustand
  • Human-in-the-Loop-Integration: Nahtloses Anhalten mit menschlichem Input
  • Fehlertoleranz: Robuste Wiederherstellung von Fehlern oder Unterbrechungen

Praktische Beispiele

Umfassende Prüfpunktbeispiele finden Sie unter:


Observierbarkeit

Sowohl AutoGen als auch Agent Framework bieten Observability-Funktionen, aber mit unterschiedlichen Ansätzen und Features.

AutoGen Observability

AutoGen verfügt über systemeigene Unterstützung für OpenTelemetry mit Instrumentierung für:

  • Laufzeitablaufverfolgung: SingleThreadedAgentRuntime und GrpcWorkerAgentRuntime
  • Toolausführung: BaseTool umfasst execute_tool folgende GenAI-Semantikkonventionen
  • Agent-Vorgänge: BaseChatAgent mit create_agent und invoke_agent überspannen
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from autogen_core import SingleThreadedAgentRuntime

# Configure OpenTelemetry
tracer_provider = TracerProvider()
trace.set_tracer_provider(tracer_provider)

# Pass to runtime
runtime = SingleThreadedAgentRuntime(tracer_provider=tracer_provider)

Agent Framework Observability

Agent Framework bietet umfassende Observability durch mehrere Ansätze:

  • Zero-Code-Setup: Automatische Instrumentierung über Umgebungsvariablen
  • Manuelle Konfiguration: Programmgesteuertes Setup mit benutzerdefinierten Parametern
  • Umfassende Telemetrie: Agents, Workflows und Toolausführungsverfolgung
  • Konsolenausgabe: Integrierte Konsolenprotokollierung und -visualisierung
from agent_framework import ChatAgent
from agent_framework.observability import setup_observability
from agent_framework.openai import OpenAIChatClient

# Zero-code setup via environment variables
# Set ENABLE_OTEL=true
# Set OTLP_ENDPOINT=http://localhost:4317

# Or manual setup
setup_observability(
    otlp_endpoint="http://localhost:4317"
)

# Create client for the example
client = OpenAIChatClient(model_id="gpt-5")

async def observability_example():
    # Observability is automatically applied to all agents and workflows
    agent = ChatAgent(name="assistant", chat_client=client)
    result = await agent.run("Hello")  # Automatically traced

Wichtige Unterschiede:

  • Setupkomplexität: Das Agent-Framework bietet einfachere Zero-Code-Setupoptionen.
  • Bereich: Das Agent-Framework bietet eine umfassendere Abdeckung, einschließlich der Beobachtbarkeit auf Workflowebene
  • Visualisierung: Agent Framework umfasst integrierte Konsolenausgabe und Entwicklungs-UI
  • Konfiguration: Agent Framework bietet flexiblere Konfigurationsoptionen

Ausführliche Beispiele zur Observierbarkeit finden Sie unter:


Conclusion

Dieser Migrationsleitfaden bietet eine umfassende Zuordnung zwischen AutoGen und Microsoft Agent Framework, die alles von der grundlegenden Agenterstellung bis hin zu komplexen Multi-Agent-Workflows abdeckt. Wichtige Voraussetzungen für die Migration:

  • Die Migration mit einem einzelnen Agent ist einfach, mit ähnlichen APIs und erweiterten Funktionen im Agent Framework
  • Multi-Agent-Muster müssen Ihren Ansatz von ereignisgesteuerten zu datenflussbasierten Architekturen überdenken, aber wenn Sie bereits mit GraphFlow vertraut sind, ist der Übergang einfacher.
  • Agent Framework bietet zusätzliche Features wie Middleware, gehostete Tools und eingegebene Workflows.

Weitere Beispiele und detaillierte Anleitungen zur Implementierung finden Sie im Verzeichnis "Agent Framework-Beispiele ".

Zusätzliche Beispielkategorien

Das Agent Framework bietet Beispiele in mehreren anderen wichtigen Bereichen:

Nächste Schritte