Delen via


Migratiehandleiding voor AutoGen naar Microsoft Agent Framework

Een uitgebreide handleiding voor het migreren van AutoGen naar de Python SDK van Microsoft Agent Framework.

Inhoudsopgave

Achtergrond

AutoGen is een framework voor het bouwen van AI-agents en systemen met meerdere agents met behulp van LLM's (Large Language Models). Het begon als een onderzoeksproject bij Microsoft Research en pioniers in verschillende concepten in de indeling van meerdere agents, zoals GroupChat en gebeurtenisgestuurde agentruntime. Het project is een vruchtbare samenwerking van de opensource-community en veel belangrijke functies zijn afkomstig van externe inzenders.

Microsoft Agent Framework is een nieuwe sdk voor meerdere talen voor het bouwen van AI-agents en -werkstromen met behulp van LLM's. Het vertegenwoordigt een aanzienlijke evolutie van de ideeën die in AutoGen zijn ontwikkeld en bevat lessen die zijn geleerd van het echte gebruik. Het is ontwikkeld door de kernteams autogen en Semantische kernel bij Microsoft en is ontworpen als een nieuwe basis voor het bouwen van AI-toepassingen in de toekomst.

In deze handleiding wordt een praktisch migratiepad beschreven: het begint met het beschrijven van wat hetzelfde blijft en welke wijzigingen in één oogopslag veranderen. Vervolgens worden modelclientinstellingen, functies met één agent en ten slotte indeling met meerdere agents behandeld met concrete code naast elkaar. Onderweg helpen koppelingen naar uitvoerbare voorbeelden in de Agent Framework-opslagplaats u bij het valideren van elke stap.

Belangrijkste overeenkomsten en verschillen

Wat hetzelfde blijft

De fundamenten zijn bekend. U maakt nog steeds agents rond een modelclient, geeft instructies en koppelt hulpprogramma's. Beide bibliotheken ondersteunen hulpprogramma's in functiestijl, tokenstreaming, multimodale inhoud en asynchrone I/O.

# Both frameworks follow similar patterns
# AutoGen
agent = AssistantAgent(name="assistant", model_client=client, tools=[my_tool])
result = await agent.run(task="Help me with this task")

# Agent Framework
agent = ChatAgent(name="assistant", chat_client=client, tools=[my_tool])
result = await agent.run("Help me with this task")

Belangrijkste verschillen

  1. Indelingsstijl: AutoGen paren een gebeurtenisgestuurde kern met een hoog niveau Team. Agent Framework is gebaseerd op een getypte grafiek Workflow die gegevens langs randen routeert en uitvoerders activeert wanneer invoer gereed is.

  2. Hulpprogramma's: AutoGen verpakt functies met FunctionTool. Agent Framework gebruikt @ai_function, leiden schema's automatisch af en voegt gehoste hulpprogramma's toe, zoals een code-interpreter en webzoekopdrachten.

  3. Agentgedrag: AssistantAgent is één beurt, tenzij u verhoogt max_tool_iterations. ChatAgent is standaard multi-turn en blijft aanroepen van hulpprogramma's totdat het een definitief antwoord kan retourneren.

  4. Runtime: AutoGen biedt ingesloten en experimentele gedistribueerde runtimes. Agent Framework richt zich vandaag op de samenstelling van één proces; Gedistribueerde uitvoering is gepland.

Modelclient maken en configureren

Beide frameworks bieden modelclients voor belangrijke AI-providers, met vergelijkbare maar niet identieke API's.

Eigenschap AutoGen Agentenraamwerk
OpenAI-client OpenAIChatCompletionClient OpenAIChatClient
OpenAI-antwoordenclient ❌ Niet beschikbaar OpenAIResponsesClient
Azure OpenAI AzureOpenAIChatCompletionClient AzureOpenAIChatClient
Azure OpenAI-antwoorden ❌ Niet beschikbaar AzureOpenAIResponsesClient
Azure AI AzureAIChatCompletionClient AzureAIAgentClient
Antropisch AnthropicChatCompletionClient 🚧 Planmatig
Ollama OllamaChatCompletionClient 🚧 Planmatig
Cachebeheer ChatCompletionCache banderol 🚧 Planmatig

AutoGen-modelclients

from autogen_ext.models.openai import OpenAIChatCompletionClient, AzureOpenAIChatCompletionClient

# OpenAI
client = OpenAIChatCompletionClient(
    model="gpt-5",
    api_key="your-key"
)

# Azure OpenAI
client = AzureOpenAIChatCompletionClient(
    azure_endpoint="https://your-endpoint.openai.azure.com/",
    azure_deployment="gpt-5",
    api_version="2024-12-01",
    api_key="your-key"
)

Agent Framework ChatClients

from agent_framework.openai import OpenAIChatClient
from agent_framework.azure import AzureOpenAIChatClient

# OpenAI (reads API key from environment)
client = OpenAIChatClient(model_id="gpt-5")

# Azure OpenAI (uses environment or default credentials; see samples for auth options)
client = AzureOpenAIChatClient(model_id="gpt-5")

Zie voor gedetailleerde voorbeelden:

Antwoord-API-ondersteuning (exclusief Agent Framework)

Agent Framework biedt AzureOpenAIResponsesClientOpenAIResponsesClient gespecialiseerde ondersteuning voor redeneringsmodellen en gestructureerde antwoorden die niet beschikbaar zijn in AutoGen:

from agent_framework.azure import AzureOpenAIResponsesClient
from agent_framework.openai import OpenAIResponsesClient

# Azure OpenAI with Responses API
azure_responses_client = AzureOpenAIResponsesClient(model_id="gpt-5")

# OpenAI with Responses API
openai_responses_client = OpenAIResponsesClient(model_id="gpt-5")

Zie voor voorbeelden van antwoorden-API:

functietoewijzing Single-Agent

In deze sectie worden functies van één agent toegewezen tussen AutoGen en Agent Framework. Maak met een client een agent, koppel hulpprogramma's en kies tussen niet-streaming en streaming-uitvoering.

Basisagent maken en uitvoeren

Zodra u een modelclient hebt geconfigureerd, wordt in de volgende stap agents gemaakt. Beide frameworks bieden vergelijkbare agentabstracties, maar met verschillende standaardgedrag en configuratieopties.

AutoGen AssistantAgent

from autogen_agentchat.agents import AssistantAgent

agent = AssistantAgent(
    name="assistant",
    model_client=client,
    system_message="You are a helpful assistant.",
    tools=[my_tool],
    max_tool_iterations=1  # Single-turn by default
)

# Execution
result = await agent.run(task="What's the weather?")

Agent Framework ChatAgent

from agent_framework import ChatAgent, ai_function
from agent_framework.openai import OpenAIChatClient

# Create simple tools for the example
@ai_function
def get_weather(location: str) -> str:
    """Get weather for a location."""
    return f"Weather in {location}: sunny"

@ai_function
def get_time() -> str:
    """Get current time."""
    return "Current time: 2:30 PM"

# Create client
client = OpenAIChatClient(model_id="gpt-5")

