Sdílet prostřednictvím


Pracovní postupy rozhraní Microsoft Agent Framework – Použití pracovních postupů jako agentů

Tento dokument obsahuje přehled použití pracovních postupů jako agentů v rozhraní Microsoft Agent Framework.

Přehled

Někdy jste vytvořili sofistikovaný pracovní postup s několika agenty, vlastními exekutory a složitou logikou , ale chcete ho použít stejně jako jakýkoli jiný agent. Přesně to vám agenti pracovního postupu umožňují. Když pracovní postup zabalíte jako nějaký Agent, můžete s ním pracovat pomocí stejného známého rozhraní API, které byste použili pro jednoduchého chatovacího agenta.

Klíčové výhody

  • Sjednocené rozhraní: Interakce se složitými pracovními postupy pomocí stejného rozhraní API jako jednoduchých agentů
  • Kompatibilita rozhraní API: Integrace pracovních postupů se stávajícími systémy, které podporují rozhraní agenta
  • Kompozičnost: Použití pracovních postupových agentů jako základních stavebních prvků ve větších systémech agentů nebo v jiných pracovních postupech.
  • Správa vláken: Využití vláken agenta pro stav konverzace, vytváření kontrolních bodů a obnovení
  • Podpora streamování: Získání aktualizací v reálném čase při provádění pracovního postupu

Jak to funguje

Při převodu pracovního postupu na agenta:

  1. Pracovní postup se ověří, aby jeho spouštěcí proces mohl přijímat chatové zprávy.
  2. Vytvoří se vlákno pro správu stavu konverzace a kontrolních bodů.
  3. Vstupní zprávy se směrují do počátečního vykonavatele pracovního postupu.
  4. Události pracovního postupu se převedou na aktualizace odpovědí agenta.
  5. Žádosti o externí vstup (z RequestInfoExecutor) se zobrazují jako volání funkcí.

Požadavky

Pokud chcete jako agenta použít pracovní postup, musí být spouštěcí exekutor pracovního postupu schopný zpracovat IEnumerable<ChatMessage> jako vstup. To se automaticky splní při použití ChatClientAgent nebo jiných agentových vykonavatelů.

Vytvoření agenta pracovního postupu

AsAgent() Pomocí metody rozšíření můžete převést jakýkoli kompatibilní pracovní postup na agenta:

using Microsoft.Agents.AI.Workflows;
using Microsoft.Agents.AI;
using Microsoft.Extensions.AI;

// First, build your workflow
var workflow = AgentWorkflowBuilder
    .CreateSequentialPipeline(researchAgent, writerAgent, reviewerAgent)
    .Build();

// Convert the workflow to an agent
AIAgent workflowAgent = workflow.AsAgent(
    id: "content-pipeline",
    name: "Content Pipeline Agent",
    description: "A multi-agent workflow that researches, writes, and reviews content"
);

Parametry AsAgent

Parameter Typ Description
id string? Volitelný jedinečný identifikátor agenta. Automaticky vygenerováno, pokud není k dispozici.
name string? Volitelný zobrazovaný název agenta.
description string? Volitelný popis účelu agenta.
checkpointManager CheckpointManager? Volitelný správce kontrolních bodů pro zajištění trvalosti napříč relacemi.
executionEnvironment IWorkflowExecutionEnvironment? Volitelné spouštěcí prostředí. Výchozí hodnota je buď InProcessExecution.OffThread nebo InProcessExecution.Concurrent, v závislosti na konfiguraci pracovního postupu.

Použití agentů pracovního postupu

Vytvoření vlákna

Každá konverzace s agentem pracovního postupu vyžaduje vlákno pro správu stavu:

// Create a new thread for the conversation
AgentThread thread = await workflowAgent.GetNewThreadAsync();

Provádění bez streamování

V případě jednoduchých případů použití, kdy chcete úplnou odpověď:

var messages = new List<ChatMessage>
{
    new(ChatRole.User, "Write an article about renewable energy trends in 2025")
};

AgentResponse response = await workflowAgent.RunAsync(messages, thread);

foreach (ChatMessage message in response.Messages)
{
    Console.WriteLine($"{message.AuthorName}: {message.Text}");
}

Spouštění streamování

Aktualizace v reálném čase při provádění pracovního postupu:

var messages = new List<ChatMessage>
{
    new(ChatRole.User, "Write an article about renewable energy trends in 2025")
};

await foreach (AgentResponseUpdate update in workflowAgent.RunStreamingAsync(messages, thread))
{
    // Process streaming updates from each agent in the workflow
    if (!string.IsNullOrEmpty(update.Text))
    {
        Console.Write(update.Text);
    }
}