async def example():
    # Direct creation
    agent = ChatAgent(
        name="assistant",
        chat_client=client,
        instructions="You are a helpful assistant.",
        tools=[get_weather]  # Multi-turn by default
    )

    # Factory method (more convenient)
    agent = client.create_agent(
        name="assistant",
        instructions="You are a helpful assistant.",
        tools=[get_weather]
    )

    # Execution with runtime tool configuration
    result = await agent.run(
        "What's the weather?",
        tools=[get_time],  # Can add tools at runtime
        tool_choice="auto"
    )

Belangrijkste verschillen:

  • Standaardgedrag: ChatAgent automatisch doorlopen van hulpprogrammaaanroepen, terwijl AssistantAgent expliciete max_tool_iterations instelling is vereist
  • Runtimeconfiguratie: ChatAgent.run() accepteert tools en tool_choice parameters voor aanpassing per aanroep
  • Factory-methoden: Agent Framework biedt handige factorymethoden rechtstreeks vanuit chatclients
  • Statusbeheer: ChatAgent staatloos is en onderhoudt geen gespreksgeschiedenis tussen aanroepen, in tegenstelling tot AssistantAgent wat de gespreksgeschiedenis onderhoudt als onderdeel van de status

Gespreksstatus beheren met AgentThread

Als u gesprekken wilt voortzetten, ChatAgentgebruikt AgentThread u om de gespreksgeschiedenis te beheren:

# Assume we have an agent from previous examples
async def conversation_example():
    # Create a new thread that will be reused
    thread = agent.get_new_thread()

    # First interaction - thread is empty
    result1 = await agent.run("What's 2+2?", thread=thread)
    print(result1.text)  # "4"

    # Continue conversation - thread contains previous messages
    result2 = await agent.run("What about that number times 10?", thread=thread)
    print(result2.text)  # "40" (understands "that number" refers to 4)

    # AgentThread can use external storage, similar to ChatCompletionContext in AutoGen

Staatloos standaard: snelle demo

# Without a thread (two independent invocations)
r1 = await agent.run("What's 2+2?")
print(r1.text)  # for example, "4"

r2 = await agent.run("What about that number times 10?")
print(r2.text)  # Likely ambiguous without prior context; cannot be "40"

# With a thread (shared context across calls)
thread = agent.get_new_thread()
print((await agent.run("What's 2+2?", thread=thread)).text)  # "4"
print((await agent.run("What about that number times 10?", thread=thread)).text)  # "40"

Zie voor voorbeelden van threadbeheer:

OpenAI Assistant Agent Equivalentie

Beide frameworks bieden api-integratie van OpenAI Assistant:

# AutoGen OpenAIAssistantAgent
from autogen_ext.agents.openai import OpenAIAssistantAgent
# Agent Framework has OpenAI Assistants support via OpenAIAssistantsClient
from agent_framework.openai import OpenAIAssistantsClient

Zie voor voorbeelden van OpenAI Assistant:

Streaming-ondersteuning

Beide frameworks streamen tokens in realtime, van clients en van agents, om UIs responsief te houden.

AutoGen Streaming

# Model client streaming
async for chunk in client.create_stream(messages):
    if isinstance(chunk, str):
        print(chunk, end="")

# Agent streaming
async for event in agent.run_stream(task="Hello"):
    if isinstance(event, ModelClientStreamingChunkEvent):
        print(event.content, end="")
    elif isinstance(event, TaskResult):
        print("Final result received")

Agent Framework Streaming

# Assume we have client, agent, and tools from previous examples
async def streaming_example():
    # Chat client streaming
    async for chunk in client.get_streaming_response("Hello", tools=tools):
        if chunk.text:
            print(chunk.text, end="")

    # Agent streaming
    async for chunk in agent.run_stream("Hello"):
        if chunk.text:
            print(chunk.text, end="", flush=True)

Tip: In Agent Framework leveren zowel clients als agents dezelfde updatevorm op; u kunt in beide gevallen lezen chunk.text .

Berichttypen en maken

Begrijpen hoe berichten werken, is cruciaal voor effectieve communicatie tussen agents. Beide frameworks bieden verschillende benaderingen voor het maken en verwerken van berichten, met AutoGen met behulp van afzonderlijke berichtklassen en Agent Framework met behulp van een geïntegreerd berichtsysteem.

AutoGen-berichttypen

from autogen_agentchat.messages import TextMessage, MultiModalMessage
from autogen_core.models import UserMessage

# Text message
text_msg = TextMessage(content="Hello", source="user")

# Multi-modal message
multi_modal_msg = MultiModalMessage(
    content=["Describe this image", image_data],
    source="user"
)

# Convert to model format for use with model clients
user_message = text_msg.to_model_message()

Berichttypen van agentframework

from agent_framework import ChatMessage, TextContent, DataContent, UriContent, Role
import base64

# Text message
text_msg = ChatMessage(role=Role.USER, text="Hello")

# Supply real image bytes, or use a data: URI/URL via UriContent
image_bytes = b"<your_image_bytes>"
image_b64 = base64.b64encode(image_bytes).decode()
image_uri = f"data:image/jpeg;base64,{image_b64}"

# Multi-modal message with mixed content
multi_modal_msg = ChatMessage(
    role=Role.USER,
    contents=[
        TextContent(text="Describe this image"),
        DataContent(uri=image_uri, media_type="image/jpeg")
    ]
)

Belangrijkste verschillen:

  • AutoGen maakt gebruik van afzonderlijke berichtklassen (TextMessage, MultiModalMessage) met een source veld
  • Agent Framework maakt gebruik van een uniform ChatMessage met getypte inhoudsobjecten en een role veld
  • Agent Framework-berichten gebruiken Role enum (USER, ASSISTANT, SYSTEM, TOOL) in plaats van tekenreeksbronnen

Hulpprogramma's maken en integreren

Hulpprogramma's breiden agentmogelijkheden uit buiten het genereren van tekst. De frameworks hebben verschillende benaderingen voor het maken van hulpprogramma's, waarbij Agent Framework meer geautomatiseerde schemageneratie biedt.

AutoGen FunctionTool

from autogen_core.tools import FunctionTool

async def get_weather(location: str) -> str:
    """Get weather for a location."""
    return f"Weather in {location}: sunny"

# Manual tool creation
tool = FunctionTool(
    func=get_weather,
    description="Get weather information"
)

# Use with agent
agent = AssistantAgent(name="assistant", model_client=client, tools=[tool])

Agent Framework @ai_function

from agent_framework import ai_function
from typing import Annotated
from pydantic import Field

@ai_function
def get_weather(
    location: Annotated[str, Field(description="The location to get weather for")]
) -> str:
    """Get weather for a location."""
    return f"Weather in {location}: sunny"

# Direct use with agent (automatic conversion)
agent = ChatAgent(name="assistant", chat_client=client, tools=[get_weather])

Zie voor gedetailleerde voorbeelden:

Gehoste hulpprogramma's (exclusief Agent Framework)

Agent Framework biedt gehoste hulpprogramma's die niet beschikbaar zijn in AutoGen:

from agent_framework import ChatAgent, HostedCodeInterpreterTool, HostedWebSearchTool
from agent_framework.azure import AzureOpenAIChatClient

# Azure OpenAI client with a model that supports hosted tools
client = AzureOpenAIChatClient(model_id="gpt-5")

# Code execution tool
code_tool = HostedCodeInterpreterTool()

# Web search tool
search_tool = HostedWebSearchTool()

agent = ChatAgent(
    name="researcher",
    chat_client=client,
    tools=[code_tool, search_tool]
)

Zie voor gedetailleerde voorbeelden:

Vereisten en opmerkingen:

  • Gehoste hulpprogramma's zijn alleen beschikbaar voor modellen/accounts die deze ondersteunen. Controleer de rechten en modelondersteuning voor uw provider voordat u deze hulpprogramma's inschakelt.
  • Configuratie verschilt per provider; volg de vereisten in elk voorbeeld voor installatie en machtigingen.
  • Niet elk model ondersteunt elk gehost hulpprogramma (bijvoorbeeld webzoekopdrachten versus code-interpreter). Kies een compatibel model in uw omgeving.

Opmerking

AutoGen ondersteunt hulpprogramma's voor het uitvoeren van lokale code, maar deze functie is gepland voor toekomstige Agent Framework-versies.

Belangrijk verschil: Agent Framework verwerkt hulpprogramma-iteratie automatisch op agentniveau. In tegenstelling tot de parameter van max_tool_iterations AutoGen, worden Agent Framework-agents de uitvoering van het hulpprogramma voortgezet totdat deze standaard is voltooid, met ingebouwde veiligheidsmechanismen om oneindige lussen te voorkomen.

MCP-serverondersteuning

Voor geavanceerde hulpprogramma-integratie ondersteunen beide frameworks Model Context Protocol (MCP), waardoor agents kunnen communiceren met externe services en gegevensbronnen. Agent Framework biedt uitgebreidere ingebouwde ondersteuning.

Ondersteuning voor AutoGen MCP

AutoGen biedt eenvoudige MCP-ondersteuning via extensies (specifieke implementatiedetails variëren per versie).

MCP-ondersteuning voor Agent Framework

from agent_framework import ChatAgent, MCPStdioTool, MCPStreamableHTTPTool, MCPWebsocketTool
from agent_framework.openai import OpenAIChatClient

# Create client for the example
client = OpenAIChatClient(model_id="gpt-5")

# Stdio MCP server
mcp_tool = MCPStdioTool(
    name="filesystem",
    command="uvx mcp-server-filesystem",
    args=["/allowed/directory"]
)

# HTTP streaming MCP
http_mcp = MCPStreamableHTTPTool(
    name="http_mcp",
    url="http://localhost:8000/sse"
)

# WebSocket MCP
ws_mcp = MCPWebsocketTool(
    name="websocket_mcp",
    url="ws://localhost:8000/ws"
)

agent = ChatAgent(name="assistant", chat_client=client, tools=[mcp_tool])

Zie voor MCP-voorbeelden:

Patroon Agent-as-a-Tool

Een krachtig patroon is het gebruik van agents zelf als hulpprogramma's, waardoor hiërarchische agentarchitecturen mogelijk zijn. Beide frameworks ondersteunen dit patroon met verschillende implementaties.

AutoGen AgentTool

from autogen_agentchat.tools import AgentTool

# Create specialized agent
writer = AssistantAgent(
    name="writer",
    model_client=client,
    system_message="You are a creative writer."
)

# Wrap as tool
writer_tool = AgentTool(agent=writer)

# Use in coordinator (requires disabling parallel tool calls)
coordinator_client = OpenAIChatCompletionClient(
    model="gpt-5",
    parallel_tool_calls=False
)
coordinator = AssistantAgent(
    name="coordinator",
    model_client=coordinator_client,
    tools=[writer_tool]
)

Agent Framework as_tool()

from agent_framework import ChatAgent

# Assume we have client from previous examples
# Create specialized agent
writer = ChatAgent(
    name="writer",
    chat_client=client,
    instructions="You are a creative writer."
)

# Convert to tool
writer_tool = writer.as_tool(
    name="creative_writer",
    description="Generate creative content",
    arg_name="request",
    arg_description="What to write"
)

# Use in coordinator
coordinator = ChatAgent(
    name="coordinator",
    chat_client=client,
    tools=[writer_tool]
)

Expliciete migratieoptekening: Stel parallel_tool_calls=False in AutoGen de modelclient van de coördinator in wanneer agents worden verpakt als hulpprogramma's om gelijktijdigheidsproblemen te voorkomen bij het aanroepen van hetzelfde agentexemplaren. In Agent Framework hoeft u geen parallelle hulpprogramma-aanroepen uit te schakelen, as_tool() omdat agents standaard staatloos zijn.

Middleware (agentframeworkfunctie)

Agent Framework introduceert middlewaremogelijkheden die AutoGen mist. Middleware maakt krachtige cross-cutting zorgen mogelijk, zoals logboekregistratie, beveiliging en prestatiebewaking.

from agent_framework import ChatAgent, AgentRunContext, FunctionInvocationContext
from typing import Callable, Awaitable

# Assume we have client from previous examples
async def logging_middleware(
    context: AgentRunContext,
    next: Callable[[AgentRunContext], Awaitable[None]]
) -> None:
    print(f"Agent {context.agent.name} starting")
    await next(context)
    print(f"Agent {context.agent.name} completed")

async def security_middleware(
    context: FunctionInvocationContext,
    next: Callable[[FunctionInvocationContext], Awaitable[None]]
) -> None:
    if "password" in str(context.arguments):
        print("Blocking function call with sensitive data")
        return  # Don't call next()
    await next(context)

agent = ChatAgent(
    name="secure_agent",
    chat_client=client,
    middleware=[logging_middleware, security_middleware]
)

Voordelen:

  • Beveiliging: invoervalidatie en inhoudsfiltering
  • Waarneembaarheid: logboekregistratie, metrische gegevens en tracering
  • Prestaties: Caching en snelheidsbeperking
  • Foutafhandeling: Probleemloze degradatie en logica voor opnieuw proberen

Zie voor gedetailleerde middlewarevoorbeelden:

Gepersonaliseerde agents

Soms wilt u helemaal geen agent met modelsteun: u wilt een deterministische agent of door API ondersteunde agent met aangepaste logica. Beide frameworks bieden ondersteuning voor het bouwen van aangepaste agents, maar de patronen verschillen.

AutoGen: Subklasse BaseChatAgent

from typing import Sequence
from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import BaseChatMessage, TextMessage, StopMessage
from autogen_core import CancellationToken

class StaticAgent(BaseChatAgent):
    def __init__(self, name: str = "static", description: str = "Static responder") -> None:
        super().__init__(name, description)

    @property
    def produced_message_types(self) -> Sequence[type[BaseChatMessage]]:  # Which message types this agent produces
        return (TextMessage,)

    async def on_messages(self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken) -> Response:
        # Always return a static response
        return Response(chat_message=TextMessage(content="Hello from AutoGen custom agent", source=self.name))