Zpracování externích vstupních požadavků

Pokud pracovní postup obsahuje exekutory, které vyžadují externí vstup (pomocí RequestInfoExecutor), zobrazí se tyto požadavky jako volání funkce v odpovědi agenta:

await foreach (AgentResponseUpdate update in workflowAgent.RunStreamingAsync(messages, thread))
{
    // Check for function call requests
    foreach (AIContent content in update.Contents)
    {
        if (content is FunctionCallContent functionCall)
        {
            // Handle the external input request
            Console.WriteLine($"Workflow requests input: {functionCall.Name}");
            Console.WriteLine($"Request data: {functionCall.Arguments}");

            // Provide the response in the next message
        }
    }
}

Serializace vláken a obnovení

Vlákna agenta pracovního postupu lze serializovat pro trvalé ukládání a mohou být později obnovena.

// Serialize the thread state
JsonElement serializedThread = thread.Serialize();

// Store serializedThread to your persistence layer...

// Later, resume the thread
AgentThread resumedThread = await workflowAgent.DeserializeThreadAsync(serializedThread);

// Continue the conversation
await foreach (var update in workflowAgent.RunStreamingAsync(newMessages, resumedThread))
{
    Console.Write(update.Text);
}

Vytváření kontrolních bodů pomocí agentů pracovního postupu

Povolte vytváření kontrolních bodů, aby se zachoval stav pracovního postupu napříč restartováními procesu:

// Create a checkpoint manager with your storage backend
var checkpointManager = new CheckpointManager(new FileCheckpointStorage("./checkpoints"));

// Create workflow agent with checkpointing enabled
AIAgent workflowAgent = workflow.AsAgent(
    id: "persistent-workflow",
    name: "Persistent Workflow Agent",
    checkpointManager: checkpointManager
);

Požadavky

Pokud chcete jako agenta použít pracovní postup, musí být spouštěcí exekutor pracovního postupu schopný zpracovat list[ChatMessage] jako vstup. Toto je automaticky splněno při použití ChatAgent nebo AgentExecutor.

Vytvoření agenta pracovního postupu

Zavolejte as_agent() na libovolný kompatibilní pracovní postup pro jeho převedení na agenta:

from agent_framework import WorkflowBuilder, ChatAgent
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential

# Create your chat client and agents
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())

researcher = ChatAgent(
    name="Researcher",
    instructions="Research and gather information on the given topic.",
    chat_client=chat_client,
)

writer = ChatAgent(
    name="Writer", 
    instructions="Write clear, engaging content based on research.",
    chat_client=chat_client,
)

# Build your workflow
workflow = (
    WorkflowBuilder()
    .set_start_executor(researcher)
    .add_edge(researcher, writer)
    .build()
)

# Convert the workflow to an agent
workflow_agent = workflow.as_agent(name="Content Pipeline Agent")

parametry as_agent

Parameter Typ Description
name str | None Volitelný zobrazovaný název agenta. Automaticky vygenerováno, pokud není k dispozici.

Použití agentů pracovního postupu

Vytvoření vlákna

Každá konverzace s agentem pracovního postupu vyžaduje vlákno pro správu stavu:

# Create a new thread for the conversation
thread = workflow_agent.get_new_thread()

Provádění bez streamování

V případě jednoduchých případů použití, kdy chcete úplnou odpověď:

from agent_framework import ChatMessage, Role

messages = [ChatMessage(role=Role.USER, content="Write an article about AI trends")]

response = await workflow_agent.run(messages, thread=thread)

for message in response.messages:
    print(f"{message.author_name}: {message.text}")

Spouštění streamování

Aktualizace v reálném čase při provádění pracovního postupu:

messages = [ChatMessage(role=Role.USER, content="Write an article about AI trends")]

async for update in workflow_agent.run_stream(messages, thread=thread):
    # Process streaming updates from each agent in the workflow
    if update.text:
        print(update.text, end="", flush=True)

Zpracování externích vstupních požadavků

Pokud pracovní postup obsahuje exekutory, které vyžadují externí vstup (pomocí RequestInfoExecutor), zobrazí se tyto požadavky jako volání funkce. Agent pracovního postupu sleduje čekající požadavky a před pokračováním očekává odpovědi:

from agent_framework import (
    FunctionCallContent,
    FunctionApprovalRequestContent,
    FunctionApprovalResponseContent,
)

async for update in workflow_agent.run_stream(messages, thread=thread):
    for content in update.contents:
        if isinstance(content, FunctionApprovalRequestContent):
            # The workflow is requesting external input
            request_id = content.id
            function_call = content.function_call

            print(f"Workflow requests input: {function_call.name}")
            print(f"Request data: {function_call.arguments}")

            # Store the request_id to provide a response later