Notes:

  • Implementeer en retourneer on_messages(...) een Response bericht met een chatbericht.
  • U kunt eventueel implementeren on_reset(...) om de interne status tussen uitvoeringen te wissen.

Agent Framework: BaseAgent uitbreiden (threadbewust)

from collections.abc import AsyncIterable
from typing import Any
from agent_framework import (
    AgentRunResponse,
    AgentRunResponseUpdate,
    AgentThread,
    BaseAgent,
    ChatMessage,
    Role,
    TextContent,
)

class StaticAgent(BaseAgent):
    async def run(
        self,
        messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
        *,
        thread: AgentThread | None = None,
        **kwargs: Any,
    ) -> AgentRunResponse:
        # Build a static reply
        reply = ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text="Hello from AF custom agent")])

        # Persist conversation to the provided AgentThread (if any)
        if thread is not None:
            normalized = self._normalize_messages(messages)
            await self._notify_thread_of_new_messages(thread, normalized, reply)

        return AgentRunResponse(messages=[reply])

    async def run_stream(
        self,
        messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
        *,
        thread: AgentThread | None = None,
        **kwargs: Any,
    ) -> AsyncIterable[AgentRunResponseUpdate]:
        # Stream the same static response in a single chunk for simplicity
        yield AgentRunResponseUpdate(contents=[TextContent(text="Hello from AF custom agent")], role=Role.ASSISTANT)

        # Notify thread of input and the complete response once streaming ends
        if thread is not None:
            reply = ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text="Hello from AF custom agent")])
            normalized = self._normalize_messages(messages)
            await self._notify_thread_of_new_messages(thread, normalized, reply)

Notes:

  • AgentThreadonderhoudt de gespreksstatus extern; gebruiken agent.get_new_thread() en doorgeven aanrun/run_stream .
  • Oproep self._notify_thread_of_new_messages(thread, input_messages, response_messages) zodat de thread beide zijden van de uitwisseling heeft.
  • Bekijk het volledige voorbeeld: Custom Agent

Laten we nu eens kijken naar indeling met meerdere agents: het gebied waar de frameworks het meest verschillen.

Functietoewijzing voor meerdere agents

Overzicht van programmeermodel

De programmeermodellen met meerdere agents vertegenwoordigen het belangrijkste verschil tussen de twee frameworks.

Benadering van Dual Model van AutoGen

AutoGen biedt twee programmeermodellen:

  1. autogen-core: Programmeren op laag niveau, gebeurtenisgestuurd programmeren met RoutedAgent en berichtabonnementen
  2. Team abstractie: high-level, run-centric model gebouwd op boven op autogen-core
# Low-level autogen-core (complex)
class MyAgent(RoutedAgent):
    @message_handler
    async def handle_message(self, message: TextMessage, ctx: MessageContext) -> None:
        # Handle specific message types
        pass

# High-level Team (easier but limited)
team = RoundRobinGroupChat(
    participants=[agent1, agent2],
    termination_condition=StopAfterNMessages(5)
)
result = await team.run(task="Collaborate on this task")

Uitdagingen:

  • Model op laag niveau is te complex voor de meeste gebruikers
  • Model op hoog niveau kan worden beperkt voor complex gedrag
  • Overbrugging tussen de twee modellen voegt implementatiecomplexiteit toe

Unified Workflow Model van Agent Framework

Agent Framework biedt één Workflow abstractie die het beste van beide benaderingen combineert:

from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never

# Assume we have agent1 and agent2 from previous examples
@executor(id="agent1")
async def agent1_executor(input_msg: str, ctx: WorkflowContext[str]) -> None:
    response = await agent1.run(input_msg)
    await ctx.send_message(response.text)

@executor(id="agent2")
async def agent2_executor(input_msg: str, ctx: WorkflowContext[Never, str]) -> None:
    response = await agent2.run(input_msg)
    await ctx.yield_output(response.text)  # Final output

# Build typed data flow graph
workflow = (WorkflowBuilder()
           .add_edge(agent1_executor, agent2_executor)
           .set_start_executor(agent1_executor)
           .build())

# Example usage (would be in async context)
# result = await workflow.run("Initial input")

Zie voor gedetailleerde werkstroomvoorbeelden:

Voordelen:

  • Uniform model: Enkele abstractie voor alle complexiteitsniveaus
  • Typeveiligheid: sterk getypte invoer en uitvoer
  • Grafiekvisualisatie: Weergave van gegevensstroom wissen
  • Flexibele samenstelling: Agents, functies en subwerkstromen combineren

Werkstroom versus GraphFlow

De abstractie van Workflow Het Agent Framework is geïnspireerd op de experimentele GraphFlow functie van AutoGen, maar vertegenwoordigt een aanzienlijke evolutie in ontwerp filosofie:

  • GraphFlow: Controlestroom op basis van waar randen overgangen zijn en berichten worden uitgezonden naar alle agents; overgangen zijn geconditioneerd op uitgezonden berichtinhoud
  • Werkstroom: gegevensstroom op basis van waar berichten worden gerouteerd via specifieke randen en uitvoerders worden geactiveerd door randen, met ondersteuning voor gelijktijdige uitvoering.

Overzicht van visuals

Het onderstaande diagram contrasteert de control-flow GraphFlow (links) van AutoGen met de werkstroomwerkstroom van Agent Framework (rechts). GraphFlow modelleert agents als knooppunten met voorwaardelijke overgangen en broadcasts. Werkstroommodellenuitvoeringen (agents, functies of subwerkstromen) die zijn verbonden door getypte randen; het biedt ook ondersteuning voor pauzes en controlepunten voor aanvragen/antwoorden.

flowchart LR

  subgraph AutoGenGraphFlow
    direction TB
    U[User / Task] --> A[Agent A]
    A -->|success| B[Agent B]
    A -->|retry| C[Agent C]
    A -. broadcast .- B
    A -. broadcast .- C
  end

  subgraph AgentFrameworkWorkflow
    direction TB
    I[Input] --> E1[Executor 1]
    E1 -->|"str"| E2[Executor 2]
    E1 -->|"image"| E3[Executor 3]
    E3 -->|"str"| E2
    E2 --> OUT[(Final Output)]
  end

  R[Request / Response Gate]
  E2 -. request .-> R
  R -. resume .-> E2

  CP[Checkpoint]
  E1 -. save .-> CP
  CP -. load .-> E1

In de praktijk:

  • GraphFlow gebruikt agents als knooppunten en uitzendt berichten; randen vertegenwoordigen voorwaardelijke overgangen.
  • Werkstroomroutes getypte berichten langs randen. Knooppunten (uitvoerders) kunnen agents, pure functies of subwerkstromen zijn.
  • Met aanvraag/antwoord kan een werkstroom worden onderbroken voor externe invoer; controlepunt houdt de voortgang vast en schakelt cv in.

Codevergelijking

1) Sequentiële + voorwaardelijk
# AutoGen GraphFlow (fluent builder) — writer → reviewer → editor (conditional)
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow

writer = AssistantAgent(name="writer", description="Writes a draft", model_client=client)
reviewer = AssistantAgent(name="reviewer", description="Reviews the draft", model_client=client)
editor = AssistantAgent(name="editor", description="Finalizes the draft", model_client=client)

graph = (
    DiGraphBuilder()
    .add_node(writer).add_node(reviewer).add_node(editor)
    .add_edge(writer, reviewer)  # always
    .add_edge(reviewer, editor, condition=lambda msg: "approve" in msg.to_model_text())
    .set_entry_point(writer)
).build()

team = GraphFlow(participants=[writer, reviewer, editor], graph=graph)
result = await team.run(task="Draft a short paragraph about solar power")
# Agent Framework Workflow — sequential executors with conditional logic
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never

@executor(id="writer")
async def writer_exec(task: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(f"Draft: {task}")

@executor(id="reviewer")
async def reviewer_exec(draft: str, ctx: WorkflowContext[str]) -> None:
    decision = "approve" if "solar" in draft.lower() else "revise"
    await ctx.send_message(f"{decision}:{draft}")

@executor(id="editor")
async def editor_exec(msg: str, ctx: WorkflowContext[Never, str]) -> None:
    if msg.startswith("approve:"):
        await ctx.yield_output(msg.split(":", 1)[1])
    else:
        await ctx.yield_output("Needs revision")

workflow_seq = (
    WorkflowBuilder()
    .add_edge(writer_exec, reviewer_exec)
    .add_edge(reviewer_exec, editor_exec)
    .set_start_executor(writer_exec)
    .build()
)
2) Fan-out + Join (ALL versus ANY)
# AutoGen GraphFlow — A → (B, C) → D with ALL/ANY join
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
A, B, C, D = agent_a, agent_b, agent_c, agent_d

# ALL (default): D runs after both B and C
g_all = (
    DiGraphBuilder()
    .add_node(A).add_node(B).add_node(C).add_node(D)
    .add_edge(A, B).add_edge(A, C)
    .add_edge(B, D).add_edge(C, D)
    .set_entry_point(A)
).build()

# ANY: D runs when either B or C completes
g_any = (
    DiGraphBuilder()
    .add_node(A).add_node(B).add_node(C).add_node(D)
    .add_edge(A, B).add_edge(A, C)
    .add_edge(B, D, activation_group="join_d", activation_condition="any")
    .add_edge(C, D, activation_group="join_d", activation_condition="any")
    .set_entry_point(A)
).build()
# Agent Framework Workflow — A → (B, C) → aggregator (ALL vs ANY)
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never