# Check for pending requests
if workflow_agent.pending_requests:
    print(f"Pending requests: {list(workflow_agent.pending_requests.keys())}")

Poskytování odpovědí na čekající žádosti

Pokračování provádění pracovního postupu po požadavku na externí vstup:

# Create a response for the pending request
response_content = FunctionApprovalResponseContent(
    id=request_id,
    function_call=function_call,
    approved=True,
)

response_message = ChatMessage(
    role=Role.USER,
    contents=[response_content],
)

# Continue the workflow with the response
async for update in workflow_agent.run_stream([response_message], thread=thread):
    if update.text:
        print(update.text, end="", flush=True)

Kompletní příklad

Zde je úplný příklad agenta pracovního postupu s streamovaným výstupem:

import asyncio
from agent_framework import (
    ChatAgent,
    ChatMessage,
    Role,
)
from agent_framework.azure import AzureOpenAIChatClient
from agent_framework._workflows import SequentialBuilder
from azure.identity import AzureCliCredential


async def main():
    # Set up the chat client
    chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())

    # Create specialized agents
    researcher = ChatAgent(
        name="Researcher",
        instructions="Research the given topic and provide key facts.",
        chat_client=chat_client,
    )

    writer = ChatAgent(
        name="Writer",
        instructions="Write engaging content based on the research provided.",
        chat_client=chat_client,
    )

    reviewer = ChatAgent(
        name="Reviewer",
        instructions="Review the content and provide a final polished version.",
        chat_client=chat_client,
    )

    # Build a sequential workflow
    workflow = (
        SequentialBuilder()
        .add_agents([researcher, writer, reviewer])
        .build()
    )

    # Convert to a workflow agent
    workflow_agent = workflow.as_agent(name="Content Creation Pipeline")

    # Create a thread and run the workflow
    thread = workflow_agent.get_new_thread()
    messages = [ChatMessage(role=Role.USER, content="Write about quantum computing")]

    print("Starting workflow...")
    print("=" * 60)

    current_author = None
    async for update in workflow_agent.run_stream(messages, thread=thread):
        # Show when different agents are responding
        if update.author_name and update.author_name != current_author:
            if current_author:
                print("\n" + "-" * 40)
            print(f"\n[{update.author_name}]:")
            current_author = update.author_name

        if update.text:
            print(update.text, end="", flush=True)

    print("\n" + "=" * 60)
    print("Workflow completed!")


if __name__ == "__main__":
    asyncio.run(main())

Principy převodu událostí

Když pracovní postup běží jako agent, události pracovního postupu se převedou na reakce agenta. Typ odpovědi závisí na tom, jakou metodu použijete:

  • run(): Vrátí AgentResponse po dokončení pracovního postupu celý výsledek.
  • run_stream(): Poskytuje objekty AgentResponseUpdate při provádění pracovního postupu a poskytuje aktualizace v reálném čase.

Během provádění se interní události pracovního postupu mapují na odpovědi agenta následujícím způsobem:

Událost pracovního postupu Odpověď agenta
AgentResponseUpdateEvent Předávané jako AgentResponseUpdate (streamování) nebo agregované do AgentResponse (bez streamování)
RequestInfoEvent Převedeno na FunctionCallContentFunctionApprovalRequestContent
Další události Zahrnuto v raw_representation pro pozorovatelnost

Tento převod umožňuje používat standardní rozhraní agenta a v případě potřeby mít přístup k podrobným informacím o pracovním postupu.

Případy použití

1. Komplexní kanály agentů

Zabalte pracovní postup s více agenty jako jednoho agenta pro použití v aplikacích:

User Request --> [Workflow Agent] --> Final Response
                      |
                      +-- Researcher Agent
                      +-- Writer Agent  
                      +-- Reviewer Agent

2. Složení agenta

Použití agentů pracovních postupů jako komponent ve větších systémech:

  • Agent pracovního postupu může použít jako nástroj jiného agenta.
  • Dohromady je možné orchestrovat více agentů pracovního postupu.
  • Agenty pracovního postupu je možné vnořit do jiných pracovních postupů.

3. Integrace rozhraní API

Zpřístupnění složitých pracovních postupů prostřednictvím API, které očekává standardní rozhraní Agenta, čímž umožňuje:

  • Rozhraní chatu, která používají sofistikované back-endové pracovní postupy
  • Integrace se stávajícími systémy založenými na agentech
  • Postupné migrace z jednoduchých agentů na složité pracovní postupy

Další kroky