@executor(id="A")
async def start(task: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(f"B:{task}", target_id="B")
    await ctx.send_message(f"C:{task}", target_id="C")

@executor(id="B")
async def branch_b(text: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(f"B_done:{text}")

@executor(id="C")
async def branch_c(text: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(f"C_done:{text}")

@executor(id="join_any")
async def join_any(msg: str, ctx: WorkflowContext[Never, str]) -> None:
    await ctx.yield_output(f"First: {msg}")  # ANY join (first arrival)

@executor(id="join_all")
async def join_all(msg: str, ctx: WorkflowContext[str, str]) -> None:
    state = await ctx.get_executor_state() or {"items": []}
    state["items"].append(msg)
    await ctx.set_executor_state(state)
    if len(state["items"]) >= 2:
        await ctx.yield_output(" | ".join(state["items"]))  # ALL join

wf_any = (
    WorkflowBuilder()
    .add_edge(start, branch_b).add_edge(start, branch_c)
    .add_edge(branch_b, join_any).add_edge(branch_c, join_any)
    .set_start_executor(start)
    .build()
)

wf_all = (
    WorkflowBuilder()
    .add_edge(start, branch_b).add_edge(start, branch_c)
    .add_edge(branch_b, join_all).add_edge(branch_c, join_all)
    .set_start_executor(start)
    .build()
)
3) Gerichte routering (geen uitzending)
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never

@executor(id="ingest")
async def ingest(task: str, ctx: WorkflowContext[str]) -> None:
    # Route selectively using target_id
    if task.startswith("image:"):
        await ctx.send_message(task.removeprefix("image:"), target_id="vision")
    else:
        await ctx.send_message(task, target_id="writer")

@executor(id="writer")
async def write(text: str, ctx: WorkflowContext[Never, str]) -> None:
    await ctx.yield_output(f"Draft: {text}")

@executor(id="vision")
async def caption(image_ref: str, ctx: WorkflowContext[Never, str]) -> None:
    await ctx.yield_output(f"Caption: {image_ref}")

workflow = (
    WorkflowBuilder()
    .add_edge(ingest, write)
    .add_edge(ingest, caption)
    .set_start_executor(ingest)
    .build()
)

# Example usage (async):
# await workflow.run("Summarize the benefits of solar power")
# await workflow.run("image:https://example.com/panel.jpg")

Wat u moet zien:

  • GraphFlow verzendt berichten en maakt gebruik van voorwaardelijke overgangen. Het joingedrag wordt geconfigureerd via de doelzijde activation en per rand activation_group/activation_condition(groepeer bijvoorbeeld beide randen in join_d met).activation_condition="any"
  • Werkstroom routeert gegevens expliciet; gebruiken target_id om downstreamexecutors te selecteren. Deelnemen aan gedrag leeft in de ontvangende uitvoerder (bijvoorbeeld rendement op eerste invoer versus wachten op alles), of via orchestration builders/aggregators.
  • Uitvoerders in werkstroom zijn vrije vorm: een, een functie of een subwerkstroom verpakken ChatAgenten deze combineren in dezelfde grafiek.

Belangrijkste verschillen

De onderstaande tabel bevat een overzicht van de fundamentele verschillen tussen de GraphFlow van AutoGen en de werkstroom van Agent Framework:

Kenmerk AutoGen GraphFlow Werkstroom agentframework
Stroomtype Controlestroom (randen zijn overgangen) Gegevensstroom (berichten omleiden van randen)
Knooppunttypen Alleen agents Agents, functies, subwerkstromen
Activering Berichtuitzending Activering op basis van Edge
Typeveiligheid Beperkt Sterk typen overal
Composabiliteit Beperkt Zeer composable

Geneste patronen

AutoGen Team Nesting

# Inner team
inner_team = RoundRobinGroupChat(
    participants=[specialist1, specialist2],
    termination_condition=StopAfterNMessages(3)
)

# Outer team with nested team as participant
outer_team = RoundRobinGroupChat(
    participants=[coordinator, inner_team, reviewer],  # Team as participant
    termination_condition=StopAfterNMessages(10)
)

# Messages are broadcasted to all participants including nested team
result = await outer_team.run("Complex task requiring collaboration")

AutoGen geneste kenmerken:

  • Genest team ontvangt alle berichten van het buitenste team
  • Geneste teamberichten worden uitgezonden naar alle buitenste teamdeelnemers
  • Gedeelde berichtcontext op alle niveaus

Werkstroom nesten van Agent Framework

from agent_framework import WorkflowExecutor, WorkflowBuilder

# Assume we have executors from previous examples
# specialist1_executor, specialist2_executor, coordinator_executor, reviewer_executor

# Create sub-workflow
sub_workflow = (WorkflowBuilder()
               .add_edge(specialist1_executor, specialist2_executor)
               .set_start_executor(specialist1_executor)
               .build())

# Wrap as executor
sub_workflow_executor = WorkflowExecutor(
    workflow=sub_workflow,
    id="sub_process"
)

# Use in parent workflow
parent_workflow = (WorkflowBuilder()
                  .add_edge(coordinator_executor, sub_workflow_executor)
                  .add_edge(sub_workflow_executor, reviewer_executor)
                  .set_start_executor(coordinator_executor)
                  .build())

Geneste kenmerken van Agent Framework:

  • Geïsoleerde invoer/uitvoer via WorkflowExecutor
  • Geen berichtuitzending- gegevensstromen via specifieke verbindingen
  • Onafhankelijk statusbeheer voor elk werkstroomniveau

Groepschatpatronen

Met groepschatpatronen kunnen meerdere agents samenwerken aan complexe taken. Hier ziet u hoe veelvoorkomende patronen worden omgezet tussen frameworks.

RoundRobinGroupChat-patroon

AutoGen-implementatie:

from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import StopAfterNMessages

team = RoundRobinGroupChat(
    participants=[agent1, agent2, agent3],
    termination_condition=StopAfterNMessages(10)
)
result = await team.run("Discuss this topic")

Implementatie van agentframework:

from agent_framework import SequentialBuilder, WorkflowOutputEvent

# Assume we have agent1, agent2, agent3 from previous examples
# Sequential workflow through participants
workflow = SequentialBuilder().participants([agent1, agent2, agent3]).build()

# Example usage (would be in async context)
async def sequential_example():
    # Each agent appends to shared conversation
    async for event in workflow.run_stream("Discuss this topic"):
        if isinstance(event, WorkflowOutputEvent):
            conversation_history = event.data  # list[ChatMessage]

Zie voor gedetailleerde indelingsvoorbeelden:

Voor gelijktijdige uitvoeringspatronen biedt Agent Framework ook het volgende:

from agent_framework import ConcurrentBuilder, WorkflowOutputEvent

# Assume we have agent1, agent2, agent3 from previous examples
# Concurrent workflow for parallel processing
workflow = (ConcurrentBuilder()
           .participants([agent1, agent2, agent3])
           .build())

# Example usage (would be in async context)
async def concurrent_example():
    # All agents process the input concurrently
    async for event in workflow.run_stream("Process this in parallel"):
        if isinstance(event, WorkflowOutputEvent):
            results = event.data  # Combined results from all agents

Zie voor voorbeelden van gelijktijdige uitvoering:

MagenticOneGroupChat-patroon

AutoGen-implementatie:

from autogen_agentchat.teams import MagenticOneGroupChat

team = MagenticOneGroupChat(
    participants=[researcher, coder, executor],
    model_client=coordinator_client,
    termination_condition=StopAfterNMessages(20)
)
result = await team.run("Complex research and analysis task")

Implementatie van agentframework:

from typing import cast
from agent_framework import (
    MAGENTIC_EVENT_TYPE_AGENT_DELTA,
    MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
    AgentRunUpdateEvent,
    ChatAgent,
    ChatMessage,
    MagenticBuilder,
    WorkflowOutputEvent,
)
from agent_framework.openai import OpenAIChatClient

# Create a manager agent for orchestration
manager_agent = ChatAgent(
    name="MagenticManager",
    description="Orchestrator that coordinates the workflow",
    instructions="You coordinate a team to complete complex tasks efficiently.",
    chat_client=OpenAIChatClient(),
)

workflow = (
    MagenticBuilder()
    .participants(researcher=researcher, coder=coder)
    .with_standard_manager(
        agent=manager_agent,
        max_round_count=20,
        max_stall_count=3,
        max_reset_count=2,
    )
    .build()
)

# Example usage (would be in async context)
async def magentic_example():
    output: str | None = None
    async for event in workflow.run_stream("Complex research task"):
        if isinstance(event, AgentRunUpdateEvent):
            props = event.data.additional_properties if event.data else None
            event_type = props.get("magentic_event_type") if props else None

            if event_type == MAGENTIC_EVENT_TYPE_ORCHESTRATOR:
                text = event.data.text if event.data else ""
                print(f"[ORCHESTRATOR]: {text}")
            elif event_type == MAGENTIC_EVENT_TYPE_AGENT_DELTA:
                agent_id = props.get("agent_id", event.executor_id) if props else event.executor_id
                if event.data and event.data.text:
                    print(f"[{agent_id}]: {event.data.text}", end="")

        elif isinstance(event, WorkflowOutputEvent):
            output_messages = cast(list[ChatMessage], event.data)
            if output_messages:
                output = output_messages[-1].text

Aanpassingsopties voor Agent Framework:

De Magentic-werkstroom biedt uitgebreide aanpassingsopties:

  • Managerconfiguratie: Een ChatAgent gebruiken met aangepaste instructies en modelinstellingen
  • Ronde limieten: max_round_count, max_stall_countmax_reset_count
  • Gebeurtenisstreaming: gebruiken AgentRunUpdateEvent met magentic_event_type metagegevens
  • Specialisatie van agents: aangepaste instructies en hulpprogramma's per agent
  • Human-in-the-loop: Plan review, tool approval, and stall intervention
# Advanced customization example with human-in-the-loop
from typing import cast
from agent_framework import (
    MAGENTIC_EVENT_TYPE_AGENT_DELTA,
    MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
    AgentRunUpdateEvent,
    ChatAgent,
    MagenticBuilder,
    MagenticHumanInterventionDecision,
    MagenticHumanInterventionKind,
    MagenticHumanInterventionReply,
    MagenticHumanInterventionRequest,
    RequestInfoEvent,
    WorkflowOutputEvent,
)
from agent_framework.openai import OpenAIChatClient

# Create manager agent with custom configuration
manager_agent = ChatAgent(
    name="MagenticManager",
    description="Orchestrator for complex tasks",
    instructions="Custom orchestration instructions...",
    chat_client=OpenAIChatClient(model_id="gpt-4o"),
)

workflow = (
    MagenticBuilder()
    .participants(
        researcher=researcher_agent,
        coder=coder_agent,
        analyst=analyst_agent,
    )
    .with_standard_manager(
        agent=manager_agent,
        max_round_count=15,      # Limit total rounds
        max_stall_count=2,       # Trigger stall handling
        max_reset_count=1,       # Allow one reset on failure
    )
    .with_plan_review()           # Enable human plan review
    .with_human_input_on_stall()  # Enable human intervention on stalls
    .build()
)

# Handle human intervention requests during execution
async for event in workflow.run_stream("Complex task"):
    if isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest:
        req = cast(MagenticHumanInterventionRequest, event.data)
        if req.kind == MagenticHumanInterventionKind.PLAN_REVIEW:
            # Review and approve the plan
            reply = MagenticHumanInterventionReply(
                decision=MagenticHumanInterventionDecision.APPROVE
            )
            async for ev in workflow.send_responses_streaming({event.request_id: reply}):
                pass  # Handle continuation

Zie voor gedetailleerde Magentic-voorbeelden:

Toekomstige patronen

De roadmap van agentframework bevat verschillende AutoGen-patronen die momenteel in ontwikkeling zijn:

  • Swarm-patroon: Coördinatie van handoff-gebaseerde agent
  • SelectorGroupChat: LLM-gestuurde sprekerselectie

Human-in-the-loop met aanvraagreactie

Een belangrijke nieuwe functie in Agent Framework Workflow is het concept van aanvraag en antwoord, waarmee werkstromen de uitvoering kunnen onderbreken en wachten op externe invoer voordat u doorgaat. Deze mogelijkheid is niet aanwezig in de abstractie van Team AutoGen en maakt geavanceerde human-in-the-loop patronen mogelijk.

Beperkingen voor AutoGen

De abstractie van Team AutoGen wordt continu uitgevoerd zodra deze is gestart en biedt geen ingebouwde mechanismen om de uitvoering voor menselijke invoer te onderbreken. Voor elke human-in-the-loop-functionaliteit zijn aangepaste implementaties buiten het framework vereist.

Agent Framework Request-Response-API

Agent Framework biedt ingebouwde mogelijkheden voor aanvraag-antwoord, waarbij elke uitvoerder aanvragen kan verzenden met behulp van ctx.request_info() en antwoorden kan verwerken met de @response_handler decorator.

from agent_framework import (
    RequestInfoEvent, WorkflowBuilder, WorkflowContext, 
    Executor, handler, response_handler
)
from dataclasses import dataclass

# Assume we have agent_executor defined elsewhere

# Define typed request payload
@dataclass
class ApprovalRequest:
    """Request human approval for agent output."""
    content: str = ""
    agent_name: str = ""

# Workflow executor that requests human approval
class ReviewerExecutor(Executor):
    
    @handler
    async def review_content(
        self,
        agent_response: str,
        ctx: WorkflowContext
    ) -> None:
        # Request human input with structured data
        approval_request = ApprovalRequest(
            content=agent_response,
            agent_name="writer_agent"
        )
        await ctx.request_info(request_data=approval_request, response_type=str)
    
    @response_handler
    async def handle_approval_response(
        self,
        original_request: ApprovalRequest,
        decision: str,
        ctx: WorkflowContext
    ) -> None:
        decision_lower = decision.strip().lower()
        original_content = original_request.content

        if decision_lower == "approved":
            await ctx.yield_output(f"APPROVED: {original_content}")
        else:
            await ctx.yield_output(f"REVISION NEEDED: {decision}")

# Build workflow with human-in-the-loop
reviewer = ReviewerExecutor(id="reviewer")

workflow = (WorkflowBuilder()
           .add_edge(agent_executor, reviewer)
           .set_start_executor(agent_executor)
           .build())

Werkstromen voor human-in-the-loop uitvoeren

Agent Framework biedt streaming-API's voor het afhandelen van de cyclus voor onderbreken en hervatten:

from agent_framework import RequestInfoEvent, WorkflowOutputEvent

# Assume we have workflow defined from previous examples
async def run_with_human_input():
    pending_responses = None
    completed = False

    while not completed:
        # First iteration uses run_stream, subsequent use send_responses_streaming
        stream = (
            workflow.send_responses_streaming(pending_responses)
            if pending_responses
            else workflow.run_stream("initial input")
        )

        events = [event async for event in stream]
        pending_responses = None

        # Collect human requests and outputs
        for event in events:
            if isinstance(event, RequestInfoEvent):
                # Display request to human and collect response
                request_data = event.data  # ApprovalRequest instance
                print(f"Review needed: {request_data.content}")

                human_response = input("Enter 'approved' or revision notes: ")
                pending_responses = {event.request_id: human_response}

            elif isinstance(event, WorkflowOutputEvent):
                print(f"Final result: {event.data}")
                completed = True

Zie voor voorbeelden van werkstromen voor mensen in de lus:

Controlepunten en werkstromen hervatten

Een ander belangrijk voordeel van agentframework Workflow ten opzichte van de abstractie van Team AutoGen is ingebouwde ondersteuning voor controlepunten en het hervatten van de uitvoering. Hierdoor kunnen werkstromen worden onderbroken, behouden en later worden hervat vanaf een controlepunt, waardoor fouttolerantie wordt geboden en langlopende of asynchrone werkstromen kunnen worden ingeschakeld.

Beperkingen voor AutoGen

De abstractie van Team AutoGen biedt geen ingebouwde controlepuntmogelijkheden. Eventuele persistentie- of herstelmechanismen moeten extern worden geïmplementeerd, waarbij vaak complexe statusbeheer- en serialisatielogica vereist is.

Controlepunten voor agentframework

Agent Framework biedt uitgebreide controlepunten via FileCheckpointStorage en de with_checkpointing() methode op WorkflowBuilder. Controlepunten vastleggen:

  • Uitvoerdersstatus: lokale status voor elke uitvoerder met behulp van ctx.set_executor_state()
  • Gedeelde status: de status van meerdere uitvoerders met behulp van ctx.set_shared_state()
  • Berichtenwachtrijen: Berichten in behandeling tussen uitvoerders
  • Werkstroompositie: huidige voortgang van de uitvoering en volgende stappen
from agent_framework import (
    FileCheckpointStorage, WorkflowBuilder, WorkflowContext,
    Executor, handler
)
from typing_extensions import Never

class ProcessingExecutor(Executor):
    @handler
    async def process(self, data: str, ctx: WorkflowContext[str]) -> None:
        # Process the data
        result = f"Processed: {data.upper()}"
        print(f"Processing: '{data}' -> '{result}'")

        # Persist executor-local state
        prev_state = await ctx.get_executor_state() or {}
        count = prev_state.get("count", 0) + 1
        await ctx.set_executor_state({
            "count": count,
            "last_input": data,
            "last_output": result
        })

        # Persist shared state for other executors
        await ctx.set_shared_state("original_input", data)
        await ctx.set_shared_state("processed_output", result)

        await ctx.send_message(result)

class FinalizeExecutor(Executor):
    @handler
    async def finalize(self, data: str, ctx: WorkflowContext[Never, str]) -> None:
        result = f"Final: {data}"
        await ctx.yield_output(result)

# Configure checkpoint storage
checkpoint_storage = FileCheckpointStorage(storage_path="./checkpoints")
processing_executor = ProcessingExecutor(id="processing")
finalize_executor = FinalizeExecutor(id="finalize")

# Build workflow with checkpointing enabled
workflow = (WorkflowBuilder()
           .add_edge(processing_executor, finalize_executor)
           .set_start_executor(processing_executor)
           .with_checkpointing(checkpoint_storage=checkpoint_storage)  # Enable checkpointing
           .build())

# Example usage (would be in async context)
async def checkpoint_example():
    # Run workflow - checkpoints are created automatically
    async for event in workflow.run_stream("input data"):
        print(f"Event: {event}")

Hervatten vanaf controlepunten

Agent Framework biedt API's voor het weergeven, inspecteren en hervatten van specifieke controlepunten:

from typing_extensions import Never

from agent_framework import (
    Executor,
    FileCheckpointStorage,
    WorkflowContext,
    WorkflowBuilder,
    get_checkpoint_summary,
    handler,
)

class UpperCaseExecutor(Executor):
    @handler
    async def process(self, text: str, ctx: WorkflowContext[str]) -> None:
        result = text.upper()
        await ctx.send_message(result)

class ReverseExecutor(Executor):
    @handler
    async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None:
        result = text[::-1]
        await ctx.yield_output(result)

def create_workflow(checkpoint_storage: FileCheckpointStorage):
    """Create a workflow with two executors and checkpointing."""
    upper_executor = UpperCaseExecutor(id="upper")
    reverse_executor = ReverseExecutor(id="reverse")

    return (WorkflowBuilder()
           .add_edge(upper_executor, reverse_executor)
           .set_start_executor(upper_executor)
           .with_checkpointing(checkpoint_storage=checkpoint_storage)
           .build())

# Assume we have checkpoint_storage from previous examples
checkpoint_storage = FileCheckpointStorage(storage_path="./checkpoints")

async def checkpoint_resume_example():
    # List available checkpoints
    checkpoints = await checkpoint_storage.list_checkpoints()

    # Display checkpoint information
    for checkpoint in checkpoints:
        summary = get_checkpoint_summary(checkpoint)
        print(f"Checkpoint {summary.checkpoint_id}: iteration={summary.iteration_count}")

    # Resume from a specific checkpoint
    if checkpoints:
        chosen_checkpoint_id = checkpoints[0].checkpoint_id

        # Create new workflow instance and resume
        new_workflow = create_workflow(checkpoint_storage)
        async for event in new_workflow.run_stream(
            checkpoint_id=chosen_checkpoint_id,
            checkpoint_storage=checkpoint_storage
        ):
            print(f"Resumed event: {event}")

Geavanceerde functies voor controlepunten

Controlepunt met Human-in-the-Loop-integratie:

Controlepunten werken naadloos samen met human-in-the-loop-werkstromen, zodat werkstromen kunnen worden onderbroken voor menselijke invoer en later kunnen worden hervat. Bij hervatting van een controlepunt dat in behandeling zijnde aanvragen bevat, worden deze aanvragen opnieuw verzonden als gebeurtenissen:

# Assume we have workflow, checkpoint_id, and checkpoint_storage from previous examples
async def resume_with_pending_requests_example():
    # Resume from checkpoint - pending requests will be re-emitted
    request_info_events = []
    async for event in workflow.run_stream(
        checkpoint_id=checkpoint_id,
        checkpoint_storage=checkpoint_storage
    ):
        if isinstance(event, RequestInfoEvent):
            request_info_events.append(event)

    # Handle re-emitted pending request
    responses = {}
    for event in request_info_events:
        response = handle_request(event.data)
        responses[event.request_id] = response

    # Send response back to workflow
    async for event in workflow.send_responses_streaming(responses):
        print(f"Event: {event}")

Belangrijkste voordelen

Vergeleken met AutoGen biedt het controlepunt van Agent Framework het volgende:

  • Automatische persistentie: geen handmatig statusbeheer vereist
  • Gedetailleerd herstel: Hervatten vanaf elke superstepgrens
  • Statusisolatie: afzonderlijke uitvoerder-lokale en gedeelde status
  • Integratie van human-in-the-loop: naadloze pauze-cv met menselijke invoer
  • Fouttolerantie: Robuust herstel na fouten of onderbrekingen

Praktische voorbeelden

Zie voor uitgebreide voorbeelden van controlepunten:


Observability

Zowel AutoGen als Agent Framework bieden waarneembaarheidsmogelijkheden, maar met verschillende benaderingen en functies.

AutoGen Waarneembaarheid

AutoGen biedt systeemeigen ondersteuning voor OpenTelemetry met instrumentatie voor:

  • Runtimetracering: SingleThreadedAgentRuntime en GrpcWorkerAgentRuntime
  • Uitvoering van hulpprogramma's: BaseTool met execute_tool de volgende genAI-semantische conventies
  • Agentbewerkingen: BaseChatAgent met create_agent en invoke_agent spans
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from autogen_core import SingleThreadedAgentRuntime

# Configure OpenTelemetry
tracer_provider = TracerProvider()
trace.set_tracer_provider(tracer_provider)

# Pass to runtime
runtime = SingleThreadedAgentRuntime(tracer_provider=tracer_provider)

Waarneembaarheid van agentframework

Agent Framework biedt uitgebreide waarneembaarheid via meerdere benaderingen:

  • Installatie van nulcode: Automatische instrumentatie via omgevingsvariabelen
  • Handmatige configuratie: Programmatische installatie met aangepaste parameters
  • Uitgebreide telemetrie: agents, werkstromen en tracering van hulpprogramma's
  • Console-uitvoer: ingebouwde consolelogboekregistratie en visualisatie
from agent_framework import ChatAgent
from agent_framework.observability import setup_observability
from agent_framework.openai import OpenAIChatClient

# Zero-code setup via environment variables
# Set ENABLE_OTEL=true
# Set OTLP_ENDPOINT=http://localhost:4317

# Or manual setup
setup_observability(
    otlp_endpoint="http://localhost:4317"
)

# Create client for the example
client = OpenAIChatClient(model_id="gpt-5")

async def observability_example():
    # Observability is automatically applied to all agents and workflows
    agent = ChatAgent(name="assistant", chat_client=client)
    result = await agent.run("Hello")  # Automatically traced

Belangrijkste verschillen:

  • Complexiteit van de installatie: Agent Framework biedt eenvoudigere installatieopties voor nulcode
  • Bereik: Agent Framework biedt een bredere dekking, waaronder waarneembaarheid op werkstroomniveau
  • Visualisatie: Agent Framework bevat ingebouwde console-uitvoer en gebruikersinterface voor ontwikkeling
  • Configuratie: Agent Framework biedt flexibelere configuratieopties

Zie voor gedetailleerde voorbeelden van waarneembaarheid:


Conclusion

Deze migratiehandleiding biedt een uitgebreide toewijzing tussen AutoGen en Microsoft Agent Framework, waarin alles wordt besproken, van het maken van basisagents tot complexe werkstromen voor meerdere agents. Belangrijke punten voor migratie:

  • Migratie van één agent is eenvoudig, met vergelijkbare API's en verbeterde mogelijkheden in Agent Framework
  • Voor patronen met meerdere agents moet u uw benadering herzien van op gebeurtenissen gebaseerde architecturen naar op gegevensstromen gebaseerde architecturen, maar als u al bekend bent met GraphFlow, is de overgang eenvoudiger
  • Agent Framework biedt aanvullende functies zoals middleware, gehoste hulpprogramma's en getypte werkstromen

Raadpleeg de map met agentframeworkvoorbeelden voor aanvullende voorbeelden en gedetailleerde implementatierichtlijnen.

Aanvullende voorbeeldcategorieën

Het Agent Framework biedt voorbeelden op verschillende andere belangrijke gebieden:

Volgende stappen